###
Manage storage

CONFIGURATION: see storage-config.md

SMC uses a tiered storage system.  The highest tier that contains files is the
"source of truth" for the current state of the project.  If any actual files are
at level n, they take precedence over files at level n+1.

1. LIVE: The /projects/project_id directory on the compute server (named "compute*")
   on which the project is sitting, as defined by the 'host' field in the projects
   table of the database.

2. SNAPSHOT: The /projects/project_id directory on a storage server (named "projects*"),
   as defined by the 'storage' field in the projects table.  Some files are
   excluded here (see the exclude function below).

3. BUP: The /bups/project_id/newest_timestamp directory on a storage server.
   This is a bup repository of snapshots of 2.  The project is the contents
   of master/latest/ (which is stripped, so it equals the contents of projects/).

4. GCS: Google cloud storage path gs://#{BUCKET}/project_id/newest_timestamp,
   which is again a bup repository of snapshots of 2, with some superfluous files
   removed.  This gets copied to 3 and extracted.

5. OFFSITE: Offsite drive(s) -- bup repositories: #{BUCKET}/project_id/newest_timestamp

High-level functions:

- close_TIER  - save to the lower tier, then delete the project from TIER
- open_TIER   - open on TIER using files from the lower tier (assumes project not already open)
- save_TIER   - save project to TIER, from the tier above it.

Low-level functions:

- delete_TIER - removes files from TIER with no checks or saves
- copy_TIER_to_TIER

NOTES:
- save_BUP always saves to GCS as well, so there is no save_GCS.
###
require('coffee-cache')

BUCKET = 'smc-projects-bup'  # if given, will upload there using gsutil rsync

{join} = require('path')
fs = require('fs')
os = require('os')

async = require('async')
rmdir = require('rimraf')
winston = require('winston')

misc_node = require('smc-util-node/misc_node')

misc = require('smc-util/misc')
{defaults, required} = misc

postgres = require('./postgres')

process.env['PGHOST'] = 'postgres0'  # just hardcode this since all this storage stuff is going away anyways

# Set the log level
winston.remove(winston.transports.Console)
winston.add(winston.transports.Console, {level: 'debug', timestamp:true, colorize:true})

exclude = () ->
    return ("--exclude=#{x}" for x in misc.split('.sage/cache .sage/temp .trash .Trash .sagemathcloud .smc .node-gyp .cache .forever .snapshots *.sage-backup'))

get_db = (cb) ->
    db = postgres.db()
    db.connect(cb : (err) => cb?(err, db))

# Low level function that saves all changed files from a compute VM to a local path.
# This must be run as root.
copy_project_from_LIVE_to_SNAPSHOT = (opts) ->
    opts = defaults opts,
        project_id : required    # uuid
        host       : required    # hostname of compute server, e.g., 'compute2-us'
        max_size_G : 50
        delete     : true
        cb         : required
    dbg = (m) -> winston.debug("copy_project_from_LIVE_to_SNAPSHOT(project_id='#{opts.project_id}'): #{m}")
    dbg("host='#{opts.host}'")
    args = ['-axH', "--max-size=#{opts.max_size_G}G", "--ignore-errors"]
    if opts.delete
        args = args.concat(["--delete", "--delete-excluded"])
    else
        args.push('--update')
    args = args.concat(exclude())
    args = args.concat(['-e', 'ssh -T -c arcfour -o Compression=no -x -o StrictHostKeyChecking=no'])
    source = "#{opts.host}:/projects/#{opts.project_id}/"
    target = "/projects/#{opts.project_id}/"
    args = args.concat([source, target])
    dbg("starting rsync...")
    start = misc.walltime()
    misc_node.execute_code
        command     : 'rsync'
        args        : args
        timeout     : 3600*2     # up to 2 hours...
        err_on_exit : true
        cb          : (err, output) ->
            if err and (output?.exit_code == 24 or output?.exit_code == 23)
                # exit code 24 = partial transfer due to vanishing files
                # exit code 23 = didn't finish due to permissions; this happens due to fuse mounts
                err = undefined
            dbg("...finished rsync -- time=#{misc.walltime(start)}s")
            opts.cb(err)
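
###
Usage sketch for this internal helper (illustrative; 'compute2-us' is a
hypothetical compute host):

    copy_project_from_LIVE_to_SNAPSHOT
        project_id : '06e7df74-b68b-4370-9cdc-86aec577e162'
        host       : 'compute2-us'
        cb         : (err) -> console.log("done", err)
###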

copy_project_from_SNAPSHOT_to_LIVE = (opts) ->
    opts = defaults opts,
        project_id : required    # uuid
        host       : required    # hostname of compute server, e.g., 'compute2-us'
        cb         : required
    dbg = (m) -> winston.debug("copy_project_from_SNAPSHOT_to_LIVE(project_id='#{opts.project_id}'): #{m}")
    dbg("host='#{opts.host}'")
    args = ['-axH']
    args = args.concat(['-e', 'ssh -T -c arcfour -o Compression=no -x -o StrictHostKeyChecking=no'])
    source = "/projects/#{opts.project_id}/"
    target = "#{opts.host}:/projects/#{opts.project_id}/"
    args = args.concat([source, target])
    dbg("starting rsync...")
    start = misc.walltime()
    misc_node.execute_code
        command     : 'rsync'
        args        : args
        timeout     : 10000
        verbose     : true
        err_on_exit : true
        cb          : (out...) ->
            dbg("finished rsync -- time=#{misc.walltime(start)}s")
            opts.cb(out...)

get_storage = (project_id, database, cb) ->
    dbg = (m) -> winston.debug("get_storage(project_id='#{project_id}'): #{m}")
    database.get_project
        project_id : project_id
        columns    : ['storage']
        cb         : (err, x) ->
            if err
                cb(err)
            else if not x?
                cb("no such project")
            else
                cb(undefined, x.storage?.host)

get_host_and_storage = (project_id, database, cb) ->
    dbg = (m) -> winston.debug("get_host_and_storage(project_id='#{project_id}'): #{m}")
    host = undefined
    storage = undefined
    async.series([
        (cb) ->
            dbg("determine project location info")
            database.get_project
                project_id : project_id
                columns    : ['storage', 'host']
                cb         : (err, x) ->
                    if err
                        cb(err)
                    else if not x?
                        cb("no such project")
                    else
                        host    = x.host?.host
                        storage = x.storage?.host
                        if not host
                            cb("project not currently open on a compute host")
                        else
                            cb()
        (cb) ->
            if storage?
                cb()
                return
            dbg("allocate storage host")
            database._query
                query : "SELECT host FROM storage_servers"
                cb    : postgres.all_results 'host', (err, hosts) ->
                    if err
                        cb(err)
                    else if not hosts? or hosts.length == 0
                        cb("no storage servers in storage_servers table")
                    else
                        storage = misc.random_choice(hosts)
                        database.set_project_storage
                            project_id : project_id
                            host       : storage
                            cb         : cb
    ], (err) ->
        cb(err, {host:host, storage:storage})
    )
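
###
Illustrative: get_host_and_storage both looks up and (if necessary) allocates
the storage host, yielding e.g. (hypothetical values):

    get_host_and_storage '06e7df74-...', db, (err, x) ->
        # x = {host:'compute2-us', storage:'projects0'}
###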

# Save project from compute VM to its assigned storage server.  Errors
# if the project is not opened LIVE.
exports.save_SNAPSHOT = save_SNAPSHOT = (opts) ->
    opts = defaults opts,
        database   : required
        project_id : required    # uuid
        max_size_G : 50
        cb         : required
    dbg = (m) -> winston.debug("save_SNAPSHOT(project_id='#{opts.project_id}'): #{m}")
    host = undefined
    async.series([
        (cb) ->
            get_host_and_storage opts.project_id, opts.database, (err, x) ->
                if err
                    cb(err)
                else
                    {host, storage} = x
                    if storage != os.hostname()
                        cb("project is assigned to '#{storage}', but this server is '#{os.hostname()}'")
                    else
                        cb()
        (cb) ->
            dbg("do the save")
            copy_project_from_LIVE_to_SNAPSHOT
                project_id : opts.project_id
                host       : host
                cb         : cb
        (cb) ->
            dbg("save succeeded -- record in database")
            opts.database.update_project_storage_save
                project_id : opts.project_id
                cb         : cb
    ], (err) -> opts.cb(err))
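
###
Usage sketch (run as root on the project's assigned storage server):

    save_SNAPSHOT
        database   : db
        project_id : '06e7df74-...'
        cb         : (err) -> console.log("saved", err)
###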

###
Save all projects that have been modified in the last age_m minutes
and are stored on this machine.
If there are errors, the callback gets cb({project_id:'error...', ...}).

To save (=rsync over) everything modified in the last week:

s.save_SNAPSHOT_age(database:db, age_m:60*24*7, cb:console.log)

###
exports.save_SNAPSHOT_age = (opts) ->
    opts = defaults opts,
        database : required
        age_m    : required   # save all projects with last_edited at most this many minutes ago
        threads  : 5          # number of saves to do at once.
        cb       : required
    dbg = (m) -> winston.debug("save_SNAPSHOT_age(last_edited_m:#{opts.age_m}): #{m}")
    dbg()

    errors   = {}
    hostname = os.hostname()
    projects = undefined
    async.series([
        (cb) ->
            dbg("get all recently modified projects from the database")
            opts.database.recent_projects
                age_m : opts.age_m
                pluck : ['project_id', 'storage']
                cb    : (err, v) ->
                    if err
                        cb(err)
                    else
                        dbg("got #{v.length} recently modified projects")
                        # we should do this filtering on the server
                        projects = (x.project_id for x in v when x.storage?.host == hostname)
                        dbg("got #{projects.length} projects stored here")
                        cb()
        (cb) ->
            dbg("save each modified project")
            n = 0
            f = (project_id, cb) ->
                n += 1
                m = n
                dbg("#{m}/#{projects.length}: START")
                save_SNAPSHOT
                    project_id : project_id
                    database   : opts.database
                    cb         : (err) ->
                        dbg("#{m}/#{projects.length}: DONE -- #{err}")
                        if err
                            errors[project_id] = err
                        cb()
            async.mapLimit(projects, opts.threads, f, cb)
    ], (err) ->
        opts.cb(if misc.len(errors) > 0 then errors)
    )

# Assuming LIVE is already deleted, make sure the project is properly saved
# to BUP, then delete it from SNAPSHOT.
exports.close_SNAPSHOT = close_SNAPSHOT = (opts) ->
    opts = defaults opts,
        database   : required
        project_id : required
        cb         : required
    dbg = (m) -> winston.debug("close_SNAPSHOT(project_id='#{opts.project_id}'): #{m}")
    async.series([
        (cb) ->
            dbg('check that project is NOT currently opened LIVE')
            opts.database.get_project_host
                project_id : opts.project_id
                cb         : (err, host) ->
                    if err
                        cb(err)
                    else if host
                        cb("project must not be open LIVE")
                    else
                        cb()
        (cb) ->
            dbg('save project to BUP (and GCS)')
            save_BUP
                database   : opts.database
                project_id : opts.project_id
                cb         : cb
        (cb) ->
            dbg('saving to BUP succeeded; now deleting SNAPSHOT')
            delete_SNAPSHOT
                project_id : opts.project_id
                cb         : cb
    ], opts.cb)
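
###
Usage sketch -- archive a project that is no longer open LIVE, so the only
copies left are BUP/GCS:

    close_SNAPSHOT(database:db, project_id:'06e7df74-...', cb:console.log)
###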

delete_SNAPSHOT = (opts) ->
    opts = defaults opts,
        project_id : required
        cb         : required
    winston.debug("delete_SNAPSHOT('#{opts.project_id}')")
    rmdir("/projects/#{opts.project_id}", opts.cb)

# Delete old bup repos to free up disk space -- also needed
# since ext4 allows at most 64K subdirectories in /bups.
exports.delete_old_BUPs = (opts) ->
    opts = defaults opts,
        database  : required
        age_m     : 60*24*365*10   # select projects that are at most this old
        min_age_m : 60*24*365      # select only projects that are at least this old
        threads   : 1
        limit     : undefined      # delete at most this many (mainly used for testing)
        cb        : required
    if process.env.USER != 'root'
        opts.cb("must be root")
        return
    projects = undefined
    hostname = os.hostname()
    dbg = (m) -> winston.debug("delete_old_BUPs: #{m}")
    dbg()
    async.series([
        (cb) ->
            dbg("doing query....")
            opts.database.recent_projects
                age_m     : opts.age_m
                min_age_m : opts.min_age_m
                pluck     : ['project_id', 'last_edited', 'storage']
                cb        : (err, v) ->
                    if err
                        cb(err)
                    else
                        dbg("got #{v.length} total projects")
                        projects = (x for x in v when x.storage?.host == hostname)
                        dbg("got #{projects.length} projects on this host '#{hostname}'")
                        cb()
        (cb) ->
            if opts.limit?
                projects = projects.slice(0, opts.limit)
            dbg("deleting bups")
            m = 0
            f = (x, cb) ->
                delete_BUP
                    project_id : x.project_id
                    cb         : (err) ->
                        m += 1
                        dbg("#{Math.round(m*100/projects.length)}%: finished #{m} of #{projects.length} -- #{err}")
                        cb(err)
            async.mapLimit(projects, opts.threads, f, ((err)->cb(err)))
    ], opts.cb)
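
###
Usage sketch (as root on a storage server) -- prune bup repos of projects
untouched for at least a year; limit keeps a test run small:

    s.delete_old_BUPs(database:db, min_age_m:60*24*365, limit:10, threads:2, cb:console.log)
###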

delete_BUP = (opts) ->
    opts = defaults opts,
        project_id : required
        cb         : required
    winston.debug("delete_BUP('#{opts.project_id}')")
    target = "/bups/#{opts.project_id}"
    fs.exists target, (exists) ->
        if exists
            rmdir(target, opts.cb)
        else
            opts.cb()

# Both LIVE and SNAPSHOT must already be closed; this syncs the BUP repo to GCS,
# then deletes BUP from this machine, so the only copy of this project
# that remains is the one in GCS.
exports.close_BUP = close_BUP = (opts) ->
    opts = defaults opts,
        database   : required
        project_id : required
        cb         : required
    dbg = (m) -> winston.debug("close_BUP(project_id='#{opts.project_id}'): #{m}")
    async.series([
        (cb) ->
            dbg('check that SNAPSHOT is deleted on this computer')
            fs.exists "/projects/#{opts.project_id}", (exists) ->
                if exists
                    cb("must first close SNAPSHOT")
                else
                    cb()
        (cb) ->
            dbg('check that BUP is available on this computer')
            fs.exists "/bups/#{opts.project_id}", (exists) ->
                if not exists
                    cb("no BUP on this host")
                else
                    cb()
        (cb) ->
            dbg('save BUP to GCS')
            copy_BUP_to_GCS
                project_id : opts.project_id
                cb         : cb
        (cb) ->
            dbg('saving BUP to GCS succeeded; now delete BUP')
            delete_BUP
                project_id : opts.project_id
                cb         : cb
    ], opts.cb)

# Make sure project is properly saved to SNAPSHOT, then delete from LIVE.
exports.close_LIVE = close_LIVE = (opts) ->
    opts = defaults opts,
        database   : required
        project_id : required
        cb         : required
    dbg = (m) -> winston.debug("close_LIVE(project_id='#{opts.project_id}'): #{m}")
    host = undefined
    async.series([
        (cb) ->
            dbg('figure out where project is currently opened')
            opts.database.get_project_host
                project_id : opts.project_id
                cb         : (err, x) ->
                    host = x
                    cb(err)
        (cb) ->
            if not host
                dbg('project not currently opened')
                cb()
                return
            dbg('do a last copy of the project to this server')
            copy_project_from_LIVE_to_SNAPSHOT
                project_id : opts.project_id
                host       : host
                cb         : cb
        (cb) ->
            if not host
                cb(); return
            dbg('save succeeded: mark project host as not set in database')
            opts.database.unset_project_host
                project_id : opts.project_id
                cb         : cb
        (cb) ->
            if not host
                cb(); return
            dbg("finally, actually deleting the project from '#{host}' to free disk space")
            delete_LIVE
                project_id : opts.project_id
                host       : host
                cb         : cb
    ], opts.cb)

# Low level function that removes project from a given compute server.  DANGEROUS, obviously.
delete_LIVE = (opts) ->
    opts = defaults opts,
        project_id : required
        host       : required   # hostname of compute server where project will be DELETED
        cb         : required
    # Do a check on the input, given how dangerous this command is!
    if not misc.is_valid_uuid_string(opts.project_id)
        opts.cb("project_id='#{opts.project_id}' is not a valid uuid")
        return
    if not misc.startswith(opts.host, 'compute')
        opts.cb("host='#{opts.host}' does not start with 'compute', which is suspicious")
        return
    target = "/projects/#{opts.project_id}"
    misc_node.execute_code
        command : 'ssh'
        args    : ['-o', 'StrictHostKeyChecking=no', "root@#{opts.host}", "rm -rf #{target}"]
        timeout : 1800
        cb      : opts.cb

# Open project on a given compute server (so copy from storage to compute server).
# Error if project is already open on a server according to the database.
exports.open_LIVE = open_LIVE = (opts) ->
    opts = defaults opts,
        database   : required
        host       : required   # hostname of compute server where project will be opened
        project_id : required
        cb         : required
    dbg = (m) -> winston.debug("open_LIVE(project_id='#{opts.project_id}', host='#{opts.host}'): #{m}")
    async.series([
        (cb) ->
            dbg('make sure project is not already opened somewhere')
            opts.database.get_project_host
                project_id : opts.project_id
                cb         : (err, host) ->
                    if err
                        cb(err)
                    else
                        if host
                            cb("project already opened")
                        else
                            cb()
        (cb) ->
            fs.exists "/projects/#{opts.project_id}", (exists) ->
                if exists
                    dbg("project is available locally in /projects directory")
                    cb()
                else
                    dbg("project is NOT available locally in /projects directory -- restore from bup archive (if one exists)")
                    exports.open_SNAPSHOT
                        database   : opts.database
                        project_id : opts.project_id
                        cb         : cb
        (cb) ->
            dbg("do the open")
            copy_project_from_SNAPSHOT_to_LIVE
                project_id : opts.project_id
                host       : opts.host
                cb         : cb
        (cb) ->
            dbg("open succeeded -- record in database")
            opts.database.set_project_host
                project_id : opts.project_id
                host       : opts.host
                cb         : cb
    ], opts.cb)

# Move project, which must be open on LIVE, from one compute server to another.
exports.move_project = move_project = (opts) ->
    opts = defaults opts,
        database   : required
        project_id : required
        target     : required
        cb         : required
    dbg = (m) -> winston.debug("move_project(project_id='#{opts.project_id}'): #{m}")
    source = undefined
    async.series([
        (cb) ->
            dbg('determine current location of project')
            opts.database.get_project_host
                project_id : opts.project_id
                cb         : (err, host) ->
                    source = host
                    if err
                        cb(err)
                    else
                        if not source
                            cb("project not opened, so can't move")
                        else if source == opts.target
                            cb("project is already on '#{opts.target}'")
                        else
                            cb()
        (cb) ->
            dbg("copy the project")
            copy_project_from_one_compute_server_to_another
                project_id : opts.project_id
                source     : source
                target     : opts.target
                cb         : cb
        (cb) ->
            dbg("successfully copied the project, now setting host in database")
            opts.database.set_project_host
                project_id : opts.project_id
                host       : opts.target
                cb         : cb
        (cb) ->
            dbg("also, delete from the source to save space")
            delete_LIVE
                project_id : opts.project_id
                host       : source
                cb         : cb
    ], opts.cb)

# Low level function that copies a project from one compute server to another.
# We assume the target is empty (so there is no need for the dangerous --delete).
copy_project_from_one_compute_server_to_another = (opts) ->
    opts = defaults opts,
        project_id : required
        source     : required
        target     : required
        cb         : required
    winston.debug("copy the project from '#{opts.source}' to '#{opts.target}'")
    # Do a check on the input, given how dangerous this command is!
    if not misc.is_valid_uuid_string(opts.project_id)
        opts.cb("project_id='#{opts.project_id}' is not a valid uuid")
        return
    for host in [opts.source, opts.target]
        if not misc.startswith(host, 'compute')
            opts.cb("host='#{host}' must start with 'compute'")
            return
    source   = "/projects/#{opts.project_id}/"
    target   = "#{opts.target}:/projects/#{opts.project_id}/"
    excludes = exclude().join(' ')

    misc_node.execute_code
        command : 'ssh'
        args    : ["root@#{opts.source}", "rsync -axH -e 'ssh -T -c arcfour -o Compression=no -x -o StrictHostKeyChecking=no' #{excludes} #{source} #{target}"]
        timeout : 3600*2   # up to 2 hours...
        cb      : (err, output) ->
            if err and (output?.exit_code == 24 or output?.exit_code == 23)   # see copy_project_from_LIVE_to_SNAPSHOT
                err = undefined
            opts.cb(err)


###
Snapshotting projects using bup
###

###
Must run as root:

s = require('smc-hub/storage');

# make sure everything not touched in a year has a backup as recorded in the database...
# (except use limit to only do that many)
s.save_BUP_age(database:db, limit:1, threads:1, min_age_m:60*24*365, age_m:1e8, time_since_last_backup_m:1e8, cb:(e)->console.log("DONE",e))

# make sure everything not touched in 2 years has a backup on the local /bups disk if it exists in /projects
s.save_BUP_age(threads:2, local:true, database:db, min_age_m:2*60*24*365, age_m:1e8, time_since_last_backup_m:1e8, cb:(e)->console.log("DONE",e))

# make sure everything modified in the last week has at least one backup made within
# the last day (if it was backed up after last edited, it won't be backed up again)
s.save_BUP_age(database:db, age_m:7*24*60, time_since_last_backup_m:60*24, threads:1, cb:(e)->console.log("DONE",e))
###
exports.save_BUP_age = (opts) ->
    opts = defaults opts,
        database                 : required
        age_m                    : undefined   # if given, select projects at most this old
        min_age_m                : undefined   # if given, select only projects that are at least this old
        threads                  : 1
        time_since_last_backup_m : undefined   # if given, only backup projects for which it has been at least this long since they were backed up
        local                    : false       # if true, backs up *every* project on this host for which no backup exists in the /bups directory
        limit                    : undefined   # backup at most this many
        cb                       : required
    if process.env.USER != 'root'
        opts.cb("must be root")
        return
    projects = undefined
    hostname = os.hostname()
    dbg = (m) -> winston.debug("save_BUP_age: #{m}")
    dbg("age_m=#{opts.age_m}; min_age_m=#{opts.min_age_m}; time_since_last_backup_m=#{opts.time_since_last_backup_m}")
    async.series([
        (cb) ->
            if opts.time_since_last_backup_m? or opts.local?
                opts.database.recent_projects
                    age_m     : opts.age_m
                    min_age_m : opts.min_age_m
                    pluck     : ['last_backup', 'project_id', 'last_edited', 'storage']
                    cb        : (err, v) ->
                        if err
                            cb(err)
                        else
                            dbg("got #{v.length} recent projects")
                            projects = []
                            cutoff = misc.minutes_ago(opts.time_since_last_backup_m)
                            for x in v
                                if opts.limit? and projects.length >= opts.limit
                                    break
                                if x.storage?.host != hostname
                                    # only consider projects on this VM
                                    continue
                                if opts.local and not fs.existsSync("/bups/#{x.project_id}") and fs.existsSync("/projects/#{x.project_id}")
                                    projects.push(x.project_id)
                                    continue
                                if x.last_backup? and x.last_edited? and x.last_backup >= x.last_edited
                                    # no need to make another backup, since we already have an up-to-date backup
                                    continue
                                if not x.last_backup? or x.last_backup <= cutoff
                                    projects.push(x.project_id)
                            dbg("of these recent projects, #{projects.length} DO NOT have a backup made within the last #{opts.time_since_last_backup_m} minutes")
                            cb()
            else
                opts.database.recent_projects
                    age_m     : opts.age_m
                    min_age_m : opts.min_age_m
                    cb        : (err, v) ->
                        projects = v
                        cb(err)
        (cb) ->
            if opts.limit?
                projects = projects.slice(0, opts.limit)
            dbg("making backup of #{projects.length} projects")
            save_BUP_many
                database : opts.database
                projects : projects
                threads  : opts.threads
                cb       : cb
    ], opts.cb)
693
save_BUP_many = (opts) ->
694
opts = defaults opts,
695
database : required
696
projects : required
697
threads : 1
698
cb : required
699
# back up a list of projects that are stored on this computer
700
dbg = (m) -> winston.debug("save_BUP_many(projects.length=#{opts.projects.length}): #{m}")
701
dbg("threads=#{opts.threads}")
702
errors = {}
703
n = 0
704
done = 0
705
f = (project_id, cb) ->
706
n += 1
707
m = n
708
dbg("#{m}/#{opts.projects.length}: backing up #{project_id}")
709
save_BUP
710
database : opts.database
711
project_id : project_id
712
cb : (err) ->
713
done += 1
714
dbg("#{m}/#{opts.projects.length}: #{done} DONE #{project_id} -- #{err}")
715
if done >= opts.projects.length
716
dbg("**COMPLETELY DONE!!**")
717
if err
718
errors[project_id] = err
719
cb()
720
finish = ->
721
if misc.len(errors) == 0
722
opts.cb()
723
else
724
opts.cb(errors)
725
726
fs.exists '/bups', (exists) ->
727
if not exists
728
opts.cb("/bups directory not mounted -- no bup access")
729
else
730
async.mapLimit(opts.projects, opts.threads, f, finish)


# Make a snapshot of a project using bup to the local cache, then
# rsync that repo to Google cloud storage.  Records the successful
# save in the database.  Must be run as root.
save_BUP = exports.save_BUP = (opts) ->
    opts = defaults opts,
        database   : required
        project_id : required
        cb         : required
    dbg = (m) -> winston.debug("save_BUP(project_id='#{opts.project_id}'): #{m}")
    dbg()
    if process.env.USER != 'root'
        opts.cb("must be root")
        return
    exists = bup = undefined
    async.series([
        (cb) ->
            fs.exists '/bups', (exists) ->
                if not exists
                    cb("/bups directory not mounted -- no bup access")
                else
                    cb()
        (cb) ->
            fs.exists join('/projects', opts.project_id), (_exists) ->
                # not an error -- this means the project was never used at all (and never saved)
                exists = _exists
                cb()
        (cb) ->
            if not exists
                cb(); return
            dbg("saving project to local bup repo")
            bup_save_project
                project_id : opts.project_id
                cb         : (err, _bup) ->
                    if err
                        cb(err)
                    else
                        bup = _bup   # "/bups/#{project_id}/#{timestamp}"
                        cb()
        (cb) ->
            if not exists
                cb(); return
            if not BUCKET
                cb(); return
            copy_BUP_to_GCS
                project_id : opts.project_id
                bup        : bup
                cb         : cb
        (cb) ->
            dbg("recording successful backup in database")
            opts.database._query
                query : "UPDATE projects"
                set   :
                    last_backup : new Date()
                where :
                    'project_id :: UUID = $' : opts.project_id
                cb    : cb
    ], (err) -> opts.cb(err))

copy_BUP_to_GCS = (opts) ->
    opts = defaults opts,
        project_id : required
        bup        : undefined   # optionally give path to specific bup repo with timestamp
        cb         : required
    dbg = (m) -> winston.debug("copy_BUP_to_GCS(project_id='#{opts.project_id}'): #{m}")
    dbg()
    bup = opts.bup
    async.series([
        (cb) ->
            if bup?
                cb(); return
            get_bup_path opts.project_id, (err, x) ->
                bup = x; cb(err)
        (cb) ->
            i = bup.indexOf(opts.project_id)
            if i == -1
                cb("bup path must contain project_id")
                return
            else
                bup1 = bup.slice(i)   # "#{project_id}/#{timestamp}"
            async.parallel([
                (cb) ->
                    dbg("rsync'ing pack files")
                    # Upload new pack file objects -- don't use -c, since it would be very (!!) slow on these
                    # huge files, and isn't needed, since time stamps are enough.  We also don't save the
                    # midx and bloom files, since they can be recreated from the pack files.
                    misc_node.execute_code
                        timeout : 2*3600
                        command : 'gsutil'
                        args    : ['-m', 'rsync', '-x', '.*\.bloom|.*\.midx', '-r', "#{bup}/objects/", "gs://#{BUCKET}/#{bup1}/objects/"]
                        cb      : cb
                (cb) ->
                    dbg("rsync'ing refs and logs files")
                    f = (path, cb) ->
                        # Upload refs; using -c below is critical, since filenames don't change but content
                        # does (and timestamps aren't used by gsutil!).
                        misc_node.execute_code
                            timeout : 300
                            command : 'gsutil'
                            args    : ['-m', 'rsync', '-c', '-r', "#{bup}/#{path}/", "gs://#{BUCKET}/#{bup1}/#{path}/"]
                            cb      : cb
                    async.map(['refs', 'logs'], f, cb)
                    # NOTE: we don't save HEAD, since it is always "ref: refs/heads/master"
            ], cb)
    ], opts.cb)

get_bup_path = (project_id, cb) ->
    dir = "/bups/#{project_id}"
    fs.readdir dir, (err, files) ->
        if err
            cb(err)
        else
            files = files.sort()
            if files.length > 0
                bup = join(dir, files[files.length-1])
            cb(undefined, bup)

# This must be run as root.
bup_save_project = (opts) ->
    opts = defaults opts,
        project_id : required
        cb         : required   # opts.cb(err, BUP_DIR)
    dbg = (m) -> winston.debug("bup_save_project(project_id='#{opts.project_id}'): #{m}")
    dbg()
    source = join('/projects', opts.project_id)
    dir = "/bups/#{opts.project_id}"
    bup = undefined   # will be set below to abs path of newest bup repo
    async.series([
        (cb) ->
            dbg("create target bup repo")
            fs.exists dir, (exists) ->
                if exists
                    cb()
                else
                    fs.mkdir(dir, cb)
        (cb) ->
            dbg('ensure there is a bup repo')
            get_bup_path opts.project_id, (err, x) ->
                bup = x; cb(err)
        (cb) ->
            if bup?
                cb(); return
            dbg("must create bup repo")
            bup = join(dir, misc.date_to_snapshot_format(new Date()))
            fs.mkdir(bup, cb)
        (cb) ->
            dbg("init bup repo")
            misc_node.execute_code
                command : 'bup'
                args    : ['init']
                timeout : 120
                env     : {BUP_DIR:bup}
                cb      : cb
        (cb) ->
            dbg("index the project")
            misc_node.execute_code
                command : 'bup'
                args    : ['index', source]
                timeout : 60*30   # 30 minutes
                env     : {BUP_DIR:bup}
                cb      : cb
        (cb) ->
            dbg("save the bup snapshot")
            misc_node.execute_code
                command : 'bup'
                args    : ['save', source, '-n', 'master', '--strip']
                timeout : 60*60*2   # 2 hours
                env     : {BUP_DIR:bup}
                cb      : cb
        (cb) ->
            dbg('ensure that all backup files are readable by the salvus user (the only user on this system)')
            misc_node.execute_code
                command : 'chmod'
                args    : ['a+r', '-R', bup]
                timeout : 60
                cb      : cb
    ], (err) ->
        opts.cb(err, bup)
    )
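
###
For reference, the bup steps above are roughly this shell session (placeholders
are illustrative; BUP_DIR points at the timestamped repo):

    export BUP_DIR=/bups/$PROJECT_ID/$TIMESTAMP
    bup init
    bup index /projects/$PROJECT_ID
    bup save /projects/$PROJECT_ID -n master --strip
###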

# Copy the most recent bup archive of the project to the local bup cache, put the
# HEAD file in place, then restore the most recent snapshot in the archive to the
# local projects path.
exports.open_SNAPSHOT = (opts) ->
    opts = defaults opts,
        database   : required
        project_id : required
        cb         : required
    dbg = (m) -> winston.debug("open_SNAPSHOT(project_id='#{opts.project_id}'): #{m}")
    dbg()
    async.series([
        (cb) ->
            dbg("update/get bup repo from google cloud storage")
            open_BUP
                project_id : opts.project_id
                database   : opts.database
                cb         : cb
        (cb) ->
            dbg("extract project")
            copy_BUP_to_SNAPSHOT
                project_id : opts.project_id
                cb         : cb
        (cb) ->
            dbg("record that project is now stored here")
            opts.database.update_project_storage_save
                project_id : opts.project_id
                cb         : cb
    ], (err) -> opts.cb(err))

# Extract the most recent snapshot of the project from the local bup archive to the
# local directory /projects/project_id, which either does not exist or is empty.
# The bup archive is assumed to be in /bups/project_id/[timestamp].
copy_BUP_to_SNAPSHOT = (opts) ->
    opts = defaults opts,
        project_id : required
        cb         : required
    dbg = (m) -> winston.debug("copy_BUP_to_SNAPSHOT(project_id='#{opts.project_id}'): #{m}")
    dbg()
    outdir = "/projects/#{opts.project_id}"
    local_path = "/bups/#{opts.project_id}"
    bup = undefined
    async.series([
        (cb) ->
            dbg("ensure local bup path '#{local_path}' exists")
            fs.exists local_path, (exists) ->
                if exists
                    cb()
                else
                    fs.mkdir(local_path, cb)
        (cb) ->
            dbg("check if outdir='#{outdir}' exists")
            fs.exists outdir, (exists) ->
                if exists
                    cb()
                else
                    async.series([
                        (cb) ->
                            dbg("create outdir='#{outdir}'")
                            fs.mkdir(outdir, 0o700, cb)
                        (cb) ->
                            dbg("set ownership of '#{outdir}'")
                            uid = misc_node.uid(opts.project_id)
                            fs.chown(outdir, uid, uid, cb)
                    ], cb)
        (cb) ->
            dbg("determine the newest bup repo")
            fs.readdir local_path, (err, files) ->
                if err
                    cb(err)
                else
                    if files.length > 0
                        files.sort()
                        snapshot = files[files.length-1]   # newest snapshot
                        bup = join(local_path, snapshot)
                    cb()
        (cb) ->
            if not bup?
                dbg("nothing to do -- no bup repos made yet")
                cb(); return
            dbg("extracting bup repo '#{bup}'")
            misc_node.execute_code
                command : 'bup'
                args    : ['restore', '--outdir', outdir, 'master/latest/']
                env     : {BUP_DIR:bup}
                timeout : 3600   # up to an hour....
                cb      : cb
    ], opts.cb)
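
###
For reference, the extraction step above amounts to (illustrative placeholders;
$TIMESTAMP is the newest directory under /bups/$PROJECT_ID/):

    BUP_DIR=/bups/$PROJECT_ID/$TIMESTAMP bup restore --outdir /projects/$PROJECT_ID master/latest/
###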

open_BUP = exports.open_BUP = (opts) ->
    opts = defaults opts,
        database   : required
        project_id : required
        cb         : required   # cb(err, path_to_bup_repo or undefined if no repo in cloud)
    dbg = (m) -> winston.debug("open_BUP(project_id='#{opts.project_id}'): #{m}")
    dbg()
    bup = source = undefined
    async.series([
        (cb) ->
            fs.exists '/bups', (exists) ->
                if not exists
                    cb("/bups directory not mounted -- no bup access")
                else
                    cb()
        (cb) ->
            dbg("rsync bup repo from Google cloud storage -- first get list of available repos")
            misc_node.execute_code
                timeout : 120
                command : 'gsutil'
                args    : ['ls', "gs://#{BUCKET}/#{opts.project_id}"]
                cb      : (err, output) ->
                    if err
                        if output?.stderr?.indexOf('matched no objects') != -1
                            # gs://#{BUCKET}/project_id doesn't exist at all -- we get a "matched no objects" error
                            cb()
                        else
                            cb(err)
                    else
                        v = misc.split(output.stdout).sort()
                        if v.length > 0
                            source = v[v.length-1]   # e.g., 'gs://#{BUCKET}/06e7df74-b68b-4370-9cdc-86aec577e162/2015-12-05-041330/'
                            dbg("most recent bup repo '#{source}'")
                            timestamp = require('path').parse(source).name
                            bup = "/bups/#{opts.project_id}/#{timestamp}"
                        else
                            dbg("WARNING: no known backups in GCS")
                        cb()
        (cb) ->
            if not source?
                # nothing to do -- nothing in GCS
                cb(); return
            dbg("determine local bup repos (already in /bups directory) -- these take precedence if their timestamp is as new")
            fs.readdir "/bups/#{opts.project_id}", (err, v) ->
                if err
                    # no directory
                    cb()
                else
                    v.sort()
                    if v.length > 0 and v[v.length-1] >= require('path').parse(source).name
                        dbg("newest local version is as new, so don't get anything from GCS.")
                        source = undefined
                    else
                        dbg("GCS is newer, will still get it")
                    cb()
        (cb) ->
            if not source?
                cb(); return
            misc_node.ensure_containing_directory_exists(bup+"/HEAD", cb)
        (cb) ->
            if not source?
                cb(); return
            async.parallel([
                (cb) ->
                    dbg("rsync'ing pack files")
                    fs.mkdir bup+'/objects', ->
                        misc_node.execute_code
                            timeout : 2*3600
                            command : 'gsutil'
                            args    : ['-m', 'rsync', '-r', "#{source}objects/", bup+'/objects/']
                            cb      : cb
                (cb) ->
                    dbg("rsync'ing refs files")
                    fs.mkdir bup+'/refs', ->
                        misc_node.execute_code
                            timeout : 2*3600
                            command : 'gsutil'
                            args    : ['-m', 'rsync', '-c', '-r', "#{source}refs/", bup+'/refs/']
                            cb      : cb
                (cb) ->
                    dbg("creating HEAD")
                    fs.writeFile(join(bup, 'HEAD'), 'ref: refs/heads/master', cb)
            ], (err) ->
                if err
                    # Attempt to remove the new bup repo we just tried and failed to get from GCS,
                    # so that next time we will try again.
                    rmdir bup, () ->
                        cb(err)   # but still report the error
                else
                    cb()
            )
        (cb) ->
            dbg("record that project is now stored here")
            opts.database.update_project_storage_save
                project_id : opts.project_id
                cb         : cb
    ], (err) -> opts.cb(err, bup))

# Make sure everything modified in the last week has at least one backup made within
# the last day (if it was backed up after last edited, it won't be backed up again).
# For now we just run this (from the update_backups script) once per day to ensure
# we have useful offsite backups.
exports.update_BUP = () ->
    db = undefined
    async.series([
        (cb) ->
            get_db (err, x) ->
                db = x
                cb(err)
        (cb) ->
            exports.save_BUP_age
                database                 : db
                age_m                    : 60*24*14   # 2 weeks: consider all projects edited in the last 2 weeks
                time_since_last_backup_m : 60*12      # ensure they have a bup snapshot that is at most 12 hours old if edited since the last snapshot
                threads                  : 2
                cb                       : cb
    ], (err) ->
        winston.debug("!DONE! #{err}")
        process.exit(if err then 1 else 0)
    )

# Probably soon we won't need this, since projects will get storage
# assigned right when they are created.
exports.assign_storage_to_all_projects = (database, cb) ->
    # Ensure that every project is assigned to some storage host.
    dbg = (m) -> winston.debug("assign_storage_to_all_projects: #{m}")
    dbg()
    projects = hosts = undefined
    async.series([
        (cb) ->
            dbg("get projects with no assigned storage")
            database._query
                query : "SELECT project_id FROM projects WHERE storage IS NULL"
                cb    : postgres.all_results 'project_id', (err, v) ->
                    dbg("got #{v?.length} projects")
                    projects = v
                    cb(err)
        (cb) ->
            database._query
                query : "SELECT host FROM storage_servers"
                cb    : postgres.all_results 'host', (err, v) ->
                    dbg("got #{v?.length} storage_servers")
                    hosts = v
                    cb(err)
        (cb) ->
            n = 0
            f = (project_id, cb) ->
                n += 1
                host = misc.random_choice(hosts)
                dbg("#{n}/#{projects.length}: assigning #{project_id} to #{host}")
                database.get_project_storage   # quick check that storage isn't already defined -- slightly narrows the race condition window (we are being lazy)
                    project_id : project_id
                    cb         : (err, storage) ->
                        if err or storage?
                            cb(err)
                        else
                            database.set_project_storage
                                project_id : project_id
                                host       : host
                                cb         : cb

            async.mapLimit(projects, 10, f, cb)
    ], cb)

exports.update_SNAPSHOT = () ->
    # This should be run from the command line.
    # It checks that it isn't already running.  If not, it then
    # writes a pid file, copies everything over that was modified
    # since the last time the pid file was written, then updates
    # all snapshots and exits.
    fs = require('fs')
    path = require('path')
    PID_FILE = '/home/salvus/.update_storage.pid'
    dbg = (m) -> winston.debug("update_storage: #{m}")
    last_pid = undefined
    last_run = undefined
    database = undefined
    async.series([
        (cb) ->
            dbg("read pid file #{PID_FILE}")
            fs.readFile PID_FILE, (err, data) ->
                if not err
                    last_pid = data.toString()
                cb()
        (cb) ->
            if last_pid?
                try
                    process.kill(last_pid, 0)
                    cb("previous process still running")
                catch e
                    dbg("good -- process not running")
                    cb()
            else
                cb()
        (cb) ->
            if last_pid?
                fs.stat PID_FILE, (err, stats) ->
                    if err
                        cb(err)
                    else
                        last_run = stats.mtime
                        cb()
            else
                last_run = misc.days_ago(1)   # go back one day the first time
                cb()
        (cb) ->
            dbg("last run: #{last_run}")
            dbg("create new pid file")
            fs.writeFile(PID_FILE, "#{process.pid}", cb)
        (cb) ->
            get_db (err, db) ->
                database = db
                cb(err)
        (cb) ->
            exports.assign_storage_to_all_projects(database, cb)
        (cb) ->
            exports.save_SNAPSHOT_age
                database : database
                age_m    : (new Date() - last_run)/1000/60
                threads  : 5
                cb       : (err) ->
                    dbg("save_SNAPSHOT_age returned errors=#{misc.to_json(err)}")
                    cb()
        #(cb) ->
        #    require('./rolling_snapshots').update_snapshots
        #        filesystem : 'projects'
        #        cb         : cb
    ], (err) ->
        dbg("finished -- err=#{err}")
        if err
            process.exit(1)
        else
            process.exit(0)
    )


exports.mount_snapshots_on_all_compute_vms_command_line = ->
    database = undefined
    async.series([
        (cb) ->
            get_db (err, db) ->
                database = db
                cb(err)
        (cb) ->
            exports.mount_snapshots_on_all_compute_vms
                database : database
                cb       : cb
    ], (err) ->
        if err
            process.exit(1)
        else
            winston.debug("SUCCESS!")
            process.exit(0)
    )

###
s = require('smc-hub/storage')
s.mount_snapshots_on_all_compute_vms(database:db, cb:console.log)
###
exports.mount_snapshots_on_all_compute_vms = (opts) ->
    opts = defaults opts,
        database : required
        cb       : required   # cb() or cb({host:error, ..., host:error})
    dbg = (m) -> winston.debug("mount_snapshots_on_all_compute_vms: #{m}")
    server = os.hostname()   # name of this server
    hosts = undefined
    errors = {}
    async.series([
        (cb) ->
            dbg("check that sshd is set up with important restrictions (slightly limits damage in case a compute machine is rooted)")
            fs.readFile '/etc/ssh/sshd_config', (err, data) ->
                if err
                    cb(err)
                else if data.toString().indexOf("Match User root") == -1
                    cb("Put this in /etc/ssh/sshd_config, then 'service sshd restart'!:\n\nMatch User root\n\tChrootDirectory /projects/.zfs/snapshot\n\tForceCommand internal-sftp")
                else
                    cb()
        (cb) ->
            dbg("query database for all compute VMs")
            opts.database.get_all_compute_servers
                cb : (err, v) ->
                    if err
                        cb(err)
                    else
                        hosts = (x.host for x in v)
                        cb()
        (cb) ->
            dbg("mounting snapshots on all compute VMs")
            errors = {}
            f = (host, cb) ->
                exports.mount_snapshots_on_compute_vm
                    host : host
                    cb   : (err) ->
                        if err
                            errors[host] = err
                        cb()
            async.map(hosts, f, cb)
    ], (err) ->
        if err
            opts.cb(err)
        else if misc.len(errors) > 0
            opts.cb(errors)
        else
            opts.cb()
    )

# ssh to the given compute server and set up an sshfs mount on
# it back to this machine, if it isn't already set up.
# This must be run as root.
exports.mount_snapshots_on_compute_vm = (opts) ->
    opts = defaults opts,
        host : required   # hostname of compute server
        cb   : required
    server = os.hostname()   # name of this server
    mnt    = "/mnt/snapshots/#{server}/"
    remote = "fusermount -u -z #{mnt}; mkdir -p #{mnt}/; chmod a+rx /mnt/snapshots/ #{mnt}; sshfs -o StrictHostKeyChecking=no,ro,allow_other,default_permissions #{server}:/ #{mnt}/"
    winston.debug("mount_snapshots_on_compute_vm(host='#{opts.host}'): run this on #{opts.host}: #{remote}")
    misc_node.execute_code
        command : 'ssh'
        args    : ['-o', 'StrictHostKeyChecking=no', opts.host, remote]
        timeout : 120
        cb      : opts.cb
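
###
Illustrative: with this storage server named projects0 (hypothetical), the
command run over ssh on each compute VM expands to:

    fusermount -u -z /mnt/snapshots/projects0/
    mkdir -p /mnt/snapshots/projects0/
    chmod a+rx /mnt/snapshots/ /mnt/snapshots/projects0/
    sshfs -o StrictHostKeyChecking=no,ro,allow_other,default_permissions projects0:/ /mnt/snapshots/projects0/
###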


###
Listen to the database for requested storage actions and do them.  We listen for
projects that have this host assigned for storage, and of course do nothing for
projects that are assigned to a different storage host.
###
process_update = (tasks, database, project) ->
    dbg = (m) -> winston.debug("process_update(project_id=#{project.project_id}): #{m}")

    project = misc.deep_copy(project)   # avoid any possibility of mutating the project object below
    if tasks[project.project_id]
        # definitely already running some task involving this project
        return

    if project.storage_request.finished
        # definitely nothing to do -- it's finished some storage request
        return

    dbg(misc.to_json(project))

    storage_request = project.storage_request
    action = storage_request?.action

    dbg("START storage action #{action} for project #{project.project_id}")

    if not action?
        dbg("ERROR: action not set -- suspicious -- please investigate")
        return

    if not project.project_id
        dbg("project.project_id must be a uuid")
        return

    update_db = (cb) ->
        database._query
            query : "UPDATE projects"
            where :
                'project_id :: UUID = $' : project.project_id
            set   :
                storage_request : storage_request
            cb    : cb

    opts =
        database   : database
        project_id : project.project_id
        cb         : (err) ->
            storage_request.finished = new Date()
            if err
                storage_request.err = err
            else
                delete storage_request.err
            update_db (err) ->
                if err
                    dbg("ERROR: failed to record finishing the storage request -- #{err}")
                # Still, we are done, so we record this in tasks, so that we don't block on
                # doing another one later, once things time out in the database.
                tasks[project.project_id] = false

    # Figure out which storage action to take
    func = err = undefined
    switch action
        when 'save'
            func = save_SNAPSHOT
        when 'close'
            func = close_LIVE
        when 'move'
            target = project.storage_request.target
            if not target?
                err = "move must specify target"
            else
                func = move_project
                opts.target = target
        when 'open'
            target = project.storage_request.target
            if not target?
                err = "open must specify target"
            else
                func = open_LIVE
                opts.host = target
        else
            err = "unknown action '#{action}'"

    if not func? and not err
        err = "bug in action handler"
    if err
        dbg(err)
        storage_request.finished = new Date()
        storage_request.err = err
        update_db (err) ->
            if err
                dbg("ERROR: failed to record that there was an error doing the storage request -- #{err}")
    else
        dbg("doing action '#{action}'")
        tasks[project.project_id] = true
        storage_request.started = new Date()
        update_db (err) ->
            if err
                dbg("ERROR: failed to declare intention to start the storage request -- #{err}")
                # This would happen if the database was down.  After a while this request
                # will be considered stale and will get ignored, and a new one will be made.
            else
                # Now actually do the action...
                func(opts)

start_server = (cb) ->
    host = os.hostname()
    dbg  = (m) -> winston.debug("storage(host='#{host}'): #{m}")
    dbg()

    # ensure that modified projects have a snapshot that is at most this old
    BUP_INTERVAL_H = 6

    FIELDS   = ['project_id', 'storage_request', 'storage', 'host']
    projects = {}   # map from project_id to object
    query    = undefined
    database = undefined
    tasks    = {}
    if process.env.USER != 'root'
        dbg("you must be root!")
        process.exit(1)
        return
    async.series([
        (cb) ->
            dbg("ensure projects zpool is imported")
            misc_node.execute_code
                command : '/sbin/zpool'
                args    : ['import', 'projects']
                timeout : 180
                cb      : (err, output) ->
                    if err and output?.stderr?.indexOf('already exists') == -1
                        dbg("err = #{misc.to_json([err, output])}")
                        setTimeout((=>cb(err)), 10000)   # wait 10s before dying (then trying again)
                    else
                        cb()
        (cb) ->
            dbg("connect to database")
            get_db (err, db) ->
                database = db
                cb(err)
        (cb) ->
            dbg("create synchronized table")

            # Get every project assigned to this host that has made a storage
            # request starting within the last two hours.
            age = misc.hours_ago(2)
            database.synctable
                table   : 'projects'
                columns : FIELDS
                where   :
                    "storage#>>'{host}' = $"               : host
                    "storage_request#>>'{requested}' >= $" : age.toISOString()
                cb      : (err, synctable) ->
                    if err
                        cb(err)
                    else
                        dbg("initialized synctable with #{synctable.get().size} projects")
                        # process all recent projects
                        synctable.get().map (x, project_id) ->
                            process_update(tasks, database, x.toJS())
                        # process any time a project changes
                        synctable.on 'change', (project_id) ->
                            x = synctable.get(project_id)
                            if x?
                                process_update(tasks, database, x.toJS())
                        cb()
        (cb) ->
            dbg("setup periodic tasks")

            task_update_BUP = (cb) ->
                exports.save_BUP_age
                    database                 : database
                    age_m                    : 60*24*14   # 2 weeks: consider only projects edited in the last 2 weeks
                    time_since_last_backup_m : 60*BUP_INTERVAL_H
                    threads                  : 3          # how many to do at once
                    cb                       : (err) ->
                        if err
                            dbg("ERROR: task_update_BUP failed! -- #{misc.to_json(err)}")
                        else
                            dbg("SUCCESS: task_update_BUP")
                        cb?(err)

            task_update_snapshots = (cb) ->
                require('./rolling_snapshots').update_snapshots
                    filesystem : 'projects'
                    cb         : (err) ->
                        if err
                            dbg("ERROR: task_update_snapshots failed! -- #{misc.to_json(err)}")
                        else
                            dbg("SUCCESS: task_update_snapshots")
                        cb?(err)

            task_mount_snapshots_on_all_compute_vms = (cb) ->
                exports.mount_snapshots_on_all_compute_vms
                    database : database
                    cb       : (err) ->
                        if err
                            dbg("ERROR: task_mount_snapshots_on_all_compute_vms failed! -- #{misc.to_json(err)}")
                        else
                            dbg("SUCCESS: task_mount_snapshots_on_all_compute_vms")
                        cb?(err)

            task_ensure_zfs_snapshots_are_mounted = (cb) ->
                misc_node.execute_code
                    command : "mountpoint -q /projects && ls /projects/.zfs/snapshot/*/XXX"
                    bash    : true
                    timeout : 60*5   # it can take a while to get the listing (usually like 20-30s the first time)
                    cb      : (err, output) ->
                        if err and output?.stderr?.indexOf("Object is remote") == -1
                            # "Object is remote" *is* an expected error
                            dbg("ERROR: task_ensure_zfs_snapshots_are_mounted failed! -- #{misc.to_json(err)}")
                            dbg("will try again in 15s")
                            setTimeout(task_ensure_zfs_snapshots_are_mounted, 15000)
                        else
                            dbg("SUCCESS: task_ensure_zfs_snapshots_are_mounted")
                        cb?(err)

            # check which bup snapshots need updates once every 13 minutes
            setInterval(task_update_BUP, 1000*60*13)
            task_update_BUP()

            # update sshfs mounts of snapshots every 3 minutes
            setInterval(task_mount_snapshots_on_all_compute_vms, 1000*60*3)

            # update ZFS snapshots every 5 minutes
            setInterval(task_update_snapshots, 1000*60*5)
            task_update_snapshots()

            # Mount all of the ZFS snapshots.
            # They should stay mounted due to
            #   echo "options zfs zfs_expire_snapshot=8388608" >> /etc/modprobe.d/zfs.conf
            task_ensure_zfs_snapshots_are_mounted () ->
                task_mount_snapshots_on_all_compute_vms()

            zfs_expire_snapshot = 8388608
            # Anything more than 8388608s (=2^23s = 97 days!) for the zfs_expire_snapshot parameter fails to work, causing instant unmount :-(
            # Math.min due to http://stackoverflow.com/questions/12633405/what-is-the-maximum-delay-for-setinterval
            setInterval(task_ensure_zfs_snapshots_are_mounted, Math.min(2**31-1, zfs_expire_snapshot*1000))

            cb()
    ], (err) ->
        if err
            dbg("error -- #{err}")
            process.exit(1)
    )


###
Watch for storage_request activity.
###
exports.activity = (opts) ->
    new Activity(opts)

class Activity
    constructor: (opts) ->
        opts = defaults opts,
            age_m : 10
            num   : 30   # how many to show in summary
            cb    : required
        @_age_m = opts.age_m
        @_num = opts.num
        @_init (err) =>
            opts.cb(err, @)

    _init: (cb) =>
        dbg = (m) => winston.debug("activity: #{m}")
        async.series([
            (cb) =>
                dbg("connect to database")
                get_db (err, db) =>
                    @_database = db
                    cb(err)
            (cb) =>
                dbg("create synchronized table")
                # TODO: Get every project that has done a storage request recently
                age = misc.minutes_ago(@_age_m)
                FIELDS = ['project_id', 'storage_request', 'storage', 'host', 'state']
                database = @_database
                database.synctable
                    table   : 'projects'
                    columns : FIELDS
                    where   :
                        "storage_request#>>'{requested}' >= $" : age.toISOString()
                    cb      : (err, synctable) =>
                        if err
                            dbg("fail: #{err}")
                        else
                            dbg("got synctable")
                            @_synctable = synctable
                        cb(err)
        ], cb)

    get: (project_id) =>
        return @_synctable.get(project_id).toJS()

    list: () =>
        return (x for x in @_synctable.get().valueSeq().toJS() when x.storage_request?.requested >= misc.minutes_ago(@_age_m))

    # activity that was requested but not started -- this is BAD!
    ignored: () =>
        return (x for x in @list() when x.storage_request?.requested? and not x.storage_request?.finished and not x.storage_request?.started)

    # activity that was requested and is running, but not done yet.  This is probably OK.
    running: () =>
        return (x for x in @list() when x.storage_request?.requested? and not x.storage_request?.finished and x.storage_request?.started)

    # activity that was requested, started, and finished.
    finished: () =>
        return (x for x in @list() when x.storage_request?.requested? and x.storage_request?.finished and x.storage_request?.started)

    # pairs {project_id:?, action:?, wait:?, work:?}, where wait = how long from request to start, and work = how long from start to finish
    times: () =>
        v = []
        for x in @finished()
            v.push
                project_id : x.project_id
                requested  : x.storage_request.requested
                host       : x.host?.host
                storage    : x.storage?.host
                action     : x.storage_request.action
                wait       : (x.storage_request.started - x.storage_request.requested)/1000
                work       : (x.storage_request.finished - x.storage_request.started)/1000
        v.sort (a, b) ->
            return misc.cmp(a.wait + a.work, b.wait + b.work)
        return v

    summary: () =>
        t = @times()
        data =
            times    : t.slice(Math.max(0, t.length - @_num))
            running  : @running().length
            finished : @finished().length
            ignored  : @ignored().length
        s = misc.to_json(data)
        if s == @_last_data
            return
        @_last_data = s
        console.log('\n\n\n---------------------------------------------------\n\n')
        console.log(new Date())
        console.log "    worst times:    wait    work    action    requested    storage_host    host"
        for x in data.times
            console.log "    #{x.project_id}    #{x.wait}    #{x.work}    #{x.action}    #{x.requested}    #{x.storage}    #{x.host}"
        console.log "    running  : #{data.running}"
        console.log "    finished : #{data.finished}"
        if data.ignored > 0 then warn = '*************************' else warn = ''
        console.log "    pending  : #{data.ignored}  #{warn}"

    monitor: () =>
        f = require('underscore').debounce((=>@summary()), 1500)
        @_synctable.on('change', f)
        f()
        return
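
###
Illustrative REPL usage -- watch storage_request activity live:

    s = require('smc-hub/storage')
    s.activity cb: (err, a) ->
        a.monitor()   # prints a debounced summary whenever something changes
###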


# Return the storage requests (from the last age_m minutes) that are being ignored.
# If this gets big, then something is **terribly wrong**, e.g., a storage server isn't
# working.  This must get fixed, since otherwise user projects won't get saved to
# longterm storage, new projects can't be created/opened, etc.  The function below
# gives the same thing as `activity.ignored()` would give above, but is a faster, more
# direct query (rather than setting up a changefeed, etc.).  It's something that should
# be done periodically as part of monitoring.
exports.ignored_storage_requests = (opts) ->
    opts = defaults opts,
        age_m : 10
        all   : true   # if true, gets all ignored storage requests; if false, gets only the ones for the host of this process
        cb    : required
    dbg = (m) -> winston.debug("ignored_storage_requests: #{m}")
    dbg()
    db = undefined
    v = undefined
    async.series([
        (cb) ->
            dbg("connect to database")
            get_db (err, _db) ->
                db = _db
                cb(err)
        (cb) ->
            dbg("doing query")
            # Projects that had a storage request recently (in the last age_m minutes)...
            # and we only want the ignored requests...
            # and the ones that haven't started and haven't finished.
            query = "SELECT project_id,storage_request,storage,host,state FROM projects WHERE "
            params = [misc.minutes_ago(opts.age_m).toISOString()]
            query += " storage_request#>>'{requested}' >= $1 AND storage_request#>'{started}' IS NULL AND storage_request#>'{finished}' IS NULL "
            if not opts.all
                query += " AND storage#>>'{host}'=$2 "
                params.push(os.hostname())
            db._query
                query  : query
                params : params
                cb     : postgres.all_results (err, x) ->
                    v = x
                    cb(err)
    ], (err) ->
        opts.cb(err, v)
    )
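
###
Illustrative monitoring check -- anything returned here was requested but never
started, so this list should normally be empty:

    s.ignored_storage_requests age_m:10, cb:(err, v) ->
        console.log("ignored storage requests:", v?.length, err ? '')
###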

###
If the storage servers get messed up for some reason, run this.  It'll ensure that all
projects that should have been saved in the last day get saved:

    require 'c'; (require 'smc-hub/storage').save_projects_with_ignored_save_requests(age_m:60*24, limit:10, cb:done(), dry_run:false)

###
exports.save_projects_with_ignored_save_requests = (opts) ->
    opts = defaults opts,
        age_m   : 10
        limit   : 5
        dry_run : true
        cb      : undefined
    dbg = (m) -> winston.debug("save_projects_with_ignored_save_requests: #{m}")
    dbg()
    db = undefined
    compute_server = undefined
    v = undefined
    async.series([
        (cb) ->
            dbg("connect to database")
            get_db (err, _db) ->
                db = _db
                cb(err)
        (cb) ->
            dbg("get projects with ignored save requests")
            exports.ignored_storage_requests
                age_m : opts.age_m
                cb    : (err, z) ->
                    if err
                        cb(err)
                    else
                        v = (x for x in z when x.storage_request.action == 'save')
                        cb()
        (cb) ->
            if opts.dry_run or v.length == 0
                cb()
                return
            require('./compute-client').compute_server
                database : db
                cb       : (err, x) ->
                    if err
                        cb(err)
                    else
                        compute_server = x
                        cb()
        (cb) ->
            if opts.dry_run
                dbg("would save #{v.length} projects")
                cb()
                return
            f = (x, cb) ->
                compute_server.project
                    project_id : x.project_id
                    cb         : (err, project) ->
                        if err
                            cb(err)
                        else
                            project.save(cb:cb)
            async.mapLimit(v, opts.limit, f, cb)
    ], (err) => opts.cb?(err))

###########################
# Command line interface
###########################

program = require('commander')

main = () ->
    LOGS = join(process.env.HOME, 'logs')
    program.usage('[start/stop/restart/status] [options]')
        .option('--pidfile [string]', 'store pid in this file', String, "#{LOGS}/storage.pid")
        .option('--logfile [string]', 'write log to this file', String, "#{LOGS}/storage.log")
        .option('-e')   # gets passed by coffee -e
        .parse(process.argv)

    winston.debug("running as a daemon")
    daemon = require('start-stop-daemon')
    # run as a server/daemon (otherwise, this is being imported as a library)
    process.addListener "uncaughtException", (err) ->
        winston.debug("BUG ****************************************************************************")
        winston.debug("Uncaught exception: " + err)
        winston.debug(err.stack)
        winston.debug("BUG ****************************************************************************")
        get_db (e, db) ->
            if not e
                db?.uncaught_exception(err)

    async.series([
        (cb) ->
            misc_node.ensure_containing_directory_exists(program.pidfile, cb)
        (cb) ->
            misc_node.ensure_containing_directory_exists(program.logfile, cb)
        (cb) ->
            daemon({max:9999, pidFile:program.pidfile, outFile:program.logfile, errFile:program.logfile, logFile:'/dev/null'}, start_server)
    ])

if program._name.split('.')[0] == 'storage'
    main()
else
    winston.debug("imported storage as a library -- #{program._name}")