Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ILM event substitution PoC #882

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
14 changes: 7 additions & 7 deletions lib/logstash/outputs/elasticsearch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# called `http.content_type.required`. If this option is set to `true`, and you
# are using Logstash 2.4 through 5.2, you need to update the Elasticsearch output
# plugin to version 6.2.5 or higher.
#
#
# ================================================================================
#
# This plugin is the recommended method of storing logs in Elasticsearch.
Expand All @@ -26,8 +26,8 @@
# This output only speaks the HTTP protocol. HTTP is the preferred protocol for interacting with Elasticsearch as of Logstash 2.0.
# We strongly encourage the use of HTTP over the node protocol for a number of reasons. HTTP is only marginally slower,
# yet far easier to administer and work with. When using the HTTP protocol one may upgrade Elasticsearch versions without having
# to upgrade Logstash in lock-step.
#
# to upgrade Logstash in lock-step.
#
# You can learn more about Elasticsearch at <https://www.elastic.co/products/elasticsearch>
#
# ==== Template management for Elasticsearch 5.x
Expand Down Expand Up @@ -75,12 +75,12 @@
#
# ==== HTTP Compression
#
# This plugin supports request and response compression. Response compression is enabled by default and
# for Elasticsearch versions 5.0 and later, the user doesn't have to set any configs in Elasticsearch for
# it to send back compressed response. For versions before 5.0, `http.compression` must be set to `true` in
# This plugin supports request and response compression. Response compression is enabled by default and
# for Elasticsearch versions 5.0 and later, the user doesn't have to set any configs in Elasticsearch for
# it to send back compressed response. For versions before 5.0, `http.compression` must be set to `true` in
# Elasticsearch[https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-http.html#modules-http] to take advantage of response compression when using this plugin
#
# For requests compression, regardless of the Elasticsearch version, users have to enable `http_compression`
# For requests compression, regardless of the Elasticsearch version, users have to enable `http_compression`
# setting in their Logstash config file.
#
class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
Expand Down
38 changes: 38 additions & 0 deletions lib/logstash/outputs/elasticsearch/common.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,24 @@ def register
@stopping = Concurrent::AtomicBoolean.new(false)
# To support BWC, we check if DLQ exists in core (< 5.4). If it doesn't, we use nil to resort to previous behavior.
@dlq_writer = dlq_enabled? ? execution_context.dlq_writer : nil
@ilm_seen_aliases = {}

setup_hosts # properly sets @hosts
build_client
setup_after_successful_connection
check_action_validity
@bulk_request_metrics = metric.namespace(:bulk_requests)
@document_level_metrics = metric.namespace(:documents)

# Can only be used if ILM not in use
if @ro_only_enabled && ilm_in_use?
@logger.debug("Preemptively creating rollover aliases, not adding ILM aliases/policies/settings in the process")
end

if @ro_only_enabled || ilm_in_use?
@logger.info("Caching seen/created aliases #{@ilm_cache_once ? 'once' : 'every bulk'}")
end

@logger.info("New Elasticsearch output", :class => self.class.name, :hosts => @hosts.map(&:sanitized).map(&:to_s))
end

Expand Down Expand Up @@ -166,6 +177,33 @@ def retrying_submit(actions)

# We retry with whatever is didn't succeed
begin

# Create alias(es) before bulking, ensuring rollover aliases exist.
#
# Using the hash let's us set improper aliases such as unsubstituted fields like
# %{abc} to the default alias instead, then return immediately when found to avoid
# raising the exception multiple times, as the exceptions are typically more expensive.
if ilm_in_use? && !@ilm_event_alias.nil?
created_aliases = @ilm_cache_once ? @ilm_seen_aliases : {}
submit_actions.each do |action, params, event|
if ['index', 'create'].include?(action)
begin
alias_name, new_index = maybe_create_rollover_alias_for_event(event, created_aliases, !@ro_only_enabled)
created_aliases[alias_name] = new_index
params[:_index] = new_index
rescue ::LogStash::Outputs::ElasticSearch::Ilm::ImproperAliasName => e
@logger.warn("Event alias name #{e.name} is not proper, using #{@ilm_rollover_alias} instead")
created_aliases[e.name] = @ilm_rollover_alias
params[:_index] = @ilm_rollover_alias
rescue => e
@logger.warn("Unknown error on creating event alias, #{e}, using #{@ilm_rollover_alias} instead")
params[:_index] = @ilm_rollover_alias
end
end
end
@logger.trace("Created/cached aliases: #{created_aliases.to_s}")
end

submit_actions = submit(submit_actions)
if submit_actions && submit_actions.size > 0
@logger.info("Retrying individual bulk actions that failed or were rejected by the previous bulk request.", :count => submit_actions.size)
Expand Down
33 changes: 29 additions & 4 deletions lib/logstash/outputs/elasticsearch/common_configs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ def self.included(mod)
# Joda formats are defined http://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat.html[here].
mod.config :index, :validate => :string, :default => DEFAULT_INDEX_NAME

