Shareddask.ipynbOpen in CoCalc
Author: Harald Schilly
Views : 58

Dask in Python 3 (Ubuntu Linux)

https://docs.dask.org/en/latest/

In [1]:
import dask dask.__version__
'2.3.0'
In [1]:
import distributed distributed.__version__
'2.3.2'
In [2]:
import dask import dask.distributed import os dask.config.set({ 'temporary_directory': os.path.expanduser('~/tmp'), 'scheduler.work-stealing': True })
<dask.config.set at 0x7f9ee8d7cdd8>
In [3]:
dask.config.config
{'array': {'svg': {'size': 120}}, 'distributed': {'admin': {'log-format': '%(name)s - %(levelname)s - %(message)s', 'log-length': 10000, 'pdb-on-err': False, 'tick': {'interval': '20ms', 'limit': '3s'}}, 'client': {'heartbeat': '5s'}, 'comm': {'compression': 'auto', 'default-scheme': 'tcp', 'recent-messages-log-length': 0, 'require-encryption': False, 'socket-backlog': 2048, 'timeouts': {'connect': '10s', 'tcp': '30s'}, 'tls': {'ca-file': None, 'ciphers': None, 'client': {'cert': None, 'key': None}, 'scheduler': {'cert': None, 'key': None}, 'worker': {'cert': None, 'key': None}}}, 'dashboard': {'export-tool': False, 'link': '{scheme}://{host}:{port}/status'}, 'scheduler': {'allowed-failures': 3, 'bandwidth': 100000000, 'blocked-handlers': [], 'dashboard': {'status': {'task-stream-length': 1000}, 'tasks': {'task-stream-length': 100000}, 'tls': {'ca-file': None, 'cert': None, 'key': None}}, 'default-data-size': 1000, 'events-cleanup-delay': '1h', 'idle-timeout': None, 'pickle': True, 'preload': [], 'preload-argv': [], 'transition-log-length': 100000, 'work-stealing': True, 'worker-ttl': None}, 'version': 2, 'worker': {'blocked-handlers': [], 'connections': {'incoming': 10, 'outgoing': 50}, 'daemon': True, 'lifetime': {'duration': None, 'restart': False, 'stagger': '0 seconds'}, 'memory': {'pause': 0.8, 'spill': 0.7, 'target': 0.6, 'terminate': 0.95}, 'multiprocessing-method': 'forkserver', 'preload': [], 'preload-argv': [], 'profile': {'cycle': '1000ms', 'interval': '10ms', 'low-level': False}, 'use-file-locking': True}}, 'scheduler': {'work-stealing': True}, 'temporary-directory': '/home/user/tmp'}
In [2]:
from dask.distributed import Client client = Client('127.0.0.1:8786') client

Client

Cluster

  • Workers: 2
  • Cores: 2
  • Memory: 512.00 MB

The dashboard is actually at https://cocalc.com/{{ THE PROJECT UUID }}/server/8787/status

Websocket forwarding doesn't work, though ... hmm...

alternatively, start an X11 desktop in cocalc and run google-chrome at http://127.0.0.1:8787/status

data array similar to numpy arrays

In [3]:
import dask.array as da
In [4]:
x = da.random.random((1000, 1000), chunks=(300, 300)) x
Array Chunk
Bytes 8.00 MB 720.00 kB
Shape (1000, 1000) (300, 300)
Count 16 Tasks 16 Chunks
Type float64 numpy.ndarray
1000 1000
In [5]:
client.scatter(x)
Future: Array status: finished, type: Array, key: Array-24e69cba4ae50deb02d577cbaa6d2f32
In [6]:
y = x + x.T z = y[::50, 100:].mean(axis=1) z
Array Chunk
Bytes 160 B 48 B
Shape (20,) (6,)
Count 84 Tasks 4 Chunks
Type float64 numpy.ndarray
20 1
In [8]:
z.shape
(20,)
In [9]:
out = z.compute() out
array([0.98685125, 1.00267952, 1.02948084, 1.00879423, 1.01498646, 0.97546357, 0.99635268, 0.98330797, 1.00368954, 0.99629023, 0.98163442, 0.99099701, 1.00793484, 1.0069481 , 1.00926458, 0.99543205, 0.99988385, 1.01652145, 0.99421467, 0.99346286])
In [10]:
rz = (z[:100].sum() / z[:100].shape[0]) rz.compute()
0.99970950713922
In [11]:
rz.visualize()

