
Spark Streaming for Beginners


An understanding of the concept of Spark Streaming, with code demonstrations using the Java API, for beginners.

Photo by Safar Safarov on Unsplash.com

Spark is considered a very fast engine for processing high volumes of data and is often cited as being up to 100 times faster than MapReduce. It achieves this through distributed data processing: it breaks the data into smaller pieces so that the chunks can be computed in parallel across machines, which saves time. It also uses in-memory processing rather than disk-based processing, which makes computation faster still.

Spark Streaming is one of the most important parts of the Big Data ecosystem. It is a component of Apache Spark (from the Apache Software Foundation) used to process Big Data. Basically, it ingests data from sources like Twitter in real time, processes it using functions and algorithms, and pushes it out to be stored in databases and other destinations.

Source: spark.apache.org

How to initiate Spark Streaming?

Configuring Spark

First we configure Spark and tell it where it has to run, whether on your local machine, a Spark standalone cluster, a Mesos cluster, or a Kubernetes cluster. If you are unfamiliar with these terms, don’t worry. Basically, these are cluster management systems which Spark needs in order to handle tasks such as checking node health and scheduling jobs. If you choose local mode as the master, you need to specify the number of cores from your local machine that you want Spark to run on. The more cores you use, the faster the performance. If you specify *, it means use all the cores in your system. Then we specify the app name, which is the name we give to our Spark application.

SparkConf conf = new SparkConf().setAppName("SparkApp").setMaster("local[*]");

Creation of Streaming Context Object

Then we create a JavaStreamingContext object, which essentially opens the door for streaming to start. It provides methods to create a JavaDStream and a JavaPairDStream from input sources, which we’ll discuss further below. While creating the JavaStreamingContext object, we need to specify a batch interval; basically, Spark Streaming divides the incoming data into batches, so the final result is also generated in batches. The batch interval tells Spark how long to collect data before processing it: if it is 1 minute, each batch contains the data received over the last minute.

source: spark.apache.org

So the data starts pouring in as a stream of batches; this continuous stream of data is called a DStream. Every batch of the DStream contains a collection of elements that can be processed in parallel; this collection is called an RDD.

source: spark.apache.org
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(60));

Start Streaming the data

To receive data, the streaming context provides methods to stream data from a TCP socket connection or from files as input sources. The file sources can be HDFS, S3, etc. To read text files, there is the textFileStream method of JavaStreamingContext.

JavaDStream<String> lines = jssc.textFileStream("C:\\Users\\HP\\Downloads\\Spark_Streams");

Note that you won’t be able to read files that were already present in the directory before the streaming context starts, because it reads only newly created files.

So here I’ll be streaming the data over a socket connection on port 9999 and creating a Java receiver input DStream from it.

JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);

Now, if you establish a socket connection (for example with a tool like netcat), write something in the terminal, and run the DStream, you’ll see the text appearing in the console.

Note: to start a Java streaming context, we need to tell Spark to start it, wait for the computation to terminate, and then stop it. And we need to print the DStream using the print() method.

lines.print();
jssc.start();
jssc.awaitTermination();
jssc.stop();
Input through TCP socket
output in console

Notice that it prints output at time t1, but no output is printed at times t2 and t3, as it fetches data once every minute. In the following batch intervals it didn’t receive any input, so it doesn’t print anything.
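Putting the pieces so far together, a minimal runnable driver might look like the sketch below (the class name is just an assumption for this post; the batch interval and port are the ones used above):

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class SparkApp {
    public static void main(String[] args) throws Exception {
        // Run locally on all available cores
        SparkConf conf = new SparkConf().setAppName("SparkApp").setMaster("local[*]");
        // Batch interval of 60 seconds
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(60));
        // Receive lines of text over a TCP socket on port 9999
        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
        // Print the first elements of every batch to the console
        lines.print();
        jssc.start();            // start the streaming computation
        jssc.awaitTermination(); // block until the job is stopped
    }
}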

Now I’ll show you how we can apply some transformations to these DStreams using lambda functions.

Map Transformation

The map transformation applies the function we specify to the DStream and produces one output value for each input value. So it basically transforms one stream into another. For example, here I want to count the length of each line of text, so I’ll use the map transformation for it.

JavaDStream<Integer> length = lines.map(x -> x.length());
Input
It counts the length of line of text

FlatMap Transformation

The flatMap transformation also applies a function to the DStream, but it can produce one or more output values for each input value. So if I want to transform the RDD such that it produces more than one value per element, I will use the flatMap transformation.

So here I give as input a line of text, ‘hi how are you’, and I want to split it into individual words. I used a lambda function for this. A flatMap transformation returns an arbitrary number of values, depending on the RDD and the function applied, so the return type has to be an iterator over the values.

JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
It splits the line into words
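For comparison, if I had used map here instead of flatMap, each line would produce exactly one output element (a whole list of words) rather than a flattened stream of individual words. A sketch of the difference, assuming java.util.List and java.util.Arrays are imported:

// map: one output element per line, so each element is a whole List<String>
JavaDStream<List<String>> wordLists = lines.map(x -> Arrays.asList(x.split(" ")));

flatMap flattens these lists into the single stream of words shown above.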

Reduce Transformation

A reduce transformation aggregates the elements in each RDD. It takes two elements at a time and returns a single one, repeating this until only one element per RDD remains.

Here, after applying the flatMap function and returning a stream of words, I’ll apply a reduce transformation to get the word with the greatest length in each RDD.

JavaDStream<String> reduce = words.reduce((a, b) -> {
    String max_length_word;
    if (a.length() >= b.length()) {
        max_length_word = a;
    } else {
        max_length_word = b;
    }
    return max_length_word;
});

Try to understand this code: it takes two arguments of type String, the words in each RDD are compared on the basis of their length, and the word with the maximum length is returned.

Input
It returns the word with maximum length in every batch
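The same reduce can be written more compactly with a ternary expression; this is an equivalent sketch, reusing the words stream defined above:

JavaDStream<String> longest = words.reduce((a, b) -> a.length() >= b.length() ? a : b);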

Filter Transformation

The filter transformation filters the DStream according to the function given. For example, after the flatMap transformation, let’s say I want to filter the word ‘hello’ out of the stream of words.

JavaDStream<String> filter = words.filter(x -> !x.equals("hello"));
input
It filters the word ‘hello’

Notice that the word ‘Hello’ is not filtered out, since it contains an upper case letter and we only matched the lower case form in our code.
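If you want the filter to ignore case as well, one option is Java’s equalsIgnoreCase; a small variation on the snippet above:

JavaDStream<String> filtered = words.filter(x -> !x.equalsIgnoreCase("hello"));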

MapToPair Transformation

The mapToPair transformation transforms each input value into a pair of values.

JavaPairDStream<String, Integer> pairs = filter.mapToPair(x -> new Tuple2<String, Integer>(x, 1));

Notice that the object created here is a JavaPairDStream rather than a JavaDStream, as we are returning pairs in the stream.

It returns the word and integer 1

ReduceByKey Transformation

In a DStream, we can aggregate the elements of the RDDs by key using reduceByKey. Unlike the reduce transformation, it operates on pairs rather than single elements. Here it takes tuples of String and Integer, and we aggregate the count of the number of times a word appears in an RDD. The function takes two values and returns one. Note that when specifying the function we don’t write Tuple2<String, Integer>; we just specify Integer, because reduceByKey itself takes care of the key and aggregates the values on the basis of the function specified.

JavaPairDStream<String, Integer> sum = pairs.reduceByKey((a,b) -> a + b);
For every word, it sums the integer 1 according to the number of times the word appears
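Chaining the transformations covered so far gives the classic streaming word count; a sketch reusing the words stream produced by flatMap earlier:

JavaPairDStream<String, Integer> wordCounts = words
        .mapToPair(x -> new Tuple2<String, Integer>(x, 1))
        .reduceByKey((a, b) -> a + b);
wordCounts.print();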

Count Transformation

The count transformation counts the number of elements in each RDD and returns a DStream of single-element RDDs.

Here after applying the flatMap transformation, we want to know the count of words in every RDD.

JavaDStream<Long> count = words.count();
It returns the number of words in the line of text

The number of words in the RDD was 13, so it returned 13 as output. This was after applying flatMap, which split the line into individual words. Let’s see what we get if we apply count without splitting the line into words.

It returns the number of lines it receives in every batch

Since we haven’t split the lines into individual words, Spark treats each whole line as a single element. In the first batch it receives 3 lines, so it returns a count of 3; in the next batches it receives 2 and 6 lines, so the counts are 2 and 6 respectively.

CountByValue Transformation

countByValue takes a DStream of type K, counts the number of times each distinct element appears in the RDD, and returns a pair DStream of (K, Long) pairs, where the Long is the count.

Here, after splitting the line of text into words with flatMap, I applied the countByValue transformation.

JavaPairDStream<String, Long> countByValue = words.countByValue();
input
It counts how many times a word appears in an RDD

Now I’ll show you some actions we can perform on RDDs. Basically, we apply transformations to DStreams, which contain RDDs, and when we specify a transformation we are applying a function to those RDDs. Spark provides some operations that we can apply to these RDDs directly.

So let’s say I want to arrange the key-value pairs I got after applying countByValue in descending order. What I can do is swap those key-value pairs and then sort them, which I’ll show next. Using the mapToPair transformation, I apply swap on the tuples to change them into (Long, String) pairs.

Swap Action

JavaPairDStream<Long, String> swap = countByValue.mapToPair(x -> x.swap());
input
swap action swaps the key value pairs

SortByKey Action

Now, to sort the values in descending order, I can use the sortByKey transformation of pair RDDs. It will sort in ascending or descending order according to the boolean value specified. If we don’t specify a boolean value, it sorts in ascending order by default. This is shown in the next section.

TransformToPair Transformation

Now, to use sortByKey, I will use the transformToPair transformation. A transform function returns a new DStream by applying an RDD-to-RDD transformation to every RDD. Here I want to return a JavaPairDStream, so I’ll use transformToPair.

JavaPairDStream<Long, String> sort = swap.transformToPair(x -> x.sortByKey(false));
It sorts the keys in descending order

Now I can again swap the pairs using mapToPair to get words as Keys and counts as values.

JavaPairDStream<String, Long> swap_again = sort.mapToPair(x -> x.swap());
input
It gets swapped again
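The last three steps can also be chained into a single expression; a sketch reusing the countByValue stream from above:

JavaPairDStream<String, Long> sortedByCount = countByValue
        .mapToPair(x -> x.swap())
        .transformToPair(rdd -> rdd.sortByKey(false))
        .mapToPair(x -> x.swap());
sortedByCount.print();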

This was an outline of how Spark Streaming works and a few examples of how we apply transformations to DStreams. Thanks for reading!!! If you have doubts, you can ask me in the comment section.

