Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add filter_path to bulk messages #1154

Merged
merged 6 commits into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 11.19.0
- Added `filter_path` to bulk requests to reduce the size of responses from elasticsearch
robbavey marked this conversation as resolved.
Show resolved Hide resolved

## 11.18.0
- Added request header `Elastic-Api-Version` for serverless [#1147](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1147)

Expand Down
6 changes: 5 additions & 1 deletion lib/logstash/outputs/elasticsearch/http_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,11 @@ def join_bulk_responses(bulk_responses)
end

def bulk_send(body_stream, batch_actions)
params = compression_level? ? {:headers => {"Content-Encoding" => "gzip"}} : {}
params = {
:query => {"filter_path" => "errors,items.*.error,items.*.status"}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the only side effect here is that a user setting bulk_path => "/filter_path=errors" for some strange reason may see their setting overwritten

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wondered about that, and was trying to think of a valid reason why they would change the filter path, other than to filter the response to reduce the payload - this feels like it is an implementation details, which I think we only exposed for the monitoring use case, and the payload reduction we already covered.

Possibly debugging?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like manticore (and httpclient) will allow both to coexist:

Manticore.get("http://localhost:3333/?q=test", query: {q: "kittens"}).body

causes:

❯ nc -l 3333           
GET /?q=test&q=kittens HTTP/1.1
Connection: Keep-Alive
Content-Length: 0
Host: localhost:3333
User-Agent: Manticore 0.9.1
Accept-Encoding: gzip,deflate

So it depends how ES chooses to treat these cases since the RFC doesn't seem to prohibit this case, nor explain what the server should do.
Either way we should confirm how ES handles this situation and add a note to the docs about how query parameters in bulk_path interact with parameters, w/ a special mention about filter_path.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we have options

  • Keep it as it is in the PR
  • Detect filter_path in the existing URL and use that instead of the new setting, and log appropriately.
  • Detect filter_path in the existing URL and drop it, enforcing the new setting, and log appropriately.

I'm easy with any of them, as long as we document it clearly

}
params[:headers] = {"Content-Encoding" => "gzip"} if compression_level?

response = @pool.post(@bulk_path, params, body_stream.string)

@bulk_response_metrics.increment(response.code.to_s)
Expand Down
2 changes: 1 addition & 1 deletion logstash-output-elasticsearch.gemspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-output-elasticsearch'
s.version = '11.18.0'
s.version = '11.19.0'
s.licenses = ['apache-2.0']
s.summary = "Stores logs in Elasticsearch"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand Down
2 changes: 1 addition & 1 deletion spec/integration/outputs/compressed_indexing_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@

it "sets the correct content-encoding header and body is compressed" do
expect(subject.client.pool.adapter.client).to receive(:send).
with(anything, anything, {:headers=>{"Content-Encoding"=>"gzip", "Content-Type"=>"application/json"}, :body => a_valid_gzip_encoded_string}).
with(anything, anything, {:query=>{"filter_path"=>"errors,items.*.error,items.*.status"}, :headers=>{"Content-Encoding"=>"gzip", "Content-Type"=>"application/json"}, :body => a_valid_gzip_encoded_string}).
and_call_original
subject.multi_receive(events)
end
Expand Down
58 changes: 51 additions & 7 deletions spec/integration/outputs/index_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ def curl_and_get_json_response(url, method: :get, retrieve_err_payload: false);
let(:config) { "not implemented" }
let(:events) { event_count.times.map { event }.to_a }
subject { LogStash::Outputs::ElasticSearch.new(config) }

let(:es_url) { "http://#{get_host_port}" }
let(:index_url) { "#{es_url}/#{index}" }

Expand All @@ -178,7 +178,7 @@ def curl_and_get_json_response(url, method: :get, retrieve_err_payload: false);
subject.do_close
end

shared_examples "an indexer" do |secure|
shared_examples "an indexer" do |secure, expected_path|
before(:each) do
host_unreachable_error_class = LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError
allow(host_unreachable_error_class).to receive(:new).with(any_args).and_wrap_original do |m, original, url|
Expand Down Expand Up @@ -212,13 +212,14 @@ def curl_and_get_json_response(url, method: :get, retrieve_err_payload: false);
expect(doc["_index"]).to eq(index)
end
end

it "sets the correct content-type header" do
expected_manticore_opts = {:headers => {"Content-Type" => "application/json"}, :body => anything}
expected_manticore_opts = {:headers => {"Content-Type" => "application/json"}, :body => anything, :query => anything}
if secure
expected_manticore_opts = {
:headers => {"Content-Type" => "application/json"},
:body => anything,
:headers => {"Content-Type" => "application/json"},
:query=>{"filter_path"=>"errors,items.*.error,items.*.status"},
:body => anything,
:auth => {
:user => user,
:password => password,
Expand All @@ -230,6 +231,22 @@ def curl_and_get_json_response(url, method: :get, retrieve_err_payload: false);
and_call_original
subject.multi_receive(events)
end

it "sets the bulk path URL and filter path parameter correctly" do
expect(subject.client.pool.adapter.client).to receive(:send).
with(anything, expected_path != nil ? expected_path : anything,
hash_including(:query => {"filter_path" => "errors,items.*.error,items.*.status"})).at_least(:once).
and_call_original
subject.multi_receive(events)
end

it "receives a filtered response" do
expect(subject.client).to receive(:join_bulk_responses).
with([{"errors"=>false, "items"=>[{"index"=>{"status"=>201}}]}]).
and_call_original
subject.multi_receive([event])
end

end

shared_examples "PKIX path failure" do
Expand Down Expand Up @@ -269,6 +286,33 @@ def curl_and_get_json_response(url, method: :get, retrieve_err_payload: false);
it_behaves_like("an indexer")
end

describe "an indexer with custom bulk path", :integration => true do
let(:config) {
{
"hosts" => get_host_port,
"index" => index,
"http_compression" => false,
"bulk_path" => "/_bulk?routing=true"
}
}
it_behaves_like("an indexer", false) do
let (:expected_path) { "#{es_url}#{bulk_path}" }
end
end

describe "an indexer with the standard bulk path", :integration => true do
let(:config) {
{
"hosts" => get_host_port,
"index" => index,
"http_compression" => false
}
}
it_behaves_like("an indexer", false) do
let (:expected_path) { "#{es_url}/_bulk" }
end
end

describe "an indexer with no type value set (default to doc)", :integration => true do
let(:type) { ESHelper.es_version_satisfies?("< 7") ? "doc" : "_doc" }
let(:config) {
Expand Down Expand Up @@ -296,7 +340,7 @@ def curl_and_get_json_response(url, method: :get, retrieve_err_payload: false);
"index" => index,
"http_compression" => false
}
end
end

let(:curl_opts) { "-u #{user}:#{password}" }

Expand Down
17 changes: 17 additions & 0 deletions spec/unit/outputs/elasticsearch/http_client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,23 @@
["index", {:_id=>nil, :_index=>"logstash"}, {"message"=> message}],
]}

context "when sending a bulk message" do
let(:base_options) { super().merge(:client_settings => {:compression_level => 0}) }
let(:message1) { "hey" }
let(:actions) { [
["index", {:_id=>nil, :_index=>"logstash"}, {"message"=> message1}],
]}
let(:post_response) {
double("response", :code => 200, :body => LogStash::Json::dump( { "body" => "body" }))
}

it "sets the filter path" do
expect(subject.pool).to receive(:post).with(anything, {:query=>{"filter_path"=>"errors,items.*.error,items.*.status"}}, anything).and_return(post_response)
subject.send(:bulk, actions)
end
end


[0, 9].each do |compression_level|
context "with `compression_level => #{compression_level}`" do

Expand Down