This is part two of my series on scalable machine learning.
You can download a notebook of this post here.
Scikit-learn supports out-of-core learning (fitting a model on a dataset that
doesn't fit in RAM) through its
partial_fit API.
The basic idea is that, for certain estimators, learning can be done in
batches. The estimator will see a batch, and then incrementally update whatever
it's learning (the coefficients, for example). The scikit-learn documentation
has a list of the algorithms that implement partial_fit.
Unfortunately, the partial_fit API doesn't play that nicely with my favorite
part of scikit-learn, pipelines,
which we discussed at length in part 1. For pipelines to work,
you would essentially need every step in the pipeline to have an out-of-core
partial_fit version, which isn't really feasible; some algorithms just have to
see the entire dataset at once. Setting that aside, it wouldn't be great for a
user, since working with generators of datasets is awkward compared to the
expressivity we get from pandas and NumPy.
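To make the batch idea concrete, here's a minimal sketch of the generator-based workflow described above. The batches helper and the synthetic data are assumptions made up for illustration (a stand-in for reading chunks off disk); only SGDClassifier.partial_fit is scikit-learn's actual API.

```python
import numpy as np
from sklearn.linear_model import SGDClassifier


def batches(n_batches, batch_size, n_features, seed=0):
    """Hypothetical data source: yields (X, y) NumPy batches one at a time."""
    rng = np.random.RandomState(seed)
    for _ in range(n_batches):
        X = rng.normal(size=(batch_size, n_features))
        # Synthetic labels: 1 when the first two features sum to a positive value
        y = (X[:, 0] + X[:, 1] > 0).astype(int)
        yield X, y


clf = SGDClassifier(random_state=0)
classes = np.array([0, 1])  # partial_fit needs the full set of labels up front

for X_batch, y_batch in batches(n_batches=20, batch_size=500, n_features=5):
    # Each call updates the coefficients using only the current batch
    clf.partial_fit(X_batch, y_batch, classes=classes)
```

Only one batch is in memory at a time, but every transformation upstream of the classifier would need the same hand-rolled loop.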
Fortunately, we have great data containers for larger-than-memory arrays and
dataframes: dask.array and dask.dataframe. We can
- Use dask for pre-processing data in an out-of-core manner
- Use scikit-learn to fit the actual model, out-of-core, using the partial_fit API
And with a little bit of work, all of this can be done in a pipeline. The rest of this post shows how.
If you follow along in the companion notebook, you'll see that I
generate a dataset, replicate it 100 times, and write the results out to disk. I
then read it back in as a pair of
dask.dataframes and convert them to a pair of
dask.arrays. I'll skip those details to focus on the main goal: using
sklearn.Pipelines on larger-than-memory datasets. Suffice to say, we have a
function read that gives us our big X and y:
X, y = read()
X
dask.array<concatenate, shape=(100000000, 20), dtype=float64, chunksize=(500000, 20)>
dask.array<squeeze, shape=(100000000,), dtype=float64, chunksize=(500000,)>
X is a 100,000,000 x 20 array of floats (I have float64s, you're probably
fine with float32s) that we'll use to predict
y. I generated the dataset, so I know that
y is either 0 or 1. We'll be doing classification.
(X.nbytes + y.nbytes) / 10**9
My laptop has 16 GB of RAM, and the dataset is 16.8 GB, so we can't simply read the entire thing into memory. We'll use dask for the preprocessing and scikit-learn for the fitting. We'll have a small pipeline:
- Scale the features by mean and variance
- Fit an SGDClassifier
I've implemented a
daskml.preprocessing.StandardScaler, using dask, in about
40 lines of code (see here).
The scaling will be done completely in parallel and completely out-of-core.
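For a sense of what such a scaler involves, here's a minimal sketch. It is not the daskml implementation: the class name SketchStandardScaler is hypothetical, and it assumes no feature has zero variance.

```python
import dask
import dask.array as da
import numpy as np


class SketchStandardScaler:
    """Hypothetical stand-in for a dask-backed scaler (not the daskml code)."""

    def fit(self, X, y=None):
        # Both reductions are lazy; computing them together lets dask
        # share the work of loading each block.
        mean, var = dask.compute(X.mean(axis=0), X.var(axis=0))
        self.mean_ = mean
        self.scale_ = np.sqrt(var)  # assumes no zero-variance feature
        return self

    def transform(self, X):
        # Still lazy: this returns a dask array, evaluated block by block
        return (X - self.mean_) / self.scale_


rng = np.random.RandomState(0)
X = da.from_array(rng.normal(5.0, 2.0, size=(10_000, 3)), chunks=(1_000, 3))
Xt = SketchStandardScaler().fit(X).transform(X)
```

Because transform returns another dask array, nothing is materialized until a later step asks for actual values.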
I haven't implemented a custom
SGDClassifier, because that'd be much more
than 40 lines of code. Instead, I've put together a small wrapper that will use
SGDClassifier.partial_fit to fit the model out-of-core (but not in parallel).
from daskml.preprocessing import StandardScaler
from daskml.linear_model import BigSGDClassifier  # The wrapper
from dask.diagnostics import ResourceProfiler, Profiler, ProgressBar
from sklearn.pipeline import make_pipeline
As a user, the API is the same as
scikit-learn. Indeed, it is just a regular sklearn.pipeline.Pipeline.
pipe = make_pipeline(
    StandardScaler(),
    BigSGDClassifier(classes=[0, 1], max_iter=1000, tol=1e-3, random_state=2),
)
And fitting is identical as well:
pipe.fit(X, y). We'll collect some
performance metrics as well, so we can analyze our parallelism.
%%time
rp = ResourceProfiler()
p = Profiler()
with p, rp:
    pipe.fit(X, y)
CPU times: user 2min 38s, sys: 1min 44s, total: 4min 22s
Wall time: 1min 47s
And that's it. It's just a regular scikit-learn pipeline, operating on a
larger-than-memory dataset. pipe has all the regular methods you would
expect: predict, predict_proba, etc. You can get to the individual
steps of the pipeline as you normally would.
One important point to stress here: when we get to the BigSGDClassifier
at the end of the pipeline, everything is done serially. We can see that by
plotting the Profiler we captured up above:
That graph shows the tasks (the rectangles) each worker (a core on my laptop)
executed over time. Workers are along the vertical axis, and time is along the
horizontal. Towards the start, when we're reading off disk, converting to
dask.arrays, and doing the
StandardScaler, everything is in parallel. Once
we get to the
BigSGDClassifier, which is just a simple wrapper around
sklearn.linear_model.SGDClassifier, we lose all our parallelism.
The predict step is done entirely in parallel.
with rp, p:
    predictions = pipe.predict(X)
    predictions.to_dask_dataframe(columns='a').to_parquet('predictions.parq')
That took about 40 seconds, from disk to prediction, and back to disk on 16 GB of data, using all 8 cores of my laptop.
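Prediction parallelizes well because each block can be scored independently of the others. Here's a sketch of one way to do that with dask's map_blocks. This is an assumption about how a parallel predict could be written, not necessarily what the wrapper does internally, and the small synthetic arrays are for illustration only.

```python
import dask.array as da
import numpy as np
from sklearn.linear_model import SGDClassifier

# Fit a small in-memory model so there is something to predict with
rng = np.random.RandomState(0)
X_small = rng.normal(size=(1_000, 4))
y_small = (X_small[:, 0] > 0).astype(int)
clf = SGDClassifier(random_state=0).fit(X_small, y_small)

# A chunked array standing in for the larger-than-memory X
X_big = da.from_array(rng.normal(size=(10_000, 4)), chunks=(2_000, 4))

# Apply clf.predict to every block. drop_axis=1 tells dask that the output
# is 1-D (one label per row); meta and dtype describe the output so dask
# does not have to guess by calling predict on an empty array.
predictions = X_big.map_blocks(
    clf.predict,
    dtype=y_small.dtype,
    drop_axis=1,
    meta=np.array((), dtype=y_small.dtype),
)
```

The result is a lazy dask array with one chunk of labels per input block, so the scheduler is free to score blocks on different cores.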
When I had this idea last week, of feeding blocks of
dask.array to a
partial_fit method, I thought it was pretty neat.
Turns out Matt Rocklin already had the idea, and implemented it in dask, two
Roughly speaking, the implementation is:
class BigSGDClassifier(SGDClassifier):
    ...
    def fit(self, X, y):
        # ... some setup
        for xx, yy in by_blocks(X, y):
            self.partial_fit(xx, yy)
        return self
If you aren't familiar with
dask, its arrays are composed of many smaller
NumPy arrays (blocks in the larger dask array). We iterate over the dask arrays
block-wise, and pass them into the estimator's
partial_fit method. That's exactly
what you would be doing if you were using, say, a generator to feed NumPy arrays to
the partial_fit method. Only you can manipulate a
dask.array like a regular
NumPy array, so things are more convenient.
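Here's a runnable sketch of that block-wise loop. The by_blocks below is my guess at what such a helper could look like, and fit_by_blocks is a hypothetical free function used in place of the subclass; both assume X is chunked along rows only and that y shares the same row chunks.

```python
import dask.array as da
import numpy as np
from sklearn.linear_model import SGDClassifier


def by_blocks(X, y):
    """Yield matching (X, y) blocks as NumPy arrays, one row chunk at a time."""
    if X.numblocks[1] != 1 or X.chunks[0] != y.chunks[0]:
        raise ValueError("expected row-only chunks that line up between X and y")
    start = 0
    for n_rows in X.chunks[0]:
        stop = start + n_rows
        # Slices aligned to chunk boundaries touch exactly one block each
        yield X[start:stop].compute(), y[start:stop].compute()
        start = stop


def fit_by_blocks(estimator, X, y, classes):
    """Feed each block to partial_fit in turn (serial, but out-of-core)."""
    for xx, yy in by_blocks(X, y):
        estimator.partial_fit(xx, yy, classes=classes)
    return estimator


rng = np.random.RandomState(0)
X_np = rng.normal(size=(5_000, 4))
y_np = (X_np[:, 0] + X_np[:, 1] > 0).astype(int)
X = da.from_array(X_np, chunks=(1_000, 4))
y = da.from_array(y_np, chunks=1_000)

clf = fit_by_blocks(SGDClassifier(random_state=0), X, y, classes=np.array([0, 1]))
```

Only one block is held as a NumPy array at any moment, which is what keeps memory use bounded by the chunk size rather than the dataset size.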
For our small pipeline, we had to make two passes over the data. One to fit the
StandardScaler and one to fit the
BigSGDClassifier. In general, with
this approach, we'll have to make one pass per stage of the pipeline, which
isn't great. I think this is unavoidable with the current design, but I'm
considering ways around it.
We've seen a way to use scikit-learn's existing estimators on
larger-than-memory dask arrays by passing the blocks of a dask array to the
partial_fit method. This enables us to use
Pipelines on larger-than-memory datasets.
Let me know what you think. I'm pretty excited about this because it removes
some of the friction around using scikit-learn Pipelines with out-of-core
dask-ml, I've implemented similar wrappers for
I'll be packaging this up in
daskml to make it more usable for the
community over the next couple weeks. If this type of work interests you, please
reach out on Twitter or by
email at email@example.com. If you're interested in contributing, I
think a library of basic transformers that operate on NumPy and dask arrays and
pandas and dask DataFrames would be extremely useful. I've started an
issue to track this progress.
Contributions would be extremely welcome.
Next time we'll be going back to smaller datasets. We'll see how dask can help us parallelize our work to fit more models in less time.