Training multiple machine learning models and running data tasks in parallel via YARN + Spark + multithreading

Harness large scale computational resources to allow a single data scientist to perform dozens or hundreds of Big data tasks in parallel, stretching the limits of data science scaling and automation

Edson Hiroshi Aoki
Towards Data Science

--

image: Freepik.com

Summary

To objective of this article is to show how a single data scientist can launch dozens or hundreds of data science-related tasks simultaneously (including machine learning model training) without using complex deployment frameworks. In fact, the tasks can be launched from a “data scientist”-friendly interface, namely, a single Python script which can be run from an interactive shell such as Jupyter, Spyder or Cloudera Workbench. The tasks can be themselves parallelised in order to handle large amounts of data, such that we effectively add a second layer of parallelism.

Who this article is intended for?

  • Data scientists who wish to do more work with less time, by making use of large scale computational resources (e.g. clusters or public clouds), possibly shared with other users via YARN. To understand this article you need a good knowledge of Python, working knowledge of Spark, and at least basic understanding about Hadoop YARN architecture and shell scripting;
  • Machine learning engineers who are supporting data scientists on making use of available computational capacity and operating large scale data

Introduction

Data science and automation

“Data science” and “automation” are two words that invariably go hand-in-hand with each other, as one of the keys goals of machine learning is to allow machines to perform tasks more quickly, with lower cost, and/or better quality than humans.

Naturally, it wouldn’t make sense for an organization to spend more on tech staff that are supposed to develop and maintain systems that automate work (data scientists, data engineers, DevOps engineers, software engineers and others) than on the staff that do the work manually. It’s not thus surprising that a recurrent discussion is how much we can automate the work of data science teams themselves, for instance via automated machine learning.

To achieve cost-effective data science automation, it is imperative to able to harness computational power from public or private clouds; after all, the cost of hardware is quite low compared to the cost of highly skilled technical staff. While technology to achieve so is certainly available, many organisations ended up facing the “big data software engineer vs data scientist conundrum”, or more precisely, the drastic discrepancy between

  • “Big data software engineer skills”, i.e. skills necessary to manipulate massive amounts of data in complex computational environments, and run these processes in a reliable manner along with other concurrent processes
  • “Data scientist skills”, i.e. skills necessary to apply algorithms and mathematics to the data to extract insights valuable from a business standpoint
Harnessing computational power is key to automating data science work

image: Freepik.com

Some organisations would make “data scientists” responsible for developing the analytics models in some sort of “controlled analytics environment” where one does not need to think too much about the underlying computational resources or sharing the resources with other processes, and “big data software engineers” responsible for coding “production-ready” versions of the models developed by data scientists and deploy them into production. This setup resulted in obvious inefficiencies, such as:

  1. Data scientists developing sub-optimal models due to not making use of large scale data and computational resources. In some organisations, data scientists even ended up working with single-node frameworks such as Pandas/Scikit-Learn and basing their models entirely on small datasets obtained via sampling or over-engineered features;
  2. Developed models performing well on analytics environment but not performing well, or being completely unable to run, in production environment;
  3. The difficulty to evaluate generation of business value, identify and fix problems, as well as making iterative improvements, as data scientists end up dramatically losing oversight of the analytics process once models are sent into production.

Different organisations dealt with this situation with different ways, either by forcing big data software engineers and data scientists learn the skills of the “other role”, or by creating a “third role”, named “Machine Learning Engineer” to bridge the gap between the two roles.

But the fact is that nowadays, there are far more resources in terms of allowing data scientists without exceptional software engineering skills to work in “realistic” environments, i.e. similar to production, in terms of computational complexity. Machine learning libraries such as Spark MLLib, Kubeflow, Tensorflow-GPU, and MMLSpark allow data preparation and model training to be distributed across multiple CPUs, GPUs, or a combination of both; at the same time, frameworks such as Apache Hadoop YARN and Kubernetes allow data scientists to work simultaneously using the same computational resources, by understanding only basic concepts about the underlying server infrastructure, such as number of available CPUs/GPUs and available memory.

The intent of this article is to provide an example of how these libraries and frameworks, as well as massive (but shared) computational resources, can be leveraged together in order to automate the creation and testing of data science models.

From individually massively parallelised tasks to massively running tasks in parallel

Frameworks like Spark and Kubeflow make easy to distribute a Big Data task, such as feature processing or machine learning model training, across GPUs and/or hundreds of CPUs without a detailed understanding of the server architecture. On the other hand, executing tasks in parallel, rather than individual parallelised tasks, is not as seamless. Of course, it’s not hard for a data scientist to work with two or three PySpark sessions in Jupyter at the same time, but for the sake of automation, we might be rather interested in running dozens and hundreds of tasks simultaneously, all specified in a programmatic way with minimal human interference.

