require('coffee-cache')
# DEBUG is true only when SMC_TEST is unset and at least one of the
# SMC_DEBUG / DEVEL environment variables is set; otherwise it is false.
DEBUG = (not process.env.SMC_TEST) and !!(process.env.SMC_DEBUG or process.env.DEVEL)
net = require('net')
assert = require('assert')
fs = require('fs')
path_module = require('path')
underscore = require('underscore')
{EventEmitter} = require('events')
mime = require('mime')
program = undefined
misc_node = require('smc-util-node/misc_node')
SMC_ROOT = misc_node.SMC_ROOT
SALVUS_HOME = misc_node.SALVUS_HOME
OUTPUT_DIR = misc_node.OUTPUT_DIR
STATIC_PATH = path_module.join(SALVUS_HOME, OUTPUT_DIR)
WEBAPP_LIB = misc_node.WEBAPP_LIB
underscore = require('underscore')
misc = require('smc-util/misc')
{defaults, required} = misc
message = require('smc-util/message')
client_lib = require('smc-util/client')
{Client} = require('./client')
sage = require('./sage')
auth = require('./auth')
base_url = require('./base-url')
local_hub_connection = require('./local_hub_connection')
hub_proxy = require('./proxy')
MetricsRecorder = require('./metrics-recorder')
hub_http_server = require('./hub_http_server')
hub_register = require('./hub_register')
# Interval in seconds; handed to hub_register.start as interval_s.
REGISTER_INTERVAL_S = 45

# Placeholder until init_smc_version replaces it with the './hub-version' module.
smc_version = {}

# Load './hub-version' and subscribe to its 'change' event.  On every change,
# each connected client whose smc_version is lower than the announced version
# is asked to push a version update.
init_smc_version = () ->
    smc_version = require('./hub-version')
    smc_version.on 'change', (latest) ->
        winston.debug("smc_version changed -- sending updates to clients")
        for client_id, client of clients when client.smc_version < latest.version
            client.push_version_update()
to_json = misc.to_json
from_json = misc.from_json
async = require("async")
Cookies = require('cookies')
winston = require('winston')
# Replace winston's default console transport.  Outside of test runs
# (SMC_TEST unset) a console transport logging at 'debug' level with
# timestamps and colors is installed; under SMC_TEST nothing is added.
winston.remove(winston.transports.Console)
unless process.env.SMC_TEST
    console_options =
        level     : 'debug'
        timestamp : true
        colorize  : true
    winston.add(winston.transports.Console, console_options)

# File-wide database handle; assigned by connect_to_database.
database = null
{init_support} = require('./support')
clients = require('./clients').get_clients()
# Map a raw path onto the {path, action} pair used for activity tracking.
#
#   - The path is first shortened with misc.trunc_middle(path, 2048).
#   - '*.sage-chat'   -> action 'chat'; if the filename starts with '.', the
#                        '.sage-chat' suffix and the first character of the
#                        filename are removed.
#   - '*.syncdoc*'    -> the extension and the first character of the
#                        filename are removed (no check that it is a '.').
#   - '*.sage-history'-> path is undefined.
#   - anything else   -> path unchanged.
# In every case except 'sage-chat' the action is 'edit'.
normalize_path = (path) ->
    # Rebuild p with the first character of its final component dropped.
    drop_first_char_of_filename = (p) ->
        parts = misc.path_split(p)
        name  = parts.tail.slice(1)
        if parts.head
            return parts.head + '/' + name
        return name

    path   = misc.trunc_middle(path, 2048)
    ext    = misc.filename_extension(path)
    action = 'edit'
    {tail} = misc.path_split(path)

    switch
        when ext == "sage-chat"
            action = 'chat'
            if tail?[0] == '.'
                stripped = path.slice(0, path.length - '.sage-chat'.length)
                path = drop_first_char_of_filename(stripped)
        when ext.slice(0, 7) == 'syncdoc'
            stripped = path.slice(0, path.length - ext.length - 1)
            path = drop_first_char_of_filename(stripped)
        when ext == "sage-history"
            path = undefined

    return {path: path, action: action}
# Not read or written by path_activity below.
path_activity_cache = {}

# Report activity on a file by calling opts.client.touch with the path and
# action computed by normalize_path.
#
#   opts.account_id, opts.project_id, opts.path, opts.client : required
#   opts.cb : optional; passed on to client.touch, or invoked with no
#             arguments when normalize_path yields no path.
path_activity = (opts) ->
    opts = defaults opts,
        account_id : required
        project_id : required
        path       : required
        client     : required
        cb         : undefined

    normalized = normalize_path(opts.path)
    winston.debug("path_activity(#{opts.account_id},#{opts.project_id},#{normalized.path}): #{normalized.action}")

    unless normalized.path?
        # Nothing to touch for this kind of file.
        opts.cb?()
        return

    touch_opts =
        project_id : opts.project_id
        path       : normalized.path
        action     : normalized.action
        force      : normalized.action == 'chat'   # chat activity is always forced
        cb         : opts.cb
    opts.client.touch(touch_opts)
# The Primus server instance; set by init_primus_server.
primus_server = undefined

# Attach a Primus websocket server to http_server at BASE_URL + '/hub'.
#
# Each new connection is expected to send, as its first data message, the id
# of a previous connection (or anything else if it has none):
#   - id of a Client marked closed          -> entry deleted, new Client made
#   - id of a live Client, cookie matches   -> that Client is moved onto the
#                                              new connection
#   - id of a live Client, cookie mismatch  -> existing Client left untouched,
#                                              new Client made
#   - unknown id                            -> new Client made
# A connection that sends nothing within 15s is ended.
init_primus_server = (http_server) ->
    Primus = require('primus')
    opts =
        pathname : path_module.join(BASE_URL, '/hub')
    primus_server = new Primus(http_server, opts)
    winston.debug("primus_server: listening on #{opts.pathname}")

    primus_server.on "connection", (conn) ->
        winston.debug("primus_server: new connection from #{conn.address.ip} -- #{conn.id}")
        primus_conn_sent_data = false
        no_data_timer = undefined

        f = (data) ->
            primus_conn_sent_data = true
            # The handshake arrived, so the watchdog below is no longer needed.
            if no_data_timer?
                clearTimeout(no_data_timer)
                no_data_timer = undefined
            id = data.toString()
            winston.debug("primus_server: got id='#{id}'")
            # Only the first message is the handshake.
            conn.removeListener('data', f)
            C = clients[id]
            if C?
                if C.closed
                    winston.debug("primus_server: '#{id}' matches expired Client -- deleting")
                    delete clients[id]
                    C = undefined
                else
                    winston.debug("primus_server: '#{id}' matches existing Client -- re-using")
                    cookies = new Cookies(conn.request)
                    if C._remember_me_value == cookies.get(BASE_URL + 'remember_me')
                        # Cancel changefeeds only once the cookie check has
                        # passed, so a connection that merely presents a known
                        # id cannot disturb the existing client.
                        C.query_cancel_all_changefeeds()
                        C.conn.removeAllListeners()
                        C.conn.end()
                        C.conn = conn
                        conn.id = id
                        conn.write(conn.id)
                        C.install_conn_handlers()
                    else
                        winston.debug("primus_server: '#{id}' matches but cookies do not match, so not re-using")
                        C = undefined
            if not C?
                winston.debug("primus_server: '#{id}' unknown, so making a new Client with id #{conn.id}")
                conn.write(conn.id)
                clients[conn.id] = new Client
                    conn           : conn
                    logger         : winston
                    database       : database
                    compute_server : compute_server
                    host           : program.host
                    port           : program.port

        conn.on("data", f)

        # Watchdog: end connections that never send the handshake message.
        no_data = ->
            if conn? and not primus_conn_sent_data
                winston.debug("primus_server: #{conn.id} sent no data after 15s, so closing")
                conn.end()
        no_data_timer = setTimeout(no_data, 15000)
# Collect the ids of connected clients that belong to a set of accounts.
#
#   opts.account_id : optional; this account is included in the set
#   opts.project_id : optional; all accounts returned by
#                     database.get_account_ids_using_project are included
#   opts.exclude    : optional array of client ids to leave out
#   opts.cb         : required; called as cb(err, ids) -- ids is passed even
#                     when err is set
get_client_ids = (opts) ->
    opts = defaults opts,
        account_id : undefined
        project_id : undefined
        exclude    : undefined
        cb         : required

    result = []
    wanted_accounts = {}
    wanted_accounts[opts.account_id] = true if opts.account_id?

    # Append id to result unless it is already there or explicitly excluded.
    add_client_id = (id) ->
        return if id in result
        return if opts.exclude? and id in opts.exclude
        result.push(id)

    # Step 1: when a project is given, add every account using it.
    collect_project_accounts = (cb) ->
        unless opts.project_id?
            cb()
            return
        database.get_account_ids_using_project
            project_id : opts.project_id
            cb         : (err, ids) ->
                if err
                    cb(err)
                    return
                for account_id in ids
                    wanted_accounts[account_id] = true
                cb()

    # Step 2: pick the connected clients signed in to one of those accounts.
    collect_clients = (cb) ->
        for id, client of clients
            add_client_id(id) if wanted_accounts[client.account_id]?
        cb()

    async.series([collect_project_accounts, collect_clients], (err) ->
        opts.cb(err, result)
    )
