Powerful yet familiar workflow for the Data Analyst

Analyze billions of records faster with RAPIDS & Nvidia GPUs on Google Cloud’s AI Platform Notebooks.

Mikhail Chrestkha
Towards Data Science


Photo by William Iven on Unsplash

“Of the cast of characters mentioned … the only ones every business with data needs are decision-makers and analysts”

— Cassie Kozyrkov (Data Science’s Most Misunderstood Hero)

Analysts and “citizen data scientists” are the often forgotten heroes across every organization. They tend to have a wide range of responsibilities spanning business domain knowledge, data extraction & analysis, predictive analytics & machine learning, and reporting & communication to stakeholders. Piece of cake, right?

And as the size of data has grown, many of these practitioners have had to learn parts of big data frameworks and infrastructure management. This increased scope of work is not sustainable and has a direct impact on the most important steps of the workflow: data exploration & experimentation. This can result in rudimentary reports, less accurate predictive models & forecasts, and less innovative insights & ideas.

Am I really suggesting yet another big data framework/service? Don’t we already have Hive, Impala, Presto, Spark, Beam, BigQuery, Athena, and the list goes on? Don’t get me wrong. For teams running data platforms for a large organization, one or more of these frameworks/services is essential for managing hundreds of batch and streaming jobs, a vast ecosystem of data sources, and production pipelines.

My focus here, however, is the data analyst who wants a flexible and scalable solution with minimal code changes to accelerate their existing workflows. Before thinking about multi-node clusters and new frameworks, you’d be surprised how much can be done with your existing code on one machine with some help from GPUs. Using myself as a guinea pig, I wanted to explore a workflow with the following constraints:

  1. I want the setup (both hardware and software) to be easy and quick (< 30 min)
  2. I don’t want to manage a distributed cluster or learn a new framework
  3. I want full flexibility to interact with the python data & machine learning ecosystem (jupyter, pandas, xgboost, tensorflow, pytorch, sklearn)
  4. I want to be able to scale to 100s of millions of rows of data without waiting overnight for the results

These constraints led me to RAPIDS with the help of the new & powerful Nvidia A100 GPU.

Image by author

Why RAPIDS + Nvidia A100 GPUs on Google Cloud

  • Speed: GPUs have been at the forefront of machine learning innovation by speeding up the deep learning training step in a pipeline. RAPIDS is looking to provide similar gains across your entire workflow, from data ingestion through data transformations to predictive analytics. Dask extends RAPIDS to scale across GPUs on a single machine (and multi-node clusters if needed) and handle source data spread across 100s of files.
  • Interactive Python familiarity: RAPIDS for the most part follows the pandas and sklearn API and allows for a comfortable interactive workflow to analyze results after each step. The barrier to entry is much lower than learning a new framework with different concepts. There is a reason why Spark and Beam are both also heavily investing in the pandas API.
  • Scale at your own pace on-demand: Only use and pay for the Nvidia A100 GPUs when you need to. Start with Pandas on CPUs & system RAM, test out RAPIDS on affordable T4 GPUs (or even for free on Kaggle & Colab) and then scale up to Nvidia A100s for your largest datasets.
  • Nvidia A100s on Google Cloud: 16 x 40GB GPU RAM: While you can creatively design around RAM limitations (e.g. partitions, offloading summary aggregations to a data warehouse), fitting data into GPU RAM allows for optimal speedup and full flexibility of analyzing data at the most granular level. Nvidia A100s have 40GB of GPU RAM and you can scale up to 16 of them on a single machine without having to worry about multi-node distribution. This gives you a whopping 640GB of GPU RAM to play with. Staying on one machine also simplifies managing your environment and avoids the networking issues that pop up when dealing with multi-node setups.
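As a rough sanity check on the memory arithmetic above, the sketch below computes the aggregate GPU RAM for a given number of A100s and tests whether a dataset of a given in-memory size would fit. The function names, the 200 GB dataset size, and the `headroom` factor are illustrative assumptions, not figures from RAPIDS or Google Cloud; real in-memory size depends on dtypes and intermediate results.

```python
A100_GPU_RAM_GB = 40  # per-GPU memory quoted in the text


def total_gpu_ram_gb(num_gpus, ram_per_gpu_gb=A100_GPU_RAM_GB):
    """Aggregate GPU RAM across all GPUs on one machine."""
    return num_gpus * ram_per_gpu_gb


def fits_in_gpu_ram(dataset_gb, num_gpus, headroom=0.5):
    """Return True if the dataset fits in a fraction of the total GPU RAM.

    `headroom` is a hypothetical safety factor: only this fraction of the
    total memory is budgeted for data, leaving the rest for intermediates.
    """
    return dataset_gb <= total_gpu_ram_gb(num_gpus) * headroom


print(total_gpu_ram_gb(16))      # 640
print(fits_in_gpu_ram(200, 16))  # True
print(fits_in_gpu_ram(200, 2))   # False
```

When the check fails, the design-around options mentioned above (partitions, pre-aggregating in a warehouse) are the alternatives to adding GPUs.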

GPUs are not a silver bullet so take a look at my previous blog on assessing if GPUs are the right choice for you.

End-to-End Example

GitHub Notebook HERE

Business problem

I will be exploring the Fannie Mae Single-Family Loan Performance Data looking at 17 years (2000–2016) of performance history across ~37M loans and ~1.8B records. A few questions I aim to answer:

Descriptive Analytics:

  • What is the number of loans originated by year?
  • What % of loans had delinquency events?
  • What years had the highest delinquency rate?

Predictive Analytics:

  • Can we predict the likelihood of delinquency for a future loan?

Hardware Setup

I started by creating a managed JupyterLab environment on AI Platform Notebooks using a Python 3, CUDA 11.0, and Ubuntu 18.04 image. I chose a machine with 2 Nvidia A100 GPUs but feel free to start with 1 or 2 T4 GPUs to test on a smaller sample of data. If you don’t see Nvidia A100s in the dropdown, note that they are currently only available in the us-central1 region.

Image by author

Software Setup

Setting up a proper environment with many moving parts (GPU Drivers, CUDA, NCCL, RAPIDS, Python, etc.) can be complex and painful. I’ve already selected Ubuntu 18.04, Python 3 and CUDA 11.0 in our base image so we need to build the right compatible RAPIDS environment. Lucky for me the RAPIDS site has a great UI-tool that generates the exact conda command based on your needs. I added gcsfs to access files in Google Cloud Storage and matplotlib to generate some charts. Setting up both the hardware and software environment took me <20 min.

source: https://rapids.ai/start.html
conda create -n rapids-0.17 -c rapidsai -c nvidia -c conda-forge -c defaults rapids-blazing=0.17 python=3.7 cudatoolkit=11.0 matplotlib=3.3.3 gcsfs=0.7.1

Dask Initialization

Since my JupyterLab environment has 2 GPUs I will use dask to orchestrate RAPIDS treating each GPU as a worker. This is straightforward with 2 import statements and 2 cluster initialization lines.

from dask_cuda import LocalCUDACluster
from dask.distributed import Client
cluster = LocalCUDACluster()
client = Client(cluster)

Data Ingestion

You can download the full 17 years of data or a smaller subset to test this workflow from this Nvidia demo repository. I have 100+ csv files totaling ~200GB for the full 17 years sitting in GCS and can read them using the familiar pandas approach. You can also download the files to a local filesystem and update the location accordingly.

import dask_cudf

csv_acq_fnames='gs://mchrestkha-github-ml-examples/fannie_mae_loans/acq/Acquisition_20*'
csv_perf_fnames='gs://mchrestkha-github-ml-examples/fannie_mae_loans/perf/Performance_20*'
# Column names, dtypes and date columns are assumed to be defined earlier, as in the accompanying notebook
df_acq = dask_cudf.read_csv(csv_acq_fnames, sep='|', names=col_acq_names, dtype=dtype_acq, columns=col_acq, parse_dates=parse_dates_acq)
df_per = dask_cudf.read_csv(csv_perf_fnames, sep='|', names=col_per_names, dtype=dtype_per, columns= col_per, parse_dates=parse_dates_per)
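To make the read arguments concrete, here is a minimal sketch using plain pandas on two made-up rows, so it runs without a GPU. The column names and values are hypothetical stand-ins for the real Fannie Mae layout, and `usecols` is the pandas spelling for selecting a subset of columns.

```python
import io

import pandas as pd

# Two made-up rows in a pipe-delimited, header-less layout (hypothetical columns).
raw = "100001|R|4.25|03/2005\n100002|C|5.00|11/2007\n"

col_names = ["LoanID", "Channel", "OrInterestRate", "OrDate"]  # hypothetical subset
dtypes = {"LoanID": "int64", "OrInterestRate": "float64", "OrDate": "str"}

df = pd.read_csv(
    io.StringIO(raw),
    sep="|",          # fields are separated by pipes, not commas
    names=col_names,  # the files have no header row, so names are supplied
    dtype=dtypes,     # explicit dtypes skip type inference
    usecols=["LoanID", "OrInterestRate", "OrDate"],  # load only the needed columns
)
print(df)
```

Supplying names and dtypes up front matters more as file counts grow, since every file is then parsed the same way.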

Data Exploration

One of the main reasons to use RAPIDS+dask is to leverage all your favorite pandas functions but running faster on GPUs across multiple workers. Below are a few examples I used for initial data exploration and profiling.

df_acq['OrYr']=df_acq['OrDate'].str[-4:]
df_acq['OrUnpaidPrinc $M']=df_acq['OrUnpaidPrinc']/1000000
df_acq.head()
df_acq_describe=df_acq.describe().compute()
df_acq_nulls=df_acq.isna().sum().compute()
df_acq_summary = df_acq.groupby('OrYr',as_index=False).agg({'LoanID': 'count','OrUnpaidPrinc $M': 'sum'}).compute()

After understanding the structure, granularity, and information in the data, I define what I’m looking to predict. Two columns are interesting when looking at the data dictionary. The ‘Current Loan Delinquency Status’ (CLDS in the code) can be used to flag whether a loan was ever delinquent and the ‘ForeclosureDate’ can be used to check if a property was foreclosed. I create a flag for each event and can treat either as my target variable (aka label) depending on the business question I am trying to answer. For this example I am looking to predict whether a loan will have any delinquency event, so I will use the former as a label.

df_per['DelinquentEvent']=0
df_per['DelinquentEvent']=df_per['DelinquentEvent'].where(df_per['CLDS']<1,1)
df_per['ForeclosureEvent']=0
df_per['ForeclosureEvent']=df_per['ForeclosureEvent'].where(df_per['ForeclosureDate'].isnull()== True,1)
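The `where` calls can be hard to read at first: `Series.where(cond, other)` keeps the existing value where `cond` is true and substitutes `other` everywhere else. A minimal pandas sketch on a made-up table follows; the sample values are invented, and treating CLDS as a count of months delinquent is an assumption.

```python
import pandas as pd

# Tiny made-up performance table: one row per loan per reporting period.
df_per = pd.DataFrame({
    "LoanID": [1, 1, 2, 2],
    "CLDS": [0, 2, 0, 0],  # assumed: months delinquent in that period
    "ForeclosureDate": [None, "2010-05-01", None, None],
})

# Start from 0, keep 0 where CLDS < 1, otherwise set the flag to 1.
df_per["DelinquentEvent"] = 0
df_per["DelinquentEvent"] = df_per["DelinquentEvent"].where(df_per["CLDS"] < 1, 1)

# Keep 0 where there is no foreclosure date, otherwise set the flag to 1.
df_per["ForeclosureEvent"] = 0
df_per["ForeclosureEvent"] = df_per["ForeclosureEvent"].where(
    df_per["ForeclosureDate"].isnull(), 1
)

# An equivalent, more direct formulation of the first flag.
direct = (df_per["CLDS"] >= 1).astype("int64")

print(df_per[["LoanID", "DelinquentEvent", "ForeclosureEvent"]])
```

Both formulations give the same flag here; the boolean-cast version is one possible simplification rather than what the notebook uses.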

One big takeaway for me was that while there are limitations for plotting charts directly from dask_cudf or cudf dataframes, you can design around that quite easily. I used the following approach:

  • Use GPUs + Dask for the heavy duty number crunching to create a summarized view
  • Convert the summarized dataframe back to pandas
  • Use Matplotlib (or any other plotting library) to plot your favorite charts using system RAM

Here is an example of summarizing the full 1.8B records and then plotting a time series chart of delinquency rates by year:

df_per_yr_summary = df_per.groupby('YrRep',as_index=False).agg({'LoanID': 'count', 'DelinquentEvent':'sum'}).compute()
df_per_yr_summary['DelinquencyEventsperLoan']=df_per_yr_summary['DelinquentEvent']/df_per_yr_summary['LoanID']
df_per_yr_summary.to_pandas().sort_values(by=['YrRep']).plot.line(x='YrRep',y='DelinquencyEventsperLoan')
Image by author
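The summarize-then-plot pattern can be tried on a toy table with plain pandas. The sketch below uses invented data and stops just before the plot call. With dask_cudf, the `.compute()` and `.to_pandas()` steps from the article would sit between the aggregation and the chart.

```python
import pandas as pd

# Made-up performance records: one row per loan per reporting year.
df_per = pd.DataFrame({
    "YrRep": [2007, 2007, 2008, 2008, 2008],
    "LoanID": [1, 2, 1, 2, 3],
    "DelinquentEvent": [0, 0, 1, 0, 1],
})

# Heavy step: collapse the detailed records into one small row per year.
summary = df_per.groupby("YrRep", as_index=False).agg(
    {"LoanID": "count", "DelinquentEvent": "sum"}
)

# Light step: derive the ratio on the small summary table.
summary["DelinquencyEventsperLoan"] = summary["DelinquentEvent"] / summary["LoanID"]
summary = summary.sort_values(by=["YrRep"])
print(summary)
```

Note that `count` on LoanID counts performance records rather than distinct loans, so the ratio in this sketch is delinquency events per record in each year.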

Modeling Dataset

I then aggregate the data to one row (aka example) per loan and join with all the loan attributes (features) that were only available at origination. This ensures I have no data leakage.

df_per_loan = df_per.groupby('LoanID',as_index=False).agg({'DelinquentEvent':'sum'}).compute()
df_per_loan['DelinquentFlag']=0
df_per_loan['DelinquentFlag']=df_per_loan['DelinquentFlag'].where(df_per_loan['DelinquentEvent']<1,1)
joined=df_acq.merge(df_per_loan,on=['LoanID'],how='left')
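A small pandas sketch of the same aggregate-then-join step, on invented data, shows one detail worth checking: with a left join, any loan that has no performance records ends up with a missing label. Whether that happens in the real dataset is not something this sketch can tell you.

```python
import pandas as pd

# Made-up acquisition (one row per loan) and performance (many rows per loan) tables.
df_acq = pd.DataFrame({"LoanID": [1, 2, 3], "CreditScore": [700, 640, 720]})
df_per = pd.DataFrame({"LoanID": [1, 1, 2], "DelinquentEvent": [0, 1, 0]})

# Collapse performance history to one row per loan, then flag any delinquency.
df_per_loan = df_per.groupby("LoanID", as_index=False).agg({"DelinquentEvent": "sum"})
df_per_loan["DelinquentFlag"] = (df_per_loan["DelinquentEvent"] >= 1).astype("int64")

# Left join keeps every originated loan, matched or not.
joined = df_acq.merge(df_per_loan, on=["LoanID"], how="left")

# Loans without performance history have a missing label after the join.
unmatched = int(joined["DelinquentFlag"].isna().sum())
print(joined)
print(unmatched)
```

Counting the unmatched rows before training is a cheap way to decide whether to drop or fill them.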

I then select my features and convert categorical variables into indicator variables (one-hot encoding).

label=['DelinquentFlag']
cat_features=['Channel','OccStatus','FTHomeBuyer','LoanPurpose','PropertyType','ProductType','RelMortInd']
num_features=['OrInterestRate','OrUnpaidPrinc','OrLoanTerm','OrLTV','OrCLTV','CreditScore']
modeling_dataset=joined_categorized[cat_features + num_features + label]
modeling_dataset=dask_df.get_dummies(modeling_dataset)
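For readers new to one-hot encoding, here is a minimal pandas sketch of what `get_dummies` does to a categorical column. The channel codes and values are made up, and the sketch uses `pd.get_dummies` directly rather than the Dask variant shown above.

```python
import pandas as pd

# Made-up modeling rows with one categorical feature.
df = pd.DataFrame({
    "Channel": ["R", "C", "R"],
    "OrInterestRate": [4.25, 5.0, 3.75],
    "DelinquentFlag": [0, 1, 0],
})

# Each distinct category becomes its own indicator column; the original is dropped.
encoded = pd.get_dummies(df, columns=["Channel"])
print(encoded.columns.tolist())
```

Numeric columns and the label pass through unchanged, so the encoded frame can go straight into feature selection.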

Training

I then finally set up my input parameters for XGBoost and train the model.

import xgboost as xgb

# <define feature columns>
X = joined[features]
y = joined['DelinquentFlag']
dtrain = xgb.dask.DaskDeviceQuantileDMatrix(client, X, y)
param = {
    'max_depth': 8,
    # squared-error objective on a 0/1 label; 'binary:logistic' is an alternative for probability outputs
    'objective': 'reg:squarederror',
    'tree_method': 'gpu_hist'
}
bst = xgb.dask.train(client, param, dtrain, num_boost_round=100)

Summary

The big data ecosystem continues to evolve with lots of choices (Spark, Beam, Flink, Dask, Modin, Ray, BigQuery, Snowflake, Athena, etc.). Historically many of these options required a tradeoff between usability, flexibility and scale. Luckily many of these projects are investing to reduce the need for these tradeoffs.

I am most excited about the impact this will have on the unsung heroes of the data world, data analysts. Empowering data analysts and citizen data scientists to explore & analyze big data seamlessly has tremendous benefits to a business: more insightful BI reporting, more accurate forecasts & predictive models, and more data-driven & innovative decision-making.

I offer up one workflow with RAPIDS on Google Cloud’s AI Platform Notebooks with GPUs that any data analyst can leverage today. This architecture follows the principles below that make it a great first step to start accelerating your big data (and data science) workflows before potentially expanding to Spark or Beam for more complex pipeline orchestration needs.

  • Quick and easy setup (< 30 min)
  • No distributed cluster; Scale vertically on one machine before adding horizontal scaling complexity
  • Familiar python tools and APIs (pandas for the win!)
  • Scale to 100s of millions of rows of data
  • Accelerate the entire data life cycle

Big data and machine learning technologies are evolving as fast as ever. High tech companies and research departments will continue to push the limits for their advanced needs. It is however important to step back and think about what is available today. I believe there is a HUGE backlog of use cases that data analysts can solve better & faster with the tools available today. I am excited for many projects (RAPIDS, Dask, BQML, Koalas on Spark) focusing on scalability AND usability to lower the barrier to entry and empower a much larger part of the data community.

Have a question or want to chat? Find me on Twitter

The notebook examples from this blog can be found on my GitHub.

Big thanks to Ethem Can, Remy Welch, Dong Meng, Arun Raman, Rajesh Thallam, Michael Thomas, Rajan Arora and Subhan Ali for educating me and helping with the example workflow.

FAQ

  • [Q] Isn’t a data warehouse (like Snowflake or BigQuery) more efficient for the upstream data processing? [A] If your company/team has access to a scalable data warehouse then yes, push as much compute (that can be written with SQL) to it as you can early in your workflow. This blog’s focus was a self-contained workflow that only required object storage (or local file system) and an on-demand Jupyter notebook instance with a GPU. It follows more of a data lake architecture.
  • [Q] Aren’t GPUs super expensive? [A] First read my previous blog on assessing whether GPUs are the right choice, as GPUs may not be the right fit for your workflow. GPUs are, however, becoming the norm for the training step of machine learning models. If you plan to use GPUs for one step of your pipeline, RAPIDS allows you to run and speed up the rest of the pipeline, potentially increasing the ROI of the GPU.
  • [Q] How does RAPIDS fit in with Spark and Beam? [A] This blog focused on a single-node setup where various RAPIDS libraries (e.g. cudf, cuml) can be used stand-alone similar to pandas. For multi-node setups, you can think of RAPIDS as a lower level GPU acceleration layer that can power different distributed big data frameworks. Spark and Dask both leverage RAPIDS under the covers when running data tasks on GPUs. I assume the Apache Beam project may also look into RAPIDS integration for GPU acceleration in the future.
  • [Q] This doesn’t seem that fast! How much faster can this be optimized? [A] I am no expert so I’m sure there are several ways this could be sped up (e.g. optimal file sizes & partitions, indexing, leveraging some unused CPU cores, etc.). This will require some trial and error. One part I plan to explore more in the future is to use the Dask Dashboard to better understand bottlenecks and NVDashboard to better understand GPU utilization. If you have a use case, dataset and benchmark, I would love to connect and help you test it.
  • [Q] Can multiple users share the same environment to share infrastructure and GPUs? [A] Good catch! This workflow kept it simple with one user per machine (which can be shut off on-demand) but is still not cost effective for a team, especially for the more expensive Nvidia A100 or V100 GPUs. There are solutions to set up multi-tenant notebook environments on Kubernetes, which do add some complexity. The Nvidia A100 is also intriguing with its multi-instance GPU (MIG) architecture, which allows you to slice up each GPU into up to 7 units so different users can run tasks on each concurrently. Something to look into for a future blog.
