
Productizing ML Models with Dataflow

Ben Weber
Towards Data Science
7 min read · Feb 10, 2018


One of the key challenges I’ve faced in my data science career is translating findings from exploratory analysis into scalable models that can power products. In the game industry, I built several predictive models for identifying player churn, but it was always a struggle to get these models put into production. I’ve written about some of the processes used to productize models at Twitch, but each product team required a unique approach and different infrastructure.

At Windfall, we’re empowering our data science team to own the process for productizing models. Rather than relying on an engineering team to translate a model specification into a production system, we provide our data scientists with the tools needed to scale models. To accomplish this, we use the Predictive Model Markup Language (PMML) and Google’s Cloud Dataflow. Here is our workflow for building and deploying models at Windfall:

  1. Models are trained offline in R or Python
  2. Trained models are translated to PMML
  3. Dataflow jobs ingest PMML models for production

This approach enables data scientists to work locally with sampled data sets for training models, and then use the resulting model specifications on our entire data set. Step 3 may take some initial support from an engineering team, but only needs to be set up once. Using this approach means that our data science team can use any predictive model supported by PMML, and leveraging the managed Dataflow service means that the team does not need to worry about maintaining infrastructure.

This tutorial walks through the steps of translating an offline model trained in R into a productized model using the Java SDK for Cloud Dataflow. All of the code presented in this tutorial is available on my GitHub profile. We use the natality public dataset available for BigQuery, and train a linear regression model to predict infant birth weight based on a number of factors. The Dataflow job reads records from the public data set, applies the trained regression model to each of the records, and writes the results to a table in our BigQuery project. This is a workflow that our data science team employs when building custom models for customers, such as lead scoring using a random forest model.

Training the Model
The first thing that we need to do is familiarize ourselves with the natality data set, which records births in the United States between 1969 and 2008. Some additional details about this data set are provided in this Medium post. We used the BigQuery web interface to author a query that samples the data set. The query pulls the following attributes for each birth:

  • year: the year that the birth took place
  • plurality: the number of children delivered
  • apgar_5min: a health score assigned to newborn babies after 5 minutes
  • mother_age: the age of the mother
  • father_age: the age of the father
  • gestation_weeks: the number of weeks into the pregnancy
  • ever_born: how many children the mother has delivered
  • mother_married: was the mother married at the time of the birth?
  • weight_pounds: the weight of the baby (what we are predicting)

In order to work with this data set in R, we use the excellent bigrquery library, which makes it easy to read BigQuery results into a data frame. We pull a sample data set as follows:

library(bigrquery)
project <- "gcp_project_id"
sql <- "
SELECT year, plurality, apgar_5min, mother_age, father_age,
gestation_weeks, ever_born, mother_married, weight_pounds
FROM `bigquery-public-data.samples.natality`
ORDER BY RAND()
LIMIT 10000
"
df <- query_exec(sql, project = project, use_legacy_sql = FALSE)

We can then perform various operations on the data set, such as summary(df) and hist(df$weight_pounds), which generates the following chart showing the distribution of infant weights in the sampled data set.

Distribution of birth weights in the sampled data set.

Next, we train a linear regression model to predict the birth weight, and compute error metrics:

lm <- lm(weight_pounds ~ ., data = df)
summary(lm)
cor(df$weight_pounds, predict(lm, df))
mean(abs(df$weight_pounds - predict(lm, df)))
sqrt(mean((df$weight_pounds - predict(lm, df))^2))

This produces the following results:

  • Correlation Coefficient: 0.335
  • Mean Absolute Error: 0.928
  • RMSE: 6.825

The model performance is quite weak, and other algorithms and features could be explored to improve it. Since the goal of this post is to focus on productizing a model, the trained model is sufficient. The complete notebook used to train the model is available here.

Translating to PMML
The next step is to translate the trained model into PMML. The r2pmml R package and the jpmml-r tool make this process easy and support a wide range of algorithms. The first does a direct translation of an R model object to a PMML file, while the second requires saving the model object to an RDS file and then running a command-line tool. We used the r2pmml package to do the translation directly:

library(r2pmml)
r2pmml(lm, "natality.pmml")

This code generates the following PMML file. The PMML file format specifies the data fields to use for the model, the type of calculation to perform (regression), and the structure of the model. In this case, the structure of the model is a set of coefficients, which is defined as follows:

<RegressionTable intercept="7.5619">
  <NumericPredictor name="year" coefficient="3.6683E-4"/>
  <NumericPredictor name="plurality" coefficient="-2.0459"/>
  <NumericPredictor name="apgar_5min" coefficient="9.4773E-5"/>
  ...
  <NumericPredictor name="mother_married" coefficient="0.2784"/>
</RegressionTable>
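
Conceptually, scoring a RegressionTable is simple arithmetic: the prediction is the intercept plus each coefficient multiplied by its input value (for example, 7.5619 + 3.6683E-4 * year - 2.0459 * plurality + …). The JPMML evaluator used later in the pipeline performs this calculation for us, along with the missing-value handling defined by the PMML spec; the snippet below is only an illustration of the calculation, not code from the project:

import java.util.Map;

// Illustration only: how a PMML RegressionTable turns inputs into a prediction.
public class RegressionTableSketch {
  public static double score(double intercept,
                             Map<String, Double> coefficients,
                             Map<String, Double> inputs) {
    double prediction = intercept;
    for (Map.Entry<String, Double> term : coefficients.entrySet()) {
      // Each NumericPredictor contributes coefficient * input value
      prediction += term.getValue() * inputs.get(term.getKey());
    }
    return prediction;
  }
}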

We now have a model specification that we are ready to productize and apply to our entire data set.

Productizing with Cloud Dataflow
When using the Dataflow Java SDK, you define a graph of operations to perform on a collection of objects, and the service automatically provisions hardware to scale up as necessary. In this case, our graph is a set of three operations: read the data from BigQuery, calculate the model prediction for every record, and write the results back to BigQuery. This tutorial generates the following Dataflow DAG:

The Dataflow graph of operations used in this tutorial.

We use IntelliJ IDEA for authoring and deploying Dataflow jobs. While setting up the Java environment is outside the scope of this tutorial, the pom file used for building the project is available here. It includes the following dependencies for the Dataflow SDK and the JPMML library:

<dependency>
  <groupId>com.google.cloud.dataflow</groupId>
  <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
  <version>2.2.0</version>
</dependency>
<dependency>
  <groupId>org.jpmml</groupId>
  <artifactId>pmml-evaluator</artifactId>
  <version>1.3.9</version>
</dependency>

As shown in the figure above, our Dataflow job consists of three steps that we’ll cover in more detail. Before discussing these steps, we need to create the pipeline object:

PmmlPipeline.Options options = PipelineOptionsFactory
    .fromArgs(args).withValidation().as(PmmlPipeline.Options.class);
Pipeline pipeline = Pipeline.create(options);

We create a pipeline object, which defines the set of operations to apply to a collection of objects. In our case, the pipeline operates on a collection of TableRow objects. We pass an options class as input to the pipeline, which defines a set of runtime arguments for the Dataflow job, such as the GCP temporary location to use when running the job.
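
As a rough sketch of this pattern, a custom options interface for a Dataflow 2.x job declares job-specific parameters as annotated getters and setters, while built-in options such as --project and --tempLocation come from the SDK. The property names below are illustrative and are not necessarily the ones defined in the actual PmmlPipeline.Options class:

import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;

// Illustrative options interface; the real PmmlPipeline.Options may differ.
// Values are supplied on the command line, e.g. --outputDataset=natality.
public interface Options extends PipelineOptions {
  @Description("BigQuery dataset for the prediction table")
  @Default.String("natality")
  String getOutputDataset();
  void setOutputDataset(String value);

  @Description("BigQuery table for the prediction table")
  @Default.String("predictions")
  String getOutputTable();
  void setOutputTable(String value);
}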

The first step in the pipeline is reading data from the public BigQuery data set. The object returned from this step is a PCollection of TableRow objects. The featureQuery string defines the query to run, and we specify that we want to use standard SQL when running the query.

private static final String featureQuery =
    "SELECT year, plurality, apgar_5min ... weight_pounds\n" +
    "FROM `bigquery-public-data.samples.natality`";

pipeline.apply(BigQueryIO.read().fromQuery(featureQuery)
    .usingStandardSql().withoutResultFlattening())

The next step in the pipeline is to apply the model prediction to every record in the data set. We define a PTransform that loads the model specification and then applies a DoFn that performs the model calculation on each TableRow.

.apply("PMML Application", new PTransform<PCollection<TableRow>,
PCollection<TableRow>>() {
model = new RegressionModelEvaluator(PMMLUtil.unmarshal(
Resources.getResource("natality.pmml").openStream()));
return input.apply("To Predictions", ParDo.of(
new DoFn<TableRow, TableRow>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {

/* Apply Model */
}})))

The apply model code segment is shown below. It retrieves the TableRow to create an estimate for, builds a map of input fields for the PMML evaluator, uses the model to estimate the birth weight, creates a new TableRow that stores the actual and predicted weights for the birth, and then adds this object to the output of the DoFn. To summarize, this apply step loads the model, defines a function to transform each of the records in the input collection, and creates an output collection of prediction objects.

// Retrieve the current record and build the PMML input map,
// excluding the label that we are trying to predict
TableRow row = c.element();
HashMap<FieldName, Double> inputs = new HashMap<>();
for (String key : row.keySet()) {
  if (!key.equals("weight_pounds")) {
    inputs.put(FieldName.create(key),
        Double.parseDouble(row.get(key).toString()));
  }
}

// Apply the regression model to estimate the birth weight
Double estimate = (Double) model.evaluate(inputs)
    .get(FieldName.create("weight_pounds"));

// Emit a record with the actual and predicted weights
TableRow prediction = new TableRow();
prediction.set("actual_weight",
    Double.parseDouble(row.get("weight_pounds").toString()));
prediction.set("predicted_weight", estimate);
c.output(prediction);

The final step is to write the results back to BigQuery. Earlier in the class, we defined the schema to use when writing records back to BigQuery.

// Output schema for the prediction table
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema()
    .setName("actual_weight").setType("FLOAT64"));
fields.add(new TableFieldSchema()
    .setName("predicted_weight").setType("FLOAT64"));
TableSchema schema = new TableSchema().setFields(fields);

// Continues the pipeline chain from the PMML application step above
.apply(BigQueryIO.writeTableRows()
    .to(String.format("%s:%s.%s", PROJECT_ID, dataset, table))
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition
        .CREATE_IF_NEEDED).withSchema(schema));
pipeline.run();

We now have a pipeline defined that we can run to create predictions for the entire data set. The full code listing for this class is available here. Running this class spins up a Dataflow job that generates the DAG shown above, and will provision a number of GCE instances to complete the job. Here’s an example of the autoscaling used to run this pipeline:

Autoscaling the Model Prediction Task
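
For reference, the runner, project, and temporary storage location that send the job to the managed Dataflow service are all supplied through pipeline options; in this tutorial they are passed as command-line arguments and parsed with PipelineOptionsFactory.fromArgs. The snippet below is a minimal sketch of setting the same values programmatically, with placeholder project and bucket names rather than values from the project:

import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class RunnerOptionsSketch {
  // Placeholder project and bucket names; substitute your own GCP resources.
  public static DataflowPipelineOptions dataflowOptions() {
    DataflowPipelineOptions options =
        PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
    options.setRunner(DataflowRunner.class);       // execute on the Dataflow service
    options.setProject("my-gcp-project");          // project that owns the job
    options.setTempLocation("gs://my-bucket/tmp"); // staging location for the job
    return options;
  }
}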

When the job completes, the output is a new table in our BigQuery project that stores the predicted and actual weights for all of the records in the natality data set. If we want to run a new model, we simply need to point to a new PMML file in the Dataflow job. All of the files needed to run the offline analysis and the Dataflow project are available on GitHub.

Dataflow is a great tool for enabling data scientists to productize their models. It provides a managed environment that can scale to huge data sets, and enables data science teams to own more of the production process.

Ben Weber is the lead data scientist at Windfall Data, where our mission is to identify the net worth of every household in the world.
