Contact
CoCalc Logo Icon
StoreFeaturesDocsShareSupport News AboutSign UpSign In
| Download
Views: 39550
1
#!/usr/bin/env python
2
###############################################################################
3
#
4
# CoCalc: Collaborative Calculation in the Cloud
5
#
6
# Copyright (C) 2016, Sagemath Inc.
7
#
8
# This program is free software: you can redistribute it and/or modify
9
# it under the terms of the GNU General Public License as published by
10
# the Free Software Foundation, either version 3 of the License, or
11
# (at your option) any later version.
12
#
13
# This program is distributed in the hope that it will be useful,
14
# but WITHOUT ANY WARRANTY; without even the implied warranty of
15
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16
# GNU General Public License for more details.
17
#
18
# You should have received a copy of the GNU General Public License
19
# along with this program. If not, see <http://www.gnu.org/licenses/>.
20
#
21
###############################################################################
22
23
24
25
"""
26
27
BUP/ZFS-based project storage system
28
29
The basic idea:
30
31
- a bup repo with snapshot history of a project is stored on k machines in each data center, with a way to sync repos
32
- live files are also stored on those same k machines in a directory as part of one big dedup'd and compressed zpool, which is snapshotted regularly
33
- all internode/interdata-center replication is done via rsync
34
- Loss of files is very hard, because the files and their history is contained in:
35
(1) the bup repos (backed up offsite)
36
(2) the snapshots of the big single shared zfs filesystem (not backed up)
37
Note that project history may move when new nodes are added, due to consistent hashing. But the zfs snapshots still exist.
38
39
40
INSTALL:
41
42
In visudo:
43
44
salvus ALL=(ALL) NOPASSWD: /usr/local/bin/bup_storage.py *
45
46
Install script:
47
48
cp /home/salvus/salvus/salvus/scripts/bup_storage.py /usr/local/bin/
49
chown root:salvus /usr/local/bin/bup_storage.py
50
chmod ug+rx /usr/local/bin/bup_storage.py
51
chmod og-w /usr/local/bin/bup_storage.py
52
chmod o-x /usr/local/bin/bup_storage.py
53
54
55
Setup Pool:
56
57
58
export POOL=bup
59
# export POOL=pool
60
#zpool create -f $POOL /dev/sdb # on gce
61
#zpool create -f $POOL /dev/vdb
62
zfs create $POOL/projects
63
zfs set mountpoint=/projects $POOL/projects
64
zfs set dedup=on $POOL/projects
65
zfs set compression=lz4 $POOL/projects
66
zfs create $POOL/bups
67
zfs set mountpoint=/bup/bups $POOL/bups
68
chmod og-rwx /bup/bups
69
70
zfs create $POOL/scratch
71
zfs set mountpoint=/scratch $POOL/scratch
72
zfs set compression=lz4 $POOL/scratch
73
chmod a+rwx /scratch
74
75
zfs create $POOL/conf
76
zfs set mountpoint=/bup/conf $POOL/conf
77
zfs set compression=lz4 $POOL/conf
78
chmod og-rwx /bup/conf
79
chown salvus. /bup/conf
80
81
chmod a+rx /bup
82
83
"""
84
85
# How frequently bup watch dumps changes to disk.
86
BUP_WATCH_SAVE_INTERVAL_MS=60000
87
USE_BUP_WATCH = False
88
89
# If UNSAFE_MODE=False, we only provide a restricted subset of options. When this
90
# script will be run via sudo, it is useful to minimize what it is able to do, e.g.,
91
# there is no reason it should have easy command-line options to overwrite any file
92
# on the system with arbitrary content.
93
UNSAFE_MODE=True
94
95
import argparse, base64, hashlib, math, os, random, shutil, socket, string, sys, time, uuid, json, signal, math, pwd, codecs, re
96
from subprocess import Popen, PIPE
97
from uuid import UUID, uuid4
98
99
# Flag to turn off all use of quotas, since it will take a while to set these up after migration.
100
QUOTAS_ENABLED=True
101
QUOTAS_OVERRIDE=0 # 0 = don't override
102
103
USERNAME = pwd.getpwuid(os.getuid())[0]
104
105
# If using ZFS
106
ZPOOL = 'bup' # must have ZPOOL/bups and ZPOOL/projects filesystems
107
108
# The path where bup repos are stored
109
BUP_PATH = '/bup/bups'
110
111
ARCHIVE_PATH = '/archive/'
112
113
GS_BUCKET_NAME = 'smc-projects-devel'
114
115
# The path where project working files appear
116
PROJECTS_PATH = '/projects'
117
118
# Default account settings
119
120
DEFAULT_SETTINGS = {
121
'disk' : 3000, # default disk in megabytes
122
'scratch' : 15000, # default disk quota on /scratch
123
'memory' : 2, # memory in gigabytes
124
'cpu_shares' : 256,
125
'cores' : 1,
126
'login_shell': '/bin/bash',
127
'mintime' : int(60*60), # default = hour idle (no save) time before kill
128
'inode' : 200000, # not used with ZFS
129
'network' : False
130
}
131
132
BWLIMIT = 20000
133
134
# don't try to sync files bigger than this.
135
# We do this because the user could create a 10 exabyte sparse
136
# file, say, and kill our synchronization system. At least this
137
# minizes the damage. It's fine since project quotas are much smaller
138
# than this... for now.
139
MAX_RSYNC_SIZE = '100G'
140
141
FILESYSTEM = 'zfs' # 'zfs' or 'ext4'

# One-time setup for ext4 hosts: make sure the bup and projects directories
# exist with restrictive permissions.
# NOTE(review): this block runs at import time but calls cmd(), which is only
# defined much further down the file -- if FILESYSTEM were 'ext4' this would
# raise NameError.  Harmless today only because FILESYSTEM is hard-coded 'zfs'.
if FILESYSTEM == 'ext4':
    if not os.path.exists(BUP_PATH):
        cmd("/bin/mkdir -p %s; chmod og-rwx %s"%(BUP_PATH, BUP_PATH))

    if not os.path.exists(PROJECTS_PATH):
        cmd("/bin/mkdir -p %s; chmod og+rx %s"%(PROJECTS_PATH, PROJECTS_PATH))
149
150
151
# Make sure to copy: 'cp -rv ~/salvus/salvus/scripts/skel/.sagemathcloud/data /home/salvus/salvus/salvus/local_hub_template/"
152
SAGEMATHCLOUD_TEMPLATE = "/home/salvus/salvus/salvus/local_hub_template/"
153
154
BASHRC_TEMPLATE = "/home/salvus/salvus/salvus/scripts/skel/.bashrc"
155
BASH_PROFILE_TEMPLATE = "/home/salvus/salvus/salvus/scripts/skel/.bash_profile"
156
157
#SSH_ACCESS_PUBLIC_KEY = "/home/salvus/salvus/salvus/scripts/skel/.ssh/authorized_keys2"
158
159
def log(m, *args):
    """Write a message to stderr, %-formatting it with *args* when given."""
    message = m % args if args else m
    sys.stderr.write(str(message) + '\n')
    sys.stderr.flush()
164
165
166
# Firewall whitelist: uids with network access, one per line; read by the
# smc-iptables scripts.  Create the file (best effort) if it is missing.
UID_WHITELIST = "/root/smc-iptables/uid_whitelist"
if not os.path.exists(UID_WHITELIST):
    try:
        open(UID_WHITELIST,'w').close()
    except Exception, err:
        # Not fatal (e.g. running unprivileged during development) -- just log.
        log(err)
172
173
174
175
def print_json(s):
    """Print *s* to stdout as compact JSON (no whitespace after separators)."""
    # Parenthesized print: behavior-identical under Python 2, and also valid
    # under Python 3 (the original bare `print` statement was py2-only).
    print(json.dumps(s, separators=(',',':')))
177
178
def uid(project_id):
    """Map a project uuid string to a deterministic UNIX uid in (65537, 2^31).

    We take the sha-512 of the uuid just to make it harder to force a collision.
    Thus even if a user could somehow generate an account id of their choosing,
    this wouldn't help them get the same uid as another user.
    2^31-1=max uid which works with FUSE and node (and Linux, which goes up to 2^32-2).
    """
    # Encode for hashing: identical bytes on Python 2 (str passes through) and
    # makes the function also correct on Python 3.
    data = project_id if isinstance(project_id, bytes) else project_id.encode('utf8')
    n = int(hashlib.sha512(data).hexdigest()[:8], 16)  # up to 2^32
    # Floor division (the original `/=` was py2 integer division; `//=` keeps
    # the result an int on py3 as well).
    n //= 2  # up to 2^31
    return n if n>65537 else n+65537   # 65534 used by linux for user sync, etc.
186
187
def now():
    """Return the current local time as an ISO-8601 'YYYY-MM-DDTHH:MM:SS' string."""
    fmt = '%Y-%m-%dT%H:%M:%S'
    return time.strftime(fmt)
189
190
def ensure_file_exists(src, target):
    """Copy *src* to *target* unless target already exists; when running as
    root, give the copy the same owner/group as target's parent directory."""
    if os.path.exists(target):
        return
    shutil.copyfile(src, target)
    if USERNAME == "root":
        parent = os.path.split(target)[0]
        st = os.stat(parent)
        os.chown(target, st.st_uid, st.st_gid)
196
197
def check_uuid(uuid):
    """Raise RuntimeError unless *uuid* parses as a version-4 UUID string."""
    version = UUID(uuid).version
    if version != 4:
        raise RuntimeError("invalid uuid")
200
201
202
def cmd(s, ignore_errors=False, verbose=2, timeout=None, stdout=True, stderr=True):
    """Run a shell command and return its combined stdout+stderr, stripped.

    s -- a string (run through the shell) or a list of args (run directly).
    ignore_errors -- on failure/timeout return an error string instead of raising.
    verbose -- 1 logs the command and elapsed time; 2 also logs (truncated) output.
    timeout -- seconds; enforced via SIGALRM, so only usable from the main thread.

    Raises RuntimeError on nonzero exit (unless ignore_errors), and
    KeyboardInterrupt on timeout (unless ignore_errors).
    """
    if isinstance(s, list):
        s = [str(x) for x in s]
    if verbose >= 1:
        if isinstance(s, list):
            # Quote args containing whitespace so the logged line is copy-pasteable.
            t = [x if len(x.split()) <=1 else "'%s'"%x for x in s]
            log(' '.join(t))
        else:
            log(s)
    t = time.time()

    mesg = "ERROR"
    if timeout:
        mesg = "TIMEOUT: running '%s' took more than %s seconds, so killed"%(s, timeout)
        def handle(*a):
            if ignore_errors:
                return mesg
            else:
                raise KeyboardInterrupt(mesg)
        signal.signal(signal.SIGALRM, handle)
        signal.alarm(timeout)
    try:
        out = Popen(s, stdin=PIPE, stdout=PIPE, stderr=PIPE, shell=not isinstance(s, list))
        x = out.stdout.read() + out.stderr.read()
        e = out.wait()  # this must be *after* the out.stdout.read(), etc. above or will hang when output large!
        if e:
            if ignore_errors:
                return (x + "ERROR").strip()
            else:
                raise RuntimeError(x)
        if verbose>=2:
            log(("(%s seconds): %s"%(time.time()-t, x))[:500])
        elif verbose >= 1:
            log("(%s seconds)"%(time.time()-t))
        return x.strip()
    except IOError:
        # Reads interrupted by the alarm surface as IOError; report the
        # timeout/error message instead of raising.
        return mesg
    finally:
        if timeout:
            signal.signal(signal.SIGALRM, signal.SIG_IGN)  # cancel the alarm
