Making Sense of Big Data

Scalable and Reliable Kubernetes Logging

Building a scalable and reliable logging solution for large Kubernetes cluster with scalable tools and fast object storage.

Yifeng Jiang
Towards Data Science
7 min readJun 21, 2021

--

Building a basic logging solution for Kubernetes could be as easy as running a couple of commands. However, to support large-scale Kubernetes clusters, the logging solution itself needs to be scalable and reliable.

In my previous blog, I described an overview of my Kubernetes monitoring and logging solution. At that time, I used a basic setup for logging: logs collected by Fluentd on the Kubernetes node are directly sent to a Elasticsearch and Kibana cluster for search and visualization, with both Fluentd and Elasticsearch running on the Kubernetes cluster itself. This is an easy setup that works for small clusters. Once we move to large production clusters, it will have challenges such as: 1) Fluentd may drop logs (data lose!) if Elasticsearch is down or cannot catch up indexing the incoming logs. 2) Log input and output are tightly coupled therefore difficult to manage. 3 ) Logs are only stored in Elasticsearch therefore difficult to extend to other tools, such as Apache Spark for general log processing and analytics.

In this blog, I will describe how I addressed these challenges by building a scalable and reliable Kubernetes logging solution with scalable tools such as Fluentd, Kafka, Elasticsearch, Spark and Trino. I will also highlight the role a fast object storage like FlashBlade S3 plays in this solution. The final architecture looks like the below.

Scalable Kubernetes logging

Apache Kafka as pipeline buffer and broker

The first thing we need is to de-couple log inputs (Fluentd) and outputs (Elasticsearch). This adds flexibility and scalability to the logging pipeline. Apacke Kafka is the most popular solution for this. This setup requires a running Kafka cluster. Please refer to my blog here on how to set up an open source Kafka cluster on Kubernetes. Another option is to use Confluent Kafka for Kubernetes. One nice feature about Confluent Kafka is its support for tiered storage, which allows Kafka to offload cold data to remote object storage like FlashBlade S3. More on this later. For now, let’s focus on Fluentd and Kafka integration.

The easiest way to get started is to use the Kafka2 plugin from the Fluentd Kubernetes Daemonset repo, which includes pre-built Docker images and Kubernetes spec examples. The Fluentd-Kafka2 Docker image supports basic configuration using environment variables. For example, Kafka brokers and topics can be set in Kubernetes spec file like this:

containers:
- name: fluentd
image: fluent/fluentd-kubernetes-daemonset:v1.12-debian-kafka2-1
env:
- name: FLUENT_KAFKA2_BROKERS
value: "kafka-headless.kafka:29092"
- name: FLUENT_KAFKA2_DEFAULT_TOPIC
value: "k8s-logs"

Since I need more controls on Kafka producer acknowledgement, timestamp format and log source separation, I had extended the config file using Kubernetes ConfigMap. After these, logs are delivered into multiple Kafka topics based on their source. Additional metadata, such as ingestion timestamp and source tag, are also attached by Fluentd. These will be helpful later when we process the logs with Elasticsearch and Apache Spark.

Shipping logs from Kafka to multiple outputs

To ship the logs from Kafka to its final outputs, in this case Elasticsearch and S3 object storage, I use Kafka Connect Elasticsearch sink and S3 sink. Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other data systems. Each connector runs as its own Kafka consumer groups in distributed mode, meaning scalability and reliability are built in. Because log input and output are de-coupled by Kafka, not it is easier to extend the system. For example, there are over 100 connectors in the ecosystem that ships data from Kafka to different outputs.

By running the connector pods as Kubernetes Deployment, we can easily scale the logging pipeline to match the growth of the Kubernetes cluster. The below is an example of Fluentd and Kafka Connect running in a Kubernetes cluster.

Fluentd and Kafka Connect running in Kubernetes

Confluent Kafka Tiered Storage with S3

If using Confluent Kafka, its tiered storage feature, especially together with FlashBlade S3, makes Kafka even more scalable, reliable and easy to operate in a Kubernetes environment. The basic idea is to offload cold data from Kafka brokers to remote object storages, so that Kafka only need to manage minimum local data (hot data) in the brokers. This makes Kafka broker pods very lightweight, therefore easier to scale and operate. Particularly, data re-balancing could be several folds faster. This also reduces storage cost because Kafka no need to maintain multiple replicas for data in S3.

Here is an exmaple of setting up FlashBlade S3 for Confluent Kafka tiered storage using the Confluent for Kubernetes operator:

