Real Time Anomaly Detection with AWS

Seleme Topuz
Towards Data Science
7 min read · May 27, 2019


The more recent data is, the more actionable, and therefore valuable, it is. That is the underlying logic of real-time data analytics.

There might be users performing suspicious activity or abusing your discounts; or you may simply want to observe client behavior and react in the act. To achieve this goal, I will go through how to create a real-time data analytics app using AWS tools.

Picture taken from AWS Docs

Imagine the last thousand data points you want to observe in real time are drawn on a 2D graph like the one above. Our app will be able to capture the red points, which indicate outliers, i.e., anomalies. On top of that, we will also build an alerting system that is triggered whenever a red point is present.

Pipeline

Input

The input of our app can actually be any streamed data. However, it needs to be actionable data for our anomaly detection app to make sense. Backend method logs, click/touch events from mobile applications, or database change events will do fine, since by nature they let us observe client actions.

Stream

How do we ingest and stream the data we have into our app? AWS Kinesis is a very handy tool: easy to use, easy to configure, and self-managed. Kinesis actually consists of three different services; for streaming, we will be using Kinesis Data Streams.

In order to create a Kinesis data stream, you need to give it a name and select a shard count for it. Shards are the processing units of Kinesis streams; as of the time of writing, a shard can ingest up to 1 MB/sec or 1,000 records/sec and emit up to 2 MB/sec. For most basic applications, one shard is enough to process your input data.
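You can also create the stream programmatically instead of through the console. Below is a minimal sketch using boto3; the stream name and region are placeholders:

import boto3

# Region and credentials come from your AWS configuration.
kinesis = boto3.client("kinesis", region_name="us-east-1")

# One shard is enough for most basic applications: up to 1 MB/sec
# (or 1,000 records/sec) in, and up to 2 MB/sec out.
kinesis.create_stream(StreamName="anomaly-detection-input", ShardCount=1)

# create_stream is asynchronous; block until the stream becomes ACTIVE.
kinesis.get_waiter("stream_exists").wait(StreamName="anomaly-detection-input")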

Ingestion

Now that we have created the stream, we can ingest the live data. There are official client libraries for AWS, for example:

  • boto for Python 2
  • boto3 for Python 3
  • aws-sdk for Node.js

We only need the name of the stream we want to put records into, and permission to write to that stream.
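For example, here is a minimal ingestion sketch with boto3; the stream name is a placeholder, and field1/field2 are the hypothetical fields used in the SQL examples later:

import json
import boto3

kinesis = boto3.client("kinesis", region_name="us-east-1")

def put_data_point(field1, field2):
    # Requires kinesis:PutRecord permission on the stream.
    kinesis.put_record(
        StreamName="anomaly-detection-input",
        Data=json.dumps({"field1": field1, "field2": field2}),
        # Records with the same partition key land on the same shard.
        PartitionKey=str(field1),
    )

put_data_point(3.14, 42)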

Data Analytics App

To process the data and capture anomalies in our live data, we will use another Kinesis service called Kinesis Data Analytics. This is a very flexible tool: it lets us preprocess the data before the app consumes it, process it with an enriched SQL engine, and postprocess what the app captures.

  • The preprocessor is an AWS Lambda function invoked on the records coming in from the Kinesis data stream; it may enrich the data or clean it. If we want to use some other data in the app, we can enrich our input with that data. We might also want to clean the input by casting types or removing unused fields. (A minimal sketch follows this list.)
  • Anomaly Detection is the name of our Kinesis Data Analytics app. It is written in SQL enriched with special analytics functions. I will go further into this in the next part.
  • The postprocessor is an AWS Lambda function triggered by Kinesis Data Analytics for each of its results. You can do whatever you want with the result: attach it to an alerting system, call an endpoint, or train a model with that data. The flexibility is there.
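Here is the promised preprocessor sketch in Python. It assumes the documented preprocessing contract: records arrive base64-encoded in event["records"], and each must be returned with its recordId and a result of Ok, Dropped, or ProcessingFailed. The field names are placeholders:

import base64
import json

def handler(event, context):
    # Clean each record before the analytics app consumes it.
    output = []
    for record in event["records"]:
        payload = json.loads(base64.b64decode(record["data"]))
        # Type casting and dropping unused fields.
        cleaned = {
            "field1": float(payload.get("field1", 0)),
            "field2": int(payload.get("field2", 0)),
        }
        output.append({
            "recordId": record["recordId"],
            "result": "Ok",  # or "Dropped" / "ProcessingFailed"
            "data": base64.b64encode(json.dumps(cleaned).encode()).decode(),
        })
    return {"records": output}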

Alert

AWS offers a service called Simple Notification Service (SNS) that you can use to create alerting systems. In our case, we send the results from the anomaly detection app to an SNS topic. Every user subscribed to that topic is then notified through a channel of our selection, let's say, email.
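Here is a minimal postprocessor sketch, assuming the Lambda-as-destination contract (base64-encoded results in event["records"], each acknowledged by its recordId) and an existing SNS topic; the topic ARN and the threshold are placeholders:

import base64
import json
import boto3

sns = boto3.client("sns")
TOPIC_ARN = "arn:aws:sns:us-east-1:123456789012:anomaly-alerts"  # placeholder
THRESHOLD = 2.0  # hypothetical anomaly-score cutoff; tune for your data

def handler(event, context):
    output = []
    for record in event["records"]:
        result = json.loads(base64.b64decode(record["data"]))
        # Alert only on sufficiently anomalous points.
        if result["ANOMALY_SCORE"] > THRESHOLD:
            sns.publish(
                TopicArn=TOPIC_ARN,
                Subject="Anomaly detected",
                Message=json.dumps(result),
            )
        # Acknowledge the record so it is not treated as a delivery failure.
        output.append({"recordId": record["recordId"], "result": "Ok"})
    return {"records": output}

Subscribing an email address to the topic, e.g. sns.subscribe(TopicArn=TOPIC_ARN, Protocol="email", Endpoint="you@example.com"), completes the alerting loop.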

As a result, our pipeline looks like the one below. You could detach the Lambda functions or SNS and replace them with other services you want. This approach offers flexibility while keeping self-management and durability, thanks to AWS tools.

Data Analytics App

Creating a Data Analytics app in Kinesis is fairly easy:

  • We select the app engine, either SQL or Apache Flink. We will use SQL for our case.
  • We select a data source, either a Kinesis stream or a Kinesis Firehose delivery stream. We will use a Kinesis stream for our case.
  • A Lambda function can be attached to preprocess the data beforehand.
  • Since we are building our app with SQL, our data needs to be strictly typed, meaning that a schema is needed. Kinesis Data Analytics has a feature to infer the schema automatically; you can then edit the inferred schema to fit your needs.
(Image: an example of schema-enforced input for a data analytics app)
  • We can connect another data source, called reference data, to use in the application. We will skip it for now.
  • We can select a destination for the results: a Kinesis stream, a Kinesis Firehose delivery stream, or a Lambda function. In our case, we select a postprocessor Lambda function that sends the data to SNS to create alerts.
  • As for the code of the app, one needs to read the AWS Kinesis Data Analytics SQL reference documentation. Its SQL dialect is enriched with analytics-oriented constructs such as streams, pumps, and windows. I will briefly cover them in the next sections.

Overall Logic

  • There are in-application streams, which are like tables whose rows have a TTL (time-to-live).
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" 
(
"field1" DOUBLE,
"field2" INTEGER,
"ANOMALY_SCORE" DOUBLE
);
  • And there are pumps, which insert real-time data into streams.
  • You can access your input stream under the name SOURCE_SQL_STREAM_001.
  • The output of the code should be a stream. You need to pump the results into that stream.
CREATE OR REPLACE PUMP "STREAM_PUMP"
AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "field1", "field2", "ANOMALY_SCORE"
FROM TABLE (RANDOM_CUT_FOREST(
    CURSOR(
        SELECT STREAM "field1", "field2"
        FROM "SOURCE_SQL_STREAM_001"
        WHERE "field1" > 0 AND "field2" > 0
    )
));

The code pieces above together constitute a complete real-time data analytics application. We read field1 and field2 from the source stream and feed them into the built-in RANDOM_CUT_FOREST SQL function, which uses machine learning in the background; windows, which I will talk about after this, let us shape such reads further. The model is trained incrementally, and once it has seen enough data points to train on, it can say whether a data point is an anomaly or not. The results of the random cut forest function are pumped into the destination stream.

Windows

Windows let us perform aggregation functions on our data. For example, to run COUNT on real-time data we need boundaries on the data; a count over an unbounded stream would never be final.

There are different techniques for windows, but I will only go through the most used ones: tumbling and sliding windows.

  • Tumbling windows separate the data into non-overlapping batches. Boundaries for these batches are usually selected on a time basis and more rarely on a record-count basis. The reason we may want to use tumbling windows is that no data point is used twice.
GROUP BY "field1",
STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND);
  • Sliding windows are more useful for time-based aggregations. A fixed-size window moves forward as new records arrive. You may want to use one to spot a peak.
WINDOW W1 AS (
    PARTITION BY "field1"
    RANGE INTERVAL '1' MINUTE PRECEDING);

You can also build custom windows or use staggering windows where a window is created whenever a new data point arrives.

Advanced Usage

The Kinesis Data Analytics SQL engine gives you the flexibility to build more complex logic for harder situations.

  • What if we only want to send the records with anomaly scores above a specific threshold? Or what if that score needs to be aggregated with some other reference data? To solve these, a second in-application stream can be defined. The results of the anomaly detection algorithm are pumped into this new temporary stream instead of the destination stream; a window over the temporary stream then performs the new aggregations, and their outcome is pumped into the destination stream.

As you can see, an application can contain multiple streams with different window types.

Conclusion

I have gone through AWS's flexible structure, its powerful SQL engine with rich streaming constructs, and its self-managed services. We created an example real-time data analytics application that detects anomalies in the data, used AWS Lambda to pre- and postprocess the data around the analytics app, and used AWS SNS to create an alerting system that notifies us whenever an anomaly is present, in real time.

For the interested

The Zynga tech team presents their usage of Kinesis Data Analytics and how they used it to solve complex situations.
