Contact
CoCalc Logo Icon
StoreFeaturesDocsShareSupport News AboutSign UpSign In
| Download
Views: 39539
1
###############################################################################
2
#
3
# CoCalc: Collaborative Calculation in the Cloud
4
#
5
# Copyright (C) 2016, Sagemath Inc.
6
#
7
# This program is free software: you can redistribute it and/or modify
8
# it under the terms of the GNU General Public License as published by
9
# the Free Software Foundation, either version 3 of the License, or
10
# (at your option) any later version.
11
#
12
# This program is distributed in the hope that it will be useful,
13
# but WITHOUT ANY WARRANTY; without even the implied warranty of
14
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15
# GNU General Public License for more details.
16
#
17
# You should have received a copy of the GNU General Public License
18
# along with this program. If not, see <http://www.gnu.org/licenses/>.
19
#
20
###############################################################################
21
22
require('coffee-cache')
23
24
###
25
26
compute-server -- runs on the compute nodes; is also imported as a module
27
28
###
29
30
# Directory for this server's runtime files (the CLI's default pid, log and port
# file locations are built from it).
CONF = '/projects/conf'

# Path of the local sqlite database; assigned later (e.g. by start_fake_server)
# before init_sqlite_db runs.
SQLITE_FILE = undefined

# Special single-process dev mode: when true, this code is being run directly by the hub.
DEV = false

# When this module was loaded, in milliseconds since the epoch.
START_TIME = Date.now()

# IMPORTANT: see schema.coffee for some important information about the project states.
{COMPUTE_STATES: STATES} = require('smc-util/schema')
38
39
net = require('net')
40
fs = require('fs')
41
42
async = require('async')
43
winston = require('winston')
44
program = require('commander')
45
46
uuid = require('node-uuid')
47
48
misc_node = require('smc-util-node/misc_node')
49
50
message = require('smc-util/message')
51
misc = require('smc-util/misc')
52
53
sqlite = require('smc-util-node/sqlite')
54
55
56
# Set the log level
57
# Send log output to the console at debug level, with timestamps and colors.
# Best effort: any error while reconfiguring the console transport is
# deliberately ignored so that it cannot prevent startup.
try
    console_transport = winston.transports.Console
    winston.remove(console_transport)
    winston.add console_transport,
        level     : 'debug'
        timestamp : true
        colorize  : true
catch err
    # ignore -- logging configuration is best effort
62
63
defaults = misc.defaults
required = misc.required

# Default timeout handed to smc_compute (presumably seconds): one hour.
TIMEOUT = 60 * 60

# Storage server name.  Precedence: the SMC_STORAGE environment variable; then
# 'storage0-us' on the official deploy (hostname starting with 'compute');
# otherwise the empty string.
# TODO: the official-deploy default should be moved to a conf file.
STORAGE = process.env.SMC_STORAGE ? (if misc.startswith(require('os').hostname(), 'compute') then 'storage0-us' else '')
74
75
76
77
#################################################################
78
#
79
# Server code -- runs on the compute server
80
#
81
#################################################################
82
83
# Run the smc-compute command line tool and pass its JSON-decoded stdout to opts.cb.
#
#   args    -- array of command line arguments for smc-compute
#   timeout -- passed through to misc_node.execute_code (default TIMEOUT)
#   cb      -- cb(err, result): on failure err is the tool's stderr if it wrote
#              any, else the execute_code error (or a JSON parse error message);
#              result is the parsed stdout, or undefined if stdout was empty.
#
# In DEV mode the python script is run directly (no sudo) against a local
# projects directory; otherwise /usr/local/bin/smc-compute is run via sudo.
# (TIMEOUT is defined once, near the top of this file.)
smc_compute = (opts) =>
    opts = defaults opts,
        args    : required
        timeout : TIMEOUT
        cb      : required
    if DEV
        winston.debug("dev_smc_compute: running #{misc.to_json(opts.args)}")
        path = require('path')
        command = path.join(process.env.SALVUS_ROOT, 'smc_pyutil/smc_pyutil/smc_compute.py')
        PROJECT_PATH = process.env.COCALC_PROJECT_PATH ? path.join(process.env.SALVUS_ROOT, 'data', 'projects')
        v = ['--dev', "--projects", PROJECT_PATH]
    else
        winston.debug("smc_compute: running #{misc.to_safe_str(opts.args)}")
        command = "sudo"
        v = ["/usr/local/bin/smc-compute"]
        if program.single
            v.push("--single")

    misc_node.execute_code
        command : command
        args    : v.concat(opts.args)
        timeout : opts.timeout
        bash    : false
        path    : process.cwd()
        cb      : (err, output) =>
            # Log the args through to_safe_str, exactly like the "running" message above.
            winston.debug("smc_compute: finished running #{misc.to_safe_str(opts.args)} -- #{err}")
            if err
                opts.cb(if output?.stderr then output.stderr else err)
                return
            if not output?.stdout
                opts.cb(undefined, undefined)
                return
            # Report unparseable output through cb instead of throwing out of this
            # asynchronous callback (in case misc.from_json throws on bad JSON).
            try
                result = misc.from_json(output.stdout)
            catch e
                opts.cb("smc_compute: unable to parse output as JSON -- #{e}")
                return
            opts.cb(undefined, result)
119
120
# Constructed Project objects by project_id, and the callbacks waiting on a
# Project that is still being constructed (concurrent requests share one construction).
project_cache = {}
project_cache_cb = {}

# Call opts.cb(err, project) with the Project for opts.project_id, constructing
# it on first use.  Successfully constructed projects stay cached; after an
# error nothing is cached, so a later call tries again.
get_project = (opts) ->
    opts = defaults opts,
        project_id : required
        cb         : required
    id = opts.project_id

    if project_cache[id]?
        opts.cb(undefined, project_cache[id])
        return

    waiting = project_cache_cb[id]
    if waiting?
        # construction already in progress -- just wait for it
        waiting.push(opts.cb)
        return

    waiting = project_cache_cb[id] = [opts.cb]
    new Project
        project_id : id
        cb         : (err, project) ->
            winston.debug("got project #{id}")
            delete project_cache_cb[id]
            project_cache[id] = project unless err
            for callback in waiting
                if err then callback(err) else callback(undefined, project)
147
148
# One project on this compute node.  Its state and settings are cached in memory
# and persisted in the local sqlite 'projects' table; the actual work is done by
# running the smc-compute tool (see smc_compute).  Sockets registered with
# add_listener are sent a project_state_update message whenever the state changes.
class Project
    # Load this project's saved info from sqlite, then call opts.cb(err, this).
    # If the saved state is not a stable one (per STATES), the actual state is
    # recomputed (see _update_state) before opts.cb is called.
    constructor: (opts) ->
        opts = defaults opts,
            project_id : required
            cb         : required
        @project_id = opts.project_id
        @_command_cbs = {}       # action --> callbacks waiting on a running at_most_one command
        @_state_listeners = {}   # socket.id --> socket to notify about state changes
        @_last = {}              # last time a given action was initiated
        dbg = @dbg("constructor")
        sqlite_db.select
            table   : 'projects'
            columns : ['state', 'state_time', 'state_error', 'mintime',
                       'network', 'cores', 'memory', 'cpu_shares']
            where   : {project_id : @project_id}
            cb      : (err, results) =>
                if err
                    dbg("error -- #{err}")
                    opts.cb(err); return
                if results.length == 0
                    dbg("nothing in db")
                    @_state = undefined
                    @_state_time = new Date()
                    @_state_error = undefined
                    @_network = false
                else
                    row = results[0]
                    @_state = row.state
                    @_state_time = new Date(row.state_time)
                    @_state_error = row.state_error
                    @_mintime = row.mintime
                    @_network = row.network
                    @_cores = row.cores
                    @_memory = row.memory
                    @_cpu_shares = row.cpu_shares
                    dbg("fetched project info from db: state=#{@_state}, state_time=#{@_state_time}, state_error=#{@_state_error}, mintime=#{@_mintime}s")
                if not STATES[@_state]?.stable
                    dbg("updating non-stable state")
                    @_update_state(@_state_error, ((err) => opts.cb(err, @)))
                    return
                opts.cb(undefined, @)

    # Return a logging function whose messages are prefixed with the project id and method name.
    dbg: (method) =>
        return (m) => winston.debug("Project(#{@project_id}).#{method}: #{m}")

    # Register socket to be sent project_state_update messages.  A socket.id that
    # is already registered is ignored; the socket is removed when it emits 'close'.
    add_listener: (socket) =>
        if not @_state_listeners[socket.id]?
            dbg = @dbg("add_listener")
            dbg("adding #{socket.id}")
            @_state_listeners[socket.id] = socket
            socket.on 'close', () =>
                dbg("closing #{socket.id} and removing listener")
                delete @_state_listeners[socket.id]

    # Save the in-memory state, state_time (as ms since the epoch) and state_error
    # (undefined is stored as '') to this project's row in sqlite.
    # NOTE(review): whether a missing row gets created depends on sqlite_db.update -- confirm.
    _update_state_db: (cb) =>
        dbg = @dbg("_update_state_db")
        dbg("new state=#{@_state}")
        sqlite_db.update
            table : 'projects'
            set   :
                state       : @_state
                state_time  : @_state_time - 0
                state_error : if not @_state_error? then '' else @_state_error
            where :
                project_id  : @project_id
            cb    : cb

    # Send a project_state_update message with the current state to every registered listener.
    _update_state_listeners: () =>
        dbg = @dbg("_update_state_listeners")
        mesg = message.project_state_update
            project_id  : @project_id
            state       : @_state
            time        : @_state_time
            state_error : @_state_error
        dbg("send message to each of the #{misc.len(@_state_listeners)} listeners that the state has been updated = #{misc.to_safe_str(mesg)}")
        for id, socket of @_state_listeners
            dbg("sending mesg to socket #{id}")
            socket.write_mesg('json', mesg)

    # Run `smc-compute <action> [args...] <project_id>` via smc_compute.
    #   action      -- smc-compute subcommand
    #   args        -- optional extra arguments, placed between action and project_id
    #   at_most_one -- if the same action is already running, just wait for that run's
    #                  result (ignores subsequent args -- only use this where args don't matter)
    #   timeout     -- passed to smc_compute (default TIMEOUT)
    #   cb          -- cb(err, result)
    _command: (opts) =>
        opts = defaults opts,
            action      : required
            args        : undefined
            at_most_one : false
            timeout     : TIMEOUT
            cb          : undefined
        dbg = @dbg("_command(action:'#{opts.action}')")

        if opts.at_most_one
            if @_command_cbs[opts.action]?
                @_command_cbs[opts.action].push(opts.cb)
                return
            @_command_cbs[opts.action] = [opts.cb]

        @_last[opts.action] = new Date()
        args = [opts.action]
        if opts.args?
            args = args.concat(opts.args)
        args.push(@project_id)
        dbg("args=#{misc.to_safe_str(args)}")
        smc_compute
            args    : args
            timeout : opts.timeout
            cb      : (err, result) =>
                if opts.at_most_one
                    v = @_command_cbs[opts.action]
                    delete @_command_cbs[opts.action]
                    for cb in v
                        cb?(err, result)
                else
                    opts.cb?(err, result)

    # Perform an action on this project, subject to the state machine in STATES:
    #   - 'save' only bumps the state time (the save itself is done from outside
    #     by the storage servers);
    #   - otherwise the action must be in STATES[current state].commands;
    #   - if it has a target state (STATES[state].to[action]) the state is switched
    #     immediately, the command is launched in the background and opts.cb gets
    #     {state, time} right away; the real outcome goes to opts.after_command_cb;
    #   - 'network' updates the db and the outgoing firewall rule for this project's user;
    #   - any other action just runs the command; 'status' results also get the
    #     'assigned' column from the db.
    #
    #   action           -- action name
    #   args             -- optional extra arguments
    #   cb               -- cb(err, resp)
    #   after_command_cb -- cb(err), called once the command really completes (even if
    #                       it is long); now also called for 'save' and 'network'.
    command: (opts) =>
        opts = defaults opts,
            action           : required
            args             : undefined
            cb               : undefined
            after_command_cb : undefined
        dbg = @dbg("command(action=#{opts.action}, args=#{misc.to_safe_str(opts.args)})")
        state = undefined
        state_info = undefined
        assigned = undefined   # NOTE(review): never set here, so the 'assigned' db updates below never run
        resp = undefined
        async.series([
            (cb) =>
                dbg("get state")
                @state
                    cb : (err, s) =>
                        dbg("got state=#{misc.to_safe_str(s)}, #{err}")
                        if err
                            opts.after_command_cb?(err)
                            cb(err)
                        else
                            state = s.state
                            cb()
            (cb) =>
                if opts.action == 'save'
                    # The actual save is done completely from the outside by the storage servers.
                    # However, we update the state change time (e.g., from running --> saving --> running)
                    # so that the kill when idle code can use it!
                    @_state_time = new Date()
                    @_update_state_db()
                    resp = {}
                    # Nothing long-running happens here, so the command is already complete.
                    opts.after_command_cb?()
                    cb()
                    return

                if opts.action == 'start'
                    if not opts.args?
                        opts.args = []
                    for k in ['cores', 'memory', 'cpu_shares']
                        v = @["_#{k}"]
                        if v?
                            opts.args.push("--#{k}")
                            opts.args.push(v)

                state_info = STATES[state]
                if not state_info?
                    err = "bug / internal error -- unknown state '#{misc.to_safe_str(state)}'"
                    dbg(err)
                    opts.after_command_cb?(err)
                    cb(err)
                    return
                if opts.action not in state_info.commands
                    err = "command #{opts.action} not allowed in state #{state}"
                    dbg(err)
                    opts.after_command_cb?(err)
                    cb(err)
                    return

                next_state = state_info.to[opts.action]
                if next_state?
                    dbg("next_state: #{next_state} -- launching")
                    # This action causes state change and could take a while,
                    # so we (1) change state, (2) launch the command, (3)
                    # respond immediately that it's started.
                    @_state = next_state
                    @_state_time = new Date()
                    delete @_state_error
                    @_update_state_db()
                    @_update_state_listeners()
                    @_command    # launch the command: this might take a long time
                        action  : opts.action
                        args    : opts.args
                        timeout : state_info.timeout
                        cb      : (err, ignored) =>
                            dbg("finished command -- will transition to new state as result (#{err})")
                            @_state_error = err
                            if err
                                dbg("state change command ERROR -- #{err}")
                            else
                                dbg("state change command success -- #{misc.to_safe_str(ignored)}")
                                if assigned?
                                    # Project was just opened and opening is an allowed command.
                                    # Set when this was done.
                                    sqlite_db.update
                                        table : 'projects'
                                        set   : {assigned: assigned}
                                        where : {project_id: @project_id}
                            @_update_state(err, ((err2) => opts.after_command_cb?(err or err2)))
                    resp = {state:next_state, time:new Date()}
                    cb()
                else if opts.action == 'network'   # length==0 is allow network
                    dbg("An action that doesn't involve state change")
                    dbg("do network setting")
                    network = opts.args.length == 0
                    async.parallel([
                        (cb) =>
                            # store network state in database in case things get restarted.
                            sqlite_db.update
                                table : 'projects'
                                set   :
                                    network : network
                                where :
                                    project_id : @project_id
                                cb    : cb
                        (cb) =>
                            uname = @project_id.replace(/-/g,'')
                            if network
                                args = ['--whitelist_users', uname]
                            else
                                args = ['--blacklist_users', uname]
                            firewall
                                command : "outgoing"
                                args    : args
                                cb      : cb
                    ], (err) =>
                        if err
                            resp = message.error(error:err)
                        else
                            resp = {network:network}
                        cb(err)
                        opts.after_command_cb?(err)
                    )
                else
                    dbg("An action that doesn't involve state change")
                    dbg("doing action #{opts.action}")
                    at_most_one = opts.action == 'status' or opts.action == 'state'
                    @_command
                        action      : opts.action
                        args        : opts.args
                        at_most_one : at_most_one
                        cb          : (err, r) =>
                            dbg("got #{misc.to_safe_str(r)}, #{err}")
                            resp = r
                            cb(err)
                            opts.after_command_cb?(err)
            (cb) =>
                if assigned?
                    dbg("Project was just opened and opening is an allowed command... so saving that")
                    # Set when this assign happened, so we can return this as
                    # part of the status in the future, which the global hubs use
                    # to see whether the project on this node was some mess left behind
                    # during auto-failover, or is legit.
                    sqlite_db.update
                        table : 'projects'
                        set   : {assigned: assigned}
                        where : {project_id: @project_id}
                        cb    : cb
                else
                    cb()
            (cb) =>
                # resp can be undefined if smc-compute printed nothing; don't dereference it then.
                if opts.action == 'status' and resp?
                    if resp.state? and STATES[@_state]?.stable
                        # We just computed the status, which includes the state. Let's save this,
                        # since it is now our most up to date view of the state.
                        @_state = resp.state
                    dbg("status: so get additional info from database")
                    sqlite_db.select
                        table   : 'projects'
                        columns : ['assigned']
                        where   : {project_id: @project_id}
                        cb      : (err, result) =>
                            if err
                                cb(err)
                            else
                                # the row may be missing (e.g. project never saved to the db)
                                resp.assigned = result[0]?.assigned
                                cb()
                else
                    cb()
        ], (err) =>
            if err
                dbg("failed -- #{err}")
                opts.cb?(err)
            else
                dbg("success -- #{misc.to_safe_str(resp)}")
                opts.cb?(undefined, resp)
        )

    # Recompute the actual state by running the 'state' command; if it reports
    # 'broken', try 'stop' once and check again.  If the state changed, update it
    # (with the given state_error) in memory, in sqlite and for all listeners.
    # Concurrent calls share one computation; every cb(err) is called when it finishes.
    _update_state: (state_error, cb) =>
        dbg = @dbg("_update_state")
        if @_update_state_cbs?
            dbg("waiting on previously launched status subprocess...")
            @_update_state_cbs.push(cb)
            return
        @_update_state_cbs = [cb]
        dbg("state likely changed -- determined what it changed to")
        before = @_state
        result = undefined
        async.series([
            (cb) =>
                @_command
                    action  : 'state'
                    timeout : 60
                    cb      : (err, r) =>
                        result = r
                        cb(err)
            (cb) =>
                if result?.state == 'broken'
                    dbg("project broken, so try to stop once")
                    @_command
                        action : 'stop'
                        cb     : cb
                else
                    cb()
            (cb) =>
                if result?.state == 'broken'
                    dbg("project was broken; we stopped, so now trying to get state again")
                    @_command
                        action  : 'state'
                        timeout : 60
                        cb      : (err, r) =>
                            result = r
                            cb(err)
                else
                    cb()
        ], (err) =>
            # An empty or malformed answer from the 'state' command must not crash
            # this callback; report it to the waiting callers instead.
            if not err and not result?.state?
                err = "unable to determine project state"
            if err
                dbg("error getting status -- #{err}")
            else if result.state != before
                @_state = result.state
                @_state_time = new Date()
                @_state_error = state_error
                dbg("got new state -- #{@_state}")
                @_update_state_db()
                @_update_state_listeners()

            callbacks = @_update_state_cbs
            delete @_update_state_cbs
            dbg("calling #{callbacks.length} callbacks")
            for f in callbacks
                f?(err)
        )

    # Call opts.cb(err, {state, time, state_error}).  The state is recomputed
    # first if opts.update is true or no state is known yet.
    state: (opts) =>
        opts = defaults opts,
            update : false
            cb     : required
        @dbg("state")()
        f = (cb) =>
            if not opts.update and @_state?
                cb()
            else
                @_update_state(@_state_error, cb)
        f (err) =>
            if err
                opts.cb(err)
            else
                x =
                    state       : @_state
                    time        : @_state_time
                    state_error : @_state_error
                opts.cb(undefined, x)

    # Set the idle timeout (in seconds; see kill_idle_projects) in memory and in
    # sqlite, then call opts.cb(err) or opts.cb(undefined, {}).
    set_mintime: (opts) =>
        opts = defaults opts,
            mintime : required
            cb      : required
        dbg = @dbg("mintime(mintime=#{opts.mintime}s)")
        @_mintime = opts.mintime
        sqlite_db.update
            table : 'projects'
            set   : {mintime: opts.mintime}
            where : {project_id: @project_id}
            cb    : (err) =>
                if err
                    opts.cb(err)
                else
                    opts.cb(undefined, {})

    # Apply the current @_network setting via the 'network' command
    # (no args = allow network, '--ban' = block it).
    _update_network: (cb) =>
        @command
            action : 'network'
            args   : if @_network then [] else ['--ban']
            cb     : cb

    # Set whether this project may use the network: save it in sqlite (best
    # effort, errors ignored) and, in parallel, apply it via _update_network.
    # Calls opts.cb(err, resp) with resp from the 'network' command.
    set_network: (opts) =>
        opts = defaults opts,
            network : required
            cb      : required
        dbg = @dbg("network(network=#{opts.network})")
        @_network = opts.network
        resp = undefined
        async.parallel([
            (cb) =>
                sqlite_db.update
                    table : 'projects'
                    set   : {network: opts.network}
                    where : {project_id: @project_id}
                    cb    : () => cb()
            (cb) =>
                @_update_network (err, r) =>
                    resp = r
                    cb(err)
        ], (err) => opts.cb?(err, resp))

    # opts.args is a flat list like ['--cores', '2', '--memory', '1000']: each
    # value is parsed as an integer and stored in memory (@_<name>) and sqlite
    # (db errors ignored), then the 'compute_quota' command runs with opts.args.
    set_compute_quota: (opts) =>
        opts = defaults opts,
            args : required
            cb   : required
        dbg = @dbg("set_compute_quota")
        i = 0
        quotas = {}
        while i < opts.args.length
            k = opts.args[i].slice(2)
            v = parseInt(opts.args[i+1])
            quotas[k] = v
            @["_#{k}"] = v
            i += 2
        sqlite_db.update
            table : 'projects'
            set   : quotas
            where : {project_id: @project_id}
            cb    : () =>
                @command
                    action : 'compute_quota'
                    args   : opts.args
                    cb     : opts.cb
