Contact
CoCalc Logo Icon
StoreFeaturesDocsShareSupport News AboutSign UpSign In
| Download
Views: 39539
1
###############################################################################
2
#
3
# CoCalc: Collaborative Calculation in the Cloud
4
#
5
# Copyright (C) 2016, Sagemath Inc.
6
#
7
# This program is free software: you can redistribute it and/or modify
8
# it under the terms of the GNU General Public License as published by
9
# the Free Software Foundation, either version 3 of the License, or
10
# (at your option) any later version.
11
#
12
# This program is distributed in the hope that it will be useful,
13
# but WITHOUT ANY WARRANTY; without even the implied warranty of
14
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15
# GNU General Public License for more details.
16
#
17
# You should have received a copy of the GNU General Public License
18
# along with this program. If not, see <http://www.gnu.org/licenses/>.
19
#
20
###############################################################################
21
22
require('coffee-cache')

# If true, new projects are only placed on compute servers marked experimental.
EXPERIMENTAL = false

if process.env.DEVEL
    console.log("compute-client: DEVEL mode")
    DEVEL = true

###
id='eb5c61ae-b37c-411f-9509-10adb51eb90b';require('smc-hub/compute-client').compute_server(cb:(e,s)->console.log(e);global.s=s; s.project(project_id:id,cb:(e,p)->global.p=p;console.log(e)))

Another example with database on local host

id='7fffd5b4-d140-4a34-a960-9f71fa7fc54b';require('smc-hub/compute-client').compute_server(cb:(e,s)->console.log(e);global.t=s; s.project(project_id:id,cb:(e, p)->global.p=p))
###

# obviously don't want to trigger this too quickly, since it may mean file loss.
AUTOMATIC_FAILOVER_TIME_S = 60*3  # NOTE: actual failover is actually disabled below; instead this is the timeout for giving up on getting status.

SERVER_STATUS_TIMEOUT_S = 7  # 7 seconds

#################################################################
#
# compute-client -- a node.js client that connects to a TCP server
# that is used by the hubs to organize compute nodes
#
#################################################################

# IMPORTANT: see schema.coffee for some important information about the project states.
STATES = require('smc-util/schema').COMPUTE_STATES

fs   = require('fs')
os   = require('os')
{EventEmitter} = require('events')

async   = require('async')
winston = require('winston')
program = require('commander')

uuid = require('node-uuid')

misc_node = require('smc-util-node/misc_node')

message = require('smc-util/message')
misc    = require('smc-util/misc')

# Set the log level
try
    winston.remove(winston.transports.Console)
    winston.add(winston.transports.Console, {level: 'debug', timestamp:true, colorize:true})
catch err
    # ignore

{defaults, required} = misc

if process.env.SMC_STORAGE?
    STORAGE = process.env.SMC_STORAGE
else if misc.startswith(require('os').hostname(), 'compute')  # my official deploy: TODO -- should be moved to conf file.
    STORAGE = 'storage0-us'
else
    STORAGE = ''
    # TEMPORARY:
#################################################################
#
# Client code -- runs in hub
#
#################################################################

###
On dev machine

require('smc-hub/compute-client').compute_server(cb:(e,s)->console.log(e);global.s=s)

In a project (the port depends on project):
require('smc-hub/compute-client').compute_server(dev:true, cb:(e,s)->console.log(e);global.s=s)

###
# Module-level singleton: the one shared ComputeServerClient (or undefined until created).
compute_server_cache = undefined
# Factory for the shared ComputeServerClient.  Subsequent calls return the
# cached instance via opts.cb(undefined, server).
# NOTE(review): two nearly-simultaneous first calls would each construct a
# client, since the cache is only set once initialization finishes — confirm
# callers serialize the first call.
exports.compute_server = compute_server = (opts) ->
    opts = defaults opts,
        database : undefined
        base_url : ''
        dev      : false   # dev -- for single-user *development*; compute server runs in same process as client on localhost
        single   : false   # single -- for single-server use/development; everything runs on a single machine.
        cb       : required
    if compute_server_cache?
        opts.cb(undefined, compute_server_cache)
    else
        compute_server_cache = new ComputeServerClient(opts)
117
class ComputeServerClient
    # Create the client and kick off async initialization: connect to the
    # database, then (in parallel) open changefeeds on the storage_servers
    # and compute_servers tables.  opts.cb(err, @) fires when ready.
    constructor: (opts) ->
        opts = defaults opts,
            database : undefined
            dev      : false
            single   : false
            base_url : ''        # base url of webserver -- passed on to local_hub so it can start servers with correct base_url
            cb       : required
        dbg = @dbg("constructor")
        dbg(misc.to_json(misc.copy_without(opts, ['cb', 'database'])))
        @_base_url         = opts.base_url
        @_project_cache    = {}
        @_project_cache_cb = {}
        @_dev              = opts.dev
        @_single           = opts.single
        async.series([
            (cb) =>
                @_init_db(opts, cb)
            (cb) =>
                async.parallel([
                    (cb) =>
                        @_init_storage_servers_feed(cb)
                    (cb) =>
                        @_init_compute_servers_feed(cb)
                ], cb)
        ], (err) =>
            if err
                opts.cb(err)
            else
                # record singleton only on success (see compute_server above)
                compute_server_cache = @
                opts.cb(err, @)
        )
149
150
_init_db: (opts, cb) =>
151
if opts.database?
152
@database = opts.database
153
cb()
154
return
155
else
156
@database = require('./postgres').db(pool:1)
157
@database.connect(cb:cb)
158
159
_init_storage_servers_feed: (cb) =>
160
@database.synctable
161
table : 'storage_servers'
162
cb : (err, synctable) =>
163
@storage_servers = synctable
164
cb(err)
165
166
_init_compute_servers_feed: (cb) =>
167
@database.synctable
168
table : 'compute_servers'
169
cb : (err, synctable) =>
170
@compute_servers = synctable
171
cb(err)
172
173
dbg: (method) =>
174
return (m) => winston.debug("ComputeServerClient.#{method}: #{m}")
175
176
###
177
Get info about server and add to database
178
179
require 'c'; compute_server()
180
s.add_server(host:'compute8-us', cb:done())
181
s.add_server(host:'compute8-us', cb:done(), experimental:true)
182
s.add_server(host:os.hostname(), cb:done())
183
###
184
add_server: (opts) =>
185
opts = defaults opts,
186
host : required
187
dc : '' # deduced from hostname (everything after -) if not given
188
experimental : false # if true, don't allocate new projects here
189
member_host : false # if true, only for members-only projects
190
timeout : 30
191
cb : required
192
dbg = @dbg("add_server(#{opts.host})")
193
dbg("adding compute server to the database by grabbing conf files, etc.")
194
if @_single
195
dbg("single machine server -- just copy files directly")
196
@_add_server_single(opts)
197
return
198
199
if not opts.dc
200
i = opts.host.indexOf('-')
201
if i != -1
202
opts.dc = opts.host.slice(0,i)
203
204
get_file = (path, cb) =>
205
dbg("get_file: #{path}")
206
misc_node.execute_code
207
command : "ssh"
208
path : process.cwd()
209
timeout : opts.timeout
210
args : ['-o', 'StrictHostKeyChecking=no', opts.host, "cat #{path}"]
211
verbose : 0
212
cb : (err, output) =>
213
if err
214
cb(err)
215
else if output?.stderr and output.stderr.indexOf('No such file or directory') != -1
216
cb(output.stderr)
217
else
218
cb(undefined, output.stdout)
219
220
port = undefined; secret = undefined
221
{program} = require('smc-hub/compute-server')
222
async.series([
223
(cb) =>
224
async.parallel([
225
(cb) =>
226
get_file program.port_file, (err, x) =>
227
port = parseInt(x); cb(err)
228
(cb) =>
229
get_file program.secret_file, (err, x) =>
230
secret = x; cb(err)
231
], cb)
232
(cb) =>
233
dbg("update database")
234
@database.save_compute_server
235
host : opts.host
236
dc : opts.dc
237
port : port
238
secret : secret
239
experimental : opts.experimental
240
member_host : opts.member_host
241
cb : cb
242
], opts.cb)
243
244
_add_server_single: (opts) =>
245
opts = defaults opts,
246
timeout : 30
247
cb : required
248
dbg = @dbg("_add_server_single")
249
dbg("adding the compute server to the database by grabbing conf files, etc.")
250
port = secret = undefined
251
{program} = require('smc-hub/compute-server')
252
async.series([
253
(cb) =>
254
async.parallel([
255
(cb) =>
256
fs.readFile program.port_file, (err, x) =>
257
if x?
258
port = parseInt(x.toString())
259
cb(err)
260
(cb) =>
261
fs.readFile program.secret_file, (err, x) =>
262
if x?
263
secret = x.toString().trim()
264
cb(err)
265
], cb)
266
(cb) =>
267
dbg("update database")
268
@database.save_compute_server
269
host : 'localhost'
270
dc : ''
271
port : port
272
secret : secret
273
experimental : false
274
member_host : false
275
cb : cb
276
], opts.cb)
277
278
279
# Choose a host from the available compute_servers according to some
280
# notion of load balancing (not really worked out yet)
281
assign_host: (opts) =>
282
opts = defaults opts,
283
exclude : []
284
member_host : undefined # if true, put project on a member host; if false, don't put on a member host - ignore if not defined
285
cb : required
286
##opts.cb(undefined, 'compute20'); return # FOR TESTING!!!! -- would force to open there
287
dbg = @dbg("assign_host")
288
dbg("querying database")
289
@status
290
cb : (err, nodes) =>
291
if err
292
opts.cb(err)
293
else
294
# Ignore any exclude nodes
295
for host in opts.exclude
296
delete nodes[host]
297
# We want to choose the best (="least loaded?") working node.
298
v = []
299
for host, info of nodes
300
if EXPERIMENTAL
301
# only use experimental nodes
302
if not info.experimental
303
continue
304
else
305
# definitely don't assign experimental nodes
306
if info.experimental
307
continue
308
if opts.member_host? and (opts.member_host != !!info.member_host)
309
# host is member but project isn't (or vice versa)
310
continue
311
v.push(info)
312
info.host = host
313
if info.error?
314
info.score = 0
315
else
316
# 10 points if no load; 0 points if massive load
317
load = info.load?[0] ? 1 # 1 if not defined
318
info.score = Math.max(0, Math.round(10*(1 - load)))
319
# 1 point for each Gigabyte of available RAM that won't
320
# result in swapping if used
321
mem = info.memory?.MemAvailable ? 1000 # 1GB if not defined
322
mem /= 1000
323
info.score += Math.round(mem)
324
if v.length == 0
325
opts.cb("no hosts available")
326
return
327
# sort so highest scoring is first.
328
v.sort (a,b) =>
329
if a.score < b.score
330
return 1
331
else if a.score > b.score
332
return -1
333
else
334
return 0
335
dbg("scored host info = #{misc.to_json(([info.host,info.score] for info in v))}")
336
# finally choose one of the hosts with the highest score at random.
337
###
338
best_score = v[0].score
339
i = 0
340
while i < v.length and v[i].score == best_score
341
i += 1
342
v = v.slice(0,i)
343
###
344
345
# I'm disabling this, since in practice random is probably
346
# better than the very naive approach above, given that we sometimes
347
# have 100+ projects created at once, and they all end up in the same place!
348
opts.cb(undefined, misc.random_choice(v).host)
349
350
remove_from_cache: (opts) =>
351
opts = defaults opts,
352
host : required
353
winston.debug("remove_from_cache(host=#{opts.host})")
354
if @_socket_cache?
355
delete @_socket_cache[opts.host]
356
357
# get a socket connection to a particular compute server
358
socket: (opts) =>
359
opts = defaults opts,
360
host : required
361
cb : required
362
if not @_socket_cache?
363
@_socket_cache = {}
364
socket = @_socket_cache[opts.host]
365
if socket?
366
opts.cb(undefined, socket)
367
return
368
# IMPORTANT: in case socket gets called many times at once with the same host as input,
369
# we must only get a socket once, then return it to all the callers.
370
if not @_socket_cbs?
371
@_socket_cbs = {}
372
if not @_socket_cbs[opts.host]?
373
@_socket_cbs[opts.host] = [opts.cb]
374
@_get_socket opts.host, (err, socket) =>
375
if socket?
376
# Cache the socket we just got
377
@_socket_cache[opts.host] = socket
378
# Get list of callbacks to notify
379
v = @_socket_cbs[opts.host]
380
delete @_socket_cbs[opts.host] # don't notify again
381
# Notify callbacks
382
for cb in v
383
cb(err, socket)
384
else
385
@_socket_cbs[opts.host].push(opts.cb)
386
387
# the following is used internally by @socket to actually get a socket, but with no
388
# checking or caching.
389
_get_socket: (host, cb) =>
390
dbg = @dbg("socket(#{host})")
391
if @_dev
392
dbg("development mode 'socket'")
393
require('./compute-server').fake_dev_socket (err, socket) =>
394
if err
395
cb(err)
396
else
397
@_socket_cache[host] = socket
398
socket.on 'mesg', (type, mesg) =>
399
if type == 'json'
400
if mesg.event == 'project_state_update'
401
winston.debug("state_update #{misc.to_safe_str(mesg)}")
402
@database.set_project_state
403
project_id : mesg.project_id
404
state : mesg.state
405
time : mesg.time
406
error : mesg.state_error
407
cb : (err) =>
408
if err
409
winston.debug("Error setting state of #{mesg.project_id} in database -- #{err}")
410
cb(undefined, socket)
411
return
412
413
info = undefined
414
socket = undefined
415
async.series([
416
(cb) =>
417
dbg("getting port and secret (host='#{host}')...")
418
@database.get_compute_server
419
host : host
420
cb : (err, x) =>
421
info = x; cb(err)
422
(cb) =>
423
if not info?
424
err = "no information about host='#{host}' in database"
425
dbg(err)
426
cb(err)
427
return
428
dbg("connecting to #{host}:#{info.port}...")
429
misc_node.connect_to_locked_socket
430
host : host
431
port : info.port
432
token : info.secret
433
timeout : 15
434
cb : (err, _socket) =>
435
if err
436
dbg("failed to connect: #{err}")
437
cb(err)
438
else
439
socket = _socket
440
misc_node.enable_mesg(socket)
441
socket.id = uuid.v4()
442
dbg("successfully connected -- socket #{socket.id}")
443
socket.on 'end', () =>
444
dbg("socket #{socket.id} ended")
445
for _, p of @_project_cache
446
if p._socket_id == socket.id
447
delete p._socket_id
448
if @_socket_cache[host]?.id == socket.id
449
delete @_socket_cache[host]
450
socket.removeAllListeners()
451
socket.on 'mesg', (type, mesg) =>
452
if type == 'json'
453
if mesg.event == 'project_state_update'
454
winston.debug("state_update #{misc.to_safe_str(mesg)}")
455
@database.set_project_state
456
project_id : mesg.project_id
457
state : mesg.state
458
time : mesg.time
459
error : mesg.state_error
460
cb : (err) =>
461
if err
462
winston.debug("Error setting state of #{mesg.project_id} in database -- #{err}")
463
else
464
winston.debug("mesg (hub <- #{host}): #{misc.to_safe_str(mesg)}")
465
cb()
466
], (err) =>
467
cb(err, socket)
468
)
469
470
###
471
Send message to a server and get back result:
472
473
x={};require('smc-hub/compute-client').compute_server(keyspace:'devel',cb:(e,s)->console.log(e);x.s=s;x.s.call(host:'localhost',mesg:{event:'ping'},cb:console.log))
474
###
475
call: (opts) =>
476
opts = defaults opts,
477
host : required
478
mesg : undefined
479
timeout : 15
480
project : undefined
481
cb : required
482
dbg = @dbg("call(hub --> #{opts.host})")
483
if DEVEL
484
dbg("(hub --> compute) #{misc.to_json(opts.mesg)}")
485
#dbg("(hub --> compute) #{misc.to_safe_str(opts.mesg)}")
486
socket = undefined
487
resp = undefined
488
if not opts.mesg.id?
489
opts.mesg.id = uuid.v4()
490
async.series([
491
(cb) =>
492
dbg('getting socket')
493
@socket
494
host : opts.host
495
cb : (err, s) =>
496
dbg("got socket #{err}")
497
socket = s; cb(err)
498
(cb) =>
499
dbg("sending mesg")
500
if opts.project?
501
# record that this socket was used by the given project
502
# (so on close can invalidate info)
503
opts.project._socket_id = socket.id
504
socket.write_mesg 'json', opts.mesg, (err) =>
505
if err
506
e = "error writing to socket -- #{err}"
507
dbg(e)
508
cb(e)
509
else
510
dbg("waiting to receive response with id #{opts.mesg.id}")
511
start_time = new Date()
512
socket.recv_mesg
513
type : 'json'
514
id : opts.mesg.id
515
timeout : opts.timeout
516
cb : (mesg) =>
517
dbg("got response -- #{misc.to_safe_str(mesg)}")
518
if mesg.event == 'error'
519
dbg("error = #{mesg.error}")
520
if new Date() - start_time >= Math.max(opts.timeout/1.1, 3000)
521
# very likely a timeout error, so don't re-use this socket (this is dangerous though since we can accumulate sockets and DOS the compute server!)
522
dbg("deleting socket cache for '#{opts.host}'")
523
delete @_socket_cache[opts.host]
524
cb(mesg.error)
525
else
526
delete mesg.id
527
resp = mesg
528
dbg("success: resp=#{misc.to_safe_str(resp)}")
529
cb()
530
], (err) =>
531
opts.cb(err, resp)
532
)
533
534
###
535
Get a project:
536
x={};require('smc-hub/compute-client').compute_server(cb:(e,s)->console.log(e);x.s=s;x.s.project(project_id:'20257d4e-387c-4b94-a987-5d89a3149a00',cb:(e,p)->console.log(e);x.p=p))
537
###
538
project: (opts) =>
539
opts = defaults opts,
540
project_id : required
541
cb : required
542
p = @_project_cache[opts.project_id]
543
if p?
544
opts.cb(undefined, p)
545
else
546
# This v is so that if project is called again before the first
547
# call returns, then both calls get the same project back.
548
v = @_project_cache_cb[opts.project_id]
549
if v?
550
v.push(opts.cb)
551
return
552
v = @_project_cache_cb[opts.project_id] = [opts.cb]
553
new ProjectClient
554
project_id : opts.project_id
555
compute_server : @
556
cb : (err, project) =>
557
delete @_project_cache_cb[opts.project_id]
558
if not err
559
@_project_cache[opts.project_id] = project
560
for cb in v
561
if err
562
cb(err)
563
else
564
cb(undefined, project)
565
566
# get status information about compute servers
567
status: (opts) =>
568
opts = defaults opts,
569
hosts : undefined # list of hosts or undefined=all compute servers
570
timeout : SERVER_STATUS_TIMEOUT_S # compute server must respond this quickly or {error:some sort of timeout error..}
571
min_interval_s : 60 # don't connect to compute servers and update their status more frequently than this.
572
cb : required # cb(err, {host1:status, host2:status2, ...})
573
dbg = @dbg('status')
574
if @_dev
575
opts.hosts = ['localhost']
576
result = {}
577
if opts.hosts?
578
for host in opts.hosts
579
result[host] = {} # may get updated below based on db query
580
581
cutoff = misc.seconds_ago(opts.min_interval_s) # only query server if at least 1 minute has elapsed since last status query
582
583
dbg("getting list of all compute server hostnames from database")
584
@compute_servers.get().map (server, k) =>
585
x = server.toJS()
586
if not opts.hosts? or x.host in opts.hosts
587
result[x.host] =
588
experimental : x.experimental
589
member_host : x.member_host
590
if (x.status?.timestamp ? 0) >= cutoff
591
for k, v of x.status
592
result[x.host][k] = v
593
dbg("considering #{misc.len(result)} compute servers")
594
dbg("querying servers #{misc.to_json(misc.keys(result))} for their status")
595
f = (host, cb) =>
596
if result[host].timestamp?
597
# we copied the data in above -- nothing to update
598
cb()
599
return
600
@call
601
host : host
602
mesg : message.compute_server_status()
603
timeout : opts.timeout
604
cb : (err, resp) =>
605
if err
606
result[host].error = err
607
else
608
if not resp?.status
609
status = {error:"invalid response -- no status"}
610
else
611
status = resp.status
612
status.timestamp = new Date()
613
for k, v of status
614
result[host][k] = v
615
# also, set in the database (don't wait on this or require success)
616
@database.set_compute_server_status
617
host : host
618
status : resp.status
619
cb()
620
async.map(misc.keys(result), f, (err) => opts.cb(err, result))
621
622
# WARNING: vacate_compute_server is **UNTESTED**
623
vacate_compute_server: (opts) =>
624
opts = defaults opts,
625
compute_server : required # array
626
move : false
627
targets : undefined # array
628
cb : required
629
@database.get_projects_on_compute_server
630
compute_server : opts.compute_server
631
columns : ['project_id']
632
cb : (err, results) =>
633
if err
634
opts.cb(err)
635
else
636
winston.debug("got them; now processing...")
637
v = (x.project_id for x in results)
638
winston.debug("found #{v.length} on #{opts.compute_server}")
639
i = 0
640
f = (project_id, cb) =>
641
winston.debug("moving #{project_id} off of #{opts.compute_server}")
642
if opts.move
643
@project
644
project_id : project_id
645
cb : (err, project) =>
646
if err
647
cb(err)
648
else
649
if opts.targets?
650
i = (i + 1)%opts.targets.length
651
project.move
652
target : opts.targets?[i]
653
cb : cb
654
async.mapLimit(v, 15, f, opts.cb)
655
656
###
657
projects = require('misc').split(fs.readFileSync('/home/salvus/work/2015-amath/projects').toString())
658
require('smc-hub/compute-client').compute_server(cb:(e,s)->console.log(e); s.set_quotas(projects:projects, cores:4, cb:(e)->console.log("DONE",e)))
659
###
660
set_quotas: (opts) =>
661
opts = defaults opts,
662
projects : required # array of project id's
663
disk_quota : undefined
664
cores : undefined
665
memory : undefined
666
cpu_shares : undefined
667
network : undefined
668
mintime : undefined # in seconds
669
cb : required
670
projects = opts.projects
671
delete opts.projects
672
cb = opts.cb
673
delete opts.cb
674
f = (project_id, cb) =>
675
o = misc.copy(opts)
676
o.cb = cb
677
@project
678
project_id : project_id
679
cb : (err, project) =>
680
project.set_quotas(o)
681
async.mapLimit(projects, 10, f, cb)
682
683
###
684
projects = require('misc').split(fs.readFileSync('/home/salvus/tmp/projects').toString())
685
require('smc-hub/compute-client').compute_server(cb:(e,s)->console.log(e); s.move(projects:projects, target:'compute5-us', cb:(e)->console.log("DONE",e)))
686
687
s.move(projects:projects, target:'compute4-us', cb:(e)->console.log("DONE",e))
688
###
689
move: (opts) =>
690
opts = defaults opts,
691
projects : required # array of project id's
692
target : required
693
limit : 10
694
cb : required
695
projects = opts.projects
696
delete opts.projects
697
cb = opts.cb
698
delete opts.cb
699
f = (project_id, cb) =>
700
@project
701
project_id : project_id
702
cb : (err, project) =>
703
project.move(target: opts.target, cb:cb)
704
async.mapLimit(projects, opts.limit, f, cb)
705
706
# x={};require('smc-hub/compute-client').compute_server(cb:(e,s)->console.log(e);x.s=s;x.s.tar_backup_recent(max_age_h:1, cb:(e)->console.log("DONE",e)))
707
tar_backup_recent: (opts) =>
708
opts = defaults opts,
709
max_age_h : required
710
limit : 1 # number to backup in parallel
711
gap_s : 5 # wait this long between backing up each project
712
cb : required
713
dbg = @dbg("tar_backup_recent")
714
target = undefined
715
async.series([
716
(cb) =>
717
@database.recently_modified_projects
718
max_age_s : opts.max_age_h*60*60
719
cb : (err, results) =>
720
if err
721
cb(err)
722
else
723
dbg("got #{results.length} projects modified in the last #{opts.max_age_h} hours")
724
target = results
725
cb()
726
727
(cb) =>
728
i = 0
729
n = misc.len(target)
730
winston.debug("next backing up resulting #{n} targets")
731
running = {}
732
f = (project_id, cb) =>
733
fs.exists "/projects/#{project_id}", (exists) =>
734
if not exists
735
winston.debug("skipping #{project_id} since not here")
736
cb(); return
737
j = i + 1
738
i += 1
739
running[j] = project_id
740
winston.debug("*****************************************************")
741
winston.debug("** #{j}/#{n}: #{project_id}")
742
winston.debug("RUNNING=#{misc.to_json(misc.keys(running))}")
743
winston.debug("*****************************************************")
744
745
smc_compute
746
args : ['tar_backup', project_id]
747
cb : (err) =>
748
delete running[j]
749
winston.debug("*****************************************************")
750
winston.debug("** #{j}/#{n}: DONE -- #{project_id}, DONE")
751
winston.debug("RUNNING=#{misc.to_json(running)}")
752
winston.debug("*****************************************************")
753
winston.debug("result of backing up #{project_id}: #{err}")
754
if err
755
cb(err)
756
else
757
winston.debug("Now waiting #{opts.gap_s} seconds...")
758
setTimeout(cb, opts.gap_s*1000)
759
async.mapLimit(target, opts.limit, f, cb)
760
], opts.cb)
761
762
# Query database for all projects that are opened (so deployed on a compute VM), but
763
# have not been touched in at least the given number of days. For each such project,
764
# stop it, save it, and close it (deleting files off compute server). This should be
765
# run periodically as a maintenance operation to free up disk space on compute servers.
766
# require('smc-hub/compute-client').compute_server(cb:(e,s)->console.log(e);global.s=s)
767
# s.close_open_unused_projects(dry_run:false, min_age_days:120, max_age_days:180, threads:5, host:'compute0-us', cb:(e,x)->console.log("TOTALLY DONE!!!",e))
768
close_open_unused_projects: (opts) =>
769
opts = defaults opts,
770
min_age_days : required
771
max_age_days : required
772
host : required # server on which to close unused projects
773
threads : 1 # number to close in parallel
774
dry_run : false # if true, just explain what would get deleted, but don't actually do anything.
775
limit : undefined # if given, do this many of the closes, then just stop (use to test before going full on)
776
cb : required
777
dbg = @dbg("close_open_unused_projects")
778
target = undefined
779
async.series([
780
(cb) =>
781
@database.get_open_unused_projects
782
min_age_days : opts.min_age_days
783
max_age_days : opts.max_age_days
784
host : opts.host
785
cb : (err, results) =>
786
if err
787
cb(err)
788
else
789
dbg("got #{results.length} open projects that were not used in the last #{opts.min_age_days} days")
790
target = results
791
cb()
792
(cb) =>
793
n = misc.len(target)
794
winston.debug("There are #{n} projects to save and close.")
795
if opts.limit
796
target = target.slice(0, opts.limit)
797
n = misc.len(target)
798
winston.debug("Reducing to only #{n} of them due to limit=#{opts.limit} parameter.")
799
if opts.dry_run
800
cb()
801
return
802
i = 0
803
done = 0
804
winston.debug("next saving and closing #{n} projects")
805
running = {}
806
f = (project_id, cb) =>
807
j = i + 1
808
i += 1
809
running[j] = project_id
810
winston.debug("*****************************************************")
811
winston.debug("** #{j}/#{n}: #{project_id}")
812
winston.debug("RUNNING=#{misc.to_json(misc.keys(running))}")
813
winston.debug("*****************************************************")
814
@project
815
project_id : project_id
816
cb : (err, project) =>
817
if err
818
winston.debug("ERROR!!! #{err}")
819
cb(err)
820
else
821
state = undefined
822
async.series([
823
(cb) =>
824
# see if project is really not closed
825
project.state
826
cb : (err, s) =>
827
if err
828
err = "error computing state -- #{err}"
829
cb(err)
830
else
831
state = s?.state
832
cb()
833
(cb) =>
834
if state == 'closed'
835
cb(); return
836
project.close
837
cb: cb
838
], (err) =>
839
project.free()
840
delete running[j]
841
done += 1
842
winston.debug("*****************************************************")
843
winston.debug("FINISHED #{done} of #{n}")
844
winston.debug("** #{j}/#{n}: DONE -- #{project_id}, DONE")
845
winston.debug("RUNNING=#{misc.to_json(running)}")
846
winston.debug("*****************************************************")
847
winston.debug("result of closing #{project_id}: #{err}")
848
cb(err)
849
)
850
async.mapLimit(target, opts.threads, f, cb)
851
], opts.cb)
852
853
# Set all quotas of *all* projects on the given host.
854
# Do this periodically as part of general maintenance in case something slips through the cracks.
855
set_all_quotas: (opts) =>
856
opts = defaults opts,
857
host : required
858
limit : 1 # number to do at once
859
cb : undefined
860
dbg = @dbg("set_all_quotas")
861
dbg("host=#{opts.host}, limit=#{opts.limit}")
862
projects = undefined
863
async.series([
864
(cb) =>
865
dbg("get all the projects on this server")
866
@database.get_projects_on_compute_server
867
compute_server : opts.host
868
cb : (err, x) =>
869
projects = x
870
cb(err)
871
(cb) =>
872
dbg("call set_all_quotas on each project")
873
n = 0
874
f = (project, cb) =>
875
n += 1
876
dbg("#{n}/#{projects.length}")
877
@project
878
project_id : project.project_id
879
cb : (err, p) =>
880
if err
881
cb(err)
882
else
883
p.set_all_quotas(cb: cb)
884
async.mapLimit(projects, opts.limit, f, cb)
885
])
886
887
# This Projectclient has no garbage collection/way to free itself.
888
# Once a project is created, it just sits there with a changefeed,
889
# etc. Never freed. Not sure what to do...
890
class ProjectClient extends EventEmitter
891
constructor: (opts) ->
892
opts = defaults opts,
893
project_id : required
894
compute_server : required
895
cb : required
896
@project_id = opts.project_id
897
@compute_server = opts.compute_server
898
@_dev = @compute_server._dev
899
@_single = @compute_server._single
900
901
dbg = @dbg('constructor')
902
dbg()
903
# initialize tables and force a state update
904
async.series [@_init_synctable, @_init_storage_server], (err) =>
905
dbg("initialized ProjectClient")
906
opts.cb(err, @)
907
908
# free -- stop listening for status updates from the database and broadcasting
909
# updates about this project.
910
# NOTE: as of writing this line, this free is never called by hub, and idle_timeout_s
911
# is used instead below (of course, free could be used by maintenance operations).
912
free: () =>
913
# Ensure that next time this project gets requested, a fresh one is created, rather than
914
# this cached one, which has been free'd up, and will no longer work.
915
delete @compute_server._project_cache[@project_id]
916
# Close the changefeed, so get no further data from database.
917
@_synctable.close()
918
# Make sure nothing else reacts to changes on this ProjectClient, since they won't happen.
919
@removeAllListeners()
920
921
_init_synctable: (cb) =>
922
dbg = @dbg('_init_synctable')
923
dbg()
924
# don't want stale data:
925
@host = @assigned = @_state = @_state_time = @_state_error = undefined
926
@_stale = true
927
db = @compute_server.database
928
# It's *critical* that idle_timeout_s be used below, since I haven't
929
# come up with any good way to "garbage collect" ProjectClient objects,
930
# due to the async complexity of everything.
931
# ** TODO: IMPORTANT - idle_timeout_s is NOT IMPLEMENTED in postgres-synctable yet! **
932
db.synctable
933
idle_timeout_s : 60*10 # 10 minutes -- should be long enough for any single operation;
934
# but short enough that connections get freed up.
935
table : 'projects'
936
columns : ['project_id', 'host', 'state', 'storage', 'storage_request']
937
where : {"project_id = $::UUID" : @project_id}
938
where_function : (project_id) =>
939
return project_id == @project_id # fast easy test for matching
940
cb : (err, x) =>
941
if err
942
dbg("error initializing synctable -- #{err}")
943
cb(err)
944
else
945
dbg("initialized synctable successfully")
946
@_stale = false
947
@_synctable = x
948
update = () =>
949
new_val = @_synctable.get(@project_id)?.toJS()
950
if not new_val?
951
# The project hasn't been created/written to the database yet, in which
952
# case there is nothing to do. It should get added in a moment and
953
# trigger another update.
954
return
955
old_host = @host
956
@host = new_val.host?.host
957
@assigned = new_val.host?.assigned
958
@_state = new_val.state?.state
959
@_state_time = new_val.state?.time
960
@_state_error = new_val.state?.error
961
@emit(@_state, @)
962
if STATES[@_state]?.stable
963
@emit('stable', @_state)
964
if old_host? and @host != old_host
965
@emit('host_changed', @host) # event whenever host changes from one set value to another (e.g., move or failover)
966
update()
967
@_synctable.on('change', update)
968
cb()
969
970
# ensure project has a storage server assigned to it (if there are any)
971
# Ensure this project has a storage server assigned in the database.
# If no storage servers are registered at all, this is a no-op.
_init_storage_server: (cb) =>
    dbg = @dbg('_init_storage_server')
    @_synctable.connect
        cb : (err) =>
            if err
                cb(err)
                return
            if @_synctable.getIn([@project_id, 'storage', 'host'])
                # a storage host is already recorded -- nothing to do
                dbg('already done')
                cb()
                return
            # assign a storage server, if there are any
            candidates = @compute_server.storage_servers.get()?.keySeq().toJS() ? []
            if candidates.length == 0
                dbg('no storage servers')
                cb()
                return
            # TODO: use some size-balancing algorithm here!
            choice = misc.random_choice(candidates)
            dbg("assigning storage server '#{choice}'")
            @compute_server.database.set_project_storage
                project_id : @project_id
                host       : choice
                cb         : cb
