# Only syncstrings with activity within the last SYNCSTRING_MAX_AGE_M minutes
# are kept open; older ones are closed by _update_recent_syncstrings.
SYNCSTRING_MAX_AGE_M = 20

# Syncstrings for files with these extensions are never auto-closed by the
# idle sweeper, even when they fall out of the recent-activity window.
NEVER_CLOSE_SYNCSTRING_EXTENSIONS =
    sagews : true
    'sage-jupyter2' : true
fs = require('fs')
{join} = require('path')
{EventEmitter} = require('events')
async = require('async')
winston = require('winston')
winston.remove(winston.transports.Console)
winston.add(winston.transports.Console, {level: 'debug', timestamp:true, colorize:true})
require('coffee-script/register')
message = require('smc-util/message')
misc = require('smc-util/misc')
misc_node = require('smc-util-node/misc_node')
synctable = require('smc-util/synctable')
syncstring = require('smc-util/syncstring')
db_doc = require('smc-util/db-doc')
sage_session = require('./sage_session')
jupyter = require('./jupyter/jupyter')
{json} = require('./common')
kucalc = require('./kucalc')
{Watcher} = require('./watcher')
{defaults, required} = misc
# Verbose debug logging is opt-in: touch the file ~/.smc-DEBUG to enable it.
DEBUG = false
DEBUG_FILE = process.env.HOME + '/.smc-DEBUG'
if fs.existsSync(DEBUG_FILE)
    winston.debug("'#{DEBUG_FILE}' exists, so enabling very verbose logging")
    DEBUG = true
else
    winston.debug("'#{DEBUG_FILE}' does not exist; minimal logging")
