2. Dask
Dask is a parallel processing API that provides data structures analogous to numpy arrays, pandas dataframes and Spark RDDs. In the cases of arrays and dataframes, unlike their original counterparts in numpy and pandas, the Dask versions are distributed and operations on them are parallelized. The bag data structure of Dask is the analog of a Spark RDD, and both are distributed data structures. These distributed data structures are called high-level collections in Dask.
Dask Analog | Existing Data Structure
---|---
Array | Numpy Array
DataFrame | Pandas DataFrame
Bag | Spark RDD
Dask seems to take the best of what’s fundamental and available in the Python data science toolset and unifies it into a single framework. If you have ever worked with Apache Spark and seen its web interface, Dask has a similar dashboard (it runs on port 8787 by default). Like Spark, Dask is also built on the concept of a driver submitting jobs (centered on a distributed data structure) to a cluster of workers. The Dask documentation offers a more in-depth comparison and contrast of Dask and Spark. Let’s take a glance at Dask and see how it works.
2.1. Cluster and client
In Dask, the Client represents the driver and the LocalCluster represents the cluster. The driver controls the manipulation of distributed data through the submission of jobs to the cluster. The LocalCluster is not a real cluster of separate, physical worker nodes, but it mimics one and is useful for local development.
[1]:
from dask.distributed import Client, LocalCluster
params = {
    'n_workers': 4,
    'threads_per_worker': 2,
    'dashboard_address': '8787'
}
cluster = LocalCluster(**params)
client = Client(cluster)
[2]:
client
[2]:
(HTML summary of the Client and the LocalCluster: scheduler address, dashboard link, workers, threads and memory)
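The client is also how work gets to the workers. As a minimal sketch (assuming the client created above; square() is just an illustrative helper, not part of Dask), we can submit a trivial function across the workers and gather the results back to the driver.
def square(x):
    return x * x

futures = client.map(square, range(10))  # each call is scheduled on a LocalCluster worker
results = client.gather(futures)         # pull the computed values back to the driver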
2.2. DataFrame
A Dask DataFrame behaves nearly identically to a pandas one. There are a couple of ways to create a Dask DataFrame (e.g. reading from files), but, here, we create a pandas one and use the from_pandas() function to convert the pandas DataFrame to a Dask one.
[3]:
import numpy as np
import random
import pandas as pd
from sklearn.datasets import make_regression, make_classification
np.random.seed(37)
random.seed(37)
def to_pdf(X, y):
    A = pd.DataFrame(X, columns=[f'x{i}' for i in range(X.shape[1])])
    b = pd.DataFrame(pd.Series(y, name='y'))
    return A, b

def to_seq(X, y):
    A = [{c: r[c] for c in X.columns} for _, r in X.iterrows()]
    b = [{c: r[c] for c in y.columns} for _, r in y.iterrows()]
    return A, b

def get_regression(n_samples=2000):
    X, y = make_regression(**{
        'n_samples': n_samples,
        'n_features': 10,
        'n_informative': 5,
        'n_targets': 1,
        'bias': 5.3,
        'random_state': 37
    })
    return X, y

def get_classification(n_samples=2000):
    X, y = make_classification(**{
        'n_samples': n_samples,
        'n_features': 10,
        'n_informative': 5,
        'n_redundant': 2,
        'n_repeated': 0,
        'n_classes': 2,
        'n_clusters_per_class': 2,
        'random_state': 37
    })
    return X, y
X, y = get_regression()
X, y = to_pdf(X, y)
As with Spark, we have to worry about the number of partitions in Dask as well. For now, we will specify 100 arbitrarily. Partitioning the data is always important as it impacts the distribution of computations and memory usage. When specifying npartitions, this is the number of partitions you want, but you could end up with fewer partitions than requested. There’s another option, chunksize, that specifies the number of records per partition.
[4]:
import dask.dataframe as dd
X = dd.from_pandas(X, npartitions=100)
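If you would rather control the number of rows per partition, from_pandas() also accepts chunksize instead of npartitions. A quick sketch, not executed here, where pdf is a stand-in for the original pandas DataFrame before conversion:
X_alt = dd.from_pandas(pdf, chunksize=20)  # roughly 20 rows per partition
X_alt.npartitions                          # about 2000 / 20 = 100 partitions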
Notice that when we get the string representation of a Dask DataFrame, the actual data is not displayed, only some metadata about its structure.
[5]:
X
[5]:
 | x0 | x1 | x2 | x3 | x4 | x5 | x6 | x7 | x8 | x9
---|---|---|---|---|---|---|---|---|---|---
npartitions=100 | | | | | | | | | |
0 | float64 | float64 | float64 | float64 | float64 | float64 | float64 | float64 | float64 | float64
20 | ... | ... | ... | ... | ... | ... | ... | ... | ... | ...
... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ...
1980 | ... | ... | ... | ... | ... | ... | ... | ... | ... | ...
1999 | ... | ... | ... | ... | ... | ... | ... | ... | ... | ...
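To pull a small, concrete sample back to the client, head() computes the first few rows and returns them as a pandas DataFrame; a quick sketch:
X.head()  # triggers computation and returns the first rows as a pandas DataFrame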
We can check the data types of each column.
[6]:
X.dtypes
[6]:
x0 float64
x1 float64
x2 float64
x3 float64
x4 float64
x5 float64
x6 float64
x7 float64
x8 float64
x9 float64
dtype: object
We can invoke statistical functions like sum(), mean() and std() on the DataFrame. However, if we want the results to come back to the client, we need to issue compute(). This pattern of functions mimics Spark’s transformation vs action functions.
[7]:
X.sum().compute()
[7]:
x0 8.309788
x1 -7.660057
x2 -1.745221
x3 -8.071736
x4 -25.665312
x5 70.528265
x6 -18.859073
x7 -15.195804
x8 -46.170896
x9 -29.945949
dtype: float64
[8]:
X.mean().compute()
[8]:
x0 0.004155
x1 -0.003830
x2 -0.000873
x3 -0.004036
x4 -0.012833
x5 0.035264
x6 -0.009430
x7 -0.007598
x8 -0.023085
x9 -0.014973
dtype: float64
[9]:
X.std().compute()
[9]:
x0 0.984805
x1 1.004140
x2 1.014151
x3 1.016681
x4 0.982926
x5 0.981421
x6 0.986267
x7 1.025351
x8 1.019498
x9 0.986211
dtype: float64
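Since everything stays lazy until compute() is called, several derived results can also be built up and evaluated together in a single pass with dask.compute(); a small sketch:
import dask

# evaluate multiple lazy expressions together instead of one compute() per result
col_sums, col_means = dask.compute(X.sum(), X.mean())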
2.3. Bag
Dask Bags are like Spark RDDs. You get functions on Bags similar to those on RDDs, like map(), filter() and reduce(). The reduceByKey() of Spark RDDs is foldby() in Dask Bags, and the reduce() of Spark RDDs is fold() in Dask Bags.
Below, to create a Dask Bag, we use from_sequence(), where our collection has dictionary elements.
[10]:
import dask.bag as db
X, y = get_regression()
X, y = to_pdf(X, y)
X, y = to_seq(X, y)
X = db.from_sequence(X)
Here’s an example of map and reduce by key. In the map operation, we map a dictionary element to a tuple, where the first element is a boolean indicating whether the integer representation of the x0 field is even, and the second element is the value of x0.
[11]:
(X.map(lambda r: (int(r['x0']) % 2 == 0, r['x0']))
.foldby(lambda tup: tup[0], lambda a, b: (a[0], a[1] + b[1]))
.compute())
[11]:
[(True, (True, -7.959470449062381)), (False, (False, 16.269258267363348))]
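foldby() also accepts an initial value and a combine function, which is handy for counting per key (the analog of a word count with reduceByKey() on an RDD). A sketch against the same bag of dictionaries:
counts = (X.map(lambda r: int(r['x0']) % 2 == 0)
          .foldby(lambda is_even: is_even,        # key: the even/odd flag itself
                  lambda total, _: total + 1, 0,  # within a partition: add 1 per element
                  lambda a, b: a + b, 0)          # across partitions: sum the counts
          .compute())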
In this example, we filter for even numbers.
[12]:
(X.map(lambda r: (int(r['x0']) % 2 == 0, r['x0']))
.filter(lambda tup: tup[0])
.foldby(lambda tup: tup[0], lambda a, b: (a[0], a[1] + b[1]))
.compute())
[12]:
[(True, (True, -7.959470449062381))]
In this example, we filter for odd numbers.
[13]:
(X.map(lambda r: (int(r['x0']) % 2 == 0, r['x0']))
.filter(lambda tup: not tup[0])
.foldby(lambda tup: tup[0], lambda a, b: (a[0], a[1] + b[1]))
.compute())
[13]:
[(False, (False, 16.269258267363348))]
This would be your classic map, filter and reduce example.
[14]:
(X.map(lambda r: r['x0'])
.filter(lambda x0: int(x0) % 2 == 0)
.fold(lambda a, b: a + b)
.compute())
[14]:
-7.959470449062381
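Since the elements of the bag are dictionaries, the bag can also be converted back into a Dask DataFrame with to_dataframe(); a quick sketch:
X_df = X.to_dataframe()  # column names are inferred from the dictionary keys
X_df.head()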
2.4. Machine Learning
Dask has a machine learning package, dask-ml. It’s very cool that the API has a make_classification() function just like scikit-learn’s.
[15]:
from dask_ml.datasets import make_classification
X, y = make_classification(chunks=50)
[16]:
X
[16]:
(HTML summary of the Dask array X)
[17]:
y
[17]:
(HTML summary of the Dask array y)
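The chunks attribute shows how each Dask array is partitioned; a quick sketch (the exact chunk sizes depend on dask-ml’s default sample and feature counts):
X.chunks  # row chunks of size 50, as requested by chunks=50
y.chunks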
Here, we perform logistic regression with an L2 penalty.
[18]:
%%time
from dask_ml.linear_model import LogisticRegression
model = LogisticRegression(penalty='l2', C=0.01, random_state=37, solver='admm')
model.fit(X, y)
[('intercept', model.intercept_)] + [(f, c) for f, c in zip(range(len(model.coef_)), model.coef_)]
Wall time: 12.3 s
[18]:
[('intercept', 0.031604621536995815),
(0, -0.03977702121551344),
(1, 0.02738286821465915),
(2, 0.03905960541075271),
(3, 0.04244144014184735),
(4, -0.024966699833507272),
(5, 0.041832909297795436),
(6, -0.06986791411991944),
(7, -0.003705773110285104),
(8, 0.08000742624249171),
(9, -0.030103932549586015),
(10, 0.02337067990625838),
(11, -0.030984617662260194),
(12, -0.015829171537806927),
(13, -0.05937940996774958),
(14, 0.012897158663833902),
(15, 0.003511501618180998),
(16, -0.007073093340879795),
(17, -0.033397647822395786),
(18, -0.11026077158815495),
(19, 0.022812162700462206)]
Here, we perform logistic regression with an L1 penalty.
[19]:
%%time
model = LogisticRegression(penalty='l1', C=0.5, random_state=37, solver='admm')
model.fit(X, y)
[('intercept', model.intercept_)] + [(f, c) for f, c in zip(range(len(model.coef_)), model.coef_)]
Wall time: 5.67 s
[19]:
[('intercept', 0.05775287110215685),
(0, -0.19003640661111515),
(1, 0.03490567942649925),
(2, 0.06896348591496251),
(3, 0.12080581330088788),
(4, -0.1449561951791722),
(5, 0.1821003499095371),
(6, -0.377129037444486),
(7, 0.0),
(8, 0.30657389885412994),
(9, -0.15861731667477016),
(10, 0.003624785705384879),
(11, -0.05489291271773286),
(12, 0.0),
(13, -0.1865770525965461),
(14, 0.0),
(15, 0.0),
(16, 0.0),
(17, -0.08624180716536158),
(18, -0.5024166851150068),
(19, 0.06449899347591809)]
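A natural follow-up, sketched but not run here, is to use the fitted model for prediction and scoring, both of which also operate on Dask arrays.
y_pred = model.predict(X)     # dask array of predicted class labels
accuracy = model.score(X, y)  # mean accuracy on the given data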