Speeding up your Algorithms Part 4— Dask
Run your Pandas/Numpy/Sklearn/Python code in parallel with Dask
This is the fourth post in a series I am writing. All posts are here:
- Speed Up your Algorithms Part 1 — PyTorch
- Speed Up your Algorithms Part 2 — Numba
- Speed Up your Algorithms Part 3 — Parallelization
- Speed Up your Algorithms Part 4 — Dask
These posts go with Jupyter Notebooks available here:
[Github-SpeedUpYourAlgorithms] and [Kaggle]
(Edit-1/2/2019) — Added more info/possible usage of Dask.distributed.LocalCluster
NOTE:
This post goes with a Jupyter Notebook available in my repo on GitHub: [SpeedUpYourAlgorithms-Dask] and on Kaggle: [SpeedUpYourAlgorithms-Dask]
1. Introduction ^
With the growing need to parallelize Machine Learning algorithms, driven by the exponential increase in data sizes and even model sizes, it would be really helpful to have a tool that could parallelize our Pandas DataFrame handling, our Numpy computations, and even our Machine Learning algorithms (maybe algorithms from sklearn and tensorflow) without much hassle.
Such a library does exist, and its name is Dask. Dask is a parallel computing library which doesn't just help parallelize existing Machine Learning tools (Pandas and Numpy) [i.e. using High Level Collections], but also helps parallelize low level tasks/functions and can handle complex interactions between these functions by building a task graph [i.e. using Low Level Schedulers]. This is similar to the threading or multiprocessing modules of Python.
Dask also has a separate Machine Learning library, dask-ml, which integrates with existing libraries such as sklearn, xgboost and tensorflow.
Dask parallelizes the tasks given to it by making a graph of the interactions between them. It really helps to visualize what you are doing with Dask's .visualize() method, which is available on all of its data types and on any complex chain of tasks you compute. This method outputs a graph of your tasks, and if your tasks have many nodes at each level (i.e. your task chain has many independent tasks at many levels, such as a parallelizable task on chunks of data), then Dask will be able to parallelize them.
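As a rough illustration of the task graph idea, the sketch below builds a small chunked array and inspects the graph Dask records before anything runs. The array shape and chunk sizes are arbitrary choices made for this example, and the .visualize() call is left commented out because it needs the optional graphviz dependency.

```python
import dask.array as da

# A 4x4 array of ones split into four 2x2 chunks (sizes chosen for illustration).
x = da.ones((4, 4), chunks=(2, 2))

# Nothing is computed here; Dask only records the tasks that would be needed.
total = (x + 1).sum()

# The recorded graph behaves like a mapping from task keys to tasks.
graph = dict(total.__dask_graph__())
n_tasks = len(graph)

# total.visualize(filename="graph.svg")  # assumes graphviz is installed

# Only now are the per-chunk tasks executed and combined.
result = total.compute()
```

Because each chunk gets its own tasks, the graph has several independent nodes at the same level, which is what gives Dask something to run in parallel.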
Note:
Dask is still a relatively new project and has a long way to go. Still, if you don't want to learn a completely new API (as you would with PySpark), Dask is your best option, and it will surely keep getting better.
Spark/PySpark is still far ahead and will keep on improving. It is a well-established Apache project. I will publish a post on PySpark in the coming months. (Today: April '19)
If you want to start with PySpark, read this comment here.
2. Data Types ^
Each data type in Dask provides a distributed version of an existing data type, such as DataFrame from Pandas, ndarray from numpy, and list from Python. These data types can be larger than your memory; Dask will run computations on your data in parallel in a Blocked manner. Blocked in the sense that large computations are performed as many small computations, i.e. in blocks, and the number of blocks is the total number of chunks.
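The blocked idea can be sketched without Dask at all. What follows is only a plain-Python illustration of the concept, not how Dask is implemented internally; the helper names split_into_chunks and blocked_sum are made up for this example.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_chunks(data, chunk_size):
    """Cut a list into consecutive blocks of at most chunk_size items."""
    return [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]


def blocked_sum(data, chunk_size):
    """Sum each block independently, then combine the partial sums."""
    chunks = split_into_chunks(data, chunk_size)
    with ThreadPoolExecutor() as pool:
        partial_sums = list(pool.map(sum, chunks))  # one small task per block
    return sum(partial_sums), len(chunks)


data = list(range(1, 101))
total, n_blocks = blocked_sum(data, chunk_size=25)
```

Each block only needs its own slice of the data, which is why the same pattern still works when the full dataset is larger than memory.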
a) Array:
Dask Array operates on very large arrays by dividing them into chunks and executing those blocks in parallel. It has many of the numpy methods available, which you can use to get a speedup, but some of them are not implemented.
Dask Array can read from any array-like structure, provided it supports numpy-like slicing and has a .shape property, by using the dask.array.from_array method. It can also read from .npy and .zarr files.
import dask.array as da
import numpy as np

arr = np.random.randint(1, 1000, (10000, 10000))
darr = da.from_array(arr, chunks=(1000, 1000))
# It will make chunks, each of size (1000, 1000)
darr.npartitions
# 100
It can be used when your arrays are really heavy (i.e. they won't fit into memory) and numpy won't be able to do anything about that. So Dask divides them into chunks of arrays and operates on them in parallel for you.
Now, Dask evaluates every method lazily. So, to actually compute the value of a function, you have to use the .compute() method. It will compute the result in parallel in blocks, parallelizing every independent task at that time.
result = darr.compute()
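To make the lazy evaluation concrete, here is a small sketch that compares a chunked computation against plain numpy. The array size and chunk shape are assumptions picked so the example runs quickly; they are not values from the notebook.

```python
import numpy as np
import dask.array as da

arr = np.arange(1_000_000, dtype="float64").reshape(1000, 1000)
darr = da.from_array(arr, chunks=(250, 250))  # 4 x 4 = 16 blocks

# This line only builds a task graph; no arithmetic has happened yet.
lazy_mean = (darr * 2).mean(axis=0)

# .compute() runs the per-block tasks and returns a numpy array.
col_means = lazy_mean.compute()

# The same computation in plain numpy, for comparison.
expected = (arr * 2).mean(axis=0)
```

Until .compute() is called, lazy_mean is still a Dask array describing the work to be done, so you can keep chaining operations onto it cheaply.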
b) DataFrame:
Similar to Dask Arrays, Dask DataFrames parallelize computation on very large data files, which won't fit in memory, by dividing the files into chunks and applying functions to those blocks in parallel.
import dask.dataframe as dd
df = dd.read_csv("BigFile(s).csv", blocksize=50e6)
Now you can use most of the functions available in the pandas library here.
agg = df.groupby(["column"]).aggregate(["sum", "mean", "max", "min"])
agg.columns = new_column_names # see in notebook
df_new = df.merge(agg.reset_index(), on="column", how="left")
df_new.compute().head()
c) Bag:
Dask Bags parallelize computation on Python list-like objects that contain elements of many data types. They are useful when you are trying to process semi-structured data like JSON blobs or log files.
import dask.bag as db
b = db.read_text("BigSemiStructuredData.txt")
b.take(1)
Dask bags read line by line, and the .take method outputs a tuple of the number of lines specified.
Dask Bag implements operations like map, filter, fold, and groupby on such collections of Python objects. It does this in parallel with a small memory footprint using Python iterators. It is similar to a parallel version of PyToolz or a Pythonic version of the PySpark RDD.
import json
records = b.map(json.loads)  # assumes each line holds one JSON object
filtered = (records.filter(lambda x: x["Name"] == "James")
                   .map(lambda x: {**x, "Address": "New_Address"}))
filtered.compute()
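The filter and map steps can also be tried on an in-memory list. This is a small sketch, assuming made-up records and a hypothetical helper named with_new_address; the threaded scheduler is chosen here only to keep the toy example light.

```python
import dask.bag as db

# Made-up records standing in for parsed JSON lines.
people = [
    {"Name": "James", "Address": "Old_Address_1"},
    {"Name": "Maria", "Address": "Old_Address_2"},
    {"Name": "James", "Address": "Old_Address_3"},
]
bag = db.from_sequence(people, npartitions=2)


def with_new_address(record):
    """Return a copy of the record with the address replaced."""
    return {**record, "Address": "New_Address"}


updated = (bag.filter(lambda r: r["Name"] == "James")
              .map(with_new_address)
              .compute(scheduler="threads"))
```

Returning a new dict from the mapped function, rather than mutating the input, leaves the original records untouched.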
3. Delayed ^
If your task is fairly simple and you are not able to, or don't want to, handle it with these High Level Collections, then you can use the Low Level Schedulers, which help you parallelize your code/algorithm through the dask.delayed interface. dask.delayed also computes lazily.
from dask import delayed as delay

@delay
def sq(x):
    return x**2

@delay
def add(x, y):
    return x + y

@delay
def sum(arr):  # note: this shadows Python's built-in sum
    total = 0
    for i in range(len(arr)):
        total += arr[i]
    return total
You can add complex interactions between these functions according to your needs, using results from previous tasks as arguments to the next ones. Dask will not compute these functions right away; rather, it will make a graph of your tasks, effectively incorporating the interactions between the functions that you use.
import numpy as np

inputs = list(np.arange(1, 11))  # delayed objects will be added to a list
temp = []
for i in range(len(inputs)):
    temp.append(sq(inputs[i]))  # compute sq of inputs and save the
                                # delayed results in a list
inputs = temp; temp = []
for i in range(0, len(inputs) - 1, 2):
    temp.append(add(inputs[i], inputs[i + 1]))  # add two consecutive
                                                # results from prev step
inputs = temp
result = sum(inputs)  # sum all results from prev step
result.compute()
You can add delay to any parallelizable code with many possible small blocks and get a speedup. It can be many functions you want to compute, as in the example above, or maybe reading a number of files in parallel using pandas.read_csv.
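The file-reading case might look like the sketch below. It writes three tiny CSV files to a temporary folder so it can run anywhere; the file names and columns are made up for illustration and stand in for your real data files.

```python
import os
import tempfile

import pandas as pd
from dask import compute, delayed

# Write three small CSV files to a temporary folder (stand-ins for real data).
tmp_dir = tempfile.mkdtemp()
paths = []
for i in range(3):
    path = os.path.join(tmp_dir, f"part_{i}.csv")
    pd.DataFrame({"file_id": [i] * 4, "value": range(4)}).to_csv(path, index=False)
    paths.append(path)

# Each read becomes an independent task; nothing is read at this point.
lazy_frames = [delayed(pd.read_csv)(p) for p in paths]

# dask.compute runs the independent reads and returns a tuple of DataFrames.
frames = compute(*lazy_frames)
df = pd.concat(frames, ignore_index=True)
```

Since no read depends on another, the graph is one wide level of independent tasks, which is the shape that benefits most from parallel execution.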
4. Distributed ^
Until now we have been using Dask's default scheduler to compute the results of our tasks. But you can change it according to your needs from the options Dask provides.
Dask comes with four available schedulers:
- “threaded”: a scheduler backed by a thread pool
- “processes”: a scheduler backed by a process pool
- “single-threaded” (aka “sync”): a synchronous scheduler, good for debugging
- distributed: a distributed scheduler for executing graphs on multiple machines
import dask

result.compute(scheduler="single-threaded") # for debugging
# Or
dask.config.set(scheduler="single-threaded")
result.compute()

NOTE: (from official page here)
Threaded tasks will work well when the functions called release the GIL, whereas multiprocessing will always have a slower start-up time and suffer where a lot of communication is required between tasks.

# And you can get the scheduler by one of these commands:
dask.threaded.get, dask.multiprocessing.get, dask.local.get_sync
# last one for "single-threaded"
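As a quick check that the choice of scheduler changes how tasks run but not what they return, here is a small sketch. The sq function and the input range are made up for this example, and the process-based scheduler is left out to keep it lightweight.

```python
import dask
from dask import delayed


@delayed
def sq(x):
    return x ** 2


# Sum of squares of 1..5, built lazily from delayed pieces.
total = delayed(sum)([sq(i) for i in range(1, 6)])

# Choose the scheduler per call.
threaded = total.compute(scheduler="threads")
synchronous = total.compute(scheduler="single-threaded")

# Or set it temporarily; the previous setting is restored when the block exits.
with dask.config.set(scheduler="single-threaded"):
    from_config = total.compute()
```

Using dask.config.set as a context manager keeps the change local, which is handy when you only want the synchronous scheduler while debugging one step.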
But Dask has one more scheduler, dask.distributed, and it can be preferred for the following reasons:
- It provides access to an asynchronous API, notably Futures,
- It provides a diagnostic dashboard that can provide valuable insight on performance and progress, and
- It handles data locality with more sophistication, and so can be more efficient than the multiprocessing scheduler on workloads that require multiple processes.
You can create Dask's dask.distributed scheduler by importing and creating a Client.
from dask.distributed import Client
client = Client() # Set up a local cluster
# You can navigate to http://localhost:8787/status to see the
# diagnostic dashboard if you have Bokeh installed.
Now you can submit your tasks to this cluster with the client.submit method, passing the function and its arguments as parameters. Then you can gather the result with either client.gather or the future's .result method.
sent = client.submit(sq, 4) # sq: square function
result = client.gather(sent) # Or sent.result()
You can also watch the progress of your task in the current cell by using dask.distributed.progress, and you can explicitly wait for a task to complete by using dask.distributed.wait.
For more information look here.
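Putting submit, wait and gather together might look like the following sketch. It assumes an in-process cluster (processes=False) with the dashboard switched off, which are choices made to keep the example small rather than recommended settings.

```python
from dask.distributed import Client, wait


def sq(x):
    return x ** 2


# processes=False keeps the workers inside this process, and
# dashboard_address=None turns the dashboard off (example choices only).
client = Client(processes=False, n_workers=1, threads_per_worker=2,
                dashboard_address=None)
try:
    futures = client.map(sq, range(5))   # one future per input
    wait(futures)                        # block until all have finished
    squares = client.gather(futures)     # pull the results back as a list
    # Futures can be passed as arguments to further tasks.
    total = client.submit(sum, futures).result()
finally:
    client.close()
```

Passing futures straight into another submit call keeps intermediate results on the workers instead of moving them back to your session first.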
Note: (Local Cluster)
At times you will notice that Dask exceeds your memory, even though it is dividing tasks. This can happen because the function you are applying to your dataset needs most of your data for processing, and multiprocessing can make things worse, as all workers might try to copy the dataset into memory. This can happen in aggregation cases.
Or maybe you want to restrict Dask to a specific amount of memory. In these cases you can pass Dask.distributed.LocalCluster parameters to Client() to make a LocalCluster using the cores of your local machine.
from dask.distributed import Client, LocalCluster
# recent distributed releases use dashboard_address=':0' in place of diagnostics_port
client = Client(n_workers=1, threads_per_worker=1, processes=False,
                memory_limit='25GB', scheduler_port=0,
                silence_logs=True, diagnostics_port=0)
client
'scheduler_port=0' and 'diagnostics_port=0' will choose random port numbers for this particular client. With 'processes=False' Dask's client won't copy the dataset, which would have happened for every process you might have made.
You can tune your client as per your needs or limitations, and for more info you can look into the parameters of LocalCluster.
You can also use multiple clients on the same machine at different ports.
5. Machine Learning ^
Dask also has a library which helps run the most popular Machine Learning libraries in parallel, like sklearn, tensorflow and xgboost.
In Machine Learning there are a couple of distinct scaling problems you might face. The scaling strategy depends on which problem you’re facing:
- Large Models: Data fits in RAM, but training takes too long. Many hyperparameter combinations, a large ensemble of many models, etc.
- Large Datasets: Data is larger than RAM, and sampling isn’t an option.
So, you should:
- For in-memory fit-able problems, just use scikit-learn (or your favorite ML library);
- For large models, use dask_ml.joblib and your favorite scikit-learn estimator; and
- For large datasets, use dask_ml estimators.
a) Preprocessing:
dask_ml.preprocessing contains some of the functions from sklearn like RobustScaler, StandardScaler, LabelEncoder, OneHotEncoder, PolynomialFeatures etc., and some of its own such as Categorizer, DummyEncoder, OrdinalEncoder etc.
You can use them just as you would with a Pandas DataFrame.
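import dask.dataframe as dd
from dask_ml.preprocessing import RobustScaler

df = dd.read_csv("BigFile.csv", blocksize=50e6)
rsc = RobustScaler()
# scalers expect 2-D input, so select the column as a DataFrame
df["column"] = rsc.fit_transform(df[["column"]])["column"]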
And you can make a pipeline with sklearn's make_pipeline method, using Dask's preprocessing methods on a Dask DataFrame.
b) Hyper Parameter Search:
Dask has methods from sklearn for hyperparameter search, such as GridSearchCV, RandomizedSearchCV etc.
from dask_ml.datasets import make_regression
from dask_ml.model_selection import train_test_split, GridSearchCV

X, y = make_regression(chunks=50000)
xtr, xval, ytr, yval = train_test_split(X, y)
# estimator and param_grid stand for your own model and its parameter grid
gsearch = GridSearchCV(estimator, param_grid, cv=10)
gsearch.fit(xtr, ytr)
And if you are using partial_fit with your estimators, you can use dask-ml's IncrementalSearchCV.
NOTE: (from Dask)
If you want to use post-fit tasks like scoring and prediction, then the underlying estimator's scoring method is used. If your estimator, possibly from sklearn, is not able to handle a large dataset, then wrap your estimator in "dask_ml.wrappers.ParallelPostFit". It can parallelize methods like "predict", "predict_proba", "transform" etc.
c) Models/Estimators:
Dask has some of the Linear Models (LinearRegression, LogisticRegression etc.), some Clustering Models (KMeans and SpectralClustering), a method to operate with Tensorflow clusters, and methods to train XGBoost models using Dask.
You can use sklearn's models with Dask if your training data is small, maybe with the ParallelPostFit wrapper (if your test data is large).
from sklearn.linear_model import ElasticNet
from dask_ml.wrappers import ParallelPostFit

el = ParallelPostFit(estimator=ElasticNet())
el.fit(Xtrain, ytrain)
preds = el.predict(Xtest)
If your dataset is not large but your model is big, then you can use joblib. Many of sklearn's algorithms were written for parallel execution (you might have used the n_jobs=-1 argument) using joblib, which makes use of threads and processes to parallelize the workload. To parallelize with Dask instead, you have to create a Client and then wrap your code in with joblib.parallel_backend('dask'):.
import dask_ml.joblib
from sklearn.externals import joblib  # newer scikit-learn: import joblib
from dask.distributed import Client

client = Client()
with joblib.parallel_backend('dask'):
    ...  # your scikit-learn code

NOTE:
The Dask joblib backend is useful for scaling out CPU-bound workloads: workloads with datasets that fit in RAM but have many individual operations that can be done in parallel. To scale out to RAM-bound workloads (larger-than-memory datasets) you should use Dask's inbuilt models and methods.
And if your training data is too big to fit into memory, then you should use Dask's inbuilt estimators to get a speedup. You can also use Dask's wrappers.Incremental, which uses the underlying estimator's partial_fit method to train on the whole dataset, but it is sequential in nature.
Dask's inbuilt estimators scale well for large datasets with a variety of optimization algorithms like admm, lbfgs, gradient_descent etc. and with regularizers like L1, L2, ElasticNet etc.
from dask_ml.linear_model import LogisticRegression

lr = LogisticRegression(solver="lbfgs")  # solver is set on the estimator
lr.fit(X, y)
For one more example using Dask, you can read the Dask section in my post here. It is a complete walk-through from exploration to training the model.
6. Further Reading ^
- https://mybinder.org/v2/gh/dask/dask-examples/master?urlpath=lab
- https://towardsdatascience.com/how-i-learned-to-love-parallelized-applies-with-python-pandas-dask-and-numba-f06b0b367138
- https://docs.dask.org/en/latest/
- https://ml.dask.org
Suggestions and reviews are welcome.
Thank you for reading!