# The Client represents this project when talking to hubs: it tracks hub
# sockets, routes query/changefeed messages, and keeps recently active
# syncstrings open.
class exports.Client extends EventEmitter
    constructor: (@project_id) ->
        @dbg('constructor')()
        @setMaxListeners(300)
        # map from message id to the callback awaiting the hub's response
        @_hub_callbacks = {}
        # map from socket id to {socket, callbacks, activity}
        @_hub_client_sockets = {}
        # map from changefeed query id to the socket the query was sent over
        @_changefeed_sockets = {}
        @_connected = false
        @_init_recent_syncstrings_table()
        if kucalc.IN_KUCALC
            kucalc.init(@)
            # NOTE: forces verbose logging whenever running under KuCalc.
            DEBUG = true
    # Set up a synctable of syncstrings in this project with recent activity,
    # and keep @_open_syncstrings in sync with it (open on activity; the
    # periodic poll also lets idle ones get closed).
    _init_recent_syncstrings_table: () =>
        dbg = @dbg("_init_recent_syncstrings_table")
        dbg()
        obj =
            project_id  : @project_id
            max_age_m   : SYNCSTRING_MAX_AGE_M
            path        : null
            last_active : null
            deleted     : null
            doctype     : null
        @_open_syncstrings = {}
        @_recent_syncstrings = @sync_table(recent_syncstrings_in_project:[obj])
        @_recent_syncstrings.on 'change', =>
            @_update_recent_syncstrings()
        # Once the table has loaded (first change event), also poll every
        # 300ms so entries that age past the cutoff get closed promptly.
        @_recent_syncstrings.once 'change', =>
            @_recent_syncstrings_interval = setInterval(@_update_recent_syncstrings, 300)
    # Reconcile @_open_syncstrings with the recent-activity table: open
    # syncstrings that became active (unless deleted on disk), and close ones
    # with no recent activity (except NEVER_CLOSE extensions).
    _update_recent_syncstrings: () =>
        dbg = @dbg("update_recent_syncstrings")
        cutoff = misc.minutes_ago(SYNCSTRING_MAX_AGE_M)
        # string_id's we are deliberately waiting on before re-opening
        @_wait_syncstrings ?= {}
        keys = {}
        x = @_recent_syncstrings.get()
        if not x?
            return
        log_message = "open_syncstrings: #{misc.len(@_open_syncstrings)}; recent_syncstrings: #{x.size}"
        # only log when the counts actually changed, to avoid log spam from
        # the 300ms polling interval
        if log_message != @_update_recent_syncstrings_last
            winston.debug(log_message)
            @_update_recent_syncstrings_last = log_message
        x.map (val, key) =>
            string_id = val.get('string_id')
            path = val.get('path')
            if path == '.smc/local_hub/local_hub.log'
                # do NOT open the local hub's own log: writing to the log
                # would itself cause syncstring activity (feedback loop)
                return
            if val.get("last_active") > cutoff
                keys[string_id] = true   # anything not marked here gets closed below
                if @_open_syncstrings[string_id]? or @_wait_syncstrings[string_id]
                    # already open, or in the post-close cool-down period
                    return
                if not @_open_syncstrings[string_id]?
                    deleted = val.get('deleted')
                    dbg("path='#{path}', deleted=#{deleted}, string_id='#{string_id}'")
                    async.series([
                        (cb) =>
                            if not deleted
                                # not marked deleted in the database -- open it
                                cb()
                                return
                            # marked deleted: only open if the file actually
                            # exists on disk again
                            dbg("check if '#{path}' exists")
                            @path_exists
                                path : path
                                cb   : (err, exists) =>
                                    if err
                                        cb(err)
                                    else
                                        deleted = not exists
                                        cb()
                    ], (err) =>
                        if err
                            dbg("SERIOUS ERROR -- #{err}")
                        else if deleted
                            # deleted and not on disk -- don't open
                            dbg("ignoring deleted path '#{path}'")
                        else if not @_open_syncstrings[string_id]?
                            dbg("open syncstring '#{path}' with id '#{string_id}'")
                            ext = misc.separate_file_extension(path).ext
                            # doctype (if given) describes which sync class and
                            # options to use, serialized as JSON in the table
                            doctype = val.get('doctype')
                            if doctype?
                                dbg("using doctype='#{doctype}'")
                                doctype = misc.from_json(doctype)
                                opts = doctype.opts ? {}
                                opts.path = path
                                type = doctype.type
                            else
                                opts = {path:path}
                                type = 'string'
                            if ext == 'sage-ipython'
                                opts.change_throttle = opts.patch_interval = 5
                                opts.save_interval = 25
                            # dispatch to @sync_string or @sync_db based on type
                            ss = @_open_syncstrings[string_id] = @["sync_#{type}"](opts)
                            ss.on 'error', (err) =>
                                dbg("ERROR creating syncstring '#{path}' -- #{err}; will try again later")
                                ss.close()
                            ss.on 'close', () =>
                                dbg("remove syncstring '#{path}' with id '#{string_id}' from cache due to close")
                                delete @_open_syncstrings[string_id]
                                # wait 10s before re-opening, in case it is
                                # being closed repeatedly
                                @_wait_syncstrings[string_id] = true
                                setTimeout((()=>delete @_wait_syncstrings[string_id]), 10000)
                            switch ext
                                when 'sage-jupyter2'
                                    # attach the jupyter backend to this syncdb
                                    jupyter.jupyter_backend(ss, @)
                    )
            return   # so map doesn't terminate early due to a funny return value
        # close syncstrings that fell out of the recent-activity window
        for string_id, val of @_open_syncstrings
            path = val._path
            if not keys[string_id] and not NEVER_CLOSE_SYNCSTRING_EXTENSIONS[misc.filename_extension(path)]
                dbg("close syncstring '#{path}' with id '#{string_id}'")
                val.close()
                delete @_open_syncstrings[string_id]
dbg: (f) =>
if DEBUG
return (m...) ->
switch m.length
when 0
s = ''
when 1
s = m[0]
else
s = JSON.stringify(m)
winston.debug("Client.#{f}: #{misc.trunc_middle(s,1000)}")
else
return (m) ->
    # Project "alert messages" have nowhere useful to go, so just log them.
    alert_message: (opts) =>
        opts = defaults opts,
            type    : 'default'
            title   : undefined
            message : required
            block   : undefined
            timeout : undefined   # time in seconds
        @dbg('alert_message')(opts.title, opts.message)