Confluent Kafka tiered storage with FlashBlade S3 configuration

Once deployed, we can confirm FlashBlade S3 has been configured as tiered storage backend on Confluent UI.

Confluent Kafka tiered storage with FlashBlade S3

Kafka clients still access all the data through Kafka. If a request hits cold data, Kafka will download it from the remote object storage, cache it and serve to the client. FlashBlade S3 is a supported tiered storage target. Unlike Amazon S3 and other object storages, FlashBlade S3 is designed to be very fast, so even the data is stored remotely, serving cold data could be close to as fast as hot data.

Log Analytics with Elasticsearch, Spark and Trino

Large Kubernetes clusters could generate millions of log entries every hour or sooner. Analyzing those logs itself is a big data problem. Elasticsearch, Apache Spark and Trino are some of the most popular scalable tools for log analytics.

I use Elasticsearch for streaming log analytics, searching and dashboarding. Using the Elastic Cloud on Kubernetes, deploying Elasticsearch in Kubernetes can be as easy as a couple of commands.

Searching Kubernetes logs in Elasticsearch

Elasticsearch searchable snapshots with S3

Like Confluent Kafka, Elasticsearch also supports offload cold and frozen data to remote object storage. Originally, snapshots in S3 was only supported for backup purpose, with the latest 7.13 release, the snapshots becomes searchable!

Elasticsearch searchable snapshots with FlashBlade S3

By keeping minimum data locally, searchable snapshots makes it easier to operate and scale Elasticsearch, reduce storage cost, while increaing search speed for cold and frozen data with FlashBlade S3.

The most important config for searchable snapshots is to set the shared cache storage and size in Elasticsearch frozen node. In the below example, I have set 90GB out of the 100GB FlashArray backed persistent volumn for the cache.

We can then create a searchable snapshots and store it in FlashBlade S3 repository.

POST /_snapshot/reddot-s3-repo/demo/_mount?storage=shared_cache
{
"index": "logstash-2021.05.20",
"renamed_index": "logstash-2021.05.20-mounted"
}

Searching a searchable snapshot index is the same as searching any other index. If data is not available locally, Elasticsearch will download the index from S3, cache it locally and server from there for future requests.

As you can see in the below, I have created three indcies: the original, one fully restored from a regular snapshot, and one from searchable snapshot. The index from searchable snapshot uses zero byte on disk, indicating the data was served from the S3 repository.

GET /_cat/shards/logstash-2021.05.20*/?v&h=index,shard,prirep,state,docs,store,node
Index from searchable snapshot using zero byte on disk

ETL and SQL analytics

While Elasticsearch is very good at streaming log analytics, it is not for all the log analytics needs. For example, we use Apache Spark and Trino for ETL and SQL analytics. For that, we will need to store the raw logs in S3 first. As described above, we use Kafka Connect S3 sink to ship the logs to S3 in its raw json format. We also separate/partition the data by source and ingestion time for easy processing. The logs is stored in S3 like the below.

Kubernetes logs stored in FlashBlade S3

Once logs is stored in the S3 bucket, I can explore the data in JupyterLab notebook using Spark.

Log ETL with Spark in JupyterLab

I can also use Trino to run SQL queries against the json data directly.

CREATE TABLE IF NOT EXISTS k8s_logs(
ingest_ts varchar,
log varchar,
kubernetes ROW(
container_image_id varchar,
container_name varchar,
...
),
dt varchar,
hour varchar
)
WITH (
format='json',
partitioned_by = ARRAY['dt', 'hour'],
external_location='s3a://deephub/logs/k8s-logs'
);
SQL analytics with Trino

Becase all the tools, including JupyterLab, Spark and Trino, are running in the Kubernetes cluster, I can start these analytics immediately and scale it easily. My previous blogs here and here describe how to run Spark and Trino (former PrestoSQL) with Kubernetes and S3.

Summary

In this blog, I described how I scaled my logging solution to collect, store and process large amount of logs generated in the Kubernetes cluster. While it was built for Kubernetes logging, the solution is flexible and suitable for other big data use cases as well.

To make the solution scalable and reliable, the core tools we use need to be scalable and reliable. The cloud-native trend (public cloud or on-premise Kubernetes) has encouraged traditional big data system like Kafka and Elasticsearch to move away from local data replication towards remote object storage. Because of this, a fast object storage like FlashBlade S3 plays an important role in the solution.

Scaling the logging solution has bee much easier.

--

--

Software & solutions engineer, big data and machine learning, jogger, hiker, traveler, gamer.