Apache Spark with Kubernetes and Fast S3 Access

Yifeng Jiang
Towards Data Science
7 min read · May 7, 2020

To make the most of Apache Spark, we need two things:

  • Distributed storage to hold the data
  • A scheduler to run the Spark executors across a computing cluster

People have been doing this differently on-premises and in the cloud. On-premises, most use Spark with Hadoop, in particular HDFS for storage and YARN for scheduling. In the cloud, most use object storage such as Amazon S3 for storage, and a separate cloud-native service such as Amazon EMR or Databricks for scheduling. What if we could run Spark on a single architecture, both on-premises and in the cloud?

Enter Spark with Kubernetes and S3. The highlights of this architecture include:

  • A single architecture to run Spark across hybrid cloud.
  • Scale and operate compute and storage independently.
  • Fast provisioning, deployment and upgrades.
  • No need for Hadoop, which is complex to use and operate.

In this blog, I will explain how to run Spark on Kubernetes using the Spark on Kubernetes Operator. I will also describe the configuration for fast S3 data access using the S3A Connector and S3A Committers. This architecture works for both cloud object storage and on-premises S3-compatible object storage such as FlashBlade S3.

Install Spark Kubernetes Operator

Follow this quick start guide to install the operator. Make sure you enable the webhook during installation.

helm repo add incubator http://storage.googleapis.com/kubernetes-charts-incubator
helm install incubator/sparkoperator --namespace spark-operator --set enableWebhook=true

Create the spark service account and role binding in the Kubernetes default namespace using this manifest.

kubectl create -f manifest/spark-rbac.yaml

Run the Spark Pi example to test the installation. This will create two Spark pods in Kubernetes: one for the driver, another for an executor.

kubectl apply -f examples/spark-pi.yaml

Accessing Data in S3 Using S3A Connector

Thanks to the Spark Operator, with a couple of commands, I was able to deploy a simple Spark job running on Kubernetes. My next task is to access data in S3 in my Spark job. Hadoop’s S3A connector offers high-performance I/O against Amazon S3 and compatible object storage implementations including FlashBlade S3.

Building a Docker Image with Latest S3A Connector

The Spark Operator uses a pre-built Spark Docker image from Google Cloud. However, the image does not include the S3A connector. Although it is possible to customise the image and add S3A, the default Spark image is built against Hadoop 2.7, which is known to have an inefficient and slow S3A implementation. So I decided to build my own Docker image with Spark and the latest S3A connector.

I will omit the details of the build process as it is straightforward, but the key point is to use the pre-built Spark-without-Hadoop binary together with a user-provided Hadoop. My Dockerfile is available on my GitHub.

My Docker image with Spark 2.4.5, Hadoop 3.2.1 and the latest S3A connector is available on Docker Hub:

docker pull uprush/apache-spark:2.4.5

S3A Connector Configuration

The minimum S3A configuration for Spark to access data in S3 is as follows:

"spark.hadoop.fs.s3a.endpoint": "192.168.170.12"
"spark.hadoop.fs.s3a.access.key": "S3_ACCESS_KEY"
"spark.hadoop.fs.s3a.secret.key": "S3_SECRET_KEY"
"spark.hadoop.fs.s3a.connection.ssl.enabled": "false"

In a regular Spark cluster, these settings go in the spark-defaults.conf file, or in core-site.xml if running with Hadoop (where the keys drop the spark.hadoop. prefix). With the Spark Operator, they are configured under the spec.sparkConf section of your application YAML file. Refer to the API doc for a full description of the Spark Operator API.
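As a minimal sketch of where these settings end up, the snippet below renders the same four properties in two of the forms mentioned above: whitespace-separated lines for spark-defaults.conf, and a mapping nested under spec.sparkConf. The helper names are hypothetical, and the endpoint and key values are the placeholders from the listing, not real credentials.

```python
# Illustrative only: the helper names are made up for this sketch.
S3A_CONF = {
    "spark.hadoop.fs.s3a.endpoint": "192.168.170.12",
    "spark.hadoop.fs.s3a.access.key": "S3_ACCESS_KEY",
    "spark.hadoop.fs.s3a.secret.key": "S3_SECRET_KEY",
    "spark.hadoop.fs.s3a.connection.ssl.enabled": "false",
}


def as_spark_defaults(conf):
    """Render properties as 'key value' lines for spark-defaults.conf."""
    return "\n".join(f"{key} {value}" for key, value in conf.items())


def as_operator_spark_conf(conf):
    """Render properties as a YAML mapping nested under spec.sparkConf."""
    lines = ["spec:", "  sparkConf:"]
    lines += [f'    "{key}": "{value}"' for key, value in conf.items()]
    return "\n".join(lines)


if __name__ == "__main__":
    print(as_spark_defaults(S3A_CONF))
    print(as_operator_spark_conf(S3A_CONF))
```

Keeping the properties in one place like this makes it easier to move the same job between a regular Spark cluster and the operator without retyping keys.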

Spark and S3: The DogLover Example

A working example is the best way to demonstrate how it works. Here is example code to read and write data in S3 from a Spark program called DogLover. I collected tweets from dog lovers on Twitter, using the Twitter API, and stored them as JSON files in an S3 bucket on FlashBlade. The DogLover Spark program is a simple ETL job, which reads the JSON files from S3, does the ETL using Spark DataFrames and writes the result back to S3 as Parquet files, all through the S3A connector.

To manage the lifecycle of Spark applications in Kubernetes, the Spark Operator does not allow clients to use spark-submit directly to run the job. Instead, I upload the jar file to S3, and in my doglover.yaml spec file, I let the Spark Operator download it from there and run the program on Kubernetes.

spec:
  type: Scala
  mode: cluster
  image: "uprush/apache-spark:2.4.5"
  imagePullPolicy: Always
  mainClass: com.uprush.example.DogLover
  mainApplicationFile: "s3a://deephub/doglover/doglover_2.12-0.1.0-SNAPSHOT.jar"
  sparkVersion: "2.4.5"

I can then submit the Spark job like this:

kubectl create -f doglover.yaml

After a few seconds, my Spark job was running on Kubernetes.

Figure: Spark job running on Kubernetes

Once the job finishes, we should see a zero-byte _SUCCESS file along with multiple Parquet files in the output S3 directory.

My architecture looks like this:

Figure: Apache Spark with Kubernetes and S3

In my opinion, this is a better way to submit Spark jobs because they can be submitted from any available Kubernetes client. It makes my job less dependent on the infrastructure, and therefore more portable. For example, to run the same job in AWS, I can first replicate my data from FlashBlade S3 to Amazon S3 using FlashBlade object replication. I can then run the same Spark job the same way in a Kubernetes cluster in the AWS cloud.

Fast S3 Writes with S3A Committers

When working with S3, Spark relies on the Hadoop output committers to reliably write output to S3 object storage. The traditional FileOutputCommitter is designed for HDFS. When used with S3, it is known to be inefficient, slow and less reliable because it relies on an atomic “rename” operation, which HDFS provides but object storage does not. In Hadoop 3, the new “zero-rename” S3A Committers were created to address these issues by leveraging cloud-native S3 features. See more on Committing work to S3 with the S3A Committers.
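To see why rename is the sticking point, here is a toy model, not S3 or Hadoop code. It treats an object store as a flat map from keys to bytes, so "renaming a directory" has to be emulated by copying and deleting every object under a prefix. The rename_prefix helper and the key names are hypothetical.

```python
# Toy model: a flat key -> bytes map stands in for a bucket.
def rename_prefix(store, src, dst):
    """Emulate a directory rename; returns the number of objects copied."""
    copied = 0
    for key in sorted(k for k in store if k.startswith(src)):
        store[dst + key[len(src):]] = store[key]  # copy the object
        del store[key]                            # then delete the original
        copied += 1  # stopping part-way would leave a half-renamed directory
    return copied


if __name__ == "__main__":
    bucket = {
        "out/_temporary/attempt_0/part-0.parquet": b"a",
        "out/_temporary/attempt_0/part-1.parquet": b"b",
    }
    print(rename_prefix(bucket, "out/_temporary/attempt_0/", "out/"))
    print(sorted(bucket))
```

The work grows with the number and size of objects, and there is no single step at which the whole directory switches over, which is the property the classic committer depends on.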

Netflix has contributed an S3A committer called the Staging Committer, which has a number of appealing features:

  • Doesn’t have any requirements of the destination object store.
  • Known to work.

The committer writes task outputs to a temporary directory, called the task attempt directory, on the local filesystem. On task commit, the committer enumerates the files in the task attempt directory and uploads each one to S3 using the multipart upload API, without completing the upload. The information needed to complete the uploads is saved into an HDFS staging directory and committed there via the regular file commit protocol. When the job commits, the pending uploads of the successful tasks are all completed.
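The flow above can be sketched as a small in-memory simulation. This is a toy under stated assumptions, not the Hadoop implementation: FakeS3, commit_task and commit_job are hypothetical names, a dict stands in for each task attempt directory, and the list returned by commit_task stands in for the records kept in the staging directory.

```python
import itertools


class FakeS3:
    """Stand-in for a bucket with multipart uploads."""

    def __init__(self):
        self.visible = {}   # completed objects, what a reader can list
        self.pending = {}   # uploads started but not yet completed
        self._ids = itertools.count()

    def start_upload(self, key, data):
        upload_id = next(self._ids)
        self.pending[upload_id] = (key, data)
        return upload_id

    def complete_upload(self, upload_id):
        key, data = self.pending.pop(upload_id)
        self.visible[key] = data

    def abort_upload(self, upload_id):
        self.pending.pop(upload_id, None)


def commit_task(s3, local_files, dest):
    """Task commit: upload every local file, return the pending upload ids."""
    return [s3.start_upload(dest + name, data) for name, data in local_files.items()]


def commit_job(s3, succeeded, failed=()):
    """Job commit: complete uploads of successful tasks, abort the others."""
    for upload_id in itertools.chain.from_iterable(failed):
        s3.abort_upload(upload_id)
    for upload_id in itertools.chain.from_iterable(succeeded):
        s3.complete_upload(upload_id)


if __name__ == "__main__":
    s3 = FakeS3()
    task_a = commit_task(s3, {"part-0.parquet": b"aa"}, "out/")
    task_b = commit_task(s3, {"part-1.parquet": b"bb"}, "out/")
    print("visible before job commit:", sorted(s3.visible))
    commit_job(s3, succeeded=[task_a], failed=[task_b])
    print("visible after job commit:", sorted(s3.visible))
```

Nothing appears at the destination until the job commits, and no object is ever renamed.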

Minimum S3A staging committer configurations for Spark on Kubernetes (without HDFS):

"spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a":"org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory"
"spark.hadoop.fs.s3a.committer.name": "directory"
"spark.sql.sources.commitProtocolClass": "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol"
"spark.sql.parquet.output.committer.class": "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter"

Persistent Volume for S3A Staging Committer

In fact, the staging directory does not have to be in HDFS. It can also be an NFS volume that is shared with all Spark pods. In my case, I use FlashBlade NFS because I don’t want to have any HDFS dependency.

Creating a staging PV and mounting it to all Spark pods is easy with Pure Service Orchestrator (PSO).

To use FlashBlade NFS as the PV, create a staging-pvc.yaml spec file and set the storage class to pure-file.

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: data-staging-share
spec:
  accessModes:
    - ReadWriteMany
  resources:
    requests:
      storage: 1Ti
  storageClassName: pure-file

Apply the spec file to create a PVC.

kubectl create -f staging-pvc.yaml

I then declare a volume backed by this PVC and mount it to all Spark pods under the committer’s work directory, which is /home/spark/tmp in my case.

spec:
  volumes:
    - name: "staging-vol"
      persistentVolumeClaim:
        claimName: data-staging-share
  driver:
    volumeMounts:
      - name: "staging-vol"
        mountPath: "/home/spark/tmp"
  executor:
    instances: 2
    volumeMounts:
      - name: "staging-vol"
        mountPath: "/home/spark/tmp"

Finally, I configure S3A to use this PV.

"spark.hadoop.fs.s3a.committer.tmp.path": "file:///home/spark/tmp/staging"

Persistent Volume for Buffer Directory

It is not required, but it is often better to use a PV for the S3A committer’s buffer directory. The buffer directory is a local filesystem directory for data being written by the committer. Because the staging committer writes its output to the local filesystem and only uploads the data on task commit, it is important to make sure enough local storage is available to hold the outputs generated by all uncommitted tasks running on the host. Small hosts/VMs may run out of disk. To avoid this situation, I configure S3A to use the same PV as above for the buffer directory. Even though it is remote storage, performance is not a problem here because FlashBlade is very fast.

"spark.hadoop.fs.s3a.buffer.dir": "/home/spark/tmp/buffer"

With these, I can use the efficient, fast and reliable S3A staging committer to write data to S3 from Spark running on Kubernetes.
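As a rough pre-flight aid for the two directories configured above, the sketch below checks that both paths sit under the shared mount point and estimates how much space the uncommitted task output on one host might need. The helper names, the 20% headroom and the workload numbers are assumptions for illustration only. The paths are the ones from my spec.

```python
import posixpath
import shutil
from urllib.parse import urlparse


def local_path(value):
    """Strip an optional file:// scheme and normalise the path."""
    parsed = urlparse(value)
    return posixpath.normpath(parsed.path if parsed.scheme == "file" else value)


def is_under(path, mount):
    """True when path equals the mount point or lies beneath it."""
    path, mount = local_path(path), local_path(mount)
    return posixpath.commonpath([path, mount]) == mount


def buffer_bytes_needed(concurrent_tasks, bytes_per_task, headroom_pct=20):
    """Space for the output of all uncommitted tasks on one host, plus headroom."""
    return concurrent_tasks * bytes_per_task * (100 + headroom_pct) // 100


if __name__ == "__main__":
    mount = "/home/spark/tmp"  # volumeMounts.mountPath in the spec
    conf = {
        "spark.hadoop.fs.s3a.committer.tmp.path": "file:///home/spark/tmp/staging",
        "spark.hadoop.fs.s3a.buffer.dir": "/home/spark/tmp/buffer",
    }
    for key, value in conf.items():
        print(key, "on shared volume:", is_under(value, mount))

    # Hypothetical workload: 4 concurrent tasks, each writing about 2 GiB.
    needed = buffer_bytes_needed(4, 2 * 1024**3)
    free = shutil.disk_usage(".").free  # inside a pod, point this at the mount path
    print("enough free space:", free >= needed)
```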

The DogLover Example with S3A Committer

With the above setup, my architecture changes to:

Figure: Apache Spark with Kubernetes and S3A Committer

Put everything above together in the doglover.yaml spec file and rerun the job. Unlike before, this time the _SUCCESS file created by the job is not zero bytes. It contains metrics and counters from the S3A committer.

cat _SUCCESS
{
  "name" : "org.apache.hadoop.fs.s3a.commit.files.SuccessData/1",
  "timestamp" : 1588673361792,
  "date" : "Tue May 05 10:09:21 UTC 2020",
  "hostname" : "doglover-driver",
  "committer" : "directory",
  "description" : "Task committer attempt_20200505100916_0000_m_000000_0",
  "metrics" : {
    "stream_write_block_uploads" : 0,
    "files_created" : 1,
...

This indicates that the S3A committer is correctly configured and that Spark is writing to S3 more efficiently. Refer to my deck for the details of the S3A committers and their performance characteristics.
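The same check can be scripted. The sketch below is a minimal example that assumes the _SUCCESS object has already been downloaded as bytes. The function name is hypothetical, and the sample payload is a shortened, closed-off version of the output above rather than a full file.

```python
import json


def committer_from_success(payload):
    """Return the committer name recorded in _SUCCESS, or None for an empty marker."""
    if not payload.strip():
        return None  # zero-byte marker, as seen in the first run
    return json.loads(payload).get("committer")


if __name__ == "__main__":
    sample = b"""
    {
      "name" : "org.apache.hadoop.fs.s3a.commit.files.SuccessData/1",
      "committer" : "directory",
      "metrics" : { "files_created" : 1 }
    }
    """
    print(committer_from_success(b""))
    print(committer_from_success(sample))
```

A None result means the marker is empty, which is what the first run without the S3A committer produced.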

Conclusion

With Spark on Kubernetes, and by putting data in S3, I was able to easily and quickly spin Spark jobs up and down in a portable way. I was also able to run my Spark jobs along with many other applications such as Presto and Apache Kafka in the same Kubernetes cluster, using the same FlashBlade storage. And I did not need to manage a Hadoop cluster for any of this. Kubernetes, FlashBlade and PSO together bring a simple, scalable and high-performance disaggregated solution for running modern analytics systems such as Spark as a service.

The benefits of using Spark with Kubernetes and S3 are huge, but the architecture does have limits. It is great for fire-and-forget workloads like ETL batches. However, it may not be the best Spark architecture for things like business intelligence (BI) and notebook backends, because I couldn’t find an easy way to keep the Thrift Server or a Spark session running through the Spark Operator. But I know there are better ways to do those on Kubernetes with Spark and other solutions. Stay tuned.

All it takes is a spark!

Software & solutions engineer, big data and machine learning, jogger, hiker, traveler, gamer.