From 50413820421a51b872e5a4340d2a168847cf4bb9 Mon Sep 17 00:00:00 2001 From: Casey Weed Date: Wed, 29 May 2019 11:17:29 -0400 Subject: [PATCH 01/16] Add ro alias setting option --- .../outputs/elasticsearch/common_configs.rb | 11 ++++++---- .../outputs/elasticsearch/http_client.rb | 22 +++++++++++++------ lib/logstash/outputs/elasticsearch/ilm.rb | 4 ++-- 3 files changed, 24 insertions(+), 13 deletions(-) diff --git a/lib/logstash/outputs/elasticsearch/common_configs.rb b/lib/logstash/outputs/elasticsearch/common_configs.rb index c449ca665..55ac65fe6 100644 --- a/lib/logstash/outputs/elasticsearch/common_configs.rb +++ b/lib/logstash/outputs/elasticsearch/common_configs.rb @@ -17,8 +17,8 @@ def self.included(mod) # Joda formats are defined http://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat.html[here]. mod.config :index, :validate => :string, :default => DEFAULT_INDEX_NAME - mod.config :document_type, - :validate => :string, + mod.config :document_type, + :validate => :string, :deprecated => "Document types are being deprecated in Elasticsearch 6.0, and removed entirely in 7.0. You should avoid this feature" # From Logstash 1.3 onwards, a template is applied to Elasticsearch during @@ -67,7 +67,7 @@ def self.included(mod) # The version to use for indexing. Use sprintf syntax like `%{my_version}` to use a field value here. # See https://www.elastic.co/blog/elasticsearch-versioning-support. mod.config :version, :validate => :string - + # The version_type to use for indexing. # See https://www.elastic.co/blog/elasticsearch-versioning-support. # See also https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html#_version_types @@ -93,7 +93,7 @@ def self.included(mod) # `["https://127.0.0.1:9200/mypath"]` (If using a proxy on a subpath) # It is important to exclude http://www.elastic.co/guide/en/elasticsearch/reference/current/modules-node.html[dedicated master nodes] from the `hosts` list # to prevent LS from sending bulk requests to the master nodes. So this parameter should only reference either data or client nodes in Elasticsearch. - # + # # Any special characters present in the URLs here MUST be URL escaped! This means `#` should be put in as `%23` for instance. mod.config :hosts, :validate => :uri, :default => [::LogStash::Util::SafeURI.new("//127.0.0.1")], :list => true @@ -155,6 +155,9 @@ def self.included(mod) # ILM policy to use, if undefined the default policy will be used. mod.config :ilm_policy, :validate => :string, :default => DEFAULT_POLICY + # Add index.lifecycle.rollover_alias setting to alias upon creation + mod.config :ilm_set_rollover_alias, :validate => [true, false, 'true', 'false'], :default => false + end end end end end diff --git a/lib/logstash/outputs/elasticsearch/http_client.rb b/lib/logstash/outputs/elasticsearch/http_client.rb index 32a37e82a..a9394f20f 100644 --- a/lib/logstash/outputs/elasticsearch/http_client.rb +++ b/lib/logstash/outputs/elasticsearch/http_client.rb @@ -110,7 +110,7 @@ def bulk(actions) if http_compression body_stream.set_encoding "BINARY" stream_writer = Zlib::GzipWriter.new(body_stream, Zlib::DEFAULT_COMPRESSION, Zlib::DEFAULT_STRATEGY) - else + else stream_writer = body_stream end bulk_responses = [] @@ -215,7 +215,7 @@ def scheme else nil end - + calculated_scheme = calculate_property(uris, :scheme, explicit_scheme, sniffing) if calculated_scheme && calculated_scheme !~ /https?/ @@ -235,7 +235,7 @@ def port # Enter things like foo:123, bar and wind up with foo:123, bar:9200 calculate_property(uris, :port, nil, sniffing) || 9200 end - + def uris @options[:hosts] end @@ -254,7 +254,7 @@ def http_compression def build_adapter(options) timeout = options[:timeout] || 0 - + adapter_options = { :socket_timeout => timeout, :request_timeout => timeout, @@ -281,7 +281,7 @@ def build_adapter(options) adapter_class = ::LogStash::Outputs::ElasticSearch::HttpClient::ManticoreAdapter adapter = adapter_class.new(@logger, adapter_options) end - + def build_pool(options) adapter = build_adapter(options) @@ -331,7 +331,7 @@ def host_to_url(h) h.query end prefixed_raw_query = raw_query && !raw_query.empty? ? "?#{raw_query}" : nil - + raw_url = "#{raw_scheme}://#{postfixed_userinfo}#{raw_host}:#{raw_port}#{prefixed_raw_path}#{prefixed_raw_query}" ::LogStash::Util::SafeURI.new(raw_url) @@ -360,9 +360,17 @@ def rollover_alias_exists?(name) end # Create a new rollover alias - def rollover_alias_put(alias_name, alias_definition) + def rollover_alias_put(alias_name, alias_definition, add_rollover_settings) logger.info("Creating rollover alias #{alias_name}") begin + if add_rollover_settings + real_alias_name, _ = alias_definition["aliases"].first + alias_definition.merge!({ + 'settings' => { + 'index.lifecycle.rollover_alias' => real_alias_name + } + }) + end @pool.put(CGI::escape(alias_name), nil, LogStash::Json.dump(alias_definition)) # If the rollover alias already exists, ignore the error that comes back from Elasticsearch rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e diff --git a/lib/logstash/outputs/elasticsearch/ilm.rb b/lib/logstash/outputs/elasticsearch/ilm.rb index a9f43f12a..dd04c6d12 100644 --- a/lib/logstash/outputs/elasticsearch/ilm.rb +++ b/lib/logstash/outputs/elasticsearch/ilm.rb @@ -88,7 +88,7 @@ def maybe_create_ilm_policy end def maybe_create_rollover_alias - client.rollover_alias_put(rollover_alias_target, rollover_alias_payload) unless client.rollover_alias_exists?(ilm_rollover_alias) + client.rollover_alias_put(rollover_alias_target, rollover_alias_payload, ilm_set_rollover_alias) unless client.rollover_alias_exists?(ilm_rollover_alias) end def rollover_alias_target @@ -110,4 +110,4 @@ def policy_payload LogStash::Json.load(::IO.read(policy_path)) end end - end end end \ No newline at end of file + end end end From e910458dab3e4829e67bad9ac9faa555f18986b4 Mon Sep 17 00:00:00 2001 From: Casey Weed Date: Wed, 29 May 2019 15:24:48 -0400 Subject: [PATCH 02/16] Add log message --- lib/logstash/outputs/elasticsearch/http_client.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/logstash/outputs/elasticsearch/http_client.rb b/lib/logstash/outputs/elasticsearch/http_client.rb index a9394f20f..49b9f0c0a 100644 --- a/lib/logstash/outputs/elasticsearch/http_client.rb +++ b/lib/logstash/outputs/elasticsearch/http_client.rb @@ -365,6 +365,7 @@ def rollover_alias_put(alias_name, alias_definition, add_rollover_settings) begin if add_rollover_settings real_alias_name, _ = alias_definition["aliases"].first + logger.info("Adding lifecycle rollover_alias setting for #{real_alias_name}") alias_definition.merge!({ 'settings' => { 'index.lifecycle.rollover_alias' => real_alias_name From 6b4aedb73b44831668b5660f9f1438e5bbf0e95b Mon Sep 17 00:00:00 2001 From: Casey Weed Date: Tue, 11 Jun 2019 16:11:05 -0400 Subject: [PATCH 03/16] Get ILM event alias substition partially working? --- lib/logstash/outputs/elasticsearch/common.rb | 17 ++++++++++++++++ .../outputs/elasticsearch/common_configs.rb | 3 +++ .../outputs/elasticsearch/http_client.rb | 8 +++++--- .../http_client/manticore_adapter.rb | 1 + lib/logstash/outputs/elasticsearch/ilm.rb | 20 +++++++++++++++++++ 5 files changed, 46 insertions(+), 3 deletions(-) diff --git a/lib/logstash/outputs/elasticsearch/common.rb b/lib/logstash/outputs/elasticsearch/common.rb index 9d6c68c7e..bc7a9c5c8 100644 --- a/lib/logstash/outputs/elasticsearch/common.rb +++ b/lib/logstash/outputs/elasticsearch/common.rb @@ -1,4 +1,5 @@ require "logstash/outputs/elasticsearch/template_manager" +require 'set' module LogStash; module Outputs; class ElasticSearch; module Common @@ -28,6 +29,8 @@ def register @bulk_request_metrics = metric.namespace(:bulk_requests) @document_level_metrics = metric.namespace(:documents) @logger.info("New Elasticsearch output", :class => self.class.name, :hosts => @hosts.map(&:sanitized).map(&:to_s)) + @logger.trace("Methods for ES Output on register: #{self.methods.to_s}") + @logger.trace("Instance variables for ES Output on register: #{self.instance_variables.to_s}") end # Receive an array of events and immediately attempt to index them (no buffering) @@ -166,6 +169,20 @@ def retrying_submit(actions) # We retry with whatever is didn't succeed begin + + # try making alias here? overwrite :_index with alias name? + unless @ilm_event_alias.nil? + created_aliases = Set[] + submit_actions.each do |action, params, event| + logger.trace("Created aliases so far: #{created_aliases.to_s}") + if ['index', 'create'].include?(action) + new_index = maybe_create_rollover_alias_for_event(event, created_aliases) + created_aliases << new_index + params[:_index] = new_index + end + end + end + submit_actions = submit(submit_actions) if submit_actions && submit_actions.size > 0 @logger.info("Retrying individual bulk actions that failed or were rejected by the previous bulk request.", :count => submit_actions.size) diff --git a/lib/logstash/outputs/elasticsearch/common_configs.rb b/lib/logstash/outputs/elasticsearch/common_configs.rb index 55ac65fe6..c385774e0 100644 --- a/lib/logstash/outputs/elasticsearch/common_configs.rb +++ b/lib/logstash/outputs/elasticsearch/common_configs.rb @@ -158,6 +158,9 @@ def self.included(mod) # Add index.lifecycle.rollover_alias setting to alias upon creation mod.config :ilm_set_rollover_alias, :validate => [true, false, 'true', 'false'], :default => false + # Use this for event substitution aliases + mod.config :ilm_event_alias, :validate => :string + end end end end end diff --git a/lib/logstash/outputs/elasticsearch/http_client.rb b/lib/logstash/outputs/elasticsearch/http_client.rb index 49b9f0c0a..b2287966e 100644 --- a/lib/logstash/outputs/elasticsearch/http_client.rb +++ b/lib/logstash/outputs/elasticsearch/http_client.rb @@ -30,7 +30,6 @@ class HttpClient # :setting => value # } -# # The `options` is a hash where the following symbol keys have meaning: # # * `:hosts` - array of String. Set a list of hosts to use for communication. @@ -65,6 +64,7 @@ def initialize(options={}) # mutex to prevent requests and sniffing to access the # connection pool at the same time @bulk_path = @options[:bulk_path] + logger.trace("All methods for ES output: #{self.methods.to_s}") end def build_url_template @@ -356,12 +356,14 @@ def template_put(name, template) # check whether rollover alias already exists def rollover_alias_exists?(name) + logger.debug("Rollover alias exists?: #{exists?(name)}") exists?(name) + # TODO: maybe try adding a sprintf here for @event? end # Create a new rollover alias def rollover_alias_put(alias_name, alias_definition, add_rollover_settings) - logger.info("Creating rollover alias #{alias_name}") + logger.info("Creating rollover alias #{alias_name}, escaped: #{CGI::escape(alias_name)}") begin if add_rollover_settings real_alias_name, _ = alias_definition["aliases"].first @@ -438,4 +440,4 @@ def update_action_builder(args, source) [args, source] end end -end end end +end end end \ No newline at end of file diff --git a/lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb b/lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb index 53c62f36a..9c66e4dec 100644 --- a/lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb +++ b/lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb @@ -64,6 +64,7 @@ def perform_request(url, method, path, params={}, body=nil) } end + @logger.debug("Manticore, url: #{url}, path: #{path}, params: #{params.to_s}, body: #{body}") request_uri = format_url(url, path) request_uri_as_string = remove_double_escaping(request_uri.to_s) resp = @manticore.send(method.downcase, request_uri_as_string, params) diff --git a/lib/logstash/outputs/elasticsearch/ilm.rb b/lib/logstash/outputs/elasticsearch/ilm.rb index dd04c6d12..ea6b9a2e0 100644 --- a/lib/logstash/outputs/elasticsearch/ilm.rb +++ b/lib/logstash/outputs/elasticsearch/ilm.rb @@ -87,7 +87,27 @@ def maybe_create_ilm_policy end end + def maybe_create_rollover_alias_for_event(event, created_aliases) + alias_name = event.sprintf(ilm_event_alias) + if client.rollover_alias_exists?(alias_name) or created_aliases.include?(alias_name) + return alias_name + end + alias_target = "<#{alias_name}-#{ilm_pattern}>" + alias_payload = { + 'aliases' => { + alias_name => { + 'is_write_index' => true + } + } + } + logger.debug("Maybe create rollover alias for an event?: alias_name: #{alias_name}, alias_target: #{alias_target}, alias_payload: #{alias_payload.to_s}") + client.rollover_alias_put(alias_target, alias_payload, ilm_set_rollover_alias) unless client.rollover_alias_exists?(alias_name) + + alias_name + end + def maybe_create_rollover_alias + logger.debug("Maybe create ro alias?: #{rollover_alias_target}, exists? #{client.rollover_alias_exists?(ilm_rollover_alias)}") client.rollover_alias_put(rollover_alias_target, rollover_alias_payload, ilm_set_rollover_alias) unless client.rollover_alias_exists?(ilm_rollover_alias) end From b1cc1c2f4728918e35b159c5e9bf690712e7d095 Mon Sep 17 00:00:00 2001 From: Casey Weed Date: Tue, 11 Jun 2019 16:54:20 -0400 Subject: [PATCH 04/16] Return if alias in created set --- lib/logstash/outputs/elasticsearch/ilm.rb | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/lib/logstash/outputs/elasticsearch/ilm.rb b/lib/logstash/outputs/elasticsearch/ilm.rb index ea6b9a2e0..156ef4b42 100644 --- a/lib/logstash/outputs/elasticsearch/ilm.rb +++ b/lib/logstash/outputs/elasticsearch/ilm.rb @@ -89,9 +89,7 @@ def maybe_create_ilm_policy def maybe_create_rollover_alias_for_event(event, created_aliases) alias_name = event.sprintf(ilm_event_alias) - if client.rollover_alias_exists?(alias_name) or created_aliases.include?(alias_name) - return alias_name - end + return alias_name unless !created_aliases.include?(alias_name) alias_target = "<#{alias_name}-#{ilm_pattern}>" alias_payload = { 'aliases' => { From 7fc113da50f047ec9d83932d9fabaa19dd0077e4 Mon Sep 17 00:00:00 2001 From: Casey Weed Date: Wed, 12 Jun 2019 09:56:01 -0400 Subject: [PATCH 05/16] Remove some logging statements --- lib/logstash/outputs/elasticsearch/common.rb | 6 +++--- .../outputs/elasticsearch/http_client.rb | 6 +----- .../http_client/manticore_adapter.rb | 19 +++++++++---------- lib/logstash/outputs/elasticsearch/ilm.rb | 8 ++++---- 4 files changed, 17 insertions(+), 22 deletions(-) diff --git a/lib/logstash/outputs/elasticsearch/common.rb b/lib/logstash/outputs/elasticsearch/common.rb index bc7a9c5c8..b99a5f105 100644 --- a/lib/logstash/outputs/elasticsearch/common.rb +++ b/lib/logstash/outputs/elasticsearch/common.rb @@ -29,8 +29,6 @@ def register @bulk_request_metrics = metric.namespace(:bulk_requests) @document_level_metrics = metric.namespace(:documents) @logger.info("New Elasticsearch output", :class => self.class.name, :hosts => @hosts.map(&:sanitized).map(&:to_s)) - @logger.trace("Methods for ES Output on register: #{self.methods.to_s}") - @logger.trace("Instance variables for ES Output on register: #{self.instance_variables.to_s}") end # Receive an array of events and immediately attempt to index them (no buffering) @@ -171,10 +169,12 @@ def retrying_submit(actions) begin # try making alias here? overwrite :_index with alias name? + # TODO: what if it doesn't match anything from the event? what then? + # need to have it use the default pattern or something if it doesn't unless @ilm_event_alias.nil? created_aliases = Set[] submit_actions.each do |action, params, event| - logger.trace("Created aliases so far: #{created_aliases.to_s}") + logger.trace("Created/cached aliases so far: #{created_aliases.to_s}") if ['index', 'create'].include?(action) new_index = maybe_create_rollover_alias_for_event(event, created_aliases) created_aliases << new_index diff --git a/lib/logstash/outputs/elasticsearch/http_client.rb b/lib/logstash/outputs/elasticsearch/http_client.rb index b2287966e..d22fbbd51 100644 --- a/lib/logstash/outputs/elasticsearch/http_client.rb +++ b/lib/logstash/outputs/elasticsearch/http_client.rb @@ -64,7 +64,6 @@ def initialize(options={}) # mutex to prevent requests and sniffing to access the # connection pool at the same time @bulk_path = @options[:bulk_path] - logger.trace("All methods for ES output: #{self.methods.to_s}") end def build_url_template @@ -356,18 +355,15 @@ def template_put(name, template) # check whether rollover alias already exists def rollover_alias_exists?(name) - logger.debug("Rollover alias exists?: #{exists?(name)}") exists?(name) - # TODO: maybe try adding a sprintf here for @event? end # Create a new rollover alias def rollover_alias_put(alias_name, alias_definition, add_rollover_settings) - logger.info("Creating rollover alias #{alias_name}, escaped: #{CGI::escape(alias_name)}") begin if add_rollover_settings real_alias_name, _ = alias_definition["aliases"].first - logger.info("Adding lifecycle rollover_alias setting for #{real_alias_name}") + logger.debug("Adding lifecycle rollover_alias setting for #{real_alias_name}") alias_definition.merge!({ 'settings' => { 'index.lifecycle.rollover_alias' => real_alias_name diff --git a/lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb b/lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb index 9c66e4dec..5d4cfa99e 100644 --- a/lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb +++ b/lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb @@ -3,7 +3,7 @@ module LogStash; module Outputs; class ElasticSearch; class HttpClient; DEFAULT_HEADERS = { "Content-Type" => "application/json" } - + class ManticoreAdapter attr_reader :manticore, :logger @@ -18,14 +18,14 @@ def initialize(logger, options={}) options[:cookies] = false @client_params = {:headers => DEFAULT_HEADERS.merge(options[:headers] || {})} - + if options[:proxy] options[:proxy] = manticore_proxy_hash(options[:proxy]) end - + @manticore = ::Manticore::Client.new(options) end - + # Transform the proxy option to a hash. Manticore's support for non-hash # proxy options is broken. This was fixed in https://github.com/cheald/manticore/commit/34a00cee57a56148629ed0a47c329181e7319af5 # but this is not yet released @@ -55,16 +55,15 @@ def perform_request(url, method, path, params={}, body=nil) params[:body] = body if body if url.user - params[:auth] = { + params[:auth] = { :user => CGI.unescape(url.user), # We have to unescape the password here since manticore won't do it # for us unless its part of the URL - :password => CGI.unescape(url.password), - :eager => true + :password => CGI.unescape(url.password), + :eager => true } end - @logger.debug("Manticore, url: #{url}, path: #{path}, params: #{params.to_s}, body: #{body}") request_uri = format_url(url, path) request_uri_as_string = remove_double_escaping(request_uri.to_s) resp = @manticore.send(method.downcase, request_uri_as_string, params) @@ -87,7 +86,7 @@ def perform_request(url, method, path, params={}, body=nil) # Returned urls from this method should be checked for double escaping. def format_url(url, path_and_query=nil) request_uri = url.clone - + # We excise auth info from the URL in case manticore itself tries to stick # sensitive data in a thrown exception or log data request_uri.user = nil @@ -103,7 +102,7 @@ def format_url(url, path_and_query=nil) new_query_parts = [request_uri.query, parsed_path_and_query.query].select do |part| part && !part.empty? # Skip empty nil and "" end - + request_uri.query = new_query_parts.join("&") unless new_query_parts.empty? # use `raw_path`` as `path` will unescape any escaped '/' in the path diff --git a/lib/logstash/outputs/elasticsearch/ilm.rb b/lib/logstash/outputs/elasticsearch/ilm.rb index 156ef4b42..9be1032c3 100644 --- a/lib/logstash/outputs/elasticsearch/ilm.rb +++ b/lib/logstash/outputs/elasticsearch/ilm.rb @@ -98,14 +98,14 @@ def maybe_create_rollover_alias_for_event(event, created_aliases) } } } - logger.debug("Maybe create rollover alias for an event?: alias_name: #{alias_name}, alias_target: #{alias_target}, alias_payload: #{alias_payload.to_s}") - client.rollover_alias_put(alias_target, alias_payload, ilm_set_rollover_alias) unless client.rollover_alias_exists?(alias_name) + # settings put must be done, otherwise you end up with a properly made rollover index + # with a lifecycle rollover alias setting for a completely different alias (if using a template) + client.rollover_alias_put(alias_target, alias_payload, true) unless client.rollover_alias_exists?(alias_name) alias_name end def maybe_create_rollover_alias - logger.debug("Maybe create ro alias?: #{rollover_alias_target}, exists? #{client.rollover_alias_exists?(ilm_rollover_alias)}") client.rollover_alias_put(rollover_alias_target, rollover_alias_payload, ilm_set_rollover_alias) unless client.rollover_alias_exists?(ilm_rollover_alias) end @@ -116,7 +116,7 @@ def rollover_alias_target def rollover_alias_payload { 'aliases' => { - ilm_rollover_alias =>{ + ilm_rollover_alias => { 'is_write_index' => true } } From 72ad2d3ea8b67709fd1e4b1644b705923bc7fda5 Mon Sep 17 00:00:00 2001 From: Casey Weed Date: Fri, 14 Jun 2019 11:53:25 -0400 Subject: [PATCH 06/16] Account for empty/missing alias substituion --- lib/logstash/outputs/elasticsearch/common.rb | 22 +++++++++++++------- lib/logstash/outputs/elasticsearch/ilm.rb | 17 ++++++++++++--- 2 files changed, 28 insertions(+), 11 deletions(-) diff --git a/lib/logstash/outputs/elasticsearch/common.rb b/lib/logstash/outputs/elasticsearch/common.rb index b99a5f105..cbc4aaa59 100644 --- a/lib/logstash/outputs/elasticsearch/common.rb +++ b/lib/logstash/outputs/elasticsearch/common.rb @@ -168,19 +168,25 @@ def retrying_submit(actions) # We retry with whatever is didn't succeed begin - # try making alias here? overwrite :_index with alias name? - # TODO: what if it doesn't match anything from the event? what then? - # need to have it use the default pattern or something if it doesn't - unless @ilm_event_alias.nil? + # Create alias(es) before bulking, ensuring rollover aliases exist + if @ilm_enabled && !@ilm_event_alias.nil? created_aliases = Set[] submit_actions.each do |action, params, event| - logger.trace("Created/cached aliases so far: #{created_aliases.to_s}") if ['index', 'create'].include?(action) - new_index = maybe_create_rollover_alias_for_event(event, created_aliases) - created_aliases << new_index - params[:_index] = new_index + begin + new_index = maybe_create_rollover_alias_for_event(event, created_aliases) + created_aliases << new_index + params[:_index] = new_index + rescue ::LogStash::Outputs::ElasticSearch::Ilm::ImproperAliasName => e + @logger.error("Event alias name is not proper, using #{@ilm_rollover_alias} instead") + params[:_index] = @ilm_rollover_alias + rescue => e + @logger.error("Unknown error on creating event alias, #{e}, using #{@ilm_rollover_alias} instead") + params[:_index] = @ilm_rollover_alias + end end end + @logger.trace("Created/cached aliases: #{created_aliases.to_s}") end submit_actions = submit(submit_actions) diff --git a/lib/logstash/outputs/elasticsearch/ilm.rb b/lib/logstash/outputs/elasticsearch/ilm.rb index 9be1032c3..9a55b8398 100644 --- a/lib/logstash/outputs/elasticsearch/ilm.rb +++ b/lib/logstash/outputs/elasticsearch/ilm.rb @@ -87,9 +87,18 @@ def maybe_create_ilm_policy end end + class ImproperAliasName < StandardError + attr_reader :name + def initialize(msg="Index not proper", name) + @name = name + super(msg) + end + end + def maybe_create_rollover_alias_for_event(event, created_aliases) alias_name = event.sprintf(ilm_event_alias) - return alias_name unless !created_aliases.include?(alias_name) + return alias_name if created_aliases.include?(alias_name) + raise ImproperAliasName.new(name=alias_name) if alias_name == ilm_event_alias alias_target = "<#{alias_name}-#{ilm_pattern}>" alias_payload = { 'aliases' => { @@ -98,8 +107,10 @@ def maybe_create_rollover_alias_for_event(event, created_aliases) } } } - # settings put must be done, otherwise you end up with a properly made rollover index - # with a lifecycle rollover alias setting for a completely different alias (if using a template) + # Settings put must be done, otherwise you end up with a properly made rollover index + # with a lifecycle rollover alias setting for a completely different alias which will + # not work past the first index, stalling forever. You will still need something to + # maintain the proper alias after they are rolled over though, so, yeah. client.rollover_alias_put(alias_target, alias_payload, true) unless client.rollover_alias_exists?(alias_name) alias_name From 1cf1f96aec0ca9bd4d8b6b81761420e1c11a4a78 Mon Sep 17 00:00:00 2001 From: Casey Weed Date: Fri, 14 Jun 2019 11:56:28 -0400 Subject: [PATCH 07/16] Failed event alias logs to warn, not error --- lib/logstash/outputs/elasticsearch/common.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/logstash/outputs/elasticsearch/common.rb b/lib/logstash/outputs/elasticsearch/common.rb index cbc4aaa59..7ac6958b1 100644 --- a/lib/logstash/outputs/elasticsearch/common.rb +++ b/lib/logstash/outputs/elasticsearch/common.rb @@ -178,10 +178,10 @@ def retrying_submit(actions) created_aliases << new_index params[:_index] = new_index rescue ::LogStash::Outputs::ElasticSearch::Ilm::ImproperAliasName => e - @logger.error("Event alias name is not proper, using #{@ilm_rollover_alias} instead") + @logger.warn("Event alias name is not proper, using #{@ilm_rollover_alias} instead") params[:_index] = @ilm_rollover_alias rescue => e - @logger.error("Unknown error on creating event alias, #{e}, using #{@ilm_rollover_alias} instead") + @logger.warn("Unknown error on creating event alias, #{e}, using #{@ilm_rollover_alias} instead") params[:_index] = @ilm_rollover_alias end end From 35dcf7d7cbbc202790608b5cefee518d12b0d6c7 Mon Sep 17 00:00:00 2001 From: Casey Weed Date: Fri, 14 Jun 2019 11:58:31 -0400 Subject: [PATCH 08/16] Add some additional info on the configuration options --- lib/logstash/outputs/elasticsearch/common_configs.rb | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/logstash/outputs/elasticsearch/common_configs.rb b/lib/logstash/outputs/elasticsearch/common_configs.rb index c385774e0..fd6c03f5f 100644 --- a/lib/logstash/outputs/elasticsearch/common_configs.rb +++ b/lib/logstash/outputs/elasticsearch/common_configs.rb @@ -155,10 +155,11 @@ def self.included(mod) # ILM policy to use, if undefined the default policy will be used. mod.config :ilm_policy, :validate => :string, :default => DEFAULT_POLICY - # Add index.lifecycle.rollover_alias setting to alias upon creation + # Add index.lifecycle.rollover_alias setting to alias upon creation, always done in the case of aliases created via the ilm_event_alias setting. mod.config :ilm_set_rollover_alias, :validate => [true, false, 'true', 'false'], :default => false - # Use this for event substitution aliases + # Use this for event substitution aliases, ilm_rollover_alias is used as a default in the event the field is missing or other unknown errors. + # Substituion syntax will work here, e.g. "%{my_fantastic_field_here}-alias". mod.config :ilm_event_alias, :validate => :string end From 80af9b37b58b12217427b8ad1c6c492977483723 Mon Sep 17 00:00:00 2001 From: Casey Weed Date: Wed, 19 Jun 2019 14:43:21 -0400 Subject: [PATCH 09/16] Implement caching once --- lib/logstash/outputs/elasticsearch/common.rb | 16 +++++++++++----- .../outputs/elasticsearch/common_configs.rb | 3 +++ lib/logstash/outputs/elasticsearch/ilm.rb | 12 +++++------- 3 files changed, 19 insertions(+), 12 deletions(-) diff --git a/lib/logstash/outputs/elasticsearch/common.rb b/lib/logstash/outputs/elasticsearch/common.rb index 7ac6958b1..e3d821c47 100644 --- a/lib/logstash/outputs/elasticsearch/common.rb +++ b/lib/logstash/outputs/elasticsearch/common.rb @@ -1,5 +1,4 @@ require "logstash/outputs/elasticsearch/template_manager" -require 'set' module LogStash; module Outputs; class ElasticSearch; module Common @@ -21,6 +20,8 @@ def register @stopping = Concurrent::AtomicBoolean.new(false) # To support BWC, we check if DLQ exists in core (< 5.4). If it doesn't, we use nil to resort to previous behavior. @dlq_writer = dlq_enabled? ? execution_context.dlq_writer : nil + @logger.debug("Caching created/seen aliases #{@ilm_cache_once ? 'once' : 'every bulk'}") + @ilm_seen_aliases = {} setup_hosts # properly sets @hosts build_client @@ -168,17 +169,22 @@ def retrying_submit(actions) # We retry with whatever is didn't succeed begin - # Create alias(es) before bulking, ensuring rollover aliases exist + # Create alias(es) before bulking, ensuring rollover aliases exist. + # + # Using the hash let's us set improper aliases such as unsubstituted fields like + # %{abc} to the default alias instead, then return immediately when found to avoid + # raising the exception multiple times, as the exceptions are typically more expensive. if @ilm_enabled && !@ilm_event_alias.nil? - created_aliases = Set[] + created_aliases = @ilm_cache_once ? @ilm_seen_aliases : {} submit_actions.each do |action, params, event| if ['index', 'create'].include?(action) begin - new_index = maybe_create_rollover_alias_for_event(event, created_aliases) - created_aliases << new_index + alias_name, new_index = maybe_create_rollover_alias_for_event(event, created_aliases) + created_aliases[alias_name] = new_index params[:_index] = new_index rescue ::LogStash::Outputs::ElasticSearch::Ilm::ImproperAliasName => e @logger.warn("Event alias name is not proper, using #{@ilm_rollover_alias} instead") + created_aliases[e.name] = @ilm_rollover_alias params[:_index] = @ilm_rollover_alias rescue => e @logger.warn("Unknown error on creating event alias, #{e}, using #{@ilm_rollover_alias} instead") diff --git a/lib/logstash/outputs/elasticsearch/common_configs.rb b/lib/logstash/outputs/elasticsearch/common_configs.rb index fd6c03f5f..e15f8f01c 100644 --- a/lib/logstash/outputs/elasticsearch/common_configs.rb +++ b/lib/logstash/outputs/elasticsearch/common_configs.rb @@ -162,6 +162,9 @@ def self.included(mod) # Substituion syntax will work here, e.g. "%{my_fantastic_field_here}-alias". mod.config :ilm_event_alias, :validate => :string + # Cache seen/created aliases and their destinations once, not during each bulk batch. + mod.config :ilm_cache_once, :validate => [true, false, 'true', 'false'], :default => false + end end end end end diff --git a/lib/logstash/outputs/elasticsearch/ilm.rb b/lib/logstash/outputs/elasticsearch/ilm.rb index 9a55b8398..5e62a56d1 100644 --- a/lib/logstash/outputs/elasticsearch/ilm.rb +++ b/lib/logstash/outputs/elasticsearch/ilm.rb @@ -97,7 +97,7 @@ def initialize(msg="Index not proper", name) def maybe_create_rollover_alias_for_event(event, created_aliases) alias_name = event.sprintf(ilm_event_alias) - return alias_name if created_aliases.include?(alias_name) + return alias_name, created_aliases[alias_name] if created_aliases.has_key?(alias_name) raise ImproperAliasName.new(name=alias_name) if alias_name == ilm_event_alias alias_target = "<#{alias_name}-#{ilm_pattern}>" alias_payload = { @@ -107,13 +107,11 @@ def maybe_create_rollover_alias_for_event(event, created_aliases) } } } - # Settings put must be done, otherwise you end up with a properly made rollover index - # with a lifecycle rollover alias setting for a completely different alias which will - # not work past the first index, stalling forever. You will still need something to - # maintain the proper alias after they are rolled over though, so, yeah. - client.rollover_alias_put(alias_target, alias_payload, true) unless client.rollover_alias_exists?(alias_name) + # Without placing the settings on the index you'll need something to run by and add this + # afterwards (or by a template) or the first index will never rollover. + client.rollover_alias_put(alias_target, alias_payload, ilm_set_rollover_alias) unless client.rollover_alias_exists?(alias_name) - alias_name + return alias_name, alias_name end def maybe_create_rollover_alias From d62cc4737dee412179d6e799047b32c870a5a877 Mon Sep 17 00:00:00 2001 From: Casey Weed Date: Thu, 20 Jun 2019 16:41:47 -0400 Subject: [PATCH 10/16] Tentatively implement ro only creation --- lib/logstash/outputs/elasticsearch.rb | 14 +++++++------- lib/logstash/outputs/elasticsearch/common.rb | 16 +++++++++++++--- .../outputs/elasticsearch/common_configs.rb | 15 +++++++++++++++ lib/logstash/outputs/elasticsearch/ilm.rb | 4 ++-- 4 files changed, 37 insertions(+), 12 deletions(-) diff --git a/lib/logstash/outputs/elasticsearch.rb b/lib/logstash/outputs/elasticsearch.rb index 97ee24024..24aa6a8e0 100644 --- a/lib/logstash/outputs/elasticsearch.rb +++ b/lib/logstash/outputs/elasticsearch.rb @@ -17,7 +17,7 @@ # called `http.content_type.required`. If this option is set to `true`, and you # are using Logstash 2.4 through 5.2, you need to update the Elasticsearch output # plugin to version 6.2.5 or higher. -# +# # ================================================================================ # # This plugin is the recommended method of storing logs in Elasticsearch. @@ -26,8 +26,8 @@ # This output only speaks the HTTP protocol. HTTP is the preferred protocol for interacting with Elasticsearch as of Logstash 2.0. # We strongly encourage the use of HTTP over the node protocol for a number of reasons. HTTP is only marginally slower, # yet far easier to administer and work with. When using the HTTP protocol one may upgrade Elasticsearch versions without having -# to upgrade Logstash in lock-step. -# +# to upgrade Logstash in lock-step. +# # You can learn more about Elasticsearch at # # ==== Template management for Elasticsearch 5.x @@ -75,12 +75,12 @@ # # ==== HTTP Compression # -# This plugin supports request and response compression. Response compression is enabled by default and -# for Elasticsearch versions 5.0 and later, the user doesn't have to set any configs in Elasticsearch for -# it to send back compressed response. For versions before 5.0, `http.compression` must be set to `true` in +# This plugin supports request and response compression. Response compression is enabled by default and +# for Elasticsearch versions 5.0 and later, the user doesn't have to set any configs in Elasticsearch for +# it to send back compressed response. For versions before 5.0, `http.compression` must be set to `true` in # Elasticsearch[https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-http.html#modules-http] to take advantage of response compression when using this plugin # -# For requests compression, regardless of the Elasticsearch version, users have to enable `http_compression` +# For requests compression, regardless of the Elasticsearch version, users have to enable `http_compression` # setting in their Logstash config file. # class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base diff --git a/lib/logstash/outputs/elasticsearch/common.rb b/lib/logstash/outputs/elasticsearch/common.rb index e3d821c47..6343350bf 100644 --- a/lib/logstash/outputs/elasticsearch/common.rb +++ b/lib/logstash/outputs/elasticsearch/common.rb @@ -20,9 +20,19 @@ def register @stopping = Concurrent::AtomicBoolean.new(false) # To support BWC, we check if DLQ exists in core (< 5.4). If it doesn't, we use nil to resort to previous behavior. @dlq_writer = dlq_enabled? ? execution_context.dlq_writer : nil - @logger.debug("Caching created/seen aliases #{@ilm_cache_once ? 'once' : 'every bulk'}") @ilm_seen_aliases = {} + # Can only be used if ILM not in use + if ro_only_enabled + if ilm_in_use? + raise LogStash::ConfigurationError, "ILM and preemptive RO creation cannot both be enabled." + else + @logger.debug("Preemptively creating rollover aliases") + end + end + + @logger.debug("Caching seen/created aliases #{@ilm_cache_once : 'once' : 'every bulk'}") unless !(@ro_only_enabled || !ilm_in_use?) + setup_hosts # properly sets @hosts build_client setup_after_successful_connection @@ -174,12 +184,12 @@ def retrying_submit(actions) # Using the hash let's us set improper aliases such as unsubstituted fields like # %{abc} to the default alias instead, then return immediately when found to avoid # raising the exception multiple times, as the exceptions are typically more expensive. - if @ilm_enabled && !@ilm_event_alias.nil? + if (@ilm_enabled || @ro_only_enabled) && !@ilm_event_alias.nil? created_aliases = @ilm_cache_once ? @ilm_seen_aliases : {} submit_actions.each do |action, params, event| if ['index', 'create'].include?(action) begin - alias_name, new_index = maybe_create_rollover_alias_for_event(event, created_aliases) + alias_name, new_index = maybe_create_rollover_alias_for_event(event, created_aliases, @ilm_enabled) created_aliases[alias_name] = new_index params[:_index] = new_index rescue ::LogStash::Outputs::ElasticSearch::Ilm::ImproperAliasName => e diff --git a/lib/logstash/outputs/elasticsearch/common_configs.rb b/lib/logstash/outputs/elasticsearch/common_configs.rb index e15f8f01c..d4b97c260 100644 --- a/lib/logstash/outputs/elasticsearch/common_configs.rb +++ b/lib/logstash/outputs/elasticsearch/common_configs.rb @@ -165,6 +165,21 @@ def self.included(mod) # Cache seen/created aliases and their destinations once, not during each bulk batch. mod.config :ilm_cache_once, :validate => [true, false, 'true', 'false'], :default => false + + # ----- + # Rollover Alias Creation + # ----- + + # Reuses ilm_event_alias, ilm_pattern, ilm_cache_once as only ILM or RO can be used at one time. Preemptive RO creation + # follows all the same processes that the ILM tweaks does, however, it does not insert the lifecycle alias setting or policy. + # It is understood that this to bootstrap the rollover alias and index before indexing so that external tools can manage + # the actual rollover process, while ILM can manage the other lifecycle phases. + # + # Just like the ILM event substitution process, if anything does not match, it is thrown into the default alias set via + # ilm_rollover_alias. + + # Flag for enabling Rollover alias creation, conflicts with ILM. + mod.config :ro_only_enabled, :validate => [true, false, 'true', 'false'], :default 'false' end end end end end diff --git a/lib/logstash/outputs/elasticsearch/ilm.rb b/lib/logstash/outputs/elasticsearch/ilm.rb index 5e62a56d1..5c938f024 100644 --- a/lib/logstash/outputs/elasticsearch/ilm.rb +++ b/lib/logstash/outputs/elasticsearch/ilm.rb @@ -95,7 +95,7 @@ def initialize(msg="Index not proper", name) end end - def maybe_create_rollover_alias_for_event(event, created_aliases) + def maybe_create_rollover_alias_for_event(event, created_aliases, is_ilm_request) alias_name = event.sprintf(ilm_event_alias) return alias_name, created_aliases[alias_name] if created_aliases.has_key?(alias_name) raise ImproperAliasName.new(name=alias_name) if alias_name == ilm_event_alias @@ -109,7 +109,7 @@ def maybe_create_rollover_alias_for_event(event, created_aliases) } # Without placing the settings on the index you'll need something to run by and add this # afterwards (or by a template) or the first index will never rollover. - client.rollover_alias_put(alias_target, alias_payload, ilm_set_rollover_alias) unless client.rollover_alias_exists?(alias_name) + client.rollover_alias_put(alias_target, alias_payload, is_ilm_request ? ilm_set_rollover_alias : false) unless client.rollover_alias_exists?(alias_name) return alias_name, alias_name end From e200f8d45dd559d0da3c8d8a87b83fd56b59952c Mon Sep 17 00:00:00 2001 From: Casey Weed Date: Tue, 25 Jun 2019 10:58:56 -0400 Subject: [PATCH 11/16] Boolean should probably be a boolean --- lib/logstash/outputs/elasticsearch/common_configs.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/logstash/outputs/elasticsearch/common_configs.rb b/lib/logstash/outputs/elasticsearch/common_configs.rb index d4b97c260..231450acd 100644 --- a/lib/logstash/outputs/elasticsearch/common_configs.rb +++ b/lib/logstash/outputs/elasticsearch/common_configs.rb @@ -179,7 +179,7 @@ def self.included(mod) # ilm_rollover_alias. # Flag for enabling Rollover alias creation, conflicts with ILM. - mod.config :ro_only_enabled, :validate => [true, false, 'true', 'false'], :default 'false' + mod.config :ro_only_enabled, :validate => [true, false, 'true', 'false'], :default => false end end end end end From 4742fdd4a9f555a41e0efe8817ffab29b3a989e9 Mon Sep 17 00:00:00 2001 From: Casey Weed Date: Tue, 25 Jun 2019 11:18:46 -0400 Subject: [PATCH 12/16] Fix logging statement --- lib/logstash/outputs/elasticsearch/common.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/logstash/outputs/elasticsearch/common.rb b/lib/logstash/outputs/elasticsearch/common.rb index 6343350bf..ee3413767 100644 --- a/lib/logstash/outputs/elasticsearch/common.rb +++ b/lib/logstash/outputs/elasticsearch/common.rb @@ -31,7 +31,7 @@ def register end end - @logger.debug("Caching seen/created aliases #{@ilm_cache_once : 'once' : 'every bulk'}") unless !(@ro_only_enabled || !ilm_in_use?) + @logger.debug("Caching seen/created aliases #{@ilm_cache_once ? 'once' : 'every bulk'}") unless !(@ro_only_enabled || !ilm_in_use?) setup_hosts # properly sets @hosts build_client From 088c82a0d0dd802f5a50cc14983e9e088b32921b Mon Sep 17 00:00:00 2001 From: Casey Weed Date: Tue, 25 Jun 2019 11:50:51 -0400 Subject: [PATCH 13/16] Add trace statement --- lib/logstash/outputs/elasticsearch/common.rb | 4 +++- lib/logstash/outputs/elasticsearch/ilm.rb | 1 + 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/lib/logstash/outputs/elasticsearch/common.rb b/lib/logstash/outputs/elasticsearch/common.rb index ee3413767..cd76624a5 100644 --- a/lib/logstash/outputs/elasticsearch/common.rb +++ b/lib/logstash/outputs/elasticsearch/common.rb @@ -31,7 +31,9 @@ def register end end - @logger.debug("Caching seen/created aliases #{@ilm_cache_once ? 'once' : 'every bulk'}") unless !(@ro_only_enabled || !ilm_in_use?) + if @ro_only_enabled || ilm_in_use? + @logger.debug("Caching seen/created aliases #{@ilm_cache_once ? 'once' : 'every bulk'}") + end setup_hosts # properly sets @hosts build_client diff --git a/lib/logstash/outputs/elasticsearch/ilm.rb b/lib/logstash/outputs/elasticsearch/ilm.rb index 5c938f024..43b02fc2a 100644 --- a/lib/logstash/outputs/elasticsearch/ilm.rb +++ b/lib/logstash/outputs/elasticsearch/ilm.rb @@ -109,6 +109,7 @@ def maybe_create_rollover_alias_for_event(event, created_aliases, is_ilm_request } # Without placing the settings on the index you'll need something to run by and add this # afterwards (or by a template) or the first index will never rollover. + logger.trace("Putting rollover alias #{alias_target} for #{alias_name}, is this an ILM request?: #{is_ilm_request}, will we set rollover alias setting? #{ilm_set_rollover_alias}") client.rollover_alias_put(alias_target, alias_payload, is_ilm_request ? ilm_set_rollover_alias : false) unless client.rollover_alias_exists?(alias_name) return alias_name, alias_name From b20399079af18d80c1a3fee28ad09f6b8809a88a Mon Sep 17 00:00:00 2001 From: Casey Weed Date: Tue, 25 Jun 2019 14:18:42 -0400 Subject: [PATCH 14/16] Redo some logging statements, change order of operations for Improper alias name --- lib/logstash/outputs/elasticsearch/common.rb | 4 +++- lib/logstash/outputs/elasticsearch/http_client.rb | 6 +++--- lib/logstash/outputs/elasticsearch/ilm.rb | 11 ++++++++--- 3 files changed, 14 insertions(+), 7 deletions(-) diff --git a/lib/logstash/outputs/elasticsearch/common.rb b/lib/logstash/outputs/elasticsearch/common.rb index cd76624a5..7d883719c 100644 --- a/lib/logstash/outputs/elasticsearch/common.rb +++ b/lib/logstash/outputs/elasticsearch/common.rb @@ -35,6 +35,8 @@ def register @logger.debug("Caching seen/created aliases #{@ilm_cache_once ? 'once' : 'every bulk'}") end + @logger.trace("ilm_enabled: #{@ilm_enabled}, ro_only_enabled: #{@ro_only_enabled}") + setup_hosts # properly sets @hosts build_client setup_after_successful_connection @@ -195,7 +197,7 @@ def retrying_submit(actions) created_aliases[alias_name] = new_index params[:_index] = new_index rescue ::LogStash::Outputs::ElasticSearch::Ilm::ImproperAliasName => e - @logger.warn("Event alias name is not proper, using #{@ilm_rollover_alias} instead") + @logger.warn("Event alias name #{e.name} is not proper, using #{@ilm_rollover_alias} instead") created_aliases[e.name] = @ilm_rollover_alias params[:_index] = @ilm_rollover_alias rescue => e diff --git a/lib/logstash/outputs/elasticsearch/http_client.rb b/lib/logstash/outputs/elasticsearch/http_client.rb index d22fbbd51..4a2c4785b 100644 --- a/lib/logstash/outputs/elasticsearch/http_client.rb +++ b/lib/logstash/outputs/elasticsearch/http_client.rb @@ -361,9 +361,9 @@ def rollover_alias_exists?(name) # Create a new rollover alias def rollover_alias_put(alias_name, alias_definition, add_rollover_settings) begin - if add_rollover_settings + if add_rollover_settings == true real_alias_name, _ = alias_definition["aliases"].first - logger.debug("Adding lifecycle rollover_alias setting for #{real_alias_name}") + logger.debug("Adding lifecycle rollover_alias setting for #{alias_name} => #{real_alias_name}, add rollover was #{add_rollover_settings}") alias_definition.merge!({ 'settings' => { 'index.lifecycle.rollover_alias' => real_alias_name @@ -436,4 +436,4 @@ def update_action_builder(args, source) [args, source] end end -end end end \ No newline at end of file +end end end diff --git a/lib/logstash/outputs/elasticsearch/ilm.rb b/lib/logstash/outputs/elasticsearch/ilm.rb index 43b02fc2a..1cb97f1a7 100644 --- a/lib/logstash/outputs/elasticsearch/ilm.rb +++ b/lib/logstash/outputs/elasticsearch/ilm.rb @@ -98,7 +98,8 @@ def initialize(msg="Index not proper", name) def maybe_create_rollover_alias_for_event(event, created_aliases, is_ilm_request) alias_name = event.sprintf(ilm_event_alias) return alias_name, created_aliases[alias_name] if created_aliases.has_key?(alias_name) - raise ImproperAliasName.new(name=alias_name) if alias_name == ilm_event_alias + improper = alias_name == ilm_event_alias + alias_name = improper ? ilm_rollover_alias : alias_name alias_target = "<#{alias_name}-#{ilm_pattern}>" alias_payload = { 'aliases' => { @@ -109,8 +110,12 @@ def maybe_create_rollover_alias_for_event(event, created_aliases, is_ilm_request } # Without placing the settings on the index you'll need something to run by and add this # afterwards (or by a template) or the first index will never rollover. - logger.trace("Putting rollover alias #{alias_target} for #{alias_name}, is this an ILM request?: #{is_ilm_request}, will we set rollover alias setting? #{ilm_set_rollover_alias}") - client.rollover_alias_put(alias_target, alias_payload, is_ilm_request ? ilm_set_rollover_alias : false) unless client.rollover_alias_exists?(alias_name) + do_ilm_request = is_ilm_request ? ilm_set_rollover_alias : false + logger.trace("Putting rollover alias #{alias_target} for #{alias_name}, is this an ILM request?: #{is_ilm_request}, will we set the rollover alias setting? #{do_ilm_request}") + client.rollover_alias_put(alias_target, alias_payload, do_ilm_request) unless client.rollover_alias_exists?(alias_name) + + # Raise this afterwards, so we can store this properly as a broken alias + raise ImproperAliasName.new(name=event.sprintf(ilm_event_alias)) if improper return alias_name, alias_name end From 8e31f6e9d1a98a3f1441605f9c5fc91458dc711c Mon Sep 17 00:00:00 2001 From: Casey Weed Date: Tue, 25 Jun 2019 14:35:21 -0400 Subject: [PATCH 15/16] Use proper variables --- lib/logstash/outputs/elasticsearch/common.rb | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/logstash/outputs/elasticsearch/common.rb b/lib/logstash/outputs/elasticsearch/common.rb index 7d883719c..8326b1e0d 100644 --- a/lib/logstash/outputs/elasticsearch/common.rb +++ b/lib/logstash/outputs/elasticsearch/common.rb @@ -23,15 +23,15 @@ def register @ilm_seen_aliases = {} # Can only be used if ILM not in use - if ro_only_enabled - if ilm_in_use? + if @ro_only_enabled + if @ilm_enabled raise LogStash::ConfigurationError, "ILM and preemptive RO creation cannot both be enabled." else @logger.debug("Preemptively creating rollover aliases") end end - if @ro_only_enabled || ilm_in_use? + if @ro_only_enabled || @ilm_enabled @logger.debug("Caching seen/created aliases #{@ilm_cache_once ? 'once' : 'every bulk'}") end From 5b0eb331dc79c6ccf43d13ce83fb11d215265765 Mon Sep 17 00:00:00 2001 From: Casey Weed Date: Tue, 25 Jun 2019 16:28:31 -0400 Subject: [PATCH 16/16] Fix up registration of ilm --- lib/logstash/outputs/elasticsearch/common.rb | 29 ++++++++----------- .../outputs/elasticsearch/common_configs.rb | 2 +- 2 files changed, 13 insertions(+), 18 deletions(-) diff --git a/lib/logstash/outputs/elasticsearch/common.rb b/lib/logstash/outputs/elasticsearch/common.rb index 8326b1e0d..b89d8f2ff 100644 --- a/lib/logstash/outputs/elasticsearch/common.rb +++ b/lib/logstash/outputs/elasticsearch/common.rb @@ -22,27 +22,22 @@ def register @dlq_writer = dlq_enabled? ? execution_context.dlq_writer : nil @ilm_seen_aliases = {} - # Can only be used if ILM not in use - if @ro_only_enabled - if @ilm_enabled - raise LogStash::ConfigurationError, "ILM and preemptive RO creation cannot both be enabled." - else - @logger.debug("Preemptively creating rollover aliases") - end - end - - if @ro_only_enabled || @ilm_enabled - @logger.debug("Caching seen/created aliases #{@ilm_cache_once ? 'once' : 'every bulk'}") - end - - @logger.trace("ilm_enabled: #{@ilm_enabled}, ro_only_enabled: #{@ro_only_enabled}") - setup_hosts # properly sets @hosts build_client setup_after_successful_connection check_action_validity @bulk_request_metrics = metric.namespace(:bulk_requests) @document_level_metrics = metric.namespace(:documents) + + # Can only be used if ILM not in use + if @ro_only_enabled && ilm_in_use? + @logger.debug("Preemptively creating rollover aliases, not adding ILM aliases/policies/settings in the process") + end + + if @ro_only_enabled || ilm_in_use? + @logger.info("Caching seen/created aliases #{@ilm_cache_once ? 'once' : 'every bulk'}") + end + @logger.info("New Elasticsearch output", :class => self.class.name, :hosts => @hosts.map(&:sanitized).map(&:to_s)) end @@ -188,12 +183,12 @@ def retrying_submit(actions) # Using the hash let's us set improper aliases such as unsubstituted fields like # %{abc} to the default alias instead, then return immediately when found to avoid # raising the exception multiple times, as the exceptions are typically more expensive. - if (@ilm_enabled || @ro_only_enabled) && !@ilm_event_alias.nil? + if ilm_in_use? && !@ilm_event_alias.nil? created_aliases = @ilm_cache_once ? @ilm_seen_aliases : {} submit_actions.each do |action, params, event| if ['index', 'create'].include?(action) begin - alias_name, new_index = maybe_create_rollover_alias_for_event(event, created_aliases, @ilm_enabled) + alias_name, new_index = maybe_create_rollover_alias_for_event(event, created_aliases, !@ro_only_enabled) created_aliases[alias_name] = new_index params[:_index] = new_index rescue ::LogStash::Outputs::ElasticSearch::Ilm::ImproperAliasName => e diff --git a/lib/logstash/outputs/elasticsearch/common_configs.rb b/lib/logstash/outputs/elasticsearch/common_configs.rb index 231450acd..4e2975089 100644 --- a/lib/logstash/outputs/elasticsearch/common_configs.rb +++ b/lib/logstash/outputs/elasticsearch/common_configs.rb @@ -178,7 +178,7 @@ def self.included(mod) # Just like the ILM event substitution process, if anything does not match, it is thrown into the default alias set via # ilm_rollover_alias. - # Flag for enabling Rollover alias creation, conflicts with ILM. + # Flag for enabling Rollover alias creation, overrides ILM behavior. mod.config :ro_only_enabled, :validate => [true, false, 'true', 'false'], :default => false end end