# Send opts.mesg to a set of connected clients.
#
#   opts.mesg  : required; handed to each client's push_to_client
#   opts.where : optional; options for get_client_ids (its cb is filled in
#                here via misc.merge)
#                NOTE(review): misc.merge may modify opts.where in place -- confirm
#   opts.to    : optional; client id or array of ids appended to the targets
#   opts.cb    : optional; called with true if resolving opts.where failed,
#                otherwise with false once all pushes were attempted
push_to_clients = (opts) ->
    opts = defaults opts,
        mesg  : required
        where : undefined
        to    : undefined
        cb    : undefined

    dest = []

    # Step 1: turn the `where` query (if any) into client ids.
    resolve_where = (cb) ->
        unless opts.where?
            cb()
            return
        on_ids = (error, result) ->
            if error
                opts.cb?(true)
                cb(true)
            else
                dest = dest.concat(result)
                cb()
        get_client_ids(misc.merge(opts.where, cb: on_ids))

    # Step 2: add the explicit targets and push to every id that is connected.
    deliver = (cb) ->
        dest = dest.concat(opts.to) if opts.to?
        for id in dest
            target = clients[id]
            if target?
                winston.debug("pushing a message to client #{id}")
                target.push_to_client(opts.mesg)
            else
                winston.debug("not pushing message to client #{id} since not actually connected")
        opts.cb?(false)
        cb()

    async.series([resolve_where, deliver])
# Connect to the database (pool of 1) and reset the password of the account
# with the given email address via database.reset_password.
#
#   email_address : address identifying the account
#   cb            : optional; called as cb(err) when done.  err is the error
#                   from connecting or from database.reset_password, if any
#                   (previously the error was logged but never passed on).
reset_password = (email_address, cb) ->
    async.series([
        (cb) ->
            connect_to_database
                pool : 1
                cb   : cb
        (cb) ->
            database.reset_password
                email_address : email_address
                cb            : cb
    ], (err) ->
        if err
            winston.debug("Error -- #{err}")
        else
            winston.debug("Password changed for #{email_address}")
        # Propagate the outcome so callers can tell success from failure.
        cb?(err)
    )
# File-wide database handle; stays undefined until connect_to_database runs.
database = undefined

# Create the PostgreSQL database object (once) and connect it.
#
#   opts.error : accepted but not used in this function
#   opts.pool  : defaults to program.db_pool; not used in this function
#   opts.cb    : required; called with no arguments if the database object
#                already exists, otherwise passed to database.connect
#
# NOTE(review): the handle is assigned before connect finishes, so a later
# call reports "already done" even if that connect failed -- confirm that
# database.connect handles retries itself.
connect_to_database = (opts) ->
    opts = defaults opts,
        error : undefined
        pool  : program.db_pool
        cb    : required

    log = (m) -> winston.debug("connect_to_database (PostgreSQL): #{m}")

    if database?
        log("already done")
        opts.cb()
        return

    log("connecting...")
    postgres = require('./postgres')
    database = postgres.db
        host            : program.database_nodes.split(',')[0]   # only the first listed node is used
        database        : program.keyspace
        concurrent_warn : program.db_concurrent_warn
    database.connect(cb: opts.cb)
# File-wide compute server client; set by init_compute_server.
compute_server = undefined

# Create the compute server client and hook it into the database object.
#
# With program.kucalc set, the client comes synchronously from
# './kucalc/compute-client'; otherwise it is created asynchronously by
# './compute-client'.  On success the client is stored in compute_server and
# database.compute_server, and database.ensure_connection_to_project is
# installed.
#
#   cb : optional; called with the error on failure, with nothing on success
init_compute_server = (cb) ->
    winston.debug("init_compute_server: creating compute_server client")

    on_created = (err, client) ->
        if err
            winston.debug("FATAL ERROR creating compute server -- #{err}")
            cb?(err)
            return
        winston.debug("compute server created")
        compute_server = client
        database.compute_server = compute_server
        database.ensure_connection_to_project = (project_id) ->
            local_hub_connection.connect_to_project(project_id, database, compute_server)
        cb?()

    if not program.kucalc
        require('./compute-client').compute_server
            database : database
            dev      : program.dev
            single   : program.single
            base_url : BASE_URL
            cb       : on_created
    else
        on_created(undefined, require('./kucalc/compute-client').compute_client(database, winston))
