A modern guide to Spark RDDs

Everyday opportunities to reach the full potential of PySpark

Leo Lezcano
Towards Data Science
Jun 5, 2020

The web is full of Apache Spark tutorials, cheatsheets, tips and tricks. Lately, most of them have focused on Spark SQL and Dataframes, because these offer a gentle learning curve and a familiar SQL syntax, as opposed to the steeper curve of the older RDD API. However, it was the versatility and stability of RDDs that ignited Spark’s adoption in 2015 and turned it into a dominant framework for distributed data processing.

As you become a regular Spark user, three circumstances in particular will lead you to consider the RDD API:

  • Accessing raw unstructured data sources
  • Performing data operations that are better suited to a general-purpose programming language than to SQL, thus avoiding UDFs
  • Parallelizing the execution of a third-party library

This opinionated material covers such circumstances and the questions that frequently arise from them, so that you can enjoy the full potential of PySpark.

Table of Contents

A note on style
Grow your functional Python vertically
Append the Spark type to variable names

Turning Dataframes into RDDs & vice versa
Dataframe to RDD
RDD to Dataframe

The expressive Python dictionaries
One-line dictionary transformations
Python sets & dictionaries are unhashable

Caching & Broadcasting
Caching RDDs
Unpersisting RDDs
Cleaning the whole RDD cache

Distributed execution of Python libraries
Numpy — A generic approach
NLTK — Partition setup

A note on style

Code readability is particularly important when functional programming meets big data: on the one hand, traditional IDE debugging tools have been designed around imperative programming; on the other, running the code multiple times for debugging purposes is expensive and time-consuming.

Grow your functional Python vertically

Spark logic is defined as a DAG. Keep the code narrow to improve readability by breaking the line with a backslash “\” after every transformation or action.
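As a minimal sketch of this vertical style, the hypothetical helper below takes an existing RDD of text lines and places one transformation or action per line. The name count_long_words, the lines_rdd argument and the word-count logic are assumptions made for illustration.

```python
def count_long_words(lines_rdd, min_length=4):
    """Count words of at least `min_length` characters in an RDD of text lines."""
    word_counts = lines_rdd \
        .flatMap(lambda line: line.split()) \
        .map(lambda word: word.lower()) \
        .filter(lambda word: len(word) >= min_length) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a + b) \
        .collect()
    return dict(word_counts)
```

With a live SparkContext named sc, it could be called as count_long_words(sc.parallelize(["Spark makes big data simple"])). Each step of the DAG sits on its own line, so a step can be commented out or inspected in isolation.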

Append the Spark type to variable names

Distributed data will largely fall into one of the three types:

Turning Dataframes into RDDs and vice versa

The backbone of a Dataframe is an RDD[Row], where Row is a Spark type that behaves much like a Python dictionary. As the approaches below show, the Row type serves as a bridge between the two APIs.

Dataframe to RDD

  • Approach: Dataframe -> RDD[Row] -> RDD[dict]
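The approach above can be sketched as a small helper, assuming df is an existing PySpark Dataframe; the function name is hypothetical.

```python
def df_to_dict_rdd(df):
    """Dataframe -> RDD[Row] -> RDD[dict]."""
    return df.rdd \
        .map(lambda row: row.asDict())   # asDict(recursive=True) also converts nested Rows
```

After the conversion, every entry is a plain dictionary, so its fields can be read by name, for example entry["name"].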

RDD to Dataframe

  • Approach A: RDD[dict] -> RDD[Row] -> Dataframe
  • Approach B: RDD[tuples] -> Dataframe
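Both approaches can be sketched as follows, assuming spark is an active SparkSession. The helper names and the column names in the usage note are hypothetical.

```python
def dicts_to_df(spark, dict_rdd):
    """Approach A: RDD[dict] -> RDD[Row] -> Dataframe."""
    from pyspark.sql import Row          # local import keeps the helper self-contained
    row_rdd = dict_rdd \
        .map(lambda d: Row(**d))
    return spark.createDataFrame(row_rdd)


def tuples_to_df(spark, tuple_rdd, column_names):
    """Approach B: RDD[tuple] -> Dataframe, naming the columns explicitly."""
    return spark.createDataFrame(tuple_rdd, schema=column_names)
```

For instance, tuples_to_df(spark, users_rdd, ["name", "age"]) would name the two tuple positions, while Approach A takes the column names from the dictionary keys.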

The expressive Python dictionaries

As the chain of operations on an RDD grows long, it becomes challenging to keep the code readable and the logic understandable. Python dictionaries play a key role in this regard. As opposed to Scala tuples, they allow accessing fields by name rather than by position, and as opposed to Scala case classes, they can be created in-line without a prior external definition.

One-line dictionary transformations

Lambda functions are syntactically restricted to a single expression. In the common scenario where an RDD[dict] transformation is needed, consider one-line lambdas built on dictionary unpacking. Note that **old_dict produces a shallow copy, but no deepcopy operations are required inside RDD operations, as PySpark guarantees that the new dictionary is fully independent, i.e. RDDs are immutable.
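A few such one-liners, as a minimal sketch in plain Python. The field names are made up for illustration, and the built-in map stands in for RDD.map so that the snippet runs without a cluster.

```python
old_dict = {"name": "Ada", "age": 36, "tmp": "scratch"}

# Add (or overwrite) a field; **d makes a shallow copy of the input entry.
add_field = lambda d: {**d, "country": "UK"}

# Derive a new field from existing ones.
derive_field = lambda d: {**d, "is_adult": d["age"] >= 18}

# Drop a field with a dictionary comprehension.
drop_field = lambda d: {k: v for k, v in d.items() if k != "tmp"}

# Keep only a subset of the fields.
select_fields = lambda d: {k: d[k] for k in ("name", "age")}

# Chain two of them, as two consecutive RDD.map calls would.
new_dicts = list(map(drop_field, map(add_field, [old_dict])))
```

On an RDD[dict], the same lambdas are passed directly to the transformation, for example users_rdd.map(lambda d: {**d, "country": "UK"}), where users_rdd is a hypothetical RDD of dictionaries.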

Python sets and dictionaries are unhashable types

RDD aggregations such as groupByKey and reduceByKey require the Key to be a hashable type for shuffling purposes. Therefore, avoid using Python dictionaries and sets as shuffling keys. If you must, consider using frozensets.
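The sketch below shows one way to turn a set or a flat dictionary into a hashable key before shuffling. Both helper names are hypothetical, and the dictionary case assumes that all the values are themselves hashable.

```python
def hashable_key(value):
    """Turn a set or a flat dictionary into a hashable shuffling key."""
    if isinstance(value, dict):
        return frozenset(value.items())   # assumes the dict values are hashable
    if isinstance(value, (set, frozenset)):
        return frozenset(value)
    return value


def is_hashable(value):
    """Return True when `value` can be used as a shuffling key."""
    try:
        hash(value)
        return True
    except TypeError:
        return False


entry = ({"spark", "rdd"}, 1)                       # (key, value) pair keyed by a set
safe_entry = (hashable_key(entry[0]), entry[1])     # same pair, now keyed by a frozenset
```

On a pair RDD, the conversion would sit in a map right before the aggregation, for example pairs_rdd.map(lambda kv: (hashable_key(kv[0]), kv[1])).reduceByKey(lambda a, b: a + b).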

Caching and Broadcasting

Learning how to customize Spark’s distributed memory processing leads to optimal resource usage for ETLs and ML training jobs. Also, it’s the key ingredient of a fast and smooth REPL experience for Jupyter and Zeppelin notebooks.

Caching RDDs

See RDD.cache for the common use case of persisting to memory, or RDD.persist for other storage levels. Note that calling cache or persist only sets the caching instruction; only an action, such as count, triggers the DAG execution and the subsequent storage in memory.
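A minimal sketch of that sequence, assuming sc is an existing SparkContext; the helper name and the squared-numbers data are made up for illustration.

```python
def cache_squares(sc, n=1000):
    """Build an RDD, mark it for caching, and trigger the storage with an action."""
    cached_rdd = sc.parallelize(range(n)) \
        .map(lambda x: x * x) \
        .cache()              # lazy: only marks the RDD for in-memory storage

    cached_rdd.count()        # action: executes the DAG and fills the cache
    return cached_rdd
```

For a storage level other than memory, the cache call would be swapped for persist with a level such as StorageLevel.MEMORY_AND_DISK, imported from pyspark.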

Unpersisting RDDs

There are two main reasons to invoke RDD.unpersist and remove all its blocks from memory and disk:

  1. You’re done using the RDD, i.e. all the actions depending on the RDD have been executed, and you want to free up storage for further steps in your pipeline or ETL job.
  2. You want to modify the persisted RDD, a frequent use case when working in Jupyter/Zeppelin notebooks. Given that RDDs are immutable, what you can do is reuse the RDD name to point to a new RDD. Therefore, if the code that caches the RDD is run twice, you’ll end up with two allocations in memory of the same cached_rdd. After the first run, cached_rdd points to the first allocation; after the second run, it points to the second, leaving the first allocation orphaned.

To avoid duplicate memory allocation, prepend a try-unpersist to the Jupyter paragraph that initializes the RDD, so that any previous allocation is released before the new one is created.
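One possible shape for such a guard is sketched below, assuming sc is an existing SparkContext. The function wrapper, its name and the doubling transformation are assumptions; it keeps cached_rdd as a module-level name, the way a notebook paragraph would.

```python
def init_cached_rdd(sc, data):
    """Re-runnable initializer: frees the previous allocation before caching anew."""
    global cached_rdd
    try:
        cached_rdd.unpersist()    # drop the allocation left by the previous run
    except NameError:
        pass                      # first run: cached_rdd is not defined yet

    cached_rdd = sc.parallelize(data) \
        .map(lambda x: x * 2) \
        .cache()
    return cached_rdd
```

In a notebook, the try/except can sit directly at the top of the paragraph, followed by the assignment to cached_rdd, without the function wrapper.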

Cleaning the RDD cache

Unpersisting all RDDs can be achieved as follows:
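One way to do it is sketched below, assuming sc is an existing SparkContext. Note that it relies on sc._jsc, PySpark's private handle to the JVM-side context, which is not part of the public API and may change between Spark versions; the helper name is hypothetical.

```python
def unpersist_all(sc):
    """Unpersist every RDD registered as persistent in this SparkContext."""
    # Assumption: `_jsc` is a private attribute exposing the JVM-side context.
    unpersisted = 0
    for rdd_id, java_rdd in sc._jsc.getPersistentRDDs().items():
        java_rdd.unpersist()
        unpersisted += 1
    return unpersisted            # number of RDDs that were released
```

This is mostly useful at the end of a notebook session or between independent stages of a job, when none of the cached RDDs will be reused.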

Distributed execution of Python libraries

The flexibility of RDDs makes it possible to distribute the workload when running practically any Python code. For computationally inexpensive tasks, such as O(n) and below, truly big data is required for the benefits of parallelization to be obvious. However, for superlinear complexity, parallelization can easily turn hours of medium-sized data jobs into minutes, or minutes of small data jobs into seconds.

Numpy — A generic approach

To parallelize the execution of libraries such as statsmodels, scikit-learn or numpy, simply invoke them from inside a map, flatMap, filter or any other transformation. For example, an np.median call placed inside an RDD map runs locally in each Spark executor.
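A minimal sketch of the idea, assuming an RDD whose entries are (key, list of numbers) pairs; the helper names and the entry layout are hypothetical.

```python
def median_by_key(values_by_key_rdd):
    """Map (key, list of numbers) entries to (key, median) entries."""
    def to_median(entry):
        import numpy as np            # imported where it runs: on the executor
        key, values = entry
        return key, float(np.median(values))

    return values_by_key_rdd \
        .map(to_median)
```

Nothing in to_median is Spark-specific: any function that works on a single entry can be distributed the same way, provided the library is installed on every executor.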

NLTK — Partition setup

The Natural Language Toolkit (NLTK) library requires further setup in addition to importing it. The nltk.download call must run in each executor to guarantee that the NLTK data is available locally. In such cases, consider using RDD.mapPartitions to avoid redundant calls to nltk.download inside the same executor. RDD.mapPartitions lets you operate on the whole list of RDD entries in each partition, whereas RDD.map, flatMap and filter work on one entry at a time and offer no visibility into which partition an entry belongs to.
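The partition-level setup can be sketched as follows. The choice of the stopwords corpus, the whitespace tokenization and the helper names are assumptions made for illustration; the input is assumed to be an RDD of sentences.

```python
def remove_stopwords(partition):
    """Runs once per partition: set up the NLTK data, then process every entry."""
    import nltk
    nltk.download("stopwords", quiet=True)    # one call per partition, not per entry
    from nltk.corpus import stopwords
    english_stopwords = set(stopwords.words("english"))

    for sentence in partition:
        yield [w for w in sentence.split() if w.lower() not in english_stopwords]


def clean_sentences(sentences_rdd):
    """Apply the stopword removal partition by partition."""
    return sentences_rdd \
        .mapPartitions(remove_stopwords)
```

Because remove_stopwords receives an iterator over the whole partition, the download and the stopword set are prepared once and then reused for every sentence in that partition.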

… more sections coming soon.
