immutable = require('immutable')
underscore = require('underscore')
syncstring = require('./syncstring')
misc = require('./misc')
{required, defaults} = misc
{EventEmitter} = require('events')
json_stable = require('json-stable-stringify')
# Stable string key for a primary-key value.  Immutable Maps are first
# converted to plain JS so that equal maps always produce equal keys.
to_key = (s) ->
    s = s.toJS() if immutable.Map.isMap(s)
    json_stable(s)
# Create an empty DBDoc.
#   primary_keys : (required) array of field names used to index records
#   string_cols  : array of fields whose string values are edited via patches
exports.db_doc = (opts) ->
    opts = defaults opts,
        primary_keys : required
        string_cols  : []
    if not misc.is_array(opts.primary_keys)
        throw Error("primary_keys must be an array")
    if not misc.is_array(opts.string_cols)
        # Fixed: message previously said "_string_cols", which is not the
        # name of the option the caller passes.
        throw Error("string_cols must be an array")
    return new DBDoc(opts.primary_keys, opts.string_cols)
# Build a DBDoc from an array of plain record objects.
exports.from_obj = (opts) ->
    opts = defaults opts,
        obj          : required
        primary_keys : required
        string_cols  : []
    throw Error("obj must be an array") if not misc.is_array(opts.obj)
    # Deep-convert the records to immutable data and index them.
    new DBDoc(opts.primary_keys, opts.string_cols, immutable.fromJS(opts.obj))
# Build a DBDoc from its string form: one JSON-encoded record per line.
# Lines that fail to parse are skipped with a warning, so a single corrupt
# line cannot make the whole document unusable.
exports.from_str = (opts) ->
    opts = defaults opts,
        str          : required
        primary_keys : required
        string_cols  : []
    if not misc.is_string(opts.str)
        # Fixed: message previously said "obj must be a string" (copy-paste
        # from from_obj); the option being validated is str.
        throw Error("str must be a string")
    obj = []
    for line in opts.str.split('\n')
        if line.length > 0
            try
                obj.push(misc.from_json(line))
            catch e
                console.warn("CORRUPT db-doc string: #{e} -- skipping '#{line}'")
    return exports.from_obj(obj:obj, primary_keys:opts.primary_keys, string_cols:opts.string_cols)
# Merge the immutable.Map `change` into `obj`: a defined value sets the key,
# while null/undefined deletes it.  Returns the merged map.
# (Original condition `v == null or not v?` is exactly `not v?`.)
merge_set = (obj, change) ->
    change.map (v, k) ->
        if v?
            obj = obj.set(k, v)
        else
            obj = obj.delete(k)
        return
    return obj
# Compute the keywise patch transforming plain object obj1 into obj2:
# keys that vanish map to null (= delete), keys that differ or are new map
# to their obj2 value; equal keys are omitted entirely.
map_merge_patch = (obj1, obj2) ->
    change = {}
    for key, val1 of obj1
        val2 = obj2[key]
        continue if underscore.isEqual(val1, val2)
        change[key] = if val2? then val2 else null
    for key, val2 of obj2
        # Keys that exist only in obj2 (treating null/undefined as absent).
        change[key] = val2 if not obj1[key]?
    return change
# Drop entries of the immutable map whose value is strictly null.
# (Values that are undefined are kept -- `!=` compiles to `!==` here.)
nonnull_cols = (f) ->
    f.filter (v) -> v isnt null
