fs = require('fs')
winston = require('winston')
winston.remove(winston.transports.Console)
winston.add(winston.transports.Console, {level: 'debug', timestamp:true, colorize:true})
async = require('async')
temp = require('temp')
misc = require('smc-util/misc')
{defaults, required} = misc
# Return the last component ("tail") of a slash-separated path, e.g. the disk
# name from a GCE selfLink/source URL.
filename = (path) -> misc.path_split(path).tail
misc_node = require('smc-util-node/misc_node')
# Google Cloud project id; override via the SMC_PROJECT environment variable.
PROJECT = process.env.SMC_PROJECT ? 'sage-math-inc'
# Zone used whenever a caller does not explicitly specify one.
DEFAULT_ZONE = 'us-central1-c'
# Public entry point: construct the GoogleCloud interface object.
exports.gcloud = (opts) -> new GoogleCloud(opts)
# Age of a timestamp (Date or ms) in hours.
age_h = (time) -> (Date.now() - time) / (1000*3600)
# Age of a timestamp (Date or ms) in seconds.
age_s = (time) -> (Date.now() - time) / 1000
# Polling configuration for operation.onComplete: try up to 1200 times before
# giving up on a long-running GCE operation.
onCompleteOpts =
    maxAttempts : 1200
# Wait for a long-running GCE operation to finish, then report the outcome.
# done() is a timing/logging hook, invoked exactly once in every path; cb is
# optional and receives the api error, the operation's metadata.error, or
# nothing on success.
handle_operation = (err, operation, done, cb) ->
    if err
        # the api call itself failed -- nothing to poll
        done()
        cb?(err)
        return
    operation.onComplete onCompleteOpts, (op_err, metadata) ->
        done()
        if op_err
            cb?(op_err)
        else if metadata.error
            cb?(metadata.error)
        else
            cb?()
###
VM -- manage a single Google Compute Engine virtual machine.

