Skip to content

Commit

Permalink
[controller] Stop writing writing ready-to-serve instances info in me…
Browse files Browse the repository at this point in the history
…ta system store (#1228)

When materializing the meta store, we generate a snapshot of ready to serve instances  
for all user store versions. However, if a version is in an ERROR state, its Helix resources  
may be absent. Checking these resources in such cases can trigger exceptions,  
potentially interrupting the addVersion process.

Previously, the meta store required readyToServe instances data for client use,
but since no clients currently rely on it, it's safe to discontinue storing this
information in the meta system store.
  • Loading branch information
sushantmane authored Oct 9, 2024
1 parent 53adf35 commit 1ce7d5e
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,6 @@
import static com.linkedin.venice.fastclient.utils.ClientTestUtils.FASTCLIENT_HTTP_VARIANTS;
import static com.linkedin.venice.fastclient.utils.ClientTestUtils.REQUEST_TYPES_SMALL;
import static com.linkedin.venice.fastclient.utils.ClientTestUtils.STORE_METADATA_FETCH_MODES;
import static com.linkedin.venice.system.store.MetaStoreWriter.KEY_STRING_CLUSTER_NAME;
import static com.linkedin.venice.system.store.MetaStoreWriter.KEY_STRING_PARTITION_ID;
import static com.linkedin.venice.system.store.MetaStoreWriter.KEY_STRING_STORE_NAME;
import static com.linkedin.venice.system.store.MetaStoreWriter.KEY_STRING_VERSION_NUMBER;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
Expand Down Expand Up @@ -44,9 +40,6 @@
import com.linkedin.venice.serialization.VeniceKafkaSerializer;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer;
import com.linkedin.venice.system.store.MetaStoreDataType;
import com.linkedin.venice.systemstore.schemas.StoreMetaKey;
import com.linkedin.venice.systemstore.schemas.StoreMetaValue;
import com.linkedin.venice.utils.DataProviderUtils;
import com.linkedin.venice.utils.IntegrationTestPushUtils;
import com.linkedin.venice.utils.SslUtils;
Expand Down Expand Up @@ -281,31 +274,6 @@ private void prepareMetaSystemStore() throws Exception {
30,
TimeUnit.SECONDS);
});

// Verify meta system store received the snapshot writes.
try (AvroSpecificStoreClient<StoreMetaKey, StoreMetaValue> metaClient =
com.linkedin.venice.client.store.ClientFactory.getAndStartSpecificAvroClient(
com.linkedin.venice.client.store.ClientConfig
.defaultSpecificClientConfig(metaSystemStoreName, StoreMetaValue.class)
.setVeniceURL(veniceCluster.getRandomRouterURL())
.setSslFactory(SslUtils.getVeniceLocalSslFactory()))) {
StoreMetaKey replicaStatusKey =
MetaStoreDataType.STORE_REPLICA_STATUSES.getStoreMetaKey(new HashMap<String, String>() {
{
put(KEY_STRING_STORE_NAME, storeName);
put(KEY_STRING_CLUSTER_NAME, veniceCluster.getClusterName());
put(
KEY_STRING_VERSION_NUMBER,
Integer.toString(Version.parseVersionFromVersionTopicName(storeVersionName)));
put(KEY_STRING_PARTITION_ID, "0");
}
});
TestUtils.waitForNonDeterministicAssertion(
30,
TimeUnit.SECONDS,
true,
() -> assertNotNull(metaClient.get(replicaStatusKey).get()));
}
}

private void waitForRouterD2() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7958,41 +7958,6 @@ void setUpMetaStoreAndMayProduceSnapshot(String clusterName, String regularStore
"Wrote value schemas to meta system store for venice store: {} in cluster: {}",
regularStoreName,
clusterName);
// replica status for all the available versions
List<Version> versions = store.getVersions();
if (versions.isEmpty()) {
return;
}
for (Version version: versions) {
int versionNumber = version.getNumber();
String topic = Version.composeKafkaTopic(regularStoreName, versionNumber);
int partitionCount = version.getPartitionCount();
HelixCustomizedViewOfflinePushRepository customizedViewOfflinePushRepository =
getHelixVeniceClusterResources(clusterName).getCustomizedViewRepository();
for (int curPartitionId = 0; curPartitionId < partitionCount; ++curPartitionId) {
List<Instance> readyToServeInstances =
customizedViewOfflinePushRepository.getReadyToServeInstances(topic, curPartitionId);
metaStoreWriter.get()
.writeReadyToServerStoreReplicas(
clusterName,
regularStoreName,
versionNumber,
curPartitionId,
readyToServeInstances);
LOGGER.info(
"Wrote the following ready-to-serve instance: {} for store: {}, version: {}, partition id: {} in cluster: {}",
readyToServeInstances.toString(),
regularStoreName,
versionNumber,
curPartitionId,
clusterName);
}
LOGGER.info(
"Wrote replica status snapshot for version: {} to meta system store for venice store: {} in cluster: {}",
versionNumber,
regularStoreName,
clusterName);
}
}

private boolean isAmplificationFactorUpdateOnly(
Expand Down

0 comments on commit 1ce7d5e

Please sign in to comment.