# Run the 'primus/update_primus' command located under SMC_ROOT/WEBAPP_LIB
# through misc_node.execute_code; cb is passed to execute_code.
update_primus = (cb) ->
    script = path_module.join(SMC_ROOT, WEBAPP_LIB, '/primus/update_primus')
    misc_node.execute_code(command: script, cb: cb)
# Connect to the database, then call database.delete_expired with
# count_only false.  cb receives the async.series result.
delete_expired = (cb) ->
    connect = (next) -> connect_to_database(cb: next)
    purge   = (next) -> database.delete_expired(count_only: false, cb: next)
    async.series([connect, purge], cb)
# Connect to the database, then run database.blob_maintenance.
# cb receives the async.series result.
blob_maintenance = (cb) ->
    connect  = (next) -> connect_to_database(error: 99999, pool: 5, cb: next)
    maintain = (next) -> database.blob_maintenance(cb: next)
    async.series([connect, maintain], cb)
# Connect to the database, then call database.get_stats.
# cb receives the async.series result.
update_stats = (cb) ->
    connect = (next) -> connect_to_database(error: 99999, pool: 5, cb: next)
    compute = (next) -> database.get_stats(cb: next)
    async.series([connect, compute], cb)
# Connect to the database, then run stripe_sync from './stripe/sync'.
#
#   dump_only : passed through unchanged to the sync routine
#   cb        : receives the async.series result
stripe_sync = (dump_only, cb) ->
    log = (m) -> winston.debug("stripe_sync: #{m}")
    log()

    connect = (next) ->
        log("connect to the database")
        connect_to_database(error: 99999, cb: next)

    sync = (next) ->
        require('./stripe/sync').stripe_sync
            database  : database
            dump_only : dump_only
            logger    : winston
            cb        : next

    async.series([connect, sync], cb)
