diff --git a/CHANGELOG.md b/CHANGELOG.md index 7f90f3d5..3ca12ff0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 11.2.0 + - Added preflight checks on Elasticsearch [#1026](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1026) + ## 11.1.0 - Feat: add `user-agent` header passed to the Elasticsearch HTTP connection [#1038](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1038) diff --git a/lib/logstash/outputs/elasticsearch/http_client/pool.rb b/lib/logstash/outputs/elasticsearch/http_client/pool.rb index 1675afbb..a293dfd0 100644 --- a/lib/logstash/outputs/elasticsearch/http_client/pool.rb +++ b/lib/logstash/outputs/elasticsearch/http_client/pool.rb @@ -37,6 +37,9 @@ def message ROOT_URI_PATH = '/'.freeze LICENSE_PATH = '/_license'.freeze + VERSION_6_TO_7 = Gem::Requirement.new([">= 6.0.0", "< 7.0.0"]) + VERSION_7_TO_7_14 = Gem::Requirement.new([">= 7.0.0", "< 7.14.0"]) + DEFAULT_OPTIONS = { :healthcheck_path => ROOT_URI_PATH, :sniffing_path => "/_nodes/http", @@ -211,7 +214,7 @@ def sniffer_alive? def start_resurrectionist @resurrectionist = Thread.new do until_stopped("resurrection", @resurrect_delay) do - healthcheck! + healthcheck!(false) end end end @@ -232,11 +235,18 @@ def health_check_request(url) perform_request_to_url(url, :head, @healthcheck_path) end - def healthcheck! + def healthcheck!(register_phase = true) # Try to keep locking granularity low such that we don't affect IO... @state_mutex.synchronize { @url_info.select {|url,meta| meta[:state] != :alive } }.each do |url,meta| begin health_check_request(url) + + # when called from resurrectionist skip the product check done during register phase + if register_phase + if !elasticsearch?(url) + raise LogStash::ConfigurationError, "Could not connect to a compatible version of Elasticsearch" + end + end # If no exception was raised it must have succeeded! logger.warn("Restored connection to ES instance", url: url.sanitized.to_s) # We reconnected to this node, check its ES version @@ -254,6 +264,42 @@ def healthcheck! end end + def elasticsearch?(url) + begin + response = perform_request_to_url(url, :get, ROOT_URI_PATH) + rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e + return false if response.code == 401 || response.code == 403 + raise e + end + + version_info = LogStash::Json.load(response.body) + return false if version_info['version'].nil? + + version = Gem::Version.new(version_info["version"]['number']) + return false if version < Gem::Version.new('6.0.0') + + if VERSION_6_TO_7.satisfied_by?(version) + return valid_tagline?(version_info) + elsif VERSION_7_TO_7_14.satisfied_by?(version) + build_flavor = version_info["version"]['build_flavor'] + return false if build_flavor.nil? || build_flavor != 'default' || !valid_tagline?(version_info) + else + # case >= 7.14 + lower_headers = response.headers.transform_keys {|key| key.to_s.downcase } + product_header = lower_headers['x-elastic-product'] + return false if product_header != 'Elasticsearch' + end + return true + rescue => e + logger.error("Unable to retrieve Elasticsearch version", url: url.sanitized.to_s, exception: e.class, message: e.message) + false + end + + def valid_tagline?(version_info) + tagline = version_info['tagline'] + tagline == "You Know, for Search" + end + def stop_resurrectionist @resurrectionist.join if @resurrectionist end diff --git a/logstash-output-elasticsearch.gemspec b/logstash-output-elasticsearch.gemspec index 30697f0f..6600c9ac 100644 --- a/logstash-output-elasticsearch.gemspec +++ b/logstash-output-elasticsearch.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |s| s.name = 'logstash-output-elasticsearch' - s.version = '11.1.0' + s.version = '11.2.0' s.licenses = ['apache-2.0'] s.summary = "Stores logs in Elasticsearch" @@ -31,6 +31,7 @@ Gem::Specification.new do |s| s.add_development_dependency 'flores' s.add_development_dependency 'cabin', ['~> 0.6'] s.add_development_dependency 'webrick' + s.add_development_dependency 'webmock' # Still used in some specs, we should remove this ASAP s.add_development_dependency 'elasticsearch' end diff --git a/spec/unit/outputs/elasticsearch/http_client/pool_spec.rb b/spec/unit/outputs/elasticsearch/http_client/pool_spec.rb index 58511152..edf3b341 100644 --- a/spec/unit/outputs/elasticsearch/http_client/pool_spec.rb +++ b/spec/unit/outputs/elasticsearch/http_client/pool_spec.rb @@ -50,15 +50,18 @@ describe "healthcheck url handling" do let(:initial_urls) { [::LogStash::Util::SafeURI.new("http://localhost:9200")] } + before(:example) do + expect(adapter).to receive(:perform_request).with(anything, :get, "/", anything, anything) do |url, _, _, _, _| + expect(url.path).to be_empty + end + end context "and not setting healthcheck_path" do it "performs the healthcheck to the root" do - expect(adapter).to receive(:perform_request) do |url, method, req_path, _, _| - expect(method).to eq(:head) + expect(adapter).to receive(:perform_request).with(anything, :head, "/", anything, anything) do |url, _, _, _, _| expect(url.path).to be_empty - expect(req_path).to eq("/") end - subject.healthcheck! + expect { subject.healthcheck! }.to raise_error(LogStash::ConfigurationError, "Could not connect to a compatible version of Elasticsearch") end end @@ -66,12 +69,10 @@ let(:healthcheck_path) { "/my/health" } let(:options) { super().merge(:healthcheck_path => healthcheck_path) } it "performs the healthcheck to the healthcheck_path" do - expect(adapter).to receive(:perform_request) do |url, method, req_path, _, _| - expect(method).to eq(:head) + expect(adapter).to receive(:perform_request).with(anything, :head, eq(healthcheck_path), anything, anything) do |url, _, _, _, _| expect(url.path).to be_empty - expect(req_path).to eq(healthcheck_path) end - subject.healthcheck! + expect { subject.healthcheck! }.to raise_error(LogStash::ConfigurationError, "Could not connect to a compatible version of Elasticsearch") end end end @@ -164,6 +165,20 @@ end end + class MockResponse + attr_reader :code, :headers + + def initialize(code = 200, body = nil, headers = {}) + @code = code + @body = body + @headers = headers + end + + def body + @body.to_json + end + end + describe "connection management" do before(:each) { subject.start } context "with only one URL in the list" do @@ -175,8 +190,17 @@ end context "with multiple URLs in the list" do + let(:version_ok) do + MockResponse.new(200, {"tagline" => "You Know, for Search", + "version" => { + "number" => '7.13.0', + "build_flavor" => 'default'} + }) + end + before :each do allow(adapter).to receive(:perform_request).with(anything, :head, subject.healthcheck_path, {}, nil) + allow(adapter).to receive(:perform_request).with(anything, :get, subject.healthcheck_path, {}, nil).and_return(version_ok) end let(:initial_urls) { [ ::LogStash::Util::SafeURI.new("http://localhost:9200"), ::LogStash::Util::SafeURI.new("http://localhost:9201"), ::LogStash::Util::SafeURI.new("http://localhost:9202") ] } @@ -220,8 +244,14 @@ ::LogStash::Util::SafeURI.new("http://otherhost:9201") ] } + let(:valid_response) { MockResponse.new(200, {"tagline" => "You Know, for Search", + "version" => { + "number" => '7.13.0', + "build_flavor" => 'default'} + }) } + before(:each) do - allow(subject).to receive(:perform_request_to_url).and_return(nil) + allow(subject).to receive(:perform_request_to_url).and_return(valid_response) subject.start end @@ -240,6 +270,7 @@ describe "license checking" do before(:each) do allow(subject).to receive(:health_check_request) + allow(subject).to receive(:elasticsearch?).and_return(true) end let(:options) do @@ -273,6 +304,7 @@ before(:each) do allow(subject).to receive(:health_check_request) + allow(subject).to receive(:elasticsearch?).and_return(true) end context "if ES doesn't return a valid license" do @@ -319,3 +351,120 @@ end end end + +describe "#elasticsearch?" do + let(:logger) { Cabin::Channel.get } + let(:adapter) { double("Manticore Adapter") } + let(:initial_urls) { [::LogStash::Util::SafeURI.new("http://localhost:9200")] } + let(:options) { {:resurrect_delay => 2, :url_normalizer => proc {|u| u}} } # Shorten the delay a bit to speed up tests + let(:es_node_versions) { [ "0.0.0" ] } + let(:license_status) { 'active' } + + subject { LogStash::Outputs::ElasticSearch::HttpClient::Pool.new(logger, adapter, initial_urls, options) } + + let(:url) { ::LogStash::Util::SafeURI.new("http://localhost:9200") } + + context "in case HTTP error code" do + it "should fail for 401" do + allow(adapter).to receive(:perform_request) + .with(anything, :get, "/", anything, anything) + .and_return(MockResponse.new(401)) + + expect(subject.elasticsearch?(url)).to be false + end + + it "should fail for 403" do + allow(adapter).to receive(:perform_request) + .with(anything, :get, "/", anything, anything) + .and_return(status: 403) + expect(subject.elasticsearch?(url)).to be false + end + end + + context "when connecting to a cluster which reply without 'version' field" do + it "should fail" do + allow(adapter).to receive(:perform_request) + .with(anything, :get, "/", anything, anything) + .and_return(body: {"field" => "funky.com"}.to_json) + expect(subject.elasticsearch?(url)).to be false + end + end + + context "when connecting to a cluster with version < 6.0.0" do + it "should fail" do + allow(adapter).to receive(:perform_request) + .with(anything, :get, "/", anything, anything) + .and_return(200, {"version" => { "number" => "5.0.0"}}.to_json) + expect(subject.elasticsearch?(url)).to be false + end + end + + context "when connecting to a cluster with version in [6.0.0..7.0.0)" do + it "must be successful with valid 'tagline'" do + allow(adapter).to receive(:perform_request) + .with(anything, :get, "/", anything, anything) + .and_return(MockResponse.new(200, {"version" => {"number" => "6.5.0"}, "tagline" => "You Know, for Search"})) + expect(subject.elasticsearch?(url)).to be true + end + + it "should fail if invalid 'tagline'" do + allow(adapter).to receive(:perform_request) + .with(anything, :get, "/", anything, anything) + .and_return(MockResponse.new(200, {"version" => {"number" => "6.5.0"}, "tagline" => "You don't know"})) + expect(subject.elasticsearch?(url)).to be false + end + + it "should fail if 'tagline' is not present" do + allow(adapter).to receive(:perform_request) + .with(anything, :get, "/", anything, anything) + .and_return(MockResponse.new(200, {"version" => {"number" => "6.5.0"}})) + expect(subject.elasticsearch?(url)).to be false + end + end + + context "when connecting to a cluster with version in [7.0.0..7.14.0)" do + it "must be successful is 'build_flavor' is 'default' and tagline is correct" do + allow(adapter).to receive(:perform_request) + .with(anything, :get, "/", anything, anything) + .and_return(MockResponse.new(200, {"version": {"number": "7.5.0", "build_flavor": "default"}, "tagline": "You Know, for Search"})) + expect(subject.elasticsearch?(url)).to be true + end + + it "should fail if 'build_flavor' is not 'default' and tagline is correct" do + allow(adapter).to receive(:perform_request) + .with(anything, :get, "/", anything, anything) + .and_return(MockResponse.new(200, {"version": {"number": "7.5.0", "build_flavor": "oss"}, "tagline": "You Know, for Search"})) + expect(subject.elasticsearch?(url)).to be false + end + + it "should fail if 'build_flavor' is not present and tagline is correct" do + allow(adapter).to receive(:perform_request) + .with(anything, :get, "/", anything, anything) + .and_return(MockResponse.new(200, {"version": {"number": "7.5.0"}, "tagline": "You Know, for Search"})) + expect(subject.elasticsearch?(url)).to be false + end + end + + context "when connecting to a cluster with version >= 7.14.0" do + it "should fail if 'X-elastic-product' header is not present" do + allow(adapter).to receive(:perform_request) + .with(anything, :get, "/", anything, anything) + .and_return(MockResponse.new(200, {"version": {"number": "7.14.0"}})) + expect(subject.elasticsearch?(url)).to be false + end + + it "should fail if 'X-elastic-product' header is present but with bad value" do + allow(adapter).to receive(:perform_request) + .with(anything, :get, "/", anything, anything) + .and_return(MockResponse.new(200, {"version": {"number": "7.14.0"}}, {'X-elastic-product' => 'not good'})) + expect(subject.elasticsearch?(url)).to be false + end + + it "must be successful when 'X-elastic-product' header is present with 'Elasticsearch' value" do + allow(adapter).to receive(:perform_request) + .with(anything, :get, "/", anything, anything) + .and_return(MockResponse.new(200, {"version": {"number": "7.14.0"}}, {'X-elastic-product' => 'Elasticsearch'})) + expect(subject.elasticsearch?(url)).to be true + end + end +end diff --git a/spec/unit/outputs/elasticsearch_spec.rb b/spec/unit/outputs/elasticsearch_spec.rb index 46821a55..cd0e5165 100644 --- a/spec/unit/outputs/elasticsearch_spec.rb +++ b/spec/unit/outputs/elasticsearch_spec.rb @@ -156,7 +156,7 @@ include_examples("an authenticated config") end - context 'claud_auth also set' do + context 'cloud_auth also set' do let(:do_register) { false } # this is what we want to test, so we disable the before(:each) call let(:options) { { "user" => user, "password" => password, "cloud_auth" => "elastic:my-passwd-00" } }