Thin wrapper around the gcloud node API (plus the `gcloud` CLI where the node
API lacks an option).  All methods take an opts object with a node-style
cb(err, ...) callback.  Instances are normally obtained via GoogleCloud.vm(...),
which caches them by name/zone.
###
class VM
    constructor: (@gcloud, @name, @zone=DEFAULT_ZONE) ->
        @_vm = @gcloud._gce.zone(@zone).vm(@name)

    dbg: (f) -> @gcloud.dbg("vm(name='#{@name}').#{f}")

    # Convenience: dump this vm's metadata to the console.
    show: =>
        @get_metadata(cb:console.log)

    # Run one of the parameterless vm api calls ('start', 'stop', 'reset',
    # 'delete'), waiting for the resulting operation to complete.
    _action: (cmd, cb) =>
        dbg = @dbg(cmd)
        dbg('calling api...')
        start = misc.walltime()
        @_vm[cmd] (err, operation, apiResponse) ->
            handle_operation(err, operation, (->dbg("done -- took #{misc.walltime(start)}s")), cb)

    stop: (opts={}) =>
        @_action('stop', opts.cb)

    start: (opts={}) =>
        @_action('start', opts.cb)

    reset: (opts={}) =>
        @_action('reset', opts.cb)

    # Delete this vm.  When keep_disks is set we shell out to the `gcloud` CLI
    # with --keep-disks=all, since the node api has no such option.
    delete: (opts={}) =>
        opts = defaults opts,
            keep_disks : undefined
            cb         : undefined
        if opts.keep_disks
            misc_node.execute_code
                command : 'gcloud'
                timeout : 3600
                args    : ['--quiet', 'compute', 'instances', 'delete', '--keep-disks', 'all', '--zone', @zone, @name]
                cb      : (err) => opts.cb?(err)
        else
            @_action('delete', opts.cb)

    # cb(err, metadata) with the full GCE metadata for this instance.
    get_metadata: (opts) =>
        opts = defaults opts,
            cb : required
        dbg = @dbg("metadata")
        dbg("starting")
        @_vm.getMetadata (err, metadata, apiResponse) =>
            dbg("done")
            opts.cb(err, metadata)

    # cb(err, [Disk, ...]) -- Disk objects for every disk attached to this vm.
    disks: (opts) =>
        opts = defaults opts,
            cb : required
        @get_metadata
            cb : (err, data) =>
                if err
                    opts.cb(err)
                else
                    disks = (@gcloud.disk(zone:@zone, name:filename(x.source)) for x in data.disks)
                    opts.cb(undefined, disks)

    # cb(err, status) -- e.g. 'RUNNING' or 'TERMINATED'.
    status: (opts) =>
        opts = defaults opts,
            cb : required
        @get_metadata
            cb : (err, x) =>
                opts.cb(err, x?.status)

    # Create a new disk (optionally from a snapshot) and attach it to this vm.
    create_disk: (opts) =>
        opts = defaults opts,
            name     : required
            size_GB  : undefined
            type     : 'pd-standard'
            snapshot : undefined
            cb       : undefined
        dbg = @dbg("create_disk(#{misc.to_json(misc.copy_without(opts, ['cb']))})")
        async.series([
            (cb) =>
                dbg("creating disk...")
                @gcloud.create_disk
                    name     : opts.name
                    size_GB  : opts.size_GB
                    type     : opts.type
                    snapshot : opts.snapshot
                    zone     : @zone
                    cb       : cb
            (cb) =>
                dbg("attaching to this instance")
                @gcloud.disk(name:opts.name, zone:@zone).attach_to
                    vm : @
                    cb : cb
        ], (err) => opts.cb?(err))

    # Attach an existing disk (Disk object or disk name in this zone).
    attach_disk: (opts) =>
        opts = defaults opts,
            disk      : required
            read_only : false
            cb        : undefined
        dbg = @dbg("attach_disk")
        if not (opts.disk instanceof Disk)
            dbg("not Disk")
            if typeof(opts.disk) == 'string'
                dbg("is string so make disk")
                opts.disk = @gcloud.disk(name:opts.disk, zone:@zone)
            else
                opts.cb?("disk must be an instance of Disk")
                return
        dbg("starting...")
        options =
            readOnly   : opts.read_only
            deviceName : opts.disk.name
        @_vm.attachDisk opts.disk._disk, options, (err, operation, apiResponse) =>
            handle_operation(err, operation, (->dbg("done")), opts.cb)

    # Detach a disk from this vm; a no-op if the disk is not attached.
    detach_disk: (opts) =>
        opts = defaults opts,
            disk : required
            cb   : undefined
        dbg = @dbg("detach_disk")
        if not (opts.disk instanceof Disk)
            dbg("not Disk")
            if typeof(opts.disk) == 'string'
                dbg("is string so make disk")
                opts.disk = @gcloud.disk(name:opts.disk, zone:@zone)
            else
                opts.cb?("disk must be an instance of Disk")
                return
        dbg("starting...")
        vm_data = disk_data = undefined
        async.series([
            (cb) =>
                dbg("getting disk and vm metadata in parallel")
                async.parallel([
                    (cb) =>
                        @get_metadata
                            cb : (err, x) =>
                                vm_data = x; cb(err)
                    (cb) =>
                        opts.disk.get_metadata
                            cb : (err, x) =>
                                disk_data = x; cb(err)
                ], cb)
            (cb) =>
                # the api detaches by local *deviceName*, which need not equal
                # the disk name -- find it by matching source selfLinks
                deviceName = undefined
                for x in vm_data.disks
                    if x.source == disk_data.selfLink
                        deviceName = x.deviceName
                        break
                dbg("determined that local deviceName is '#{deviceName}'")
                if not deviceName
                    dbg("already done -- disk not connected to this machine")
                    cb()
                    return
                disk = @gcloud._gce.zone(@zone).disk(deviceName)
                dbg("doing the detachDisk operation")
                @_vm.detachDisk disk, (err, operation, apiResponse) =>
                    handle_operation(err, operation, (->dbg("done")), cb)
        ], (err) => opts.cb?(err))

    # cb(err, output) with the instance serial console output.
    get_serial_console: (opts) =>
        opts = defaults opts,
            cb : required
        @_vm.getSerialPortOutput (err, output) => opts.cb(err, output)

    # Print (up to the last 15000 chars of) the serial console to stdout.
    show_console: =>
        @get_serial_console
            cb : (err, output) =>
                if err
                    console.log("ERROR -- ", err)
                else
                    n = output.length
                    if n > 15000
                        output = output.slice(n - 15000)
                    console.log(output)

    # Change properties of this vm (preemptible, machine type, zone, storage
    # scope, boot disk size/type).  Implemented by stopping, deleting (keeping
    # disks), moving/resizing disks as needed, then recreating the instance,
    # preserving any *reserved* static external IP.  Machine ends up running
    # iff it was running before, or opts.start is set.
    change: (opts) =>
        opts = defaults opts,
            preemptible  : undefined
            type         : undefined
            zone         : undefined
            storage      : undefined
            boot_size_GB : undefined
            boot_type    : undefined
            start        : undefined
            cb           : undefined
        # BUGFIX: this used misc.copy_with(opts, ['cb']), which logged *only*
        # the cb; copy_without matches every other method's logging.
        dbg = @dbg("change(#{misc.to_json(misc.map_without_undefined(misc.copy_without(opts, ['cb'])))})")
        dbg()
        data = undefined
        changes = {}
        no_change = false
        external = undefined
        boot_disk_name = undefined
        async.series([
            (cb) =>
                dbg('get vm metadata to see what needs to be changed')
                @get_metadata
                    cb : (err, x) =>
                        data = x; cb(err)
            (cb) =>
                # keep external only if it is a *reserved* static address;
                # ephemeral addresses cannot be re-requested on recreate
                external = data.networkInterfaces?[0]?.accessConfigs?[0]?.natIP
                if not external?
                    cb()
                else
                    dbg('get all static external ip addresses')
                    @gcloud.get_external_static_addresses
                        cb : (err, v) =>
                            if err
                                cb(err)
                            else
                                is_reserved = false
                                for x in v
                                    if x.metadata?.address == external
                                        is_reserved = true
                                        break
                                if not is_reserved
                                    external = undefined
                                cb()
            (cb) =>
                # compute the delta between requested options and current state
                if opts.preemptible? and data.scheduling.preemptible != opts.preemptible
                    changes.preemptible = opts.preemptible
                if opts.type? and filename(data.machineType) != opts.type
                    changes.type = opts.type
                if opts.zone? and filename(data.zone) != opts.zone
                    changes.zone = opts.zone
                if opts.storage? and @_storage(data) != opts.storage
                    changes.storage = opts.storage
                if not opts.boot_size_GB? and not opts.boot_type?
                    cb(); return
                boot_disk = undefined
                for x in data.disks
                    if x.boot
                        boot_disk_name = filename(x.source)
                        boot_disk = @gcloud.disk(name: boot_disk_name)
                        break
                if not boot_disk?
                    cb(); return
                boot_disk.get_metadata
                    cb : (err, data) =>
                        if err
                            cb(err)
                        else
                            if opts.boot_size_GB? and parseInt(data.sizeGb) != opts.boot_size_GB
                                changes.boot_size_GB = opts.boot_size_GB
                            if opts.boot_type? and filename(data.type) != opts.boot_type
                                changes.boot_type = opts.boot_type
                            cb()
            (cb) =>
                dbg("determined changes=#{misc.to_json(changes)}")
                no_change = misc.len(changes) == 0
                if no_change
                    cb(); return
                dbg("data.status = '#{data.status}'")
                if data.status != 'TERMINATED'
                    dbg("Ensure machine is off.")
                    @stop(cb:cb)
                else
                    cb()
            (cb) =>
                if no_change
                    cb(); return
                dbg("delete machine (not deleting any attached disks)")
                @delete
                    keep_disks : true
                    cb         : cb
            (cb) =>
                if no_change
                    cb(); return
                if not changes.zone
                    cb(); return
                dbg("move non-boot disks to new zone")
                f = (disk, cb) =>
                    dbg("moving disk '#{disk}'")
                    d = @gcloud.disk(name:disk, zone:@zone)
                    async.series([
                        (cb) =>
                            d.copy
                                zone : changes.zone
                                cb   : cb
                        (cb) =>
                            d.delete
                                cb : cb
                    ], cb)
                async.map((filename(x.source) for x in data.disks when not x.boot), f, cb)
            (cb) =>
                if no_change
                    cb(); return
                if boot_disk_name? and (changes.boot_size_GB? or changes.boot_type? or changes.zone?)
                    dbg("change the size, type, location of the boot disk")
                    @gcloud.disk(name:boot_disk_name, zone:changes.zone).change
                        size_GB : changes.boot_size_GB
                        type    : changes.boot_type
                        zone    : changes.zone
                        cb      : cb
                else
                    cb()
            (cb) =>
                if no_change
                    cb(); return
                if changes.zone
                    # rewrite the disks' source URLs to point at the new zone
                    for d in data.disks
                        @_mutate_disk_zone(d, changes.zone)
                dbg("Create machine with new params, disks, starting if it was running initially (but not otherwise).")
                @gcloud.create_vm
                    name        : @name
                    zone        : changes.zone ? @zone
                    disks       : data.disks
                    type        : changes.type ? filename(data.machineType)
                    tags        : data.tags.items
                    preemptible : changes.preemptible ? data.scheduling.preemptible
                    storage     : changes.storage ? @_storage(data)
                    external    : external
                    cb          : cb
            (cb) =>
                # create_vm starts the machine; stop it again unless it was
                # running before or the caller asked for it to be started
                if no_change or data.status == 'RUNNING' or opts.start
                    cb(); return
                dbg("Stop machine")
                @stop(cb:cb)
        ], (err) =>
            opts.cb?(err)
        )

    # Extract the devstorage scope suffix (e.g. 'read_write') from vm
    # metadata, or undefined if the vm has no storage scope.
    _storage: (data) =>
        {parse} = require('path')
        for x in data.serviceAccounts ? []
            for s in x.scopes
                p = parse(s)
                if p.name == 'devstorage'
                    return p.ext.slice(1)
        return undefined

    # In-place: rewrite the zone component of a disk metadata source URL.
    _mutate_disk_zone: (meta, zone) =>
        i = meta.source.indexOf('/zones/')
        j = meta.source.indexOf('/', i+7)
        meta.source = meta.source.slice(0,i+7) + zone + meta.source.slice(j)
        return

    # In-place: rewrite the final (name) component of a disk source URL.
    _mutate_disk_name: (meta, name) =>
        i = meta.source.lastIndexOf('/')
        meta.source = meta.source.slice(0,i+1) + name
        return

    # Poll every interval_s seconds and start this vm whenever it is found
    # TERMINATED.  Note: the interval handle is not retained, so this cannot
    # be cancelled.
    keep_running: (opts={}) =>
        opts = defaults opts,
            interval_s : 30
        dbg = @dbg("keep_running(interval_s=#{opts.interval_s})")
        dbg()
        check = () =>
            dbg('check')
            @status
                cb: (err, status) =>
                    if status == 'TERMINATED'
                        dbg("attempting to start since status is TERMINATED")
                        @start
                            cb : (err) =>
                                dbg("result of start -- #{err}")
        setInterval(check, opts.interval_s*1000)

    # Clone this vm under a new name, copying all READ_WRITE disks (which must
    # be named with this vm's name as a prefix) and sharing read-only disks.
    # cb(err, new_vm).
    copy: (opts) =>
        opts = defaults opts,
            name         : required
            preemptible  : undefined
            type         : undefined
            zone         : undefined
            storage      : undefined    # BUGFIX: was read below but never declared, so callers could not pass it
            boot_size_GB : undefined
            start        : true
            cb           : undefined
        dbg = @dbg("copy(name='#{opts.name}')")
        dbg(misc.to_json(misc.copy_without(opts, ['cb'])))
        if opts.name == @name
            opts.cb("must specify a different name")
            return
        create_opts =
            name : opts.name
        data = undefined
        async.series([
            (cb) =>
                dbg('get vm metadata')
                @get_metadata
                    cb : (err, x) =>
                        data = x; cb(err)
            (cb) =>
                create_opts.preemptible = opts.preemptible ? data.scheduling.preemptible
                create_opts.type        = opts.type        ? filename(data.machineType)
                create_opts.zone        = opts.zone        ? filename(data.zone)
                create_opts.storage     = opts.storage     ? @_storage(data)
                create_opts.tags        = data.tags?.items ? []
                create_opts.external    = data.networkInterfaces?[0]?.accessConfigs?[0]?.natIP?
                for disk in data.disks
                    if disk.mode == 'READ_WRITE' and not misc.startswith(filename(disk.source), @name)
                        cb("all READ_WRITE disks must start with '#{@name}' but '#{filename(disk.source)}' does not")
                        return
                create_opts.disks = []
                f = (disk, cb) =>
                    if disk.mode != 'READ_WRITE'
                        # read-only disks are shared, not copied -- but only
                        # possible when staying in the same zone
                        if create_opts.zone == filename(data.zone)
                            create_opts.disks.push(disk)
                        cb()
                        return
                    src_name = filename(disk.source)
                    new_name = opts.name + src_name.slice(@name.length)
                    dbg("copying a disk: '#{src_name}' --> '#{new_name}'")
                    @gcloud.disk(zone:filename(data.zone), name:src_name).copy
                        name    : new_name
                        zone    : create_opts.zone
                        size_GB : if disk.boot and opts.boot_size_GB then opts.boot_size_GB
                        cb      : (err) =>
                            if err
                                cb(err)
                            else
                                if create_opts.zone != filename(data.zone)
                                    @_mutate_disk_zone(disk, create_opts.zone)
                                @_mutate_disk_name(disk, new_name)
                                create_opts.disks.push(disk)
                                cb()
                async.map(data.disks, f, cb)
            (cb) =>
                dbg("Create copy machine with options #{misc.to_json(create_opts)}")
                create_opts.cb = cb
                @gcloud.create_vm(create_opts)
            (cb) =>
                if data.status == 'RUNNING' or opts.start
                    cb(); return
                dbg("Stop machine")
                @gcloud.vm(name: opts.name).stop(cb:cb)
        ], (err) =>
            if err
                opts.cb?(err)
            else
                opts.cb?(undefined, @gcloud.vm(name: opts.name))
        )
