When event JSON data contains non UTF-8 invalid bytes, replace with replacement characters. #1169

Merged
merged 8 commits into from
Mar 19, 2024
Changes from 5 commits
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,6 @@
## 11.22.3
- Fixes handling of JSON events that contain invalid UTF-8 bytes; invalid bytes are now replaced with the replacement character. [#1169](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1169)

## 11.22.2
- [DOC] Add content for sending data to Elasticsearch on serverless [#1164](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1164)

5 changes: 5 additions & 0 deletions docs/index.asciidoc
@@ -316,6 +316,11 @@ index level and `monitoring` permissions at cluster level. The `monitoring`
permission at cluster level is necessary to perform periodic connectivity
checks.

[id="plugins-{type}s-{plugin}-handling-non-utf-8"]
==== Handling non UTF-8 data

The plugin assumes that incoming JSON data is UTF-8 encoded.
If the event JSON data contains invalid UTF-8 byte sequences, the plugin replaces each of them with the replacement character (`\uFFFD`).
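
As a rough illustration of the replacement described above, the sketch below uses plain Ruby `String#scrub` outside of Logstash. The names `scrub_invalid_utf8` and `DEMO_REPLACEMENT` are made up for this example and are not part of the plugin; the sample string is the one used in this PR's integration spec.

```ruby
# Illustration only: plain Ruby, not the plugin's code path.
# DEMO_REPLACEMENT and scrub_invalid_utf8 are hypothetical names.
DEMO_REPLACEMENT = "\uFFFD".freeze

# Returns a copy of text with every invalid UTF-8 byte sequence
# replaced by U+FFFD; valid text comes back unchanged.
def scrub_invalid_utf8(text)
  text.scrub(DEMO_REPLACEMENT)
end

# \xAC is a stray continuation byte and \xD7 is a lead byte with no
# continuation byte after it, so neither is valid UTF-8 here.
raw = "Message from spacecraft which contains \xAC invalid \xD7 byte sequences."
cleaned = scrub_invalid_utf8(raw)

puts raw.valid_encoding?             # false
puts cleaned.valid_encoding?         # true
puts cleaned.count(DEMO_REPLACEMENT) # 2
```

Each replaced byte grows from one byte to the three bytes that encode U+FFFD in UTF-8, so a scrubbed payload can be slightly larger than the original.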

[id="plugins-{type}s-{plugin}-options"]
==== Elasticsearch Output Configuration Options
17 changes: 16 additions & 1 deletion lib/logstash/outputs/elasticsearch/http_client.rb
@@ -22,6 +22,9 @@ module LogStash; module Outputs; class ElasticSearch;
# made sense. We picked one on the lowish side to not use too much heap.
TARGET_BULK_BYTES = 20 * 1024 * 1024 # 20MiB

# Replacement for invalid bytes in the event's JSON string
REPLACEMENT_CHARACTER = "\uFFFD".freeze

class HttpClient
attr_reader :client, :options, :logger, :pool, :action_count, :recv_count
# This is here in case we use DEFAULT_OPTIONS in the future
@@ -37,7 +40,7 @@ class HttpClient
# * `:user` - String. The user to use for authentication.
# * `:password` - String. The password to use for authentication.
# * `:timeout` - Float. A duration value, in seconds, after which a socket
# operation or request will be aborted if not yet successfull
# operation or request will be aborted if not yet successful
# * `:client_settings` - a hash; see below for keys.
#
# The `client_settings` key is a hash that can contain other settings:
@@ -132,6 +135,9 @@ def bulk(actions)
action.map {|line| LogStash::Json.dump(line)}.join("\n") :
LogStash::Json.dump(action)
as_json << "\n"

replace_invalid_bytes!(as_json)

if (stream_writer.pos + as_json.bytesize) > TARGET_BULK_BYTES && stream_writer.pos > 0
stream_writer.flush # ensure writer has sync'd buffers before reporting sizes
logger.debug("Sending partial bulk request for batch with one or more actions remaining.",
@@ -496,5 +502,14 @@ def update_action_builder(args, source)
end
[args, source]
end

private
# Replaces invalid byte sequences in the given JSON string with the replacement character
def replace_invalid_bytes!(json_string)
has_invalid_bytes = false
json_string.scrub! { |_| has_invalid_bytes = true; REPLACEMENT_CHARACTER }
logger.debug("Invalid bytes in the event were replaced with the replacement character") if has_invalid_bytes
end

end
end end end
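
The `replace_invalid_bytes!` helper above can be tried in isolation with something like the following sketch. `ScrubbingSketch` is a hypothetical stand-in for `HttpClient`, the logger is Ruby's standard `Logger` and not the Logstash one, and returning the flag is an addition made here so the effect is easy to observe.

```ruby
require "logger"
require "stringio"

# Hypothetical stand-in for HttpClient, reduced to the scrubbing step.
class ScrubbingSketch
  REPLACEMENT_CHARACTER = "\uFFFD".freeze

  def initialize(logger)
    @logger = logger
  end

  # Mutates json_string in place and returns true when anything was replaced.
  # (Returning the flag is an addition for this sketch.)
  def replace_invalid_bytes!(json_string)
    has_invalid_bytes = false
    # The block runs once per invalid byte sequence; valid input never calls it.
    json_string.scrub! do |_invalid_bytes|
      has_invalid_bytes = true
      REPLACEMENT_CHARACTER
    end
    @logger.debug("Invalid bytes were replaced with the replacement character") if has_invalid_bytes
    has_invalid_bytes
  end
end

log_output = StringIO.new
sketch = ScrubbingSketch.new(Logger.new(log_output))

as_json = "{\"message\":\"contains invalid \xAC\"}\n".dup
size_before = as_json.bytesize
sketch.replace_invalid_bytes!(as_json)
puts as_json.bytesize - size_before # 2: one invalid byte became the three-byte U+FFFD
```

Because `bulk` calls `replace_invalid_bytes!(as_json)` before comparing `as_json.bytesize` against `TARGET_BULK_BYTES`, the size check sees the scrubbed payload, including any growth caused by the replacement.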
2 changes: 1 addition & 1 deletion logstash-output-elasticsearch.gemspec
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-output-elasticsearch'
s.version = '11.22.2'
s.version = '11.22.3'
s.licenses = ['apache-2.0']
s.summary = "Stores logs in Elasticsearch"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
5 changes: 4 additions & 1 deletion spec/integration/outputs/compressed_indexing_spec.rb
@@ -11,10 +11,13 @@
[ {"http_compression" => true}, {"compression_level" => 1} ].each do |compression_config|
describe "indexing with http_compression turned on", :integration => true do
let(:event) { LogStash::Event.new("message" => "Hello World!", "type" => type) }
let(:event_with_invalid_utf_8_bytes) { LogStash::Event.new("message" => "Message from spacecraft which contains \xAC invalid \xD7 byte sequences.", "type" => type) }

let(:index) { 10.times.collect { rand(10).to_s }.join("") }
let(:type) { ESHelper.es_version_satisfies?("< 7") ? "doc" : "_doc" }
let(:event_count) { 10000 + rand(500) }
let(:events) { event_count.times.map { event }.to_a }
# mix the events with valid and invalid UTF-8 payloads
let(:events) { event_count.times.map do |i| i%3 == 0 ? event : event_with_invalid_utf_8_bytes end.to_a }
let(:config) {
{
"hosts" => get_host_port,
14 changes: 8 additions & 6 deletions spec/unit/outputs/elasticsearch/http_client_spec.rb
@@ -242,12 +242,14 @@
end
end

context "with two messages" do
let(:message1) { "hey" }
let(:message2) { "you" }
context "with multiple messages" do
let(:message_head) { "Spacecraft message" }
let(:message_tail) { "byte sequence" }
let(:invalid_utf_8_message) { "contains invalid \xAC" }
let(:actions) { [
["index", {:_id=>nil, :_index=>"logstash"}, {"message"=> message1}],
["index", {:_id=>nil, :_index=>"logstash"}, {"message"=> message2}],
["index", {:_id=>nil, :_index=>"logstash"}, {"message"=> message_head}],
["index", {:_id=>nil, :_index=>"logstash"}, {"message"=> invalid_utf_8_message}],
["index", {:_id=>nil, :_index=>"logstash"}, {"message"=> message_tail}],
]}
it "executes one bulk_send operation" do
allow(subject).to receive(:join_bulk_responses)
@@ -257,7 +259,7 @@

context "if one exceeds TARGET_BULK_BYTES" do
let(:target_bulk_bytes) { LogStash::Outputs::ElasticSearch::TARGET_BULK_BYTES }
let(:message1) { "a" * (target_bulk_bytes + 1) }
let(:message_head) { "a" * (target_bulk_bytes + 1) }
it "executes two bulk_send operations" do
allow(subject).to receive(:join_bulk_responses)
expect(subject).to receive(:bulk_send).twice
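
The "one bulk_send" versus "two bulk_send operations" expectations in the unit spec follow from the size check in `bulk`. A simplified model of that check might look like the sketch below. `split_into_batches` is a hypothetical helper written for this illustration; the plugin itself streams payloads into a writer instead of collecting strings, so this only mirrors the flush condition.

```ruby
# Simplified, hypothetical model of the size-based flush in `bulk`.
# Only the condition mirrors the plugin; everything else is illustrative.
def split_into_batches(payloads, target_bytes)
  batches = []
  current = +""

  payloads.each do |payload|
    # Flush only when the buffer already holds data, so a single oversized
    # payload still goes out on its own instead of blocking the batch.
    if current.bytesize + payload.bytesize > target_bytes && current.bytesize > 0
      batches << current
      current = +""
    end
    current << payload
  end

  batches << current unless current.empty?
  batches
end

target = 16 # small stand-in for TARGET_BULK_BYTES (20 MiB in the plugin)
batches = split_into_batches(["a" * (target + 1), "you", "too"], target)
puts batches.length # 2
```

With three small payloads everything fits in one batch, which corresponds to the single `bulk_send` case. Once the first payload exceeds the target, the remaining payloads land in a second batch.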