Contact
CoCalc Logo Icon
StoreFeaturesDocsShareSupport News AboutSign UpSign In
| Download
Views: 39509
1
#!/usr/bin/env python
2
###############################################################################
3
#
4
# CoCalc: Collaborative Calculation in the Cloud
5
#
6
# Copyright (C) 2016, Sagemath Inc.
7
#
8
# LICENSE: AGPLv3
9
#
10
###############################################################################
11
12
13
"""
14
Administration and Launch control of salvus components
15
16
Use it like so
17
18
import admin; reload(admin); a = admin.Services('dev/smc/conf/cloud.sagemath.com')
19
a.monitor.go(10,3)
20
21
"""
22
23
####################
24
# Standard imports
25
####################
26
import json, logging, os, shutil, signal, socket, stat, subprocess, sys, time, tempfile
27
28
# Disk usage percentage above which Monitor.disk_usage marks a host 'down';
# may be overridden through the SMC_DISK_THRESHOLD environment variable.
DISK_THRESHOLD = int(os.environ.get("SMC_DISK_THRESHOLD", '96'))
29
30
from string import Template
31
32
import misc
33
34
############################################################
35
# Paths where data and configuration are stored
36
############################################################
37
SITENAME = 'cocalc.com'
DOMAINNAME = 'cocalc.com'
DATA = 'data'
CONF = 'conf'
AGENT = os.path.join(os.environ['HOME'], '.ssh', 'agent')
PWD = os.path.abspath('.')
PIDS = os.path.join(DATA, 'pids')   # preferred location for pid files
LOGS = os.path.join(DATA, 'logs')   # preferred location for log files
BIN = os.path.join(DATA, 'local', 'bin')
PYTHON = os.path.join(BIN, 'python')
SECRETS = os.path.join(DATA, 'secrets')
48
49
50
# Read in socket of ssh-agent, if there is an AGENT file.
51
# NOTE: I'm using this right now on my laptop, but it's not yet
52
# deployed in production *yet*. When done, it will mean the
53
# ssh key used by the hub is password protected, which
54
# will be much more secure: someone who steals ~/.ssh gets nothing,
55
# though still if somebody logs in as the salvus user on one of
56
# these nodes, they can ssh to other nodes, though they can't
57
# change passwords, etc. Also, this means having the ssh private
58
# key on the compute vm's is no longer a security risk, since it
59
# is protected by a (very long, very random) passphrase.
60
if os.path.exists(AGENT):
    # The AGENT file is as output by ssh-agent; the socket path is whatever
    # follows "SSH_AUTH_SOCK=" up to the first ';' on the matching line.
    with open(AGENT) as _agent_file:   # closed promptly instead of leaking the handle
        for X in _agent_file:
            if 'SSH_AUTH_SOCK' in X:
                os.environ['SSH_AUTH_SOCK'] = X.split(';')[0][len('SSH_AUTH_SOCK='):]
65
66
# TODO: factor out all $HOME/smc/src style stuff in code below and use BASE.
67
BASE = 'smc/src/'

LOG_INTERVAL = 6

GIT_REPO = ''   # TODO

# Name of the user running this module.  USER is not always set (e.g. under
# cron or in minimal containers), so fall back to getpass instead of failing
# at import time with a KeyError.
try:
    whoami = os.environ['USER']
except KeyError:
    import getpass
    whoami = getpass.getuser()

# Default ports
HAPROXY_PORT = 8000
NGINX_PORT = 8080

HUB_PORT = 5000
HUB_PROXY_PORT = 5001

SYNCSTRING_PORT = 6001
83
84
85
####################
86
# Sending an email (useful for monitoring script)
87
# See http://www.nixtutor.com/linux/send-mail-through-gmail-with-python/
88
####################
89
90
def email(msg= '', subject='ADMIN -- %s' % DOMAINNAME, toaddrs='[email protected]', fromaddr='[email protected]'):
    """
    Send a plain-text email through smtp.gmail.com, one message per recipient.

    - msg      -- body of the message
    - subject  -- subject line
    - toaddrs  -- comma separated list of recipient addresses
    - fromaddr -- address used for the From header and the SMTP envelope

    The SMTP password is read from
    $HOME/smc/src/data/secrets/salvusmath_email_password.
    Exceptions raised by smtplib or by reading the password file propagate
    to the caller.
    """
    import smtplib
    from email.mime.text import MIMEText

    log.info("sending email to %s", toaddrs)
    username = 'salvusmath'
    password_file = os.path.join(os.environ['HOME'], 'smc/src/data/secrets/salvusmath_email_password')
    with open(password_file) as f:
        password = f.read().strip()

    server = smtplib.SMTP('smtp.gmail.com:587')
    try:
        server.starttls()
        server.login(username, password)
        for x in toaddrs.split(','):
            toaddr = x.strip()
            if not toaddr:
                continue   # tolerate stray commas / empty entries
            # Build a fresh message for every recipient: assigning a header on
            # a MIMEText *appends* it, so reusing one object would pile up
            # duplicate Subject/From/To headers for later recipients.
            message = MIMEText(msg)
            message['Subject'] = subject
            message['From'] = fromaddr
            message['To'] = toaddr
            server.sendmail(fromaddr, toaddr, message.as_string())
    finally:
        # always close the SMTP session, even if login or sending failed
        server.quit()
108
109
def zfs_size(s):
    """
    Convert a zfs size string to gigabytes (float).

    The last character is interpreted as the unit: K, M, G, T or P
    (powers of 1000 relative to G), or B for bytes.  A string that
    consists only of a number (e.g. "0") is taken to be a byte count.
    Any other trailing character is treated like G, i.e. the number is
    returned unchanged.  The empty string gives 0.0.

    Raises ValueError if the numeric part cannot be parsed.
    """
    s = s.strip()
    if len(s) == 0:
        return 0.0
    u = s[-1]
    if u.isdigit() or u == '.':
        # no unit suffix at all: plain number of bytes
        return float(s) / 1000000000
    q = float(s[:-1])
    if u == 'B':
        q /= 1000000000
    elif u == 'K':
        q /= 1000000
    elif u == 'M':
        q /= 1000
    elif u == 'T':
        q *= 1000
    elif u == 'P':
        q *= 1000000
    return q
123
124
####################
125
# Running a subprocess
126
####################
127
MAXTIME_S=300

