CoCalc
Shareddask.ipynbOpen in CoCalc
Author: Harald Schilly
Views : 2
import dask dask.__version__
'1.0.0'
import dask import dask.distributed import os dask.config.set({ 'temporary_directory': os.path.expanduser('~/tmp'), 'scheduler.work-stealing': True })
<dask.config.set at 0x7f063001f748>
dask.config.config
{'logging': {'distributed': 'info', 'distributed.client': 'warning', 'bokeh': 'critical', 'tornado': 'critical', 'tornado.application': 'error'}, 'require-encryption': False, 'client-heartbeat-interval': 5000, 'distributed': {'version': 2, 'scheduler': {'allowed-failures': 3, 'bandwidth': 100000000, 'default-data-size': 1000, 'transition-log-length': 100000, 'work-stealing': True, 'worker-ttl': None, 'preload': [], 'preload-argv': []}, 'worker': {'multiprocessing-method': 'forkserver', 'use-file-locking': True, 'connections': {'outgoing': 50, 'incoming': 10}, 'preload': [], 'preload-argv': [], 'profile': {'interval': 10, 'cycle': 1000}, 'memory': {'target': 0.6, 'spill': 0.7, 'pause': 0.8, 'terminate': 0.95}}, 'client': {'heartbeat': '5s'}, 'comm': {'compression': 'auto', 'default-scheme': 'tcp', 'socket-backlog': 2048, 'recent-messages-log-length': 0, 'timeouts': {'connect': 3, 'tcp': 30}}, 'dashboard': {'link': 'http://{host}:{port}/status', 'export-tool': False}, 'admin': {'tick': {'interval': 20, 'limit': 1000}, 'log-length': 10000, 'log-format': '%(name)s - %(levelname)s - %(message)s', 'pdb-on-err': False}}, 'temporary-directory': '/home/user/tmp', 'scheduler': {'work-stealing': True}}
from dask.distributed import Client client = Client('127.0.0.1:8786') client

Client

Cluster

  • Workers: 2
  • Cores: 2
  • Memory: 512.00 MB

The dashboard is actually at https://cocalc.com/{{ THE PROJECT UUID }}/server/8787/status

Websocket forwarding doesn't work, though ... hmm...

alternatively, start an X11 desktop in cocalc and run google-chrome at http://127.0.0.1:8787/status

data array similar to numpy arrays

import dask.array as da
x = da.random.random((3000, 3000), chunks=(300, 300)) x
dask.array<random_sample, shape=(3000, 3000), dtype=float64, chunksize=(300, 300)>
y = x + x.T z = y[::50, 1000:].mean(axis=1) z
dask.array<mean_agg-aggregate, shape=(60,), dtype=float64, chunksize=(6,)>
z.shape
(60,)
out = z.compute() out
array([1.00894334, 0.98686389, 1.00412201, 1.00562101, 0.9984469 , 1.00773417, 1.0017398 , 0.99675565, 0.99054068, 0.98704365, 0.99406761, 1.01857855, 1.00252324, 1.00853879, 0.98123995, 0.99701221, 0.99059657, 0.99144697, 1.00628833, 0.9947498 , 0.99845961, 1.01338604, 1.00216002, 0.99366811, 0.99538552, 1.03424335, 1.00016978, 0.99584787, 0.99663896, 1.0028844 , 0.99098811, 0.9955956 , 0.99308403, 1.01371317, 1.0229804 , 0.99350163, 1.00235196, 1.00826049, 0.9949777 , 0.98840276, 1.00248748, 1.01109067, 1.01085646, 1.01437783, 1.00604058, 0.99871721, 1.00370476, 0.99210787, 1.00258028, 0.99557382, 1.0012086 , 0.99908451, 0.98371352, 1.00652402, 0.9849942 , 1.0135658 , 0.98824092, 0.99474543, 1.00656597, 0.9921391 ])
(z[:100].sum() / z[:100].shape[0]).compute()
1.0002978610915068

functions and native lists

fn = lambda x : x**3 - x**2 + 1 def neg(a): import time time.sleep(.1) return -a A = client.map(fn, range(10)) B = client.map(neg, A) C = client.map(lambda a, b: a+b, A, B) total = client.submit(sum, B) total.result()
-1750
client.gather(C)
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

loops?

def fib(x): if x <= 1: return 1 else: return fib(x - 2) + fib(x - 1)
[fib(_) for _ in range(10)]
[1, 1, 2, 3, 5, 8, 13, 21, 34, 55]

Dask Bags

import dask.bag as db b1 = db.from_sequence(range(-1000, 1000), npartitions=50) b1
dask.bag<from_se..., npartitions=50>
import operator
is_odd = lambda x : x % 2 == 0 #b1.groupby(is_odd).map(lambda k_v : (k_v[0], sum(k_v[1]))).compute() b1.foldby(is_odd, operator.add, 0).compute()
[(True, -1000), (False, 0)]

Dask Delayed

from dask import delayed
inc = lambda x : x+1 from operator import add
z = delayed(0) for i in range(5): x = delayed(inc)(i) y = delayed(inc)(delayed(add)(i, x)) z = delayed(add)(z, y) z.compute()
30
z.vizualize(filename='dask-delayed-1.svg')
Delayed('vizualize-38f8c38e-2766-4979-aa1d-a76835e14c0f')
import dask_ml
from dask_ml.preprocessing import Categorizer, DummyEncoder from dask_ml.linear_model import LogisticRegression
lr = LogisticRegression() lr
LogisticRegression(C=1.0, class_weight=None, dual=False, fit_intercept=True, intercept_scaling=1.0, max_iter=100, multiclass='ovr', n_jobs=1, penalty='l2', random_state=None, solver='admm', solver_kwargs=None, tol=0.0001, verbose=0, warm_start=False)
from sklearn.externals.joblib import parallel_backend with parallel_backend('dask') as pb: print(pb[0])
<sklearn.externals.joblib._dask.DaskDistributedBackend object at 0x7f0611245d68>