CoCalc Public Filesdask.ipynbOpen in with one click!
Author: Harald Schilly
Views : 529

Dask in Python 3 (Ubuntu Linux)

https://docs.dask.org/en/latest/

In [6]:
import dask dask.__version__
'2.16.0'
In [7]:
import distributed distributed.__version__
'2.16.0'
In [13]:
import dask import dask.distributed import os dask.config.set({ 'temporary_directory': os.path.expanduser('~/tmp'), 'scheduler.work-stealing': True })
--------------------------------------------------------------------------- TypeError Traceback (most recent call last) <ipython-input-13-1891719d5672> in <module> 4 dask.config.set({ 5 'temporary_directory': os.path.expanduser('~/tmp'), ----> 6 'scheduler.work-stealing': True 7 }) /usr/local/lib/python3.6/dist-packages/dask/config.py in __init__(self, arg, config, lock, **kwargs) 303 for key, value in arg.items(): 304 key = check_deprecations(key) --> 305 self._assign(key.split("."), value, config) 306 if kwargs: 307 for key, value in kwargs.items(): /usr/local/lib/python3.6/dist-packages/dask/config.py in _assign(self, keys, value, d, path, record) 363 # No need to record subsequent operations after an insert 364 record = False --> 365 self._assign(keys[1:], value, d[key], path, record=record) 366 367 /usr/local/lib/python3.6/dist-packages/dask/config.py in _assign(self, keys, value, d, path, record) 355 else: 356 self._record.append(("insert", path, None)) --> 357 d[key] = value 358 else: 359 if key not in d: TypeError: 'str' object does not support item assignment
In [14]:
dask.config.config
{'temporary-directory': '/home/user/tmp', 'dataframe': {'shuffle-compression': None}, 'array': {'svg': {'size': 120}, 'chunk-size': '128MiB', 'rechunk-threshold': 4}, 'optimization': {'fuse': {'active': True, 'ave-width': 1, 'subraphs': False, 'rename-keys': True}}, 'distributed': {'version': 2, 'scheduler': {'allowed-failures': 3, 'bandwidth': 100000000, 'blocked-handlers': [], 'default-data-size': '1kiB', 'events-cleanup-delay': '1h', 'idle-timeout': None, 'transition-log-length': 100000, 'work-stealing': True, 'work-stealing-interval': '100ms', 'worker-ttl': None, 'pickle': True, 'preload': [], 'preload-argv': [], 'unknown-task-duration': '500ms', 'default-task-durations': {'rechunk-split': '1us', 'shuffle-split': '1us'}, 'validate': False, 'dashboard': {'status': {'task-stream-length': 1000}, 'tasks': {'task-stream-length': 100000}, 'tls': {'ca-file': None, 'key': None, 'cert': None}, 'bokeh-application': {'allow_websocket_origin': ['*'], 'keep_alive_milliseconds': 500, 'check_unused_sessions_milliseconds': 500}}, 'locks': {'lease-validation-interval': '10s', 'lease-timeout': '30s'}, 'http': {'routes': ['distributed.http.scheduler.prometheus', 'distributed.http.scheduler.info', 'distributed.http.scheduler.json', 'distributed.http.health', 'distributed.http.proxy', 'distributed.http.statics']}}, 'worker': {'blocked-handlers': [], 'multiprocessing-method': 'spawn', 'use-file-locking': True, 'connections': {'outgoing': 50, 'incoming': 10}, 'preload': [], 'preload-argv': [], 'daemon': True, 'validate': False, 'lifetime': {'duration': None, 'stagger': '0 seconds', 'restart': False}, 'profile': {'interval': '10ms', 'cycle': '1000ms', 'low-level': False}, 'memory': {'target': 0.6, 'spill': 0.7, 'pause': 0.8, 'terminate': 0.95}, 'http': {'routes': ['distributed.http.worker.prometheus', 'distributed.http.health', 'distributed.http.statics']}}, 'nanny': {'preload': [], 'preload-argv': []}, 'client': {'heartbeat': '5s', 'scheduler-info-interval': '2s'}, 'deploy': {'lost-worker-timeout': '15s', 'cluster-repr-interval': '500ms'}, 'adaptive': {'interval': '1s', 'target-duration': '5s', 'minimum': 0, 'maximum': inf, 'wait-count': 3}, 'comm': {'retry': {'count': 0, 'delay': {'min': '1s', 'max': '20s'}}, 'compression': 'auto', 'offload': '10MiB', 'default-scheme': 'tcp', 'socket-backlog': 2048, 'recent-messages-log-length': 0, 'zstd': {'level': 3, 'threads': 0}, 'timeouts': {'connect': '10s', 'tcp': '30s'}, 'require-encryption': False, 'tls': {'ciphers': None, 'ca-file': None, 'scheduler': {'cert': None, 'key': None}, 'worker': {'key': None, 'cert': None}, 'client': {'key': None, 'cert': None}}}, 'dashboard': {'link': '{scheme}://{host}:{port}/status', 'export-tool': False, 'graph-max-items': 5000}, 'admin': {'tick': {'interval': '20ms', 'limit': '3s'}, 'max-error-length': 10000, 'log-length': 10000, 'log-format': '%(name)s - %(levelname)s - %(message)s', 'pdb-on-err': False}}, 'rmm': {'pool-size': None}, 'ucx': {'tcp': None, 'nvlink': None, 'infiniband': None, 'rdmacm': None, 'cuda_copy': None, 'net-devices': None}, 'scheduler': 'dask.distributed', 'shuffle': 'tasks'}
In [15]:
from dask.distributed import Client client = Client('127.0.0.1:8786') client