def run(args, maxtime=MAXTIME_S, verbose=True, stderr=True):
    """
    Run the command line specified by args (using subprocess.Popen)
    and return its output as a single string: stderr (if requested)
    followed by stdout.

    - args    -- list of command line arguments (each is converted with str);
                 if it is a list of lists, every command is run separately
                 and the outputs are joined with newlines.
    - maxtime -- if nonzero, the subprocess is killed and KeyboardInterrupt
                 is raised when the command takes more than maxtime seconds.
                 This uses SIGALRM, so it only works in the main thread.
    - verbose -- log the command and the first 256 characters of output.
    - stderr  -- if false, don't include stderr in the returned output.
    """
    if args and isinstance(args[0], list):
        # pass *all* options on to the individual commands
        return '\n'.join([str(run(a, maxtime=maxtime, verbose=verbose, stderr=stderr)) for a in args])

    args = [str(x) for x in args]
    command = ' '.join(args)

    if maxtime:
        def timeout(*a):
            raise KeyboardInterrupt("running '%s' took more than %s seconds, so killed"%(command, maxtime))
        signal.signal(signal.SIGALRM, timeout)
        signal.alarm(maxtime)
    if verbose:
        log.info("running '%s'", command)
    proc = None
    try:
        proc = subprocess.Popen(args,
                                stdin  = subprocess.PIPE,
                                stdout = subprocess.PIPE,
                                stderr = subprocess.PIPE)
        # communicate() drains both pipes concurrently (reading stderr to the
        # end before touching stdout can deadlock once the stdout pipe buffer
        # fills) and closes stdin so the child sees EOF.
        stdout_data, stderr_data = proc.communicate()
        out = (stderr_data + stdout_data) if stderr else stdout_data
        if not isinstance(out, str):
            # bytes on Python 3; a no-op on Python 2
            out = out.decode('utf-8', 'replace')
        if verbose:
            log.info("output '%s'", out[:256])
        return out
    except KeyboardInterrupt:
        # timeout (or Ctrl-C): make sure the child does not outlive us
        if proc is not None and proc.poll() is None:
            proc.kill()
            proc.wait()
        raise
    finally:
        if maxtime:
            signal.alarm(0)   # actually cancel the pending alarm
            signal.signal(signal.SIGALRM, signal.SIG_IGN)
170
171
# A convenience object "sh":
172
# sh['list', 'of', ..., 'arguments'] to run a shell command
173
174
class SH(object):
    """
    Convenience wrapper so that commands can be run with indexing syntax,
    i.e. sh['list', 'of', 'arguments'] or sh['command'].
    """
    def __init__(self, maxtime=MAXTIME_S):
        # time limit handed to run() for every command
        self.maxtime = maxtime

    def __getitem__(self, args):
        # a bare string is a single-word command; anything else is a sequence of words
        if isinstance(args, str):
            command = [args]
        else:
            command = list(args)
        return run(command, maxtime=self.maxtime)

sh = SH()
180
181
def process_status(pid, run):
    """
    Query ps for the process with the given pid and return its status.

    The run argument is the callable used to execute the ps command
    (so it could run on a remote machine); it is called as
    run(list_of_args, verbose=False) and must return the output text.
    The result is a dictionary mapping ps column names to the values on
    the last output line; it is empty if ps printed at most one line
    (i.e. the given process is not running).
    """
    columns = ['%cpu', '%mem', 'etime', 'pid', 'start', 'cputime', 'rss', 'vsize']
    ps_command = ['ps', '-p', str(int(pid)), '-o', ' '.join(columns)]
    lines = run(ps_command, verbose=False).splitlines()
    if len(lines) <= 1:
        # only the header (or nothing at all) came back
        return {}
    values = lines[-1].split()
    return dict(zip(columns, values))
192
193
194
def dns(host, timeout=10):
    """
    Return the list of ip addresses of a given host, as reported by
    "host -t A -W timeout host".

    The result is the fourth whitespace-separated word of every output
    line that has one (for "X has address Y" lines that is the address).
    NOTE(review): other line shapes, e.g. alias lines, would contribute
    their fourth word as well -- same as the previous awk-based version.

    Raises RuntimeError if the command writes anything to stderr or
    reports that the host was not found.
    """
    # Run "host" directly with an argument list (no shell), so the host
    # name can never be interpreted as shell syntax.
    proc = subprocess.Popen(['host', '-t', 'A', '-W', str(timeout), str(host)],
                            stdout = subprocess.PIPE,
                            stderr = subprocess.PIPE)
    out, err = proc.communicate()
    if not isinstance(out, str):
        # bytes on Python 3; a no-op on Python 2
        out = out.decode('utf-8', 'replace')
        err = err.decode('utf-8', 'replace')
    err = err.strip()
    if err:
        raise RuntimeError(err)
    fourth_words = []
    for line in out.splitlines():
        words = line.split()
        if len(words) >= 4:
            fourth_words.append(words[3])
    # "Host X not found: ..." has "found:" as its fourth word
    if any('found' in w for w in fourth_words):
        raise RuntimeError("unknown domain '%s'"%host)
    return fourth_words
207
208
########################################
209
# Standard Python Logging
210
########################################
211
logging.basicConfig()
log = logging.getLogger('')   # module-wide logger used by the functions in this file
# Only WARNING and above are emitted; change to logging.INFO or logging.DEBUG
# here to see the commands being run.
log.setLevel(logging.WARNING)
215
#log.setLevel(logging.INFO) # WARNING, INFO, etc.
216
217
def restrict(path):
    """
    Ensure that path is accessible by its owner only (permission bits 0700).

    Only the permission bits are compared and set; the file-type bits
    reported by os.stat are ignored rather than being passed to chmod.
    """
    #log.info("ensuring that '%s' has restrictive permissions", path)
    if stat.S_IMODE(os.stat(path).st_mode) != 0o700:
        os.chmod(path, 0o700)
221
222
def init_data_directory():
    """
    Create the DATA, PIDS and LOGS directories if they are missing, give
    each of them restrictive permissions, and put DATA's local/bin
    directory at the front of the PATH environment variable.
    """
    #log.info("ensuring that '%s' exist", DATA)
    for directory in (DATA, PIDS, LOGS):
        if not os.path.exists(directory):
            os.makedirs(directory)
        restrict(directory)

    #log.info("ensuring that PATH starts with programs in DATA directory")
    local_bin = os.path.join(DATA, 'local/bin/')
    os.environ['PATH'] = local_bin + ':' + os.environ['PATH']

init_data_directory()
234
235
########################################
236
# Misc operating system interaction
237
########################################
238
def system(args):
    """
    Run the command line specified by args using os.system and return
    the value os.system returns.

    The elements of args are converted with str and joined with single
    spaces to form the command string.  If args is a list of lists,
    every command in the list is run separately and the *sum* of the
    values returned by os.system is returned.
    """
    if args and isinstance(args[0], list):
        total = 0
        for command_args in args:
            total += system(command_args)
        return total

    words = [str(x) for x in args]
    c = ' '.join(words)
    log.info("running '%s' via system", c)
    return os.system(c)
252
253
def abspath(path='.'):
    """Return the absolute version of path (default: the current directory)."""
    absolute = os.path.abspath(path)
    return absolute
255
256
def kill(pid, signal=15):
    """
    Send signal to the process with pid by running the kill command.

    Returns the output of the command, or None (without doing anything)
    when pid is None.
    """
    if pid is None:
        return None
    signal_flag = '-%s'%signal
    return run(['kill', signal_flag, pid])