242
243
class Project(object):
244
    def __init__(self, project_id):
        """Set up all ids and filesystem paths for the project with the given v4 uuid."""
        try:
            u = uuid.UUID(project_id)
            # NOTE(review): stdlib uuid.UUID exposes the `.version` property;
            # confirm get_version() exists in the deployed Python.
            assert u.get_version() == 4
            project_id = str(u)  # leaving off dashes still makes a valid uuid in python
        except (AssertionError, ValueError):
            raise RuntimeError("invalid project uuid='%s'"%project_id)
        self.project_id    = project_id
        # uid == gid, both derived deterministically from the uuid hash.
        self.uid           = uid(project_id)
        self.gid           = self.uid
        # UNIX user/group name is the dashless uuid.
        self.username      = self.project_id.replace('-','')
        self.groupname     = self.username
        self.bup_path      = os.path.join(BUP_PATH, project_id)
        self.archive_path  = os.path.join(ARCHIVE_PATH, "%s.tar"%self.project_id)
        self.gs_path       = 'gs://%s/%s.tar'%(GS_BUCKET_NAME, self.project_id)  # google cloud storage
        self.conf_path     = os.path.join(self.bup_path, "conf")
        self.settings_path = os.path.join(self.conf_path, "settings.json")
        self.replicas_path = os.path.join(self.conf_path, "replicas.json")
        self.project_mnt   = os.path.join(PROJECTS_PATH, project_id)
        self.snap_mnt      = os.path.join(self.project_mnt, '.snapshots')
        self.touch_file    = os.path.join(self.bup_path, "conf", "touch")
        self.save_log      = os.path.join(self.bup_path, "conf", "save_log.json")
        self.HEAD          = "%s/HEAD"%self.bup_path
        # Current branch is whatever bup's HEAD ref points at; default 'master'.
        if os.path.exists(self.HEAD):
            self.branch = open(self.HEAD).read().split('/')[-1].strip()
        else:
            self.branch = 'master'
271
272
    def cmd(self, *args, **kwds):
        """Run a command (same API as the module-level cmd) with BUP_DIR
        pointing at this project's bup repository."""
        os.environ['BUP_DIR'] = self.bup_path
        return cmd(*args, **kwds)
275
276
def __repr__(self):
277
return "Project(%s)"%project_id
278
279
def _log(self, funcname,**kwds):
280
def f(mesg='',*args):
281
log("%s(project_id=%s,%s): %s"%(funcname, self.project_id, kwds, mesg), *args)
282
f()
283
return f
284
285
    # this user_exists function isn't used or tested yet:
    def user_exists(self):
        """
        Returns True if the UNIX user for this project exists.
        """
        try:
            # `id` exits 0 exactly when the user exists; a nonzero exit makes
            # cmd raise RuntimeError, which we translate to False.
            # (The original comment here had the condition backwards.)
            cmd(['id', self.username])
            return True
        except RuntimeError:
            return False
295
296
    def create_user(self):
        """Create the project's UNIX group and user (idempotent: errors from
        already-existing group/user are ignored)."""
        self.create_home()
        login_shell = self.get_settings()['login_shell']
        # gid == uid by construction; -o allows reusing a non-unique id.
        if self.gid == self.uid:
            self.cmd(['/usr/sbin/groupadd', '-g', self.gid, '-o', self.username], ignore_errors=True)
        self.cmd(['/usr/sbin/useradd', '-u', self.uid, '-g', self.gid, '-o', self.username,
                  '-d', self.project_mnt, '-s', login_shell], ignore_errors=True)
303
304
    def delete_user(self):
        """Delete the project's UNIX user and group (best effort)."""
        self.cmd(['/usr/sbin/userdel', self.username], ignore_errors=True)
        if self.gid == self.uid:
            self.cmd(['/usr/sbin/groupdel', self.username], ignore_errors=True)
308
309
    def start_daemons(self):
        """Start the project's local-hub daemons as the project user via su."""
        self.cmd(['su', '-', self.username, '-c', 'cd .sagemathcloud; . sagemathcloud-env; ./start_smc'], timeout=30)
311
312
def start_file_watch(self):
313
pidfile = os.path.join(self.bup_path, "watch.pid")
314
try:
315
# there are a lot of valid reasons, e.g., due to sync/replication!, that this pidfile would be here when we do start.
316
os.unlink(pidfile)
317
except:
318
pass
319
320
self.cmd([
321
"/usr/bin/bup", "watch",
322
"--start",
323
"--pidfile", pidfile,
324
"--logfile", os.path.join(self.bup_path, "watch.log"),
325
"--save-interval", BUP_WATCH_SAVE_INTERVAL_MS,
326
"--xdev"]
327
+ self.exclude(self.project_mnt, prog='bup')
328
+ [self.project_mnt]
329
)
330
331
    def stop_file_watch(self):
        """Stop the `bup watch` daemon identified by its pidfile."""
        self.cmd([
            "/usr/bin/bup", "watch",
            "--stop",
            "--pidfile", os.path.join(self.bup_path, "watch.pid")]
        )
337
338
    def start(self):
        """Bring the project fully up: repo/home/user, quotas, conf files,
        daemon code, daemons, and the snapshot fuse mount."""
        self.init()
        self.create_home()
        # Recreate the user from scratch so uid/shell/home are current.
        self.delete_user()
        self.create_user()
        self.killall()
        self.settings()
        self.ensure_conf_files()
        self.touch()
        if USE_BUP_WATCH:
            log("starting file watch for user with id %s"%self.uid)
            self.start_file_watch()
        self.update_daemon_code()
        self.start_daemons()
        # Remount snapshots so the fuse mount reflects the (possibly new) uid.
        self.umount_snapshots()
        # TODO: remove this chown once (1) uid defn stabilizes -- after migration will go through all projects and properly chown.
        #self.chown_all()
        self.mount_snapshots()
356
357
    def chown_all(self):
        """Recursively chown every top-level entry of the project home to the
        project user, skipping the .snapshots fuse mount; failures are logged,
        not raised."""
        log = self._log("chown_all")
        for P in os.listdir(self.project_mnt):
            target = os.path.join(self.project_mnt, P)
            if target != self.snap_mnt:
                try:
                    self.chown(target)
                except Exception, err:
                    log("WARNING: %s"%err)
366
367
    def get_zfs_status(self):  # output is in BYTES!
        """Return {'userquota-projects': ..., 'userused-scratch': ..., ...} in
        bytes for this project's uid; {} when quotas are disabled/overridden;
        None when the zfs command fails."""
        q = {}
        if not QUOTAS_ENABLED or QUOTAS_OVERRIDE:
            return q
        try:
            for x in ['userquota', 'userused']:
                for y in ['projects', 'scratch']:
                    # -Hp: scripted output, exact (unrounded) numbers; third
                    # whitespace-separated field is the property value.
                    q['%s-%s'%(x,y)] = int(cmd(['zfs', 'get', '-Hp', '%s@%s'%(x,self.uid), '%s/%s'%(ZPOOL,y)]).split()[2]) #//(2**20)
            return q
        except RuntimeError:
            return None
378
379
    def status(self, running=False, stop_on_error=True):
        """Return a status dict for the project.

        running=True restricts output to just the running state; otherwise the
        dict also includes account settings, newest snapshot, load, and (on
        zfs) quota usage.  If querying the local hub fails and stop_on_error
        is set, the project is stopped and status is retried once.
        """
        log = self._log("status")
        if running:
            s = {}
        else:
            s = {'username':self.username, 'uid':self.uid, 'gid':self.gid, 'settings':self.get_settings()}
            try:
                s['newest_snapshot'] = self.newest_snapshot()
                s['bup'] = 'working'
            except RuntimeError, mesg:
                mesg = str(mesg)
                if 'bup init' in mesg:
                    s['bup'] = 'uninitialized'  # it's just not initialized, which is no problem
                else:
                    s['bup'] = mesg
            # 1/5/15-minute load averages from the tail of `uptime`.
            s['load'] = [float(a.strip(',')) for a in os.popen('uptime').read().split()[-3:]]
            if FILESYSTEM == 'zfs':
                s['zfs'] = self.get_zfs_status()

        if self.username not in open('/etc/passwd').read():  # TODO: can be done better
            s['running'] = False
            return s

        try:
            # Ask the project's local hub for its own status (as the user).
            t = self.cmd(['su', '-', self.username, '-c', 'cd .sagemathcloud; . sagemathcloud-env; ./status'], timeout=30)
            t = json.loads(t)
            s.update(t)
            s['running'] = bool(t.get('local_hub.pid',False))
            return s
        except Exception, msg:
            log("Error getting status -- %s"%msg)
            # Original comment: important to actually let error propogate so that bup_server gets an error and knows things are
            # messed up, namely there is a user created, but the status command isn't working at all.  In this
            # case bup_server will know to try to kill this.
            if stop_on_error:
                # ** Actually, in practice sometimes the caller doesn't know
                # to kill this project.  So we explicitly toss in a stop below,
                # which will clean things up completely. **
                self.stop()
                return self.status(running=running, stop_on_error=False)  # try again
            else:
                raise
421
422
def create_home(self):
423
self._log('create_home')
424
if not os.path.exists(self.project_mnt):
425
self.makedirs(self.project_mnt)
426
if USERNAME == "root":
427
os.chown(self.project_mnt, self.uid, self.gid)
428
429
    def init(self):
        """
        Create user home directory and bup repo.
        """
        log = self._log("create")
        # An existing objects/ dir means `bup init` already ran for this repo.
        if not os.path.exists(os.path.join(self.bup_path,'objects')):
            self.cmd(['/usr/bin/bup', 'init'])
        self.create_home()
        # conf dir stays root-owned (chown=False): it holds settings/touch data.
        self.makedirs(self.conf_path, chown=False)
438
439
def set_branch(self, branch=''):
440
if branch and branch != self.branch:
441
self.branch = branch
442
open(self.HEAD,'w').write("ref: refs/heads/%s"%branch)
443
444
    def checkout(self, snapshot='latest', branch=None):
        """Restore the project working files from the given snapshot: a full
        `bup restore` when the home doesn't exist yet, otherwise an rsync from
        the mounted .snapshots fuse tree."""
        self.set_branch(branch)
        if not os.path.exists(self.project_mnt):
            self.makedirs(self.project_mnt)
            self.cmd(['/usr/bin/bup', 'restore', '%s/%s/'%(self.branch, snapshot), '--outdir', self.project_mnt])
            self.chown(self.project_mnt)
        else:
            src = os.path.join(self.snap_mnt, self.branch, snapshot)+'/'
            # NOTE(review): self.exclude(src) returns a *list*, which here is
            # nested inside the argv list and will be stringified by cmd() into
            # a single bogus argument -- it should probably be spliced in with
            # `+ self.exclude(src) +`.  Confirm before relying on the excludes.
            self.cmd(['rsync', '-saxH', '--delete-excluded', '--delete', self.exclude(src), src, self.project_mnt+'/'])
453
454
    def umount_snapshots(self):
        """Lazily unmount the .snapshots fuse filesystem (ignoring failures,
        e.g. when it isn't mounted)."""
        self.cmd(['fusermount', '-uz', self.snap_mnt], ignore_errors=True)
