Skip to content

Commit

Permalink
[server] Added info to the exception in AASIT::getUpstreamKafkaUrlFro…
Browse files Browse the repository at this point in the history
…mKafkaValue (#799)

The exception now looks like this:

com.linkedin.venice.exceptions.VeniceException: No Kafka cluster ID found in the cluster ID to Kafka URL map. Got cluster ID -1 and ID to cluster URL map {0=>url0, 1=>url1}. Source Kafka: sourceKafkaURL; TP(topic: 'topic', partition: 0); Offset: 100; Message type: CONTROL_MESSAGE/TOPIC_SWITCH; ProducerMetadata: {"producerGUID": null, "segmentNumber": 0, "messageSequenceNumber": 0, "messageTimestamp": 0, "logicalTimestamp": 0}; LeaderMetadataFooter: {"hostName": null, "upstreamOffset": 0, "upstreamKafkaClusterId": -1}
  • Loading branch information
FelixGV authored Dec 12, 2023
1 parent b54fbb9 commit 3ac710d
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.linkedin.venice.kafka.protocol.Put;
import com.linkedin.venice.kafka.protocol.TopicSwitch;
import com.linkedin.venice.kafka.protocol.Update;
import com.linkedin.venice.kafka.protocol.enums.ControlMessageType;
import com.linkedin.venice.kafka.protocol.enums.MessageType;
import com.linkedin.venice.kafka.protocol.state.StoreVersionState;
import com.linkedin.venice.message.KafkaKey;
Expand All @@ -55,6 +56,7 @@
import com.linkedin.venice.writer.DeleteMetadata;
import com.linkedin.venice.writer.LeaderMetadataWrapper;
import com.linkedin.venice.writer.PutMetadata;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
Expand Down Expand Up @@ -1194,7 +1196,8 @@ private String getUpstreamKafkaUrl(
*/
upstreamKafkaURL = localKafkaServer;
} else {
upstreamKafkaURL = getUpstreamKafkaUrlFromKafkaValue(kafkaValue);
upstreamKafkaURL =
getUpstreamKafkaUrlFromKafkaValue(consumerRecord, recordSourceKafkaUrl, this.kafkaClusterIdToUrlMap);
}
}
return upstreamKafkaURL;
Expand Down Expand Up @@ -1243,18 +1246,35 @@ protected void updateLatestInMemoryLeaderConsumedRTOffset(
pcs.updateLeaderConsumedUpstreamRTOffset(kafkaUrl, offset);
}

private String getUpstreamKafkaUrlFromKafkaValue(KafkaMessageEnvelope kafkaValue) {
/**
* N.B. package-private for testing purposes.
*/
static String getUpstreamKafkaUrlFromKafkaValue(
PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> consumerRecord,
String recordSourceKafkaUrl,
Int2ObjectMap<String> kafkaClusterIdToUrlMap) {
KafkaMessageEnvelope kafkaValue = consumerRecord.getValue();
if (kafkaValue.leaderMetadataFooter == null) {
throw new VeniceException("leaderMetadataFooter field in KME should have been set.");
}
String upstreamKafkaURL = this.kafkaClusterIdToUrlMap.get(kafkaValue.leaderMetadataFooter.upstreamKafkaClusterId);
String upstreamKafkaURL = kafkaClusterIdToUrlMap.get(kafkaValue.leaderMetadataFooter.upstreamKafkaClusterId);
if (upstreamKafkaURL == null) {
MessageType type = MessageType.valueOf(kafkaValue.messageType);
throw new VeniceException(
String.format(
"No Kafka cluster ID found in the cluster ID to Kafka URL map. "
+ "Got cluster ID %d and ID to cluster URL map %s",
+ "Got cluster ID %d and ID to cluster URL map %s. Source Kafka: %s; "
+ "%s; Offset: %d; Message type: %s; ProducerMetadata: %s; LeaderMetadataFooter: %s",
kafkaValue.leaderMetadataFooter.upstreamKafkaClusterId,
kafkaClusterIdToUrlMap));
kafkaClusterIdToUrlMap,
recordSourceKafkaUrl,
consumerRecord.getTopicPartition(),
consumerRecord.getOffset(),
type.toString() + (type == MessageType.CONTROL_MESSAGE
? "/" + ControlMessageType.valueOf((ControlMessage) kafkaValue.getPayloadUnion())
: ""),
kafkaValue.producerMetadata,
kafkaValue.leaderMetadataFooter));
}
return upstreamKafkaURL;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;