# Base URL of the hub; assigned from base_url.init(program.base_url) in start_server.
BASE_URL = ''
# Metrics counter 'blocked_ms_total'; created in start_server after MetricsRecorder.init succeeds.
metric_blocked = undefined
# Start the hub.
#
# Initializes BASE_URL and writes it to SMC_ROOT/data/base_url, installs the
# `blocked` reporter, calls init_smc_version, then runs the initialization
# steps below in order with async.series.  Most steps do nothing when
# program.port is falsy.  If the series succeeds, the primus server, the
# optional http proxy server and hub registration are started.
#
#   cb : optional; called with the error (if any) from the series.
exports.start_server = start_server = (cb) ->
winston.debug("start_server")
winston.debug("dev = #{program.dev}")
BASE_URL = base_url.init(program.base_url)
winston.debug("base_url='#{BASE_URL}'")
fs.writeFileSync(path_module.join(SMC_ROOT, 'data', 'base_url'), BASE_URL)
winston.debug("port = #{program.port}, proxy_port=#{program.proxy_port}")
winston.info("using database #{program.keyspace}")
# NOTE(review): hosts is not used anywhere else in this function.
hosts = program.database_nodes.split(',')
# Filled in by the express http server step; used by later steps and the final callback.
http_server = express_router = undefined
# The callback given to `blocked` adds positive durations to metric_blocked
# (once that counter exists) and logs the reported duration.
blocked = require('blocked')
blocked (ms) ->
if ms > 0
metric_blocked?.inc(ms)
winston.debug("BLOCKED for #{ms}ms")
init_smc_version()
async.series([
# Step: initialize the metrics recorder and create the blocked_ms_total counter.
(cb) ->
if not program.port
cb(); return
winston.debug("Initializing Metrics Recorder")
MetricsRecorder.init(winston, (err, mr) ->
if err?
cb(err)
else
metric_blocked = MetricsRecorder.new_counter('blocked_ms_total', 'accumulates the "blocked" time in the hub [ms]')
cb()
)
# Step: connect to the database via misc.retry_until_success (runs regardless of program.port).
(cb) ->
winston.debug("Connecting to the database.")
misc.retry_until_success
f : (cb) -> connect_to_database(cb:cb)
start_delay : 1000
max_delay : 10000
cb : () ->
winston.debug("connected to database.")
cb()
# Step: update the database schema when program.dev or program.update is set.
(cb) ->
if not program.port
cb(); return
if program.dev or program.update
winston.debug("updating the database schema...")
database.update_schema(cb:cb)
else
cb()
# Step: initialize stripe.
(cb) ->
if not program.port
cb(); return
require('./stripe/connect').init_stripe
database : database
logger : winston
cb : cb
# Step: initialize support.
(cb) ->
if not program.port
cb(); return
init_support(cb)
# Step: create the compute server client (runs regardless of program.port).
(cb) ->
init_compute_server(cb)
# Step: create the express http server and router, then listen on program.host:program.port.
(cb) ->
if not program.port
cb(); return
x = hub_http_server.init_express_http_server
base_url : BASE_URL
dev : program.dev
compute_server : compute_server
database : database
{http_server, express_router} = x
winston.debug("starting express webserver listening on #{program.host}:#{program.port}")
http_server.listen(program.port, program.host, cb)
# Step: in parallel, initialize passport on the express router and -- for
# dev/update runs outside kucalc -- run update_primus.
(cb) ->
if not program.port
cb(); return
async.parallel([
(cb) ->
auth.init_passport
router : express_router
database : database
base_url : BASE_URL
host : program.host
cb : cb
(cb) ->
if (program.dev or program.update) and not program.kucalc
update_primus(cb)
else
cb()
], cb)
# Final callback: on error only log it; otherwise start the primus server
# (needs program.port), the http proxy server (needs program.proxy_port) and
# hub registration (needs program.port).
], (err) =>
if err
winston.error("Error starting hub services! err=#{err}")
else
winston.debug("base_url='#{BASE_URL}'")
if program.port
winston.debug("initializing primus websocket server")
init_primus_server(http_server)
if program.proxy_port
winston.debug("initializing the http proxy server on port #{program.proxy_port}")
hub_proxy.init_http_proxy_server
database : database
compute_server : compute_server
base_url : BASE_URL
port : program.proxy_port
host : program.host
if program.port
hub_register.start
database : database
clients : clients
host : program.host
port : program.port
interval_s : REGISTER_INTERVAL_S
winston.info("Started hub. HTTP port #{program.port}; keyspace #{program.keyspace}")
cb?(err)
)
# Add the account with the given email address to a project as a
# 'collaborator'.
#
#   project_id    : id of the project
#   email_address : email address of the account to add
#   cb            : called as cb(err).  err is set when an argument is
#                   missing, the database reports an error, or no account id
#                   is found for the email address.
add_user_to_project = (project_id, email_address, cb) ->
    # Reject missing arguments up front (e.g. a malformed "project_id,email"
    # command line value) instead of querying the database with undefined.
    if not project_id or not email_address
        cb?("project_id and email_address must both be given")
        return

    account_id = undefined
    async.series([
        (cb) ->
            connect_to_database(cb:cb)
        (cb) ->
            database.account_exists
                email_address : email_address
                cb            : (err, _account_id) ->
                    if err
                        cb(err)
                    else if not _account_id
                        # Presumably account_exists yields a falsy id when no
                        # account matches -- stop here rather than adding an
                        # undefined account to the project.
                        cb("no account with email address '#{email_address}'")
                    else
                        account_id = _account_id
                        cb()
        (cb) ->
            database.add_user_to_project
                project_id : project_id
                account_id : account_id
                group      : 'collaborator'
                cb         : cb
    ], cb)