456
457
def mount_snapshots(self):
458
log = self._log('mount_snapshots')
459
self.umount_snapshots()
460
if os.path.exists(self.snap_mnt):
461
shutil.rmtree(self.snap_mnt, ignore_errors=True)
462
try:
463
self.makedirs(self.snap_mnt)
464
self.cmd(['bup', 'fuse', '-o', '--uid', self.uid, '--gid', self.gid, self.snap_mnt])
465
except Exception, msg:
466
# if there is no space to make the snapshot directory, user gets no snapshots.
467
if 'Disk quota exceeded' in msg:
468
log("nonfatal error -- %s"%msg)
469
else:
470
raise
471
472
def touch(self):
473
open(self.touch_file,'w')
474
475
def last_touch_time(self):
476
if os.path.exists(self.touch_file):
477
return int(round(os.path.getmtime(self.touch_file)))
478
else:
479
return time.time() # now -- since could be just creating project
480
481
    def stop(self, grace_s=0.5, only_if_idle=False):
        """Kill all the project's processes, delete its user, and drop quotas.

        only_if_idle -- skip stopping when the project was touched within its
        'mintime' setting (mintime <= 0 means never idle-stop).
        """
        log = self._log('stop')
        if only_if_idle:
            log("checking if project is idle regarding saves")
            mintime = self.get_settings()['mintime']
            if mintime <= 0:
                log("nope -- it has infinite time")
            else:
                last = self.last_touch_time()
                time_since_last = time.time() - last
                log(" time_since_last = %s and mintime = %s"%( time_since_last , mintime))
                if time_since_last < mintime:
                    log("hasn't been long enough -- not stopping")
                    return

        self.killall(grace_s=grace_s)

        if USE_BUP_WATCH:
            log("stopping file watch for user with id %s"%self.uid)
            self.stop_file_watch()

        # So crontabs, remote logins, etc., won't happen... and user can't just get more free time via crontab.  Not sure.
        # We need another state, which is that the project is "on" but daemons are all stopped and not using RAM.
        self.delete_user()
        self.unset_quota()
        self.umount_snapshots()
507
508
    def killall(self, grace_s=0.5):
        """Kill every process owned by the project user: TERM, wait grace_s
        seconds, then KILL; repeat up to MAX_TRIES until none remain."""
        log = self._log('killall')
        log("killing all processes by user with id %s"%self.uid)
        MAX_TRIES=10
        # we use both kill and pkill -- pkill seems better in theory, but I've definitely seen it get ignored.
        for i in range(MAX_TRIES):
            self.cmd(['/usr/bin/killall', '-u', self.username], ignore_errors=True)
            self.cmd(['/usr/bin/pkill', '-u', self.uid], ignore_errors=True)
            time.sleep(grace_s)
            self.cmd(['/usr/bin/killall', '-9', '-u', self.username], ignore_errors=True)
            self.cmd(['/usr/bin/pkill', '-9', '-u', self.uid], ignore_errors=True)
            n = self.num_procs()
            log("kill attempt left %s procs"%n)
            if n == 0:
                return
        log("WARNING: failed to kill all procs after %s tries"%MAX_TRIES)
524
525
526
    def restart(self):
        """Fully stop, then start, the project."""
        self.stop()
        self.start()
529
530
def pids(self):
531
return [int(x) for x in cmd(['pgrep', '-u', self.uid], ignore_errors=True).replace('ERROR','').split()]
532
533
    def num_procs(self):
        """Number of processes currently owned by the project user."""
        return len(self.pids())
535
536
    def delete_project(self):
        """
        Remove the user's files, leaving only the bup repo.

        ** DANGEROUS. **

        This would be used when it is highly unlikely the project will ever be used again, e.g.,
        maybe when one deletes a project, and we want to keep it around for a while for archival
        purposes, just in case.
        """
        log = self._log("delete_project")
        self.stop()
        self.umount_snapshots()
        log("removing users files")
        shutil.rmtree(self.project_mnt)
        self.delete_user()
552
553
    def destroy(self):
        """
        *VERY DANGEROUS.*  Delete all traces of this project from the ZFS pool.
        """
        # Removes the live files AND the bup history on this node.
        self.delete_project()
        shutil.rmtree(self.bup_path)
559
560
def exclude(self, prefix, prog='rsync'):
561
eprefix = re.escape(prefix)
562
excludes = ['.sage/cache', '.fontconfig', '.sage/temp', '.zfs', '.npm', '.sagemathcloud', '.node-gyp', '.cache', '.forever', '.snapshots']
563
exclude_rxs = []
564
if prog == 'rsync':
565
excludes.append('*.sage-backup')
566
else: # prog == 'bup'
567
exclude_rxs.append(r'.*\.sage\-backup')
568
excludes.append('.trash') # don't bup archive trash (but do sync trash between vm's)
569
570
for i,x in enumerate(exclude_rxs):
571
# escape the prefix for the regexs
572
ex_len = len(re.escape(x))
573
exclude_rxs[i] = re.escape(os.path.join(prefix, x))
574
exclude_rxs[i] = exclude_rxs[i][:-ex_len]+x
575
576
return ['--exclude=%s'%os.path.join(prefix, x) for x in excludes] + ['--exclude-rx=%s'%x for x in exclude_rxs]
577
578
    def save(self, path=None, timestamp=None, branch=None, sync=True, mnt=True, targets=""):
        """
        Save a snapshot.

        If sync is true, first does sync of live files, then creates the bup snapshot, then
        finally syncs data out and returns info about how successful that was.

        Returns a dict with at least 'files_saved'; when files changed it also
        includes 'timestamp', 'timestamp_end', 'bup_repo_size_kb', optionally
        'error' and 'sync'.
        """
        log = self._log("save")
        self.touch()
        self.set_branch(branch)
        if path is None:
            path = self.project_mnt

        # Some countermeasures against bad users.
        try:
            for bad in open('/root/banned_files').read().split():
                if os.path.exists(os.path.join(self.project_mnt,bad)):
                    self.stop()
                    return {'files_saved' : 0}
        except Exception, msg:
            log("WARNING: non-fatal issue reading /root/banned_files file and shrinking user priority: %s"%msg)

        if sync:
            log("Doing first sync before save of the live files (ignoring any issues or errors)")
            self.sync(targets=targets, snapshots=False)

        # We ignore_errors below because unfortunately bup will return a nonzero exit code ("WARNING")
        # when it hits a fuse filesystem.   TODO: somehow be more careful that each
        if not USE_BUP_WATCH:
            self.cmd(["/usr/bin/bup", "index", "-x"] + self.exclude(path+'/',prog='bup') + [path], ignore_errors=True)

        # `bup index -m` lists what is modified; the count includes the
        # directory itself, hence the -1 below.
        what_changed = self.cmd(["/usr/bin/bup", "index", '-m', path],verbose=0).splitlines()
        files_saved = max(0, len(what_changed) - 1)    # 1 since always includes the directory itself
        result = {'files_saved' : files_saved}
        if files_saved > 0:

            if timestamp is None:
                # mark by the time when we actually start saving, not start indexing above.
                timestamp = int(time.time())

            result['timestamp'] = timestamp

            # It is important to still sync out, etc., even if there is an error.  Many errors are nonfatal, e.g., a file vanishes during save.
            try:
                self.cmd(["/usr/bin/bup", "save", "--strip", "-n", self.branch, '-d', timestamp, path])
            except RuntimeError, msg:
                log("WARNING: running bup failed with error: %s"%msg)
                result['error'] = str(msg)

            # record this so can properly describe the true "interval of time" over which the snapshot happened,
            # in case we want to for some reason...
            result['timestamp_end'] = int(time.time())

            result['bup_repo_size_kb'] = int(self.cmd(['du', '-s', '-x', '--block-size=KB', self.bup_path]).split()[0].split('k')[0])

            if mnt and path == self.project_mnt:
                self.mount_snapshots()

            if sync:
                result['sync'] = self.sync(targets=targets)

            # The save log turns out to be a really bad idea, at least implemented this way.
            # The problem is we quickly end up with one MASSIVE file; this is particularly painful
            # due to how replication works -- a single file saved here, and we have to copy gigabytes around!
            # We will find another way... e.g., one file for each save.
            #r = dict(result)
            #n = len(self.project_mnt)+1
            #r['files'] = [x[n:] for x in what_changed if len(x) > n]
            #try:
            #    codecs.open(self.save_log,'a',"utf-8-sig").write(json.dumps(r)+'\n')
            #except Exception, msg:
            #    # the save log is only a convenience -- not critical.
            #    log("WARNING: unable to write to save log -- %s"%msg)
        return result
652
653
def tag(self, tag, delete=False):
654
"""
655
Tag the latest commit to master or delete a tag.
656
"""
657
if delete:
658
self.cmd(["/usr/bin/bup", "tag", "-f", "-d", tag])
659
else:
660
self.cmd(["/usr/bin/bup", "tag", "-f", tag, self.branch])
661
662
def newest_snapshot(self, branch=''):
663
"""
664
Return newest snapshot in current branch or None if there are no snapshots yet.
665
"""
666
v = self.snapshots(branch)
667
if len(v) > 0:
668
return v[-1]
669
else:
670
return None
671
672
    def snapshots(self, branch=''):
        """
        Return list of all snapshots in date order for the given branch.
        """
        if not branch:
            branch = self.branch
        if not os.path.exists(os.path.join(self.bup_path, 'refs', 'heads', branch)):
            # branch doesn't exist
            return []
        else:
            # [:-1] drops the trailing 'latest' symlink entry bup ls appends.
            return self.cmd(["/usr/bin/bup", "ls", branch+'/'], verbose=0).split()[:-1]
683
684
    def branches(self):
        """Return {'branches': all branch names in the repo, 'branch': current}."""
        return {'branches':self.cmd("bup ls").split(), 'branch':self.branch}
686
687
    def cleanup(self):
        """
        Clean up the bup repo, replacing the large number of git pack files by a small number, deleting
        the bupindex cache, which can get really big, etc.

        After using this, you *must* do a destructive sync to all replicas!
        """
        self.cmd("cd %s; rm -f bupindex; rm -f objects/pack/*.midx; rm -f objects/pack/*.midx.tmp && rm -rf objects/*tmp && time git repack --max-pack-size=2g --window=0 --depth=0 -lad"%self.bup_path)
695
696
def makedirs(self, path, chown=True):
697
log = self._log('makedirs')
698
if os.path.exists(path) and not os.path.isdir(path):
699
log("removing %s"%path)
700
os.unlink(path)
701
if not os.path.exists(path):
702
log("creating %s"%path)
703
def makedirs(name): # modified from os.makedirs to chown each newly created path segment
704
head, tail = os.path.split(name)
705
if not tail:
706
head, tail = os.path.split(head)
707
if head and tail and not os.path.exists(head):
708
try:
709
makedirs(head)
710
except OSError, e:
711
# be happy if someone already created the path
712
if e.errno != errno.EEXIST:
713
raise
714
if tail == os.curdir: # xxx/newdir/. exists if xxx/newdir exists
715
return
716
os.mkdir(name, 0700)
717
os.chown(name, self.uid, self.gid)
718
makedirs(path)
719
720
def update_daemon_code(self):
721
log = self._log('update_daemon_code')
722
self.create_home()
723
target = '/%s/.sagemathcloud/'%self.project_mnt
724
self.makedirs(target)
725
self.cmd(["rsync", "-zaxHL", "--update", SAGEMATHCLOUD_TEMPLATE+"/", target])
726
self.chown(target)
727
728
    def chown(self, path):
        """Recursively chown *path* to the project's uid:gid."""
        self.cmd(["chown", "%s:%s"%(self.uid, self.gid), '-R', path])
730
731
    def ensure_file_exists(self, src, target):
        """Copy *src* to *target* (creating parent dirs) unless target exists;
        when running as root, chown the copy to the project user."""
        target = os.path.abspath(target)
        if not os.path.exists(target):
            self.makedirs(os.path.split(target)[0])
            shutil.copyfile(src, target)
            if USERNAME == "root":
                os.chown(target, self.uid, self.gid)
738
739
    def ensure_conf_files(self):
        """Install default .bashrc / .bash_profile into the project home if missing."""
        log = self._log('ensure_conf_files')
        log("ensure there is a bashrc and bash_profile")
        self.create_home()
        self.ensure_file_exists(BASHRC_TEMPLATE, os.path.join(self.project_mnt,".bashrc"))
        self.ensure_file_exists(BASH_PROFILE_TEMPLATE, os.path.join(self.project_mnt,".bash_profile"))
