Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Logstash output plugin for Kafka (AWS MSK) not working with SSL #33

Open
nealabh opened this issue Apr 30, 2020 · 1 comment
Open

Logstash output plugin for Kafka (AWS MSK) not working with SSL #33

nealabh opened this issue Apr 30, 2020 · 1 comment

Comments

@nealabh
Copy link

nealabh commented Apr 30, 2020

VERSION:
Logstash 5.6
Kafka: 2.2.1
Kafka output plugin 5.1.11

OS:
Ubuntu 16.04

CONFIG:
input {
elasticsearch {
hosts => "xxx:9200"
size => 1000
index => "xxx"
scroll => "5m"
docinfo => true
}
}
output {
kafka {
bootstrap_servers => "xxx:9094"
codec => json
topic_id => "xxx"
ssl => true
ssl_truststore_location => "/tmp/kafka.client.truststore.jks"
}
}

ERROR:
[2020-04-30T12:11:28,697][DEBUG][org.apache.kafka.common.metrics.Metrics] Added sensor with name bufferpool-wait-time
[2020-04-30T12:11:28,699][DEBUG][org.apache.kafka.common.metrics.Metrics] Added sensor with name buffer-exhausted-records
[2020-04-30T12:11:28,735][INFO ][org.apache.kafka.clients.producer.KafkaProducer] Closing the Kafka producer with timeoutMillis = 0 ms.
[2020-04-30T12:11:28,736][DEBUG][org.apache.kafka.clients.producer.KafkaProducer] The Kafka producer has closed.
[2020-04-30T12:11:28,738][ERROR][logstash.outputs.kafka ] Unable to create Kafka producer from given configuration {:kafka_error_message=>org.apache.kafka.common.KafkaException: Failed to construct kafka producer}
[2020-04-30T12:11:28,740][ERROR][logstash.pipeline ] Error registering plugin {:plugin=>"#<LogStash::OutputDelegator:0x580afd45 @namespaced_metric=#<LogStash::Instrument::NamespacedMetric:0x3ff04e36 @Metric=#<LogStash::Instrument::Metric:0x233ba0df @collector=#<LogStash::Instrument::Collector:0x4063a978 @agent=nil, @metric_store=#<LogStash::Instrument::MetricStore:0x4f0c94e4 @store=#<Concurrent::Map:0x00000000065b68 entries=3 default_proc=nil>, @structured_lookup_mutex=#Mutex:0x72b0f2cd, @fast_lookup=#<Concurrent::Map:0x00000000065b6c entries=61 default_proc=nil>>>>, @namespace_name=[:stats, :pipelines, :main, :plugins, :outputs, :"112805a66f2d1d9c7627582eafaa30a5bd45a13a-2"]>, @Metric=#<LogStash::Instrument::NamespacedMetric:0x5106c983 @Metric=#<LogStash::Instrument::Metric:0x233ba0df @collector=#<LogStash::Instrument::Collector:0x4063a978 @agent=nil, @metric_store=#<LogStash::Instrument::MetricStore:0x4f0c94e4 @store=#<Concurrent::Map:0x00000000065b68 entries=3 default_proc=nil>, @structured_lookup_mutex=#Mutex:0x72b0f2cd, @fast_lookup=#<Concurrent::Map:0x00000000065b6c entries=61 default_proc=nil>>>>, @namespace_name=[:stats, :pipelines, :main, :plugins, :outputs]>, @logger=#<LogStash::Logging::Logger:0xf557fe4 @logger=#Java::OrgApacheLoggingLog4jCore::Logger:0x1db311bb>, @out_counter=LogStash::Instrument::MetricType::Counter - namespaces: [:stats, :pipelines, :main, :plugins, :outputs, :"112805a66f2d1d9c7627582eafaa30a5bd45a13a-2", :events] key: out value: 0, @in_counter=LogStash::Instrument::MetricType::Counter - namespaces: [:stats, :pipelines, :main, :plugins, :outputs, :"112805a66f2d1d9c7627582eafaa30a5bd45a13a-2", :events] key: in value: 0, @strategy=#<LogStash::OutputDelegatorStrategies::Shared:0x34fe0e7b @output=<LogStash::Outputs::Kafka bootstrap_servers=>"b-1.host:9094,host2:9094", codec=><LogStash::Codecs::JSON id=>"json_92740a35-849e-4794-90d9-64ab2acfe240", enable_metric=>true, charset=>"UTF-8">, topic_id=>"test-migration-topic", ssl=>true, ssl_truststore_location=>"/tmp/kafka.client.truststore.jks", id=>"112805a66f2d1d9c7627582eafaa30a5bd45a13a-2", enable_metric=>true, workers=>1, acks=>"1", batch_size=>16384, block_on_buffer_full=>true, buffer_memory=>33554432, compression_type=>"none", key_serializer=>"org.apache.kafka.common.serialization.StringSerializer", linger_ms=>0, max_request_size=>1048576, metadata_fetch_timeout_ms=>60000, metadata_max_age_ms=>300000, receive_buffer_bytes=>32768, reconnect_backoff_ms=>10, retry_backoff_ms=>100, send_buffer_bytes=>131072, security_protocol=>"PLAINTEXT", sasl_mechanism=>"GSSAPI", timeout_ms=>30000, value_serializer=>"org.apache.kafka.common.serialization.StringSerializer">>, @id="112805a66f2d1d9c7627582eafaa30a5bd45a13a-2", @time_metric=LogStash::Instrument::MetricType::Counter - namespaces: [:stats, :pipelines, :main, :plugins, :outputs, :"112805a66f2d1d9c7627582eafaa30a5bd45a13a-2", :events] key: duration_in_millis value: 0, @metric_events=#<LogStash::Instrument::NamespacedMetric:0x1070ed5f @Metric=#<LogStash::Instrument::Metric:0x233ba0df @collector=#<LogStash::Instrument::Collector:0x4063a978 @agent=nil, @metric_store=#<LogStash::Instrument::MetricStore:0x4f0c94e4 @store=#<Concurrent::Map:0x00000000065b68 entries=3 default_proc=nil>, @structured_lookup_mutex=#Mutex:0x72b0f2cd, @fast_lookup=#<Concurrent::Map:0x00000000065b6c entries=61 default_proc=nil>>>>, @namespace_name=[:stats, :pipelines, :main, :plugins, :outputs, :"112805a66f2d1d9c7627582eafaa30a5bd45a13a-2", :events]>, @output_class=LogStash::Outputs::Kafka>", :error=>"Failed to construct kafka producer"}
[2020-04-30T12:11:28,748][ERROR][logstash.agent ] Pipeline aborted due to error {:exception=>org.apache.kafka.common.KafkaException: Failed to construct kafka producer, :backtrace=>["org.apache.kafka.clients.producer.KafkaProducer.(org/apache/kafka/clients/producer/KafkaProducer.java:335)", "org.apache.kafka.clients.producer.KafkaProducer.(org/apache/kafka/clients/producer/KafkaProducer.java:188)", "java.lang.reflect.Constructor.newInstance(java/lang/reflect/Constructor.java:423)", "RUBY.create_producer(/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-kafka-5.1.11/lib/logstash/outputs/kafka.rb:334)", "RUBY.register(/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-kafka-5.1.11/lib/logstash/outputs/kafka.rb:195)", "RUBY.register(/usr/share/logstash/logstash-core/lib/logstash/output_delegator_strategies/shared.rb:9)", "RUBY.register(/usr/share/logstash/logstash-core/lib/logstash/output_delegator.rb:43)", "RUBY.register_plugin(/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:290)", "RUBY.register_plugins(/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:301)", "org.jruby.RubyArray.each(org/jruby/RubyArray.java:1613)", "RUBY.register_plugins(/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:301)", "RUBY.start_workers(/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:310)", "RUBY.run(/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:235)", "RUBY.start_pipeline(/usr/share/logstash/logstash-core/lib/logstash/agent.rb:408)", "java.lang.Thread.run(java/lang/Thread.java:748)"]}
[2020-04-30T12:11:28,759][DEBUG][logstash.agent ] Starting puma
[2020-04-30T12:11:28,759][DEBUG][logstash.agent ] Trying to start WebServer {:port=>9600}
[2020-04-30T12:11:28,761][DEBUG][logstash.api.service ] [api-service] start
[2020-04-30T12:11:28,781][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2020-04-30T12:11:31,761][DEBUG][logstash.instrument.periodicpoller.os] PeriodicPoller: Stopping
[2020-04-30T12:11:31,761][DEBUG][logstash.instrument.periodicpoller.jvm] PeriodicPoller: Stopping
[2020-04-30T12:11:31,761][DEBUG][logstash.instrument.periodicpoller.persistentqueue] PeriodicPoller: Stopping
[2020-04-30T12:11:31,762][DEBUG][logstash.instrument.periodicpoller.deadletterqueue] PeriodicPoller: Stopping
[2020-04-30T12:11:31,765][WARN ][logstash.agent ] stopping pipeline {:id=>"main"}
[2020-04-30T12:11:31,766][DEBUG][logstash.pipeline ] Closing inputs
[2020-04-30T12:11:31,769][DEBUG][logstash.inputs.elasticsearch] stopping {:plugin=>"LogStash::Inputs::Elasticsearch"}
[2020-04-30T12:11:31,769][DEBUG][logstash.pipeline ] Closed inputs

Steps to reproduce:
Use these version with this config,
I was able to Send data without SSL (PLAIN TEXT) but it fails with SSL.

on the other hand using cli I am able to do this with same settings
https://docs.aws.amazon.com/msk/latest/developerguide/produce-consume.html
security.protocol=SSL
ssl.truststore.location=/tmp/kafka.client.truststore.jks

Not sure if the versions listed are not supported or the bug is a plugin problem. Or my config is missing something. Please help.

@nealabh nealabh changed the title Logstash output plugin for Kafka not working with SSL Logstash output plugin for Kafka (AWS MSK) not working with SSL Apr 30, 2020
@nealabh
Copy link
Author

nealabh commented Apr 30, 2020

I have to use Logstash 5.6 because my Input is Elasticsearch 2.3.5, so that's the max I can go.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant