
Real-Time Crowdedness Predictions for Train Travelers

Using serverless Azure technology to provide streaming predictions to our travel planner app

With Wessel Radstok

Travelers on the Dutch Railways can use the app from the Dutch railway agency to plan their trip. While planning the trip, the app shows a prediction of the crowdedness of the train in question, expressed in three categories: low, medium, or high occupancy. The traveler can use this information to decide whether to take a different train that might be a bit less crowded.

These predictions are performed using a batch process. A machine learning model is trained regularly on historic data and each morning a process runs to predict the crowdedness of trains in the coming days. This is done by predicting how many passengers are expected and combining it with the capacity of the train planned for the route.

However, during the day incidents may happen that cause trains to be cancelled or diverted, or a double-decker train may be planned but only a single-deck train is available. As a result, the traveler sees outdated crowdedness information. Around 20% of departing trains change capacity on the day of travel, often shortly before departure.

In this blog, we explain how we built a streaming pipeline that takes real-time information on the length and type of train that is planned for a route and updates the expected crowdedness in the app. We follow a Lambda architecture where our nightly predictions implement the batch layer, and the update process implements the streaming layer. This pipeline is currently running in production, providing all train travelers in the Netherlands using our app with a more real-time view on the expected crowdedness of their trip.

We describe the approach we took to implement this architecture. Our first implementation was done using Spark Structured Streaming, which didn’t work out as we expected. Based on our experience, which we will discuss, we decided to take a different approach using serverless resources in the Azure cloud.

First attempt: Spark Structured Streaming

Our daily crowdedness predictions run on Databricks using Spark for data processing. As Spark has support for streaming data processing, it seems a logical fit to implement the real-time updates of our predictions in Spark Structured Streaming. This decision gave us the advantage that the platform was already available and we could implement the logic using the DataFrame paradigm which we already had experience with.

We started from a batch version of the model we wanted to run in streaming mode and converted this to a pure Spark Structured Streaming implementation. We ended up with a small notebook to bootstrap the streaming job and a custom Python package containing the logic we needed.

During the development process we learned a few things about programming with Structured Streaming. First, the programming interface of SQL DataFrames and Structured Streaming DataFrames is not the same. Structured Streaming is a lot more limited in what can be done, which meant we couldn’t implement the batch model one-to-one in a streaming fashion and had to revise the algorithm a few times to make it work. The limited expressiveness of the Structured Streaming interface led to code that became unreadable and was therefore difficult to maintain.

A simple example of this is that we wanted to perform an outer join on two streams of data based on a time window. However, Spark Structured Streaming requires an equality in the join condition and we did not have two columns with the same data. We tried adding two literal fields with the same value to the two streams for the equality, but Spark is not that easily fooled. We ended up creating a "millennium" field, as our time stamps are all in the 3rd millennium: that works, but we essentially created a "Y3K" bug.
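
To give an impression of what this workaround looks like, the sketch below shows a windowed stream-stream join where both sides derive the same "millennium" column from their own event timestamp, so Spark gets the equality it insists on while the real condition is the time window. This is a simplified illustration rather than our production code: the input DataFrames, column names, and the one-hour window are made up.

from pyspark.sql import functions as F

# capacity_df and prediction_df are streaming DataFrames already parsed from
# their Event Hub sources; column names and the window size are illustrative.
capacity = (
    capacity_df
    .withWatermark("capacity_ts", "1 hour")
    # Derive the "millennium" key from the event time itself, so Spark does not
    # treat it as a constant literal and accepts it as the equality condition.
    .withColumn("millennium", F.floor((F.year("capacity_ts") - 1) / 1000) + 1)
    .alias("cap")
)

predictions = (
    prediction_df
    .withWatermark("prediction_ts", "1 hour")
    .withColumn("millennium", F.floor((F.year("prediction_ts") - 1) / 1000) + 1)
    .alias("pred")
)

joined = capacity.join(
    predictions,
    F.expr("""
        cap.millennium = pred.millennium
        AND pred.prediction_ts BETWEEN cap.capacity_ts - INTERVAL 1 HOUR
                                   AND cap.capacity_ts + INTERVAL 1 HOUR
    """),
    "leftOuter",
)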

Furthermore, we had to split the algorithm into separate steps because we had different time constraints on different parts of the model which we couldn’t implement in a single Structured Streaming job. We chose to split the model into several parts, coupled together using Azure Event Hubs as a persistent storage layer in between. This had the advantage that each part of the processing had a clear goal and could be tested individually.

We tested our flow in two ways. For the unit tests, we would simply take the streaming logic and feed it with hand-crafted batch Spark SQL DataFrames. This meant we could test parts of the streaming flow without actually starting a streaming job. This approach captures a lot of the functional requirements, but wouldn’t capture any timing issues. The second testing step used Spark Structured Streaming memory sinks to run the query in streaming mode and capture some timing effects as well.
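
The sketch below illustrates both test styles. It assumes a pytest-style spark fixture and a hypothetical filter_relevant_trains transformation standing in for our actual streaming logic; the schema and sample data are made up.

SCHEMA = "train_id string, operator string, carriages int"


def test_logic_on_batch_dataframe(spark):
    # Unit test: the streaming transformation applied to a hand-crafted batch DataFrame.
    batch_df = spark.createDataFrame([("IC 123", "NS", 4)], schema=SCHEMA)
    assert filter_relevant_trains(batch_df).count() == 1


def test_logic_with_memory_sink(spark, tmp_path):
    # Streaming test: the same logic reads from a file source and writes to a
    # memory sink, so it runs as a genuine streaming query.
    (spark.createDataFrame([("IC 123", "NS", 4)], schema=SCHEMA)
        .write.json(str(tmp_path / "input")))

    stream_df = spark.readStream.schema(SCHEMA).json(str(tmp_path / "input"))
    query = (
        filter_relevant_trains(stream_df)
        .writeStream.format("memory")
        .queryName("test_output")
        .outputMode("append")
        .start()
    )
    query.processAllAvailable()

    assert spark.table("test_output").count() == 1
    query.stop()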

Eventually, we deployed our code and saw our cloud bill increase dramatically. We identified two reasons for this. First, Databricks is a great solution for batch analytics jobs, but it is expensive to keep running continuously for streaming jobs. Second, the information security policy of our employer requires us to log data access. As the state store of Structured Streaming may contain data, we had to log this as well. However, the state store is updated very often and contains many small files, which leads to an enormous set of logs that is expensive to capture.

Eventually, we decided to abandon this approach. Our cloud costs were too high for the problem we were trying to tackle. Combined with the fact that the model implementation was very difficult to understand and maintain due to the limited expressiveness of Spark Structured Streaming, this led us to the conclusion that we didn’t want to invest further in this approach, but instead see whether we could tackle the problem in a different way.

Redesign using serverless technologies

Noting that many parts of the flow do not require state, we landed on a system that uses Azure Functions as the compute platform, so that each message can be handled separately. Where state is necessary we use Azure Stream Analytics. This allows us to compare messages, replay messages, or join them with another stream. To allow fast access to auxiliary data we use an Azure Cosmos DB database. We still use Azure Event Hubs to tie all parts together.

Azure Functions

Azure Functions are an easy method to apply operations to a stream of events. They are invoked separately for each event in the stream which makes it easy to reason about the business logic. They have native support for Python, making it easy to write maintainable operations. Because the platform manages all of the cloud connectivity boilerplate, they can easily be developed and tested locally. We use them in various parts of the flow:

  • Some functions simply filter incoming messages, reducing the compute load of subsequent steps and thus reducing the required capacity and cost;
  • Several functions enrich messages by joining them with other data sources available in, for example, Cosmos DB;
  • Other functions transform messages from one format to another, e.g. into the final output format;
  • Finally, we use Azure Functions to ingest data from the batch layer into the streaming layer.

Filtering, enriching, and transformations

The functions performing these steps are straightforward Python code. As an example, the main part of the filtering function is just a few lines:

import json

import azure.functions as func


def main(event: func.EventHubEvent, evh: func.Out[bytes]) -> None:
    """
    Filter messages to only send relevant messages for our streaming flow.
    """
    message = json.loads(event.get_body().decode("utf-8"))

    # Only forward messages for trains operated by our company; drop the rest.
    if _is_ns_operator(message):
        message = _remove_keys(message)   # strip data fields we do not need downstream
        message = _add_build_id(message)  # add build ID metadata for tracing

        # Encode as bytes so the Event Hub output binding does not pretty-print the JSON.
        evh.set(str.encode(json.dumps(message)))

Listing 1: Example Azure Function code which filters and transforms messages.

Here, we take each message, filter only the messages related to trains our company operates, and remove keys (data fields) from the message that we are not interested in. Finally, we add build ID metadata to the message so that we have some tracing information for debugging purposes. For the interested reader: the JSON string is encoded as a bytes object using str.encode(). If a regular string is sent to the Event Hub, it is pretty-printed automatically, which introduces a lot of white space in the message; a bytes object is sent unchanged.

Data ingestion into a fast Cosmos Database

In order to recompute the train crowdedness, we need quick access to the predicted number of travelers in the train, the capacity of the new rolling stock, and the boundaries for the low, medium, and high classification. This data is generated daily as part of our batch process and written to our data lake in parquet format. Loading this data from the data lake for every recompute action is too slow. We therefore use Azure Cosmos DB as a key-value store to make the required static data available with low latency to the Azure Functions that recompute train crowdedness.
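
To make this concrete, the sketch below shows roughly how a recompute step can look up the nightly figures with a Cosmos DB point read and re-classify the train. The database and container names, the document fields (predicted_passengers, low_boundary, high_boundary), and the environment variable are illustrative, not our actual schema.

import os

from azure.cosmos import CosmosClient

# Illustrative database and container names; connection string comes from app settings.
client = CosmosClient.from_connection_string(os.environ["COSMOS_CONNECTION_STRING"])
container = client.get_database_client("crowdedness").get_container_client("predictions")


def recompute_crowdedness(train_id: str, new_capacity: int) -> str:
    """Re-classify a train using the nightly figures stored in Cosmos DB."""
    # Point read on id and partition key keeps the lookup fast.
    doc = container.read_item(item=train_id, partition_key=train_id)

    occupancy = doc["predicted_passengers"] / new_capacity
    if occupancy < doc["low_boundary"]:
        return "low"
    if occupancy < doc["high_boundary"]:
        return "medium"
    return "high"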

The ideal scenario is that we trigger the ingestion from our nightly batch process and are also notified whether the ingestion succeeded or failed. The ingestion process also needs to be able to read parquet files with complex types, which ruled out an Azure Data Factory copy activity. Our solution was to leverage Azure Durable Functions, an extension of the standard Azure Functions platform which enables stateful, long-running functions. Specifically, durable functions expose webhooks that allow us to report back to the nightly batch process whether the ingestion succeeded.

Ingestion then works as follows. Our nightly batch process triggers a durable function. This durable function selects the correct Activity Function for the data source that needs to be ingested and triggers this activity for each available parquet file. We then use pandas to read each file, perform some simple transformations, and bulk-insert the records into the Cosmos database. The durable function automatically keeps track of any failures and retries the corresponding activity.
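
A simplified sketch of this fan-out pattern is shown below. The activity names, retry settings, and the ListParquetFiles helper activity are illustrative; the per-function bindings (function.json) are omitted, and container refers to the Cosmos container client from the earlier sketch.

import azure.durable_functions as df
import pandas as pd


def orchestrator_function(context: df.DurableOrchestrationContext):
    # Hypothetical activity that lists the parquet files for the requested data source.
    files = yield context.call_activity("ListParquetFiles", context.get_input())

    # Fan out one activity per file; the framework retries failed activities.
    retry = df.RetryOptions(first_retry_interval_in_milliseconds=5000,
                            max_number_of_attempts=3)
    tasks = [context.call_activity_with_retry("IngestParquetFile", retry, path)
             for path in files]
    yield context.task_all(tasks)
    return "ingestion succeeded"


main = df.Orchestrator.create(orchestrator_function)


def ingest_parquet_file(path: str) -> int:
    """Activity: read one parquet file and upsert its rows into Cosmos DB."""
    frame = pd.read_parquet(path)
    for record in frame.to_dict(orient="records"):
        # Each record is assumed to already contain the "id" and partition key fields.
        container.upsert_item(record)
    return len(frame)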

Azure Stream Analytics

Some operations we cannot easily perform with Azure Functions. This is primarily true for stateful operations, or for operations that combine messages over time windows.

Our daily crowdedness predictions are done in a batch process that does not compute the predictions instantaneously. It takes time, during which new updates on train capacity may occur. If that happens, we wish to update the crowdedness twice: first on the most recent previous prediction, and then on the new predictions when they become available. We use Azure Stream Analytics here to keep state of the update messages and replay them from a certain timestamp when a new batch prediction is available.

Azure Stream Analytics queries are written in a SQL dialect. It is relatively straightforward to implement transformations. However, some care needs to be taken when the message throughput needs to be high. In our case, a straightforward implementation couldn’t keep up with the input stream and we had to make sure that the Stream Analytics query could run in an embarrassingly parallel manner.

Embarrassingly parallel queries have some requirements and limitations. They need to process partitioned data, and stateful operations (e.g. joins) must be contained within a partition. This means that when joining two Event Hub streams, they must have the same number of partitions, and data from partition 1 on the first Event Hub may only be joined with data from partition 1 on the second.

In order to solve this, we duplicate some of our data over several Event Hub partitions and essentially implement a broadcast join operation. We illustrate this in the following query. Here, each of our crowdedness predictions is given a batch ID and a batch start time, which is used to decide which train capacity update message is applicable to which prediction. A message can be applicable to multiple predictions (if the message arrives during the calculation of a new set of predictions); in this case multiple messages are output. Each batch ID is duplicated over multiple Event Hub partitions.

SELECT
 batchid.batch_id,
 batchid.batch_start_time,
 event.message,
 event.message_timestamp INTO [Target]
FROM
 [SourceData] event TIMESTAMP BY event.message_timestamp PARTITION BY PartitionId
 JOIN [BatchId] batchid TIMESTAMP BY batchid.EventEnqueuedUtcTime PARTITION BY PartitionId
 ON
 -- Join if the batch ID message was received before the message (positive DATEDIFF) or,
 -- when replaying, if the batch ID message was received after the message (negative DATEDIFF),
 -- but only if the message was enqueued after the batch start time.
 -- To allow fast re-ingestion of data we discard messages which are no longer valid for the batch.
  DATEDIFF(HOUR, batchid, event) BETWEEN -24 AND 24
  AND CAST(batchid.batch_start_time AS datetime) <= CAST(event.message_timestamp AS datetime)
  AND CAST(event.message.valid_until AS datetime) >= CAST(batchid.batch_start_time AS datetime)
  AND event.PartitionId = batchid.PartitionId

Listing 2: Example Azure Stream Analytics query adding the corresponding batch ID for the prediction to each message.
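
The duplication itself happens when the batch ID message is published: the same event is sent once to every partition of the batch ID Event Hub, so each Stream Analytics partition can join against it locally. The sketch below illustrates this with the azure-eventhub Python SDK; the connection string, hub name, and message contents are placeholders.

import json

from azure.eventhub import EventData, EventHubProducerClient

producer = EventHubProducerClient.from_connection_string(
    conn_str="<batch-id-event-hub-connection-string>", eventhub_name="batch-id"
)

batch_id_message = json.dumps(
    {"batch_id": "20210601-04", "batch_start_time": "2021-06-01T04:00:00Z"}
).encode("utf-8")

with producer:
    # Send the same message to every partition so each Stream Analytics
    # partition can perform the join within its own partition.
    for partition_id in producer.get_partition_ids():
        event_batch = producer.create_batch(partition_id=partition_id)
        event_batch.add(EventData(batch_id_message))
        producer.send_batch(event_batch)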

End-to-end integration testing

From the initial commit on the project we decided to perform automated end-to-end integration testing on the streaming flow. This testing took the form of seeding the entry Event Hubs with sample messages that we generate, and then validating the messages created at the output Event Hubs. We also included the Cosmos database ingestion in this integration test flow. Making these tests part of our continuous deployment gave us great confidence when making changes as the number of components in the flow increased and the complexity increased with it.
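
A stripped-down version of such a test might look like the sketch below. The Event Hub names, connection strings, and message format are illustrative, and the real tests run against a dedicated test environment and cover many more scenarios.

import json
import threading
import time
import uuid

from azure.eventhub import EventData, EventHubConsumerClient, EventHubProducerClient

INPUT_CONN_STR = "<entry-event-hub-connection-string>"     # placeholder
OUTPUT_CONN_STR = "<output-event-hub-connection-string>"   # placeholder


def test_capacity_update_reaches_output_event_hub():
    # Unique marker so we can recognise our own message among other traffic.
    marker = str(uuid.uuid4())

    # Seed the entry Event Hub with a generated sample message.
    producer = EventHubProducerClient.from_connection_string(
        conn_str=INPUT_CONN_STR, eventhub_name="capacity-updates"
    )
    with producer:
        batch = producer.create_batch()
        batch.add(EventData(json.dumps({"train_id": marker, "carriages": 4})))
        producer.send_batch(batch)

    # Listen on the output Event Hub until the processed message shows up.
    received = []

    def on_event(partition_context, event):
        if event is None:
            return
        message = json.loads(event.body_as_str())
        if message.get("train_id") == marker:
            received.append(message)

    consumer = EventHubConsumerClient.from_connection_string(
        conn_str=OUTPUT_CONN_STR,
        consumer_group="$Default",
        eventhub_name="crowdedness-updates",
    )
    worker = threading.Thread(
        target=consumer.receive,
        kwargs={"on_event": on_event, "starting_position": "-1"},
        daemon=True,
    )
    worker.start()
    for _ in range(60):          # give the pipeline up to a minute
        if received:
            break
        time.sleep(1)
    consumer.close()

    assert received, "expected the processed message on the output Event Hub"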

Conclusions and key learnings

In our quest to give our train travelers the most recent insights into passenger crowdedness, even in situations where changes in the train service occur, we embraced a Lambda architecture to update our predictions when train capacity changes.

Our initial implementation using Spark Structured Streaming didn’t perform as expected, so we switched to a serverless architecture using Azure Event Hubs, Azure Functions, Azure Stream Analytics, and Azure Cosmos DB.

The key benefits of this approach include:

  • As the developer, you are in control: it is clear which parts underperform and which parts incur the highest costs;
  • In contrast to Spark Structured Streaming, pure Python code in Azure Functions is readable, maintainable and expressive;
  • Azure Functions are cheap for stateless operations;
  • Azure Stream Analytics is the most expensive part, and should only be used where it matters (stateful or time-window operations);
  • The new solution reduced cloud infra costs significantly.

The key downsides:

  • The use of decoupled components such as Azure Functions and Azure Cosmos DB may lead to race conditions if the design is not thought through carefully;
  • There are many bits of infrastructure and many small pieces of code to manage: the logic is not concentrated in a single place and requires more extensive testing.