745
746
    def get_settings(self):
        """Load the project's settings.json, filling in any missing keys from
        DEFAULT_SETTINGS; falls back to pure defaults on a missing or corrupt
        file."""
        if not os.path.exists(self.conf_path):
            self.makedirs(self.conf_path, chown=False)
        if os.path.exists(self.settings_path):
            try:
                settings = json.loads(open(self.settings_path).read())
                for k, v in DEFAULT_SETTINGS.iteritems():
                    if k not in settings:
                        settings[k] = v
            except (ValueError, IOError), mesg:
                # Corrupt/unreadable settings file -- start from the defaults.
                settings = dict(DEFAULT_SETTINGS)
        else:
            settings = dict(DEFAULT_SETTINGS)
        return settings
760
761
    def set_quota(self, disk, scratch):
        """
        Disk space quota: set per-uid ZFS userquotas (in megabytes) on the
        projects and scratch filesystems.  No-op when quotas are disabled;
        QUOTAS_OVERRIDE (nonzero) replaces both values.
        """
        if not QUOTAS_ENABLED:
            return
        if QUOTAS_OVERRIDE:
            disk = scratch = QUOTAS_OVERRIDE
        cmd(['zfs', 'set', 'userquota@%s=%sM'%(self.uid, disk), '%s/projects'%ZPOOL])
        cmd(['zfs', 'set', 'userquota@%s=%sM'%(self.uid, scratch), '%s/scratch'%ZPOOL])
771
772
    def unset_quota(self):
        """Remove the per-uid ZFS userquotas for this project."""
        if not QUOTAS_ENABLED:
            return
        cmd(['zfs', 'set', 'userquota@%s=none'%self.uid, '%s/projects'%ZPOOL])
        cmd(['zfs', 'set', 'userquota@%s=none'%self.uid, '%s/scratch'%ZPOOL])
777
778
779
    def settings(self, memory = None, cpu_shares = None, cores = None, disk = None,
                 inode = None, login_shell = None, scratch = None, mintime = None,
                 network = None):
        """Update (and persist) account settings, then apply them: disk/scratch
        quotas, cgroup memory/cpu limits, and the firewall network whitelist.

        Each parameter left as None keeps its stored value; passed values are
        coerced (int/float/bool) and written back to settings.json.
        """
        log = self._log('settings')
        log("configuring account...")

        settings = self.get_settings()

        if memory is not None:
            settings['memory'] = int(memory)
        memory = settings['memory']

        if cpu_shares is not None:
            settings['cpu_shares'] = int(cpu_shares)
        cpu_shares = settings['cpu_shares']

        if cores is not None:
            settings['cores'] = float(cores)
        cores = settings['cores']

        if disk is not None:
            settings['disk'] = int(disk)
        disk = settings['disk']

        if scratch is not None:
            settings['scratch'] = int(scratch)
        scratch = settings['scratch']

        if inode is not None:
            settings['inode'] = int(inode)
        inode = settings['inode']

        if mintime is not None:
            settings['mintime'] = int(mintime)
        mintime = settings['mintime']

        if network is not None:
            # Accept '0'/'false' strings (e.g. from the command line) as False.
            if isinstance(network, str):
                if network.lower() in ['0','false']:
                    network = False
                else:
                    network = True
            settings['network'] = bool(network)
        network = settings['network']

        if login_shell is not None and os.path.exists(login_shell):
            settings['login_shell'] = login_shell
        else:
            login_shell = settings['login_shell']

        try:
            s = json.dumps(settings)
            open(self.settings_path,'w').write(s)
            print s
        except IOError:
            pass

        # Set the quota
        self.set_quota(disk=disk, scratch=scratch)

        # Cgroups
        if cores <= 0:
            cfs_quota = -1  # no limit
        else:
            cfs_quota = int(100000*cores)

        # Special case -- if certain files are in the project, make them slow as molasses
        try:
            for bad in open('/root/banned_files').read().split():
                if os.path.exists(os.path.join(self.project_mnt, bad)):
                    cfs_quota = 1000
        except Exception, msg:
            log("WARNING: non-fatal issue reading banned_files file: %s"%msg)

        self.cmd(["cgcreate", "-g", "memory,cpu:%s"%self.username])
        open("/sys/fs/cgroup/memory/%s/memory.limit_in_bytes"%self.username,'w').write("%sG"%memory)
        open("/sys/fs/cgroup/cpu/%s/cpu.shares"%self.username,'w').write(str(cpu_shares))
        open("/sys/fs/cgroup/cpu/%s/cpu.cfs_quota_us"%self.username,'w').write(str(cfs_quota))

        # important -- using self.username instead of self.uid does NOT work reliably!
        z = "\n%s  cpu,memory  %s\n"%(self.username, self.username)
        cur = open("/etc/cgrules.conf").read() if os.path.exists("/etc/cgrules.conf") else ''

        if z not in cur:
            open("/etc/cgrules.conf",'a').write(z)

        # In Ubuntu 12.04 we used cgred, which doesn't exist in 14.04.  In 14.04, we're using PAM, so
        # classification happens automatically on login.
        try:
            self.cmd(['service', 'cgred', 'restart'])
        except:
            pass
        self.cgclassify()

        # open firewall whitelist for user if they have network access
        restart_firewall = False
        whitelisted_users = set([x.strip() for x in open(UID_WHITELIST).readlines()])
        uid = str(self.uid)
        if network and uid not in whitelisted_users:
            # add to whitelist and restart
            whitelisted_users.add(uid)
            restart_firewall = True
        elif not network and uid in whitelisted_users:
            # remove from whitelist and restart
            whitelisted_users.remove(uid)
            restart_firewall = True
        if restart_firewall:
            # THERE is a potential race condition here!  I would prefer to instead have files with names the
            # uid's in a subdirectory, or something...
            a = open(UID_WHITELIST,'w')
            a.write('\n'.join(whitelisted_users)+'\n')
            a.close()
            self.cmd(['/root/smc-iptables/restart.sh'])
892
893
    def cgclassify(self):
        """Move all of the project user's existing processes into its cgroup."""
        try:
            # [1:] drops the 'PID' header line from ps output.
            pids = self.cmd("ps -o pid -u %s"%self.username, ignore_errors=False).split()[1:]
            self.cmd(["cgclassify"] + pids, ignore_errors=True)
            # ignore cgclassify errors, since processes come and go, etc.":
        except:
            # ps returns an error code if there are NO processes at all (a common condition).
            pids = []
901
902
def sync(self, targets="", destructive=True, snapshots=True, union=False):
903
"""
904
If union is True, uses the --update option of rsync to make the bup repos and working files
905
on all replicas identical, namely the union of the newest versions of all files. This is mainly
906
used every-once-in-a-while as a sanity operation. (It's intended application was only for migration.)
907
This *CAN* loose bup commits -- we only get the commits of whoever had the newest master. The
908
data is in the git repo, but the references will be lost. Tags won't be lost though.
909
"""
910
log = self._log('sync')
911
status = [{'host':h} for h in targets.split(',')]
912
if not targets:
913
log("nothing to sync to")
914
return status
915
log("syncing to %s"%targets)
916
917
for s in status:
918
t = time.time()
919
try:
920
self._sync(remote=s['host'], destructive=destructive, snapshots=snapshots, union=union)
921
except Exception, err:
922
s['error'] = str(err)
923
s['time'] = time.time() - t
924
925
if union:
926
# do second stage of union
927
for s in status:
928
t = time.time()
929
try:
930
self._sync(remote=s['host'], destructive=destructive, snapshots=snapshots, union2=True)
931
except Exception, err:
932
s['error'] = s.get('error','') + str(err)
933
s['time'] += time.time() - t
934
935
return status
936
937
def remote_is_ready(self, remote, port='22'):
    """
    Return True if /projects and /bup/bups are properly mounted on the
    remote host, i.e., both are ZFS mounts (and neither is ext).

    This code assumes that / on the remote host is *NOT* a ZFS filesystem.
    """
    probe = "stat -f -c %T /projects /bup/bups"
    out = self.cmd(["ssh", "-o", "ConnectTimeout=15", "-o", "StrictHostKeyChecking=no",
                    '-p', port, 'root@'+remote, probe], ignore_errors=True)
    # properly mounted = mounted via ZFS in any way
    return 'zfs' in out and 'ext' not in out
