Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add preflight check on Elasticsearch before connecting #1026

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 11.2.0
- Added preflight checks on Elasticsearch [#1026](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1026)

## 11.1.0
- Feat: add `user-agent` header passed to the Elasticsearch HTTP connection [#1038](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1038)

Expand Down
50 changes: 48 additions & 2 deletions lib/logstash/outputs/elasticsearch/http_client/pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ def message
ROOT_URI_PATH = '/'.freeze
LICENSE_PATH = '/_license'.freeze

VERSION_6_TO_7 = Gem::Requirement.new([">= 6.0.0", "< 7.0.0"])
VERSION_7_TO_7_14 = Gem::Requirement.new([">= 7.0.0", "< 7.14.0"])

DEFAULT_OPTIONS = {
:healthcheck_path => ROOT_URI_PATH,
:sniffing_path => "/_nodes/http",
Expand Down Expand Up @@ -211,7 +214,7 @@ def sniffer_alive?
def start_resurrectionist
@resurrectionist = Thread.new do
until_stopped("resurrection", @resurrect_delay) do
healthcheck!
healthcheck!(false)
end
end
end
Expand All @@ -232,11 +235,18 @@ def health_check_request(url)
perform_request_to_url(url, :head, @healthcheck_path)
end

def healthcheck!
def healthcheck!(register_phase = true)
# Try to keep locking granularity low such that we don't affect IO...
@state_mutex.synchronize { @url_info.select {|url,meta| meta[:state] != :alive } }.each do |url,meta|
begin
health_check_request(url)

# when called from resurrectionist skip the product check done during register phase
if register_phase
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes me wonder if we could just make the elasticsearch? check everytime instead of HEAD'ing the /?

Right after we do HEAD then we request '/' in es_version = get_es_version(url).

With this change we'll be doing 4 requests to ES:

  1. HEAD request
  2. GET '/' for elasticsearch? on register_phase
  3. GET '/' to get elasticsearch version
  4. GET '/_license'

We can just do instead:

  1. GET '/' for healthcheck (and always confirm it's ES)
  2. GET '/_license'

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with you, I think that point 2 and 3 could be merged or at least do one one HTTP GET call to /.
About the HTTP HEAD, that call could use a customized path from configuration (healthcheck_path) that some users could have customized. If we drop the HEAD call is like ignoring that setting and we should remove it.
I'm ok with removing the HEAD call, because actually seems to be used just to ping the server for reachability and then it immediately send a GET.
We could split this work in 2 steps:

  • first collapse the GET call for point 2 and 3
  • create a follow-up PR to drop the HEAD call and deprecate the healthcheck_path param

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

create a follow-up PR to drop the HEAD call and deprecate the healthcheck_path param

It's fine if there's only an issue instead to track the goal of reducing the number of network calls for a healthcheck.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Opened #1043 to track this feature

if !elasticsearch?(url)
raise LogStash::ConfigurationError, "Could not connect to a compatible version of Elasticsearch"
end
end
# If no exception was raised it must have succeeded!
logger.warn("Restored connection to ES instance", url: url.sanitized.to_s)
# We reconnected to this node, check its ES version
Expand All @@ -254,6 +264,42 @@ def healthcheck!
end
end

def elasticsearch?(url)
begin
response = perform_request_to_url(url, :get, ROOT_URI_PATH)
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
return false if response.code == 401 || response.code == 403
raise e
end

version_info = LogStash::Json.load(response.body)
return false if version_info['version'].nil?

version = Gem::Version.new(version_info["version"]['number'])
return false if version < Gem::Version.new('6.0.0')

if VERSION_6_TO_7.satisfied_by?(version)
return valid_tagline?(version_info)
elsif VERSION_7_TO_7_14.satisfied_by?(version)
build_flavor = version_info["version"]['build_flavor']
return false if build_flavor.nil? || build_flavor != 'default' || !valid_tagline?(version_info)
else
# case >= 7.14
lower_headers = response.headers.transform_keys {|key| key.to_s.downcase }
product_header = lower_headers['x-elastic-product']
return false if product_header != 'Elasticsearch'
end
return true
rescue => e
logger.error("Unable to retrieve Elasticsearch version", url: url.sanitized.to_s, exception: e.class, message: e.message)
false
end

def valid_tagline?(version_info)
tagline = version_info['tagline']
tagline == "You Know, for Search"
end

def stop_resurrectionist
@resurrectionist.join if @resurrectionist
end
Expand Down
3 changes: 2 additions & 1 deletion logstash-output-elasticsearch.gemspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-output-elasticsearch'
s.version = '11.1.0'
s.version = '11.2.0'

s.licenses = ['apache-2.0']
s.summary = "Stores logs in Elasticsearch"
Expand Down Expand Up @@ -31,6 +31,7 @@ Gem::Specification.new do |s|
s.add_development_dependency 'flores'
s.add_development_dependency 'cabin', ['~> 0.6']
s.add_development_dependency 'webrick'
s.add_development_dependency 'webmock'
# Still used in some specs, we should remove this ASAP
s.add_development_dependency 'elasticsearch'
end
167 changes: 158 additions & 9 deletions spec/unit/outputs/elasticsearch/http_client/pool_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,28 +50,29 @@

describe "healthcheck url handling" do
let(:initial_urls) { [::LogStash::Util::SafeURI.new("http://localhost:9200")] }
before(:example) do
expect(adapter).to receive(:perform_request).with(anything, :get, "/", anything, anything) do |url, _, _, _, _|
expect(url.path).to be_empty
end
end

context "and not setting healthcheck_path" do
it "performs the healthcheck to the root" do
expect(adapter).to receive(:perform_request) do |url, method, req_path, _, _|
expect(method).to eq(:head)
expect(adapter).to receive(:perform_request).with(anything, :head, "/", anything, anything) do |url, _, _, _, _|
expect(url.path).to be_empty
expect(req_path).to eq("/")
end
subject.healthcheck!
expect { subject.healthcheck! }.to raise_error(LogStash::ConfigurationError, "Could not connect to a compatible version of Elasticsearch")
end
end

context "and setting healthcheck_path" do
let(:healthcheck_path) { "/my/health" }
let(:options) { super().merge(:healthcheck_path => healthcheck_path) }
it "performs the healthcheck to the healthcheck_path" do
expect(adapter).to receive(:perform_request) do |url, method, req_path, _, _|
expect(method).to eq(:head)
expect(adapter).to receive(:perform_request).with(anything, :head, eq(healthcheck_path), anything, anything) do |url, _, _, _, _|
expect(url.path).to be_empty
expect(req_path).to eq(healthcheck_path)
end
subject.healthcheck!
expect { subject.healthcheck! }.to raise_error(LogStash::ConfigurationError, "Could not connect to a compatible version of Elasticsearch")
end
end
end
Expand Down Expand Up @@ -164,6 +165,20 @@
end
end

class MockResponse
attr_reader :code, :headers

def initialize(code = 200, body = nil, headers = {})
@code = code
@body = body
@headers = headers
end

def body
@body.to_json
end
end

describe "connection management" do
before(:each) { subject.start }
context "with only one URL in the list" do
Expand All @@ -175,8 +190,17 @@
end

context "with multiple URLs in the list" do
let(:version_ok) do
MockResponse.new(200, {"tagline" => "You Know, for Search",
"version" => {
"number" => '7.13.0',
"build_flavor" => 'default'}
})
end

before :each do
allow(adapter).to receive(:perform_request).with(anything, :head, subject.healthcheck_path, {}, nil)
allow(adapter).to receive(:perform_request).with(anything, :get, subject.healthcheck_path, {}, nil).and_return(version_ok)
end
let(:initial_urls) { [ ::LogStash::Util::SafeURI.new("http://localhost:9200"), ::LogStash::Util::SafeURI.new("http://localhost:9201"), ::LogStash::Util::SafeURI.new("http://localhost:9202") ] }

Expand Down Expand Up @@ -220,8 +244,14 @@
::LogStash::Util::SafeURI.new("http://otherhost:9201")
] }

let(:valid_response) { MockResponse.new(200, {"tagline" => "You Know, for Search",
"version" => {
"number" => '7.13.0',
"build_flavor" => 'default'}
}) }

before(:each) do
allow(subject).to receive(:perform_request_to_url).and_return(nil)
allow(subject).to receive(:perform_request_to_url).and_return(valid_response)
subject.start
end

Expand All @@ -240,6 +270,7 @@
describe "license checking" do
before(:each) do
allow(subject).to receive(:health_check_request)
allow(subject).to receive(:elasticsearch?).and_return(true)
end

let(:options) do
Expand Down Expand Up @@ -273,6 +304,7 @@

before(:each) do
allow(subject).to receive(:health_check_request)
allow(subject).to receive(:elasticsearch?).and_return(true)
end

context "if ES doesn't return a valid license" do
Expand Down Expand Up @@ -319,3 +351,120 @@
end
end
end

describe "#elasticsearch?" do
jsvd marked this conversation as resolved.
Show resolved Hide resolved
let(:logger) { Cabin::Channel.get }
let(:adapter) { double("Manticore Adapter") }
let(:initial_urls) { [::LogStash::Util::SafeURI.new("http://localhost:9200")] }
let(:options) { {:resurrect_delay => 2, :url_normalizer => proc {|u| u}} } # Shorten the delay a bit to speed up tests
let(:es_node_versions) { [ "0.0.0" ] }
let(:license_status) { 'active' }

subject { LogStash::Outputs::ElasticSearch::HttpClient::Pool.new(logger, adapter, initial_urls, options) }

let(:url) { ::LogStash::Util::SafeURI.new("http://localhost:9200") }

context "in case HTTP error code" do
it "should fail for 401" do
allow(adapter).to receive(:perform_request)
.with(anything, :get, "/", anything, anything)
.and_return(MockResponse.new(401))

expect(subject.elasticsearch?(url)).to be false
end

it "should fail for 403" do
allow(adapter).to receive(:perform_request)
.with(anything, :get, "/", anything, anything)
.and_return(status: 403)
expect(subject.elasticsearch?(url)).to be false
end
end

context "when connecting to a cluster which reply without 'version' field" do
it "should fail" do
allow(adapter).to receive(:perform_request)
.with(anything, :get, "/", anything, anything)
.and_return(body: {"field" => "funky.com"}.to_json)
expect(subject.elasticsearch?(url)).to be false
end
end

context "when connecting to a cluster with version < 6.0.0" do
it "should fail" do
allow(adapter).to receive(:perform_request)
.with(anything, :get, "/", anything, anything)
.and_return(200, {"version" => { "number" => "5.0.0"}}.to_json)
expect(subject.elasticsearch?(url)).to be false
end
end

context "when connecting to a cluster with version in [6.0.0..7.0.0)" do
it "must be successful with valid 'tagline'" do
allow(adapter).to receive(:perform_request)
.with(anything, :get, "/", anything, anything)
.and_return(MockResponse.new(200, {"version" => {"number" => "6.5.0"}, "tagline" => "You Know, for Search"}))
expect(subject.elasticsearch?(url)).to be true
end

it "should fail if invalid 'tagline'" do
allow(adapter).to receive(:perform_request)
.with(anything, :get, "/", anything, anything)
.and_return(MockResponse.new(200, {"version" => {"number" => "6.5.0"}, "tagline" => "You don't know"}))
expect(subject.elasticsearch?(url)).to be false
end

it "should fail if 'tagline' is not present" do
allow(adapter).to receive(:perform_request)
.with(anything, :get, "/", anything, anything)
.and_return(MockResponse.new(200, {"version" => {"number" => "6.5.0"}}))
expect(subject.elasticsearch?(url)).to be false
end
end

context "when connecting to a cluster with version in [7.0.0..7.14.0)" do
it "must be successful is 'build_flavor' is 'default' and tagline is correct" do
allow(adapter).to receive(:perform_request)
.with(anything, :get, "/", anything, anything)
.and_return(MockResponse.new(200, {"version": {"number": "7.5.0", "build_flavor": "default"}, "tagline": "You Know, for Search"}))
expect(subject.elasticsearch?(url)).to be true
end

it "should fail if 'build_flavor' is not 'default' and tagline is correct" do
allow(adapter).to receive(:perform_request)
.with(anything, :get, "/", anything, anything)
.and_return(MockResponse.new(200, {"version": {"number": "7.5.0", "build_flavor": "oss"}, "tagline": "You Know, for Search"}))
expect(subject.elasticsearch?(url)).to be false
end

it "should fail if 'build_flavor' is not present and tagline is correct" do
allow(adapter).to receive(:perform_request)
.with(anything, :get, "/", anything, anything)
.and_return(MockResponse.new(200, {"version": {"number": "7.5.0"}, "tagline": "You Know, for Search"}))
expect(subject.elasticsearch?(url)).to be false
end
end

context "when connecting to a cluster with version >= 7.14.0" do
it "should fail if 'X-elastic-product' header is not present" do
allow(adapter).to receive(:perform_request)
.with(anything, :get, "/", anything, anything)
.and_return(MockResponse.new(200, {"version": {"number": "7.14.0"}}))
expect(subject.elasticsearch?(url)).to be false
end

it "should fail if 'X-elastic-product' header is present but with bad value" do
allow(adapter).to receive(:perform_request)
.with(anything, :get, "/", anything, anything)
.and_return(MockResponse.new(200, {"version": {"number": "7.14.0"}}, {'X-elastic-product' => 'not good'}))
expect(subject.elasticsearch?(url)).to be false
end

it "must be successful when 'X-elastic-product' header is present with 'Elasticsearch' value" do
allow(adapter).to receive(:perform_request)
.with(anything, :get, "/", anything, anything)
.and_return(MockResponse.new(200, {"version": {"number": "7.14.0"}}, {'X-elastic-product' => 'Elasticsearch'}))
expect(subject.elasticsearch?(url)).to be true
end
end
end
2 changes: 1 addition & 1 deletion spec/unit/outputs/elasticsearch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@
include_examples("an authenticated config")
end

context 'claud_auth also set' do
context 'cloud_auth also set' do
let(:do_register) { false } # this is what we want to test, so we disable the before(:each) call
let(:options) { { "user" => user, "password" => password, "cloud_auth" => "elastic:my-passwd-00" } }

Expand Down