580
581
# Secret used to unlock incoming hub connections (see start_tcp_server).
secret_token = undefined

# Ensure secret_token is set. It is read from program.secret_file, or, if that
# file does not exist, a random 64-byte token is generated (base64) and saved
# there. The file's permissions are then (re)set to 0600. Calls cb(err).
read_secret_token = (cb) ->
    if secret_token?
        cb()
        return
    dbg = (m) -> winston.debug("read_secret_token: #{m}")

    async.series([
        # Read or create the file; after this step the variable secret_token
        # is set and the file exists.
        (cb) ->
            dbg("check if file exists")
            fs.exists program.secret_file, (exists) ->
                if exists
                    dbg("exists -- now reading '#{program.secret_file}'")
                    fs.readFile program.secret_file, (err, buf) ->
                        if err
                            dbg("error reading the file '#{program.secret_file}'")
                            cb(err)
                        else
                            secret_token = buf.toString().trim()
                            cb()
                else
                    dbg("creating '#{program.secret_file}'")
                    require('crypto').randomBytes 64, (err, buf) ->
                        if err
                            # never continue with a missing/weak token
                            dbg("error generating random token -- #{err}")
                            cb(err)
                            return
                        token = buf.toString('base64')
                        # Create the file with restrictive permissions from the start so the
                        # token is never readable by other users, not even briefly.
                        fs.writeFile program.secret_file, token, {mode: 0o600}, (err) ->
                            if not err
                                secret_token = token
                            cb(err)
        (cb) ->
            dbg("Ensure restrictive permissions on the secret token file.")
            fs.chmod(program.secret_file, 0o600, cb)
    ], cb)