def _sync(self, remote, destructive=True, snapshots=True, union=False, union2=False, rsync_timeout=120, bwlimit=BWLIMIT, max_rsync_size=MAX_RSYNC_SIZE):
    """
    Sync this project's live files (and optionally its bup repo) to one
    remote host, given as "host" or "host:port".

    NOTE: sync is by default destructive on live files; on snapshots it isn't by default.

    If destructive is true, simply push from local to remote, overwriting anything that is remote.
    If destructive is false, pushes, then pulls, and makes a tag pointing at conflicts.

    union/union2 implement the two stages of the union operation described
    in sync(): stage 1 pulls newer files in, stage 2 pushes the union back out.

    Raises RuntimeError if the remote is not ready or rsync reports errors.
    """
    # NOTE: In the rsync's below we compress-in-transit the live project mount (-z),
    # but *NOT* the bup's, since they are already compressed.

    log = self._log('sync')
    log("syncing...")

    remote_bup_path = os.path.join(BUP_PATH, self.project_id)

    # remote may carry an ssh port as "host:port"
    if ':' in remote:
        remote, port = remote.split(':')
    else:
        port = 22

    # Ensure that /projects and /bup/bups are properly mounted on remote host before
    # doing the sync.  This is critical, since we do not want to sync to a machine that has
    # booted up, but hasn't yet imported the ZFS pools.
    if not self.remote_is_ready(remote, port):
        raise RuntimeError("remote machine %s not ready to receive replicas"%remote)

    if union:
        log("union stage 1: gather files from outside")
        # --update: only pull files that are newer on the remote side
        self.cmd(['rsync', '--update', '-zsaxH', '--timeout', rsync_timeout, '--max-size=%s'%max_rsync_size,
                  '--bwlimit', bwlimit, "--ignore-errors"] + self.exclude('') +
                 ['-e', 'ssh -o StrictHostKeyChecking=no -p %s'%port,
                  "root@%s:%s/"%(remote, self.project_mnt),
                  self.project_mnt+'/'
                 ], ignore_errors=True)
        if snapshots:
            # no -z for the bup repo -- its contents are already compressed
            self.cmd(["rsync", "--update", "-axH", '--timeout', rsync_timeout, '--max-size=%s'%max_rsync_size,
                      '--bwlimit', bwlimit, "-e", 'ssh -o StrictHostKeyChecking=no -p %s'%port,
                      "root@%s:%s/"%(remote, remote_bup_path),
                      self.bup_path+'/'
                     ], ignore_errors=False)
        return

    if union2:
        log("union stage 2: push back to form union")
        self.cmd(['rsync', '--update', '-zsaxH', '--timeout', rsync_timeout, '--max-size=%s'%max_rsync_size, '--bwlimit', bwlimit, "--ignore-errors"] + self.exclude('') +
                 ['-e', 'ssh -o StrictHostKeyChecking=no -p %s'%port,
                  self.project_mnt+'/',
                  "root@%s:%s/"%(remote, self.project_mnt)
                 ], ignore_errors=True)
        if snapshots:
            self.cmd(["rsync", "--update", "-axH", '--timeout', rsync_timeout, '--max-size=%s'%max_rsync_size, '--bwlimit', bwlimit, "-e", 'ssh -o StrictHostKeyChecking=no -p %s'%port,
                      self.bup_path+'/',
                      "root@%s:%s/"%(remote, remote_bup_path)
                     ], ignore_errors=False)
        return

    if os.path.exists(self.project_mnt):
        # Push the live working files destructively (--delete).
        def f(ignore_errors):
            o = self.cmd(["rsync", "-zaxH", '--timeout', rsync_timeout, '--max-size=%s'%max_rsync_size, '--bwlimit', bwlimit, '--delete-excluded', "--delete", "--ignore-errors"] + self.exclude('') +
                         ['-e', 'ssh -o StrictHostKeyChecking=no -p %s'%port,
                          self.project_mnt+'/', "root@%s:%s/"%(remote, self.project_mnt)], ignore_errors=True)
            # include only lines that don't contain any of the following errors, since permission denied errors are standard with
            # FUSE mounts, and there is no way to make rsync not report them (despite the -x option above).
            # TODO: This is horrible code since a different rsync version could break it.
            v = ('\n'.join([a for a in o.splitlines() if a.strip() and 'ERROR' not in a and 'to the list of known hosts' not in a and 'see previous errors' not in a and 'failed: Permission denied' not in a and 'Command exited with non-zero status' not in a])).strip()
            if ignore_errors:
                return v
            else:
                if v:  # report the error
                    raise RuntimeError(v)

        e = f(ignore_errors=True)
        if QUOTAS_ENABLED and 'Disk quota exceeded' in e:
            # bump the remote ZFS user quota, then retry once, raising on failure
            self.cmd(["ssh", "-o", "StrictHostKeyChecking=no", '-p', port, 'root@'+remote,
                      'zfs set userquota@%s=%sM %s/projects'%(
                            self.uid, QUOTAS_OVERRIDE if QUOTAS_OVERRIDE else self.get_settings()['disk'], ZPOOL)])
            f(ignore_errors=False)
        elif e:
            raise RuntimeError(e)

    if not snapshots:
        # nothing further to do -- we already sync'd the live files above, if we have any
        return

    if destructive:
        log("push so that remote=local: easier; have to do this after a recompact (say)")
        self.cmd(["rsync", "-axH", '--delete-excluded', "--delete", '--timeout', rsync_timeout, '--max-size=%s'%max_rsync_size, '--bwlimit', bwlimit, "-e", 'ssh -o StrictHostKeyChecking=no -p %s'%port,
                  self.bup_path+'/', "root@%s:%s/"%(remote, remote_bup_path)])
        return

    # Non-destructive path: push, pull, and tag any remote commits we had
    # never seen so the user can recover files if something got overwritten.
    log("get remote heads")
    out = self.cmd(["ssh", "-o", "StrictHostKeyChecking=no", '-p', port, 'root@'+remote,
                    'grep -H \"\" %s/refs/heads/*'%remote_bup_path], ignore_errors=True)
    if 'such file or directory' in out:
        remote_heads = []
    else:
        if 'ERROR' in out:
            raise RuntimeError(out)
        remote_heads = []
        for x in out.splitlines():
            # grep -H output is "path:contents"; keep the last two fields
            a, b = x.split(':')[-2:]
            remote_heads.append((os.path.split(a)[-1], b))

    log("sync from local to remote")
    self.cmd(["rsync", "-saxH", "-e", 'ssh -o StrictHostKeyChecking=no -p %s'%port, '--timeout', rsync_timeout, '--max-size=%s'%max_rsync_size, '--bwlimit', bwlimit,
              self.bup_path + '/', "root@%s:%s/"%(remote, remote_bup_path)])
    log("sync from remote back to local")
    # the -v is important below!
    back = self.cmd(["rsync", "-saxH", "-e", 'ssh -o StrictHostKeyChecking=no -p %s'%port, '--timeout', rsync_timeout, '--max-size=%s'%max_rsync_size, '--bwlimit', bwlimit,
                     "root@%s:%s/"%(remote, remote_bup_path), self.bup_path + "/"]).splitlines()
    if remote_heads and len([x for x in back if x.endswith('.pack')]) > 0:
        log("there were remote packs possibly not available locally, so make tags that points to them")
        # so user can get their files if anything important got overwritten.
        tag = None
        for branch, id in remote_heads:
            # have we ever seen this commit?
            c = "%s/logs/refs/heads/%s"%(self.bup_path,branch)
            if not os.path.exists(c) or id not in open(c).read():
                log("nope, never seen %s -- tag it."%branch)
                tag = 'conflict-%s-%s'%(branch, time.strftime("%Y-%m-%d-%H%M%S"))
                path = os.path.join(self.bup_path, 'refs', 'tags', tag)
                open(path,'w').write(id)
        if tag is not None:
            log("sync back any tags")
            # BUGFIX: the destination used to be 'root@'+remote+'/', which contains
            # no ':' so rsync treated it as a *local* directory named "root@host/";
            # push the new tags into the remote bup repo instead.
            self.cmd(["rsync", "-saxH", "-e", 'ssh -o StrictHostKeyChecking=no -p %s'%port,
                      '--timeout', rsync_timeout, '--max-size=%s'%max_rsync_size, '--bwlimit', bwlimit,
                      self.bup_path+'/', "root@%s:%s/"%(remote, remote_bup_path)])
def mount_remote(self, remote_host, project_id, mount_point='', remote_path='', read_only=False):
    """
    Make it so /projects/project_id/remote_path (which is on the remote host)
    appears as a local directory at /projects/project_id/mount_point.

    - remote_host : host whose /projects filesystem gets mounted via sshfs
    - project_id  : version-4 UUID of the remote project (validated here)
    - mount_point : path relative to this project's mount where the files
                    appear; defaults to projects/<project_id>/<remote_path>
    - read_only   : if True, bindfs mounts the files read-only

    Raises RuntimeError for a missing host or invalid project_id, and
    ValueError if mount_point exists but is not a directory.
    """
    log = self._log('mount_remote')
    log("mounting..")

    if not remote_host:
        raise RuntimeError("remote_host must be specified")
    try:
        u = uuid.UUID(project_id)
        # BUGFIX: uuid.UUID exposes .version as a property; the old call to
        # u.get_version() raised AttributeError, which the except below did
        # not catch, so validation crashed instead of raising RuntimeError.
        assert u.version == 4
        project_id = str(u)
    except (AssertionError, ValueError):
        raise RuntimeError("invalid project_id='%s'"%project_id)

    if not mount_point:
        m = os.path.join('projects', project_id, remote_path)
    else:
        m = mount_point.lstrip('/')
    mount_point = os.path.join(self.project_mnt, m)

    # If the point is already fuse or otherwise mounted but broken, then the os.path.exists(mount_point)
    # below returns false, etc.  So we always first unmount it, to start cleanly.
    try:
        self.umount_remote(mount_point)
    except RuntimeError:
        pass

    if not os.path.exists(mount_point):
        log("creating mount point")
        self.makedirs(mount_point)
    elif not os.path.isdir(mount_point):
        raise ValueError("mount_point='%s' must be a directory"%mount_point)

    remote_projects = "/projects-%s"%remote_host
    e = self.cmd(['stat', '-f', '-c', '%T', remote_projects], ignore_errors=True)
    if e != 'fuseblk':
        # stale/broken sshfs mounts report "endpoint is not connected" -- force them off
        if 'endpoint is not connected' in e:
            self.cmd(["fusermount", "-z", "-u", remote_projects])
        log("mount the remote /projects filesystem using sshfs")
        if not os.path.exists(remote_projects):
            os.makedirs(remote_projects)
        self.cmd(['sshfs', remote_host + ':' + PROJECTS_PATH, remote_projects])

    remote_path = os.path.join(remote_projects, project_id)

    log("binding %s to %s"%(remote_path, mount_point))
    self.cmd(['bindfs'] + (['-o', 'ro'] if read_only else []) +
             ['--create-for-user=%s'%uid(project_id), '--create-for-group=%s'%uid(project_id),
              '-u', self.uid, '-g', self.gid, remote_path, mount_point])
def umount_remote(self, mount_point):
    """Force-unmount the fuse mount at mount_point (relative to the project mount)."""
    # the -z forces unmount even if filesystem is busy
    target = os.path.join(self.project_mnt, mount_point)
    self.cmd(["fusermount", "-z", "-u", target])
def mkdir(self, path):  # relative path in project; must resolve to be under PROJECTS_PATH/project_id
    """
    Create the directory at the given path (relative to the project root),
    including any missing parents.  Raises RuntimeError if the resolved
    path escapes the project directory.
    """
    project_id = self.project_id
    project_path = os.path.join(PROJECTS_PATH, project_id)
    abspath = os.path.abspath(os.path.join(project_path, path))
    # BUGFIX: a bare startswith(project_path) also accepted sibling paths such
    # as PROJECTS_PATH/<project_id>evil; require an exact match or a trailing
    # separator so the path is genuinely inside the project directory.
    if abspath != project_path and not abspath.startswith(project_path + '/'):
        raise RuntimeError("path (=%s) must be contained in project path %s"%(path, project_path))
    if not os.path.exists(abspath):
        self.makedirs(abspath)
def copy_path(self,
1142
path, # relative path to copy; must resolve to be under PROJECTS_PATH/project_id
1143
target_hostname, # list of hostnames (foo or foo:port) to copy files to
1144
target_project_id, # project_id of destination for files
1145
target_path="", # path into project; if "", defaults to path above.
1146
overwrite_newer=False, # if True, newer files in target are copied over (otherwise, uses rsync's --update)
1147
delete=False, # if True, delete files in dest path not in source
1148
rsync_timeout=120,
1149
bwlimit=BWLIMIT
1150
):
1151
"""
1152
Copy a path (directory or file) from one project to another.
1153
"""
1154
#NOTES:
1155
# 1. We assume that PROJECTS_PATH is constant across all machines.
1156
# 2. We do the rsync, then change permissions. This is either annoying or a feature,
1157
# depending on your perspective, since it means the files
1158
# aren't accessible until the copy completes.
1159
1160
log = self._log("copy_path")
1161
1162
if not target_hostname:
1163
raise RuntimeError("the target hostname must be specified")
1164
if not target_path:
1165
target_path = path
1166
1167
# check that both UUID's are valid -- these will raise exception if there is a problem.
1168
check_uuid(target_project_id)
1169
1170
project_id = self.project_id
1171
1172
# parse out target rsync port, if necessary
1173
if ':' in target_hostname:
1174
target_hostname, target_port = target_hostname.split(':')
1175
else:
1176
target_port = '22'
1177
1178
log("check that target is working (has ZFS mounts etc)")
1179
if not self.remote_is_ready(target_hostname, target_port):
1180
raise RuntimeError("remote machine %s:%s not ready to receive copy of path"%(target_hostname, target_port))
1181
1182
# determine canonical absolute path to source
1183
project_path = os.path.join(PROJECTS_PATH, project_id)
1184
src_abspath = os.path.abspath(os.path.join(project_path, path))
1185
if not src_abspath.startswith(project_path):
1186
raise RuntimeError("source path must be contained in project path %s"%project_path)
1187
1188
# determine canonical absolute path to target
1189
target_project_path = os.path.join(PROJECTS_PATH, target_project_id)
1190
target_abspath = os.path.abspath(os.path.join(target_project_path, target_path))
1191
if not target_abspath.startswith(target_project_path):
1192
raise RuntimeError("target path must be contained in target project path %s"%target_project_path)
1193
1194
if os.path.isdir(src_abspath):
1195
src_abspath += '/'
1196
target_abspath += '/'
1197
1198
# handle options
1199
options = []
1200
if not overwrite_newer:
1201
options.append("--update")
1202
if delete:
1203
options.append("--delete")
1204
1205
u = uid(target_project_id)
1206
try:
1207
# do the rsync
1208
v = (['rsync'] + options +
1209
['-zsax', # compressed, archive mode (so leave symlinks, etc.), don't cross filesystem boundaries
1210
'--timeout', rsync_timeout,
1211
'--bwlimit', bwlimit,
1212
'--chown=%s:%s'%(u,u),
1213
"--ignore-errors"] +
1214
self.exclude('') +
1215
['-e', 'ssh -o StrictHostKeyChecking=no -p %s'%target_port,
1216
src_abspath,
1217
"%s:%s"%(target_hostname, target_abspath),
1218
])
1219
self.cmd(v)
1220
except Exception, mesg:
1221
log("rsync error: %s", mesg)
1222
raise
1223
1224
1225
# path = relative path in project; *must* resolve to be under PROJECTS_PATH/project_id or get an error.
1226
def directory_listing(self, path, hidden=True, time=True, start=0, limit=-1):
    """
    Return a listing of the directory at path (relative to the project root).

    - hidden : if False, omit entries starting with '.'
    - time   : if True, sort with the newest mtime first (ties broken by
               name); otherwise sort by name
    - start  : index of the first entry to return
    - limit  : if > 0, return at most this many entries and set 'more':True
               when the list was truncated

    Returns {'files':[{'name':..., 'mtime':..., 'isdir':True or 'size':...}, ...], 'more':True?}.
    Raises RuntimeError if the resolved path escapes the project directory.
    """
    project_id = self.project_id
    project_path = os.path.join(PROJECTS_PATH, project_id)
    abspath = os.path.abspath(os.path.join(project_path, path))
    # BUGFIX: require exact match or a trailing separator -- a bare startswith
    # also accepted sibling paths like PROJECTS_PATH/<project_id>evil.
    if abspath != project_path and not abspath.startswith(project_path + '/'):
        raise RuntimeError("path (=%s) must be contained in project path %s"%(path, project_path))

    def get_file_mtime(name):
        try:
            # use lstat instead of stat or getmtime so this works on broken symlinks!
            return int(round(os.lstat(os.path.join(abspath, name)).st_mtime))
        except:
            # ?? This should never happen ??
            return 0

    def get_file_size(name):
        try:
            # same as above; use instead of os.path....
            return os.lstat(os.path.join(abspath, name)).st_size
        except:
            return -1

    listdir = os.listdir(abspath)
    result = {}
    if not hidden:
        listdir = [x for x in listdir if not x.startswith('.')]

    # Get list of (name, timestamp) pairs
    all = [(name, get_file_mtime(name)) for name in listdir]

    if time:
        # sort by time first with bigger times first, then by filename in normal order
        def f(a,b):
            if a[1] > b[1]:
                return -1
            elif a[1] < b[1]:
                # BUGFIX: this used to "return 0", declaring an older entry equal
                # to a newer one, which produced an inconsistent sort order.
                return 1
            else:
                return cmp(a[0],b[0])
        all.sort(f)
    else:
        all.sort() # usual sort is fine

    # Limit and convert to objects
    all = all[start:]
    if limit > 0 and len(all) > limit:
        result['more'] = True
        all = all[:limit]

    files = dict([(name, {'name':name, 'mtime':mtime}) for name, mtime in all])
    sorted_names = [x[0] for x in all]

    # Fill in other OS information about each file
    for name, info in files.iteritems():
        if os.path.isdir(os.path.join(abspath, name)):
            info['isdir'] = True
        else:
            info['size'] = get_file_size(name)

    result['files'] = [files[name] for name in sorted_names]
    return result
