Quickstart: Apache Spark on Kubernetes

Give your big loads smooth sailing using the native Apache Spark Operator for Kubernetes

Matheus Cunha
Towards Data Science


Photo by Kinsey on Unsplash

Introduction

The Apache Spark Operator for Kubernetes

Since Google launched it in 2014, Kubernetes has gained a lot of popularity along with Docker itself, and since 2016 it has become the de facto container orchestrator, established as a market standard, with cloud-managed versions available in all the major clouds [1][2][3] (including Digital Ocean and Alibaba).

With this popularity came various implementations and use-cases of the orchestrator, among them the execution of Stateful applications including databases using containers.

What would be the motivation to host an orchestrated database? That’s a great question. But let’s focus on the Spark Operator running workloads on Kubernetes.

The idea of a native Spark Operator came out in 2016. Before that, you couldn’t run Spark jobs natively, only through some hacky alternatives, like running Apache Zeppelin inside Kubernetes or creating your Apache Spark cluster inside Kubernetes (from the official Kubernetes organization on GitHub), referencing the Spark workers in standalone mode.

However, native execution is far more interesting because it takes advantage of the Kubernetes scheduler, which is responsible for allocating resources, and it offers elasticity and a simpler interface to manage Apache Spark workloads.

With that in mind, the development of the Apache Spark Operator got attention, and it was merged and released in Spark version 2.3.0, launched in February 2018.

If you’re eager to read more about the Apache Spark proposal, you can head to the design document published in Google Docs.

Why Kubernetes?

As companies seek to reinvent themselves through the much-discussed digital transformation, in order to stay competitive and, above all, to survive in an increasingly dynamic market, it is common to see approaches that include Big Data, Artificial Intelligence and Cloud Computing [1][2][3].

An interesting comparison of the benefits of Cloud Computing over on-premises servers in the context of Big Data can be read on the blog of Databricks, the company founded by the creators of Apache Spark.

As we see a widespread adoption of Cloud Computing (even by companies that would be able to afford the hardware and run on-premises), we notice that most of these Cloud implementations don’t include an Apache Hadoop cluster, since the Data Teams (BI/Data Science/Analytics) increasingly choose to use tools like Google BigQuery or AWS Redshift. Therefore, it doesn’t make sense to spin up a Hadoop cluster with the sole intention of using YARN as the resource manager.

An alternative is the use of Hadoop cluster providers such as Google DataProc or AWS EMR to create ephemeral clusters, just to name a few options.

To better understand the design of the Spark Operator, the doc from GCP on GitHub is a no-brainer.

Let’s get hands-on!

Warming up the engine

Now that the word has been spread, let’s get our hands on it to show the engine running. For that, let’s use the tools that appear in the steps below: Docker, Minikube, kubectl and an Apache Spark distribution.

Once the tools are installed, include the Apache Spark path in the PATH environment variable to ease the invocation of the Apache Spark executables. Simply run:

export PATH=${PATH}:/path/to/apache-spark-X.Y.Z/bin
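
Before moving on, it may help to confirm that the tools are actually reachable from the shell. The snippet below is a minimal sketch: the command names minikube, kubectl, docker and spark-submit are assumed from the steps in this article, and missing_tools is a hypothetical helper name, not something shipped with any of these tools.

```shell
# Print every command from the argument list that is not found on PATH.
# missing_tools is a hypothetical helper, not part of Spark or Minikube.
missing_tools() {
  for tool in "$@"; do
    command -v "$tool" >/dev/null 2>&1 || printf '%s\n' "$tool"
  done
}

# Assumed tool names for this walkthrough; no output means all were found.
missing_tools minikube kubectl docker spark-submit
```

If spark-submit shows up in the output, the PATH export above probably points at the wrong directory.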

Creating the Minikube “cluster”

Finally, to have a Kubernetes “cluster”, we will start Minikube with the intention of running SparkPi, an example from the Spark repository, just as a demonstration.

minikube start --cpus=2 \
--memory=4g

Building the Docker image

Let’s use the Minikube Docker daemon so we don’t depend on an external registry (and only generate Docker image layers on the VM, which makes cleanup easier later). Minikube has a wrapper that makes our life easier:

eval $(minikube docker-env)
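
The minikube docker-env command prints export statements for Docker client variables such as DOCKER_HOST, and eval applies them to the current shell only. A quick way to see which daemon the shell is pointing at is sketched below; docker_target is a hypothetical helper name used just for this illustration.

```shell
# Report which Docker daemon the current shell would talk to.
# DOCKER_HOST is the standard Docker client variable; docker_target is a
# hypothetical helper name used only for this illustration.
docker_target() {
  if [ -n "${DOCKER_HOST:-}" ]; then
    printf 'daemon at %s\n' "$DOCKER_HOST"
  else
    printf 'local default daemon\n'
  fi
}

docker_target
```

Because the variables live only in the current shell session, a new terminal goes back to the local default daemon until the eval is run again.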

After having the daemon environment variables configured, we need a Docker image to run the jobs. There is a shell script in the Spark repository to help with this. Considering that our PATH was properly configured, just run:

docker-image-tool.sh -m -t latest build

FYI: The -m parameter here indicates a minikube build.
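
The tag given with -t is what ends up in the image name passed to spark-submit later. The function below is a sketch of that naming rule, not the script's actual code. It assumes the image is called spark, that the tag comes from -t, and that a repository prefix is added only when -r is given. The name spark_image_ref and the repository registry.example.com are hypothetical.

```shell
# Sketch of the image naming rule (assumption: image name "spark", optional
# repository prefix from -r, tag from -t). Not the script's actual code.
spark_image_ref() {
  tag="$1"
  repo="${2:-}"
  if [ -n "$repo" ]; then
    printf '%s/spark:%s\n' "$repo" "$tag"
  else
    printf 'spark:%s\n' "$tag"
  fi
}

spark_image_ref latest                       # corresponds to "-t latest" without -r
spark_image_ref latest registry.example.com  # hypothetical repository
```

Running docker images afterwards should list the image that was actually built, which is the safest way to confirm the name.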

Let’s take the highway and execute SparkPi using the same command that would be used for a Hadoop Spark cluster: spark-submit.

However, the Spark Operator also supports defining jobs in the “Kubernetes dialect” using CRDs. There are some examples of that to explore later.
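
To give a rough idea of what a job in the “Kubernetes dialect” can look like, the sketch below renders a SparkApplication manifest for the same SparkPi example. It assumes the operator from GCP is already installed in the cluster and serves the sparkoperator.k8s.io/v1beta2 API, and that the image is spark:latest. The function name render_spark_pi and the version arguments are hypothetical.

```shell
# Render a SparkApplication manifest for SparkPi to stdout.
# Arguments: Spark version and Scala version of the examples jar.
# Assumes the sparkoperator.k8s.io/v1beta2 API and the image spark:latest.
render_spark_pi() {
  spark_version="$1"
  scala_version="$2"
  cat <<EOF
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: default
spec:
  type: Scala
  mode: cluster
  image: spark:latest
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_${scala_version}-${spark_version}.jar
  sparkVersion: "${spark_version}"
  restartPolicy:
    type: Never
  driver:
    cores: 1
    memory: 1024m
  executor:
    cores: 1
    instances: 2
    memory: 1024m
EOF
}

# X.Y.Z is a placeholder for your Spark version, as elsewhere in this article.
render_spark_pi X.Y.Z 2.11
```

With the operator running, the rendered manifest could be piped into kubectl apply -f - instead of calling spark-submit.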

Fire in the hole!

Mind the gap between the Scala version and the .jar when parameterizing with your Apache Spark version:

spark-submit --master k8s://https://$(minikube ip):8443 \
--deploy-mode cluster \
--name spark-pi \
--class org.apache.spark.examples.SparkPi \
--conf spark.executor.instances=2 \
--executor-memory 1024m \
--conf spark.kubernetes.container.image=spark:latest \
local:///opt/spark/examples/jars/spark-examples_2.11-X.Y.Z.jar # here

What’s new is:

  • --master: Accepts a URL with the k8s:// prefix for the Kubernetes master API endpoint, here https://$(minikube ip):8443. BTW, in case you want to know, $(minikube ip) is a shell command substitution;
  • --conf spark.kubernetes.container.image=: Configures the Docker image to run in Kubernetes.
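
The two values that change from one setup to another are the master URL and the jar name, so it can be handy to build them explicitly. The helpers below are a minimal sketch with hypothetical names (k8s_master_url and examples_jar). The port 8443 and the jar path are taken from the command above, and the IP address is a documentation-only placeholder standing in for the output of minikube ip. Note that the local:// scheme refers to a file that already exists inside the Docker image, not on the machine running spark-submit.

```shell
# Build the value for --master from an API server host and port.
# Port 8443 is the one used in the command above; other clusters may differ.
k8s_master_url() {
  host="$1"
  port="${2:-8443}"
  printf 'k8s://https://%s:%s\n' "$host" "$port"
}

# Build the examples jar path from the Scala and Spark versions.
examples_jar() {
  scala_version="$1"
  spark_version="$2"
  printf 'local:///opt/spark/examples/jars/spark-examples_%s-%s.jar\n' \
    "$scala_version" "$spark_version"
}

k8s_master_url 192.0.2.10   # placeholder IP, stands in for $(minikube ip)
examples_jar 2.11 X.Y.Z     # X.Y.Z is still a placeholder for your Spark version
```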

Sample output:

...
19/08/22 11:59:09 INFO LoggingPodStatusWatcherImpl: State changed,
new state: pod name: spark-pi-1566485909677-driver namespace: default
labels: spark-app-selector -> spark-20477e803e7648a59e9bcd37394f7f60,
spark-role -> driver pod uid: c789c4d2-27c4-45ce-ba10-539940cccb8d
creation time: 2019-08-22T14:58:30Z service account name: default
volumes: spark-local-dir-1, spark-conf-volume, default-token-tj7jn
node name: minikube start time: 2019-08-22T14:58:30Z container
images: spark:docker phase: Succeeded status:
[ContainerStatus(containerID=docker://e044d944d2ebee2855cd2b993c62025d
6406258ef247648a5902bf6ac09801cc, image=spark:docker,
imageID=docker://sha256:86649110778a10aa5d6997d1e3d556b35454e9657978f3
a87de32c21787ff82f, lastState=ContainerState(running=null,
terminated=null, waiting=null, additionalProperties={}),
name=spark-kubernetes-driver, ready=false, restartCount=0,
state=ContainerState(running=null,
terminated=ContainerStateTerminated(containerID=docker://e044d944d2ebe
e2855cd2b993c62025d6406258ef247648a5902bf6ac09801cc, exitCode=0,
finishedAt=2019-08-22T14:59:08Z, message=null, reason=Completed,
signal=null, startedAt=2019-08-22T14:58:32Z,
additionalProperties={}), waiting=null, additionalProperties={}),
additionalProperties={})]
19/08/22 11:59:09 INFO LoggingPodStatusWatcherImpl: Container final
statuses: Container name: spark-kubernetes-driver Container image:
spark:docker Container state: Terminated Exit code: 0

To see the job result (and the whole execution), we can run kubectl logs, passing the name of the driver pod as a parameter:

kubectl logs $(kubectl get pods -o name | grep 'spark-pi.*-driver')

The output (with some entries omitted) looks similar to:

...
19/08/22 14:59:08 INFO TaskSetManager: Finished task 1.0 in stage 0.0
(TID 1) in 52 ms on 172.17.0.7 (executor 1) (2/2)
19/08/22 14:59:08 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose
tasks have all completed, from pool
19/08/22 14:59:08 INFO DAGScheduler: ResultStage 0 (reduce at
SparkPi.scala:38) finished in 0.957 s
19/08/22 14:59:08 INFO DAGScheduler: Job 0 finished: reduce at
SparkPi.scala:38, took 1.040608 s
Pi is roughly 3.138915694578473
19/08/22 14:59:08 INFO SparkUI: Stopped Spark web UI at
http://spark-pi-1566485909677-driver-svc.default.svc:4040
19/08/22 14:59:08 INFO KubernetesClusterSchedulerBackend: Shutting
down all executors
19/08/22 14:59:08 INFO
KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking
each executor to shut down
19/08/22 14:59:08 WARN ExecutorPodsWatchSnapshotSource: Kubernetes
client has been closed (this is expected if the application is
shutting down.)
19/08/22 14:59:08 INFO MapOutputTrackerMasterEndpoint:
MapOutputTrackerMasterEndpoint stopped!
19/08/22 14:59:08 INFO MemoryStore: MemoryStore cleared
19/08/22 14:59:08 INFO BlockManager: BlockManager stopped
19/08/22 14:59:08 INFO BlockManagerMaster: BlockManagerMaster stopped
19/08/22 14:59:08 INFO
OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:
OutputCommitCoordinator stopped!
19/08/22 14:59:08 INFO SparkContext: Successfully stopped SparkContext
19/08/22 14:59:08 INFO ShutdownHookManager: Shutdown hook called
19/08/22 14:59:08 INFO ShutdownHookManager: Deleting directory
/tmp/spark-aeadc6ba-36aa-4b7e-8c74-53aa48c3c9b2
19/08/22 14:59:08 INFO ShutdownHookManager: Deleting directory
/var/data/spark-084e8326-c8ce-4042-a2ed-75c1eb80414a/spark-ef8117bf-90
d0-4a0d-9cab-f36a7bb18910
...

The result appears in:

19/08/22 14:59:08 INFO DAGScheduler: Job 0 finished: reduce at SparkPi.scala:38, took 1.040608 s
Pi is roughly 3.138915694578473
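
To pull only the number out of a long driver log, a small filter is enough. The sketch below relies on the “Pi is roughly” text seen in the sample output; extract_pi is a hypothetical helper name.

```shell
# Read driver log text on stdin and print only the estimated value of Pi.
# extract_pi is a hypothetical helper; it relies on the "Pi is roughly" text
# that appears in the sample output.
extract_pi() {
  sed -n 's/.*Pi is roughly \([0-9.]*\).*/\1/p'
}

# Example with a line taken from the sample output:
printf '%s\n' 'SparkPi.scala:38, took 1.040608 s Pi is roughly 3.138915694578473' | extract_pi
```

On a running cluster, something like kubectl logs -l spark-role=driver --tail=-1 | extract_pi should give the same result, since the driver pod carries the spark-role label shown in the submit output. The --tail=-1 flag matters because kubectl returns only the last 10 lines by default when a label selector is used.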

Finally, let’s delete the VM that Minikube generates, to clean up the environment (unless you want to keep playing with it):

minikube delete

Last words

I hope your curiosity got sparked and that some ideas for further development have arisen for your Big Data workloads. If you have any questions or suggestions, don’t hesitate to share them in the comment section.

Originally published at https://macunha.me on May 21, 2020.


Just a technology lover empowering business with high-tech computing to help innovation (: