DOING ML EFFECTIVELY AT STARTUP SCALE

The modern data pattern

Replayable data processing and ingestion at scale with serverless, Snowflake and dbt

Luca Bigon
Towards Data Science
10 min read · Jan 21, 2022


MLOps without too much Ops — Episode 4, feat. Jacopo Tagliabue, Ciro Greco and Andrea Polonioli

Introduction

As we discussed in the previous episodes, data is an integral part of ML pipelines: following the data-centric approach and then generalizing a bit, we may say that a sound pipeline for a data company is composed of two main blocks: dataOps (ingestion, normalization, preparation, visualization, monitoring, etc.) and MLOps (training, testing, deploying, monitoring, etc.).

After some philosophical posts, we are back to actual (open source) code (yeah yeah, “talk is cheap, show me the code”). We will show how to ingest, store, and transform data using a combination of open-source and PaaS/SaaS technologies: following our credo, “MLOps without much Ops”, we provide a plug-and-play template, tried and tested at “reasonable scale” with terabytes of data.

We make use of three main technologies:

  • Pulumi, which allows us to manage infrastructure-as-code (in Python).
  • Snowflake, which allows us to store raw data and manipulate it with powerful queries, abstracting away the complexity of distributed computing.
  • dbt, which allows us to define data transformation as versioned, replayable DAGs, and mix-and-match materialization strategies to suit our needs.

While we present a working pipeline for a non-toy e-commerce scenario (so we can immediately leverage our real-world open dataset and experience), everything we share is readily applicable to a vast number of use cases: since even medium-sized shops can produce and run sophisticated ML flows, we have found that tools that pass the e-commerce tech bar usually behave well in most industries.

Clone the repo and follow along: we present the basic building blocks first, for readers completely new to this, and then move to more advanced patterns, to showcase the flexibility of the stack in more demanding use cases.

Version data, not just code

To set the stage and drive some preliminary points home, let’s imagine a typical e-commerce use case, i.e. providing recommendations for myshop.com (recommendations are a common data-driven feature, and most practitioners should find it easy to put themselves in our shoes).

Before discussing ingestion per se, we want to make sure a more general pattern is clear. Every day, myshop.com provides us with a data dump of its inventory, that is, all the products myshop is selling that day. So, at day 1, we receive these products:

+------------+-------+--------------------+
| PRODUCT ID | PRICE | COLORS             |
+------------+-------+--------------------+
| 123        | 12    | Blue / White       |
| 456        | 45    | Black              |
| 789        | 33    | Red / Blue / White |
+------------+-------+--------------------+

At day 2, we receive a slightly different catalog: 456 has more colors now, and 789 costs a bit less:

+------------+-------+--------------------+
| PRODUCT ID | PRICE | COLORS             |
+------------+-------+--------------------+
| 123        | 12    | Blue / White       |
| 456        | 45    | Black / Blue       |
| 789        | 28    | Red / Blue / White |
+------------+-------+--------------------+

What is the proper way of ingesting and storing this information? The “naive ETL” way will have a storage place — say, a table, an index in Elasticsearch, etc. — containing the products of myshop: when the catalog dump from day 2 is received, the corresponding items are updated/overwritten, so we are always guaranteed to work with the “freshest” version. While this sounds appealing, it has a major drawback: it destroys the past forever, as now, at day 2, there is no easy way to recover the original color of 456. That has several implications:

  • we can’t “time-travel” anymore in our data stack: we can’t debug last week’s errors in a data-driven system whose input is the catalog, since the data has changed by now and reproducing the state of the system is impossible;
  • by losing track of the original state, any update or modification is much harder to undo, in case of mistakes in the business logic;
  • it becomes harder to test new code and iterate on it, as different runs will produce slightly different outcomes.

A better pattern is the append-only log pattern, that is, maintaining an append-only table that stores all the data relevant to the universe in which we operate (in this case, e-commerce data for myshop) in one immutable ledger: since we cannot change the past, our log-like table will be a perpetual memory of the state of our universe at any given point in time. Practically, it means having at day 2 something like the following raw table:

+------------+-----+--------------------------------------------+
| TIMESTAMP  | PID | RAW                                        |
+------------+-----+--------------------------------------------+
| 1638139610 | 123 | {"price":12,"colors":"Blue / White"}       |
| 1638139610 | 456 | {"price":45,"colors":"Black"}              |
| 1638139610 | 789 | {"price":33,"colors":"Red / Blue / White"} |
| 1638226021 | 123 | {"price":12,"colors":"Blue / White"}       |
| 1638226021 | 456 | {"price":45,"colors":"Black / Blue"}       |
| 1638226021 | 789 | {"price":28,"colors":"Red / Blue / White"} |
+------------+-----+--------------------------------------------+

The timestamp places the event (in this case, the ingestion of a product and its metadata) in the ledger, and the raw data is dumped as a JSON: product 789 has two rows — one for each ingestion event — with two associated collections of metadata, as expected. From this representation, it is always possible to recover the “day 2 table” as a view over this stream, that is, a “photograph” capturing for each product the freshest info we have. But having the freshest table does not destroy previous information, in exactly the same way as running the latest code pulled from git does not destroy previous versions of it: it is not by chance that this structure is nicknamed “git for products” in our production stack.
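To make the pattern concrete, here is a minimal Snowflake SQL sketch of how the “day 2 table” can be recovered as a view over the log. Table and column names follow the toy example above (with RAW assumed to be a VARIANT column and the TIMESTAMP column renamed ingestion_ts for clarity); in a real project this kind of query would typically live inside a dbt model:

-- Hedged sketch: recover the freshest snapshot from the append-only log.
create or replace view product_snapshot as
select
    pid,
    raw:price::number  as price,
    raw:colors::string as colors,
    ingestion_ts
from product_log
-- keep only the most recent row per product
qualify row_number() over (partition by pid order by ingestion_ts desc) = 1;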

Of course, the combination of a never-ending log table and practical snapshots capturing the state of the universe at certain points in time is now way easier to implement than before, thanks to three main trends:

  1. data storage is getting cheaper by the minute,
  2. modern data warehouses have superb query performance and great JSON support,
  3. open source tools like dbt made it easier for people with a broad range of skills to transform data in a versioned, replayable, testable way.

Building a noOps ingestion platform

Now that the log+snapshot pattern has been explained, we can dive deeper into the project: while we leave the details to the repo, it is useful to provide a high-level overview of the tooling and the functional layer. The ingestion pipeline mimics a typical data flow for data-driven applications: clients send events, an endpoint collects them and dumps them into a stream, and finally a data warehouse stores them for further processing.

Functional overview of the ingestion pipeline: we use AWS Lambda for the endpoint, AWS Firehose for the stream, Snowflake as the data warehouse and dbt to organize the DAG. The entire infrastructure is managed as code with Pulumi (image by the authors).
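While the exact loading mechanism is left to the repo, a common way to land a Firehose stream into Snowflake is to have Firehose write JSON files to S3 and let Snowpipe auto-ingest them into the append-only table. A minimal sketch, with all object names made up for illustration:

-- Hedged sketch: Firehose delivers JSON events to S3, Snowpipe appends them to
-- a landing table with a single VARIANT column (all object names are illustrative).
create table if not exists raw_events (raw variant);

create stage if not exists events_stage
  url = 's3://my-events-bucket/firehose/'
  storage_integration = my_s3_integration;  -- assumes a storage integration is configured

create pipe if not exists events_pipe auto_ingest = true as
  copy into raw_events
  from @events_stage
  file_format = (type = 'JSON');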

In our example, we are providing recommendations for myshop.com, so:

  • The clients sending events are the shoppers’ browsers: as users browse products on myshop.com, a Javascript SDK sends analytics to our endpoint to collect behavioral signals (to be later used for our recommender!). To simulate these on-site events, the repository contains a pumper script, which streams realistic payloads to the cloud;
  • the endpoint responds with a pixel, a 1x1 transparent GIF commonly used for client-server tracking (as in, e.g., Google Analytics);
  • behavioral signals are dumped into a raw table in Snowflake;
  • SQL-powered transformations (through dbt) take the raw events and build features, intermediate aggregations, normalized views, etc. (a minimal model is sketched right after this list). While detailing downstream applications is outside the scope of this post (but see this for a fleshed-out example), you can imagine B.I. tools and ML pipelines using these nicely prepared tables as input to their own workflows.
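To give a flavor of what such a transformation looks like, here is a hedged dbt model counting daily interactions per product from the raw JSON payloads. Model, source and field names are ours, not the repo’s, and we assume a source named analytics.raw_events has been declared in the dbt project (matching the landing table sketched above):

-- models/product_daily_interactions.sql (illustrative sketch, not from the repo)
{{ config(materialized='table') }}

with parsed as (

    select
        raw:product_id::string                     as product_id,  -- assumed payload field
        raw:event_type::string                     as event_type,  -- assumed payload field
        to_timestamp(raw:server_timestamp::number) as event_ts     -- assumed epoch seconds
    from {{ source('analytics', 'raw_events') }}

)

select
    product_id,
    date_trunc('day', event_ts) as event_date,
    count(*)                    as interactions
from parsed
group by 1, 2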

As usual, we would like to point out that no devOps person has been harmed (or even called) to run this platform: the endpoint runs in a serverless fashion and will scale automatically thanks to AWS Lambda (and the same goes for the data stream on Firehose); Snowflake computing power can be adjusted per query, if necessary, without any maintenance or arcane configuration (Spark, thinking of you!); the dbt flow can be run either in a SaaS orchestrator (e.g. Prefect) or even through the “native” dbt Cloud.

Once again, our proposal sits in the middle of the spectrum, between the custom infrastructure of tech giants running “impossible scale” loads, and monolithic, end-to-end enterprise platforms that offer little to no control over the available functionality.

It really doesn’t take much to spin it up, as we hand over the underlying infrastructure layer to third parties, but it is infinitely extensible, as we can pick and choose different tools when/if we reach another scale or our needs change substantially (e.g. swap Firehose for Kafka, run our own dbt jobs, etc.).

In other words, the full “ingestion story” is unlikely to end here for fast-growing companies, but for sure this is a good place to start.

Bonus: re-creating the lambda architecture in one stack

In the Big Data era, lambda architectures (not to be confused with AWS or Python lambdas!) were a popular solution for data-driven applications where both historical data (aggregations and counts for the last 30 days) and fresh data (aggregations and counts for the last hour) matter, for visualization, monitoring or ML purposes. In a nutshell, you run a batch and a streaming system in parallel and join the results for clients downstream: a dashboard would render revenue trends by reading from a table which is partially built by slow batch processes and partially by fast streaming ones. Lambdas faded a bit as, on the one hand, many critics advocated for leaner solutions that do not involve maintaining what are de facto two stacks, and, on the other hand, much of ML today still doesn’t really use real-time data; even when it does, it is often in the “online prediction with batch features” scenario. Can we dream the lambda dream again and leverage the modern data stack to have our cake and eat it too?

Consider again a popular problem in user analytics, i.e. sessionization: taking a raw stream of events, partitioning them per user, and then grouping events into windows of contiguous activity (“sessions”) according to some time threshold. For example, the shopper in the image below has interacted with four sneakers on myshop.com:

Sessionization example: behavioral events are aggregated post hoc based on timestamp (image by the authors).

At ingestion time (the raw events in the gray boxes above), the pipeline makes no distinction between the four interactions; it is the job of sessionization to later group these interactions into two sessions, based on a 30-minute threshold, as in Google Analytics: since more than 30 minutes (42) pass between the 2nd and the 3rd interactions, the 3rd event marks the start of a new session for the shopper.
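In SQL, the standard trick is a pass of window functions: flag every event whose gap from the previous event of the same shopper exceeds 30 minutes, then take a running sum of the flags to obtain a per-shopper session index. A minimal Snowflake sketch, assuming a parsed events table with illustrative column names (not taken from the repo):

-- Hedged sessionization sketch with a 30-minute inactivity threshold.
-- parsed_events(shopper_id, product_id, event_ts) is an assumed upstream table.
with gaps as (

    select
        shopper_id,
        product_id,
        event_ts,
        -- timestamp of the previous event by the same shopper (NULL for the first one)
        lag(event_ts) over (partition by shopper_id order by event_ts) as prev_ts
    from parsed_events

),

flags as (

    select
        *,
        -- a new session starts on the first event or after a 30+ minute gap
        iff(prev_ts is null or datediff('minute', prev_ts, event_ts) > 30, 1, 0) as new_session
    from gaps

)

select
    shopper_id,
    product_id,
    event_ts,
    -- running count of session starts = per-shopper session index
    sum(new_session) over (
        partition by shopper_id
        order by event_ts
        rows between unbounded preceding and current row
    ) as session_number
from flags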

In the back-end, sessionization requires some heavy lifting with window functions, so we would love to run that computation in batch, without tight latency constraints; on the other hand, it would be nice if recent events (e.g. session counts) could still be somehow actionable in the stack. The stylized flow below shows how to re-use the same ideas we introduced previously (the append-only table, DAG-based processing, materialized and runtime views, etc.) to solve the lambda conundrum with a single stack.

A stylized lambda example with materialized tables and views (image by the authors).

The intuition is that the ever-growing log table (green) can logically be split into two parts: the distant past (orange), immutable and for which sessions have already been assigned, once and for all, and the immediate past (pink), where new events are still coming in and things are still in flux. We can then run the same SQL logic in one stack, therefore avoiding the big drawbacks of the original lambda architecture (two systems to maintain, and business logic duplicated across them). dbt makes it easy to express the pattern, and then to reap the benefits with a final downstream table (red) unifying the two sessionization tables into one best-of-breed set of records for downstream applications, such as our internal metrics dashboard, or an ML model now able to re-train on fresh data.
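A hedged dbt sketch of the pattern (model names are ours, not the repo’s): the “distant past” is an incremental model, so settled sessions are computed once and frozen; the “immediate past” is a plain view, recomputed at query time; a downstream model unions the two for consumers. Boundary handling around the cutoff is deliberately simplified here:

-- models/sessions_frozen.sql (illustrative): incremental table for settled sessions
{{ config(materialized='incremental') }}

select * from {{ ref('sessionized_events') }}   -- assumed upstream sessionization model
where event_ts < dateadd('hour', -1, current_timestamp())
{% if is_incremental() %}
  -- on subsequent runs, only append events newer than what is already stored
  and event_ts > (select max(event_ts) from {{ this }})
{% endif %}

-- models/sessions_fresh.sql (illustrative): always-up-to-date view over recent events
{{ config(materialized='view') }}

select * from {{ ref('sessionized_events') }}
where event_ts >= dateadd('hour', -1, current_timestamp())

-- models/sessions_unified.sql (illustrative): one set of records for downstream consumers
{{ config(materialized='view') }}

select * from {{ ref('sessions_frozen') }}
union all
select * from {{ ref('sessions_fresh') }}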

While this pattern does not replace a full-fledged low-latency streaming solution, it opens the possibility of doing almost real-time (~5 min) analysis on terabytes of data at a scale and speed that the original lambda architecture, with all its complexity, could only dream of.

If you have already adopted Snowflake, dbt and the log pattern, it also helps you bridge the gap between batch and real-time in a gentle way, before investing in a parallel stack for streaming that you may not need if your acceptable latency is measured in minutes.

As we stressed above, this may not be the place to end your data journey, but it may be a place to start: it is another testament to the incredible progress made by the field that such complex transformations can now be run at scale by a small team with almost no upfront infrastructure spending.

What’s next?

Once all the data is cleaned, standardized and in the right place (and only then), productive Machine Learning at scale can happen. In the next episode of our series, we will pick up from where we left things here (that is, in a data warehouse), and discuss how modern ML tools can get you from data to APIs in no time.

If you like this blog post, please add a star on GitHub and share it with your team: as usual, we welcome feedback, comments and war stories from other peers doing “ML at Reasonable Scale”.

Acknowledgements

We wish to thank Fabio Melen, Amir Sadoughi, Chip Huyen, Zhenzhong Xu, Ryan Vilim and Leopoldo Garcia Vargas for their precious feedback on this project.