995
996
# Return a logging function tagged with this project, its current host,
# and the given method name.  Interpolation happens at call time, so the
# logged host reflects @host when the message is emitted.
dbg: (method) =>
    (msg) =>
        winston.debug("ProjectClient(project_id='#{@project_id}','#{@host}').#{method}: #{msg}")
998
999
# Choose a compute server on which to place this project. If project already assigned a host
1000
# and the host exists, just returns that. This doesn't actually set the host assignment in
1001
# the database.
1002
# Choose a compute server on which to place this project.  If the project is
# already assigned a host and that host still exists, just return it.  This
# does NOT record the assignment in the database.
#
# opts.cb : (err, hostname of compute server)
#
# Fix vs original: removed the unused local `t = misc.mswalltime()`.
get_host: (opts) =>
    opts = defaults opts,
        cb : required    # (err, hostname of compute server)
    host        = @host
    member_host = undefined
    dbg = @dbg("get_host")
    if host
        # The host might no longer be defined at all, so we should check this here.
        if not @compute_server.compute_servers.get(host)?
            host = undefined
    async.series([
        (cb) =>
            if host
                cb()
            else
                # need member-hosting status to pick an appropriate pool
                @get_quotas
                    cb : (err, quota) =>
                        member_host = !!quota?.member_host
                        cb(err)
        (cb) =>
            if host
                cb()
            else
                dbg("assigning some host (member_host=#{member_host})")
                @compute_server.assign_host
                    member_host : member_host
                    cb          : (err, h) =>
                        if err
                            dbg("error assigning random host -- #{err}")
                            cb(err)
                        else
                            host = h
                            cb()
    ], (err) =>
        opts.cb?(err, host)
    )