260
261
def copyfile(src, target):
    """Copy the contents of the file src to the file target (via shutil.copyfile)."""
    result = shutil.copyfile(src, target)
    return result
263
264
def readfile(filename):
    """
    Read the named file and return its contents.

    Raises IOError if the file does not exist.  If the file exists but
    opening or reading it fails with an IOError, None is returned
    (this best-effort behaviour is kept from the original).
    """
    if not os.path.exists(filename):
        raise IOError("no such file or directory: '%s'"%filename)
    try:
        with open(filename) as f:   # make sure the handle is closed
            return f.read()
    except IOError:
        return None
272
273
def writefile(filename, content):
    """Write content to the named file, replacing anything already there."""
    # The with-block closes (and therefore flushes) the file before we
    # return, instead of relying on garbage collection to do so.
    with open(filename, 'w') as f:
        f.write(content)
275
276
def makedirs(path):
    """
    Create the directory path (and any missing parents) unless something
    already exists at that path.
    """
    if os.path.exists(path):
        return
    try:
        os.makedirs(path)
    except OSError:
        # Another process may have created the directory between the check
        # above and the makedirs call; only a genuine failure is an error.
        if not os.path.isdir(path):
            raise
279
280
def unlink(filename):
    """Remove the named file; errors from os.unlink propagate to the caller."""
    return os.unlink(filename)
282
283
def path_exists(path):
    """Return True if path exists on the local filesystem, False otherwise."""
    found = os.path.exists(path)
    return found
285
286
def is_running(pid):
    """
    Return True if a process with the given pid exists, False otherwise.

    Signal 0 is sent to the process, which performs the existence and
    permission checks without delivering a signal.
    """
    import errno
    try:
        os.kill(pid, 0)
        return True
    except OSError as err:
        # EPERM means the process exists but belongs to somebody else
        return err.errno == errno.EPERM
292
293
########################################
294
# Component: named collection of Process objects
295
########################################
296
297
class Component(object):
    """
    A named collection of process objects.

    Every process object must provide id(), start(), stop(), reload(),
    restart() and status().  The action methods below take an optional
    container of ids; when it is None the action is applied to all
    processes, otherwise only to those whose id() is in it.  Each returns
    the list of results, in the order of the processes.
    """
    def __init__(self, id, processes):
        self._processes = processes
        self._id = id

    def __repr__(self):
        return "Component %s with %s processes"%(self._id, len(self._processes))

    def __getitem__(self, i):
        return self._processes[i]

    def _procs_with_id(self, ids):
        if ids is None:
            return list(self._processes)
        return [proc for proc in self._processes if proc.id() in ids]

    def _call_each(self, action, ids):
        # invoke the method named action on every selected process
        return [getattr(proc, action)() for proc in self._procs_with_id(ids)]

    def start(self, ids=None):
        return self._call_each('start', ids)

    def stop(self, ids=None):
        return self._call_each('stop', ids)

    def reload(self, ids=None):
        return self._call_each('reload', ids)

    def restart(self, ids=None):
        return self._call_each('restart', ids)

    def status(self, ids=None):
        return self._call_each('status', ids)
325
326
327
########################################
328
# Grouped collection of hosts
329
# See the files conf/hosts* for examples.
330
# The format is
331
# [group1]
332
# hostname1
333
# hostname2
334
# [group2]
335
# hostname3
336
# hostname1 # repeats allowed, comments allowed
337
########################################
338
339
def parse_groupfile(filename):
    """
    Parse a file of grouped host names with optional per-line options.

    Returns a pair (groups, ordered_group_names): groups maps each group
    name to a list of (name, opts) pairs, in file order, and
    ordered_group_names lists the group names in the order they appear.
    groups always has a None key holding entries that precede the first
    "[group]" line.  If the file does not exist, ({None: []}, []) is
    returned.

    Line syntax (text after '#' is dropped, blank lines are ignored):
      - lines starting with "import " or containing '=' are executed as
        Python statements in a namespace shared by the whole file;
      - "[group] EXPR" starts a group; EXPR (optional) is evaluated in
        that namespace and its items are applied to every entry of the group;
      - "name EXPR" adds an entry; EXPR (optional) is evaluated to the
        entry's options, then updated with the group's options.

    SECURITY: the file content is passed to exec/eval, so it must come
    from a trusted source.
    """
    groups = {None:[]}
    group = None
    group_opts = []
    ordered_group_names = []
    if not os.path.exists(filename):
        return groups, ordered_group_names
    namespace = {}
    namespace['os'] = os
    with open(filename) as group_file:   # closed even if a line fails to parse
        for r in group_file:
            line = r.split('#')[0].strip()  # ignore comments and leading/trailing whitespace
            if not line:  # ignore blank lines
                continue
            if line.startswith('import ') or '=' in line:
                # import modules / assign variables for use in expressions below
                print("exec  %s" % line)
                exec(line, namespace)
                continue

            i = line.find(' ')
            if i == -1:
                opts = {}
                name = line
            else:
                name = line[:i]
                opts = eval(line[i+1:], namespace)
            if name.startswith('['):  # host group
                group = name.strip(' []')
                group_opts = opts
                groups[group] = []
                ordered_group_names.append(group)
            else:
                opts.update(group_opts)
                groups[group].append((name, opts))
    # show the variables that the file defined
    for k in sorted(namespace.keys()):
        if not k.startswith('_') and k not in ['os']:
            print("%-20s = %s"%(k, namespace[k]))
    return groups, ordered_group_names
376
377
def parse_hosts_file(filename):
    """
    Parse a file whose lines have the form "address hostname [hostname ...]".

    Text after '#' is dropped and blank lines are ignored.  Returns a
    pair (ip, hn):
      - ip maps each hostname to the sorted list of distinct addresses
        it was listed with;
      - hn maps each address to the *last* hostname on its line.

    Raises ValueError for a line with an address but no hostname, and
    RuntimeError for a hostname that is longer than 63 characters or
    contains anything other than alphanumerics and '-'.
    """
    ip = {}  # ip = dictionary mapping from hostname to a list of ip addresses
    hn = {}  # hn = canonical hostnames for each ip address
    with open(filename) as hosts_file:   # closed even if a line is invalid
        for r in hosts_file:
            line = r.split('#')[0].strip()  # ignore comments and leading/trailing whitespace
            v = line.split()
            if len(v) == 0:
                continue
            if len(v) <= 1:
                raise ValueError("parsing hosts file -- invalid line '%s'"%r)
            address = v[0]
            hostnames = v[1:]
            hn[address] = hostnames[-1]
            for h in hostnames:
                if len(h) < 1 or len(h) > 63 or not (h.replace('-','').isalnum()):
                    raise RuntimeError("invalid hostname: must be at most 63 characters from a-z, 0-9, or -")
                ip.setdefault(h, []).append(address)
    # make ip address lists canonical
    ip = dict((host, sorted(set(addresses))) for host, addresses in ip.items())
    return ip, hn
