
Logstash sending garbage data to Kafka #53

Open
himanshigpta opened this issue Aug 28, 2020 · 1 comment

Comments

@himanshigpta

I'm using Logstash to read data from an Oracle DB and ingest it into a Kafka topic, then to read the data from that topic and ingest it into ES (basically Oracle -> Kafka, Kafka -> ES). The query captures data for a specific timestamp and fetches 52 rows. Everything works as expected up to that point, but after inserting all the rows, Logstash starts inserting the garbage value below into the Kafka topic:

2020-08-28T06:26:33.657Z %{host} 2020-08-28T06:26:33.523Z %{host} 2020-08-28T06:26:33.330Z %{host} 2020-08-28T06:26:33.174Z %{host} 2020-08-28T06:26:33.007Z %{host} 2020-08-28T06:26:32.863Z %{host} 2020-08-28T06:26:32.752Z %{host} 2020-08-28T06:26:32.638Z %{host} 2020-08-28T06:26:32.528Z %{host} 2020-08-28T06:26:32.387Z %{host} 2020-08-28T06:26:32.190Z %{host} 2020-08-28T06:26:32.072Z %{host} 2020-08-28T06:26:31.951Z %{host} 2020-08-28T06:26:31.775Z %{host} 2020-08-28T06:26:31.547Z %{host} 2020-08-28T06:26:22.638Z %{host} 2020-08-28T06:26:22.509Z %{host} 2020-08-28T06:26:22.397Z %{host} 2020-08-28T06:26:22.283Z %{host} 2020-08-28T06:26:22.169Z %{host} 2020-08-28T06:26:22.050Z %{host} 2020-08-28T06:26:21.925Z %{host} 2020-08-28T06:26:21.800Z %{host} 2020-08-28T06:26:21.622Z %{host} 2020-08-28T06:26:21.489Z %{host} 2020-08-28T06:26:21.349Z %{host} 2020-08-28T06:26:20.732Z %{host} %{message}
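
For what it's worth, the %{host} %{message} part looks like unresolved sprintf field references: in Logstash, a reference to a field that doesn't exist on the event is left as literal text. A minimal illustration (hypothetical filter, not from my pipeline):

    filter {
      mutate {
        # "host" and "message" do not exist on these JDBC events, so the
        # sprintf references are left as literal text in the new field.
        add_field => { "demo" => "%{host} %{message}" }
      }
    }

An event without host or message ends up with demo set to the literal string "%{host} %{message}", which is the same pattern showing up in the topic.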

Configs that I'm using:

Oracle -> Kafka:

    input {
      jdbc {
        jdbc_driver_library => "/usr/share/logstash/ojdbc8.jar"
        jdbc_driver_class => "Java::oracle.jdbc.OracleDriver"
        jdbc_connection_string => "jdbc:oracle:thin:@redacted"
        jdbc_user => "redacted"
        jdbc_password => "redacted"
        tracking_column => "timestamp"
        use_column_value => true
        tracking_column_type => "timestamp"
        jdbc_default_timezone => "Asia/Kolkata"
        schedule => "*/10 * * * *"
        statement_filepath => "redacted"
      }
    }

    output {
      kafka {
        topic_id => "audit_data"
        bootstrap_servers => "redacted"
        acks => "0"
        jaas_path => "/usr/share/logstash/jaas.conf"
        sasl_kerberos_service_name => "kafka"
        kerberos_config => "redacted"
        codec => plain
        security_protocol => "SASL_PLAINTEXT"
      }
    }
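
One thing I'm wondering about (an assumption, not something I've verified): the kafka output above uses codec => plain with no format option, so the encoder presumably falls back to a default string built from the timestamp, host, and message, and these JDBC events have neither host nor message. For comparison, a sketch of the same output using the json codec instead, which would serialize all the JDBC columns as JSON (connection values still redacted):

    output {
      kafka {
        topic_id => "audit_data"
        bootstrap_servers => "redacted"
        acks => "0"
        jaas_path => "/usr/share/logstash/jaas.conf"
        sasl_kerberos_service_name => "kafka"
        kerberos_config => "redacted"
        # json serializes the whole event (all JDBC columns) instead of a
        # plain-text rendering of it
        codec => json
        security_protocol => "SASL_PLAINTEXT"
      }
    }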

Kafka -> ES:

    input {
      kafka {
        # group_id => "logstash"
        jaas_path => "/usr/share/logstash/jaas.conf"
        sasl_kerberos_service_name => "kafka"
        kerberos_config => "redacted"
        auto_offset_reset => "latest"
        topics => ["audit_data"]
        codec => plain
        bootstrap_servers => "redacted"
        security_protocol => "SASL_PLAINTEXT"
        # type => "syslog"
        decorate_events => true
      }
    }


    output {
        #stdout { codec =>  "json"}
        elasticsearch {
            hosts => ["redacted"]
            user => "redacted"
            password => "redacted"
            cacert => ["redacted"]
            action => "index"
            index => "kafka_logstash"
        }
    }
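
If the producer side were switched to the json codec, the consumer side would need a matching codec too; a sketch of the corresponding kafka input (only the lines that change from the config above, everything else unchanged/redacted):

    input {
      kafka {
        topics => ["audit_data"]
        bootstrap_servers => "redacted"
        # match the producer: decode each record from JSON back into event fields
        codec => json
        decorate_events => true
        # ... same jaas_path / kerberos_config / SASL settings as above ...
      }
    }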

I checked the Kafka topic data from the consumer console and could see the garbage values continuously flowing in, so I removed the old data from the topic (set the retention to 1000 ms) and used the same query and config parameters, this time directly from Oracle to ES, and it worked fine without any garbage values. Below is the config I used:

    input {
      jdbc {
        jdbc_driver_library => "/usr/share/logstash/ojdbc8.jar"
        jdbc_driver_class => "Java::oracle.jdbc.OracleDriver"
        jdbc_connection_string => "jdbc:oracle:thin:@redacted"
        jdbc_user => "redacted"
        jdbc_password => "redacted"
        tracking_column => "timestamp"
        use_column_value => true
        tracking_column_type => "timestamp"
        jdbc_default_timezone => "Asia/Kolkata"
        schedule => "*/10 * * * *"
        statement_filepath => "/usr/share/logstash/oracle.sql"
      }
    }

    output {
        #stdout { codec =>  "json"}
        elasticsearch {
            hosts => ["redacted"]
            user => "redacted"
            password => "redacted"
            cacert => ["redacted"]
            action => "index"
            index => "test_kafka_logstash"
        }
    }

I didn't observe any errors in the logs, not even while the garbage values were flowing into the Kafka topic, just this:


[2020-08-28T11:57:01,510][DEBUG][org.apache.kafka.clients.consumer.internals.Fetcher][main] [Consumer clientId=logstash-0, groupId=logstash] Added READ_UNCOMMITTED fetch request for partition audit_data-0 at position FetchPosition{offset=599596, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=redacted:9092 (id: 4582 rack: null), epoch=0}} to node redacted:9092 (id: 4582 rack: null)
[2020-08-28T11:57:01,510][DEBUG][org.apache.kafka.clients.FetchSessionHandler][main] [Consumer clientId=logstash-0, groupId=logstash] Built incremental fetch (sessionId=1305743447, epoch=3824) for node 4582. Added 0 partition(s), altered 1 partition(s), removed 0 partition(s) out of 1 partition(s)
[2020-08-28T11:57:01,510][DEBUG][org.apache.kafka.clients.consumer.internals.Fetcher][main] [Consumer clientId=logstash-0, groupId=logstash] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(audit_data-0), toForget=(), implied=()) to broker redacted:9092 (id: 4582 rack: null)
[2020-08-28T11:57:01,510][DEBUG][org.apache.kafka.clients.NetworkClient][main] [Consumer clientId=logstash-0, groupId=logstash] Using older server API v10 to send FETCH {replica_id=-1,max_wait_time=500,min_bytes=1,max_bytes=52428800,isolation_level=0,session_id=1305743447,session_epoch=3824,topics=[{topic=audit_data,partitions=[{partition=0,current_leader_epoch=0,fetch_offset=599596,log_start_offset=-1,partition_max_bytes=1048576}]}],forgotten_topics_data=[]} with correlation id 4516 to node 4582
[2020-08-28T11:57:01,512][DEBUG][org.apache.kafka.clients.FetchSessionHandler][main] [Consumer clientId=logstash-0, groupId=logstash] Node 4582 sent an incremental fetch response for session 1305743447 with 1 response partition(s)
[2020-08-28T11:57:01,513][DEBUG][org.apache.kafka.clients.consumer.internals.Fetcher][main] [Consumer clientId=logstash-0, groupId=logstash] Fetch READ_UNCOMMITTED at offset 599596 for partition audit_data-0 returned fetch data (error=NONE, highWaterMark=599598, lastStableOffset = 599598, logStartOffset = 586750, preferredReadReplica = absent, abortedTransactions = null, recordsSizeInBytes=16565)
[2020-08-28T11:57:01,513][DEBUG][org.apache.kafka.clients.consumer.internals.Fetcher][main] [Consumer clientId=logstash-0, groupId=logstash] Added READ_UNCOMMITTED fetch request for partition audit_data-0 at position FetchPosition{offset=599598, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=redacted:9092 (id: 4582 rack: null), epoch=0}} to node redacted:9092 (id: 4582 rack: null)
[2020-08-28T11:57:01,513][DEBUG][org.apache.kafka.clients.FetchSessionHandler][main] [Consumer clientId=logstash-0, groupId=logstash] Built incremental fetch (sessionId=1305743447, epoch=3825) for node 4582. Added 0 partition(s), altered 1 partition(s), removed 0 partition(s) out of 1 partition(s)
[2020-08-28T11:57:01,513][DEBUG][org.apache.kafka.clients.consumer.internals.Fetcher][main] [Consumer clientId=logstash-0, groupId=logstash] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(audit_data-0), toForget=(), implied=()) to broker redacted:9092 (id: 4582 rack: null)
[2020-08-28T11:57:01,514][DEBUG][org.apache.kafka.clients.NetworkClient][main] [Consumer clientId=logstash-0, groupId=logstash] Using older server API v10 to send FETCH {replica_id=-1,max_wait_time=500,min_bytes=1,max_bytes=52428800,isolation_level=0,session_id=1305743447,session_epoch=3825,topics=[{topic=audit_data,partitions=[{partition=0,current_leader_epoch=0,fetch_offset=599598,log_start_offset=-1,partition_max_bytes=1048576}]}],forgotten_topics_data=[]} with correlation id 4517 to node 4582
[2020-08-28T11:57:01,590][DEBUG][org.apache.kafka.clients.FetchSessionHandler][main] [Consumer clientId=logstash-0, groupId=logstash] Node 4582 sent an incremental fetch response for session 1305743447 with 1 response partition(s)
[2020-08-28T11:57:01,591][DEBUG][org.apache.kafka.clients.consumer.internals.Fetcher][main] [Consumer clientId=logstash-0, groupId=logstash] Fetch READ_UNCOMMITTED at offset 599598 for partition audit_data-0 returned fetch data (error=NONE, highWaterMark=599600, lastStableOffset = 599600, logStartOffset = 586750, preferredReadReplica = absent, abortedTransactions = null, recordsSizeInBytes=16368)
[2020-08-28T11:57:01,591][DEBUG][org.apache.kafka.clients.consumer.internals.Fetcher][main] [Consumer clientId=logstash-0, groupId=logstash] Added READ_UNCOMMITTED fetch request for partition audit_data-0 at position FetchPosition{offset=599600, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=redacted:9092 (id: 4582 rack: null), epoch=0}} to node redacted:9092 (id: 4582 rack: null)
[2020-08-28T11:57:01,592][DEBUG][org.apache.kafka.clients.FetchSessionHandler][main] [Consumer clientId=logstash-0, groupId=logstash] Built incremental fetch (sessionId=1305743447, epoch=3826) for node 4582. Added 0 partition(s), altered 1 partition(s), removed 0 partition(s) out of 1 partition(s)
[2020-08-28T11:57:01,592][DEBUG][org.apache.kafka.clients.consumer.internals.Fetcher][main] [Consumer clientId=logstash-0, groupId=logstash] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(audit_data-0), toForget=(), implied=()) to broker redacted:9092 (id: 4582 rack: null)

I'm not sure if this has something to do with the config I'm using; could someone please help me fix this?

For all general issues, please provide the following details for fast resolution:

  • Version: logstash.version=>"7.5.1"
  • Operating System: Red Hat Enterprise Linux Server release 7.8 (Maipo)
@himanshigpta
Author

Update: I tried the below config in the output section, alongside the kafka output:

    file {
      path => "/tmp/logstash-kafka.txt"
      codec => rubydebug
    }

I had the consumer console open along with this file output and could see garbage data flowing into Kafka, but correct data in logstash-kafka.txt. The same happened when I tried Kafka along with ES: data was ingested correctly into ES via Logstash when Kafka was not in between.
Another thing I observed in the Kafka console was that the garbage value below occurred 52 times, which is the number of rows fetched from Oracle, and then it stopped sending any more data/garbage:

2020-08-29T20:57:32.574Z %{host} %{message}
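
This 52-times pattern makes me think each of the 52 rows is being encoded by the plain codec's default string (timestamp plus host and message, which these events don't have) rather than by the row data itself; that's an assumption on my part. If plain text output is actually wanted, a sketch of pinning the codec to an explicit format (the field names here are hypothetical; they would have to be real columns returned by oracle.sql):

    output {
      kafka {
        # ... existing kafka settings ...
        codec => plain {
          # hypothetical field names; replace with actual columns from the query
          format => "%{audit_timestamp} %{audit_action}"
        }
      }
    }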
