Contact
CoCalc Logo Icon
StoreFeaturesDocsShareSupport News AboutSign UpSign In
| Download
Views: 39539
1
##############################################################################
2
#
3
# CoCalc: Collaborative Calculation in the Cloud
4
#
5
# Copyright (C) 2016, Sagemath Inc.
6
#
7
# This program is free software: you can redistribute it and/or modify
8
# it under the terms of the GNU General Public License as published by
9
# the Free Software Foundation, either version 3 of the License, or
10
# (at your option) any later version.
11
#
12
# This program is distributed in the hope that it will be useful,
13
# but WITHOUT ANY WARRANTY; without even the implied warranty of
14
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15
# GNU General Public License for more details.
16
#
17
# You should have received a copy of the GNU General Public License
18
# along with this program. If not, see <http://www.gnu.org/licenses/>.
19
#
20
###############################################################################
21
22
###
23
LocalHub
24
###
25
26
async = require('async')
27
uuid = require('node-uuid')
28
winston = require('winston')
29
underscore = require('underscore')
30
31
message = require('smc-util/message')
32
misc_node = require('smc-util-node/misc_node')
33
misc = require('smc-util/misc')
34
{defaults, required} = misc
35
36
blobs = require('./blobs')
37
clients = require('./clients')
38
39
# Blobs (e.g., files dynamically appearing as output in worksheets) are kept for this
40
# many seconds before being discarded. If the worksheet is saved (e.g., by a user's autosave),
41
# then the BLOB is saved indefinitely.
42
# Time-to-live, in seconds, passed as `ttl` when blobs received from a project are saved.
BLOB_TTL_S = 24 * 60 * 60   # 1 day

# LocalHub debug logging is on unless the SMC_TEST environment variable is set.
DEBUG = true unless process.env.SMC_TEST
46
47
# Open a token-authenticated socket to a local hub and enable message framing on it.
#
# opts:
#   port, host    -- where to connect (required)
#   secret_token  -- handed to misc_node.connect_to_locked_socket as `token` (required)
#   timeout       -- forwarded unchanged to connect_to_locked_socket (default 10)
#   cb            -- cb(err, socket) (required)
connect_to_a_local_hub = (opts) ->
    opts = defaults opts,
        port         : required
        host         : required
        secret_token : required
        timeout      : 10
        cb           : required

    on_connected = (err, socket) ->
        return opts.cb(err) if err
        misc_node.enable_mesg(socket, 'connection_to_a_local_hub')
        # Every chunk of incoming data is reported to misc_node.keep_portforward_alive for this port.
        socket.on 'data', ->
            misc_node.keep_portforward_alive(opts.port)
        opts.cb(undefined, socket)

    misc_node.connect_to_locked_socket
        port    : opts.port
        host    : opts.host
        token   : opts.secret_token
        timeout : opts.timeout
        cb      : on_connected
68
69
# LocalHub instances keyed by project_id, so that each project has at most one.
_local_hub_cache = {}

# Return the LocalHub for project_id, creating and caching it on first use.
# When a cached instance exists, database and compute_server are not used.
# Throws a string if project_id is null or undefined.
exports.new_local_hub = (project_id, database, compute_server) ->
    unless project_id?
        throw "project_id must be specified (it is undefined)"
    hub = _local_hub_cache[project_id]
    if not hub?
        winston.debug("new_local_hub('#{project_id}') -- creating new one")
        hub = new LocalHub(project_id, database, compute_server)
        _local_hub_cache[project_id] = hub
    else
        winston.debug("new_local_hub('#{project_id}') -- using cached version")
    return hub
81
82
# Get (or create) the LocalHub for the project and ask it for its control socket.
# The outcome of the connection attempt is ignored: the callback is a no-op.
exports.connect_to_project = (project_id, database, compute_server) ->
    ignore_result = ->
    exports.new_local_hub(project_id, database, compute_server).local_hub_socket(ignore_result)
85
86
# Return an array of every LocalHub currently in the cache (null/undefined entries skipped).
exports.all_local_hubs = () ->
    return (hub for project_id, hub of _local_hub_cache when hub?)
92
93
# The './hub-version' module; loaded lazily by init_smc_version.
smc_version = undefined

# Load './hub-version' and, whenever it emits 'change', ask every cached
# LocalHub to check whether its project is now too old.
init_smc_version = () ->
    smc_version = require('./hub-version')
    smc_version.on 'change', () ->
        winston.debug("local_hub_connection (smc_version changed) -- checking on clients")
        hub.restart_if_version_too_old() for hub in exports.all_local_hubs()
