Contact
CoCalc Logo Icon
StoreFeaturesDocsShareSupport News AboutSign UpSign In
| Download
Views: 39536
1
###
Manage console sessions and the console server

This runs as part of the local hub.

NOTE: This code is complicated mainly because it supports multiple users
connecting (or reconnecting) to the **same** session, and also handles bursts
of output.
###
10
11
fs = require('fs')
12
async = require('async')
13
winston = require('winston')
14
15
message = require('smc-util/message')
16
misc = require('smc-util/misc')
17
misc_node = require('smc-util-node/misc_node')
18
19
port_manager = require('./port_manager')
20
21
# Time window (in seconds) after a restart attempt during which further
# restart requests are refused, so the console server gets a chance to start.
CONSOLE_SERVER_MAX_STARTUP_TIME_S = 10   # 10 seconds

# Module-level lock and timestamp shared by all calls to restart_console_server.
_restarting_console_server = false   # true while a restart is in progress
_restarted_console_server  = 0       # time when we last restarted it

# Try to restart the console server and get the port where it is listening.
#
#   cb(err, port) -- err is set if a restart is already in progress, if the last
#                    restart attempt finished at most CONSOLE_SERVER_MAX_STARTUP_TIME_S
#                    seconds ago, if the restart command fails, or if no valid port
#                    could be read from the port file in time.  On success, port
#                    is the integer parsed from the port file.
restart_console_server = (cb) ->   # cb(err, port)
    dbg = (m) -> winston.debug("restart_console_server: #{misc.to_json(m)}")
    dbg()

    if _restarting_console_server
        dbg("hit lock -- already restarting console server")
        cb("already restarting console server")
        return

    t = new Date() - _restarted_console_server
    if t <= CONSOLE_SERVER_MAX_STARTUP_TIME_S*1000
        err = "restarted console server #{t}ms ago -- still waiting for it to start"
        dbg(err)
        cb(err)
        return

    _restarting_console_server = true
    dbg("restarting daemon")

    port_file = misc_node.abspath(port_manager.port_file('console'))
    port      = undefined
    async.series([
        (cb) ->
            dbg("remove port_file=#{port_file}")
            fs.unlink port_file, (err) ->
                cb() # ignore error, e.g., if file not there.
        (cb) ->
            dbg("restart console server")
            cmd = "smc-console-server"
            misc_node.execute_code
                command        : "#{cmd} stop; #{cmd} start"
                timeout        : 15
                ulimit_timeout : false   # very important -- so doesn't kill consoles after 15 seconds!
                err_on_exit    : true
                bash           : true
                verbose        : true
                cb             : cb
        (cb) ->
            dbg("wait a little to see if #{port_file} appears, and if so read it and return port")
            # One attempt at reading the port.  Any failure (file not there yet,
            # read error, or contents that do not parse as an integer -- e.g. the
            # file exists but has not been written yet) is reported as an error,
            # so that retry_until_success tries again.
            f = (cb) ->
                fs.readFile port_file, (err, data) ->
                    if err
                        cb(err)
                        return
                    s = data.toString()
                    n = parseInt(s, 10)
                    if isNaN(n)
                        cb("console port_file(='#{port_file}') corrupt -- contents='#{s}'")
                    else
                        port = n
                        cb()
            misc.retry_until_success
                f        : f
                max_time : 7000
                cb       : cb
    ], (err) =>
        # Release the lock and record the time of this attempt (even on failure),
        # which is what the rate limit at the top of this function checks.
        _restarting_console_server = false
        _restarted_console_server  = new Date()
        dbg("finished trying to restart console_server")
        if err
            dbg("ERROR: #{err}")
        cb(err, port)
    )
91
92
93
# Manages the console sessions of this local hub: creating sessions by
# connecting to the console server, attaching client sockets to them, and
# terminating them.
class ConsoleSessions
    constructor: () ->
        @_sessions        = {}   # map: session_uuid --> session object (see _new_session)
        @_get_session_cbs = {}   # map: session_uuid --> callbacks waiting for that session to be created

    # Set the token passed to misc_node.connect_to_locked_socket when
    # connecting to the console server.
    set_secret_token: (secret_token) =>
        @_secret_token = secret_token

    # Explicitly set the console server port; if set, get_session uses it
    # instead of asking the port_manager.
    set_port: (port) =>
        @_port = port

    # True if a session with this uuid is known (whatever its status).
    session_exists: (session_uuid) =>
        return @_sessions[session_uuid]?

    # End the socket of the given session if it is running, and mark it 'done'.
    # cb (optional) is always called, with no arguments.
    terminate_session: (session_uuid, cb) =>
        session = @_sessions[session_uuid]
        if session?
            winston.debug("terminate console session '#{session_uuid}'")
            if session.status == 'running'
                session.socket.end()
                session.status = 'done'
        cb?()

    # End the sockets of all known sessions and mark every session 'done'.
    terminate_all_sessions: () =>
        # NOTE: iterate over *all* sessions.  (This used to iterate over
        # @_sessions[session_uuid] with session_uuid undefined, so the loop
        # body never ran and nothing was terminated.)
        for session_uuid, session of @_sessions
            try
                session.socket.end()
            catch e
                # best effort -- the socket may already be closed or destroyed
                winston.debug("terminate_all_sessions: error ending socket of console session '#{session_uuid}' -- #{e}")
            session.status = 'done'
        return

    # Connect to (if 'running'), restart (if 'dead'), or create (if
    # non-existing) the console session with mesg.session_uuid.
    # If mesg.session_uuid is not given, a new uuid is assigned to it.
    # cb (optional) is called with an error if the session could not be obtained.
    connect: (client_socket, mesg, cb) =>
        if not mesg.session_uuid?
            mesg.session_uuid = misc.uuid()
        client_socket.on 'end', () =>
            winston.debug("a console session client socket ended -- session_uuid=#{mesg.session_uuid}")
        @get_session mesg, (err, session) =>
            if err
                client_socket.write_mesg('json', message.error(id:mesg.id, error:err))
                cb?(err)
            else
                # First tell the client about the session and its history so far...
                client_socket.write_mesg('json', {desc:session.desc, history:session.history.toString()})
                # ... then hook the client up to the session's socket.
                misc_node.plug(client_socket, session.socket, 20000)  # 20000 = max burst to client every few ms.
                session.clients.push(client_socket)
                cb?()

    # Get or create session with given uuid.
    # Can be safely called several times at once without creating multiple sessions...
    # cb(err, session)
    get_session: (mesg, cb) =>
        # NOTE: must be robust against multiple clients opening same session_id at once, which
        # would be likely to happen on network reconnect.
        winston.debug("get_session: console session #{mesg.session_uuid}")
        session = @_sessions[mesg.session_uuid]
        if session? and session.status == 'running'
            winston.debug("console session: done -- it's already there and working")
            cb(undefined, session)
            return

        if not @_get_session_cbs[mesg.session_uuid]?
            winston.debug("console session not yet created -- put on stack")
            @_get_session_cbs[mesg.session_uuid] = [cb]
        else
            winston.debug("console session already being created -- just push cb onto stack and return")
            @_get_session_cbs[mesg.session_uuid].push(cb)
            return

        port    = undefined
        history = undefined
        async.series([
            (cb) =>
                if session?
                    history = session.history # maintain history
                winston.debug("console session does not exist or is not running, so we make a new session")
                session = undefined
                if @_port
                    port = @_port
                    cb()
                    return
                port_manager.get_port 'console', (err, _port) =>
                    if err
                        cb() # will try to restart console server in next step
                    else
                        port = _port
                        winston.debug("got console server port = #{port}")
                        cb()
            (cb) =>
                if port?
                    cb()
                else
                    winston.debug("couldn't determine console server port; probably console server not running -- try restarting it")
                    @terminate_all_sessions()
                    restart_console_server (err, _port) =>
                        if err
                            cb(err)
                        else
                            port = _port
                            winston.debug("restarted console server, then got port = #{port}")
                            cb()
            (cb) =>
                winston.debug("console: Got port -- now create the new session")
                @_new_session mesg, port, (err, _session) =>
                    if err
                        cb(err)
                    else
                        session = _session
                        if history? # we restarted session; maintain history
                            session.history = history
                        cb()
        ], (err) =>
            # Call all the callbacks that were waiting on this session.
            # The list is detached *before* calling them, so that a callback
            # that throws cannot leave a stale entry behind (which would make
            # every later request for this session wait forever), and so that
            # a callback that requests this session again is not silently dropped.
            waiting = @_get_session_cbs[mesg.session_uuid]
            delete @_get_session_cbs[mesg.session_uuid]
            for f in (waiting ? [])
                f(err, session)
            return
        )

    # Get a socket connected to the console server.  First tries the given port
    # for up to 5 seconds; if that fails, forgets the port, terminates all
    # sessions, restarts the console server and tries the new port.
    # cb(err, socket)
    _get_console_server_socket: (port, cb) =>
        socket = undefined
        # One connection attempt to the current value of port; sets socket on success.
        f = (cb) =>
            misc_node.connect_to_locked_socket
                port  : port
                token : @_secret_token
                cb    : (err, _socket) =>
                    if err
                        cb(err)
                    else
                        socket = _socket
                        cb()
        async.series([
            (cb) =>
                misc.retry_until_success
                    f           : f
                    start_delay : 50
                    factor      : 1.7
                    max_delay   : 2000
                    max_time    : 5000
                    cb          : (err) =>
                        cb()  # ignore err on purpose -- no err sets socket
            (cb) =>
                if socket?
                    cb(); return
                port_manager.forget_port('console')
                @terminate_all_sessions()
                restart_console_server (err, _port) =>
                    if err
                        cb(err)
                    else
                        port = _port   # f (above) connects to this new port from now on
                        cb()
            (cb) =>
                if socket?
                    cb(); return
                misc.retry_until_success
                    f        : f
                    max_time : 5000
                    cb       : cb
        ], (err) =>
            if err
                cb(err)
            else
                cb(undefined, socket)
        )

    # Create a new session by asking the console server listening on the given port.
    _new_session: (mesg, port, cb) =>  # cb(err, session)
        winston.debug("_new_session: defined by #{misc.to_json(mesg)}")
        # Connect to port CONSOLE_PORT, send mesg, then hook sockets together.
        @_get_console_server_socket port, (err, console_socket) =>
            if err
                cb("_new_session: console server failed to connect -- #{err}")
                return
            # Request a Console session from console_server
            misc_node.enable_mesg(console_socket)
            console_socket.write_mesg('json', mesg)

            # Below we wait for one message to come back from the console_socket.
            # However, if 5s elapses with no response -- which could happen! --
            # we give up, and return an error.  We then set cb undefined in case
            # the session does actually work.
            no_response = =>
                if cb?
                    cb("no response")
                    console_socket.destroy()
                    cb = undefined  # make sure doesn't get used below
            no_response_timeout = setTimeout(no_response, 5000)

            # Read one JSON message back, which describes the session
            console_socket.once 'mesg', (type, desc) =>
                clearTimeout(no_response_timeout)
                if not cb?  # already failed
                    return
                # in future, history could be read from a file
                # Disable JSON mesg protocol, since it isn't used further
                misc_node.disable_mesg(console_socket)

                session =
                    socket       : console_socket
                    desc         : desc
                    status       : 'running'
                    clients      : []
                    history      : ''   # TODO: this could come from something stored in a file
                    session_uuid : mesg.session_uuid
                    project_id   : mesg.project_id

                # State of the burst detector used in the 'data' handler below.
                session.amount_of_data = 0
                session.last_data      = misc.mswalltime()

                console_socket.on 'data', (data) ->
                    # every 2 ms we reset the burst data watcher.
                    tm = misc.mswalltime()
                    if tm - session.last_data >= 2
                        session.amount_of_data = 0
                    session.last_data = tm

                    if session.amount_of_data > 50000
                        # We just got more than 50000 characters of output in <= 2 ms, so truncate it.
                        # I had a control-c here, but it was EVIL (and useless), so do *not* enable this.
                        #    console_socket.write(String.fromCharCode(3))
                        data = '[...]'

                    session.history += data
                    session.amount_of_data += data.length
                    # Bound the history: once it exceeds 150000 characters, keep the last 100000.
                    n = session.history.length
                    if n > 150000
                        session.history = session.history.slice(session.history.length - 100000)

                @_sessions[mesg.session_uuid] = session
                cb(undefined, session)

            console_socket.on 'end', () =>
                winston.debug("console session #{mesg.session_uuid} ended")
                session = @_sessions[mesg.session_uuid]
                # The session may not be registered (e.g., the socket ended before
                # the console server described the session), hence the guard.
                if session?
                    session.status = 'done'
                    for client in session.clients
                        # close all of these connections
                        client.end()
                return

    # Return object that describes status of all Console sessions
    # that belong to the given project.
    info: (project_id) =>
        obj = {}
        for id, session of @_sessions
            if session.project_id == project_id
                obj[id] =
                    desc           : session.desc
                    status         : session.status
                    history_length : session.history.length
        return obj
346
347
exports.ConsoleSessions = ConsoleSessions
348