
Anomaly Detection using Sigma Rules (Part 1): Leveraging Spark SQL Streaming

Sigma rules are used to detect anomalies in cyber security logs. We use Spark structured streaming to evaluate Sigma rules at scale.

Photo by Tom Carnegie on Unsplash, Supreme Court of Canada

The Rise of Data Sketching

Data sketching is an umbrella term for data structures and algorithms that use theoretical mathematics, statistics, and computer science to estimate set cardinalities, quantiles, and item frequencies, with mathematically proven error bounds.

Data sketches are orders of magnitude faster than traditional approaches, require fewer compute resources, and are sometimes the only viable solution to big data problems. To find out more about data sketches, check out the Apache DataSketches project:

Sketches implement algorithms that can extract information from a stream of data in a single pass, which is also known as "one-touch" processing.

Spark leverages data sketches in many places, for example dimensionality reduction, locality-sensitive hashing, and the count-min sketch.
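To give a flavor of what this looks like in practice, here is a small illustrative PySpark snippet (not from the article) using two sketch-backed estimators Spark exposes out of the box: approx_count_distinct (backed by HyperLogLog++) and approxQuantile.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# toy dataframe standing in for a large event stream
df = spark.range(0, 1_000_000).withColumn("src_ip", (F.col("id") % 50_000).cast("string"))

# set cardinality estimate backed by a HyperLogLog++ sketch
df.select(F.approx_count_distinct("src_ip").alias("distinct_ips")).show()

# quantile estimates computed in a single pass with a bounded relative error
print(df.stat.approxQuantile("id", [0.5, 0.95, 0.99], relativeError=0.01))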

In this series of articles we will walk you through the design of a highly performant, fraud-detection-style system. Using real examples, we evaluate and contrast the performance of a traditional algorithm against one based on data sketching.

What are Sigma Rules

Sigma is a generic signature format that allows you to make detections in log events. Rules are easy to write and applicable to any type of log. Best of all, Sigma rules are abstract and not tied to any particular SIEM, making Sigma rules shareable.

Once a cyber security researcher or analyst develops a detection method, they can use Sigma to describe and share their technique with others. Here’s a quote from Sigma HQ:

Sigma is for log files what Snort is for network traffic and YARA is for files.

Let’s look at an example from Sigma HQ.

The heart of the rule is the detection section. When the condition evaluates to true, we have a detection. A condition is composed of named expressions; for example, here a selection and a filter expression are declared. These expressions perform tests against attributes of the logs, in this case web logs.

Sigmac Generates SQL

The sigmac compiler is used to translate an abstract Sigma rule into a concrete form which can be evaluated by an actual SIEM or processing platform. Sigmac has many backends capable of translating a rule into QRadar, Elasticsearch, ArcSight, FortiSIEM and generic SQL.

Using the SQL sigmac backend we can translate the rule above into:

SELECT 
    * 
FROM
    (
    SELECT
    (
        (cs-uri-query LIKE '%cmd=read%'
        OR cs-uri-query LIKE '%connect&target%'
        OR cs-uri-query LIKE '%cmd=connect%'
        OR cs-uri-query LIKE '%cmd=disconnect%'
        OR cs-uri-query LIKE '%cmd=forward%')
        AND (cs-referer IS NULL
        AND cs-USER-agent IS NULL
        AND cs-METHOD LIKE 'POST')) 
 AS web_webshell_regeorg,
       *
   FROM
    test_webserver_logs
   )
WHERE 
     web_webshell_regeorg = TRUE

These SQL statements are typically invoked by a scheduler on a particular trigger interval, say every hour. Each hour, the detection searches through the newest events.
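As an illustration (not taken from the article), such a scheduled batch job might look roughly like this, assuming the generated SQL is saved to a file and the logs carry an event_time column:

# hypothetical hourly batch job: re-run the generated detection SQL and
# keep only hits from events ingested in the last hour
# (in practice the time filter would be pushed into the inner query)
with open("./generated_sql_statement.sql", "r") as f:
    detection_sql = f.read()

hourly_hits = (
    spark.sql(detection_sql)
    .where("event_time >= current_timestamp() - INTERVAL 1 HOUR")
)
hourly_hits.write.mode("append").saveAsTable("detection_hits")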

However, some Sigma rules apply temporal aggregations. For example, Enumeration via the Global Catalog counts occurrences of events within a window of time.

detection:
    selection:
        EventID: 5156
        DestPort:
        - 3268
        - 3269
    timeframe: 1h
    condition: selection | count() by SourceAddress > 2000

Using the batch model above, these types of queries reprocess the same events over and over, especially if the correlation window is large. Furthermore, if we try to reduce the detection latency by increasing the trigger rate, say to every 5 minutes, we introduce even more reprocessing of the same events.

Ideally, to avoid reprocessing the same events over and over, we would like the anomaly detection to remember the last event it processed and the values of its counters so far. That is exactly what the Spark Structured Streaming framework provides. A streaming query triggers a micro-batch every minute (configurable). It reads in the new events, updates all counters and persists them (for disaster recovery).

In this model, each event is evaluated only once. Increasing the trigger rate does not incur the same cost as in the stateless batch model. And because events are evaluated only once, complex detections such as regex matching do not incur ballooning costs.
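To make this concrete, here is a minimal sketch (not from the article) of what such a stateful, windowed count could look like in Spark Structured Streaming. It assumes a hypothetical streaming dataframe events_df of these Windows events with an event_time column:

from pyspark.sql import functions as F

# hypothetical stateful version of the "Enumeration via the Global Catalog" rule:
# count events per SourceAddress over 1-hour windows; the counters live in
# Spark's state store and are updated incrementally at every micro-batch
counts = (
    events_df
    .where("EventID = 5156 AND DestPort IN (3268, 3269)")
    .withWatermark("event_time", "1 hour")
    .groupBy(F.window("event_time", "1 hour"), "SourceAddress")
    .count()
    .where("count > 2000")
)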

Use Spark Streaming to Run Detections

Spark Structured Streaming can easily evaluate the SQL produced by the sigmac compiler. First we create a streaming dataframe by connecting to our favorite queuing mechanism (EventHubs, Kafka). In this example, we will readStream from an Iceberg table into which events are incrementally inserted. Find out more about Iceberg’s streaming capabilities here.

import time

# current time in milliseconds
ts = int(time.time() * 1000)
# create a streaming dataframe for an iceberg table
streamingDf = (
    spark.readStream
    .format("iceberg")
    .option("stream-from-timestamp", ts)
    .option("streaming-skip-delete-snapshots", True)
    .load("icebergcatalog.dev.events_table")
)

# alias the dataframe to a table named "events"
streamingDf.createOrReplaceTempView("events")

Notice that we aliased the streaming dataframe with the view name "events". We do this in order to refer to this streaming dataframe in our SQL statements, i.e.: select * from events. All we have to do now is configure the sigmac compiler to produce SQL statements against an events table. For example, a generated SQL file might look like this:

SELECT
    ((cs-uri-query LIKE '%cmd=read%'
    OR cs-uri-query LIKE '%connect&target%'
    OR cs-uri-query LIKE '%cmd=connect%'
    OR cs-uri-query LIKE '%cmd=disconnect%'
    OR cs-uri-query LIKE '%cmd=forward%')
    AND (cs-referer IS NULL
    AND cs-USER-agent IS NULL
    AND cs-METHOD LIKE 'POST')) 
    AS web_webshell_regeorg,
    -- another detection here
    cs-uri-query LIKE '%something%'
    AS detection2,
    *
FROM
    events

In our analytic we load this generated SQL and ask Spark to create a hitsDf dataframe from it.

# load auto-generated SQL statement
with open('./generated_sql_statement.sql', 'r') as f:
    detections_sql = f.read()

hitsDf = spark.sql(detections_sql)

We start the streaming query by invoking writeStream and we configure the query to trigger a micro-batch every minute. This streaming query will run indefinitely, writing detections to the sink of our choice. Here we simply write to the console sink, but we could write to another Iceberg table. Or we could use foreachBatch and execute some arbitrary Python code, for example to push notifications to a REST endpoint (a sketch of this follows below). Or we could even do both.

# start a streaming query printing results to the console
query = (
    hitsDf.writeStream
    .outputMode("append")
    .format("console")
    .trigger(processingTime="1 minute")
    .start()
)
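For example, a minimal foreachBatch sketch (with a hypothetical detections table and REST endpoint) could look like this:

import requests

def process_batch(batch_df, batch_id):
    # persist this micro-batch of hits to another (hypothetical) Iceberg table
    batch_df.writeTo("icebergcatalog.dev.detection_hits").append()
    # and push a notification for each hit to a (hypothetical) REST endpoint
    for row in batch_df.collect():
        requests.post("https://alerts.example.com/notify", json=row.asDict())

query = (
    hitsDf.writeStream
    .outputMode("append")
    .foreachBatch(process_batch)
    .trigger(processingTime="1 minute")
    .start()
)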

The Parent Process Challenge

So far we have seen how to detect anomalies in discrete events. However, Sigma rules can correlate an event with previous ones. A classic example of this is found in Windows Security Logs (Event ID 4688). In this log source we find information about a process being created. A crucial piece of information in this log is the parent process that started this process. These process IDs can be used to determine what a program did while it ran.

Let’s use this Sigma rule as an example: Rundll32 Execution Without DLL File.

detection:
    selection:
        Image|endswith: '\rundll32.exe'
    filter_empty:
        CommandLine: null
    filter:
        - CommandLine|contains: '.dll'
        - CommandLine: ''
    filter_iexplorer:
        ParentImage|endswith: ':\Program Files\Internet Explorer\iexplore.exe'
        CommandLine|contains: '.cpl'
    filter_msiexec_syswow64:
        ParentImage|endswith: ':\Windows\SysWOW64\msiexec.exe'
        ParentCommandLine|startswith: 'C:\Windows\syswow64\MsiExec.exe -Embedding'
    filter_msiexec_system32:
        ParentImage|endswith: ':\Windows\System32\msiexec.exe'
        ParentCommandLine|startswith: 'C:\Windows\system32\MsiExec.exe -Embedding'
    filter_splunk_ufw:
        ParentImage|endswith: ':\Windows\System32\cmd.exe'
        ParentCommandLine|contains: ' C:\Program Files\SplunkUniversalForwarder'
    filter_localserver_fp:
        CommandLine|contains: ' -localserver '
    condition: selection and not 1 of filter*

In the raw telemetry, an event only knows its parent process ID. Yet the rule refers to the ParentImage and the ParentCommandLine. The rule basically assumes a join has already been performed.

Luckily, Spark Structured Streaming supports stream-stream joins. In order to retrieve the ParentImage and ParentCommandLine, we perform a self-join of the process logs, joining the current side with the parents_of_interest side, with a join condition like this:

current.ParentProcessID = parent_of_interest.ProcessID

Left side: Set flags for every detection rule

We use a naming convention: c for the current process and r1 for rule 1.

So filter_empty in Rundll32 Execution Without DLL File (rule 1) is named cr1_filter_empty.


%%sparksql --view current --output skip
select
    *,
    ID,
    CommandLine,
    ImagePath,
    -- rule 1
    ImagePath ilike '%\rundll32.exe' as cr1_selection,
    Commandline is null as cr1_filter_empty,
    Commandline ilike '%.dll%' OR Commandline = '' as cr1_filter,
    Commandline ilike '% -localserver %' as cr1_filter_localserver_fp
from
    events

Right side: Filter messages on the parents_of_interest

We do the same with conditions that apply to the parent process. However, in this case we also filter the table: parent processes that are filtered out necessarily have all their flags set to FALSE. By filtering, we greatly reduce the number of parents_of_interest keys to cache when performing a streaming join.

%%sparksql --output skip --view parents_of_interest

select
    *
from (
    select
        host_id as parent_host_id,
        ID as parent_id,
        ImagePath as parent_imagepath,
        CommandLine as parent_commandline,

        -- rule 1
        (ImagePath ilike '%:\Program Files\Internet Explorer\iexplore.exe'
            AND CommandLine ilike '%.cpl%')
        as pr1_filter_iexplorer,
        (ImagePath ilike '%:\Windows\SysWOW64\msiexec.exe'
            AND CommandLine ilike 'C:\Windows\syswow64\MsiExec.exe -Embedding%')
        as pr1_filter_msiexec_syswow64,
        (ImagePath ilike '%:\Windows\System32\msiexec.exe' AND
            CommandLine ilike 'C:\Windows\system32\MsiExec.exe -Embedding%')
        as pr1_filter_msiexec_system32
    from
        events
)
where
    pr1_filter_iexplorer
    OR pr1_filter_msiexec_syswow64
    OR pr1_filter_msiexec_system32

Join current with its parent

We are doing a left join with the parent side. Since the parent side is filtered, it is possible we will not find the corresponding parent process ID. When a parent is not found, the parent-side flag columns will be NULL. We use coalesce to assign these parent-side flags a value of FALSE. pr3_selection_atexec is a parent flag, so we apply coalesce like this:

coalesce(pr3_selection_atexec, FALSE)

We also combine conditions that come from the current and parent sides. For example, the selection_atexec condition is composed of a parent and a child condition.

selection_atexec:
        ParentCommandLine|contains:
            - 'svchost.exe -k netsvcs' 
            - 'taskeng.exe'
        CommandLine|contains|all:
            - 'cmd.exe'
            - '/C'
            - 'Windows\Temp\'
            - '&1'

We thus combine them like this:

cr3_selection_atexec AND coalesce(pr3_selection_atexec, FALSE) 
as r3_selection_atexec,

r3_selection_atexec is the final flag for selection_atexec in rule 3.

%%sparksql --view joined --output skip

select
    --rule1
    cr1_selection as r1_selection,
    cr1_filter_empty as r1_filter_empty,
    cr1_filter as r1_filter,
    cr1_filter_localserver_fp as r1_filter_localserver_fp,
    coalesce(pr1_filter_iexplorer, FALSE) as r1_filter_iexplorer,
    coalesce(pr1_filter_msiexec_syswow64, FALSE) as r1_filter_msiexec_syswow64,
    coalesce(pr1_filter_msiexec_system32, FALSE) as r1_filter_msiexec_system32,
    parent_host_id,
    parent_id,
    parent_imagepath,
    parent_commandline
from
    current as c
    left join parents_of_interest as p
    on c.ParentProcessID = p.parent_id

Finally we apply the Sigma rule condition

For example rule1’s condition is:

condition: selection and not 1 of filter*

We simply apply this condition and name the result rule1.

r1_selection AND NOT (r1_filter_empty 
                              OR r1_filter
                              OR r1_filter_localserver_fp
                              OR r1_filter_iexplorer
                              OR r1_filter_msiexec_syswow64
                              OR r1_filter_msiexec_system32)
        as rule1,

Here’s the condition in the complete statement.

%%sparksql --output json
select
    *
from (
    select
        *,

        -- rule 1 -> condition: selection and not 1 of filter*
        r1_selection AND NOT (r1_filter_empty 
                              OR r1_filter
                              OR r1_filter_localserver_fp
                              OR r1_filter_iexplorer
                              OR r1_filter_msiexec_syswow64
                              OR r1_filter_msiexec_system32)
        as rule1
    from
        joined
    )
where
    rule1 = TRUE

Note that the sigmac compiler does not produce this type of SQL. However, we plan on writing a custom Sigma compiler to produce the above SQL statements.

Running these SQL statements is no different than running our original example.
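Concretely, and assuming hypothetical variables (current_sql, parents_sql, joined_sql, final_conditions_sql) holding the four statements shown above, the pipeline could be wired up like this:

# register each intermediate result as a temp view so the next statement can use it
spark.sql(current_sql).createOrReplaceTempView("current")
spark.sql(parents_sql).createOrReplaceTempView("parents_of_interest")
spark.sql(joined_sql).createOrReplaceTempView("joined")

# the final statement applies the rule conditions and keeps only the hits
hitsDf = spark.sql(final_conditions_sql)

query = (
    hitsDf.writeStream
    .outputMode("append")
    .format("console")
    .trigger(processingTime="1 minute")
    .start()
)

Note that the left join between two streams also needs bounded state, which brings us to state management.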

Spark Structured Streaming can keep and persist state across micro-batches. In the documentation, Spark refers to this as Windowed Grouped Aggregation. The same principle applies to stream-stream joins. You configure Spark to cache aggregations, or in this case the rows of parents_of_interest, inside a window.
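As a sketch of how this window could be configured (assuming both views carry an event timestamp column named timestamp, which the parent side would have to select through), the join could be expressed with watermarks and a time-range condition:

from pyspark.sql.functions import expr

# bound the state kept for the stream-stream join:
# parents_of_interest rows are retained for at most 1 hour
current_df = spark.table("current").withWatermark("timestamp", "1 hour")
parents_df = spark.table("parents_of_interest").withWatermark("timestamp", "1 hour")

joined_df = current_df.alias("c").join(
    parents_df.alias("p"),
    expr("""
        c.ParentProcessID = p.parent_id
        AND c.timestamp BETWEEN p.timestamp AND p.timestamp + INTERVAL 1 HOUR
    """),
    "leftOuter",
)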

However, how well does this scale? How many rows of parents_of_interest can we keep in Spark’s state store window?

In our next article we will answer these questions. In order not to miss it, follow us and subscribe to get these stories via email. Stay tuned!

All images unless otherwise noted are by the author