612
613
# Handle a 'compute' message from a hub.
#
# Steps:
#   1. Get (or load) the project.
#   2. Register the socket for state updates.
#   3. Perform mesg.action with mesg.args.
#
# The actions state, mintime, network and compute_quota are handled specially.
# Every other action goes to Project.command.
#
# Calls cb exactly once, with the action's response (at least {}) or a
# message.error.
handle_compute_mesg = (mesg, socket, cb) ->
    dbg = (m) => winston.debug("handle_compute_mesg(hub -> compute, id=#{mesg.id}): #{m}")
    # mesg comes from the network, so args may be missing; default it so that bad
    # messages produce an error reply instead of a TypeError.
    args = mesg.args ? []
    p = undefined
    resp = undefined
    async.series([
        (cb) ->
            get_project
                project_id : mesg.project_id
                cb         : (err, _p) ->
                    p = _p; cb(err)
        (cb) ->
            p.add_listener(socket)
            if mesg.action == 'state'
                dbg("getting state")
                p.state
                    update : args.length > 0 and args[0] == '--update'
                    cb     : (err, r) ->
                        dbg("state -- got #{err}, #{misc.to_safe_str(r)}")
                        resp = r; cb(err)
            else if mesg.action == 'mintime'
                if not args[0]?
                    cb("mintime requires an argument")
                    return
                p.set_mintime
                    mintime : args[0]
                    cb      : (err, r) ->
                        resp = r; cb(err)
            else if mesg.action == 'network'
                p.set_network
                    network : args.length == 0   # no arg = enable
                    cb      : (err, r) ->
                        resp = r; cb(err)
            else if mesg.action == 'compute_quota'
                p.set_compute_quota
                    args : args
                    cb   : (err, r) ->
                        resp = r; cb(err)
            else
                dbg("running command")
                p.command
                    action : mesg.action
                    args   : mesg.args
                    cb     : (err, r) ->
                        resp = r; cb(err)
    ], (err) ->
        if err
            cb(message.error(error:err))
        else
            # A command may legitimately produce no output; the caller tags the
            # response with mesg.id, so it must be an object.
            cb(resp ? {})
    )
660
661
# Reply to a compute_server_status message with information about this node:
#   - number of processors (STATS.nproc)
#   - count of local projects by state
#   - except in DEV mode, load average, task counts and memory from /proc
# Calls cb with a compute_server_status message, or a message.error.
handle_status_mesg = (mesg, socket, cb) ->
    dbg = (m) => winston.debug("handle_status_mesg(hub -> compute, id=#{mesg.id}): #{m}")
    dbg()
    status = {nproc:STATS.nproc}
    async.parallel([
        (cb) =>
            sqlite_db.select
                table   : 'projects'
                columns : ['state']
                cb      : (err, result) =>
                    if err
                        cb(err)
                    else
                        projects = status.projects = {}
                        for x in result
                            s = x.state
                            projects[s] = (projects[s] ? 0) + 1
                        cb()
        (cb) =>
            if DEV
                cb(); return
            fs.readFile '/proc/loadavg', (err, data) =>
                if err
                    cb(err)
                else
                    # http://stackoverflow.com/questions/11987495/linux-proc-loadavg
                    x = misc.split(data.toString())
                    # first three fields, normalized by the number of processors
                    status.load = (parseFloat(x[i])/STATS.nproc for i in [0..2])
                    # fourth field looks like "active/total"
                    v = x[3].split('/')
                    status.num_tasks = parseInt(v[1])
                    status.num_active = parseInt(v[0])
                    cb()
        (cb) =>
            if DEV
                cb(); return
            fs.readFile '/proc/meminfo', (err, data) =>
                if err
                    cb(err)
                    return
                # See this about what MemAvailable is:
                # https://git.kernel.org/cgit/linux/kernel/git/torvalds/linux.git/commit/?id=34e431b0ae398fc54ea69ff85ec700722c9da773
                x = data.toString()
                status.memory = memory = {}
                for k in ['MemAvailable', 'SwapTotal', 'MemTotal', 'SwapFree']
                    i = x.indexOf("#{k}:")
                    if i == -1
                        # Not reported by this kernel (presumably older kernels lack
                        # MemAvailable); omit it rather than report garbage.
                        continue
                    line = x.slice(i)
                    j = line.indexOf('\n')
                    if j != -1     # the last line may have no trailing newline
                        line = line.slice(0, j)
                    memory[k] = parseInt(misc.split(line.split(':')[1]))/1000
                cb()
    ], (err) =>
        if err
            cb(message.error(error:err))
        else
            cb(message.compute_server_status(status:status))
    )
720
721
# Dispatch one JSON message from a hub according to mesg.event. The response
# (tagged with the request's id) is written back on the same socket.
handle_mesg = (socket, mesg) ->
    dbg = (m) => winston.debug("handle_mesg(hub -> compute, id=#{mesg.id}): #{m}")
    dbg(misc.to_safe_str(mesg))

    respond = (resp) ->
        resp.id = mesg.id
        dbg("resp = '#{misc.to_safe_str(resp)}'")
        socket.write_mesg('json', resp)

    event = mesg.event
    if event == 'compute'
        handle_compute_mesg(mesg, socket, respond)
    else if event == 'compute_server_status'
        handle_status_mesg(mesg, socket, respond)
    else if event == 'ping'
        respond(message.pong())
    else
        respond(message.error(error:"unknown event type: '#{mesg.event}'"))
739
740
# Handle to the local sqlite database; assigned by init_sqlite_db.
sqlite_db = undefined

# Store opts.value under opts.key in the keyvalue table. Both key and value
# are stored JSON-encoded, and opts.cb is handed straight to sqlite_db.update.
# NOTE(review): whether a key not yet in the table gets inserted depends on
# sqlite_db.update's semantics -- confirm.
sqlite_db_set = (opts) ->
    opts = defaults opts,
        key   : required
        value : required
        cb    : required
    encoded_key = misc.to_json(opts.key)
    encoded_value = misc.to_json(opts.value)
    sqlite_db.update
        table : 'keyvalue'
        set   : {value : encoded_value}
        where : {key : encoded_key}
        cb    : opts.cb
753
754
# Look up opts.key (stored JSON-encoded) in the keyvalue table and call
# opts.cb(err, value). The value is JSON-decoded, or undefined when the key is
# absent.
sqlite_db_get = (opts) ->
    opts = defaults opts,
        key : required
        cb  : required
    sqlite_db.select
        table   : 'keyvalue'
        columns : ['value']
        where   :
            key : misc.to_json(opts.key)
        cb      : (err, result) ->
            if err
                opts.cb(err)
            else if result.length == 0
                opts.cb(undefined, undefined)
            else
                # Rows are objects keyed by column name (as everywhere else in this
                # file, e.g. results[0].state), so read the 'value' column by name.
                opts.cb(undefined, misc.from_json(result[0].value))
770
771
# Open the sqlite database at SQLITE_FILE (creating it if needed) and store the
# handle in sqlite_db. If the file did not exist beforehand, the 'projects' and
# 'keyvalue' tables are created. Calls cb(err).
init_sqlite_db = (cb) ->
    winston.debug("init_sqlite_db: #{SQLITE_FILE}")
    already_existed = undefined

    create_schema = (cb) ->
        # projects table:
        #   project_id -- the id of the project
        #   state      -- opened, closed, etc.
        #   state_time -- when switched to current state
        #   assigned   -- when project was first opened on this node.
        queries = [
            'CREATE TABLE projects(project_id TEXT PRIMARY KEY, state TEXT, state_error TEXT, state_time INTEGER, mintime INTEGER, assigned INTEGER, network BOOLEAN, cores INTEGER, memory INTEGER, cpu_shares INTEGER)',
            'CREATE TABLE keyvalue(key TEXT PRIMARY KEY, value TEXT)'
        ]
        run_query = (query, cb) -> sqlite_db.sql(query:query, cb:cb)
        async.map(queries, run_query, cb)

    async.series([
        (cb) ->
            fs.exists SQLITE_FILE, (e) ->
                already_existed = e
                cb()
        (cb) ->
            sqlite.sqlite
                filename : SQLITE_FILE
                cb       : (err, db) ->
                    sqlite_db = db
                    cb(err)
        (cb) ->
            if already_existed then cb() else create_schema(cb)
    ], cb)
802
803
# Periodically called (see init_mintime) to stop idle projects.
#
# A running project is stopped when its mintime (seconds) is set and is less
# than the number of seconds since its last state change.
#
# Errors are only logged: cb (if given) is always called with no arguments.
kill_idle_projects = (cb) ->
    dbg = (m) -> winston.debug("kill_idle_projects: #{m}")
    running = undefined
    to_kill = []

    stop_project = (project_id, cb) ->
        dbg("killing #{project_id}")
        get_project
            project_id : project_id
            cb         : (err, project) ->
                if err
                    cb(err)
                else
                    project.command
                        action : 'stop'
                        cb     : cb

    async.series([
        (cb) ->
            dbg("query database for all projects")
            sqlite_db.select
                table   : 'projects'
                columns : ['project_id', 'state_time', 'mintime']
                where   : {state : 'running'}
                cb      : (err, r) ->
                    running = r
                    cb(err)
        (cb) ->
            now = new Date() - 0
            for p in running when p.mintime
                last_change = (now - p.state_time)/1000
                dbg("project_id=#{p.project_id}, last_change=#{last_change}s ago, mintime=#{p.mintime}s")
                if p.mintime < last_change
                    dbg("plan to kill project #{p.project_id}")
                    to_kill.push(p.project_id)
            if to_kill.length == 0
                dbg("nothing idle to kill")
                cb()
            else
                async.map(to_kill, stop_project, cb)
    ], (err) ->
        if err
            dbg("error killing idle -- #{err}")
        cb?()
    )
849
850
# Start killing idle projects (see kill_idle_projects). One pass runs right away
# and its completion is reported to cb; after that a pass runs every 3 minutes.
# In single-machine mode (program.single) this is disabled and cb is called at once.
init_mintime = (cb) ->
    unless program.single
        setInterval(kill_idle_projects, 3*60*1000)
        kill_idle_projects(cb)
        return
    winston.debug("init_mintime: running in single-machine mode; not initializing idle timeout")
    cb()
