2. Dask

Dask is a parallel computing library with data structures analogous to numpy arrays, pandas DataFrames and Spark RDDs. Unlike their numpy and pandas counterparts, the Dask array and DataFrame are distributed, and operations on them are parallelized. Dask's bag is the analog of a Spark RDD, and both are distributed data structures. In Dask, these distributed data structures are called high-level collections.

Dask Analog    Existing Data Structure
-----------    -----------------------
Array          Numpy Array
DataFrame      Pandas DataFrame
Bag            Spark RDD
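
As a quick illustration of these three collections, here is a minimal sketch (the sizes, chunking and partition counts are arbitrary, and this snippet is not part of the notebook's executed cells).

import numpy as np
import pandas as pd
import dask.array as da
import dask.bag as db
import dask.dataframe as dd

# a distributed array, chunked into 4 blocks of 250 elements each
arr = da.from_array(np.arange(1000), chunks=250)

# a distributed dataframe built from a pandas one, split into 2 partitions
pdf = pd.DataFrame({'a': range(10), 'b': range(10)})
ddf = dd.from_pandas(pdf, npartitions=2)

# a distributed bag of arbitrary Python objects
bag = db.from_sequence([{'x': i} for i in range(10)], npartitions=2)

# nothing is evaluated until compute() is called
print(arr.sum().compute(), ddf['a'].mean().compute(), bag.count().compute())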

Dask seems to take the best of what's fundamental and available in the Python data science toolset and unify it into a single framework. If you have ever worked with Apache Spark and seen its web interface, Dask has a similar dashboard (it runs on port 8787 by default). Like Spark, Dask is also built on the concept of a driver submitting jobs (centered on a distributed data structure) to a cluster of workers. The Dask documentation has a more in-depth comparison and contrast of Dask and Spark. Let's take a glance at Dask and see how it works.

2.1. Cluster and client

In Dask, the Client represents the driver and the LocalCluster represents the cluster. The driver controls the manipulation of distributed data through submission of jobs to the cluster. The LocalCluster is not a real cluster of separate, physical worker nodes, but it mimics one and is useful for local development.

[1]:
from dask.distributed import Client, LocalCluster

params = {
    'n_workers': 4,
    'threads_per_worker': 2,
    'dashboard_address': '8787'
}
cluster = LocalCluster(**params)
client = Client(cluster)
[2]:
client
[2]:

Client

Cluster

  • Workers: 4
  • Cores: 8
  • Memory: 25.76 GB
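
The dashboard mentioned earlier is reachable through the client; a small sketch (the exact URL depends on your machine, and the close() calls are only for when you are finished with the cluster):

# link to the diagnostic dashboard, e.g. http://127.0.0.1:8787/status
print(client.dashboard_link)

# shut down the client and the local cluster when done
# client.close()
# cluster.close()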

2.2. DataFrame

A Dask DataFrame behaves nearly identically to a pandas one. There are a couple of ways to create a Dask DataFrame (e.g. reading from files), but, here, we create a pandas one and use the from_pandas() function to convert it to a Dask one.
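
As an aside, reading from files directly into a Dask DataFrame looks something like the following sketch (the CSV glob is hypothetical and not executed here):

import dask.dataframe as dd

# each matching file (or block of a large file) becomes one or more partitions;
# 'data/*.csv' is a hypothetical glob pattern
# df = dd.read_csv('data/*.csv')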

[3]:
import numpy as np
import random
import pandas as pd
from sklearn.datasets import make_regression, make_classification

np.random.seed(37)
random.seed(37)

def to_pdf(X, y):
    A = pd.DataFrame(X, columns=[f'x{i}' for i in range(X.shape[1])])
    b = pd.DataFrame(pd.Series(y, name='y'))
    return A, b

def to_seq(X, y):
    A = [{c: r[c] for c in X.columns} for _, r in X.iterrows()]
    b = [{c: r[c] for c in y.columns} for _, r in y.iterrows()]
    return A, b

def get_regression(n_samples=2000):
    X, y = make_regression(**{
        'n_samples': n_samples,
        'n_features': 10,
        'n_informative': 5,
        'n_targets': 1,
        'bias': 5.3,
        'random_state': 37
    })

    return X, y

def get_classification(n_samples=2000):
    X, y = make_classification(**{
        'n_samples': n_samples,
        'n_features': 10,
        'n_informative': 5,
        'n_redundant': 2,
        'n_repeated': 0,
        'n_classes': 2,
        'n_clusters_per_class': 2,
        'random_state': 37
    })

    return X, y

X, y = get_regression()
X, y = to_pdf(X, y)

As with Spark, we have to worry about the number of partitions in Dask as well. For now, we arbitrarily specify 100. Partitioning the data is always important, as it impacts the distribution of computations and memory usage. npartitions is the number of partitions you want, but you could end up with fewer partitions than requested. There's another option, chunksize, that specifies the number of records per partition.

[4]:
import dask.dataframe as dd

X = dd.from_pandas(X, npartitions=100)
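
Alternatively, the same conversion could have been expressed with chunksize instead of npartitions; a sketch (20 rows per partition is arbitrary, and X here would refer to the original pandas DataFrame, so this line is not executed):

# cap each partition at roughly 20 rows instead of fixing the partition count
# X = dd.from_pandas(X, chunksize=20)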

When getting the string representation of a Dask DataFrame, notice that the actual data is not displayed; only some metadata is shown.

[5]:
X
[5]:
Dask DataFrame Structure:
                      x0       x1       x2       x3       x4       x5       x6       x7       x8       x9
npartitions=100
0                float64  float64  float64  float64  float64  float64  float64  float64  float64  float64
20                   ...      ...      ...      ...      ...      ...      ...      ...      ...      ...
...                  ...      ...      ...      ...      ...      ...      ...      ...      ...      ...
1980                 ...      ...      ...      ...      ...      ...      ...      ...      ...      ...
1999                 ...      ...      ...      ...      ...      ...      ...      ...      ...      ...
Dask Name: from_pandas, 100 tasks
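
To pull a small amount of actual data back to the client, something like head() can be used; a sketch (it returns a regular pandas DataFrame of the first rows, and its output is not shown here):

# materializes the first 5 rows as a pandas DataFrame
X.head()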

We can check the data types of each column.

[6]:
X.dtypes
[6]:
x0    float64
x1    float64
x2    float64
x3    float64
x4    float64
x5    float64
x6    float64
x7    float64
x8    float64
x9    float64
dtype: object

We can invoke statistical functions like sum(), mean() and std() on the DataFrame. However, if we want the results to come back to the client, we need to call compute(). This lazy-then-compute pattern mimics Spark's distinction between transformations and actions.

[7]:
X.sum().compute()
[7]:
x0     8.309788
x1    -7.660057
x2    -1.745221
x3    -8.071736
x4   -25.665312
x5    70.528265
x6   -18.859073
x7   -15.195804
x8   -46.170896
x9   -29.945949
dtype: float64
[8]:
X.mean().compute()
[8]:
x0    0.004155
x1   -0.003830
x2   -0.000873
x3   -0.004036
x4   -0.012833
x5    0.035264
x6   -0.009430
x7   -0.007598
x8   -0.023085
x9   -0.014973
dtype: float64
[9]:
X.std().compute()
[9]:
x0    0.984805
x1    1.004140
x2    1.014151
x3    1.016681
x4    0.982926
x5    0.981421
x6    0.986267
x7    1.025351
x8    1.019498
x9    0.986211
dtype: float64

2.3. Bag

Dask Bags are like Spark RDDs. Bags provide functions similar to those of RDDs, such as map(), filter() and reduce(). Spark's reduceByKey() corresponds to foldby() in Dask Bags, and Spark's reduce() corresponds to fold().

Below, we create a Dask Bag with from_sequence(), where our collection has dictionary elements.

[10]:
import dask.bag as db

X, y = get_regression()
X, y = to_pdf(X, y)
X, y = to_seq(X, y)
X = db.from_sequence(X)

Here’s an example of map and reduce by key. In the map operation, we map each dictionary element to a tuple, where the first element is a boolean indicating if the integer representation of the x0 field is even, and the second element is the value of x0. The foldby() then groups the tuples by that boolean key and sums the x0 values within each group.

[11]:
(X.map(lambda r: (int(r['x0']) % 2 == 0, r['x0']))
 .foldby(lambda tup: tup[0], lambda a, b: (a[0], a[1] + b[1]))
 .compute())
[11]:
[(True, (True, -7.959470449062381)), (False, (False, 16.269258267363348))]

In this example, we filter for even numbers.

[12]:
(X.map(lambda r: (int(r['x0']) % 2 == 0, r['x0']))
 .filter(lambda tup: tup[0])
 .foldby(lambda tup: tup[0], lambda a, b: (a[0], a[1] + b[1]))
 .compute())
[12]:
[(True, (True, -7.959470449062381))]

In this example, we filter for odd numbers.

[13]:
(X.map(lambda r: (int(r['x0']) % 2 == 0, r['x0']))
 .filter(lambda tup: not tup[0])
 .foldby(lambda tup: tup[0], lambda a, b: (a[0], a[1] + b[1]))
 .compute())
[13]:
[(False, (False, 16.269258267363348))]

This would be your classic map, filter and reduce example.

[14]:
(X.map(lambda r: r['x0'])
 .filter(lambda x0: int(x0) % 2 == 0)
 .fold(lambda a, b: a + b)
 .compute())
[14]:
-7.959470449062381
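
The fold() above spells out the binary operator explicitly; Bag also ships aggregate methods such as sum() and count() that could express the same thing. A sketch (equivalent to the cell above, output not shown):

# the same sum of even-valued x0's using the built-in aggregation
(X.map(lambda r: r['x0'])
 .filter(lambda x0: int(x0) % 2 == 0)
 .sum()
 .compute())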

2.4. Machine Learning

Dask has a machine learning package, dask-ml. It’s very cool that the API has a make_classification() method, just like scikit-learn.

[15]:
from dask_ml.datasets import make_classification

X, y = make_classification(chunks=50)
[16]:
X
[16]:
       Array      Chunk
Bytes  16.00 kB   8.00 kB
Shape  (100, 20)  (50, 20)
Count  2 Tasks    2 Chunks
Type   float64    numpy.ndarray
[17]:
y
[17]:
       Array     Chunk
Bytes  400 B     200 B
Shape  (100,)    (50,)
Count  25 Tasks  2 Chunks
Type   int32     numpy.ndarray

Here, we perform logistic regression with an L2 penalty.

[18]:
%%time

from dask_ml.linear_model import LogisticRegression

model = LogisticRegression(penalty='l2', C=0.01, random_state=37, solver='admm')
model.fit(X, y)
[('intercept', model.intercept_)] + [(f, c) for f, c in zip(range(len(model.coef_)), model.coef_)]
Wall time: 12.3 s
[18]:
[('intercept', 0.031604621536995815),
 (0, -0.03977702121551344),
 (1, 0.02738286821465915),
 (2, 0.03905960541075271),
 (3, 0.04244144014184735),
 (4, -0.024966699833507272),
 (5, 0.041832909297795436),
 (6, -0.06986791411991944),
 (7, -0.003705773110285104),
 (8, 0.08000742624249171),
 (9, -0.030103932549586015),
 (10, 0.02337067990625838),
 (11, -0.030984617662260194),
 (12, -0.015829171537806927),
 (13, -0.05937940996774958),
 (14, 0.012897158663833902),
 (15, 0.003511501618180998),
 (16, -0.007073093340879795),
 (17, -0.033397647822395786),
 (18, -0.11026077158815495),
 (19, 0.022812162700462206)]

Here, we perform logistic regression with an L1 penalty.

[19]:
%%time

model = LogisticRegression(penalty='l1', C=0.5, random_state=37, solver='admm')
model.fit(X, y)
[('intercept', model.intercept_)] + [(f, c) for f, c in zip(range(len(model.coef_)), model.coef_)]
Wall time: 5.67 s
[19]:
[('intercept', 0.05775287110215685),
 (0, -0.19003640661111515),
 (1, 0.03490567942649925),
 (2, 0.06896348591496251),
 (3, 0.12080581330088788),
 (4, -0.1449561951791722),
 (5, 0.1821003499095371),
 (6, -0.377129037444486),
 (7, 0.0),
 (8, 0.30657389885412994),
 (9, -0.15861731667477016),
 (10, 0.003624785705384879),
 (11, -0.05489291271773286),
 (12, 0.0),
 (13, -0.1865770525965461),
 (14, 0.0),
 (15, 0.0),
 (16, 0.0),
 (17, -0.08624180716536158),
 (18, -0.5024166851150068),
 (19, 0.06449899347591809)]
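
With a fitted model, predictions and scoring follow the familiar scikit-learn pattern. A minimal sketch using the model and data from above (not part of the notebook's executed cells):

# predicted class labels, returned as a dask array
y_pred = model.predict(X)

# mean accuracy on the training data
accuracy = model.score(X, y)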