###
DBDoc -- an immutable "database table" document: a list of immutable.Map
records with one index per primary-key column for fast selection.  Every
mutation (set/delete) returns a NEW DBDoc that shares structure with the
old one; nothing is ever modified in place.
###
class DBDoc
    constructor: (@_primary_keys, @_string_cols, @_records, @_everything, @_indexes, @_changes) ->
        # Normalize the column specs (array or map) to maps {field: true}.
        @_primary_keys = @_process_cols(@_primary_keys)
        @_string_cols = @_process_cols(@_string_cols)
        # List of records; deleted records remain as undefined entries so
        # that positions stored in the indexes stay valid.
        @_records ?= immutable.List()
        # Sorted set of positions of the records that are actually defined.
        @_everything ?= immutable.Set((n for n in [0...@_records.size] when @_records.get(n)?)).sort()
        if not @_indexes?
            # Build the indexes: for each primary-key field, a map from
            # to_key(value) to the set of record positions with that value.
            @_indexes = immutable.Map()
            for field of @_primary_keys
                @_indexes = @_indexes.set(field, immutable.Map())
            # NOTE(review): this n is unused -- it is shadowed by the map
            # callback parameter below.
            n = 0
            @_records.map (record, n) =>
                @_indexes.map (index, field) =>
                    val = record.get(field)
                    if val?
                        k = to_key(val)
                        matches = index.get(k)
                        if matches?
                            matches = matches.add(n).sort()
                        else
                            matches = immutable.Set([n])
                        @_indexes = @_indexes.set(field, index.set(k, matches))
                    return
                return
        # Number of (defined) records in the table.
        @size = @_everything.size
        if not @_changes?
            @reset_changes()

    reset_changes: =>
        # Start tracking changes relative to this document.
        @_changes = {changes: immutable.Set(), from_db:@}

    # Returns {changes: immutable Set of the primary-key parts of records
    # that may have changed, from_db: the DBDoc the changes are relative to}.
    changes: =>
        return @_changes

    # Given an immutable.Map record, keep only its primary-key columns.
    _primary_key_cols: (f) =>
        return f.filter((v, k) => @_primary_keys[k])

    # Convert an array of field names to a map {field: true}; maps pass through.
    _process_cols: (v) =>
        if misc.is_array(v)
            p = {}
            for field in v
                p[field] = true
            return p
        else if not misc.is_object(v)
            throw Error("primary_keys must be a map or array")
        return v

    # Return the immutable Set of positions of records matching the where
    # condition; where may only involve primary-key fields.  An empty where
    # matches every record.
    _select: (where) =>
        if immutable.Map.isMap(where)
            where = where.toJS()
        len = misc.len(where)
        result = undefined
        for field, value of where
            index = @_indexes.get(field)
            if not index?
                throw Error("field '#{field}' must be a primary key")
            v = index.get(to_key(value))
            if not v?
                # No record has this value, so the intersection is empty.
                return immutable.Set()
            if len == 1
                # Single condition -- the index hit is the full answer.
                return v
            # Multiple conditions -- intersect the matches for each field.
            if result?
                result = result.intersect(v)
            else
                result = v
        if not result?
            return @_everything
        else
            return result

    # Split obj into {where} (primary-key fields with defined values) and
    # {set} (all non-primary-key fields); also returns obj as a plain object.
    _parse: (obj) =>
        if immutable.Map.isMap(obj)
            obj = obj.toJS()
        if not misc.is_object(obj)
            throw Error("obj must be a Javascript object")
        where = {}
        set = {}
        for field, val of obj
            if @_primary_keys[field]?
                if val?
                    where[field] = val
            else
                set[field] = val
        return {where:where, set:set, obj:obj}

    # Return a new DBDoc in which the record matching the primary-key part
    # of obj has its remaining fields set/merged; if no record matches, a
    # new record is inserted.  An array of objects sets each one in turn.
    set: (obj) =>
        if misc.is_array(obj)
            z = @
            for x in obj
                z = z.set(x)
            return z
        {where, set, obj} = @_parse(obj)
        matches = @_select(where)
        {changes} = @_changes
        n = matches?.first()
        if n?
            # An existing record matches -- update it field by field.
            before = record = @_records.get(n)
            for field, value of set
                if value == null
                    # null means delete the field entirely.
                    record = record.delete(field)
                else
                    if @_string_cols[field] and misc.is_array(value)
                        # For string columns an array value is a patch to
                        # apply to the current string (default '').
                        record = record.set(field, syncstring.apply_patch(value, before.get(field) ? '')[0])
                    else
                        cur = record.get(field)
                        change = immutable.fromJS(value)
                        if immutable.Map.isMap(cur) and immutable.Map.isMap(change)
                            # Maps are merged keywise (null values delete keys).
                            new_val = merge_set(cur, change)
                        else
                            new_val = change
                        record = record.set(field, new_val)
            if not before.equals(record)
                changes = changes.add(@_primary_key_cols(record))
                return new DBDoc(@_primary_keys, @_string_cols, @_records.set(n, record), @_everything, @_indexes, {changes:changes, from_db:@_changes.from_db})
            else
                # Nothing actually changed.
                return @
        else
            # No match -- insert a new record.
            for field of @_string_cols
                if obj[field]? and misc.is_array(obj[field])
                    # The value is a patch, but there is nothing to patch,
                    # so drop the field.
                    obj = misc.copy_without(obj, field)
            record = nonnull_cols(immutable.fromJS(obj))
            changes = changes.add(@_primary_key_cols(record))
            records = @_records.push(record)
            n = records.size - 1
            everything = @_everything.add(n)
            # Update each primary-key index with the new record's position.
            indexes = @_indexes
            for field of @_primary_keys
                val = obj[field]
                if val? and val != null
                    index = indexes.get(field) ? immutable.Map()
                    k = to_key(val)
                    matches = index.get(k)
                    if matches?
                        matches = matches.add(n).sort()
                    else
                        matches = immutable.Set([n])
                    indexes = indexes.set(field, index.set(k, matches))
            return new DBDoc(@_primary_keys, @_string_cols, records, everything, indexes, {changes:changes, from_db:@_changes.from_db})

    # Return a new DBDoc with all records matching where removed.  An array
    # deletes each condition in turn; an empty where deletes everything.
    delete: (where) =>
        if misc.is_array(where)
            z = @
            for x in where
                z = z.delete(x)
            return z
        if @_everything.size == 0
            # Nothing to delete.
            return @
        {changes} = @_changes
        remove = @_select(where)
        if remove.size == @_everything.size
            # Deleting everything -- start over from scratch.
            changes = changes.union(@_records.filter((record)=>record?).map(@_primary_key_cols))
            return new DBDoc(@_primary_keys, @_string_cols, undefined, undefined, undefined, {changes:changes, from_db:@_changes.from_db})
        # Remove the deleted positions from every primary-key index.
        indexes = @_indexes
        for field of @_primary_keys
            index = indexes.get(field)
            if not index?
                continue
            remove.map (n) =>
                record = @_records.get(n)
                val = record.get(field)
                if val?
                    k = to_key(val)
                    matches = index.get(k).delete(n)
                    if matches.size == 0
                        index = index.delete(k)
                    else
                        index = index.set(k, matches)
                    indexes = indexes.set(field, index)
                return
        # Blank out the deleted records (positions stay stable for indexes).
        records = @_records
        remove.map (n) =>
            changes = changes.add(@_primary_key_cols(records.get(n)))
            records = records.set(n, undefined)
        everything = @_everything.subtract(remove)
        return new DBDoc(@_primary_keys, @_string_cols, records, everything, indexes, {changes:changes, from_db:@_changes.from_db})

    # Immutable List of all records matching the where condition.
    get: (where) =>
        matches = @_select(where)
        if not matches?
            return immutable.List()
        return @_records.filter((x,n)->matches.includes(n))

    # First record matching the where condition, or undefined.
    get_one: (where) =>
        matches = @_select(where)
        if not matches?
            return
        return @_records.get(matches.first())

    # True if both documents contain the same set of records.
    equals: (other) =>
        if @_records == other._records
            return true
        if @size != other.size
            return false
        # Compare as sets; adding undefined makes deleted slots compare equal.
        return immutable.Set(@_records).add(undefined).equals(immutable.Set(other._records).add(undefined))

    # Plain Javascript array-of-objects form of this document.
    to_obj: =>
        return @get().toJS()

    # String form: one JSON-encoded record per line.  Cached, which is safe
    # because this object is immutable.
    to_str: =>
        if @_to_str_cache?
            return @_to_str_cache
        return @_to_str_cache = (misc.to_json(x) for x in @to_obj()).join('\n')

    # Given a plain object, return just its primary-key fields.
    _primary_key_part: (x) =>
        where = {}
        for k, v of x
            if @_primary_keys[k]
                where[k] = v
        return where

    # Compute a patch transforming this document into other.  The patch is a
    # flat array of pairs: -1 followed by where-conditions to delete, and/or
    # 1 followed by objects to set (see apply_patch).
    make_patch: (other) =>
        if other.size == 0
            # Other is empty -- delete everything.
            return [-1,[{}]]
        t0 = immutable.Set(@_records)
        t1 = immutable.Set(other._records)
        # Only records present in one document but not the other matter.
        common = t0.intersect(t1).add(undefined)
        t0 = t0.subtract(common)
        t1 = t1.subtract(common)
        if t0.size == 0
            # Everything in t1 is new -- just insert it all.
            return [1, t1.toJS()]
        if t1.size == 0
            # Everything in t0 is gone -- delete it all by primary key.
            v = []
            t0.map (x) =>
                v.push(@_primary_key_part(x.toJS()))
                return
            return [-1, v]
        # Classify by primary key into deletes, inserts, and changed records.
        k0 = t0.map(@_primary_key_cols)
        k1 = t1.map(@_primary_key_cols)
        add = []
        remove = undefined
        # Primary keys only in @: these records get deleted.
        deletes = k0.subtract(k1)
        if deletes.size > 0
            remove = deletes.toJS()
        # Primary keys only in other: insert those records whole.
        inserts = k1.subtract(k0)
        if inserts.size > 0
            inserts.map (k) =>
                add.push(other.get_one(k.toJS()).toJS())
                return
        # Primary keys in both but with different content: set changed fields.
        changed = k1.intersect(k0)
        if changed.size > 0
            changed.map (k) =>
                obj = k.toJS()
                obj0 = @_primary_key_part(obj)
                from = @get_one(obj0).toJS()
                to = other.get_one(obj0).toJS()
                # Fields that disappeared become null (= delete on apply).
                for k of from
                    if not to[k]?
                        obj[k] = null
                for k, v of to
                    if not underscore.isEqual(from[k], v)
                        if @_string_cols[k] and from[k]? and v?
                            # String columns are diffed as string patches.
                            obj[k] = syncstring.make_patch(from[k], v)
                        else if misc.is_object(from[k]) and misc.is_object(v)
                            # Plain objects are merged keywise.
                            obj[k] = map_merge_patch(from[k], v)
                        else
                            obj[k] = v
                add.push(obj)
                return
        patch = []
        if remove?
            patch.push(-1)
            patch.push(remove)
        if add.length > 0
            patch.push(1)
            patch.push(add)
        return patch

    # Apply a patch produced by make_patch, returning the resulting DBDoc.
    apply_patch: (patch) =>
        i = 0
        db = @
        while i < patch.length
            if patch[i] == -1
                db = db.delete(patch[i+1])
            else if patch[i] == 1
                db = db.set(patch[i+1])
            i += 2
        return db

    # Set of primary-key parts of records that differ between this document
    # and other; other being undefined means every record counts as changed.
    changed_keys: (other) =>
        if @_records == other?._records  # identical records -- no changes
            return immutable.Set()
        t0 = immutable.Set(@_records).filter((x) -> x?)
        if not other?
            return t0.map(@_primary_key_cols)
        t1 = immutable.Set(other._records).filter((x) -> x?)
        common = t0.intersect(t1)
        t0 = t0.subtract(common)
        t1 = t1.subtract(common)
        k0 = t0.map(@_primary_key_cols)
        k1 = t1.map(@_primary_key_cols)
        return k0.union(k1)