###
Disk -- manage a single Google Compute Engine persistent disk.
Instances are normally obtained via GoogleCloud.disk(...), which caches them.
###
class Disk
    constructor: (@gcloud, @name, @zone=DEFAULT_ZONE) ->
        @_disk = @gcloud._gce.zone(@zone).disk(@name)

    dbg: (f) -> @gcloud.dbg("disk.#{f}")

    # Convenience: dump this disk's metadata to the console.
    show: =>
        @get_metadata(cb:console.log)

    # Copy this disk to a (possibly different) name/zone/size/type via a
    # temporary snapshot; the original disk is left untouched.
    copy: (opts) =>
        opts = defaults opts,
            name : @name
            zone : @zone
            size_GB : undefined
            type : undefined
            cb : required
        dbg = @dbg("copy(name=#{misc.to_json(misc.copy_without(opts, ['cb']))})")
        dbg()
        if @name == opts.name and @zone == opts.zone
            dbg("nothing to do")
            opts.cb()
            return
        @_utility
            name : opts.name
            size_GB : opts.size_GB
            type : opts.type
            zone : opts.zone
            delete : false
            cb : opts.cb

    # Change size/type/zone of this disk in place: snapshot, detach, delete,
    # recreate from the snapshot, re-attach.
    change: (opts) =>
        opts = defaults opts,
            size_GB : undefined
            type : undefined
            zone : undefined
            cb : required
        dbg = @dbg("change()")
        dbg(misc.to_json(misc.copy_without(opts, ['cb'])))
        if not opts.size_GB? and not opts.type? and not opts.zone?
            dbg("nothing to do")
            opts.cb()
            return
        @_utility
            size_GB : opts.size_GB
            type : opts.type
            zone : opts.zone
            delete : true
            cb : opts.cb

    # Shared implementation of copy/change: snapshot this disk, optionally
    # detach+delete the original, create the new disk from the snapshot,
    # re-attach to the original vms (same zone only), and delete the snapshot.
    _utility: (opts) =>
        opts = defaults opts,
            name : @name
            size_GB : undefined
            type : undefined
            zone : @zone
            delete : false
            cb : undefined
        dbg = @dbg("_utility(name=#{misc.to_json(misc.copy_without(opts, ['cb']))})")
        dbg()
        vms = undefined
        snapshot_name = undefined
        async.series([
            (cb) =>
                if not opts.size_GB?
                    cb()
                else
                    # GCE persistent disks cannot be smaller than 10GB, and a
                    # disk can never shrink
                    dbg("size consistency check")
                    if opts.size_GB < 10
                        cb("size_GB must be at least 10")
                        return
                    @get_size_GB
                        cb : (err, size_GB) =>
                            if err
                                cb(err)
                            else
                                if opts.size_GB < size_GB
                                    cb("Requested disk size cannot be smaller than the current size")
                                else
                                    cb()
            (cb) =>
                dbg("determine new disk type")
                if opts.type
                    cb()
                else
                    @get_type
                        cb : (err, type) =>
                            opts.type = type
                            cb(err)
            (cb) =>
                snapshot_name = "temp-#{@name}-#{misc.uuid()}"
                dbg("create snapshot with name #{snapshot_name}")
                @snapshot
                    name : snapshot_name
                    cb : cb
            (cb) =>
                if not opts.delete
                    cb(); return
                dbg("detach disk from any vms")
                @detach
                    cb : (err, x) =>
                        vms = x
                        cb(err)
            (cb) =>
                if not opts.delete
                    cb(); return
                dbg("delete disk")
                @delete(cb : cb)
            (cb) =>
                dbg("make new disk from snapshot")
                @gcloud.snapshot(name:snapshot_name).create_disk
                    name : opts.name
                    size_GB : opts.size_GB
                    type : opts.type
                    zone : opts.zone
                    cb : cb
            (cb) =>
                if not vms? or vms.length == 0
                    cb(); return
                # NOTE(review): re-attach is skipped when the zone changed,
                # since a disk cannot attach to a vm in another zone
                if opts.zone? and @zone != opts.zone
                    cb(); return
                dbg("remount new disk on same vms")
                f = (vm, cb) =>
                    vm.attach_disk
                        disk : opts.name
                        read_only : vms.length > 1
                        cb : cb
                async.map(vms, f, cb)
            (cb) =>
                if not snapshot_name?
                    cb(); return
                dbg("clean up snapshot #{snapshot_name}")
                @gcloud.snapshot(name:snapshot_name).delete(cb : cb)
        ], (err) =>
            opts.cb?(err)
        )

    # Create a snapshot of this disk with the given name.
    snapshot: (opts) =>
        opts = defaults opts,
            name : required
            cb : undefined
        dbg = @dbg('snapshot')
        dbg('calling api')
        start = misc.walltime()
        done = -> dbg("done -- took #{misc.walltime(start)}s")
        @_disk.createSnapshot opts.name, (err, snapshot, operation, apiResponse) =>
            handle_operation(err, operation, done, opts.cb)

    # cb(err, size in GB as an integer).
    get_size_GB: (opts) =>
        opts = defaults opts,
            cb : required
        @get_metadata
            cb : (err, data) =>
                opts.cb(err, if data? then parseInt(data.sizeGb))

    # cb(err, disk type), e.g. 'pd-standard' or 'pd-ssd'.
    get_type: (opts) =>
        opts = defaults opts,
            cb : required
        @get_metadata
            cb : (err, data) =>
                opts.cb(err, if data? then filename(data.type))

    # cb(err, metadata) with the full GCE metadata for this disk.
    get_metadata: (opts) =>
        opts = defaults opts,
            cb : required
        dbg = @dbg("metadata")
        dbg("starting")
        @_disk.getMetadata (err, metadata, apiResponse) =>
            dbg("done")
            opts.cb(err, metadata)

    # cb(err, [Snapshot, ...]) -- all snapshots whose source is this disk.
    get_snapshots: (opts) =>
        opts = defaults opts,
            cb : required
        dbg = @dbg("get_snapshots")
        id = undefined
        s = undefined
        async.series([
            (cb) =>
                dbg("determining id of disk")
                @get_metadata
                    cb : (err, data) =>
                        id = data?.id; cb(err)
            (cb) =>
                dbg("get all snapshots with given id as source")
                @gcloud.get_snapshots
                    filter : "sourceDiskId eq #{id}"
                    cb : (err, snapshots) =>
                        if err
                            cb(err)
                        else
                            s = (@gcloud.snapshot(name:x.name) for x in snapshots)
                            cb()
        ], (err) =>
            opts.cb(err, s)
        )

    # Delete this disk.
    # NOTE(review): the keep_disks option is accepted but never used here --
    # it looks copy/pasted from VM.delete; confirm before relying on it.
    delete: (opts) =>
        opts = defaults opts,
            keep_disks : undefined
            cb : undefined
        dbg = @dbg("delete")
        dbg("starting")
        @_disk.delete (err, operation, apiResponse) =>
            handle_operation(err, operation, (->dbg('done')), opts.cb)

    # Attach this disk to the given vm (VM object or vm name in this zone).
    attach_to: (opts) =>
        opts = defaults opts,
            vm : required
            read_only : false
            cb : required
        if not (opts.vm instanceof VM)
            if typeof(opts.vm) == 'string'
                opts.vm = @gcloud.vm(name:opts.vm, zone:@zone)
            else
                opts.cb("vm must be an instance of VM")
                return
        opts.vm.attach_disk
            disk : @
            read_only : opts.read_only
            cb : opts.cb

    # Detach this disk from the given vm, or -- if no vm is given -- from
    # every vm it is attached to (per disk metadata 'users').  cb(err, vms).
    detach: (opts) =>
        opts = defaults opts,
            vm : undefined
            cb : undefined
        dbg = @dbg("detach")
        vms = undefined
        async.series([
            (cb) =>
                if opts.vm?
                    vms = [opts.vm]
                    cb()
                else
                    dbg("determine vm that disk is attached to")
                    @get_metadata
                        cb : (err, data) =>
                            if err
                                cb(err)
                            else
                                vms = (@gcloud.vm(name:filename(u), zone:@zone) for u in (data.users ? []))
                                cb()
            (cb) =>
                dbg('actually detach disk from that vm')
                f = (vm, cb) =>
                    vm.detach_disk
                        disk : @
                        cb : cb
                async.map(vms, f, cb)
        ], (err) => opts.cb?(err, vms))
