Contact
CoCalc Logo Icon
StoreFeaturesDocsShareSupport News AboutSign UpSign In
| Download
Views: 39537
###############################################################################
#
# CoCalc: Collaborative Calculation in the Cloud
#
# Copyright (C) 2016, Sagemath Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
###############################################################################
21
22
###
Evaluation of code with streaming output built on both the clients and
server (local hub) using a sync_table. This evaluator is associated
with a syncstring editing session, and provides code evaluation that
may be used to enhance the experience of document editing.
###
28
29
async = require('async')
30
stringify = require('json-stable-stringify')
31
32
sagews = require('./sagews')
33
misc = require('./misc')
34
35
{defaults, required} = misc
36
37
class exports.Evaluator
    # Create an evaluator attached to the syncstring `@string`.
    #
    #   @string -- syncstring session; this class uses its `_client`,
    #              `_string_id` and `_path` attributes and its `to_str`,
    #              `from_str` and `save` methods.
    #   cb      -- optional; called with an error if setting up the sync
    #              tables fails, otherwise called with no arguments once
    #              the tables are connected.
    constructor: (@string, cb) ->
        @_init_sync_tables (err) =>
            if err
                cb?(err)
                return
            # Only the project side listens for inputs and evaluates them.
            if @string._client.is_project()
                @_init_project_evaluator()
            cb?()

    # Open the input and output sync tables in parallel; cb(err) when both are done.
    _init_sync_tables: (cb) =>
        async.parallel([@_init_eval_inputs, @_init_eval_outputs], (err) => cb(err))

    # Open the eval_inputs table for this string, restricted to rows whose
    # time is within the last 30 seconds, and call cb() once it is connected.
    _init_eval_inputs: (cb) =>
        query =
            eval_inputs :
                string_id : @string._string_id
                time      : {'>=': misc.server_seconds_ago(30)}
                input     : null
        @_inputs = @string._client.sync_table(query, undefined, 500)
        @_inputs.once('connected', => cb())

    # Open the eval_outputs table for this string, restricted to rows whose
    # time is within the last 30 seconds, and call cb() once it is connected.
    _init_eval_outputs: (cb) =>
        query =
            eval_outputs :
                string_id : @string._string_id
                time      : {'>=': misc.server_seconds_ago(30)}
                output    : null
        @_outputs = @string._client.sync_table(query, undefined, 500)
        # Each evaluation in progress adds its own 'change' listener (see call).
        @_outputs.setMaxListeners(100)  # in case of many evaluations at once.
        @_outputs.once('connected', => cb())

    # Mark this evaluator closed and release both sync tables and the Sage
    # session (if any).  Safe to call when some of them were never created.
    close: () =>
        @_closed = true
        @_inputs?.close()
        delete @_inputs
        @_outputs?.close()
        delete @_outputs
        @_sage_session?.close()
        delete @_sage_session

    # Request evaluation of opts.input using opts.program.
    #
    # The request is written to the inputs table; if opts.cb is given, it is
    # called once per output message, in sequence-number order, until a
    # message with `done` true arrives.  If the evaluator is closed, opts.cb
    # is instead called with the string "closed".
    call: (opts) =>
        opts = defaults opts,
            program : required    # 'sage' or 'shell' -- dispatched to _evaluate_using_<program> in the project
            input   : required    # object whose meaning depends on the program
            cb      : undefined
        if @_closed
            opts.cb?("closed")
            return
        time = @string._client.server_time()
        # Perturb time if it is <= last time when this client did an evaluation.
        # We do this so that the time below is different than anything else.
        # TODO: This is NOT 100% yet, due to multiple clients possibly starting
        # different evaluations simultaneously.
        if @_last_time? and time <= @_last_time
            time = new Date(@_last_time - 0 + 1)  # one millisecond later
        @_last_time = time

        @_inputs.set
            string_id : @string._string_id
            time      : time
            user_id   : 0
            input     : misc.copy_without(opts, 'cb')
        @_inputs.save()  # root cause of https://github.com/sagemathinc/cocalc/issues/1589

        if not opts.cb?
            return

        # Listen for output until we receive a message with mesg.done true.
        messages    = {}   # messages that arrived early, indexed by sequence number
        mesg_number = 0    # sequence number of the next message to deliver

        send = (mesg) =>
            if mesg.done
                # @_outputs is deleted by close(), which the caller's cb may
                # have invoked while queued messages are still being drained.
                @_outputs?.removeListener('change', handle_output)
            opts.cb?(mesg)

        handle_output = (keys) =>
            #console.log("handle_output #{misc.to_json(keys)}")
            if @_closed
                opts.cb?("closed")
                return
            for key in keys
                # Output keys are JSON arrays; t[1] is the evaluation time and
                # t[2] the sequence number of the message.
                t = misc.from_json(key)
                if t[1] - time != 0
                    continue   # output of some other evaluation
                output = @_outputs.get(key)?.get('output')
                if not output?
                    continue
                # Outputs are normally stored as objects, but rows written with
                # a JSON-encoded string are decoded too, since calling .toJS()
                # on a string would throw.
                mesg = if typeof(output) == 'string' then misc.from_json(output) else output.toJS()
                if not mesg?
                    continue
                delete mesg.id  # waste of space
                # This code is written under the assumption that messages may
                # arrive in somewhat random order.  We did this since RethinkDB
                # doesn't guarantee anything about the order of writes versus
                # when changes get pushed out.  That said, PostgreSQL **does** make
                # clear guarantees about when things happen, so this may
                # no longer be a problem.... (TODO).
                # E.g. this in a Sage worksheet:
                #    for i in range(20): print i; sys.stdout.flush()
                if t[2] == mesg_number
                    # Inform caller of result
                    send(mesg)
                    # Push out any messages that arrived earlier that are ready to send.
                    mesg_number += 1
                    while messages[mesg_number]?
                        send(messages[mesg_number])
                        delete messages[mesg_number]
                        mesg_number += 1
                else
                    # Put message in the queue of messages that arrived too early
                    messages[t[2]] = mesg
            return

        @_outputs.on('change', handle_output)

    # Return a function hook(mesg) that, 5 seconds after being called, makes
    # sure the output line of the worksheet cell identified by output_uuid
    # contains every message passed to the hook so far, in case the client
    # that started the evaluation did not write it into the document itself.
    # Returns undefined if the evaluator is already closed.
    _execute_code_hook: (output_uuid) =>
        dbg = @string._client.dbg("_execute_code_hook('#{output_uuid}')")
        dbg()
        if @_closed
            dbg("closed")
            return

        # Accumulates across all messages given to the hook.
        output_line = sagews.MARKERS.output
        process = (mesg) =>
            dbg("processing mesg '#{misc.to_json(mesg)}'")
            content = @string.to_str()
            i = content.indexOf(sagews.MARKERS.output + output_uuid)
            if i == -1
                # no cell anymore -- do nothing further
                process = undefined
                return
            # Skip past the marker and uuid -- presumably a 1-character marker
            # followed by a 36-character uuid; TODO confirm against sagews.MARKERS.
            i += 37
            n = content.indexOf('\n', i)
            if n == -1   # corrupted
                return
            output_line += stringify(misc.copy_without(mesg, ['id', 'event'])) + sagews.MARKERS.output
            #dbg("sage_execute_code: i=#{i}, n=#{n}, output_line.length=#{output_line.length}, output_line='#{output_line}'")
            if output_line.length > n - i
                # The document holds less output than we have received.
                dbg("sage_execute_code: initiating client didn't maintain sync promptly. fixing")
                x = content.slice(0, i)
                content = x + output_line + content.slice(n)
                if mesg.done
                    # Evaluation finished: update the flags of the input cell
                    # that precedes this output line.
                    j = x.lastIndexOf(sagews.MARKERS.cell)
                    if j != -1
                        j = x.lastIndexOf('\n', j)
                        cell_id = x.slice(j+2, j+38)
                        #dbg("removing a cell flag: before='#{content}', cell_id='#{cell_id}'")
                        S = sagews.sagews(content)
                        S.remove_cell_flag(cell_id, sagews.FLAGS.running)
                        S.set_cell_flag(cell_id, sagews.FLAGS.this_session)
                        content = S.content
                        #dbg("removing a cell flag: after='#{content}'")
                @string.from_str(content)
                @string.save()

        hook = (mesg) =>
            # Delay so the initiating client has a chance to write the output first.
            setTimeout((=> process?(mesg)), 5000)
        return hook

    # Handle a change to the inputs table row with the given key: unless
    # output number 0 for that input already exists, run the requested
    # program and write each output message to the outputs table with
    # consecutive sequence numbers.
    _handle_input_change: (key) =>
        dbg = @string._client.dbg('_handle_input_change')
        dbg("change: #{key}")
        if @_closed
            dbg("closed")
            return
        t = misc.from_json(key)
        string_id = t[0]
        time      = t[1]
        number    = 0
        id        = [string_id, time, number]
        if @_outputs.get(JSON.stringify(id))?
            # There is already output for this input, so nothing to do.
            return
        dbg("no outputs with key #{misc.to_json(id)}")
        x = @_inputs.get(key)?.get('input')?.toJS?()  # could be deleting a key!
        if not x?
            return

        # Error outputs are written as plain objects, exactly like outputs of
        # successful evaluations, so that readers calling .toJS() on the
        # stored output get a message object with `done` set.
        if not (x.program? and x.input?)
            @_outputs.set({string_id:string_id, time:time, number:number, output:{error:"must specify program and input", done:true}})
            @_outputs.save()
            return

        f = @["_evaluate_using_#{x.program}"]
        if not f?
            @_outputs.set({string_id:string_id, time:time, number:number, output:{error:"no program '#{x.program}'", done:true}})
            @_outputs.save()
            return

        if x.input.event == 'execute_code' and x.input.output_uuid?
            hook = @_execute_code_hook(x.input.output_uuid)
        f x.input, (output) =>
            if @_closed
                return
            #dbg("got output='#{misc.to_json(output)}'; id=#{misc.to_json(id)}")
            hook?(output)
            @_outputs.set({string_id:string_id, time:time, number:number, output:output})
            @_outputs.save()
            number += 1

    # Runs only in the project
    # Start handling every change to the inputs table.
    _init_project_evaluator: () =>
        dbg = @string._client.dbg('project_evaluator')
        dbg('init')
        @_inputs.on 'change', (keys) =>
            for key in keys
                @_handle_input_change(key)
            return

    # Runs only in the project
    # Evaluate input in a Sage session for this string's path (created on
    # first use); cb is passed through to the session's call.
    _evaluate_using_sage: (input, cb) =>
        @_sage_session ?= @string._client.sage_session(path : @string._path)
        # TODO: input also may have -- uuid, output_uuid, timeout
        if input.event == 'execute_code'
            # Only pass along the fields listed here.
            input = misc.copy_with(input, ['code', 'data', 'preparse', 'event', 'id'])
        @_sage_session.call
            input : input
            cb    : cb

    # Runs only in the project
    # Evaluate input using the client's shell function; cb is called exactly
    # once with an output object that has `done` true, and `error` set if
    # the shell call reported an error.
    _evaluate_using_shell: (input, cb) =>
        input.cb = (err, output) =>
            if not output?
                output = {}
            if err
                output.error = err
            output.done = true
            cb(output)
        @string._client.shell(input)
249
250