mod.config :document_type,
:validate => :string,
mod.config :document_type,
:validate => :string,
:deprecated => "Document types are being deprecated in Elasticsearch 6.0, and removed entirely in 7.0. You should avoid this feature"

# From Logstash 1.3 onwards, a template is applied to Elasticsearch during
Expand Down Expand Up @@ -67,7 +67,7 @@ def self.included(mod)
# The version to use for indexing. Use sprintf syntax like `%{my_version}` to use a field value here.
# See https://www.elastic.co/blog/elasticsearch-versioning-support.
mod.config :version, :validate => :string

# The version_type to use for indexing.
# See https://www.elastic.co/blog/elasticsearch-versioning-support.
# See also https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html#_version_types
Expand All @@ -93,7 +93,7 @@ def self.included(mod)
# `["https://127.0.0.1:9200/mypath"]` (If using a proxy on a subpath)
# It is important to exclude http://www.elastic.co/guide/en/elasticsearch/reference/current/modules-node.html[dedicated master nodes] from the `hosts` list
# to prevent LS from sending bulk requests to the master nodes. So this parameter should only reference either data or client nodes in Elasticsearch.
#
#
# Any special characters present in the URLs here MUST be URL escaped! This means `#` should be put in as `%23` for instance.
mod.config :hosts, :validate => :uri, :default => [::LogStash::Util::SafeURI.new("//127.0.0.1")], :list => true

Expand Down Expand Up @@ -155,6 +155,31 @@ def self.included(mod)
# ILM policy to use, if undefined the default policy will be used.
mod.config :ilm_policy, :validate => :string, :default => DEFAULT_POLICY

# Add index.lifecycle.rollover_alias setting to alias upon creation, always done in the case of aliases created via the ilm_event_alias setting.
mod.config :ilm_set_rollover_alias, :validate => [true, false, 'true', 'false'], :default => false

# Use this for event substitution aliases, ilm_rollover_alias is used as a default in the event the field is missing or other unknown errors.
# Substituion syntax will work here, e.g. "%{my_fantastic_field_here}-alias".
mod.config :ilm_event_alias, :validate => :string

# Cache seen/created aliases and their destinations once, not during each bulk batch.
mod.config :ilm_cache_once, :validate => [true, false, 'true', 'false'], :default => false


# -----
# Rollover Alias Creation
# -----

# Reuses ilm_event_alias, ilm_pattern, ilm_cache_once as only ILM or RO can be used at one time. Preemptive RO creation
# follows all the same processes that the ILM tweaks does, however, it does not insert the lifecycle alias setting or policy.
# It is understood that this to bootstrap the rollover alias and index before indexing so that external tools can manage
# the actual rollover process, while ILM can manage the other lifecycle phases.
#
# Just like the ILM event substitution process, if anything does not match, it is thrown into the default alias set via
# ilm_rollover_alias.

# Flag for enabling Rollover alias creation, overrides ILM behavior.
mod.config :ro_only_enabled, :validate => [true, false, 'true', 'false'], :default => false
end
end
end end end
25 changes: 16 additions & 9 deletions lib/logstash/outputs/elasticsearch/http_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ class HttpClient
# :setting => value
# }

#
# The `options` is a hash where the following symbol keys have meaning:
#
# * `:hosts` - array of String. Set a list of hosts to use for communication.
Expand Down Expand Up @@ -110,7 +109,7 @@ def bulk(actions)
if http_compression
body_stream.set_encoding "BINARY"
stream_writer = Zlib::GzipWriter.new(body_stream, Zlib::DEFAULT_COMPRESSION, Zlib::DEFAULT_STRATEGY)
else
else
stream_writer = body_stream
end
bulk_responses = []
Expand Down Expand Up @@ -215,7 +214,7 @@ def scheme
else
nil
end

calculated_scheme = calculate_property(uris, :scheme, explicit_scheme, sniffing)

if calculated_scheme && calculated_scheme !~ /https?/
Expand All @@ -235,7 +234,7 @@ def port
# Enter things like foo:123, bar and wind up with foo:123, bar:9200
calculate_property(uris, :port, nil, sniffing) || 9200
end

def uris
@options[:hosts]
end
Expand All @@ -254,7 +253,7 @@ def http_compression

def build_adapter(options)
timeout = options[:timeout] || 0