857
858
# Start the TCP server that hubs connect to.
#
# Each connection must first be unlocked with secret_token
# (misc_node.unlock_socket). After that, every JSON message is passed to
# handle_mesg.
#
# Port selection:
#   - program.port, if it is set;
#   - otherwise the port saved in program.port_file (if present and numeric);
#   - otherwise a random free port (0).
#
# Once listening, the actual port is written to program.port_file and cb(err)
# is called with the result of that write. If listening fails, it is retried
# every second.
start_tcp_server = (cb) ->
    dbg = (m) -> winston.debug("tcp_server: #{m}")
    dbg("start")

    server = net.createServer (socket) ->
        dbg("received connection")
        socket.id = uuid.v4()
        misc_node.unlock_socket socket, secret_token, (err) ->
            if err
                dbg("ERROR: unable to unlock socket -- #{err}")
            else
                dbg("unlocked connection")
                misc_node.enable_mesg(socket)
                socket.on 'mesg', (type, mesg) ->
                    if type == "json"   # other types ignored -- we only deal with json
                        dbg("(socket id=#{socket.id}) -- received #{misc.to_safe_str(mesg)}")
                        try
                            handle_mesg(socket, mesg)
                        catch e
                            # log where the exception came from, not just where it was caught
                            dbg(e?.stack ? new Error().stack)
                            winston.error("ERROR(socket id=#{socket.id}): '#{e}' handling message '#{misc.to_safe_str(mesg)}'")

    get_port = (c) ->
        dbg("get_port")
        if program.port
            c()
            return
        dbg("attempt once to use the same port as in port file, if there is one")
        fs.exists program.port_file, (exists) ->
            if not exists
                dbg("no port file so choose new port")
                program.port = 0
                c()
                return
            dbg("port file exists, so read")
            fs.readFile program.port_file, (err, data) ->
                # An unreadable or malformed port file just means: choose a new port.
                port = if err then NaN else parseInt(data.toString().trim(), 10)
                program.port = if isNaN(port) then 0 else port
                c()

    listen = (c) ->
        dbg("trying port #{program.port}")
        server.listen program.port, program.address, () ->
            dbg("listening on #{program.address}:#{program.port}")
            program.port = server.address().port
            # fs.writeFile needs string data; newer Node versions reject numbers.
            fs.writeFile(program.port_file, "#{program.port}", c)
        server.on 'error', (e) ->
            dbg("error getting port -- #{e}; try again in one second (type 'netstat -tulpn |grep #{program.port}' to figure out what has the port)")
            try_again = () ->
                server.close()
                server.listen(program.port, program.address)
            setTimeout(try_again, 1000)

    get_port () ->
        listen(cb)
915
916
# Initialize basic information about this node once and for all.
# So far, not much -- just number of processors (STATS.nproc).
STATS = {}

# Fill in STATS.nproc from the output of the `nproc` command, then call cb(err).
# In DEV mode nothing is computed.
init_stats = (cb) =>
    if DEV
        # Still signal completion, so that callers sequencing on cb (async.series) proceed.
        cb()
        return
    misc_node.execute_code
        command : "nproc"
        cb      : (err, output) =>
            if err
                cb(err)
            else
                STATS.nproc = parseInt(output.stdout)
                cb()
930
931
# Get the project attribute opts.key from the Google metadata server.
#
# If the query fails, or the reply contains 'not found', fall back to the copy
# saved in the local sqlite database. A freshly fetched value is saved there
# under "metadata-<key>" for future use.
#
# Calls opts.cb(err, value); value is undefined if it is known nowhere.
get_metadata = (opts) ->
    opts = defaults opts,
        key : required
        cb  : required
    dbg = (m) -> winston.debug("get_metadata: #{m}")
    db_key = "metadata-#{opts.key}"
    value = undefined

    query_google = (cb) ->
        dbg("query google metdata server for #{opts.key}")
        misc_node.execute_code
            command : "curl"
            args    : ["http://metadata.google.internal/computeMetadata/v1/project/attributes/#{opts.key}",
                       '-H', 'Metadata-Flavor: Google']
            cb      : (err, output) ->
                if err
                    dbg("nonfatal error querying metadata -- #{err}")
                else if output.stdout.indexOf('not found') == -1
                    value = output.stdout
                cb()

    save_or_load = (cb) ->
        if value?
            dbg("save to local database")
            sqlite_db_set(key:db_key, value:value, cb:cb)
        else
            dbg("querying local database")
            sqlite_db_get
                key : db_key
                cb  : (err, result) ->
                    if err
                        cb(err)
                    else
                        value = result
                        cb()

    async.series [query_google, save_or_load], (err) ->
        if err then opts.cb(err) else opts.cb(undefined, value)
978
979
# Users allowed outgoing network access:
#   - a few fixed system accounts;
#   - the user name of every project with network enabled, which is its
#     project_id with the dashes removed.
# Calls opts.cb(err, usernames).
get_whitelisted_users = (opts) ->
    opts = defaults opts,
        cb : required
    sqlite_db.select
        table   : 'projects'
        where   : {network : true}
        columns : ['project_id']
        cb      : (err, results) ->
            if err
                opts.cb(err)
                return
            users = ['root', 'salvus', 'monitoring', '_apt']
            for x in results
                users.push(x.project_id.replace(/-/g, ''))
            opts.cb(undefined, users)
992
993
# When true, 'outgoing' firewall commands are skipped (init_firewall sets this on web nodes).
NO_OUTGOING_FIREWALL = false

# Run `sudo $SALVUS_ROOT/scripts/smc_firewall.py <command> <args...>` via
# misc_node.execute_code, with timeout 30, and pass opts.cb through.
# 'outgoing' commands just call opts.cb() when NO_OUTGOING_FIREWALL is set.
firewall = (opts) ->
    opts = defaults opts,
        command : required
        args    : []
        cb      : required
    skip = opts.command == 'outgoing' and NO_OUTGOING_FIREWALL
    if skip
        opts.cb()
        return
    script = "#{process.env.SALVUS_ROOT}/scripts/smc_firewall.py"
    misc_node.execute_code
        command : 'sudo'
        args    : [script, opts.command].concat(opts.args)
        bash    : false
        timeout : 30
        path    : process.cwd()
        cb      : opts.cb