close: () =>
for _, s of misc.keys(@_open_syncstrings)
s.close()
delete @_open_syncstrings
clearInterval(@_recent_syncstrings_interval)
    # The "client id" of a project is simply its project_id.
    client_id: () =>
        return @project_id

    # This client is the project itself, not a user/browser.
    is_project: () =>
        return true

    is_user: () =>
        return false

    # A project is always considered signed in.
    is_signed_in: () =>
        return true

    # True when at least one hub socket is currently connected.
    is_connected: =>
        return @_connected

    # We trust the local clock on the compute server.
    server_time: () =>
        return new Date()
    # Record activity on a hub socket.  On first sight of a socket, register
    # it in @_hub_client_sockets, wire up cleanup when it ends, and emit
    # 'connected'/'disconnected' as the set of hub sockets becomes
    # nonempty/empty.  On subsequent calls just bump the activity timestamp.
    active_socket: (socket) =>
        dbg = @dbg("active_socket(id=#{socket.id})")
        x = @_hub_client_sockets[socket.id]
        if not x?
            dbg()
            x = @_hub_client_sockets[socket.id] = {socket:socket, callbacks:{}, activity:new Date()}
            socket.on 'end', =>
                dbg("end")
                if x.callbacks?
                    # fail every callback still waiting on this socket
                    for id, cb of x.callbacks
                        cb?('socket closed')
                    # so a second 'end' event does nothing
                    delete x.callbacks
                delete @_hub_client_sockets[socket.id]
                dbg("number of active sockets now equals #{misc.len(@_hub_client_sockets)}")
                if misc.len(@_hub_client_sockets) == 0
                    @_connected = false
                    dbg("lost all active sockets")
                    @emit('disconnected')
            if misc.len(@_hub_client_sockets) >= 1
                dbg("CONNECTED!")
                @_connected = true
                @emit('connected')
        else
            x.activity = new Date()
handle_mesg: (mesg, socket) =>
dbg = @dbg("handle_mesg(#{json(mesg)})")
f = @_hub_callbacks[mesg.id]
if f?
dbg("calling callback")
if not mesg.multi_response
delete @_hub_callbacks[mesg.id]
delete @_hub_client_sockets[socket.id].callbacks[mesg.id]
f(mesg)
return true
else
dbg("no callback")
return false
get_hub_socket: () =>
v = misc.values(@_hub_client_sockets)
if v.length == 0
return
v.sort (a,b) -> misc.cmp(a.activity ? 0, b.activity ? 0)
return v[v.length-1].socket
get_all_hub_sockets = () =>
return (x.socket for x in misc.values(@_hub_client_sockets))
    # Send a message to a hub and optionally wait for a response.
    #   message : the message object to send (an id is assigned if missing)
    #   timeout : seconds before cb gets a timeout error (only if cb given)
    #   socket  : specific hub socket to use; defaults to most active hub
    #   cb      : cb(err) or cb(undefined, response)
    call: (opts) =>
        opts = defaults opts,
            message : required
            timeout : undefined
            socket  : undefined
            cb      : undefined
        dbg = @dbg("call(message=#{json(opts.message)})")
        dbg()
        # pick the best hub socket if none was specified
        socket = opts.socket ?= @get_hub_socket()
        if not socket?
            dbg("no sockets")
            opts.cb?("no hubs currently connected to this project")
            return
        if opts.cb?
            if opts.timeout
                dbg("configure timeout")
                fail = () =>
                    dbg("failed")
                    delete @_hub_callbacks[opts.message.id]
                    opts.cb?("timeout after #{opts.timeout}s")
                    # ensure cb cannot fire again after the timeout
                    delete opts.cb
                timer = setTimeout(fail, opts.timeout*1000)
            opts.message.id ?= misc.uuid()
            cb = @_hub_callbacks[opts.message.id] = (resp) =>
                if timer?
                    clearTimeout(timer)
                    timer = undefined
                if resp.event == 'error'
                    opts.cb?(if resp.error then resp.error else 'error')
                else
                    opts.cb?(undefined, resp)
            # also register on the socket so the callback fails if the
            # socket closes (see active_socket's 'end' handler)
            @_hub_client_sockets[socket.id].callbacks[opts.message.id] = cb
        socket.write_mesg('json', opts.message)
    # Do a database query via a hub.
    #   query   : the query object
    #   changes : if truthy, make this a changefeed (multi-response)
    #   options : if given, must be an array of option objects
    #   timeout : seconds
    #   cb      : cb(err, response); called repeatedly for changefeeds
    query: (opts) =>
        opts = defaults opts,
            query   : undefined  # see note below; original listed required-style fields
            changes : undefined
            options : undefined
            timeout : 30
            cb      : required
        if opts.options? and not misc.is_array(opts.options)
            throw Error("options must be an array")
            # NOTE(review): unreachable after throw
            return
        mesg = message.query
            id             : misc.uuid()
            query          : opts.query
            options        : opts.options
            changes        : opts.changes
            multi_response : opts.changes
        socket = @get_hub_socket()
        if not socket?
            # the caller (e.g. a synctable) is expected to retry later
            opts.cb("no hub socket available")
            return
        if opts.changes
            # remember which socket carries this changefeed, for query_cancel
            @_changefeed_sockets[mesg.id] = socket
            # on socket error/end, tell the caller so it can reconnect/retry
            socket.on 'error', =>
                opts.cb('socket-end')
            socket.on 'end', =>
                opts.cb('socket-end')
        @call
            message : mesg
            timeout : opts.timeout
            socket  : socket
            cb      : opts.cb