import com.github.luben.zstd.Zstd;
import com.linkedin.davinci.config.VeniceServerConfig;
Expand All @@ -36,10 +38,14 @@
import com.linkedin.venice.compression.NoopCompressor;
import com.linkedin.venice.compression.VeniceCompressor;
import com.linkedin.venice.compression.ZstdWithDictCompressor;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.kafka.protocol.ControlMessage;
import com.linkedin.venice.kafka.protocol.GUID;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.kafka.protocol.LeaderMetadata;
import com.linkedin.venice.kafka.protocol.ProducerMetadata;
import com.linkedin.venice.kafka.protocol.Put;
import com.linkedin.venice.kafka.protocol.enums.ControlMessageType;
import com.linkedin.venice.kafka.protocol.enums.MessageType;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.meta.BufferReplayPolicy;
Expand All @@ -57,12 +63,15 @@
import com.linkedin.venice.meta.VersionImpl;
import com.linkedin.venice.meta.ZKStore;
import com.linkedin.venice.partitioner.DefaultVenicePartitioner;
import com.linkedin.venice.pubsub.ImmutablePubSubMessage;
import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.api.PubSubMessage;
import com.linkedin.venice.pubsub.api.PubSubProduceResult;
import com.linkedin.venice.pubsub.api.PubSubProducerAdapter;
import com.linkedin.venice.pubsub.api.PubSubProducerCallback;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.pubsub.api.PubSubTopicType;
import com.linkedin.venice.schema.SchemaEntry;
import com.linkedin.venice.serialization.KeyWithChunkingSuffixSerializer;
Expand All @@ -78,6 +87,7 @@
import com.linkedin.venice.writer.VeniceWriter;
import com.linkedin.venice.writer.VeniceWriterOptions;
import it.unimi.dsi.fastutil.ints.Int2ObjectArrayMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.objects.Object2IntArrayMap;
import java.io.IOException;
import java.nio.ByteBuffer;
Expand All @@ -90,6 +100,8 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.mockito.ArgumentCaptor;
import org.mockito.stubbing.Answer;
import org.testng.Assert;
Expand All @@ -98,6 +110,7 @@


