require('coffee-cache')
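# Storage management for SMC projects.  A project lives in up to three places:
#
#   LIVE     -- /projects/<project_id> on a compute host, where the user edits it
#   SNAPSHOT -- /projects/<project_id> on a ZFS-based storage server (this host)
#   BUP      -- timestamped bup archives under /bups/<project_id>, mirrored to
#               the Google Cloud Storage bucket named by BUCKET below
#
# The functions in this file move projects between these representations and
# record where each project currently lives in the PostgreSQL database.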
BUCKET = 'smc-projects-bup'
{join} = require('path')
fs = require('fs')
os = require('os')
async = require('async')
rmdir = require('rimraf')
winston = require('winston')
misc_node = require('smc-util-node/misc_node')
misc = require('smc-util/misc')
{defaults, required} = misc
postgres = require('./postgres')
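# NOTE: the database host is hard-coded for this deployment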
process.env['PGHOST'] = 'postgres0'
winston.remove(winston.transports.Console)
winston.add(winston.transports.Console, {level: 'debug', timestamp:true, colorize:true})
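# rsync --exclude flags for caches, trash, and other regenerable state that
# should never be copied between hosts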
exclude = () ->
return ("--exclude=#{x}" for x in misc.split('.sage/cache .sage/temp .trash .Trash .sagemathcloud .smc .node-gyp .cache .forever .snapshots *.sage-backup'))
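# connect to the PostgreSQL database and pass the connection to cb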
get_db = (cb) ->
db = postgres.db()
db.connect(cb : (err) => cb?(err, db))
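# Pull a project from the compute host where it is opened LIVE into the local
# /projects directory using rsync over ssh.  With delete=true (the default),
# files deleted from LIVE are also deleted from the local copy.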
copy_project_from_LIVE_to_SNAPSHOT = (opts) ->
opts = defaults opts,
project_id : required
host : required
max_size_G : 50
delete : true
cb : required
dbg = (m) -> winston.debug("copy_project_from_LIVE_to_SNAPSHOT(project_id='#{opts.project_id}'): #{m}")
dbg("host='#{opts.host}'")
args = ['-axH', "--max-size=#{opts.max_size_G}G", "--ignore-errors"]
if opts.delete
args = args.concat(["--delete", "--delete-excluded"])
else
args.push('--update')
args = args.concat(exclude())
args = args.concat(['-e', 'ssh -T -c arcfour -o Compression=no -x -o StrictHostKeyChecking=no'])
source = "#{opts.host}:/projects/#{opts.project_id}/"
target = "/projects/#{opts.project_id}/"
args = args.concat([source, target])
dbg("starting rsync...")
start = misc.walltime()
misc_node.execute_code
command : 'rsync'
args : args
timeout : 3600*2
err_on_exit : true
cb : (err, output) ->
            # rsync exit code 24 means "some files vanished" and 23 means "partial
            # transfer"; both are expected when copying a live project, so ignore them
            if err and (output?.exit_code == 24 or output?.exit_code == 23)
                err = undefined
dbg("...finished rsync -- time=#{misc.walltime(start)}s")
opts.cb(err)
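# Push the local copy of a project back out to a compute host, making it LIVE there.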
copy_project_from_SNAPSHOT_to_LIVE = (opts) ->
opts = defaults opts,
project_id : required
host : required
cb : required
dbg = (m) -> winston.debug("copy_project_from_SNAPSHOT_to_LIVE(project_id='#{opts.project_id}'): #{m}")
dbg("host='#{opts.host}'")
args = ['-axH']
args = args.concat(['-e', 'ssh -T -c arcfour -o Compression=no -x -o StrictHostKeyChecking=no'])
source = "/projects/#{opts.project_id}/"
target = "#{opts.host}:/projects/#{opts.project_id}/"
args = args.concat([source, target])
dbg("starting rsync...")
start = misc.walltime()
misc_node.execute_code
command : 'rsync'
args : args
timeout : 10000
verbose : true
err_on_exit : true
cb : (out...) ->
dbg("finished rsync -- time=#{misc.walltime(start)}s")
opts.cb(out...)
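# Look up which storage server a project is assigned to.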
get_storage = (project_id, database, cb) ->
dbg = (m) -> winston.debug("get_storage(project_id='#{project_id}'): #{m}")
database.get_project
project_id : project_id
columns : ['storage']
cb : (err, x) ->
if err
cb(err)
else if not x?
cb("no such project")
else
cb(undefined, x.storage?.host)
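# Determine the compute host where a project is opened and the storage server
# it is assigned to.  If no storage server is assigned yet, pick one at random
# from the storage_servers table and record the choice in the database.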
get_host_and_storage = (project_id, database, cb) ->
dbg = (m) -> winston.debug("get_host_and_storage(project_id='#{project_id}'): #{m}")
host = undefined
storage = undefined
async.series([
(cb) ->
dbg("determine project location info")
database.get_project
project_id : project_id
columns : ['storage', 'host']
cb : (err, x) ->
if err
cb(err)
else if not x?
cb("no such project")
else
host = x.host?.host
storage = x.storage?.host
if not host
cb("project not currently open on a compute host")
else
cb()
(cb) ->
if storage?
cb()
return
dbg("allocate storage host")
database._query
query : "SELECT host FROM storage_servers"
cb : postgres.all_results 'host', (err, hosts) ->
if err
cb(err)
else if not hosts? or hosts.length == 0
cb("no storage servers in storage_server table")
else
storage = misc.random_choice(hosts)
database.set_project_storage
project_id : project_id
host : storage
cb : cb
], (err) ->
cb(err, {host:host, storage:storage})
)
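# Save a project from LIVE to the SNAPSHOT copy on this server and record the
# save in the database.  Fails if the project is assigned to a different
# storage server than this one.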
exports.save_SNAPSHOT = save_SNAPSHOT = (opts) ->
opts = defaults opts,
database : required
project_id : required
max_size_G : 50
cb : required
dbg = (m) -> winston.debug("save_SNAPSHOT(project_id='#{opts.project_id}'): #{m}")
host = undefined
async.series([
(cb) ->
get_host_and_storage opts.project_id, opts.database, (err, x) ->
if err
cb(err)
else
{host, storage} = x
if storage != os.hostname()
cb("project is assigned to '#{storage}', but this server is '#{os.hostname()}'")
else
cb()
(cb) ->
dbg("do the save")
copy_project_from_LIVE_to_SNAPSHOT
project_id : opts.project_id
host : host
cb : cb
(cb) ->
dbg("save succeeded -- record in database")
opts.database.update_project_storage_save
project_id : opts.project_id
cb : cb
], (err) -> opts.cb(err))
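# Save every project that was edited within the last age_m minutes and is
# assigned to this storage server, running up to `threads` saves at once.
# cb receives a map {project_id:error} if any saves failed.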
exports.save_SNAPSHOT_age = (opts) ->
opts = defaults opts,
database : required
age_m : required
threads : 5
cb : required
    dbg = (m) -> winston.debug("save_SNAPSHOT_age(age_m:#{opts.age_m}): #{m}")
dbg()
errors = {}
hostname = os.hostname()
projects = undefined
async.series([
(cb) ->
dbg("get all recently modified projects from the database")
opts.database.recent_projects
age_m : opts.age_m
pluck : ['project_id', 'storage']
cb : (err, v) ->
if err
cb(err)
else
dbg("got #{v.length} recently modified projects")
projects = (x.project_id for x in v when x.storage?.host == hostname)
dbg("got #{projects.length} projects stored here")
cb()
(cb) ->
dbg("save each modified project")
n = 0
f = (project_id, cb) ->
n += 1
m = n
dbg("#{m}/#{projects.length}: START")
save_SNAPSHOT
project_id : project_id
database : opts.database
cb : (err) ->
dbg("#{m}/#{projects.length}: DONE -- #{err}")
if err
errors[project_id] = err
cb()
async.mapLimit(projects, opts.threads, f, cb)
], (err) ->
opts.cb(if misc.len(errors) > 0 then errors)
)
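# Close the SNAPSHOT copy of a project on this host: verify it is not opened
# LIVE anywhere, save it to its bup archive (and GCS), then delete the local copy.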
exports.close_SNAPSHOT = close_SNAPSHOT = (opts) ->
opts = defaults opts,
database : required
project_id : required
cb : required
dbg = (m) -> winston.debug("close_SNAPSHOT(project_id='#{opts.project_id}'): #{m}")
async.series([
(cb) ->
dbg('check that project is NOT currently opened LIVE')
opts.database.get_project_host
project_id : opts.project_id
cb : (err, host) ->
if err
cb(err)
else if host
cb("project must not be open LIVE")
else
cb()
(cb) ->
dbg('save project to BUP (and GCS)')
save_BUP
database : opts.database
project_id : opts.project_id
cb : cb
(cb) ->
dbg('saving to BUP succeeded; now deleting SNAPSHOT')
delete_SNAPSHOT
project_id : opts.project_id
cb : cb
], opts.cb)
delete_SNAPSHOT = (opts) ->
opts = defaults opts,
project_id : required
cb : required
winston.debug("delete_SNAPSHOT('#{opts.project_id}')")
rmdir("/projects/#{opts.project_id}", opts.cb)
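# Delete the local bup archives of projects assigned to this host whose last
# edit is older than min_age_m minutes (default one year).  Only the local
# /bups copies are removed; copies in GCS are untouched.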
exports.delete_old_BUPs = (opts) ->
opts = defaults opts,
database : required
age_m : 60*24*365*10
min_age_m : 60*24*365
threads : 1
limit : undefined
cb : required
if process.env.USER != 'root'
opts.cb("must be root")
return
projects = undefined
hostname = os.hostname()
dbg = (m) -> winston.debug("delete_old_BUPs: #{m}")
dbg()
async.series([
(cb) ->
dbg("doing query....")
opts.database.recent_projects
age_m : opts.age_m
min_age_m : opts.min_age_m
pluck : ['project_id', 'last_edited', 'storage']
cb : (err, v) ->
if err
cb(err)
else
dbg("Got #{v.length} total projects")
projects = (x for x in v when x.storage?.host == hostname)
dbg("Got #{projects.length} projects on this host '#{hostname}'")
cb()
(cb) ->
if opts.limit?
projects = projects.slice(0, opts.limit)
dbg("deleting bups")
m = 0
f = (x, cb) ->
delete_BUP
project_id : x.project_id
cb : (err) ->
m += 1
dbg("#{Math.round(m*100/projects.length)}%: finished #{m} of #{projects.length} -- #{err}")
cb(err)
async.mapLimit(projects, opts.threads, f, ((err)->cb(err)))
], opts.cb)
delete_BUP = (opts) ->
opts = defaults opts,
project_id : required
cb : required
winston.debug("delete_BUP('#{opts.project_id}')")
target = "/bups/#{opts.project_id}"
fs.exists target, (exists) ->
if exists
rmdir(target, opts.cb)
else
opts.cb()
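# Retire the local bup archive of a project: requires that the SNAPSHOT copy is
# already gone, uploads the archive to GCS, then deletes it from /bups.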
exports.close_BUP = close_BUP = (opts) ->
opts = defaults opts,
database : required
project_id : required
cb : required
dbg = (m) -> winston.debug("close_BUP(project_id='#{opts.project_id}'): #{m}")
async.series([
(cb) ->
dbg('check that SNAPSHOT is deleted on this computer')
fs.exists "/projects/#{opts.project_id}", (exists) ->
if exists
cb("must first close SNAPSHOT")
else
cb()
(cb) ->
dbg('check that BUP is available on this computer')
fs.exists "/bups/#{opts.project_id}", (exists) ->
if not exists
cb("no BUP on this host")
else
cb()
(cb) ->
dbg('save BUP to GCS')
copy_BUP_to_GCS
project_id : opts.project_id
cb : cb
(cb) ->
dbg('saving BUP to GCS succeeded; now delete BUP')
delete_BUP
project_id : opts.project_id
cb : cb
], opts.cb)
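# Close the LIVE copy of a project: rsync it to SNAPSHOT one final time, clear
# the host field in the database, then delete it from the compute host.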
exports.close_LIVE = close_LIVE = (opts) ->
opts = defaults opts,
database : required
project_id : required
cb : required
dbg = (m) -> winston.debug("close_LIVE(project_id='#{opts.project_id}'): #{m}")
host = undefined
async.series([
(cb) ->
dbg('figure out where project is currently opened')
opts.database.get_project_host
project_id : opts.project_id
cb : (err, x) ->
host = x
cb(err)
(cb) ->
if not host
dbg('project not currently opened')
cb()
return
dbg('do a last copy of the project to this server')
copy_project_from_LIVE_to_SNAPSHOT
project_id : opts.project_id
host : host
cb : cb
(cb) ->
if not host
cb(); return
dbg('save succeeded: mark project host as not set in database')
opts.database.unset_project_host
project_id : opts.project_id
cb : cb
(cb) ->
if not host
cb(); return
dbg("finally, actually deleting the project from '#{host}' to free disk space")
delete_LIVE
project_id : opts.project_id
host : host
cb : cb
], opts.cb)
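# Delete a project from a compute host.  Since this runs `rm -rf` over ssh as
# root, it refuses to do anything unless project_id is a valid uuid and the
# hostname starts with 'compute'.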
delete_LIVE = (opts) ->
opts = defaults opts,
project_id : required
host : required
cb : required
if not misc.is_valid_uuid_string(opts.project_id)
opts.cb("project_id='#{opts.project_id}' is not a valid uuid")
return
if not misc.startswith(opts.host, 'compute')
opts.cb("host='#{opts.host}' does not start with 'compute', which is suspicious")
return
target = "/projects/#{opts.project_id}"
misc_node.execute_code
command : 'ssh'
args : ['-o', 'StrictHostKeyChecking=no', "root@#{opts.host}", "rm -rf #{target}"]
timeout : 1800
cb : opts.cb
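# Open a project LIVE on the given compute host: restore the local copy from
# the bup archive if necessary, rsync it to the host, and record the new host
# in the database.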
exports.open_LIVE = open_LIVE = (opts) ->
opts = defaults opts,
database : required
host : required
project_id : required
cb : required
dbg = (m) -> winston.debug("open_LIVE(project_id='#{opts.project_id}', host='#{opts.host}'): #{m}")
async.series([
(cb) ->
dbg('make sure project is not already opened somewhere')
opts.database.get_project_host
project_id : opts.project_id
cb : (err, host) ->
if err
cb(err)
else
if host
cb("project already opened")
else
cb()
(cb) ->
fs.exists "/projects/#{opts.project_id}", (exists) ->
if exists
dbg("project is available locally in /projects directory")
cb()
else
dbg("project is NOT available locally in /projects directory -- restore from bup archive (if one exists)")
exports.open_SNAPSHOT
database : opts.database
project_id : opts.project_id
cb : cb
(cb) ->
dbg("do the open")
copy_project_from_SNAPSHOT_to_LIVE
project_id : opts.project_id
host : opts.host
cb : cb
(cb) ->
dbg("open succeeded -- record in database")
opts.database.set_project_host
project_id : opts.project_id
host : opts.host
cb : cb
], opts.cb)
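# Move an opened project from one compute host to another: copy it directly
# between the hosts, point the database at the new host, then delete the copy
# on the source host.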
exports.move_project = move_project = (opts) ->
opts = defaults opts,
database : required
project_id : required
target : required
cb : required
dbg = (m) -> winston.debug("move_project(project_id='#{opts.project_id}'): #{m}")
source = undefined
async.series([
(cb) ->
dbg('determine current location of project')
opts.database.get_project_host
project_id : opts.project_id
cb : (err, host) ->
source = host
if err
cb(err)
else
if not source
cb("project not opened, so can't move")
else if source == opts.target
cb("project is already on '#{opts.target}'")
else
cb()
(cb) ->
dbg("copy the project")
copy_project_from_one_compute_server_to_another
project_id : opts.project_id
source : source
target : opts.target
cb : cb
(cb) ->
dbg("successfully copied the project, now setting host in database")
opts.database.set_project_host
project_id : opts.project_id
host : opts.target
cb : cb
(cb) ->
dbg("also, delete from the source to save space")
delete_LIVE
project_id : opts.project_id
host : source
cb : cb
], opts.cb)
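# Run rsync on the source compute host (over ssh) so the project goes straight
# to the target host without being staged on this storage server.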
copy_project_from_one_compute_server_to_another = (opts) ->
opts = defaults opts,
project_id : required
source : required
target : required
cb : required
winston.debug("copy the project from '#{opts.source}' to '#{opts.target}'")
if not misc.is_valid_uuid_string(opts.project_id)
opts.cb("project_id='#{opts.project_id}' is not a valid uuid")
return
for host in [opts.source, opts.target]
if not misc.startswith(host, 'compute')
opts.cb("host='#{host}' must start with 'compute'")
return
source = "/projects/#{opts.project_id}/"
target = "#{opts.target}:/projects/#{opts.project_id}/"
excludes = exclude().join(' ')
misc_node.execute_code
command : 'ssh'
args : ["root@#{opts.source}", "rsync -axH -e 'ssh -T -c arcfour -o Compression=no -x -o StrictHostKeyChecking=no' #{excludes} #{source} #{target}"]
timeout : 3600*2
cb : (err, output) ->
            # as above: rsync exit codes 23/24 are partial-transfer warnings, not fatal
            if err and (output?.exit_code == 24 or output?.exit_code == 23)
                err = undefined
opts.cb(err)
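# Make bup backups of recently edited projects assigned to this host.  With
# time_since_last_backup_m, skip projects already backed up since their last
# edit or within that many minutes; with local=true, also include projects
# that exist in /projects but have no local bup repo yet.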
exports.save_BUP_age = (opts) ->
opts = defaults opts,
database : required
age_m : undefined
min_age_m : undefined
threads : 1
time_since_last_backup_m : undefined
local : false
limit : undefined
cb : required
if process.env.USER != 'root'
opts.cb("must be root")
return
projects = undefined
hostname = os.hostname()
dbg = (m) -> winston.debug("save_BUP_age: #{m}")
dbg("age_m=#{opts.age_m}; min_age_m=#{opts.min_age_m}; time_since_last_backup_m=#{opts.time_since_last_backup_m}")
async.series([
(cb) ->
            # opts.local defaults to false, so it is always defined -- test its
            # value rather than its existence
            if opts.time_since_last_backup_m? or opts.local
opts.database.recent_projects
age_m : opts.age_m
min_age_m : opts.min_age_m
pluck : ['last_backup', 'project_id', 'last_edited', 'storage']
cb : (err, v) ->
if err
cb(err)
else
dbg("got #{v.length} recent projects")
projects = []
cutoff = misc.minutes_ago(opts.time_since_last_backup_m)
for x in v
if opts.limit? and projects.length >= opts.limit
break
if x.storage?.host != hostname
continue
if opts.local and not fs.existsSync("/bups/#{x.project_id}") and fs.existsSync("/projects/#{x.project_id}")
projects.push(x.project_id)
continue
if x.last_backup? and x.last_edited? and x.last_backup >= x.last_edited
continue
if not x.last_backup? or x.last_backup <= cutoff
projects.push(x.project_id)
dbg("of these recent projects, #{projects.length} DO NOT have a backup made within the last #{opts.time_since_last_backup_m} minutes")
cb()
else
opts.database.recent_projects
age_m : opts.age_m
min_age_m : opts.min_age_m
cb : (err, v) ->
projects = v
cb(err)
(cb) ->
if opts.limit?
projects = projects.slice(0, opts.limit)
dbg("making backup of #{projects.length} projects")
save_BUP_many
database : opts.database
projects : projects
threads : opts.threads
cb : cb
], opts.cb)
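# Back up a list of projects with at most `threads` concurrent bup runs,
# collecting per-project errors instead of stopping at the first failure.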
save_BUP_many = (opts) ->
opts = defaults opts,
database : required
projects : required
threads : 1
cb : required
dbg = (m) -> winston.debug("save_BUP_many(projects.length=#{opts.projects.length}): #{m}")
dbg("threads=#{opts.threads}")
errors = {}
n = 0
done = 0
f = (project_id, cb) ->
n += 1
m = n
dbg("#{m}/#{opts.projects.length}: backing up #{project_id}")
save_BUP
database : opts.database
project_id : project_id
cb : (err) ->
done += 1
dbg("#{m}/#{opts.projects.length}: #{done} DONE #{project_id} -- #{err}")
if done >= opts.projects.length
dbg("**COMPLETELY DONE!!**")
if err
errors[project_id] = err
cb()
finish = ->
if misc.len(errors) == 0
opts.cb()
else
opts.cb(errors)
fs.exists '/bups', (exists) ->
if not exists
opts.cb("/bups directory not mounted -- no bup access")
else
async.mapLimit(opts.projects, opts.threads, f, finish)
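# Save one project into its local bup repo, upload the repo to GCS (when
# BUCKET is set), and record the last_backup time in the database.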
save_BUP = exports.save_BUP = (opts) ->
opts = defaults opts,
database : required
project_id : required
cb : required
dbg = (m) -> winston.debug("save_BUP(project_id='#{opts.project_id}'): #{m}")
dbg()
if process.env.USER != 'root'
opts.cb("must be root")
return
exists = bup = undefined
async.series([
(cb) ->
fs.exists '/bups', (exists) ->
if not exists
cb("/bups directory not mounted -- no bup access")
else
cb()
(cb) ->
fs.exists join('/projects', opts.project_id), (_exists) ->
exists = _exists
cb()
(cb) ->
if not exists
cb(); return
dbg("saving project to local bup repo")
bup_save_project
project_id : opts.project_id
cb : (err, _bup) ->
if err
cb(err)
else
bup = _bup
cb()
(cb) ->
if not exists
cb(); return
if not BUCKET
cb(); return
copy_BUP_to_GCS
project_id : opts.project_id
bup : bup
cb :cb
(cb) ->
dbg("recording successful backup in database")
opts.database._query
query : "UPDATE projects"
set :
last_backup: new Date()
where :
'project_id :: UUID = $' : opts.project_id
cb : cb
], (err) -> opts.cb(err))
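# Upload a local bup repo to gs://BUCKET using gsutil rsync.  The .bloom and
# .midx index files are excluded since bup can regenerate them; refs and logs
# are synced with -c (checksums) because those files change in place.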
copy_BUP_to_GCS = (opts) ->
opts = defaults opts,
project_id : required
bup : undefined
cb : required
dbg = (m) -> winston.debug("copy_BUP_to_GCS(project_id='#{opts.project_id}'): #{m}")
dbg()
bup = opts.bup
async.series([
(cb) ->
if bup?
cb(); return
get_bup_path opts.project_id, (err, x) ->
bup = x; cb(err)
(cb) ->
i = bup.indexOf(opts.project_id)
if i == -1
cb("bup path must contain project_id")
return
else
bup1 = bup.slice(i)
async.parallel([
(cb) ->
dbg("rsync'ing pack files")
misc_node.execute_code
timeout : 2*3600
command : 'gsutil'
                        args : ['-m', 'rsync', '-x', '.*\\.bloom|.*\\.midx', '-r', "#{bup}/objects/", "gs://#{BUCKET}/#{bup1}/objects/"]
cb : cb
(cb) ->
dbg("rsync'ing refs and logs files")
f = (path, cb) ->
misc_node.execute_code
timeout : 300
command : 'gsutil'
args : ['-m', 'rsync', '-c', '-r', "#{bup}/#{path}/", "gs://#{BUCKET}/#{bup1}/#{path}/"]
cb : cb
async.map(['refs', 'logs'], f, cb)
], cb)
], opts.cb)
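# A project may have several timestamp-named bup repos under
# /bups/<project_id>; return the newest one, or undefined if there are none.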
get_bup_path = (project_id, cb) ->
dir = "/bups/#{project_id}"
fs.readdir dir, (err, files) ->
if err
cb(err)
else
            files = files.sort()
            if files.length > 0
                bup = join(dir, files[files.length-1])
            # bup stays undefined if there are no repos yet; the caller handles that
            cb(undefined, bup)
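# Index and save /projects/<project_id> into its bup repo, creating the repo
# first if needed, then make the backup readable by all users.
# Calls cb(err, bup_dir).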
bup_save_project = (opts) ->
opts = defaults opts,
project_id : required
cb : required
dbg = (m) -> winston.debug("bup_save_project(project_id='#{opts.project_id}'): #{m}")
dbg()
source = join('/projects', opts.project_id)
dir = "/bups/#{opts.project_id}"
bup = undefined
async.series([
(cb) ->
dbg("create target bup repo")
fs.exists dir, (exists) ->
if exists
cb()
else
fs.mkdir(dir, cb)
(cb) ->
dbg('ensure there is a bup repo')
get_bup_path opts.project_id, (err, x) ->
bup = x; cb(err)
(cb) ->
if bup?
cb(); return
dbg("must create bup repo")
bup = join(dir, misc.date_to_snapshot_format(new Date()))
fs.mkdir(bup, cb)
(cb) ->
dbg("init bup repo")
misc_node.execute_code
command : 'bup'
args : ['init']
timeout : 120
env : {BUP_DIR:bup}
cb : cb
(cb) ->
dbg("index the project")
misc_node.execute_code
command : 'bup'
args : ['index', source]
timeout : 60*30
env : {BUP_DIR:bup}
cb : cb
(cb) ->
dbg("save the bup snapshot")
misc_node.execute_code
command : 'bup'
args : ['save', source, '-n', 'master', '--strip']
timeout : 60*60*2
env : {BUP_DIR:bup}
cb : cb
(cb) ->
dbg('ensure that all backup files are readable by the salvus user (only user on this system)')
misc_node.execute_code
command : 'chmod'
args : ['a+r', '-R', bup]
timeout : 60
cb : cb
], (err) ->
opts.cb(err, bup)
)
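# Restore a project into /projects on this host from its bup archive, first
# fetching the archive from GCS if a newer one is available there.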
exports.open_SNAPSHOT = (opts) ->
opts = defaults opts,
database : required
project_id : required
cb : required
    dbg = (m) -> winston.debug("open_SNAPSHOT(project_id='#{opts.project_id}'): #{m}")
dbg()
async.series([
(cb) ->
dbg("update/get bup rep from google cloud storage")
open_BUP
project_id : opts.project_id
database : opts.database
cb : cb
(cb) ->
dbg("extract project")
copy_BUP_to_SNAPSHOT
project_id : opts.project_id
cb : cb
(cb) ->
dbg("record that project is now stored here")
opts.database.update_project_storage_save
project_id : opts.project_id
cb : cb
], (err)->opts.cb(err))
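# Extract master/latest from the newest local bup repo of a project into
# /projects/<project_id>, creating that directory with the right ownership
# if it does not exist yet.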
copy_BUP_to_SNAPSHOT = (opts) ->
opts = defaults opts,
project_id : required
cb : required
    dbg = (m) -> winston.debug("copy_BUP_to_SNAPSHOT(project_id='#{opts.project_id}'): #{m}")
dbg()
outdir = "/projects/#{opts.project_id}"
local_path = "/bups/#{opts.project_id}"
bup = undefined
async.series([
(cb) ->
dbg("ensure local bup path '#{local_path}' exists")
fs.exists local_path, (exists) ->
if exists
cb()
else
fs.mkdir(local_path, cb)
(cb) ->
dbg("check if outdir='#{outdir}' exists")
fs.exists outdir, (exists) ->
if exists
cb()
else
async.series([
(cb) ->
dbg("create outdir='#{outdir}'")
fs.mkdir(outdir, 0o700, cb)
(cb) ->
dbg("set ownership of '#{outdir}'")
uid = misc_node.uid(opts.project_id)
fs.chown(outdir, uid, uid, cb)
], cb)
(cb) ->
            dbg("determine newest bup repo")
fs.readdir local_path, (err, files) ->
if err
cb(err)
else
                    if files.length > 0
                        files.sort()
                        snapshot = files[files.length-1]
                        bup = join(local_path, snapshot)
                    # always call cb -- bup stays undefined if there are no repos yet
                    cb()
(cb) ->
if not bup?
dbg("nothing to do -- no bup repos made yet")
cb(); return
dbg("extracting bup repo '#{bup}'")
misc_node.execute_code
command : 'bup'
args : ['restore', '--outdir', outdir, 'master/latest/']
env : {BUP_DIR:bup}
timeout : 3600
cb : cb
], opts.cb)
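# Fetch the newest bup repo of a project from GCS into /bups, unless an
# equally new repo is already present locally.  Calls cb(err, bup_dir).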
open_BUP = exports.open_BUP = (opts) ->
opts = defaults opts,
database : required
project_id : required
cb : required
dbg = (m) -> winston.debug("open_BUP(project_id='#{opts.project_id}'): #{m}")
dbg()
bup = source = undefined
async.series([
(cb) ->
fs.exists '/bups', (exists) ->
if not exists
cb("/bups directory not mounted -- no bup access")
else
cb()
(cb) ->
dbg("rsync bup repo from Google cloud storage -- first get list of available repos")
misc_node.execute_code
timeout : 120
command : 'gsutil'
args : ['ls', "gs://#{BUCKET}/#{opts.project_id}"]
cb : (err, output) ->
if err
if output?.stderr?.indexOf('matched no objects') != -1
cb()
else
cb(err)
else
v = misc.split(output.stdout).sort()
if v.length > 0
source = v[v.length-1]
dbg("most recent bup repo '#{source}'")
timestamp = require('path').parse(source).name
bup = "/bups/#{opts.project_id}/#{timestamp}"
else
dbg("WARNING: no known backups in GCS")
cb()
(cb) ->
if not source?
cb(); return
            dbg("determine local bup repos (already in /bups) -- the local repo takes precedence if its timestamp is at least as new")
fs.readdir "/bups/#{opts.project_id}", (err, v) ->
if err
cb()
else
v.sort()
if v.length > 0 and v[v.length-1] >= require('path').parse(source).name
dbg("newest local version is as new, so don't get anything from GCS.")
source = undefined
else
dbg("GCS is newer, will still get it")
cb()
(cb) ->
if not source?
cb(); return
misc_node.ensure_containing_directory_exists(bup+"/HEAD", cb)
(cb) ->
if not source?
cb(); return
async.parallel([
(cb) ->
dbg("rsync'ing pack files")
fs.mkdir bup+'/objects', ->
misc_node.execute_code
timeout : 2*3600
command : 'gsutil'
args : ['-m', 'rsync', '-r', "#{source}objects/", bup+'/objects/']
cb : cb
(cb) ->
dbg("rsync'ing refs files")
fs.mkdir bup+'/refs', ->
misc_node.execute_code
timeout : 2*3600
command : 'gsutil'
args : ['-m', 'rsync', '-c', '-r', "#{source}refs/", bup+'/refs/']
cb : cb
(cb) ->
dbg("creating HEAD")
fs.writeFile(join(bup, 'HEAD'), 'ref: refs/heads/master', cb)
], (err) ->
if err
rmdir bup, () ->
cb(err)
else
cb()
)
(cb) ->
dbg("record that project is now stored here")
opts.database.update_project_storage_save
project_id : opts.project_id
cb : cb
], (err) -> opts.cb(err, bup))
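# Command-line entry point: back up every project edited in the last two weeks
# whose last backup is more than twelve hours old, then exit.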
exports.update_BUP = () ->
db = undefined
async.series([
(cb) ->
get_db (err, x) ->
db = x
cb(err)
(cb) ->
exports.save_BUP_age
database : db
age_m : 60*24*14
time_since_last_backup_m : 60*12
threads : 2
cb : cb
], (err) ->
winston.debug("!DONE! #{err}")
process.exit(if err then 1 else 0)
)
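# Assign a randomly chosen storage server to every project that has none.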
exports.assign_storage_to_all_projects = (database, cb) ->
dbg = (m) -> winston.debug("assign_storage_to_all_projects: #{m}")
dbg()
projects = hosts = undefined
async.series([
(cb) ->
dbg("get projects with no assigned storage")
database._query
query : "SELECT project_id FROM projects WHERE storage IS NULL"
cb : postgres.all_results 'project_id', (err, v) ->
                    dbg("got #{v?.length} projects")
projects = v
cb(err)
(cb) ->
database._query
query : "SELECT host FROM storage_servers"
cb : postgres.all_results 'host', (err, v) ->
                    dbg("got #{v?.length} storage_servers")
hosts = v
cb(err)
(cb) ->
n = 0
f = (project_id, cb) ->
n += 1
host = misc.random_choice(hosts)
dbg("#{n}/#{projects.length}: assigning #{project_id} to #{host}")
database.get_project_storage
project_id : project_id
cb : (err, storage) ->
if err or storage?
cb(err)
else
database.set_project_storage
project_id : project_id
host : host
cb : cb
async.mapLimit(projects, 10, f, cb)
], cb)
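# Command-line entry point, guarded by a pid file so only one instance runs:
# assign storage servers to unassigned projects, then save every project
# edited since the previous run.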
exports.update_SNAPSHOT = () ->
PID_FILE = '/home/salvus/.update_storage.pid'
dbg = (m) -> winston.debug("update_storage: #{m}")
last_pid = undefined
last_run = undefined
database = undefined
async.series([
(cb) ->
dbg("read pid file #{PID_FILE}")
fs.readFile PID_FILE, (err, data) ->
if not err
last_pid = data.toString()
cb()
(cb) ->
if last_pid?
try
process.kill(last_pid, 0)
cb("previous process still running")
catch e
dbg("good -- process not running")
cb()
else
cb()
(cb) ->
if last_pid?
fs.stat PID_FILE, (err, stats) ->
if err
cb(err)
else
last_run = stats.mtime
cb()
else
last_run = misc.days_ago(1)
cb()
(cb) ->
dbg("last run: #{last_run}")
dbg("create new pid file")
fs.writeFile(PID_FILE, "#{process.pid}", cb)
(cb) ->
get_db (err, db) ->
database = db
cb(err)
(cb) ->
exports.assign_storage_to_all_projects(database, cb)
(cb) ->
exports.save_SNAPSHOT_age
database : database
age_m : (new Date() - last_run)/1000/60
threads : 5
cb : (err) ->
                    dbg("save_SNAPSHOT_age returned errors=#{misc.to_json(err)}")
cb()
], (err) ->
dbg("finished -- err=#{err}")
if err
process.exit(1)
else
process.exit(0)
)
exports.mount_snapshots_on_all_compute_vms_command_line = ->
database = undefined
async.series([
(cb) ->
get_db (err, db) ->
database = db
cb(err)
(cb) ->
exports.mount_snapshots_on_all_compute_vms
database : database
cb : cb
], (err) ->
if err
process.exit(1)
else
winston.debug("SUCCESS!")
process.exit(0)
)
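# Mount this server's ZFS snapshots read-only (via sshfs) on every compute VM.
# Refuses to run unless sshd chroots root logins to /projects/.zfs/snapshot,
# which limits the damage a compromised compute VM could do here.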
exports.mount_snapshots_on_all_compute_vms = (opts) ->
opts = defaults opts,
database : required
cb : required
    dbg = (m) -> winston.debug("mount_snapshots_on_all_compute_vms: #{m}")
server = os.hostname()
hosts = undefined
errors = {}
async.series([
(cb) ->
dbg("check that sshd is setup with important restrictions (slightly limits damage in case compute machine is rooted)")
fs.readFile '/etc/ssh/sshd_config', (err, data) ->
if err
cb(err)
else if data.toString().indexOf("Match User root") == -1
cb("Put this in /etc/ssh/sshd_config, then 'service sshd restart'!:\n\nMatch User root\n\tChrootDirectory /projects/.zfs/snapshot\n\tForceCommand internal-sftp")
else
cb()
(cb) ->
dbg("query database for all compute vm's")
opts.database.get_all_compute_servers
cb : (err, v) ->
if err
cb(err)
else
hosts = (x.host for x in v)
cb()
(cb) ->
dbg("mounting snapshots on all compute vm's")
errors = {}
f = (host, cb) ->
exports.mount_snapshots_on_compute_vm
host : host
cb : (err) ->
if err
errors[host] = err
cb()
async.map(hosts, f, cb)
], (err) ->
if err
opts.cb(err)
else if misc.len(errors) > 0
opts.cb(errors)
else
opts.cb()
)
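# ssh to one compute VM and (re)mount this server's snapshots there at
# /mnt/snapshots/<this server>/ using sshfs.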
exports.mount_snapshots_on_compute_vm = (opts) ->
opts = defaults opts,
host : required
cb : required
server = os.hostname()
mnt = "/mnt/snapshots/#{server}/"
remote = "fusermount -u -z #{mnt}; mkdir -p #{mnt}/; chmod a+rx /mnt/snapshots/ #{mnt}; sshfs -o StrictHostKeyChecking=no,ro,allow_other,default_permissions #{server}:/ #{mnt}/"
winston.debug("mount_snapshots_on_compute_vm(host='#{opts.host}'): run this on #{opts.host}: #{remote}")
misc_node.execute_code
command : 'ssh'
args : ['-o', 'StrictHostKeyChecking=no', opts.host, remote]
timeout : 120
cb : opts.cb
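# Handle one storage_request from the projects table: dispatch to the
# save/close/move/open handler, record in the database when the request starts
# and finishes (or fails), and use `tasks` to avoid double-processing a project.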
process_update = (tasks, database, project) ->
dbg = (m) -> winston.debug("process_update(project_id=#{project.project_id}): #{m}")
project = misc.deep_copy(project)
if tasks[project.project_id]
return
if project.storage_request.finished
return
dbg(misc.to_json(project))
storage_request = project.storage_request
action = storage_request?.action
dbg("START storage action #{action} for project #{project.project_id}")
if not action?
dbg("ERROR: action not set -- suspicious -- please investigate")
return
if not project.project_id
dbg("project.project_id must be a uuid")
return
update_db = (cb) ->
database._query
query : "UPDATE projects"
where :
'project_id :: UUID = $' : project.project_id
set :
storage_request : storage_request
cb : cb
opts =
database : database
project_id : project.project_id
cb : (err) ->
storage_request.finished = new Date()
if err
storage_request.err = err
else
delete storage_request.err
update_db (err) ->
if err
dbg("ERROR: failed to record finishing the storage request - #{err}")
tasks[project.project_id] = false
func = err = undefined
switch action
when 'save'
func = save_SNAPSHOT
when 'close'
func = close_LIVE
when 'move'
target = project.storage_request.target
if not target?
err = "move must specify target"
else
func = move_project
opts.target = target
when 'open'
target = project.storage_request.target
if not target?
err = "open must specify target"
else
func = open_LIVE
opts.host = target
else
err = "unknown action '#{action}'"
if not func? and not err
err = "bug in action handler"
if err
dbg(err)
storage_request.finished = new Date()
storage_request.err = err
        update_db (err) ->
            if err
                dbg("ERROR: failed to record that there was an error doing storage request -- #{err}")
else
dbg("doing action '#{action}'")
tasks[project.project_id] = true
storage_request.started = new Date()
update_db (err) ->
if err
dbg("ERROR: failed to declare intention to start storage request -- #{err}")
else
func(opts)
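# The storage daemon: import the projects zpool, watch the projects table for
# storage requests addressed to this host, and run periodic tasks (bup backups,
# rolling ZFS snapshots, and mounting snapshots on the compute VMs).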
start_server = (cb) ->
host = os.hostname()
dbg = (m) -> winston.debug("storage(host='#{host}'): #{m}")
dbg()
BUP_INTERVAL_H = 6
FIELDS = ['project_id', 'storage_request', 'storage', 'host']
projects = {}
query = undefined
database = undefined
tasks = {}
if process.env.USER != 'root'
dbg("you must be root!")
process.exit(1)
return
async.series([
(cb) ->
dbg("ensure projects zpool is imported")
misc_node.execute_code
command : '/sbin/zpool'
args : ['import', 'projects']
timeout : 180
cb : (err,output) ->
if err and output?.stderr?.indexOf('already exists') == -1
dbg("err = #{misc.to_json([err, output])}")
setTimeout((=>cb(err)), 10000)
else
cb()
(cb) ->
dbg("connect to database")
get_db (err, db) ->
database = db
cb(err)
(cb) ->
dbg("create synchronized table")
age = misc.hours_ago(2)
database.synctable
table : 'projects'
columns : FIELDS
where :
"storage#>>'{host}' = $" : host
"storage_request#>>'{requested}' >= $" : age.toISOString()
cb : (err, synctable) ->
if err
cb(err)
else
dbg("initialized synctable with #{synctable.get().size} projects")
synctable.get().map (x, project_id) ->
process_update(tasks, database, x.toJS())
synctable.on 'change', (project_id) ->
x = synctable.get(project_id)
if x?
process_update(tasks, database, x.toJS())
cb()
(cb) ->
dbg("setup periodic tasks")
task_update_BUP = (cb) ->
exports.save_BUP_age
database : database
age_m : 60*24*14
time_since_last_backup_m : 60*BUP_INTERVAL_H
threads : 3
cb : (err) ->
if err
dbg("ERROR: task_update_BUP failed! -- #{misc.to_json(err)}")
else
dbg("SUCCESS: task_update_BUP")
cb?(err)
task_update_snapshots = (cb) ->
require('./rolling_snapshots').update_snapshots
filesystem : 'projects'
cb : (err) ->
if err
dbg("ERROR: task_update_snapshots failed! -- #{misc.to_json(err)}")
else
dbg("SUCCESS: task_update_snapshots")
cb?(err)
task_mount_snapshots_on_all_compute_vms = (cb) ->
exports.mount_snapshots_on_all_compute_vms
database : database
cb : (err) ->
if err
dbg("ERROR: task_mount_snapshots_on_all_compute_vms failed! -- #{misc.to_json(err)}")
else
dbg("SUCCESS: task_mount_snapshots_on_all_compute_vms")
cb?(err)
task_ensure_zfs_snapshots_are_mounted = (cb) ->
misc_node.execute_code
command : "mountpoint -q /projects && ls /projects/.zfs/snapshot/*/XXX"
bash : true
timeout : 60*5
cb : (err, output) ->
if err and output?.stderr?.indexOf("Object is remote") == -1
dbg("ERROR: task_ensure_zfs_snapshots_are_mounted failed! -- #{misc.to_json(err)}")
dbg("will try again in 15s")
                            setTimeout(task_ensure_zfs_snapshots_are_mounted, 15000)
else
dbg("SUCCESS: task_ensure_zfs_snapshots_are_mounted")
cb?(err)
setInterval(task_update_BUP, 1000*60*13)
task_update_BUP()
setInterval(task_mount_snapshots_on_all_compute_vms, 1000*60*3)
setInterval(task_update_snapshots, 1000*60*5)
task_update_snapshots()
task_ensure_zfs_snapshots_are_mounted () ->
task_mount_snapshots_on_all_compute_vms()
zfs_expire_snapshot = 8388608
setInterval(task_ensure_zfs_snapshots_are_mounted, Math.min(2**31-1, zfs_expire_snapshot*1000))
cb()
], (err) ->
if err
dbg("error -- #{err}")
process.exit(1)
)
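# Ad-hoc monitoring of recent storage requests, e.g. from a REPL:
#
#    require('./storage').activity(cb: (err, a) -> a.monitor())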
exports.activity = (opts) ->
new Activity(opts)
class Activity
constructor: (opts) ->
opts = defaults opts,
age_m : 10
num : 30
cb : required
@_age_m = opts.age_m
@_num = opts.num
@_init (err) =>
opts.cb(err, @)
_init: (cb) =>
dbg = (m) => winston.debug("activity: #{m}")
async.series([
(cb) =>
dbg("connect to database")
get_db (err, db) =>
@_database = db
cb(err)
(cb) =>
dbg("create synchronized table")
age = misc.minutes_ago(@_age_m)
FIELDS = ['project_id', 'storage_request', 'storage', 'host', 'state']
database = @_database
database.synctable
table : 'projects'
columns : FIELDS
where :
"storage_request#>>'{requested}' >= $" : age.toISOString()
cb : (err, synctable) =>
if err
dbg("fail: #{err}")
else
dbg("got synctable")
@_synctable = synctable
cb(err)
], cb)
get: (project_id) =>
return @_synctable.get(project_id).toJS()
list: () =>
return (x for x in @_synctable.get().valueSeq().toJS() when x.storage_request?.requested >= misc.minutes_ago(@_age_m))
ignored: () =>
return (x for x in @list() when x.storage_request?.requested? and not x.storage_request?.finished and not x.storage_request?.started)
running: () =>
return (x for x in @list() when x.storage_request?.requested? and not x.storage_request?.finished and x.storage_request?.started)
finished: () =>
return (x for x in @list() when x.storage_request?.requested? and x.storage_request?.finished and x.storage_request?.started)
times: () =>
v = []
for x in @finished()
v.push
project_id : x.project_id
requested : x.storage_request.requested
host : x.host?.host
storage : x.storage?.host
action : x.storage_request.action
wait : (x.storage_request.started - x.storage_request.requested)/1000
work : (x.storage_request.finished - x.storage_request.started)/1000
v.sort (a,b) ->
return misc.cmp(a.wait + a.work, b.wait + b.work)
return v
summary: () =>
t = @times()
data =
times : t.slice(Math.max(0,t.length - @_num))
running : @running().length
finished : @finished().length
ignored : @ignored().length
s = misc.to_json(data)
if s == @_last_data
return
@_last_data = s
console.log('\n\n\n---------------------------------------------------\n\n')
console.log(new Date())
console.log " worst times: wait work action requested storage_host host"
for x in data.times
console.log " #{x.project_id} #{x.wait} #{x.work} #{x.action} #{x.requested} #{x.storage} #{x.host}"
console.log " running : #{data.running}"
console.log " finished : #{data.finished}"
if data.ignored > 0 then warn = '*************************' else warn=''
console.log " pending : #{data.ignored} #{warn}"
monitor: () =>
f = require('underscore').debounce((=>@summary()), 1500)
@_synctable.on('change', f)
f()
return
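# List storage requests from the last age_m minutes that were requested but
# never started; with all=false, only those assigned to this host.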
exports.ignored_storage_requests = (opts) ->
opts = defaults opts,
age_m : 10
all : true
cb : required
dbg = (m) -> winston.debug("ignored_storage_requests: #{m}")
dbg()
db = undefined
v = undefined
async.series([
(cb) ->
dbg("connect to database")
get_db (err, _db) ->
db = _db
cb(err)
(cb) ->
dbg("doing query")
query = "SELECT project_id,storage_request,storage,host,state FROM projects WHERE "
params = [misc.minutes_ago(opts.age_m).toISOString()]
query += " storage_request#>>'{requested}' >= $1 AND storage_request#>'{started}' IS NULL AND storage_request#>'{finished}' IS NULL "
if not opts.all
query += " AND storage#>>'{host}'=$2 "
params.push(os.hostname())
db._query
query : query
params : params
cb : postgres.all_results (err, x) ->
v = x
cb(err)
], (err) ->
opts.cb(err, v)
)
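# Re-issue project.save for projects whose 'save' storage requests were
# ignored.  With dry_run=true (the default), just report how many there are.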
exports.save_projects_with_ignored_save_requests = (opts) ->
opts = defaults opts,
age_m : 10
limit : 5
dry_run : true
cb : undefined
dbg = (m) -> winston.debug("save_projects_with_ignored_save_requests: #{m}")
dbg()
db = undefined
compute_server = undefined
v = undefined
async.series([
(cb) ->
dbg("connect to database")
get_db (err, _db) ->
db = _db
cb(err)
(cb) ->
dbg("get projects with ignored save requests")
exports.ignored_storage_requests
age_m : opts.age_m
cb : (err, z) ->
if err
cb(err)
else
v = (x for x in z when x.storage_request.action == 'save')
cb()
(cb) ->
if opts.dry_run or v.length == 0
cb()
return
require('./compute-client').compute_server
database : db
cb : (err, x) ->
if err
cb(err)
else
compute_server = x
cb()
(cb) ->
if opts.dry_run
dbg("would save #{v.length} projects")
cb()
return
f = (x, cb) ->
compute_server.project
project_id : x.project_id
cb : (err, project) ->
if err
cb(err)
else
project.save(cb:cb)
async.mapLimit(v, opts.limit, f, cb)
], (err) => opts.cb?(err))
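# Daemonized command-line interface; run e.g. via:  coffee storage.coffee start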
program = require('commander')
main = () ->
LOGS = join(process.env.HOME, 'logs')
program.usage('[start/stop/restart/status] [options]')
.option('--pidfile [string]', 'store pid in this file', String, "#{LOGS}/storage.pid")
.option('--logfile [string]', 'write log to this file', String, "#{LOGS}/storage.log")
.option('-e')
.parse(process.argv)
    winston.debug("running as a daemon")
daemon = require('start-stop-daemon')
process.addListener "uncaughtException", (err) ->
winston.debug("BUG ****************************************************************************")
winston.debug("Uncaught exception: " + err)
winston.debug(err.stack)
winston.debug("BUG ****************************************************************************")
get_db (e, db) ->
if not e
db?.uncaught_exception(err)
async.series([
(cb) ->
misc_node.ensure_containing_directory_exists(program.pidfile, cb)
(cb) ->
misc_node.ensure_containing_directory_exists(program.logfile, cb)
(cb) ->
daemon({max:9999, pidFile:program.pidfile, outFile:program.logfile, errFile:program.logfile, logFile:'/dev/null'}, start_server)
])
if program._name.split('.')[0] == 'storage'
main()
else
winston.debug("imported storage as a library -- #{program._name}")