1039
1040
# Send a single action message (start/stop/status/...) to the compute server
# that currently hosts this project and report the response via opts.cb.
# Fails immediately if the project has no known host.
_action: (opts) =>
    opts = defaults opts,
        action  : required
        args    : undefined
        timeout : 30
        cb      : required
    dbg = @dbg("_action(action=#{opts.action})")
    if not @host
        opts.cb('project must be open before doing this action - no known host')
        return
    dbg("args=#{misc.to_safe_str(opts.args)}")
    dbg("calling compute server at '#{@host}'")
    @compute_server.call
        host    : @host
        project : @
        mesg    :
            message.compute
                project_id : @project_id
                action     : opts.action
                args       : opts.args
        timeout : opts.timeout
        cb      : (err, resp) =>
            if err
                dbg("error calling compute server -- #{err}")
                # CRITICAL: For heavily loaded systems, an error as above can happen a lot.
                # The server will get removed when the connection itself closes.
                # So do not remove from cache except in some cases (not for every
                # possible err message).  Removing from the cache willy nilly in all cases
                # results in a huge number of connections from the hub to compute
                # servers, which crashes everything.
                if "#{err}".indexOf('error writing to socket') != -1
                    # See https://github.com/sagemathinc/cocalc/issues/507
                    # Experience suggests that when we get this error, it gets stuck like this
                    # and never goes away -- in this case we want to try again with a new connection.
                    @compute_server.remove_from_cache(host:@host)
                opts.cb(err)
            else
                dbg("got response #{misc.to_safe_str(resp)}")
                if resp.error?
                    opts.cb(resp.error)
                else
                    opts.cb(undefined, resp)
