(backport: 11.4 for LS7.x) Fixes an issue where events containing non-unicode strings. #1171

Merged
4 changes: 2 additions & 2 deletions .ci/docker-setup.sh
@@ -48,10 +48,10 @@ if [ "$ELASTIC_STACK_VERSION" ]; then

 if [ "$INTEGRATION" == "true" ]; then
 docker-compose down
-docker-compose build
+docker-compose build --quiet
 else
 docker-compose down
-docker-compose build logstash
+docker-compose build logstash --quiet
 fi
 else
 echo "Please set the ELASTIC_STACK_VERSION environment variable"
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,6 @@
+## 11.4.2
+- Fixes an issue where events containing non-unicode strings could fail to serialize correctly when compression is enabled [#1169](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1169)
+
 ## 11.4.1
 - Feat: upgrade manticore (http-client) library [#1063](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1063)
 - the underlying changes include latest HttpClient (4.5.13)
5 changes: 5 additions & 0 deletions docs/index.asciidoc
@@ -293,6 +293,11 @@ index level and `monitoring` permissions at cluster level. The `monitoring`
 permission at cluster level is necessary to perform periodic connectivity
 checks.

+[id="plugins-{type}s-{plugin}-handling-non-utf-8"]
+==== Handling non UTF-8 data
+
+This plugin transmits events to Elasticsearch using a JSON API, and therefore requires all string values in events to be valid UTF-8.
+When a string value on an event contains one or more byte sequences that are not valid in UTF-8, each offending byte sequence is replaced with the UTF-8 replacement character (`\uFFFD`).

 [id="plugins-{type}s-{plugin}-options"]
 ==== Elasticsearch Output Configuration Options
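For readers unfamiliar with the replacement behavior described in the new docs section, here is a minimal sketch using Ruby's built-in `String#scrub`, which is the same method the `http_client.rb` change calls in its in-place form. The sample string is borrowed from the integration spec in this PR; the variable names `raw` and `clean` are illustrative only.

```ruby
# "\xAC" is a lone continuation byte and "\xD7" is a lead byte with no
# continuation byte after it, so neither is valid UTF-8 on its own.
raw = "Message from spacecraft which contains \xAC invalid \xD7 byte sequences."

raw.valid_encoding?      # => false

# scrub returns a copy; scrub! (used in the plugin change) mutates in place.
clean = raw.scrub

clean.valid_encoding?    # => true
clean.count("\uFFFD")    # => 2, one replacement character per offending byte

# Each invalid byte becomes the 3-byte encoding of U+FFFD, so the string grows.
clean.bytesize - raw.bytesize  # => 4
```

Note that scrubbing is lossy: the original bytes cannot be recovered from the replacement character.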
3 changes: 3 additions & 0 deletions lib/logstash/outputs/elasticsearch/http_client.rb
@@ -127,6 +127,9 @@ def bulk(actions)
 action.map {|line| LogStash::Json.dump(line)}.join("\n") :
 LogStash::Json.dump(action)
 as_json << "\n"
+
+as_json.scrub! # ensure generated JSON is valid UTF-8
+
 if (stream_writer.pos + as_json.bytesize) > TARGET_BULK_BYTES && stream_writer.pos > 0
 stream_writer.flush # ensure writer has sync'd buffers before reporting sizes
 logger.debug("Sending partial bulk request for batch with one or more actions remaining.",
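The hunk above places `scrub!` before the size check against `TARGET_BULK_BYTES`. A simplified sketch of that ordering follows. It is not the plugin's code: `demo_batches` and `DEMO_TARGET_BYTES` are hypothetical names, a plain string buffer stands in for `stream_writer`, and the JSON lines are written by hand as a stand-in for `LogStash::Json.dump` output. One thing the sketch makes visible is that scrubbing can grow a line (one invalid byte becomes a three-byte U+FFFD), so measuring after scrubbing means the size check sees the bytes that will actually be sent.

```ruby
# Hypothetical, deliberately tiny limit so that the example splits into batches.
DEMO_TARGET_BYTES = 64

# Groups already-serialized lines into batches of at most `limit` bytes
# (a single oversized line still gets a batch of its own).
def demo_batches(lines, limit = DEMO_TARGET_BYTES)
  batches = []
  buffer = +""
  lines.each do |line|
    as_json = line + "\n"
    as_json.scrub! # replace invalid byte sequences before measuring

    # Start a new batch if appending this line would exceed the limit.
    if (buffer.bytesize + as_json.bytesize) > limit && buffer.bytesize > 0
      batches << buffer
      buffer = +""
    end
    buffer << as_json
  end
  batches << buffer unless buffer.empty?
  batches
end

# Messages mirror the unit spec in this PR; the middle one has an invalid byte.
lines = [
  %({"message":"Spacecraft message"}),
  %({"message":"contains invalid \xAC"}),
  %({"message":"byte sequence"}),
]

batches = demo_batches(lines)
```

With these inputs every batch is valid UTF-8, and the middle line is counted at its scrubbed size.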
2 changes: 1 addition & 1 deletion logstash-output-elasticsearch.gemspec
@@ -1,6 +1,6 @@
 Gem::Specification.new do |s|
 s.name = 'logstash-output-elasticsearch'
-s.version = '11.4.1'
+s.version = '11.4.2'

 s.licenses = ['apache-2.0']
 s.summary = "Stores logs in Elasticsearch"
4 changes: 3 additions & 1 deletion spec/integration/outputs/compressed_indexing_spec.rb
@@ -10,10 +10,12 @@

 describe "indexing with http_compression turned on", :integration => true do
 let(:event) { LogStash::Event.new("message" => "Hello World!", "type" => type) }
+let(:event_with_invalid_utf_8_bytes) { LogStash::Event.new("message" => "Message from spacecraft which contains \xAC invalid \xD7 byte sequences.", "type" => type) }
 let(:index) { 10.times.collect { rand(10).to_s }.join("") }
 let(:type) { ESHelper.es_version_satisfies?("< 7") ? "doc" : "_doc" }
 let(:event_count) { 10000 + rand(500) }
-let(:events) { event_count.times.map { event }.to_a }
+# mix the events with valid and invalid UTF-8 payloads
+let(:events) { event_count.times.map { |i| i%3 == 0 ? event : event_with_invalid_utf_8_bytes }.to_a }
 let(:config) {
 {
 "hosts" => get_host_port,
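The integration spec above mixes valid and invalid payloads and sends them with compression on. As a rough standalone illustration of the same idea, the sketch below builds a small mixed body, scrubs it, and round-trips it through gzip using Ruby's standard `zlib` library. This is an assumption-laden stand-in: the plugin's actual compression path is not part of this diff, the hand-written JSON lines replace `LogStash::Json.dump`, and nine events replace the spec's 10000+.

```ruby
require "zlib"

valid_message   = "Hello World!"
invalid_message = "Message from spacecraft which contains \xAC invalid \xD7 byte sequences."

# Same mixing rule as the spec: every third event is valid, the rest are not.
messages = 9.times.map { |i| i % 3 == 0 ? valid_message : invalid_message }

# Hand-rolled newline-delimited JSON body, scrubbed before compression.
body = messages.map { |m| %({"message":"#{m}"}\n) }.join
body.scrub!

compressed = Zlib.gzip(body)

# gunzip yields raw bytes; tag them as UTF-8 before comparing.
restored = Zlib.gunzip(compressed).force_encoding(Encoding::UTF_8)

restored == body          # => true
restored.valid_encoding?  # => true
```

Six of the nine messages carry two invalid bytes each, so the restored body contains twelve replacement characters.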
14 changes: 8 additions & 6 deletions spec/unit/outputs/elasticsearch/http_client_spec.rb
@@ -243,12 +243,14 @@
 end
 end

-context "with two messages" do
-let(:message1) { "hey" }
-let(:message2) { "you" }
+context "with multiple messages" do
+let(:message_head) { "Spacecraft message" }
+let(:message_tail) { "byte sequence" }
+let(:invalid_utf_8_message) { "contains invalid \xAC" }
 let(:actions) { [
-["index", {:_id=>nil, :_index=>"logstash"}, {"message"=> message1}],
-["index", {:_id=>nil, :_index=>"logstash"}, {"message"=> message2}],
+["index", {:_id=>nil, :_index=>"logstash"}, {"message"=> message_head}],
+["index", {:_id=>nil, :_index=>"logstash"}, {"message"=> invalid_utf_8_message}],
+["index", {:_id=>nil, :_index=>"logstash"}, {"message"=> message_tail}],
 ]}
 it "executes one bulk_send operation" do
 allow(subject).to receive(:join_bulk_responses)
@@ -258,7 +260,7 @@

 context "if one exceeds TARGET_BULK_BYTES" do
 let(:target_bulk_bytes) { LogStash::Outputs::ElasticSearch::TARGET_BULK_BYTES }
-let(:message1) { "a" * (target_bulk_bytes + 1) }
+let(:message_head) { "a" * (target_bulk_bytes + 1) }
 it "executes two bulk_send operations" do
 allow(subject).to receive(:join_bulk_responses)
 expect(subject).to receive(:bulk_send).twice