Chaining Multiple MapReduce Jobs with Hadoop/ Java

Tri Nguyen
Towards Data Science
5 min readMar 23, 2020

--

Source: https://mapr.com/blog/how-to-launching-mapreduce-jobs/

I learned about MapReduce briefly pretty much a year ago when my job required a bit of Hadoop. I then had not touched MapReduce, let along doing it with Java. So when an assignment asked me to implement multiple MapReduce jobs under one script, it was a mess searching up Stack Overflow and Youtube.

So, why not write something about it? Yes, I am.

Recap: MapReduce

MapReduce is a computation abstraction that works well with The Hadoop Distributed File System (HDFS). It comprises of a “Map” step and a “Reduce” step. Map performs filtering and sorting into another set of data while Reduce performs a summary operation. In both steps, individual elements are broken down into tuples of key and value pairs.

MapReduce gains its popularity by being able to easily scale data processing over multiple computing nodes behind the scene. Hence, it works well with

extremely large datasets.

Multiple MapReduce Jobs

To illustrate how to chain multiple MapReduce jobs in one script, I will be using the NYC Taxi & Limousine Commission dataset of around 7.6 million rows to compute the distribution of degree differences of locations. The .tsv file I am using has the following structure:

Let’s say we want to use MapReduce to obtain the following output of two columns:

where “diff” is a location ID’s out-degree minus its in-degree. The out-degree of a location is the number of times that location is used for pickup and the in-degree is the number of times it is used for dropoff. Then, “count” is the frequency for a particular “diff”.

Therefore, one way to accomplish this is to have two MapReduce jobs. One is to calculate the “diff” for each location and the other is to turn that output of the first job into “count” as we want above.

MAP-REDUCE JOB #1

Map Procedure #1

The Map procedure for Job #1 simply loops through and breaks the .tsv input into 4 different values for each line: PickUpLocation ID, DropOffLocation ID, PassengerCount, and TotalFare. However, only PickUpLocation ID and DropOffLocation ID are relevant for our task here. Then, for each element, the class also creates an inDegree variable that is 1 and an outDegree variable that is -1.

Then, it writes each (PickUpLocation, inDegree) and (DropOffLocation, outDegree) as a key-value tuple that will be processed further by the Reduce procedure, which takes form like below where the first column is location ID and the second column indicates whether it is a pickup or dropoff from each location ID.

Reduce Procedure #1

Then, what the Reduce procedure does is simply grouping by location ID and aggregate by summing up the second column to achieve the “diff”. Then it writes (Location ID, diff) as a key-value tuple.

The output takes the form of the following:

MAP-REDUCE JOB #2

The output of Job #1 is then passed in as the input for Job #2.

Map Procedure #2

In order to group by “diff” to achieve the final output, the Map procedure of Job 2 needs to swap the input key-value pairs (Location ID, diff) into (diff, Location ID) because Reduce procedure groups by keys.

Essentially, the output of the Map procedure this time looks like:

Finally, that is then passed into the second Reduce procedure to finish the task:

Reduce Procedure #2

What happens here is that the Reduce procedure simply loops through the input and for each occurrence of a unique “diff”, its count increases by 1. At the end of the loop, it writes (diff, count) as key-value pair into the final output file:

CHAINING JOB #1 & JOB #2

The challenging step that I found is connecting the above two MapReduce jobs so that Job #2 could take the output of Job #1 as input without the need for Job #1 to physically write out a file.

Chaining Step

I had to look for and experiment with multiple suggestions online until I found the combination that is easiest to understand.

Basically, the key is to create two different configurations for the two jobs as “conf” and “conf2” where they also get two different instances.

Configuration conf = new Configuration();
Job job1 = Job.getInstance(conf, "degree");
Configuration conf2 = new Configuration();
Job job2 = Job.getInstance(conf2, "frequency");

Then, Job #2 can only be executed when Job #1 finishes with

job1.waitForCompletion(true);

But note that the system does not terminate here. Then, the output of Job #1 is the same as the input of Job #2:

FileOutputFormat.setOutputPath(job1, new Path(args[1]));FileInputFormat.addInputPath(job2, new Path(args[1]));

And that’s it. With this technique, you are not limited to only two MapReduce jobs but can also increase to three, five, or even ten to fit your task.

I hope this quick note helps whoever that are struggling to find a comprehensive and easy to understand guide on chaining MapReduce jobs.

--

--