
Task threw an uncaught and unrecoverable exception/NullPointerException #30

Open
mbrettschneider opened this issue Mar 22, 2019 · 5 comments · May be fixed by #47

Comments


mbrettschneider commented Mar 22, 2019

I tried to start the connector with

./kafka_2.11-2.1.1/bin/connect-standalone.sh worker.properties kinesis-streams-kafka-connector.properties

Unfortunately, after these promising messages:

[logging.cc:170] Set AWS Log Level to WARN (com.amazonaws.services.kinesis.producer.LogInputStreamReader:59)
[main.cc:346] Setting CA path to /tmp/amazon-kinesis-producer-native-binaries/cacerts (com.amazonaws.services.kinesis.producer.LogInputStreamReader:59)
[main.cc:382] Starting up main producer (com.amazonaws.services.kinesis.producer.LogInputStreamReader:59)
[kinesis_producer.cc:101] Using Region: eu-central-1 (com.amazonaws.services.kinesis.producer.LogInputStreamReader:59)
[kinesis_producer.cc:120] Using per request threading model. (com.amazonaws.services.kinesis.producer.LogInputStreamReader:59)
[kinesis_producer.cc:52] Using default Kinesis endpoint (com.amazonaws.services.kinesis.producer.LogInputStreamReader:59)
[kinesis_producer.cc:101] Using Region: eu-central-1 (com.amazonaws.services.kinesis.producer.LogInputStreamReader:59)
[kinesis_producer.cc:120] Using per request threading model. (com.amazonaws.services.kinesis.producer.LogInputStreamReader:59)
[kinesis_producer.cc:52] Using default CloudWatch endpoint (com.amazonaws.services.kinesis.producer.LogInputStreamReader:59)
[main.cc:393] Entering join (com.amazonaws.services.kinesis.producer.LogInputStreamReader:59)

I get the following error:

[2019-03-12 16:54:55,833] ERROR WorkerSinkTask{id=KinesisKafkaPartnerRankConnector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
java.lang.NullPointerException
	at com.amazon.kinesis.kafka.AmazonKinesisSinkTask.close(AmazonKinesisSinkTask.java:283)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:398)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:617)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
[2019-03-12 16:54:55,837] ERROR WorkerSinkTask{id=KinesisKafkaPartnerRankConnector-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)

What can I do to make it work?


zhiyanliu commented Sep 29, 2020

I ran into the same issue.

After some digging, I can see that rebalancing can cause the topic-partitions parameter of close() [0] to contain more items than the opened-producer map [1] in the sink task.

I think the root cause is:

TL;DR: currentOffsets is used to call task.close() at [2], but it can be updated at [3], which happens before the partitions are opened and the producers are prepared for the sink task at [4]. This eventually leaves the sink task's producerMap out of sync with the worker's currentOffsets.

In detail: while the consumer polls messages from the broker at [5], a rebalanceException can occur at [6], and it can occur more than once; however, unlike the update at [3], openPartitions() will be called only once, the first time.

I can see an issue report about kafka-connect-storage-cloud caused by a similar out-of-sync case at [7].

The solution:

Instead of handling all the local producer/partition items and ignoring the partitions parameter (as topicPartitionWriters does at [8]), I prefer to do an easy check like [9]; I will verify it. (A sketch of that check follows the reference list below.)

[0] https://github.com/awslabs/kinesis-kafka-connector/blob/master/src/main/java/com/amazon/kinesis/kafka/AmazonKinesisSinkTask.java#L280
[1] https://github.com/awslabs/kinesis-kafka-connector/blob/master/src/main/java/com/amazon/kinesis/kafka/AmazonKinesisSinkTask.java#L275
[2] https://github.com/apache/kafka/blob/2.3.1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L397
[3] https://github.com/apache/kafka/blob/2.3.1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L630
[4] https://github.com/apache/kafka/blob/2.3.1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L652
[5] https://github.com/apache/kafka/blob/2.3.1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L447
[6] https://github.com/apache/kafka/blob/2.3.1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L650
[7] https://github.com/confluentinc/kafka-connect-storage-cloud/pull/322/files
[8] https://github.com/confluentinc/kafka-connect-storage-cloud/pull/322/files#diff-16b6be2d931b0825d79f3b4c517327b4R225
[9] https://github.com/zhiyanliu/kinesis-kafka-connector/blob/master/src/main/java/com/amazon/kinesis/kafka/AmazonKinesisSinkTask.java#L284
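
For illustration, a minimal sketch of the kind of guard [9] applies in AmazonKinesisSinkTask.close(), assuming the task keeps its producers in a Map<TopicPartition, KinesisProducer> named producerMap and tears them down with the KPL's flushSync()/destroy(); the exact body of the real close() may differ:

// Hedged sketch of the guard proposed in [9]: only tear down producers for
// partitions this task actually opened. producerMap and the flushSync()/destroy()
// calls mirror the connector's code as referenced above; the rest is illustrative.
@Override
public void close(Collection<TopicPartition> partitions) {
    for (TopicPartition partition : partitions) {
        KinesisProducer producer = producerMap.get(partition);
        if (producer == null) {
            // After a rebalance, close() can receive partitions that open() never
            // saw, so there is nothing to flush or destroy for them.
            continue;
        }
        producer.flushSync();
        producer.destroy();
        producerMap.remove(partition);
    }
}

Both approaches avoid the NPE; the difference is whether close() trusts the partitions argument ([9]) or the task's own bookkeeping ([7]/[8]).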

bdesert (Contributor) commented Oct 1, 2020

@zhiyanliu what would be the fastest way to reproduce the issue? Just keep it rebalancing until the out-of-sync state happens? Topic repartitioning?

@zhiyanliu

@bdesert I think so, at the moment: trigger rebalancing continually until it happens. According to the three points listed in the doc [0], besides topic repartitioning you can also trigger it when (a sketch of the join/leave approach follows the link below):

  • a consumer joins the group
  • a consumer shuts down
  • a consumer is considered dead by the group coordinator

In our case, we hit this in a dedicated environment I can't touch, so I didn't try to reproduce it locally; I analyzed it from the static code logic.

[0] https://www.confluent.io/blog/kafka-rebalance-protocol-static-membership/
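
For reference, a minimal sketch of the "consumer joins / shuts down" trigger from the list above: a throwaway consumer that repeatedly joins and leaves the sink task's consumer group. Connect sink tasks consume with a group id of the form connect-<connector name>, so the group id below is an assumption based on the connector name in the original log; the bootstrap servers and topic are placeholders.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Hedged sketch: force repeated rebalances of the sink task's consumer group
// by joining and leaving it with a plain consumer. Group id, bootstrap servers
// and topic are assumptions/placeholders, not values from this issue.
public class RebalanceTrigger {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("group.id", "connect-KinesisKafkaPartnerRankConnector"); // assumed from the log
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        for (int i = 0; i < 20; i++) {
            KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("your-sink-topic")); // placeholder topic
            consumer.poll(Duration.ofSeconds(2)); // joining the group triggers one rebalance
            consumer.close();                     // leaving it triggers another
            Thread.sleep(2000);
        }
    }
}

In principle, each join/leave cycle gives another chance to hit the window between the currentOffsets update at [3] and openPartitions() at [4] described above.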

zhiyanliu added a commit to zhiyanliu/kinesis-kafka-connector that referenced this issue Oct 10, 2020
bdesert (Contributor) commented Oct 12, 2022

@akhon, can you confirm from your side that the fix is working as expected?

@mbrettschneider
Author

Not relevant anymore.