###
Snapshot -- manage a single (global, zone-less) GCE disk snapshot.
Instances are normally obtained via GoogleCloud.snapshot(...), which caches them.
###
class Snapshot
    constructor: (@gcloud, @name) ->
        @_snapshot = @gcloud._gce.snapshot(@name)

    # Convenience: dump this snapshot's metadata to the console.
    show: =>
        @get_metadata(cb:console.log)

    dbg: (f) -> @gcloud.dbg("snapshot.#{f}")

    # Delete this snapshot.
    delete: (opts) =>
        opts = defaults opts,
            cb : undefined
        dbg = @dbg("delete")
        dbg("starting")
        @_snapshot.delete (err, operation, apiResponse) =>
            handle_operation(err, operation, (->dbg('done')), opts.cb)

    # cb(err, metadata) with the full GCE metadata for this snapshot.
    get_metadata: (opts) =>
        opts = defaults opts,
            cb : required
        dbg = @dbg("metadata")
        dbg("starting")
        @_snapshot.getMetadata (err, metadata, apiResponse) =>
            dbg("done")
            opts.cb(err, metadata)

    # cb(err, size in GB); uses cached metadata when already present.
    get_size_GB: (opts) =>
        opts = defaults opts,
            cb : required
        if @_snapshot.metadata.storageBytes
            opts.cb(undefined, @_snapshot.metadata.storageBytes / 1000 / 1000 / 1000)
        else
            @get_metadata
                cb : (err, data) =>
                    if err
                        opts.cb(err)
                    else
                        opts.cb(undefined, data.storageBytes / 1000 / 1000 / 1000)

    # Create a new disk from this snapshot (minimum size 10GB).
    create_disk: (opts) =>
        opts = defaults opts,
            name : required
            size_GB : undefined
            type : 'pd-standard'
            zone : DEFAULT_ZONE
            cb : undefined
        dbg = @dbg("create_disk(#{misc.to_json(misc.copy_without(opts, ['cb']))})")
        if opts.size_GB? and opts.size_GB < 10
            opts.cb?("size_GB must be at least 10")
            return
        opts.snapshot = @name
        @gcloud.create_disk(opts)