public class ActiveActiveStoreIngestionTaskTest {
private static final Logger LOGGER = LogManager.getLogger(ActiveActiveStoreIngestionTaskTest.class);
String STORE_NAME = "Thvorusleikir_store";
String PUSH_JOB_ID = "yule";
String BOOTSTRAP_SERVER = "Stekkjastaur";
Expand Down Expand Up @@ -541,6 +554,99 @@ public void testUnwrapByteBufferFromOldValueProvider() {
assertNotNull(lazyBB.get());
}

@Test
public void testGetUpstreamKafkaUrlFromKafkaValue() {
PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();
PubSubTopicPartition partition = new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic("topic"), 0);
long offset = 100;
long timestamp = System.currentTimeMillis();
int payloadSize = 200;
String sourceKafka = "sourceKafkaURL";

Int2ObjectMap<String> kafkaClusterIdToUrlMap = new Int2ObjectArrayMap<>(2);
kafkaClusterIdToUrlMap.put(0, "url0");
kafkaClusterIdToUrlMap.put(1, "url1");

KafkaMessageEnvelope kmeWithNullLeaderMetadata = new KafkaMessageEnvelope();
PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessageWithNullLeaderMetadata =
new ImmutablePubSubMessage<>(
new KafkaKey(MessageType.PUT, new byte[] { 1 }),
kmeWithNullLeaderMetadata,
partition,
offset,
timestamp,
payloadSize);
try {
ActiveActiveStoreIngestionTask
.getUpstreamKafkaUrlFromKafkaValue(pubSubMessageWithNullLeaderMetadata, sourceKafka, kafkaClusterIdToUrlMap);
} catch (VeniceException e) {
LOGGER.info("kmeWithNullLeaderMetadata", e);
assertEquals(e.getMessage(), "leaderMetadataFooter field in KME should have been set.");
}

KafkaMessageEnvelope kmeWithAbsentUpstreamCluster = new KafkaMessageEnvelope();
kmeWithAbsentUpstreamCluster.setLeaderMetadataFooter(new LeaderMetadata());
kmeWithAbsentUpstreamCluster.getLeaderMetadataFooter().upstreamKafkaClusterId = -1;
kmeWithAbsentUpstreamCluster.setMessageType(MessageType.PUT.getValue());
kmeWithAbsentUpstreamCluster.setProducerMetadata(new ProducerMetadata());
PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> msgWithAbsentUpstreamCluster = new ImmutablePubSubMessage<>(
new KafkaKey(MessageType.PUT, new byte[] { 1 }),
kmeWithAbsentUpstreamCluster,
partition,
offset,
timestamp,
payloadSize);
try {
ActiveActiveStoreIngestionTask
.getUpstreamKafkaUrlFromKafkaValue(msgWithAbsentUpstreamCluster, sourceKafka, kafkaClusterIdToUrlMap);
} catch (VeniceException e) {
LOGGER.info("kmeWithAbsentUpstreamCluster", e);
assertTrue(e.getMessage().startsWith("No Kafka cluster ID found in the cluster ID to Kafka URL map."));
assertTrue(e.getMessage().contains("Message type: " + MessageType.PUT));
}

KafkaMessageEnvelope kmeForControlMessage = new KafkaMessageEnvelope();
kmeForControlMessage.setLeaderMetadataFooter(new LeaderMetadata());
kmeForControlMessage.getLeaderMetadataFooter().upstreamKafkaClusterId = -1;
kmeForControlMessage.setMessageType(MessageType.CONTROL_MESSAGE.getValue());
ControlMessage controlMessage = new ControlMessage();
controlMessage.setControlMessageType(ControlMessageType.TOPIC_SWITCH.getValue());
kmeForControlMessage.setPayloadUnion(controlMessage);
kmeForControlMessage.setProducerMetadata(new ProducerMetadata());
PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> msgForControlMessage = new ImmutablePubSubMessage<>(
new KafkaKey(MessageType.CONTROL_MESSAGE, new byte[] { 1 }),
kmeForControlMessage,
partition,
offset,
timestamp,
payloadSize);
try {
ActiveActiveStoreIngestionTask
.getUpstreamKafkaUrlFromKafkaValue(msgForControlMessage, sourceKafka, kafkaClusterIdToUrlMap);
} catch (VeniceException e) {
LOGGER.info("kmeForControlMessage", e);
assertTrue(e.getMessage().startsWith("No Kafka cluster ID found in the cluster ID to Kafka URL map."));
assertTrue(
e.getMessage()
.contains("Message type: " + MessageType.CONTROL_MESSAGE + "/" + ControlMessageType.TOPIC_SWITCH));
}

KafkaMessageEnvelope validKME = new KafkaMessageEnvelope();
validKME.setLeaderMetadataFooter(new LeaderMetadata());
validKME.getLeaderMetadataFooter().upstreamKafkaClusterId = 0;
validKME.setProducerMetadata(new ProducerMetadata());
PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> validMsg = new ImmutablePubSubMessage<>(
new KafkaKey(MessageType.PUT, new byte[] { 1 }),
validKME,
partition,
offset,
timestamp,
payloadSize);
assertEquals(
ActiveActiveStoreIngestionTask.getUpstreamKafkaUrlFromKafkaValue(validMsg, sourceKafka, kafkaClusterIdToUrlMap),
"url0");
}

private VeniceCompressor getCompressor(CompressionStrategy strategy) {
if (Objects.requireNonNull(strategy) == CompressionStrategy.ZSTD_WITH_DICT) {
byte[] dictionary = ZstdWithDictCompressor.buildDictionaryOnSyntheticAvroData();
Expand Down

0 comments on commit 3ac710d

Please sign in to comment.