query_cancel: (opts) =>
opts = defaults opts,
id : required
cb : undefined
socket = @_changefeed_sockets[opts.id]
if not socket?
opts.cb?()
else
@call
message : message.query_cancel(id:opts.id)
timeout : 30
socket : socket
cb : opts.cb
    # Collect the ids of all active changefeeds by asking every connected
    # hub; cb(undefined, ids).  Per-hub errors are silently skipped (that
    # hub simply contributes no ids).
    query_get_changefeed_ids: (opts) =>
        opts = defaults opts,
            timeout : 30
            cb      : required
        ids = []
        f = (socket, cb) =>
            @call
                message : message.query_get_changefeed_ids()
                timeout : opts.timeout
                socket  : socket
                cb      : (err, resp) =>
                    if not err
                        ids = ids.concat(resp.changefeed_ids)
                    cb()
        async.map @get_all_hub_sockets(), f, () =>
            opts.cb(undefined, ids)
    # Get a synchronized table defined by the given query, using this client
    # for communication with the hubs.
    sync_table: (query, options, debounce_interval=2000, throttle_changes=undefined) =>
        return synctable.sync_table(query, options, @, debounce_interval, throttle_changes)
    # Return a new synchronized string editing session for the given path.
    sync_string: (opts) =>
        opts = defaults opts,
            path           : required
            save_interval  : 500
            patch_interval : 500
        opts.client = @
        opts.project_id = @project_id
        @dbg("sync_string(path='#{opts.path}')")()
        return new syncstring.SyncString(opts)
    # Return a new synchronized database document (db-doc) for the given path.
    sync_db: (opts) =>
        opts = defaults opts,
            path            : required
            primary_keys    : required
            string_cols     : []
            change_throttle : 0
            save_interval   : 500
            patch_interval  : 500
        opts.client = @
        opts.project_id = @project_id
        @dbg("sync_db(path='#{opts.path}')")()
        return new db_doc.SyncDB(opts)
    # Write a file to disk.  opts.path is relative to $HOME.  Writes to the
    # same path are serialized via a per-path lock that expires after 15s;
    # a locked write is retried automatically after ~1s.
    write_file: (opts) =>
        opts = defaults opts,
            path : required
            data : required
            cb   : required
        path = join(process.env.HOME, opts.path)
        @_file_io_lock ?= {}
        dbg = @dbg("write_file(path='#{opts.path}')")
        dbg()
        now = new Date()
        if now - (@_file_io_lock[path] ? 0) < 15000  # lock expires after 15 seconds
            dbg("LOCK")
            # retry in about 1s (randomized to avoid thundering herd)
            setTimeout((() => @write_file(opts)), 500 + 500*Math.random())
            return
        @_file_io_lock[path] = now
        dbg("@_file_io_lock = #{misc.to_json(@_file_io_lock)}")
        async.series([
            (cb) =>
                misc_node.ensure_containing_directory_exists(path, cb)
            (cb) =>
                fs.writeFile(path, opts.data, cb)
        ], (err) =>
            # always release the lock, success or not
            delete @_file_io_lock[path]
            if err
                dbg("error -- #{err}")
            else
                dbg("success")
            opts.cb(err)
        )
    # Read a file from disk as a string.  opts.path is relative to $HOME.
    #   maxsize_MB : if given, error out instead of reading a larger file
    #   cb         : cb(err, content-as-string)
    # Reads of the same path are serialized via the same 15s per-path lock
    # used by write_file; a locked read is retried automatically after ~1s.
    path_read: (opts) =>
        opts = defaults opts,
            path       : required
            maxsize_MB : undefined
            cb         : required
        content = undefined
        path = join(process.env.HOME, opts.path)
        dbg = @dbg("path_read(path='#{opts.path}', maxsize_MB=#{opts.maxsize_MB})")
        dbg()
        @_file_io_lock ?= {}
        now = new Date()
        if now - (@_file_io_lock[path] ? 0) < 15000  # lock expires after 15 seconds
            dbg("LOCK")
            # retry in about 1s (randomized)
            setTimeout((() => @path_read(opts)), 500 + 500*Math.random())
            return
        @_file_io_lock[path] = now
        dbg("@_file_io_lock = #{misc.to_json(@_file_io_lock)}")
        async.series([
            (cb) =>
                # optionally refuse to read files above the size limit
                if opts.maxsize_MB?
                    dbg("check if file too big")
                    @file_size
                        filename : opts.path
                        cb       : (err, size) =>
                            if err
                                dbg("error checking -- #{err}")
                                cb(err)
                            else if size > opts.maxsize_MB * 1000000
                                dbg("file is too big!")
                                cb("file '#{opts.path}' size (=#{size/1000000}MB) too large (must be at most #{opts.maxsize_MB}MB); try opening it in a Terminal with vim instead or write to [email protected]")
                            else
                                dbg("file is fine")
                                cb()
                else
                    cb()
            (cb) =>
                fs.readFile path, (err, data) =>
                    if err
                        dbg("error reading file -- #{err}")
                        cb(err)
                    else
                        dbg('read file')
                        content = data.toString()
                        cb()
        ], (err) =>
            # always release the lock, success or not
            delete @_file_io_lock[path]
            opts.cb(err, content)
        )