# Thin wrapper turning a DBDoc into the document interface that the
# syncstring machinery expects (to_str/is_equal/apply_patch/make_patch).
class Doc
    constructor: (@_db) ->
        throw Error("@_db must be defined") if not @_db?

    to_str: =>
        @_db.to_str()

    is_equal: (other) =>
        @_db.equals(other._db)

    apply_patch: (patch) =>
        new Doc(@_db.apply_patch(patch))

    make_patch: (other) =>
        return if not @_db? or not other?._db?
        @_db.make_patch(other._db)

    changes: =>
        @_db.changes()

    reset_changes: =>
        @_db.reset_changes()
        return

    get: (where) =>
        @_db?.get(where)

    get_one: (where) =>
        @_db?.get_one(where)
###
SyncDoc -- synchronized db-doc document, built on syncstring.SyncDoc with a
from_str that parses the string form into a Doc-wrapped DBDoc.
###
class SyncDoc extends syncstring.SyncDoc
    constructor: (opts) ->
        opts = defaults opts,
            # Fixed: opts.id is passed to super as string_id below, but 'id'
            # was never declared in defaults, so a caller-supplied id could
            # never get through defaults().
            id                : undefined
            client            : required
            project_id        : undefined
            path              : undefined
            save_interval     : undefined
            patch_interval    : undefined
            file_use_interval : undefined
            cursors           : false
            primary_keys      : required
            string_cols       : []
        # Parse the newline-delimited JSON string form into a db-doc.
        from_str = (str) ->
            db = exports.from_str
                str          : str
                primary_keys : opts.primary_keys
                string_cols  : opts.string_cols
            return new Doc(db)
        super
            string_id         : opts.id
            client            : opts.client
            project_id        : opts.project_id
            path              : opts.path
            save_interval     : opts.save_interval
            patch_interval    : opts.patch_interval
            file_use_interval : opts.file_use_interval
            cursors           : opts.cursors
            from_str          : from_str
            doctype           :
                type         : 'db'
                patch_format : 1
                opts         :
                    primary_keys : opts.primary_keys
                    string_cols  : opts.string_cols