1082
1083
# Record this project's state in the database; opts are passed straight
# through to the database layer with project_id filled in.
_set_state: (opts) =>
    opts.project_id = @project_id
    @compute_server.database.set_project_state(opts)
1086
1087
###
1088
id='20257d4e-387c-4b94-a987-5d89a3149a00'; require('smc-hub/compute-client').compute_server(cb:(e,s)->console.log(e);global.s=s;s.project(project_id:id, cb:(e,p)->console.log(e);global.p=p; p.state(cb:console.log)))
1089
###
1090
1091
# Report the project's state as {state:?, time:?, error:?}.  In dev/single
# mode a missing host triggers open() first; in the full deployment a missing
# host means 'closed'.  Otherwise the remote compute server is consulted
# (at most every STATE_UPDATE_INTERVAL_S seconds unless opts.force).
state: (opts) =>
    opts = defaults opts,
        force  : false
        update : false
        cb     : required     # cb(err, {state:?, time:?, error:?})
    dbg = @dbg("state()")
    if @_stale
        opts.cb("not connected to database")
        return
    state_obj = =>
        return {state : @_state, time : @_state_time, error : @_state_error}

    if not @host
        if @_dev or @_single
            # in case of dev or single mode, open will properly setup the host.
            the_state = undefined
            async.series([
                (cb) =>
                    @open(cb:cb)
                (cb) =>
                    if not @host
                        cb("BUG: host not defined after open")
                        return
                    # open succeeded; now call state
                    @state
                        force : opts.force
                        cb    : (err, state) =>
                            the_state = state
                            cb(err)
            ], (err) =>
                opts.cb(err, the_state)
            )
            return

        # Full multi-machine deployment: project definitely not open on any host
        if @_state != 'closed'
            dbg("project not opened, but state in db not closed -- set to closed")
            now = new Date()
            @_set_state
                state : 'closed'
                time  : now
                cb    : (err) =>
                    if err
                        opts.cb(err)
                    else
                        opts.cb(undefined, {state:'closed', time:now})
        else
            # state object is valid
            opts.cb(undefined, state_obj())
        return

    STATE_UPDATE_INTERVAL_S = 30  # always update db after this many seconds, no matter what
    # NOTE(review): if this condition is false (recent update, not forced) the
    # callback is apparently never invoked -- confirm against upstream source.
    if opts.force or not @_state_time? or new Date() - (@_last_state_update ? 0) >= 1000*STATE_UPDATE_INTERVAL_S
        dbg("calling remote compute server for state")
        @_action
            action  : "state"
            args    : if opts.update then ['--update']
            timeout : 60
            cb      : (err, resp) =>
                @_last_state_update = new Date()
                if err
                    dbg("problem getting state -- #{err}")
                    opts.cb(err)
                else
                    dbg("got '#{misc.to_json(resp)}'")
                    if @_state != resp.state or @_state_error != resp.state_error or (resp.time - @_state_time >= 1000*STATE_UPDATE_INTERVAL_S)
                        # Set the latest info about state that we got in the database so that
                        # clients and other hubs know about it.
                        @_state = resp.state; @_state_time = resp.time; @_state_error = resp.state_error
                        @_set_state
                            state : resp.state
                            time  : resp.time
                            error : resp.state_error
                            cb    : (err) =>
                                if err
                                    dbg("Error setting state of #{@project_id} in database -- #{err}")
                                opts.cb(undefined, state_obj())
                    else
                        opts.cb(undefined, state_obj())
1170
1171
# information about project (ports, state, etc. )
1172
# Gather information about the project (ports, state, etc.), persist it in the
# database, and augment it with host and quota information.
status: (opts) =>
    opts = defaults opts,
        cb : required
    dbg = @dbg("status")
    dbg()
    status = undefined
    async.series([
        (cb) =>
            # NOTE(review): this initial fetch looks redundant with the
            # retried fetch below -- confirm intent against upstream.
            @_action
                action : "status"
                cb     : (err, s) =>
                    if not err
                        status = s
                    cb(err)
        (cb) =>
            dbg("get status from compute server")
            f = (cb) =>
                @_action
                    action : "status"
                    cb     : (err, s) =>
                        if not err
                            status = s
                            # save status in database
                            @compute_server.database.set_project_status
                                project_id : @project_id
                                status     : status
                                cb         : cb
                        else
                            cb(err)
            # we retry getting status with exponential backoff until we hit max_time, which
            # triggers failover of project to another node.
            misc.retry_until_success
                f           : f
                start_delay : 10000
                max_time    : AUTOMATIC_FAILOVER_TIME_S*1000
                cb          : (err) =>
                    if err
                        m = "failed to get status -- project not working on #{@host}"
                        dbg(m)
                        cb(m)
                        ## Auto failover disabled for now.
                        ## Now we actually initiate the failover, which could take a long time,
                        ## depending on how big the project is.
                        #@move
                        #    force : true
                        #    cb    : (err) =>
                        #        dbg("result of failover -- #{err}")
                    else
                        cb()
        (cb) =>
            # decorate the status object with host and quota info
            @get_quotas
                cb : (err, quotas) =>
                    if err
                        cb(err)
                    else
                        status.host   = @host
                        status.ssh    = @host
                        status.quotas = quotas
                        cb()
    ], (err) =>
        if err
            opts.cb(err)
        else
            opts.cb(undefined, status)
    )
1237
1238
1239
# COMMANDS:
1240
1241
# open project files on some node.
1242
# A project is by definition opened on a host if @host is set.
1243
# Open the project's files on some compute node.  A project is by definition
# opened on a host exactly when @host is set.  In dev/single mode the host is
# always 'localhost'; otherwise a host is chosen (or opts.host used) and the
# storage server is asked to open the project there.
open: (opts) =>
    opts = defaults opts,
        host : undefined   # if given and project not on any host (so @host undefined), then this host will be used
        cb   : required
    @_synctable?.connect()
    if @host and @_state != 'closed'
        # already opened
        opts.cb()
        return
    dbg = @dbg("open")
    dbg()
    if @_dev or @_single
        host = 'localhost'
        async.series([
            (cb) =>
                if not @host
                    @compute_server.database.set_project_host
                        project_id : @project_id
                        host       : host
                        cb         : cb
                else
                    cb()
            (cb) =>
                @_set_state
                    state : 'opened'
                    cb    : cb
        ], opts.cb)
        return

    host = undefined
    async.series([
        (cb) =>
            if opts.host
                host = opts.host
                cb()
            else
                dbg("choose a host")
                @get_host
                    cb : (err, h) =>
                        host = h
                        cb(err)
        (cb) =>
            dbg("unset project host")
            # important, so that we know when the project has been opened (see "wait until host set" below)
            @compute_server.database.unset_project_host
                project_id : @project_id
                cb         : cb
        (cb) =>
            dbg("request to open on '#{host}'")
            @_storage_request
                action : 'open'
                target : host
                cb     : cb
        (cb) =>
            dbg("succeeded in opening; wait until host set")
            @_synctable.wait
                until   : (table) => table.getIn([@project_id, 'host', 'host'])?
                timeout : 30    # should be very fast
                cb      : cb
        (cb) =>
            dbg('update state')
            @state
                force  : true
                update : true
                cb     : cb
    ], (err) =>
        dbg("opening done -- #{err}")
        opts.cb(err)
    )
