async = require('async')
uuid = require('node-uuid')
winston = require('winston')
underscore = require('underscore')
message = require('smc-util/message')
misc_node = require('smc-util-node/misc_node')
misc = require('smc-util/misc')
{defaults, required} = misc
blobs = require('./blobs')
clients = require('./clients')
BLOB_TTL_S = 60*60*24
if not process.env.SMC_TEST
DEBUG = true
connect_to_a_local_hub = (opts) ->
opts = defaults opts,
port : required
host : required
secret_token : required
timeout : 10
cb : required
misc_node.connect_to_locked_socket
port : opts.port
host : opts.host
token : opts.secret_token
timeout : opts.timeout
cb : (err, socket) =>
if err
opts.cb(err)
else
misc_node.enable_mesg(socket, 'connection_to_a_local_hub')
socket.on 'data', (data) ->
misc_node.keep_portforward_alive(opts.port)
opts.cb(undefined, socket)
_local_hub_cache = {}
exports.new_local_hub = (project_id, database, compute_server) ->
if not project_id?
throw "project_id must be specified (it is undefined)"
H = _local_hub_cache[project_id]
if H?
winston.debug("new_local_hub('#{project_id}') -- using cached version")
else
winston.debug("new_local_hub('#{project_id}') -- creating new one")
H = new LocalHub(project_id, database, compute_server)
_local_hub_cache[project_id] = H
return H
exports.connect_to_project = (project_id, database, compute_server) ->
hub = exports.new_local_hub(project_id, database, compute_server)
hub.local_hub_socket(()->)
exports.all_local_hubs = () ->
v = []
for k, h of _local_hub_cache
if h?
v.push(h)
return v
smc_version = undefined
init_smc_version = () ->
smc_version = require('./hub-version')
smc_version.on 'change', () ->
winston.debug("local_hub_connection (smc_version changed) -- checking on clients")
for x in exports.all_local_hubs()
x.restart_if_version_too_old()
class LocalHub
constructor: (@project_id, @database, @compute_server) ->
if not smc_version?
init_smc_version()
@_local_hub_socket_connecting = false
@_sockets = {}
@_sockets_by_client_id = {}
@call_callbacks = {}
@path = '.'
@dbg("getting deployed running project")
project: (cb) =>
@compute_server.project
project_id : @project_id
cb : cb
dbg: (m) =>
if DEBUG
winston.debug("local_hub(#{@project_id}: #{misc.to_json(m)}")
move: (opts) =>
opts = defaults opts,
target : undefined
cb : undefined
@dbg("move")
@project (err, project) =>
if err
cb?(err)
else
project.move(opts)
restart: (cb) =>
@dbg("restart")
@free_resources()
@project (err, project) =>
if err
cb(err)
else
project.restart(cb:cb)
close: (cb) =>
@dbg("close: stop the project and delete from disk (but leave in cloud storage)")
@project (err, project) =>
if err
cb(err)
else
project.ensure_closed(cb:cb)
save: (cb) =>
@dbg("save: save a snapshot of the project")
@project (err, project) =>
if err
cb(err)
else
project.save(cb:cb)
status: (cb) =>
@dbg("status: get status of a project")
@project (err, project) =>
if err
cb(err)
else
project.status(cb:cb)
state: (cb) =>
@dbg("state: get state of a project")
@project (err, project) =>
if err
cb(err)
else
project.state(cb:cb)
free_resources: () =>
@dbg("free_resources")
@query_cancel_all_changefeeds()
delete @address
delete @_status
delete @smc_version
try
@_socket?.end()
winston.debug("free_resources: closed main local_hub socket")
catch e
winston.debug("free_resources: exception closing main _socket: #{e}")
delete @_socket
for k, s of @_sockets
try
s.end()
winston.debug("free_resources: closed #{k}")
catch e
winston.debug("free_resources: exception closing a socket: #{e}")
@_sockets = {}
@_sockets_by_client_id = {}
free_resources_for_client_id: (client_id) =>
v = @_sockets_by_client_id[client_id]
if v?
@dbg("free_resources_for_client_id(#{client_id}) -- #{v.length} sockets")
for socket in v
try
socket.end()
socket.destroy()
catch e
delete @_sockets_by_client_id[client_id]
mesg_query: (mesg, write_mesg) =>
dbg = (m) => winston.debug("mesg_query(project_id='#{@project_id}'): #{misc.trunc(m,200)}")
dbg(misc.to_json(mesg))
query = mesg.query
if not query?
write_mesg(message.error(error:"query must be defined"))
return
first = true
if mesg.changes
@_query_changefeeds ?= {}
@_query_changefeeds[mesg.id] = true
mesg_id = mesg.id
@database.user_query
project_id : @project_id
query : query
options : mesg.options
changes : if mesg.changes then mesg_id
cb : (err, result) =>
if result?.action == 'close'
err = 'close'
if err
dbg("project_query error: #{misc.to_json(err)}")
if @_query_changefeeds?[mesg_id]
delete @_query_changefeeds[mesg_id]
write_mesg(message.error(error:err))
if mesg.changes and not first
@database.user_query_cancel_changefeed(id : mesg_id)
else
if mesg.changes and not first
resp = result
resp.id = mesg_id
resp.multi_response = true
else
first = false
resp = mesg
resp.query = result
write_mesg(resp)
mesg_query_cancel: (mesg, write_mesg) =>
if not @_query_changefeeds?
write_mesg(mesg)
else
@database.user_query_cancel_changefeed
id : mesg.id
cb : (err, resp) =>
if err
write_mesg(message.error(error:err))
else
mesg.resp = resp
write_mesg(mesg)
delete @_query_changefeeds?[mesg.id]
mesg_query_get_changefeed_ids: (mesg, write_mesg) =>
mesg.changefeed_ids = if @_query_changefeeds? then misc.keys(@_query_changefeeds) else []
write_mesg(mesg)
query_cancel_all_changefeeds: (cb) =>
if not @_query_changefeeds? or @_query_changefeeds.length == 0
cb?(); return
dbg = (m)-> winston.debug("query_cancel_all_changefeeds(project_id='#{@project_id}'): #{m}")
v = @_query_changefeeds
dbg("canceling #{v.length} changefeeds")
delete @_query_changefeeds
f = (id, cb) =>
dbg("canceling id=#{id}")
@database.user_query_cancel_changefeed
id : id
cb : (err) =>
if err
dbg("FEED: warning #{id} -- error canceling a changefeed #{misc.to_json(err)}")
else
dbg("FEED: canceled changefeed -- #{id}")
cb()
async.map(misc.keys(v), f, (err) => cb?(err))
local_hub_version: (version) =>
winston.debug("local_hub_version: version=#{version}")
@smc_version = version
@restart_if_version_too_old()
restart_if_version_too_old: () =>
if not @_socket?
return
if not @smc_version?
return
if smc_version.min_project_version <= @smc_version
return
if @_restart_goal_version == smc_version.min_project_version
return
winston.debug("restart_if_version_too_old(#{@project_id}): #{@smc_version}, #{smc_version.min_project_version}")
ver = @_restart_goal_version = smc_version.min_project_version
f = () =>
if @_restart_goal_version == ver
delete @_restart_goal_version
setTimeout(f, 15*60*1000)
@dbg("restart_if_version_too_old -- restarting since #{smc_version.min_project_version} > #{@smc_version}")
@restart (err) =>
@dbg("restart_if_version_too_old -- done #{err}")
handle_mesg: (mesg, socket) =>
@dbg("local_hub --> hub: received mesg: #{misc.trunc(misc.to_json(mesg), 250)}")
if mesg.client_id?
clients.push_to_client(mesg)
return
if mesg.event == 'version'
@local_hub_version(mesg.version)
return
if mesg.id?
f = @call_callbacks[mesg.id]
if f?
f(mesg)
else
winston.debug("handling call from local_hub")
write_mesg = (resp) =>
resp.id = mesg.id
@local_hub_socket (err, sock) =>
if not err
sock.write_mesg('json', resp)
switch mesg.event
when 'ping'
write_mesg(message.pong())
when 'query'
@mesg_query(mesg, write_mesg)
when 'query_cancel'
@mesg_query_cancel(mesg, write_mesg)
when 'query_get_changefeed_ids'
@mesg_query_get_changefeed_ids(mesg, write_mesg)
when 'file_written_to_project'
return
when 'file_read_from_project'
return
when 'error'
return
else
write_mesg(message.error(error:"unknown event '#{mesg.event}'"))
return
handle_blob: (opts) =>
opts = defaults opts,
uuid : required
blob : required
@dbg("local_hub --> global_hub: received a blob with uuid #{opts.uuid}")
blobs.save_blob
uuid : opts.uuid
blob : opts.blob
project_id : @project_id
ttl : BLOB_TTL_S
check : true
database : @database
cb : (err, ttl) =>
if err
resp = message.save_blob(sha1:opts.uuid, error:err)
@dbg("handle_blob: error! -- #{err}")
else
resp = message.save_blob(sha1:opts.uuid, ttl:ttl)
@local_hub_socket (err, socket) =>
if not err
socket.write_mesg('json', resp)
local_hub_socket: (cb) =>
if @_socket?
cb(undefined, @_socket)
return
if @_local_hub_socket_connecting
@_local_hub_socket_queue.push(cb)
@dbg("local_hub_socket: added socket request to existing queue, which now has length #{@_local_hub_socket_queue.length}")
return
@_local_hub_socket_connecting = true
@_local_hub_socket_queue = [cb]
connecting_timer = undefined
cancel_connecting = () =>
@_local_hub_socket_connecting = false
if @_local_hub_socket_queue?
@dbg("local_hub_socket: cancelled due to timeout")
for c in @_local_hub_socket_queue
c?('timeout')
delete @_local_hub_socket_queue
clearTimeout(connecting_timer)
connecting_timer = setTimeout(cancel_connecting, 20000)
@dbg("local_hub_socket: getting new socket")
@new_socket (err, socket) =>
if not @_local_hub_socket_queue?
return
@_local_hub_socket_connecting = false
@dbg("local_hub_socket: new_socket returned #{err}")
if err
for c in @_local_hub_socket_queue
c?(err)
delete @_local_hub_socket_queue
else
socket.on 'mesg', (type, mesg) =>
switch type
when 'blob'
@handle_blob(mesg)
when 'json'
@handle_mesg(mesg, socket)
socket.on('end', @free_resources)
socket.on('close', @free_resources)
socket.on('error', @free_resources)
socket.write_mesg('json', {event:'hello'})
for c in @_local_hub_socket_queue
c?(undefined, socket)
delete @_local_hub_socket_queue
@_socket = socket
check_version_received = () =>
if @_socket? and not @smc_version?
@smc_version = 0
@restart_if_version_too_old()
setTimeout(check_version_received, 60*1000)
cancel_connecting()
new_socket: (cb) =>
@dbg("new_socket")
f = (cb) =>
if not @address?
cb("no address")
return
connect_to_a_local_hub
port : @address.port
host : @address.host
secret_token : @address.secret_token
cb : cb
socket = undefined
async.series([
(cb) =>
if not @address?
@dbg("get address of a working local hub")
@project (err, project) =>
if err
cb(err)
else
@dbg("get address")
project.address
cb : (err, address) =>
@address = address; cb(err)
else
cb()
(cb) =>
@dbg("try to connect to local hub socket using last known address")
f (err, _socket) =>
if not err
socket = _socket
cb()
else
@dbg("failed to get address of a working local hub")
@project (err, project) =>
if err
cb(err)
else
@dbg("get address")
project.address
cb : (err, address) =>
@address = address; cb(err)
(cb) =>
if not socket?
@dbg("still don't have our connection -- try again")
f (err, _socket) =>
socket = _socket; cb(err)
else
cb()
], (err) =>
cb(err, socket)
)
remove_multi_response_listener: (id) =>
delete @call_callbacks[id]
call: (opts) =>
opts = defaults opts,
mesg : required
timeout : undefined
multi_response : false
cb : undefined
@dbg("call")
if not opts.mesg.id?
if opts.timeout or opts.multi_response
opts.mesg.id = uuid.v4()
@local_hub_socket (err, socket) =>
if err
@dbg("call: failed to get socket -- #{err}")
opts.cb?(err)
return
@dbg("call: get socket -- now writing message to the socket -- #{misc.trunc(misc.to_json(opts.mesg),200)}")
socket.write_mesg 'json', opts.mesg, (err) =>
if err
@free_resources()
opts.cb?(err)
return
if opts.multi_response
@call_callbacks[opts.mesg.id] = opts.cb
else if opts.timeout
@call_callbacks[opts.mesg.id] = (resp) =>
delete @call_callbacks[opts.mesg.id]
if resp.event == 'error'
opts.cb(resp.error)
else
opts.cb(undefined, resp)
_open_session_socket: (opts) =>
opts = defaults opts,
client_id : required
session_uuid : required
type : required
params : required
project_id : required
timeout : 10
cb : required
@dbg("_open_session_socket")
key = "#{opts.session_uuid}:#{opts.client_id}"
socket = @_sockets[key]
if socket?
opts.cb(false, socket)
return
socket = undefined
async.series([
(cb) =>
@dbg("_open_session_socket: getting new socket connection to a local_hub")
@new_socket (err, _socket) =>
if err
cb(err)
else
socket = _socket
socket._key = key
@_sockets[key] = socket
if not @_sockets_by_client_id[opts.client_id]?
@_sockets_by_client_id[opts.client_id] = [socket]
else
@_sockets_by_client_id[opts.client_id].push(socket)
cb()
(cb) =>
mesg = message.connect_to_session
id : uuid.v4()
type : opts.type
project_id : opts.project_id
session_uuid : opts.session_uuid
params : opts.params
@dbg("_open_session_socket: send the message asking to be connected with a #{opts.type} session.")
socket.write_mesg('json', mesg)
f = (type, resp) =>
clearTimeout(timer)
if resp.event == 'error'
cb(resp.error)
else
if opts.type == 'console'
if resp.history?
socket.history = resp.history.slice(resp.history.length - 100000)
else
socket.history = ''
misc_node.disable_mesg(socket)
cb()
socket.once('mesg', f)
timed_out = () =>
socket.removeListener('mesg', f)
socket.end()
cb("Timed out after waiting #{opts.timeout} seconds for response from #{opts.type} session server. Please try again later.")
timer = setTimeout(timed_out, opts.timeout*1000)
], (err) =>
if err
@dbg("_open_session_socket: error getting a socket -- (declaring total disaster) -- #{err}")
@_socket?.destroy()
delete @_status; delete @_socket
else if socket?
opts.cb(false, socket)
)
console_session: (opts) =>
opts = defaults opts,
client : required
project_id : required
params : required
session_uuid : undefined
cb : required
@dbg("console_session: connect client to console session -- session_uuid=#{opts.session_uuid}")
if not opts.session_uuid?
opts.session_uuid = uuid.v4()
@_open_session_socket
client_id : opts.client.id
session_uuid : opts.session_uuid
project_id : opts.project_id
type : 'console'
params : opts.params
cb : (err, console_socket) =>
if err
opts.cb(err)
return
console_socket.removeAllListeners()
console_socket._ignore = false
console_socket.on 'end', () =>
winston.debug("console_socket (session_uuid=#{opts.session_uuid}): received 'end' so setting ignore=true")
opts.client.push_to_client(message.terminate_session(session_uuid:opts.session_uuid))
console_socket._ignore = true
delete @_sockets[console_socket._key]
recently_sent_reconnect = false
channel = opts.client.register_data_handler (data) =>
if not console_socket._ignore
try
console_socket.write(data)
catch
if opts.params.filename?
opts.client.touch(project_id:opts.project_id, path:opts.params.filename)
else
if not recently_sent_reconnect
recently_sent_reconnect = true
setTimeout( (()=>recently_sent_reconnect=false), 5000 )
winston.debug("console -- trying to write to closed console_socket with session_uuid=#{opts.session_uuid}")
opts.client.push_to_client(message.session_reconnect(session_uuid:opts.session_uuid))
mesg = message.session_connected
session_uuid : opts.session_uuid
data_channel : channel
history : console_socket.history
opts.cb(false, mesg)
f = (data) ->
if data.length > 20000
data = "[...]" + data.slice(data.length - 20000)
opts.client.push_data_to_client(channel, data)
console_socket.history += data
if console_socket.history.length > 150000
console_socket.history = console_socket.history.slice(console_socket.history.length - 100000)
console_socket.on('data', f)
terminate_session: (opts) =>
opts = defaults opts,
session_uuid : required
project_id : required
cb : undefined
@dbg("terminate_session")
@call
mesg :
message.terminate_session
session_uuid : opts.session_uuid
project_id : opts.project_id
timeout : 30
cb : opts.cb
read_file: (opts) =>
{path, project_id, archive, cb} = defaults opts,
path : required
project_id : required
archive : 'tar.bz2'
cb : required
@dbg("read_file '#{path}'")
socket = undefined
id = uuid.v4()
data = undefined
data_uuid = undefined
result_archive = undefined
async.series([
(cb) =>
@local_hub_socket (err, _socket) =>
if err
cb(err)
else
socket = _socket
cb()
(cb) =>
socket.write_mesg('json', message.read_file_from_project(id:id, project_id:project_id, path:path, archive:archive))
socket.recv_mesg
type : 'json'
id : id
timeout : 60
cb : (mesg) =>
switch mesg.event
when 'error'
cb(mesg.error)
when 'file_read_from_project'
data_uuid = mesg.data_uuid
result_archive = mesg.archive
cb()
else
cb("Unknown mesg event '#{mesg.event}'")
(cb) =>
socket.recv_mesg
type : 'blob'
id : data_uuid
timeout : 60
cb : (_data) =>
data = _data
data.archive = result_archive
cb()
], (err) =>
if err
cb(err)
else
cb(false, data)
)
write_file: (opts) =>
{path, project_id, cb, data} = defaults opts,
path : required
project_id : required
data : required
cb : required
@dbg("write_file '#{path}'")
id = uuid.v4()
data_uuid = uuid.v4()
@local_hub_socket (err, socket) =>
if err
opts.cb(err)
return
mesg = message.write_file_to_project
id : id
project_id : project_id
path : path
data_uuid : data_uuid
socket.write_mesg('json', mesg)
socket.write_mesg('blob', {uuid:data_uuid, blob:data})
socket.recv_mesg
type : 'json'
id : id
timeout : 10
cb : (mesg) =>
switch mesg.event
when 'file_written_to_project'
opts.cb()
when 'error'
opts.cb(mesg.error)
else
opts.cb("unexpected message type '#{mesg.event}'")