diff --git a/CHANGELOG.md b/CHANGELOG.md index 126a8877..4deb5974 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 11.22.3 + - Fixes an issue where events containing non-unicode strings could fail to serialize correctly when compression is enabled [#1169](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1169) + ## 11.22.2 - [DOC] Add content for sending data to Elasticsearch on serverless [#1164](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1164) diff --git a/docs/index.asciidoc b/docs/index.asciidoc index f25bee6f..7e554187 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -316,6 +316,11 @@ index level and `monitoring` permissions at cluster level. The `monitoring` permission at cluster level is necessary to perform periodic connectivity checks. +[id="plugins-{type}s-{plugin}-handling-non-utf-8"] +==== Handling non-UTF-8 data + +This plugin transmits events to Elasticsearch using a JSON API, and therefore requires all string values in events to be valid UTF-8. +When a string value on an event contains one or more byte sequences that are not valid in UTF-8, each offending byte sequence is replaced with the UTF-8 replacement character (`\uFFFD`). [id="plugins-{type}s-{plugin}-options"] ==== Elasticsearch Output Configuration Options diff --git a/lib/logstash/outputs/elasticsearch/http_client.rb b/lib/logstash/outputs/elasticsearch/http_client.rb index e0aa5ec9..e0b70e36 100644 --- a/lib/logstash/outputs/elasticsearch/http_client.rb +++ b/lib/logstash/outputs/elasticsearch/http_client.rb @@ -22,6 +22,7 @@ module LogStash; module Outputs; class ElasticSearch; # made sense. We picked one on the lowish side to not use too much heap. TARGET_BULK_BYTES = 20 * 1024 * 1024 # 20MiB + class HttpClient attr_reader :client, :options, :logger, :pool, :action_count, :recv_count # This is here in case we use DEFAULT_OPTIONS in the future @@ -37,7 +38,7 @@ class HttpClient # * `:user` - String. 
The user to use for authentication. # * `:password` - String. The password to use for authentication. # * `:timeout` - Float. A duration value, in seconds, after which a socket - # operation or request will be aborted if not yet successfull + # operation or request will be aborted if not yet successful # * `:client_settings` - a hash; see below for keys. # # The `client_settings` key is a has that can contain other settings: @@ -132,6 +133,9 @@ def bulk(actions) action.map {|line| LogStash::Json.dump(line)}.join("\n") : LogStash::Json.dump(action) as_json << "\n" + + as_json.scrub! # ensure generated JSON is valid UTF-8 + if (stream_writer.pos + as_json.bytesize) > TARGET_BULK_BYTES && stream_writer.pos > 0 stream_writer.flush # ensure writer has sync'd buffers before reporting sizes logger.debug("Sending partial bulk request for batch with one or more actions remaining.", @@ -496,5 +500,6 @@ def update_action_builder(args, source) end [args, source] end + end end end end diff --git a/logstash-output-elasticsearch.gemspec b/logstash-output-elasticsearch.gemspec index 320a2edd..aa1dc3ec 100644 --- a/logstash-output-elasticsearch.gemspec +++ b/logstash-output-elasticsearch.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |s| s.name = 'logstash-output-elasticsearch' - s.version = '11.22.2' + s.version = '11.22.3' s.licenses = ['apache-2.0'] s.summary = "Stores logs in Elasticsearch" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. 
This gem is not a stand-alone program" diff --git a/spec/integration/outputs/compressed_indexing_spec.rb b/spec/integration/outputs/compressed_indexing_spec.rb index ac0c2235..5d9afb85 100644 --- a/spec/integration/outputs/compressed_indexing_spec.rb +++ b/spec/integration/outputs/compressed_indexing_spec.rb @@ -11,10 +11,13 @@ [ {"http_compression" => true}, {"compression_level" => 1} ].each do |compression_config| describe "indexing with http_compression turned on", :integration => true do let(:event) { LogStash::Event.new("message" => "Hello World!", "type" => type) } + let(:event_with_invalid_utf_8_bytes) { LogStash::Event.new("message" => "Message from spacecraft which contains \xAC invalid \xD7 byte sequences.", "type" => type) } + let(:index) { 10.times.collect { rand(10).to_s }.join("") } let(:type) { ESHelper.es_version_satisfies?("< 7") ? "doc" : "_doc" } let(:event_count) { 10000 + rand(500) } - let(:events) { event_count.times.map { event }.to_a } + # mix the events with valid and invalid UTF-8 payloads + let(:events) { event_count.times.map { |i| i%3 == 0 ? 
event : event_with_invalid_utf_8_bytes }.to_a } let(:config) { { "hosts" => get_host_port, diff --git a/spec/unit/outputs/elasticsearch/http_client_spec.rb b/spec/unit/outputs/elasticsearch/http_client_spec.rb index e7d3c004..d4cee8f3 100644 --- a/spec/unit/outputs/elasticsearch/http_client_spec.rb +++ b/spec/unit/outputs/elasticsearch/http_client_spec.rb @@ -242,12 +242,14 @@ end end - context "with two messages" do - let(:message1) { "hey" } - let(:message2) { "you" } + context "with multiple messages" do + let(:message_head) { "Spacecraft message" } + let(:message_tail) { "byte sequence" } + let(:invalid_utf_8_message) { "contains invalid \xAC" } let(:actions) { [ - ["index", {:_id=>nil, :_index=>"logstash"}, {"message"=> message1}], - ["index", {:_id=>nil, :_index=>"logstash"}, {"message"=> message2}], + ["index", {:_id=>nil, :_index=>"logstash"}, {"message"=> message_head}], + ["index", {:_id=>nil, :_index=>"logstash"}, {"message"=> invalid_utf_8_message}], + ["index", {:_id=>nil, :_index=>"logstash"}, {"message"=> message_tail}], ]} it "executes one bulk_send operation" do allow(subject).to receive(:join_bulk_responses) @@ -257,7 +259,7 @@ context "if one exceeds TARGET_BULK_BYTES" do let(:target_bulk_bytes) { LogStash::Outputs::ElasticSearch::TARGET_BULK_BYTES } - let(:message1) { "a" * (target_bulk_bytes + 1) } + let(:message_head) { "a" * (target_bulk_bytes + 1) } it "executes two bulk_send operations" do allow(subject).to receive(:join_bulk_responses) expect(subject).to receive(:bulk_send).twice