diff --git a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/largemessage/MessageSplitterImpl.java b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/largemessage/MessageSplitterImpl.java index e4935d8..b18d65b 100644 --- a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/largemessage/MessageSplitterImpl.java +++ b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/largemessage/MessageSplitterImpl.java @@ -94,8 +94,8 @@ public List> split(String topic, LargeMessageSegment segment = new LargeMessageSegment(segmentMessageId, seq, numberOfSegments, messageSizeInBytes, payload); - // NOTE: we have to use null topic here to serialize because the segment should be topic independent. - byte[] segmentValue = _segmentSerializer.serialize(null, segment); + // NOTE: Even though we are passing topic here to serialize, the segment itself should be topic independent. + byte[] segmentValue = _segmentSerializer.serialize(topic, segment); ProducerRecord segmentProducerRecord = new ProducerRecord<>(topic, partition, timestamp, segmentKey, segmentValue); segments.add(segmentProducerRecord);