###
GoogleCloud -- top-level interface to the gcloud node API for one project.
Provides cached factories for VM/Disk/Snapshot/Bucket objects plus bulk
listing helpers.  Construct via exports.gcloud(...).
###
class GoogleCloud
    constructor: (opts={}) ->
        opts = defaults opts,
            debug : true
            db    : undefined
        @db = opts.db
        @_debug = opts.debug
        # dbg(f) returns a per-function logger; a no-op when debug is off
        if @_debug
            @dbg = (f) -> ((m) -> winston.debug("gcloud.#{f}: #{m}"))
        else
            @dbg = (f) -> (->)
        @_gcloud = require('gcloud')(projectId: PROJECT)
        @_gce = @_gcloud.compute()

    # cb(err, addresses) -- all reserved static external IP addresses.
    get_external_static_addresses: (opts) =>
        opts = defaults opts,
            cb : required
        @_gce.getAddresses(opts.cb)

    # Create (and start) a vm.  external may be true (ephemeral IP), an IP
    # string, or the *name* of a reserved static address (resolved via a
    # recursive call).  disks may be Disk objects, disk names, or raw disk
    # metadata; the first disk becomes the boot disk if none is marked.
    create_vm: (opts) =>
        opts = defaults opts,
            name        : required
            zone        : DEFAULT_ZONE
            disks       : undefined
            http        : undefined
            https       : undefined
            type        : undefined
            os          : undefined
            tags        : undefined
            preemptible : false
            storage     : undefined
            external    : true
            cb          : required
        dbg = @dbg("create_vm(name=#{opts.name})")
        config = {}
        config.http = opts.http if opts.http?
        config.https = opts.https if opts.https?
        config.machineType = opts.type if opts.type?
        config.os = opts.os if opts.os?
        config.tags = opts.tags if opts.tags?
        config.networkInterfaces = [{network: 'global/networks/default', accessConfigs:[]}]
        if opts.external
            net =
                name: "External NAT"
                type: "ONE_TO_ONE_NAT"
            if typeof(opts.external) == 'string'
                if opts.external.indexOf('.') != -1
                    # looks like a literal IP address
                    net.natIP = opts.external
                else
                    # treat as the name of a reserved static address: look it
                    # up, replace opts.external with the IP, and re-enter
                    @get_external_static_addresses
                        cb : (err, v) =>
                            if err
                                opts.cb(err)
                            else
                                for x in v
                                    if x.name == opts.external
                                        opts.external = x.metadata.address
                                        @create_vm(opts)
                                        return
                                opts.cb("unknown static external interface '#{opts.external}'")
                    return
            config.networkInterfaces[0].accessConfigs.push(net)
        if opts.storage? and opts.storage != ''
            if typeof(opts.storage) != 'string'
                opts.cb("opts.storage=#{opts.storage}, typeof=#{typeof(opts.storage)}, must be a string")
                return
            # grant the devstorage scope, e.g. storage='read_write'
            config.serviceAccounts = [{email:'default', scopes:[]}]
            config.serviceAccounts[0].scopes.push("https://www.googleapis.com/auth/devstorage.#{opts.storage}")
        if opts.preemptible
            config.scheduling = {preemptible : true}
        else
            config.scheduling =
                onHostMaintenance: "MIGRATE"
                automaticRestart: true
        if opts.disks?
            config.disks = []
            for disk in opts.disks
                if typeof(disk) == 'string'
                    disk = @disk(name:disk, zone:opts.zone)
                if disk instanceof Disk
                    config.disks.push({source:disk._disk.formattedName})
                else
                    # assume caller passed raw gce disk metadata
                    config.disks.push(disk)
            # ensure at least one disk is marked as the boot disk
            if config.disks.length > 0 and (x for x in config.disks when x.boot).length == 0
                config.disks[0].boot = true
        dbg("config=#{misc.to_json(config)}")
        @_gce.zone(opts.zone).createVM opts.name, config, (err, vm, operation, apiResponse) =>
            handle_operation(err, operation, (->dbg('done')), opts.cb)

    # Cached VM object factory, keyed by name and zone.
    vm: (opts) =>
        opts = defaults opts,
            name : required
            zone : DEFAULT_ZONE
        key = "#{opts.name}-#{opts.zone}"
        @_vm_cache ?= {}
        return (@_vm_cache[key] ?= new VM(@, opts.name, opts.zone))

    # Cached Disk object factory, keyed by name and zone.
    disk: (opts) =>
        opts = defaults opts,
            name : required
            zone : DEFAULT_ZONE
        key = "#{opts.name}-#{opts.zone}"
        @_disk_cache ?= {}
        return (@_disk_cache[key] ?= new Disk(@, opts.name, opts.zone))

    # Create a new disk, optionally from a snapshot (minimum size 10GB).
    create_disk: (opts) =>
        opts = defaults opts,
            name     : required
            size_GB  : undefined
            type     : 'pd-standard'
            zone     : DEFAULT_ZONE
            snapshot : undefined
            cb       : undefined
        dbg = @dbg("create_disk(#{misc.to_json(misc.copy_without(opts, ['cb']))})")
        if opts.size_GB? and opts.size_GB < 10
            opts.cb?("size_GB must be at least 10")
            return
        dbg("starting...")
        config = {}
        if opts.snapshot?
            config.sourceSnapshot = "global/snapshots/#{opts.snapshot}"
        config.sizeGb = opts.size_GB if opts.size_GB?
        config.type = "zones/#{opts.zone}/diskTypes/#{opts.type}"
        @_gce.zone(opts.zone).createDisk opts.name, config, (err, disk, operation, apiResponse) =>
            handle_operation(err, operation, (->dbg('done')), (err) => opts.cb?(err))

    # Cached Snapshot object factory, keyed by name (snapshots are global).
    snapshot: (opts) =>
        opts = defaults opts,
            name : required
        key = opts.name
        @_snapshot_cache ?= {}
        return (@_snapshot_cache[key] ?= new Snapshot(@, opts.name))

    # cb(err, [{name, timestamp, size_GB, source}, ...]) -- summary of all
    # snapshots, optionally filtered server-side and/or by name substring.
    get_snapshots: (opts) =>
        opts = defaults opts,
            filter : undefined
            match  : undefined
            cb     : required
        options = {maxResults:500}
        options.filter = opts.filter if opts.filter?
        dbg = @dbg("get_snapshots")
        dbg("options=#{misc.to_json(options)}")
        if opts.match?
            opts.match = opts.match.toLowerCase()
        @_gce.getSnapshots options, (err, snapshots) =>
            dbg("done")
            if err
                opts.cb(err)
            else
                s = []
                for x in snapshots
                    i = x.metadata.sourceDisk.indexOf('/zones/')
                    if opts.match? and x.name.toLowerCase().indexOf(opts.match) == -1
                        continue
                    s.push
                        name       : x.name
                        timestamp  : new Date(x.metadata.creationTimestamp)
                        size_GB    : x.metadata.storageBytes / 1000 / 1000 / 1000
                        source     : x.metadata.sourceDisk.slice(i+7)
                opts.cb(undefined, s)

    # cb(err, [{name, zone, size_GB, type, cost_month}, ...]) -- summary of
    # all disks with a rough monthly cost estimate (USD, hardcoded rates).
    get_disks: (opts) =>
        opts = defaults opts,
            filter : undefined
            match  : undefined
            cb     : required
        options = {maxResults:500}
        options.filter = opts.filter if opts.filter?
        dbg = @dbg("get_disks")
        dbg("options=#{misc.to_json(options)}")
        if opts.match?
            opts.match = opts.match.toLowerCase()
        @_gce.getDisks options, (err, disks) =>
            dbg("done")
            if err
                opts.cb(err)
            else
                s = []
                for x in disks
                    if opts.match? and x.name.toLowerCase().indexOf(opts.match) == -1
                        continue
                    size_GB = parseInt(x.metadata.sizeGb)
                    type = filename(x.metadata.type)
                    switch type
                        when 'pd-standard'
                            cost = size_GB * 0.04
                        when 'pd-ssd'
                            cost = size_GB * 0.17
                        else
                            cost = size_GB * 0.21
                    s.push
                        name       : x.name
                        zone       : x.zone.name
                        size_GB    : size_GB
                        type       : type
                        cost_month : cost
                opts.cb(undefined, s)

    # cb(err, vms) -- raw vm objects across all zones (with the circular
    # 'zone' property stripped so the result is JSON-serializable).
    get_vms: (opts) =>
        opts = defaults opts,
            cb : required
        dbg = @dbg("get_vms")
        dbg('starting...')
        @_gce.getVMs (err, vms) =>
            dbg('done')
            if err
                opts.cb(err)
            else
                for x in vms
                    if x.zone?
                        delete x.zone
                opts.cb(undefined, vms)

    # cb(err, operations) -- all currently in-progress GCE operations.
    get_operations: (opts) =>
        opts = defaults opts,
            cb : required
        @dbg("get_operations")()
        @_gce.getOperations {filter:"status ne 'DONE'", maxResults:500}, (err, operations) => opts.cb(err, operations)

    # Guard helper: invoke cb with an error and return true when no database
    # was configured; returns undefined (falsy) otherwise.
    _check_db: (cb) =>
        if not @db
            cb?("database not defined")
            return true

    # Construct the VM_Manager daemon; requires a database.
    # NOTE(review): throws a plain string rather than an Error object.
    vm_manager: (opts) =>
        opts = defaults opts,
            interval_s : 15
            all_m      : 10
            manage     : true
        if not @db?
            throw "database not defined!"
        opts.gcloud = @
        return new VM_Manager(opts)

    # Bucket object factory (underlying gcloud bucket objects are cached
    # module-wide in gcloud_bucket_cache).
    bucket: (opts) =>
        opts = defaults opts,
            name : required
        return new Bucket(@, opts.name)
