###
CoCalc, Copyright (C) 2016, Sagemath Inc.

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

---

SYNCHRONIZED TABLE -- defined by an object query

- Do a query against a PostgreSQL table using our object query description.
- Synchronization with the backend database is done automatically.

Methods:
  - constructor(query): query = the name of a table (or a more complicated object)

  - set(map):  Set the given keys of map to their values; one key must be
               the primary key for the table.  NOTE: Computed primary keys will
               get automatically filled in; these are keys in schema.coffee
               where the set query looks like this, say:
                   (obj, db) -> db.sha1(obj.project_id, obj.path)
  - get():     Current value of the query, as an immutable.js Map from
               the primary key to the records, which are also immutable.js Maps.
  - get(key):  The record with the given key, as an immutable Map.
  - get(keys): Immutable Map from the given keys to the corresponding records.
  - get_one(): Returns one record as an immutable Map (useful if there
               is only one record).

  - close():   Frees up resources, stops syncing; don't use the object further.

Events:
  - 'before-change': fired right before (and in the same event loop as) actually
               applying remote incoming changes
  - 'change', [array of string primary keys]: fired any time the value of the query result
               changes, *including* if changed by calling set on this object.
               Also called with an empty list on first connection if there happens
               to be nothing in this table.  If the primary key is not a string, it is
               converted to a JSON string.
  - 'disconnected': fired when the table is disconnected from the server for some reason
  - 'connected': fired when the table has successfully connected, finished initializing,
               and is ready to use
  - 'saved', [array of saved objects]: fired after a confirmed successful save of objects to the backend

STATES:

A SyncTable is a finite state machine as follows:

                          -------------------<------------------
                         \|/                                   |
        [connecting] --> [connected]  -->  [disconnected]  --> [reconnecting]

Also, there is a final state called 'closed', which the SyncTable moves to when
it will not be used further; this frees up all connections and used memory.
The table can't be used after it is closed.  The only way to get to the
closed state is to explicitly call close() on the table; otherwise, the
table will keep attempting to connect and work, until it works.

    (anything) --> [closed]

- connecting   -- connecting to the backend, and have never connected before.

- connected    -- successfully connected to the backend, initialized, and receiving updates.

- disconnected -- table was successfully initialized, but the network connection
                  died.  Can still take writes, but they will not be saved to the
                  backend.  Waiting to reconnect when the user connects back to the backend.

- reconnecting -- client just reconnected to the backend, so this table is now trying
                  to get the full current state of the table and initialize a changefeed.

- closed       -- table is closed, and memory/connections used by the table are freed.


WORRY: What if the user does a set and connecting (or reconnecting) takes a long time, e.g., suspend
a laptop, then resume?  The changes may get saved... a month later.  For some things, e.g., logs,
this could be fine.  However, on reconnect, the first thing that happens is that the complete upstream
state of the table is set on the server's version of the table, so the reconnecting user only sends its
changes if upstream hasn't changed anything in that same record.
###
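
# A minimal usage sketch (illustration only -- the 'client' object and the
# 'projects' query below are assumptions, not defined in this file).  A
# SyncTable is normally obtained via exports.sync_table, defined near the
# bottom of this file:
#
#     synctable = require('./synctable')
#     table = synctable.sync_table({projects: [{project_id: null, title: null}]}, [], client)
#     table.once 'connected', ->
#         console.log("loaded #{table.get()?.size} records")
#     table.on 'change', (keys) ->
#         console.log("records changed:", keys)
#     # Later, write a change (saved to the backend automatically) and clean up:
#     #   table.set(project_id: some_project_id, title: 'New title')
#     #   table.close()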

# if true, will log to the console a huge amount of info about every get/set
DEBUG = false

exports.set_debug = (x) ->
    DEBUG = !!x

{EventEmitter} = require('events')
immutable      = require('immutable')
async          = require('async')
underscore     = require('underscore')

misc           = require('./misc')
schema         = require('./schema')

{defaults, required} = misc

# We represent synchronized tables by an immutable.js mapping from the primary
# key to the object.  Since PostgreSQL primary keys can be compound (more than
# just strings) -- e.g., they can be arrays -- we convert complicated keys to their
# JSON representation.  A binary object doesn't make sense here in pure javascript,
# but these do:
#       string, number, time, boolean, or array
# Everything automatically converts fine to a string except array, which is the
# main thing the function below deals with.
# NOTE (1) RIGHT NOW:  This should be safe to change at
# any time, since the keys aren't stored long term.
# If we do something with localStorage, this will no longer be safe
# without a version number.
# NOTE (2) Of course you could use both a string and an array as primary keys
# in the same table.  You could evilly make the string equal the JSON of an array,
# and this *would* break things.  We are thus assuming that such mixing
# doesn't happen.  An alternative would be to just *always* use a *stable* version of stringify.
# NOTE (3) We use a stable version, since otherwise things will randomly break if the
# key is an object.

json_stable_stringify = require('json-stable-stringify')

to_key = (x) ->
    if typeof(x) == 'object'
        return json_stable_stringify(x)
    else
        return x

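# For example (illustrative only): a simple string key passes through unchanged,
# while a compound key such as an array becomes its stable JSON string, so it can
# be used as a key of an immutable.js Map:
#
#     to_key('a1b2')                 # => 'a1b2'
#     to_key(['a1b2', 'file.txt'])   # => '["a1b2","file.txt"]'
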
# Plug: Class to ensure that the SyncTable stays "plugged" into the hub, if at all possible.
# NOTE: I implemented this outside of SyncTable so that it would be much easier
# to reason about, and be sure the code is right.
class Plug
    constructor : (opts) ->
        @_opts = defaults opts,
            name       : 'plug'     # Used only for debug logging
            no_sign_in : required   # True if sign-in isn't required before connecting, e.g., anonymous synctable and project.
            client     : required   # The client object, which provides:
                                    #   'connected' and 'signed_in' events, and
                                    #   is_connected() and is_signed_in() functions.
            connect    : required   # A function to call to create a connection; it should run as
                                    # quickly as it can and call its callback with an error if
                                    # and only if it fails.  It will definitely only be called
                                    # once at a time, so no need to put in any sort of block.
        @connect()

    dbg: (f) =>
        #return @_opts.client.dbg("Plug('#{@_opts.name}').#{f}")
        return =>

    # Keep trying until we connect -- always succeeds if it terminates
    connect: (cb) =>
        dbg = @dbg('connect')
        if @_is_connecting
            dbg("already connecting")
            return
        @_is_connecting = true
        dbg('')
        misc.retry_until_success
            f           : @__try_to_connect_once
            log         : dbg
            start_delay : 4000
            max_delay   : 20000
            cb          : =>
                delete @_is_connecting
                dbg("success!")
                cb?()

    # Try to connect exactly once.  cb gets an error if and only if the attempt fails to connect.
    __try_to_connect_once: (cb) =>
        # timer for giving up on waiting to try to connect
        give_up_timer = undefined

        # actually try to connect
        do_connect = =>
            if give_up_timer
                clearTimeout(give_up_timer)
            @_opts.connect(cb)

        # Which event/condition has to be true before we even try to connect.
        if @_opts.no_sign_in
            event = 'connected'
        else
            event = 'signed_in'

        if @_opts.client["is_#{event}"]()
            # The condition is satisfied, so try once to connect.
            do_connect()
        else
            # Wait until condition is satisfied...
            @_opts.client.once(event, do_connect)
            # ... but don't wait forever, in case for some reason we miss
            # the event (this can maybe rarely happen).
            give_up = =>
                @_opts.client.removeListener(event, do_connect)
                cb("timeout")
            give_up_timer = setTimeout(give_up, 5000 + Math.random()*10000)

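# A sketch of how Plug is wired up (for orientation only): the SyncTable class
# below creates one roughly as
#
#     @_plug = new Plug
#         name       : @_table
#         client     : @_client       # emits 'connected'/'signed_in', has is_connected()/is_signed_in()
#         connect    : @_connect      # attempts one changefeed query; calls cb(err) only on failure
#         no_sign_in : @_schema.anonymous or @_client.is_project()
#
# and calls @_plug.connect() again whenever the table becomes disconnected.
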
class SyncTable extends EventEmitter
    constructor: (@_query, @_options, @_client, @_debounce_interval, @_throttle_changes, @_cache_key) ->
        @_init_query()
        # The value of this query locally.
        @_value_local = undefined

        # Our best guess as to the value of this query on the server,
        # according to queries and updates the server pushes to us.
        @_value_server = undefined

        # The changefeed id, when set by doing a change-feed aware query.
        @_id = undefined

        # Not connected yet
        @_state = 'disconnected'   # disconnected <--> connected --> closed
        @_created = new Date()

        @_plug = new Plug
            name       : @_table
            client     : @_client
            connect    : @_connect
            no_sign_in : @_schema.anonymous or @_client.is_project()   # note: projects don't have to authenticate

        @_client.on 'disconnected', =>
            #console.log("synctable: DISCONNECTED")
            # When the connection is dropped, the backend hub notices that it was dropped
            # and immediately cancels all changefeeds.  Thus we set @_id to undefined
            # below, so that we don't redundantly cancel them again, which leads to an error
            # and wastes resources (which can pile up).
            @_id = undefined
            @_disconnected('client disconnect')

        # No throttling of change events unless explicitly requested *or* part of the schema.
        @_throttle_changes ?= schema.SCHEMA[@_table]?.user_query?.get?.throttle_changes

        if not @_throttle_changes
            @emit_change = (changed_keys) => @emit('change', changed_keys)
        else
            # throttle emitting of change events
            all_changed_keys = {}
            do_emit_changes = =>
                #console.log("#{@_table} -- emitting changes", misc.keys(all_changed_keys))
                # CRITICAL: some code depends on emitting change even for the *empty* list of keys!
                # E.g., projects page won't load for new users.  This is the *change* from not
                # loaded to being loaded, which does make sense.
                @emit('change', misc.keys(all_changed_keys))
                all_changed_keys = {}
            do_emit_changes = underscore.throttle(do_emit_changes, @_throttle_changes)
            @emit_change = (changed_keys) =>
                #console.log("#{@_table} -- queue changes", changed_keys)
                for key in changed_keys
                    all_changed_keys[key] = true
                do_emit_changes()

    dbg: (f) =>
        #return @_client.dbg("SyncTable('#{@_table}').#{f}")
        return =>

    _connect: (cb) =>
        dbg = @dbg("connect")
        dbg()
        if @_state == 'closed'
            cb?('closed')
            return
        if @_state == 'connected'
            cb?()
            return
        if @_id?
            @_client.query_cancel(id:@_id)
            @_id = undefined

        async.series([
            (cb) =>
                # 1. save, in case we have any local unsaved changes, then sync with upstream.
                if @_value_local? and @_value_server?
                    @_save(cb)
                else
                    cb()
            (cb) =>
                # 2. Now actually do the changefeed query.
                @_reconnect(cb)
        ], cb)

    _reconnect: (cb) =>
        dbg = @dbg("_run")
        if @_state == 'closed'
            dbg("closed so don't do anything ever again")
            cb?()
            return
        first_resp = true
        this_query_id = undefined
        dbg("do the query")
        @_client.query
            query   : @_query
            changes : true
            timeout : 30
            options : @_options
            cb      : (err, resp) =>

                if @_state == 'closed'
                    # already closed so ignore anything else.
                    return

                if first_resp
                    dbg("query got ", err, resp)
                    first_resp = false
                    if @_state == 'closed'
                        cb?("closed")
                    else if resp?.event == 'query_cancel'
                        cb?("query-cancel")
                    else if err
                        cb?(err)
                    else if not resp?.query?[@_table]?
                        cb?("got no data")
                    else
                        # Successfully completed query
                        this_query_id = @_id = resp.id
                        @_state = 'connected'
                        @_update_all(resp.query[@_table])
                        @emit("connected", resp.query[@_table])   # ready to use!
                        cb?()
                        # Do any pending saves
                        for cb in @_connected_save_cbs ? []
                            @save(cb)
                        delete @_connected_save_cbs
                else
                    if @_state != 'connected'
                        dbg("nothing to do -- ignore these, and make sure they stop")
                        if this_query_id?
                            @_client.query_cancel(id:this_query_id)
                        return
                    if err or resp?.event == 'query_cancel'
                        @_disconnected("err=#{err}, resp?.event=#{resp?.event}")
                    else
                        # Handle the update
                        @_update_change(resp)

    _disconnected: (why) =>
        dbg = @dbg("_disconnected")
        dbg("why=#{why}")
        if @_state == 'disconnected'
            dbg("already disconnected")
            return
        if @_id
            @_client.query_cancel(id:@_id)
        @_state = 'disconnected'
        @_plug.connect()   # start trying to connect again

    # Return string key used in the immutable map in which this table is stored.
    key: (obj) =>
        return @_key(obj)

    # Return true if there are changes to this synctable that
    # have NOT been confirmed as saved to the backend database.
    has_uncommitted_changes: () =>
        if not @_value_server? and not @_value_local?
            return false
        if @_value_local? and not @_value_server?
            return true
        return not @_value_server.equals(@_value_local)

    get: (arg) =>
        if not @_value_local?
            return
        if arg?
            if misc.is_array(arg)
                x = {}
                for k in arg
                    x[to_key(k)] = @_value_local.get(to_key(k))
                return immutable.fromJS(x)
            else
                return @_value_local.get(to_key(arg))
        else
            return @_value_local

    get_one: =>
        return @_value_local?.toSeq().first()

    _parse_query: (query) =>
        if typeof(query) == 'string'
            # name of a table -- get all fields
            v = misc.copy(schema.SCHEMA[query].user_query.get.fields)
            for k, _ of v
                v[k] = null
            return {"#{query}": [v]}
        else
            keys = misc.keys(query)
            if keys.length != 1
                throw Error("must specify exactly one table")
            table = keys[0]
            x = {}
            if not misc.is_array(query[table])
                return {"#{table}": [query[table]]}
            else
                return {"#{table}": query[table]}

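    # For example (illustrative only -- the 'projects' table and its fields are
    # assumptions about the schema, used just to show the shape of the result):
    #
    #     _parse_query('projects')
    #         # => {projects: [{project_id: null, title: null, ...}]}   (all queryable fields, set to null)
    #     _parse_query(projects: {project_id: null, title: null})
    #         # => {projects: [{project_id: null, title: null}]}
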
    _init_query: =>
        # first parse the query to allow for some convenient shortcuts
        @_query = @_parse_query(@_query)

        # Check that the query is probably valid, and record the table and schema
        if misc.is_array(@_query)
            throw Error("must be a single query")
        tables = misc.keys(@_query)
        if misc.len(tables) != 1
            throw Error("must query only a single table")
        @_table = tables[0]
        if @_client.is_project()
            @_client_query = schema.SCHEMA[@_table].project_query
        else
            @_client_query = schema.SCHEMA[@_table].user_query
        if not misc.is_array(@_query[@_table])
            throw Error("must be a multi-document query")
        @_schema = schema.SCHEMA[@_table]
        if not @_schema?
            throw Error("unknown schema for table #{@_table}")
        @_primary_keys = schema.client_db.primary_keys(@_table)
        # TODO: could put in more checks on validity of query here, using schema...
        for primary_key in @_primary_keys
            if not @_query[@_table][0][primary_key]?
                # must include each primary key in query
                @_query[@_table][0][primary_key] = null
        # Function @_key extracts the primary key from an object
        if @_primary_keys.length == 1
            # very common case
            pk = @_primary_keys[0]
            @_key = (obj) =>
                if not obj?
                    return
                if immutable.Map.isMap(obj)
                    return to_key(obj.get(pk))
                else
                    return to_key(obj[pk])
        else
            # compound primary key
            @_key = (obj) =>
                if not obj?
                    return
                v = []
                if immutable.Map.isMap(obj)
                    for pk in @_primary_keys
                        a = obj.get(pk)
                        if not a?
                            return
                        v.push(a)
                else
                    for pk in @_primary_keys
                        a = obj[pk]
                        if not a?
                            return
                        v.push(a)
                return to_key(v)

        # Which fields the user is allowed to set.
        @_set_fields = []
        # Which fields *must* be included in any set query
        @_required_set_fields = {}
        for field in misc.keys(@_query[@_table][0])
            if @_client_query?.set?.fields?[field]?
                @_set_fields.push(field)
            if @_client_query?.set?.required_fields?[field]?
                @_required_set_fields[field] = true

        # Is anonymous access to this table allowed?
        @_anonymous = !!@_schema.anonymous

    # Return map from keys that have changed along with how they changed, or undefined
    # if the value of local or the server hasn't been initialized.
    _changes: =>
        if not @_value_server? or not @_value_local?
            return
        changed = {}
        @_value_local.map (new_val, key) =>
            old_val = @_value_server.get(key)
            if not new_val.equals(old_val)
                changed[key] = {new_val:new_val, old_val:old_val}
        return changed

    _save: (cb) =>
        if @__is_saving
            cb?("already saving")
        else
            @__is_saving = true
            @__save (err) =>
                @__is_saving = false
                cb?(err)

    __save: (cb) =>
        if @_state == 'closed'
            cb?("closed")
            return
        # console.log("_save('#{@_table}')")
        # Determine which records have changed and what their new values are.
        if not @_value_server?
            cb?("don't know server yet")
            return
        if not @_value_local?
            cb?("don't know local yet")
            return

        if not @_client_query.set?
            # Nothing to do -- can never set anything for this table.
            # There are some tables (e.g., stats) where the remote values
            # could change while the user is offline, and the code below would
            # result in warnings.
            cb?()
            return

        changed = @_changes()
        at_start = @_value_local

        # Send our changes to the server.
        query      = []
        saved_objs = []
        # sort so that behavior is more predictable = faster (e.g., sync patches are in
        # order); the keys are strings so the default sort is fine
        for key in misc.keys(changed).sort()
            c = changed[key]
            obj = {}
            # NOTE: this may get replaced below with proper javascript, e.g., for a compound primary key
            if @_primary_keys.length == 1
                obj[@_primary_keys[0]] = key
            else
                # unwrap compound primary key
                v = JSON.parse(key)
                i = 0
                for primary_key in @_primary_keys
                    obj[primary_key] = v[i]
                    i += 1

            for k in @_set_fields
                v = c.new_val.get(k)
                if v?
                    if @_required_set_fields[k] or not immutable.is(v, c.old_val?.get(k))
                        if immutable.Iterable.isIterable(v)
                            obj[k] = v.toJS()
                        else
                            obj[k] = v
            query.push({"#{@_table}":obj})
            saved_objs.push(obj)

        # console.log("sending #{query.length} changes: #{misc.to_json(query)}")
        if query.length == 0
            cb?()
            return
        #console.log("query=#{misc.to_json(query)}")
        # Use this to test fix_if_no_update_soon:
        #    if Math.random() <= .5
        #        query = []
        #@_fix_if_no_update_soon() # -disabled -- instead use "checking changefeed ids".
        @_client.query
            query   : query
            options : [{set:true}]  # force it to be a set query
            timeout : 30
            cb      : (err) =>
                if err
                    console.warn("_save('#{@_table}') error:", err)
                    if err == 'clock'
                        @_client.alert_message(type:'error', timeout:9999, message:"Your computer's clock is or was off!  Fix it and **refresh your browser**.")
                    cb?(err)
                else
                    if @_state == 'closed'
                        # this can happen in case the synctable is closed after _save is called but before returning from this query.
                        cb?("closed")
                        return
                    if not @_value_server? or not @_value_local?
                        # There is absolutely no possible way this can happen, since it was
                        # checked for above before the call, and these can only get set
                        # to undefined by the close method, which also sets @_state to closed,
                        # so it would get caught above.  However, evidently this **does happen**:
                        #   https://github.com/sagemathinc/cocalc/issues/1870
                        cb?("value_server and value_local must be set")
                        return
                    @emit('saved', saved_objs)
                    # success: each change in the query was committed successfully to the database; we can
                    # safely set @_value_server (for each value) as long as it didn't change in the meantime.
                    for k, v of changed
                        if immutable.is(@_value_server.get(k), v.old_val)   # immutable.is since either could be undefined
                            #console.log "setting @_value_server[#{k}] =", v.new_val?.toJS()
                            @_value_server = @_value_server.set(k, v.new_val)
                    if not at_start.equals(@_value_local)
                        # keep saving until @_value_local doesn't change *during* the save -- this means
                        # that when saving stops, we guarantee there are no unsaved changes.
                        @_save(cb)
                    else
                        cb?()

    save: (cb) =>
        if @_state == 'closed'
            cb?("closed")
            return

        if @_state != 'connected'
            cb?("not connected")   # do not change this error message; it is assumed elsewhere.
            return

        @_save_debounce ?= {}

        if not @_value_server? or not @_value_local?
            @_connected_save_cbs ?= []
            @_connected_save_cbs.push(cb)
            return

        misc.async_debounce
            f : (cb) =>
                misc.retry_until_success
                    f         : @_save
                    max_delay : 5000
                    max_time  : 30000
                    cb        : cb
            interval : @_debounce_interval
            state    : @_save_debounce
            cb       : cb

    # Handle an update of all records from the database.  This happens on
    # initialization, and also if we disconnect and reconnect.
    _update_all: (v) =>
        dbg = @dbg("_update_all")

        if @_state == 'closed'
            # nothing to do -- just ignore updates from the db
            return

        if not v?
            console.warn("_update_all('#{@_table}') called with v=undefined")
            return

        @emit('before-change')
        # Restructure the array of records in v as a mapping from the primary key
        # to the corresponding record.
        x = {}
        for y in v
            x[@_key(y)] = y

        conflict = false

        # Figure out what to change in our local view of the database query result.
        if not @_value_local? or not @_value_server?
            dbg("easy case -- nothing has been initialized yet, so just set everything.")
            @_value_local = @_value_server = immutable.fromJS(x)
            first_connect = true
            changed_keys = misc.keys(x)  # of course all keys have been changed.
        else
            dbg("harder case -- everything has already been initialized.")
            changed_keys = []

            # DELETE or CHANGED:
            # First check through each key in our local view of the query
            # and if the value differs from what is in the database (i.e.,
            # what we just got from the DB), make that change.
            # (Later we will possibly merge in the change
            # using the last known upstream database state.)
            @_value_local.map (local, key) =>
                if x[key]?
                    # update value we have locally
                    if @_handle_new_val(x[key], changed_keys)
                        conflict = true
                else
                    # This is a value defined locally that does not exist
                    # on the remote server.  It could be that the value
                    # was deleted when we weren't connected, in which case
                    # we should delete the value we have locally.  On the
                    # other hand, maybe the local value was newly set
                    # while we weren't connected, so we know it but the
                    # backend server doesn't, in which case we should keep it,
                    # and set conflict=true, so it gets saved to the backend.

                    if @_value_local.get(key).equals(@_value_server.get(key))
                        # The local value for this key was saved to the backend before
                        # we got disconnected, so there's definitely no need to try to
                        # keep it around, given that the backend no longer has it
                        # as part of the query.  CRITICAL: This doesn't necessarily mean
                        # the value was deleted from the database, but instead that
                        # it doesn't satisfy the synctable query, e.g., it isn't one
                        # of the 150 most recent file_use notifications, or it isn't
                        # a patch that is at least as new as the newest snapshot.
                        #console.log("removing local value: #{key}")
                        @_value_local = @_value_local.delete(key)
                        changed_keys.push(key)
                    else
                        conflict = true

            # NEWLY ADDED:
            # Next check through each key in what's on the remote database,
            # and if the corresponding local key isn't defined, set its value.
            # Here we are simply checking for newly added records.
            for key, val of x
                if not @_value_local.get(key)?
                    @_value_local = @_value_local.set(key, immutable.fromJS(val))
                    changed_keys.push(key)

        # It's possible that nothing changed (e.g., the typical case on reconnect!) so we check.
        # If something really did change, we set the server state to what we just got, and
        # also inform listeners of which records changed (by giving keys).
        #console.log("update_all: changed_keys=", changed_keys)
        if changed_keys.length != 0
            @_value_server = immutable.fromJS(x)
            @emit_change(changed_keys)
        else if first_connect
            # First connection and the table is empty.
            @emit_change(changed_keys)
        if conflict
            @save()

    # Apply one incoming change from the database to the in-memory
    # local synchronized table.
    _update_change: (change) =>
        #console.log("_update_change", change)
        if @_state == 'closed'
            # We might get a few more updates even after
            # canceling the changefeed, so we just ignore them.
            return
        if not @_value_local?
            console.warn("_update_change(#{@_table}): tried to call _update_change even though local not yet defined (ignoring)")
            return
        if not @_value_server?
            console.warn("_update_change(#{@_table}): tried to call _update_change even though server not yet defined (ignoring)")
            return
        if DEBUG
            console.log("_update_change('#{@_table}'): #{misc.to_json(change)}")
        @emit('before-change')
        changed_keys = []
        conflict = false
        if change.new_val?
            conflict = @_handle_new_val(change.new_val, changed_keys)

        if change.old_val? and @_key(change.old_val) != @_key(change.new_val)
            # Delete a record (TODO: untested)
            key = @_key(change.old_val)
            @_value_local = @_value_local.delete(key)
            @_value_server = @_value_server.delete(key)
            changed_keys.push(key)

        #console.log("update_change: changed_keys=", changed_keys)
        if changed_keys.length > 0
            #console.log("_update_change: change")
            @emit_change(changed_keys)
        if conflict
            @save()

    _handle_new_val: (val, changed_keys) =>
        key       = @_key(val)
        new_val   = immutable.fromJS(val)
        local_val = @_value_local.get(key)
        conflict  = false
        if not new_val.equals(local_val)
            #console.log("change table='#{@_table}': #{misc.to_json(local_val?.toJS())} --> #{misc.to_json(new_val.toJS())}") if @_table == 'patches'
            if not local_val?
                @_value_local = @_value_local.set(key, new_val)
                changed_keys.push(key)
            else
                server = @_value_server.get(key)
                # Set in @_value_local every key whose value changed between new_val and server; basically, we're
                # determining and applying the "patch" from upstream, even though it was sent as a complete record.
                # We can compute the patch, since we know the last server value.
                new_val.map (v, k) =>
                    if not immutable.is(v, server?.get(k))
                        local_val = local_val.set(k, v)
                        #console.log("#{@_table}: set #{k} to #{v}")
                server?.map (v, k) =>
                    if not new_val.has(k)
                        local_val = local_val.delete(k)
                if not local_val.equals(@_value_local.get(key))
                    @_value_local = @_value_local.set(key, local_val)
                    changed_keys.push(key)
                if not local_val.equals(new_val)
                    #console.log("#{@_table}: conflict! ", local_val, new_val) if @_table == 'patches'
                    @emit('conflict', {new_val:new_val, old_val:local_val})
                    conflict = true
        @_value_server = @_value_server.set(key, new_val)
        return conflict

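    # Illustration of the per-field merge above (hypothetical values, not tied to any
    # real table): if the last known server record was {a: 1, b: 2}, the local record
    # is {a: 1, b: 5} (the user changed b while disconnected), and the incoming
    # upstream record is {a: 7, b: 2}, then only 'a' changed upstream, so the local
    # record becomes {a: 7, b: 5}.  Since that still differs from the upstream record,
    # conflict is true and the local change gets saved back to the backend.
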
    # obj is an immutable.js Map without the primary key
    # set.  If the database schema defines a way to compute
    # the primary key from other keys, try to use it here.
    # This function returns the computed primary key if it works,
    # and returns undefined otherwise.
    _computed_primary_key: (obj) =>
        if @_primary_keys.length == 1
            f = @_client_query.set.fields[@_primary_keys[0]]
            if typeof(f) == 'function'
                return f(obj.toJS(), schema.client_db)
            else
                return
        else
            v = []
            for pk in @_primary_keys
                f = @_client_query.set.fields[pk]
                if typeof(f) == 'function'
                    v.push(f(obj.toJS(), schema.client_db))
                else
                    return
            return v

    # Changes (or creates) one entry in the table.
    # The input field changes is either an Immutable.js Map or a JS Object map.
    # If changes does not have the primary key, then a random record is updated,
    # and there *must* be at least one record.  Exception: computed primary
    # keys will be computed (see the notes about computed primary keys above).
    # The second parameter 'merge' can be one of three values:
    #   'deep'   : (DEFAULT) deep merges the changes into the record, keeping as much info as possible.
    #   'shallow': shallow merges, replacing keys by the corresponding values
    #   'none'   : does no merging at all -- just replaces the record completely
    # The cb is called with cb(err) if something goes wrong.
    # Returns the updated value.
    set: (changes, merge, cb) =>
        if @_state == 'closed'
            # Attempting to set on a closed table is dangerous, since any data set *will* be
            # silently lost.  So spit out a visible warning.
            console.warn("WARNING: attempt to do a set on a closed table: '#{@_table}', #{misc.to_json(@_query)}")
            cb?("closed")
            return

        if not immutable.Map.isMap(changes)
            changes = immutable.fromJS(changes)
        if not @_value_local?
            @_value_local = immutable.Map({})

        if not merge?
            merge = 'deep'
        else if typeof(merge) == 'function'
            cb = merge
            merge = 'deep'

        if not immutable.Map.isMap(changes)
            cb?("type error -- changes must be an immutable.js Map or JS map")
            return

        if DEBUG
            console.log("set('#{@_table}'): #{misc.to_json(changes.toJS())}")

        # Ensure that each key is allowed to be set.
        if not @_client_query.set?
            cb?("users may not set #{@_table}")
            return
        can_set = @_client_query.set.fields
        try
            changes.map (v, k) => if (can_set[k] == undefined) then throw Error("users may not set #{@_table}.#{k}")
        catch e
            cb?(e)
            return

        # Determine the primary key's value
        id = @_key(changes)
        if not id?
            # attempt to compute the primary key if it is a computed primary key
            id0 = @_computed_primary_key(changes)
            id  = to_key(id0)
            if not id? and @_primary_keys.length == 1
                # use a "random" primary key from existing data
                id0 = id = @_value_local.keySeq().first()
            if not id?
                cb?("must specify primary key #{@_primary_keys.join(',')}, have at least one record, or have a computed primary key")
                return
            # Now id is defined
            if @_primary_keys.length == 1
                changes = changes.set(@_primary_keys[0], id0)
            else
                i = 0
                for pk in @_primary_keys
                    changes = changes.set(pk, id0[i])
                    i += 1

        # Get the current value
        cur = @_value_local.get(id)
        if not cur?
            # No record with the given primary key.  Require that all the @_required_set_fields
            # are specified, or it will become impossible to sync this table to the backend.
            for k, _ of @_required_set_fields
                if not changes.get(k)?
                    cb?("must specify field '#{k}' for new records")
                    return
            # If there is no current value, then the next value is easy -- it is just the changes.
            new_val = changes
        else
            # Use the appropriate merge strategy to get the next val.  Fortunately these are all built
            # into immutable.js!
            switch merge
                when 'deep'
                    new_val = cur.mergeDeep(changes)
                when 'shallow'
                    new_val = cur.merge(changes)
                when 'none'
                    new_val = changes
                else
                    cb?("merge must be one of 'deep', 'shallow', 'none'")
                    return
        # If something changed, then change it in our local store, and also kick off a save to the backend.
        if not immutable.is(new_val, cur)
            @_value_local = @_value_local.set(id, new_val)
            @save(cb)
            @emit_change([id])   # CRITICAL: other code assumes the key is *NOT* sent with this change event!
        else
            cb?()

        return new_val

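    # Merge-strategy illustration (hypothetical record, for exposition only): if the
    # current record is {id: 'x', settings: {a: 1, b: 2}}, then
    #     set({id: 'x', settings: {b: 3}}, 'deep')      # settings becomes {a: 1, b: 3}
    #     set({id: 'x', settings: {b: 3}}, 'shallow')   # settings becomes {b: 3}
    #     set({id: 'x', settings: {b: 3}}, 'none')      # record becomes exactly {id: 'x', settings: {b: 3}}
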
    close: =>
        if @_state == 'closed'
            # already closed
            return
        # decrement the reference to this synctable
        if global_cache_decref(@)
            # close: not zero -- so don't close it yet -- still in use by multiple clients
            return
        @_client.removeListener('disconnected', @_disconnected)
        # do a last attempt at a save (so we don't lose data), then really close.
        @_save()   # this will synchronously construct the last save and send it
        # The moment the sync part of @_save is done, we remove listeners and clear
        # everything up.  It's critical that, as soon as @close is called, there
        # be no possible way any further connect events (etc.) can make this SyncTable
        # do anything!!  That finality assumption is made elsewhere (e.g., in smc-project/client.coffee).
        @removeAllListeners()
        if @_id?
            @_client.query_cancel(id:@_id)
            delete @_id
        @_state = 'closed'
        delete @_value_local
        delete @_value_server

    # wait until some function of this synctable is truthy
    # (this might be exactly the same code as in the postgres-synctable.coffee SyncTable....)
    wait: (opts) =>
        opts = defaults opts,
            until   : required   # waits until "until(@)" evaluates to something truthy
            timeout : 30         # in *seconds* -- set to 0 to disable (sort of DANGEROUS, obviously.)
            cb      : required   # cb(undefined, until(@)) on success and cb('timeout') on failure due to timeout; cb('closed') if closed
        if @_state == 'closed'
            # instantly fail -- table is closed so can't wait for anything
            opts.cb("closed")
            return
        x = opts.until(@)
        if x
            opts.cb(undefined, x)   # already true
            return
        fail_timer = undefined
        f = =>
            x = opts.until(@)
            if x
                @removeListener('change', f)
                if fail_timer? then clearTimeout(fail_timer)
                opts.cb(undefined, x)
        @on('change', f)
        if opts.timeout
            fail = =>
                @removeListener('change', f)
                opts.cb('timeout')
            fail_timer = setTimeout(fail, 1000*opts.timeout)
        return

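    # Usage illustration (the 'state' field and its 'ready' value are hypothetical,
    # just to show the shape of the options):
    #
    #     table.wait
    #         until   : (t) -> t.get_one()?.get('state') == 'ready'
    #         timeout : 60
    #         cb      : (err, value) ->
    #             if err then console.warn(err) else console.log('ready:', value)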

synctables = {}

# for debugging; in particular, verify that synctables are freed.
# Do not leave in production; could be slight security risk.
## window?.synctables = synctables

exports.sync_table = (query, options, client, debounce_interval=2000, throttle_changes=undefined, use_cache=true) ->
    cache_key = json_stable_stringify(query:query, options:options, debounce_interval:debounce_interval, throttle_changes:throttle_changes)
    if not use_cache
        return new SyncTable(query, options, client, debounce_interval, throttle_changes, cache_key)

    S = synctables[cache_key]
    if S?
        if S._state == 'connected'
            # same behavior as newly created synctable
            async.nextTick () ->
                if S._state == 'connected'
                    S.emit('connected')
        S._reference_count += 1
        return S
    else
        S = synctables[cache_key] = new SyncTable(query, options, client, debounce_interval, throttle_changes, cache_key)
        S._reference_count = 1
        return S

global_cache_decref = (S) ->
    if S._reference_count?
        S._reference_count -= 1
        if S._reference_count <= 0
            delete synctables[S._cache_key]
            return false  # not in use
        else
            return true   # still in use

#if window?
#    window.synctables = synctables

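# Caching behavior illustration (sketch only, using the same assumed 'synctable'
# require as in the sketch at the top of this file): two calls to sync_table with
# the same query, options, debounce_interval and throttle_changes share one cached
# SyncTable, and close() only really closes it once every reference is gone:
#
#     a = synctable.sync_table(query, [], client)   # creates; _reference_count = 1
#     b = synctable.sync_table(query, [], client)   # same object; _reference_count = 2
#     a.close()                                     # just decrements the count
#     b.close()                                     # now the table actually closes
#
# Passing use_cache=false returns a fresh, uncached SyncTable instead.
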
###
Various mock clients for unit testing

Events:
  - disconnected
  - connected
###

class exports.TestBrowserClient1 extends EventEmitter

    is_project: =>
        return false

    is_connected: =>
        return true

    is_signed_in: =>
        return true

    dbg: =>
        return =>

    query_cancel: =>

    query: (opts) =>
        opts = defaults opts,
            query   : required
            changes : undefined
            options : undefined   # if given must be an array of objects, e.g., [{limit:5}]
            timeout : 30
            cb      : undefined
        @emit 'query', opts
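
# Sketch of how this mock might be used in a unit test (illustrative only; the
# require path and the 'projects' query are assumptions, not part of this file).
# The mock emits a 'query' event instead of talking to a real hub, so a test can
# inspect the query and answer it by calling opts.cb itself:
#
#     {TestBrowserClient1, sync_table} = require('./synctable')
#     client = new TestBrowserClient1()
#     client.on 'query', (opts) ->
#         if opts.changes
#             # pretend the backend returned one row and a changefeed id
#             process.nextTick ->
#                 opts.cb(undefined, {id: 'fake-id', query: {projects: [{project_id: 'p1', title: 'Test'}]}})
#     table = sync_table({projects: [{project_id: null, title: null}]}, [], client, 0, 0, false)
#     table.once 'connected', ->
#         # table.get() now contains the fake row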