Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

When datastream is enabled then no version related attributes fills into document actions #1161

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 11.22.1
- Fix, avoid to populate `version` and `version_type` attributes when processing integration metadata and datastream is enabled. [#1161](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1161)

## 11.22.0
- Added support for propagating event processing metadata when this output is downstream of an Elastic Integration Filter and configured _without_ explicit `version`, `version_type`, or `routing` directives [#1158](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1158)

Expand Down
18 changes: 11 additions & 7 deletions lib/logstash/outputs/elasticsearch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,16 @@ def event_action_tuple(event)
params[retry_on_conflict_action_name] = @retry_on_conflict
end

event_control = event.get("[@metadata][_ingest_document]")
event_version, event_version_type = event_control&.values_at("version", "version_type") rescue nil

resolved_version = resolve_version(event, event_version)
resolved_version_type = resolve_version_type(event, event_version_type)

# avoid to add nil valued key-value pairs
params[:version] = resolved_version unless resolved_version.nil?
params[:version_type] = resolved_version_type unless resolved_version_type.nil?

EventActionTuple.new(action, params, event)
end

Expand Down Expand Up @@ -538,7 +548,7 @@ def initialize(bad_action)
# @private shared event params factory between index and data_stream mode
def common_event_params(event)
event_control = event.get("[@metadata][_ingest_document]")
event_id, event_pipeline, event_index, event_routing, event_version, event_version_type = event_control&.values_at("id","pipeline","index", "routing", "version", "version_type") rescue nil
event_id, event_pipeline, event_index, event_routing = event_control&.values_at("id","pipeline","index", "routing") rescue nil

params = {
:_id => resolve_document_id(event, event_id),
Expand All @@ -554,12 +564,6 @@ def common_event_params(event)
# }
params[:pipeline] = target_pipeline unless (target_pipeline.nil? || target_pipeline.empty?)

resolved_version = resolve_version(event, event_version)
resolved_version_type = resolve_version_type(event, event_version_type)
# avoid to add nil valued key-value pairs
params[:version] = resolved_version unless resolved_version.nil?
params[:version_type] = resolved_version_type unless resolved_version_type.nil?

params
end

Expand Down
2 changes: 1 addition & 1 deletion logstash-output-elasticsearch.gemspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-output-elasticsearch'
s.version = '11.22.0'
s.version = '11.22.1'
s.licenses = ['apache-2.0']
s.summary = "Stores logs in Elasticsearch"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand Down
42 changes: 38 additions & 4 deletions spec/unit/outputs/elasticsearch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,32 @@
context "when the event contains an integration metadata version" do
let(:event) { LogStash::Event.new({"@metadata" => {"_ingest_document" => {"version" => "456"}}}) }

it "event's metadata version is used" do
expect(subject.send(:event_action_tuple, event)[1]).to include(:version => "456")
context "when datastream settings are NOT configured" do
it "event's metadata version is used" do
expect(subject.send(:event_action_tuple, event)[1]).to include(:version => "456")
end
end

context "when datastream settings are configured" do
# let(:options) {
# {
# "hosts" => ["localhost","localhost:9202"],
# "data_stream" => "true",
# "data_stream_type" => "logs",
# "data_stream_dataset" => "generic",
# "data_stream_namespace" => "default"
# }
# }
#
# data_stream => "true"
# data_stream_type => "metrics"
# data_stream_dataset => "foo"
# data_stream_namespace => "bar"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The commented-out setup can be removed, but we should leave a note that the validation is done using datastream-specific method (or: find a way to keep the validations in this context group at the same level of abstraction as each other).

Suggested change
# let(:options) {
# {
# "hosts" => ["localhost","localhost:9202"],
# "data_stream" => "true",
# "data_stream_type" => "logs",
# "data_stream_dataset" => "generic",
# "data_stream_namespace" => "default"
# }
# }
#
# data_stream => "true"
# data_stream_type => "metrics"
# data_stream_dataset => "foo"
# data_stream_namespace => "bar"
# NOTE: we validate with datastream-specific `data_stream_event_action_tuple`

let(:event_fields) { super().merge({"data_stream" => {"type" => "logs", "dataset" => "generic", "namespace" => "default"}}) }

it "no version is used" do
expect(subject.send(:data_stream_event_action_tuple, event)[1]).to_not include(:version)
end
end
end

Expand All @@ -315,8 +339,18 @@
context "when the event contains an integration metadata version_type" do
let(:event) { LogStash::Event.new({"@metadata" => {"_ingest_document" => {"version_type" => "external"}}}) }

it "plugin's version_type is used" do
expect(subject.send(:event_action_tuple, event)[1]).to include(:version_type => "internal")
context "when datastream settings are NOT configured" do
it "plugin's version_type is used" do
expect(subject.send(:event_action_tuple, event)[1]).to include(:version_type => "internal")
end
end

context "when datastream settings are configured" do
let(:event_fields) { super().merge({"data_stream" => {"type" => "logs", "dataset" => "generic", "namespace" => "default"}}) }

it "no version_type is used" do
expect(subject.send(:data_stream_event_action_tuple, event)[1]).to_not include(:version_type)
end
end
end

Expand Down