resilience: prevent failures from crashing plugin
When an Event cannot be created directly from the hit, or when the
docinfo cannot be merged into a non-hash field in the hit, emit an
Event tagged with `_elasticsearch_input_failure` that contains the
JSON-encoded hit in `[event][original]` instead of crashing.
yaauie committed Dec 26, 2024
1 parent 79cfe85 commit 8912943
Showing 5 changed files with 141 additions and 10 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,7 @@
+## 4.21.1
+- Fix: prevent plugin crash when hits contain illegal structure
+  - When a hit cannot be converted to an event, the input now emits an event tagged with `_elasticsearch_input_failure` with an `[event][original]` containing a JSON-encoded string representation of the entire hit.
+
## 4.21.0
- Add support for custom headers [#217](https://github.com/logstash-plugins/logstash-input-elasticsearch/pull/217)

10 changes: 10 additions & 0 deletions docs/index.asciidoc
@@ -93,6 +93,16 @@ The plugin logs a warning when ECS is enabled and `target` isn't set.

TIP: Set the `target` option to avoid potential schema conflicts.

[id="plugins-{type}s-{plugin}-failure-handling"]
==== Failure handling

When this input plugin cannot create a structured `Event` from a hit result, it will instead create an `Event` that is tagged with `_elasticsearch_input_failure` whose `[event][original]` is a JSON-encoded string representation of the entire hit.

Common causes are:

- When the hit result contains top-level fields that are {logstash-ref}/processing.html#reserved-fields[reserved in Logstash] but do not have the expected shape. Use the <<plugins-{type}s-{plugin}-target>> directive to avoid conflicts with the top-level namespace.
- When <<plugins-{type}s-{plugin}-docinfo>> is enabled and the docinfo fields cannot be merged into the hit result. Combine <<plugins-{type}s-{plugin}-target>> and <<plugins-{type}s-{plugin}-docinfo_target>> to avoid conflict.
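
The recovery path for these failure events can be sketched in Ruby; a plain `Hash` stands in for a `LogStash::Event`, and the hit contents are illustrative, not taken from a real cluster:

```ruby
require 'json'

# Hypothetical failure event, shaped as described above
# (a plain Hash stands in for a LogStash::Event).
failure_event = {
  'tags'  => ['_elasticsearch_input_failure'],
  'event' => { 'original' => '{"_id":"abc123","_source":{"message":"ping"}}' }
}

# Downstream, the original hit can be recovered from the JSON string.
if failure_event['tags'].include?('_elasticsearch_input_failure')
  original_hit = JSON.parse(failure_event.dig('event', 'original'))
end
```

A downstream pipeline can route on the tag and re-parse `[event][original]` to inspect or reprocess the offending hit.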

[id="plugins-{type}s-{plugin}-options"]
==== Elasticsearch Input configuration options

30 changes: 24 additions & 6 deletions lib/logstash/inputs/elasticsearch.rb
@@ -350,24 +350,42 @@ def run(output_queue)
end

##
# @param output_queue [#<<]
# @param scroll_id [String]: a scroll id to resume
# @return [Array(Boolean,String)]: a tuple of whether the response contained any more hits, and the scroll id for the next request
#
def process_next_scroll(output_queue, scroll_id)
r = scroll_request(scroll_id)
r['hits']['hits'].each { |hit| push_hit(hit, output_queue) }
[r['hits']['hits'].any?, r['_scroll_id']]
end
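
A hypothetical driving loop for this tuple, with `process_next_scroll` replaced by a stub that serves two pages (all names here are illustrative, not part of the plugin):

```ruby
# Sketch of a loop driven by the (has_hits, scroll_id) tuple returned
# by process_next_scroll. The stub below simulates two pages of results.
pages = [[true, 'scroll-2'], [false, nil]]

# Hypothetical stand-in for the plugin's scroll request.
process_next_scroll = lambda do |_queue, _scroll_id|
  pages.shift
end

queue = []
scroll_id = 'scroll-1'
have_hits = true
while have_hits && scroll_id
  have_hits, scroll_id = process_next_scroll.call(queue, scroll_id)
end
```

The loop stops as soon as a page comes back empty or without a scroll id, which is exactly what the tuple encodes.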

# This can be called externally from the query_executor
public
def push_hit(hit, output_queue, root_field = '_source')
- event = targeted_event_factory.new_event hit[root_field]
- set_docinfo_fields(hit, event) if @docinfo
+ event = event_from_hit(hit, root_field)
  decorate(event)
  output_queue << event
end

def event_from_hit(hit, root_field)
event = targeted_event_factory.new_event hit[root_field]
set_docinfo_fields(hit, event) if @docinfo

event
rescue => e
serialized_hit = hit.to_json
logger.warn("Event creation error, original data now in [event][original] field", message: e.message, exception: e.class, data: serialized_hit)
return event_factory.new_event('event' => { 'original' => serialized_hit }, 'tags' => ['_elasticsearch_input_failure'])
end
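
The rescue-to-fallback shape of `event_from_hit` can be exercised in isolation with this sketch; plain hashes stand in for events and the event factory, so only the control flow matches the method above:

```ruby
require 'json'

# Standalone sketch of the fallback in #event_from_hit: build a structured
# event from the hit's root field, or wrap the raw hit on any error.
def event_from_hit_sketch(hit, root_field = '_source')
  source = hit.fetch(root_field) # KeyError when the root field is missing
  raise TypeError, "expected a Hash, got #{source.class}" unless source.is_a?(Hash)
  { 'data' => source } # stands in for targeted_event_factory.new_event
rescue
  { 'tags'  => ['_elasticsearch_input_failure'],
    'event' => { 'original' => hit.to_json } }
end
```

Any failure while building the structured event falls through to the tagged wrapper, so the pipeline keeps running and no hit is silently dropped.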

def set_docinfo_fields(hit, event)
# do not assume event[@docinfo_target] to be in-place updatable. first get it, update it, then at the end set it in the event.
docinfo_target = event.get(@docinfo_target) || {}

unless docinfo_target.is_a?(Hash)
  @logger.error("Incompatible Event, incompatible type for the docinfo_target=#{@docinfo_target} field in the `_source` document, expected a hash got:", :docinfo_target_type => docinfo_target.class, :event => event.to_hash_with_metadata)

- # TODO: (colin) I am not sure raising is a good strategy here?
- raise Exception.new("Elasticsearch input: incompatible event")
+ # expect error to be handled by `#event_from_hit`
+ fail RuntimeError, "Incompatible event; unable to merge docinfo fields into docinfo_target=`#{@docinfo_target}`"
end

@docinfo_fields.each do |field|
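
The get-then-set merge described in the comment above (rather than mutating `event[@docinfo_target]` in place) can be sketched with a plain hash standing in for the event; field names and values are illustrative:

```ruby
# Sketch of the get/update/set docinfo merge: read the target field,
# verify it is a Hash, copy the docinfo fields in, then write it back.
event = { '@metadata' => {} }
hit = { '_index' => 'logs-2024', '_type' => '_doc', '_id' => 'abc123' }
docinfo_fields = %w[_index _type _id]

docinfo_target = event['@metadata'] || {}
raise 'Incompatible event' unless docinfo_target.is_a?(Hash)

docinfo_fields.each { |field| docinfo_target[field] = hit[field] }
event['@metadata'] = docinfo_target
```

Writing the hash back at the end is what makes the pattern safe for event implementations whose field reads return copies rather than live references.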
2 changes: 1 addition & 1 deletion logstash-input-elasticsearch.gemspec
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|

s.name = 'logstash-input-elasticsearch'
- s.version = '4.21.0'
+ s.version = '4.21.1'
s.licenses = ['Apache License (2.0)']
s.summary = "Reads query results from an Elasticsearch cluster"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
105 changes: 102 additions & 3 deletions spec/inputs/elasticsearch_spec.rb
@@ -653,11 +653,28 @@ def synchronize_method!(object, method_name)
context 'if the `docinfo_target` exist but is not of type hash' do
  let(:config) { base_config.merge 'docinfo' => true, "docinfo_target" => 'metadata_with_string' }
  let(:do_register) { false }
+ let(:mock_queue) { double('Queue', :<< => nil) }
+ let(:hit) { response.dig('hits', 'hits').first }

+ it 'emits a tagged event with JSON-serialized event in [event][original]' do
+   allow(plugin).to receive(:logger).and_return(double('Logger').as_null_object)
+
- it 'raises an exception if the `docinfo_target` exist but is not of type hash' do
    expect(client).not_to receive(:clear_scroll)
    plugin.register
-   expect { plugin.run([]) }.to raise_error(Exception, /incompatible event/)
+   plugin.run(mock_queue)
+
+   expect(mock_queue).to have_received(:<<) do |event|
+     expect(event).to be_a_kind_of LogStash::Event
+
+     expect(event.get('tags')).to include("_elasticsearch_input_failure")
+     expect(event.get('[event][original]')).to be_a_kind_of String
+     expect(JSON.load(event.get('[event][original]'))).to eq hit
+   end
+
+   expect(plugin.logger)
+     .to have_received(:warn).with(
+       a_string_including("Event creation error, original data now in [event][original] field"),
+       a_hash_including(:message => a_string_including('unable to merge docinfo fields into docinfo_target=`metadata_with_string`'),
+                        :data => a_string_including('"_id":"C5b2xLQwTZa76jBmHIbwHQ"')))
  end

end
@@ -1235,6 +1252,88 @@ def wait_receive_request
end
end

context '#push_hit' do
let(:config) do
{
'docinfo' => true, # include ids
'docinfo_target' => '[@metadata][docinfo]'
}
end

let(:hit) do
JSON.load(<<~EOJSON)
{
"_index" : "test_bulk_index_2",
"_type" : "_doc",
"_id" : "sHe6A3wBesqF7ydicQvG",
"_score" : 1.0,
"_source" : {
"@timestamp" : "2021-09-20T15:02:02.557Z",
"message" : "ping",
"@version" : "17",
"sequence" : 7,
"host" : {
"name" : "maybe.local",
"ip" : "127.0.0.1"
}
}
}
EOJSON
end

let(:mock_queue) { double('queue', :<< => nil) }

it 'pushes a generated event to the queue' do
plugin.send(:push_hit, hit, mock_queue)
expect(mock_queue).to have_received(:<<) do |event|
expect(event).to be_a_kind_of LogStash::Event

# fields overriding defaults
expect(event.timestamp.to_s).to eq("2021-09-20T15:02:02.557Z")
expect(event.get('@version')).to eq("17")

# structure from hit's _source
expect(event.get('message')).to eq("ping")
expect(event.get('sequence')).to eq(7)
expect(event.get('[host][name]')).to eq("maybe.local")
expect(event.get('[host][ip]')).to eq("127.0.0.1")

# docinfo fields
expect(event.get('[@metadata][docinfo][_index]')).to eq("test_bulk_index_2")
expect(event.get('[@metadata][docinfo][_type]')).to eq("_doc")
expect(event.get('[@metadata][docinfo][_id]')).to eq("sHe6A3wBesqF7ydicQvG")
end
end

context 'when event creation fails' do
before(:each) do
allow(plugin).to receive(:logger).and_return(double('Logger').as_null_object)

allow(plugin.event_factory).to receive(:new_event).and_call_original
allow(plugin.event_factory).to receive(:new_event).with(a_hash_including hit['_source']).and_raise(RuntimeError, 'intentional')
end

it 'pushes a tagged event containing a JSON-encoded hit in [event][original]' do
plugin.send(:push_hit, hit, mock_queue)

expect(mock_queue).to have_received(:<<) do |event|
expect(event).to be_a_kind_of LogStash::Event

expect(event.get('tags')).to include("_elasticsearch_input_failure")
expect(event.get('[event][original]')).to be_a_kind_of String
expect(JSON.load(event.get('[event][original]'))).to eq hit
end

expect(plugin.logger)
.to have_received(:warn).with(
a_string_including("Event creation error, original data now in [event][original] field"),
a_hash_including(:message => a_string_including('intentional'),
:data => a_string_including('"_id":"sHe6A3wBesqF7ydicQvG"')))

end
end
end

# @note can be removed once we depends on elasticsearch gem >= 6.x
def extract_transport(client) # on 7.x client.transport is a ES::Transport::Client
client.transport.respond_to?(:transport) ? client.transport.transport : client.transport
