Skip to content

Commit

Permalink
Add preflight check on Elasticsearch before connecting (#1026)
Browse files Browse the repository at this point in the history
Adds Elasticsearch preflight check during plugin registration.

Co-authored-by: João Duarte <[email protected]>
  • Loading branch information
andsel and jsvd authored Oct 12, 2021
1 parent eb41141 commit 3a170cf
Show file tree
Hide file tree
Showing 5 changed files with 212 additions and 13 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 11.2.0
- Added preflight checks on Elasticsearch [#1026](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1026)

## 11.1.0
- Feat: add `user-agent` header passed to the Elasticsearch HTTP connection [#1038](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1038)

Expand Down
50 changes: 48 additions & 2 deletions lib/logstash/outputs/elasticsearch/http_client/pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ def message
ROOT_URI_PATH = '/'.freeze
LICENSE_PATH = '/_license'.freeze

VERSION_6_TO_7 = Gem::Requirement.new([">= 6.0.0", "< 7.0.0"])
VERSION_7_TO_7_14 = Gem::Requirement.new([">= 7.0.0", "< 7.14.0"])

DEFAULT_OPTIONS = {
:healthcheck_path => ROOT_URI_PATH,
:sniffing_path => "/_nodes/http",
Expand Down Expand Up @@ -211,7 +214,7 @@ def sniffer_alive?
def start_resurrectionist
@resurrectionist = Thread.new do
until_stopped("resurrection", @resurrect_delay) do
healthcheck!
healthcheck!(false)
end
end
end
Expand All @@ -232,11 +235,18 @@ def health_check_request(url)
perform_request_to_url(url, :head, @healthcheck_path)
end

def healthcheck!
def healthcheck!(register_phase = true)
# Try to keep locking granularity low such that we don't affect IO...
@state_mutex.synchronize { @url_info.select {|url,meta| meta[:state] != :alive } }.each do |url,meta|
begin
health_check_request(url)

# when called from resurrectionist skip the product check done during register phase
if register_phase
if !elasticsearch?(url)
raise LogStash::ConfigurationError, "Could not connect to a compatible version of Elasticsearch"
end
end
# If no exception was raised it must have succeeded!
logger.warn("Restored connection to ES instance", url: url.sanitized.to_s)
# We reconnected to this node, check its ES version
Expand All @@ -254,6 +264,42 @@ def healthcheck!
end
end

def elasticsearch?(url)
begin
response = perform_request_to_url(url, :get, ROOT_URI_PATH)
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
return false if response.code == 401 || response.code == 403
raise e
end

version_info = LogStash::Json.load(response.body)
return false if version_info['version'].nil?

version = Gem::Version.new(version_info["version"]['number'])
return false if version < Gem::Version.new('6.0.0')

if VERSION_6_TO_7.satisfied_by?(version)
return valid_tagline?(version_info)
elsif VERSION_7_TO_7_14.satisfied_by?(version)
build_flavor = version_info["version"]['build_flavor']
return false if build_flavor.nil? || build_flavor != 'default' || !valid_tagline?(version_info)
else
# case >= 7.14
lower_headers = response.headers.transform_keys {|key| key.to_s.downcase }
product_header = lower_headers['x-elastic-product']
return false if product_header != 'Elasticsearch'
end
return true
rescue => e
logger.error("Unable to retrieve Elasticsearch version", url: url.sanitized.to_s, exception: e.class, message: e.message)
false
end

def valid_tagline?(version_info)
tagline = version_info['tagline']
tagline == "You Know, for Search"
end

def stop_resurrectionist
@resurrectionist.join if @resurrectionist
end
Expand Down
3 changes: 2 additions & 1 deletion logstash-output-elasticsearch.gemspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-output-elasticsearch'
s.version = '11.1.0'
s.version = '11.2.0'

s.licenses = ['apache-2.0']
s.summary = "Stores logs in Elasticsearch"
Expand Down Expand Up @@ -31,6 +31,7 @@ Gem::Specification.new do |s|
s.add_development_dependency 'flores'
s.add_development_dependency 'cabin', ['~> 0.6']
s.add_development_dependency 'webrick'
s.add_development_dependency 'webmock'
# Still used in some specs, we should remove this ASAP
s.add_development_dependency 'elasticsearch'
end
167 changes: 158 additions & 9 deletions spec/unit/outputs/elasticsearch/http_client/pool_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,28 +50,29 @@

describe "healthcheck url handling" do
let(:initial_urls) { [::LogStash::Util::SafeURI.new("http://localhost:9200")] }
before(:example) do
expect(adapter).to receive(:perform_request).with(anything, :get, "/", anything, anything) do |url, _, _, _, _|
expect(url.path).to be_empty
end
end

context "and not setting healthcheck_path" do
it "performs the healthcheck to the root" do
expect(adapter).to receive(:perform_request) do |url, method, req_path, _, _|
expect(method).to eq(:head)
expect(adapter).to receive(:perform_request).with(anything, :head, "/", anything, anything) do |url, _, _, _, _|
expect(url.path).to be_empty
expect(req_path).to eq("/")
end
subject.healthcheck!
expect { subject.healthcheck! }.to raise_error(LogStash::ConfigurationError, "Could not connect to a compatible version of Elasticsearch")
end
end

context "and setting healthcheck_path" do
let(:healthcheck_path) { "/my/health" }
let(:options) { super().merge(:healthcheck_path => healthcheck_path) }
it "performs the healthcheck to the healthcheck_path" do
expect(adapter).to receive(:perform_request) do |url, method, req_path, _, _|
expect(method).to eq(:head)
expect(adapter).to receive(:perform_request).with(anything, :head, eq(healthcheck_path), anything, anything) do |url, _, _, _, _|
expect(url.path).to be_empty
expect(req_path).to eq(healthcheck_path)
end
subject.healthcheck!
expect { subject.healthcheck! }.to raise_error(LogStash::ConfigurationError, "Could not connect to a compatible version of Elasticsearch")
end
end
end
Expand Down Expand Up @@ -164,6 +165,20 @@
end
end

class MockResponse
attr_reader :code, :headers

def initialize(code = 200, body = nil, headers = {})
@code = code
@body = body
@headers = headers
end

def body
@body.to_json
end
end

describe "connection management" do
before(:each) { subject.start }
context "with only one URL in the list" do
Expand All @@ -175,8 +190,17 @@
end

context "with multiple URLs in the list" do
let(:version_ok) do
MockResponse.new(200, {"tagline" => "You Know, for Search",
"version" => {
"number" => '7.13.0',
"build_flavor" => 'default'}
})
end

before :each do
allow(adapter).to receive(:perform_request).with(anything, :head, subject.healthcheck_path, {}, nil)
allow(adapter).to receive(:perform_request).with(anything, :get, subject.healthcheck_path, {}, nil).and_return(version_ok)
end
let(:initial_urls) { [ ::LogStash::Util::SafeURI.new("http://localhost:9200"), ::LogStash::Util::SafeURI.new("http://localhost:9201"), ::LogStash::Util::SafeURI.new("http://localhost:9202") ] }

Expand Down Expand Up @@ -220,8 +244,14 @@
::LogStash::Util::SafeURI.new("http://otherhost:9201")
] }

let(:valid_response) { MockResponse.new(200, {"tagline" => "You Know, for Search",
"version" => {
"number" => '7.13.0',
"build_flavor" => 'default'}
}) }

before(:each) do
allow(subject).to receive(:perform_request_to_url).and_return(nil)
allow(subject).to receive(:perform_request_to_url).and_return(valid_response)
subject.start
end

Expand All @@ -240,6 +270,7 @@
describe "license checking" do
before(:each) do
allow(subject).to receive(:health_check_request)
allow(subject).to receive(:elasticsearch?).and_return(true)
end

let(:options) do
Expand Down Expand Up @@ -273,6 +304,7 @@

before(:each) do
allow(subject).to receive(:health_check_request)
allow(subject).to receive(:elasticsearch?).and_return(true)
end

context "if ES doesn't return a valid license" do
Expand Down Expand Up @@ -319,3 +351,120 @@
end
end
end

describe "#elasticsearch?" do
let(:logger) { Cabin::Channel.get }
let(:adapter) { double("Manticore Adapter") }
let(:initial_urls) { [::LogStash::Util::SafeURI.new("http://localhost:9200")] }
let(:options) { {:resurrect_delay => 2, :url_normalizer => proc {|u| u}} } # Shorten the delay a bit to speed up tests
let(:es_node_versions) { [ "0.0.0" ] }
let(:license_status) { 'active' }

subject { LogStash::Outputs::ElasticSearch::HttpClient::Pool.new(logger, adapter, initial_urls, options) }

let(:url) { ::LogStash::Util::SafeURI.new("http://localhost:9200") }

context "in case HTTP error code" do
it "should fail for 401" do
allow(adapter).to receive(:perform_request)
.with(anything, :get, "/", anything, anything)
.and_return(MockResponse.new(401))

expect(subject.elasticsearch?(url)).to be false
end

it "should fail for 403" do
allow(adapter).to receive(:perform_request)
.with(anything, :get, "/", anything, anything)
.and_return(status: 403)
expect(subject.elasticsearch?(url)).to be false
end
end

context "when connecting to a cluster which reply without 'version' field" do
it "should fail" do
allow(adapter).to receive(:perform_request)
.with(anything, :get, "/", anything, anything)
.and_return(body: {"field" => "funky.com"}.to_json)
expect(subject.elasticsearch?(url)).to be false
end
end

context "when connecting to a cluster with version < 6.0.0" do
it "should fail" do
allow(adapter).to receive(:perform_request)
.with(anything, :get, "/", anything, anything)
.and_return(200, {"version" => { "number" => "5.0.0"}}.to_json)
expect(subject.elasticsearch?(url)).to be false
end
end

context "when connecting to a cluster with version in [6.0.0..7.0.0)" do
it "must be successful with valid 'tagline'" do
allow(adapter).to receive(:perform_request)
.with(anything, :get, "/", anything, anything)
.and_return(MockResponse.new(200, {"version" => {"number" => "6.5.0"}, "tagline" => "You Know, for Search"}))
expect(subject.elasticsearch?(url)).to be true
end

it "should fail if invalid 'tagline'" do
allow(adapter).to receive(:perform_request)
.with(anything, :get, "/", anything, anything)
.and_return(MockResponse.new(200, {"version" => {"number" => "6.5.0"}, "tagline" => "You don't know"}))
expect(subject.elasticsearch?(url)).to be false
end

it "should fail if 'tagline' is not present" do
allow(adapter).to receive(:perform_request)
.with(anything, :get, "/", anything, anything)
.and_return(MockResponse.new(200, {"version" => {"number" => "6.5.0"}}))
expect(subject.elasticsearch?(url)).to be false
end
end

context "when connecting to a cluster with version in [7.0.0..7.14.0)" do
it "must be successful is 'build_flavor' is 'default' and tagline is correct" do
allow(adapter).to receive(:perform_request)
.with(anything, :get, "/", anything, anything)
.and_return(MockResponse.new(200, {"version": {"number": "7.5.0", "build_flavor": "default"}, "tagline": "You Know, for Search"}))
expect(subject.elasticsearch?(url)).to be true
end

it "should fail if 'build_flavor' is not 'default' and tagline is correct" do
allow(adapter).to receive(:perform_request)
.with(anything, :get, "/", anything, anything)
.and_return(MockResponse.new(200, {"version": {"number": "7.5.0", "build_flavor": "oss"}, "tagline": "You Know, for Search"}))
expect(subject.elasticsearch?(url)).to be false
end

it "should fail if 'build_flavor' is not present and tagline is correct" do
allow(adapter).to receive(:perform_request)
.with(anything, :get, "/", anything, anything)
.and_return(MockResponse.new(200, {"version": {"number": "7.5.0"}, "tagline": "You Know, for Search"}))
expect(subject.elasticsearch?(url)).to be false
end
end

context "when connecting to a cluster with version >= 7.14.0" do
it "should fail if 'X-elastic-product' header is not present" do
allow(adapter).to receive(:perform_request)
.with(anything, :get, "/", anything, anything)
.and_return(MockResponse.new(200, {"version": {"number": "7.14.0"}}))
expect(subject.elasticsearch?(url)).to be false
end

it "should fail if 'X-elastic-product' header is present but with bad value" do
allow(adapter).to receive(:perform_request)
.with(anything, :get, "/", anything, anything)
.and_return(MockResponse.new(200, {"version": {"number": "7.14.0"}}, {'X-elastic-product' => 'not good'}))
expect(subject.elasticsearch?(url)).to be false
end

it "must be successful when 'X-elastic-product' header is present with 'Elasticsearch' value" do
allow(adapter).to receive(:perform_request)
.with(anything, :get, "/", anything, anything)
.and_return(MockResponse.new(200, {"version": {"number": "7.14.0"}}, {'X-elastic-product' => 'Elasticsearch'}))
expect(subject.elasticsearch?(url)).to be true
end
end
end
2 changes: 1 addition & 1 deletion spec/unit/outputs/elasticsearch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@
include_examples("an authenticated config")
end

context 'claud_auth also set' do
context 'cloud_auth also set' do
let(:do_register) { false } # this is what we want to test, so we disable the before(:each) call
let(:options) { { "user" => user, "password" => password, "cloud_auth" => "elastic:my-passwd-00" } }

Expand Down

0 comments on commit 3a170cf

Please sign in to comment.