Azure Translation Services with Elasticsearch and Logstash

I recently needed to parse some XML into Elasticsearch, specifically some CERT RSS feeds. Logstash has an RSS input, but it’s a bit basic and doesn’t expose the language of an item even when the RSS feed includes it. One of the feeds I’m using switches language from item to item, and many others are entirely non-English.

Since this would be parsing the same RSS feeds repeatedly, I need a predictable Document ID so I can upsert each item instead of creating a new document every time the RSS feed is parsed.

For translating the title and description, I thought I’d give Azure Translation Services a try. Since the Translation Services cost is based on the number of characters translated, I want to avoid translating an item more than once.

Predictable Document ID

When parsed, each RSS item is an object in the [xml][channel][item] field, so I create a predictable ID from it using the Logstash fingerprint filter.

  fingerprint {
    method => "MURMUR3"
    source => [
      "[xml][channel][item]"
    ]
    target => "[event][id]"
  }
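
Hashing the whole item means that any edit the publisher makes to an item produces a new ID, and MURMUR3 is only a 32-bit hash. If either of those matters for your feeds, one alternative is to hash a single stable field with a wider digest. A minimal sketch, assuming the parsed item exposes its permalink as [xml][channel][item][link] (that field name is an assumption; check what your feed actually provides):

```
  fingerprint {
    # SHA256 gives a far larger keyspace than 32-bit MURMUR3.
    # Older versions of the plugin may also require a `key` for SHA methods.
    method => "SHA256"
    # Assumed field: the item's permalink, which should stay the same
    # even if the title or description is edited later.
    source => [
      "[xml][channel][item][link]"
    ]
    target => "[event][id]"
  }
```

The trade-off is that an edited item keeps its old ID, so it is updated in place and an existing translation will not be refreshed on its own.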

Look for existing document

Then use that ID as the Document ID to look up the document in Elasticsearch.

If this is the first time this specific RSS feed/item is being processed, this will not return anything as we haven’t stored the item as a document yet.

If this is a repeated parsing, there should already be a document with this Document ID in Elasticsearch, and this query will return it.

From the returned document, the fields [rss][item][title][en] and [rss][item][description][en] are copied into the current event’s fields [tmp_translated_title] and [tmp_translated_description].

  elasticsearch {
    hosts => [ "es" ]
    user => "user"
    password => "pw"
    index => "index"
    query => "_id: %{[event][id]}"
    fields => {
      "[rss][item][title][en]"          => "[tmp_translated_title]"
      "[rss][item][description][en]"    => "[tmp_translated_description]"
    }
  }

Translation

If this is the first time this RSS item has been parsed, or if previous attempts to translate it failed, the fields [tmp_translated_title] and [tmp_translated_description] will not be set. This is how we know to attempt to translate this event.

So now check that [xml][channel][item][language] exists and is not "en". Then check that [xml][channel][item][title] exists and that this item has not been translated previously ( ! [tmp_translated_title] ). If all of that is true, use the Logstash http filter to POST to your Translation Services instance.

if ([xml][channel][item][language] and [xml][channel][item][language] != "en"){
  if ! [tmp_translated_title] and [xml][channel][item][title] {
    http {
      url => "https://<<< translation api URL>>>"
      verb => "POST"
      query => {
        "api-version" => "3.0"
        "to" => "en"
      }
      body => "[{'Text':'%{[xml][channel][item][title]}'}]"
      body_format => "json"
      headers => {
        "Ocp-Apim-Subscription-Key" => "<<< api key >>>"
        "Ocp-Apim-Subscription-Region" => "<<< azure region >>>"
        "Content-Type" => "application/json"
      }
      target_body => "[tmp][translated][en][xml][channel][item][title]"
    }
  }
}
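
The description can be translated the same way. The sketch below mirrors the title block and, as a variation, passes the body as a structured array so that the http filter serializes it to JSON itself, which avoids a broken request when the text contains a quote character. The target field name is my own choice, and the structured body is an assumption about how your version of the http filter handles non-string bodies, so test it against your setup before relying on it.

```
if ([xml][channel][item][language] and [xml][channel][item][language] != "en"){
  # Only translate when no earlier translation was found in Elasticsearch
  if ! [tmp_translated_description] and [xml][channel][item][description] {
    http {
      url => "https://<<< translation api URL>>>"
      verb => "POST"
      query => {
        "api-version" => "3.0"
        "to" => "en"
      }
      # Structured body (assumed to be serialized to JSON by the filter),
      # so quotes inside the description are escaped for us
      body => [
        { "Text" => "%{[xml][channel][item][description]}" }
      ]
      body_format => "json"
      headers => {
        "Ocp-Apim-Subscription-Key" => "<<< api key >>>"
        "Ocp-Apim-Subscription-Region" => "<<< azure region >>>"
        "Content-Type" => "application/json"
      }
      # Hypothetical target, named to match the title's target field
      target_body => "[tmp][translated][en][xml][channel][item][description]"
    }
  }
}
```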

If all goes well, the response body containing the translated title will be stored in the field [tmp][translated][en][xml][channel][item][title].
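
The Translator v3 response is a JSON array with one element per input text, each carrying a translations array, so the translated string sits a few levels below the target field. A minimal sketch of copying it into [rss][item][title][en], the field the lookup step reads back on later runs. It assumes the http filter parsed the JSON response, and the else branch that reuses a previously stored translation is my assumption about how the pieces fit together:

```
  if [tmp][translated][en][xml][channel][item][title][0][translations][0][text] {
    # Fresh translation: first input text, first (and only) target language
    mutate {
      copy => {
        "[tmp][translated][en][xml][channel][item][title][0][translations][0][text]" => "[rss][item][title][en]"
      }
    }
  } else if [tmp_translated_title] {
    # No new translation, so reuse the one fetched from the existing document
    mutate {
      copy => {
        "[tmp_translated_title]" => "[rss][item][title][en]"
      }
    }
  }
```

If neither branch matches, [rss][item][title][en] stays unset, which is what lets the next run try the translation again.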

If the translation attempt fails, it will be retried the next time Logstash parses the RSS feed. Failures happen reasonably frequently for me, as I’m attempting quite a few translations as rapidly as possible and am getting 429 Too Many Requests responses back from the Translation Service. I can’t find documentation on Translation Service throttling, but it’s evidently doing it.
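
One blunt way to reduce the 429 responses is to slow the pipeline down before the translation call. A minimal sketch using the sleep filter, placed inside the same conditional just ahead of the http block. The one-second pause on every fifth event is an arbitrary starting point rather than a documented Azure limit, and it assumes the sleep filter plugin is available in your Logstash install:

```
    sleep {
      # Pause for 1 second ...
      time => "1"
      # ... once every 5 events that reach this filter
      every => 5
    }
```

Sleeping blocks the worker thread handling the event, so the overall effect depends on how many pipeline workers are running.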

Insert/Update the document

Finally, use the Logstash Elasticsearch output plugin to perform an upsert ( doc_as_upsert ).

If a document with this ID does not exist, it will be created. If it does exist, it will be updated with the fields from the event.

  elasticsearch {
    hosts => [
      "es"
    ]
    index => "index"
    document_id => "%{[event][id]}"
    doc_as_upsert => true
    action => "update"
    user => "user"
    password => "pw"
  }