Watermarking in Spark Structured Streaming

Thomas Treml
Towards Data Science
3 min read · Oct 4, 2018


Handling late-arriving events is a crucial capability for Stream Processing Engines. A solution to this problem is the concept of watermarking, which has been supported by the Structured Streaming API since Spark 2.1.

What is a Watermark?

Watermarking is a useful technique that helps a Stream Processing Engine deal with lateness. Basically, a watermark is a threshold that specifies how long the system waits for late events. If an arriving event lies within the watermark, it is used to update a query. If it is older than the watermark, it gets dropped and is not processed further by the Streaming Engine.

Flooding watermarks
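To make that rule concrete, here is a minimal sketch of the check a watermark implies, in plain Scala (illustrative only, not Spark's internal code; the timestamps and the 10-minute threshold are invented for the example):

```scala
import java.time.Instant
import java.time.temporal.ChronoUnit

// Watermark = latest event time seen so far, minus the delay threshold.
// These values are made up for illustration.
val maxEventTimeSeen = Instant.parse("2018-10-04T12:20:00Z")
val watermark = maxEventTimeSeen.minus(10, ChronoUnit.MINUTES) // 12:10

// An event is kept if its event time is not older than the watermark.
def isAccepted(eventTime: Instant): Boolean = !eventTime.isBefore(watermark)

isAccepted(Instant.parse("2018-10-04T12:15:00Z")) // true  -> updates its window
isAccepted(Instant.parse("2018-10-04T12:08:00Z")) // false -> dropped
```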

But Why Should I Care?

In distributed and networked systems, there's always a chance of disruption: nodes going down, sensors losing connection, and so on. Because of that, it's not guaranteed that data will arrive at a Stream Processing Engine in the order it was created. For the sake of fault tolerance, it's therefore necessary to handle such Out-of-Order data.

To deal with this problem, the state of an aggregate must be preserved. In case a late event occurs, the query can then be reprocessed. But this means that the state of all aggregates must be kept indefinitely, which causes the memory usage to grow indefinitely too. That is not practical in a real-world scenario, unless the system has unlimited resources (i.e., an unlimited budget). Watermarking is therefore a useful concept to constrain the system by design and prevent it from exploding at runtime.

How to Use It?

Watermarking was introduced into the Structured Streaming API in Spark 2.1. You can enable it by simply adding the withWatermark operator to a query:

withWatermark(eventTime: String, delayThreshold: String): Dataset[T]

It takes two parameters: a) an event time column (which must be the same column the aggregation works on) and b) a threshold specifying how long late data should be accepted (in event-time units). Spark will then maintain the state of an aggregate until max eventTime - delayThreshold > T, where max eventTime is the latest event time seen by the engine and T is the end time of a window. If late data fall within this threshold, the query gets updated eventually (right image in the figure below). Otherwise the data gets dropped and no reprocessing is triggered (left image in the figure below).

Late donkey in structured word count: event dropped (left), event within watermark updates Window 2 (right).
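To see the operator in context, here is a minimal sketch of a windowed word count with a watermark (the socket source, host and port, and the 5-minute window are assumptions for illustration, not taken from the original demo):

```scala
import java.sql.Timestamp
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, window}

val spark = SparkSession.builder
  .appName("WatermarkDemo")
  .getOrCreate()
import spark.implicits._

// Hypothetical source: a socket stream of "2018-10-04 12:07:00,donkey" lines.
val events = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()
  .as[String]
  .map { line =>
    val Array(ts, word) = line.split(",")
    (Timestamp.valueOf(ts), word)
  }
  .toDF("eventTime", "word")

// Accept events up to 10 minutes late, counting words in 5-minute windows.
// State for a window is dropped once the watermark passes its end time.
val windowedCounts = events
  .withWatermark("eventTime", "10 minutes")
  .groupBy(window(col("eventTime"), "5 minutes"), col("word"))
  .count()
```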

It's important to mention that the output mode of the query must be set either to "append" (which is the default) or "update". Complete mode can't be used in conjunction with watermarking by design, because it requires all data to be preserved for outputting the whole result table to a sink.
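Continuing the sketch above, starting the query in update mode could look like this (the console sink is just for local experimentation):

```scala
// Update mode: each trigger emits only the windows whose counts changed.
// Append mode would instead emit a window once, after the watermark
// has passed its end time.
val query = windowedCounts.writeStream
  .outputMode("update")
  .format("console")
  .option("truncate", "false")
  .start()

query.awaitTermination()
```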

A quick demonstration of how to use the concept in a simple Spark Structured Streaming application can be found here, and it's a word count (with some minor NLP enhancements), what else :D
