Event Threat Enrichment using Logstash and Minemeld

At work we use the Elastic Stack for quite a few things, and one of its more recent “official” roles is as our SIEM. Elastic introduced SIEM-specific functionality to Kibana a few releases ago, around 7.2 if I remember correctly.

One feature the Elastic Stack doesn’t really support well (yet) is event enrichment. Elastic did introduce an Elasticsearch-side enrichment system in 7.5, but in my opinion there are a few problems with it:

  • It runs inside Elasticsearch using ES ingest pipelines, which are harder to design and operate than something on the Logstash side, as well as not being quite as flexible as I’d like.
  • As it runs on ES ingest nodes, it has a license cost impact.
  • The processors available have some limitations, such as no support for range queries currently, which is important in the use case I’ll be writing about here.

One of the requirements I have for our SIEM is to be able to identify events coming from “known risky IPs”. Those risky IPs could be known botnets, spam sources, Tor exit nodes, or anything else that you or your threat intelligence providers/feeds classify as a risk.

After much research I settled on the approach I’ll detail below.

First I needed a way of collecting threat feeds in a form the SIEM could use. For this I settled on Minemeld, a product by Palo Alto Networks, which they describe as “an open-source application that streamlines the aggregation, enforcement and sharing of threat intelligence”.

There are quite a few other options for this, but Minemeld seemed ideal for me because:

  • It has a pretty focused design, where some other options do far, far more than just threat feed aggregation.
  • It’s pretty simple to set up and use.
  • It has support for delivering threat feed data to Logstash.

Minemeld

Note: This post only deals with IoCs of “IP type”. Minemeld can consume, aggregate, and distribute IoCs of other types, such as URLs, domains, etc., but they are not covered in this article.

How to install, run and configure feeds/aggregators/etc in Minemeld is an exercise left to the reader.

To enable Logstash output, you can use the built-in prototype stdlib.localLogStash if your Logstash instance is running on the same system as Minemeld and is reachable over localhost:

/images/234a165e8b86498c8956617b0d01e6ca.png

However, in my case the Logstash instances aren’t running on the same host, and in fact I have multiple target instances.

In a case like this, you need to create a new prototype by pressing the “New” button highlighted in the above screenshot.

Set a name for your new “local prototype”, and then, the important part, set the logstash_host: config field:

/images/465f6543348749ac96240d6db28031b0.png

Hit save, and that’s it. You now have a way of outputting IoCs from various IP threat feeds to Logstash. I’ve done this twice, with a different logstash_host: set for each new local prototype, so I’m duplicating my IoCs to two different Logstash and Elasticsearch clusters.

Logstash - Part 1

Important Note: You will need to define an index template/mapping that ensures the start and end IP address fields created by the dissect filter below are actually of IP datatype. IP Range queries used in Part 2 will not work without this.
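As a rough sketch of what such a template could contain, the Python below builds one and prints it as JSON, ready to be saved to the template file. The field paths come from the dissect filter later in this section; the "minemeld*" index pattern and the legacy (ES 7.x style) template layout are my assumptions, so adjust them for your Elasticsearch version.

```python
import json

def build_template():
    """Build an index template that maps the start/end fields as ip."""
    return {
        # Assumption: pattern chosen to match the "minemeld" index used in this post.
        "index_patterns": ["minemeld*"],
        "mappings": {
            "properties": {
                "minemeld": {
                    "properties": {
                        "indicator": {
                            "properties": {
                                "ip": {
                                    "properties": {
                                        # Both ends of the range must be ip typed
                                        # for the range queries in Part 2.
                                        "start": {"type": "ip"},
                                        "end": {"type": "ip"},
                                    }
                                }
                            }
                        }
                    }
                }
            }
        },
    }

# Print the JSON so it can be redirected into the template file.
print(json.dumps(build_template(), indent=2))
```

Everything else in the Minemeld document is left to dynamic mapping in this sketch.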

On the Logstash side, the configs for reading and storing the Minemeld-provided IoCs are reasonably simple.

Part 2 will look at how I do event enrichment.

First, you need to listen on the TCP input port configured in your Minemeld local prototypes, in the case above:

input {
  tcp {
    port => 5514
  }
}

You then need some filters to parse the message field, create a predictable document ID, and split the @indicator into a start and end IP address, which will be used for ES range queries in Part 2.

filter {
  # Parse the message field as JSON into the target minemeld field
  json {
    source => "message"
    target => "minemeld"
  }
  
  # Generate a fingerprint on the @indicator field, this will be used as the Document ID in the Elasticsearch outputs.
  fingerprint {
    source => "[minemeld][@indicator]"
    target => "[@metadata][fingerprint]"
    method => "MURMUR3"
  }

  # Split the IP indicator field on the -, so we have a start and end IP address
  dissect {
    mapping => { "[minemeld][@indicator]" => "%{[minemeld][indicator][ip][start]}-%{[minemeld][indicator][ip][end]}" }
  }

}
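To make the effect of these filters concrete, here is a small Python sketch that mimics them on one line of input. The sample message is hypothetical: only the @indicator, message and sources keys are taken from this post, and the values are made up. SHA-256 stands in for the fingerprint filter because MurmurHash3 is not in the Python standard library; any stable hash of the indicator serves the same purpose.

```python
import hashlib
import json

# Hypothetical sample line as it might arrive on the TCP input.
SAMPLE = json.dumps({
    "@indicator": "198.51.100.0-198.51.100.255",
    "message": "update",
    "sources": ["example.feed"],
})

def parse_indicator(raw_message):
    """Mimic the json, fingerprint and dissect steps for one line."""
    minemeld = json.loads(raw_message)
    # Like the dissect mapping: everything before the first "-" is the
    # start address, everything after it is the end address.
    start, sep, end = minemeld["@indicator"].partition("-")
    if not sep:
        # This sketch raises; Logstash would tag the event instead.
        raise ValueError("indicator is not a start-end range")
    minemeld["indicator"] = {"ip": {"start": start, "end": end}}
    # Stand-in for the fingerprint filter: a predictable document ID.
    doc_id = hashlib.sha256(minemeld["@indicator"].encode("utf-8")).hexdigest()
    return doc_id, {"minemeld": minemeld}

doc_id, doc = parse_indicator(SAMPLE)
print(doc_id[:12], doc["minemeld"]["indicator"]["ip"])
```

The same indicator always produces the same ID, which is what lets a later event for that indicator find the document stored by an earlier one.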

So now you just need to store that document in an ES index. The slightly different part, compared to storing log data in ES for example, is that IoCs can and will “expire”, and therefore need to be removed from the index when that happens.

Minemeld caters for this by providing a “message”, and if the value of that message is “withdraw”, the IoC can be removed. As can be seen in the Logstash output configs below, when we receive an event with [minemeld][message] == "withdraw" we issue a delete against the ES index, as indicated by action => "delete". This uses the predictable document ID we create in the earlier filter, so we know which document to delete.

Also note the index template specified in the configs, and refer to the note at the start of this section.

output {
    if [minemeld][message] == "withdraw" {
      elasticsearch {
        hosts => [
          "http://<es>"
        ]
        index => "minemeld"
        manage_template => true
        template_name => "minemeld"
        template => "/etc/logstash/minemeld.indextemplate"
        document_id => "%{[@metadata][fingerprint]}"
        action => "delete"
      }
    } else {
      elasticsearch {
        hosts => [
          "http://<es>"
        ]
        index => "minemeld"
        manage_template => true
        template_name => "minemeld"
        template => "/etc/logstash/minemeld.indextemplate"
        document_id => "%{[@metadata][fingerprint]}"
      }
    }
}
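The update/withdraw lifecycle handled by these two outputs can be sketched in a few lines of Python, with a plain dict standing in for the ES index. The function name and the sample values are hypothetical; the "withdraw" message value and the use of a predictable document ID are from the configs above.

```python
def apply_event(index, doc_id, minemeld):
    """Apply one Minemeld event to a dict standing in for the ES index."""
    if minemeld.get("message") == "withdraw":
        # Equivalent of action => "delete": drop the document if present.
        index.pop(doc_id, None)
    else:
        # Equivalent of the default index action: create or overwrite.
        index[doc_id] = minemeld
    return index

index = {}
apply_event(index, "id-1", {"message": "update", "sources": ["feed-a"]})
# A repeated update for the same indicator overwrites, it does not duplicate.
apply_event(index, "id-1", {"message": "update", "sources": ["feed-a", "feed-b"]})
print(len(index))
apply_event(index, "id-1", {"message": "withdraw"})
print(len(index))
```

Without a predictable ID, every update would create a new document and a withdraw would have nothing to target.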

So at this point you should have some Minemeld provided IoCs stored in an Elasticsearch index, which can be used to enrich other events in real time.

Logstash - Part 2

So now onto how to enrich events. In my case, certain events come with a source.ip address, and I then do an ES lookup using the Logstash elasticsearch filter.

filter {
  elasticsearch {
    hosts => [
      "http://<es>/"
    ]
    index => "minemeld"
    enable_sort => "false"
    tag_on_failure => [ "_threat_lookup_failure" ]
    add_tag => ["_threat_found"]
    query_template => "/etc/logstash/conf.d/threat_query.json"
    fields => {
      "[minemeld][sources]" => "[custom][threat][sources]"
    }
  }
}

The query, specified in query_template => "/etc/logstash/conf.d/threat_query.json" is as follows:

{
  "query": {
    "bool": {
      "filter": [
        { "range": { "minemeld.indicator.ip.start": { "lte": "%{[source][ip]}" }}},
        { "range": { "minemeld.indicator.ip.end": { "gte": "%{[source][ip]}" }}}
      ]
    }
  }
}

Note: This is where you need to ensure you’ve created these fields as IP datatypes, as mentioned previously.

All this does is execute the above query against the Minemeld ES index and look for a document where minemeld.indicator.ip.start is less than or equal to, and minemeld.indicator.ip.end is greater than or equal to, the source IP of the event that we’re attempting to enrich.

Basically: “if the source IP is between the start and end range provided by Minemeld”.
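The same condition, written out as a minimal Python sketch using the standard ipaddress module. The function name and the addresses are made up for illustration; this is only meant to show the comparison the two range clauses perform, not how Elasticsearch evaluates it internally.

```python
from ipaddress import ip_address

def in_range(source_ip, start, end):
    """True if start <= source_ip <= end, compared as IP addresses."""
    ip = ip_address(source_ip)
    # Note: comparing an IPv4 address with an IPv6 one raises TypeError,
    # so this sketch assumes all three values are the same IP version.
    return ip_address(start) <= ip <= ip_address(end)

print(in_range("198.51.100.7", "198.51.100.0", "198.51.100.255"))
print(in_range("203.0.113.1", "198.51.100.0", "198.51.100.255"))
```

Comparing as addresses rather than as strings is the reason the fields need the IP datatype: as plain strings, "198.51.100.7" would sort after "198.51.100.255".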

The actual enrichment is performed by the fields parameter of the elasticsearch filter. In the example above, it sets the field custom.threat.sources to the value of minemeld.sources from the matching document in the Minemeld index, which is a list of source “names” provided by Minemeld.

Conclusion

That’s it. We now have an indication if a source IP in an event is considered a threat by various threat feeds, which is obviously very useful for informing any decisions around responses.

Some closing thoughts.

Performance. This approach is almost certainly not scalable to extremely high throughput, due to the overhead of network connections between Logstash and Elasticsearch and the cost of querying Elasticsearch for every event. It would be significantly better to take advantage of Logstash’s translate filter or its memcached filter.

However, both the translate (dictionary) and memcached filters suffer from the same limitation: there’s no (easy?) way of doing range-type queries.

As I mentioned earlier, Elasticsearch’s new enrichment pipeline feature also doesn’t support range queries at this time.

Since the Minemeld threat feeds provide IP ranges, the only options are:

  • Use a queryable source that supports range queries on IP addresses (or on their integer representation).
  • Expand the Minemeld-provided IP ranges into lists of singular IPs.
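To illustrate the first option, here is a minimal in-memory sketch in Python: ranges are stored by their integer start value and looked up with a binary search. This is not something I run in production. The RangeTable class is hypothetical, and it assumes IPv4-only, non-overlapping ranges.

```python
from bisect import bisect_right
from ipaddress import ip_address

class RangeTable:
    """Sorted table of (start, end, sources) rows keyed by integer IPs."""

    def __init__(self, ranges):
        # ranges: iterable of (start_ip, end_ip, sources) tuples.
        # Assumption: IPv4 only, and ranges do not overlap.
        self._rows = sorted(
            ((int(ip_address(s)), int(ip_address(e)), src) for s, e, src in ranges),
            key=lambda row: row[0],
        )
        self._starts = [row[0] for row in self._rows]

    def lookup(self, ip):
        """Return the sources of the range containing ip, or None."""
        value = int(ip_address(ip))
        # Find the last range whose start is <= value.
        pos = bisect_right(self._starts, value) - 1
        if pos < 0:
            return None
        _start, end, sources = self._rows[pos]
        return sources if value <= end else None

table = RangeTable([
    ("198.51.100.0", "198.51.100.255", ["feed-a"]),
    ("203.0.113.10", "203.0.113.20", ["feed-b"]),
])
print(table.lookup("203.0.113.15"))
print(table.lookup("192.0.2.1"))
```

Each lookup is a binary search over the range starts, so memory grows with the number of ranges rather than the number of individual addresses.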

I actually experimented with option two, expanding the Minemeld ranges into complete lists of single /32 IPs. Depending on the number of threat feeds being consumed and their size, you can end up with millions and millions of IP addresses in your new list, which may not be usable in Logstash dictionaries or memcached. YMMV.
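A quick way to see how fast the expanded list grows is to count the addresses in each range before expanding anything. The sketch below uses two made-up ranges, not real feed data; the second is /8 sized to show how much a single large entry contributes.

```python
from ipaddress import ip_address

def range_size(start, end):
    """Number of single addresses in an inclusive start-end range."""
    return int(ip_address(end)) - int(ip_address(start)) + 1

# Hypothetical ranges for illustration only.
ranges = [
    ("198.51.100.0", "198.51.100.255"),   # 256 addresses
    ("10.0.0.0", "10.255.255.255"),       # 16,777,216 addresses
]

total = sum(range_size(start, end) for start, end in ranges)
print(total)  # 16777472
```

Running a count like this over your actual indicators gives the dictionary size you would be committing to.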

At the time I decided to continue using the Elasticsearch IP range query approach, as the number of events I’m enriching is low enough not to impact our performance or availability. In the future, though, I’ll want to apply this kind of enrichment to a much larger volume of events, so I think I’ll have to revisit the approach.