100
101
class LocalHub # use the function "new_local_hub" above; do not construct this directly!
    # Record the project/database/compute-server handles and set up empty
    # connection state.  Also makes sure the module-level smc_version is loaded.
    constructor: (project_id, database, compute_server) ->
        @project_id     = project_id
        @database       = database
        @compute_server = compute_server
        # module being used -- make sure smc_version is initialized
        init_smc_version() unless smc_version?
        @path = '.'  # should deprecate - *is* used by some random code elsewhere in this file
        @call_callbacks = {}          # message id --> response handler (see call / handle_mesg)
        @_sockets = {}                # key = session_uuid:client_id
        @_sockets_by_client_id = {}   # key = client_id, value = list of sockets for that client
        @_local_hub_socket_connecting = false
        @dbg("getting deployed running project")
111
112
# Ask the compute server for this hub's project object; cb is passed straight
# through to @compute_server.project.
project: (cb) =>
    @compute_server.project(project_id: @project_id, cb: cb)
116
117
# Log a debug message tagged with this hub's project_id.  The message is
# JSON-encoded with misc.to_json.  Does nothing unless DEBUG is set.
dbg: (m) =>
    ## only enable when debugging
    return unless DEBUG
    # The original format string never closed the parenthesis after the project_id.
    winston.debug("local_hub(#{@project_id}): #{misc.to_json(m)}")
121
122
# Move the project by delegating to project.move.
#
# opts:
#   target -- forwarded to project.move (optional)
#   cb     -- cb(err, {host:hostname}) (optional); forwarded to project.move,
#             or called with the error if the project object cannot be obtained.
move: (opts) =>
    opts = defaults opts,
        target : undefined
        cb     : undefined          # cb(err, {host:hostname})
    @dbg("move")
    @project (err, project) =>
        if err
            # Must be opts.cb: there is no bare `cb` in scope here, so the
            # original `cb?(err)` silently dropped the error.
            opts.cb?(err)
        else
            project.move(opts)
132
133
# Drop all local connection state, then restart the project.
# cb is called with the error from obtaining the project, or is handed to project.restart.
restart: (cb) =>
    @dbg("restart")
    @free_resources()
    @project (err, project) =>
        return cb(err) if err
        project.restart(cb: cb)
141
142
# Close the project via project.ensure_closed.
# cb is called with the error from obtaining the project, or is handed to ensure_closed.
close: (cb) =>
    @dbg("close: stop the project and delete from disk (but leave in cloud storage)")
    @project (err, project) =>
        return cb(err) if err
        project.ensure_closed(cb: cb)
149
150
# Save the project via project.save.
# cb is called with the error from obtaining the project, or is handed to project.save.
save: (cb) =>
    @dbg("save: save a snapshot of the project")
    @project (err, project) =>
        return cb(err) if err
        project.save(cb: cb)
157
158
# Fetch the project's status via project.status.
# cb is called with the error from obtaining the project, or is handed to project.status.
status: (cb) =>
    @dbg("status: get status of a project")
    @project (err, project) =>
        return cb(err) if err
        project.status(cb: cb)
165
166
# Fetch the project's state via project.state.
# cb is called with the error from obtaining the project, or is handed to project.state.
state: (cb) =>
    @dbg("state: get state of a project")
    @project (err, project) =>
        return cb(err) if err
        project.state(cb: cb)
173
174
# Tear down everything tied to the current connection: cancel all changefeeds,
# forget the cached address/status/version, and end the control socket and
# every session socket.  Errors while ending sockets are logged and ignored.
free_resources: () =>
    @dbg("free_resources")
    @query_cancel_all_changefeeds()
    delete @address      # so we don't continue trying to use old address
    delete @_status
    delete @smc_version  # so when client next connects we ignore version checks until they tell us their version

    # Control socket.
    try
        @_socket?.end()
        winston.debug("free_resources: closed main local_hub socket")
    catch err
        winston.debug("free_resources: exception closing main _socket: #{err}")
    delete @_socket

    # Session sockets.
    for key, session_socket of @_sockets
        try
            session_socket.end()
            winston.debug("free_resources: closed #{key}")
        catch err
            winston.debug("free_resources: exception closing a socket: #{err}")
    @_sockets = {}
    @_sockets_by_client_id = {}
194
195
# End and destroy every session socket that was opened for the given client,
# and forget them.  Does nothing if the client has no sockets.
free_resources_for_client_id: (client_id) =>
    sockets = @_sockets_by_client_id[client_id]
    return unless sockets?
    @dbg("free_resources_for_client_id(#{client_id}) -- #{sockets.length} sockets")
    for socket in sockets
        # _open_session_socket also caches each socket in @_sockets under
        # socket._key.  Drop that entry too; otherwise the dead socket stays
        # cached (and would be handed out again for the same key).
        if socket._key? and @_sockets[socket._key] is socket
            delete @_sockets[socket._key]
        try
            socket.end()
            socket.destroy()
        catch e
            # Best effort: the socket is being discarded anyway, so only log.
            @dbg("free_resources_for_client_id(#{client_id}) -- ignoring error closing socket: #{e}")
    delete @_sockets_by_client_id[client_id]