1312
1313
1314
# start local_hub daemon running (must be opened somewhere)
1315
# Start the local_hub daemon (project must be opened somewhere first; open()
# is invoked as part of this).  Quotas are set both before and after start
# when opts.set_quotas.
start: (opts) =>
    opts = defaults opts,
        set_quotas : true   # if true, also sets all quotas
        cb         : required
    dbg = @dbg("start")
    @_synctable?.connect()
    if @_state == 'running'
        dbg("already running")
        if opts.set_quotas
            @set_all_quotas(cb : opts.cb)
        else
            opts.cb()
        return
    if @_state == 'starting'
        dbg("wait until running")
        @wait_for_a_state
            states  : ['running']
            timeout : 30
            cb      : (err) =>
                if err
                    opts.cb(err)
                else if opts.set_quotas
                    @set_all_quotas(cb : opts.cb)
                else
                    opts.cb()
        return
    async.series([
        (cb) =>
            if opts.set_quotas
                # CRITICAL: some quotas -- like member hosting must be set before project starts.
                dbg("setting all quotas")
                @set_all_quotas(cb:cb)
            else
                cb()
        (cb) =>
            @open(cb : cb)
        (cb) =>
            dbg("issuing the start command")
            @_action
                action : "start"
                args   : ['--base_url', @compute_server._base_url]
                cb     : cb
        (cb) =>
            dbg("waiting until running")
            @wait_for_a_state
                states  : ['running']
                timeout : 30
                cb      : cb
        (cb) =>
            if opts.set_quotas
                # CRITICAL: the quotas **MUST** also be set after the project has started, since some of
                # the quotas, e.g., disk space and network, can't be set until the Linux account
                # has been created.
                dbg("setting all quotas")
                @set_all_quotas(cb:cb)
            else
                cb()
    ], (err) =>
        opts.cb(err)
    )
1375
1376
# restart project -- must be opened or running
1377
# Restart the project -- must be opened or running.  A non-running project is
# simply started; a running one is stopped and then started.
restart: (opts) =>
    opts = defaults opts,
        set_quotas : true
        cb         : required
    @_synctable?.connect()
    dbg = @dbg("restart")
    dbg("get state")
    current = undefined
    async.series([
        (cb) =>
            @wait_stable_state
                timeout : 30
                cb      : (err, s) =>
                    current = s; cb(err)
        (cb) =>
            if current != 'running'
                dbg("just start it")
                @start(cb: cb)
            else
                dbg("stop it")
                @stop
                    cb : (err) =>
                        if err
                            cb(err)
                        else
                            @start(cb:cb)
        (cb) =>
            if opts.set_quotas
                dbg("setting all quotas")
                @set_all_quotas(cb:cb)
            else
                cb()
    ], opts.cb)
1410
1411
# kill everything and remove project from this compute
1412
# node (must be opened somewhere)
1413
# Kill everything and remove the project from this compute node (must be
# opened somewhere): stop it if running, then issue a 'close' storage request.
#
# Fix vs original: removed the unused local `args = []`.
close: (opts) =>
    opts = defaults opts,
        cb : required
    @_synctable?.connect()
    dbg = @dbg("close()")
    dbg()
    async.series([
        (cb) =>
            dbg("stop project from running")
            if @_state == 'running'
                @stop(cb:cb)
            else
                cb()
        (cb) =>
            dbg("doing storage request to close")
            @_storage_request
                action : 'close'
                cb     : cb
    ], opts.cb)
1433
1434
# Make sure the project is at least opened: waits for a stable state, opens a
# closed project, and errors on any unknown stable state.
# opts.cb : (err, state) where state is 'opened' or 'running'
ensure_opened_or_running: (opts) =>
    opts = defaults opts,
        cb : undefined    # cb(err, state='opened' or 'running')
    @_synctable?.connect()
    state = undefined
    dbg = @dbg("ensure_opened_or_running")
    async.series([
        (cb) =>
            dbg("get state")
            @wait_stable_state
                cb : (err, s) =>
                    state = s; cb(err)
        (cb) =>
            if state == 'running' or state == 'opened'
                cb()
            else if state == 'closed'
                dbg("opening")
                @open
                    cb : (err) =>
                        if err
                            cb(err)
                        else
                            dbg("it opened")
                            state = 'opened'
                            cb()
            else
                cb("bug -- state='#{state}' should be stable but isn't known")
    ], (err) => opts.cb?(err, state))
1462
1463
# Make sure the project is running: opens it first if closed, then starts it.
ensure_running: (opts) =>
    opts = defaults opts,
        cb : undefined
    @_synctable?.connect()
    state = undefined
    dbg = @dbg("ensure_running")
    async.series([
        (cb) =>
            @wait_stable_state
                cb : (err, s) =>
                    state = s; cb(err)
        (cb) =>
            # helper: actually start the project
            f = () =>
                dbg("start running")
                @start(cb : cb)
            if state == 'running'
                cb()
            else if state == 'opened'
                f()
            else if state == 'closed'
                dbg("open first")
                @open
                    cb : (err) =>
                        if err
                            cb(err)
                        else
                            dbg("project opened; now start running")
                            f()
            else
                cb("bug -- state=#{state} should be stable but isn't known")
    ], (err) => opts.cb?(err))
1494
1495
# Make sure the project is closed: stops it first if running, then closes it.
ensure_closed: (opts) =>
    opts = defaults opts,
        cb : undefined
    @_synctable?.connect()
    dbg = @dbg("ensure_closed()")
    state = undefined
    async.series([
        (cb) =>
            @wait_stable_state
                cb : (err, s) =>
                    state = s; cb(err)
        (cb) =>
            # helper: actually close the project
            f = () =>
                dbg("close project")
                @close(cb : cb)
            if state == 'closed'
                cb()
            else if state == 'opened'
                f()
            else if state == 'running'
                dbg("is running so first stop it")
                @stop
                    cb : (err) =>
                        if err
                            cb(err)
                        else
                            f()
            else
                cb("bug -- state=#{state} should be stable but isn't known")
    ], (err) => opts.cb?(err))
1525
1526
# Determine whether or not a storage request is currently running for this project
1527
# Determine whether a storage request is currently in flight for this project.
# A request counts as running if it started, has not finished, and started
# less than 30 minutes ago (older requests are considered stale).
is_storage_request_running: () =>
    @_synctable?.connect()
    req = @_synctable.getIn([@project_id, 'storage_request'])
    if not req?
        return false
    req = req.toJS()
    return req.started? and not req.finished? and (new Date() - req.started) < 1000*60*30   # 30m=stale
1536
1537
# Block (via the synctable) until the current storage request records a
# 'finished' timestamp, or until opts.timeout seconds elapse.
wait_storage_request_finish: (opts) =>
    opts = defaults opts,
        timeout : 60*30
        cb      : required
    winston.debug("wait_storage_request_finish")
    @_synctable.wait
        until   : (table) => table.getIn([@project_id, 'storage_request', 'finished'])?
        timeout : opts.timeout
        cb      : opts.cb
1546
1547
# Force a state refresh, then wait (via the synctable) until the project's
# state becomes one marked 'stable' in STATES; yields that state to opts.cb.
wait_stable_state: (opts) =>
    opts = defaults opts,
        timeout : 60*10    # 10 minutes
        cb      : required
    dbg = (m) => winston.debug("wait_stable_state (state='#{@_synctable.getIn([@project_id, 'state', 'state'])}')': #{m}")
    dbg('causing state update')
    @state    # opportunity to cause state update
        force  : true
        update : true
        cb     : () =>
            dbg("waiting for state to change to something stable")
            @_synctable.wait
                timeout : opts.timeout
                cb      : opts.cb
                until   : (table) =>
                    state = table.getIn([@project_id, 'state', 'state'])
                    if STATES[state]?.stable
                        dbg("synctable changed to stable state")
                        return state
                    else
                        dbg("synctable changed but state NOT stable yet; keep waiting...")
                        return false
1569
1570
# Wait until the project's state is one of opts.states, yielding that state.
# Forces a state refresh first; in dev mode this delegates to the manual
# polling variant since no separate compute server is running.
wait_for_a_state: (opts) =>
    opts = defaults opts,
        timeout : 60    # 1 minute
        states  : required
        cb      : required
    dbg = (m) => winston.debug("wait_for_a_state in #{misc.to_json(opts.states)}, state='#{@_synctable.getIn([@project_id, 'state', 'state'])}': #{m}")
    dbg("cause state update")
    if @_dev
        @_wait_for_a_state_dev(opts)
        return
    @state
        force : true
        cb    : =>
            # Compute server will update synctable when change happens.
            @_synctable.wait
                timeout : opts.timeout
                cb      : opts.cb
                until   : (table) =>
                    state = table.getIn([@project_id, 'state', 'state'])
                    if state in opts.states
                        dbg("in the right state")
                        return state
                    else
                        dbg("wait longer...")
1595
_wait_for_a_state_dev: (opts) =>
    # For smc-in-smc dev, we have to **manually** cause another check,
    # since there is no separate compute server running!
    dbg = (m) => winston.debug("_wait_for_a_state(dev) in #{misc.to_json(opts.states)}, state='#{@_synctable.getIn([@project_id, 'state', 'state'])}': #{m}")
    dbg("retry until succeess")
    misc.retry_until_success
        max_time    : opts.timeout*1000
        start_delay : 250
        max_delay   : 3000
        f           : (cb) =>
            dbg("force update")
            @state
                force  : true
                update : true
                cb     : =>
                    state = @_synctable.getIn([@project_id, 'state', 'state'])
                    if state in opts.states
                        dbg("in a required state")
                        opts.cb(undefined, state)
                        cb()
                    else
                        cb('keep trying')
