Pandas is an open-source Python library for working with tabular datasets. It provides an in-memory DataFrame, a container for heterogeneous, tabular data, along with many methods for wrangling your data into shape.

In this article we'll attempt to analyze a larger-than-memory dataset. Since a single pandas dataframe can't be larger than memory, we'll need to adjust our workflow to cope. We'll then perform the same analysis with dask.dataframe, which gives us a pandas-like API for analyzing large datasets in parallel, potentially using a cluster of machines.

Our examples will work with data from the Federal Election Commission. The dataset includes multiple tables, with information on donations from individuals, the committees that accept those donations, what the money is spent on, and the candidates running for office. Each table is split by election cycle (2007-2008, 2009-2010, ...).

Let's start by examining the individual donations. Each record represents a single donation from an individual to a committee.

In [5]:
individual_cols = ['cmte_id', 'entity_tp', 'employer', 'occupation',
                   'transaction_dt', 'transaction_amt']
In [6]:
indiv = pd.read_parquet("data/indiv-08.parq", columns=individual_cols,
                        engine="fastparquet")
indiv.head()
Out[6]:
   cmte_id    entity_tp  employer                      occupation             transaction_dt  transaction_amt
0  C00371021  IND        THE BEAL COMPANIES LLC        REAL ESTATE DEVELOPER  2007-05-22      1000
1  C00371021  IND        BESWICK ENGINEERING CO. INC.  PRESIDENT              2007-05-22      500
2  C00371021  IND        SELF-EMPLOYED                 PHYSICIAN              2007-05-21      250
3  C00371021  IND        RETIRED                       None                   2007-05-08      2300
4  C00371021  IND        RETIRED/N/A                   None                   2007-06-12      250

Pandas makes most common operations one-liners. For example, we can compute the count of each occupation and select just the 100 most common.

In [7]:
indiv['occupation'].value_counts().nlargest(100)
Out[7]:
RETIRED               472458
ATTORNEY              279898
HOMEMAKER             111925
PRESIDENT             104568
PHYSICIAN             102005
                       ...  
CONSTRUCTION            3009
LETTER CARRIER          2911
PRODUCER                2910
INSURANCE BROKER        2902
INVESTMENT ADVISOR      2818
Name: occupation, Length: 100, dtype: int64

While my laptop can read in the file for 2008, it can't read the full dataset spanning 2008-2016 all at once. In this situation, pandas users typically turn to chunking. We will:

  1. Create a global total_counts variable that holds the counts from all of the files processed so far
  2. Read in a file
  3. Compute a temporary counts variable with the counts for just this file
  4. Add those temporary counts to the global total_counts
  5. Select the 100 largest with .nlargest
In [8]:
files = glob.glob("data/indiv-*.parq")
In [9]:
%%time
total_counts = pd.Series()
for file in files:
    df = pd.read_parquet(file, columns=individual_cols,
                         engine="fastparquet")
    counts = df.occupation.value_counts()
    total_counts = total_counts.add(counts, fill_value=0)

total_counts = total_counts.nlargest(100).sort_values(ascending=False)
total_counts
CPU times: user 32.7 s, sys: 7.22 s, total: 39.9 s
Wall time: 41.2 s

We've effectively implemented a MapReduce version of value_counts. We map pd.read_parquet and pandas.Series.value_counts over each file, giving the counts per occupation for each file. We reduce those by summing all the intermediate counts into the total_counts Series.
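To make the MapReduce framing explicit, the same computation can be written as a map step over the files followed by a single reduce. This is only an illustrative sketch, not code from the original article; it assumes the files and individual_cols defined earlier:

import functools

import pandas as pd

# Map: compute the per-occupation counts for each file independently.
per_file_counts = [
    pd.read_parquet(f, columns=individual_cols, engine="fastparquet")
      .occupation.value_counts()
    for f in files
]

# Reduce: sum the intermediate counts, then keep the 100 largest.
total_counts = functools.reduce(
    lambda a, b: a.add(b, fill_value=0), per_file_counts
).nlargest(100)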

We were able to analyze a larger-than-memory dataset by breaking the large problem down into many smaller ones. In this case that was OK; we lost our nice one-liner, but the additional work of tracking all the intermediate counts and iterating through the chunks wasn't too bad. However, it isn't obvious how we could extend this approach to handle something like merging two dataframes, or more complicated aggregations than sum.

dask.dataframe is able to do all of that for us. It lets us write pandas-like code for working with larger-than-memory datasets in parallel.

dask.dataframe

dask is a library that enables parallel computing. dask.dataframe is one of dask's collections. The collections provide APIs that mimic popular Python libraries (dask.array for large NumPy arrays, dask.dataframe for large pandas DataFrames, dask.bag for large Python collections). While the APIs are similar, the dask versions scale out to larger datasets.
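As a quick illustration of that API mimicry (a sketch, not part of the original notebook), a dask.array looks and feels like a NumPy array but is split into chunks that can be processed in parallel:

import dask.array as da

# A 10,000 x 10,000 array of ones, stored as 100 chunks of 1,000 x 1,000.
x = da.ones((10_000, 10_000), chunks=(1_000, 1_000))

# Familiar NumPy-style operations; nothing runs until .compute() is called.
x.mean(axis=0).compute()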

We'll create a single dask DataFrame containing the entire dataset using dask's version of read_parquet. We're using an f-string, new in Python 3.6, to interpolate the name of the cloud storage bucket these files are sitting in.

In [8]:
import dask.dataframe as dd
from dask import compute

indiv = dd.read_parquet(f'{data}/indiv_all.parq',
                        columns=individual_cols,
                        engine="fastparquet",
                        storage_options=storage_options)
indiv
Out[8]:
Dask DataFrame Structure:
                cmte_id          entity_tp employer occupation  transaction_dt transaction_amt
npartitions=50
                 object  category[unknown]   object     object  datetime64[ns]           int64
                    ...                ...      ...        ...             ...             ...
...                 ...                ...      ...        ...             ...             ...
                    ...                ...      ...        ...             ...             ...
                    ...                ...      ...        ...             ...             ...
Dask Name: read-parquet, 50 tasks

Dask collections are lazy. They don't actually execute until you ask for the result. So far, read_parquet has just read in some metadata like the column names and types. But you can still operate on dask DataFrames just like a normal pandas DataFrame. For example, we'll use the exact same pandas code as above for computing the 100 most common occupations.

In [9]:
counts = indiv.occupation.value_counts().nlargest(100)
counts
Out[9]:
Dask Series Structure:
npartitions=1
    int64
      ...
Name: occupation, dtype: int64
Dask Name: series-nlargest-agg, 160 tasks

When you call a method like .value_counts, dask builds a task graph rather than computing the result immediately. counts contains a task graph with all the information needed to compute the counts when asked. These graphs can be visualized with the .visualize method. To save on space, here's a slightly simplified version of the counts task graph, with just the tasks for the 2008 and 2010 election cycles.
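A graph image like the one below can be produced with .visualize; a minimal sketch, where the output filename is just illustrative (rendering also requires the graphviz library):

# Render the counts task graph to an image file.
counts.visualize(filename="counts-graph.svg")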

[Simplified task graph for counts, showing only the tasks for the 2008 and 2010 election cycles]

Looking through the structure of the graph, you can see dask has done all the MapReduce-style work for us. It's read in each file independently (read_parquet), obtained the intermediate counts (value_counts), summed those together (sum), and selected the 100 largest (nlargest). We've successfully done a larger-than-memory value_counts in a single line of pandas-like code.

When you need the actual value, the graph can be executed by calling dask.compute or the .compute method.

In [10]:
%time counts.compute()
CPU times: user 83 ms, sys: 8 ms, total: 91 ms
Wall time: 26.5 s
Out[10]:
RETIRED                    4769520
NOT EMPLOYED               2656988
ATTORNEY                   1340434
PHYSICIAN                   659082
HOMEMAKER                   494187
                            ...   
CHIEF EXECUTIVE OFFICER      26551
SURGEON                      25521
EDITOR                       25457
OPERATOR                     25151
ORTHOPAEDIC SURGEON          24384
Name: occupation, Length: 100, dtype: int64

The diagnostics dashboard will show the progress of the computation. Each bar shows a particular kind of task (like getitem for selecting the occupations column). The bar is filled as dask begins executing and completes those tasks.

[Dashboard progress bars during the value_counts computation]

Calling compute hands the task graph off to one of dask's schedulers. Schedulers are responsible for executing task graphs and returning concrete results. It's up to the scheduler to choose which tasks to run when, and how to run them. You can use the cores on your laptop with the threaded scheduler, use multiple processes with the multiprocessing scheduler, or even use multiple machines in a cluster with the distributed scheduler. The best scheduler depends on your particular workload, but dask sets good defaults and makes it easy to override them.
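For illustration, here is roughly how you might override the default. This sketch uses the scheduler= keyword and dask.config from recent dask releases (older versions spelled this differently), and the scheduler address is hypothetical:

import dask
from dask.distributed import Client

# Per-call: run this one computation with the multiprocessing scheduler.
result = counts.compute(scheduler="processes")

# Session-wide: make the threaded scheduler the default.
dask.config.set(scheduler="threads")

# Cluster: connect to a running distributed scheduler (address is made up).
client = Client("tcp://scheduler-address:8786")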

So that's a high-level summary of dask:

  1. Various collections like dask.dataframe and dask.array provide familiar APIs for working with large datasets.
  2. Computations are represented as a task graph. These graphs could be built by hand, or more commonly built by one of the collections (see the sketch after this list).
  3. Dask schedulers run task graphs in parallel (potentially distributed across a cluster), reusing libraries like NumPy and pandas to do the computations.
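To make point 2 concrete, a task graph is just a Python dict whose keys map to literal values or to (function, argument, ...) tuples that reference other keys. A minimal hand-built sketch (not from the original article):

from operator import add, mul
from dask.threaded import get

# Keys map to values or to (function, arg1, arg2, ...) tuples.
graph = {
    "x": 1,
    "y": 2,
    "z": (add, "x", "y"),   # z = add(x, y)
    "w": (mul, "z", 10),    # w = mul(z, 10)
}

# Hand the graph and the key we want to a scheduler; it resolves the dependencies.
get(graph, "w")  # -> 30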

One of the primary goals of dask is to parallelize the existing scientific python stack. Most of these libraries (most notably NumPy and pandas) can only operate on datasets that fit in RAM, and do so with only a single core of a single CPU of a single machine.

Rather than attempting to rewrite NumPy or pandas to be parallel-native, dask reuses those libraries for what they're great at: in-memory containers and computation engines. Dask enables analysis of large datasets (in parallel) by breaking the problem into many smaller problems and using NumPy, pandas, or other existing libraries to do the computation of the smaller problems. Dask coordinates everything by representing the smaller problems in a task graph.

Our value_counts example fits in the MapReduce pattern. But it's important to emphasize that dask isn't limited to simple map-reduce type computations. Let's continue exploring these datasets to demonstrate some more of dask's flexibility.

First, we'll persist the dataset in distributed RAM. This will avoid having to read the file from disk each time we go to do a computation.

In [11]:
indiv = indiv.persist()

Distributed Scheduler Dashboard showing the processing of indiv.persist(). This begins reading data from disk and stores the large dataframe in distributed memory.


Using regular pandas syntax, we can compute summary statistics like the mean and standard deviation of the transaction amount:

In [12]:
avg_transaction = indiv.transaction_amt.mean()
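The standard deviation works the same way; for example (a small addition, not a cell from the original notebook):

std_transaction = indiv.transaction_amt.std()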

Or we can answer questions like "Which employer's employees donated the most?"

In [13]:
total_by_employee = (
    indiv.groupby('employer')
        .transaction_amt.sum()
        .nlargest(10)
)

Or "what is the average amount donated per occupation?"

In [14]:
avg_by_occupation = (
    indiv.groupby("occupation")
        .transaction_amt.mean()
        .nlargest(10)
)

Since dask is lazy, we haven't actually computed anything.

In [15]:
total_by_employee
Out[15]:
Dask Series Structure:
npartitions=1
    int64
      ...
Name: transaction_amt, dtype: int64
Dask Name: series-nlargest-agg, 110 tasks

avg_transaction, avg_by_occupation and total_by_employee are three separate computations (they have different task graphs), but we know they share some structure: they're all reading in the same data, they might select the same subset of columns, and so on. Dask is able to avoid redundant computation when you use the top-level dask.compute function.

In [16]:
%%time
avg_transaction, by_employee, by_occupation = compute(
    avg_transaction, total_by_employee, avg_by_occupation
)
CPU times: user 105 ms, sys: 8 ms, total: 113 ms
Wall time: 28.1 s

Dask distributed dashboard showing many computations. The task stream (middle) shows tasks being processed, color-coded with the progress bars below. Communication between workers is overlaid in red.
