async = require('async')
stringify = require('json-stable-stringify')
sagews = require('./sagews')
misc = require('./misc')
{defaults, required} = misc
# Evaluator -- evaluates code on behalf of a synchronized string.
#
# Requests are written as rows of the `eval_inputs` sync table and results are
# read back from the `eval_outputs` sync table.  Output rows are keyed by
# [string_id, time, number], where `time` is the timestamp of the input that
# triggered them and `number` counts the output messages 0, 1, 2, ...
#
# When the client reports is_project(), the evaluator also listens for new
# inputs and actually runs them (see _init_project_evaluator).
class exports.Evaluator
    # string -- the object this evaluator is attached to; this class reads its
    #           _client, _string_id and _path attributes and calls its
    #           to_str(), from_str() and save() methods.
    # cb     -- optional; called with an error if the sync tables fail to
    #           initialize, and with no arguments once everything is ready.
    constructor: (@string, cb) ->
        @_init_sync_tables (err) =>
            if err
                cb?(err)
                return
            if @string._client.is_project()
                @_init_project_evaluator()
            cb?()

    # Create the input and output sync tables in parallel; cb(err) when both
    # have reported 'connected' (or one of them failed).
    _init_sync_tables: (cb) =>
        async.parallel([@_init_eval_inputs, @_init_eval_outputs], (err) => cb(err))

    # Open the eval_inputs table for this string, restricted to rows whose time
    # is >= misc.server_seconds_ago(30).  cb() once the table is connected.
    _init_eval_inputs: (cb) =>
        query =
            eval_inputs :
                string_id : @string._string_id
                time      : {'>=': misc.server_seconds_ago(30)}
                input     : null
        # NOTE(review): the third argument (500) is passed straight through to
        # sync_table -- presumably a debounce interval in ms; confirm.
        @_inputs = @string._client.sync_table(query, undefined, 500)
        @_inputs.once('connected', => cb())

    # Open the eval_outputs table for this string, restricted to rows whose
    # time is >= misc.server_seconds_ago(30).  cb() once the table is connected.
    _init_eval_outputs: (cb) =>
        query =
            eval_outputs :
                string_id : @string._string_id
                time      : {'>=': misc.server_seconds_ago(30)}
                output    : null
        @_outputs = @string._client.sync_table(query, undefined, 500)
        # Every call() in progress adds its own 'change' listener, so raise the
        # EventEmitter listener limit.
        @_outputs.setMaxListeners(100)
        @_outputs.once('connected', => cb())

    # Mark the evaluator closed and release both sync tables and the Sage
    # session (if one was created).  Safe to call more than once.
    close: () =>
        @_closed = true
        @_inputs?.close()
        delete @_inputs
        @_outputs?.close()
        delete @_outputs
        @_sage_session?.close()
        delete @_sage_session

    # Request an evaluation.
    #
    #   program -- required; selects the method _evaluate_using_<program>
    #   input   -- required; passed to that method
    #   cb      -- optional; called once per output message, in message-number
    #              order, until a message with `done` set has been delivered.
    #              Called with the string "closed" if the evaluator is closed.
    call: (opts) =>
        opts = defaults opts,
            program : required
            input   : required
            cb      : undefined
        if @_closed
            opts.cb?("closed")
            return
        time = @string._client.server_time()
        # Output rows are matched to this call by timestamp, so make sure two
        # calls from this evaluator never share one: bump by 1ms if needed.
        if @_last_time? and time <= @_last_time
            time = new Date(@_last_time - 0 + 1)
        @_last_time = time

        @_inputs.set
            string_id : @string._string_id
            time      : time
            user_id   : 0
            input     : misc.copy_without(opts, 'cb')
        @_inputs.save()
        if not opts.cb?
            return

        # Keep our own reference to the table: close() deletes @_outputs, and
        # we must still be able to unsubscribe afterwards.
        outputs = @_outputs
        messages = {}     # messages that arrived before their turn, by number
        mesg_number = 0   # number of the next message to deliver

        send = (mesg) =>
            if mesg.done
                outputs.removeListener('change', handle_output)
            opts.cb?(mesg)

        handle_output = (keys) =>
            if @_closed
                # Unsubscribe so that "closed" is reported at most once.
                outputs.removeListener('change', handle_output)
                opts.cb?("closed")
                return
            for key in keys
                # opts.cb may have closed the evaluator while handling an
                # earlier key of this same batch.
                if @_closed
                    return
                t = misc.from_json(key)   # [string_id, time, number]
                if t[1] - time != 0
                    continue              # output of some other evaluation
                output = outputs.get(key)?.get('output')
                if typeof output == 'string'
                    # Tolerate outputs that were stored JSON-encoded.
                    mesg = misc.from_json(output)
                else
                    mesg = output?.toJS?()
                if not mesg?
                    continue
                delete mesg.id
                if t[2] != mesg_number
                    # Arrived out of order; hold it until its turn comes.
                    messages[t[2]] = mesg
                    continue
                send(mesg)
                mesg_number += 1
                # Flush any held messages that are now next in line.
                while messages[mesg_number]?
                    send(messages[mesg_number])
                    delete messages[mesg_number]
                    mesg_number += 1
            return

        outputs.on('change', handle_output)

    # Return a function hook(mesg) to be called with each output message of an
    # execute_code evaluation.  Five seconds after each message it checks that
    # the document contains at least as much output after the marker
    # sagews.MARKERS.output + output_uuid as has been produced so far; if not,
    # it writes the accumulated output into the document itself and saves.
    # Returns undefined if the evaluator is already closed.
    _execute_code_hook: (output_uuid) =>
        dbg = @string._client.dbg("_execute_code_hook('#{output_uuid}')")
        dbg()
        if @_closed
            dbg("closed")
            return

        # Everything that should follow the output marker + uuid in the
        # document; grows with every processed message.
        output_line = sagews.MARKERS.output

        process_mesg = (mesg) =>
            if @_closed
                # close() happened while the timer was pending; leave the
                # document alone.
                return
            dbg("processing mesg '#{misc.to_json(mesg)}'")
            content = @string.to_str()
            i = content.indexOf(sagews.MARKERS.output + output_uuid)
            if i == -1
                # The output line is gone from the document; disable the hook
                # for all later messages.
                process_mesg = undefined
                return
            # Skip past the marker and uuid -- assumes a 1-character marker
            # followed by a 36-character uuid.
            i += 37
            n = content.indexOf('\n', i)
            if n == -1
                return
            output_line += stringify(misc.copy_without(mesg, ['id', 'event'])) + sagews.MARKERS.output
            if output_line.length <= n - i
                # The document already holds at least this much output.
                return
            dbg("sage_execute_code: initiating client didn't maintain sync promptly. fixing")
            before = content.slice(0, i)
            content = before + output_line + content.slice(n)
            if mesg.done
                j = before.lastIndexOf(sagews.MARKERS.cell)
                if j != -1
                    # Take the 36 characters starting two past the newline that
                    # precedes the cell marker (one past the start of the
                    # document when lastIndexOf gives -1) as the cell id.
                    j = before.lastIndexOf('\n', j)
                    cell_id = before.slice(j+2, j+38)
                    S = sagews.sagews(content)
                    S.remove_cell_flag(cell_id, sagews.FLAGS.running)
                    S.set_cell_flag(cell_id, sagews.FLAGS.this_session)
                    content = S.content
            @string.from_str(content)
            @string.save()
            return

        return (mesg) =>
            setTimeout((=> process_mesg?(mesg)), 5000)

    # Handle a changed row of the eval_inputs table: if no output number 0
    # exists yet for that input, run the requested program and write each
    # output message to the eval_outputs table with consecutive numbers.
    _handle_input_change: (key) =>
        dbg = @string._client.dbg('_handle_input_change')
        dbg("change: #{key}")
        if @_closed
            dbg("closed")
            return
        t = misc.from_json(key)
        [string_id, time, number] = [t[0], t[1], 0]
        id = [string_id, time, number]
        if @_outputs.get(JSON.stringify(id))?
            # Output already exists for this input; nothing to do.
            return
        dbg("no outputs with key #{misc.to_json(id)}")
        # The row may be missing or have no input; then there is nothing to run.
        x = @_inputs.get(key)?.get('input')?.toJS?()
        if not x?
            return

        # Write one output message under the current message number.
        write_output = (output) =>
            @_outputs.set({string_id:string_id, time:time, number:number, output:output})
            @_outputs.save()

        # Error outputs are stored as plain objects, exactly like successful
        # outputs, because the reader in call() expects an object it can
        # convert with toJS().
        if not (x.program? and x.input?)
            write_output({error:"must specify program and input", done:true})
            return
        f = @["_evaluate_using_#{x.program}"]
        if not f?
            write_output({error:"no program '#{x.program}'", done:true})
            return

        if x.input.event == 'execute_code' and x.input.output_uuid?
            hook = @_execute_code_hook(x.input.output_uuid)
        f x.input, (output) =>
            if @_closed
                return
            hook?(output)
            write_output(output)
            number += 1
        return

    # Start handling every change of the eval_inputs table.  Called from the
    # constructor only when the client reports is_project().
    _init_project_evaluator: () =>
        dbg = @string._client.dbg('project_evaluator')
        dbg('init')
        @_inputs.on 'change', (keys) =>
            for key in keys
                @_handle_input_change(key)

    # Evaluate `input` using a Sage session for this string's path, created on
    # first use.  cb is handed to the session's call() as its callback.
    _evaluate_using_sage: (input, cb) =>
        @_sage_session ?= @string._client.sage_session(path : @string._path)
        if input.event == 'execute_code'
            # Forward only these fields of an execute_code request.
            input = misc.copy_with(input, ['code', 'data', 'preparse', 'event', 'id'])
        @_sage_session.call
            input : input
            cb    : cb

    # Evaluate `input` by passing it to the client's shell().  cb is called
    # exactly once with the output object, which always has done set to true
    # and additionally has `error` set if the shell reported one.
    _evaluate_using_shell: (input, cb) =>
        input.cb = (err, output) =>
            output ?= {}
            if err
                output.error = err
            output.done = true
            cb(output)
        @string._client.shell(input)