Naturally, one may ask why bother with running tasks in parallel, instead of simply increasing the number of cores per task and make each task run in a shorter time. There are two reasons:

  1. The processing speed often does not scale with the number of cores. For example, in the case of training machine learning models, if the data is not large enough, there might be zero improvement on computation time by increasing the number of cores from say, 10 to 100, and sometimes the computational time might even increase due to process and communication overhead, as well as the inability to leverage highly efficient single-processor implementations available in some machine learning libraries
  2. The accuracy of machine learning algorithms models may also decrease due to parallelisation, as those algorithms often rely on suboptimal heuristics to able to run in distributed fashion, such as data split and voting

It is certainly possible, using deployment tools such as Airflow, to run arbitrarily complex, dynamically defined and highly automated data analytics pipelines involving parallelised tasks. However, these tools require low-level scripting and configuration and aren’t suited for quick “trial and error” experiments carried on by data scientists on a daily basis, often accustomed to try and re-try ideas quickly in interactive shells such as Jupyter or Spyder. Also, taking us back to the previously mentioned “big data software engineer vs data scientist” conundrum, organisations might prefer data scientists to spend their time focusing on experimenting with the data and generating business value, not on getting immersed in low-level implementation or deployment.

What you will learn in this article?

In this article, I will show how we can make use of Apache Hadoop YARN to launch and monitor multiple jobs in a Hadoop cluster simultaneously, (including individually parallelised Spark jobs), directly from any Python code (including code from interactive Python shells such as Jupyter), via Python multithreading. While the example will consist of training multiple machine learning models in parallel, I will provide a generic framework that can be used to launch arbitrary data tasks such as feature engineering and model metric computation.

Some applications for multiple model parallel training are:

  • Hyper-parameter tuning: For the same training data set, simultaneously train using different model types (say Logistic Regression, Gradient Boosting and Multi-layer Perceptron) and also different hyperparameter configurations, in order to find the optimal model type/hyperparameter set as quickly as possible;
  • Multi-label classification: Train multiple binary/multi-class classification models in parallel, where each model training task will use a different column as the label column, such that the resulting combination of models will effectively be a multi-label classifier;
  • Feature reduction: For a poll of previously ranked features, train multiple models, each using only the top N-ranked features as feature columns, with N being varied across the training tasks.

Technical overview

In our framework, I will call the main task, i.e. the Python code that creates the additional tasks to run in parallel, as the controller task, and the tasks being started by the controller task as the subordinate tasks. (I intentionally avoid using the expression “worker” to avoid confusion, as in Spark, “worker” is a synonym for Spark executor)

The controller task is responsible for:

  • Defining how many subordinate tasks should be run at the same time and what to do in case one of the tasks fail;
  • Creating the subordinate tasks, passing the inputs to each task and getting their outputs, if any;
  • Generating the inputs and processing the outputs of the subordinate tasks.

An interesting aspect of YARN is that it allows Spark to be used both in the controller and subordinate tasks. Although neither is necessary, this allows us to handle arbitrarily large datasets without needing to worry ourselves with data engineering, as long as we have enough computational resources. Namely, the controller task can run Spark in client mode, and the subordinate tasks can run Spark in cluster mode:

  • In client mode, the Spark driver runs in the environment where the controller’s Python code is being run (that we refer to as client environment), allowing the use of locally installed interactive shells such as Jupyter, whereas the Spark executors run in the YARN-managed Hadoop cluster, with the interactions between the driver and executors made via a third type of process named Application Master also running in the Hadoop cluster;
  • In cluster mode, both the driver and the executors run in the YARN-managed Hadoop cluster. Note that nothing prevent us to have the controller task also running in cluster mode, but interactive shells cannot be used in this way.

The framework is illustrated in the figure below:

Illustration of the parallelisation framework

There are two things to note about the example above:

  • Although in the example the controller task is also the driver of the Spark process (and thus associated with executors in the Hadoop cluster via the YARN Application Master), this is not necessary, although useful for example if we want to do some preprocessing on the data before deploying to the subordinate tasks;
  • Although the subordinate tasks do not need to use Spark parallelisation, we will use the spark-submit command to launch them, such that they will always have a Spark driver, although not necessarily Spark executors. This is the case of process 3 above.

Technical implementation

Executing a subordinate task as a Spark job

Before I delve into parallelisation, I will first explain how to execute a subordinate task from a controller task written in Python. As mentioned before, we will do so using the spark-submit shell script contained in the Apache Spark installation, such that the subordinate task will be technically a Spark job, although it does not necessarily has executors or Spark code as I mentioned before.

In principle, we can use spark-submit from Python by simply calling the os.system function, which allows us to execute a shell command from Python. In practice, we need to be able to debug and monitor the task; for that purpose, it is better to use the excellent subprocess library. An example:

import json
import subprocess
spark_config_cluster_path = "/home/edsonaoki/spark_config_cluster"
app_name = "some_model_training"
spark_config = {
"spark.jars.packages" :
"com.microsoft.ml.spark:mmlspark_2.11:0.18.1",
"spark.dynamicAllocation.enabled": "false",
"spark.executor.instances": "10",
"spark.yarn.dist.files": "/home/edsonaoki/custom_packages.tar"
}
command = "lightgbm_training.py "\
"hdfs://user/edsonaoki/datasets/input_data.parquet "\
"hdfs://user/edsonaoki/models"
spark_submit_cmd = “SPARK_CONF_DIR=%s spark-submit -name %s %s %s"
% (spark_config_cluster_path, app_name,
" ".join(['-conf %s="%s"' % (key, value) for key, value in
spark_config.items()]),
command)
cmd_output = subprocess.Popen(spark_submit_cmd, shell=True,
stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
bufsize=1, universal_newlines=True)
for line in cmd_output.stdout:
print(line)
cmd_output.communicate()

At the beginning of the code I set the path containing the cluster mode base Spark configuration, which is later used to change the SPARK_CONF_DIR environmental variable. This is an actually crucial step if the controller task is configured to run in Spark in client mode since the Spark configuration for cluster mode is typically different than for client mode.

If you don’t know much about how to configure Spark in cluster mode, you can start by making a copy of the existing SPARK_CONF_DIR. Inside the spark-defaults.conf file we need to have

spark.submit.deployMode=cluster

instead of

spark.submit.deployMode=client

and certain configuration options, such as spark.yarn.rmProxy.enabled and the spark.driver.options.* options need to be disabled as there is no network-specific configuration for the driver when running Spark in cluster mode. Check the Spark on YARN documentation if you are in doubt. Of course, if the controller task is also running Spark in cluster mode, there is no need to have a separate configuration.

Now, looking at the subsequent steps:

app_name = "some_model_training"
spark_config = {
"spark.jars.packages" :
"com.microsoft.ml.spark:mmlspark_2.11:0.18.1",
"spark.dynamicAllocation.enabled": "false",
"spark.executor.instances": "10",
"spark.yarn.dist.files": "/home/edsonaoki/custom_packages.tar"
}
command = "lightgbm_training.py "\
"hdfs://user/edsonaoki/datasets/input_data.parquet"\
"hdfs://user/edsonaoki/models"
spark_submit_cmd = “SPARK_CONF_DIR=%s spark-submit -name %s %s %s"
% (spark_config_cluster_path, app_name,
" ".join(['-conf %s="%s"' % (key, value) for key, value in
spark_config.items()]),
command)

Here I set up the application name, additional Spark configuration options and the command to be executed by the spark-submit script. These are straightforward to understand, but the application name is particularly important in our case — we will later understand why. We also submit a custom Python package via the spark.yarn.dist.files configuration parameter, which as I will show later, is especially handy since the subordinate task runs in the Hadoop cluster and hence has no access to the Python functions available in the local (client) environment.

Note also that I specify two HDFS paths as arguments to the lightgbm_training.py Python script (the subordinate task’s code), for a similar reason to above: since the Python script will run in the Hadoop cluster, it will not have access to any files in the client environment’s file system, and hence any files to be exchanged between controller or subordinate task must be either explicitly submitted via spark.yarn.dist.files or put into a shared file system such as HDFS or AWS S3.

After preparing the spark-submit shell command line, we are ready to execute it using the subprocess.Popen command:

cmd_output = subprocess.Popen(spark_submit_cmd, shell=True,
stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
bufsize=1, universal_newlines=True)

We set shell=True to make Python initiate a separate shell process to execute the command, rather than attempting to initiate spark-submit directly from the Python process. Although setting shell=False is generally preferable when using the subprocess library, doing so restricts the command line format and it’s not feasible in our case.

The stdout, stderr, bufsize and universal_newlines arguments are used to handle the output (STDOUT) and error messages (STDERR) issued by the shell command during execution time. When we are executing multiple subordinate tasks in parallel, we will probably want to ignore all execution time messages as they will be highly cluttered and impossible to interpret anyways. This is also useful to save memory for reasons we will explain later. However, before attempting to run multiple tasks in parallel, it is certainly best to first make sure that each individual task will work properly, by running a single subordinate task with output/error messages enabled.

In the example I set stdout=subprocess.PIPE, stderr=subprocess.STDOUT, bufsize=1 and universal_newlines=True, which basically, will direct all shell command output to a First In First Out (FIFO) queue named subprocess.PIPE.

Note that when running a Spark job in cluster mode, subprocess.PIPE will only have access to messages from the YARN Application Master, not the driver or executors. To check the driver and executor messages, you might look at the Hadoop cluster UI via your browser, or retrieve the driver and executor logs post-execution as I will show later. Additionally, if file logging is enabled in the log4j.properties file (located in the Spark configuration), the messages from the Application Master will be logged into a file rather than directed to subprocess.PIPE, so disable file logging if needed.

