# Blob storage for CoCalc's PostgreSQL backend: save/load blobs in the blobs
# table, offload them to a Google Cloud Storage or filesystem bucket, back
# them up to tarballs, and archive/unarchive syncstring patch histories as
# compressed blobs.

COCALC_BLOB_STORE = process.env.COCALC_BLOB_STORE

async     = require('async')
snappy    = require('snappy')
zlib      = require('zlib')
fs        = require('fs')

misc_node = require('smc-util-node/misc_node')

{defaults} = misc = require('smc-util/misc')
required   = defaults.required

{expire_time, one_result, all_results, PostgreSQL} = require('./postgres')
{filesystem_bucket} = require('./filesystem-bucket')
class exports.PostgreSQL extends PostgreSQL
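    # Save a blob in the blobs table, or extend the expire time of an
    # already-saved copy. The callback receives the ttl actually in force for
    # the blob, in seconds, with 0 meaning "never expires". A usage sketch,
    # where db is assumed to be a connected instance of this class and
    # project_id a valid project uuid (both names are illustrative):
    #
    #     db.save_blob
    #         blob       : Buffer.from('hello world')
    #         project_id : project_id
    #         ttl        : 3600   # keep for at least an hour
    #         cb         : (err, ttl) -> console.log(err, ttl)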
save_blob: (opts) =>
        opts = defaults opts,
            uuid       : undefined  # sha1-based uuid of the blob; computed from the blob if not given
            blob       : required   # a string or Buffer
            ttl        : 0          # blob will be saved for *at least* this many seconds; 0 = never expire
            project_id : required   # project that is saving the blob
            check      : false      # if true, verify that the sha1-based uuid of the blob equals opts.uuid
            compress   : undefined  # optional compression: 'gzip', 'zlib', or 'snappy'
            level      : -1         # zlib/gzip compression level (-1 = default); not used by snappy
            cb         : required   # cb(err, ttl actually in force in seconds; 0 = never expires)
if not Buffer.isBuffer(opts.blob)
            opts.blob = Buffer.from(opts.blob)  # Buffer.from -- the new Buffer(...) constructor is deprecated
if not opts.uuid?
opts.uuid = misc_node.uuidsha1(opts.blob)
else if opts.check
uuid = misc_node.uuidsha1(opts.blob)
if uuid != opts.uuid
opts.cb("the sha1 uuid (='#{uuid}') of the blob must equal the given uuid (='#{opts.uuid}')")
return
if not misc.is_valid_uuid_string(opts.uuid)
opts.cb("uuid is invalid")
return
dbg = @_dbg("save_blob(uuid='#{opts.uuid}')")
dbg()
rows = ttl = undefined
async.series([
(cb) =>
@_query
query : 'SELECT expire FROM blobs'
where : "id = $::UUID" : opts.uuid
cb : (err, x) =>
rows = x.rows; cb(err)
(cb) =>
if rows.length == 0 and opts.compress
dbg("compression requested and blob not already saved, so we compress blob")
switch opts.compress
when 'gzip'
zlib.gzip opts.blob, {level:opts.level}, (err, blob) =>
opts.blob = blob; cb(err)
when 'zlib'
zlib.deflate opts.blob, {level:opts.level}, (err, blob) =>
opts.blob = blob; cb(err)
when 'snappy'
snappy.compress opts.blob, (err, blob) =>
opts.blob = blob; cb(err)
else
cb("compression format '#{opts.compress}' not implemented")
else
cb()
(cb) =>
if rows.length == 0
dbg("nothing in DB, so we insert the blob.")
ttl = opts.ttl
@_query
query : "INSERT INTO blobs"
                    values :
                        id         : opts.uuid
                        blob       : '\\x' + opts.blob.toString('hex')   # PostgreSQL bytea hex input format
                        project_id : opts.project_id
                        count      : 0
                        size       : opts.blob.length
                        created    : new Date()
                        compress   : opts.compress
                        expire     : if ttl then expire_time(ttl)        # undefined = never expire
cb : cb
else
dbg("blob already in the DB, so see if we need to change the expire time")
@_extend_blob_ttl
expire : rows[0].expire
ttl : opts.ttl
uuid : opts.uuid
cb : (err, _ttl) =>
ttl = _ttl; cb(err)
], (err) => opts.cb(err, ttl))
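    # Used by save_blob: when a blob is already saved with a finite expire
    # time, possibly extend that time so the blob lives at least opts.ttl more
    # seconds, where ttl = 0 means never expire. Calls cb(err, ttl_in_force).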
_extend_blob_ttl : (opts) =>
opts = defaults opts,
expire : undefined
ttl : required
uuid : required
cb : required
if not misc.is_valid_uuid_string(opts.uuid)
opts.cb("uuid is invalid")
return
        if not opts.expire
            # expire is not set, so the blob already lives forever -- nothing to do
            opts.cb(undefined, 0)
            return
        new_expire = ttl = undefined
        if opts.ttl
            # both the saved expire time and the requested ttl are finite --
            # extend the expire time only if the request would make it longer
            z = expire_time(opts.ttl)
            if z > opts.expire
                new_expire = z
                ttl = opts.ttl
            else
                ttl = (opts.expire - new Date())/1000.0
        else
            # the saved expire time is finite, but the requested ttl is infinite
            ttl = new_expire = 0
if new_expire?
@_query
query : 'UPDATE blobs'
where : "id = $::UUID" : opts.uuid
set : "expire :: TIMESTAMP " : if new_expire == 0 then undefined else new_expire
cb : (err) => opts.cb(err, ttl)
else
opts.cb(undefined, ttl)
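    # Load a blob from wherever it currently lives: the local blobs table
    # first, then the gcloud/filesystem bucket named in the row's gcloud
    # column, applying decompression according to the compress column.
    # cb(err, blob), where blob is undefined if there is no such non-expired
    # blob.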
get_blob: (opts) =>
        opts = defaults opts,
            uuid       : required
            save_in_db : false     # if true and the blob is only in gcloud, also save it back in the database
            touch      : true      # if true, bump the blob's count and last_active in the background
            cb         : required  # cb(err) / cb(undefined, blob) / cb(undefined, undefined) if no such blob
if not misc.is_valid_uuid_string(opts.uuid)
opts.cb("uuid is invalid")
return
x = undefined
blob = undefined
async.series([
(cb) =>
@_query
query : "SELECT expire, blob, gcloud, compress FROM blobs"
where : "id = $::UUID" : opts.uuid
cb : one_result (err, _x) =>
x = _x; cb(err)
(cb) =>
                if not x?
                    # no such blob in the database -- nothing to do
                    cb()
                else if x.expire and x.expire <= new Date()
                    # blob already expired -- delete it in the background (no cb:
                    # we deliberately don't wait for the query) and act as if it
                    # were not there
                    @_query
                        query : "DELETE FROM blobs"
                        where : "id = $::UUID" : opts.uuid
                    cb()
else if x.blob?
blob = x.blob
cb()
                else if x.gcloud
                    if not COCALC_BLOB_STORE?
                        cb("no blob store configured -- set the COCALC_BLOB_STORE env variable")
                        return
                    # blob data is not in the database, but x.gcloud records the bucket it was offloaded to
                    @blob_store(x.gcloud).read
name : opts.uuid
cb : (err, _blob) =>
if err
cb(err)
else
blob = _blob
cb()
                                if opts.save_in_db
                                    # also save back into the database so access is faster next time
                                    # (in the background -- we don't wait for this query to finish)
                                    @_query
                                        query : "UPDATE blobs"
                                        set   : {blob : blob}
                                        where : "id = $::UUID" : opts.uuid
                else
                    # row exists but has neither blob data nor a gcloud location -- inconsistent state
                    cb("blob #{opts.uuid} is neither in the database nor in gcloud")
(cb) =>
if not blob? or not x?.compress?
cb(); return
switch x.compress
when 'gzip'
zlib.gunzip blob, (err, _blob) =>
blob = _blob; cb(err)
when 'zlib'
zlib.inflate blob, (err, _blob) =>
blob = _blob; cb(err)
when 'snappy'
snappy.uncompress blob, (err, _blob) =>
blob = _blob; cb(err)
else
cb("compression format '#{x.compress}' not implemented")
], (err) =>
opts.cb(err, blob)
if blob? and opts.touch
@touch_blob(uuid : opts.uuid)
)
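    # Increment the access count and update last_active for a blob; get_blob
    # calls this in the background after each successful read.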
touch_blob: (opts) =>
opts = defaults opts,
uuid : required
cb : undefined
if not misc.is_valid_uuid_string(opts.uuid)
opts.cb?("uuid is invalid")
return
@_query
query : "UPDATE blobs SET count = count + 1, last_active = NOW()"
where : "id = $::UUID" : opts.uuid
cb : opts.cb
gcloud: () =>
return @_gcloud ?= require('./smc_gcloud').gcloud()
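    # Return the bucket used for offloading blobs: a name of the form
    # 'gs://name' selects a Google Cloud Storage bucket; anything else is
    # treated as a directory path for a filesystem-backed bucket.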
blob_store: (bucket) =>
if not bucket
bucket = COCALC_BLOB_STORE
if misc.startswith(bucket, 'gs://')
return @gcloud().bucket(name: bucket.slice('gs://'.length))
else
return filesystem_bucket(name: bucket)
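    # Upload one blob from the database to the bucket and record the bucket
    # name in the row's gcloud column. With remove set, the local blob data is
    # deleted after a successful upload; with force set, the blob is
    # re-uploaded even if the gcloud column is already set.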
copy_blob_to_gcloud: (opts) =>
opts = defaults opts,
uuid : required
bucket : COCALC_BLOB_STORE
force : false
remove : false
cb : undefined
if not misc.is_valid_uuid_string(opts.uuid)
opts.cb?("uuid is invalid")
return
if not opts.bucket
opts.cb?("no blob store configured -- set the COCALC_BLOB_STORE env variable")
return
x = undefined
async.series([
(cb) =>
@_query
query : "SELECT blob, gcloud FROM blobs"
where : "id = $::UUID" : opts.uuid
cb : one_result (err, _x) =>
x = _x
if err
cb(err)
else if not x?
cb('no such blob')
else if not x.blob and not x.gcloud
cb('blob not available -- this should not be possible')
else if not x.blob and opts.force
cb("blob can't be re-uploaded since it was already deleted")
else
cb()
(cb) =>
if x.gcloud? and not opts.force
cb(); return
if not x.blob?
cb(); return
@blob_store(opts.bucket).write
name : opts.uuid
content : x.blob
cb : cb
(cb) =>
if not x.blob?
cb()
else
set = {gcloud: opts.bucket}
if opts.remove
set.blob = null
@_query
query : "UPDATE blobs"
where : "id = $::UUID" : opts.uuid
set : set
cb : cb
], (err) => opts.cb?(err))
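    # Back up to opts.limit never-expiring, not-yet-backed-up blobs: write
    # them to a temporary directory, tar that into <path>/<timestamp>.tar.gz,
    # then mark the blobs as backed up. If repeat_until_done is set (seconds
    # between rounds), this reschedules itself until a round yields fewer than
    # opts.limit blobs. A sketch of a full backup run, assuming db is a
    # connected instance of this class:
    #
    #     db.backup_blobs_to_tarball
    #         path              : '/backup/blobs'
    #         repeat_until_done : 5
    #         cb                : (err, tarball) -> console.log(err, tarball)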
backup_blobs_to_tarball: (opts) =>
opts = defaults opts,
limit : 10000
path : required
throttle : 0
repeat_until_done : 0
map_limit : 5
cb : undefined
dbg = @_dbg("backup_blobs_to_tarball(limit=#{opts.limit},path='#{opts.path}')")
join = require('path').join
dir = misc.date_to_snapshot_format(new Date())
target = join(opts.path, dir)
tarball = target + '.tar.gz'
v = undefined
to_remove = []
async.series([
(cb) =>
dbg("make target='#{target}'")
fs.mkdir(target, cb)
(cb) =>
dbg("get blobs that we need to back up")
@_query
query : "SELECT id FROM blobs"
where : "expire IS NULL and backup IS NOT true"
limit : opts.limit
cb : all_results 'id', (err, x) =>
v = x; cb(err)
(cb) =>
dbg("backing up #{v.length} blobs")
f = (id, cb) =>
@get_blob
uuid : id
touch : false
cb : (err, blob) =>
if err
dbg("ERROR! blob #{id} -- #{err}")
cb(err)
else if blob?
dbg("got blob #{id} from db -- now write to disk")
to_remove.push(id)
                                fs.writeFile join(target, id), blob, (err) =>
                                    if err
                                        cb(err)   # propagate write errors instead of silently dropping them
                                    else if opts.throttle
                                        setTimeout(cb, opts.throttle*1000)
                                    else
                                        cb()
else
dbg("blob #{id} is expired, so nothing to be done, ever.")
cb()
async.mapLimit(v, opts.map_limit, f, cb)
(cb) =>
dbg("successfully wrote all blobs to files; now make tarball")
misc_node.execute_code
command : 'tar'
args : ['zcvf', tarball, dir]
path : opts.path
timeout : 3600
cb : cb
(cb) =>
dbg("remove temporary blobs")
f = (x, cb) =>
fs.unlink(join(target, x), cb)
async.mapLimit(to_remove, 10, f, cb)
(cb) =>
dbg("remove temporary directory")
fs.rmdir(target, cb)
(cb) =>
dbg("backup succeeded completely -- mark all blobs as backed up")
@_query
query : "UPDATE blobs"
set : {backup: true}
where : "id = ANY($)" : v
cb : cb
], (err) =>
if err
dbg("ERROR: #{err}")
opts.cb?(err)
else
dbg("done")
if opts.repeat_until_done and to_remove.length == opts.limit
f = () =>
@backup_blobs_to_tarball(opts)
setTimeout(f, opts.repeat_until_done*1000)
else
opts.cb?(undefined, tarball)
)
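    # Copy every never-expiring blob that is not yet in gcloud to the bucket,
    # in batches of opts.limit. Failures of individual blobs are collected in
    # opts.errors instead of aborting the run, and cb receives that object if
    # anything failed. With remove set, the local copy of each successfully
    # uploaded blob is deleted.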
copy_all_blobs_to_gcloud: (opts) =>
opts = defaults opts,
bucket : COCALC_BLOB_STORE
limit : 1000
map_limit : 1
throttle : 0
repeat_until_done_s : 0
errors : {}
remove : false
cb : required
dbg = @_dbg("copy_all_blobs_to_gcloud")
dbg()
dbg("getting blob id's...")
@_query
query : 'SELECT id, size FROM blobs'
where : "expire IS NULL AND gcloud IS NULL"
limit : opts.limit
cb : all_results (err, v) =>
if err
dbg("fail: #{err}")
opts.cb(err)
else
n = v.length; m = 0
dbg("got #{n} blob id's")
f = (x, cb) =>
m += 1
k = m; start = new Date()
dbg("**** #{k}/#{n}: uploading #{x.id} of size #{x.size/1000}KB")
@copy_blob_to_gcloud
uuid : x.id
bucket : opts.bucket
remove : opts.remove
cb : (err) =>
dbg("**** #{k}/#{n}: finished -- #{err}; size #{x.size/1000}KB; time=#{new Date() - start}ms")
if err
opts.errors[x.id] = err
if opts.throttle
setTimeout(cb, 1000*opts.throttle)
else
cb()
async.mapLimit v, opts.map_limit, f, (err) =>
dbg("finished this round -- #{err}")
if opts.repeat_until_done_s and v.length > 0
dbg("repeat_until_done triggering another round")
setTimeout((=> @copy_all_blobs_to_gcloud(opts)), opts.repeat_until_done_s*1000)
else
dbg("done : #{misc.to_json(opts.errors)}")
opts.cb(if misc.len(opts.errors) > 0 then opts.errors)
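    # Full maintenance pass, intended to be run periodically: archive patches
    # of inactive syncstrings, back blobs up to tarballs, then offload blobs
    # to gcloud, removing the local copies.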
blob_maintenance: (opts) =>
opts = defaults opts,
path : '/backup/blobs'
map_limit : 1
blobs_per_tarball : 10000
throttle : 0
cb : undefined
dbg = @_dbg("blob_maintenance()")
dbg()
async.series([
(cb) =>
dbg("maintain the patches and syncstrings")
@syncstring_maintenance
repeat_until_done : true
limit : 500
map_limit : opts.map_limit
delay : 1000
cb : cb
(cb) =>
dbg("backup_blobs_to_tarball")
@backup_blobs_to_tarball
throttle : opts.throttle
limit : opts.blobs_per_tarball
path : opts.path
map_limit : opts.map_limit
repeat_until_done : 5
cb : cb
(cb) =>
dbg("copy_all_blobs_to_gcloud")
errors = {}
@copy_all_blobs_to_gcloud
limit : 1000
repeat_until_done_s : 5
errors : errors
remove : true
map_limit : opts.map_limit
throttle : opts.throttle
cb : (err) =>
if misc.len(errors) > 0
dbg("errors! #{misc.to_json(errors)}")
cb(err)
], (err) =>
opts.cb?(err)
)
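    # Clear the expire time of the given blobs so they are never deleted;
    # invalid uuids in opts.uuids are silently filtered out.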
remove_blob_ttls: (opts) =>
opts = defaults opts,
uuids : required
cb : required
@_query
query : "UPDATE blobs"
set : {expire: null}
where : "id::UUID = ANY($)" : (x for x in opts.uuids when misc.is_valid_uuid_string(x))
cb : opts.cb
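    # Ensure the blob is in gcloud, then null out the (potentially large) blob
    # column locally, keeping only the metadata row that points at the bucket.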
close_blob: (opts) =>
opts = defaults opts,
uuid : required
bucket : COCALC_BLOB_STORE
cb : undefined
if not misc.is_valid_uuid_string(opts.uuid)
opts.cb?("uuid is invalid")
return
async.series([
(cb) =>
@_query
query : 'SELECT gcloud FROM blobs'
where : 'id = $::UUID' : opts.uuid
cb : one_result 'gcloud', (err, gcloud) =>
if err
cb(err)
else if not gcloud
@copy_blob_to_gcloud
uuid : opts.uuid
bucket : opts.bucket
cb : cb
else
cb()
            (cb) =>
                # the blob is now in gcloud -- delete the blob data from the local database
                @_query
                    query : 'UPDATE blobs'
                    where : 'id = $::UUID' : opts.uuid
                    set   : {blob: null}
                    cb    : cb
], (err) => opts.cb?(err))
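    # Archive the patch histories of all syncstrings that have been inactive
    # for at least age_days days, in batches of opts.limit, repeating until a
    # batch comes back short of the limit (unless repeat_until_done is false).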
syncstring_maintenance: (opts) =>
opts = defaults opts,
age_days : 30
map_limit : 1
limit : 1000
repeat_until_done : true
delay : 0
cb : undefined
dbg = @_dbg("syncstring_maintenance")
dbg(opts)
syncstrings = undefined
async.series([
(cb) =>
dbg("determine inactive syncstring ids")
@_query
query : 'SELECT string_id FROM syncstrings'
where : [{'last_active <= $::TIMESTAMP' : misc.days_ago(opts.age_days)}, 'archived IS NULL']
limit : opts.limit
cb : all_results 'string_id', (err, v) =>
syncstrings = v
cb(err)
(cb) =>
dbg("archive patches for inactive syncstrings")
i = 0
f = (string_id, cb) =>
i += 1
console.log("*** #{i}/#{syncstrings.length}: archiving string #{string_id} ***")
@archive_patches
string_id : string_id
cb : (err) ->
if err or not opts.delay
cb(err)
else
setTimeout(cb, opts.delay)
async.mapLimit(syncstrings, opts.map_limit, f, cb)
], (err) =>
if err
opts.cb?(err)
else if opts.repeat_until_done and syncstrings.length == opts.limit
dbg("doing it again")
@syncstring_maintenance(opts)
else
opts.cb?()
)
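    # Move the patch history of one syncstring out of the patches table:
    # export the patches, save them as a single compressed JSON blob, record
    # the blob's uuid in syncstrings.archived, then delete the patch rows.
    # Syncstrings active more recently than opts.cutoff are left untouched.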
archive_patches: (opts) =>
opts = defaults opts,
string_id : required
compress : 'zlib'
level : -1
cutoff : misc.minutes_ago(30)
cb : undefined
dbg = @_dbg("archive_patches(string_id='#{opts.string_id}')")
        syncstring = patches = blob_uuid = project_id = last_active = undefined
where = {"string_id = $::CHAR(40)" : opts.string_id}
async.series([
(cb) =>
dbg("get project_id")
@_query
query : "SELECT project_id, archived, last_active FROM syncstrings"
where : where
cb : one_result (err, x) =>
if err
cb(err)
else if not x?
cb("no such syncstring with id '#{opts.string_id}'")
else if x.archived
cb("already archived")
else
project_id = x.project_id
last_active = x.last_active
cb()
            (cb) =>
                if last_active? and last_active >= opts.cutoff
                    # syncstring was active too recently -- leave it alone
                    # (the remaining steps below skip for the same reason)
                    dbg("excluding due to cutoff")
                    cb(); return
                dbg("get patches")
@export_patches
string_id : opts.string_id
cb : (err, x) =>
patches = x
cb(err)
(cb) =>
if last_active? and last_active >= opts.cutoff
cb(); return
dbg("create blob from patches")
try
                    blob = Buffer.from(JSON.stringify(patches))  # Buffer.from -- new Buffer(...) is deprecated
catch err
cb(err)
return
dbg('save blob')
blob_uuid = misc_node.uuidsha1(blob)
@save_blob
uuid : blob_uuid
blob : blob
project_id : project_id
compress : opts.compress
level : opts.level
cb : cb
(cb) =>
if last_active? and last_active >= opts.cutoff
cb(); return
dbg("update syncstring to indicate patches have been archived in a blob")
@_query
query : "UPDATE syncstrings"
set : {archived : blob_uuid}
where : where
cb : cb
(cb) =>
if last_active? and last_active >= opts.cutoff
cb(); return
dbg("actually delete patches")
@_query
query : "DELETE FROM patches"
where : where
cb : cb
], (err) => opts.cb?(err))
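    # Inverse of archive_patches: download the archived blob, re-import the
    # patches it contains, clear syncstrings.archived, and delete the blob.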
unarchive_patches: (opts) =>
opts = defaults opts,
string_id : required
cb : undefined
dbg = @_dbg("unarchive_patches(string_id='#{opts.string_id}')")
where = {"string_id = $::CHAR(40)" : opts.string_id}
@_query
query : "SELECT archived FROM syncstrings"
where : where
cb : one_result 'archived', (err, blob_uuid) =>
if err or not blob_uuid?
opts.cb?(err)
return
blob = undefined
async.series([
(cb) =>
dbg("download blob")
@get_blob
uuid : blob_uuid
cb : (err, x) =>
if err
cb(err)
else if not x?
cb("blob is gone")
else
blob = x
cb(err)
(cb) =>
dbg("extract blob")
try
patches = JSON.parse(blob)
catch e
cb("corrupt patches blob -- #{e}")
return
@import_patches
patches : patches
cb : cb
(cb) =>
async.parallel([
(cb) =>
dbg("update syncstring to indicate that patches are now available")
@_query
query : "UPDATE syncstrings SET archived=NULL"
where : where
cb : cb
(cb) =>
dbg('delete blob, which is no longer needed')
@delete_blob
uuid : blob_uuid
cb : cb
], cb)
], (err) => opts.cb?(err))
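    # Return all patches of the given syncstring as objects, with the time
    # column converted to a millisecond-precision JS Date via extract(epoch ...).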
export_patches: (opts) =>
opts = defaults opts,
string_id : required
cb : required
@_query
query : "SELECT extract(epoch from time)*1000 as epoch, * FROM patches"
where : {"string_id = $::CHAR(40)" : opts.string_id}
cb : all_results (err, patches) =>
if err
opts.cb(err)
else
for p in patches
p.time = new Date(p.epoch)
delete p.epoch
opts.cb(undefined, patches)
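    # Insert previously exported patches. Handles the old RethinkDB export
    # format, optionally rewrites the string_id, and inserts with ON CONFLICT
    # DO NOTHING, so importing the same export twice is harmless.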
import_patches: (opts) =>
opts = defaults opts,
patches : required
string_id : undefined
cb : undefined
patches = opts.patches
if patches.length == 0
opts.cb?()
return
        # patches in the old RethinkDB export format have a compound id field;
        # convert them to the PostgreSQL column layout
        if patches[0].id?
v = []
for x in patches
patch =
string_id : x.id[0]
time : new Date(x.id[1])
user_id : x.user
patch : x.patch
snapshot : x.snapshot
sent : x.sent
prev : x.prev
v.push(patch)
patches = v
if opts.string_id?
for x in patches
x.string_id = opts.string_id
        # insert in blocks, since PostgreSQL limits the number of parameters
        # (roughly 65K) in a single query
        insert_block_size = 1000
f = (i, cb) =>
@_query
query : 'INSERT INTO patches'
values : patches.slice(insert_block_size*i, insert_block_size*(i+1))
conflict : 'ON CONFLICT DO NOTHING'
cb : cb
        # note: a CoffeeScript exclusive range with a fractional bound, e.g., [0...2.5],
        # still yields the integers 0, 1, 2 -- so every partial block is covered
        async.mapSeries([0...patches.length/insert_block_size], f, (err) => opts.cb?(err))
    # TODO: not yet implemented -- currently just validates opts and reports an
    # error (rather than silently never calling cb).
    export_syncstring: (opts) =>
        opts = defaults opts,
            string_id : required
            cb        : undefined
        opts.cb?("export_syncstring: not implemented")

    # TODO: not yet implemented -- see export_syncstring above.
    import_syncstring: (opts) =>
        opts = defaults opts,
            obj : required
            cb  : undefined
        opts.cb?("import_syncstring: not implemented")
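    # Permanently delete a blob: from the gcloud bucket (when the row's gcloud
    # column is set and COCALC_BLOB_STORE is configured) and from the blobs
    # table itself.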
delete_blob: (opts) =>
opts = defaults opts,
uuid : required
cb : undefined
if not misc.is_valid_uuid_string(opts.uuid)
opts.cb?("uuid is invalid")
return
gcloud = undefined
dbg = @_dbg("delete_blob(uuid='#{opts.uuid}')")
async.series([
(cb) =>
dbg("check if blob in gcloud")
@_query
query : "SELECT gcloud FROM blobs"
where : "id = $::UUID" : opts.uuid
cb : one_result 'gcloud', (err, x) =>
gcloud = x
cb(err)
(cb) =>
if not gcloud or not COCALC_BLOB_STORE
cb()
return
dbg("delete from gcloud")
@blob_store(gcloud).delete
name : opts.uuid
cb : cb
(cb) =>
dbg("delete from local database")
@_query
query : "DELETE FROM blobs"
where : "id = $::UUID" : opts.uuid
cb : cb
], (err) => opts.cb?(err))