Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use integration metadata to create ES actions (part 2) #1158

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 11.22.0
- Added support for propagating event processing metadata when this output is downstream of an Elastic Integration Filter and configured _without_ explicit `version`, `version_type`, or `routing` directives [#1158](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1158)

## 11.21.0
- Added support for propagating event processing metadata when this output is downstream of an Elastic Integration Filter and configured _without_ explicit `index`, `document_id`, or `pipeline` directives [#1155](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1155)

Expand Down
31 changes: 26 additions & 5 deletions lib/logstash/outputs/elasticsearch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -499,9 +499,6 @@ def event_action_tuple(event)
params[retry_on_conflict_action_name] = @retry_on_conflict
end

params[:version] = event.sprintf(@version) if @version
params[:version_type] = event.sprintf(@version_type) if @version_type

EventActionTuple.new(action, params, event)
end

Expand Down Expand Up @@ -541,12 +538,12 @@ def initialize(bad_action)
# @private shared event params factory between index and data_stream mode
def common_event_params(event)
event_control = event.get("[@metadata][_ingest_document]")
event_id, event_pipeline, event_index = event_control&.values_at("id","pipeline","index") rescue nil
event_id, event_pipeline, event_index, event_routing, event_version, event_version_type = event_control&.values_at("id","pipeline","index", "routing", "version", "version_type") rescue nil

params = {
:_id => resolve_document_id(event, event_id),
:_index => resolve_index!(event, event_index),
routing_field_name => @routing ? event.sprintf(@routing) : nil
routing_field_name => resolve_routing(event, event_routing)
}

target_pipeline = resolve_pipeline(event, event_pipeline)
Expand All @@ -557,9 +554,33 @@ def common_event_params(event)
# }
params[:pipeline] = target_pipeline unless (target_pipeline.nil? || target_pipeline.empty?)

resolved_version = resolve_version(event, event_version)
resolved_version_type = resolve_version_type(event, event_version_type)
# avoid to add nil valued key-value pairs
params[:version] = resolved_version unless resolved_version.nil?
params[:version_type] = resolved_version_type unless resolved_version_type.nil?

params
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issues raised elsewhere could likely be resolved by leaning on Hash#compact, which is available since Ruby 2.4 (and therefore on all Logstash 7+):

Suggested change
params
params.compact

andsel marked this conversation as resolved.
Show resolved Hide resolved
end

def resolve_version(event, event_version)
return event_version if event_version && !@version
event.sprintf(@version) if @version
end
private :resolve_version

def resolve_version_type(event, event_version_type)
return event_version_type if event_version_type && !@version_type
event.sprintf(@version_type) if @version_type
end
private :resolve_version_type

def resolve_routing(event, event_routing)
return event_routing if event_routing && !@routing
@routing ? event.sprintf(@routing) : nil
end
private :resolve_routing

def resolve_document_id(event, event_id)
return event.sprintf(@document_id) if @document_id
return event_id || nil
Expand Down
2 changes: 1 addition & 1 deletion logstash-output-elasticsearch.gemspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-output-elasticsearch'
s.version = '11.21.0'
s.version = '11.22.0'
s.licenses = ['apache-2.0']
s.summary = "Stores logs in Elasticsearch"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand Down
102 changes: 102 additions & 0 deletions spec/unit/outputs/elasticsearch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,108 @@
let(:event_fields) {{}}
let(:event) { LogStash::Event.new(event_fields)}

context "when plugin's version is specified" do
let(:options) { super().merge("version" => "123")}

context "when the event contains an integration metadata version" do
let(:event) { LogStash::Event.new({"@metadata" => {"_ingest_document" => {"version" => "456"}}}) }

it "plugin's version is used" do
expect(subject.send(:event_action_tuple, event)[1]).to include(:version => "123")
end
end

context "when the event DOESN'T contains an integration metadata version" do
it "plugin's version is used" do
expect(subject.send(:event_action_tuple, event)[1]).to include(:version => "123")
end
end
end

context "when plugin's version is NOT specified" do
context "when the event contains an integration metadata version" do
let(:event) { LogStash::Event.new({"@metadata" => {"_ingest_document" => {"version" => "456"}}}) }

it "event's metadata version is used" do
expect(subject.send(:event_action_tuple, event)[1]).to include(:version => "456")
end
end

context "when the event DOESN'T contain an integration metadata version" do
it "plugin's default id mechanism is used" do
expect(subject.send(:event_action_tuple, event)[1]).to_not include(:version)
end
end
end

context "when plugin's version_type is specified" do
let(:options) { super().merge("version_type" => "internal")}

context "when the event contains an integration metadata version_type" do
let(:event) { LogStash::Event.new({"@metadata" => {"_ingest_document" => {"version_type" => "external"}}}) }

it "plugin's version_type is used" do
expect(subject.send(:event_action_tuple, event)[1]).to include(:version_type => "internal")
end
end

context "when the event DOESN'T contains an integration metadata version_type" do
it "plugin's version_type is used" do
expect(subject.send(:event_action_tuple, event)[1]).to include(:version_type => "internal")
end
end
end

context "when plugin's version_type is NOT specified" do
context "when the event contains an integration metadata version_type" do
let(:event) { LogStash::Event.new({"@metadata" => {"_ingest_document" => {"version_type" => "external"}}}) }

it "event's metadata version_type is used" do
expect(subject.send(:event_action_tuple, event)[1]).to include(:version_type => "external")
end
end

context "when the event DOESN'T contain an integration metadata version_type" do
it "plugin's default id mechanism is used" do
expect(subject.send(:event_action_tuple, event)[1]).to_not include(:version_type)
end
end
end

context "when plugin's routing is specified" do
let(:options) { super().merge("routing" => "settings_routing")}

context "when the event contains an integration metadata routing" do
let(:event) { LogStash::Event.new({"@metadata" => {"_ingest_document" => {"routing" => "meta-document-routing"}}}) }

it "plugin's routing is used" do
expect(subject.send(:event_action_tuple, event)[1]).to include(:routing => "settings_routing")
end
end

context "when the event DOESN'T contains an integration metadata routing" do
it "plugin's routing is used" do
expect(subject.send(:event_action_tuple, event)[1]).to include(:routing => "settings_routing")
end
end
end

context "when plugin's routing is NOT specified" do
context "when the event contains an integration metadata routing" do
let(:event) { LogStash::Event.new({"@metadata" => {"_ingest_document" => {"routing" => "meta-document-routing"}}}) }

it "event's metadata routing is used" do
expect(subject.send(:event_action_tuple, event)[1]).to include(:routing => "meta-document-routing")
end
end

context "when the event DOESN'T contain an integration metadata routing" do
it "plugin's default id mechanism is used" do
expect(subject.send(:event_action_tuple, event)[1]).to include(:routing => nil)
andsel marked this conversation as resolved.
Show resolved Hide resolved
end
end
end

context "when plugin's index is specified" do
let(:options) { super().merge("index" => "index_from_settings")}

Expand Down