206
207
#
208
# Project query support code
209
#
210
# Run a database user_query on behalf of the project and send the result(s)
# back with write_mesg.
#
# - If mesg.query is missing, an error message is written and nothing else happens.
# - If mesg.changes is set, mesg.id is recorded in @_query_changefeeds and passed
#   to user_query as the changefeed id; the callback may then fire repeatedly.
# - The first successful result is sent by setting mesg.query and writing mesg
#   itself; later results are written directly with id and multi_response set.
# - A result with action 'close' is treated as the error 'close'.
mesg_query: (mesg, write_mesg) =>
    log = (text) => winston.debug("mesg_query(project_id='#{@project_id}'): #{misc.trunc(text,200)}")
    log(misc.to_json(mesg))
    {query} = mesg
    unless query?
        write_mesg(message.error(error:"query must be defined"))
        return
    awaiting_first_result = true
    if mesg.changes
        @_query_changefeeds ?= {}
        @_query_changefeeds[mesg.id] = true
    feed_id = mesg.id

    on_result = (err, result) =>
        err = 'close' if result?.action == 'close'
        if err
            log("project_query error: #{misc.to_json(err)}")
            delete @_query_changefeeds[feed_id] if @_query_changefeeds?[feed_id]
            write_mesg(message.error(error:err))
            if mesg.changes and not awaiting_first_result
                # also, assume changefeed got messed up, so cancel it.
                @database.user_query_cancel_changefeed(id : feed_id)
            return
        if mesg.changes and not awaiting_first_result
            # Follow-up changefeed update.
            response = result
            response.id = feed_id
            response.multi_response = true
        else
            # Initial result: reuse the request message as the response.
            awaiting_first_result = false
            response = mesg
            response.query = result
        write_mesg(response)

    @database.user_query
        project_id : @project_id
        query      : query
        options    : mesg.options
        changes    : if mesg.changes then feed_id
        cb         : on_result
250
251
# Cancel the changefeed whose id is mesg.id.
# With no changefeeds recorded at all, mesg is simply echoed back.  Otherwise
# the database is asked to cancel; on success mesg.resp is set to the database
# response, mesg is written back and the id is forgotten; on failure an error
# message is written.
mesg_query_cancel: (mesg, write_mesg) =>
    # no changefeeds
    return write_mesg(mesg) unless @_query_changefeeds?

    on_cancelled = (err, resp) =>
        return write_mesg(message.error(error:err)) if err
        mesg.resp = resp
        write_mesg(mesg)
        delete @_query_changefeeds?[mesg.id]

    @database.user_query_cancel_changefeed
        id : mesg.id
        cb : on_cancelled
265
266
# Reply with the ids of all changefeeds currently recorded for this project
# (an empty list when none have ever been recorded).
mesg_query_get_changefeed_ids: (mesg, write_mesg) =>
    if @_query_changefeeds?
        ids = misc.keys(@_query_changefeeds)
    else
        ids = []
    mesg.changefeed_ids = ids
    write_mesg(mesg)
269
270
# Cancel every changefeed recorded in @_query_changefeeds and forget them.
#
# cb (optional) is called once all cancel requests have completed.  Individual
# cancel failures are only logged (best effort), so cb is not given their errors.
query_cancel_all_changefeeds: (cb) =>
    # Fat arrow so that @project_id resolves inside the logger (a thin arrow
    # here logged "project_id='undefined'").
    dbg = (m) => winston.debug("query_cancel_all_changefeeds(project_id='#{@project_id}'): #{m}")
    # @_query_changefeeds is a plain object keyed by changefeed id, so it has no
    # .length; count the keys instead.
    ids = if @_query_changefeeds? then misc.keys(@_query_changefeeds) else []
    delete @_query_changefeeds
    if ids.length == 0
        cb?()
        return
    dbg("canceling #{ids.length} changefeeds")
    cancel_one = (id, done) =>
        dbg("canceling id=#{id}")
        @database.user_query_cancel_changefeed
            id : id
            cb : (err) =>
                if err
                    dbg("FEED: warning #{id} -- error canceling a changefeed #{misc.to_json(err)}")
                else
                    dbg("FEED: canceled changefeed -- #{id}")
                # Deliberately not forwarding err: keep cancelling the rest.
                done()
    async.map(ids, cancel_one, (err) => cb?(err))
288
289
#
290
# end project query support code
291
#
292
293
# local hub just told us its version. Record it. Restart project if hub version too old.
294
# Called when the local hub reports its version (see the 'version' event in
# handle_mesg): remember it, then check whether the project must be restarted.
local_hub_version: (version) =>
    winston.debug("local_hub_version: version=#{version}")
    # stored on the instance; distinct from the module-level smc_version
    @smc_version = version
    @restart_if_version_too_old()