1617
1618
# Move project from one compute node to another one. Both hosts are assumed to be working!
1619
# We will have to write something else to deal with auto-failover in case of a host not working.
1620
# Move the project from one compute node to another.  Both hosts are assumed
# to be working!  If no target is given, one (different from the current
# host, matching the project's member-hosting status) is chosen by load
# balancing.  The project is stopped first so no work is lost mid-transfer.
move: (opts) =>
    opts = defaults opts,
        target : undefined   # hostname of a compute server; if not given, one (diff than current) will be chosen by load balancing
        force  : false       # ignored for now
        cb     : required
    @_synctable?.connect()
    dbg = @dbg("move(target:'#{opts.target}')")
    if opts.target? and @host == opts.target
        dbg("project is already at target -- not moving")
        opts.cb()
        return

    member_host = undefined
    async.series([
        (cb) =>
            if opts.target?
                cb()
            else
                dbg("determine member_host status of project")
                @get_quotas
                    cb : (err, quota) =>
                        member_host = !!quota?.member_host
                        dbg("member_host=#{member_host}")
                        cb(err)
        (cb) =>
            dbg("determine target (member_host=#{member_host})")
            if opts.target?
                cb()
            else
                exclude = []
                if @host
                    exclude.push(@host)
                @compute_server.assign_host
                    exclude     : exclude
                    member_host : member_host
                    cb          : (err, host) =>
                        if err
                            cb(err)
                        else
                            dbg("assigned target = #{host}")
                            opts.target = host
                            cb()
        (cb) =>
            dbg("stop project from running so user doesn't lose work during transfer and processes aren't left around")
            if @_state == 'running'
                @stop
                    cb : (err) =>
                        # ignore error on purpose
                        cb()
            else
                cb()
        (cb) =>
            dbg("doing storage request")
            @_storage_request
                action : 'move'
                target : opts.target
                cb     : cb
        (cb) =>
            dbg("project now opened on target")
            @_set_state
                state : 'opened'
                cb    : cb
    ], opts.cb)
1683
1684
# Kill all of the project's processes on its host and wait until the project
# settles into the 'opened' or 'closed' state.
stop: (opts) =>
    opts = defaults opts,
        cb : required
    @_synctable?.connect()
    @dbg("stop")("will kill all processes")
    async.series([
        (cb) =>
            @_action
                action : "stop"
                cb     : cb
        (cb) =>
            @wait_for_a_state
                states : ['opened', 'closed']
                cb     : cb
    ], opts.cb)
1699
1700
# Drive a storage-server operation ('open', 'save', 'close', 'move', ...):
# record the transitional state, write the request to the database (which the
# storage server watches), wait for it to finish, then record the final state.
# Trivially succeeds when no storage servers are configured; refuses to start
# if another storage request is already running.
_storage_request: (opts) =>
    opts = defaults opts,
        action : required
        target : undefined
        cb     : undefined
    m = "_storage_request(action='#{opts.action}'"
    m += if opts.target? then ",target='#{opts.target}')" else ")"
    dbg = @dbg(m)
    dbg("")
    if (@compute_server.storage_servers.get()?.size ? 0) == 0
        dbg('no storage servers -- so all _storage_requests trivially done')
        opts.cb?()
        return
    if @is_storage_request_running()
        opts.cb?("already doing a storage request")
        return

    final_state = fail_state = undefined
    async.series([
        (cb) =>
            @_synctable.connect(cb:cb)
        (cb) =>
            # map the action onto its transitional / success / failure states
            state = @_synctable.getIn([@project_id, 'state', 'state'])
            switch opts.action
                when 'open'
                    action_state = 'opening'
                    final_state  = 'opened'
                    fail_state   = 'closed'
                when 'save'
                    action_state = 'saving'
                    final_state  = state
                    fail_state   = state
                when 'close'
                    action_state = 'closing'
                    final_state  = 'closed'
                    fail_state   = 'opened'
                else
                    final_state = fail_state = state
            if action_state?
                dbg("set state to '#{action_state}'")
                @_set_state
                    state : action_state
                    cb    : cb
            else
                cb()
        (cb) =>
            dbg("update database with *request* to '#{misc.to_json(opts.action)}' -- this causes storage server to doing something")
            @compute_server.database.set_project_storage_request
                project_id : @project_id
                action     : opts.action
                target     : opts.target
                cb         : cb
        (cb) =>
            dbg("wait for action to finish")
            @wait_storage_request_finish
                cb : (err) =>
                    if err
                        dbg("set state to fail state")
                        # NOTE(review): on success of this _set_state the series
                        # continues and the next step overwrites fail_state with
                        # final_state -- confirm intent against upstream source.
                        @_set_state
                            state : fail_state
                            error : err
                            cb    : cb
                    else
                        cb()
        (cb) =>
            dbg("set state to '#{final_state}'")
            @_set_state
                state : final_state
                cb    : cb
    ], (err) =>
        opts.cb?(err)
    )
1772
1773
# Save the project's files via the storage server, unless a save already
# happened within opts.min_interval minutes (client-side check against the
# database's 'saved' timestamp).  Also pings the compute server with a 'save'
# action (fire-and-forget) so the project is marked active.
save: (opts) =>
    opts = defaults opts,
        min_interval : 5    # fail if already saved less than this many MINUTES (use 0 to disable) ago
        cb           : undefined
    dbg = @dbg("save(min_interval:#{opts.min_interval})")
    dbg("")
    @_synctable.connect
        cb : (err) =>
            if err
                opts.cb?(err)
                return
            # update @_last_save with value from database (could have been saved by another compute server)
            saved = @_synctable.getIn([@project_id, 'storage', 'saved'])
            if not @_last_save? or saved > @_last_save
                @_last_save = saved

            # Do a client-side test to see if we have saved too recently
            if opts.min_interval and @_last_save and (new Date() - @_last_save) < 1000*60*opts.min_interval
                dbg("already saved")
                opts.cb?("already saved within min_interval")
                return
            @_last_save = new Date()
            dbg('doing actual save')
            @_storage_request
                action : 'save'
                cb     : opts.cb

            # do this but don't block on returning.
            dbg("send message to storage server that project is being saved")
            # it will be marked as active as a result (so it doesn't idle timeout)
            @_action
                action : "save"
                cb     : (err) =>
                    dbg("finished save message to backend: #{err}")
1807
1808
# Get the project's location and listening port, opening and starting the
# project if necessary.  Yields {host, port, secret_token} to cb.
_address: (cb) =>
    dbg = @dbg("_address")
    dbg("get project location and listening port -- will open and start project if necessary")
    address = undefined
    async.series([
        (cb) =>
            dbg("first ensure project is running")
            @ensure_running(cb:cb)
        (cb) =>
            @status
                cb : (err, status) =>
                    if err
                        cb(err)
                        return
                    if status.state != 'running'
                        dbg("something went wrong and not running ?! -- status=#{misc.to_json(status)}")
                        cb("not running")    # DO NOT CHANGE -- exact callback error is used by client code in the UI
                    else
                        dbg("status includes info about address...")
                        address =
                            host         : @host
                            port         : status['local_hub.port']
                            secret_token : status.secret_token
                        cb()
    ], (err) =>
        cb(err, address)
    )
1835
1836
# This will keep trying for up to an hour to get the address, with exponential
1837
# decay backing off up to 15s between attempts.
1838
# Public wrapper around _address: keeps trying for up to an hour with
# exponential backoff (up to 15s between attempts).  Concurrent callers are
# coalesced via @_address_cbs so only one retry loop runs at a time.
address: (opts) =>
    opts = defaults opts,
        cb : required
    if @_address_cbs?
        # a lookup is already in progress -- just join it
        @_address_cbs.push(opts.cb)
        return
    @_synctable?.connect()
    @_address_cbs = [opts.cb]
    dbg = @dbg("address")
    dbg()
    address = undefined
    misc.retry_until_success
        f : (cb) =>
            @_address (err, x) =>
                address = x
                cb(err)
        start_delay : 3000
        max_delay   : 15000
        max_time    : 3600*1000
        cb          : (err) =>
            if not address and not err
                err = "failed to get address"
            for cb in @_address_cbs
                cb(err, address)
            delete @_address_cbs
