Anomaly Detection using Sigma Rules (Part 3) Temporal Correlation Using Bloom Filters

Can a custom tailor made stateful mapping function based on bloom filters outperform the generic Spark stream-stream join?

Jean-Claude Cote
Towards Data Science

--

Photo by Kalpaj on Unsplash, Peggys Cove, NS, Canada

This is the 3rd article of our series. Refer to part 1 and part 2 for some context.

Spark’s flatMapGroupsWithState function allows users to apply custom code on grouped data and provides support to persist user defined states.

In this article, we will implement a stateful function that retrieves the tags (features) of a parent process. The crux of the the solution is to create a composite key made of the process ID (e_key in the diagram below) and the tag (feature) name we want to remember. For example, on row 1, we make a bloom key 300.pr1_filter_msiexec_syswow64. We store this key in the bloom filter. Note that because of the nature of bloom filters, the key is not actually stored in the bloom. Instead, the key is hashed and this hash is used to turn on some bits in the bloom’s bit array. Thus, if the tag is false, we don’t store it in the bloom. Only flags that are true are stored.

In the second row, we show how we create a retrieval bloom key using the parent’s process ID (pe_key rather than e_key). Thus, the lookup key is 300.pr1_filter_msiexec_syswow64 . If querying the bloom for this key returns false, we know for sure it was never stored in the bloom. However, if the query returns true, we know the key was stored in the bloom (with some chance of false positive).

We use the result returned by the bloom to update the current row's pr1_filter_msiexec_syswow64. This effectively performs the equivalent of the join in our previous article. Using the tags from the current row and the flags retrieved for the current row’s parent process, we can finally evaluate the complete Sigma rule condition.

Test Harness

In order to evaluate the performance of this approach, we built a rudimentary flatMapGroupsWithState prototype in Scala, and we used the Guava Bloom filter library.

To estimate the size of bloom filters, we use this handy online calculator. The larger the capacity (number of tags it can store), the larger the bloom filter needs to be. For example, a bloom with a capacity of 200,000 results has a size of about 234KiB, with a false positive rate of 1%.

We used one bloom per host. Continuing with our simulation of 50,000 hosts, this gives us about 12GiB of state.

We initially performed all our experiments using the RocksDB state store since it performed so well when we were doing stream-stream joins. However, as often happens, we made a serendipitous discovery. The older HDFS state store actually performed better than RocksDB in this use case. Here’s a brief summary of the results:

Execution time using RocksDB keeps increasing while the HDFS is stable at about 200 seconds per micro-batch.

The reason why RocksDB’s performance is less then stellar is due to it’s reorganization of local state files. At every trigger, we load and modify every bloom which are 200KiB each. RocksDB then needs to compact and reorganize it’s SST files. RocksDB seems more suited for large amount of smaller values. HDFS, on the other hand, seems better at handling bigger values but not so good when there are a very large amount of them.

Comparing Results With Stream-Stream Join

Let’s now compare these results with the results from the stream-stream join. In the stream-stream join, we used a rate of 2,500 events per seconds compared to a rate of 5,000 with bloom.

Also, in the stream-stream join, we could keep an average 3,200 “parents of interests”. That is the tags associated with 3,200 events. If we suppose that each event only has one true tag this yields 3,200 tags per host.

Using a bloom, we can keep about 100,000 tags per host. This is about 30x more!

Using a custom flatMapGroupsWithState, we read the input stream (for example from Kafka or from an Iceberg table) only once. When using a join, we have to read the stream twice, once per side of the join.

A Custom flatMapGroupsWithState is More Flexible

Furthermore, the bloom approach lends itself to more interesting use cases. For example, say you want to remember the tags of not only your parent, but also of your parent’s parent. This would be rather difficult to do using stream-stream join. How many times do you join?

With a simple tweak, we can support propagating ancestor tags. In the illustration above, at row 2, we pulled bloom key 300.pr1_filter_msiexec_syswow64 and the value in the column pr1_filter_msiexec_syswow64. To support ancestors, we put that tag back into the bloom, but using our e_key value from row 2. Thus, we store the bloom key 600.pr1_filter_msiexec_syswow64 .

Now, suppose we get a child process 900 (a grand-child of process 300). As before, we pull using our parent key pe_key (600), forming the bloom key 600.pr1_filter_msiexec_syswow64 and thus effectively pulling our grand-parent’s tag.

Beyond Parent-Child Relationships

So far we have concentrated on parent-child relationships. However, by storing and retrieving tags, we can also support temporal proximity correlation. Temporal proximity correlation is an new feature of the upcoming Sigma specification. The example given in the specification is as follows:

action: correlation
type: temporal
rules:
- recon_cmd_a
- recon_cmd_b
- recon_cmd_c
group-by:
- ComputerName
- User
timespan: 5m
ordered: false

This rule states that 3 events (recon_cmd_a/b/c) must occur inside a window of time. These events don’t have to be in any specific order.

As we did before, we can represent these events as tags and use a bloom per host (per ComputerName). Rather then using the columns e_key/pe_key (parent/chil), we use the column User. The column User gives context to the composite keys that we store and retrieve from the bloom filter.

On the first row, we see recon_cmd_b is true so we store it in the bloom using User as the context, thus we store it under Alice.recon_cmd_b.

On the second row, we see recon_cmd_a is true so we store it under Alice.recon_cmd_a. At every row we always attempt to pull keys: Alice.recon_cmd_a, Alice.recon_cmd_b and Alice.recond_cmd_c. Thus recom_cmd_b is updated to true.

For the third row, we push Alice.recond_cmd_c and pull the other two.

The Sigma specification supports an ordered version of this correlation. Supporting ordered temporal proximity is also possible. To support ordered events, we conditionally pull the tags instead of always pulling.

If we suppose the rule above was ordered, we would change our procedure from always storing and pulling all keys to conditionally storing recon_cmd_b only when recon_cmd_a has already been seen and never storing recon_cmd_c .

In our next article, we will improve on our proof of concept. We will make it more flexible and able to support the additional use cases described above.

All images unless otherwise noted are by the author

--

--

Data democratization advocate and principal engineer at Canadian Centre for Cyber Security | jean-claude.cote@cyber.gc.ca | Twitter @cybercentre_ca