From 8912943f79402e71cdddbe3352ae0abb7cee7146 Mon Sep 17 00:00:00 2001
From: Ry Biesemeyer
Date: Thu, 20 Oct 2022 18:03:50 +0000
Subject: [PATCH] resilience: prevent failures from crashing plugin

When an Event cannot be created directly from the hit, or when the docinfo
cannot be merged into a non-hash field in the hit, emit an Event tagged with
`_elasticsearch_input_failure` that contains the JSON-encoded hit in
`[event][original]` instead of crashing.
---
 CHANGELOG.md                         |   4 +
 docs/index.asciidoc                  |  10 +++
 lib/logstash/inputs/elasticsearch.rb |  30 ++++++--
 logstash-input-elasticsearch.gemspec |   2 +-
 spec/inputs/elasticsearch_spec.rb    | 105 ++++++++++++++++++++++++++-
 5 files changed, 141 insertions(+), 10 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4b5acdb..f4afe71 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,7 @@
+## 4.21.1
+  - Fix: prevent plugin crash when hits contain illegal structure
+    - When a hit cannot be converted to an event, the input now emits an event tagged with `_elasticsearch_input_failure` whose `[event][original]` contains a JSON-encoded string representation of the entire hit.
+
 ## 4.21.0
   - Add support for custom headers [#217](https://github.com/logstash-plugins/logstash-input-elasticsearch/pull/217)
 
diff --git a/docs/index.asciidoc b/docs/index.asciidoc
index 597e08b..fb516c8 100644
--- a/docs/index.asciidoc
+++ b/docs/index.asciidoc
@@ -93,6 +93,16 @@
 The plugin logs a warning when ECS is enabled and `target` isn't set.
 
 TIP: Set the `target` option to avoid potential schema conflicts.
 
+[id="plugins-{type}s-{plugin}-failure-handling"]
+==== Failure handling
+
+When this input plugin cannot create a structured `Event` from a hit result, it will instead create an `Event` that is tagged with `_elasticsearch_input_failure` and whose `[event][original]` is a JSON-encoded string representation of the entire hit.
+
+Common causes are:
+
+ - The hit result contains top-level fields that are {logstash-ref}/processing.html#reserved-fields[reserved in Logstash] but do not have the expected shape. Use the <<plugins-{type}s-{plugin}-target>> directive to avoid conflicts with the top-level namespace.
+ - <<plugins-{type}s-{plugin}-docinfo>> is enabled and the docinfo fields cannot be merged into the hit result. Combine <<plugins-{type}s-{plugin}-target>> and <<plugins-{type}s-{plugin}-docinfo_target>> to avoid conflict.
+
 [id="plugins-{type}s-{plugin}-options"]
 ==== Elasticsearch Input configuration options
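Note: the failure events documented above are meant to be consumed downstream. A minimal sketch, assuming a hypothetical `recover_hit` helper (not part of this patch) running wherever the decoded events are available, e.g. inside a `ruby` filter:

    require 'json'

    # Recover the original hit from a failure-tagged event; returns nil for
    # events that went through the normal path.
    def recover_hit(event)
      return nil unless (event.get('tags') || []).include?('_elasticsearch_input_failure')
      JSON.load(event.get('[event][original]')) # the entire hit, parsed back into a Hash
    end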
+ [id="plugins-{type}s-{plugin}-options"] ==== Elasticsearch Input configuration options diff --git a/lib/logstash/inputs/elasticsearch.rb b/lib/logstash/inputs/elasticsearch.rb index e624e84..34c0c92 100644 --- a/lib/logstash/inputs/elasticsearch.rb +++ b/lib/logstash/inputs/elasticsearch.rb @@ -350,24 +350,42 @@ def run(output_queue) end ## + # @param output_queue [#<<] + # @param scroll_id [String]: a scroll id to resume + # @return [Array(Boolean,String)]: a tuple representing whether the response + # + def process_next_scroll(output_queue, scroll_id) + r = scroll_request(scroll_id) + r['hits']['hits'].each { |hit| push_hit(hit, output_queue) } + [r['hits']['hits'].any?, r['_scroll_id']] + end + # This can be called externally from the query_executor public def push_hit(hit, output_queue, root_field = '_source') - event = targeted_event_factory.new_event hit[root_field] - set_docinfo_fields(hit, event) if @docinfo + event = event_from_hit(hit, root_field) decorate(event) output_queue << event end + def event_from_hit(hit, root_field) + event = targeted_event_factory.new_event hit[root_field] + set_docinfo_fields(hit, event) if @docinfo + + event + rescue => e + serialized_hit = hit.to_json + logger.warn("Event creation error, original data now in [event][original] field", message: e.message, exception: e.class, data: serialized_hit) + return event_factory.new_event('event' => { 'original' => serialized_hit }, 'tags' => ['_elasticsearch_input_failure']) + end + def set_docinfo_fields(hit, event) # do not assume event[@docinfo_target] to be in-place updatable. first get it, update it, then at the end set it in the event. docinfo_target = event.get(@docinfo_target) || {} unless docinfo_target.is_a?(Hash) - @logger.error("Incompatible Event, incompatible type for the docinfo_target=#{@docinfo_target} field in the `_source` document, expected a hash got:", :docinfo_target_type => docinfo_target.class, :event => event.to_hash_with_metadata) - - # TODO: (colin) I am not sure raising is a good strategy here? - raise Exception.new("Elasticsearch input: incompatible event") + # expect error to be handled by `#event_from_hit` + fail RuntimeError, "Incompatible event; unable to merge docinfo fields into docinfo_target=`#{@docinfo_target}`" end @docinfo_fields.each do |field| diff --git a/logstash-input-elasticsearch.gemspec b/logstash-input-elasticsearch.gemspec index 8818301..9dc4ebc 100644 --- a/logstash-input-elasticsearch.gemspec +++ b/logstash-input-elasticsearch.gemspec @@ -1,7 +1,7 @@ Gem::Specification.new do |s| s.name = 'logstash-input-elasticsearch' - s.version = '4.21.0' + s.version = '4.21.1' s.licenses = ['Apache License (2.0)'] s.summary = "Reads query results from an Elasticsearch cluster" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. 
diff --git a/logstash-input-elasticsearch.gemspec b/logstash-input-elasticsearch.gemspec
index 8818301..9dc4ebc 100644
--- a/logstash-input-elasticsearch.gemspec
+++ b/logstash-input-elasticsearch.gemspec
@@ -1,7 +1,7 @@
 Gem::Specification.new do |s|
 
   s.name            = 'logstash-input-elasticsearch'
-  s.version         = '4.21.0'
+  s.version         = '4.21.1'
   s.licenses        = ['Apache License (2.0)']
   s.summary         = "Reads query results from an Elasticsearch cluster"
   s.description     = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"

diff --git a/spec/inputs/elasticsearch_spec.rb b/spec/inputs/elasticsearch_spec.rb
index f8a9a31..31477a2 100644
--- a/spec/inputs/elasticsearch_spec.rb
+++ b/spec/inputs/elasticsearch_spec.rb
@@ -653,11 +653,28 @@ def synchronize_method!(object, method_name)
   context 'if the `docinfo_target` exist but is not of type hash' do
     let(:config) { base_config.merge 'docinfo' => true, "docinfo_target" => 'metadata_with_string' }
     let(:do_register) { false }
+    let(:mock_queue) { double('Queue', :<< => nil) }
+    let(:hit) { response.dig('hits', 'hits').first }
+
+    it 'emits a tagged event with the JSON-serialized hit in [event][original]' do
+      allow(plugin).to receive(:logger).and_return(double('Logger').as_null_object)
 
-    it 'raises an exception if the `docinfo_target` exist but is not of type hash' do
-      expect(client).not_to receive(:clear_scroll)
       plugin.register
-      expect { plugin.run([]) }.to raise_error(Exception, /incompatible event/)
+      plugin.run(mock_queue)
+
+      expect(mock_queue).to have_received(:<<) do |event|
+        expect(event).to be_a_kind_of LogStash::Event
+
+        expect(event.get('tags')).to include("_elasticsearch_input_failure")
+        expect(event.get('[event][original]')).to be_a_kind_of String
+        expect(JSON.load(event.get('[event][original]'))).to eq hit
+      end
+
+      expect(plugin.logger)
+        .to have_received(:warn).with(
+          a_string_including("Event creation error, original data now in [event][original] field"),
+          a_hash_including(:message => a_string_including('unable to merge docinfo fields into docinfo_target=`metadata_with_string`'),
+                           :data    => a_string_including('"_id":"C5b2xLQwTZa76jBmHIbwHQ"')))
     end
   end
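Note: the spec above drives the docinfo failure path with a `_source` whose `metadata_with_string` field is a scalar. An illustrative hit shape (hypothetical data, reusing the fixture's document id) that would trigger it:

    hit = {
      '_id'     => 'C5b2xLQwTZa76jBmHIbwHQ',
      '_source' => { 'metadata_with_string' => 'a string, not a hash' }
    }
    # With docinfo => true and docinfo_target => 'metadata_with_string', the
    # target resolves to a String, set_docinfo_fields raises, and the plugin
    # emits the `_elasticsearch_input_failure` event instead of crashing.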
@@ -1235,6 +1252,88 @@ def wait_receive_request
     end
   end
 
+  context '#push_hit' do
+    let(:config) do
+      {
+        'docinfo' => true, # include ids
+        'docinfo_target' => '[@metadata][docinfo]'
+      }
+    end
+
+    let(:hit) do
+      JSON.load(<<~EOJSON)
+        {
+          "_index" : "test_bulk_index_2",
+          "_type" : "_doc",
+          "_id" : "sHe6A3wBesqF7ydicQvG",
+          "_score" : 1.0,
+          "_source" : {
+            "@timestamp" : "2021-09-20T15:02:02.557Z",
+            "message" : "ping",
+            "@version" : "17",
+            "sequence" : 7,
+            "host" : {
+              "name" : "maybe.local",
+              "ip" : "127.0.0.1"
+            }
+          }
+        }
+      EOJSON
+    end
+
+    let(:mock_queue) { double('queue', :<< => nil) }
+
+    it 'pushes a generated event to the queue' do
+      plugin.send(:push_hit, hit, mock_queue)
+      expect(mock_queue).to have_received(:<<) do |event|
+        expect(event).to be_a_kind_of LogStash::Event
+
+        # fields overriding defaults
+        expect(event.timestamp.to_s).to eq("2021-09-20T15:02:02.557Z")
+        expect(event.get('@version')).to eq("17")
+
+        # structure from the hit's _source
+        expect(event.get('message')).to eq("ping")
+        expect(event.get('sequence')).to eq(7)
+        expect(event.get('[host][name]')).to eq("maybe.local")
+        expect(event.get('[host][ip]')).to eq("127.0.0.1")
+
+        # docinfo fields
+        expect(event.get('[@metadata][docinfo][_index]')).to eq("test_bulk_index_2")
+        expect(event.get('[@metadata][docinfo][_type]')).to eq("_doc")
+        expect(event.get('[@metadata][docinfo][_id]')).to eq("sHe6A3wBesqF7ydicQvG")
+      end
+    end
+
+    context 'when event creation fails' do
+      before(:each) do
+        allow(plugin).to receive(:logger).and_return(double('Logger').as_null_object)
+
+        allow(plugin.event_factory).to receive(:new_event).and_call_original
+        allow(plugin.event_factory).to receive(:new_event).with(a_hash_including hit['_source']).and_raise(RuntimeError, 'intentional')
+      end
+
+      it 'pushes a tagged event containing a JSON-encoded hit in [event][original]' do
+        plugin.send(:push_hit, hit, mock_queue)
+
+        expect(mock_queue).to have_received(:<<) do |event|
+          expect(event).to be_a_kind_of LogStash::Event
+
+          expect(event.get('tags')).to include("_elasticsearch_input_failure")
+          expect(event.get('[event][original]')).to be_a_kind_of String
+          expect(JSON.load(event.get('[event][original]'))).to eq hit
+        end
+
+        expect(plugin.logger)
+          .to have_received(:warn).with(
+            a_string_including("Event creation error, original data now in [event][original] field"),
+            a_hash_including(:message => a_string_including('intentional'),
+                             :data    => a_string_including('"_id":"sHe6A3wBesqF7ydicQvG"')))
+
+      end
+    end
+  end
+
   # @note can be removed once we depends on elasticsearch gem >= 6.x
   def extract_transport(client) # on 7.x client.transport is a ES::Transport::Client
     client.transport.respond_to?(:transport) ? client.transport.transport : client.transport
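Note: both failure specs assert `JSON.load(event.get('[event][original]')) == hit`, relying on the hit surviving a JSON round-trip. A quick standalone check of that property under the same assumptions (string keys, JSON-native values only):

    require 'json'

    hit = { '_id' => 'sHe6A3wBesqF7ydicQvG', '_source' => { 'message' => 'ping', 'sequence' => 7 } }
    raise 'round-trip mismatch' unless JSON.load(hit.to_json) == hit
    puts 'round-trip ok'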