Parallelize code with dask.delayed
In this section we parallelize simple for-loop style code with Dask and dask.delayed. Often, this is the only function that you will need to convert functions for use with Dask.
This is a simple way to use Dask to parallelize existing codebases or build complex systems. This will also help us to develop an understanding for later sections.
Basics
First let's make some toy functions, inc and add, that sleep for a while to simulate work. We'll then time running these functions normally.
In the next section we'll parallelize this code.
We time the execution of this normal code using the %%time magic, which is a special function of the Jupyter Notebook.
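A minimal sketch of the toy functions and the sequential timing described above. The function bodies and the inputs 1 and 2 are assumptions, since the original cells are not shown here, and time.perf_counter stands in for the %%time magic so that the snippet also runs outside Jupyter.

```python
from time import perf_counter, sleep


def inc(x):
    sleep(1)  # simulate one second of work
    return x + 1


def add(x, y):
    sleep(1)  # simulate one second of work
    return x + y


start = perf_counter()
x = inc(1)
y = inc(2)
z = add(x, y)
elapsed = perf_counter() - start

# Three one-second calls run back to back, so this takes about 3 s.
print(f"z = {z}, elapsed = {elapsed:.1f} s")
```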
Parallelize with the dask.delayed decorator
Those two increment calls could run in parallel, because they are totally independent of one another.
We'll transform the inc and add functions using the dask.delayed function. When we call the delayed version, passing the arguments exactly as before, the original function isn't actually called yet, which is why the cell execution finishes very quickly. Instead, a delayed object is made, which keeps track of the function to call and the arguments to pass to it.
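As a sketch of that transformation, the toy functions can be decorated with dask.delayed and called exactly as before. The function bodies and inputs are assumed, and perf_counter is used in place of %%time.

```python
from time import perf_counter, sleep

import dask


@dask.delayed
def inc(x):
    sleep(1)  # simulate one second of work
    return x + 1


@dask.delayed
def add(x, y):
    sleep(1)  # simulate one second of work
    return x + y


start = perf_counter()
x = inc(1)  # returns a Delayed object; inc has not run yet
y = inc(2)
z = add(x, y)  # records the dependency on x and y
build_seconds = perf_counter() - start

print(type(z).__name__, f"built in {build_seconds:.4f} s")
```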
This ran immediately, since nothing has really happened yet.
To get the result, call compute. Notice that this runs faster than the original code.
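A hedged sketch of the compute step, again with assumed function bodies and inputs. It relies on Dask's default threaded scheduler for Delayed objects; the speedup depends on at least two worker threads being available.

```python
from time import perf_counter, sleep

import dask


@dask.delayed
def inc(x):
    sleep(1)  # simulate one second of work
    return x + 1


@dask.delayed
def add(x, y):
    sleep(1)  # simulate one second of work
    return x + y


z = add(inc(1), inc(2))  # builds the task graph only

start = perf_counter()
result = z.compute()  # actually runs inc, inc and add
elapsed = perf_counter() - start

# The two inc calls can overlap, but add must wait for both of them.
print(f"result = {result}, elapsed = {elapsed:.1f} s")
```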
What just happened?
The z object is a lazy Delayed object. This object holds everything we need to compute the final result, including references to all of the functions that are required, their inputs, and their relationships to one another. We can evaluate the result with .compute() as above, or we can visualize the task graph for this value with .visualize().
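The contents of the Delayed object can also be inspected without drawing anything. The sketch below lists the task keys through the `__dask_graph__` collection protocol; the sleep calls are left out because only the graph structure matters here. The `.visualize()` call is shown commented out because it needs the optional graphviz dependency.

```python
import dask


@dask.delayed
def inc(x):
    return x + 1


@dask.delayed
def add(x, y):
    return x + y


z = add(inc(1), inc(2))

# Each key names one task; the prefix is the name of the wrapped function.
graph = z.__dask_graph__()
task_names = sorted(str(key).split("-")[0] for key in graph)
print(task_names)

# z.visualize()  # draws the same graph; requires graphviz to be installed
```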
Notice that the graph includes the names of the functions from before, and the logical flow of the outputs of the inc functions to the inputs of add.
Some questions to consider:
Why did we go from 3s to 2s? Why weren't we able to parallelize down to 1s?
What would have happened if the inc and add functions didn't include the sleep(1)? Would Dask still be able to speed up this code?
What if we have multiple outputs or also want to get access to x or y?
Exercise: Parallelize a for loop
for loops are one of the most common things that we want to parallelize. Use dask.delayed on inc and sum to parallelize the computation below:
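The original cell is not reproduced here, so the following is a sketch under assumptions: the input list is hypothetical, and the sleep is shortened to 0.1 s so the example runs quickly. It shows a sequential loop followed by one possible delayed version.

```python
from time import sleep

import dask


def inc(x):
    sleep(0.1)  # shortened from the 1 s used earlier
    return x + 1


data = [1, 2, 3, 4, 5, 6, 7, 8]  # assumed input

# Sequential baseline: each call blocks until the previous one finishes.
results = []
for x in data:
    results.append(inc(x))
total = sum(results)

# Delayed version: the loop only builds the graph, compute() runs it.
lazy_results = [dask.delayed(inc)(x) for x in data]
lazy_total = dask.delayed(sum)(lazy_results)
parallel_total = lazy_total.compute()

print(total, parallel_total)  # 44 44
```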
How does the graph visualization for the given solution compare with a version that uses the sum function directly rather than wrapping it with delayed? Can you explain the latter version? You might find the result of the following expression illuminating:
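The expression from the original notebook is not shown here. One expression in that spirit, assumed for illustration, is the sum of two delayed values: arithmetic on Delayed objects is itself delayed, which is why the built-in sum still produces a lazy result, only with a differently shaped graph.

```python
import dask


@dask.delayed
def inc(x):
    return x + 1


results = [inc(x) for x in range(1, 9)]

wrapped = dask.delayed(sum)(results)  # a single sum task fed by every inc task
direct = sum(results)  # built-in sum chains pairwise additions of Delayed objects

pair = inc(1) + inc(2)  # adding two Delayed objects yields another Delayed
print(type(pair).__name__, type(direct).__name__)

# Compare the number of tasks in the two graphs.
n_wrapped = len(wrapped.__dask_graph__())
n_direct = len(direct.__dask_graph__())
print(n_wrapped, n_direct)
```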
Exercise: Parallelizing for-loop code with control flow
Often we want to delay only some functions, running a few of them immediately. This is especially helpful when those functions are fast and help us to determine what other slower functions we should call. This decision, to delay or not to delay, is usually where we need to be thoughtful when using dask.delayed.
In the example below we iterate through a list of inputs. If that input is even then we want to call inc. If the input is odd then we want to call double. This is_even decision to call inc or double has to be made immediately (not lazily) in order for our graph-building Python code to proceed.
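A sketch of this pattern, with assumed function bodies, an assumed input list, and sleeps shortened to 0.1 s. Only inc and double are delayed; is_even runs immediately because the if statement needs a concrete answer while the graph is being built.

```python
from time import sleep

import dask


def inc(x):
    sleep(0.1)  # simulated slow work
    return x + 1


def double(x):
    sleep(0.1)  # simulated slow work
    return 2 * x


def is_even(x):
    return x % 2 == 0  # fast, so there is no need to delay it


data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]  # assumed input

results = []
for x in data:
    if is_even(x):  # evaluated eagerly during graph construction
        y = dask.delayed(inc)(x)
    else:
        y = dask.delayed(double)(x)
    results.append(y)

total = dask.delayed(sum)(results)
answer = total.compute()
print(answer)  # 85
```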
Some questions to consider:
What are other examples of control flow where we can't use delayed?
What would have happened if we had delayed the evaluation of is_even(x) in the example above?
What are your thoughts on delaying sum? This function is computational but also fast to run.
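For the question about delaying is_even, a small experiment can help. The sketch below (with an assumed is_even body) tries to use a Delayed value in an if statement and captures the resulting error rather than letting it propagate.

```python
import dask


def is_even(x):
    return x % 2 == 0


lazy_flag = dask.delayed(is_even)(4)  # a Delayed object, not a bool

error_message = None
try:
    if lazy_flag:  # Python asks for a truth value here
        pass
except TypeError as exc:
    # Dask refuses to convert a lazy value to True or False.
    error_message = str(exc)

print(error_message)
```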
Exercise: Parallelizing a Pandas Groupby Reduction
In this exercise we read several CSV files and perform a groupby operation in parallel. We are given sequential code to do this and will parallelize it with dask.delayed.
The computation we will parallelize is to compute the mean departure delay per airport from some historical flight data. We will do this by using dask.delayed together with pandas. In a future section we will do this same exercise with dask.dataframe.
Prep data
First, run this code to prep some data, if you have not already done so.
This downloads and extracts some historical flight data for flights out of NYC between 1990 and 2000. The data is originally from here.
Inspect data
Read one file with pandas.read_csv and compute mean departure delay
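A minimal sketch of this step. It does not use the real flight data: a tiny hypothetical CSV is written to a temporary directory so the snippet runs anywhere, and the column names Origin and DepDelay are assumptions about the dataset's layout.

```python
import tempfile
from pathlib import Path

import pandas as pd

# Hypothetical stand-in for one year of flight data.
csv_text = "Origin,DepDelay\nEWR,10\nJFK,4\nEWR,20\nLGA,0\nJFK,6\n"

with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "1990.csv"
    path.write_text(csv_text)
    df = pd.read_csv(path)

# Group rows by origin airport and average the departure delay.
mean_delay = df.groupby("Origin").DepDelay.mean()
print(mean_delay)
```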
Sequential code: Mean Departure Delay Per Airport
The above cell computes the mean departure delay per-airport for one year. Here we expand that to all years using a sequential for loop.
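One way the sequential loop might look, sketched on two tiny hypothetical files rather than the real data (file names, column names, and values are all assumed). Per-file sums and counts are accumulated first, because averaging the per-file means directly would weight every year equally regardless of how many flights it contains.

```python
import tempfile
from pathlib import Path

import pandas as pd

# Hypothetical stand-ins for two yearly files.
years = {
    "1990.csv": "Origin,DepDelay\nEWR,10\nJFK,4\nEWR,20\n",
    "1991.csv": "Origin,DepDelay\nEWR,0\nJFK,8\nJFK,6\n",
}

with tempfile.TemporaryDirectory() as tmp:
    filenames = []
    for name, text in years.items():
        path = Path(tmp) / name
        path.write_text(text)
        filenames.append(path)

    sums, counts = [], []
    for fn in sorted(filenames):
        df = pd.read_csv(fn)  # read one year
        by_origin = df.groupby("Origin")
        sums.append(by_origin.DepDelay.sum())  # total delay per airport
        counts.append(by_origin.DepDelay.count())  # number of flights per airport

# Combine the per-file intermediates into the overall mean.
total_delays = sum(sums)
n_flights = sum(counts)
mean = total_delays / n_flights
print(mean)
```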
Parallelize the code above
Use dask.delayed to parallelize the code above. Some extra things you will need to know:
Methods and attribute access on delayed objects work automatically, so if you have a delayed object you can perform normal arithmetic, slicing, and method calls on it and it will produce the correct delayed calls.
Calling the .compute() method works well when you have a single output. When you have multiple outputs you might want to use the dask.compute function. This way Dask can share the intermediate values (like y = x**2).
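Both points can be sketched with plain Python values. The load function and the calls list are hypothetical helpers, introduced only to show that a shared intermediate is evaluated once when two outputs are passed to dask.compute together.

```python
import dask

# Method calls and indexing on a Delayed object are recorded lazily.
text = dask.delayed(str.upper)("ewr,jfk,lga")
first = text.split(",")[0]
first_value = first.compute()
print(first_value)  # EWR

calls = []  # records each real execution of load


@dask.delayed
def load():
    calls.append("load")
    return 10


x = load()
y = x ** 2  # arithmetic on a Delayed object is lazy too

# A single dask.compute call evaluates x and y once for both outputs.
low, high = dask.compute(y - 1, y + 1)
print(low, high, len(calls))  # 99 101 1
```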
So your goal is to parallelize the code above (which has been copied below) using dask.delayed. You may also want to visualize a bit of the computation to see if you're doing it correctly.
If you load the solution, add %%time to the top of the cell to measure the running time.
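One possible approach, sketched on hypothetical data and not necessarily the provided solution: wrap only pandas.read_csv in dask.delayed, let the groupby calls be recorded lazily, and evaluate all sums and counts in one dask.compute call. File names, column names, and values are assumed.

```python
import tempfile
from pathlib import Path

import dask
import pandas as pd

# Hypothetical stand-ins for two yearly files.
years = {
    "1990.csv": "Origin,DepDelay\nEWR,10\nJFK,4\nEWR,20\n",
    "1991.csv": "Origin,DepDelay\nEWR,0\nJFK,8\nJFK,6\n",
}

with tempfile.TemporaryDirectory() as tmp:
    filenames = []
    for name, text in years.items():
        path = Path(tmp) / name
        path.write_text(text)
        filenames.append(path)

    sums, counts = [], []
    for fn in sorted(filenames):
        df = dask.delayed(pd.read_csv)(fn)  # lazy read
        by_origin = df.groupby("Origin")  # lazy method call
        sums.append(by_origin.DepDelay.sum())
        counts.append(by_origin.DepDelay.count())

    # Each file is read once and feeds both its sum and its count.
    # Compute here, while the temporary files still exist.
    sums, counts = dask.compute(sums, counts)

# The final reduction is cheap, so it runs eagerly on the pandas results.
mean = sum(sums) / sum(counts)
print(mean)
```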
Some questions to consider:
How much speedup did you get? Is this how much speedup you'd expect?
Experiment with where to call compute. What happens when you call it on sums and counts? What happens if you wait and call it on mean?
Experiment with delaying the call to sum. What does the graph look like if sum is delayed? What does the graph look like if it isn't?
Can you think of any reason why you'd want to do the reduction one way over the other?