Using Elasticsearch Upserts to Combine Multiple Event Lines Into One

Note: this approach is probably not appropriate for high-volume / high-throughput events. In my case it required quite a lot of Logstash parsing, plus Elasticsearch's doc_as_upsert, both of which carry a significant performance penalty. For low-throughput use it works fine.

Sometimes log sources split logically grouped events into separate lines, and sometimes those logically grouped event lines are mixed into the same log file with actual singular line events.

This particular case is not handled well by Filebeat's multiline support [1]; in fact, it simply doesn't work here.

The structure I’m talking about is this:

Line 1
Line 2
Line 3 - Some event starts
Line 4 - Content of event
Line 5 - End of event
Line 6
Line 7

Where Lines 1 and 2 are individual events, Lines 3, 4, and 5 are actually multiple lines of the same event, and Lines 6 and 7 are separate individual events again.

Since Filebeat doesn't deal with this type of setup at all, I had to look elsewhere to see if I could combine Lines 3, 4, and 5 into one event.

Logically the next place to look would be Logstash, as we have it in our ingestion pipeline and it has multiline capabilities. However, we use a set of Azure Event Hubs (essentially Kafka, for those not familiar) as our event queueing mechanism, with a group of Logstash processes consuming the events as they arrive. There's no grouping or ordering here, so Lines 3, 4, and 5 may arrive:

  • out of order in time
  • across multiple different Logstash consumers

This makes it impossible to combine the three lines into one event, as a single Logstash process/instance may never see all three of them.

So they can't be combined at source using Filebeat, and they can't be combined during processing using Logstash's multiline codec, which only leaves one place where all three lines are guaranteed to end up together: Elasticsearch itself.

The approach I settled on was using (or perhaps abusing) Elasticsearch's doc_as_upsert [2] capability to incrementally add data to a single ES document.

The key is to identify something that can group the multiple lines together, and use that information as the Document ID.

In my case, we have the following:

  1. Two common “phrases” in the event lines, such that I can reliably identify all lines as being part of a logical group (i.e. they need to be processed as per the next step)
  2. A set of datetime and ip/port information that's common across the event lines, which can be used to create a shared “signature” (using Logstash's fingerprint filter)

The first step is to identify the common “phrases” that identify the event lines, and mark each event as part of an “upsert”. I do this as follows:

if ( [message] =~ /phrase1/ ) or ( [message] =~ /phrase2/ ) {
  mutate {
    add_tag => [ "_upserts" ]
  }
}
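The same routing check can be sketched outside Logstash. A minimal Python equivalent (phrase1/phrase2 are placeholders standing in for the real marker phrases):

```python
import re

# Placeholder patterns standing in for the real "phrase1"/"phrase2" markers.
GROUP_MARKERS = [re.compile(r"phrase1"), re.compile(r"phrase2")]

def tag_event(event: dict) -> dict:
    """Add an "_upserts" tag when the message matches either marker,
    mirroring the Logstash conditional + mutate above."""
    if any(p.search(event["message"]) for p in GROUP_MARKERS):
        event.setdefault("tags", []).append("_upserts")
    return event

events = [
    {"message": "Line 1"},
    {"message": "Line 3 - phrase1 some event starts"},
]
tagged = [tag_event(e) for e in events]
```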

The next step is to parse the common data of each event into a structure that allows the use of Logstash's fingerprint filter. I extract the datetime and ip/port information, and use the fingerprint filter to create an ECS-style [event][id] field.

Fingerprint the event line:

    fingerprint {
      method => "SHA1"
      source => [
        "[tmp_date_day]",
        "[tmp_date_month]",
        "[tmp_date_daynum]",
        "[tmp_date_time]",
        "[tmp_date_year]",
        "[tmp_source_ip]",
        "[tmp_source_port]"
      ]
      # Without this, the fingerprint filter hashes each source field in
      # turn and the target ends up as the hash of the last field only.
      concatenate_sources => true
      target => "[event][id]"
    }
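To see why this groups the lines: every line of a logical event carries the same datetime and ip/port values, so hashing the concatenation of those fields yields the same ID for every line. A rough Python illustration (the concatenation format is simplified, not byte-for-byte what Logstash produces, and the field values are made up):

```python
import hashlib

def event_id(fields: dict) -> str:
    """Simplified stand-in for the fingerprint filter: SHA1 over the
    shared fields, concatenated in a fixed order."""
    keys = ["tmp_date_day", "tmp_date_month", "tmp_date_daynum",
            "tmp_date_time", "tmp_date_year", "tmp_source_ip",
            "tmp_source_port"]
    joined = "|".join(str(fields[k]) for k in keys)
    return hashlib.sha1(joined.encode("utf-8")).hexdigest()

# Three lines of the same logical event share the datetime/ip/port fields...
shared = {"tmp_date_day": "Mon", "tmp_date_month": "Jan",
          "tmp_date_daynum": "02", "tmp_date_time": "03:04:05",
          "tmp_date_year": "2023", "tmp_source_ip": "10.0.0.1",
          "tmp_source_port": "51234"}
ids = {event_id(dict(shared)) for _ in range(3)}
# ...so they all fingerprint to one and the same document ID.
```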

In my Elasticsearch outputs I then simply filter for that tag and set a few parameters, as below. This uses [event][id] as the document ID, and performs an update if a document with that ID already exists:

if "_upserts" in [tags] {
  elasticsearch {
    hosts => [
      "es"
    ]
    index => "<target index>"
    document_id => "%{[event][id]}"
    doc_as_upsert => true
    action => "update"
  }
}
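The effect is easiest to see by simulating what doc_as_upsert does server-side: each partial event is merged into whatever document already exists under that ID, or creates it if absent. A toy in-memory version (not the real Elasticsearch API, just the merge behaviour):

```python
# Toy in-memory stand-in for an index, keyed by document ID.
index: dict = {}

def doc_as_upsert(doc_id: str, partial: dict) -> None:
    """Merge the partial document into the stored one, creating it if
    absent - roughly what doc_as_upsert + action => "update" gives us."""
    index.setdefault(doc_id, {}).update(partial)

# Three log lines of one logical event, all fingerprinted to the same ID,
# arriving in any order from any Logstash consumer.
doc_as_upsert("abc123", {"start": "Some event starts"})
doc_as_upsert("abc123", {"content": "Content of event"})
doc_as_upsert("abc123", {"end": "End of event"})
```

One caveat this toy version makes visible: with merges like this, two lines writing the same field will overwrite each other, so the fields parsed from each line need distinct names.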

That's it: abusing the Elasticsearch Update API to combine multiline log events into one document. Please don't do this if you have better options available!

References

  1. https://github.com/elastic/beats/pull/4019
  2. https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update.html#doc_as_upsert