1009
1010
#
# Initialize the iptables based firewall. Must be run after sqlite db is initialized.
#
# Does nothing in single-machine mode, or on hosts whose name does not start
# with 'compute'. Otherwise it:
#   1. fetches the whitelists (metadata via get_metadata, and network-enabled
#      project users via get_whitelisted_users);
#   2. clears the existing rules;
#   3. applies the outgoing rules -- unless this host is one of the smc-servers,
#      in which case outgoing filtering is disabled (NO_OUTGOING_FIREWALL).
# Incoming rules are deliberately not applied (see below). Calls cb(err).
#
init_firewall = (cb) ->
    dbg = (m) -> winston.debug("init_firewall: #{m}")
    if program.single
        dbg("running in single machine mode; not creating firewall")
        cb()
        return
    hostname = require("os").hostname()
    if not misc.startswith(hostname, 'compute')
        dbg("not starting firewall since hostname does not start with 'compute'")
        cb()
        return
    tm = misc.walltime()
    dbg("starting firewall configuration")
    incoming_whitelist_hosts = ''
    outgoing_whitelist_hosts = 'sagemath.com'
    whitelisted_users = ''
    admin_whitelist = ''
    storage_whitelist = ''

    # get_metadata yields undefined when a key is known nowhere (and on error);
    # treat that as an empty list instead of throwing on undefined.replace.
    to_csv = (w) -> (w ? '').replace(/ /g, ',')

    async.series([
        (cb) ->
            async.parallel([
                (cb) ->
                    dbg("getting incoming_whitelist_hosts")
                    get_metadata
                        key : "smc-servers"
                        cb  : (err, w) ->
                            incoming_whitelist_hosts = to_csv(w)
                            if w?
                                outgoing_whitelist_hosts += ',' + w # allow users to connect to get blobs when printing sage worksheets
                            cb(err)
                (cb) ->
                    dbg("getting admin whitelist")
                    get_metadata
                        key : "admin-servers"
                        cb  : (err, w) ->
                            admin_whitelist = to_csv(w)
                            cb(err)
                (cb) ->
                    dbg("getting storage whitelist")
                    get_metadata
                        key : "storage-servers"
                        cb  : (err, w) ->
                            storage_whitelist = to_csv(w)
                            cb(err)
                (cb) ->
                    dbg('getting whitelisted users')
                    get_whitelisted_users
                        cb : (err, users) ->
                            # users is undefined when the query failed
                            whitelisted_users = (users ? []).join(',')
                            cb(err)
            ], cb)
        (cb) ->
            dbg("clear existing firewall")
            firewall
                command : "clear"
                cb      : cb
        (cb) ->
            dbg("not disabling incoming connections -- no need to")
            # CRITICAL: this causes a lot of trouble for no gain at all
            cb()
            return

            # Unreachable: kept for reference in case incoming rules are re-enabled.
            dbg("starting firewall -- applying incoming rules")
            if admin_whitelist
                incoming_whitelist_hosts += ',' + admin_whitelist
            if storage_whitelist
                incoming_whitelist_hosts += ',' + storage_whitelist
            firewall
                command : "incoming"
                args    : ["--whitelist_hosts", incoming_whitelist_hosts]
                cb      : cb
        (cb) ->
            if incoming_whitelist_hosts.split(',').indexOf(hostname) != -1
                dbg("this is a frontend web node, so not applying outgoing firewall rules (probably being used for development)")
                NO_OUTGOING_FIREWALL = true
                cb()
            else
                dbg("starting firewall -- applying outgoing rules")
                firewall
                    command : "outgoing"
                    args    : ["--whitelist_hosts_file", "#{process.env.SALVUS_ROOT}/scripts/outgoing_whitelist_hosts",
                               "--whitelist_users", whitelisted_users]
                    cb      : cb
    ], (err) ->
        dbg("finished firewall configuration in #{misc.walltime(tm)} seconds")
        cb(err)
    )
1100
1101
# TEMPORARY until I have time to fix a bug.
#
# Problem: a project that times out while starting stays stuck like that
# forever, unless the client calls project.state(force:true,update:true,cb:...).
# The hub clients currently don't do that, and restarting them is not wanted
# until status is improved.
#
# Workaround: find every project in a transitional state ('starting',
# 'stopping' or 'saving') that has been in it longer than that state's timeout,
# and force a state update for each (at most 8 at once).
#
# Reschedules itself: every 10 minutes during the first 10 minutes after
# startup, every 2 minutes after that. cb(err), if given, is called after each pass.
update_states = (cb) ->
    dbg = (m) -> winston.debug("update_state: #{m}")
    dbg()
    transitional = ['starting', 'stopping', 'saving']
    projects = undefined

    maybe_update = (x, cb) ->
        if x.state_time >= new Date() - 1000*STATES[x.state].timeout
            dbg("not updating #{x.project_id}")
            cb()
            return
        dbg("updating #{x.project_id}")
        get_project
            project_id : x.project_id
            cb         : (err, project) ->
                if err then cb(err) else project.state(update:true, cb:cb)

    async.series([
        (cb) ->
            dbg("querying db")
            sqlite_db.select
                table   : 'projects'
                columns : ['project_id', 'state_time', 'state']
                cb      : (err, rows) ->
                    if err
                        dbg("query err=#{misc.to_safe_str(err)}")
                        cb(err)
                        return
                    projects = rows.filter((a) -> a.state in transitional)
                    dbg("got #{projects.length} projects that are '....ing'")
                    cb()
        (cb) ->
            if projects.length == 0
                cb(); return
            dbg("possibly updating each of #{projects.length} projects")
            async.mapLimit(projects, 8, maybe_update, cb)
    ], (err) ->
        # slow down during the first 10 minutes after startup
        elapsed_ms = Date.now() - START_TIME
        delay_minutes = if elapsed_ms < 10*60*1000 then 10 else 2
        setTimeout(update_states, delay_minutes * 60 * 1000)
        cb?(err)
    )
1151
1152
# Start the compute server. Steps, in order:
#   1. gather node stats;
#   2. read/create the secret token;
#   3. open the sqlite db;
#   4. configure the firewall;
#   5. start idle-project killing;
#   6. start the TCP server;
#   7. begin periodic state repair.
# Calls cb(err), if given, when all steps are done or one fails.
start_server = (cb) ->
    winston.debug("start_server")
    steps = [init_stats, read_secret_token, init_sqlite_db, init_firewall,
             init_mintime, start_tcp_server, update_states]
    async.series steps, (err) ->
        winston.debug(if err then "Error starting server -- #{err}" else "Successfully started server.")
        cb?(err)
1160
1161
###########################
# Devel testing interface (same process as hub)
###########################

# Start the in-process dev "server" used directly by the hub: switch to DEV
# mode, use a sqlite database under $SALVUS_ROOT/data, and start idle-project
# killing. Calls cb(err), if given.
start_fake_server = (cb) ->
    winston.debug("start_fake_server")
    # Local dev settings: single-process mode and a local database file (CONF is left unchanged).
    DEV = true
    SQLITE_FILE = require('path').join(process.env.SALVUS_ROOT, 'data', 'compute.sqlite3')
    async.series [init_sqlite_db, init_mintime], (err) ->
        winston.debug(if err then "Error starting server -- #{err}" else "Successfully started server.")
        cb?(err)