# Module-wide cache of underlying gcloud bucket objects, keyed by bucket name.
gcloud_bucket_cache = {}

###
Bucket -- read/write/delete objects in a Google Cloud Storage bucket, with a
gsutil-based fallback for writes when the streaming api errors out.
###
class Bucket
    constructor: (@gcloud, @name) ->
        @_bucket = gcloud_bucket_cache[@name]
        @_bucket ?= gcloud_bucket_cache[@name] = @gcloud._gcloud.storage().bucket(@name)

    dbg: (f) -> @gcloud.dbg("Bucket.#{f}")

    # Delete the named object from the bucket.
    delete: (opts) =>
        opts = defaults opts,
            name : required
            cb   : undefined
        dbg = @dbg("delete(name='#{opts.name}')")
        dbg()
        @_bucket.file(opts.name).delete (err) => opts.cb?(err)

    # Write content to the named object via a write stream; on stream error,
    # fall back to shelling out to gsutil.
    write: (opts) =>
        opts = defaults opts,
            name    : required
            content : required
            cb      : undefined
        dbg = @dbg("write(name='#{opts.name}')")
        dbg()
        stream = @_bucket.file(opts.name).createWriteStream()
        stream.write(opts.content)
        stream.end()
        stream.on 'finish', =>
            dbg('finish')
            opts.cb?()
            # remove cb so a late 'error' event cannot call it a second time
            delete opts.cb
        stream.on 'error', (err) =>
            dbg("err = '#{JSON.stringify(err)}'")
            if err
                @_write_using_gsutil(opts)
        return

    # Fallback write path: dump content to a temp file and `gsutil cp` it.
    _write_using_gsutil: (opts) =>
        opts = defaults opts,
            name    : required
            content : required
            cb      : undefined
        dbg = @dbg("_write_using_gsutil(name='#{opts.name}')")
        dbg()
        info = undefined
        async.series([
            (cb) =>
                dbg("write content to a file")
                temp.open '', (err, _info) ->
                    if err
                        cb(err)
                    else
                        info = _info
                        dbg("temp file = '#{info.path}'")
                        fs.writeFile(info.fd, opts.content, cb)
            (cb) =>
                dbg("close")
                fs.close(info.fd, cb)
            (cb) =>
                dbg("call gsutil via shell")
                misc_node.execute_code
                    command : 'gsutil'
                    args    : ['cp', info.path, "gs://#{@name}/#{opts.name}"]
                    timeout : 30
                    cb      : cb
        ], (err) =>
            if info?
                try
                    # NOTE(review): fs.unlink with no callback -- ok on old
                    # node, but throws/deprecated on modern node; confirm the
                    # runtime or switch to fs.unlinkSync.
                    fs.unlink(info.path)
                catch e
                    dbg("error unlinking")
            opts.cb?(err)
        )

    # cb(err, content) -- download the named object into memory.
    read: (opts) =>
        opts = defaults opts,
            name : required
            cb   : required
        dbg = @dbg("read(name='#{opts.name}')")
        dbg()
        stream = @_bucket.file(opts.name).download (err, content) =>
            if err
                dbg("error = '#{err}")
            else
                dbg('done')
            opts.cb(err, content)
        return