functions and native lists

In [12]:
fn = lambda x : x**3 - x**2 + 1 def neg(a): import time time.sleep(.1) return -a A = client.map(fn, range(10)) B = client.map(neg, A) C = client.map(lambda a, b: a+b, A, B) total = client.submit(sum, B) total.result()
-1750
In [13]:
client.gather(C)
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

loops?

In [14]:
def fib(x): if x <= 1: return 1 else: return fib(x - 2) + fib(x - 1)
In [15]:
[fib(_) for _ in range(10)]
[1, 1, 2, 3, 5, 8, 13, 21, 34, 55]

Dask Bags

In [16]:
import dask.bag as db b1 = db.from_sequence(range(-1031, 1000), npartitions=50) b1
dask.bag<from_se..., npartitions=50>
In [17]:
import operator
In [18]:
is_odd = lambda x : x % 2 == 0 #b1.groupby(is_odd).map(lambda k_v : (k_v[0], sum(k_v[1]))).compute() b1.foldby(is_odd, operator.add, 0).compute()
[(False, -16256), (True, -16240)]

Dask Delayed

In [19]:
from dask import delayed
In [20]:
inc = lambda x : x+1 from operator import add
In [21]:
z = delayed(0) for i in range(5): x = delayed(inc)(i) y = delayed(inc)(delayed(add)(i, x)) z = delayed(add)(z, y) z.compute()
30
In [22]:
z.vizualize(filename=os.path.expanduser('~/dask-delayed-1.png'), format='png')
Delayed('vizualize-89c76478-7b3a-4d64-9f6f-4abb7d78bfc8')
In [23]:
z.vizualize()
Delayed('vizualize-8b52a54b-9e0f-472b-bd94-540bfb7ab1a0')
In [24]:
import dask_ml dask_ml.__version__
'1.0.0'
In [25]:
from dask_ml.preprocessing import Categorizer, DummyEncoder from dask_ml.linear_model import LogisticRegression
In [26]:
lr = LogisticRegression() lr
LogisticRegression(C=1.0, class_weight=None, dual=False, fit_intercept=True, intercept_scaling=1.0, max_iter=100, multi_class='ovr', n_jobs=1, penalty='l2', random_state=None, solver='admm', solver_kwargs=None, tol=0.0001, verbose=0, warm_start=False)
In [29]:
from joblib import parallel_backend with parallel_backend('dask') as pb: print(pb[0])
<joblib._dask.DaskDistributedBackend object at 0x7f9eba4e8a58>
In [30]:
import joblib joblib.__version__
'0.12.5'
In [ ]:
In [ ]:
In [ ]:

Ad-hoc Local Cluster

In [28]:
from dask.distributed import Client, LocalCluster import dask.array as da cluster = LocalCluster( n_workers=3, threads_per_worker=1, processes=True, diagnostics_port=None) client = Client(cluster) x = da.random.random((300, 300), chunks=(10, 10)) y = x + x.T z = (y.mean(axis=1) / y.shape[0]).std() print(z.compute())
/usr/local/lib/python3.6/dist-packages/distributed/dashboard/core.py:72: UserWarning: Port 8787 is already in use. Perhaps you already have a cluster running? Hosting the diagnostics dashboard on a random port instead. warnings.warn("\n" + msg)
7.8479621735994e-05
In [ ]:
In [ ]:
In [ ]: