Contact
CoCalc Logo Icon
StoreFeaturesDocsShareSupport News Sign UpSign In
| Download
Project: Testing 18.04
Views: 1912
Kernel: Python 3 (system-wide)

Dask in Python 3 (Ubuntu Linux)

https://docs.dask.org/en/latest/

import dask dask.__version__
'2.21.0'
import distributed distributed.__version__
'2.21.0'
import os

import dask
import dask.distributed

# Configure Dask for this session.
#
# Settings are addressed by their full dotted path. Work stealing is a
# scheduler option of the distributed package, so it lives under
# "distributed.scheduler" (see the 'distributed' -> 'scheduler' section of
# dask.config.config). The shorter key 'scheduler.work-stealing' does not
# touch that setting: it only creates an unrelated top-level 'scheduler'
# section, which is what the config dump recorded below shows at its very end.
dask.config.set({
    'temporary_directory': os.path.expanduser('~/tmp'),
    'distributed.scheduler.work-stealing': True,
})
<dask.config.set at 0x7f29fc3ea2e8>
dask.config.config
{'temporary-directory': '/home/user/tmp', 'dataframe': {'shuffle-compression': None}, 'array': {'svg': {'size': 120}}, 'optimization': {'fuse': {'active': True, 'ave-width': 1, 'max-width': None, 'max-height': inf, 'max-depth-new-edges': None, 'subgraphs': None, 'rename-keys': True}}, 'distributed': {'version': 2, 'scheduler': {'allowed-failures': 3, 'bandwidth': 100000000, 'blocked-handlers': [], 'default-data-size': '1kiB', 'events-cleanup-delay': '1h', 'idle-timeout': None, 'transition-log-length': 100000, 'work-stealing': True, 'work-stealing-interval': '100ms', 'worker-ttl': None, 'pickle': True, 'preload': [], 'preload-argv': [], 'unknown-task-duration': '500ms', 'default-task-durations': {'rechunk-split': '1us', 'shuffle-split': '1us'}, 'validate': False, 'dashboard': {'status': {'task-stream-length': 1000}, 'tasks': {'task-stream-length': 100000}, 'tls': {'ca-file': None, 'key': None, 'cert': None}, 'bokeh-application': {'allow_websocket_origin': ['*'], 'keep_alive_milliseconds': 500, 'check_unused_sessions_milliseconds': 500}}, 'locks': {'lease-validation-interval': '10s', 'lease-timeout': '30s'}, 'http': {'routes': ['distributed.http.scheduler.prometheus', 'distributed.http.scheduler.info', 'distributed.http.scheduler.json', 'distributed.http.health', 'distributed.http.proxy', 'distributed.http.statics']}}, 'worker': {'blocked-handlers': [], 'multiprocessing-method': 'spawn', 'use-file-locking': True, 'connections': {'outgoing': 50, 'incoming': 10}, 'preload': [], 'preload-argv': [], 'daemon': True, 'validate': False, 'lifetime': {'duration': None, 'stagger': '0 seconds', 'restart': False}, 'profile': {'interval': '10ms', 'cycle': '1000ms', 'low-level': False}, 'memory': {'target': 0.6, 'spill': 0.7, 'pause': 0.8, 'terminate': 0.95}, 'http': {'routes': ['distributed.http.worker.prometheus', 'distributed.http.health', 'distributed.http.statics']}}, 'nanny': {'preload': [], 'preload-argv': []}, 'client': {'heartbeat': '5s', 'scheduler-info-interval': '2s'}, 
'deploy': {'lost-worker-timeout': '15s', 'cluster-repr-interval': '500ms'}, 'adaptive': {'interval': '1s', 'target-duration': '5s', 'minimum': 0, 'maximum': inf, 'wait-count': 3}, 'comm': {'retry': {'count': 0, 'delay': {'min': '1s', 'max': '20s'}}, 'compression': 'auto', 'offload': '10MiB', 'default-scheme': 'tcp', 'socket-backlog': 2048, 'recent-messages-log-length': 0, 'zstd': {'level': 3, 'threads': 0}, 'timeouts': {'connect': '10s', 'tcp': '30s'}, 'require-encryption': None, 'tls': {'ciphers': None, 'ca-file': None, 'scheduler': {'cert': None, 'key': None}, 'worker': {'key': None, 'cert': None}, 'client': {'key': None, 'cert': None}}}, 'dashboard': {'link': '{scheme}://{host}:{port}/status', 'export-tool': False, 'graph-max-items': 5000}, 'admin': {'tick': {'interval': '20ms', 'limit': '3s'}, 'max-error-length': 10000, 'log-length': 10000, 'log-format': '%(name)s - %(levelname)s - %(message)s', 'pdb-on-err': False}}, 'rmm': {'pool-size': None}, 'ucx': {'tcp': None, 'nvlink': None, 'infiniband': None, 'rdmacm': None, 'cuda_copy': None, 'net-devices': None, 'reuse-endpoints': True}, 'scheduler': {'work-stealing': True}}
from dask.distributed import Client client = Client('127.0.0.1:8786') client

Client

Cluster

  • Workers: 1
  • Cores: 1
  • Memory: 256.00 MB

Start dask-scheduler and dask-worker with --dashboard-prefix b9bacd7b-6cee-402c-88ed-9d74b07f29a1/port/8787

The dashboard is actually at https://cocalc.com/{{ THE PROJECT UUID }}/port/8787/status

Websocket forwarding doesn't work, though ... hmm...

Alternatively, start an X11 desktop in CoCalc and open http://127.0.0.1:8787/status in google-chrome.

Dask arrays: data arrays similar to NumPy arrays

import dask.array as da
client.restart() x = da.random.random((1000, 1000), chunks=(300, 300)) x
Array Chunk
Bytes 8.00 MB 720.00 kB
Shape (1000, 1000) (300, 300)
Count 16 Tasks 16 Chunks
Type float64 numpy.ndarray
x = x.rechunk({0: 500, 1: 500}).persist() x
Array Chunk
Bytes 8.00 MB 2.00 MB
Shape (1000, 1000) (500, 500)
Count 4 Tasks 4 Chunks
Type float64 numpy.ndarray
client.scatter(x)
Future: Array status: finished, type: dask.Array, key: Array-a43378b13ba8382373df14b6dd7f61d4
y = x + x.T z = y[::50, 100:].mean(axis=1) z
Array Chunk
Bytes 160 B 80 B
Shape (20,) (10,)
Count 22 Tasks 2 Chunks
Type float64 numpy.ndarray
z.shape
(20,)
out = z.compute() out
array([0.99706421, 1.03277113, 0.9993643 , 0.99231638, 1.02139168, 0.98631926, 0.99280159, 0.97743904, 0.99200793, 1.00457694, 1.01779522, 0.98179355, 1.01977627, 1.01330775, 1.00401255, 0.98929948, 0.98495306, 0.99648525, 0.98166991, 1.01806776])
rz = (z[:100].sum() / z[:100].shape[0]) rz.compute()
0.9974348104666977
del x, y, z, rz, out

Sum the 1234 integers 0 … 1233

client.restart() import dask.bag as db nums = db.from_sequence(range(1234), npartitions=10) nums.sum().compute()
760761

functions and native lists

import numpy as np

# Build a 10x10 integer Dask array from a NumPy range and keep it in
# cluster memory, split into 2x2 chunks.
xx = dask.array.array(np.arange(100))
xx = xx.reshape(10, -1)
# Pass the chunk shape as ONE argument. The second positional parameter of
# rechunk() is `threshold`, not the chunk size of the second axis, so
# rechunk(2, 2) only produced 2x2 chunks because a scalar chunk size is
# applied to every axis.
xx = xx.rechunk((2, 2))
xx = client.persist(xx)
xx
Array Chunk
Bytes 800 B 32 B
Shape (10, 10) (2, 2)
Count 25 Tasks 25 Chunks
Type int64 numpy.ndarray
y = client.submit(lambda x : x.dot(x.T), xx) y.result()
Array Chunk
Bytes 800 B 32 B
Shape (10, 10) (2, 2)
Count 375 Tasks 25 Chunks
Type int64 numpy.ndarray
client.restart()

import dask.array as da
from time import sleep
from random import random
import numpy as np


def delay():
    """Sleep for a random time of up to 0.2 seconds to simulate work."""
    sleep(.2 * random())


def formula(x):
    """Return x**3 - x**2 + 1 after a simulated delay."""
    delay()
    return x**3 - x**2 + 1


def neg(a):
    """Return the negation of a after a simulated delay."""
    delay()
    return -a


def dup(a):
    """Return twice a after a simulated delay."""
    delay()
    return 2 * a


def mysum(a, b):
    """Return a + b after a simulated delay.

    The original returned a lambda that would add two numbers instead of
    the sum itself, so every result in C was a function object.
    """
    delay()
    return a + b


N = 100

# A: one future per input value.
A = client.map(formula, range(N))

