Skip to content

Commit

Permalink
Use integration metadata to create ES actions (part 2) (#1158)
Browse files Browse the repository at this point in the history
Change the creation of actions that are passed down to Elasticsearch to use also the metadata fields set by an integration.
The interested fields are version, version_type and routing, the field values are taken verbatim without placeholders resolution.
The version, version_type and routing that are configured in the plugin settings have precedence on the integration ones because manifest an explicit choice made by the user.

Co-authored-by: Ry Biesemeyer <[email protected]>
  • Loading branch information
andsel and yaauie authored Nov 16, 2023
1 parent 47a5169 commit cfd82fa
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 6 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 11.22.0
- Added support for propagating event processing metadata when this output is downstream of an Elastic Integration Filter and configured _without_ explicit `version`, `version_type`, or `routing` directives [#1158](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1158)

## 11.21.0
- Added support for propagating event processing metadata when this output is downstream of an Elastic Integration Filter and configured _without_ explicit `index`, `document_id`, or `pipeline` directives [#1155](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1155)

Expand Down
31 changes: 26 additions & 5 deletions lib/logstash/outputs/elasticsearch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -499,9 +499,6 @@ def event_action_tuple(event)
params[retry_on_conflict_action_name] = @retry_on_conflict
end

params[:version] = event.sprintf(@version) if @version
params[:version_type] = event.sprintf(@version_type) if @version_type

EventActionTuple.new(action, params, event)
end

Expand Down Expand Up @@ -541,12 +538,12 @@ def initialize(bad_action)
# @private shared event params factory between index and data_stream mode
def common_event_params(event)
event_control = event.get("[@metadata][_ingest_document]")
event_id, event_pipeline, event_index = event_control&.values_at("id","pipeline","index") rescue nil
event_id, event_pipeline, event_index, event_routing, event_version, event_version_type = event_control&.values_at("id","pipeline","index", "routing", "version", "version_type") rescue nil

params = {
:_id => resolve_document_id(event, event_id),
:_index => resolve_index!(event, event_index),
routing_field_name => @routing ? event.sprintf(@routing) : nil
routing_field_name => resolve_routing(event, event_routing)
}

target_pipeline = resolve_pipeline(event, event_pipeline)
Expand All @@ -557,9 +554,33 @@ def common_event_params(event)
# }
params[:pipeline] = target_pipeline unless (target_pipeline.nil? || target_pipeline.empty?)

resolved_version = resolve_version(event, event_version)
resolved_version_type = resolve_version_type(event, event_version_type)
# avoid to add nil valued key-value pairs
params[:version] = resolved_version unless resolved_version.nil?
params[:version_type] = resolved_version_type unless resolved_version_type.nil?

params
end

def resolve_version(event, event_version)
return event_version if event_version && !@version
event.sprintf(@version) if @version
end
private :resolve_version

def resolve_version_type(event, event_version_type)
return event_version_type if event_version_type && !@version_type
event.sprintf(@version_type) if @version_type
end
private :resolve_version_type

def resolve_routing(event, event_routing)
return event_routing if event_routing && !@routing
@routing ? event.sprintf(@routing) : nil
end
private :resolve_routing

def resolve_document_id(event, event_id)
return event.sprintf(@document_id) if @document_id
return event_id || nil
Expand Down
2 changes: 1 addition & 1 deletion logstash-output-elasticsearch.gemspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-output-elasticsearch'
s.version = '11.21.0'
s.version = '11.22.0'
s.licenses = ['apache-2.0']
s.summary = "Stores logs in Elasticsearch"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand Down
102 changes: 102 additions & 0 deletions spec/unit/outputs/elasticsearch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,108 @@
let(:event_fields) {{}}
let(:event) { LogStash::Event.new(event_fields)}

context "when plugin's version is specified" do
let(:options) { super().merge("version" => "123")}

context "when the event contains an integration metadata version" do
let(:event) { LogStash::Event.new({"@metadata" => {"_ingest_document" => {"version" => "456"}}}) }

it "plugin's version is used" do
expect(subject.send(:event_action_tuple, event)[1]).to include(:version => "123")
end
end

context "when the event DOESN'T contains an integration metadata version" do
it "plugin's version is used" do
expect(subject.send(:event_action_tuple, event)[1]).to include(:version => "123")
end
end
end

context "when plugin's version is NOT specified" do
context "when the event contains an integration metadata version" do
let(:event) { LogStash::Event.new({"@metadata" => {"_ingest_document" => {"version" => "456"}}}) }

it "event's metadata version is used" do
expect(subject.send(:event_action_tuple, event)[1]).to include(:version => "456")
end
end

context "when the event DOESN'T contain an integration metadata version" do
it "plugin's default id mechanism is used" do
expect(subject.send(:event_action_tuple, event)[1]).to_not include(:version)
end
end
end

context "when plugin's version_type is specified" do
let(:options) { super().merge("version_type" => "internal")}

context "when the event contains an integration metadata version_type" do
let(:event) { LogStash::Event.new({"@metadata" => {"_ingest_document" => {"version_type" => "external"}}}) }

it "plugin's version_type is used" do
expect(subject.send(:event_action_tuple, event)[1]).to include(:version_type => "internal")
end
end

context "when the event DOESN'T contains an integration metadata version_type" do
it "plugin's version_type is used" do
expect(subject.send(:event_action_tuple, event)[1]).to include(:version_type => "internal")
end
end
end

context "when plugin's version_type is NOT specified" do
context "when the event contains an integration metadata version_type" do
let(:event) { LogStash::Event.new({"@metadata" => {"_ingest_document" => {"version_type" => "external"}}}) }

it "event's metadata version_type is used" do
expect(subject.send(:event_action_tuple, event)[1]).to include(:version_type => "external")
end
end

context "when the event DOESN'T contain an integration metadata version_type" do
it "plugin's default id mechanism is used" do
expect(subject.send(:event_action_tuple, event)[1]).to_not include(:version_type)
end
end
end

context "when plugin's routing is specified" do
let(:options) { super().merge("routing" => "settings_routing")}

context "when the event contains an integration metadata routing" do
let(:event) { LogStash::Event.new({"@metadata" => {"_ingest_document" => {"routing" => "meta-document-routing"}}}) }

it "plugin's routing is used" do
expect(subject.send(:event_action_tuple, event)[1]).to include(:routing => "settings_routing")
end
end

context "when the event DOESN'T contains an integration metadata routing" do
it "plugin's routing is used" do
expect(subject.send(:event_action_tuple, event)[1]).to include(:routing => "settings_routing")
end
end
end

context "when plugin's routing is NOT specified" do
context "when the event contains an integration metadata routing" do
let(:event) { LogStash::Event.new({"@metadata" => {"_ingest_document" => {"routing" => "meta-document-routing"}}}) }

it "event's metadata routing is used" do
expect(subject.send(:event_action_tuple, event)[1]).to include(:routing => "meta-document-routing")
end
end

context "when the event DOESN'T contain an integration metadata routing" do
it "plugin's default id mechanism is used" do
expect(subject.send(:event_action_tuple, event)[1]).to include(:routing => nil)
end
end
end

context "when plugin's index is specified" do
let(:options) { super().merge("index" => "index_from_settings")}

Expand Down

0 comments on commit cfd82fa

Please sign in to comment.