###
PostgreSQL -- implementation of queries needed for storage and managing blobs,
including backups, integration with google cloud storage, etc.

COPYRIGHT : (c) 2017 SageMath, Inc.
LICENSE   : AGPLv3
###

# Bucket used for cheaper longterm storage of blobs (outside of PostgreSQL).
# NOTE: We should add this to site configuration, have it read once when first
# needed, and cache it; it would then also be editable in admin account settings.
# If this env variable begins with a / it is assumed to be a path in the filesystem,
# e.g., a remote mount (in practice, we are using gcsfuse to mount gcloud buckets).
# If it begins with gs:// then it is a google cloud storage bucket.
COCALC_BLOB_STORE = process.env.COCALC_BLOB_STORE

async  = require('async')
snappy = require('snappy')
zlib   = require('zlib')
fs     = require('fs')

misc_node = require('smc-util-node/misc_node')

{defaults} = misc = require('smc-util/misc')
required = defaults.required

{expire_time, one_result, all_results, PostgreSQL} = require('./postgres')

{filesystem_bucket} = require('./filesystem-bucket')

class exports.PostgreSQL extends PostgreSQL
    save_blob: (opts) =>
        opts = defaults opts,
            uuid       : undefined  # uuid=sha1-based id coming from blob
            blob       : required   # unless check=true, we assume misc_node.uuidsha1(opts.blob) == opts.uuid;
                                    # blob must be a string or Buffer
            ttl        : 0          # object in blobstore will have *at least* this ttl in seconds;
                                    # if there is already something in blobstore with a longer ttl, we leave it;
                                    # infinite ttl = 0.
            project_id : required   # the id of the project that is saving the blob
            check      : false      # if true, will give error if misc_node.uuidsha1(opts.blob) != opts.uuid
            compress   : undefined  # optional compression to use: 'gzip', 'zlib', 'snappy'; only used if blob not already in db.
            level      : -1         # compression level (if compressed) -- see https://github.com/expressjs/compression#level
            cb         : required   # cb(err, ttl actually used in seconds); ttl=0 for infinite ttl
        if not Buffer.isBuffer(opts.blob)
            # CRITICAL: We assume everywhere below that opts.blob is a
            # buffer, e.g., in the .toString('hex') method!
            opts.blob = new Buffer(opts.blob)
        if not opts.uuid?
            opts.uuid = misc_node.uuidsha1(opts.blob)
        else if opts.check
            uuid = misc_node.uuidsha1(opts.blob)
            if uuid != opts.uuid
                opts.cb("the sha1 uuid (='#{uuid}') of the blob must equal the given uuid (='#{opts.uuid}')")
                return
        if not misc.is_valid_uuid_string(opts.uuid)
            opts.cb("uuid is invalid")
            return
        dbg = @_dbg("save_blob(uuid='#{opts.uuid}')")
        dbg()
        rows = ttl = undefined
        async.series([
            (cb) =>
                @_query
                    query : 'SELECT expire FROM blobs'
                    where : "id = $::UUID" : opts.uuid
                    cb    : (err, x) =>
                        rows = x.rows; cb(err)
            (cb) =>
                if rows.length == 0 and opts.compress
                    dbg("compression requested and blob not already saved, so we compress blob")
                    switch opts.compress
                        when 'gzip'
                            zlib.gzip opts.blob, {level:opts.level}, (err, blob) =>
                                opts.blob = blob; cb(err)
                        when 'zlib'
                            zlib.deflate opts.blob, {level:opts.level}, (err, blob) =>
                                opts.blob = blob; cb(err)
                        when 'snappy'
                            snappy.compress opts.blob, (err, blob) =>
                                opts.blob = blob; cb(err)
                        else
                            cb("compression format '#{opts.compress}' not implemented")
                else
                    cb()
            (cb) =>
                if rows.length == 0
                    dbg("nothing in DB, so we insert the blob.")
                    ttl = opts.ttl
                    @_query
                        query  : "INSERT INTO blobs"
                        values :
                            id         : opts.uuid
                            blob       : '\\x' + opts.blob.toString('hex')
                            project_id : opts.project_id
                            count      : 0
                            size       : opts.blob.length
                            created    : new Date()
                            compress   : opts.compress
                            expire     : if ttl then expire_time(ttl)
                        cb     : cb
                else
                    dbg("blob already in the DB, so see if we need to change the expire time")
                    @_extend_blob_ttl
                        expire : rows[0].expire
                        ttl    : opts.ttl
                        uuid   : opts.uuid
                        cb     : (err, _ttl) =>
                            ttl = _ttl; cb(err)
        ], (err) => opts.cb(err, ttl))
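
    # Example usage -- a minimal sketch, assuming `db` is a connected instance of
    # this class and `project_id` is a valid project uuid (both hypothetical here):
    #
    #   db.save_blob
    #       blob       : new Buffer('hello world')
    #       project_id : project_id
    #       ttl        : 3600         # keep for at least an hour
    #       compress   : 'zlib'
    #       cb         : (err, ttl) -> console.log(err, "effective ttl=#{ttl}")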

    # Used internally by save_blob to possibly extend the expire time of a blob.
    _extend_blob_ttl: (opts) =>
        opts = defaults opts,
            expire : undefined  # what expire is currently set to in the database
            ttl    : required   # requested ttl -- extend expire to at least this
            uuid   : required
            cb     : required   # (err, effective ttl (with 0=oo))
        if not misc.is_valid_uuid_string(opts.uuid)
            opts.cb("uuid is invalid")
            return
        if not opts.expire
            # ttl already infinite -- nothing to do
            opts.cb(undefined, 0)
            return
        new_expire = ttl = undefined
        if opts.ttl
            # saved ttl is finite, as is the requested one; change in DB if the requested one is longer
            z = expire_time(opts.ttl)
            if z > opts.expire
                new_expire = z
                ttl = opts.ttl
            else
                ttl = (opts.expire - new Date())/1000.0
        else
            # saved ttl is finite but requested one is infinite
            ttl = new_expire = 0
        if new_expire?
            # change the expire time for the blob already in the DB
            @_query
                query : 'UPDATE blobs'
                where : "id = $::UUID" : opts.uuid
                set   : "expire :: TIMESTAMP " : if new_expire == 0 then undefined else new_expire
                cb    : (err) => opts.cb(err, ttl)
        else
            opts.cb(undefined, ttl)

    get_blob: (opts) =>
        opts = defaults opts,
            uuid       : required
            save_in_db : false  # if true and blob isn't in DB and is only in gcloud, copies to local DB
                                # (for faster access, e.g., 20ms versus 5ms -- i.e., not much faster; gcloud is FAST too.)
            touch      : true
            cb         : required  # cb(err) or cb(undefined, blob_value) or cb(undefined, undefined) in case no such blob
        if not misc.is_valid_uuid_string(opts.uuid)
            opts.cb("uuid is invalid")
            return
        x    = undefined
        blob = undefined
        async.series([
            (cb) =>
                @_query
                    query : "SELECT expire, blob, gcloud, compress FROM blobs"
                    where : "id = $::UUID" : opts.uuid
                    cb    : one_result (err, _x) =>
                        x = _x; cb(err)
            (cb) =>
                if not x?
                    # nothing to do -- blob not in db (probably expired)
                    cb()
                else if x.expire and x.expire <= new Date()
                    # the blob already expired -- delete it in the background
                    @_query   # delete it (but don't wait for this to finish)
                        query : "DELETE FROM blobs"
                        where : "id = $::UUID" : opts.uuid
                    cb()
                else if x.blob?
                    # blob not expired and is in database
                    blob = x.blob
                    cb()
                else if x.gcloud
                    if not COCALC_BLOB_STORE?
                        cb("no blob store configured -- set the COCALC_BLOB_STORE env variable")
                        return
                    # blob not available locally, but should be in a Google cloud storage bucket -- try to get it
                    # NOTE: we now ignore the actual content of x.gcloud -- we don't support spreading blobs
                    # across multiple buckets... as it isn't needed because buckets are infinite, and it
                    # is potentially confusing to manage.
                    @blob_store().read
                        name : opts.uuid
                        cb   : (err, _blob) =>
                            if err
                                cb(err)
                            else
                                blob = _blob
                                cb()
                                if opts.save_in_db
                                    # also save in database so it will be faster next time (again, don't wait on this)
                                    @_query   # update it (but don't wait for this to finish)
                                        query : "UPDATE blobs"
                                        set   : {blob : blob}
                                        where : "id = $::UUID" : opts.uuid
                else
                    # blob not local and not in gcloud -- this shouldn't happen
                    # (just view this as "expired" by not setting blob)
                    cb()
            (cb) =>
                if not blob? or not x?.compress?
                    cb(); return
                # blob is compressed -- decompress it
                switch x.compress
                    when 'gzip'
                        zlib.gunzip blob, (err, _blob) =>
                            blob = _blob; cb(err)
                    when 'zlib'
                        zlib.inflate blob, (err, _blob) =>
                            blob = _blob; cb(err)
                    when 'snappy'
                        snappy.uncompress blob, (err, _blob) =>
                            blob = _blob; cb(err)
                    else
                        cb("compression format '#{x.compress}' not implemented")
        ], (err) =>
            opts.cb(err, blob)
            if blob? and opts.touch
                # blob was pulled from db or gcloud, so note that it was accessed (updates a counter)
                @touch_blob(uuid : opts.uuid)
        )
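
    # Example usage -- a minimal sketch, assuming `db` is a connected instance of
    # this class and `uuid` is the sha1-based id of a previously saved blob:
    #
    #   db.get_blob
    #       uuid : uuid
    #       cb   : (err, blob) ->
    #           if err then console.warn(err)
    #           else if not blob? then console.log('no such blob (expired?)')
    #           else console.log("got #{blob.length} bytes")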

    touch_blob: (opts) =>
        opts = defaults opts,
            uuid : required
            cb   : undefined
        if not misc.is_valid_uuid_string(opts.uuid)
            opts.cb?("uuid is invalid")
            return
        @_query
            query : "UPDATE blobs SET count = count + 1, last_active = NOW()"
            where : "id = $::UUID" : opts.uuid
            cb    : opts.cb

    # Return gcloud API interface
    gcloud: () =>
        return @_gcloud ?= require('./smc_gcloud').gcloud()

    blob_store: (bucket) =>
        if not bucket
            bucket = COCALC_BLOB_STORE
        if misc.startswith(bucket, 'gs://')
            # Google Cloud Storage -- only works if the hub has full direct gcloud storage API access, so
            # NOT in KuCalc or Docker or really anywhere anymore...
            return @gcloud().bucket(name: bucket.slice('gs://'.length))
        else
            # Filesystem -- could be a big NFS volume, remotely mounted gcsfuse, or just
            # a single big local filesystem -- etc. -- we don't care.
            return filesystem_bucket(name: bucket)
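
    # For instance (a sketch -- both values below are hypothetical):
    #
    #   db.blob_store('gs://my-blob-bucket')   # direct Google Cloud Storage bucket
    #   db.blob_store('/mnt/blobs')            # filesystem path (e.g., a gcsfuse mount)
    #
    # With no argument, the bucket is taken from the COCALC_BLOB_STORE env variable.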

    # Uploads the blob with given sha1 uuid to gcloud storage, if it hasn't already
    # been uploaded there.
    copy_blob_to_gcloud: (opts) =>
        opts = defaults opts,
            uuid   : required           # uuid=sha1-based uuid coming from blob
            bucket : COCALC_BLOB_STORE  # name of bucket
            force  : false              # if true, upload even if already uploaded
            remove : false              # if true, deletes blob from database after successful upload to gcloud (to free space)
            cb     : undefined          # cb(err)
        if not misc.is_valid_uuid_string(opts.uuid)
            opts.cb?("uuid is invalid")
            return
        if not opts.bucket
            opts.cb?("no blob store configured -- set the COCALC_BLOB_STORE env variable")
            return
        x = undefined
        async.series([
            (cb) =>
                @_query
                    query : "SELECT blob, gcloud FROM blobs"
                    where : "id = $::UUID" : opts.uuid
                    cb    : one_result (err, _x) =>
                        x = _x
                        if err
                            cb(err)
                        else if not x?
                            cb('no such blob')
                        else if not x.blob and not x.gcloud
                            cb('blob not available -- this should not be possible')
                        else if not x.blob and opts.force
                            cb("blob can't be re-uploaded since it was already deleted")
                        else
                            cb()
            (cb) =>
                if x.gcloud? and not opts.force
                    # already uploaded -- don't need to do anything
                    cb(); return
                if not x.blob?
                    # blob already deleted locally
                    cb(); return
                # upload to Google cloud storage
                @blob_store(opts.bucket).write
                    name    : opts.uuid
                    content : x.blob
                    cb      : cb
            (cb) =>
                if not x.blob?
                    # no blob in db; nothing further to do.
                    cb()
                else
                    # We successfully uploaded to gcloud -- record that by setting the gcloud field.
                    set = {gcloud: opts.bucket}
                    if opts.remove
                        set.blob = null  # remove blob content from database to save space
                    @_query
                        query : "UPDATE blobs"
                        where : "id = $::UUID" : opts.uuid
                        set   : set
                        cb    : cb
        ], (err) => opts.cb?(err))
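
    # Example usage -- a minimal sketch, assuming COCALC_BLOB_STORE is configured
    # and `uuid` refers to an existing blob:
    #
    #   db.copy_blob_to_gcloud
    #       uuid   : uuid
    #       remove : true   # free the space in PostgreSQL once safely uploaded
    #       cb     : (err) -> console.log('upload done', err)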

    ###
    Back up (at most) `limit` blobs that haven't previously been backed up, and put
    them in a tarball in the given path.  The tarball's name is the time when the
    backup starts, and it is compressed using gzip compression.

        db._error_thresh=1e6; db.backup_blobs_to_tarball(limit:10000, path:'/backup/tmp-blobs', repeat_until_done:60, cb:done())

    I have not written code to restore from these tarballs.  Assuming the database
    has been restored, so there is an entry in the blobs table for each blob, it
    would suffice to upload the tarballs, then copy their contents straight into
    the COCALC_BLOB_STORE, and that's it.  If we don't have the blobs table in the
    DB, make dummy entries from the blob names in the tarballs.
    ###
    backup_blobs_to_tarball: (opts) =>
        opts = defaults opts,
            limit             : 10000      # number of blobs to backup
            path              : required   # path where [timestamp].tar file is placed
            throttle          : 0          # wait this many seconds between pulling blobs from database
            repeat_until_done : 0          # if positive, keeps re-calling this function until there are no more
                                           # results to backup (pauses this many seconds between)
            map_limit         : 5
            cb                : undefined  # cb(err, '[timestamp].tar')
        dbg  = @_dbg("backup_blobs_to_tarball(limit=#{opts.limit},path='#{opts.path}')")
        join = require('path').join
        dir  = misc.date_to_snapshot_format(new Date())
        target  = join(opts.path, dir)
        tarball = target + '.tar.gz'
        v = undefined
        to_remove = []
        async.series([
            (cb) =>
                dbg("make target='#{target}'")
                fs.mkdir(target, cb)
            (cb) =>
                dbg("get blobs that we need to back up")
                @_query
                    query : "SELECT id FROM blobs"
                    where : "expire IS NULL and backup IS NOT true"
                    limit : opts.limit
                    cb    : all_results 'id', (err, x) =>
                        v = x; cb(err)
            (cb) =>
                dbg("backing up #{v.length} blobs")
                f = (id, cb) =>
                    @get_blob
                        uuid  : id
                        touch : false
                        cb    : (err, blob) =>
                            if err
                                dbg("ERROR! blob #{id} -- #{err}")
                                cb(err)
                            else if blob?
                                dbg("got blob #{id} from db -- now write to disk")
                                to_remove.push(id)
                                fs.writeFile join(target, id), blob, (err) =>
                                    if opts.throttle
                                        setTimeout(cb, opts.throttle*1000)
                                    else
                                        cb()
                            else
                                dbg("blob #{id} is expired, so nothing to be done, ever.")
                                cb()
                async.mapLimit(v, opts.map_limit, f, cb)
            (cb) =>
                dbg("successfully wrote all blobs to files; now make tarball")
                misc_node.execute_code
                    command : 'tar'
                    args    : ['zcvf', tarball, dir]
                    path    : opts.path
                    timeout : 3600
                    cb      : cb
            (cb) =>
                dbg("remove temporary blobs")
                f = (x, cb) =>
                    fs.unlink(join(target, x), cb)
                async.mapLimit(to_remove, 10, f, cb)
            (cb) =>
                dbg("remove temporary directory")
                fs.rmdir(target, cb)
            (cb) =>
                dbg("backup succeeded completely -- mark all blobs as backed up")
                @_query
                    query : "UPDATE blobs"
                    set   : {backup: true}
                    where : "id = ANY($)" : v
                    cb    : cb
        ], (err) =>
            if err
                dbg("ERROR: #{err}")
                opts.cb?(err)
            else
                dbg("done")
                if opts.repeat_until_done and to_remove.length == opts.limit
                    f = () =>
                        @backup_blobs_to_tarball(opts)
                    setTimeout(f, opts.repeat_until_done*1000)
                else
                    opts.cb?(undefined, tarball)
        )

    ###
    Copy all blobs that will never expire to a google cloud storage bucket.

        errors={}; db.copy_all_blobs_to_gcloud(limit:500, cb:done(), remove:true, repeat_until_done_s:10, errors:errors)
    ###
    copy_all_blobs_to_gcloud: (opts) =>
        opts = defaults opts,
            bucket              : COCALC_BLOB_STORE
            limit               : 1000  # copy this many in each batch
            map_limit           : 1     # copy this many at once.
            throttle            : 0     # wait this many seconds between uploads
            repeat_until_done_s : 0     # if nonzero, waits this many seconds, then calls this function
                                        # again until nothing gets uploaded.
            errors              : {}    # used to accumulate errors
            remove              : false
            cb                  : required
        dbg = @_dbg("copy_all_blobs_to_gcloud")
        dbg()
        # This query selects the blobs that will never expire, but have not yet
        # been copied to Google cloud storage.
        dbg("getting blob id's...")
        @_query
            query : 'SELECT id, size FROM blobs'
            where : "expire IS NULL AND gcloud IS NULL"
            limit : opts.limit
            cb    : all_results (err, v) =>
                if err
                    dbg("fail: #{err}")
                    opts.cb(err)
                else
                    n = v.length; m = 0
                    dbg("got #{n} blob id's")
                    f = (x, cb) =>
                        m += 1
                        k = m; start = new Date()
                        dbg("**** #{k}/#{n}: uploading #{x.id} of size #{x.size/1000}KB")
                        @copy_blob_to_gcloud
                            uuid   : x.id
                            bucket : opts.bucket
                            remove : opts.remove
                            cb     : (err) =>
                                dbg("**** #{k}/#{n}: finished -- #{err}; size #{x.size/1000}KB; time=#{new Date() - start}ms")
                                if err
                                    opts.errors[x.id] = err
                                if opts.throttle
                                    setTimeout(cb, 1000*opts.throttle)
                                else
                                    cb()
                    async.mapLimit v, opts.map_limit, f, (err) =>
                        dbg("finished this round -- #{err}")
                        if opts.repeat_until_done_s and v.length > 0
                            dbg("repeat_until_done triggering another round")
                            setTimeout((=> @copy_all_blobs_to_gcloud(opts)), opts.repeat_until_done_s*1000)
                        else
                            dbg("done : #{misc.to_json(opts.errors)}")
                            opts.cb(if misc.len(opts.errors) > 0 then opts.errors)

    blob_maintenance: (opts) =>
        opts = defaults opts,
            path              : '/backup/blobs'
            map_limit         : 1
            blobs_per_tarball : 10000
            throttle          : 0
            cb                : undefined
        dbg = @_dbg("blob_maintenance()")
        dbg()
        async.series([
            (cb) =>
                dbg("maintain the patches and syncstrings")
                @syncstring_maintenance
                    repeat_until_done : true
                    limit             : 500
                    map_limit         : opts.map_limit
                    delay             : 1000  # 1s, since syncstring_maintenance heavily loads db
                    cb                : cb
            (cb) =>
                dbg("backup_blobs_to_tarball")
                @backup_blobs_to_tarball
                    throttle          : opts.throttle
                    limit             : opts.blobs_per_tarball
                    path              : opts.path
                    map_limit         : opts.map_limit
                    repeat_until_done : 5
                    cb                : cb
            (cb) =>
                dbg("copy_all_blobs_to_gcloud")
                errors = {}
                @copy_all_blobs_to_gcloud
                    limit               : 1000
                    repeat_until_done_s : 5
                    errors              : errors
                    remove              : true
                    map_limit           : opts.map_limit
                    throttle            : opts.throttle
                    cb                  : (err) =>
                        if misc.len(errors) > 0
                            dbg("errors! #{misc.to_json(errors)}")
                        cb(err)
        ], (err) =>
            opts.cb?(err)
        )
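
    # Example usage -- a sketch; this runs the full maintenance pipeline with its
    # defaults, which assume /backup/blobs exists and COCALC_BLOB_STORE is set:
    #
    #   db.blob_maintenance(cb: (err) -> console.log('maintenance finished', err))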

    remove_blob_ttls: (opts) =>
        opts = defaults opts,
            uuids : required  # sha1-based uuids of the blobs
            cb    : required  # cb(err)
        @_query
            query : "UPDATE blobs"
            set   : {expire: null}
            where : "id::UUID = ANY($)" : (x for x in opts.uuids when misc.is_valid_uuid_string(x))
            cb    : opts.cb

    # If the blob has been copied to gcloud, remove the BLOB part of the data
    # from the database (to save space).  If not copied, copy it to gcloud,
    # then remove it from the database.
    close_blob: (opts) =>
        opts = defaults opts,
            uuid   : required   # uuid=sha1-based id of the blob
            bucket : COCALC_BLOB_STORE
            cb     : undefined  # cb(err)
        if not misc.is_valid_uuid_string(opts.uuid)
            opts.cb?("uuid is invalid")
            return
        async.series([
            (cb) =>
                # ensure blob is in gcloud
                @_query
                    query : 'SELECT gcloud FROM blobs'
                    where : 'id = $::UUID' : opts.uuid
                    cb    : one_result 'gcloud', (err, gcloud) =>
                        if err
                            cb(err)
                        else if not gcloud
                            # not yet copied to gcloud storage
                            @copy_blob_to_gcloud
                                uuid   : opts.uuid
                                bucket : opts.bucket
                                cb     : cb
                        else
                            # copied already
                            cb()
            (cb) =>
                # now blob is in gcloud -- delete blob data in database
                @_query
                    query : 'UPDATE blobs'
                    where : 'id = $::UUID' : opts.uuid
                    set   : {blob: null}
                    cb    : cb
        ], (err) => opts.cb?(err))
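
    # Example usage -- a sketch, assuming `uuid` is the id of a blob we want to
    # evict from PostgreSQL once it is safely in the gcloud bucket:
    #
    #   db.close_blob
    #       uuid : uuid
    #       cb   : (err) -> console.log('blob closed', err)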


    ###
    # Syncstring maintenance
    ###
    syncstring_maintenance: (opts) =>
        opts = defaults opts,
            age_days          : 30    # archive patches of syncstrings that are inactive for at least this long
            map_limit         : 1     # how much parallelism to use
            limit             : 1000  # do only this many
            repeat_until_done : true
            delay             : 0
            cb                : undefined
        dbg = @_dbg("syncstring_maintenance")
        dbg(opts)
        syncstrings = undefined
        async.series([
            (cb) =>
                dbg("determine inactive syncstring ids")
                @_query
                    query : 'SELECT string_id FROM syncstrings'
                    where : [{'last_active <= $::TIMESTAMP' : misc.days_ago(opts.age_days)}, 'archived IS NULL']
                    limit : opts.limit
                    cb    : all_results 'string_id', (err, v) =>
                        syncstrings = v
                        cb(err)
            (cb) =>
                dbg("archive patches for inactive syncstrings")
                i = 0
                f = (string_id, cb) =>
                    i += 1
                    console.log("*** #{i}/#{syncstrings.length}: archiving string #{string_id} ***")
                    @archive_patches
                        string_id : string_id
                        cb        : (err) ->
                            if err or not opts.delay
                                cb(err)
                            else
                                setTimeout(cb, opts.delay)
                async.mapLimit(syncstrings, opts.map_limit, f, cb)
        ], (err) =>
            if err
                opts.cb?(err)
            else if opts.repeat_until_done and syncstrings.length == opts.limit
                dbg("doing it again")
                @syncstring_maintenance(opts)
            else
                opts.cb?()
        )

    # Offlines and archives the patches, unless the syncstring was active very
    # recently, in which case this is a no-op.
    archive_patches: (opts) =>
        opts = defaults opts,
            string_id : required
            compress  : 'zlib'
            level     : -1  # the default
            cutoff    : misc.minutes_ago(30)  # never touch anything this new
            cb        : undefined
        dbg = @_dbg("archive_patches(string_id='#{opts.string_id}')")
        syncstring = patches = blob_uuid = project_id = last_active = undefined
        where = {"string_id = $::CHAR(40)" : opts.string_id}
        async.series([
            (cb) =>
                dbg("get project_id")
                @_query
                    query : "SELECT project_id, archived, last_active FROM syncstrings"
                    where : where
                    cb    : one_result (err, x) =>
                        if err
                            cb(err)
                        else if not x?
                            cb("no such syncstring with id '#{opts.string_id}'")
                        else if x.archived
                            cb("already archived")
                        else
                            project_id  = x.project_id
                            last_active = x.last_active
                            cb()
            (cb) =>
                if last_active? and last_active >= opts.cutoff
                    dbg("excluding due to cutoff")
                    cb(); return
                dbg("get patches")
                @export_patches
                    string_id : opts.string_id
                    cb        : (err, x) =>
                        patches = x
                        cb(err)
            (cb) =>
                if last_active? and last_active >= opts.cutoff
                    cb(); return
                dbg("create blob from patches")
                try
                    blob = new Buffer(JSON.stringify(patches))
                catch err
                    # TODO: This *will* happen if the total length of all patches is too big.
                    # need to break patches up...
                    # This is not exactly the end of the world, as the entire point of all this is to
                    # just save some space in the database...
                    cb(err)
                    return
                dbg('save blob')
                blob_uuid = misc_node.uuidsha1(blob)
                @save_blob
                    uuid       : blob_uuid
                    blob       : blob
                    project_id : project_id
                    compress   : opts.compress
                    level      : opts.level
                    cb         : cb
            (cb) =>
                if last_active? and last_active >= opts.cutoff
                    cb(); return
                dbg("update syncstring to indicate patches have been archived in a blob")
                @_query
                    query : "UPDATE syncstrings"
                    set   : {archived : blob_uuid}
                    where : where
                    cb    : cb
            (cb) =>
                if last_active? and last_active >= opts.cutoff
                    cb(); return
                dbg("actually delete patches")
                @_query
                    query : "DELETE FROM patches"
                    where : where
                    cb    : cb
        ], (err) => opts.cb?(err))

    unarchive_patches: (opts) =>
        opts = defaults opts,
            string_id : required
            cb        : undefined
        dbg = @_dbg("unarchive_patches(string_id='#{opts.string_id}')")
        where = {"string_id = $::CHAR(40)" : opts.string_id}
        @_query
            query : "SELECT archived FROM syncstrings"
            where : where
            cb    : one_result 'archived', (err, blob_uuid) =>
                if err or not blob_uuid?
                    opts.cb?(err)
                    return
                blob = undefined
                async.series([
                    (cb) =>
                        dbg("download blob")
                        @get_blob
                            uuid : blob_uuid
                            cb   : (err, x) =>
                                if err
                                    cb(err)
                                else if not x?
                                    cb("blob is gone")
                                else
                                    blob = x
                                    cb(err)
                    (cb) =>
                        dbg("extract blob")
                        try
                            patches = JSON.parse(blob)
                        catch e
                            cb("corrupt patches blob -- #{e}")
                            return
                        @import_patches
                            patches : patches
                            cb      : cb
                    (cb) =>
                        async.parallel([
                            (cb) =>
                                dbg("update syncstring to indicate that patches are now available")
                                @_query
                                    query : "UPDATE syncstrings SET archived=NULL"
                                    where : where
                                    cb    : cb
                            (cb) =>
                                dbg('delete blob, which is no longer needed')
                                @delete_blob
                                    uuid : blob_uuid
                                    cb   : cb
                        ], cb)
                ], (err) => opts.cb?(err))
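
    # Example usage -- a sketch of the archive/unarchive round trip, assuming
    # `string_id` is a 40-character syncstring id that has been idle long enough:
    #
    #   db.archive_patches
    #       string_id : string_id
    #       cb        : (err) ->
    #           console.log('archived', err)
    #           # later, when the syncstring becomes active again:
    #           db.unarchive_patches(string_id: string_id, cb: (err) -> console.log('restored', err))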

    ###
    Export/import of syncstring history and info.  Right now this is used mainly
    for debugging purposes, but it will obviously be useful for a user-facing
    feature involving import and export (and copying) of complete edit history.
    ###
    export_patches: (opts) =>
        opts = defaults opts,
            string_id : required
            cb        : required  # cb(err, array of patches)
        @_query
            query : "SELECT extract(epoch from time)*1000 as epoch, * FROM patches"
            where : {"string_id = $::CHAR(40)" : opts.string_id}
            cb    : all_results (err, patches) =>
                if err
                    opts.cb(err)
                else
                    for p in patches
                        p.time = new Date(p.epoch)
                        delete p.epoch
                    opts.cb(undefined, patches)

    import_patches: (opts) =>
        opts = defaults opts,
            patches   : required   # as exported by export_patches
            string_id : undefined  # if given, change the string_id to this when importing the patches
            cb        : undefined
        patches = opts.patches
        if patches.length == 0  # easy
            opts.cb?()
            return
        if patches[0].id?
            # convert from OLD RethinkDB format!
            v = []
            for x in patches
                patch =
                    string_id : x.id[0]
                    time      : new Date(x.id[1])
                    user_id   : x.user
                    patch     : x.patch
                    snapshot  : x.snapshot
                    sent      : x.sent
                    prev      : x.prev
                v.push(patch)
            patches = v
        # change string_id, if requested.
        if opts.string_id?
            for x in patches
                x.string_id = opts.string_id
        # We break the insert into blocks, since there is a limit (about 65K) on the
        # number of params that can be inserted in a single query.
        insert_block_size = 1000
        f = (i, cb) =>
            @_query
                query    : 'INSERT INTO patches'
                values   : patches.slice(insert_block_size*i, insert_block_size*(i+1))
                conflict : 'ON CONFLICT DO NOTHING'  # in case multiple servers (or this server) are doing this import at once -- this can and does happen sometimes.
                cb       : cb
        async.mapSeries([0...patches.length/insert_block_size], f, (err) => opts.cb?(err))
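
    # Example usage -- a sketch of copying one syncstring's complete edit history
    # to another (both ids hypothetical):
    #
    #   db.export_patches
    #       string_id : src_string_id
    #       cb        : (err, patches) ->
    #           if err then console.warn(err)
    #           else
    #               db.import_patches
    #                   patches   : patches
    #                   string_id : dest_string_id   # rewrite the id on import
    #                   cb        : (err) -> console.log('copy done', err)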

    # TODO: not yet implemented -- stub.
    export_syncstring: (opts) =>
        opts = defaults opts,
            string_id : required
            cb        : undefined

    # TODO: not yet implemented -- stub.
    import_syncstring: (opts) =>
        opts = defaults opts,
            obj : required
            cb  : undefined

    delete_blob: (opts) =>
        opts = defaults opts,
            uuid : required
            cb   : undefined
        if not misc.is_valid_uuid_string(opts.uuid)
            opts.cb?("uuid is invalid")
            return
        gcloud = undefined
        dbg = @_dbg("delete_blob(uuid='#{opts.uuid}')")
        async.series([
            (cb) =>
                dbg("check if blob in gcloud")
                @_query
                    query : "SELECT gcloud FROM blobs"
                    where : "id = $::UUID" : opts.uuid
                    cb    : one_result 'gcloud', (err, x) =>
                        gcloud = x
                        cb(err)
            (cb) =>
                if not gcloud or not COCALC_BLOB_STORE
                    cb()
                    return
                dbg("delete from gcloud")
                @blob_store(gcloud).delete
                    name : opts.uuid
                    cb   : cb
            (cb) =>
                dbg("delete from local database")
                @_query
                    query : "DELETE FROM blobs"
                    where : "id = $::UUID" : opts.uuid
                    cb    : cb
        ], (err) => opts.cb?(err))