Skip to content

Commit

Permalink
✨ Add use_metadata option
Browse files Browse the repository at this point in the history
  • Loading branch information
pemontto committed Sep 21, 2021
1 parent 7f1099e commit e6278b8
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 2 deletions.
23 changes: 23 additions & 0 deletions docs/index.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,7 @@ This plugin supports the following configuration options plus the
| <<plugins-{type}s-{plugin}-truststore_password>> |<<password,password>>|No
| <<plugins-{type}s-{plugin}-upsert>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-user>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-use_metadata>> |<<boolean,boolean>>|No
| <<plugins-{type}s-{plugin}-validate_after_inactivity>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-version>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-version_type>> |<<string,string>>, one of `["internal", "external", "external_gt", "external_gte", "force"]`|No
Expand Down Expand Up @@ -1095,6 +1096,28 @@ Create a new document with this parameter as json string if `document_id` doesn'

Username to authenticate to a secure Elasticsearch cluster

[id="plugins-{type}s-{plugin}-use_metadata"]
===== `use_metadata`

* Value type is <<boolean,boolean>>
* Default value is `false`

Use and preference output parameters defined in the document metadata. The <<plugins-{type}s-{plugin}-index>> (`@metadata._index`), <<plugins-{type}s-{plugin}-document_id>> (`@metadata._id_`), and <<plugins-{type}s-{plugin}-pipeline>> (`@metadata.pipeline`) can be set by their respective `@metadata` fields.

E.g. to index a document to index `myindex` with id `myid` with the ingest pipeline `mypipeline`:

[source,json]
-----
{
"message": "foo",
"@metadata": {
"_index": "myindex",
"_id": "myid",
"pipeline": "mypipeline"
}
}
-----

[id="plugins-{type}s-{plugin}-validate_after_inactivity"]
===== `validate_after_inactivity`

Expand Down
11 changes: 11 additions & 0 deletions lib/logstash/outputs/elasticsearch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,9 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
# ILM policy to use, if undefined the default policy will be used.
config :ilm_policy, :validate => :string, :default => DEFAULT_POLICY

# ILM policy to use, if undefined the default policy will be used.
config :use_metadata, :validate => :boolean, :default => false

attr_reader :client
attr_reader :default_index
attr_reader :default_ilm_rollover_alias
Expand Down Expand Up @@ -428,6 +431,14 @@ def common_event_params(event)
params[:pipeline] = value unless value.empty?
end

if @use_metadata
params[:_id] = event.get("[@metadata][_id]") || params[:_id]
event_index = event.get("[@metadata][_index]")
params[:_index] = event.sprintf(event_index) if event_index && !event_index.empty?
event_pipeline = event.get("[@metadata][pipeline]")
params[:pipeline] = event.sprintf(event_pipeline) if event_pipeline && !event_pipeline.empty?
end

params
end

Expand Down
2 changes: 1 addition & 1 deletion spec/integration/outputs/index_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
end

describe "indexing" do
let(:event) { LogStash::Event.new("message" => "Hello World!", "type" => type) }
let(:event) { LogStash::Event.new("message" => "Hello World!", "type" => type, "@metadata" => { "_id" => "test-id", "_index" => "test-index", "pipeline" => "test-pipeline" }) }
let(:index) { 10.times.collect { rand(10).to_s }.join("") }
let(:type) { ESHelper.es_version_satisfies?("< 7") ? "doc" : "_doc" }
let(:event_count) { 1 + rand(2) }
Expand Down
27 changes: 27 additions & 0 deletions spec/integration/outputs/index_version_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -94,5 +94,32 @@
expect(r2["_source"]["message"]).to eq('foo')
end
end

describe "use metadata" do
let(:settings) do
{
"index" => "logstash-index",
"hosts" => get_host_port(),
"use_metadata" => true,
}
end

it "should use @metadata._id for document_id" do
id = "new_doc_id_1"
subject.multi_receive([LogStash::Event.new("@metadata" => { "_id" => id }, "message" => "foo")])
r = es.get(:index => "logstash-index", :type => doc_type, :id => id, :refresh => true)
expect(r["_id"]).to eq(id)
expect(r["_source"]["message"]).to eq("foo")
end
it "should use @metadata._index for index" do
id = "new_doc_id_2"
new_index = "logstash-index-new"
subject.multi_receive([LogStash::Event.new("@metadata" => { "_id" => id, "_index" => new_index }, "message" => "foo")])
r = es.get(:index => new_index, :type => doc_type, :id => id, :refresh => true)
expect(r["_id"]).to eq(id)
expect(r["_index"]).to eq(new_index)
expect(r["_source"]["message"]).to eq("foo")
end
end
end
end
119 changes: 118 additions & 1 deletion spec/integration/outputs/ingest_pipeline_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
end

it "indexes using the proper pipeline" do
results = @es.search(:index => 'logstash-*', :q => "message:\"netcat\"")
results = @es.search(:index => "logstash-*", :q => "message:\"netcat\"")
expect(results).to have_hits(1)
expect(results["hits"]["hits"][0]["_source"]["response"]).to eq("200")
expect(results["hits"]["hits"][0]["_source"]["bytes"]).to eq("182")
Expand All @@ -72,3 +72,120 @@
expect(results["hits"]["hits"][0]["_source"]["junkfieldaaaa"]).to eq(nil)
end
end

describe "Ingest pipeline from metadata", :integration => true do
subject! do
require "logstash/outputs/elasticsearch"
settings = {
"hosts" => "#{get_host_port()}",
"pipeline" => "apache-logs",
"data_stream" => "false",
"use_metadata" => true,
}
next LogStash::Outputs::ElasticSearch.new(settings)
end

let(:http_client) { Manticore::Client.new }
let(:ingest_url) { "http://#{get_host_port()}/_ingest/pipeline/apache-logs" }
let(:apache_logs_pipeline) {
'
{
"description" : "Pipeline to parse Apache logs",
"processors" : [
{
"grok": {
"field": "message",
"patterns": ["%{COMBINEDAPACHELOG}"]
}
}
]
}'
}

let(:add_field_ingest_url) { "http://#{get_host_port()}/_ingest/pipeline/add-field" }
let(:add_field_logs_pipeline) {
'
{
"description": "Add field foo with value bar",
"processors": [
{
"set": {
"field": "foo",
"value": "bar"
}
}
]
}'
}

before :each do
# Delete all templates first.
require "elasticsearch"

# Clean ES of data before we start.
@es = get_client
@es.indices.delete_template(:name => "*")

# This can fail if there are no indexes, ignore failure.
@es.indices.delete(:index => "*") rescue nil

# delete existing ingest pipeline
http_client.delete(ingest_url).call

# register pipelines
http_client.put(ingest_url, :body => apache_logs_pipeline, :headers => { "Content-Type" => "application/json" }).call
http_client.put(add_field_ingest_url, :body => add_field_logs_pipeline, :headers => { "Content-Type" => "application/json" }).call

#TODO: Use esclient
#@es.ingest.put_pipeline :id => 'apache_pipeline', :body => pipeline_defintion

subject.register
subject.multi_receive([
LogStash::Event.new("message" => '183.60.215.50 - - [01/Jun/2015:18:00:00 +0000] "GET /scripts/netcat-webserver HTTP/1.1" 200 182 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"'),
LogStash::Event.new("message" => '183.60.215.50 - - [01/Jun/2015:18:00:00 +0000] "GET /scripts/netcat-webserver HTTP/1.1" 200 182 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"', "@metadata" => { "_id" => "id1", "_index" => "index1", "pipeline" => "add-field" }),
LogStash::Event.new("message" => '183.60.215.50 - - [01/Jun/2015:18:00:00 +0000] "GET /scripts/netcat-webserver HTTP/1.1" 200 182 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"', "@metadata" => { "_id" => "id2", "_index" => "index2", "pipeline" => "" }),
])
@es.indices.refresh

#Wait or fail until everything's indexed.
Stud::try(10.times) do
r = @es.search(index: "logstash-*")
expect(r).to have_hits(1)
r = @es.search(index: "index1")
expect(r).to have_hits(1)
r = @es.search(index: "index2")
expect(r).to have_hits(1)
sleep(0.1)
end
end

it "indexes using the correct pipeline when @metadata.pipeline not defined" do
results = @es.search(:index => "logstash-*", :q => "message:\"netcat\"")
expect(results).to have_hits(1)
expect(results["hits"]["hits"][0]["_source"]["response"]).to eq("200")
expect(results["hits"]["hits"][0]["_source"]["bytes"]).to eq("182")
expect(results["hits"]["hits"][0]["_source"]["verb"]).to eq("GET")
expect(results["hits"]["hits"][0]["_source"]["request"]).to eq("/scripts/netcat-webserver")
expect(results["hits"]["hits"][0]["_source"]["auth"]).to eq("-")
expect(results["hits"]["hits"][0]["_source"]["ident"]).to eq("-")
expect(results["hits"]["hits"][0]["_source"]["clientip"]).to eq("183.60.215.50")
expect(results["hits"]["hits"][0]["_source"]["junkfieldaaaa"]).to eq(nil)
end

it "indexes using the @metadata._index, @metadata._id, and @metadata.pipeline when defined" do
results = @es.search(:index => "index1", :q => "message:\"netcat\"")
expect(results).to have_hits(1)
expect(results["hits"]["hits"][0]["_id"]).to eq("id1")
expect(results["hits"]["hits"][0]["_index"]).to eq("index1")
expect(results["hits"]["hits"][0]["_source"]["foo"]).to eq("bar")
end

it "indexes ignore empty @metadata.pipeline values" do
results = @es.search(:index => "index2", :q => "message:\"netcat\"")
expect(results).to have_hits(1)
expect(results["hits"]["hits"][0]["_id"]).to eq("id2")
expect(results["hits"]["hits"][0]["_index"]).to eq("index2")
expect(results["hits"]["hits"][0]["_source"]).not_to include("foo")
end

end

0 comments on commit e6278b8

Please sign in to comment.