From Raw Videos to GAN Training

Implementing a data pipeline and a lightweight Deep Learning data lake using ClearML on AWS

Assaf Pinhasi
Towards Data Science

--

Introduction

Hour One is an AI-centric start-up, and its main product transforms text into videos of virtual human presenters.

Generating realistic, smooth, and compelling videos of human presenters speaking and gesturing in multiple languages based on text alone is a challenging task that requires training complex Deep Learning models — and lots of training data.

This post describes the design and implementation of a data pipeline and data management solution I built for Hour One using ClearML and AWS.

The solution is based on a lightweight version of the Deep Lake architectural pattern.

Data pipeline and Data management for Deep Learning on Videos. Image by author

Note: I have no affiliation with the ClearML project or its backers.

From Videos to prepared Datasets

Hour One’s AI models need to take text as input, and generate realistic videos as output.

One way to achieve this is by training models on videos of real people presenting various texts.

The model then attempts to predict the next frames or sequences of frames in the video, while minimizing loss functions that help ensure the output is realistic and of high quality.

From a data preparation and management perspective, this requires:

Transforming the video data into useful representations — to enable the training mechanics to focus on the right “features” of the inputs.

E.g. representing audio in a format that is used for spectrum analysis, or encoding the video pixels into a compact format that can be fed into a model.

Enriching with data layers that provide detailed supervision —
Naively, a model trained to predict an image can attempt to minimize simple pixel-wise distance from the ground truth image.
However, this loss function may not be the optimal way to account for realism, smoothness, or consistency.

In order to support more detailed supervision, additional annotations or layers of data can be used during training.
For example, consider a layer of information (“annotation”) about the exact location of the face in each frame in the video.

These layers can be generated programmatically, by human annotators, or both.

Cleaning the data to ensure it is suitable for training — e.g. removing sections that don’t contain a person talking to the camera.
Some cleaning logic needs to run on transformed or even enriched data.

Capturing metadata — to assist in the process of constructing diverse and well-balanced datasets, we need to map the data along multiple domain dimensions e.g. the genders of the presenters, lighting conditions, voice qualities etc.

The metadata may describe an entire video, sections of a video, or very short sequences inside the video, down to the frame level.

Some basic dimensions describing the entire videos may be provided as part of acquiring the data from the source.

In other cases, the metadata needs to be computed e.g. by additional deep learning algorithms performing inference on the data.

Storing the data + metadata long term — The data in all its forms needs to be stored long term, including raw, transformed and enriched, and curated datasets.

Making the data searchable — The solution needs to allow researchers to quickly compose datasets by searching for instances with a desired combination of properties/dimensions/metadata — e.g. “fetch 100 training instances, up to 40% of them should have the character blinking”.

Constructing and storing versioned training datasets — Once instances have been chosen for a dataset, it should be stored in a versioned manner and pulled onto the training machines whenever required.

Light deep lake for video data. Image by author

Let’s dive into the requirements for each part of the solution.

Requirements

Pipeline

The goal of the data pipeline sub-system is to carry out a DAG of processing steps and emit the data which will later be stored in the data management sub-system.

Inputs and triggers

The input to the pipeline is a file containing pointers to raw videos, as well as some metadata about their content.

The pipeline is typically triggered after new data has been acquired, and processes only the new data increment.

Occasionally we may choose to run it on all data from scratch, or on a specific subset of input data.

Processing

The pipeline should run multiple heterogeneous steps of processing on the data. Some of the steps may run external processes, some may run model inference, and some may perform image or signal manipulation.

The output of each step may be used by the next step in the process, by the training process, or both.

Extensibility and evolution

Some low-level processing stages are considered to be relatively stable, and are unlikely to change often.

Other parts of the pipeline, such as enrichment logic, will continue to evolve at a high rate — and we need to allow researchers to add enrichment stages to the pipeline without depending on engineers.

Sub-DAG execution and backfilling

When pipeline logic evolves, the new logic needs to be run over the entire data corpus. In data engineering speak, this is often referred to as “backfilling” or “back-populating”.

In our case, re-running the entire pipeline on the entire data corpus is an expensive and time-consuming effort, due to the size of data and complexity of processing.

Hence, the pipeline needs to support triggering partial executions that run only a user-specified sub-DAG.

Result caching

As a related requirement, we wanted the pipeline to be “caching aware” — i.e. skip expensive processing stages if nothing has changed in the data, code, or configuration since the last execution.

Output handling semantics

When running the pipeline on older data, we may decide to overwrite the old data, or append the output as a new version of the data.

Scale out
As data corpora keep growing over time, we needed a solution that can scale out to run on many machines.

When working in this mode, the pipeline should be invocable via a UI or a scheduler.

Local runs
At the same time, it’s very useful to be able to run the pipeline as a completely standard Python process locally (from source, a package, or inside a Docker container), without depending on cloud infrastructure and without publishing its outputs, mainly for development and local testing.

CPU and GPU

Some stages in the pipeline perform activities such as video cropping or encoding/decoding which are suitable for CPU, and some stages perform inference of deep learning models (e.g. detecting a bounding box around an actor’s face), which benefit from GPU acceleration.

Users should be able to specify declaratively which tasks should run on which processing unit.

Data management

The goal of the data management sub-system is to store data and metadata long term. In addition, it should:

  1. Make the data searchable and accessible for building datasets
  2. Support the creation of new datasets in a version controlled manner
  3. Allow users to download datasets for training

Storage

For long term storage, we needed a scalable object storage technology such as S3.

Media storage formats

We want to store the larger media files (both raw and pre-processed) in a standard format that allows viewing them with standard tools wherever possible (e.g. .mp4, .wav, and .png).

Metadata storage and schema
Following the Deep Lake architectural pattern, the metadata should be stored in a format that offers structure and is queryable.

At the same time, we need to allow a high degree of flexibility in schema management, without introducing complex or rigid data engines.

Data versioning
The low-level and heavy pre-processing logic of media files doesn’t change often, and when it does, it’s usually safe to overwrite the previous version.

Enrichment logic, on the other hand, does tend to change over time and is lighter in terms of data footprint (think bounding boxes and landmark coordinates), and hence its outputs should be versioned.

Training datasets should be version controlled.

ClearML 101

ClearML is an open source MLOps project that combines experiment tracking, dataset management, remote code execution and job pipelines written in Python.

ClearML HL architecture. Image from the ClearML github project https://raw.githubusercontent.com/allegroai/clearml-docs/main/docs/img/clearml_architecture.png

Tasks and experiment tracking

At a high level, you can instrument a Python program with a few lines of code to connect it to ClearML. An instrumented Python program is referred to as a task.

When a task executes, e.g. on your local machine, the instrumentation code automatically collects information such as command line arguments, the git diff and latest commit, the list of Python packages available to the interpreter, and even ML-tool-specific state such as PyTorch metrics.

The tracked metadata is then sent to a ClearML server and stored there, accessible via UI and API.

You can also report data explicitly from your code during the execution of a task. This is useful, for example, for tracking metrics during training.
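
To make this concrete, here is a minimal sketch of instrumenting a script (project, task, and metric names are made up):

```python
from clearml import Task

# connecting the script to ClearML turns it into a tracked "task"
task = Task.init(project_name="examples", task_name="my_experiment")

# hyperparameters connected to the task are captured and editable in the UI
params = {"learning_rate": 1e-4, "batch_size": 32}
task.connect(params)

# explicit reporting, e.g. a training metric per iteration
logger = task.get_logger()
logger.report_scalar(title="loss", series="train", value=0.42, iteration=1)
```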

Remote execution

When a task is tracked by ClearML, the server stores all the information needed to reproduce it — including running it on a remote machine.

To execute a task on a remote machine, you “clone” it (via the UI or an API call), and place it in a queue.

An agent running on a remote machine polls the queue for new tasks, and once it dequeues one, it executes it as a (local) Python process.
A remote machine running an agent is called a worker.
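
As a sketch (task and queue names are illustrative), cloning and enqueuing can also be done programmatically, while the worker side is just the agent CLI:

```python
from clearml import Task

# clone a previously tracked task and send the copy to an execution queue
template = Task.get_task(project_name="examples", task_name="my_experiment")
cloned = Task.clone(source_task=template, name="my_experiment (remote run)")
Task.enqueue(task=cloned, queue_name="default")

# on the worker machine, an agent polls that queue, e.g.:
#   clearml-agent daemon --queue default --docker
```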

Pipelines

A DAG of tasks is called a Pipeline.
The flow of the pipeline is driven by a controller task — another Python function that triggers the execution of the tasks and passes parameters and information between them.

The controller would typically execute tasks by sending them to a queue, where they will be picked up by workers.
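
ClearML supports more than one way to define pipelines. Here is a minimal sketch using the PipelineController flavor (step, project, and queue names are made up; the design described later uses the decorator flavor instead):

```python
from clearml.automation import PipelineController


def preprocess(input_url: str) -> str:
    # placeholder step logic
    return input_url


pipe = PipelineController(name="example_pipeline", project="examples", version="1.0")

# each step becomes a task that the controller enqueues for workers to pick up
pipe.add_function_step(
    name="preprocess",
    function=preprocess,
    function_kwargs={"input_url": "s3://my-bucket/raw/"},
    function_return=["processed_url"],
    execution_queue="default",
)

# the controller itself typically runs in a dedicated "services" queue
pipe.start(queue="services")
```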

Datasets

A dataset is a special kind of task, in which the user reports “data” instead of e.g. metrics like in a normal experiment.

The data can be anything, but typically it would be files stored on some file system such as a mounted disk, NFS, or object storage.

Datasets can be versioned in a manner not dissimilar to Git, where each time you commit a version it stores only the diff from the previous one(s).

The metadata about the dataset is stored in the ClearML server, while the actual data (e.g. files contained in the dataset) can be stored in a storage device of your choice, e.g. S3 or an NFS server, as long as it’s available to the worker machines that need to download and use it.
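
A minimal sketch of the dataset API (dataset names, projects, and paths are illustrative):

```python
from clearml import Dataset

# create a new version on top of a parent, add files, and upload
ds = Dataset.create(
    dataset_name="face_landmarks",
    dataset_project="video_pipeline",
    parent_datasets=["<parent_dataset_id>"],  # omit for a brand-new dataset
)
ds.add_files(path="./landmarks_output")  # files are copied to ClearML-managed storage
ds.upload()
ds.finalize()

# later, e.g. on a training machine, fetch a local copy of the data
local_path = Dataset.get(dataset_id=ds.id).get_local_copy()
```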

Why ClearML

Functional suitability

ClearML supports all the main functional requirements:

  • Defining and running a pipeline of media-processing in Python
  • Running the pipeline in a remote / distributed execution on CPU and GPU.
  • Storing a large number of binary or semi-structured files long term, and managing and curating them into datasets.
  • Allowing downstream processing steps and training processes to consume the datasets easily.

ClearML can perform all of these tasks.

However, if you’ve been paying attention, you may have noticed that ClearML doesn’t offer a query language that can run over the data stored inside datasets, which was part of our requirements.

As we shall see, though, we found a workaround for this limitation that got the job done.

Pros

While there are many tools with which one can implement this functionality, the Hour One AI team had already adopted ClearML for experiment tracking.

At the same time, the team had much less experience in running relational data warehouses or cloud infrastructure, so the tool’s Pythonic and familiar interface played in its favor.

As a more general advantage, the tool is open source, and has a vibrant community.

Finally, we knew that ClearML is extremely flexible, and once you get the hang of the tasks and remote execution mechanism, you can build very rich and complex workflows — so we knew we could get it to work.

Cons

The tool’s automagical nature comes with a price — it takes time to understand what is happening when things don’t work as expected.

Debugging ClearML code that doesn’t perform as expected requires opening the tool’s code, debugging through it, asking questions on Slack, and often having a working knowledge of distributed computing, cloud APIs, Python dependency management, Docker internals, and more.

Documentation can be patchy around the edges, to say the least.

Finally, flexibility is also a con — as ClearML is not an opinionated tool.
This means that you can usually get it to do what you want, but you need to know what you are doing for your workflows to make sense.

System design

High level

  • The workflow is implemented as a ClearML pipeline (specifically — using the PipelineDecorator).
  • Each task in the pipeline takes a dataset ID as input, and generates one or more datasets as outputs.
  • Metadata about the produced data including lineage is stored long term in datasets. The data itself resides on S3 in a number of different formats.
  • Scaling the pipeline is achieved using ClearML queues and the autoscaler.
  • Most other requirements (caching, sub-DAG execution, running locally and remotely with the same codebase) are achieved using careful separation of concerns, as well as by using low-level ClearML API.

Logical flow

Follow the diagram from left to right:

  • The pipeline is triggered with a parameter that points it to a file containing links to raw videos.
    The file is added to a dataset representing “all raw data”.
  • The first task is to split the raw videos into shorter sections (“segments”), based on metadata that exists in the file.
    The results are split videos and metadata files, each stored as a ClearML dataset.
  • The next step is a basic pre-processing of the video and audio data from the split videos.
    Each gets stored in its own ClearML dataset.
  • Further enrichment and cleaning of the audio and visual signals — an additional ~10 tasks.
Pipeline logical flow and output datasets. Image by author

Data management

  • Each run of each task generates one or more independent ClearML datasets.
  • Each dataset object contains a pointer to the task that created it (and vice versa) for lineage.
    This allows us to e.g. pull all the different datasets that were generated by a specific pipeline run.
  • Each dataset contains an index of which video segments it contains.
  • Large media files are stored in their standard format on S3, and ClearML datasets hold a reference to their location there — using the external files mechanism (see the sketch after this list).
  • Smaller files are cloned and stored in a ClearML format (also on S3).
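
As an illustration of the two storage modes (bucket, dataset, and file names are made up):

```python
from clearml import Dataset

ds = Dataset.create(dataset_name="split_videos_media", dataset_project="video_pipeline")

# large media files stay where they are on S3; the dataset only keeps references
ds.add_external_files(source_url="s3://my-bucket/videos/session_001/")

# small metadata files are copied into ClearML-managed storage (also on S3)
ds.add_files(path="./segment_index.json")

ds.upload()
ds.finalize()
```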

Metadata schema and query processing

As discussed above, we wanted to allow researchers to evolve the schema easily without needing to learn about relational databases and other external tools.

In addition, a lot of the metadata that the pipeline computes is semi structured — e.g. a bounding box or face landmarks, for each frame in the video.

The structure of the data makes it a bit challenging to query via relational query engines.

We decided to avoid adding another moving part to the solution, and keep it purely based on ClearML. Here is how we implement queries:

  1. A researcher obtains the list of dataset IDs they want to query.
    Typically these will include metadata or annotations (not media).
  2. Using a ClearML tool, the user downloads and merges these datasets into a local dataset copy.
    For example — fetch the datasets representing the bounding boxes and landmarks of faces.
  3. The researcher performs a “query” using standard tools such as NumPy or Pandas — in order to select the data she wants to train on.
    For example, iterate over the NumPy array representing the face bounding boxes and filter out only elements where the total area of the bounding box is greater than X and where all landmarks fall inside the bounding box.
    Each element in the result of this “query” will contain a pointer to the frames and the videos from which it is derived.
  4. The researcher programmatically creates a new dataset containing the filtered videos in ClearML.
  5. Later, the training code downloads the dataset from (4) into the local disk and starts training.
Query flow using ClearML datasets. Image by author
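
A rough sketch of what such a “query” can look like (dataset IDs, file names, column names, and thresholds are made up, and the real selection logic is richer):

```python
import pandas as pd
from clearml import Dataset

# steps 1-2: download the relevant metadata dataset(s) locally
bbox_dir = Dataset.get(dataset_id="<bounding_boxes_dataset_id>").get_local_copy()
frames = pd.read_json(f"{bbox_dir}/face_bounding_boxes.json")

# step 3: the "query" is plain Pandas: keep frames with a large enough face bounding box
frames["bbox_area"] = (frames.x2 - frames.x1) * (frames.y2 - frames.y1)
selected = frames[frames["bbox_area"] > 200 * 200]

# step 4: register the selection as a new, versioned dataset for training
selected.to_json("selected_frames.json", orient="records")
ds = Dataset.create(dataset_name="training_set_v1", dataset_project="video_pipeline")
ds.add_files(path="selected_frames.json")
ds.upload()
ds.finalize()
```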

In practice, the process of building the dataset involves linear programming to satisfy constraints on the dataset structure.

Remote and cluster based execution

The process works as follows:

  1. A user triggers a pipeline execution — either by running the pipeline on her machine or via the UI
  2. The ClearML server receives the call
  3. The ClearML server enqueues the execution of the task representing the pipeline logic (controller) into a queue
  4. An agent running on some machine pulls this task
  5. The agent starts executing the pipeline method’s code.
  6. The controller spawns ClearML tasks for each step in the pipeline and places them in a queue
  7. Additional worker machines pull these tasks and start them locally
  8. Each task’s logic calls ClearML dataset APIs to create its output datasets, where the metadata gets stored on the ClearML server and the actual data on S3.
ClearML pipeline with remote task execution. Image by author

Autoscaling

It makes no sense to keep tens of machines running continuously waiting for tasks to get enqueued.

ClearML offers an autoscaler that is able to spin machines up and down based on the state of the queues.

The autoscaling flow is quite involved:

  1. An “autoscaling logic” is actually a ClearML task that gets placed in a dedicated queue (e.g. a “DevOps” queue).
  2. A dedicated machine (which is always up) runs an agent that listens to this queue.
  3. The agent picks up the autoscaling task, which essentially runs forever.
  4. The task logic polls the queues and, based on a configuration, spins up various types of machines per queue.
  5. Spinning up machines is done using a cloud provider API (e.g. Boto3 on AWS).
  6. The spawned machines have a User Data launch script that sets them up with credentials and starts the ClearML agent in daemon mode.
  7. Once the startup script is done, the agent is listening to a queue.

The fine print:

  • Secret management is on you.
    ClearML expects you to enter AWS credentials and git .ssh credentials into a configuration file and save it in the ClearML server — which is a no-go in terms of basic security practices.
  • Agents need access to S3, so new machines need to be able to assume the role that has the appropriate permissions.
  • The User Data script is generated in a very indirect way — from configuration to autoscaling code to AWS API calls etc.
    Any mistakes there are hard to fix/test.

We had to find alternative solutions — e.g. using an appropriate instance profile and storing secrets in a secret management solution.
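
For example, instead of storing credentials in the ClearML configuration, a worker can fetch what it needs at startup from a secrets manager, relying on its instance profile for permissions (the secret name and payload format below are illustrative):

```python
import json
import boto3

# the instance profile grants the permissions; no credentials live on the machine
client = boto3.client("secretsmanager", region_name="us-east-1")
secret = client.get_secret_value(SecretId="clearml/git-ssh-key")
git_ssh_key = json.loads(secret["SecretString"])["private_key"]
```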

Support for GPU and CPU tasks

This is achieved by having two queues, one for CPU tasks and one for GPU tasks.

Each task (Python function) is annotated with the name of the queue it should be sent to.
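
With the PipelineDecorator this is a per-step declaration; a sketch (queue names and function bodies are illustrative):

```python
from clearml.automation.controller import PipelineDecorator


@PipelineDecorator.component(execution_queue="cpu_queue", return_values=["split_ds_id"])
def split_videos(raw_ds_id: str):
    ...  # video splitting / encoding work that only needs CPU


@PipelineDecorator.component(execution_queue="gpu_queue", return_values=["faces_ds_id"])
def detect_faces(split_ds_id: str):
    ...  # model inference that benefits from GPU acceleration
```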

Code-level design notes

The pipeline codebase is pretty straightforward. Below are pseudo-examples of the way we built the pipeline.
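
The original snippets are not reproduced here; the following is a hedged approximation of the controller's shape (the line references in the notes below refer to the original gist, and names such as TrackerFactory and the step functions are illustrative):

```python
from clearml.automation.controller import PipelineDecorator


# the controller is a decorated Python function whose parameters must be
# json/pickle-serializable (they typically come from command line arguments)
@PipelineDecorator.pipeline(name="video_data_pipeline", project="data_pipeline", version="1.0")
def run_pipeline(raw_videos_index_url: str, overwrite: bool = False):
    # imports happen inside the function so the code also works when it is
    # rehydrated and executed remotely by a ClearML agent
    from pipeline.tracking import TrackerFactory  # illustrative module

    # the tracker hides most of the magic: caching, lineage, local vs. ClearML runs
    tracker = TrackerFactory.create_from_cli_flag()

    # the flow is expressed by passing dataset IDs from one task to the next;
    # in remote mode each call spawns a ClearML task on a worker
    split_ds_id = split_videos(raw_videos_index_url)
    audio_ds_id = preprocess_audio(split_ds_id)
    video_ds_id = preprocess_video(split_ds_id)
    enrich_face_landmarks(video_ds_id)
```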

Lines 7–8 — The main controller logic has a PipelineDecorator.pipeline() decorator. In addition, its parameters (typically parsed from command line arguments) should be serializable using json or pickle.

Line 9 — Any imports should be performed inside the function (needed for running remotely).

Line 13 — we use a factory to create a “tracker” object. The tracker object is where most of the magic happens. It has two implementations — a local tracker (which is more or less a no-op), and a ClearML tracker (which actually performs calls against ClearML).

The right class is instantiated based on a command line flag.

Lines 15–19 — The flow is achieved by passing Dataset IDs between methods (tasks).

When this code runs on ClearML in remote mode, these calls trigger the creation of remote tasks and send them the results of the previous tasks they depend on.

Now let’s look at the anatomy of a task:
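
Again, a hedged approximation rather than the original gist (the tracker's methods and helpers such as split_into_segments are illustrative):

```python
from clearml.automation.controller import PipelineDecorator


@PipelineDecorator.component(return_values=["split_media_ds_id"], execution_queue="cpu_queue")
def split_videos(raw_index_ds_id: str):
    # imports live inside the function body so the task is self-contained when run remotely
    from pipeline.tracking import TrackerFactory     # illustrative module
    from pipeline.video import split_into_segments   # illustrative helper

    tracker = TrackerFactory.create_from_cli_flag()

    # return a cached result if code, configuration and inputs are unchanged;
    # otherwise download the input dataset to a local folder
    cached_ds_id = tracker.get_cached_output("split_videos_media")
    if cached_ds_id:
        return cached_ds_id
    input_dir = tracker.download_dataset(raw_index_ds_id)

    # the actual media processing
    output_dir = split_into_segments(input_dir)

    # upload the results as a new ClearML dataset
    # (metadata goes to the ClearML server, the files go to S3)
    return tracker.upload_dataset(output_dir, dataset_name="split_videos_media")
```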

Line 1 — The task is a pure Python function with a ClearML decorator.

Lines 3–5 — The function performs its imports, which is needed if we want to be able to run it remotely, and then it initializes its own tracker instance.

Lines 7–9 — The tracker object is then responsible for fetching cached results if they exist, or, if none exist, downloading the input dataset to a local folder.

Lines 14–15 — Using the tracker, we upload the data we generated on lines 11–12 into a ClearML dataset called “split_videos_media”.

Running locally

To turn on local runs, we need to call PipelineDecorator.run_locally() prior to invoking the pipeline method.

A few other running modes are supported, e.g. running the pipeline controller locally while executing the tasks as local processes or as remote tasks.
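
A sketch, continuing the controller example above (the module path is illustrative):

```python
from clearml.automation.controller import PipelineDecorator

from pipeline.controller import run_pipeline  # the controller sketched earlier

if __name__ == "__main__":
    # run the controller and every step as plain local processes, no queues involved
    PipelineDecorator.run_locally()
    run_pipeline(raw_videos_index_url="./raw_videos_index.json")
```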

Running only on sub-DAGs

This is also handled by the tracker object, which is able to traverse the DAG and automatically skip all tasks that are not needed.

Lineage tracking

The tracker object marks all tasks with the pipeline run ID, and marks every task with the list of datasets it created — which get stored as artifacts inside ClearML.

Additional features

The tracker takes care of all the naming, data collection and reporting conventions, so that task authors can focus on their business logic.
It is also capable of attaching external tasks as listeners to pipeline executions, running on a schedule, and more.

Summary

Preparing large scale media data for training Deep Learning models requires running multiple processing and enrichment steps on the raw data.

It also requires storing processed data in a well structured manner that supports versioning and enables researchers to query it in order to construct datasets.

ClearML can be used to achieve all the above.

It shines in its purely Pythonic interface, its intuitive dataset paradigm, and its support for many non-functional requirements such as autoscaling, though these come at a price.

While ClearML doesn’t offer a data querying mechanism, it’s still possible to organize the data so that pulling it and performing queries locally can get the job done, especially if the queries happen at well-defined points in the data lifecycle.
