Contact
CoCalc Logo Icon
StoreFeaturesDocsShareSupport News AboutSign UpSign In
| Download
Views: 39558
1
#!/usr/bin/env python
2
3
###############################################################################
4
#
5
# CoCalc: Collaborative Calculation in the Cloud
6
#
7
# Copyright (C) 2016, Sagemath Inc.
8
#
9
# This program is free software: you can redistribute it and/or modify
10
# it under the terms of the GNU General Public License as published by
11
# the Free Software Foundation, either version 3 of the License, or
12
# (at your option) any later version.
13
#
14
# This program is distributed in the hope that it will be useful,
15
# but WITHOUT ANY WARRANTY; without even the implied warranty of
16
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17
# GNU General Public License for more details.
18
#
19
# You should have received a copy of the GNU General Public License
20
# along with this program. If not, see <http://www.gnu.org/licenses/>.
21
#
22
###############################################################################
23
24
25
# used in naming streams -- changing this would break all existing data...
TO = "-to-"

# appended to end of snapshot name to make it persistent (never automatically deleted)
PERSIST = "-persist"

TIMESTAMP_FORMAT = "%Y-%m-%d-%H%M%S"

# This is the quota for the .smc directory; must be
# significantly bigger than that directory, and hold user logs.
SMC_TEMPLATE_QUOTA = '1000m'

# amount of swap users get in addition to how much RAM they have.
USER_SWAP_MB = 1000

import errno, hashlib, json, math, os, platform, re, shutil, signal, socket, stat, sys, tempfile, time, uuid

from subprocess import Popen, PIPE

# NOTE: duplicate re-definitions of TIMESTAMP_FORMAT and USER_SWAP_MB
# (identical values) that followed the imports were removed.

PLATFORM = platform.system().lower()   # e.g. 'linux', 'darwin'
PROJECTS = '/projects'                 # default mount point for project homes
def quota_to_int(x):
    """Round a (possibly fractional) quota value up to the nearest integer."""
    rounded_up = math.ceil(x)
    return int(rounded_up)
def log(s, *args):
    """Write the message s to stderr and flush.

    If args are given, s is treated as a %-format string; if that
    formatting fails, the formatting error is prepended to the raw
    message instead of raising.
    """
    if args:
        try:
            s = str(s % args)
        except Exception as mesg:   # 'as' form works on python2.6+ and python3
            s = str(mesg) + str(s)
    sys.stderr.write(s + '\n')
    sys.stderr.flush()
def cmd(s, ignore_errors=False, verbose=2, timeout=None, stdout=True, stderr=True):
    """Run command s (a list of args, or a string run through the shell)
    and return its combined stdout+stderr output, stripped.

    - ignore_errors: on failure/timeout return output + "ERROR" instead of raising
    - verbose: >=1 logs the command; >=2 also logs timing and first 500 chars of output
    - timeout: seconds before SIGALRM aborts the command
    - stdout/stderr: NOTE(review) accepted but unused -- both streams are always captured

    Raises RuntimeError on nonzero exit status (unless ignore_errors) and
    KeyboardInterrupt on timeout (unless ignore_errors).
    """
    if isinstance(s, list):
        s = [str(x) for x in s]
    if verbose >= 1:
        if isinstance(s, list):
            t = [x if len(x.split()) <= 1 else "'%s'" % x for x in s]
            log(' '.join(t))
        else:
            log(s)
    t = time.time()

    mesg = "ERROR"
    if timeout:
        mesg = "TIMEOUT: running '%s' took more than %s seconds, so killed" % (s, timeout)

        def handle(*a):
            if ignore_errors:
                return mesg
            else:
                raise KeyboardInterrupt(mesg)

        signal.signal(signal.SIGALRM, handle)
        signal.alarm(timeout)
    try:
        out = Popen(s, stdin=PIPE, stdout=PIPE, stderr=PIPE,
                    shell=not isinstance(s, list))
        # this must be *before* out.wait(), or it will hang when output is large!
        x = out.stdout.read() + out.stderr.read()
        e = out.wait()
        if e:
            if ignore_errors:
                # b"ERROR" == "ERROR" on python2; avoids bytes+str TypeError on python3
                return (x + b"ERROR").strip()
            else:
                raise RuntimeError(x)
        if verbose >= 2:
            log("(%s seconds): %s", time.time() - t, x[:500])
        elif verbose >= 1:
            log("(%s seconds)", time.time() - t)
        return x.strip()
    except IOError:
        return mesg
    finally:
        if timeout:
            signal.signal(signal.SIGALRM, signal.SIG_IGN)  # cancel the alarm
def check_uuid(u):
    """Raise RuntimeError unless u is a valid version-4 UUID string."""
    try:
        # .version property works on both python2.7 and python3
        # (the old get_version() call does not exist on python3)
        assert uuid.UUID(u).version == 4
    except (AssertionError, ValueError):
        raise RuntimeError("invalid uuid (='%s')" % u)
def uid(project_id):
    """Deterministically map a project_id (uuid string) to a numeric uid.

    We take the sha-512 of the uuid just to make it harder to force a
    collision.  Thus even if a user could somehow generate an account id
    of their choosing, this wouldn't help them get the same uid as
    another user.
    2^31-1 = max uid that works with FUSE and node (Linux goes up to 2^32-2).
    """
    # .encode makes this work on python3; identical on python2 for ascii uuids
    n = int(hashlib.sha512(project_id.encode('utf-8')).hexdigest()[:8], 16)  # up to 2^32
    n //= 2  # up to 2^31 (floor division, works on python3 too)
    return n if n > 65537 else n + 65537  # 65534 is used by linux for user sync, etc.
def thread_map(callable, inputs):
    """
    Compute [callable(args) for args in inputs] in parallel using
    len(inputs) separate *threads*.

    If an exception is raised by any thread, a RuntimeError exception
    containing the failures is raised instead.
    """
    log("Doing the following in parallel:\n%s", '\n'.join([str(x) for x in inputs]))
    from threading import Thread

    class F(Thread):
        def __init__(self, x):
            self._x = x
            Thread.__init__(self)
            self.start()

        def run(self):
            try:
                self.result = callable(self._x)
                self.fail = False
            except Exception as msg:   # python3-compatible (was 'except Exception, msg')
                self.result = msg
                self.fail = True

    results = [F(x) for x in inputs]
    for f in results:
        f.join()
    e = [f.result for f in results if f.fail]
    if e:
        raise RuntimeError(e)
    return [f.result for f in results]
class Project(object):
146
def __init__(self,
147
project_id, # v4 uuid string
148
dev = False, # if true, use special devel mode where everything run as same user (no sudo needed); totally insecure!
149
projects = PROJECTS,
150
single = False,
151
kucalc = False
152
):
153
self._dev = dev
154
self._single = single
155
self._kucalc = kucalc
156
if kucalc:
157
projects = '/home'
158
check_uuid(project_id)
159
if not os.path.exists(projects):
160
if self._dev:
161
os.makedirs(projects)
162
else:
163
raise RuntimeError("mount point %s doesn't exist"%projects)
164
self.project_id = project_id
165
self._projects = projects
166
if self._kucalc:
167
self.project_path = os.environ['HOME']
168
else:
169
self.project_path = os.path.join(self._projects, project_id)
170
self.smc_path = os.path.join(self.project_path, '.smc')
171
self.forever_path = os.path.join(self.project_path, '.forever')
172
self.uid = uid(self.project_id)
173
self.username = self.project_id.replace('-','')
174
self.open_fail_file = os.path.join(self.project_path, '.sagemathcloud-open-failed')
175
176
def _log(self, name=""):
177
def f(s='', *args):
178
log("Project(project_id=%s).%s(...): "%(self.project_id, name) + s, *args)
179
return f
180
181
def cmd(self, *args, **kwds):
182
log("Project(project_id=%s).cmd(...): ", self.project_id)
183
return cmd(*args, **kwds)
184
185
###
186
# Users Management
187
###
188
189
def create_user(self, login_shell='/bin/bash'):
190
if not os.path.exists(self.project_path):
191
os.makedirs(self.project_path)
192
self.chown(self.project_path) # only chown if just made; it's recursive and can be very expensive in general!
193
if self._dev:
194
return
195
cmd(['/usr/sbin/groupadd', '-g', self.uid, '-o', self.username], ignore_errors=True)
196
cmd(['/usr/sbin/useradd', '-u', self.uid, '-g', self.uid, '-o', self.username,
197
'-d', self.project_path, '-s', login_shell], ignore_errors=True)
198
199
def delete_user(self):
200
if self._dev:
201
return
202
cmd(['/usr/sbin/userdel', self.username], ignore_errors=True)
203
cmd(['/usr/sbin/groupdel', self.username], ignore_errors=True)
204
if os.path.exists('/etc/cgrules.conf'):
205
c = open("/etc/cgrules.conf").read()
206
i = c.find(self.username)
207
if i != -1:
208
j = c[i:].find('\n')
209
if j == -1:
210
j = len(c)
211
else:
212
j += i
213
open("/etc/cgrules.conf",'w').write(c[:i]+c[j+1:])
214
215
def pids(self):
216
return [int(x) for x in self.cmd(['pgrep', '-u', self.uid], ignore_errors=True).replace('ERROR','').split()]
217
218
def num_procs(self):
219
return len(self.pids())
220
221
def killall(self, grace_s=0.5, max_tries=15):
222
log = self._log('killall')
223
if self._dev:
224
self.dev_env()
225
os.chdir(self.project_path)
226
self.cmd("smc-local-hub stop")
227
self.cmd("smc-console-server stop")
228
self.cmd("smc-sage-server stop")
229
return
230
231
log("killing all processes by user with id %s"%self.uid)
232
# we use both kill and pkill -- pkill seems better in theory, but I've definitely seen it get ignored.
233
for i in range(max_tries):
234
n = self.num_procs()
235
log("kill attempt left %s procs"%n)
236
if n == 0:
237
return
238
self.cmd(['/usr/bin/killall', '-u', self.username], ignore_errors=True)
239
self.cmd(['/usr/bin/pkill', '-u', self.uid], ignore_errors=True)
240
time.sleep(grace_s)
241
self.cmd(['/usr/bin/killall', '-9', '-u', self.username], ignore_errors=True)
242
self.cmd(['/usr/bin/pkill', '-9', '-u', self.uid], ignore_errors=True)
243
log("WARNING: failed to kill all procs after %s tries"%max_tries)
244
245
def chown(self, path, recursive=True):
246
if self._dev:
247
return
248
if recursive:
249
cmd(["chown", "%s:%s"%(self.uid, self.uid), '-R', path])
250
else:
251
cmd(["chown", "%s:%s"%(self.uid, self.uid), path])
252
253
def ensure_file_exists(self, src, target):
254
target = os.path.abspath(target)
255
if not os.path.exists(target):
256
self.makedirs(os.path.split(target)[0])
257
shutil.copyfile(src, target)
258
if USERNAME == "root":
259
os.chown(target, self.uid, self.uid)
260
261
def create_smc_path(self):
262
if not os.path.exists(self.smc_path):
263
os.makedirs(self.smc_path)
264
self.chown(self.smc_path)
265
self.ensure_conf_files_exist()
266
267
def ensure_conf_files_exist(self):
268
for filename in ['bashrc', 'bash_profile']:
269
target = os.path.join(self.project_path, '.' + filename)
270
if not os.path.exists(target):
271
source = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'templates', PLATFORM, filename)
272
if os.path.exists(source):
273
shutil.copyfile(source, target)
274
if not self._dev:
275
os.chown(target, self.uid, self.uid)
276
277
def remove_forever_path(self):
278
if os.path.exists(self.forever_path):
279
shutil.rmtree(self.forever_path, ignore_errors=True)
280
281
def remove_smc_path(self):
282
# do our best to remove the smc path
283
if os.path.exists(self.smc_path):
284
shutil.rmtree(self.smc_path, ignore_errors=True)
285
286
def disk_quota(self, quota=0): # quota in megabytes
287
try:
288
quota = quota_to_int(quota)
289
# requires quotas to be setup as explained nicely at
290
# https://www.digitalocean.com/community/tutorials/how-to-enable-user-and-group-quotas
291
# and https://askubuntu.com/questions/109585/quota-format-not-supported-in-kernel/165298#165298
292
# This sets the quota on all mounted filesystems:
293
cmd(['setquota', '-u', self.username, quota*1000, quota*1200, 1000000, 1100000, '-a'])
294
except Exception, mesg:
295
log("WARNING -- quota failure %s", mesg)
296
297
def compute_quota(self, cores, memory, cpu_shares):
298
"""
299
cores - number of cores (float)
300
memory - megabytes of RAM (int)
301
cpu_shares - determines relative share of cpu (e.g., 256=most users)
302
"""
303
if self._dev:
304
return
305
cfs_quota = int(100000*cores)
306
307
group = "memory,cpu:%s"%self.username
308
try:
309
self.cmd(["cgcreate", "-g", group])
310
except:
311
if os.system("cgcreate") != 0:
312
# cgroups not installed
313
return
314
else:
315
raise
316
if memory:
317
memory = quota_to_int(memory)
318
open("/sys/fs/cgroup/memory/%s/memory.limit_in_bytes"%self.username,'w').write("%sM"%memory)
319
open("/sys/fs/cgroup/memory/%s/memory.memsw.limit_in_bytes"%self.username,'w').write("%sM"%(USER_SWAP_MB + memory))
320
if cpu_shares:
321
cpu_shares = quota_to_int(cpu_shares)
322
open("/sys/fs/cgroup/cpu/%s/cpu.shares"%self.username,'w').write(str(cpu_shares))
323
if cfs_quota:
324
open("/sys/fs/cgroup/cpu/%s/cpu.cfs_quota_us"%self.username,'w').write(str(cfs_quota))
325
326
z = "\n%s cpu,memory %s\n"%(self.username, self.username)
327
cur = open("/etc/cgrules.conf").read() if os.path.exists("/etc/cgrules.conf") else ''
328
329
if z not in cur:
330
open("/etc/cgrules.conf",'a').write(z)
331
try:
332
pids = self.cmd("ps -o pid -u %s"%self.username, ignore_errors=False).split()[1:]
333
self.cmd(["cgclassify", "-g", group] + pids, ignore_errors=True)
334
# ignore cgclassify errors, since processes come and go, etc.
335
except:
336
pass # ps returns an error code if there are NO processes at all
337
338
def cgclassify(self):
339
try:
340
pids = self.cmd("ps -o pid -u %s"%self.username, ignore_errors=False).split()[1:]
341
self.cmd(["cgclassify"] + pids, ignore_errors=True)
342
# ignore cgclassify errors, since processes come and go, etc.":
343
except:
344
# ps returns an error code if there are NO processes at all (a common condition).
345
pids = []
346
347
def create_project_path(self):
348
if not os.path.exists(self.project_path):
349
os.makedirs(self.project_path)
350
if not self._dev:
351
os.chown(self.project_path, self.uid, self.uid)
352
353
def remove_snapshots_path(self):
354
"""
355
Remove the ~/.snapshots path
356
"""
357
p = os.path.join(self.project_path, '.snapshots')
358
if os.path.exists(p):
359
shutil.rmtree(p, ignore_errors=True)
360
361
def ensure_bashrc(self):
362
# ensure .bashrc has certain properties
363
bashrc = os.path.join(self.project_path, '.bashrc')
364
if not os.path.exists(bashrc):
365
return
366
s = open(bashrc).read()
367
changed = False
368
if '.sagemathcloud' in s:
369
s = '\n'.join([y for y in s.splitlines() if '.sagemathcloud' not in y])
370
changed = True
371
if 'SAGE_ATLAS_LIB' not in s:
372
s += '\nexport SAGE_ATLAS_LIB=/usr/lib/ # do not build ATLAS\n\n'
373
changed = True
374
if '$HOME/bin:$HOME/.local/bin' not in s:
375
s += '\nexport PATH=$HOME/bin:$HOME/.local/bin:$PATH\n\n'
376
changed = True
377
if changed:
378
open(bashrc,'w').write(s)
379
380
def dev_env(self):
381
os.environ['PATH'] = "{salvus_root}/smc-project/bin:{salvus_root}/smc_pyutil/smc_pyutil:{path}".format(
382
salvus_root=os.environ['SALVUS_ROOT'], path=os.environ['PATH'])
383
os.environ['PYTHONPATH'] = "{home}/.local/lib/python2.7/site-packages".format(home=os.environ['HOME'])
384
os.environ['SMC_LOCAL_HUB_HOME'] = self.project_path
385
os.environ['SMC_HOST'] = 'localhost'
386
os.environ['SMC'] = self.smc_path
387
388
# for development, the raw server, jupyter, etc., have to listen on localhost since that is where
389
# the hub is running
390
os.environ['SMC_PROXY_HOST'] = 'localhost'
391
392
def start(self, cores, memory, cpu_shares, base_url):
393
self.remove_smc_path() # start can be prevented by massive logs in ~/.smc; if project not stopped via stop, then they will still be there.
394
self.ensure_bashrc()
395
self.remove_forever_path() # probably not needed anymore
396
self.remove_snapshots_path()
397
self.create_user()
398
self.create_smc_path()
399
self.chown(self.project_path, False) # Sometimes /projects/[project_id] doesn't have group/owner equal to that of the project.
400
401
os.environ['SMC_BASE_URL'] = base_url
402
403
if self._dev:
404
self.dev_env()
405
os.chdir(self.project_path)
406
self.cmd("smc-local-hub start")
407
def started():
408
return os.path.exists("%s/local_hub/local_hub.port"%self.smc_path)
409
i=0
410
while not started():
411
time.sleep(0.1)
412
i += 1
413
sys.stdout.flush()
414
if i >= 100:
415
return
416
return
417
418
pid = os.fork()
419
if pid == 0:
420
try:
421
os.nice(-os.nice(0)) # Reset nice-ness to 0
422
os.setgroups([]) # Drops other groups, like root or sudoers
423
os.setsid() # Make it a session leader
424
os.setgid(self.uid)
425
os.setuid(self.uid)
426
427
try:
428
# Fork a second child and exit immediately to prevent zombies. This
429
# causes the second child process to be orphaned, making the init
430
# process responsible for its cleanup.
431
pid = os.fork()
432
except OSError, e:
433
raise Exception, "%s [%d]" % (e.strerror, e.errno)
434
435
if pid == 0:
436
os.environ['HOME'] = self.project_path
437
os.environ['SMC'] = self.smc_path
438
os.environ['USER'] = os.environ['USERNAME'] = os.environ['LOGNAME'] = self.username
439
os.environ['MAIL'] = '/var/mail/%s'%self.username
440
if self._single:
441
# In single-machine mode, everything is on localhost.
442
os.environ['SMC_HOST'] = 'localhost'
443
del os.environ['SUDO_COMMAND']; del os.environ['SUDO_UID']; del os.environ['SUDO_GID']; del os.environ['SUDO_USER']
444
os.chdir(self.project_path)
445
self.cmd("smc-start")
446
else:
447
os._exit(0)
448
finally:
449
os._exit(0)
450
else:
451
os.waitpid(pid, 0)
452
self.compute_quota(cores, memory, cpu_shares)
453
454
def stop(self):
455
self.killall()
456
self.delete_user()
457
self.remove_smc_path()
458
self.remove_forever_path()
459
self.remove_snapshots_path()
460
461
def restart(self, cores, memory, cpu_shares, base_url):
462
log = self._log("restart")
463
log("first stop")
464
self.stop()
465
log("then start")
466
self.start(cores, memory, cpu_shares, base_url)
467
468
def get_memory(self, s):
469
try:
470
t = self.cmd(["smem", "-nu"], verbose=0, timeout=5).splitlines()[-1].split()[1:]
471
s['memory'] = dict(zip('count swap uss pss rss'.split(),
472
[int(x) for x in t]))
473
except:
474
log("error running memory command")
475
476
def status(self, timeout=60, base_url=''):
477
log = self._log("status")
478
s = {}
479
480
if (self._dev or self._single) and not os.path.exists(self.project_path): # no tiered storage
481
self.create_project_path()
482
483
s['state'] = 'opened'
484
485
if self._dev:
486
if os.path.exists(self.smc_path):
487
try:
488
os.environ['HOME'] = self.project_path
489
os.environ['SMC'] = self.smc_path
490
t = os.popen("smc-status").read()
491
t = json.loads(t)
492
s.update(t)
493
if bool(t.get('local_hub.pid',False)):
494
s['state'] = 'running'
495
self.get_memory(s)
496
except:
497
log("error running status command")
498
s['state'] = 'broken'
499
return s
500
501
if self._single:
502
# newly created project
503
if not os.path.exists(self.project_path):
504
s['state'] = 'opened'
505
return s
506
507
if not os.path.exists(self.project_path):
508
s['state'] = 'closed'
509
return s
510
511
if self.username not in open('/etc/passwd').read():
512
return s
513
514
try:
515
# ignore_errors since if over quota returns nonzero exit code
516
v = self.cmd(['quota', '-v', '-u', self.username], verbose=0, ignore_errors=True).splitlines()
517
quotas = v[-1]
518
# when the user's quota is exceeded, the last column is "ERROR"
519
if quotas == "ERROR":
520
quotas = v[-2]
521
s['disk_MB'] = int(quotas.split()[-6].strip('*'))/1000
522
except Exception, mesg:
523
log("error computing quota -- %s", mesg)
524
525
if os.path.exists(self.smc_path):
526
try:
527
os.setgid(self.uid)
528
os.setuid(self.uid)
529
os.environ['SMC'] = self.smc_path
530
t = os.popen("smc-status").read()
531
t = json.loads(t)
532
s.update(t)
533
if bool(t.get('local_hub.pid',False)):
534
s['state'] = 'running'
535
self.get_memory(s)
536
except:
537
log("error running status command")
538
s['state'] = 'broken'
539
return s
540
541
def state(self, timeout=60, base_url=''):
542
log = self._log("state")
543
544
if (self._dev or self._single) and not os.path.exists(self.project_path):
545
# In dev or single mode, where there is no tiered storage, we always
546
# create the /projects/project_id path, since that is the only place
547
# the project could be.
548
self.create_project_path()
549
550
s = {}
551
552
s['state'] = 'opened'
553
if self._dev:
554
if os.path.exists(self.smc_path):
555
try:
556
os.environ['HOME'] = self.project_path
557
os.environ['SMC'] = self.smc_path
558
os.chdir(self.smc_path)
559
t = json.loads(os.popen("smc-status").read())
560
s.update(t)
561
if bool(t.get('local_hub.pid',False)):
562
s['state'] = 'running'
563
except Exception, err:
564
log("error running status command -- %s", err)
565
s['state'] = 'broken'
566
return s
567
568
if not os.path.exists(self.project_path): # would have to be full tiered storage mode
569
s['state'] = 'closed'
570
return s
571
572
if self.username not in open('/etc/passwd').read():
573
return s
574
575
if os.path.exists(self.smc_path):
576
try:
577
os.setgid(self.uid)
578
os.setuid(self.uid)
579
os.environ['HOME'] = self.project_path
580
os.environ['SMC'] = self.smc_path
581
os.chdir(self.smc_path)
582
t = json.loads(os.popen("smc-status").read())
583
s.update(t)
584
if bool(t.get('local_hub.pid',False)):
585
s['state'] = 'running'
586
except Exception, err:
587
log("error running status command -- %s", err)
588
s['state'] = 'broken'
589
return s
590
591
def _exclude(self, prefix='', extras=[]):
592
return ['--exclude=%s'%os.path.join(prefix, x) for x in
593
['.sage/cache', '.sage/temp', '.trash', '.Trash',
594
'.sagemathcloud', '.smc', '.node-gyp', '.cache', '.forever',
595
'.snapshots', '*.sage-backup'] + extras]
596
597
def directory_listing(self, path, hidden=True, time=True, start=0, limit=-1):
598
"""
599
Return in JSON-format, listing of files in the given path.
600
601
- path = relative path in project; *must* resolve to be
602
under self._projects/project_id or get an error.
603
"""
604
abspath = os.path.abspath(os.path.join(self.project_path, path))
605
if not abspath.startswith(self.project_path):
606
raise RuntimeError("path (=%s) must be contained in project path %s"%(path, self.project_path))
607
def get_file_mtime(name):
608
try:
609
# use lstat instead of stat or getmtime so this works on broken symlinks!
610
return int(round(os.lstat(os.path.join(abspath, name)).st_mtime))
611
except:
612
# ?? This should never happen, but maybe if race condition. ??
613
return 0
614
615
def get_file_size(name):
616
try:
617
# same as above; use instead of os.path....
618
return os.lstat(os.path.join(abspath, name)).st_size
619
except:
620
return -1
621
622
try:
623
listdir = os.listdir(abspath)
624
except:
625
listdir = []
626
result = {}
627
if not hidden:
628
listdir = [x for x in listdir if not x.startswith('.')]
629
630
# Just as in git_ls.py, we make sure that all filenames can be encoded via JSON.
631
# Users sometimes make some really crazy filenames that can't be so encoded.
632
# It's better to just not show them, than to show a horendous error.
633
try:
634
json.dumps(listdir)
635
except:
636
# Throw away filenames that can't be json'd, since they can't be JSON'd below,
637
# which would totally lock user out of their listings.
638
ld0 = listdir[:]
639
listdir = []
640
for x in ld0:
641
try:
642
json.dumps(x)
643
listdir.append(x)
644
except:
645
pass
646
647
648
# Get list of (name, timestamp) pairs
649
all = [(name, get_file_mtime(name)) for name in listdir]
650
651
if time:
652
# sort by time first with bigger times first, then by filename in normal order
653
def f(a,b):
654
if a[1] > b[1]:
655
return -1
656
elif a[1] < b[1]:
657
return 0
658
else:
659
return cmp(a[0], b[0])
660
all.sort(f)
661
else:
662
all.sort() # usual sort is fine
663
664
# Limit and convert to objects
665
all = all[start:]
666
if limit > 0 and len(all) > limit:
667
result['more'] = True
668
all = all[:limit]
669
670
files = dict([(name, {'name':name, 'mtime':mtime}) for name, mtime in all])
671
sorted_names = [x[0] for x in all]
672
673
# Fill in other OS information about each file
674
#for obj in result:
675
for name, info in files.iteritems():
676
if os.path.isdir(os.path.join(abspath, name)):
677
info['isdir'] = True
678
else:
679
info['size'] = get_file_size(name)
680
681
result['files'] = [files[name] for name in sorted_names]
682
return result
683
684
def read_file(self, path, maxsize):
685
"""
686
path = relative path/filename in project
687
688
It:
689
690
- *must* resolve to be under self._projects/project_id or get an error
691
- it must have size in bytes less than the given limit
692
- to download the directory blah/foo, request blah/foo.zip
693
694
Returns base64-encoded file as an object:
695
696
{'base64':'... contents ...'}
697
698
or {'error':"error message..."} in case of an error.
699
"""
700
abspath = os.path.abspath(os.path.join(self.project_path, path))
701
base, ext = os.path.splitext(abspath)
702
if not abspath.startswith(self.project_path):
703
raise RuntimeError("path (=%s) must be contained in project path %s"%(path, self.project_path))
704
if not os.path.exists(abspath):
705
if ext != '.zip':
706
raise RuntimeError("path (=%s) does not exist"%path)
707
else:
708
if os.path.exists(base) and os.path.isdir(base):
709
abspath = os.path.splitext(abspath)[0]
710
else:
711
raise RuntimeError("path (=%s) does not exist and neither does %s"%(path, base))
712
713
filename = os.path.split(abspath)[-1]
714
if os.path.isfile(abspath):
715
# a regular file
716
# TODO: compress the file before base64 encoding (and corresponding decompress
717
# in hub before sending to client)
718
size = os.lstat(abspath).st_size
719
if size > maxsize:
720
raise RuntimeError("path (=%s) must be at most %s bytes, but it is %s bytes"%(path, maxsize, size))
721
content = open(abspath).read()
722
else:
723
# a zip file in memory from a directory tree
724
# REFERENCES:
725
# - http://stackoverflow.com/questions/1855095/how-to-create-a-zip-archive-of-a-directory
726
# - https://support.google.com/accounts/answer/6135882
727
import zipfile
728
from cStringIO import StringIO
729
output = StringIO()
730
relroot = os.path.abspath(os.path.join(abspath, os.pardir))
731
732
size = 0
733
zip = zipfile.ZipFile(output, "w", zipfile.ZIP_DEFLATED, False)
734
for root, dirs, files in os.walk(abspath):
735
# add directory (needed for empty dirs)
736
zip.write(root, os.path.relpath(root, relroot))
737
for file in files:
738
filename = os.path.join(root, file)
739
if os.path.isfile(filename): # regular files only
740
size += os.lstat(filename).st_size
741
if size > maxsize:
742
raise RuntimeError("path (=%s) must be at most %s bytes, but it is at least %s bytes"%(path, maxsize, size))
743
arcname = os.path.join(os.path.relpath(root, relroot), file)
744
zip.write(filename, arcname)
745
746
# Mark the files as having been created on Windows so that
747
# Unix permissions are not inferred as 0000.
748
for zfile in zip.filelist:
749
zfile.create_system = 0
750
zip.close()
751
content = output.getvalue()
752
import base64
753
return {'base64':base64.b64encode(content)}
754
755
def makedirs(self, path, chown=True):
756
log = self._log('makedirs')
757
if os.path.exists(path) and not os.path.isdir(path):
758
try:
759
log("moving %s", path)
760
os.rename(path, path+".backup")
761
except:
762
log("ok, then remove %s", path)
763
os.unlink(path)
764
765
if not os.path.exists(path):
766
log("creating %s"%path)
767
os.chdir(self.project_path)
768
def makedirs(name): # modified from os.makedirs to chown each newly created path segment
769
head, tail = os.path.split(name)
770
if not tail:
771
head, tail = os.path.split(head)
772
if head and tail and not os.path.exists(head):
773
try:
774
makedirs(head)
775
except OSError, e:
776
# be happy if someone already created the path
777
if e.errno != errno.EEXIST:
778
raise
779
if tail == os.curdir: # xxx/newdir/. exists if xxx/newdir exists
780
return
781
try:
782
os.mkdir(name, 0700)
783
except OSError, e:
784
if e.errno != errno.EEXIST:
785
raise
786
if not self._dev:
787
os.chown(name, self.uid, self.uid)
788
makedirs(path)
789
790
def mkdir(self, path): # relative path in project; must resolve to be under PROJECTS_PATH/project_id
791
log = self._log("mkdir")
792
log("ensuring path %s exists", path)
793
project_id = self.project_id
794
project_path = self.project_path
795
abspath = os.path.abspath(os.path.join(project_path, path))
796
if not abspath.startswith(project_path):
797
raise RuntimeError("path (=%s) must be contained in project path %s"%(path, project_path))
798
if not os.path.exists(abspath):
799
self.makedirs(abspath)
800
801
def copy_path(self,
802
path, # relative path to copy; must resolve to be under PROJECTS_PATH/project_id
803
target_hostname = 'localhost', # list of hostnames (foo or foo:port) to copy files to
804
target_project_id = "", # project_id of destination for files; must be open on destination machine
805
target_path = None, # path into project; defaults to path above.
806
overwrite_newer = False, # if True, newer files in target are copied over (otherwise, uses rsync's --update)
807
delete_missing = False, # if True, delete files in dest path not in source, **including** newer files
808
backup = False, # if True, create backup files with a tilde
809
exclude_history = False, # if True, don't copy .sage-history files.
810
timeout = None,
811
bwlimit = None,
812
):
813
"""
814
Copy a path (directory or file) from one project to another.
815
816
WARNING: self._projects mountpoint assumed same on target machine.
817
"""
818
log = self._log("copy_path")
819
820
if target_path is None:
821
target_path = path
822
823
# check that both UUID's are valid -- these will raise exception if there is a problem.
824
if not target_project_id:
825
target_project_id = self.project_id
826
827
check_uuid(target_project_id)
828
829
# parse out target rsync port, if necessary
830
if ':' in target_hostname:
831
target_hostname, target_port = target_hostname.split(':')
832
else:
833
target_port = '22'
834
835
# determine canonical absolute path to source
836
src_abspath = os.path.abspath(os.path.join(self.project_path, path))
837
if not src_abspath.startswith(self.project_path):
838
raise RuntimeError("source path (=%s) must be contained in project_path (=%s)"%(
839
path, self.project_path))
840
841
# determine canonical absolute path to target
842
target_project_path = os.path.join(self._projects, target_project_id)
843
target_abspath = os.path.abspath(os.path.join(target_project_path, target_path))
844
if not target_abspath.startswith(target_project_path):
845
raise RuntimeError("target path (=%s) must be contained in target project path (=%s)"%(
846
target_path, target_project_path))
847
848
if os.path.isdir(src_abspath):
849
src_abspath += '/'
850
target_abspath += '/'
851
852
# handle options
853
options = []
854
if not overwrite_newer:
855
options.append("--update")
856
if backup:
857
options.extend(["--backup"])
858
if delete_missing:
859
# IMPORTANT: newly created files will be deleted even if overwrite_newer is True
860
options.append("--delete")
861
if bwlimit:
862
options.extend(["--bwlimit", bwlimit])
863
if timeout:
864
options.extend(["--timeout", timeout])
865
866
u = uid(target_project_id)
867
try:
868
if socket.gethostname() == target_hostname:
869
# we *have* to do this, due to the firewall!
870
target_hostname = 'localhost'
871
if self._dev:
872
# In local dev mode everything is as the same account on the same machine,
873
# so we just use rsync without ssh.
874
w = [src_abspath, target_abspath]
875
else:
876
# Full mode -- different users so we use ssh between different machines.
877
# However, in a cloud environment StrictHostKeyChecking is painful to manage.
878
w = ['-e', 'ssh -o StrictHostKeyChecking=no -p %s'%target_port,
879
src_abspath,
880
"%s:%s"%(target_hostname, target_abspath)]
881
if exclude_history:
882
exclude = self._exclude('', extras=['*.sage-history'])
883
else:
884
exclude = self._exclude('')
885
v = (['rsync'] + options +
886
['-zaxs', # compressed, archive mode (so leave symlinks, etc.), don't cross filesystem boundaries
887
'--chown=%s:%s'%(u,u),
888
"--ignore-errors"] + exclude + w)
889
# do the rsync
890
self.cmd(v, verbose=2)
891
except Exception, mesg:
892
mesg = str(mesg)
893
# get rid of scary (and pointless) part of message
894
s = "avoid man-in-the-middle attacks"
895
i = mesg.rfind(s)
896
if i != -1:
897
mesg = mesg[i+len(s):]
898
log("rsync error: %s", mesg)
899
raise RuntimeError(mesg)
900
901
902
def main():
903
import argparse
904
parser = argparse.ArgumentParser(description="Project compute control script")
905
subparsers = parser.add_subparsers(help='sub-command help')
906
907
def project(args):
908
kwds = {}
909
for k in ['project_id', 'projects', 'single']:
910
if hasattr(args, k):
911
kwds[k] = getattr(args, k)
912
return Project(**kwds)
913
914
# This is a generic parser for all subcommands that operate on a collection of projects.
915
# It's ugly, but it massively reduces the amount of code.
916
def f(subparser):
917
function = subparser.prog.split()[-1]
918
def g(args):
919
special = [k for k in args.__dict__.keys() if k not in ['project_id', 'func', 'dev', 'projects', 'single', 'kucalc']]
920
out = []
921
errors = False
922
if args.kucalc:
923
args.project_id = [os.environ['COCALC_PROJECT_ID']]
924
for project_id in args.project_id:
925
kwds = dict([(k,getattr(args, k)) for k in special])
926
try:
927
result = getattr(Project(project_id=project_id, dev=args.dev, projects=args.projects, single=args.single, kucalc=args.kucalc), function)(**kwds)
928
except Exception, mesg:
929
raise #-- for debugging
930
errors = True
931
result = {'error':str(mesg), 'project_id':project_id}
932
out.append(result)
933
if len(out) == 1:
934
if not out[0]:
935
out[0] = {}
936
print json.dumps(out[0])
937
else:
938
if not out:
939
out = {}
940
print json.dumps(out)
941
if errors:
942
sys.exit(1)
943
subparser.add_argument("project_id", help="UUID of project", type=str, nargs="+")
944
subparser.set_defaults(func=g)
945
946
# optional arguments to all subcommands
947
parser.add_argument("--dev", default=False, action="store_const", const=True,
948
help="insecure development mode where everything runs insecurely as the same user (no sudo)")
949
950
parser.add_argument("--single", default=False, action="store_const", const=True,
951
help="mode where everything runs on the same machine; no storage tiers; all projects assumed opened by default.")
952
953
parser.add_argument("--kucalc", default=False, action="store_const", const=True,
954
help="run inside a project container inside KuCalc")
955
956
parser.add_argument("--projects", help="/projects mount point [default: '/projects']",
957
dest="projects", default='/projects', type=str)
958
959
# start project running
960
parser_start = subparsers.add_parser('start', help='start project running (open and start daemon)')
961
parser_start.add_argument("--cores", help="number of cores (default: 0=don't change/set) float", type=float, default=0)
962
parser_start.add_argument("--memory", help="megabytes of RAM (default: 0=no change/set) int", type=int, default=0)
963
parser_start.add_argument("--cpu_shares", help="relative share of cpu (default: 0=don't change/set) int", type=int, default=0)
964
parser_start.add_argument("--base_url", help="passed on to local hub server so it can properly launch raw server, jupyter, etc.", type=str, default='')
965
f(parser_start)
966
967
parser_status = subparsers.add_parser('status', help='get status of servers running in the project')
968
parser_status.add_argument("--timeout", help="seconds to run command", default=60, type=int)
969
parser_status.add_argument("--base_url", help="ignored", type=str, default='')
970
971
f(parser_status)
972
973
parser_state = subparsers.add_parser('state', help='get state of project') # {state:?}
974
parser_state.add_argument("--timeout", help="seconds to run command", default=60, type=int)
975
parser_state.add_argument("--base_url", help="ignored", type=str, default='')
976
f(parser_state)
977
978
979
# disk quota
980
parser_disk_quota = subparsers.add_parser('disk_quota', help='set disk quota')
981
parser_disk_quota.add_argument("quota", help="quota in MB (or 0 for no disk_quota).", type=float)
982
f(parser_disk_quota)
983
984
# compute quota
985
parser_compute_quota = subparsers.add_parser('compute_quota', help='set compute quotas')
986
parser_compute_quota.add_argument("--cores", help="number of cores (default: 0=don't change/set) float", type=float, default=0)
987
parser_compute_quota.add_argument("--memory", help="megabytes of RAM (default: 0=no change/set) float", type=float, default=0)
988
parser_compute_quota.add_argument("--cpu_shares", help="relative share of cpu (default: 0=don't change/set) float", type=float, default=0)
989
f(parser_compute_quota)
990
991
# create Linux user for project
992
parser_create_user = subparsers.add_parser('create_user', help='create Linux user')
993
parser_create_user.add_argument("--login_shell", help="", type=str, default='/bin/bash')
994
f(parser_create_user)
995
996
# delete Linux user for project
997
parser_delete_user = subparsers.add_parser('delete_user', help='delete Linux user')
998
f(parser_delete_user)
999
1000
# kill all processes by Linux user for project
1001
parser_killall = subparsers.add_parser('killall', help='kill all processes by this user')
1002
f(parser_killall)
1003
1004
# kill all processes and delete unix user.
1005
f(subparsers.add_parser('stop', help='kill all processes and delete user'))
1006
1007
parser_restart = subparsers.add_parser('restart', help='stop then start project')
1008
parser_restart.add_argument("--cores", help="number of cores (default: 0=don't change/set) float", type=float, default=0)
1009
parser_restart.add_argument("--memory", help="megabytes of RAM (default: 0=no change/set) int", type=int, default=0)
1010
parser_restart.add_argument("--cpu_shares", help="relative share of cpu (default: 0=don't change/set) int", type=int, default=0)
1011
parser_restart.add_argument("--base_url", help="passed on to local hub server so it can properly launch raw server, jupyter, etc.", type=str, default='')
1012
f(parser_restart)
1013
1014
# directory listing
1015
parser_directory_listing = subparsers.add_parser('directory_listing', help='list files (and info about them) in a directory in the project')
1016
parser_directory_listing.add_argument("--path", help="relative path in project", dest="path", default='', type=str)
1017
parser_directory_listing.add_argument("--hidden", help="if given, show hidden files",
1018
dest="hidden", default=False, action="store_const", const=True)
1019
parser_directory_listing.add_argument("--time", help="if given, sort by time with newest first",
1020
dest="time", default=False, action="store_const", const=True)
1021
parser_directory_listing.add_argument("--start", help="return only part of listing starting with this position (default: 0)",
1022
dest="start", default=0, type=int)
1023
parser_directory_listing.add_argument("--limit", help="if given, only return this many directory entries (default: -1)",
1024
dest="limit", default=-1, type=int)
1025
1026
f(parser_directory_listing)
1027
1028
parser_read_file = subparsers.add_parser('read_file',
1029
help="read a file/directory; outputs {'base64':'..content..'}; use directory.zip to get directory/ as a zip")
1030
parser_read_file.add_argument("path", help="relative path of a file/directory in project (required)", type=str)
1031
parser_read_file.add_argument("--maxsize", help="maximum file size in bytes to read (bigger causes error)",
1032
dest="maxsize", default=3000000, type=int)
1033
f(parser_read_file)
1034
1035
parser_copy_path = subparsers.add_parser('copy_path', help='copy a path from one project to another')
1036
parser_copy_path.add_argument("--target_hostname", help="hostname of target machine for copy (default: localhost)",
1037
dest="target_hostname", default='localhost', type=str)
1038
parser_copy_path.add_argument("--target_project_id", help="id of target project (default: this project)",
1039
dest="target_project_id", default="", type=str)
1040
parser_copy_path.add_argument("--path", help="relative path or filename in project",
1041
dest="path", default='', type=str)
1042
parser_copy_path.add_argument("--target_path", help="relative path into target project (defaults to --path)",
1043
dest="target_path", default=None, type=str)
1044
parser_copy_path.add_argument("--overwrite_newer", help="if given, newer files in target are copied over",
1045
dest="overwrite_newer", default=False, action="store_const", const=True)
1046
parser_copy_path.add_argument("--delete_missing", help="if given, delete files in dest path not in source",
1047
dest="delete_missing", default=False, action="store_const", const=True)
1048
parser_copy_path.add_argument("--exclude_history", help="if given, do not copy *.sage-history files",
1049
dest="exclude_history", default=False, action="store_const", const=True)
1050
parser_copy_path.add_argument("--backup", help="make ~ backup files instead of overwriting changed files",
1051
dest="backup", default=False, action="store_const", const=True)
1052
f(parser_copy_path)
1053
1054
parser_mkdir = subparsers.add_parser('mkdir', help='ensure path exists')
1055
parser_mkdir.add_argument("path", help="relative path or filename in project",
1056
type=str)
1057
f(parser_mkdir)
1058
1059
args = parser.parse_args()
1060
args.func(args)
1061
1062
1063
# Run the command-line interface only when executed as a script,
# not when this module is imported.
if __name__ == "__main__":
    main()