1175
1176
{EventEmitter} = require('events')
1177
1178
# Compute-side end of the in-process fake socket pair used in DEV mode.
# handle_mesg writes its replies to this object.  write_mesg routes each reply
# to the hub-side callback waiting on that message id.  Messages nobody is
# waiting for (e.g. state updates) are re-emitted on the hub socket as 'mesg'.
class FakeDevSocketFromCompute extends EventEmitter
    constructor: (@socket_from_hub) ->
        # message id --> callback registered by the hub side's recv_mesg
        @callbacks = {}

    # Deliver a message from the compute side to the hub side.
    #   type -- message type (e.g. 'json'); passed along on emitted events
    #   resp -- the message; resp.id selects a pending recv_mesg callback
    #   cb   -- optional; called after delivery, matching the hub side's
    #           write_mesg, so writers that wait on it do not hang
    write_mesg: (type, resp, cb) =>
        f = @callbacks[resp.id]
        if f?
            # Response to a hub request.  Unregister before invoking, so a
            # handler registered for the same id inside f is not clobbered.
            delete @callbacks[resp.id]
            f(resp)
        else
            # Message initiated by the compute side (e.g., a state update).
            @socket_from_hub.emit('mesg', type, resp)
        cb?()

    # Receiving on the compute side is not implemented in dev mode: the
    # options are validated and then ignored, so opts.cb is never called.
    recv_mesg: (opts) =>
        opts = defaults opts,
            type    : 'json'
            id      : required
            timeout : undefined
            cb      : required
1199
# Hub-side end of the in-process fake socket pair used in DEV mode.  Instead
# of writing to the network, write_mesg passes each message straight to
# handle_mesg along with the paired compute-side socket.  recv_mesg parks the
# caller's callback on that socket, keyed by message id.
class FakeDevSocketFromHub extends EventEmitter
    constructor: ->
        @_socket = new FakeDevSocketFromCompute(this)

    # Send a message from the hub to the in-process compute server.
    write_mesg: (type, mesg, cb) =>
        if type isnt 'json'
            winston.debug("FakeDevSocket.write_mesg: sending message of type #{type}")
        else
            winston.debug("FakeDevSocket.write_mesg: #{misc.to_json(mesg)}")
        # cb runs first: the caller uses it to install its recv_mesg handler,
        # which must be in place before handle_mesg produces a reply.
        cb?()
        handle_mesg(@_socket, mesg)

    # Wait for the reply to message opts.id.  opts.cb(resp) is invoked by the
    # compute-side socket's write_mesg when that reply arrives.
    # NOTE: opts.timeout is accepted but not enforced here.
    recv_mesg: (opts) =>
        opts = defaults opts,
            type    : 'json'
            id      : required
            timeout : undefined
            cb      : required
        winston.debug("FakeDevSocket.recv_mesg: #{opts.id}")
        @_socket.callbacks[opts.id] = opts.cb
1219
1220
# Becomes true once the in-process dev server started successfully, so later
# calls to fake_dev_socket reuse it instead of initializing it again.
fake_server = undefined

# Hand back a new hub-side fake socket wired to the in-process compute server.
# The server (sqlite + mintime init) is started on first use.
#   cb(err, socket)
exports.fake_dev_socket = (cb) ->
    async.series([
        (next) ->
            return next() if fake_server?
            start_fake_server(next)
    ], (err) ->
        return cb(err) if err
        fake_server = true
        cb(undefined, new FakeDevSocketFromHub())
    )
1235
1236
1237
1238
1239
###########################
1240
# Command line interface
1241
###########################
1242
1243
# Command-line options for the compute daemon.  The defaults are also consumed
# by other modules through exports.program (namely compute-client).
try
    program.usage('[start/stop/restart/status] [options]')
        .option('--pidfile [string]', 'store pid in this file', String, "#{CONF}/compute.pid")
        .option('--logfile [string]', 'write log to this file', String, "#{CONF}/compute.log")
        .option('--port_file [string]', 'write port number to this file', String, "#{CONF}/compute.port")
        .option('--secret_file [string]', 'write secret token to this file', String, "#{CONF}/compute.secret")
        .option('--sqlite_file [string]', 'store sqlite3 database here', String, "#{CONF}/compute.sqlite3")
        # Help text now states the real default ('debug'), not "".
        .option('--debug [string]', 'logging debug level (default: "debug")', String, 'debug')
        .option('--port [integer]', 'port to listen on (default: assigned by OS)', String, 0)
        .option('--address [string]', 'address to listen on (default: all interfaces)', String, '')
        .option('--single', 'if given, assume no storage servers and everything is running on one VM')
        .parse(process.argv)
catch e
    # Work around a commander bug that throws when this file is loaded as a
    # module.  The placeholder name keeps the "is this the compute script?"
    # check on program._name from matching, so the daemon is not started.
    program._name = 'xxx'

# Explicit radix, so a value such as "010" or "0x10" is never read as octal/hex.
program.port = parseInt(program.port, 10)

exports.program = program # so can use the defaults above in other libraries, namely compute-client
1262
1263
# Entry point when this file runs as the compute daemon rather than being
# imported as a library.  It:
#   - applies the --debug log level,
#   - installs a last-resort uncaughtException logger,
#   - restricts permissions on CONF,
#   - daemonizes via start-stop-daemon, running start_server in the daemon.
main = () ->
    if program.debug
        winston.remove(winston.transports.Console)
        winston.add(winston.transports.Console, {level: program.debug, timestamp:true, colorize:true})

    SQLITE_FILE = program.sqlite_file

    winston.debug("running as a deamon")
    # run as a server/daemon (otherwise, is being imported as a library)
    process.addListener "uncaughtException", (err) ->
        # Having a listener keeps the process alive; just log loudly.
        winston.debug("BUG ****************************************************************************")
        winston.debug("Uncaught exception: " + err)
        winston.debug(err.stack)
        winston.debug("BUG ****************************************************************************")

    # Restrict CONF (default home of the pid/log/port/secret files) to its
    # owner, just in case.  fs.chmod requires a callback (Node >= 10 throws
    # without one).  A missing CONF is fine, so ENOENT is ignored rather than
    # racing a separate fs.exists check.
    fs.chmod CONF, 0o700, (err) ->
        if err and err.code != 'ENOENT'
            winston.debug("chmod #{CONF} failed -- #{err}")

    daemon = require("start-stop-daemon") # don't import unless in a script; otherwise breaks in node v6+
    daemon({max:999, pidFile:program.pidfile, outFile:program.logfile, errFile:program.logfile, logFile:'/dev/null'}, start_server)
1284
1285
# Start the daemon only when the invoked script's name (as recorded by
# commander in program._name) is "compute" or "compute.<ext>".  When this file
# is imported as a library, nothing is started.
main() if program._name.split('.')[0] == 'compute'
1287
1288