# B: negate the first third of A, double the second third.
B = client.map(neg, A[:N // 3])
B.extend(client.map(dup, A[N // 3:2 * N // 3]))
# NOTE(review): B holds only 66 futures at this point, so B[2*N//3:] is an
# empty slice and this line adds nothing. A[2*N//3:] (pass the last third
# through unchanged) may have been intended -- confirm. It is left as is
# because the recorded total below corresponds to the 66-element B.
B.extend(B[2*N//3:])

# C pairs A with B element-wise; the pairing stops at the shorter list.
C = client.map(mysum, A, B)

# The total is taken over B (C is not used for it).
total = client.submit(sum, B)
total.result()
8212721

loops?

from time import sleep
from random import random


@dask.delayed
def fib(x):
    """Return the x-th Fibonacci-style number, with fib(0) == fib(1) == 1.

    Every call first sleeps for a random time below one second. For x > 1
    the two predecessor values are built as delayed calls and then computed
    together inside this task before being added.
    """
    sleep(random())
    if x <= 1:
        return 1
    lazy_two_back = fib(x - 2)
    lazy_one_back = fib(x - 1)
    two_back, one_back = dask.compute(lazy_two_back, lazy_one_back)
    return two_back + one_back
dask.compute((fib(i) for i in range(5)), 10)
([1, 1, 2, 3, 5], 10)

Dask Bags

import dask.bag as db b1 = db.from_sequence(range(-1031, 1000), npartitions=50) b1
dask.bag<from_sequence, npartitions=50>
import operator
def is_even(x):
    """Return True when x is even.

    This predicate was previously bound to the name `is_odd`, although it
    tests for even numbers: in the recorded result the key True carries the
    sum of the even values.
    """
    return x % 2 == 0


# Alternative formulation via groupby (left disabled, as in the original):
#b1.groupby(is_even).map(lambda k_v : (k_v[0], sum(k_v[1]))).compute()

# Sum the bag's elements separately for each value of the predicate,
# starting every group from 0.
b1.foldby(is_even, operator.add, 0).compute()
[(False, -16256), (True, -16240)]

Dask Delayed

from dask import delayed
from operator import add


def inc(x):
    """Return x incremented by one."""
    return x + 1
from time import sleep

# Build one lazy task graph that sums inc(i + inc(i)) for i = 0..4.
# Nothing runs until compute() is called on the final node.
z = delayed(0)
for i in range(5):
    x = delayed(inc)(i)                    # i + 1
    y = delayed(inc)(delayed(add)(i, x))   # (i + x) + 1
    z = delayed(add)(z, y)                 # running total, still lazy
z.compute()
30
# Render the task graph of z to a PNG file in the home directory.
# The method is spelled `visualize`. Attribute access on a Delayed object is
# itself lazy, so the misspelt `vizualize` did not raise an error: it only
# produced the Delayed('vizualize-...') object recorded below and drew nothing.
z.visualize(filename=os.path.expanduser('~/dask-delayed-1.png'), format='png')
Delayed('vizualize-ad3fba49-bb55-4757-a0ce-668e7fb3aac8')
# Render the task graph of z with the default settings.
# The method is spelled `visualize`; the misspelt `vizualize` only created
# the lazy Delayed('vizualize-...') object recorded below and drew nothing.
z.visualize()
Delayed('vizualize-3dde23e6-bc93-4e01-a407-6b5d33a0ddbe')
import dask_ml dask_ml.__version__
from dask_ml.preprocessing import Categorizer, DummyEncoder from dask_ml.linear_model import LogisticRegression
lr = LogisticRegression() lr
from joblib import parallel_backend with parallel_backend('dask') as pb: print(pb[0])
import joblib joblib.__version__
client.restart()

Client

Cluster

  • Workers: 3
  • Cores: 3
  • Memory: 768.00 MB

Ad-hoc Local Cluster

from dask.distributed import Client, LocalCluster
import dask.array as da

# Start a throw-away cluster of three single-threaded worker processes on
# this machine and connect a client to it.
#
# dashboard_address=None turns the diagnostics dashboard off. The argument
# used before, diagnostics_port=None, is the deprecated spelling and had no
# effect when set to None: the dashboard still tried the default port 8787,
# which is what the "Port 8787 is already in use" warning recorded below
# this cell reports.
cluster = LocalCluster(
    n_workers=3,
    threads_per_worker=1,
    processes=True,
    dashboard_address=None,
)
client = Client(cluster)

# Small end-to-end check: symmetrise a random 300x300 array held in 10x10
# chunks, take the row means divided by the row count, and print their
# standard deviation.
x = da.random.random((300, 300), chunks=(10, 10))
y = x + x.T
z = (y.mean(axis=1) / y.shape[0]).std()
print(z.compute())
/usr/local/lib/python3.6/dist-packages/distributed/dashboard/core.py:79: UserWarning: Port 8787 is already in use. Perhaps you already have a cluster running? Hosting the diagnostics dashboard on a random port instead. warnings.warn("\n" + msg)
7.34683839787855e-05