Shareddask.ipynbOpen in CoCalc

Dask in Python 3 (Ubuntu Linux)

https://docs.dask.org/en/latest/

import dask
dask.__version__
'1.1.1'
import dask
import dask.distributed
import os
# Configure Dask globally: put scratch/spill files under ~/tmp and enable
# work stealing so idle workers can take queued tasks from busy ones.
dask.config.set({
    'temporary_directory': os.path.expanduser('~/tmp'),  # spill-to-disk location
    'scheduler.work-stealing': True  # idle workers may steal queued tasks
})
<dask.config.set at 0x7ff427e50550>
dask.config.config
{'distributed': {'version': 2, 'scheduler': {'allowed-failures': 3, 'bandwidth': 100000000, 'default-data-size': 1000, 'events-cleanup-delay': '1h', 'transition-log-length': 100000, 'work-stealing': True, 'worker-ttl': None, 'preload': [], 'preload-argv': []}, 'worker': {'multiprocessing-method': 'forkserver', 'use-file-locking': True, 'connections': {'outgoing': 50, 'incoming': 10}, 'preload': [], 'preload-argv': [], 'profile': {'interval': '10ms', 'cycle': '1000ms'}, 'memory': {'target': 0.6, 'spill': 0.7, 'pause': 0.8, 'terminate': 0.95}}, 'client': {'heartbeat': '5s'}, 'comm': {'compression': 'auto', 'default-scheme': 'tcp', 'socket-backlog': 2048, 'recent-messages-log-length': 0, 'timeouts': {'connect': '10s', 'tcp': '30s'}}, 'dashboard': {'link': 'http://{host}:{port}/status', 'export-tool': False}, 'admin': {'tick': {'interval': '20ms', 'limit': '3s'}, 'log-length': 10000, 'log-format': '%(name)s - %(levelname)s - %(message)s', 'pdb-on-err': False}}, 'temporary-directory': '/home/user/tmp', 'scheduler': {'work-stealing': True}}
from dask.distributed import Client
client = Client('127.0.0.1:8786')
client

Client

Cluster

  • Workers: 2
  • Cores: 2
  • Memory: 512.00 MB

The dashboard is actually at https://cocalc.com/{{ THE PROJECT UUID }}/server/8787/status

Note: websocket forwarding does not work through the CoCalc proxy, so the live dashboard will not update there.

Alternatively, start an X11 desktop in CoCalc and browse to http://127.0.0.1:8787/status with google-chrome.

Dask arrays: chunked, lazily evaluated arrays with a NumPy-like interface

import dask.array as da
x = da.random.random((1000, 1000), chunks=(300, 300))
x
dask.array<random_sample, shape=(1000, 1000), dtype=float64, chunksize=(300, 300)>
y = x + x.T
z = y[::50, 100:].mean(axis=1)
z
dask.array<mean_agg-aggregate, shape=(20,), dtype=float64, chunksize=(6,)>
z.shape
(20,)
out = z.compute()
out
array([1.0146939 , 1.00857884, 0.99536186, 1.02285578, 1.00256021, 0.99480644, 0.98832619, 1.00787754, 0.997528 , 1.00129023, 0.98857417, 0.9798488 , 0.97488509, 0.99765059, 0.9932824 , 1.00199858, 1.0148808 , 0.98820368, 1.02353063, 1.00137729])
rz = (z[:100].sum() / z[:100].shape[0])
rz.compute()
0.9999055509879776
rz.visualize()

Mapping Python functions over native lists with the distributed client

def fn(x):
    """Return x**3 - x**2 + 1 — a small cubic used as a demo workload.

    Replaces the original ``fn = lambda x: ...``: a lambda bound to a name
    is less readable/debuggable than a plain ``def`` (PEP 8).
    """
    return x**3 - x**2 + 1

def neg(a):
    """Return the arithmetic negation of *a* after a short simulated delay.

    The 0.1 s sleep stands in for real per-task work so the distributed
    scheduler has something visible to do.
    """
    from time import sleep
    sleep(0.1)
    return -a

# A[i] = fn(i) = i**3 - i**2 + 1 for i in 0..9, computed as futures on the cluster.
A = client.map(fn, range(10))
# B[i] = -A[i]; each neg call sleeps 0.1 s on a worker.
B = client.map(neg, A)
# C[i] = A[i] + B[i] == 0 for every i (matches the all-zero gather output below).
C = client.map(lambda a, b: a+b, A, B)
# Sum B on a worker: -(sum of fn(i) for i in 0..9) == -1750 (output below).
total = client.submit(sum, B)
total.result()
-1750
client.gather(C)
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

loops?

def fib(x):
    """Return the x-th Fibonacci number with fib(0) == fib(1) == 1.

    Iterative O(x) implementation.  The original naive double recursion
    recomputed subproblems and was exponential in x; this version returns
    the exact same values (including 1 for any x <= 1, since the loop body
    never runs for non-positive x).
    """
    a, b = 1, 1
    for _ in range(x):
        a, b = b, a + b
    return a
[fib(_) for _ in range(10)]
[1, 1, 2, 3, 5, 8, 13, 21, 34, 55]

Dask Bags

import dask.bag as db
b1 = db.from_sequence(range(-1000, 1000), npartitions=50)
b1
dask.bag<from_se..., npartitions=50>
import operator


def is_even(x):
    """Partition key for the bag: True when x is even (x % 2 == 0)."""
    return x % 2 == 0


# The original named this predicate `is_odd`, but `x % 2 == 0` tests for
# evenness (the output pairs True with the even-number sum).  Keep the old
# name as an alias in case a later cell still refers to it.
is_odd = is_even

# Equivalent but slower groupby formulation:
#   b1.groupby(is_even).map(lambda k_v: (k_v[0], sum(k_v[1]))).compute()

# Reduce each partition with operator.add starting from 0:
# evens in range(-1000, 1000) sum to -1000, odds sum to 0.
b1.foldby(is_even, operator.add, 0).compute()
[(True, -1000), (False, 0)]

Dask Delayed

# Build a task graph lazily with dask.delayed.  Each iteration adds
# y = inc(i + inc(i)) = 2*i + 2 to the running total z, so
# z = sum(2*i + 2 for i in range(5)) = 30 (matches the output below).
from dask import delayed
inc = lambda x : x+1
from operator import add
z = delayed(0)  # accumulator as a delayed constant
for i in range(5):
    x = delayed(inc)(i)  # x = i + 1
    y = delayed(inc)(delayed(add)(i, x))  # y = (i + x) + 1 = 2*i + 2
    z = delayed(add)(z, y)  # chain the accumulation into the graph
z.compute()  # execute the whole graph on the scheduler
30
# Fix: the original called `z.vizualize(...)` (typo).  On a Delayed object an
# unknown attribute silently becomes another lazy task — the output below,
# Delayed('vizualize-...'), shows the call never rendered anything.
# `visualize` actually writes the task graph image to ~/dask-delayed-1.png.
z.visualize(filename=os.path.expanduser('~/dask-delayed-1.png'), format='png')
Delayed('vizualize-e1cd855b-2328-4b4b-bc42-c9c9362e0c3e')
import dask_ml
dask_ml.__version__
'0.11.0'
from dask_ml.preprocessing import Categorizer, DummyEncoder
from dask_ml.linear_model import LogisticRegression
lr = LogisticRegression()
lr
LogisticRegression(C=1.0, class_weight=None, dual=False, fit_intercept=True, intercept_scaling=1.0, max_iter=100, multiclass='ovr', n_jobs=1, penalty='l2', random_state=None, solver='admm', solver_kwargs=None, tol=0.0001, verbose=0, warm_start=False)
# NOTE(review): `sklearn.externals.joblib` is scikit-learn's vendored copy of
# joblib, which was deprecated and later removed — on newer versions use
# `from joblib import parallel_backend` instead; confirm against the
# installed scikit-learn release.
from sklearn.externals.joblib import parallel_backend
# Route joblib-parallelised scikit-learn work through the Dask cluster;
# pb[0] is the active backend object (printed below).
with parallel_backend('dask') as pb:
    print(pb[0])
<sklearn.externals.joblib._dask.DaskDistributedBackend object at 0x7f39b00e8470>



Local Cluster

from dask.distributed import Client, LocalCluster
import dask.array as da

# Spin up an in-process cluster: 3 single-threaded worker processes.
# NOTE(review): `diagnostics_port=None` disables the diagnostics server here;
# newer dask versions renamed this parameter (`dashboard_address`) — confirm
# against the installed distributed version.
cluster = LocalCluster(
    n_workers=3, threads_per_worker=1, processes=True, diagnostics_port=None)
client = Client(cluster)
# 300x300 uniform random array split into 10x10 chunks.
x = da.random.random((300, 300), chunks=(10, 10))
y = x + x.T  # symmetric matrix, entries in (0, 2)
# Std-dev of the (scaled) row means — a single small float when computed.
z = (y.mean(axis=1) / y.shape[0]).std()
print(z.compute())
7.934615773661303e-05