298
299
# If our known version of the project is too old compared to the
300
# current min_project_version in smc-util/smc-version, then
301
# we restart the project, which updates the code to the latest
302
# version. Only restarts the project if we have an open control
303
# socket to it.
304
# Please make damn sure to update the project code on the compute
305
# server before updating the version, or the project will be
306
# forced to restart and it won't help!
307
# Restart the project when its reported version is below
# smc_version.min_project_version.  Does nothing when there is no control
# socket, when the project has not reported a version yet, or when a restart
# towards this same minimum version was already attempted in the last 15 minutes.
restart_if_version_too_old: () =>
    # not connected at all -- just return
    return unless @_socket?
    # client hasn't told us their version yet
    return unless @smc_version?
    min_version = smc_version.min_project_version
    # the project is up to date
    return if min_version <= @smc_version
    # We already restarted the project in an attempt to update it to this version
    # and it didn't get updated.  Don't try again until @_restart_goal_version is
    # cleared, since we don't want to lock a user out of their project due to
    # somebody forgetting to update code on the compute server!  It could also be
    # that the project just didn't finish restarting.
    return if @_restart_goal_version == min_version

    winston.debug("restart_if_version_too_old(#{@project_id}): #{@smc_version}, #{min_version}")
    # record some stuff so that we don't keep trying to restart the project constantly
    @_restart_goal_version = min_version   # version which we tried to get to
    forget_goal = () =>
        # only clear the marker if no newer goal replaced it in the meantime
        if @_restart_goal_version == min_version
            delete @_restart_goal_version
    setTimeout(forget_goal, 15*60*1000)    # don't try again for at least 15 minutes.

    @dbg("restart_if_version_too_old -- restarting since #{min_version} > #{@smc_version}")
    @restart (err) =>
        @dbg("restart_if_version_too_old -- done #{err}")
336
337
# handle incoming JSON messages from the local_hub
338
# Dispatch one JSON message received from the local hub.
#
# - Messages carrying a client_id are forwarded with clients.push_to_client.
# - A 'version' event records the project's version.
# - Otherwise, a message with an id either answers a pending @call (its
#   registered callback is invoked) or is a request from the local hub, which
#   is answered over the control socket.
# - Messages without an id are ignored.
# The socket argument is not used.
handle_mesg: (mesg, socket) =>
    @dbg("local_hub --> hub: received mesg: #{misc.trunc(misc.to_json(mesg), 250)}")
    if mesg.client_id?
        # Should we worry about ensuring that message from this local hub are allowed to
        # send messages to this client?  NO.  For them to send a message, they would have to
        # know the client's id, which is a random uuid, assigned each time the user connects.
        # It obviously is known to the local hub -- but if the user has connected to the local
        # hub then they should be allowed to receive messages.
        clients.push_to_client(mesg)
        return
    if mesg.event == 'version'
        @local_hub_version(mesg.version)
        return
    return unless mesg.id?

    pending = @call_callbacks[mesg.id]
    if pending?
        pending(mesg)
        return

    winston.debug("handling call from local_hub")
    # Send resp back to the local hub, tagged with the id of the request.
    reply = (resp) =>
        resp.id = mesg.id
        @local_hub_socket (err, sock) =>
            sock.write_mesg('json', resp) unless err
    switch mesg.event
        when 'ping'
            reply(message.pong())
        when 'query'
            @mesg_query(mesg, reply)
        when 'query_cancel'
            @mesg_query_cancel(mesg, reply)
        when 'query_get_changefeed_ids'
            @mesg_query_get_changefeed_ids(mesg, reply)
        when 'file_written_to_project', 'file_read_from_project', 'error'
            # file_written_to_project: ignore -- don't care; this is going away
            # file_read_from_project:  handled elsewhere by the code that requests the file
            # error:                   ignore -- don't care since handler already gone.
            return
        else
            reply(message.error(error:"unknown event '#{mesg.event}'"))
    return
383
384
# Store a blob received from the local hub and report the outcome back to it.
#
# opts:
#   uuid -- id of the blob; echoed back as `sha1` in the save_blob reply (required)
#   blob -- the blob content (required)
# The reply is only written if the control socket can be obtained.
handle_blob: (opts) =>
    opts = defaults opts,
        uuid : required
        blob : required
    @dbg("local_hub --> global_hub: received a blob with uuid #{opts.uuid}")

    on_saved = (err, ttl) =>
        if err
            resp = message.save_blob(sha1:opts.uuid, error:err)
            @dbg("handle_blob: error! -- #{err}")
        else
            resp = message.save_blob(sha1:opts.uuid, ttl:ttl)
        @local_hub_socket (err, socket) =>
            socket.write_mesg('json', resp) unless err

    # Store blob in DB.
    blobs.save_blob
        uuid       : opts.uuid
        blob       : opts.blob
        project_id : @project_id
        ttl        : BLOB_TTL_S
        check      : true       # if malicious user tries to overwrite a blob with given sha1 hash, they get an error.
        database   : @database
        cb         : on_saved
408
409
# Connection to the remote local_hub daemon that we use for control.
410
# Get the control socket to the project's local hub, connecting if necessary.
#
# cb(err, socket).  While a connection attempt is in progress, further callers
# are queued and all are answered together.  If the attempt takes longer than
# 20 seconds, every queued caller gets the error 'timeout'.
#
# Changes relative to the previous version:
#   - a socket that arrives after the attempt was given up on is closed rather
#     than leaked;
#   - @_socket is set *before* waiting callers are notified, so a caller that
#     re-enters local_hub_socket (or calls free_resources) sees consistent state;
#   - the queue is detached before it is iterated, and the timer is cleared
#     before callbacks run, so a re-entrant request made from inside a callback
#     is not clobbered.
local_hub_socket: (cb) =>
    if @_socket?
        #@dbg("local_hub_socket: re-using existing socket")
        cb(undefined, @_socket)
        return

    if @_local_hub_socket_connecting
        @_local_hub_socket_queue.push(cb)
        @dbg("local_hub_socket: added socket request to existing queue, which now has length #{@_local_hub_socket_queue.length}")
        return
    @_local_hub_socket_connecting = true
    @_local_hub_socket_queue = [cb]
    connecting_timer = undefined

    # Detach the queue of waiting callers, then call each with the given arguments.
    flush_queue = (args...) =>
        waiting = @_local_hub_socket_queue
        delete @_local_hub_socket_queue
        for c in (waiting ? [])
            c?(args...)
        return

    cancel_connecting = () =>
        @_local_hub_socket_connecting = false
        if @_local_hub_socket_queue?
            @dbg("local_hub_socket: cancelled due to timeout")
            flush_queue('timeout')
        clearTimeout(connecting_timer)

    # If below fails for 20s for some reason, cancel everything to allow for future attempt.
    connecting_timer = setTimeout(cancel_connecting, 20000)

    @dbg("local_hub_socket: getting new socket")
    @new_socket (err, socket) =>
        if not @_local_hub_socket_queue?
            # already gave up -- nobody will ever use this socket, so close it.
            if socket?
                try
                    socket.end()
                catch e
                    @dbg("local_hub_socket: error closing abandoned socket -- #{e}")
            return
        @_local_hub_socket_connecting = false
        clearTimeout(connecting_timer)
        @dbg("local_hub_socket: new_socket returned #{err}")
        if err
            flush_queue(err)
            return

        socket.on 'mesg', (type, mesg) =>
            switch type
                when 'blob'
                    @handle_blob(mesg)
                when 'json'
                    @handle_mesg(mesg, socket)

        socket.on('end', @free_resources)
        socket.on('close', @free_resources)
        socket.on('error', @free_resources)

        # Send a hello message to the local hub, so it knows this is the control connection,
        # and not something else (e.g., a console).
        socket.write_mesg('json', {event:'hello'})

        @_socket = socket

        # Finally, we wait a bit to see if the version gets sent from
        # the client.  If not, we set it to 0, which will cause a restart,
        # which will upgrade to a new version that sends versions.
        # TODO: This code can be deleted after all projects get restarted.
        check_version_received = () =>
            if @_socket? and not @smc_version?
                @smc_version = 0
                @restart_if_version_too_old()
        setTimeout(check_version_received, 60*1000)

        flush_queue(undefined, socket)
480
481
# Get a new connection to the local_hub,
482
# authenticated via the secret_token, and enhanced
483
# to be able to send/receive json and blob messages.
484
# Open a brand-new socket to the local hub (authenticated with the address's
# secret_token).  cb(err, socket).
#
# Steps: look up the project's address if none is cached; try to connect; if
# that fails, look the address up again and try to connect once more.
new_socket: (cb) =>     # cb(err, socket)
    @dbg("new_socket")
    socket = undefined

    # Connect using the currently cached @address.
    connect = (done) =>
        if not @address?
            done("no address")
            return
        connect_to_a_local_hub
            port         : @address.port
            host         : @address.host
            secret_token : @address.secret_token
            cb           : done

    # Ask the project for its address and cache whatever comes back in @address.
    lookup_address = (done) =>
        @project (err, project) =>
            if err
                done(err)
            else
                @dbg("get address")
                project.address
                    cb : (err, address) =>
                        @address = address
                        done(err)

    async.series([
        (next) =>
            if @address?
                next()
            else
                @dbg("get address of a working local hub")
                lookup_address(next)
        (next) =>
            @dbg("try to connect to local hub socket using last known address")
            connect (err, _socket) =>
                if err
                    @dbg("failed to get address of a working local hub")
                    lookup_address(next)
                else
                    socket = _socket
                    next()
        (next) =>
            if socket?
                next()
            else
                @dbg("still don't have our connection -- try again")
                connect (err, _socket) =>
                    socket = _socket
                    next(err)
    ], (err) =>
        cb(err, socket)
    )
536
537
# Stop listening for responses to the message with the given id, i.e. remove
# the handler that `call` registered in @call_callbacks.
remove_multi_response_listener: (id) =>
    delete @call_callbacks[id]
539
540
# Send a JSON message to the local hub over the control socket and optionally
# listen for the response.
#
# opts:
#   mesg           -- the message to send (required); it is given a fresh uuid as
#                     id if it has none and a response is expected
#   timeout        -- NOTE: a nonzero timeout MUST be specified, or we will not even
#                     listen for a response from the local hub!  Undefined or 0 both
#                     mean "don't listen".
#                     NOTE(review): the value is only used as an on/off flag here;
#                     no timer is started in this method -- confirm whether that is intended.
#   multi_response -- if true, timeout is ignored and opts.cb is invoked with every
#                     response; call @remove_multi_response_listener(mesg.id) to remove
#   cb             -- optional.  Called with an error if the socket cannot be obtained
#                     or the write fails; otherwise, for a single response,
#                     cb(resp.error) on an 'error' event and cb(undefined, resp) otherwise.
call: (opts) =>
    opts = defaults opts,
        mesg           : required
        timeout        : undefined
        multi_response : false
        cb             : undefined
    @dbg("call")
    if not opts.mesg.id?
        if opts.timeout or opts.multi_response   # opts.timeout being undefined or 0 both mean "don't do it"
            opts.mesg.id = uuid.v4()

    @local_hub_socket (err, socket) =>
        if err
            @dbg("call: failed to get socket -- #{err}")
            opts.cb?(err)
            return
        @dbg("call: get socket -- now writing message to the socket -- #{misc.trunc(misc.to_json(opts.mesg),200)}")
        socket.write_mesg 'json', opts.mesg, (err) =>
            if err
                @free_resources()   # at least next time it will get a new socket
                opts.cb?(err)
                return
            if opts.multi_response
                @call_callbacks[opts.mesg.id] = opts.cb
            else if opts.timeout
                # Listen to exactly one response, then remove the listener.
                # The listener is registered even when no cb was given, so that
                # handle_mesg recognises the response as belonging to this call.
                @call_callbacks[opts.mesg.id] = (resp) =>
                    delete @call_callbacks[opts.mesg.id]
                    # cb is optional, so guard the call; an unguarded opts.cb(...)
                    # threw a TypeError when the response arrived.
                    if resp.event == 'error'
                        opts.cb?(resp.error)
                    else
                        opts.cb?(undefined, resp)
            # As mentioned above -- there's no else -- if not timeout then
            # we do not listen for a response.
574
575
####################################################
576
# Session management
577
#####################################################
578
579
# Get a socket plugged into the session opts.session_uuid for the client
# opts.client_id, opening a new connection to the local hub if none is cached.
#
# opts:
#   client_id, session_uuid -- together form the cache key "session_uuid:client_id" (required)
#   type         -- 'sage', 'console' (required)
#   params       -- forwarded in the connect_to_session message (required)
#   project_id   -- forwarded in the connect_to_session message (required)
#   timeout      -- seconds to wait for the local hub's answer (default 10)
#   cb           -- cb(err, socket) (required)
#
# Changes relative to the previous version: on failure opts.cb(err) is now
# called (it was never called, leaving the caller hanging), and the failed
# session socket is removed from the caches and destroyed so that a retry does
# not get the dead socket back.
_open_session_socket: (opts) =>
    opts = defaults opts,
        client_id    : required
        session_uuid : required
        type         : required  # 'sage', 'console'
        params       : required
        project_id   : required
        timeout      : 10
        cb           : required  # cb(err, socket)
    @dbg("_open_session_socket")

    key = "#{opts.session_uuid}:#{opts.client_id}"
    socket = @_sockets[key]
    if socket?
        opts.cb(false, socket)
        return

    # We do not currently have an active open socket connection to this session.
    # We make a new socket connection to the local_hub, then
    # send a connect_to_session message, which will either
    # plug this socket into an existing session with the given session_uuid, or
    # create a new session with that uuid and plug this socket into it.
    socket = undefined
    async.series([
        (cb) =>
            @dbg("_open_session_socket: getting new socket connection to a local_hub")
            @new_socket (err, _socket) =>
                if err
                    cb(err)
                else
                    socket = _socket
                    socket._key = key
                    @_sockets[key] = socket
                    if not @_sockets_by_client_id[opts.client_id]?
                        @_sockets_by_client_id[opts.client_id] = [socket]
                    else
                        @_sockets_by_client_id[opts.client_id].push(socket)
                    cb()
        (cb) =>
            mesg = message.connect_to_session
                id           : uuid.v4()  # message id
                type         : opts.type
                project_id   : opts.project_id
                session_uuid : opts.session_uuid
                params       : opts.params
            @dbg("_open_session_socket: send the message asking to be connected with a #{opts.type} session.")
            socket.write_mesg('json', mesg)
            # Now we wait for a response for opts.timeout seconds
            timer = undefined
            on_response = (type, resp) =>
                clearTimeout(timer)
                if resp.event == 'error'
                    cb(resp.error)
                else
                    if opts.type == 'console'
                        # record the history, truncating in case the local_hub sent something really long (?)
                        if resp.history?
                            socket.history = resp.history.slice(resp.history.length - 100000)
                        else
                            socket.history = ''
                        # Console -- we will now only use this socket for binary communications.
                        misc_node.disable_mesg(socket)
                    cb()
            socket.once('mesg', on_response)
            timed_out = () =>
                socket.removeListener('mesg', on_response)
                socket.end()
                cb("Timed out after waiting #{opts.timeout} seconds for response from #{opts.type} session server. Please try again later.")
            timer = setTimeout(timed_out, opts.timeout*1000)

    ], (err) =>
        if err
            @dbg("_open_session_socket: error getting a socket -- (declaring total disaster) -- #{err}")
            # This @_socket.destroy() below is VERY important, since just deleting the socket might not send this,
            # and the local_hub -- if the connection were still good -- would have two connections
            # with the global hub, thus doubling sync and broadcast messages.  NOT GOOD.
            @_socket?.destroy()
            delete @_status; delete @_socket
            if socket?
                # Forget the session socket that failed, so the next attempt opens a fresh one.
                if @_sockets[key] is socket
                    delete @_sockets[key]
                siblings = @_sockets_by_client_id[opts.client_id]
                if siblings?
                    @_sockets_by_client_id[opts.client_id] = (s for s in siblings when s isnt socket)
                try
                    socket.destroy()
                catch e
                    @dbg("_open_session_socket: error destroying failed session socket -- #{e}")
            opts.cb(err)
        else
            opts.cb(false, socket)
    )
660
661
# Connect the client with a console session, possibly creating a session in the process.
662
# Connect a client to a console session, creating the session if no
# session_uuid is given, and wire data in both directions between them.
#
# opts:
#   client       -- the client object; its id, register_data_handler, push_to_client,
#                   push_data_to_client and touch are used (required)
#   project_id   -- required
#   params       -- forwarded to the session; params.filename, if set, is touched
#                   whenever the client writes data (required)
#   session_uuid -- if undefined, a new session is created; if defined, connect to session or get error
#   cb           -- cb(err, [session_connected message]) (required)
console_session: (opts) =>
    opts = defaults opts,
        client       : required
        project_id   : required
        params       : required
        session_uuid : undefined
        cb           : required
    @dbg("console_session: connect client to console session -- session_uuid=#{opts.session_uuid}")

    # No session given: create a new one.
    opts.session_uuid ?= uuid.v4()

    on_socket = (err, console_socket) =>
        if err
            opts.cb(err)
            return

        # In case it was already setup to listen before... (and client is reconnecting)
        console_socket.removeAllListeners()

        console_socket._ignore = false
        console_socket.on 'end', () =>
            winston.debug("console_socket (session_uuid=#{opts.session_uuid}): received 'end' so setting ignore=true")
            opts.client.push_to_client(message.terminate_session(session_uuid:opts.session_uuid))
            console_socket._ignore = true
            delete @_sockets[console_socket._key]

        # client --> console:
        # Create a binary channel that the client can use to write to the socket.
        # (This uses our system for multiplexing JSON and multiple binary streams
        # over one single connection.)
        reconnect_sent_recently = false
        channel = opts.client.register_data_handler (data) =>
            if console_socket._ignore
                # send a reconnect message, but at most once every 5 seconds.
                return if reconnect_sent_recently
                reconnect_sent_recently = true
                setTimeout( (() => reconnect_sent_recently = false), 5000 )
                winston.debug("console -- trying to write to closed console_socket with session_uuid=#{opts.session_uuid}")
                opts.client.push_to_client(message.session_reconnect(session_uuid:opts.session_uuid))
                return
            try
                console_socket.write(data)
            catch e
                # Deliberately ignored: "Error: write after end" from this write
                # has shown up in the server logs as an uncaught exception.
            if opts.params.filename?
                opts.client.touch(project_id:opts.project_id, path:opts.params.filename)

        opts.cb false, message.session_connected
            session_uuid : opts.session_uuid
            data_channel : channel
            history      : console_socket.history

        # console --> client:
        # When data comes in from the socket, we push it on to the connected
        # client over the channel we just created.
        console_socket.on 'data', (data) ->
            # Never push more than 20000 characters at once to client, since display is slow, etc.
            if data.length > 20000
                data = "[...]" + data.slice(-20000)
            opts.client.push_data_to_client(channel, data)
            # Keep a bounded history: once it exceeds 150000, keep only the last 100000.
            console_socket.history += data
            if console_socket.history.length > 150000
                console_socket.history = console_socket.history.slice(-100000)

    # Connect to the console server
    @_open_session_socket
        client_id    : opts.client.id
        session_uuid : opts.session_uuid
        project_id   : opts.project_id
        type         : 'console'
        params       : opts.params
        cb           : on_socket
746
747
# Ask the local hub to terminate a session, by sending a terminate_session
# message through @call with timeout 30.
#
# opts:
#   session_uuid -- required
#   project_id   -- required
#   cb           -- optional; passed on to @call
terminate_session: (opts) =>
    opts = defaults opts,
        session_uuid : required
        project_id   : required
        cb           : undefined
    @dbg("terminate_session")
    request = message.terminate_session
        session_uuid : opts.session_uuid
        project_id   : opts.project_id
    @call(mesg: request, timeout: 30, cb: opts.cb)
760
761
# Read a file from a project into memory on the hub. This is
762
# used, e.g., for client-side editing, worksheets, etc. This does
763
# not pull the file from the database; instead, it loads it live
764
# from the project_server virtual machine.
765
# Read a file (or directory, as an archive) live from the project into memory.
#
# opts:
#   path       -- what to read (required)
#   project_id -- required
#   archive    -- for directories; if directory, then the output object "data" has
#                 data.archive=actual extension used (default 'tar.bz2')
#   cb         -- cb(err, data) (required)
#
# Protocol: send read_file_from_project, wait up to 60 for the JSON reply
# carrying data_uuid, then wait up to 60 for the blob with that uuid.
# NOTE(review): the blob step treats whatever recv_mesg delivers as the data --
# confirm how recv_mesg reports a timeout.
read_file: (opts) => # cb(err, content_of_file)
    {path, project_id, archive, cb} = defaults opts,
        path       : required
        project_id : required
        archive    : 'tar.bz2'
        cb         : required
    @dbg("read_file '#{path}'")
    id = uuid.v4()
    socket = data = data_uuid = result_archive = undefined

    # Get a socket connection to the local_hub.
    get_socket = (next) =>
        @local_hub_socket (err, _socket) =>
            if err
                next(err)
            else
                socket = _socket
                next()

    # Request the file and wait for the JSON message describing it.
    request_file = (next) =>
        socket.write_mesg('json', message.read_file_from_project(id:id, project_id:project_id, path:path, archive:archive))
        socket.recv_mesg
            type    : 'json'
            id      : id
            timeout : 60
            cb      : (mesg) =>
                switch mesg.event
                    when 'file_read_from_project'
                        data_uuid = mesg.data_uuid
                        result_archive = mesg.archive
                        next()
                    when 'error'
                        next(mesg.error)
                    else
                        next("Unknown mesg event '#{mesg.event}'")

    # Receive the blob holding the content.
    receive_blob = (next) =>
        socket.recv_mesg
            type    : 'blob'
            id      : data_uuid
            timeout : 60
            cb      : (_data) =>
                data = _data
                data.archive = result_archive
                next()

    async.series [get_socket, request_file, receive_blob], (err) =>
        if err
            cb(err)
        else
            cb(false, data)
819
820
# Write a file
821
# Write data to a file in the project.
#
# opts:
#   path       -- where to write (required)
#   project_id -- required
#   data       -- what to write (required)
#   cb         -- cb(err) (required)
#
# Protocol: send write_file_to_project plus a blob carrying the data, then wait
# up to 10 for the JSON reply with the same message id.
write_file: (opts) => # cb(err)
    {path, project_id, cb, data} = defaults opts,
        path       : required
        project_id : required
        data       : required   # what to write
        cb         : required
    @dbg("write_file '#{path}'")
    id        = uuid.v4()
    data_uuid = uuid.v4()

    on_reply = (mesg) =>
        switch mesg.event
            when 'error'
                opts.cb(mesg.error)
            when 'file_written_to_project'
                opts.cb()
            else
                opts.cb("unexpected message type '#{mesg.event}'")

    @local_hub_socket (err, socket) =>
        if err
            opts.cb(err)
            return
        request = message.write_file_to_project
            id         : id
            project_id : project_id
            path       : path
            data_uuid  : data_uuid
        socket.write_mesg('json', request)
        socket.write_mesg('blob', {uuid:data_uuid, blob:data})
        socket.recv_mesg
            type    : 'json'
            id      : id
            timeout : 10
            cb      : on_reply
854
855
856