###
SyncDB -- event-emitting wrapper around a SyncDoc whose document is a
DBDoc.  Provides set/get/delete with database semantics, plus history,
undo/redo, and cursor support, all delegated to the underlying SyncDoc.
###
class exports.SyncDB extends EventEmitter
    constructor: (opts) ->
        @_path = opts.path
        if opts.change_throttle
            # Throttle how often the change event can fire.
            @_on_change = underscore.throttle(@_on_change, opts.change_throttle)
        delete opts.change_throttle
        @_doc = new SyncDoc(opts)
        # Guarantee that the first change event is emitted even if empty.
        @_first_change_event = true
        # Re-emit the underlying document's events on this object.
        @_doc.on('change', @_on_change)
        @_doc.on('metadata-change', => @emit('metadata-change'))
        @_doc.on('before-change', => @emit('before-change'))
        @_doc.on('sync', => @emit('sync'))
        if opts.cursors
            @_doc.on('cursor_activity', (args...) => @emit('cursor_activity', args...))
        @_doc.on('connected', => @emit('connected'))
        @_doc.on('init', (err) => @emit('init', err))
        @_doc.on('save_to_disk_project', (err) => @emit('save_to_disk_project', err))
        @setMaxListeners(100)

    # Throw if this SyncDB has already been closed.
    _check: =>
        if not @_doc?
            throw Error("SyncDB('#{@_path}') is closed")

    has_unsaved_changes: =>
        @_check()
        return @_doc.has_unsaved_changes()

    has_uncommitted_changes: =>
        @_check()
        return @_doc.has_uncommitted_changes()

    is_read_only: =>
        @_check()
        return @_doc.get_read_only()

    # Emit 'change' with the set of primary keys of records that changed
    # since the last event (or everything, the first time).
    _on_change: =>
        if not @_doc?
            # Already closed.
            return
        db = @_doc.get_doc()._db
        if not @_last_db?
            # First event -- everything counts as changed.
            changes = db.changed_keys()
        else
            {changes, from_db} = @_doc.get_doc().changes()
            @_doc.get_doc().reset_changes()
            if from_db != @_last_db
                # The tracked changes are relative to a different baseline,
                # so fall back to a full comparison against @_last_db.
                changes = db.changed_keys(@_last_db)
        if changes.size > 0 or @_first_change_event
            @emit('change', changes)
        @_last_db = db
        delete @_first_change_event

    close: () =>
        if not @_doc?
            return
        @removeAllListeners()
        @_doc?.close()
        delete @_doc

    is_closed: =>
        return not @_doc?

    # Sync with the backend (not the same as saving to disk -- see save).
    sync: (cb) =>
        @_check()
        @_doc.save(cb)
        return

    # Save the document to disk.
    save: (cb) =>
        @_check()
        @_doc.save_to_disk(cb)
        return

    save_asap: (cb) =>
        @_check()
        @_doc.save_asap(cb)
        return

    set_doc: (value) =>
        @_check()
        @_doc.set_doc(value)
        return

    get_doc: () =>
        @_check()
        return @_doc.get_doc()

    get_path: =>
        @_check()
        return @_doc.get_path()

    get_project_id: =>
        return @_doc.get_project_id()

    # Set the given record(s); if save is true (the default) also sync.
    set: (obj, save=true) =>
        if not @_doc?
            return
        @_doc.set_doc(new Doc(@_doc.get_doc()._db.set(obj)))
        if save
            @_doc.save()
        # Fire the change event synchronously so listeners see the update now.
        @_on_change()
        return

    # All records matching where; if time is given, query that version of
    # the document instead of the current one.
    get: (where, time) =>
        if not @_doc?
            return immutable.List()
        if time?
            d = @_doc.version(time)
        else
            d = @_doc.get_doc()
        if not d?
            return
        return d._db.get(where)

    # One record matching where (at the given version, if time is given).
    get_one: (where, time) =>
        if not @_doc?
            return
        if time?
            d = @_doc.version(time)
        else
            d = @_doc.get_doc()
        if not d?
            return
        return d._db.get_one(where)

    # Delete all records matching where; if save is true (the default) sync.
    delete: (where, save=true) =>
        if not @_doc?
            return
        d = @_doc.get_doc()
        if not d?
            return
        @_doc.set_doc(new Doc(d._db.delete(where)))
        if save
            @_doc.save()
        @_on_change()
        return

    versions: =>
        @_check()
        return @_doc.versions()

    last_changed: =>
        @_check()
        return @_doc.last_changed()

    all_versions: =>
        @_check()
        return @_doc.all_versions()

    version: (t) =>
        @_check()
        return @_doc.version(t)

    account_id: (t) =>
        @_check()
        return @_doc.account_id(t)

    time_sent: (t) =>
        @_check()
        return @_doc.time_sent(t)

    show_history: (opts) =>
        @_check()
        return @_doc.show_history(opts)

    has_full_history: =>
        @_check()
        return @_doc.has_full_history()

    load_full_history: (cb) =>
        @_check()
        @_doc.load_full_history(cb)

    wait_until_read_only_known: (cb) =>
        @_check()
        return @_doc.wait_until_read_only_known(cb)

    get_read_only: =>
        @_check()
        return @_doc.get_read_only()

    # Number of records in the database.
    count: =>
        @_check()
        return @_doc.get_doc()._db.size

    undo: =>
        @_check()
        @_doc.set_doc(@_doc.undo())
        @_doc.save()
        @_on_change()
        return

    redo: =>
        @_check()
        @_doc.set_doc(@_doc.redo())
        @_doc.save()
        @_on_change()
        return

    exit_undo_mode: =>
        @_check()
        @_doc.exit_undo_mode()

    in_undo_mode: =>
        @_check()
        return @_doc.in_undo_mode()

    revert: (version) =>
        @_check()
        @_doc.revert(version)
        @_doc.save()
        return

    set_cursor_locs: (locs) =>
        @_check()
        @_doc.set_cursor_locs(locs)
        return

    get_cursors: =>
        return @_doc?.get_cursors()
# Open an existing synchronized document at the given path in the given
# project.  Looks up the doctype in the syncstrings table, constructs the
# appropriate document (e.g. sync_string or sync_db), then cb(err, doc).
exports.open_existing_sync_document = (opts) ->
    opts = defaults opts,
        client     : required
        project_id : required
        path       : required
        cb         : required
    opts.client.query
        query :
            syncstrings:
                project_id : opts.project_id
                path       : opts.path
                doctype    : null
        cb: (err, resp) ->
            if err
                opts.cb(err)
                return
            if resp.event == 'error'
                opts.cb(resp.error)
                return
            if not resp.query?.syncstrings?
                opts.cb("no document '#{opts.path}' in project '#{opts.project_id}'")
                return
            # Default to a plain string document if no doctype is recorded.
            doctype = JSON.parse(resp.query.syncstrings.doctype ? '{"type":"string"}')
            opts2 =
                project_id : opts.project_id
                path       : opts.path
            if doctype.opts?
                opts2 = misc.merge(opts2, doctype.opts)
            # Dispatch on the recorded type, e.g. client.sync_db(opts2).
            doc = opts.client["sync_#{doctype.type}"](opts2)
            opts.cb(undefined, doc)