##############################################################################
#
# CoCalc: Collaborative Calculation in the Cloud
#
# Copyright (C) 2017, Sagemath Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
###############################################################################

###
Server side synchronized tables built on PostgreSQL, and basic support
for user get query updates.
###

EventEmitter = require('events')

immutable  = require('immutable')
async      = require('async')
underscore = require('underscore')

{defaults} = misc = require('smc-util/misc')
required = defaults.required
misc_node = require('smc-util-node/misc_node')

{PostgreSQL, pg_type, one_result, all_results} = require('./postgres')
{quote_field} = require('./postgres-base')

{SCHEMA} = require('smc-util/schema')

class exports.PostgreSQL extends PostgreSQL

    _ensure_trigger_exists: (table, select, watch, cb) =>
        dbg = @_dbg("_ensure_trigger_exists(#{table})")
        dbg("select=#{misc.to_json(select)}")
        if misc.len(select) == 0
            cb('there must be at least one column selected')
            return
        tgname = trigger_name(table, select, watch)
        trigger_exists = undefined
        async.series([
            (cb) =>
                dbg("checking whether or not trigger exists")
                @_query
                    query : "SELECT count(*) FROM pg_trigger WHERE tgname = '#{tgname}'"
                    cb    : (err, result) =>
                        if err
                            cb(err)
                        else
                            trigger_exists = parseInt(result.rows[0].count) > 0
                            cb()
            (cb) =>
                if trigger_exists
                    dbg("trigger #{tgname} already exists")
                    cb()
                    return
                dbg("creating trigger #{tgname}")
                code = trigger_code(table, select, watch)
                async.series([
                    (cb) =>
                        @_query
                            query : code.function
                            cb    : cb
                    (cb) =>
                        @_query
                            query : code.trigger
                            cb    : cb
                ], cb)
        ], cb)

    _listen: (table, select, watch, cb) =>
        dbg = @_dbg("_listen(#{table})")
        dbg("select = #{misc.to_json(select)}")
        if not misc.is_object(select)
            cb('select must be an object')
            return
        if misc.len(select) == 0
            cb('there must be at least one column')
            return
        if not misc.is_array(watch)
            cb('watch must be an array')
            return
        @_listening ?= {}
        tgname = trigger_name(table, select, watch)
        if @_listening[tgname] > 0
            dbg("already listening")
            @_listening[tgname] += 1
            cb?(undefined, tgname)
            return
        async.series([
            (cb) =>
                dbg("ensure trigger exists")
                @_ensure_trigger_exists(table, select, watch, cb)
            (cb) =>
                dbg("add listener")
                @_query
                    query : "LISTEN #{tgname}"
                    cb    : cb
        ], (err) =>
            if err
                dbg("fail: err = #{err}")
                cb?(err)
            else
                @_listening[tgname] ?= 0
                @_listening[tgname] += 1
                dbg("success")
                cb?(undefined, tgname)
        )

    _notification: (mesg) =>
        #@_dbg('notification')(misc.to_json(mesg))  # this is way too verbose...
        @emit(mesg.channel, JSON.parse(mesg.payload))

    _clear_listening_state: =>
        @_listening = {}

    _stop_listening: (table, select, watch, cb) =>
        @_listening ?= {}
        tgname = trigger_name(table, select, watch)
        if not @_listening[tgname]? or @_listening[tgname] == 0
            cb?()
            return
        if @_listening[tgname] > 0
            @_listening[tgname] -= 1
        if @_listening[tgname] == 0
            @_query
                query : "UNLISTEN #{tgname}"
                cb    : cb

    # Server-side changefeed-updated table, which automatically restarts the changefeed
    # on error, etc.  See the SyncTable docs where the class is defined.
    synctable: (opts) =>
        opts = defaults opts,
            table          : required
            columns        : undefined
            where          : undefined
            limit          : undefined
            order_by       : undefined
            where_function : undefined  # if given, a function of the *primary* key that returns true if and only if it matches the changefeed
            idle_timeout_s : undefined  # TODO: currently ignored
            cb             : required
        new SyncTable(@, opts.table, opts.columns, opts.where, opts.where_function, opts.limit, opts.order_by, opts.cb)
        return
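
    # Example usage (a sketch, not taken from this codebase -- assumes `db` is a
    # connected instance of exports.PostgreSQL and that the 'projects' table with
    # its 'title' column is defined in SCHEMA):
    #
    #   db.synctable
    #       table   : 'projects'
    #       columns : ['title']
    #       cb      : (err, synctable) =>
    #           synctable?.on 'change', (key) ->
    #               console.log("project #{key} changed:", synctable.get(key)?.toJS())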

    changefeed: (opts) =>
        opts = defaults opts,
            table  : required  # Name of the table
            select : required  # Map from field names to postgres data types. These must
                               # determine entries of table (e.g., primary key).
            watch  : required  # Array of field names we watch for changes
            where  : required  # Condition involving only the fields in select; or function taking obj with select and returning true or false
            cb     : required
        new Changes(@, opts.table, opts.select, opts.watch, opts.where, opts.cb)
        return
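
    # Example usage (a sketch mirroring how ProjectAndUserTracker below uses this;
    # assumes `db` is a connected instance of exports.PostgreSQL):
    #
    #   db.changefeed
    #       table  : 'projects'
    #       select : {project_id : 'UUID'}
    #       watch  : ['users']
    #       where  : {}
    #       cb     : (err, feed) =>
    #           feed?.on 'change', (change) ->
    #               # change looks like {action:'insert'|'update'|'delete', new_val:..., old_val:...}
    #               console.log(change)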

    # Event emitter that tracks the projects, users and collaborators of all
    # registered accounts (see ProjectAndUserTracker below).
    project_and_user_tracker: (opts) =>
        opts = defaults opts,
            cb : required
        if @_project_and_user_tracker?
            opts.cb(undefined, @_project_and_user_tracker)
            return
        @_project_and_user_tracker_cbs ?= []
        @_project_and_user_tracker_cbs.push(opts.cb)
        if @_project_and_user_tracker_cbs.length == 1
            x = new ProjectAndUserTracker @, (err) =>
                if not err
                    @_project_and_user_tracker = x
                    x.on 'error', =>
                        delete @_project_and_user_tracker
                else
                    x = undefined
                for cb in @_project_and_user_tracker_cbs
                    cb?(err, x)
                delete @_project_and_user_tracker_cbs
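
    # Example usage (a sketch; assumes `db` is a connected instance of
    # exports.PostgreSQL and `account_id` is a valid account uuid):
    #
    #   db.project_and_user_tracker cb : (err, tracker) =>
    #       tracker?.register
    #           account_id : account_id
    #           cb         : (err) =>
    #               console.log(tracker.projects(account_id))  # set of projects this user is on
    #       tracker?.on 'add_collaborator', (x) ->
    #           console.log("#{x.account_id} gained collaborator #{x.collab_id}")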


class ProjectAndUserTracker extends EventEmitter
    constructor: (@_db, cb) ->
        dbg = @_dbg('constructor')
        dbg("Initializing Project and user tracker...")
        @setMaxListeners(10000)  # every changefeed might result in a listener on this one object.
        # by a "set" we mean a map to bool
        @_accounts = {}  # set of accounts we care about
        @_users    = {}  # map from project_id to set of users of a given project
        @_projects = {}  # map from account_id to set of projects of a given user
        @_collabs  = {}  # map from account_id to map from account_ids to *number* of projects you have in common
        # create changefeed listening on changes to projects table
        @_db.changefeed
            table  : 'projects'
            select : {project_id:'UUID'}
            watch  : ['users']
            where  : {}
            cb     : (err, feed) =>
                if err
                    dbg("Error = #{err}")
                    cb(err)
                else
                    dbg("Done")
                    @_feed = feed
                    @_feed.on 'change', @_handle_change
                    @_feed.on 'error',  @_handle_error
                    @_feed.on 'close',  (=> @_handle_error("changefeed closed"))
                    cb()

    _dbg: (f) =>
        return @_db._dbg("Tracker.#{f}")

    _handle_error: (err) =>
        if @_closed
            return
        # There was an error in the changefeed.
        # Error is totally fatal, so we close up shop.
        dbg = @_dbg("_handle_error")
        dbg("err='#{err}'")
        @emit('error', err)
        @close()

    close: =>
        if @_closed
            return
        @_closed = true
        @emit('close')
        @removeAllListeners()
        @_feed?.close()
        delete @_feed

    _handle_change: (x) =>
        if x.action == 'delete'
            project_id = x.old_val.project_id
            if not @_users[project_id]?
                # no users
                return
            for account_id of @_users[project_id]
                @_remove_user_from_project(account_id, project_id)
            return
        # users on a project changed or project created
        project_id = x.new_val.project_id
        @_db._query
            query : "SELECT jsonb_object_keys(users) AS account_id FROM projects"
            where : "project_id = $::UUID":project_id
            cb    : all_results 'account_id', (err, users) =>
                if err
                    # TODO! -- will have to try again... or make a version of _query that can't fail...?
                    return
                if not @_users[project_id]?
                    # we are not already watching this project
                    any = false
                    for account_id in users
                        if @_accounts[account_id]
                            any = true
                            break
                    if not any
                        # *and* none of our tracked users are on this project... so don't care
                        return

                # first add any users who got added, and record which accounts are relevant
                users_now = {}
                for account_id in users
                    users_now[account_id] = true
                users_before = @_users[project_id] ? {}
                for account_id of users_now
                    if not users_before[account_id]
                        @_add_user_to_project(account_id, project_id)
                for account_id of users_before
                    if not users_now[account_id]
                        @_remove_user_from_project(account_id, project_id)

    # add and remove user from a project, maintaining our data structures (@_accounts, @_projects, @_collabs)
    _add_user_to_project: (account_id, project_id) =>
        dbg = @_dbg('_add_user_to_project')
        if account_id?.length != 36 or project_id?.length != 36
            # nothing to do -- better than crashing the server...
            dbg("WARNING: invalid account_id (='#{account_id}') or project_id (='#{project_id}')")
            return
        if @_projects[account_id]?[project_id]
            return
        @emit 'add_user_to_project', {account_id:account_id, project_id:project_id}
        users = @_users[project_id] ?= {}
        users[account_id] = true
        projects = @_projects[account_id] ?= {}
        projects[project_id] = true
        collabs = @_collabs[account_id] ?= {}
        for other_account_id of users
            if collabs[other_account_id]?
                collabs[other_account_id] += 1
            else
                collabs[other_account_id] = 1
                @emit 'add_collaborator', {account_id:account_id, collab_id:other_account_id}
            other_collabs = @_collabs[other_account_id]
            if other_collabs[account_id]?
                other_collabs[account_id] += 1
            else
                other_collabs[account_id] = 1
                @emit 'add_collaborator', {account_id:other_account_id, collab_id:account_id}

    _remove_user_from_project: (account_id, project_id, no_emit) =>
        if account_id?.length != 36 or project_id?.length != 36
            throw Error("invalid account_id or project_id")
        if not @_projects[account_id]?[project_id]
            return
        if not no_emit
            @emit 'remove_user_from_project', {account_id:account_id, project_id:project_id}
        collabs = @_collabs[account_id] ?= {}
        for other_account_id of @_users[project_id]
            @_collabs[account_id][other_account_id] -= 1
            if @_collabs[account_id][other_account_id] == 0
                delete @_collabs[account_id][other_account_id]
                if not no_emit
                    @emit 'remove_collaborator', {account_id:account_id, collab_id:other_account_id}
            @_collabs[other_account_id][account_id] -= 1
            if @_collabs[other_account_id][account_id] == 0
                delete @_collabs[other_account_id][account_id]
                if not no_emit
                    @emit 'remove_collaborator', {account_id:other_account_id, collab_id:account_id}
        delete @_users[project_id][account_id]
        delete @_projects[account_id][project_id]

    # Register the given account so that this client watches the database
    # in order to be aware of all projects and collaborators of the
    # given account.
    register: (opts) =>
        opts = defaults opts,
            account_id : required
            cb         : required
        if @_accounts[opts.account_id]?
            # already registered
            opts.cb()
            return
        @_register_todo ?= {}
        if misc.len(@_register_todo) == 0
            # no registration is currently happening
            @_register_todo[opts.account_id] = [opts.cb]
            # kick things off -- this will keep registering accounts
            # until everything is done, then set @_register_todo to undefined
            @_register()
        else
            # Accounts are being registered right now.  Add to the todo list.
            v = @_register_todo[opts.account_id]
            if v?
                v.push(opts.cb)
            else
                @_register_todo[opts.account_id] = [opts.cb]

    # Call _register to completely clear the @_register_todo work queue.
    # NOTE: _register does each account, *one after another*, rather than doing
    # everything in parallel.  WARNING: DO NOT rewrite this to do everything in parallel,
    # unless you think you thoroughly understand the algorithm, since I think
    # doing things in parallel would horribly break!
    _register: =>
        account_id = misc.keys(@_register_todo)?[0]
        if not account_id?
            # no work
            return
        # Register this account
        dbg = @_dbg("_register")
        dbg("registering account='#{account_id}'...")
        @_db._query
            query  : "SELECT project_id, json_agg(o) as users FROM (select project_id, jsonb_object_keys(users) AS o FROM projects WHERE users ? $1::TEXT) s group by s.project_id"
            params : [account_id]
            cb     : all_results (err, x) =>
                if not err
                    @_accounts[account_id] = true
                    for a in x
                        if @_users[a.project_id]?
                            # already have data about this project
                            continue
                        else
                            for collab_account_id in a.users
                                # NOTE: Very rarely, sometimes collab_account_id is not defined
                                if collab_account_id?
                                    @_add_user_to_project(collab_account_id, a.project_id)
                # call the callbacks
                if err
                    dbg("error registering '#{account_id}' -- err=#{err}")
                else
                    dbg("successfully registered '#{account_id}'")
                for cb in @_register_todo[account_id]
                    cb?(err)
                # We are done (trying to) register account_id, for good or ill.
                delete @_register_todo[account_id]
                if misc.len(@_register_todo) > 0
                    # Deal with the next account that needs to be registered
                    @_register()

    unregister: (opts) =>
        opts = defaults opts,
            account_id : required
        if not @_accounts[opts.account_id]?
            return
        v = []
        for project_id of @_projects[opts.account_id]
            v.push(project_id)
        delete @_accounts[opts.account_id]
        # Forget about any projects they are on that are no longer
        # necessary to watch...
        for project_id in v
            need = false
            for account_id of @_users[project_id]
                if @_accounts[account_id]?
                    need = true
                    break
            if not need
                for account_id of @_users[project_id]
                    @_remove_user_from_project(account_id, project_id, true)
                delete @_users[project_id]
        return

    # return *set* of projects that this user is a collaborator on
    projects: (account_id) =>
        if not @_accounts[account_id]?
            throw Error("account (='#{account_id}') must be registered")
        return @_projects[account_id] ? {}

    # map from collabs of account_id to number of projects they collab on (account_id itself counted twice)
    collabs: (account_id) =>
        return @_collabs[account_id]

###
The Changes class is a useful building block
for making changefeeds.  It lets you watch when given
columns change in a given table, and be notified
when a where condition is satisfied.

IMPORTANT: If an error event is emitted then the
Changes object will close and not work any further!
You must recreate it.
###
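
# The notifications handled below are the JSON arrays built by trigger_code at the
# bottom of this file, i.e., mesg = [TG_OP, obj_new, obj_old].  For example (a sketch,
# not actual output), an UPDATE on the projects changefeed used by ProjectAndUserTracker
# would arrive roughly as ['UPDATE', {project_id:'...'}, {project_id:'...'}], and
# _handle_change below turns that into a 'change' event with {action:'update', ...}.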
class Changes extends EventEmitter
    constructor: (@_db, @_table, @_select, @_watch, @_where, cb) ->
        @dbg = @_dbg("constructor")
        @dbg("select=#{misc.to_json(@_select)}, watch=#{misc.to_json(@_watch)}, @_where=#{misc.to_json(@_where)}")
        try
            @_init_where()
        catch e
            cb?("error initializing where conditions -- #{e}")
            return
        @_db._listen @_table, @_select, @_watch, (err, tgname) =>
            if err
                cb?(err); return
            @_tgname = tgname
            @_db.on(@_tgname, @_handle_change)
            # NOTE: we close on *connect*, not on disconnect, since then clients
            # that try to reconnect will only try to do so when we have an actual
            # connection to the database.  No point in worrying them while trying
            # to reconnect, which only makes matters worse (as they panic and
            # requests pile up!).
            @_db.once('connect', @close)
            cb?(undefined, @)

    _dbg: (f) =>
        return @_db._dbg("Changes(table='#{@_table}').#{f}")

    # this breaks the changefeed -- client must recreate it; nothing further will work at all.
    _fail: (err) =>
        if @_closed
            return
        dbg = @_dbg("_fail")
        dbg("err='#{err}'")
        @emit('error', new Error(err))
        @close()

    close: (cb) =>
        if @_closed
            cb?()
            return
        @_closed = true
        @emit('close', {action:'close'})
        @removeAllListeners()
        @_db.removeListener(@_tgname, @_handle_change)
        @_db.removeListener('connect', @close)
        @_db._stop_listening(@_table, @_select, @_watch, cb)
        delete @_tgname
        delete @_condition
        cb?()

    _old_val: (result, action, mesg) =>
        # include only changed fields if action is 'update'
        if action == 'update'
            old_val = {}
            for field, val of mesg[1]
                old = mesg[2][field]
                if val != old
                    old_val[field] = old
            if misc.len(old_val) > 0
                result.old_val = old_val

    _handle_change: (mesg) =>
        #console.log '_handle_change', mesg
        if mesg[0] == 'DELETE'
            if not @_match_condition(mesg[2])
                return
            @emit('change', {action:'delete', old_val:mesg[2]})
        else
            action = "#{mesg[0].toLowerCase()}"
            if not @_match_condition(mesg[1])
                if action != 'update'
                    return
                for k, v of mesg[1]
                    if not mesg[2][k]?
                        mesg[2][k] = v
                if @_match_condition(mesg[2])
                    @emit('change', {action:'delete', old_val:mesg[2]})
                return
            if @_watch.length == 0
                r = {action:action, new_val:mesg[1]}
                @_old_val(r, action, mesg)
                @emit('change', r)
                return
            where = {}
            for k, v of mesg[1]
                where["#{k} = $"] = v
            @_db._query
                select : @_watch
                table  : @_table
                where  : where
                cb     : one_result (err, result) =>
                    if err
                        @_fail(err)
                        return
                    if not result?
                        # This happens when record isn't deleted, but some
                        # update results in the object being removed from our
                        # selection criterion... which we view as "delete".
                        @emit('change', {action:'delete', old_val:mesg[1]})
                        return
                    r = {action:action, new_val:misc.merge(result, mesg[1])}
                    @_old_val(r, action, mesg)
                    @emit('change', r)

    insert: (where) =>
        where0 = {}
        for k, v of where
            where0["#{k} = $"] = v
        @_db._query
            select : @_watch.concat(misc.keys(@_select))
            table  : @_table
            where  : where0
            cb     : all_results (err, results) =>
                ## Useful for testing that the @_fail thing below actually works.
                ##if Math.random() < .7
                ##    err = "simulated error"
                if err
                    @_dbg("insert")("FAKE ERROR!")
                    @_fail(err)    # this is game over
                    return
                else
                    for x in results
                        if @_match_condition(x)
                            misc.map_mutate_out_undefined(x)
                            @emit('change', {action:'insert', new_val:x})

    delete: (where) =>
        # listener is meant to delete everything that *matches* the where, so
        # there is no need to actually do a query.
        @emit('change', {action:'delete', old_val:where})

    _init_where: =>
        if typeof(@_where) == 'function'
            # user provided function
            @_match_condition = @_where
            return
        if misc.is_object(@_where)
            w = [@_where]
        else
            w = @_where

        @_condition = {}
        add_condition = (field, op, val) =>
            field = field.trim()
            if field[0] == '"'  # de-quote
                field = field.slice(1, field.length-1)
            if not @_select[field]?
                throw Error("'#{field}' must be in select")
            if misc.is_object(val)
                throw Error("val (=#{misc.to_json(val)}) must not be an object")
            if misc.is_array(val)
                if op == '=' or op == '=='
                    # containment
                    f = (x) ->
                        for v in val
                            if x == v
                                return true
                        return false
                else if op == '!=' or op == '<>'
                    # not contained in
                    f = (x) ->
                        for v in val
                            if x == v
                                return false
                        return true
                else
                    throw Error("if val is an array, then op must be = or !=")
            else if misc.is_date(val)
                # Inputs to condition come back as JSON, which doesn't know
                # about timestamps, so we convert them to date objects.
                if op in ['=', '==']
                    f = (x) -> (new Date(x) - val == 0)
                else if op in ['!=', '<>']
                    f = (x) -> (new Date(x) - val != 0)
                else
                    g = misc.op_to_function(op)
                    f = (x) -> g(new Date(x), val)
            else
                g = misc.op_to_function(op)
                f = (x) -> g(x, val)
            @_condition[field] = f

        for obj in w
            if misc.is_object(obj)
                for k, val of obj
                    ###
                    k should be of one of the following forms
                       - "field op $::TYPE"
                       - "field op $" or
                       - "field op any($)"
                       - 'field' (defaults to =)
                    where op is one of =, <, >, <=, >=, !=

                    val must be:
                       - something where javascript === and comparisons works as you expect!
                       - or an array, in which case op must be = or !=, and we ALWAYS do inclusion (analogue of any).
                    ###
                    found = false
                    for op in misc.operators
                        i = k.indexOf(op)
                        if i != -1
                            add_condition(k.slice(0, i).trim(), op, val)
                            found = true
                            break
                    if not found
                        throw Error("unable to parse '#{k}'")
            else if typeof(obj) == 'string'
                found = false
                for op in misc.operators
                    i = obj.indexOf(op)
                    if i != -1
                        add_condition(obj.slice(0, i), op, eval(obj.slice(i+op.length).trim()))
                        found = true
                        break
                if not found
                    throw Error("unable to parse '#{obj}'")
            else
                throw Error("NotImplementedError")
        if misc.len(@_condition) == 0
            delete @_condition

        @_match_condition = (obj) =>
            #console.log '_match_condition', obj
            if not @_condition?
                return true
            for field, f of @_condition
                if not f(obj[field])
                    #console.log 'failed due to field ', field
                    return false
            return true

class SyncTable extends EventEmitter
    constructor: (@_db, @_table, @_columns, @_where, @_where_function, @_limit, @_order_by, cb) ->
        t = SCHEMA[@_table]
        if not t?
            @_state = 'error'
            cb("unknown table #{@_table}")
            return

        try
            @_primary_key = @_db._primary_key(@_table)
        catch e
            cb(e)
            return

        @_listen_columns = {"#{@_primary_key}" : pg_type(t.fields[@_primary_key], @_primary_key)}

        # We only trigger an update when one of the columns we care about actually changes.

        if @_columns
            @_watch_columns = misc.copy(@_columns)  # don't include primary key since it can't change.
            if @_primary_key not in @_columns
                @_columns = @_columns.concat([@_primary_key])  # required
            @_select_columns = @_columns
        else
            @_watch_columns = []  # means all of them
            @_select_columns = misc.keys(SCHEMA[@_table].fields)

        @_select_query = "SELECT #{(quote_field(x) for x in @_select_columns)} FROM #{@_table}"

        #@_update = underscore.throttle(@_update, 500)

        @_init (err) => cb(err, @)

    _dbg: (f) =>
        return @_db._dbg("SyncTable(table='#{@_table}').#{f}")

    _query_opts: () =>
        opts = {}
        opts.query    = @_select_query
        opts.where    = @_where
        opts.limit    = @_limit
        opts.order_by = @_order_by
        return opts

    close: (cb) =>
        @removeAllListeners()
        @_db.removeListener(@_tgname, @_notification)
        @_db.removeListener('connect', @_reconnect)
        delete @_value
        @_state = 'closed'
        @_db._stop_listening(@_table, @_listen_columns, @_watch_columns, cb)

    connect: (opts) =>
        opts?.cb?()  # NO-OP -- only needed for backward compatibility

    _notification: (obj) =>
        #console.log 'notification', obj
        [action, new_val, old_val] = obj
        if action == 'DELETE' or not new_val?
            k = old_val[@_primary_key]
            if @_value.has(k)
                @_value = @_value.delete(k)
                process.nextTick(=>@emit('change', k))
        else
            k = new_val[@_primary_key]
            if @_where_function? and not @_where_function(k)
                # doesn't match -- nothing to do -- ignore
                return
            @_changed[k] = true
            @_update()

    _init: (cb) =>
        misc.retry_until_success
            f           : @_do_init
            start_delay : 3000
            max_delay   : 10000
            log         : @_dbg("_init")
            cb          : cb

    _do_init: (cb) =>
        @_state = 'init'  # 'init' -> ['error', 'ready'] -> 'closed'
        @_value = immutable.Map()
        @_changed = {}
        async.series([
            (cb) =>
                # ensure database client is listening for primary key changes to our table
                @_db._listen @_table, @_listen_columns, @_watch_columns, (err, tgname) =>
                    @_tgname = tgname
                    @_db.on(@_tgname, @_notification)
                    cb(err)
            (cb) =>
                opts = @_query_opts()
                opts.cb = (err, result) =>
                    if err
                        cb(err)
                    else
                        @_process_results(result.rows)
                        @_db.once('connect', @_reconnect)
                        cb()
                @_db._query(opts)
            (cb) =>
                @_update(cb)
        ], (err) =>
            if err
                @_state = 'error'
                cb(err)
            else
                @_state = 'ready'
                cb()
        )

    _reconnect: (cb) =>
        dbg = @_dbg("_reconnect")
        if @_state != 'ready'
            dbg("only attempt reconnect if we were already successfully connected at some point.")
            return
        # Everything was already initialized, but then the connection to the
        # database was dropped... and then successfully re-connected.  Now
        # we need to (1) set everything up again, and (2) send out notifications
        # about anything in the table that changed.

        dbg("Save state from before disconnect")
        before = @_value

        dbg("Clean up everything.")
        @_db.removeListener(@_tgname, @_notification)
        @_db.removeListener('connect', @_reconnect)
        delete @_value

        dbg("connect and initialize")
        @_init (err) =>
            if err
                cb?(err)
                return
            dbg("notify about anything that changed when we were disconnected")
            before.map (v, k) =>
                if not v.equals(@_value.get(k))
                    @emit('change', k)
            @_value.map (v, k) =>
                if not before.has(k)
                    @emit('change', k)

    _process_results: (rows) =>
        if @_state == 'closed'
            return
        for x in rows
            k = x[@_primary_key]
            v = immutable.fromJS(misc.map_without_undefined(x))
            if not v.equals(@_value.get(k))
                @_value = @_value.set(k, v)
                if @_state == 'ready'  # only send out change notifications after ready.
                    process.nextTick(=>@emit('change', k))

    # Grab any entries from the table about which we have been notified of changes.
    _update: (cb) =>
        if misc.len(@_changed) == 0  # nothing to do
            cb?()
            return
        changed = @_changed
        @_changed = {}  # reset changed set -- could get modified during query below, which is fine.
        if @_select_columns.length == 1  # special case where we don't have to query for more info
            @_process_results((("#{@_primary_key}" : x) for x in misc.keys(changed)))
            cb?()
            return

        # Have to query to get actual changed data.
        @_db._query
            query : @_select_query
            where : misc.merge("#{@_primary_key} = ANY($)" : misc.keys(changed), @_where)
            cb    : (err, result) =>
                if err
                    @_dbg("update")("error #{err}")
                    for k of changed
                        @_changed[k] = true  # will try again later
                else
                    @_process_results(result.rows)
                cb?()

    get: (key) =>
        return if key? then @_value?.get(key) else @_value

    getIn: (x) =>
        return @_value?.getIn(x)

    has: (key) =>
        return @_value?.has(key)

    # wait until some function of this synctable is truthy
    wait: (opts) =>
        opts = defaults opts,
            until   : required  # waits until "until(@)" evaluates to something truthy
            timeout : 30        # in *seconds* -- set to 0 to disable (sort of DANGEROUS if 0, obviously.)
            cb      : required  # cb(undefined, until(@)) on success and cb('timeout') on failure due to timeout
        x = opts.until(@)
        if x
            opts.cb(undefined, x)  # already true
            return
        fail_timer = undefined
        f = =>
            x = opts.until(@)
            if x
                @removeListener('change', f)
                if fail_timer?
                    clearTimeout(fail_timer)
                    fail_timer = undefined
                opts.cb(undefined, x)
        @on('change', f)
        if opts.timeout
            fail = =>
                @removeListener('change', f)
                opts.cb('timeout')
            fail_timer = setTimeout(fail, 1000*opts.timeout)
        return
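
    # Example usage of wait (a sketch; assumes `synctable` came back from db.synctable
    # above with a 'title' column, and `some_project_id` is a hypothetical primary key):
    #
    #   synctable.wait
    #       until   : (t) -> t.getIn([some_project_id, 'title'])
    #       timeout : 60
    #       cb      : (err, title) ->
    #           console.log(err ? "title is now: #{title}")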

###
Trigger functions
###
trigger_name = (table, select, watch) ->
    if not misc.is_object(select)
        throw Error("trigger_name -- columns must be a map of colname:type")
    c = misc.keys(select)
    c.sort()
    watch = misc.copy(watch)
    watch.sort()
    if watch.length > 0
        c.push('|')
        c = c.concat(watch)
    return 'change_' + misc_node.sha1("#{table} #{c.join(' ')}").slice(0,16)
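
# For example (a sketch, not an actual hash): trigger_name('projects', {project_id:'UUID'}, ['users'])
# returns 'change_' followed by the first 16 hex characters of sha1('projects project_id | users'),
# so a given (table, select, watch) triple always maps to the same trigger and NOTIFY channel name.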

###
INPUT:
    table  -- name of a table
    select -- map from field names (of table) to their postgres types
    watch  -- array of field names (of table)

Creates a trigger function that fires whenever any of the given
columns changes, and sends the columns in select out as a notification.
###

trigger_code = (table, select, watch) ->
    tgname          = trigger_name(table, select, watch)
    column_decl_old = ("#{field}_old #{type ? 'text'};" for field, type of select)
    column_decl_new = ("#{field}_new #{type ? 'text'};" for field, type of select)
    assign_old      = ("#{field}_old = OLD.#{field};" for field, _ of select)
    assign_new      = ("#{field}_new = NEW.#{field};" for field, _ of select)
    build_obj_old   = ("'#{field}', #{field}_old" for field, _ of select)
    build_obj_new   = ("'#{field}', #{field}_new" for field, _ of select)
    if watch.length > 0
        no_change = ("OLD.#{field} = NEW.#{field}" for field in watch.concat(misc.keys(select))).join(' AND ')
    else
        no_change = 'FALSE'
    if watch.length > 0
        x = {}
        for k in watch
            x[k] = true
        for k in misc.keys(select)
            x[k] = true
        update_of = "OF #{(quote_field(field) for field in misc.keys(x)).join(',')}"
    else
        update_of = ""
    code = {}
    code.function = """
CREATE OR REPLACE FUNCTION #{tgname}() RETURNS TRIGGER AS $$
    DECLARE
        notification json;
        obj_old json;
        obj_new json;
        #{column_decl_old.join('\n')}
        #{column_decl_new.join('\n')}
    BEGIN
        -- TG_OP is 'DELETE', 'INSERT' or 'UPDATE'
        IF TG_OP = 'DELETE' THEN
            #{assign_old.join('\n')}
            obj_old = json_build_object(#{build_obj_old.join(',')});
        END IF;
        IF TG_OP = 'INSERT' THEN
            #{assign_new.join('\n')}
            obj_new = json_build_object(#{build_obj_new.join(',')});
        END IF;
        IF TG_OP = 'UPDATE' THEN
            IF #{no_change} THEN
                RETURN NULL;
            END IF;
            #{assign_old.join('\n')}
            obj_old = json_build_object(#{build_obj_old.join(',')});
            #{assign_new.join('\n')}
            obj_new = json_build_object(#{build_obj_new.join(',')});
        END IF;
        notification = json_build_array(TG_OP, obj_new, obj_old);
        PERFORM pg_notify('#{tgname}', notification::text);
        RETURN NULL;
    END;
$$ LANGUAGE plpgsql;"""
    code.trigger = "CREATE TRIGGER #{tgname} AFTER INSERT OR DELETE OR UPDATE #{update_of} ON #{table} FOR EACH ROW EXECUTE PROCEDURE #{tgname}();"
    return code

###

NOTES: The following is a way to back the changes with a small table.
This allows changes which are larger than the hard 8000 bytes limit.
HSY did this with the idea of having a temporary workaround for a bug related to this:
https://github.com/sagemathinc/cocalc/issues/1718

1. Create a table trigger_notifications via the db-schema.
   For performance reasons, the table itself should be created with "UNLOGGED";
   see: https://www.postgresql.org/docs/current/static/sql-createtable.html
   (I've no idea how to specify that in the code here)

    schema.trigger_notifications =
        primary_key : 'id'
        fields:
            id:
                type : 'uuid'
                desc : 'primary key'
            time:
                type : 'timestamp'
                desc : 'time of when the change was created -- used for TTL'
            notification:
                type : 'map'
                desc : "notification payload -- up to 1GB"
        pg_indexes : [ 'time' ]

2. Modify the trigger function created by trigger_code above such that
   the pg_notify payload no longer contains the data structure,
   but a UUID for an entry in the trigger_notifications table.
   It creates that UUID on its own and stores the data via a normal insert.

     notification_id = md5(random()::text || clock_timestamp()::text)::uuid;
     notification = json_build_array(TG_OP, obj_new, obj_old);
     INSERT INTO trigger_notifications(id, time, notification)
     VALUES(notification_id, NOW(), notification);

3. PostgreSQL::_notification is modified in such a way that it looks up that UUID
   in the trigger_notifications table:

    @_query
        query : "SELECT notification FROM trigger_notifications WHERE id ='#{mesg.payload}'"
        cb    : (err, result) =>
            if err
                dbg("err=#{err}")
            else
                payload = result.rows[0].notification
                # dbg("payload: type=#{typeof(payload)}, data=#{misc.to_json(payload)}")
                @emit(mesg.channel, payload)

   Fortunately, there is no string -> json conversion necessary.

4. Below, the function and trigger implement a TTL for the trigger_notifications table.
   The `date_trunc` is a good idea, because then there is just one lock + delete op
   per minute, instead of potentially at every write.

    -- 10 minutes TTL for the trigger_notifications table, deleting only every full minute

    CREATE FUNCTION delete_old_trigger_notifications() RETURNS trigger
        LANGUAGE plpgsql
        AS $$
    BEGIN
        DELETE FROM trigger_notifications
        WHERE time < date_trunc('minute', NOW() - '10 minute'::interval);
        RETURN NULL;
    END;
    $$;

    -- creating the trigger

    CREATE TRIGGER trigger_delete_old_trigger_notifications
    AFTER INSERT ON trigger_notifications
    EXECUTE PROCEDURE delete_old_trigger_notifications();

###