*This work is supported by Anaconda, Inc. and the
Data Driven Discovery Initiative from the Moore Foundation.*

This is part two of my series on scalable machine learning.

You can download a notebook of this post here.

Scikit-learn supports out-of-core learning (fitting a model on a dataset that
doesn't fit in RAM) through its `partial_fit` API. See
here.

The basic idea is that, *for certain estimators*, learning can be done in
batches. The estimator will see a batch, and then incrementally update whatever
it's learning (the coefficients, for example). This
link
has a list of the algorithms that implement `partial_fit`.
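To make the batch-wise idea concrete, here is a minimal sketch of calling `partial_fit` in a loop. The `batches` generator and the synthetic data are made up for illustration; only `SGDClassifier.partial_fit` and its `classes` argument come from scikit-learn.

```python
import numpy as np
from sklearn.linear_model import SGDClassifier


def batches(n_batches, batch_size, n_features, seed=0):
    """Hypothetical data source: yield synthetic (X, y) batches one at a time."""
    rng = np.random.RandomState(seed)
    for _ in range(n_batches):
        X = rng.normal(size=(batch_size, n_features))
        # The label depends on the sign of the first feature, so it is learnable.
        y = (X[:, 0] > 0).astype(int)
        yield X, y


clf = SGDClassifier(random_state=0)
for X_batch, y_batch in batches(n_batches=20, batch_size=500, n_features=5):
    # `classes` is required on the first call, because a single batch
    # may not contain every label. Passing it on later calls is allowed.
    clf.partial_fit(X_batch, y_batch, classes=np.array([0, 1]))

# Evaluate on a fresh batch that the model has not seen.
X_test, y_test = next(batches(1, 1000, 5, seed=1))
accuracy = clf.score(X_test, y_test)
```

Only one batch is held in memory at a time, which is the whole point of the API.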

Unfortunately, the `partial_fit` API doesn't play that nicely with my favorite
part of scikit-learn, pipelines, which we discussed at length in part 1. For
pipelines to work, you would essentially need every step in the pipeline to have
an out-of-core `partial_fit` version, which isn't really feasible; some
algorithms just have to see the entire dataset at once. Setting that aside, it
wouldn't be great for a user, since working with generators of datasets is
awkward compared to the expressivity we get from pandas and NumPy.

Fortunately, we *have* great data containers for larger-than-memory arrays and
dataframes: `dask.array` and `dask.dataframe`. We can

- Use dask for pre-processing data in an out-of-core manner
- Use scikit-learn to fit the actual model, out-of-core, using the
  `partial_fit` API

And with a little bit of work, all of this can be done in a pipeline. The rest of this post shows how.

## Big Arrays

If you follow along in the companion notebook, you'll see that I
generate a dataset, replicate it 100 times, and write the results out to disk. I
then read it back in as a pair of `dask.dataframe`s and convert them to a pair
of `dask.array`s. I'll skip those details to focus on the main goal: using
`sklearn.Pipeline`s on larger-than-memory datasets. Suffice it to say, we have a
function `read` that gives us our big `X` and `y`:

```
X, y = read()
X
```

```
dask.array<concatenate, shape=(100000000, 20), dtype=float64, chunksize=(500000, 20)>
```

```
y
```

```
dask.array<squeeze, shape=(100000000,), dtype=float64, chunksize=(500000,)>
```

So `X` is a 100,000,000 x 20 array of floats (I have float64s, you're probably
fine with float32s) that we'll use to predict `y`. I generated the dataset, so I
know that `y` is either 0 or 1. We'll be doing classification.

```
(X.nbytes + y.nbytes) / 10**9
```

```
16.8
```
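The same figure can be reproduced from the shapes alone, without touching the data. This is just the arithmetic behind `nbytes`, assuming 8 bytes per float64 element:

```python
n_rows, n_cols = 100_000_000, 20
bytes_per_float64 = 8

x_bytes = n_rows * n_cols * bytes_per_float64  # the 100,000,000 x 20 feature array
y_bytes = n_rows * bytes_per_float64           # the 100,000,000-element target
total_gb = (x_bytes + y_bytes) / 10**9

# Stored as float32 (4 bytes per element), the same data takes half the space.
total_gb_float32 = total_gb / 2
```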

My laptop has 16 GB of RAM, and the dataset is 16.8 GB, so we can't simply read the entire thing into memory. We'll use dask for the preprocessing, and scikit-learn for the fitting. We'll have a small pipeline:

- Scale the features by mean and variance
- Fit an SGDClassifier

I've implemented a `daskml.preprocessing.StandardScaler`, using dask, in about
40 lines of code (see here).
The scaling will be done completely in parallel and completely out-of-core.
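For a rough idea of what such a scaler involves, here is a minimal sketch. It is not the `daskml` implementation: the class name `SketchStandardScaler` is hypothetical, and it skips details like zero-variance columns and scikit-learn's estimator conventions.

```python
import dask
import dask.array as da
import numpy as np


class SketchStandardScaler:
    """Hypothetical, minimal scaler for dask arrays (not the daskml class)."""

    def fit(self, X, y=None):
        # Both reductions are built lazily and computed in a single call,
        # so dask can share work between them and run blocks in parallel.
        self.mean_, self.scale_ = dask.compute(X.mean(axis=0), X.std(axis=0))
        return self

    def transform(self, X):
        # Still lazy and block-wise: nothing runs until the result is used.
        return (X - self.mean_) / self.scale_


rng = np.random.RandomState(0)
data = rng.normal(loc=3.0, scale=2.0, size=(10_000, 4))
X = da.from_array(data, chunks=(1_000, 4))

Xt = SketchStandardScaler().fit(X).transform(X)
col_means, col_stds = dask.compute(Xt.mean(axis=0), Xt.std(axis=0))
```

Because `transform` returns another dask array, the next step in a pipeline can keep working block by block.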

I *haven't* implemented a custom `SGDClassifier`, because that'd be much more
than 40 lines of code. Instead, I've put together a small wrapper that will use
scikit-learn's `SGDClassifier.partial_fit` to fit the model out-of-core (but not
in parallel).

```
from daskml.preprocessing import StandardScaler
from daskml.linear_model import BigSGDClassifier # The wrapper
from dask.diagnostics import ResourceProfiler, Profiler, ProgressBar
from sklearn.pipeline import make_pipeline
```

As a user, the API is the same as `scikit-learn`. Indeed, it *is* just a regular
`sklearn.pipeline.Pipeline`.

```
pipe = make_pipeline(
    StandardScaler(),
    BigSGDClassifier(classes=[0, 1], max_iter=1000, tol=1e-3, random_state=2),
)
```

And fitting is identical as well: `pipe.fit(X, y)`. We'll also collect some
performance metrics, so we can analyze our parallelism.

```
%%time
rp = ResourceProfiler()
p = Profiler()
with p, rp:
    pipe.fit(X, y)
```

```
CPU times: user 2min 38s, sys: 1min 44s, total: 4min 22s
Wall time: 1min 47s
```

And that's it. It's just a regular scikit-learn pipeline, operating on a
larger-than-memory dataset. `pipe` has all the regular methods you would
expect, `predict`, `predict_proba`, etc. You can get to the individual
attributes like `pipe.steps[1][1].coef_`.
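If the `steps[1][1]` indexing looks opaque, this small sketch shows what it refers to. It uses scikit-learn's own `StandardScaler` and `SGDClassifier` on a tiny in-memory array (made up for illustration) rather than the dask-backed versions, but the `Pipeline` attributes are the same.

```python
import numpy as np
from sklearn.linear_model import SGDClassifier
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

rng = np.random.RandomState(0)
X_small = rng.normal(size=(200, 3))
y_small = (X_small[:, 0] > 0).astype(int)

small_pipe = make_pipeline(StandardScaler(), SGDClassifier(random_state=0))
small_pipe.fit(X_small, y_small)

# `steps` is a list of (name, estimator) pairs, so [1][1] is the second estimator.
by_position = small_pipe.steps[1][1]
# make_pipeline names each step after its lowercased class name.
by_name = small_pipe.named_steps["sgdclassifier"]
coef = by_position.coef_
```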

One important point to stress here: when we get to the `BigSGDClassifier.fit`
at the end of the pipeline, everything is done serially. We can see that by
plotting the `Profiler` we captured up above:

That graph shows the tasks (the rectangles) each worker (a core on my laptop)
executed over time. Workers are along the vertical axis, and time is along the
horizontal. Towards the start, when we're reading off disk, converting to
`dask.array`s, and doing the `StandardScaler`, everything is in parallel. Once
we get to the `BigSGDClassifier`, which is just a simple wrapper around
`sklearn.linear_model.SGDClassifier`, we lose all our parallelism*.

The predict step *is* done entirely in parallel.

```
with rp, p:
    predictions = pipe.predict(X)
    predictions.to_dask_dataframe(columns='a').to_parquet('predictions.parq')
```

That took about 40 seconds, from disk to prediction, and back to disk on 16 GB of data, using all 8 cores of my laptop.

## How?

When I had this idea last week, of feeding blocks of `dask.array` to a
scikit-learn estimator's `partial_fit` method, I thought it was pretty neat.
Turns out Matt Rocklin already had the idea, and implemented it in dask, two
years ago.

Roughly speaking, the implementation is:

```
class BigSGDClassifier(SGDClassifier):
    ...

    def fit(self, X, y):
        # ... some setup
        # Feed the model one pair of in-memory blocks at a time.
        for xx, yy in by_blocks(X, y):
            self.partial_fit(xx, yy)
        return self
```

If you aren't familiar with `dask`, its arrays are composed of many smaller
NumPy arrays (blocks in the larger dask array). We iterate over the dask arrays
block-wise, and pass them into the estimator's `partial_fit` method. That's
exactly what you would be doing if you were using, say, a generator to feed
NumPy arrays to the `partial_fit` method. Only you can manipulate a `dask.array`
like a regular NumPy array, so things are more convenient.
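The block-wise loop could be sketched as follows. The `by_blocks` helper here is a hypothetical stand-in, not the one from the wrapper: it walks the row chunks of `X` and materializes one slice of `X` and `y` at a time. The synthetic data is also made up for illustration.

```python
import dask.array as da
import numpy as np
from sklearn.linear_model import SGDClassifier


def by_blocks(X, y):
    """Hypothetical helper: yield in-memory (X, y) pairs, one per row chunk of X."""
    start = 0
    for n_rows in X.chunks[0]:  # sizes of the chunks along the first axis
        stop = start + n_rows
        # np.asarray computes just this slice and returns a NumPy array.
        yield np.asarray(X[start:stop]), np.asarray(y[start:stop])
        start = stop


rng = np.random.RandomState(0)
X_np = rng.normal(size=(5_000, 4))
y_np = (X_np[:, 0] > 0).astype(int)
X = da.from_array(X_np, chunks=(1_000, 4))
y = da.from_array(y_np, chunks=1_000)

clf = SGDClassifier(random_state=0)
block_shapes = []
for xx, yy in by_blocks(X, y):
    block_shapes.append(xx.shape)
    clf.partial_fit(xx, yy, classes=np.array([0, 1]))
```

Each iteration holds a single block in memory, and the loop itself runs serially, which matches the profile described earlier.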

## Some Challenges

For our small pipeline, we had to make two passes over the data. One to fit the
`StandardScaler` and one to fit the `BigSGDClassifier`. In general, with
this approach, we'll have to make one pass per stage of the pipeline, which
isn't great. I *think* this is unavoidable with the current design, but I'm
considering ways around it.

## Recap

We've seen *a* way to use scikit-learn's existing estimators on
larger-than-memory dask arrays by passing the blocks of a dask array to the
`partial_fit` method. This enables us to use `Pipeline`s on larger-than-memory
datasets.

Let me know what you think. I'm pretty excited about this because it removes
some of the friction around using scikit-learn Pipelines with out-of-core
estimators. In `dask-ml`, I've implemented similar wrappers for

- SGDRegressor
- PassiveAggressiveClassifier
- PassiveAggressiveRegressor
- Perceptron
- MLPClassifier
- MLPRegressor
- MiniBatchKMeans

I'll be packaging this up in `daskml` to make it more usable for the
community over the next couple of weeks. If this type of work interests you,
please reach out on Twitter or by
email at tom.w.augspurger@gmail.com. If you're interested in contributing, I
think a library of basic transformers that operate on NumPy and dask arrays and
pandas and dask DataFrames would be *extremely* useful. I've started an
issue to track this progress.
Contributions would be extremely welcome.

Next time we'll be going back to smaller datasets. We'll see how dask can help us parallelize our work to fit more models in less time.