Contact
CoCalc Logo Icon
StoreFeaturesDocsShareSupport News AboutSign UpSign In
| Download
Views: 39535
1
###
2
Start the Sage server and also get a new socket connection to it.
3
###
4
5
async = require('async')
6
winston = require('winston')
7
8
misc = require('smc-util/misc')
9
misc_node = require('smc-util-node/misc_node')
10
message = require('smc-util/message')
11
12
port_manager = require('./port_manager')
13
14
common = require('./common')
15
blobs = require('./blobs')
16
17
18
{required, defaults} = misc
19
20
###############################################
21
# Direct Sage socket session -- used internally in local hub, e.g., to assist CodeMirror editors...
22
###############################################
23
24
# Wait up to this long for the Sage server to start responding
25
# connection requests, after we restart it. It can
26
# take a while, since it pre-imports the sage library
27
# at startup, before forking.
28
SAGE_SERVER_MAX_STARTUP_TIME_S = 60
29
30
_restarting_sage_server = false
31
_restarted_sage_server = 0 # time when we last restarted it
32
restart_sage_server = (cb) ->
33
dbg = (m) -> winston.debug("restart_sage_server: #{misc.to_json(m)}")
34
if _restarting_sage_server
35
dbg("hit lock")
36
cb("already restarting sage server")
37
return
38
t = new Date() - _restarted_sage_server
39
if t <= SAGE_SERVER_MAX_STARTUP_TIME_S*1000
40
err = "restarted sage server #{t}ms ago: not allowing too many restarts too quickly..."
41
dbg(err)
42
cb(err)
43
return
44
45
_restarting_sage_server = true
46
dbg("restarting the daemon")
47
misc_node.execute_code
48
command : "smc-sage-server restart"
49
timeout : 45
50
ulimit_timeout : false # very important -- so doesn't kill after 30 seconds of cpu!
51
err_on_exit : true
52
bash : true
53
cb : (err, output) ->
54
if err
55
dbg("failed to restart sage server daemon -- #{err}")
56
else
57
dbg("successfully restarted sage server daemon -- '#{JSON.stringify(output)}'")
58
_restarting_sage_server = false
59
_restarted_sage_server = new Date()
60
cb(err)
61
62
# Get a new connection to the Sage server. If the server
63
# isn't running, e.g., it was killed due to running out of memory,
64
# so attempt to restart it and try to connect.
65
exports.get_sage_socket = (cb) -> # cb(err, socket)
66
socket = undefined
67
try_to_connect = (cb) ->
68
_get_sage_socket (err, _socket) ->
69
if not err
70
socket = _socket
71
cb()
72
else
73
# Failed for some reason: try to restart one time, then try again.
74
# We do this because the Sage server can easily get killed due to out of memory conditions.
75
# But we don't constantly try to restart the server, since it can easily fail to start if
76
# there is something wrong with a local Sage install.
77
# Note that restarting the sage server doesn't impact currently running worksheets (they
78
# have their own process that isn't killed).
79
restart_sage_server (err) -> # won't actually try to restart if called recently.
80
if err
81
cb(err)
82
return
83
# success at restarting sage server: *IMMEDIATELY* try to connect
84
_get_sage_socket (err, _socket) ->
85
socket = _socket
86
cb(err)
87
88
misc.retry_until_success
89
f : try_to_connect
90
start_delay : 50
91
max_delay : 5000
92
factor : 1.5
93
max_time : SAGE_SERVER_MAX_STARTUP_TIME_S*1000
94
log : (m) -> winston.debug("get_sage_socket: #{m}")
95
cb : (err) ->
96
cb(err, socket)
97
98
_get_sage_socket = (cb) -> # cb(err, socket that is ready to use)
99
sage_socket = undefined
100
port = undefined
101
async.series([
102
(cb) =>
103
winston.debug("get sage server port")
104
port_manager.get_port 'sage', (err, _port) =>
105
if err
106
cb(err); return
107
else
108
port = _port
109
cb()
110
(cb) =>
111
winston.debug("get and unlock socket")
112
misc_node.connect_to_locked_socket
113
port : port
114
token : common.secret_token()
115
cb : (err, _socket) =>
116
if err
117
port_manager.forget_port('sage')
118
winston.debug("unlock socket: _new_session: sage session denied connection: #{err}")
119
cb("_new_session: sage session denied connection: #{err}")
120
return
121
sage_socket = _socket
122
winston.debug("Successfully unlocked a sage session connection.")
123
cb()
124
125
(cb) =>
126
winston.debug("request sage session from server.")
127
misc_node.enable_mesg(sage_socket)
128
sage_socket.write_mesg('json', message.start_session(type:'sage'))
129
winston.debug("Waiting to read one JSON message back, which will describe the session....")
130
# TODO: couldn't this just hang forever :-(
131
sage_socket.once 'mesg', (type, desc) =>
132
winston.debug("Got message back from Sage server: #{common.json(desc)}")
133
sage_socket.pid = desc.pid
134
cb()
135
136
], (err) -> cb(err, sage_socket))
137
138
139
cache = {}
140
exports.sage_session = (opts) ->
141
opts = defaults opts,
142
client : required
143
path : required # the path to the *worksheet* file
144
# compute and cache if not cached; otherwise, get from cache:
145
return cache[opts.path] ?= new SageSession(opts)
146
147
###
148
Sage Session object
149
150
Until you actually try to call it no socket need
151
###
152
class SageSession
153
constructor: (opts) ->
154
opts = defaults opts,
155
client : required
156
path : required # the path to the *worksheet* file
157
@_path = opts.path
158
@_client = opts.client
159
@_output_cb = {}
160
161
dbg: (f) =>
162
return (m) => winston.debug("SageSession(path='#{@_path}').#{f}: #{m}")
163
164
close: () =>
165
if @_socket?
166
misc_node.process_kill(@_socket.pid, 9)
167
@_socket?.end()
168
delete @_socket
169
for id, cb of @_output_cb
170
cb({done:true, error:"killed"})
171
@_output_cb = {}
172
delete cache[@_path]
173
174
# return true if there is a socket connection to a sage server process
175
is_running: () =>
176
return @_socket?
177
178
_init_socket: (cb) =>
179
dbg = @dbg('_init_socket()')
180
dbg()
181
exports.get_sage_socket (err, socket) =>
182
if err
183
dbg("fail -- #{err}.")
184
cb(err)
185
return
186
187
dbg("successfully opened a sage session")
188
@_socket = socket
189
190
socket.on 'end', () =>
191
delete @_socket
192
dbg("codemirror session terminated")
193
194
# CRITICAL: we must define this handler before @_init_path below,
195
# or @_init_path can't possibly work... since it would wait for
196
# this handler to get the response message!
197
socket.on 'mesg', (type, mesg) =>
198
dbg("sage session: received message #{type}")
199
@["_handle_mesg_#{type}"]?(mesg)
200
201
@_init_path(cb)
202
203
_init_path: (cb) =>
204
dbg = @dbg("_init_path()")
205
dbg()
206
err = undefined
207
@call
208
input :
209
event : 'execute_code'
210
code : "os.chdir(salvus.data['path']);__file__=salvus.data['file']"
211
data :
212
path : misc_node.abspath(misc.path_split(@_path).head)
213
file : misc_node.abspath(@_path)
214
preparse : false
215
cb : (resp) =>
216
if resp.stderr
217
err = resp.stderr
218
dbg("error '#{err}'")
219
if resp.done
220
cb?(err)
221
222
call: (opts) =>
223
opts = defaults opts,
224
input : required
225
cb : undefined # cb(resp) or cb(resp1), cb(resp2), etc. -- posssibly called multiple times when message is execute or 0 times
226
dbg = @dbg("call")
227
dbg("input='#{misc.trunc(misc.to_json(opts.input), 300)}'")
228
switch opts.input.event
229
when 'ping'
230
opts.cb?({pong:true})
231
when 'status'
232
opts.cb?({running:@is_running()})
233
when 'signal'
234
if @_socket?
235
dbg("sending signal #{opts.input.signal} to process #{@_socket.pid}")
236
misc_node.process_kill(@_socket.pid, opts.input.signal)
237
opts.cb?({})
238
when 'restart'
239
dbg("restarting sage session")
240
if @_socket?
241
@close()
242
@_init_socket (err) =>
243
if err
244
opts.cb?({error:err})
245
else
246
opts.cb?({})
247
when 'raw_input'
248
dbg("sending sage_raw_input event")
249
@_socket?.write_mesg('json', {event:'sage_raw_input', value:opts.input.value})
250
else
251
# send message over socket and get responses
252
async.series([
253
(cb) =>
254
if @_socket?
255
cb()
256
else
257
@_init_socket(cb)
258
(cb) =>
259
if not opts.input.id?
260
opts.input.id = misc.uuid()
261
dbg("generated new random uuid for input: '#{opts.input.id}' ")
262
@_socket.write_mesg('json', opts.input)
263
if opts.cb?
264
@_output_cb[opts.input.id] = opts.cb # this is when opts.cb will get called...
265
cb()
266
], (err) =>
267
if err
268
opts.cb?({done:true, error:err})
269
)
270
_handle_mesg_blob: (mesg) =>
271
sha1 = mesg.uuid
272
dbg = @dbg("_handle_mesg_blob(sha1='#{sha1}')")
273
dbg()
274
hub = @_client.get_hub_socket()
275
if not hub?
276
error = 'no global hubs are connected to the local hub, so nowhere to send file'
277
dbg(error)
278
resp = message.save_blob
279
error : error
280
sha1 : sha1
281
@_socket?.write_mesg('json', resp)
282
return
283
dbg("forwarding blob to hub")
284
hub.write_mesg('blob', mesg)
285
blobs.receive_save_blob_message
286
sha1 : sha1
287
cb : (resp) =>
288
@_socket?.write_mesg('json', resp)
289
290
_handle_mesg_json: (mesg) =>
291
dbg = @dbg('_handle_mesg_json')
292
dbg("mesg='#{misc.trunc_middle(misc.to_json(mesg),400)}'")
293
c = @_output_cb[mesg?.id]
294
if c?
295
# Must do this check first since it uses done:false.
296
if mesg.done or not mesg.done?
297
delete @_output_cb[mesg.id]
298
mesg.done = true
299
if mesg.done? and not mesg.done
300
# waste of space to include done part of mesg if just false for everything else...
301
delete mesg.done
302
c(mesg)
303
304
305