diff --git a/lib/logstash/outputs/elasticsearch/common.rb b/lib/logstash/outputs/elasticsearch/common.rb index b94c911ac..9e51ff0fa 100644 --- a/lib/logstash/outputs/elasticsearch/common.rb +++ b/lib/logstash/outputs/elasticsearch/common.rb @@ -229,6 +229,8 @@ def retrying_submit(actions) sleep_interval = @retry_initial_interval + failed_retries = 0 + while submit_actions && submit_actions.length > 0 # We retry with whatever is didn't succeed @@ -247,6 +249,16 @@ def retrying_submit(actions) # Everything was a success! break if !submit_actions || submit_actions.empty? + failed_retries += 1 + + if @retry_max_failures > 0 && failed_retries == @retry_max_failures + @logger.warn("Giving up on individual bulk actions that failed or were rejected by the previous bulk request.", + :actions => submit_actions.map { |action_type, params, event| [action_type, params, event.to_s] }, + :retries => failed_retries, + :count => submit_actions.size) + break + end + + # If we're retrying the action sleep for the recommended interval # Double the interval for the next time through to achieve exponential backoff Stud.stoppable_sleep(sleep_interval) { @stopping.true? } diff --git a/lib/logstash/outputs/elasticsearch/common_configs.rb b/lib/logstash/outputs/elasticsearch/common_configs.rb index d6abdeb2a..483dcd493 100644 --- a/lib/logstash/outputs/elasticsearch/common_configs.rb +++ b/lib/logstash/outputs/elasticsearch/common_configs.rb @@ -136,6 +136,9 @@ def self.included(mod) # Set max interval in seconds between bulk retries. mod.config :retry_max_interval, :validate => :number, :default => 64 + # The number of times we should retry failed individual bulk actions (0 = infinity) + mod.config :retry_max_failures, :validate => :number, :default => 0 + # The number of times Elasticsearch should internally retry an update/upserted document # See the https://www.elastic.co/guide/en/elasticsearch/guide/current/partial-updates.html[partial updates] # for more info