path_access: (opts) =>
opts = defaults opts,
path : required
mode : required
cb : required
access = 0
for s in opts.mode
access |= fs[s.toUpperCase() + '_OK']
fs.access(opts.path, access, opts.cb)
    # Check whether a path exists; cb(undefined, exists) -- never errors,
    # making the callback shape consistent with the rest of this API.
    # NOTE(review): fs.exists is deprecated in Node; kept because callers
    # depend on this exact (exists)-only callback behavior -- confirm before
    # migrating to fs.access/fs.stat.
    path_exists: (opts) =>
        opts = defaults opts,
            path : required
            cb   : required
        dbg = @dbg("checking if path (='#{opts.path}') exists")
        dbg()
        fs.exists opts.path, (exists) =>
            dbg("returned #{exists}")
            opts.cb(undefined, exists)
    # Stat a path; cb(err, stats) exactly as fs.stat.
    path_stat: (opts) =>
        opts = defaults opts,
            path : required
            cb   : required
        fs.stat(opts.path, opts.cb)
    # Size of a file in bytes; cb(err, size).  size is undefined when stat
    # failed (stat?.size).
    file_size: (opts) =>
        opts = defaults opts,
            filename : required
            cb       : required
        @path_stat
            path : opts.filename
            cb   : (err, stat) =>
                opts.cb(err, stat?.size)
    # Execute a shell command; opts is passed straight through to
    # misc_node.execute_code.
    shell: (opts) =>
        misc_node.execute_code(opts)
    # Return the sage session for the given path, delegating to
    # ./sage_session with this client attached.
    sage_session: (opts) =>
        opts = defaults opts,
            path : required
        return sage_session.sage_session(path:opts.path, client:@)
    # Return a jupyter kernel (see ./jupyter/jupyter).  NOTE: opts is not
    # validated with defaults here; it is passed straight to jupyter.kernel
    # with this client attached.
    jupyter_kernel: (opts) =>
        opts.client = @
        return jupyter.kernel(opts)
    # Callback with data about the available jupyter kernels.
    jupyter_kernel_info: (opts) =>
        opts = defaults opts,
            cb : required
        jupyter.get_kernel_data(opts.cb)
watch_file: (opts) =>
opts = defaults opts,
path : required
interval : 3000
debounce : 1000
path = require('path').join(process.env.HOME, opts.path)
dbg = @dbg("watch_file(path='#{path}')")
dbg("watching file '#{path}'")
return new Watcher(path, opts.interval, opts.debounce)