Finally, to display the output/error messages in the Python script’s output, I continue the code above as follows:

for line in cmd_output.stdout:
print(line)
cmd_output.communicate()

The purpose of cmd_output.communicate() is to wait for the process to finish after subprocess.PIPE is empty, i.e. no more outputs from the subordinate task are written to it. It highly advisable to read the entire queue before calling cmd_output.communicate() method as done above, to prevent the queue from increasing in size and wasting memory.

Monitoring the subordinate task without using debug messages

As I mentioned earlier, when we run tasks in parallel we do not want debug messages to be displayed; moreover, if a large number of tasks are sending messages to an in-memory FIFO queue at the same time, memory usage will increase messages aren’t being read from the queue as fast as they are generated. A version of the code from the previous section without debugging, starting with the call to spark-submit, is as follows:

cmd_output = subprocess.Popen(spark_submit_cmd, shell=True,
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
def getYARNApplicationID(app_name):
state = 'RUNNING,ACCEPTED,FINISHED,KILLED,FAILED'
out = subprocess.check_output(["yarn","application","-list",
"-appStates",state], stderr=subprocess.DEVNULL,
universal_newlines=True)
lines = [x for x in out.split("\n")]
application_id = ''
for line in lines:
if app_name in line:
application_id = line.split('\t')[0]
break
return application_id
max_wait_time_job_start_s = 120
start_time = time.time()
while yarn_application_id == '' and time.time()-start_time\
< max_wait_time_job_start_s:
yarn_application_id = getYARNApplicationID(app_name)
cmd_output.wait()if yarn_application_id == '':
raise RuntimeError("Couldn't get yarn application ID for application %s" % app_name)

The code starts by launching the subordinate task as before, but with debugging disabled:

cmd_output = subprocess.Popen(spark_submit_cmd, shell=True,
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

Since there are no debug messages to be displayed when the process is running, we use cmd_output.wait instead of cmd_output.communicate() to wait for the task to finish. Note that although we won’t see the Application Master’s messages, we can still debug the Spark job’s driver and executor in runtime via the Hadoop cluster UI.

However, we still need to be able to monitor the task from a programmatic point of view; more specifically, the controller task needs to know when the subordinate task has finished, whether it was successful, and take appropriate action in case of failure. For that purpose, we can use the application name that we set in the beginning:

app_name = "some_model_training"

The application name can be used by YARN to retrieve the YARN application ID, which allows us to retrieve the status and other information about the subordinate task. Again, we can resort to the subprocess library to define a function that can retrieve the application ID from the application name:

def getYARNApplicationID(app_name):
state = 'RUNNING,ACCEPTED,FINISHED,KILLED,FAILED'
out = subprocess.check_output(["yarn","application","-list",
"-appStates",state], stderr=subprocess.DEVNULL,
universal_newlines=True)
lines = [x for x in out.split("\n")]
application_id = ''
for line in lines:
if app_name in line:
application_id = line.split('\t')[0]
break
return application_id

Observe that getYARNApplicationID parses the output of the yarn application -list shell command. Depending on your Hadoop version the output format may be slightly different and the parsing needs to be adjusted accordingly. If in doubt, you can test the format by running the following command in the terminal:

$ yarn application -list -appStates RUNNING,ACCEPTED,FINISHED,KILLED,FAILED

The tricky aspect is that this method can only work if the application name is unique in the Hadoop cluster. Therefore, you need to make sure you are creating a unique application name, for instance by including timestamps, random strings, your user ID, etc. Optionally, you can also add other filters when attempting to parse the output of yarn application -list, for example, the user ID, the YARN queue name or the time of the day.

Since the Spark job takes some time to be registered in YARN after it has been launched using spark-submit, I implemented the loop:

max_wait_time_job_start_s = 120
start_time = time.time()
while yarn_application_id == '' and time.time()-start_time\
< max_wait_time_job_start_s:
yarn_application_id = getYARNApplicationID(app_name)

where max_wait_time_job_start_s is the time to wait for the registration in seconds, which may need to be adjusted according to your environment.

The meaning of

if yarn_application_id == '':
raise RuntimeError("Couldn't get yarn application ID for"\
" application %s" % app_name)

is straightforward; if there is no application ID, it means the Spark job has not been successfully launched and we need to throw an exception. This may also indicate that we need to increase max_wait_time_job_start_s, or change how the output of yarn application -list is parsed inside getYARNApplicationID.

Checking the final status of the subordinate task

After the subordinate task has finished, checking its final status can be done as follows:

def getSparkJobFinalStatus(application_id):
out = subprocess.check_output(["yarn","application",
"-status",application_id], stderr=subprocess.DEVNULL,
universal_newlines=True)
status_lines = out.split("\n")
state = ''
for line in status_lines:
if len(line) > 15 and line[1:15] == "Final-State : ":
state = line[15:]
break
return state
final_status = getSparkJobFinalStatus(yarn_application_id)

where again, you may need to tune the parsing of yarn application -status depending on your Hadoop version. How to handle the final status is entirely up to you, but one possibility is to store the Spark job’s driver and executor log in a file and raise an exception. For example:

log_path = "/home/edsonaoki/logs/%s_%s.log" % (app_name, 
yarn_application_id)
if final_status != "SUCCEEDED":
cmd_output = subprocess.Popen(["yarn","logs",
"-applicationId",yarn_application_id],
stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
bufsize=1, universal_lines=True)
with open(log_path, "w") as f:
for line in cmd_output.stdout:
f.write(line)
print("Written log of failed task to %s" % log_path)
cmd_output.communicate()
raise RuntimeError("Task %s has not succeeded" % app_name)

Using multithreading to execute subordinate tasks in parallel

If not obvious, before attempting to execute subordinate tasks in parallel, make sure to test as many as tasks as possible without parallelisation, as debugging parallel tasks can be incredibly difficult.

To perform parallelisation we will use Python’s concurrent library. The concurrent library uses multithreading and not multiprocessing; i.e. the threads do run in the same processor, such that from the side of the controller task, there is no real parallel processing. However, since the threads started in the controller task are in I/O mode (unblocked) when waiting for the subordinate tasks to finish, multiple subordinate tasks can be launched asynchronously, such that they will actually run in parallel in the side of the Hadoop cluster. While we can technically use the multiprocessing library instead of the concurrent library to achieve parallelism also from the controller task’s side, I would advise against it as it will substantially increase the memory consumption in the client environment for little benefit — the idea is that the “tough processing” is done in the Hadoop cluster.

When we launch a Spark job, we are typically aware of the constraints of processing and memory in the cluster environment, especially in the case of a shared environment, and use configuration parameters such as spark.executor.memory and spark.executor.instances in order to control the task’s processing and memory consumption. The same needs to be done in our case; we need to limit the number of subordinate tasks that execute simultaneously according to the availability of computational resources in the cluster, such that when we reach this limit, a subordinate task can only be started after another has finished.

The concurrent package offers the futures.ThreadPoolExecutor class which allows us to start multiple threads and wait for them to finish. The class also allows us to limit the number of threads doing active processing(i.e. not blocked by I/O) via the max_workers argument. However, as I mentioned before, a thread in the controller task is treated as being blocked by I/O when the subordinate task is running, which means that max_workers won’t effectively limit the number of threads. As result, all subordinate tasks will be submitted nearly simultaneously and the Hadoop cluster can become overloaded.

This can be solved rather easily by modifying the futures.ThreadPoolExecutor class as follows:

import concurrent.futures
from queue import Queue
class ThreadPoolExecutorWithQueueSizeLimit(
concurrent.futures.ThreadPoolExecutor):
def __init__(self, maxsize, *args, **kwargs):
super(ThreadPoolExecutorWithQueueSizeLimit,
self).__init__(*args, **kwargs)
self._work_queue = Queue(maxsize=maxsize)

This new class ThreadPoolExecutorWithQueueSizeLimit works exactly like futures.ThreadPoolExecutor, but it won’t allow more than maxsize threads to exist at any point of time, effectively limiting the number of subordinate tasks running simultaneously in the Hadoop cluster.

We now need to define a function, containing the execution code of the thread, which can be passed as an argument to the class ThreadPoolExecutorWithQueueSizeLimit. Based on the previous code for executing a subordinate task from Python without debugging messages, I present the following generic thread execution function:

def executeThread(app_name, spark_submit_cmd, error_log_dir, 
max_wait_time_job_start_s=120):
cmd_output = subprocess.Popen(spark_submit_cmd, shell=True,
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
start_time = time.time()
while yarn_application_id == '' and time.time()-start_time\
< max_wait_time_job_start_s:
yarn_application_id = getYARNApplicationID(app_name)
cmd_output.wait()
if yarn_application_id == '':
raise RuntimeError("Couldn't get yarn application ID for"\
"application %s" % app_name)
final_status = getSparkJobFinalStatus(yarn_application_id)
log_path = %s/%s_%s.log" % (error_log_dir, app_name,
yarn_application_id)
if final_status != "SUCCEEDED":
cmd_output = subprocess.Popen(["yarn","logs",
"-applicationId",yarn_application_id],
stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
bufsize=1, universal_lines=True)
with open(log_path, "w") as f:
for line in cmd_output.stdout:
f.write(line)
print("Written log of failed task to %s" % log_path)
cmd_output.communicate()
raise RuntimeError("Task %s has not succeeded" % app_name)
return True

As you can see, the function uses the previously defined functions getYARNApplicationID and getSparkJobFinalStatus, and the application name, the spark-submit command line and the directory to store the error logs are passed as arguments to the function.

Note that the function raises an exception in case the yarn application ID cannot be found, or the status of the Spark job is not successful. But depending on the case, we may just want the function to return a False value, such that the controller task knows that this particular subordinate task has not been successful and needs to be executed again, without need to run again the tasks that have been already successful. In this case, we just need to replace line

raise RuntimeError("Couldn't get yarn application ID for application %s" % app_name)

and

raise RuntimeError("Task %s has not succeeded" % app_name)

with

return False

The next step is to create a generic code to start the threads and wait for their completion, as follows:

def executeAllThreads(dict_spark_submit_cmds, error_log_dir, 
dict_success_app=None):
if dict_success_app is None:
dict_success_app = {app_name: False for app_name in
dict_spark_submit_cmds.keys()}
with ThreadPoolExecutorWithQueueSizeLimit(maxsize=max_parallel,
max_workers=max_parallel) as executor:
future_to_app_name = {
executor.submit(
executeThread, app_name,
spark_submit_cmd, error_log_dir,
): app_name for app_name, spark_submit_cmd in
dict_spark_submit_cmds.items() if
dict_success_app[app_name] == False
}
for future in concurrent.futures\
.as_completed(future_to_app_name):
app_name = future_to_app_name[future]
try:
dict_success_app[app_name] = future.result()
except Exception as exc:
print('Subordinate task %s generated exception %s' %
(app_name, exc))
raise
return dict_success_app

The mandatory arguments to the function are:

  • a dictionary with application names as keys and the corresponding job submission command lines as values;
  • the directory to store the error logs.

The output of the function is also a dictionary containing the return value (True or False) of each subordinate task, indexed by application name. The optional argument is dict_success_app, that can be the return value from a previous execution from the function, in case we only want to run the subordinate tasks that have not been already successful. I will show later how that can be accomplished.

For the reader’s convenience, I put together the complete code of the parallelisation framework below:

import subprocess
import concurrent.futures
from queue import Queue
class ThreadPoolExecutorWithQueueSizeLimit(
concurrent.futures.ThreadPoolExecutor):
def __init__(self, maxsize, *args, **kwargs):
super(ThreadPoolExecutorWithQueueSizeLimit,
self).__init__(*args, **kwargs)
self._work_queue = Queue(maxsize=maxsize)
def getYARNApplicationID(app_name):
state = 'RUNNING,ACCEPTED,FINISHED,KILLED,FAILED'
out = subprocess.check_output(["yarn","application","-list",
"-appStates",state], stderr=subprocess.DEVNULL,
universal_newlines=True)
lines = [x for x in out.split("\n")]
application_id = ''
for line in lines:
if app_name in line:
application_id = line.split('\t')[0]
break
return application_id
def getSparkJobFinalStatus(application_id):
out = subprocess.check_output(["yarn","application",
"-status",application_id], stderr=subprocess.DEVNULL,
universal_newlines=True)
status_lines = out.split("\n")
state = ''
for line in status_lines:
if len(line) > 15 and line[1:15] == "Final-State : ":
state = line[15:]
break
return state
def executeThread(app_name, spark_submit_cmd, error_log_dir,
max_wait_time_job_start_s = 120):
cmd_output = subprocess.Popen(spark_submit_cmd, shell=True,
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
start_time = time.time()
while yarn_application_id == '' and time.time()-start_time\
< max_wait_time_job_start_s:
yarn_application_id = getYARNApplicationID(app_name)
cmd_output.wait()
if yarn_application_id == '':
raise RuntimeError("Couldn't get yarn application ID for"\
" application %s" % (app_name))
# Replace line above by the following if you do not
# want a failed task to stop the entire process:
# return False
final_status = getSparkJobFinalStatus(yarn_application_id)
log_path = %s/%s_%s.log" % (error_log_dir, app_name,
yarn_application_id)
if final_status != "SUCCEEDED":
cmd_output = subprocess.Popen(["yarn","logs",
"-applicationId",yarn_application_id],
stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
bufsize=1, universal_lines=True)
with open(log_path, "w") as f:
for line in cmd_output.stdout:
f.write(line)
print("Written log of failed task to %s" % log_path)
cmd_output.communicate()
raise RuntimeError("Task %s has not succeeded" % app_name)
# Replace line above by the following if you do not
# want a failed task to stop the entire process:
# return False
return True
def executeAllThreads(dict_spark_submit_cmds, error_log_dir,
dict_success_app=None):
if dict_success_app is None:
dict_success_app = {app_name: False for app_name in
dict_spark_submit_cmds.keys()}
with ThreadPoolExecutorWithQueueSizeLimit(maxsize=max_parallel,
max_workers=max_parallel) as executor:
future_to_app_name = {
executor.submit(
executeThread, app_name,
spark_submit_cmd, error_log_dir,
): app_name for app_name, spark_submit_cmd in
dict_spark_submit_cmds.items() if
dict_success_app[app_name] == False
}
for future in concurrent.futures\
.as_completed(future_to_app_name):
app_name = future_to_app_name[future]
try:
dict_success_app[app_name] = future.result()
except Exception as exc:
print('Subordinate task %s generated exception %s' %
(app_name, exc))
raise
return dict_success_app

Example: Multi-label model training with 2-level parallelisation using Gradient Boosting binary classifiers

In this example, I will show how to use the framework above to parallelise training of a multi-label classifier with hundreds of labels. Basically, we will train multiple binary classifiers in parallel, where the training of each binary model is itself parallelised via Spark. The individual binary classifiers are Gradient Boosting models trained using the Spark version of the popular LightGBM package, contained in the Microsoft Machine Learning for Spark (MMLSpark) library.

Setting up the controller task

By using the framework above, there are only two other things that the controller task needs to do:

  1. Prior to calling the executeAllThreads function, set up the application name and spark-submit command for each subordinate task;
  2. After returning from the executeAllThreads function, check which subordinate tasks have been successful and handle their output appropriately.

For the first part, we can start by looking at our previous example where we are submitting a standalone subordinate job:

spark_config_cluster_path = "/home/edsonaoki/spark_config_cluster"
app_name = "some_model_training"
spark_config = {
"spark.jars.packages" :
"com.microsoft.ml.spark:mmlspark_2.11:0.18.1",
"spark.dynamicAllocation.enabled": "false",
"spark.executor.instances": "10",
"spark.yarn.dist.files": "/home/edsonaoki/custom_packages.tar"
}
command = "lightgbm_training.py "\
"hdfs://user/edsonaoki/datasets/input_data.parquet"\
"hdfs://user/edsonaoki/models"
spark_submit_cmd = "SPARK_CONF_DIR=%s spark-submit -name %s %s %s"
% (spark_config_cluster_path, app_name,
" ".join(['-conf %s="%s"' % (key, value) for key, value in
spark_config.items()]),
command)

What do we need to adapt the code for multi-label classification? First, for the reasons already mentioned, the application name needs to be completely unique. Assuming that the label columns of the dataset input_data.parquet are contained in a variable lst_labels, one way to ensure likely unique applications IDs for each subordinate task would something like:

import time
curr_timestamp = int(time.time()*1000)
app_names = ["model_training_%s_%d" % (label,curr_timestamp) for
label in lst_labels]

This ensures that application names will be unique as long as the controller task is not started more once in the same millisecond (of course, if we have a shared YARN cluster other adaptions may be needed to make the application names unique, such as adding the username to the application name).

We are yet to discuss how the subordinate task code contained in lightgbm_training.py looks like, but let’s suppose it:

  • Performs some pre-processing on the training data, based on the label column (such as dataset balancing), using a function contained in the custom_packages.tar file submitted along with the Spark job
  • Trains the model based on the features column and the label column
  • Saves the trained model in the HDFS system

In this case, the controller task needs to pass the HDFS path of the training dataset, the HDFS path to store the trained models, and the label to be used for each subordinate task, via command-line arguments to lightgbm_training.py. This can be done as shown below:

dict_spark_submit_cmds = dict()
for i in range(len(lst_labels)):
command = "lightgbm_training.py "\
"hdfs://user/edsonaoki/datasets/input_data.parquet "\
"hdfs://user/edsonaoki/models "\
+lst_labels[i]
spark_submit_cmd = “SPARK_CONF_DIR=%s spark-submit -name %s "\
"%s %s" % (spark_config_cluster_path, app_names[i],
" ".join(['-conf %s="%s"' % (key, value) for key, value in
spark_config.items()]),
command)
dict_spark_submit_cmds[app_names[i]] = spark_submit_cmd

Of course, there are many other ways to customise the subordinate tasks. We might want to use different model training hyperparameters, different datasets, different Spark configurations, or even use different Python scripts for each subordinate task. The fact that we allow the spark-submit command line to be unique for each subtask allows complete customisation.

For the reader’s convenience, I put together the controller task’s code prior to and until calling executeAllThreads:

import time
spark_config_cluster_path = "/home/edsonaoki/spark_config_cluster"
curr_timestamp = int(time.time()*1000)
app_names = ["model_training_%s_%d" % (label,curr_timestamp) for
label in lst_labels]
spark_config = {
"spark.jars.packages" :
"com.microsoft.ml.spark:mmlspark_2.11:0.18.1",
"spark.dynamicAllocation.enabled": "false",
"spark.executor.instances": "10",
"spark.yarn.dist.files": "/home/edsonaoki/custom_packages.tar"
}
dict_spark_submit_cmds = dict()
for i in range(len(lst_labels)):
command = "lightgbm_training.py "\
"hdfs://user/edsonaoki/datasets/input_data.parquet "\
"hdfs://user/edsonaoki/models "\
+lst_labels[i]
spark_submit_cmd = “SPARK_CONF_DIR=%s spark-submit -name %s "\
"%s %s" % (spark_config_cluster_path, app_names[i],
" ".join(['-conf %s="%s"' % (key, value) for key, value in
spark_config.items()]),
command)
dict_spark_submit_cmds[app_names[i]] = spark_submit_cmd
executeAllThreads(dict_spark_submit_cmds, "/home/edsonaoki/logs")