1863
1864
# Copy a path using rsync from this project to another (or within this
# project).  Ensures both projects are opened, creates the containing target
# directory, performs the copy, then best-effort saves the target project.
copy_path: (opts) =>
    opts = defaults opts,
        path              : ""
        target_project_id : ""
        target_path       : ""      # path into project; if "", defaults to path above.
        overwrite_newer   : false   # if true, newer files in target are copied over (otherwise, uses rsync's --update)
        delete_missing    : false   # if true, delete files in dest path not in source, **including** newer files
        backup            : false   # make backup files
        exclude_history   : false
        timeout           : 5*60
        bwlimit           : undefined
        cb                : required
    dbg = @dbg("copy_path(#{opts.path} to #{opts.target_project_id})")
    dbg("copy a path using rsync from one project to another")
    if not opts.target_project_id
        opts.target_project_id = @project_id
    if not opts.target_path
        opts.target_path = opts.path
    args = ["--path", opts.path,
            "--target_project_id", opts.target_project_id,
            "--target_path", opts.target_path]
    if opts.overwrite_newer
        args.push('--overwrite_newer')
    if opts.delete_missing
        args.push('--delete_missing')
    if opts.backup
        args.push('--backup')
    if opts.exclude_history
        args.push('--exclude_history')
    if opts.bwlimit
        args.push('--bwlimit')
        args.push(opts.bwlimit)
    dbg("created args=#{misc.to_safe_str(args)}")
    target_project = undefined
    async.series([
        (cb) =>
            @ensure_opened_or_running
                cb : cb
        (cb) =>
            if opts.target_project_id == @project_id
                cb()
            else
                dbg("getting other project and ensuring that it is already opened")
                @compute_server.project
                    project_id : opts.target_project_id
                    cb         : (err, x) =>
                        if err
                            dbg("error ")
                            cb(err)
                        else
                            target_project = x
                            target_project.ensure_opened_or_running
                                cb : (err) =>
                                    if err
                                        cb(err)
                                    else
                                        dbg("got other project on #{target_project.host}")
                                        args.push("--target_hostname")
                                        args.push(target_project.host)
                                        cb()
        (cb) =>
            containing_path = misc.path_split(opts.target_path).head
            if not containing_path
                dbg("target path need not be made since is home dir")
                cb(); return
            dbg("create containing target directory = #{containing_path}")
            if opts.target_project_id != @project_id
                target_project._action
                    action  : 'mkdir'
                    args    : [containing_path]
                    timeout : opts.timeout
                    cb      : cb
            else
                @_action
                    action  : 'mkdir'
                    args    : [containing_path]
                    timeout : opts.timeout
                    cb      : cb
        (cb) =>
            dbg("doing the actual copy")
            @_action
                action  : 'copy_path'
                args    : args
                timeout : opts.timeout
                cb      : cb
        (cb) =>
            if target_project?
                dbg("target is another project, so saving that project (if possible)")
                target_project.save
                    cb: (err) =>
                        if err
                            # NON-fatal: this could happen, e.g, if already saving... very slightly dangerous.
                            dbg("warning: can't save target project -- #{err}")
                        cb()
            else
                cb()
    ], (err) =>
        if err
            dbg("error -- #{err}")
        opts.cb(err)
    )
1965
1966
# List files in a directory of the project via the compute server.
directory_listing: (opts) =>
    opts = defaults opts,
        path   : ''
        hidden : false
        time   : false    # sort by timestamp, with newest first?
        start  : 0
        limit  : -1
        cb     : required
    dbg = @dbg("directory_listing")
    @ensure_opened_or_running
        cb : (err) =>
            if err
                opts.cb(err)
            else
                args = []
                if opts.hidden
                    args.push("--hidden")
                if opts.time
                    args.push("--time")
                # prefix relative paths with ./ such that names starting with a dash do not confuse parsing arguments
                args.push("--path")
                if opts.path[0] isnt '/'
                    args.push("./#{opts.path}")
                else
                    args.push(opts.path)
                for k in ['start', 'limit']
                    args.push("--#{k}"); args.push(opts[k])
                dbg("get listing of files using options #{misc.to_safe_str(args)}")
                @_action
                    action : 'directory_listing'
                    args   : args
                    cb     : opts.cb
1998
1999
# Read a file (or directory -- directories get zip'd) from the project's
# disk and yield its contents as a Buffer.
#
# opts.path    : file to read (required)
# opts.maxsize : maximum file size in bytes to read
# opts.cb      : cb(err, Buffer)
#
# Fix vs original: `new Buffer(...)` is deprecated (security/ambiguity
# issues); use the `Buffer.from(string, encoding)` factory instead.
read_file: (opts) =>
    opts = defaults opts,
        path    : required
        maxsize : 3000000    # maximum file size in bytes to read
        cb      : required   # cb(err, Buffer)
    dbg = @dbg("read_file(path:'#{opts.path}')")
    dbg("read a file or directory from disk")  # directories get zip'd
    @ensure_opened_or_running
        cb : (err) =>
            if err
                opts.cb(err)
            else
                @_action
                    action : 'read_file'
                    args   : [opts.path, "--maxsize", opts.maxsize]
                    cb     : (err, resp) =>
                        if err
                            opts.cb(err)
                        else
                            opts.cb(undefined, Buffer.from(resp.base64, 'base64'))
2019
2020
# Look up this project's quotas in the database and pass them to opts.cb.
get_quotas: (opts) =>
    opts = defaults opts,
        cb : required
    @dbg("get_quotas")("lookup project quotas in the database")
    @compute_server.database.get_project_quotas
        project_id : @project_id
        cb         : opts.cb
2027
2028
# If member_host is true, make sure project is on a members only host, and if
2029
# member_host is false, make sure project is NOT on a members only host.
2030
# If project is not open on any host, don't do anything. This function
2031
# never puts project on an experimental server.
2032
# This does *NOT* set anything in the database about this project being member_host'ed;
2033
# that's entirely determined by upgrades.
2034
# If member_host is true, make sure the project is on a members-only host; if
# false, make sure it is NOT on one.  No-op when the project isn't open on any
# host or in dev/single mode.  Never places a project on an experimental
# server.  This does *NOT* set anything in the database about this project
# being member-hosted; that's entirely determined by upgrades.
set_member_host: (opts) =>
    opts = defaults opts,
        member_host : required
        cb          : required
    if @_dev or @_single or not @host
        # dev environments -- only one host.  Or, not open on any host.
        opts.cb()
        return
    # Ensure that member_host is a boolean for below; it is an integer -- 0 or >= 1 -- elsewhere.  But below
    # we very explicitly assume it is boolean (due to coffeescript not doing coercion).
    opts.member_host = opts.member_host > 0
    dbg = @dbg("set_member_host(member_host=#{opts.member_host})")
    host_is_members_only = !!@compute_server.compute_servers.getIn([@host, 'member_host'])
    dbg("host_is_members_only = #{host_is_members_only}")
    if opts.member_host == host_is_members_only
        # done -- nothing to do
        opts.cb()
        return
    dbg("must move project, if possible")
    candidates = []
    @compute_server.compute_servers.get().map (server, host) =>
        if server.get('experimental')
            return
        if opts.member_host == !!server.get('member_host')
            candidates.push(host)
    if candidates.length == 0
        opts.cb("there are no #{if not opts.member_host then 'non-' else ''}members only hosts available")
        return
    target = misc.random_choice(candidates)
    dbg("moving project to #{target}...")
    @move
        target : target
        cb     : opts.cb
2067
2068
# Apply quota settings to the project: member hosting first (may move the
# project), then -- depending on which commands the current state supports --
# network, mintime, disk and compute quotas, in parallel.
set_quotas: (opts) =>
    # Ignore any quotas that aren't in the list below: these are the only ones that
    # the local compute server supports.  It is convenient to allow the caller to
    # pass in additional quota settings.
    if @_dev
        opts.cb(); return
    opts = misc.copy_with(opts, ['disk_quota', 'cores', 'memory', 'cpu_shares', 'network', 'mintime', 'member_host', 'cb'])
    dbg = @dbg("set_quotas")
    dbg("set various quotas")
    commands = undefined
    async.series([
        (cb) =>
            if not opts.member_host?
                cb()
            else
                dbg("ensure machine is or is not on member host")
                @set_member_host
                    member_host : opts.member_host
                    cb          : cb
        (cb) =>
            dbg("get state")
            @state
                cb: (err, s) =>
                    if err
                        cb(err)
                    else
                        dbg("state = #{s.state}")
                        commands = STATES[s.state].commands
                        cb()
        (cb) =>
            async.parallel([
                (cb) =>
                    if opts.network? and commands.indexOf('network') != -1
                        dbg("update network: #{opts.network}")
                        @_action
                            action : 'network'
                            args   : if opts.network then [] else ['--ban']
                            cb     : cb
                    else
                        cb()
                (cb) =>
                    if opts.mintime? and commands.indexOf('mintime') != -1
                        dbg("update mintime quota on project")
                        @_action
                            action : 'mintime'
                            args   : [opts.mintime]
                            cb     : (err) =>
                                cb(err)
                    else
                        cb()
                (cb) =>
                    if opts.disk_quota? and commands.indexOf('disk_quota') != -1
                        dbg("disk quota")
                        @_action
                            action : 'disk_quota'
                            args   : [opts.disk_quota]
                            cb     : cb
                    else
                        cb()
                (cb) =>
                    if (opts.cores? or opts.memory? or opts.cpu_shares?) and commands.indexOf('compute_quota') != -1
                        dbg("compute quota")
                        args = []
                        for s in ['cores', 'memory', 'cpu_shares']
                            if opts[s]?
                                if s == 'cpu_shares'
                                    opts[s] = Math.floor(opts[s])
                                args.push("--#{s}")
                                args.push(opts[s])
                        @_action
                            action : 'compute_quota'
                            args   : args
                            cb     : cb
                    else
                        cb()
            ], cb)
    ], (err) =>
        dbg("done setting quotas")
        opts.cb(err)
    )
2148
2149
# Look up this project's quotas in the database and apply them all.
set_all_quotas: (opts) =>
    opts = defaults opts,
        cb : required
    dbg = @dbg("set_all_quotas")
    quotas = undefined
    async.series([
        (cb) =>
            dbg("looking up quotas for this project from database")
            @get_quotas
                cb : (err, x) =>
                    quotas = x; cb(err)
        (cb) =>
            dbg("setting the quotas to #{misc.to_json(quotas)}")
            quotas.cb = cb
            @set_quotas(quotas)
    ], (err) => opts.cb(err))
2165
2166
2167