1292
# Filename *must* resolve to be under PROJECTS_PATH/project_id or get an error; and it
1293
# must have size in bytes less than the given limit
1294
# -- to download the directory blah/foo, request blah/foo.zip
1295
def read_file(self, path, maxsize_bytes):
    """
    Return the contents of the file at the given path (relative to the
    project root).  If path is "blah/foo.zip" and blah/foo is a directory,
    return an in-memory zip archive of that directory tree instead.

    Raises RuntimeError if the path escapes the project directory, does not
    exist, or the content exceeds maxsize_bytes.
    """
    project_id = self.project_id
    project_path = os.path.join(PROJECTS_PATH, project_id)
    abspath = os.path.abspath(os.path.join(project_path, path))
    base, ext = os.path.splitext(abspath)
    # BUGFIX: require exact match or a trailing separator -- a bare startswith
    # also accepted sibling paths like PROJECTS_PATH/<project_id>evil.
    if abspath != project_path and not abspath.startswith(project_path + '/'):
        raise RuntimeError("path (=%s) must be contained in project path %s"%(path, project_path))
    if not os.path.exists(abspath):
        if ext != '.zip':
            raise RuntimeError("path (=%s) does not exist"%path)
        else:
            if os.path.exists(base) and os.path.isdir(base):
                # zip-on-the-fly request: serve the directory without the .zip suffix
                abspath = os.path.splitext(abspath)[0]
            else:
                # BUGFIX: the format string was missing the second %s, so this
                # line raised TypeError instead of the intended error message.
                raise RuntimeError("path (=%s) does not exist and neither does %s"%(path, base))

    filename = os.path.split(abspath)[-1]
    if os.path.isfile(abspath):
        # read a regular file
        size = os.lstat(abspath).st_size
        if size > maxsize_bytes:
            raise RuntimeError("path (=%s) must be at most %s bytes, but it is %s bytes"%(path, maxsize_bytes, size))
        return open(abspath).read()
    else:
        # create a zip file in memory from a directory tree
        # REFERENCES:
        #   - http://stackoverflow.com/questions/1855095/how-to-create-a-zip-archive-of-a-directory
        #   - https://support.google.com/accounts/answer/6135882
        import zipfile
        from cStringIO import StringIO
        output = StringIO()
        relroot = os.path.abspath(os.path.join(abspath, os.pardir))

        size = 0
        zip = zipfile.ZipFile(output, "w", zipfile.ZIP_DEFLATED, False)
        for root, dirs, files in os.walk(abspath):
            # add directory (needed for empty dirs)
            zip.write(root, os.path.relpath(root, relroot))
            for file in files:
                filename = os.path.join(root, file)
                if os.path.isfile(filename): # regular files only
                    size += os.lstat(filename).st_size
                    if size > maxsize_bytes:
                        raise RuntimeError("path (=%s) must be at most %s bytes, but it is at least %s bytes"%(path, maxsize_bytes, size))
                    arcname = os.path.join(os.path.relpath(root, relroot), file)
                    zip.write(filename, arcname)

        # Mark the files as having been created on Windows so that
        # Unix permissions are not inferred as 0000.
        for zfile in zip.filelist:
            zfile.create_system = 0
        zip.close()
        return output.getvalue()
def archive(self):
    """
    Create tar archive from the bup repo associated to this project.

    Verifies that the bup repo at least shows the directory listing for
    master, or gives an error otherwise.

    Returns a dict with 'filename', 'status', and either 'note'/'action'
    (nothing to do) or 'time_s'/'action' (tarball written).
    """
    t0 = time.time()
    if not os.path.exists(ARCHIVE_PATH):
        raise RuntimeError("Create/mount the directory %s"%ARCHIVE_PATH)

    target = self.archive_path
    mtime = self.last_touch_time()
    if os.path.exists(target):
        # check to see if the target is already up to date.
        if int(round(os.path.getmtime(target))) >= mtime:
            # archive is already newer than the last touch time, so nothing to do.
            return {'filename':target, 'status':'ok', 'note':'repo has not changed since last archive', 'action':'nothing'}

    heads = os.path.join(self.bup_path,'refs','heads')
    if os.path.exists(heads) and len(os.listdir(heads)) > 0:
        # There has been at least one save/commit, so we
        # at least check that bup repo ls works on the current branch.
        try:
            self.cmd(["/usr/bin/bup", "ls", self.branch+"/latest"], verbose=0)
        except Exception, mesg:
            raise RuntimeError("basic bup consistency test failed -- %s"%mesg)

    containing_path, path = os.path.split(self.bup_path)
    cwd = os.getcwd()
    try:
        # tar from the parent directory so the archive contains a relative path
        os.chdir(containing_path)
        # write to a hidden temporary name first, then move into place, so a
        # partially-written tarball is never visible under the final name
        target0 = os.path.join(ARCHIVE_PATH, ".%s.tar"%self.project_id)
        try:
            self.cmd(['tar', '-cf', target0,
                      '--exclude', "%s/cache"%path,
                      path])
            shutil.move(target0, target)
            os.utime(target, (mtime, mtime)) # set timestamp to last touch time
        finally:
            # don't leave a half-crap tarball around
            if os.path.exists(target0):
                os.unlink(target0)
        return {'filename':target, 'status':'ok', 'time_s':time.time()-t0, 'action':'tar'}
    finally:
        # always restore the original working directory
        os.chdir(cwd)
def dearchive(self):
    """
    Extract project from archive tar file.

    - extracts bup repo from tarball
    - extracts projects/project_id from bup repo

    Returns {'status':'ok', 'time_s': elapsed seconds}.
    Raises RuntimeError if the source archive is missing.
    """
    log = self._log("dearchive")
    t0 = time.time()
    source = os.path.join(ARCHIVE_PATH, "%s.tar"%self.project_id)
    if not os.path.exists(source):
        raise RuntimeError("Missing source archive %s"%source)

    containing_path, path = os.path.split(self.bup_path)
    cwd = os.getcwd()
    try:
        # extract from the parent directory, matching how archive() created the tar
        os.chdir(containing_path)
        log("extracting bup repository from tarball")
        self.cmd(['tar', '-xf', source])
        if os.path.exists(self.project_mnt):
            log("removing existing project directory")
            self.delete_project()
        self.cmd(['/usr/bin/bup', 'restore', '%s/latest/'%self.branch, '--outdir', self.project_mnt])
        #self.chown(self.project_mnt)
        # BUGFIX: elapsed time was reported as t0-time.time(), i.e., negative.
        return {'status':'ok', 'time_s':time.time()-t0}
    finally:
        os.chdir(cwd)
def gs_stat(self):
    """
    Returns stat info as a JSON object, or empty object if there is no such object.

    Parses the line-oriented output of `gsutil stat` for this project's
    gs_path into a dict; section headers (lines with an empty value, e.g.
    "Metadata:") become nested dicts collecting the doubly-indented lines
    that follow them.
    """
    r = {}
    # key tracks the most recent top-level key so that doubly-indented
    # continuation lines can be attached to it
    key = None
    try:
        for x in self.cmd(['gsutil','stat', self.gs_path], verbose=0).splitlines():
            v = x.split(':')
            # only lines with exactly one ':' are simple key/value pairs;
            # lines with more (e.g. URLs, timestamps) are skipped
            if len(v) == 2:
                if v[0].startswith('\t\t') and key:
                    # doubly-indented line: sub-entry of the current section
                    r[key][v[0].strip()] = v[1].strip()
                else:
                    key = v[0].strip()
                    val = v[1].strip()
                    if not val:
                        # empty value marks a section header; collect its children in a dict
                        val = {}
                    r[key] = val
        return r
    except RuntimeError, mesg:
        # gsutil reports a missing object with a "no url" error message
        if "no url" in str(mesg).lower():
            return {}
        else:
            raise
def gs_upload_archive(self):
    """
    Upload archive to google cloud storage, assuming archive exists.

    The archive's local mtime is recorded on the object as custom
    metadata (x-goog-meta-mtime) so mtimes() can read it back later.
    """
    log = self._log("gs_upload_archive")
    start = time.time()
    log("uploading to google cloud storage")
    mtime_header = "x-goog-meta-mtime:%s"%int(round(os.path.getmtime(self.archive_path)))
    self.cmd(['gsutil', '-h', mtime_header, 'cp', self.archive_path, self.gs_path])
    log("upload time=%s"%(time.time()-start))
def gs_download_archive(self, mtime):
    """
    Download archive from google cloud storage to local, then stamp the
    local file with the given mtime.
    """
    log = self._log("gs_download_archive")
    start = time.time()
    log("downloading from google cloud storage")
    self.cmd(['gsutil', 'cp', self.gs_path, self.archive_path])
    # record the caller-supplied modification time on the downloaded file
    os.utime(self.archive_path, (mtime, mtime))
    log("download time=%s"%(time.time()-start))