For the second part, i.e. what the controller task should do after returning from executeAllThreads, assuming that the successful tasks have saved the trained models in the HDFS system, we can just open these files and process them as appropriate, for instance applying the models to some appropriate validation dataset, generating plots and computing performance metrics.

If we use the parallelisation framework presented earlier as it is, there won’t be “unsuccessful subordinate tasks” as any failure will result in an exception being raised. But if we modified executeThread to return False in case of task failure, we might store the returning dict_success_app dictionary in a JSON or Pickle file such that we can later investigate and fix the failed tasks. Finally, we can call again executeAllThreads with the optional argument dict_success_app set such that we re-run only the failed tasks.

Setting up the subordinate task

Let us now write the code of the subordinate task in the lightgbm_training.py script. The first step is to read the input arguments of the script, i.e. the path of the training dataset in the HDFS filesystem, the path to store the models and the name of the label column:

import sys
train_data_path = sys.argv[1]
model_path = sys.argv[2]
label = sys.argv[3]

Since we are using the Spark version of LightGBM, we need to create a Spark session, which we do as follows:

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark.sparkContext.addPyFile("./custom_packages.tar")

Note that there is no need to set up any configuration for the Spark session, as it has been already done in the command line submitted by the controller task. Also, since we explicitly submitted a custom Python package custom_packages.tar to the Spark job, we need to use the addPyFile function to make the contents of the package usable inside our code, as the package is not included in the PYTHONPATH environment variable of the Hadoop cluster.

