diff --git a/.ci/docker-setup.sh b/.ci/docker-setup.sh
index 08bd3585..8e03b1c9 100755
--- a/.ci/docker-setup.sh
+++ b/.ci/docker-setup.sh
@@ -48,10 +48,10 @@ if [ "$ELASTIC_STACK_VERSION" ]; then
     if [ "$INTEGRATION" == "true" ]; then
         docker-compose down
-        docker-compose build
+        docker-compose build --quiet
     else
         docker-compose down
-        docker-compose build logstash
+        docker-compose build logstash --quiet
     fi
 else
     echo "Please set the ELASTIC_STACK_VERSION environment variable"
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1071291d..c193b44c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,6 @@
+## 11.4.2
+ - Fixes an issue where events containing non-unicode strings could fail to serialize correctly when compression is enabled [#1169](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1169)
+
 ## 11.4.1
  - Feat: upgrade manticore (http-client) library [#1063](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1063)
    - the underlying changes include latest HttpClient (4.5.13)
diff --git a/docs/index.asciidoc b/docs/index.asciidoc
index 85be8c8f..306aa6d3 100644
--- a/docs/index.asciidoc
+++ b/docs/index.asciidoc
@@ -293,6 +293,11 @@ index level and `monitoring` permissions at cluster level. The `monitoring`
 permission at cluster level is necessary to perform periodic connectivity
 checks.
 
+[id="plugins-{type}s-{plugin}-handling-non-utf-8"]
+==== Handling non-UTF-8 data
+
+This plugin transmits events to Elasticsearch using a JSON API, and therefore requires all string values in events to be valid UTF-8.
+When a string value on an event contains one or more byte sequences that are not valid UTF-8, each offending byte sequence is replaced with the UTF-8 replacement character (`\uFFFD`).
 
 [id="plugins-{type}s-{plugin}-options"]
 ==== Elasticsearch Output Configuration Options
diff --git a/lib/logstash/outputs/elasticsearch/http_client.rb b/lib/logstash/outputs/elasticsearch/http_client.rb
index 9efaeaf1..ee2c9477 100644
--- a/lib/logstash/outputs/elasticsearch/http_client.rb
+++ b/lib/logstash/outputs/elasticsearch/http_client.rb
@@ -127,6 +127,9 @@ def bulk(actions)
                   action.map {|line| LogStash::Json.dump(line)}.join("\n") :
                   LogStash::Json.dump(action)
       as_json << "\n"
+
+      as_json.scrub! # ensure generated JSON is valid UTF-8
+
       if (stream_writer.pos + as_json.bytesize) > TARGET_BULK_BYTES && stream_writer.pos > 0
         stream_writer.flush # ensure writer has sync'd buffers before reporting sizes
         logger.debug("Sending partial bulk request for batch with one or more actions remaining.",
diff --git a/logstash-output-elasticsearch.gemspec b/logstash-output-elasticsearch.gemspec
index eb33f432..2be78537 100644
--- a/logstash-output-elasticsearch.gemspec
+++ b/logstash-output-elasticsearch.gemspec
@@ -1,6 +1,6 @@
 Gem::Specification.new do |s|
   s.name = 'logstash-output-elasticsearch'
-  s.version = '11.4.1'
+  s.version = '11.4.2'
   s.licenses = ['apache-2.0']
   s.summary = "Stores logs in Elasticsearch"
diff --git a/spec/integration/outputs/compressed_indexing_spec.rb b/spec/integration/outputs/compressed_indexing_spec.rb
index 4a57f323..37475031 100644
--- a/spec/integration/outputs/compressed_indexing_spec.rb
+++ b/spec/integration/outputs/compressed_indexing_spec.rb
@@ -10,10 +10,12 @@
 describe "indexing with http_compression turned on", :integration => true do
   let(:event) { LogStash::Event.new("message" => "Hello World!", "type" => type) }
+  let(:event_with_invalid_utf_8_bytes) { LogStash::Event.new("message" => "Message from spacecraft which contains \xAC invalid \xD7 byte sequences.", "type" => type) }
   let(:index) { 10.times.collect { rand(10).to_s }.join("") }
   let(:type) { ESHelper.es_version_satisfies?("< 7") ? "doc" : "_doc" }
   let(:event_count) { 10000 + rand(500) }
-  let(:events) { event_count.times.map { event }.to_a }
+  # mix events with valid and invalid UTF-8 payloads
+  let(:events) { event_count.times.map { |i| i%3 == 0 ? event : event_with_invalid_utf_8_bytes }.to_a }
   let(:config) {
     {
       "hosts" => get_host_port,
diff --git a/spec/unit/outputs/elasticsearch/http_client_spec.rb b/spec/unit/outputs/elasticsearch/http_client_spec.rb
index 16b84424..15774d8e 100644
--- a/spec/unit/outputs/elasticsearch/http_client_spec.rb
+++ b/spec/unit/outputs/elasticsearch/http_client_spec.rb
@@ -243,12 +243,14 @@
       end
     end
 
-    context "with two messages" do
-      let(:message1) { "hey" }
-      let(:message2) { "you" }
+    context "with multiple messages" do
+      let(:message_head) { "Spacecraft message" }
+      let(:message_tail) { "byte sequence" }
+      let(:invalid_utf_8_message) { "contains invalid \xAC" }
       let(:actions) { [
-        ["index", {:_id=>nil, :_index=>"logstash"}, {"message"=> message1}],
-        ["index", {:_id=>nil, :_index=>"logstash"}, {"message"=> message2}],
+        ["index", {:_id=>nil, :_index=>"logstash"}, {"message"=> message_head}],
+        ["index", {:_id=>nil, :_index=>"logstash"}, {"message"=> invalid_utf_8_message}],
+        ["index", {:_id=>nil, :_index=>"logstash"}, {"message"=> message_tail}],
       ]}
       it "executes one bulk_send operation" do
         allow(subject).to receive(:join_bulk_responses)
@@ -258,7 +260,7 @@
 
     context "if one exceeds TARGET_BULK_BYTES" do
       let(:target_bulk_bytes) { LogStash::Outputs::ElasticSearch::TARGET_BULK_BYTES }
-      let(:message1) { "a" * (target_bulk_bytes + 1) }
+      let(:message_head) { "a" * (target_bulk_bytes + 1) }
       it "executes two bulk_send operations" do
         allow(subject).to receive(:join_bulk_responses)
         expect(subject).to receive(:bulk_send).twice
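Reviewer note: the functional core of this patch is the single `as_json.scrub!` call in `http_client.rb`; the rest is docs, specs, and release plumbing. For anyone unfamiliar with it, `String#scrub!` is Ruby stdlib (2.1+) and by default replaces each invalid byte sequence with U+FFFD. A minimal standalone sketch of that behavior, using a payload modeled on the new spec fixtures (the variable names here are illustrative, not plugin code):

```ruby
# Sketch of the stdlib behavior the fix relies on; not plugin code.
# .dup guards against frozen string literals in newer Ruby setups.
raw = "Message from spacecraft which contains \xAC invalid \xD7 byte sequences.".dup

p raw.valid_encoding?    #=> false -- \xAC and \xD7 are not valid UTF-8
raw.scrub!               # replaces each invalid byte sequence with "\uFFFD"
p raw.valid_encoding?    #=> true
p raw.include?("\uFFFD") #=> true -- offending bytes became the replacement character
```

Scrubbing each serialized action before it is written to `stream_writer` means the (optionally gzipped) bulk body never contains invalid byte sequences, which appears to be the serialization failure under compression that the 11.4.2 changelog entry describes, and matches the replacement-character behavior now documented in `docs/index.asciidoc`.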