adapter_options = {
:socket_timeout => timeout,
:request_timeout => timeout,
Expand All @@ -281,7 +280,7 @@ def build_adapter(options)
adapter_class = ::LogStash::Outputs::ElasticSearch::HttpClient::ManticoreAdapter
adapter = adapter_class.new(@logger, adapter_options)
end

def build_pool(options)
adapter = build_adapter(options)

Expand Down Expand Up @@ -331,7 +330,7 @@ def host_to_url(h)
h.query
end
prefixed_raw_query = raw_query && !raw_query.empty? ? "?#{raw_query}" : nil

raw_url = "#{raw_scheme}://#{postfixed_userinfo}#{raw_host}:#{raw_port}#{prefixed_raw_path}#{prefixed_raw_query}"

::LogStash::Util::SafeURI.new(raw_url)
Expand Down Expand Up @@ -360,9 +359,17 @@ def rollover_alias_exists?(name)
end

# Create a new rollover alias
def rollover_alias_put(alias_name, alias_definition)
logger.info("Creating rollover alias #{alias_name}")
def rollover_alias_put(alias_name, alias_definition, add_rollover_settings)
begin
if add_rollover_settings == true
real_alias_name, _ = alias_definition["aliases"].first
logger.debug("Adding lifecycle rollover_alias setting for #{alias_name} => #{real_alias_name}, add rollover was #{add_rollover_settings}")
alias_definition.merge!({
'settings' => {
'index.lifecycle.rollover_alias' => real_alias_name
}
})
end
@pool.put(CGI::escape(alias_name), nil, LogStash::Json.dump(alias_definition))
# If the rollover alias already exists, ignore the error that comes back from Elasticsearch
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

module LogStash; module Outputs; class ElasticSearch; class HttpClient;
DEFAULT_HEADERS = { "Content-Type" => "application/json" }

class ManticoreAdapter
attr_reader :manticore, :logger

Expand All @@ -18,14 +18,14 @@ def initialize(logger, options={})
options[:cookies] = false

@client_params = {:headers => DEFAULT_HEADERS.merge(options[:headers] || {})}

if options[:proxy]
options[:proxy] = manticore_proxy_hash(options[:proxy])
end

@manticore = ::Manticore::Client.new(options)
end

# Transform the proxy option to a hash. Manticore's support for non-hash
# proxy options is broken. This was fixed in https://github.com/cheald/manticore/commit/34a00cee57a56148629ed0a47c329181e7319af5
# but this is not yet released
Expand Down Expand Up @@ -55,12 +55,12 @@ def perform_request(url, method, path, params={}, body=nil)
params[:body] = body if body

if url.user
params[:auth] = {
params[:auth] = {
:user => CGI.unescape(url.user),
# We have to unescape the password here since manticore won't do it
# for us unless its part of the URL
:password => CGI.unescape(url.password),
:eager => true
:password => CGI.unescape(url.password),
:eager => true
}
end

Expand All @@ -86,7 +86,7 @@ def perform_request(url, method, path, params={}, body=nil)
# Returned urls from this method should be checked for double escaping.
def format_url(url, path_and_query=nil)
request_uri = url.clone

# We excise auth info from the URL in case manticore itself tries to stick
# sensitive data in a thrown exception or log data
request_uri.user = nil
Expand All @@ -102,7 +102,7 @@ def format_url(url, path_and_query=nil)
new_query_parts = [request_uri.query, parsed_path_and_query.query].select do |part|
part && !part.empty? # Skip empty nil and ""
end

request_uri.query = new_query_parts.join("&") unless new_query_parts.empty?

# use `raw_path`` as `path` will unescape any escaped '/' in the path
Expand Down
39 changes: 36 additions & 3 deletions lib/logstash/outputs/elasticsearch/ilm.rb
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,41 @@ def maybe_create_ilm_policy
end
end

class ImproperAliasName < StandardError
attr_reader :name
def initialize(msg="Index not proper", name)
@name = name
super(msg)
end
end

def maybe_create_rollover_alias_for_event(event, created_aliases, is_ilm_request)
alias_name = event.sprintf(ilm_event_alias)
return alias_name, created_aliases[alias_name] if created_aliases.has_key?(alias_name)
improper = alias_name == ilm_event_alias
alias_name = improper ? ilm_rollover_alias : alias_name
alias_target = "<#{alias_name}-#{ilm_pattern}>"
alias_payload = {
'aliases' => {
alias_name => {
'is_write_index' => true
}
}
}
# Without placing the settings on the index you'll need something to run by and add this
# afterwards (or by a template) or the first index will never rollover.
do_ilm_request = is_ilm_request ? ilm_set_rollover_alias : false
logger.trace("Putting rollover alias #{alias_target} for #{alias_name}, is this an ILM request?: #{is_ilm_request}, will we set the rollover alias setting? #{do_ilm_request}")
client.rollover_alias_put(alias_target, alias_payload, do_ilm_request) unless client.rollover_alias_exists?(alias_name)

# Raise this afterwards, so we can store this properly as a broken alias
raise ImproperAliasName.new(name=event.sprintf(ilm_event_alias)) if improper

return alias_name, alias_name
end

def maybe_create_rollover_alias
client.rollover_alias_put(rollover_alias_target, rollover_alias_payload) unless client.rollover_alias_exists?(ilm_rollover_alias)
client.rollover_alias_put(rollover_alias_target, rollover_alias_payload, ilm_set_rollover_alias) unless client.rollover_alias_exists?(ilm_rollover_alias)
end

def rollover_alias_target
Expand All @@ -98,7 +131,7 @@ def rollover_alias_target
def rollover_alias_payload
{
'aliases' => {
ilm_rollover_alias =>{
ilm_rollover_alias => {
'is_write_index' => true
}
}
Expand All @@ -110,4 +143,4 @@ def policy_payload
LogStash::Json.load(::IO.read(policy_path))
end
end
end end end
end end end