The code that does the actual processing in the subordinate task is pretty straightforward. The subordinate task will read the training data, call some pre-processing function inside custom_packages.tar (say custom_data_preprocessing.datasetBalancing), perform the model training, and save the trained model with a unique name back in the HDFS file system:

from custom_data_preprocessing import datasetBalancing
from mmlspark import LightGBMClassifier
df_train_data = spark.read.parquet(train_data_path)
df_preproc_data = datasetBalancing(df_train_data, label)
untrained_model = LightGBMClassifier(learningRate=0.3,
numIterations=150,
numLeaves=45)\
.setFeaturesCol("features")\
.setLabelCol(label)
trained_model = untrained_model.fit(df_preproc_data)
trained_model.write().overwrite()\
.save(model_path + "/trained_model_%s.mdl" % label)
spark.stop()

The full code of lightgbm_training.py is put together below for the reader’s convenience:

import sys
train_data_path = sys.argv[1]
model_path = sys.argv[2]
label = sys.argv[3]
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark.sparkContext.addPyFile("./custom_packages.tar")
from custom_data_preprocessing import datasetBalancing
from mmlspark import LightGBMClassifier
df_train_data = spark.read.parquet(train_data_path)
df_preproc_data = datasetBalancing(df_train_data, label)
untrained_model = LightGBMClassifier(learningRate=0.3,
numIterations=150,
numLeaves=45)\
.setFeaturesCol("features")\
.setLabelCol(label)
trained_model = untrained_model.fit(df_preproc_data)
trained_model.write().overwrite()\
.save(model_path + "/trained_model_%s.mdl" % label)
spark.stop()

Conclusion

It is easy to see that the framework presented in this article can be re-used for various tasks other than multiple machine learning model training. A question is that may arise is whether it can be used for different cluster environments, for instance with Spark on Mesos rather than Spark on YARN. I believe so, but some adaptations are needed as the presented code relies heavily on the yarn command to monitor the subordinate tasks.

By using this framework, data scientists can focus more of their time on designing the data tasks, not on manually executing them for dozens or hundreds of small variations. Another advantage is that by harnessing parallelisation, the tasks can be done in much less time, or from a different perspective, without requiring multiple data scientists to work simultaneously to complete the tasks in the same amount of time.

Naturally, this article presents only one of many ways to improve data science automation. Organisations that realise that the time of data scientists and other skilled tech professionals is highly valuable will certainly find increasingly more ways to help these professionals focus on higher-level problems.

--

--

Data scientist at DBS Bank. All views are solely my own and not from my organization.