399
400
class Hosts(object):
    """
    Defines a set of hosts on a network and provides convenient tools
    for running commands on them using ssh.

    Hosts are read from a hosts file (see parse_hosts_file).  Most methods
    take a "hostname" argument that is resolved through self[hostname]:
    a hostname or address from the file, several of them separated by
    whitespace, or 'all'.
    """
    def __init__(self, hosts_file, username=whoami, passwd=True, password=None):
        """
        - hosts_file -- path of the hosts file to parse
        - username   -- default user for ssh connections
        - passwd     -- if False, don't ask for a password; in this case nothing must
                        require sudo to run, and all logins must work using ssh with keys
        - password   -- password to use instead of prompting for one
        """
        self._ssh = {}   # cache of connections: (hostname, username) -> paramiko SSHClient
        self._username = username
        self._password = password
        self._passwd = passwd
        self._ip_addresses, self._canonical_hostnames = parse_hosts_file(hosts_file)

    def __getitem__(self, hostname):
        """
        Return list of distinct ip addresses matching the given hostname.  If the hostname
        is an ip address defined in the hosts file, return [hostname].

        Several whitespace-separated names give the sorted union of the
        individual results; 'all' gives every address in the hosts file.
        Raises ValueError for an unknown name.
        """
        v = hostname.split()
        if len(v) > 1:
            return list(sorted(set(sum([self[q] for q in v], []))))
        if hostname in self._canonical_hostnames:  # it is already a known ip address
            return [hostname]
        if hostname == 'all':  # return all ip addresses
            return list(sorted(self._canonical_hostnames.keys()))
        if hostname in self._ip_addresses:
            return self._ip_addresses[hostname]
        raise ValueError("unknown ip hostname or address '%s'"%hostname)

    def hostname(self, ip):
        """Return the canonical hostname of an address from the hosts file (KeyError if unknown)."""
        return self._canonical_hostnames[ip]

    def is_valid_hostname(self, hostname):
        """Return True if hostname is a hostname or an address defined in the hosts file."""
        # _canonical_hostnames is keyed by *address*, so checking it alone
        # rejects every real hostname; hostnames are the keys of _ip_addresses.
        return hostname in self._ip_addresses or hostname in self._canonical_hostnames

    def password(self, retry=False):
        """
        Return the password, prompting for it (once, or again if retry is
        true) unless the object was created with passwd=False.
        """
        if not self._passwd:
            log.info("Explicitly skipping asking for password, due to passwd=False option.")
            return self._password
        if self._password is None or retry:
            import getpass
            self._password = getpass.getpass("%s's password: "%self._username)
        return self._password

    def ssh(self, hostname, timeout=10, keepalive=None, use_cache=True, username=None):
        """
        Return a connected paramiko SSHClient for username@hostname.

        Connections are cached per (hostname, username); pass
        use_cache=False to force a new connection (which then replaces the
        cached one).
        """
        if username is None:
            username = self._username
        key = (hostname, username)
        if use_cache and key in self._ssh:
            return self._ssh[key]
        import paramiko
        ssh = paramiko.SSHClient()
        # NOTE(review): unknown host keys are accepted automatically.
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(hostname=hostname, username=username, password=self._password, timeout=timeout)
        if keepalive:
            ssh.get_transport().set_keepalive(keepalive)
        self._ssh[key] = ssh
        return ssh

    def _do_map(self, callable, address, **kwds):
        """Call callable(address, **kwds), logging the address and the result."""
        log.info('%s (%s):', address, self.hostname(address))
        x = callable(address, **kwds)
        log.info(x)
        return x

    def map(self, callable, hostname, parallel=True, **kwds):
        """
        Call callable(address, **kwds) for every address that hostname
        resolves to; return a list of pairs ((address, hostname), result).
        With parallel=True the calls are made through misc.thread_map.
        """
        # needed before parallel
        self.password()
        def f(address, **kwds):
            return ((address, self.hostname(address)), self._do_map(callable, address, **kwds))
        if parallel:
            return misc.thread_map(f, [((address,), kwds) for address in self[hostname]])
        else:
            return [f(address, **kwds) for address in self[hostname]]

    def ping(self, hostname='all', timeout=3, count=3, parallel=True):
        """
        Return list of pairs ((ip, hostname), ping_time) of those that succeed at pinging
        and a list of pairs ((ip, hostname), False) for those that do not.
        """
        # NOTE(review): relies on a module-level function named ping.
        v = self.map(ping, hostname, timeout=timeout, count=count, parallel=parallel)
        return [x for x in v if x[1] is not False], [x for x in v if x[1] is False]

    def ip_addresses(self, hostname):
        """Return socket.gethostbyname of every address that hostname resolves to."""
        return [socket.gethostbyname(h) for h in self[hostname]]

    def exec_command(self, hostname, command, sudo=False, timeout=90, wait=True, parallel=True, username=None, verbose=True):
        """
        Run command on every host matching hostname.

        Returns a dictionary mapping (address, hostname) to the result
        dictionary of _exec_command.  An exception for one host does not
        abort the others: its result is {'stdout':'', 'stderr':'Error connecting -- ...'}.
        """
        def f(hostname):
            try:
                return self._exec_command(command, hostname, sudo=sudo, timeout=timeout, wait=wait, username=username, verbose=verbose)
            except Exception as msg:
                return {'stdout':'', 'stderr':'Error connecting -- %s: %s'%(hostname, msg)}
        return dict(self.map(f, hostname=hostname, parallel=parallel))

    def __call__(self, *args, **kwds):
        """
        >>> self(hostname, command)

        Same as exec_command, but additionally prints, for every host,
        one line with the host followed by its stdout and stderr, unless
        verbose=False is given.
        """
        result = self.exec_command(*args, **kwds)
        if kwds.get('verbose',True):
            for h,v in result.items():
                text = '%s :'%(h,)
                for piece in (v.get('stdout',''), v.get('stderr','')):
                    # separate the pieces with a space, except right after a newline
                    if not text.endswith('\n'):
                        text += ' '
                    text += str(piece)
                sys.stdout.write(text + '\n')
        return result

    def _exec_command(self, command, hostname, sudo, timeout, wait, username=None, verbose=True):
        """
        Run command on a single host over ssh.

        Returns {'stdout':..., 'stderr':..., 'exit_status':...}; when wait
        is false, returns immediately with those set to None plus a 'note'.
        With sudo, the command is wrapped in 'sudo -S bash -c "..."' and the
        password is written to its stdin.  Raises RuntimeError if the
        command has not finished timeout seconds after this method started.
        """
        if not self._passwd:
            # never use sudo if self._passwd is false...
            sudo = False
        start = time.time()
        ssh = self.ssh(hostname, username=username, timeout=timeout)
        try:
            chan = ssh.get_transport().open_session()
        except Exception:
            # try again in case if remote machine got rebooted or something...
            chan = self.ssh(hostname, username=username, timeout=timeout, use_cache=False).get_transport().open_session()
        stdin = chan.makefile('wb')
        stdout = chan.makefile('rb')
        stderr = chan.makefile_stderr('rb')
        cmd = ('sudo -S bash -c "%s"' % command.replace('"', '\\"')) if sudo else command
        log.info("hostname=%s, command='%s'", hostname, cmd)
        chan.exec_command(cmd)
        if sudo and not stdin.channel.closed:
            try:
                print("sending sudo password...")
                stdin.write('%s\n' % self.password()); stdin.flush()
            except Exception:
                pass # could have closed in the meantime if password cached
        if not wait:
            return {'stdout':None, 'stderr':None, 'exit_status':None, 'note':"wait=False: '%s'"%cmd}
        while not stdout.channel.closed:
            time.sleep(0.05)
            if time.time() - start >= timeout:
                # report the user the command was actually run as
                user = self._username if username is None else username
                raise RuntimeError("on %s@%s command '%s' timed out"%(user, hostname, command))
        return {'stdout':stdout.read(), 'stderr':stderr.read(), 'exit_status':chan.recv_exit_status()}

    def public_ssh_keys(self, hostname, timeout=5):
        """Return the contents of .ssh/id_rsa.pub on all matching hosts, joined by newlines."""
        return '\n'.join([x['stdout'] for x in self.exec_command(hostname, 'cat .ssh/id_rsa.pub', timeout=timeout).values()])

    def git_pull(self, hostname, repo=GIT_REPO, timeout=60):
        return self(hostname, 'cd salvus && git pull %s'%repo, timeout=timeout)

    def build(self, hostname, pkg_name, timeout=250):
        return self(hostname, 'cd $HOME/smc/src && source ./smc-env && ./build.py --build_%s'%pkg_name, timeout=timeout)

    def python_c(self, hostname, cmd, timeout=60, sudo=False, wait=True):
        """Run python -c "cmd" on the matching hosts, from $HOME/smc/src with smc-env sourced."""
        command = 'cd \"$HOME/smc/src\" && source ./smc-env && python -c "%s"'%cmd
        log.info("python_c: %s", command)
        return self(hostname, command, sudo=sudo, timeout=timeout, wait=wait)

    def apt_upgrade(self, hostname):
        # some nodes (e.g., sage nodes) have a firewall that disables upgrading via apt,
        # so we temporarily disable it.
        try:
            return self(hostname,'ufw --force disable && apt-get update && apt-get -y upgrade', sudo=True, timeout=120)
        # very important to re-enable the firewall, no matter what!
        finally:
            self(hostname,'ufw --force enable', sudo=True, timeout=120)

    def apt_install(self, hostname, pkg):
        # EXAMPLE:   hosts.apt_install('cassandra', 'openjdk-7-jre')
        try:
            return self(hostname, 'ufw --force disable && apt-get -y --force-yes install %s'%pkg, sudo=True, timeout=120)
        finally:
            self(hostname,'ufw --force enable', sudo=True, timeout=120)

    def reboot(self, hostname):
        return self(hostname, 'reboot -h now', sudo=True, timeout=5)

    def ufw(self, hostname, commands):
        """
        Reset the ufw firewall on the matching hosts and apply the given
        list of ufw commands (each is prefixed with 'ufw ').  The firewall
        is only enabled afterwards if commands is nonempty.  Does nothing
        when hostname resolves to exactly ['127.0.0.1'].
        """
        if self[hostname] == ['127.0.0.1']:
            print("Not enabling firewall on 127.0.0.1")
            return
        steps = ['/home/salvus/smc/src/scripts/ufw_clear',
                 'ufw disable',
                 'ufw default allow incoming',
                 'ufw default allow outgoing',
                 'ufw --force reset']
        steps += ['ufw ' + c for c in commands]
        if commands:
            steps.append('ufw --force enable')
        cmd = ' && '.join(steps)
        return self(hostname, cmd, sudo=True, timeout=10, wait=False)

    #########################################################
    # SFTP support
    #########################################################
    def put(self, hostname, local_filename, remote_filename=None, timeout=5):
        """Copy local_filename to remote_filename (default: same name) on every matching host."""
        if remote_filename is None:
            remote_filename = local_filename
        for address in self[hostname]:
            sftp = self.ssh(address, timeout=timeout).open_sftp()
            try:
                log.info('put: %s --> %s:%s', local_filename, address, remote_filename)
                sftp.put(local_filename, remote_filename)
            finally:
                sftp.close()   # don't leave one SFTP channel open per call

    def putdir(self, hostname, local_path, remote_containing_path='.', timeout=5):
        # recursively copy over the local_path directory tree so that it is contained
        # in remote_containing_path on the target
        for address in self[hostname]:
            sftp = self.ssh(address, timeout=timeout).open_sftp()
            try:
                self._mkdir(sftp, remote_containing_path)
                for dirpath, dirnames, filenames in os.walk(local_path):
                    print("%s %s %s" % (dirpath, dirnames, filenames))
                    self._mkdir(sftp, os.path.join(remote_containing_path, dirpath))
                    for name in filenames:
                        local = os.path.join(dirpath, name)
                        remote = os.path.join(remote_containing_path, dirpath, name)
                        log.info('put: %s --> %s:%s', local, address, remote)
                        sftp.put(local, remote)
            finally:
                sftp.close()

    def get(self, hostname, remote_filename, local_filename=None, timeout=5):
        """
        Copy remote_filename from the given host to local_filename
        (default: same name).  Here hostname is passed directly to ssh
        rather than being resolved through self[hostname].
        """
        if local_filename is None:
            local_filename = remote_filename
        ssh = self.ssh(hostname, timeout=timeout)
        sftp = ssh.open_sftp()
        try:
            sftp.get(remote_filename, local_filename)
        finally:
            sftp.close()
        # If I want to implement recursive get of directory: http://stackoverflow.com/questions/6674862/recursive-directory-download-with-paramiko

    def rmdir(self, hostname, path, timeout=10):
        # this is a very dangerous function!
        # (runs rm -rf on the remote hosts; path is only wrapped in double quotes)
        self(hostname, 'rm -rf "%s"'%path, timeout=timeout)

    def _mkdir(self, sftp, path, mode=0o40700):
        """
        Create the directory path through the given sftp client.  If that
        fails with IOError, accept it when path already is a directory and
        raise IOError otherwise.
        """
        try:
            sftp.mkdir(path, mode)
        except IOError:
            from stat import S_ISDIR
            if not S_ISDIR(sftp.stat(path).st_mode):
                # (the host is not known here, so it is not part of the message)
                raise IOError("remote '%s' exists and is not a path"%path)

    def mkdir(self, hostname, path, timeout=10, mode=0o40700):  # default mode is restrictive=user only, on general principle.
        for address in self[hostname]:
            ssh = self.ssh(address, timeout=timeout)
            sftp = ssh.open_sftp()
            try:
                self._mkdir(sftp, path, mode)
            finally:
                sftp.close()

    def unlink(self, hostname, filename, timeout=10):
        """Remove filename on every matching host; a failing removal is ignored."""
        for address in self[hostname]:
            ssh = self.ssh(address, timeout=timeout)
            sftp = ssh.open_sftp()
            try:
                sftp.remove(filename)
            except IOError:
                pass # file doesn't exist
            finally:
                sftp.close()
649
650
class Monitor(object):
651
def __init__(self, hosts, services):
652
self._hosts = hosts
653
self._services = services # used for self-healing
654
655
def compute(self):
656
ans = []
657
c = 'nproc && uptime && free -g && nprojects && cd smc/src; source smc-env'
658
for k, v in self._hosts('compute', c, wait=True, parallel=True, timeout=120).iteritems():
659
d = {'host':k[0], 'service':'compute'}
660
stdout = v.get('stdout','')
661
m = stdout.splitlines()
662
if v.get('exit_status',1) != 0 or len(m) < 7:
663
d['status'] = 'down'
664
else:
665
d['status'] = 'up'
666
d['nproc'] = int(m[0])
667
z = m[1].replace(',','').split()
668
d['load1'] = float(z[-3]) / d['nproc']
669
d['load5'] = float(z[-2]) / d['nproc']
670
d['load15'] = float(z[-1]) / d['nproc']
671
z = m[3].split()
672
d['ram_used_GB'] = int(z[2])
673
d['ram_free_GB'] = int(z[3])
674
d['nprojects'] = int(m[6])
675
ans.append(d)
676
w = [(-d.get('load15',0), d) for d in ans]
677
w.sort()
678
return [y for x,y in w]
679
680
def nettest(self):
681
# Verify that outbound network access is blocked for the nettest user, which was created
682
# specifically for this test, and gets the same firewall treatment as all other users except
683
# salvus/root. We actually just test google.com, but odds are that if the firewall were
684
# broken, it would at least let that through.
685
ans = []
686
c = "ping -c 1 -W 1 google.com"
687
for k, v in self._hosts('compute', c, wait=True, parallel=True, timeout=120, username='nettest').iteritems():
688
if "Operation not permitted" not in v.get('stderr',''):
689
status = 'down'
690
else:
691
status = 'up'
692
d = {'host':k[0], 'service':'nettest', 'status':status}
693
ans.append(d)
694
return ans
695
696
def database(self):
697
ans = []
698
c = 'pidof postgres'
699
for k, v in self._hosts('database', c, wait=True, parallel=True, timeout=120).iteritems():
700
d = {'host':k[0], 'service':'database'}
701
if v.get('exit_status',1) != 0 :
702
d['status'] = 'down'
703
else:
704
d['status'] = 'up'
705
ans.append(d)
706
return ans
707
708
def hub(self):
709
ans = []
710
cmd = 'export TERM=vt100; cd smc/src && source smc-env && check_hub && check_hub_block |tail -1'
711
for k, v in self._hosts('hub', cmd, wait=True, parallel=True, timeout=60).iteritems():
712
d = {'host':k[0], 'service':'hub'}
713
if v['exit_status'] != 0 or v['stderr']:
714
d['status'] = 'down'
715
continue
716
for x in v['stdout'].splitlines()[:5]:
717
i = x.find(' ')
718
if i != -1:
719
d[x[:i]] = x[i:].strip()
720
if 'sign_in_timeouts' in d:
721
d['sign_in_timeouts'] = int(d['sign_in_timeouts'])
722
if 'db_errors' in d:
723
d['db_errors'] = int(d['db_errors'])
724
if 'concurrent_warn' in d:
725
d['concurrent_warn'] = int(d['concurrent_warn'])
726
d['status'] = 'up'
727
if d['etime'] == 'ELAPSED':
728
d['status'] = 'down'
729
if d['sign_in_timeouts'] > 4:
730
d['status'] = 'down' # demands attention!
731
if d['db_errors'] > 0:
732
d['status'] = 'down' # demands attention!
733
if d['concurrent_warn'] > 0:
734
d['status'] = 'down' # demands attention!
735
try:
736
d['block'] = int(v['stdout'].splitlines()[3].split()[-1].rstrip('ms'))
737
if d['block'] > 15000:
738
d['status'] = 'down' # demands attention!
739
except: pass
740
ans.append(d)
741
def f(x,y):
742
if x['status'] == 'down':
743
return -1
744
if y['status'] == 'down':
745
return 1
746
if 'loadavg' in x and 'loadavg' in y:
747
return -cmp(float(x['loadavg'].split()[0]), float(y['loadavg'].split()[0]))
748
return -1
749
ans.sort(f)
750
return ans
751
752
def load(self):
753
"""
754
Return normalized load on *everything*, sorted by highest current load first.
755
"""
756
ans = []
757
for k, v in self._hosts('all', 'nproc && uptime', parallel=True, wait=True, timeout=80).iteritems():
758
d = {'host':k[0]}
759
m = v.get('stdout','').splitlines()
760
if v.get('exit_status',1) != 0 or len(m) < 2:
761
d['status'] = 'down'
762
else:
763
d['status'] = 'up'
764
d['nproc'] = int(m[0])
765
z = m[1].replace(',','').split()
766
d['load1'] = float(z[-3])/d['nproc']
767
d['load5'] = float(z[-2])/d['nproc']
768
d['load15'] = float(z[-1])/d['nproc']
769
ans.append(d)
770
w = [(-d['load15'], d) for d in ans]
771
w.sort()
772
return [y for x,y in w]
773
774
def pingall(self, hosts='all', on=None):
775
v = []
776
for x in hosts.split():
777
try:
778
v += self._hosts[x]
779
except ValueError:
780
v.append(x)
781
c = 'pingall ' + ' '.join(v)
782
if on is not None:
783
c = 'ssh %s "cd smc/src && source smc-env && %s"'%(on, c)
784
print c
785
s = os.popen(c).read()
786
print s
787
return json.loads(s)
788
789
def disk_usage(self, hosts='all', disk_threshold=DISK_THRESHOLD):
790
"""
791
Verify that no disk is more than disk_threshold (=disk_threshold%).
792
"""
793
cmd = "df --output=pcent,source |grep -v fuse | sort -n|tail -1"
794
ans = []
795
for k, v in self._hosts(hosts, cmd, parallel=True, wait=True, timeout=30).iteritems():
796
d = {'host':k[0], 'service':'disk_usage'}
797
percent = int((v.get('stdout','100') + ' 0').split()[0].strip().strip('%'))
798
d['percent'] = percent
799
if percent > disk_threshold:
800
d['status'] = 'down'
801
print k,v
802
else:
803
d['status'] = 'up'
804
ans.append(d)
805
w = [((-d['percent'],d['host']),d) for d in ans]
806
w.sort()
807
return [y for x,y in w]
808
809
def dns(self, hosts='all', rounds=1):
810
"""
811
Verify that DNS is working well on all machines.
812
"""
813
cmd = '&&'.join(["host -v google.com > /dev/null"]*rounds) + "; echo $?"
814
ans = []
815
exclude = set([]) # set(self._hosts['cellserver']) # + self._hosts['webdev'])
816
h = ' '.join([host for host in self._hosts[hosts] if host not in exclude])
817
if not h:
818
return []
819
for k, v in self._hosts(h, cmd, parallel=True, wait=True, timeout=30).iteritems():
820
d = {'host':k[0], 'service':'dns'}
821
exit_code = v.get('stdout','').strip()
822
if exit_code == '':
823
exit_code = '1'
824
if exit_code=='1' or v.get('exit_status',1) != 0:
825
d['status'] = 'down'
826
print k,v
827
else:
828
d['status'] = 'up'
829
ans.append(d)
830
w = [((d.get('status','down'),d['host']),d) for d in ans]
831
w.sort()
832
return [y for x,y in w]
833
834
def stats(self, timeout=90):
835
"""
836
Get all ip addresses that SITENAME resolves to, then verify that https://ip_address/stats returns
837
valid data, for each ip. This tests that all stunnel and haproxy servers are running.
838
839
NOTE: now that we use cloudflare this test is no longer possible.
840
"""
841
ans = []
842
import urllib2, ssl
843
ctx = ssl.create_default_context() # see http://stackoverflow.com/questions/19268548/python-ignore-certicate-validation-urllib2
844
ctx.check_hostname = False
845
ctx.verify_mode = ssl.CERT_NONE
846
try:
847
for ip_address in dns(SITENAME, timeout):
848
entry = {'host':ip_address, 'service':'stats'}
849
ans.append(entry)
850
try:
851
# site must return and be valid json
852
json.loads(urllib2.urlopen('https://%s/stats'%ip_address, timeout=timeout, context=ctx).read())
853
entry['status'] = 'up'
854
except: # urllib2.URLError: # there are other possible errors
855
entry['status'] = 'down'
856
except (RuntimeError, ValueError):
857
ans = [{'host':SITENAME, 'service':'stats', 'status':'down'}]
858
859
w = [(d.get('status','down'),d) for d in ans]
860
w.sort()
861
return [y for x,y in w]
862
863
def ignored_storage_requests(self):
864
try:
865
n = int(os.popen('ignored_storage_requests').read().strip())
866
except:
867
n = 10000
868
if n > 10:
869
status = 'down'
870
else:
871
status = 'up'
872
return [{"ignored_storage_requests":n, 'status':status}]
873
874
def all(self):
875
return {
876
'timestamp' : time.time(),
877
'disk_usage' : self.disk_usage(),
878
'dns' : self.dns(),
879
'load' : self.load(),
880
#'hub' : self.hub(),
881
#'stats' : self.stats(), # disabled due to using cloudflare.
882
'compute' : self.compute(),
883
'nettest' : self.nettest(),
884
'database' : self.database(),
885
'storage' : self.ignored_storage_requests()
886
}
887
888
def down(self, all):
889
# Make a list of down services
890
down = []
891
for service, v in all.iteritems():
892
if isinstance(v, list):
893
for x in v:
894
if x.get('status','') == 'down':
895
down.append(x)
896
return down
897
898
def print_status(self, all=None, n=9):
899
if all is None:
900
all = self.all( )
901
902
print "TIME: " + time.strftime("%Y-%m-%d %H:%M:%S")
903
904
#print "DNS"
905
#for x in all['dns'][:n]:
906
# print x
907
908
#print "HUB"
909
#for x in all['hub'][:n]:
910
# print x
911
912
print "DATABASE"
913
for x in all['database'][:n]:
914
print x
915
916
print "DISK USAGE"
917
for x in all['disk_usage'][:n]:
918
print x
919
920
print "LOAD"
921
for x in all['load'][:n]:
922
print x
923
924
#print "STATS"
925
#for x in all['stats'][:n]:
926
# print x
927
928
print "COMPUTE"
929
vcompute = all['compute']
930
print "%s projects running"%(sum([x.get('nprojects',0) for x in vcompute]))
931
for x in all['compute'][:n]:
932
print x
933
934
if 'storage' in all:
935
print "IGNORED_STORAGE_REQUESTS"
936
print all['storage'][0]['ignored_storage_requests']
937
938
def _go(self):
939
all = self.all()
940
self.print_status(all=all)
941
down = self.down(all=all)
942
m = ''
943
if len(down) > 0:
944
m += "The following are down: %s"%down
945
for x in all['load']:
946
if x['load15'] > 400:
947
m += "A machine is going *crazy* with load!: %s"%x
948
#for x in all['zfs']:
949
# if x['nproc'] > 10000:
950
# m += "Large amount of ZFS: %s"%x
951
if m:
952
try:
953
email(m, subject="SMC issue")
954
except Exception, msg:
955
print "Failed to send email! -- %s\n%s"%(msg, m)
956
957
def go(self, interval=5, residue=0):
    """
    Run a full monitor scan when the current time in *minutes* since the epoch
    is congruent to residue modulo interval.

    This loops forever, checking the clock every 20 seconds.  At most one
    scan is started per matching minute.  If a scan raises an Exception,
    the error is printed and the scan is retried once immediately.
    KeyboardInterrupt and SystemExit are not caught, so the loop can be
    stopped while a scan is running.
    """
    self._services._hosts.password()  # ensure known for self-healing
    last_time = 0
    while True:
        now = int(time.time() / 60)  # minutes since epoch
        if now != last_time and now % interval == residue:
            last_time = now
            # first attempt plus one immediate retry
            for _attempt in range(2):
                try:
                    self._go()
                except Exception:
                    print(sys.exc_info()[:2])
                    print("ERROR")
                else:
                    break
        time.sleep(20)
983
984
class Services(object):
985
def __init__(self, path, username=whoami, keyspace='salvus', passwd=True, password=""):
    """
    Load the configuration stored in the directory ``path``.

    - path -- directory containing the 'hosts' and 'services' files
    - username -- stored as self._username and passed to Hosts
    - keyspace -- stored as self._keyspace
    - passwd -- if False, don't ask for a password; in this case nothing must require sudo to
      run, and all logins must work using ssh with keys
    - password -- passed to Hosts
    """
    self._keyspace = keyspace
    self._path = path
    self._username = username

    hosts_file = os.path.join(path, 'hosts')
    self._hosts = Hosts(hosts_file, username=username, passwd=passwd, password=password)

    self._services, self._ordered_service_names = parse_groupfile(os.path.join(path, 'services'))
    del self._services[None]

    # The monitor gets its own Hosts object, created with passwd=False.
    self.monitor = Monitor(Hosts(hosts_file, username=username, passwd=False), services=self)

    # this is the canonical list of options, expanded out by service and host.
    def hostopts(service, query='all', copy=True):
        """Return list of pairs (hostname, options) defined in the services file, where
        the hostname matches the given hostname/group"""
        restrict = set(self._hosts[query])
        # One flat comprehension instead of sum(list_of_lists, []), which
        # re-copies the accumulated list for every group.  The loop variable
        # is named `group` so it no longer shadows the `query` parameter.
        return [(h, dict(opts) if copy else opts)
                for group, opts in self._services[service]
                for h in self._hosts[group]
                if h in restrict]

    self._options = dict((service, hostopts(service)) for service in self._ordered_service_names)
1009
1010
def _all(self, callable, reverse=False):
1011
names = self._ordered_service_names
1012
return dict([(s, callable(s)) for s in (reversed(names) if reverse else names)])
1013
1014
def start(self, service, host='all', wait=True, parallel=False, **opts):
    """
    Run the 'start' action for ``service`` on ``host``.

    - service -- a service name, or 'all' to start every service in order;
      in that case a dictionary {service name: result} is returned
    - host, wait, parallel -- passed on to self._action
    - **opts -- option filter passed on to self._action
    """
    if service == 'all':
        # forward `parallel` as well, so start('all', parallel=True) is not
        # silently run serially
        return self._all(lambda x: self.start(x, host=host, wait=wait, parallel=parallel, **opts), reverse=False)
    return self._action(service, 'start', host, opts, wait=wait, parallel=parallel)
1018
1019
def stop(self, service, host='all', wait=True, parallel=False, **opts):
    """
    Run the 'stop' action for ``service`` on ``host``.

    - service -- a service name, or 'all' to stop every service in
      reverse order; in that case a dictionary {service name: result} is
      returned
    - host, wait, parallel -- passed on to self._action
    - **opts -- option filter passed on to self._action
    """
    if service == 'all':
        # forward `parallel` as well, so stop('all', parallel=True) is not
        # silently run serially
        return self._all(lambda x: self.stop(x, host=host, wait=wait, parallel=parallel, **opts), reverse=True)
    return self._action(service, 'stop', host, opts, wait, parallel=parallel)