def mtimes(self):
    """
    Return modification times of live, google cloud storage, and archive,
    as {'archive_mtime':..., 'live_mtime':..., 'gs_mtime':...} with each
    value rounded to an int and 0 meaning that copy does not exist.

    NOTE: slow and perfect to do in parallel... (node.js rewrite?)
    """
    log = self._log("mtimes")
    start = time.time()

    # Determine archive time.
    archive = os.path.join(ARCHIVE_PATH, "%s.tar"%self.project_id)
    archive_mtime = int(round(os.path.getmtime(archive))) if os.path.exists(archive) else 0
    log("archive_mtime=%s"%archive_mtime)

    # Determine live time.
    live_mtime = int(round(os.path.getmtime(self.touch_file))) if os.path.exists(self.touch_file) else 0
    log("live_mtime=%s"%live_mtime)

    # Determine gcloud last write time using the object's custom metadata.
    gs_mtime = int(round(float(self.gs_stat().get("Metadata",{}).get('mtime','0'))))
    log("gs_mtime=%s"%gs_mtime)

    log("total time=%s"%(time.time()-start))
    return {'archive_mtime':archive_mtime, 'live_mtime':live_mtime, 'gs_mtime':gs_mtime}
def gs_sync(self):
    """
    Synchronize Google Cloud Storage (gs), ARCHIVE_PATH tarball, and live bup
    repo on this machine.

    Determines which is newer, then takes steps to synchronize the others

       - live    : generate archive and copy to gcloud.
       - gs      : copy to local archive, then extract to live
       - archive : copy to google cloud storage

    Returns {'status':'ok'} on success.
    """
    log = self._log("gs_sync")
    t0 = time.time()

    # get last modification times for each
    mtimes = self.mtimes()
    archive_mtime = mtimes['archive_mtime']
    live_mtime = mtimes['live_mtime']
    gs_mtime = mtimes['gs_mtime']

    newest_mtime = max(archive_mtime, live_mtime, gs_mtime)
    # all three times are 0 => none of the copies exist yet, nothing to do
    if not newest_mtime:
        log("nothing to do -- no data")
        return {'status':'ok'}

    # Propagate from whichever copy is newest to any strictly-older copy.
    # Ties are resolved by the order of the branches: archive, then live, then gs.
    if archive_mtime == newest_mtime:
        log("archive is newest")
        if archive_mtime > live_mtime:
            log("extract to live")
            self.dearchive()
            live_mtime = archive_mtime
        if archive_mtime > gs_mtime:
            log("upload to google cloud storage")
            self.gs_upload_archive()
            gs_mtime = archive_mtime
    elif live_mtime == newest_mtime:
        log("live is newest")
        if live_mtime > archive_mtime:
            log("make an archive")
            self.archive()
            archive_mtime = live_mtime
        if live_mtime > gs_mtime:
            self.gs_upload_archive()
            gs_mtime = live_mtime
    elif gs_mtime == newest_mtime:
        log("google cloud storage is newest")
        if gs_mtime > archive_mtime:
            log("download from google cloud storage")
            self.gs_download_archive(gs_mtime)
            archive_mtime = gs_mtime
        # archive_mtime was just refreshed above, so this also brings live up to date
        if archive_mtime > live_mtime:
            log("extract to live")
            self.dearchive()
            live_mtime = archive_mtime
    log("after operations, mtime of archive=%s, live=%s, gs=%s"%(archive_mtime, live_mtime, gs_mtime))
    return {'status':'ok'}
def gs_sync_all():
1562
# Must use this by typing
1563
# bup_storage.py gs_sync_all ""
1564
# since I can't get var args parsing to work.
1565
log("gs_sync_all")
1566
v = os.listdir(BUP_PATH)
1567
v.sort()
1568
i = 1
1569
t0 = time.time()
1570
fail = {}
1571
for project_id in v:
1572
if i > 1:
1573
avg = (time.time()-t0)/(i-1)
1574
est = int((len(v)-(i-1))*avg)
1575
if est < 60:
1576
est = "%s seconds"%est
1577
else:
1578
minutes = est//60
1579
hours = minutes//60
1580
est = "%s hours and %s minutes"%(hours, minutes-hours*60)
1581
else:
1582
est = "unknown"
1583
log("gs_sync_all -- %s/%s: %s (est time remaining: %s)"%(i,len(v),project_id,est))
1584
i += 1
1585
try:
1586
t1 = time.time()
1587
r = Project(project_id=project_id).gs_sync()
1588
log(r)
1589
except Exception, mesg:
1590
fail[project_id] = mesg
1591
result = {'total_s':time.time()-t0}
1592
1593
if len(fail) > 0:
1594
result['status'] = 'fail'
1595
result['fail'] = fail
1596
else:
1597
result['status'] = 'ok'
1598
1599
return result
1600
1601
def archive_all(fast_io=False):
1602
# Must use this by typing
1603
# bup_storage.py archive_all ""
1604
# since I can't get var args parsing to work.
1605
log("archive_all")
1606
v = os.listdir(BUP_PATH)
1607
v.sort()
1608
i = 1
1609
t0 = time.time()
1610
fail = {}
1611
for project_id in v:
1612
if i > 1:
1613
avg = (time.time()-t0)/(i-1)
1614
est = int((len(v)-(i-1))*avg)
1615
if est < 60:
1616
est = "%s seconds"%est
1617
else:
1618
minutes = est//60
1619
hours = minutes//60
1620
est = "%s hours and %s minutes"%(hours, minutes-hours*60)
1621
else:
1622
est = "unknown"
1623
log("archive_all -- %s/%s: %s (est time remaining: %s)"%(i,len(v),project_id,est))
1624
i += 1
1625
try:
1626
t1 = time.time()
1627
r = Project(project_id=project_id).archive()
1628
if r.get('action') == "tar":
1629
log(r)
1630
if not fast_io:
1631
# TODO: this is probably only necessary because of ZFS -- remove when we
1632
# go all ext4...
1633
s = 0.1 + (time.time() - t1)*2
1634
log("sleeping %s seconds to let slow IO catch up"%s)
1635
time.sleep(s)
1636
except Exception, mesg:
1637
fail[project_id] = mesg
1638
result = {'total_s':time.time()-t0}
1639
1640
if len(fail) > 0:
1641
result['status'] = 'fail'
1642
result['fail'] = fail
1643
else:
1644
result['status'] = 'ok'
1645
1646
return result
1647
1648
1649
if __name__ == "__main__":
1650
1651
parser = argparse.ArgumentParser(description="Bup-backed SMC project storage system")
1652
subparsers = parser.add_subparsers(help='sub-command help')
1653
1654
parser.add_argument("--zpool", help="the ZFS pool that has bup/projects in it", dest="zpool", default=ZPOOL, type=str)
1655
1656
parser_init = subparsers.add_parser('init', help='init project repo and directory')
1657
parser_init.set_defaults(func=lambda args: project.init())
1658
1659
parser_start = subparsers.add_parser('start', help='create user and setup the ~/.sagemathcloud filesystem')
1660
parser_start.set_defaults(func=lambda args: project.start())
1661
1662
parser_status = subparsers.add_parser('status', help='get status of servers running in the project')
1663
parser_status.add_argument("--running", help="if given only return running part of status (easier to compute)",
1664
dest="running", default=False, action="store_const", const=True)
1665
def print_status(running):
1666
print json.dumps(project.status(running=running))
1667
parser_status.set_defaults(func=lambda args: print_status(args.running))
1668
1669
parser_stop = subparsers.add_parser('stop', help='Kill all processes running as this user and delete user.')
1670
parser_stop.add_argument("--only_if_idle", help="only actually stop the project if the project is idle long enough",
1671
dest="only_if_idle", default=False, action="store_const", const=True)
1672
parser_stop.set_defaults(func=lambda args: project.stop(only_if_idle=args.only_if_idle))
1673
1674
parser_restart = subparsers.add_parser('restart', help='restart servers')
1675
parser_restart.set_defaults(func=lambda args: project.restart())
1676
1677
def do_save(*args, **kwds):
1678
print json.dumps(project.save(*args, **kwds))
1679
parser_save = subparsers.add_parser('save', help='save a snapshot then sync everything out')
1680
parser_save.add_argument("--targets", help="if given, a comma separated ip addresses of computers to replicate to NOT including the current machine", dest="targets", default="", type=str)
1681
parser_save.add_argument("--branch", dest="branch", help="save to specified branch (default: whatever current branch is); will change to that branch if different", type=str, default='')
1682
parser_save.set_defaults(func=lambda args: do_save(branch=args.branch, targets=args.targets))
1683
1684
def do_sync(*args, **kwds):
1685
status = project.sync(*args, **kwds)
1686
print json.dumps(status)
1687
parser_sync = subparsers.add_parser('sync', help='sync with all replicas')
1688
parser_sync.add_argument("--targets", help="REQUIRED: a comma separated ip addresses of computers to replicate to NOT including the current machine", dest="targets", default="", type=str)
1689
parser_sync.add_argument("--destructive", help="sync, destructively overwriting all remote replicas (DANGEROUS)",
1690
dest="destructive", default=False, action="store_const", const=True)
1691
parser_sync.add_argument("--snapshots", help="include snapshots in sync",
1692
dest="snapshots", default=False, action="store_const", const=True)
1693
parser_sync.add_argument("--union", help="make it so bup and working directories on all replicas are the SAME (the union of newest files); this CAN loose particular bup snapshots",
1694
dest="union", default=False, action="store_const", const=True)
1695
parser_sync.set_defaults(func=lambda args: do_sync(targets = args.targets,
1696
destructive = args.destructive,
1697
snapshots = args.snapshots,
1698
union = args.union))
1699
1700
def do_copy_path(*args, **kwds):
    # Copy a path to another project; print {"ok": true} on success or
    # {"error": ...} (and re-raise) on failure.
    try:
        project.copy_path(*args, **kwds)
    except Exception as mesg:
        print(json.dumps({"error": str(mesg)}))
        raise
    else:
        print(json.dumps({"ok": True}))

parser_copy_path = subparsers.add_parser('copy_path', help='copy a path from one project to another')
parser_copy_path.add_argument("--target_hostname", dest="target_hostname", default='', type=str,
                              help="REQUIRED: hostname of target machine for copy")
parser_copy_path.add_argument("--target_project_id", dest="target_project_id", default="", type=str,
                              help="REQUIRED: id of target project")
parser_copy_path.add_argument("--path", dest="path", default='', type=str,
                              help="relative path or filename in project")
parser_copy_path.add_argument("--target_path", dest="target_path", default='', type=str,
                              help="relative path into target project (defaults to --path)")
parser_copy_path.add_argument("--overwrite_newer", dest="overwrite_newer", default=False,
                              action="store_const", const=True,
                              help="if given, newer files in target are copied over")
parser_copy_path.add_argument("--delete", dest="delete", default=False,
                              action="store_const", const=True,
                              help="if given, delete files in dest path not in source")
parser_copy_path.set_defaults(func=lambda args: do_copy_path(
    path=args.path,
    target_hostname=args.target_hostname,
    target_project_id=args.target_project_id,
    target_path=args.target_path,
    overwrite_newer=args.overwrite_newer,
    delete=args.delete,
))
def do_remote_is_ready(remote):
    # Check each entry of a comma separated "host[:port]" list; print a JSON
    # map from the original "host[:port]" string to its readiness result.
    ans = {}
    try:
        for x in remote.split(','):
            parts = x.split(':')
            host = parts[0]
            # Default to the standard ssh port when none was given.
            port = parts[1] if len(parts) == 2 else '22'
            ans[x] = project.remote_is_ready(remote=host, port=port)
    except Exception as mesg:
        print(json.dumps({"error": str(mesg)}))
        raise
    else:
        print(json.dumps(ans))

parser_remote_is_ready = subparsers.add_parser('remote_is_ready', help='check that remote servers are working; ip_address:port,ip_address:port,...; the project_id is ignored!')
parser_remote_is_ready.add_argument("--remote", dest="remote", default='', type=str,
                                    help="REQUIRED: hostnames:ports of remote machine")
parser_remote_is_ready.set_defaults(func=lambda args: do_remote_is_ready(args.remote))
def do_mkdir(*args, **kwds):
    # Create a directory in the project; print {"ok": true} or {"error": ...}.
    try:
        project.mkdir(*args, **kwds)
    except Exception as mesg:
        print(json.dumps({"error": str(mesg)}))
        raise
    else:
        print(json.dumps({"ok": True}))

parser_mkdir = subparsers.add_parser('mkdir', help='make a path in a project')
parser_mkdir.add_argument("--path", dest="path", default='', type=str,
                          help="relative path in project")
parser_mkdir.set_defaults(func=lambda args: do_mkdir(path=args.path))
def do_directory_listing(*args, **kwds):
    # Print the directory listing as JSON, or {"error": ...} and re-raise.
    try:
        listing = project.directory_listing(*args, **kwds)
        print(json.dumps(listing))
    except Exception as mesg:
        print(json.dumps({"error": str(mesg)}))
        raise

parser_directory_listing = subparsers.add_parser('directory_listing', help='list files (and info about them) in a directory in the project')
parser_directory_listing.add_argument("--path", dest="path", default='', type=str,
                                      help="relative path in project")
parser_directory_listing.add_argument("--hidden", dest="hidden", default=False,
                                      action="store_const", const=True,
                                      help="if given, show hidden files")
parser_directory_listing.add_argument("--time", dest="time", default=False,
                                      action="store_const", const=True,
                                      help="if given, sort by time with newest first")
parser_directory_listing.add_argument("--start", dest="start", default=0, type=int,
                                      help="return only part of listing starting with this position (default: 0)")
parser_directory_listing.add_argument("--limit", dest="limit", default=-1, type=int,
                                      help="if given, only return this many directory entries (default: -1)")

parser_directory_listing.set_defaults(func=lambda args: do_directory_listing(
    path=args.path, hidden=args.hidden, time=args.time, start=args.start, limit=args.limit))
def do_read_file(path, maxsize):
    # Print {'base64': <file content>} on success, {"error": ...} on failure.
    try:
        content = project.read_file(path, maxsize)
        print(json.dumps({'base64': base64.b64encode(content)}))
    except Exception as mesg:
        print(json.dumps({"error": str(mesg)}))
        raise

parser_read_file = subparsers.add_parser('read_file',
    help="read a file/directory from disk; outputs {'base64':'..content in base64..'}; use directory.zip to get directory/ as a zip")
parser_read_file.add_argument("--path", dest="path", type=str,
                              help="relative path of a file/directory in project (required)")
parser_read_file.add_argument("--maxsize", dest="maxsize", default=3000000, type=int,
                              help="maximum file size in bytes to read; any bigger and instead give an error")

parser_read_file.set_defaults(func=lambda args: do_read_file(path=args.path, maxsize=args.maxsize))
# 'settings' subcommand: apply quota/limit settings; unspecified options stay
# None so only explicitly-passed settings are changed.
parser_settings = subparsers.add_parser('settings', help='set settings for this user; also outputs settings in JSON')
parser_settings.add_argument("--memory", dest="memory", type=int, default=None,
                             help="memory settings in gigabytes")
parser_settings.add_argument("--cpu_shares", dest="cpu_shares", type=int, default=None,
                             help="shares of the cpu")
parser_settings.add_argument("--cores", dest="cores", type=float, default=None,
                             help="max number of cores (may be float)")
parser_settings.add_argument("--disk", dest="disk", type=int, default=None,
                             help="working disk space in megabytes")
parser_settings.add_argument("--network", dest="network", type=str, default=None,
                             help="whether or not project has external network access")
parser_settings.add_argument("--mintime", dest="mintime", type=int, default=None,
                             help="minimum time in seconds before this project is automatically stopped if not saved")
parser_settings.add_argument("--scratch", dest="scratch", type=int, default=None,
                             help="scratch disk space in megabytes")
parser_settings.add_argument("--inode", dest="inode", type=int, default=None,
                             help="inode settings")
parser_settings.add_argument("--login_shell", dest="login_shell", type=str, default=None,
                             help="the login shell used when creating user")
parser_settings.set_defaults(func=lambda args: project.settings(
    memory=args.memory, cpu_shares=args.cpu_shares,
    cores=args.cores, disk=args.disk, inode=args.inode, scratch=args.scratch,
    login_shell=args.login_shell, mintime=args.mintime, network=args.network))
# 'mount_remote' subcommand.  Note: the --project_id option is stored as
# remote_project_id to avoid clashing with the positional project_id.
parser_mount_remote = subparsers.add_parser('mount_remote',
    help='Make it so /projects/project_id/remote_path (which is on the remote host) appears as a local directory at /projects/project_id/mount_point with ownership dynamically mapped so that the files appear owned by both projects (as they should).')
parser_mount_remote.add_argument("--remote_host", dest="remote_host", default="", type=str, help="")
parser_mount_remote.add_argument("--project_id", dest="remote_project_id", default="", type=str, help="")
parser_mount_remote.add_argument("--mount_point", dest="mount_point", default="", type=str, help="")
parser_mount_remote.add_argument("--remote_path", dest="remote_path", default="", type=str, help="")
parser_mount_remote.add_argument("--read_only", dest="read_only", default=False,
                                 action="store_const", const=True, help="")
parser_mount_remote.set_defaults(func=lambda args: project.mount_remote(
    remote_host=args.remote_host,
    project_id=args.remote_project_id,
    mount_point=args.mount_point,
    remote_path=args.remote_path,
    read_only=args.read_only))
# 'chown' subcommand: fix ownership of all project files.
parser_chown = subparsers.add_parser('chown', help="Ensure all files in the project have the correct owner and group.")
parser_chown.set_defaults(func=lambda args: project.chown_all())

# 'umount_remote' subcommand: undo a previous mount_remote.
parser_umount_remote = subparsers.add_parser('umount_remote')
parser_umount_remote.add_argument("--mount_point", dest="mount_point", default="", type=str, help="")
parser_umount_remote.set_defaults(func=lambda args: project.umount_remote(mount_point=args.mount_point))
# 'tag' subcommand: create (or with --delete, remove) a tag on master's
# latest commit.
parser_tag = subparsers.add_parser('tag', help='tag the *latest* commit to master, or delete a tag')
parser_tag.add_argument("tag", help="tag name", type=str)
parser_tag.add_argument("--delete", dest="delete", default=False,
                        action="store_const", const=True,
                        help="delete the given tag")
parser_tag.set_defaults(func=lambda args: project.tag(tag=args.tag, delete=args.delete))
def do_archive():
    # Archive this project's bup repo; print the result (or error) as JSON.
    try:
        print(json.dumps(project.archive()))  # {'filename':'%s/project_id.tar'%ARCHIVE_PATH, 'status':'ok'}
    except Exception as mesg:
        print(json.dumps({"error": str(mesg), 'status': 'error'}))
        raise

parser_archive = subparsers.add_parser('archive',
    help="creates single archive file containing the bup repo associated to this project")
parser_archive.set_defaults(func=lambda args: do_archive())
def do_dearchive():
    # Extract this project from its archive; print the result (or error) as JSON.
    try:
        print(json.dumps(project.dearchive()))  # {status':'ok'}
    except Exception as mesg:
        print(json.dumps({"error": str(mesg), 'status': 'error'}))
        raise

parser_dearchive = subparsers.add_parser('dearchive',
    help="extract project from archive")
parser_dearchive.set_defaults(func=lambda args: do_dearchive())
def do_gs_sync(*args, **kwds):
    # Sync the project with Google Cloud Storage; print result (or error) as JSON.
    # Accepts and ignores positional args because it is used directly as the
    # subcommand func and gets called as func(args).
    try:
        print(json.dumps(project.gs_sync()))
    except Exception as mesg:
        print(json.dumps({"error": str(mesg), 'status': 'error'}))
        raise

parser_gs_sync = subparsers.add_parser('gs_sync',
    help="sync project between live, google cloud, and archive")
parser_gs_sync.set_defaults(func=do_gs_sync)
if UNSAFE_MODE:
    # Only register the irreversible 'destroy' command when explicitly enabled.
    parser_destroy = subparsers.add_parser('destroy', help='**DANGEROUS**: Delete all traces of live project from this machine (does not delete archive if there).')
    parser_destroy.set_defaults(func=lambda args: project.destroy())

# 'snapshots' subcommand: list snapshots of a branch as JSON.
parser_snapshots = subparsers.add_parser('snapshots', help='output JSON list of snapshots of current branch')
parser_snapshots.add_argument("--branch", dest="branch", default='', type=str,
                              help="show for given branch (by default the current one)")
parser_snapshots.set_defaults(func=lambda args: print_json(project.snapshots(branch=args.branch)))

# 'branches' subcommand: list branches and the current one as JSON.
parser_branches = subparsers.add_parser('branches', help='output JSON {branches:[list of branches], branch:"name"}')
parser_branches.set_defaults(func=lambda args: print_json(project.branches()))

# 'checkout' subcommand: restore a snapshot into the working directory.
parser_checkout = subparsers.add_parser('checkout', help='checkout snapshot of project to working directory (DANGEROUS)')
parser_checkout.add_argument("--snapshot", dest="snapshot", default='latest', type=str,
                             help="which tag or snapshot to checkout (default: latest)")
parser_checkout.add_argument("--branch", dest="branch", default='', type=str,
                             help="branch to checkout (default: whatever current branch is)")
parser_checkout.set_defaults(func=lambda args: project.checkout(snapshot=args.snapshot, branch=args.branch))
def do_archive_all(*args, **kwds):
    """Archive every project on this machine; print the result (or error) as JSON."""
    try:
        print(json.dumps(archive_all(*args, **kwds)))
    except Exception as mesg:
        print(json.dumps({"error": str(mesg), 'status': 'error'}))
        raise

parser_archive_all = subparsers.add_parser('archive_all',
    help="archive every project hosted on this machine")
parser_archive_all.add_argument("--fast_io", dest="fast_io", default=False,
                                action="store_const", const=True,
                                help="don't pause between each archiving")
# FIX: set_defaults previously called archive_all directly, bypassing the
# do_archive_all wrapper, so its result was never printed as JSON and the
# wrapper was dead code.  Route through the wrapper (now accepting **kwds so
# fast_io can be passed along).
parser_archive_all.set_defaults(func=lambda args: do_archive_all(fast_io=args.fast_io))
def do_gs_sync_all(*args, **kwds):
    # gs_sync every project on this machine; print result (or error) as JSON.
    # Accepts and ignores positional args because it is used directly as the
    # subcommand func and gets called as func(args).
    try:
        print(json.dumps(gs_sync_all()))
    except Exception as mesg:
        print(json.dumps({"error": str(mesg), 'status': 'error'}))
        raise

parser_gs_sync_all = subparsers.add_parser('gs_sync_all',
    help="gs_sync every project hosted on this machine")
parser_gs_sync_all.set_defaults(func=do_gs_sync_all)
parser.add_argument("project_id", help="project id's -- most subcommands require this", type=str)

args = parser.parse_args()

t0 = time.time()
ZPOOL = args.zpool
try:
    # Subcommands that act on a single project get a Project instance;
    # machine-wide subcommands (empty project_id) run without one.
    if args.project_id:
        project = Project(project_id=args.project_id)
    args.func(args)
except Exception as mesg:
    log("exception - %s" % mesg)
    sys.exit(1)
finally:
    log("total time: %s seconds" % (time.time() - t0))