diff --git a/docs/index.asciidoc b/docs/index.asciidoc index 031f2224..0240196e 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -367,6 +367,7 @@ This plugin supports the following configuration options plus the | <> |<>|No | <> |<>|No | <> |<>|No +| <> |<>|No | <> |<>|No | <> |<>|No | <> |<>, one of `["internal", "external", "external_gt", "external_gte", "force"]`|No @@ -1095,6 +1096,28 @@ Create a new document with this parameter as json string if `document_id` doesn' Username to authenticate to a secure Elasticsearch cluster +[id="plugins-{type}s-{plugin}-use_metadata"] +===== `use_metadata` + + * Value type is <> + * Default value is `false` + +Use and give preference to output parameters defined in the document metadata. The <> (`@metadata._index`), <> (`@metadata._id`), and <> (`@metadata.pipeline`) can be set by their respective `@metadata` fields. + +E.g. to index a document to index `myindex` with id `myid` using the ingest pipeline `mypipeline`: + +[source,json] +----- +{ + "message": "foo", + "@metadata": { + "_index": "myindex", + "_id": "myid", + "pipeline": "mypipeline" + } +} +----- + [id="plugins-{type}s-{plugin}-validate_after_inactivity"] ===== `validate_after_inactivity` diff --git a/lib/logstash/outputs/elasticsearch.rb b/lib/logstash/outputs/elasticsearch.rb index 25dd866a..48addfb9 100644 --- a/lib/logstash/outputs/elasticsearch.rb +++ b/lib/logstash/outputs/elasticsearch.rb @@ -251,6 +251,9 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base # ILM policy to use, if undefined the default policy will be used. config :ilm_policy, :validate => :string, :default => DEFAULT_POLICY + # Use the event's @metadata fields (_index, _id, pipeline) to set the corresponding output parameters. + config :use_metadata, :validate => :boolean, :default => false + attr_reader :client attr_reader :default_index attr_reader :default_ilm_rollover_alias @@ -428,6 +431,14 @@ def common_event_params(event) params[:pipeline] = value unless value.empty? 
end + if @use_metadata + params[:_id] = event.get("[@metadata][_id]") || params[:_id] + event_index = event.get("[@metadata][_index]") + params[:_index] = event.sprintf(event_index) if event_index && !event_index.empty? + event_pipeline = event.get("[@metadata][pipeline]") + params[:pipeline] = event.sprintf(event_pipeline) if event_pipeline && !event_pipeline.empty? + end + params end diff --git a/spec/integration/outputs/index_spec.rb b/spec/integration/outputs/index_spec.rb index 16dac139..f805dfd1 100644 --- a/spec/integration/outputs/index_spec.rb +++ b/spec/integration/outputs/index_spec.rb @@ -46,7 +46,7 @@ end describe "indexing" do - let(:event) { LogStash::Event.new("message" => "Hello World!", "type" => type) } + let(:event) { LogStash::Event.new("message" => "Hello World!", "type" => type, "@metadata" => { "_id" => "test-id", "_index" => "test-index", "pipeline" => "test-pipeline" }) } let(:index) { 10.times.collect { rand(10).to_s }.join("") } let(:type) { ESHelper.es_version_satisfies?("< 7") ? 
"doc" : "_doc" } let(:event_count) { 1 + rand(2) } diff --git a/spec/integration/outputs/index_version_spec.rb b/spec/integration/outputs/index_version_spec.rb index 0b1ecda9..62ea8fa4 100644 --- a/spec/integration/outputs/index_version_spec.rb +++ b/spec/integration/outputs/index_version_spec.rb @@ -94,5 +94,32 @@ expect(r2["_source"]["message"]).to eq('foo') end end + + describe "use metadata" do + let(:settings) do + { + "index" => "logstash-index", + "hosts" => get_host_port(), + "use_metadata" => true, + } + end + + it "should use @metadata._id for document_id" do + id = "new_doc_id_1" + subject.multi_receive([LogStash::Event.new("@metadata" => { "_id" => id }, "message" => "foo")]) + r = es.get(:index => "logstash-index", :type => doc_type, :id => id, :refresh => true) + expect(r["_id"]).to eq(id) + expect(r["_source"]["message"]).to eq("foo") + end + it "should use @metadata._index for index" do + id = "new_doc_id_2" + new_index = "logstash-index-new" + subject.multi_receive([LogStash::Event.new("@metadata" => { "_id" => id, "_index" => new_index }, "message" => "foo")]) + r = es.get(:index => new_index, :type => doc_type, :id => id, :refresh => true) + expect(r["_id"]).to eq(id) + expect(r["_index"]).to eq(new_index) + expect(r["_source"]["message"]).to eq("foo") + end + end end end diff --git a/spec/integration/outputs/ingest_pipeline_spec.rb b/spec/integration/outputs/ingest_pipeline_spec.rb index c76fdaef..0073478d 100644 --- a/spec/integration/outputs/ingest_pipeline_spec.rb +++ b/spec/integration/outputs/ingest_pipeline_spec.rb @@ -60,7 +60,7 @@ end it "indexes using the proper pipeline" do - results = @es.search(:index => 'logstash-*', :q => "message:\"netcat\"") + results = @es.search(:index => "logstash-*", :q => "message:\"netcat\"") expect(results).to have_hits(1) expect(results["hits"]["hits"][0]["_source"]["response"]).to eq("200") expect(results["hits"]["hits"][0]["_source"]["bytes"]).to eq("182") @@ -72,3 +72,120 @@ 
expect(results["hits"]["hits"][0]["_source"]["junkfieldaaaa"]).to eq(nil) end end + +describe "Ingest pipeline from metadata", :integration => true do + subject! do + require "logstash/outputs/elasticsearch" + settings = { + "hosts" => "#{get_host_port()}", + "pipeline" => "apache-logs", + "data_stream" => "false", + "use_metadata" => true, + } + next LogStash::Outputs::ElasticSearch.new(settings) + end + + let(:http_client) { Manticore::Client.new } + let(:ingest_url) { "http://#{get_host_port()}/_ingest/pipeline/apache-logs" } + let(:apache_logs_pipeline) { + ' + { + "description" : "Pipeline to parse Apache logs", + "processors" : [ + { + "grok": { + "field": "message", + "patterns": ["%{COMBINEDAPACHELOG}"] + } + } + ] + }' + } + + let(:add_field_ingest_url) { "http://#{get_host_port()}/_ingest/pipeline/add-field" } + let(:add_field_logs_pipeline) { + ' + { + "description": "Add field foo with value bar", + "processors": [ + { + "set": { + "field": "foo", + "value": "bar" + } + } + ] + }' + } + + before :each do + # Delete all templates first. + require "elasticsearch" + + # Clean ES of data before we start. + @es = get_client + @es.indices.delete_template(:name => "*") + + # This can fail if there are no indexes, ignore failure. 
+ @es.indices.delete(:index => "*") rescue nil + + # delete existing ingest pipeline + http_client.delete(ingest_url).call + + # register pipelines + http_client.put(ingest_url, :body => apache_logs_pipeline, :headers => { "Content-Type" => "application/json" }).call + http_client.put(add_field_ingest_url, :body => add_field_logs_pipeline, :headers => { "Content-Type" => "application/json" }).call + + #TODO: Use esclient + #@es.ingest.put_pipeline :id => 'apache_pipeline', :body => pipeline_defintion + + subject.register + subject.multi_receive([ + LogStash::Event.new("message" => '183.60.215.50 - - [01/Jun/2015:18:00:00 +0000] "GET /scripts/netcat-webserver HTTP/1.1" 200 182 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"'), + LogStash::Event.new("message" => '183.60.215.50 - - [01/Jun/2015:18:00:00 +0000] "GET /scripts/netcat-webserver HTTP/1.1" 200 182 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"', "@metadata" => { "_id" => "id1", "_index" => "index1", "pipeline" => "add-field" }), + LogStash::Event.new("message" => '183.60.215.50 - - [01/Jun/2015:18:00:00 +0000] "GET /scripts/netcat-webserver HTTP/1.1" 200 182 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"', "@metadata" => { "_id" => "id2", "_index" => "index2", "pipeline" => "" }), + ]) + @es.indices.refresh + + #Wait or fail until everything's indexed. 
+ Stud::try(10.times) do + r = @es.search(index: "logstash-*") + expect(r).to have_hits(1) + r = @es.search(index: "index1") + expect(r).to have_hits(1) + r = @es.search(index: "index2") + expect(r).to have_hits(1) + sleep(0.1) + end + end + + it "indexes using the correct pipeline when @metadata.pipeline not defined" do + results = @es.search(:index => "logstash-*", :q => "message:\"netcat\"") + expect(results).to have_hits(1) + expect(results["hits"]["hits"][0]["_source"]["response"]).to eq("200") + expect(results["hits"]["hits"][0]["_source"]["bytes"]).to eq("182") + expect(results["hits"]["hits"][0]["_source"]["verb"]).to eq("GET") + expect(results["hits"]["hits"][0]["_source"]["request"]).to eq("/scripts/netcat-webserver") + expect(results["hits"]["hits"][0]["_source"]["auth"]).to eq("-") + expect(results["hits"]["hits"][0]["_source"]["ident"]).to eq("-") + expect(results["hits"]["hits"][0]["_source"]["clientip"]).to eq("183.60.215.50") + expect(results["hits"]["hits"][0]["_source"]["junkfieldaaaa"]).to eq(nil) + end + + it "indexes using the @metadata._index, @metadata._id, and @metadata.pipeline when defined" do + results = @es.search(:index => "index1", :q => "message:\"netcat\"") + expect(results).to have_hits(1) + expect(results["hits"]["hits"][0]["_id"]).to eq("id1") + expect(results["hits"]["hits"][0]["_index"]).to eq("index1") + expect(results["hits"]["hits"][0]["_source"]["foo"]).to eq("bar") + end + + it "indexes ignore empty @metadata.pipeline values" do + results = @es.search(:index => "index2", :q => "message:\"netcat\"") + expect(results).to have_hits(1) + expect(results["hits"]["hits"][0]["_id"]).to eq("id2") + expect(results["hits"]["hits"][0]["_index"]).to eq("index2") + expect(results["hits"]["hits"][0]["_source"]).not_to include("foo") + end + +end