1023
1024
def status(self, service, host='all', wait=True, parallel=False, **opts):
    """
    Run the 'status' action for ``service`` on ``host``.

    - service -- a service name, or 'all' to query every service in order;
      in that case a dictionary {service name: result} is returned
    - host, parallel -- passed on to self._action
    - wait -- accepted for symmetry with start/stop/restart, but ignored:
      the action is always run with wait=True
    - **opts -- option filter passed on to self._action
    """
    if service == 'all':
        # forward `parallel` as well, so status('all', parallel=True) is not
        # silently run serially
        return self._all(lambda x: self.status(x, host=host, wait=True, parallel=parallel, **opts), reverse=False)
    return self._action(service, 'status', host, opts, wait=True, parallel=parallel)
1028
1029
def restart(self, service, host='all', wait=True, reverse=True, parallel=False, **opts):
    """
    Run the 'restart' action for ``service`` on ``host``.

    - service -- a service name, or 'all' to restart every service; in
      that case a dictionary {service name: result} is returned
    - host, wait, parallel -- passed on to self._action
    - reverse -- only used when service is 'all': passed to self._all to
      choose the order in which the services are visited
    - **opts -- option filter passed on to self._action
    """
    if service == 'all':
        # forward `parallel` as well, so restart('all', parallel=True) is
        # not silently run serially
        return self._all(lambda x: self.restart(x, host=host, reverse=reverse, wait=wait, parallel=parallel, **opts), reverse=reverse)
    return self._action(service, 'restart', host, opts, wait, parallel=parallel)
1033
1034
def wait_until_up(self, host='all'):
    """
    Block until pinging ``host`` leaves nothing to wait for.

    Repeatedly calls self._hosts.ping(host) and looks at the second item
    of its result; returns as soon as that item is empty, and logs it at
    info level otherwise.
    """
    pending = self._hosts.ping(host)[1]
    while pending:
        log.info("Waiting for %s"%(pending,))
        pending = self._hosts.ping(host)[1]
1039
1040
1041
def _action(self, service, action, host, opts, wait, parallel):
1042
if service not in self._services:
1043
raise ValueError("unknown service '%s'"%service)
1044
1045
1046
name = service.capitalize()
1047
def db_string(address):
1048
return ""
1049
1050
v = self._hostopts(service, host, opts)
1051
1052
self._hosts.password() # can't get password in thread
1053
1054
w = [((name, action, address, options, db_string(address), wait),{}) for address, options in v]
1055
if parallel:
1056
return misc.thread_map(self._do_action, w)
1057
else:
1058
return [self._do_action(*args, **kwds) for args, kwds in w]
1059
1060
def _hostopts(self, service, hostname, opts):
1061
"""
1062
Return copy of pairs (hostname, options_dict) for the given
1063
service, restricted by the given hostname.
1064
"""
1065
hosts = set(self._hosts[hostname])
1066
opts1 = set(opts.iteritems())
1067
return [(h,dict(o)) for h,o in self._options[service] if h in hosts and opts1.issubset(set([(x,y) for x, y in o.iteritems() if x in opts]))]
1068
1069
def _do_action(self, name, action, address, options, db_string, wait):
1070
1071
if 'sudo' in options:
1072
sudo = True
1073
del options['sudo']
1074
else:
1075
sudo = False
1076
if 'timeout' in options:
1077
timeout = options['timeout']
1078
del options['timeout']
1079
else:
1080
timeout = 60
1081
1082
for t in ['hub', 'nginx', 'proxy']:
1083
s = '%s_servers'%t
1084
if s in options:
1085
# restrict to the subset of servers in the same data center
1086
dc = self.ip_address_to_dc(address)
1087
options[s] = [dict(x) for x in options[s] if self.ip_address_to_dc(x['ip']) == dc]
1088
# turn the ip's into hostnames
1089
for x in options[s]:
1090
x['ip'] = self._hosts.hostname(x['ip'])
1091
1092
if 'id' not in options:
1093
options['id'] = 0
1094
if 'monitor_database' in options:
1095
db_string = ''
1096
elif db_string.strip():
1097
db_string = db_string + ', '
1098
1099
cmd = "import admin; print admin.%s(%s**%r).%s()"%(name, db_string, options, action)
1100
1101
ret = self._hosts.python_c(address, cmd, sudo=sudo, timeout=timeout, wait=wait)
1102
1103
if name == "Compute":
1104
log.info("Recording compute server in database")
1105
# TODO...
1106
1107
return (address, self._hosts.hostname(address), options, ret)
1108
1109
1110