###
CoCalc, Copyright (C) 2016, Sagemath Inc.

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

---

SYNCHRONIZED TABLE -- defined by an object query

- Do a query against a PostgreSQL table using our object query description.
- Synchronization with the backend database is done automatically.

Methods:
- constructor(query): query = the name of a table (or a more complicated object)

- set(map): Set the given keys of map to their values; one key must be
  the primary key for the table.  NOTE: Computed primary keys will
  get automatically filled in; these are keys in schema.coffee,
  where the set query looks like this, say:
      (obj, db) -> db.sha1(obj.project_id, obj.path)
- get(): Current value of the query, as an immutable.js Map from
  the primary key to the records, which are also immutable.js Maps.
- get(key): The record with the given key, as an immutable Map.
- get(keys): Immutable Map from the given keys to the corresponding records.
- get_one(): Returns one record as an immutable Map (useful if there
  is only one record)

- close(): Frees up resources, stops syncing; don't use the object further.

Events:
- 'before-change': fired right before (and in the same event loop as) actually
  applying remote incoming changes
- 'change', [array of string primary keys]: fired any time the value of the query result
  changes, *including* if changed by calling set on this object.
  Also called with an empty list on first connection if there happens
  to be nothing in this table.
  If the primary key is not a string, it is converted to a JSON string.
- 'disconnected': fired when the table is disconnected from the server for some reason
- 'connected': fired when the table has successfully connected, finished initializing,
  and is ready to use
- 'saved', [array of saved objects]: fired after confirmed successful save of objects to the backend

STATES:

A SyncTable is a finite state machine as follows:

        -------------------<------------------
       \|/                                    |
    [connecting] --> [connected] --> [disconnected] --> [reconnecting]

Also, there is a final state called 'closed' that the SyncTable moves to when
it will not be used further; this frees up all connections and used memory.
The table can't be used after it is closed.  The only way to get to the
closed state is to explicitly call close() on the table; otherwise, the
table will keep attempting to connect and work until it works.

    (anything) --> [closed]


- connecting   -- connecting to the backend, and have never connected before.

- connected    -- successfully connected to the backend, initialized, and receiving updates.

- disconnected -- table was successfully initialized, but the network connection
                  died.  Can still take writes, but they will never try to save to
                  the backend.  Waiting to reconnect when the user connects back to the backend.

- reconnecting -- client just reconnected to the backend, so this table is now trying
                  to get the full current state of the table and initialize a changefeed.

- closed       -- table is closed, and memory/connections used by the table are freed.


WORRY: What if the user does a set and connecting (or reconnecting) takes a long time, e.g., suspend
a laptop, then resume?  The changes may get saved... a month later.  For some things, e.g., logs,
this could be fine.
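The state machine above can be summarized as a transition table; the following
standalone JavaScript sketch (illustrative only, not part of this module)
encodes the documented transitions:

```javascript
// Documented SyncTable state transitions; close() may happen from any state.
const transitions = {
  connecting:   ["connected"],
  connected:    ["disconnected"],
  disconnected: ["reconnecting"],
  reconnecting: ["connected"],
  closed:       [],
};

function canTransition(from, to) {
  if (to === "closed") return true; // (anything) --> [closed]
  return (transitions[from] || []).includes(to);
}

console.log(canTransition("connected", "disconnected")); // true
console.log(canTransition("closed", "connecting"));      // false -- closed is final
```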
However, on reconnect, the first thing is that the complete upstream state of the
table is set on the server version of the table, so a reconnecting user only sends its changes if upstream
hasn't changed anything in that same record.
###

# if true, will log to the console a huge amount of info about every get/set
DEBUG = false

exports.set_debug = (x) ->
    DEBUG = !!x

{EventEmitter} = require('events')
immutable      = require('immutable')
async          = require('async')
underscore     = require('underscore')

misc   = require('./misc')
schema = require('./schema')

{defaults, required} = misc

# We represent synchronized tables by an immutable.js mapping from the primary
# key to the object.  PostgreSQL primary keys can be compound (more than
# just strings), e.g., they can be arrays, so we convert complicated keys to their
# JSON representation.  A binary object doesn't make sense here in pure javascript,
# but these do:
#     string, number, time, boolean, or array
# Everything automatically converts fine to a string except array, which is the
# main thing the function below deals with.
# NOTE (1) RIGHT NOW: This should be safe to change at
# any time, since the keys aren't stored longterm.
# If we do something with localStorage, this will no longer be safe
# without a version number.
# NOTE (2) Of course you could use both a string and an array as primary keys
# in the same table.  You could evilly make the string equal the json of an array,
# and this *would* break things.  We are thus assuming that such mixing
# doesn't happen.
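###
To make NOTE (2)/(3) concrete: a *stable* stringify serializes equal objects
identically regardless of key insertion order, so a compound primary key always
maps to a single canonical string key.  The sketch below is plain JavaScript and
only for illustration -- the module itself uses the json-stable-stringify package.

```javascript
// Minimal stable stringify (illustrative; NOT the json-stable-stringify package).
function stableStringify(x) {
  if (x === null || typeof x !== "object") return JSON.stringify(x);
  if (Array.isArray(x)) return "[" + x.map(stableStringify).join(",") + "]";
  return "{" + Object.keys(x).sort().map(
    (k) => JSON.stringify(k) + ":" + stableStringify(x[k])
  ).join(",") + "}";
}

// Roughly mirrors to_key(): objects/arrays become JSON strings, primitives pass through.
function toKey(x) {
  return typeof x === "object" && x !== null ? stableStringify(x) : x;
}

console.log(toKey({ b: 1, a: 2 }) === toKey({ a: 2, b: 1 })); // true
console.log(toKey("some-uuid")); // "some-uuid"
```
###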
# An alternative would be to just *always* use a *stable* version of stringify.
# NOTE (3) we use a stable version, since otherwise things will randomly break if the
# key is an object.

json_stable_stringify = require('json-stable-stringify')

to_key = (x) ->
    if typeof(x) == 'object'
        return json_stable_stringify(x)
    else
        return x

# Plug: Class to ensure that the SyncTable stays "plugged" into the hub, if at all possible.
# NOTE: I implemented this outside of SyncTable so that it would be much easier
# to reason about, and be sure the code is right.
class Plug
    constructor : (opts) ->
        @_opts = defaults opts,
            name       : 'plug'    # Used only for debug logging
            no_sign_in : required  # True if sign-in isn't required before connecting, e.g., anonymous synctable and project.
            client     : required  # The client object, which provides:
                                   #   'connected' and 'signed_in' events, and
                                   #   is_connected() and is_signed_in() functions.
            connect    : required  # A function to call to create a connection; it should run as
                                   # quickly as it can and call its callback with an error if
                                   # and only if it fails.  It will definitely only be called
                                   # once at a time, so no need to put in any sort of block.
        @connect()

    dbg: (f) =>
        #return @_opts.client.dbg("Plug('#{@_opts.name}').#{f}")
        return =>

    # Keep trying until we connect -- always succeeds if it terminates
    connect: (cb) =>
        dbg = @dbg('connect')
        if @_is_connecting
            dbg("already connecting")
            return
        @_is_connecting = true
        dbg('')
        misc.retry_until_success
            f           : @__try_to_connect_once
            log         : dbg
            start_delay : 4000
            max_delay   : 20000
            cb          : =>
                delete @_is_connecting
                dbg("success!")
                cb?()

    # Try to connect exactly once.
    # cb gets an error if and only if it fails to connect.
    __try_to_connect_once: (cb) =>
        # timer for giving up on waiting to try to connect
        give_up_timer = undefined

        # actually try to connect
        do_connect = =>
            if give_up_timer
                clearTimeout(give_up_timer)
            @_opts.connect(cb)

        # Which event/condition has to be true before we even try to connect.
        if @_opts.no_sign_in
            event = 'connected'
        else
            event = 'signed_in'

        if @_opts.client["is_#{event}"]()
            # The condition is satisfied, so try once to connect.
            do_connect()
        else
            # Wait until condition is satisfied...
            @_opts.client.once(event, do_connect)
            # ... but don't wait forever, in case for some reason we miss
            # the event (this can maybe rarely happen).
            give_up = =>
                @_opts.client.removeListener(event, do_connect)
                cb("timeout")
            give_up_timer = setTimeout(give_up, 5000 + Math.random()*10000)

class SyncTable extends EventEmitter
    constructor: (@_query, @_options, @_client, @_debounce_interval, @_throttle_changes, @_cache_key) ->
        @_init_query()
        # The value of this query locally.
        @_value_local = undefined

        # Our best guess as to the value of this query on the server,
        # according to queries and updates the server pushes to us.
        @_value_server = undefined

        # The changefeed id, when set by doing a change-feed aware query.
        @_id = undefined

        # Not connected yet
        @_state = 'disconnected'   # disconnected <--> connected --> closed
        @_created = new Date()

        @_plug = new Plug
            name       : @_table
            client     : @_client
            connect    : @_connect
            no_sign_in : @_schema.anonymous or @_client.is_project()   # note: projects don't have to authenticate

        @_client.on 'disconnected', =>
            #console.log("synctable: DISCONNECTED")
            # When the connection is dropped, the backend hub notices that it was dropped
            # and immediately cancels all changefeeds.
            # Thus we set @_id to undefined below, so that we don't redundantly
            # cancel them again, which leads to an error and wastes resources
            # (which can pile up).
            @_id = undefined
            @_disconnected('client disconnect')

        # No throttling of change events unless explicitly requested *or* part of the schema.
        @_throttle_changes ?= schema.SCHEMA[@_table]?.user_query?.get?.throttle_changes

        if not @_throttle_changes
            @emit_change = (changed_keys) => @emit('change', changed_keys)
        else
            # throttle emitting of change events
            all_changed_keys = {}
            do_emit_changes = =>
                #console.log("#{@_table} -- emitting changes", misc.keys(all_changed_keys))
                # CRITICAL: some code depends on emitting change even for the *empty* list of keys!
                # E.g., projects page won't load for new users.  This is the *change* from not
                # loaded to being loaded, which does make sense.
                @emit('change', misc.keys(all_changed_keys))
                all_changed_keys = {}
            do_emit_changes = underscore.throttle(do_emit_changes, @_throttle_changes)
            @emit_change = (changed_keys) =>
                #console.log("#{@_table} -- queue changes", changed_keys)
                for key in changed_keys
                    all_changed_keys[key] = true
                do_emit_changes()

    dbg: (f) =>
        #return @_client.dbg("SyncTable('#{@_table}').#{f}")
        return =>

    _connect: (cb) =>
        dbg = @dbg("connect")
        dbg()
        if @_state == 'closed'
            cb?('closed')
            return
        if @_state == 'connected'
            cb?()
            return
        if @_id?
            @_client.query_cancel(id:@_id)
            @_id = undefined

        async.series([
            (cb) =>
                # 1. save, in case we have any local unsaved changes, then sync with upstream.
                if @_value_local? and @_value_server?
                    @_save(cb)
                else
                    cb()
            (cb) =>
                # 2. Now actually do the changefeed query.
                @_reconnect(cb)
        ], cb)

    _reconnect: (cb) =>
        dbg = @dbg("_run")
        if @_state == 'closed'
            dbg("closed so don't do anything ever again")
            cb?()
            return
        first_resp = true
        this_query_id = undefined
        dbg("do the query")
        @_client.query
            query   : @_query
            changes : true
            timeout : 30
            options : @_options
            cb      : (err, resp) =>

                if @_state == 'closed'
                    # already closed so ignore anything else.
                    return

                if first_resp
                    dbg("query got ", err, resp)
                    first_resp = false
                    if @_state == 'closed'
                        cb?("closed")
                    else if resp?.event == 'query_cancel'
                        cb?("query-cancel")
                    else if err
                        cb?(err)
                    else if not resp?.query?[@_table]?
                        cb?("got no data")
                    else
                        # Successfully completed query
                        this_query_id = @_id = resp.id
                        @_state = 'connected'
                        @_update_all(resp.query[@_table])
                        @emit("connected", resp.query[@_table])   # ready to use!
                        cb?()
                        # Do any pending saves
                        for cb in @_connected_save_cbs ? []
                            @save(cb)
                        delete @_connected_save_cbs
                else
                    if @_state != 'connected'
                        dbg("nothing to do -- ignore these, and make sure they stop")
                        if this_query_id?
                            @_client.query_cancel(id:this_query_id)
                        return
                    if err or resp?.event == 'query_cancel'
                        @_disconnected("err=#{err}, resp?.event=#{resp?.event}")
                    else
                        # Handle the update
                        @_update_change(resp)

    _disconnected: (why) =>
        dbg = @dbg("_disconnected")
        dbg("why=#{why}")
        if @_state == 'disconnected'
            dbg("already disconnected")
            return
        if @_id
            @_client.query_cancel(id:@_id)
        @_state = 'disconnected'
        @_plug.connect()   # start trying to connect again

    # Return the string key used in the immutable map in which this table is stored.
    key: (obj) =>
        return @_key(obj)

    # Return true if there are changes to this synctable that
    # have NOT been confirmed as saved to the backend database.
    has_uncommitted_changes: () =>
        if not @_value_server? and not @_value_local?
            return false
        if @_value_local? and not @_value_server?
            return true
        return not @_value_server.equals(@_value_local)

    get: (arg) =>
        if not @_value_local?
            return
        if arg?
            if misc.is_array(arg)
                x = {}
                for k in arg
                    x[to_key(k)] = @_value_local.get(to_key(k))
                return immutable.fromJS(x)
            else
                return @_value_local.get(to_key(arg))
        else
            return @_value_local

    get_one: =>
        return @_value_local?.toSeq().first()

    _parse_query: (query) =>
        if typeof(query) == 'string'
            # name of a table -- get all fields
            v = misc.copy(schema.SCHEMA[query].user_query.get.fields)
            for k, _ of v
                v[k] = null
            return {"#{query}": [v]}
        else
            keys = misc.keys(query)
            if keys.length != 1
                throw Error("must specify exactly one table")
            table = keys[0]
            x = {}
            if not misc.is_array(query[table])
                return {"#{table}": [query[table]]}
            else
                return {"#{table}": query[table]}

    _init_query: =>
        # first parse the query to allow for some convenient shortcuts
        @_query = @_parse_query(@_query)

        # Check that the query is probably valid, and record the table and schema
        if misc.is_array(@_query)
            throw Error("must be a single query")
        tables = misc.keys(@_query)
        if misc.len(tables) != 1
            throw Error("must query only a single table")
        @_table = tables[0]
        if @_client.is_project()
            @_client_query = schema.SCHEMA[@_table].project_query
        else
            @_client_query = schema.SCHEMA[@_table].user_query
        if not misc.is_array(@_query[@_table])
            throw Error("must be a multi-document query")
        @_schema = schema.SCHEMA[@_table]
        if not @_schema?
            throw Error("unknown schema for table #{@_table}")
        @_primary_keys = schema.client_db.primary_keys(@_table)
        # TODO: could put in more checks on validity of query here, using schema...
        for primary_key in @_primary_keys
            if not @_query[@_table][0][primary_key]?
                # must include each primary key in query
                @_query[@_table][0][primary_key] = null
        # Function @_key to extract the primary key from an object
        if @_primary_keys.length == 1
            # very common case
            pk = @_primary_keys[0]
            @_key = (obj) =>
                if not obj?
                    return
                if immutable.Map.isMap(obj)
                    return to_key(obj.get(pk))
                else
                    return to_key(obj[pk])
        else
            # compound primary key
            @_key = (obj) =>
                if not obj?
                    return
                v = []
                if immutable.Map.isMap(obj)
                    for pk in @_primary_keys
                        a = obj.get(pk)
                        if not a?
                            return
                        v.push(a)
                else
                    for pk in @_primary_keys
                        a = obj[pk]
                        if not a?
                            return
                        v.push(a)
                return to_key(v)

        # Which fields the user is allowed to set.
        @_set_fields = []
        # Which fields *must* be included in any set query
        @_required_set_fields = {}
        for field in misc.keys(@_query[@_table][0])
            if @_client_query?.set?.fields?[field]?
                @_set_fields.push(field)
            if @_client_query?.set?.required_fields?[field]?
                @_required_set_fields[field] = true

        # Is anonymous access to this table allowed?
        @_anonymous = !!@_schema.anonymous

    # Return map from keys that have changed along with how they changed, or undefined
    # if the value of local or the server hasn't been initialized.
    _changes: =>
        if not @_value_server? or not @_value_local?
            return
        changed = {}
        @_value_local.map (new_val, key) =>
            old_val = @_value_server.get(key)
            if not new_val.equals(old_val)
                changed[key] = {new_val:new_val, old_val:old_val}
        return changed

    _save: (cb) =>
        if @__is_saving
            cb?("already saving")
        else
            @__is_saving = true
            @__save (err) =>
                @__is_saving = false
                cb?(err)

    __save: (cb) =>
        if @_state == 'closed'
            cb?("closed")
            return
        # console.log("_save('#{@_table}')")
        # Determine which records have changed and what their new values are.
        if not @_value_server?
            cb?("don't know server yet")
            return
        if not @_value_local?
            cb?("don't know local yet")
            return

        if not @_client_query.set?
            # Nothing to do -- can never set anything for this table.
            # There are some tables (e.g., stats) where the remote values
            # could change while the user is offline, and the code below would
            # result in warnings.
            cb?()
            return

        changed = @_changes()
        at_start = @_value_local

        # Send our changes to the server.
        query      = []
        saved_objs = []
        # sort so that behavior is more predictable = faster (e.g., sync patches are in
        # order); the keys are strings so default sort is fine
        for key in misc.keys(changed).sort()
            c = changed[key]
            obj = {}
            # NOTE: this may get replaced below with proper javascript, e.g., for compound primary key
            if @_primary_keys.length == 1
                obj[@_primary_keys[0]] = key
            else
                # unwrap compound primary key
                v = JSON.parse(key)
                i = 0
                for primary_key in @_primary_keys
                    obj[primary_key] = v[i]
                    i += 1

            for k in @_set_fields
                v = c.new_val.get(k)
                if v?
                    if @_required_set_fields[k] or not immutable.is(v, c.old_val?.get(k))
                        if immutable.Iterable.isIterable(v)
                            obj[k] = v.toJS()
                        else
                            obj[k] = v
            query.push({"#{@_table}": obj})
            saved_objs.push(obj)

        # console.log("sending #{query.length} changes: #{misc.to_json(query)}")
        if query.length == 0
            cb?()
            return
        #console.log("query=#{misc.to_json(query)}")
        #Use this to test fix_if_no_update_soon:
        #    if Math.random() <= .5
        #        query = []
        #@_fix_if_no_update_soon()  # disabled -- instead use "checking changefeed ids".
        @_client.query
            query   : query
            options : [{set:true}]   # force it to be a set query
            timeout : 30
            cb      : (err) =>
                if err
                    console.warn("_save('#{@_table}') error:", err)
                    if err == 'clock'
                        @_client.alert_message(type:'error', timeout:9999, message:"Your computer's clock is or was off!  Fix it and **refresh your browser**.")
                    cb?(err)
                else
                    if @_state == 'closed'
                        # this can happen in case the synctable is closed after _save is called but before returning from this query.
                        cb?("closed")
                        return
                    if not @_value_server? or not @_value_local?
                        # There is absolutely no possible way this can happen, since it was
                        # checked for above before the call, and these can only get set by
                        # the close method to undefined, which also sets @_state to closed,
                        # so it would get caught above.
                        # However, evidently this **does happen**:
                        #    https://github.com/sagemathinc/cocalc/issues/1870
                        cb?("value_server and value_local must be set")
                        return
                    @emit('saved', saved_objs)
                    # success: each change in the query was committed successfully to the database; we can
                    # safely set @_value_server (for each value) as long as it didn't change in the meantime.
                    for k, v of changed
                        if immutable.is(@_value_server.get(k), v.old_val)   # immutable.is since either could be undefined
                            #console.log "setting @_value_server[#{k}] =", v.new_val?.toJS()
                            @_value_server = @_value_server.set(k, v.new_val)
                    if not at_start.equals(@_value_local)
                        # keep saving until @_value_local doesn't change *during* the save -- this means
                        # that when saving stops we guarantee there are no unsaved changes.
                        @_save(cb)
                    else
                        cb?()

    save: (cb) =>
        if @_state == 'closed'
            cb?("closed")
            return

        if @_state != 'connected'
            cb?("not connected")   # do not change this error message; it is assumed elsewhere.
            return

        @_save_debounce ?= {}

        if not @_value_server? or not @_value_local?
            @_connected_save_cbs ?= []
            @_connected_save_cbs.push(cb)
            return

        misc.async_debounce
            f        : (cb) =>
                misc.retry_until_success
                    f         : @_save
                    max_delay : 5000
                    max_time  : 30000
                    cb        : cb
            interval : @_debounce_interval
            state    : @_save_debounce
            cb       : cb

    # Handle an update of all records from the database.
    # This happens on initialization, and also if we disconnect and reconnect.
    _update_all: (v) =>
        dbg = @dbg("_update_all")

        if @_state == 'closed'
            # nothing to do -- just ignore updates from the db
            return

        if not v?
            console.warn("_update_all('#{@_table}') called with v=undefined")
            return

        @emit('before-change')
        # Restructure the array of records in v as a mapping from the primary key
        # to the corresponding record.
        x = {}
        for y in v
            x[@_key(y)] = y

        conflict = false

        # Figure out what to change in our local view of the database query result.
        if not @_value_local? or not @_value_server?
            dbg("easy case -- nothing has been initialized yet, so just set everything.")
            @_value_local = @_value_server = immutable.fromJS(x)
            first_connect = true
            changed_keys = misc.keys(x)   # of course all keys have been changed.
        else
            dbg("harder case -- everything has already been initialized.")
            changed_keys = []

            # DELETE or CHANGED:
            # First check through each key in our local view of the query,
            # and if the value differs from what is in the database (i.e.,
            # what we just got from the DB), make that change.
            # (Later we will possibly merge in the change
            # using the last known upstream database state.)
            @_value_local.map (local, key) =>
                if x[key]?
                    # update value we have locally
                    if @_handle_new_val(x[key], changed_keys)
                        conflict = true
                else
                    # This is a value defined locally that does not exist
                    # on the remote server.  It could be that the value
                    # was deleted when we weren't connected, in which case
                    # we should delete the value we have locally.
On the658# other hand, maybe the local value was newly set659# while we weren't connected, so we know it but the660# backend server doesn't, which case we should keep it,661# and set conflict=true, so it gets saved to the backend.662663if @_value_local.get(key).equals(@_value_server.get(key))664# The local value for this key was saved to the backend before665# we got disconnected, so there's definitely no need to try666# keep it around, given that the backend no longer has it667# as part of the query. CRITICAL: This doesn't necessarily mean668# the value was deleted from the database, but instead that669# it doesn't satisfy the synctable query, e.g., it isn't one670# of the 150 most recent file_use notifications, or it isn't671# a patch that is at least as new as the newest snapshot.672#console.log("removing local value: #{key}")673@_value_local = @_value_local.delete(key)674changed_keys.push(key)675else676conflict = true677678# NEWLY ADDED:679# Next check through each key in what's on the remote database,680# and if the corresponding local key isn't defined, set its value.681# Here we are simply checking for newly added records.682for key, val of x683if not @_value_local.get(key)?684@_value_local = @_value_local.set(key, immutable.fromJS(val))685changed_keys.push(key)686687# It's possibly that nothing changed (e.g., typical case on reconnect!) 
        # If something really did change, we set the server state to what we just got, and
        # also inform listeners of which records changed (by giving keys).
        #console.log("update_all: changed_keys=", changed_keys)
        if changed_keys.length != 0
            @_value_server = immutable.fromJS(x)
            @emit_change(changed_keys)
        else if first_connect
            # First connection and the table is empty.
            @emit_change(changed_keys)
        if conflict
            @save()

    # Apply one incoming change from the database to the in-memory
    # local synchronized table.
    _update_change: (change) =>
        #console.log("_update_change", change)
        if @_state == 'closed'
            # We might get a few more updates even after
            # canceling the changefeed, so we just ignore them.
            return
        if not @_value_local?
            console.warn("_update_change(#{@_table}): tried to call _update_change even though local is not yet defined (ignoring)")
            return
        if not @_value_server?
            console.warn("_update_change(#{@_table}): tried to call _update_change even though server is not yet defined (ignoring)")
            return
        if DEBUG
            console.log("_update_change('#{@_table}'): #{misc.to_json(change)}")
        @emit('before-change')
        changed_keys = []
        conflict = false
        if change.new_val?
            conflict = @_handle_new_val(change.new_val, changed_keys)

        if change.old_val? and @_key(change.old_val) != @_key(change.new_val)
            # Delete a record (TODO: untested)
            key = @_key(change.old_val)
            @_value_local  = @_value_local.delete(key)
            @_value_server = @_value_server.delete(key)
            changed_keys.push(key)

        #console.log("update_change: changed_keys=", changed_keys)
        if changed_keys.length > 0
            #console.log("_update_change: change")
            @emit_change(changed_keys)
            if conflict
                @save()

    _handle_new_val: (val, changed_keys) =>
        key       = @_key(val)
        new_val   = immutable.fromJS(val)
        local_val = @_value_local.get(key)
        conflict  = false
        if not new_val.equals(local_val)
            #console.log("change table='#{@_table}': #{misc.to_json(local_val?.toJS())} --> #{misc.to_json(new_val.toJS())}") if @_table == 'patches'
            if not local_val?
                @_value_local = @_value_local.set(key, new_val)
                changed_keys.push(key)
            else
                server = @_value_server.get(key)
                # Set in @_value_local every key whose value changed between new_val and server; basically, we're
                # determining and applying the "patch" from upstream, even though it was sent as a complete record.
                # We can compute the patch, since we know the last server value.
                new_val.map (v, k) =>
                    if not immutable.is(v, server?.get(k))
                        local_val = local_val.set(k, v)
                        #console.log("#{@_table}: set #{k} to #{v}")
                server?.map (v, k) =>
                    if not new_val.has(k)
                        local_val = local_val.delete(k)
                if not local_val.equals(@_value_local.get(key))
                    @_value_local = @_value_local.set(key, local_val)
                    changed_keys.push(key)
                if not local_val.equals(new_val)
                    #console.log("#{@_table}: conflict! ", local_val, new_val) if @_table == 'patches'
                    @emit('conflict', {new_val:new_val, old_val:local_val})
                    conflict = true
        @_value_server = @_value_server.set(key, new_val)
        return conflict

    # obj is an immutable.js Map without the primary key set.
    # If the database schema defines a way to compute
    # the primary key from other keys, try to use it here.
    # This function returns the computed primary key if it works,
    # and returns undefined otherwise.
    _computed_primary_key: (obj) =>
        if @_primary_keys.length == 1
            f = @_client_query.set.fields[@_primary_keys[0]]
            if typeof(f) == 'function'
                return f(obj.toJS(), schema.client_db)
            else
                return
        else
            v = []
            for pk in @_primary_keys
                f = @_client_query.set.fields[pk]
                if typeof(f) == 'function'
                    v.push(f(obj.toJS(), schema.client_db))
                else
                    return
            return v

    # Changes (or creates) one entry in the table.
    # The input field changes is either an immutable.js Map or a JS object map.
    # If changes does not have the primary key, then a random record is updated,
    # and there *must* be at least one record.  Exception: computed primary
    # keys will be computed (see the discussion of computed primary keys above).
    # The second parameter 'merge' can be one of three values:
    #   'deep'    : (DEFAULT) deep merges the changes into the record, keeping as much info as possible.
    #   'shallow' : shallow merges, replacing keys by their corresponding values
    #   'none'    : do no merging at all -- just replace the record completely
    # The cb is called with cb(err) if something goes wrong.
    # Returns the updated value.
    set: (changes, merge, cb) =>
        if @_state == 'closed'
            # Attempting to set on a closed table is dangerous since any data set *will* be
            # silently lost.
            # So spit out a visible warning.
            console.warn("WARNING: attempt to do a set on a closed table: '#{@_table}', #{misc.to_json(@_query)}")
            cb?("closed")
            return

        if not immutable.Map.isMap(changes)
            changes = immutable.fromJS(changes)
        if not @_value_local?
            @_value_local = immutable.Map({})

        if not merge?
            merge = 'deep'
        else if typeof(merge) == 'function'
            cb = merge
            merge = 'deep'

        if not immutable.Map.isMap(changes)
            cb?("type error -- changes must be an immutable.js Map or JS map")
            return

        if DEBUG
            console.log("set('#{@_table}'): #{misc.to_json(changes.toJS())}")

        # Ensure that each key is allowed to be set.
        if not @_client_query.set?
            cb?("users may not set #{@_table}")
            return
        can_set = @_client_query.set.fields
        try
            changes.map (v, k) => if (can_set[k] == undefined) then throw Error("users may not set #{@_table}.#{k}")
        catch e
            cb?(e)
            return

        # Determine the primary key's value
        id = @_key(changes)
        if not id?
            # attempt to compute the primary key if it is a computed primary key
            id0 = @_computed_primary_key(changes)
            id  = to_key(id0)
            if not id? and @_primary_keys.length == 1
                # use a "random" primary key from existing data
                id0 = id = @_value_local.keySeq().first()
            if not id?
                cb?("must specify primary key #{@_primary_keys.join(',')}, have at least one record, or have a computed primary key")
                return
            # Now id is defined
            if @_primary_keys.length == 1
                changes = changes.set(@_primary_keys[0], id0)
            else
                i = 0
                for pk in @_primary_keys
                    changes = changes.set(pk, id0[i])
                    i += 1

        # Get the current value
        cur = @_value_local.get(id)
        if not cur?
            # No record with the given primary key.
            # Require that all the @_required_set_fields
            # are specified, or it will become impossible to sync this table to the backend.
            for k, _ of @_required_set_fields
                if not changes.get(k)?
                    cb?("must specify field '#{k}' for new records")
                    return
            # If there is no current value, then the next value is easy -- it equals changes in all cases.
            new_val = changes
        else
            # Use the appropriate merge strategy to get the next val.  Fortunately these are all built
            # into immutable.js!
            switch merge
                when 'deep'
                    new_val = cur.mergeDeep(changes)
                when 'shallow'
                    new_val = cur.merge(changes)
                when 'none'
                    new_val = changes
                else
                    cb?("merge must be one of 'deep', 'shallow', 'none'")
                    return
        # If something changed, then change it in our local store, and also kick off a save to the backend.
        if not immutable.is(new_val, cur)
            @_value_local = @_value_local.set(id, new_val)
            @save(cb)
            @emit_change([id])   # CRITICAL: other code assumes the key is *NOT* sent with this change event!
        else
            cb?()

        return new_val

    close: =>
        if @_state == 'closed'
            # already closed
            return
        # decrement the reference count to this synctable
        if global_cache_decref(@)
            # close: count not zero -- so don't close it yet -- still in use by multiple clients
            return
        @_client.removeListener('disconnected', @_disconnected)
        # do a last attempt at a save (so we don't lose data), then really close.
        @_save()   # this will synchronously construct the last save and send it
        # The moment the sync part of @_save is done, we remove listeners and clear
        # everything up.  It's critical that as soon as @close is called there
        # be no possible way any further connect events (etc.) can make this SyncTable
        # do anything!!
        # That finality assumption is made elsewhere (e.g., in smc-project/client.coffee).
        @removeAllListeners()
        if @_id?
            @_client.query_cancel(id:@_id)
            delete @_id
        @_state = 'closed'
        delete @_value_local
        delete @_value_server

    # Wait until some function of this synctable is truthy.
    # (This might be exactly the same code as in the postgres-synctable.coffee SyncTable....)
    wait: (opts) =>
        opts = defaults opts,
            until   : required   # waits until "until(@)" evaluates to something truthy
            timeout : 30         # in *seconds* -- set to 0 to disable (sort of DANGEROUS, obviously.)
            cb      : required   # cb(undefined, until(@)) on success and cb('timeout') on failure due to timeout; cb('closed') if closed
        if @_state == 'closed'
            # instantly fail -- table is closed so can't wait for anything
            opts.cb("closed")
            return
        x = opts.until(@)
        if x
            opts.cb(undefined, x)   # already true
            return
        fail_timer = undefined
        f = =>
            x = opts.until(@)
            if x
                @removeListener('change', f)
                if fail_timer? then clearTimeout(fail_timer)
                opts.cb(undefined, x)
        @on('change', f)
        if opts.timeout
            fail = =>
                @removeListener('change', f)
                opts.cb('timeout')
            fail_timer = setTimeout(fail, 1000*opts.timeout)
        return

synctables = {}

# for debugging; in particular, to verify that synctables are freed.
# Do not leave in production; could be a slight security risk.
## window?.synctables = synctables

exports.sync_table = (query, options, client, debounce_interval=2000, throttle_changes=undefined, use_cache=true) ->
    cache_key = json_stable_stringify(query:query, options:options, debounce_interval:debounce_interval, throttle_changes:throttle_changes)
    if not use_cache
        return new SyncTable(query, options, client, debounce_interval, throttle_changes, cache_key)

    S = synctables[cache_key]
    if S?
        if S._state == 'connected'
            # same behavior as a newly created synctable
            async.nextTick () ->
                if S._state == 'connected'
                    S.emit('connected')
        S._reference_count += 1
        return S
    else
        S = synctables[cache_key] = new SyncTable(query, options, client, debounce_interval, throttle_changes, cache_key)
        S._reference_count = 1
        return S

global_cache_decref = (S) ->
    if S._reference_count?
        S._reference_count -= 1
        if S._reference_count <= 0
            delete synctables[S._cache_key]
            return false   # not in use
        else
            return true    # still in use

#if window?
#    window.synctables = synctables


###
Various mock clients for unit testing

Events:
- disconnected
- connected
###

class exports.TestBrowserClient1 extends EventEmitter

    is_project: =>
        return false

    is_connected: =>
        return true

    is_signed_in: =>
        return true

    dbg: =>
        return =>

    query_cancel: =>

    query: (opts) =>
        opts = defaults opts,
            query   : required
            changes : undefined
            options : undefined   # if given, must be an array of objects, e.g., [{limit:5}]
            timeout : 30
            cb      : undefined
        @emit 'query', opts
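###
For reference, the compound-primary-key round trip used in __save() can be
illustrated in plain JavaScript (the field names below are hypothetical):

```javascript
// A compound primary key is stored as the JSON string of the array of its
// values (see to_key); __save() unwraps that string back into an object.
const primaryKeys = ["string_id", "time"];
const record = { string_id: "abc123", time: 1700000000000 };

// Key used in the immutable map:
const key = JSON.stringify(primaryKeys.map((pk) => record[pk]));

// Unwrap, as in __save():
const obj = {};
JSON.parse(key).forEach((v, i) => { obj[primaryKeys[i]] = v; });

console.log(key); // '["abc123",1700000000000]'
console.log(obj.string_id === record.string_id && obj.time === record.time); // true
```
###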