Client

Cluster

  • Workers: 3
  • Cores: 3
  • Memory: 768.00 MB

Start dash-scheduler and dash-worker with --dashboard-prefix b9bacd7b-6cee-402c-88ed-9d74b07f29a1/port/8787

The dashboard is actually at https://cocalc.com/{{ THE PROJECT UUID }}/port/8787/status

Websocket forwarding doesn't work, though ... hmm...

alternatively, start an X11 desktop in cocalc and run google-chrome at http://127.0.0.1:8787/status

data array similar to numpy arrays

In [16]:
import dask.array as da
In [17]:
client.restart() x = da.random.random((1000, 1000), chunks=(300, 300)) x
Array Chunk
Bytes 8.00 MB 720.00 kB
Shape (1000, 1000) (300, 300)
Count 16 Tasks 16 Chunks
Type float64 numpy.ndarray
1000 1000
In [18]:
x = x.rechunk({0: 500, 1: 500}).persist() x
Array Chunk
Bytes 8.00 MB 2.00 MB
Shape (1000, 1000) (500, 500)
Count 4 Tasks 4 Chunks
Type float64 numpy.ndarray
1000 1000
In [19]:
client.scatter(x)
Future: Array status: finished, type: dask.Array, key: Array-9db7319391330ab9161d425d754da9b1
In [20]:
y = x + x.T z = y[::50, 100:].mean(axis=1) z
Array Chunk
Bytes 160 B 80 B
Shape (20,) (10,)
Count 22 Tasks 2 Chunks
Type float64 numpy.ndarray
20 1
In [11]:
z.shape
(20,)
In [12]:
out = z.compute() out
array([0.99803183, 1.00554271, 0.99672158, 0.99485491, 1.00031019, 0.98133476, 1.01035192, 1.00860477, 1.0029704 , 0.98932816, 0.99846216, 1.01396122, 1.02296116, 0.98738112, 0.99428716, 1.00707458, 1.01200935, 0.97409059, 0.99750783, 0.98888702])
In [13]:
rz = (z[:100].sum() / z[:100].shape[0]) rz.compute()
0.9992336696610516
In [14]:
del x, y, z, rz, out

sum 1000 ints

In [15]:
client.restart() import dask.bag as db nums = db.from_sequence(range(1000), npartitions=10) nums.sum().compute()
499500

functions and native lists