###
VM_Manager -- daemon that reconciles the desired vm state recorded in the
'instances' database table with actual GCE state, applying rules (start
stopped-but-wanted vms, stop unwanted ones, flip preemptible <-> regular).
Construct via GoogleCloud.vm_manager(...), which requires a database.
###
class VM_Manager
    constructor: (opts) ->
        opts = defaults opts,
            gcloud     : required
            interval_s : required       # how often to sync GCE state into the db
            all_m      : required       # how often (minutes) to re-apply rules to all vms
            manage     : required       # if false: observe only, never act
        @_manage = opts.manage
        @_action_timeout_m             = 15     # consider an action stuck/stale after this long
        @_switch_back_to_preemptible_m = 120    # min age before flipping back to preemptible
        @gcloud = opts.gcloud
        dbg = @_dbg("start(interval_s:#{opts.interval_s}, all_m:#{opts.all_m})")
        @_init_instances_table()
        if @_manage
            dbg('starting vm manager monitoring')
            @_init_timers(opts)
        return

    # Stop all timers and close the synctable.
    close: () =>
        @_dbg('close')()
        if @_update_interval?
            clearInterval(@_update_interval)
            delete @_update_interval
        # BUGFIX: this used to test/clear @_update_all, which is the bound
        # *method* (always defined) -- clearInterval on it was a no-op, the
        # real timer (stored under the typo @_udpate_all) leaked, and the
        # delete destroyed the method.  The handle now lives in
        # @_update_all_interval.
        if @_update_all_interval?
            clearInterval(@_update_all_interval)
            delete @_update_all_interval
        if @_instances_table?
            @_instances_table.close()
            delete @_instances_table

    # Record a desired status/preemptible setting for a vm in the database;
    # the manager's rules will then drive the vm toward that state.
    request: (opts) =>
        opts = defaults opts,
            name        : required
            status      : undefined   # 'RUNNING' or 'TERMINATED'
            preemptible : undefined   # true or false
            cb          : undefined
        obj = {}
        if opts.status?
            if opts.status not in ['TERMINATED', 'RUNNING']
                err = "status must be 'TERMINATED' or 'RUNNING'"
                winston.debug(err)
                opts.cb?(err)
                return
            obj.requested_status = opts.status
        if opts.preemptible?
            obj.requested_preemptible = !! opts.preemptible
        if misc.len(obj) == 0
            opts.cb()
        else
            @gcloud.db.table('instances').get(opts.name).update(obj).run(opts.cb)

    # Synchronous summary of one vm from the local synctable (or undefined).
    get_data: (name) =>
        obj = @_instances_table?.get(name)?.toJS()
        if obj?
            return @_data(obj)

    # cb(err, log rows) from instance_actions_log, optionally filtered by
    # vm name and/or by how recently the action finished.
    get_log: (opts) =>
        opts = defaults opts,
            name  : undefined
            age_m : undefined
            cb    : required
        db = @gcloud.db
        query = db.table('instance_actions_log')
        if opts.name?
            query = query.filter(name:opts.name)
        if opts.age_m?
            query = query.filter(db.r.row('action')('finished').ge(misc.minutes_ago(opts.age_m)))
        query.run(opts.cb)

    # Pretty-print the action log to the console.
    show_log: (opts) =>
        opts = defaults opts,
            name  : undefined
            age_m : undefined
            cb    : undefined
        @get_log
            name  : opts.name
            age_m : opts.age_m
            cb    : (err, log) =>
                if err
                    console.log("ERROR: ", err)
                else
                    log.sort (x,y) => misc.cmp(x.action?.started ? new Date(), y.action?.started ? new Date())
                    pad = (s) -> misc.pad_left(s ? '', 10)
                    for x in log
                        console.log "#{pad(x.name)} #{pad(x.action?.type)} #{pad(x.action?.action)} #{x.action?.started?.toLocaleString()} #{x.action?.finished?.toLocaleString()} #{pad(misc.round1((x.action?.finished - x.action?.started)/1000/60))} minutes '#{misc.to_json(x.action?.error ? '')}'"
                opts.cb?(err)

    # Continuously dump the last 24h of the action log every minute.
    monitor: () =>
        f = () =>
            console.log("\n\n-----------------------------\n\n\n")
            @show_log(age_m : 60*24)
        f()
        setInterval(f, 60*1000)

    # Start the periodic db-sync and apply-all timers and run both once now.
    _init_timers: (opts) =>
        @_dbg("_init_timers")()
        @_update_interval = setInterval(@_update_db, opts.interval_s * 1000)
        # BUGFIX: was the typo '@_udpate_all = ...', which left the interval
        # handle where close() could never find it (see close above).
        @_update_all_interval = setInterval(@_update_all, opts.all_m * 1000 * 60)
        async.series([((cb)=>@_update_db(cb:cb)), @_update_all])
        return

    _dbg: (f) ->
        return (m) -> winston.debug("VM_Manager.#{f}: #{m}")

    # Re-apply the rules to every vm that has a requested status.
    _update_all: () =>
        dbg = @_dbg("update_all")
        dbg()
        @_instances_table?.get().map (vm, key) =>
            if vm.get('requested_status')
                @_apply_rules(vm.toJS())
        return

    # Sync current GCE vm state into the 'instances' table, writing only the
    # vms whose serialized state actually changed (compared via sha1).
    _update_db: (opts) =>
        opts = defaults opts,
            cb : undefined
        dbg = @_dbg("update_db")
        dbg()
        db_data  = {}
        gce_data = {}
        table = @gcloud.db.table('instances')
        async.series([
            (cb) =>
                async.parallel([
                    (cb) =>
                        dbg("get info from Google Compute engine api about all VMs")
                        @gcloud.get_vms
                            cb : (err, data) =>
                                if err
                                    cb(err)
                                else
                                    for x in data
                                        gce_data[x.name] = x
                                    dbg("got gce api data about #{data.length} VMs")
                                    cb()
                    (cb) =>
                        dbg("get info from our database about all VMs")
                        table.pluck('name', 'gce_sha1').run (err, data) =>
                            if err
                                cb(err)
                            else
                                for x in data
                                    db_data[x.name] = x.gce_sha1
                                dbg("got database data about #{misc.len(db_data)} VMs")
                                cb()
                ], cb)
            (cb) =>
                objects = []
                for name, x of gce_data
                    new_sha1 = misc_node.sha1(JSON.stringify(x))
                    sha1 = db_data[name]
                    if new_sha1 != sha1
                        objects.push(name:name, gce:x, gce_sha1:new_sha1)
                if objects.length == 0
                    dbg("nothing changed")
                    cb()
                else
                    dbg("#{objects.length} vms changed")
                    global.objects = objects
                    table.insert(objects, conflict:'update').run(cb)
        ], (err) =>
            opts.cb?(err)
        )

    # Open the 'instances' synctable; when managing, re-apply rules on change.
    _init_instances_table: () =>
        dbg = @_dbg("_init_instances_table")
        dbg()
        @gcloud.db.synctable
            query : @gcloud.db.table('instances')
            cb    : (err, t) =>
                if err
                    dbg("ERROR: #{err}")
                else
                    dbg("initialized instances synctable")
                    @_instances_table = t
                    if @_manage
                        t.on 'change', (name) =>
                            @_apply_rules(t.get(name).toJS())

    # True if an action on this vm is currently underway (started, not
    # finished, and not older than the action timeout).
    _is_in_progress: (vm) =>
        if not vm.action?
            return false
        if vm.action.finished?
            return false
        if vm.action.started? and not vm.action.finished? and vm.action.started <= misc.minutes_ago(@_action_timeout_m)
            # action started too long ago -- consider it stale/stuck
            return false
        return true

    # Normalize a raw instances-table row into the summary the rules consume.
    _data: (vm) =>
        data =
            name                  : vm.name
            gce_status            : vm.gce.metadata.status
            gce_preemptible       : vm.gce.metadata.scheduling.preemptible
            gce_created           : new Date(vm.gce.metadata.creationTimestamp)
            requested_status      : vm.requested_status
            requested_preemptible : vm.requested_preemptible
            last_action           : vm.action
        return data

    # Apply the first matching rule (1-4) to drive a vm toward its requested
    # state; skipped while another action is in progress.
    _apply_rules: (vm) =>
        if not vm.requested_status
            return
        if @_is_in_progress(vm)
            return
        if not vm.gce?.metadata?.scheduling?
            return
        dbg = @_dbg("_apply_rules")
        data = @_data(vm)
        dbg(misc.to_json(data))
        if @_rule1(data)
            return
        if @_rule2(data)
            return
        if @_rule3(data)
            return
        if @_rule4(data)
            return

    # NOTE(review): _rule0 is never called by _apply_rules; it is a simpler
    # predecessor of _rule1 kept for reference.  BUGFIX: it used to label its
    # action 'rule1'; corrected to 'rule0'.
    _rule0: (data) =>
        if data.gce_status == 'TERMINATED' and data.requested_status == 'RUNNING'
            @_action(data, 'start', 'rule0')
            return true

    # Rule 1: terminated vm that should be running -- start it, switching to
    # non-preemptible if a recent start attempt already failed.
    _rule1: (data) =>
        if data.gce_status == 'TERMINATED' and data.requested_status == 'RUNNING'
            dbg = @_dbg("rule1('#{data.name}')")
            dbg("terminated VM should be running")
            if data.gce_preemptible and data.last_action?.started >= misc.minutes_ago(5) and data.last_action?.action == 'start'
                dbg("Pre-emptible right now and there was an attempt to start it recently, so switch to non-pre-empt.")
                @_action(data, 'non-preemptible', 'rule1')
            else
                @_action(data, 'start', 'rule1')
            return true

    # Rule 2: running vm that should be terminated -- stop it.
    _rule2: (data) =>
        if data.gce_status == 'RUNNING' and data.requested_status == 'TERMINATED'
            dbg = @_dbg("rule2('#{data.name}')")
            dbg("running VM should be stopped")
            @_action(data, 'stop', 'rule2')
            return true

    # Rule 3: running non-preemptible vm that wants to be preemptible and has
    # been up long enough -- switch it back to preemptible.
    _rule3: (data) =>
        if data.gce_status == 'RUNNING' and data.requested_status == 'RUNNING' and \
                data.requested_preemptible and not data.gce_preemptible and data.gce_created <= misc.minutes_ago(@_switch_back_to_preemptible_m)
            dbg = @_dbg("rule3('#{data.name}')")
            dbg("switch running instance from non-preempt to preempt")
            @_action(data, 'preemptible', 'rule3')
            return true

    # Rule 4: running preemptible vm that should not be preemptible -- switch.
    _rule4: (data) =>
        if data.gce_status == 'RUNNING' and data.requested_status == 'RUNNING' and \
                not data.requested_preemptible and data.gce_preemptible
            dbg = @_dbg("rule4('#{data.name}')")
            dbg("switch running instance from preempt to non-preempt")
            @_action(data, 'non-preemptible', 'rule4')
            return true

    # Execute one action ('start'/'stop'/'preemptible'/'non-preemptible') on a
    # vm, recording start/finish (and any error) in the db and the action log.
    _action: (data, action, type, cb) =>
        db = @gcloud.db
        query = db.table('instances').get(data.name)
        dbg = @_dbg("_action(action='#{action}',host='#{data.name}')")
        dbg(misc.to_json(data))
        action_obj =
            action  : action
            started : new Date()
            type    : type
        log =
            id     : misc.uuid()
            name   : data.name
            action : action_obj
        async.series([
            (cb) =>
                dbg('set fact that action started in the database')
                query.update(action:db.r.literal(action_obj)).run(cb)
            (cb) =>
                dbg("set log entry to #{misc.to_json(log)}")
                db.table('instance_actions_log').insert(log).run(cb)
            (cb) =>
                vm = @gcloud.vm(name:data.name)
                start = data.requested_status == 'RUNNING'
                switch action
                    when 'start', 'stop'
                        vm[action](cb:cb)
                    when 'preemptible'
                        vm.change(preemptible:true, start:start, cb:cb)
                    when 'non-preemptible'
                        vm.change(preemptible:false, start:start, cb:cb)
                    else
                        cb("invalid action '#{action}'")
            (cb) =>
                dbg("update db view of GCE machine state, so we don't try to do action again right after finishing")
                @_update_db(cb:cb)
        ], (err) =>
            change = {finished:new Date()}
            if err
                change.error = err
            db.table('instances').get(data.name).update(action:change).run(cb)
            db.table('instance_actions_log').get(log.id).update(action : misc.merge(action_obj, change)).run (err) =>
                if err
                    dbg("ERROR inserting log -- #{err}")
        )
# One-off admin utility: for each number n in v, copy the three
# projects<n>* disks to storage<n>* names (in parallel), then create a
# corresponding storage<n> vm with those disks attached.
exports.copy_projects_disks = (v) ->
    g = exports.gcloud()
    f = (n, cb) ->
        async.parallel([
            (cb) ->
                d = g.disk(name:"projects#{n}-base")
                d.copy(name:"storage#{n}",cb:cb)
            (cb) ->
                d = g.disk(name:"projects#{n}")
                d.copy(name:"storage#{n}-projects",cb:cb)
            (cb) ->
                d = g.disk(name:"projects#{n}-bup")
                d.copy(name:"storage#{n}-bups", size_GB:200, cb:cb)
        ], (err) ->
            if not err
                g.create_vm(name:"storage#{n}", disks:["storage#{n}", "storage#{n}-projects", "storage#{n}-bups"], tags:['storage','http'], preemptible:false, storage:'read_write', cb:cb)
            else
                cb(err)
        )
    async.map v, f, (err)->
        # BUGFIX: message read "TOTOTALY DONE!"
        console.log("TOTALLY DONE! -- #{err}")