Arrays
Parallel: Uses all of the cores on your computer
Larger-than-memory: Lets you work on datasets that are larger than your available memory by breaking up your array into many small pieces, operating on those pieces in an order that minimizes the memory footprint of your computation, and effectively streaming data from disk.
Blocked Algorithms: Perform large computations by performing many smaller computations
Related Documentation
Blocked Algorithms
A blocked algorithm executes on a large dataset by breaking it up into many small blocks.
For example, consider taking the sum of a billion numbers. We might instead break up the array into 1,000 chunks, each of size 1,000,000, take the sum of each chunk, and then take the sum of the intermediate sums.
We achieve the intended result (one sum over one billion numbers) by performing many smaller computations (one thousand sums over one million numbers each, followed by one more sum over the thousand intermediate results).
We do exactly this with Python and NumPy in the following example:
Create random dataset
Compute sum using blocked algorithm
Before using dask, let's consider the concept of blocked algorithms. We can compute the sum of a large number of elements by loading them chunk-by-chunk and keeping a running total.
Here we compute the sum of this large array on disk by
Computing the sum of each 1,000,000 sized chunk of the array
Computing the sum of the 1,000 intermediate sums
Note that this is a sequential process in the notebook kernel: both the loading and the summing happen one chunk at a time.
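The notebook's code cells aren't reproduced here, but the idea can be sketched with an in-memory NumPy array standing in for the on-disk dataset (smaller sizes than the billion-element example, so it runs quickly):

```python
import numpy as np

rng = np.random.default_rng(0)
data = rng.random(10_000_000)  # stand-in for the large on-disk array

chunk_size = 1_000_000
sums = []
for i in range(0, len(data), chunk_size):
    chunk = data[i:i + chunk_size]   # load one block at a time
    sums.append(chunk.sum())         # reduce each block to a single number

total = sum(sums)  # one final sum over the intermediate sums
print(total)
```

Only one block needs to be resident at a time, which is what keeps the memory footprint small.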
Exercise: Compute the mean using a blocked algorithm
Now that we've seen the simple example above, try a slightly more complicated problem: compute the mean of the array, assuming for a moment that we don't already know how many elements are in the data. You can do this by changing the code above with the following alterations:
Compute the sum of each block
Compute the length of each block
Compute the sum of the 1,000 intermediate sums and the sum of the 1,000 intermediate lengths and divide one by the other
This approach is overkill for our case but does nicely generalize if we don't know the size of the array or individual blocks beforehand.
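One possible solution, sketched with a small in-memory array standing in for the data:

```python
import numpy as np

rng = np.random.default_rng(0)
data = rng.random(3_000_000)  # stand-in for the on-disk dataset

chunk_size = 1_000_000
sums, lengths = [], []
for i in range(0, len(data), chunk_size):
    chunk = data[i:i + chunk_size]
    sums.append(chunk.sum())    # sum of each block
    lengths.append(len(chunk))  # length of each block

mean = sum(sums) / sum(lengths)  # total sum divided by total count
print(mean)
```

Because we track lengths per block, this works even if the final block is a different size from the rest.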
dask.array
contains these algorithms
Dask.array is a NumPy-like library that does these kinds of tricks to operate on large datasets that don't fit into memory. It extends beyond the linear problems discussed above to full N-Dimensional algorithms and a decent subset of the NumPy interface.
Create dask.array
object
You can create a dask.array Array object with the da.from_array function. This function accepts:

data: Any object that supports NumPy slicing, like dset
chunks: A chunk size to tell us how to block up our array, like (1000000,)
**Manipulate a dask.array object as you would a numpy array**

Now that we have an Array we perform standard numpy-style computations like arithmetic, mathematics, slicing, reductions, and so on.
The interface is familiar, but the actual work is different. dask_array.sum() does not do the same thing as numpy_array.sum().
What's the difference?
dask_array.sum() builds an expression of the computation. It does not do the computation yet. numpy_array.sum() computes the sum immediately.
Why the difference?
Dask arrays are split into chunks. Each chunk must have computations run on that chunk explicitly. If the desired answer comes from a small slice of the entire dataset, running the computation over all data would be wasteful of CPU and memory.
Compute result
Dask.array objects are lazily evaluated. Operations like .sum build up a graph of blocked tasks to execute.

We ask for the final result with a call to .compute(). This triggers the actual computation.
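A small illustration of the difference (tiny array for clarity):

```python
import numpy as np
import dask.array as da

x = da.from_array(np.arange(8), chunks=(4,))
total = x.sum()            # builds a task graph; nothing is computed yet
print(type(total))         # still a dask Array expression, not a number
result = total.compute()   # now the blocked sums actually run
print(result)              # 28
```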
Exercise: Compute the mean
And the variance, std, etc. This should be a trivial change to the example above.
Look at what other operations you can do with the Jupyter notebook's tab-completion.
Does this match your result from before?
Performance and Parallelism
In our first examples we used for loops to walk through the array one block at a time. For simple operations like sum this is optimal. However, for complex operations we may want to traverse the array differently. In particular we may want the following:
Use multiple cores in parallel
Chain operations on a single block before moving on to the next one
Dask.array translates your array operations into a graph of inter-related tasks with data dependencies between them. Dask then executes this graph in parallel with multiple threads. We'll discuss more about this in the next section.
Example
Construct a 20000x20000 array of normally distributed random values broken up into 1000x1000 sized chunks
Take the mean along one axis
Take every 100th element
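A scaled-down sketch of this example (the notebook uses a 20000x20000 array with 1000x1000 chunks; smaller sizes here so it runs quickly):

```python
import dask.array as da

# 4000x4000 stand-in for the notebook's 20000x20000 array
x = da.random.normal(10, 0.1, size=(4_000, 4_000), chunks=(1_000, 1_000))
y = x.mean(axis=0)[::100]   # mean along one axis, then every 100th element
result = y.compute()
print(result.shape)          # (40,)
```

Nothing is generated or averaged until .compute() is called, and only a few chunks need to be in memory at once.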
Performance comparison
The following experiment was performed on a heavy personal laptop. Your performance may vary. If you attempt the NumPy version then please ensure that you have more than 4GB of main memory.
NumPy: 19s, Needs gigabytes of memory
Dask Array: 4s, Needs megabytes of memory
Discussion
Notice that the Dask array computation ran in 4 seconds, but used 29.4 seconds of user CPU time. The numpy computation ran in 19.7 seconds and used 19.6 seconds of user CPU time.
Dask finished faster, but used more total CPU time because the chunked layout let Dask transparently parallelize the computation.
Questions
What happens if the dask chunks=(20000,20000)?
Will the computation run in 4 seconds?
How much memory will be used?
What happens if the dask chunks=(25,25)?
What happens to CPU and memory?
Exercise: Meteorological data
There is 2GB of somewhat artificial weather data in HDF5 files in data/weather-big/*.hdf5. We'll use the h5py library to interact with this data and dask.array to compute on it.
Our goal is to visualize the average temperature on the surface of the Earth for this month. This will require a mean over all of this data. We'll do this in the following steps:

Create h5py.Dataset objects for each of the days of data on disk (dsets)
Wrap these with da.from_array calls
Stack these datasets along time with a call to da.stack
Compute the mean along the newly stacked time axis with the .mean() method
Visualize the result with matplotlib.pyplot.imshow
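These steps can be sketched with tiny synthetic arrays standing in for the h5py.Dataset objects (the real daily arrays are 5760 by 11520):

```python
import numpy as np
import dask.array as da

# Tiny synthetic stand-ins for the 31 daily h5py.Dataset objects
dsets = [np.random.default_rng(i).random((576, 1152)) for i in range(31)]

arrays = [da.from_array(d, chunks=(500, 500)) for d in dsets]  # wrap each dataset
x = da.stack(arrays, axis=0)     # stack along a new leading time axis
print(x.shape)                   # (31, 576, 1152)

mean = x.mean(axis=0).compute()  # average field over the month
print(mean.shape)                # (576, 1152)
```

With real h5py datasets, only da.from_array's first argument changes; the stacking and reduction are identical.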
Integrate with dask.array

Make a list of dask.array objects out of your list of h5py.Dataset objects using the da.from_array function with a chunk size of (500, 500).
Stack this list of dask.array objects into a single dask.array object with da.stack

Stack these along the first axis so that the shape of the resulting array is (31, 5760, 11520).

Plot the mean of this array along the time (0th) axis
Plot the difference of the first day from the mean
Exercise: Subsample and store
In the above exercise the result of our computation is small, so we can call compute safely. Sometimes our result is still too large to fit into memory and we want to save it to disk. In these cases you can use one of the following two functions:

da.store: Store a dask.array into any object that supports NumPy setitem syntax
da.to_hdf5: A specialized function that creates and stores a dask.array object into an HDF5 file
The task in this exercise is to use numpy step slicing to subsample the full dataset by a factor of two in both the latitude and longitude direction and then store this result to disk using one of the functions listed above.
As a reminder, Python slicing takes three elements: start, stop, and step.
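A sketch of step slicing plus da.store, using a small array and an in-memory NumPy array as the storage target (any object supporting NumPy setitem syntax, such as an HDF5 dataset, behaves the same way):

```python
import numpy as np
import dask.array as da

x = da.from_array(np.arange(100).reshape(10, 10), chunks=(5, 5))
sub = x[::2, ::2]          # every second row and every second column
print(sub.shape)           # (5, 5)

target = np.empty((5, 5))  # stand-in for an on-disk dataset
da.store(sub, target)      # compute and write block-by-block
print(target[0, :3])       # [0. 2. 4.]
```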
Example: Lennard-Jones potential
The Lennard-Jones potential is used in particle simulations in physics, chemistry, and engineering. It is highly parallelizable.
First, we'll run and profile the Numpy version on 7,000 particles.
Notice that the most time-consuming function is distances.
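The notebook's function bodies aren't shown here; the following is a plausible reconstruction of a pairwise Lennard-Jones computation (the names distances and potential match the text, but the exact signatures and the NaN handling are assumptions):

```python
import numpy as np

def distances(cluster):
    # pairwise Euclidean distances between particles (cluster is N x 3 positions)
    diff = cluster[:, None, :] - cluster[None, :, :]
    return np.sqrt((diff ** 2).sum(-1))

def potential(cluster, eps=1.0, sigma=1.0):
    d = distances(cluster)
    sr6 = (sigma / d) ** 6               # diagonal terms divide by zero
    energy = 4 * eps * (sr6 ** 2 - sr6)  # inf - inf on the diagonal gives NaN
    return np.nansum(energy) / 2         # nansum skips self-interactions; /2 undoes double counting

# Two particles at the potential-minimum separation 2**(1/6) have energy -1
cluster = np.array([[0.0, 0.0, 0.0], [2 ** (1 / 6), 0.0, 0.0]])
print(potential(cluster))  # approximately -1.0
```

The O(N^2) distance matrix is why distances dominates the profile for 7,000 particles.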
Dask version
Here's the Dask version. Only the potential function needs to be rewritten to best utilize Dask.

Note that da.nansum has been used over the full distance matrix to improve parallel efficiency.
Let's convert the NumPy array to a Dask array. Since the entire NumPy array fits in memory, it is most efficient to chunk the array by the number of CPU cores.
This step should scale quite well with the number of cores. The warnings are complaining about dividing by zero, which is why we used da.nansum in potential_dask.
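A sketch of what the Dask rewrite might look like (the name potential_dask follows the text, but this body, the particle count, and the chunking choice are illustrative assumptions):

```python
import numpy as np
import dask.array as da

def potential_dask(cluster, eps=1.0, sigma=1.0):
    # same pairwise algebra as the NumPy version, but on dask arrays
    diff = cluster[:, None, :] - cluster[None, :, :]
    d = da.sqrt((diff ** 2).sum(-1))
    sr6 = (sigma / d) ** 6
    energy = 4 * eps * (sr6 ** 2 - sr6)
    return da.nansum(energy) / 2   # NaNs on the diagonal come from the zero self-distances

positions = np.random.default_rng(0).random((700, 3))
# chunk along the particle axis, e.g. into four blocks for four cores
cluster = da.from_array(positions, chunks=(175, 3))
result = potential_dask(cluster).compute()
print(result)
```

Each chunk of the distance matrix can then be built, reduced, and discarded independently, which is what makes the computation parallel.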
Limitations
Dask.array does not implement the entire numpy interface. Users expecting this will be disappointed. Notably, dask.array has the following failings:
Dask does not implement all of np.linalg. This has been done by a number of excellent BLAS/LAPACK implementations and is the focus of numerous ongoing academic research projects.

Dask.array does not support any operation where the resulting shape depends on the values of the array. In order to form the Dask graph we must be able to infer the shape of the array before actually executing the operation. This precludes operations like indexing one Dask array with another or operations like np.where.

Dask.array does not attempt operations like sort which are notoriously difficult to do in parallel and are of somewhat diminished value on very large data (you rarely actually need a full sort). Often we include parallel-friendly alternatives like topk.

Dask development is driven by immediate need, and so many lesser used functions, like np.full_like, have not been implemented purely out of laziness. These would make excellent community contributions.