In [16]:
import numpy as np xx = dask.array.array(np.arange(100)) xx = xx.reshape(10, -1) xx = xx.rechunk(2, 2) xx = client.persist(xx) xx
Array Chunk
Bytes 800 B 32 B
Shape (10, 10) (2, 2)
Count 25 Tasks 25 Chunks
Type int64 numpy.ndarray
10 10
In [17]:
y = client.submit(lambda x : x.dot(x.T), xx) y.result()
Array Chunk
Bytes 800 B 32 B
Shape (10, 10) (2, 2)
Count 375 Tasks 25 Chunks
Type int64 numpy.ndarray
10 10
In [18]:
client.restart() import dask.array as da from time import sleep from random import random import numpy as np delay = lambda : sleep(.2 * random()) def formula(x): delay() return x**3 - x**2 + 1 def neg(a): delay() return -a def dup(a): delay() return 2 * a def mysum(a, b): delay() return lambda a, b: a + b N = 100 A = client.map(formula, range(N)) B = client.map(neg, A[:N // 3]) B.extend(client.map(dup, A[N // 3:2 * N // 3])) B.extend(B[2*N//3:]) C = client.map(mysum, A, B) total = client.submit(sum, B) total.result()
8212721
In [ ]:

loops?

In [21]:
from time import sleep from random import random @dask.delayed def fib(x): sleep(random()) if x <= 1: return 1 else: a, b = fib(x - 2), fib(x - 1) a, b = dask.compute(a, b) return a + b
In [22]:
dask.compute((fib(i) for i in range(5)), 10)
([1, 1, 2, 3, 5], 10)

Dask Bags

In [23]:
import dask.bag as db b1 = db.from_sequence(range(-1031, 1000), npartitions=50) b1
dask.bag<from_sequence, npartitions=50>
In [24]:
import operator
In [25]:
is_odd = lambda x : x % 2 == 0 #b1.groupby(is_odd).map(lambda k_v : (k_v[0], sum(k_v[1]))).compute() b1.foldby(is_odd, operator.add, 0).compute()
[(False, -16256), (True, -16240)]

Dask Delayed

In [24]:
from dask import delayed
In [25]:
inc = lambda x : x+1 from operator import add
In [26]:
from time import sleep z = delayed(0) for i in range(5): x = delayed(inc)(i) y = delayed(inc)(delayed(add)(i, x)) z = delayed(add)(z, y) z.compute()
30
In [27]:
z.vizualize(filename=os.path.expanduser('~/dask-delayed-1.png'), format='png')
Delayed('vizualize-f71d3704-0ae5-4641-b3b6-5e20b98d456d')
In [28]:
z.vizualize()
Delayed('vizualize-3d312d55-dbac-47fd-89a1-7628348e432f')
In [29]:
import dask_ml dask_ml.__version__
'1.3.0'
In [30]:
from dask_ml.preprocessing import Categorizer, DummyEncoder from dask_ml.linear_model import LogisticRegression
In [31]:
lr = LogisticRegression() lr
LogisticRegression(C=1.0, class_weight=None, dual=False, fit_intercept=True, intercept_scaling=1.0, max_iter=100, multi_class='ovr', n_jobs=1, penalty='l2', random_state=None, solver='admm', solver_kwargs=None, tol=0.0001, verbose=0, warm_start=False)
In [32]:
from joblib import parallel_backend with parallel_backend('dask') as pb: print(pb[0])
<joblib._dask.DaskDistributedBackend object at 0x7f2520d382e8>
In [33]:
import joblib joblib.__version__
'0.14.1'
In [ ]:
In [ ]:
In [ ]:

Ad-hoc Local Cluster

In [34]:
from dask.distributed import Client, LocalCluster import dask.array as da cluster = LocalCluster( n_workers=3, threads_per_worker=1, processes=True, diagnostics_port=None, ) client = Client(cluster) x = da.random.random((300, 300), chunks=(10, 10)) y = x + x.T z = (y.mean(axis=1) / y.shape[0]).std() print(z.compute())
/usr/local/lib/python3.6/dist-packages/distributed/dashboard/core.py:79: UserWarning: Port 8787 is already in use. Perhaps you already have a cluster running? Hosting the diagnostics dashboard on a random port instead. warnings.warn("\n" + msg)
7.34683839787855e-05
In [ ]:
In [ ]:
In [ ]: