
Consumer can't retrieve large message from Kafka #95

Open
JsonFn opened this issue Jul 26, 2018 · 0 comments


JsonFn commented Jul 26, 2018

Kafka version: 1.1.1
Scala version: 2.11.12
sbt version: 0.13.16
li-apache-kafka-clients version: 0.0.15

It seems that I can produce a large String message to the Kafka broker, but the consumer (a LiKafkaConsumerImpl) fails to retrieve it. There is no error; the returned ConsumerRecords.count() is always 0, so the code keeps looping, polling messages out of the Kafka broker without ever receiving one.

However, I notice that when consuming with a plain KafkaConsumer instance, ConsumerRecords.count() returns a non-zero count such as 500.

Is the following configuration correct?

* LiKafkaProducerImpl
    * Config
        * "large.message.enabled" -> "true"
        * "segment.serializer" -> classOf[DefaultSegmentSerializer].getName
        * "auditor.class" -> classOf[LoggingAuditor[_,_]].getName
        * "max.message.segment.size" -> 200 (or "max.message.segment.bytes" -> 200)

* LiKafkaConsumerImpl
    * Config
        * "message.assembler.buffer.capacity" -> "20971520"
        * "message.assembler.expiration.offset.gap" -> "10000"
        * "exception.on.message.dropped" -> "true"
        * "max.tracked.messages.per.partition" -> "10000"
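For reference, here is how the configuration above could be assembled as plain `java.util.Properties`. This is only a sketch of the settings listed in this issue: the bootstrap server address is a placeholder, and the serializer/auditor entries are omitted because their fully qualified class names are not shown here (in the real code they come from `classOf[...]` as above).

```scala
import java.util.Properties

// Producer-side settings from the issue (bootstrap.servers is a placeholder).
// "segment.serializer" and "auditor.class" would be added via classOf[...] as above.
val prodProps = new Properties()
prodProps.put("bootstrap.servers", "localhost:9092")
prodProps.put("large.message.enabled", "true")
prodProps.put("max.message.segment.bytes", "200")

// Consumer-side settings from the issue.
val conProps = new Properties()
conProps.put("bootstrap.servers", "localhost:9092")
conProps.put("message.assembler.buffer.capacity", "20971520")
conProps.put("message.assembler.expiration.offset.gap", "10000")
conProps.put("exception.on.message.dropped", "true")
conProps.put("max.tracked.messages.per.partition", "10000")
```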

There are several warnings for both the Producer and the Consumer that look harmless. Below is one of them:

2018-07-26 14:13:59 WARN  ConsumerConfig:287 - The configuration 'exception.on.message.dropped' was supplied but isn't a known config.

Log messages seem to show that the Producer works OK:

2018-07-26 14:13:59 INFO  LoggingAuditor:46 - [Thu Jul 26 14:10:00 CEST 2018 - Thu Jul 26 14:20:00 CEST 2018] : (testLargeMessageTopic,2554345,ATTEMPT), (1 messages, 2097172 bytes)
2018-07-26 14:13:59 INFO  LoggingAuditor:46 - [Thu Jul 26 14:10:00 CEST 2018 - Thu Jul 26 14:20:00 CEST 2018] : (testLargeMessageTopic,2554345,SUCCESS), (1 messages, 2097172 bytes)
2018-07-26 14:13:59 INFO  KafkaProducer:1054 - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2018-07-26 14:13:59 INFO  LiKafkaProducerImpl:301 - Shutdown complete in 5 millis

Code that generates the large string:

  def msg(): String = {
    val builder = new StringBuilder()
    // ~2 MB of repeating digits 0..9
    for (idx <- 0 to (2 * 1024 * 1024)) builder.append(idx % 10)
    builder.toString
  }

Code that produces and consumes the large message:

  import java.util.Collections
  import org.apache.kafka.clients.producer.ProducerRecord
  import scala.collection.JavaConverters._
  import scala.util.control.Breaks._

  val prodProps = ... // producer properties described above
  val producer = new LiKafkaProducerImpl[String, String](prodProps)
  producer.send(new ProducerRecord("testLargeMessageTopic", "testLargeMessageKey", largeString))
  try { producer.flush() } finally { producer.close() }

  val conProps = ... // consumer properties described above
  val consumer = new LiKafkaConsumerImpl[String, String](conProps)
  consumer.subscribe(Collections.singletonList("testLargeMessageTopic"))
  var flag = true
  breakable {
    while (flag) {
      val records = consumer.poll(100L)
      println(s"${records.count} records ...") // always prints 0, so the foreach below never runs

      records.asScala.foreach { record => flag = op(record); if (!flag) break }
    }
  }

But when initialized with a plain KafkaConsumer, it does return records, except each value is just raw segment data, e.g.:

topic: testLargeMessageTopic, key: testLargeMessageKey, value: mM�_P��H��/�4Z��^(� �01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789
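That raw record is consistent with large-message segmentation: the producer splits the payload into segments of at most `max.message.segment.bytes` bytes, presumably prepending a binary header (the garbage before the digits) that a segment-aware consumer needs for reassembly. The sketch below illustrates the split/reassemble idea only; it is not the actual wire format of DefaultSegmentSerializer.

```scala
// Illustration only: split a ~2 MB payload of repeating digits into
// 200-byte segments (mirroring "max.message.segment.bytes" -> 200),
// then reassemble them as a segment-aware consumer conceptually would.
val payload: Array[Byte] = Array.tabulate(2 * 1024 * 1024)(i => ('0' + i % 10).toByte)
val segmentSize = 200
val segments: Seq[Array[Byte]] = payload.grouped(segmentSize).toSeq

// Concatenating all segments in order recovers the original payload.
val reassembled: Array[Byte] = segments.flatten.toArray
assert(java.util.Arrays.equals(payload, reassembled))
println(s"${segments.size} segments of up to $segmentSize bytes")
```

A plain KafkaConsumer sees each segment as an independent record, which is why its values look like a header followed by a 200-byte slice of digits.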

What could be missing when creating the LiKafkaConsumer for consuming large messages?

Thanks