# Parse the command line with commander and run the selected mode.
#
# Assigns the parsed commander object to the file-level `program` variable,
# which the rest of this file reads for its settings.  When the program name
# starts with 'hub', an uncaughtException logger is installed and exactly one
# mode is run, checked in this order: password reset, stripe sync, stripe
# dump, delete expired, blob maintenance, update stats, add user to project,
# and finally running the hub itself (in the foreground or as a daemon).
# Every mode except running the hub exits the process when it finishes.
command_line = () ->
program = require('commander')
daemon = require("start-stop-daemon")
# Default for --database_nodes: the PGHOST environment variable, else 'localhost'.
default_db = process.env.PGHOST ? 'localhost'
program.usage('[start/stop/restart/status/nodaemon] [options]')
.option('--port <n>', 'port to listen on (default: 5000; 0 -- do not start)', ((n)->parseInt(n)), 5000)
.option('--proxy_port <n>', 'port that the proxy server listens on (default: 0 -- do not start)', ((n)->parseInt(n)), 0)
.option('--log_level [level]', "log level (default: debug) useful options include INFO, WARNING and DEBUG", String, "debug")
.option('--host [string]', 'host of interface to bind to (default: "127.0.0.1")', String, "127.0.0.1")
.option('--pidfile [string]', 'store pid in this file (default: "data/pids/hub.pid")', String, "data/pids/hub.pid")
.option('--logfile [string]', 'write log to this file (default: "data/logs/hub.log")', String, "data/logs/hub.log")
.option('--database_nodes <string,string,...>', "database address (default: '#{default_db}')", String, default_db)
.option('--keyspace [string]', 'Database name to use (default: "smc")', String, 'smc')
.option('--passwd [email_address]', 'Reset password of given user', String, '')
.option('--update', 'Update schema and primus on startup (always true for --dev; otherwise, false)')
.option('--stripe_sync', 'Sync stripe subscriptions to database for all users with stripe id', String, 'yes')
.option('--stripe_dump', 'Dump stripe subscriptions info to ~/stripe/', String, 'yes')
.option('--update_stats', 'Calculates the statistics for the /stats endpoint and stores them in the database', String, 'yes')
.option('--delete_expired', 'Delete expired data from the database', String, 'yes')
.option('--blob_maintenance', 'Do blob-related maintenance (dump to tarballs, offload to gcloud)', String, 'yes')
.option('--add_user_to_project [project_id,email_address]', 'Add user with given email address to project with given ID', String, '')
.option('--base_url [string]', 'Base url, so https://sitenamebase_url/', String, '')
.option('--local', 'If option is specified, then *all* projects run locally as the same user as the server and store state in .sagemathcloud-local instead of .sagemathcloud; also do not kill all processes on project restart -- for development use (default: false, since not given)', Boolean, false)
.option('--foreground', 'If specified, do not run as a deamon')
.option('--kucalc', 'if given, assume running in the KuCalc kubernetes environment')
.option('--dev', 'if given, then run in VERY UNSAFE single-user local dev mode')
.option('--single', 'if given, then run in LESS SAFE single-machine mode')
.option('--db_pool <n>', 'number of db connections in pool (default: 1)', ((n)->parseInt(n)), 1)
.option('--db_concurrent_warn <n>', 'be very unhappy if number of concurrent db requests exceeds this (default: 300)', ((n)->parseInt(n)), 300)
.parse(process.argv)
# Only act when the program name begins with 'hub'.
if program._name.slice(0,3) == 'hub'
# Log uncaught exceptions (with stack) and report them to the database object if it exists.
process.addListener "uncaughtException", (err) ->
winston.debug("BUG ****************************************************************************")
winston.debug("Uncaught exception: " + err)
winston.debug(err.stack)
winston.debug("BUG ****************************************************************************")
database?.uncaught_exception(err)
# One-shot maintenance modes; each exits the process when its callback fires.
if program.passwd
winston.debug("Resetting password")
reset_password(program.passwd, (err) -> process.exit())
else if program.stripe_sync
winston.debug("Stripe sync")
stripe_sync(false, (err) -> winston.debug("DONE", err); process.exit())
else if program.stripe_dump
winston.debug("Stripe dump")
stripe_sync(true, (err) -> winston.debug("DONE", err); process.exit())
else if program.delete_expired
delete_expired (err) ->
winston.debug("DONE", err)
process.exit()
else if program.blob_maintenance
blob_maintenance (err) ->
winston.debug("DONE", err)
process.exit()
else if program.update_stats
update_stats (err) ->
winston.debug("DONE", err)
process.exit()
else if program.add_user_to_project
console.log("Adding user to project")
# The option value has the form "project_id,email_address".
v = program.add_user_to_project.split(',')
add_user_to_project v[0], v[1], (err) ->
if err
console.log("Failed to add user: #{err}")
else
console.log("User added to project.")
process.exit()
else
# Default mode: run the hub server.
console.log("Running hub; pidfile=#{program.pidfile}, port=#{program.port}, proxy_port=#{program.proxy_port}")
if program.foreground
# In the foreground, a startup error exits with status 1 only in --dev mode.
start_server (err) ->
if err and program.dev
process.exit(1)
else
# Otherwise start_server is handed to start-stop-daemon; output goes to program.logfile.
daemon({pidFile:program.pidfile, outFile:program.logfile, errFile:program.logfile, logFile:'/dev/null', max:30}, start_server)
# Entry point: handle the command line whenever process.argv has more than one entry.
command_line() if process.argv.length > 1