From 2e708d335f8a7e9ce78761038d77749f6157981d Mon Sep 17 00:00:00 2001 From: Jialin Liu Date: Wed, 15 Nov 2023 14:31:56 -0800 Subject: [PATCH] [da-vinci][server][controller] Fetch latest update schema available for push status writer in all Venice components (#750) Previously, PushStatusStoreWriter used PUSH_STATUS_SYSTEM_SCHEMA_STORE's current protocol ID as the value schema, relied on a config, PUSH_STATUS_STORE_DERIVED_SCHEMA_ID (default = 1), for the update schema ID, and used a hardcoded specific write-op record as the partial update record. This is wrong because 1-1 is our legacy schema type, which is a union of [update schema, del op], while the actual static update record is only the first branch of that union. We also found that Fast-Avro fails to deserialize with this schema as the writer schema and 1-3 as the reader schema. Although vanilla Avro does not throw an exception, it does not produce the correct deserialized result either. This PR fixes the above issue by having all components that use PushStatusStoreWriter fetch the latest value schema and latest update schema from StoreSchemaFetcher and the schema repository. PushStatusStoreWriter will no longer use the protocol ID / value schema from AvroProtocolDefinition, as it might not be registered. It will also no longer rely on a config to find the update protocol ID. This PR also performs some cleanup: Remove PushStatusStoreRecordDeleter and merge its functionality into PushStatusStoreWriter. Make PushStatusStoreWriter/PushStatusStoreReader no longer optional in Admin code, as they have been rolled out for more than 2 years. Some other code cleanup. 
--- .../com/linkedin/davinci/DaVinciBackend.java | 16 +- .../helix/HelixParticipationService.java | 22 ++- .../schema/RouterBasedStoreSchemaFetcher.java | 27 ++- .../client/schema/StoreSchemaFetcher.java | 10 ++ .../client/store/AvroGenericStoreClient.java | 2 +- .../PushStatusStoreRecordDeleter.java | 77 --------- .../PushStatusStoreVeniceWriterCache.java | 15 +- .../PushStatusStoreWriter.java | 158 +++++++++++------- .../writer/update/UpdateBuilderImpl.java | 3 +- .../v1/PushStatusValueWriteOpRecord.avsc | 54 ------ .../PushStatusStoreWriterTest.java | 107 ++++++++---- .../venice/endToEnd/PushStatusStoreTest.java | 23 ++- .../com/linkedin/venice/controller/Admin.java | 10 +- .../HelixVeniceClusterResources.java | 2 +- .../UserSystemStoreLifeCycleHelper.java | 10 +- .../controller/VeniceControllerConfig.java | 2 +- .../venice/controller/VeniceHelixAdmin.java | 94 ++++------- .../controller/VeniceParentHelixAdmin.java | 15 +- .../linkedin/venice/server/VeniceServer.java | 1 + 19 files changed, 305 insertions(+), 343 deletions(-) delete mode 100644 internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreRecordDeleter.java delete mode 100644 internal/venice-common/src/main/resources/avro/PushStatusValueWriteOpRecord/v1/PushStatusValueWriteOpRecord.avsc diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java index b96b7dd113..17b0a01c93 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java @@ -1,6 +1,5 @@ package com.linkedin.davinci; -import static com.linkedin.venice.ConfigKeys.PUSH_STATUS_STORE_DERIVED_SCHEMA_ID; import static com.linkedin.venice.ConfigKeys.VALIDATE_VENICE_INTERNAL_SCHEMA_VERSION; import static java.lang.Thread.currentThread; @@ -26,6 +25,7 @@ import 
com.linkedin.davinci.store.AbstractStorageEngine; import com.linkedin.davinci.store.cache.backend.ObjectCacheBackend; import com.linkedin.davinci.store.cache.backend.ObjectCacheConfig; +import com.linkedin.venice.client.schema.StoreSchemaFetcher; import com.linkedin.venice.client.store.ClientConfig; import com.linkedin.venice.client.store.ClientFactory; import com.linkedin.venice.common.VeniceSystemStoreType; @@ -44,7 +44,9 @@ import com.linkedin.venice.pubsub.PubSubClientsFactory; import com.linkedin.venice.pushmonitor.ExecutionStatus; import com.linkedin.venice.pushstatushelper.PushStatusStoreWriter; +import com.linkedin.venice.schema.SchemaEntry; import com.linkedin.venice.schema.SchemaReader; +import com.linkedin.venice.schema.writecompute.DerivedSchemaEntry; import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; import com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer; import com.linkedin.venice.serialization.avro.SchemaPresenceChecker; @@ -187,8 +189,16 @@ public DaVinciBackend( new VeniceWriterFactory(backendProps.toProperties(), pubSubClientsFactory.getProducerAdapterFactory(), null); String pid = Utils.getPid(); String instanceName = Utils.getHostName() + "_" + (pid == null ? "NA" : pid); - int derivedSchemaID = backendProps.getInt(PUSH_STATUS_STORE_DERIVED_SCHEMA_ID, 1); - pushStatusStoreWriter = new PushStatusStoreWriter(writerFactory, instanceName, derivedSchemaID); + + // Fetch latest update schema's protocol ID for Push Status Store from Router. 
+ ClientConfig pushStatusStoreClientConfig = ClientConfig.cloneConfig(clientConfig) + .setStoreName(VeniceSystemStoreType.DAVINCI_PUSH_STATUS_STORE.getZkSharedStoreName()); + try (StoreSchemaFetcher schemaFetcher = ClientFactory.createStoreSchemaFetcher(pushStatusStoreClientConfig)) { + SchemaEntry valueSchemaEntry = schemaFetcher.getLatestValueSchemaEntry(); + DerivedSchemaEntry updateSchemaEntry = schemaFetcher.getUpdateSchemaEntry(valueSchemaEntry.getId()); + pushStatusStoreWriter = + new PushStatusStoreWriter(writerFactory, instanceName, valueSchemaEntry, updateSchemaEntry); + } SchemaReader kafkaMessageEnvelopeSchemaReader = ClientFactory.getSchemaReader( ClientConfig.cloneConfig(clientConfig) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/helix/HelixParticipationService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/helix/HelixParticipationService.java index 9456f46575..5f7daf72b2 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/helix/HelixParticipationService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/helix/HelixParticipationService.java @@ -1,7 +1,5 @@ package com.linkedin.davinci.helix; -import static com.linkedin.venice.ConfigKeys.PUSH_STATUS_STORE_DERIVED_SCHEMA_ID; - import com.linkedin.davinci.config.VeniceConfigLoader; import com.linkedin.davinci.config.VeniceServerConfig; import com.linkedin.davinci.config.VeniceStoreVersionConfig; @@ -16,6 +14,7 @@ import com.linkedin.davinci.stats.ParticipantStateTransitionStats; import com.linkedin.davinci.storage.StorageMetadataService; import com.linkedin.davinci.storage.StorageService; +import com.linkedin.venice.common.VeniceSystemStoreType; import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.helix.HelixAdapterSerializer; import com.linkedin.venice.helix.HelixInstanceConverter; @@ -26,10 +25,13 @@ import com.linkedin.venice.helix.ZkClientFactory; import 
com.linkedin.venice.meta.IngestionMode; import com.linkedin.venice.meta.Instance; +import com.linkedin.venice.meta.ReadOnlySchemaRepository; import com.linkedin.venice.meta.ReadOnlyStoreRepository; import com.linkedin.venice.pubsub.PubSubProducerAdapterFactory; import com.linkedin.venice.pushmonitor.KillOfflinePushMessage; import com.linkedin.venice.pushstatushelper.PushStatusStoreWriter; +import com.linkedin.venice.schema.SchemaEntry; +import com.linkedin.venice.schema.writecompute.DerivedSchemaEntry; import com.linkedin.venice.service.AbstractVeniceService; import com.linkedin.venice.stats.HelixMessageChannelStats; import com.linkedin.venice.status.StatusMessageHandler; @@ -76,6 +78,7 @@ public class HelixParticipationService extends AbstractVeniceService private final StorageService storageService; private final VeniceConfigLoader veniceConfigLoader; private final ReadOnlyStoreRepository helixReadOnlyStoreRepository; + private final ReadOnlySchemaRepository helixReadOnlySchemaRepository; private final MetricsRepository metricsRepository; private final VeniceIngestionBackend ingestionBackend; private final CompletableFuture managerFuture; // complete this future when the manager is connected @@ -99,6 +102,7 @@ public HelixParticipationService( StorageMetadataService storageMetadataService, VeniceConfigLoader veniceConfigLoader, ReadOnlyStoreRepository helixReadOnlyStoreRepository, + ReadOnlySchemaRepository helixReadOnlySchemaRepository, MetricsRepository metricsRepository, String zkAddress, String clusterName, @@ -113,6 +117,7 @@ public HelixParticipationService( this.zkAddress = zkAddress; this.veniceConfigLoader = veniceConfigLoader; this.helixReadOnlyStoreRepository = helixReadOnlyStoreRepository; + this.helixReadOnlySchemaRepository = helixReadOnlySchemaRepository; this.metricsRepository = metricsRepository; this.instance = new Instance(participantName, hostname, port); this.managerFuture = managerFuture; @@ -299,17 +304,20 @@ private void 
checkBeforeJoinInCluster() { private void asyncStart() { zkClient = ZkClientFactory.newZkClient(zkAddress); - // we use push status store for persisting incremental push statuses VeniceServerConfig veniceServerConfig = veniceConfigLoader.getVeniceServerConfig(); VeniceProperties veniceProperties = veniceServerConfig.getClusterProperties(); PubSubProducerAdapterFactory pubSubProducerAdapterFactory = veniceServerConfig.getPubSubClientsFactory().getProducerAdapterFactory(); VeniceWriterFactory writerFactory = new VeniceWriterFactory(veniceProperties.toProperties(), pubSubProducerAdapterFactory, null); - statusStoreWriter = new PushStatusStoreWriter( - writerFactory, - instance.getNodeId(), - veniceProperties.getInt(PUSH_STATUS_STORE_DERIVED_SCHEMA_ID, 1)); + String dummyPushStatusStoreName = VeniceSystemStoreType.DAVINCI_PUSH_STATUS_STORE.getSystemStoreName("dummy"); + SchemaEntry valueSchemaEntry = + helixReadOnlySchemaRepository.getSupersetOrLatestValueSchema(dummyPushStatusStoreName); + DerivedSchemaEntry updateSchemaEntry = + helixReadOnlySchemaRepository.getLatestDerivedSchema(dummyPushStatusStoreName, valueSchemaEntry.getId()); + // We use push status store for persisting incremental push statuses + statusStoreWriter = + new PushStatusStoreWriter(writerFactory, instance.getNodeId(), valueSchemaEntry, updateSchemaEntry); // Record replica status in Zookeeper. // Need to be started before connecting to ZK, otherwise some notification will not be sent by this notifier. 
diff --git a/clients/venice-thin-client/src/main/java/com/linkedin/venice/client/schema/RouterBasedStoreSchemaFetcher.java b/clients/venice-thin-client/src/main/java/com/linkedin/venice/client/schema/RouterBasedStoreSchemaFetcher.java index 10d4f9e4cf..7baf3dc205 100644 --- a/clients/venice-thin-client/src/main/java/com/linkedin/venice/client/schema/RouterBasedStoreSchemaFetcher.java +++ b/clients/venice-thin-client/src/main/java/com/linkedin/venice/client/schema/RouterBasedStoreSchemaFetcher.java @@ -11,6 +11,8 @@ import com.linkedin.venice.exceptions.InvalidVeniceSchemaException; import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.schema.SchemaData; +import com.linkedin.venice.schema.SchemaEntry; +import com.linkedin.venice.schema.writecompute.DerivedSchemaEntry; import com.linkedin.venice.utils.ObjectMapperFactory; import com.linkedin.venice.utils.RetryUtils; import java.io.IOException; @@ -80,6 +82,13 @@ public Schema getLatestValueSchema() { return latestSchema; } + @Override + public SchemaEntry getLatestValueSchemaEntry() { + String latestSchemaRequestPath = TYPE_LATEST_VALUE_SCHEMA + "/" + storeClient.getStoreName(); + SchemaResponse schemaResponse = fetchSingleSchema(latestSchemaRequestPath); + return new SchemaEntry(schemaResponse.getId(), schemaResponse.getSchemaStr()); + } + private Schema getLatestValueSchemaFromAllValueSchemas() { MultiSchemaResponse multiSchemaResponse = fetchAllValueSchemas(); int targetSchemaId = multiSchemaResponse.getSuperSetSchemaId(); @@ -117,6 +126,17 @@ public Map getAllValueSchemasWithId() { return schemaSet; } + @Override + public DerivedSchemaEntry getUpdateSchemaEntry(int valueSchemaId) { + // Fetch the latest update schema for the specified value schema. 
+ String updateSchemaRequestPath = TYPE_GET_UPDATE_SCHEMA + "/" + storeClient.getStoreName() + "/" + valueSchemaId; + SchemaResponse updateSchemaResponse = fetchSingleSchema(updateSchemaRequestPath); + return new DerivedSchemaEntry( + updateSchemaResponse.getId(), + updateSchemaResponse.getDerivedSchemaId(), + updateSchemaResponse.getSchemaStr()); + } + @Override public Schema getUpdateSchema(Schema valueSchema) throws VeniceException { int valueSchemaId = getValueSchemaId(valueSchema); @@ -195,7 +215,7 @@ private MultiSchemaResponse fetchAllValueSchemas() { return multiSchemaResponse; } - private String fetchSingleSchemaString(String requestPath) throws VeniceClientException { + SchemaResponse fetchSingleSchema(String requestPath) throws VeniceClientException { SchemaResponse schemaResponse; byte[] response = executeRequest(requestPath); try { @@ -208,6 +228,11 @@ private String fetchSingleSchemaString(String requestPath) throws VeniceClientEx "Received an error while fetching schema from path: " + requestPath + ", error message: " + schemaResponse.getError()); } + return schemaResponse; + } + + private String fetchSingleSchemaString(String requestPath) throws VeniceClientException { + SchemaResponse schemaResponse = fetchSingleSchema(requestPath); if (schemaResponse.getSchemaStr() == null) { throw new VeniceException("Received bad schema response with null schema string"); } diff --git a/clients/venice-thin-client/src/main/java/com/linkedin/venice/client/schema/StoreSchemaFetcher.java b/clients/venice-thin-client/src/main/java/com/linkedin/venice/client/schema/StoreSchemaFetcher.java index 3652c238b9..4878705aa7 100644 --- a/clients/venice-thin-client/src/main/java/com/linkedin/venice/client/schema/StoreSchemaFetcher.java +++ b/clients/venice-thin-client/src/main/java/com/linkedin/venice/client/schema/StoreSchemaFetcher.java @@ -2,6 +2,8 @@ import com.linkedin.venice.exceptions.InvalidVeniceSchemaException; import com.linkedin.venice.exceptions.VeniceException; 
+import com.linkedin.venice.schema.SchemaEntry; +import com.linkedin.venice.schema.writecompute.DerivedSchemaEntry; import java.io.Closeable; import java.util.HashSet; import java.util.Map; @@ -55,4 +57,12 @@ default Set getAllValueSchemas() { * Returns the Venice store name this class is associated with. */ String getStoreName(); + + SchemaEntry getLatestValueSchemaEntry(); + + /** + * Get the latest derived schema of a value schema. Return the derived schema in {@link DerivedSchemaEntry} format, + * which contains Schema and protocol ID. + */ + DerivedSchemaEntry getUpdateSchemaEntry(int valueSchemaId); } diff --git a/clients/venice-thin-client/src/main/java/com/linkedin/venice/client/store/AvroGenericStoreClient.java b/clients/venice-thin-client/src/main/java/com/linkedin/venice/client/store/AvroGenericStoreClient.java index f817eafbac..44e79b85ad 100644 --- a/clients/venice-thin-client/src/main/java/com/linkedin/venice/client/store/AvroGenericStoreClient.java +++ b/clients/venice-thin-client/src/main/java/com/linkedin/venice/client/store/AvroGenericStoreClient.java @@ -157,7 +157,7 @@ public void onCompletion(Optional exception) { * Get the latest value schema known in current store client. * This function doesn't guarantee it will return the latest schema if you add a new value schema * when current store client is running. - * @deprecated This method is considered deprecated. Please use {@link StoreSchemaFetcher#getLatestValueSchema()} to fetch + * @deprecated This method is considered deprecated. Please use {@link StoreSchemaFetcher#getLatestValueSchemaEntry()} to fetch * latest value schema instead. 
*/ @Deprecated diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreRecordDeleter.java b/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreRecordDeleter.java deleted file mode 100644 index 66e1dae7d9..0000000000 --- a/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreRecordDeleter.java +++ /dev/null @@ -1,77 +0,0 @@ -package com.linkedin.venice.pushstatushelper; - -import com.linkedin.venice.common.PushStatusStoreUtils; -import com.linkedin.venice.pubsub.api.PubSubProduceResult; -import com.linkedin.venice.pushstatus.PushStatusKey; -import com.linkedin.venice.utils.LatencyUtils; -import com.linkedin.venice.writer.VeniceWriter; -import com.linkedin.venice.writer.VeniceWriterFactory; -import java.util.Optional; -import java.util.concurrent.Future; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - - -/** - * PushStatusRecordDeleter is a class help controller purge push status of outdated versions. 
- */ -public class PushStatusStoreRecordDeleter implements AutoCloseable { - private static final Logger LOGGER = LogManager.getLogger(PushStatusStoreRecordDeleter.class); - private final PushStatusStoreVeniceWriterCache veniceWriterCache; - - public PushStatusStoreRecordDeleter(VeniceWriterFactory veniceWriterFactory) { - this.veniceWriterCache = new PushStatusStoreVeniceWriterCache(veniceWriterFactory); - } - - public void deletePushStatus( - String storeName, - int version, - Optional incrementalPushVersion, - int partitionCount) { - VeniceWriter writer = veniceWriterCache.prepareVeniceWriter(storeName); - LOGGER.info("Deleting pushStatus of storeName: {}, version: {}", storeName, version); - for (int partitionId = 0; partitionId < partitionCount; partitionId++) { - PushStatusKey pushStatusKey = PushStatusStoreUtils.getPushKey(version, partitionId, incrementalPushVersion); - writer.delete(pushStatusKey, null); - } - } - - /** - * N.B.: Currently used by tests only. - * @return - */ - public Future deletePartitionIncrementalPushStatus( - String storeName, - int version, - String incrementalPushVersion, - int partitionId) { - PushStatusKey pushStatusKey = PushStatusStoreUtils.getPushKey( - version, - partitionId, - Optional.ofNullable(incrementalPushVersion), - Optional.of(PushStatusStoreUtils.SERVER_INCREMENTAL_PUSH_PREFIX)); - LOGGER.info( - "Deleting incremental push status belonging to a partition:{}. 
pushStatusKey:{}", - partitionId, - pushStatusKey); - return veniceWriterCache.prepareVeniceWriter(storeName).delete(pushStatusKey, null); - } - - public void removePushStatusStoreVeniceWriter(String storeName) { - LOGGER.info("Removing push status store writer for store {}", storeName); - long veniceWriterRemovingStartTimeInNs = System.nanoTime(); - veniceWriterCache.removeVeniceWriter(storeName); - LOGGER.info( - "Removed push status store writer for store {} in {}ms.", - storeName, - LatencyUtils.getLatencyInMS(veniceWriterRemovingStartTimeInNs)); - } - - @Override - public void close() { - LOGGER.info("Closing VeniceWriter cache"); - long cacheClosingStartTimeInNs = System.nanoTime(); - veniceWriterCache.close(); - LOGGER.info("Closed VeniceWriter cache in {}ms.", LatencyUtils.getLatencyInMS(cacheClosingStartTimeInNs)); - } -} diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreVeniceWriterCache.java b/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreVeniceWriterCache.java index 017c86213a..c3ea4ccb6c 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreVeniceWriterCache.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreVeniceWriterCache.java @@ -2,7 +2,6 @@ import com.linkedin.venice.common.VeniceSystemStoreUtils; import com.linkedin.venice.meta.Version; -import com.linkedin.venice.schema.writecompute.WriteComputeSchemaConverter; import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer; import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; @@ -24,23 +23,25 @@ public class PushStatusStoreVeniceWriterCache implements AutoCloseable { private final VeniceWriterFactory writerFactory; // Local cache of VeniceWriters. 
private final Map veniceWriters = new VeniceConcurrentHashMap<>(); + private final Schema valueSchema; + private final Schema updateSchema; // writerFactory Used for instantiating VeniceWriter - public PushStatusStoreVeniceWriterCache(VeniceWriterFactory writerFactory) { + public PushStatusStoreVeniceWriterCache(VeniceWriterFactory writerFactory, Schema valueSchema, Schema updateSchema) { this.writerFactory = writerFactory; + this.valueSchema = valueSchema; + this.updateSchema = updateSchema; } public VeniceWriter prepareVeniceWriter(String storeName) { return veniceWriters.computeIfAbsent(storeName, s -> { String rtTopic = Version.composeRealTimeTopic(VeniceSystemStoreUtils.getDaVinciPushStatusStoreName(storeName)); - Schema valueSchema = AvroProtocolDefinition.PUSH_STATUS_SYSTEM_SCHEMA_STORE.getCurrentProtocolVersionSchema(); - Schema writeComputeSchema = WriteComputeSchemaConverter.getInstance().convertFromValueRecordSchema(valueSchema); VeniceWriterOptions options = new VeniceWriterOptions.Builder(rtTopic) .setKeySerializer( new VeniceAvroKafkaSerializer( AvroProtocolDefinition.PUSH_STATUS_SYSTEM_SCHEMA_STORE_KEY.getCurrentProtocolVersionSchema())) .setValueSerializer(new VeniceAvroKafkaSerializer(valueSchema)) - .setWriteComputeSerializer(new VeniceAvroKafkaSerializer(writeComputeSchema)) + .setWriteComputeSerializer(new VeniceAvroKafkaSerializer(updateSchema)) .setChunkingEnabled(false) .setPartitionCount(1) .build(); @@ -57,10 +58,6 @@ public void removeVeniceWriter(String storeName) { } } - public VeniceWriter getVeniceWriterFromMap(String storeName) { - return veniceWriters.get(storeName); - } - @Override public void close() { veniceWriters.forEach((k, v) -> { diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreWriter.java b/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreWriter.java index 2a8b2d00f5..44a88f67d5 100644 --- 
a/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreWriter.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreWriter.java @@ -1,17 +1,20 @@ package com.linkedin.venice.pushstatushelper; import com.linkedin.venice.common.PushStatusStoreUtils; +import com.linkedin.venice.pubsub.api.PubSubProduceResult; import com.linkedin.venice.pushmonitor.ExecutionStatus; -import com.linkedin.venice.pushstatus.NoOp; import com.linkedin.venice.pushstatus.PushStatusKey; -import com.linkedin.venice.pushstatus.PushStatusValue; -import com.linkedin.venice.pushstatus.PushStatusValueWriteOpRecord; -import com.linkedin.venice.pushstatus.instancesMapOps; -import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; +import com.linkedin.venice.schema.SchemaEntry; +import com.linkedin.venice.schema.writecompute.DerivedSchemaEntry; +import com.linkedin.venice.utils.LatencyUtils; import com.linkedin.venice.writer.VeniceWriter; import com.linkedin.venice.writer.VeniceWriterFactory; +import com.linkedin.venice.writer.update.UpdateBuilder; +import com.linkedin.venice.writer.update.UpdateBuilderImpl; import java.util.Collections; import java.util.Optional; +import java.util.concurrent.Future; +import org.apache.avro.Schema; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -26,43 +29,50 @@ public class PushStatusStoreWriter implements AutoCloseable { private static final Logger LOGGER = LogManager.getLogger(PushStatusStoreWriter.class); private final String instanceName; private final PushStatusStoreVeniceWriterCache veniceWriterCache; + private final int valueSchemaId; private final int derivedSchemaId; + private final Schema updateSchema; - /** - * @param writerFactory Used for instantiate veniceWriterCache - * @param instanceName format = hostAddress,appName - * @param derivedSchemaId writeCompute schema for updating push status - */ - public 
PushStatusStoreWriter(VeniceWriterFactory writerFactory, String instanceName, int derivedSchemaId) { - this(new PushStatusStoreVeniceWriterCache(writerFactory), instanceName, derivedSchemaId); + public PushStatusStoreWriter( + VeniceWriterFactory writerFactory, + String instanceName, + SchemaEntry valueSchemaEntry, + DerivedSchemaEntry updateSchemaEntry) { + this( + new PushStatusStoreVeniceWriterCache( + writerFactory, + valueSchemaEntry.getSchema(), + updateSchemaEntry.getSchema()), + instanceName, + updateSchemaEntry.getValueSchemaID(), + updateSchemaEntry.getId(), + updateSchemaEntry.getSchema()); } - PushStatusStoreWriter(PushStatusStoreVeniceWriterCache veniceWriterCache, String instanceName, int derivedSchemaId) { + PushStatusStoreWriter( + PushStatusStoreVeniceWriterCache veniceWriterCache, + String instanceName, + int valueSchemaId, + int derivedSchemaId, + Schema updateSchema) { this.veniceWriterCache = veniceWriterCache; this.instanceName = instanceName; + this.valueSchemaId = valueSchemaId; this.derivedSchemaId = derivedSchemaId; - } - - public void writeHeartbeat(String storeName, long heartbeat) { - VeniceWriter writer = veniceWriterCache.prepareVeniceWriter(storeName); - PushStatusKey pushStatusKey = PushStatusStoreUtils.getHeartbeatKey(instanceName); - PushStatusValue pushStatusValue = new PushStatusValue(); - pushStatusValue.reportTimestamp = heartbeat; - pushStatusValue.instances = Collections.emptyMap(); - LOGGER.info("Sending heartbeat of {}", instanceName); - writer.put( - pushStatusKey, - pushStatusValue, - AvroProtocolDefinition.PUSH_STATUS_SYSTEM_SCHEMA_STORE.getCurrentProtocolVersion()); - + this.updateSchema = updateSchema; } public void writeHeartbeat(String storeName) { writeHeartbeat(storeName, System.currentTimeMillis()); } - public void writePushStatus(String storeName, int version, int partitionId, ExecutionStatus status) { - writePushStatus(storeName, version, partitionId, status, Optional.empty()); + public void 
writeHeartbeat(String storeName, long heartbeat) { + VeniceWriter writer = veniceWriterCache.prepareVeniceWriter(storeName); + PushStatusKey pushStatusKey = PushStatusStoreUtils.getHeartbeatKey(instanceName); + UpdateBuilder updateBuilder = new UpdateBuilderImpl(updateSchema); + updateBuilder.setNewFieldValue("reportTimestamp", heartbeat); + LOGGER.info("Sending heartbeat of {}", instanceName); + writer.update(pushStatusKey, updateBuilder.build(), valueSchemaId, derivedSchemaId, null); } public void writePushStatus( @@ -84,12 +94,8 @@ public void writePushStatus( VeniceWriter writer = veniceWriterCache.prepareVeniceWriter(storeName); PushStatusKey pushStatusKey = PushStatusStoreUtils.getPushKey(version, partitionId, incrementalPushVersion, incrementalPushPrefix); - PushStatusValueWriteOpRecord writeComputeRecord = new PushStatusValueWriteOpRecord(); - instancesMapOps instances = new instancesMapOps(); - instances.mapUnion = Collections.singletonMap(instanceName, status.getValue()); - instances.mapDiff = Collections.emptyList(); - writeComputeRecord.instances = instances; - writeComputeRecord.reportTimestamp = new NoOp(); + UpdateBuilder updateBuilder = new UpdateBuilderImpl(updateSchema); + updateBuilder.setEntriesToAddToMapField("instances", Collections.singletonMap(instanceName, status.getValue())); LOGGER.info( "Updating pushStatus of {} to {}. Store: {}, version: {}, partition: {}", instanceName, @@ -97,12 +103,7 @@ public void writePushStatus( storeName, version, partitionId); - writer.update( - pushStatusKey, - writeComputeRecord, - AvroProtocolDefinition.PUSH_STATUS_SYSTEM_SCHEMA_STORE.getCurrentProtocolVersion(), - derivedSchemaId, - null); + writer.update(pushStatusKey, updateBuilder.build(), valueSchemaId, derivedSchemaId, null); // If this is a server side SOIP status update then add this incremental // push to the ongoing incremental pushes in push status store. 
@@ -119,24 +120,16 @@ public void addToSupposedlyOngoingIncrementalPushVersions( String incrementalPushVersion, ExecutionStatus status) { PushStatusKey pushStatusKey = PushStatusStoreUtils.getOngoingIncrementalPushStatusesKey(storeVersion); - PushStatusValueWriteOpRecord writeComputeRecord = new PushStatusValueWriteOpRecord(); - instancesMapOps incrementalPushes = new instancesMapOps(); - incrementalPushes.mapUnion = Collections.singletonMap(incrementalPushVersion, status.getValue()); - incrementalPushes.mapDiff = Collections.emptyList(); - writeComputeRecord.instances = incrementalPushes; - writeComputeRecord.reportTimestamp = new NoOp(); + UpdateBuilder updateBuilder = new UpdateBuilderImpl(updateSchema); + updateBuilder + .setEntriesToAddToMapField("instances", Collections.singletonMap(incrementalPushVersion, status.getValue())); LOGGER.info( "Adding incremental push version: {} to ongoingIncrementalPushes of store: {} from instance: {}", incrementalPushVersion, storeName, instanceName); veniceWriterCache.prepareVeniceWriter(storeName) - .update( - pushStatusKey, - writeComputeRecord, - AvroProtocolDefinition.PUSH_STATUS_SYSTEM_SCHEMA_STORE.getCurrentProtocolVersion(), - derivedSchemaId, - null); + .update(pushStatusKey, updateBuilder.build(), valueSchemaId, derivedSchemaId, null); } public void removeFromSupposedlyOngoingIncrementalPushVersions( @@ -144,24 +137,59 @@ public void removeFromSupposedlyOngoingIncrementalPushVersions( int storeVersion, String incrementalPushVersion) { PushStatusKey pushStatusKey = PushStatusStoreUtils.getOngoingIncrementalPushStatusesKey(storeVersion); - PushStatusValueWriteOpRecord writeComputeRecord = new PushStatusValueWriteOpRecord(); - instancesMapOps incrementalPushes = new instancesMapOps(); - incrementalPushes.mapUnion = Collections.emptyMap(); - incrementalPushes.mapDiff = Collections.singletonList(incrementalPushVersion); - writeComputeRecord.instances = incrementalPushes; - writeComputeRecord.reportTimestamp = new NoOp(); 
+ UpdateBuilder updateBuilder = new UpdateBuilderImpl(updateSchema); + updateBuilder.setKeysToRemoveFromMapField("instances", Collections.singletonList(incrementalPushVersion)); LOGGER.info( "Removing incremental push version: {} from ongoingIncrementalPushes of store: {} from instance: {}", incrementalPushVersion, storeName, instanceName); veniceWriterCache.prepareVeniceWriter(storeName) - .update( - pushStatusKey, - writeComputeRecord, - AvroProtocolDefinition.PUSH_STATUS_SYSTEM_SCHEMA_STORE.getCurrentProtocolVersion(), - derivedSchemaId, - null); + .update(pushStatusKey, updateBuilder.build(), valueSchemaId, derivedSchemaId, null); + } + + public void deletePushStatus( + String storeName, + int version, + Optional incrementalPushVersion, + int partitionCount) { + VeniceWriter writer = veniceWriterCache.prepareVeniceWriter(storeName); + LOGGER.info("Deleting pushStatus of storeName: {}, version: {}", storeName, version); + for (int partitionId = 0; partitionId < partitionCount; partitionId++) { + PushStatusKey pushStatusKey = PushStatusStoreUtils.getPushKey(version, partitionId, incrementalPushVersion); + writer.delete(pushStatusKey, null); + } + } + + /** + * N.B.: Currently used by tests only. + * @return + */ + public Future deletePartitionIncrementalPushStatus( + String storeName, + int version, + String incrementalPushVersion, + int partitionId) { + PushStatusKey pushStatusKey = PushStatusStoreUtils.getPushKey( + version, + partitionId, + Optional.ofNullable(incrementalPushVersion), + Optional.of(PushStatusStoreUtils.SERVER_INCREMENTAL_PUSH_PREFIX)); + LOGGER.info( + "Deleting incremental push status belonging to a partition:{}. 
pushStatusKey:{}", + partitionId, + pushStatusKey); + return veniceWriterCache.prepareVeniceWriter(storeName).delete(pushStatusKey, null); + } + + public void removePushStatusStoreVeniceWriter(String storeName) { + LOGGER.info("Removing push status store writer for store {}", storeName); + long veniceWriterRemovingStartTimeInNs = System.nanoTime(); + veniceWriterCache.removeVeniceWriter(storeName); + LOGGER.info( + "Removed push status store writer for store {} in {}ms.", + storeName, + LatencyUtils.getLatencyInMS(veniceWriterRemovingStartTimeInNs)); } @Override diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/writer/update/UpdateBuilderImpl.java b/internal/venice-common/src/main/java/com/linkedin/venice/writer/update/UpdateBuilderImpl.java index 7bc113cdc7..cfb44a69b0 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/writer/update/UpdateBuilderImpl.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/writer/update/UpdateBuilderImpl.java @@ -200,7 +200,8 @@ private void validateFieldType(String fieldName, Schema.Type expectedType) { } Schema.Type valueFieldType = getCorrespondingValueFieldType(updateField); if (valueFieldType != expectedType) { - throw new IllegalStateException(); + throw new IllegalStateException( + "Field: " + fieldName + " expect type: " + expectedType + ", got type: " + valueFieldType); } } diff --git a/internal/venice-common/src/main/resources/avro/PushStatusValueWriteOpRecord/v1/PushStatusValueWriteOpRecord.avsc b/internal/venice-common/src/main/resources/avro/PushStatusValueWriteOpRecord/v1/PushStatusValueWriteOpRecord.avsc deleted file mode 100644 index 2e1a90164d..0000000000 --- a/internal/venice-common/src/main/resources/avro/PushStatusValueWriteOpRecord/v1/PushStatusValueWriteOpRecord.avsc +++ /dev/null @@ -1,54 +0,0 @@ -{ - "type": "record", - "name": "PushStatusValueWriteOpRecord", - "namespace": "com.linkedin.venice.pushstatus", - "fields": [ - { - "name": "instances", - "type": 
[ - { - "type": "record", - "name": "NoOp", - "fields": [] - }, - { - "type": "record", - "name": "instancesMapOps", - "fields": [ - { - "name": "mapUnion", - "type": { - "type": "map", - "values": "int" - }, - "default": {} - }, - { - "name": "mapDiff", - "type": { - "type": "array", - "items": "string" - }, - "default": [] - } - ] - }, - { - "type": "map", - "values": "int" - } - ], - "default": {} - }, - { - "name": "reportTimestamp", - "type": [ - "NoOp", - "null", - "long" - ], - "doc": "heartbeat.", - "default": {} - } - ] -} \ No newline at end of file diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/pushstatushelper/PushStatusStoreWriterTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/pushstatushelper/PushStatusStoreWriterTest.java index 3160ebdd7e..1a569166d5 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/pushstatushelper/PushStatusStoreWriterTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/pushstatushelper/PushStatusStoreWriterTest.java @@ -10,14 +10,16 @@ import static org.mockito.Mockito.when; import com.linkedin.venice.common.PushStatusStoreUtils; -import com.linkedin.venice.pushstatus.NoOp; import com.linkedin.venice.pushstatus.PushStatusKey; -import com.linkedin.venice.pushstatus.PushStatusValueWriteOpRecord; -import com.linkedin.venice.pushstatus.instancesMapOps; +import com.linkedin.venice.schema.writecompute.WriteComputeSchemaConverter; import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; import com.linkedin.venice.writer.VeniceWriter; +import com.linkedin.venice.writer.update.UpdateBuilder; +import com.linkedin.venice.writer.update.UpdateBuilderImpl; import java.util.Collections; import java.util.Optional; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -26,30 +28,46 @@ public class PushStatusStoreWriterTest { private 
PushStatusStoreVeniceWriterCache veniceWriterCacheMock; private VeniceWriter veniceWriterMock; private PushStatusStoreWriter pushStatusStoreWriter; - private final static String storeName = "venice-test-push-status-store"; - private final static int storeVersion = 42; - private final static String incPushVersion = "inc_push_test_version_1"; - private final static int derivedSchemaId = 42; - private final static int protoVersion = + private static final String instanceName = "instanceX"; + private static final String storeName = "venice-test-push-status-store"; + private static final int storeVersion = 42; + private static final String incPushVersion = "inc_push_test_version_1"; + private static final int derivedSchemaId = 1; + private static final int valueSchemaId = AvroProtocolDefinition.PUSH_STATUS_SYSTEM_SCHEMA_STORE.getCurrentProtocolVersion(); + private static final Schema valueSchema = + AvroProtocolDefinition.PUSH_STATUS_SYSTEM_SCHEMA_STORE.getCurrentProtocolVersionSchema(); + private static final Schema updateSchema = + WriteComputeSchemaConverter.getInstance().convertFromValueRecordSchema(valueSchema); @BeforeMethod public void setUp() { veniceWriterCacheMock = mock(PushStatusStoreVeniceWriterCache.class); veniceWriterMock = mock(VeniceWriter.class); - pushStatusStoreWriter = new PushStatusStoreWriter(veniceWriterCacheMock, "instanceX", derivedSchemaId); - + pushStatusStoreWriter = + new PushStatusStoreWriter(veniceWriterCacheMock, instanceName, valueSchemaId, derivedSchemaId, updateSchema); when(veniceWriterCacheMock.prepareVeniceWriter(storeName)).thenReturn(veniceWriterMock); } - private PushStatusValueWriteOpRecord getWriteComputeRecord() { - PushStatusValueWriteOpRecord writeOpRecord = new PushStatusValueWriteOpRecord(); - instancesMapOps instances = new instancesMapOps(); - instances.mapDiff = Collections.emptyList(); - instances.mapUnion = Collections.singletonMap(incPushVersion, START_OF_INCREMENTAL_PUSH_RECEIVED.getValue()); - 
writeOpRecord.instances = instances; - writeOpRecord.reportTimestamp = new NoOp(); - return writeOpRecord; + private GenericRecord getAddIncrementalPushUpdateRecord() { + UpdateBuilder updateBuilder = new UpdateBuilderImpl(updateSchema); + updateBuilder.setEntriesToAddToMapField( + "instances", + Collections.singletonMap(incPushVersion, START_OF_INCREMENTAL_PUSH_RECEIVED.getValue())); + return updateBuilder.build(); + } + + private GenericRecord getRemoveIncrementalPushUpdateRecord() { + UpdateBuilder updateBuilder = new UpdateBuilderImpl(updateSchema); + updateBuilder.setKeysToRemoveFromMapField("instances", Collections.singletonList(incPushVersion)); + return updateBuilder.build(); + } + + private GenericRecord getHeartbeatRecord(long heartbeatTimestamp) { + UpdateBuilder updateBuilder = new UpdateBuilderImpl(updateSchema); + updateBuilder.setNewFieldValue("reportTimestamp", heartbeatTimestamp); + return updateBuilder.build(); + } @Test(description = "Expect an update call for adding current inc-push to ongoing-inc-pushes when status is SOIP") @@ -66,10 +84,14 @@ public void testWritePushStatusWhenStatusIsSOIP() { Optional.of(incPushVersion), Optional.of(SERVER_INCREMENTAL_PUSH_PREFIX)); - verify(veniceWriterMock).update(eq(serverPushStatusKey), any(), eq(protoVersion), eq(derivedSchemaId), eq(null)); + verify(veniceWriterMock).update(eq(serverPushStatusKey), any(), eq(valueSchemaId), eq(derivedSchemaId), eq(null)); verify(veniceWriterCacheMock, times(2)).prepareVeniceWriter(storeName); - verify(veniceWriterMock) - .update(eq(ongoPushStatusKey), eq(getWriteComputeRecord()), eq(protoVersion), eq(derivedSchemaId), eq(null)); + verify(veniceWriterMock).update( + eq(ongoPushStatusKey), + eq(getAddIncrementalPushUpdateRecord()), + eq(valueSchemaId), + eq(derivedSchemaId), + eq(null)); } @Test @@ -81,22 +103,45 @@ public void testAddToSupposedlyOngoingIncrementalPushVersions() { incPushVersion, START_OF_INCREMENTAL_PUSH_RECEIVED); 
verify(veniceWriterCacheMock).prepareVeniceWriter(storeName); - verify(veniceWriterMock) - .update(eq(statusKey), eq(getWriteComputeRecord()), eq(protoVersion), eq(derivedSchemaId), eq(null)); + verify(veniceWriterMock).update( + eq(statusKey), + eq(getAddIncrementalPushUpdateRecord()), + eq(valueSchemaId), + eq(derivedSchemaId), + eq(null)); } @Test public void testRemoveFromOngoingIncrementalPushVersions() { PushStatusKey statusKey = PushStatusStoreUtils.getOngoingIncrementalPushStatusesKey(storeVersion); - PushStatusValueWriteOpRecord writeOpRecord = new PushStatusValueWriteOpRecord(); - instancesMapOps instances = new instancesMapOps(); - instances.mapDiff = Collections.singletonList(incPushVersion); - instances.mapUnion = Collections.emptyMap(); - writeOpRecord.instances = instances; - writeOpRecord.reportTimestamp = new NoOp(); - pushStatusStoreWriter.removeFromSupposedlyOngoingIncrementalPushVersions(storeName, storeVersion, incPushVersion); verify(veniceWriterCacheMock).prepareVeniceWriter(storeName); - verify(veniceWriterMock).update(eq(statusKey), eq(writeOpRecord), eq(protoVersion), eq(derivedSchemaId), eq(null)); + verify(veniceWriterMock).update( + eq(statusKey), + eq(getRemoveIncrementalPushUpdateRecord()), + eq(valueSchemaId), + eq(derivedSchemaId), + eq(null)); + } + + @Test + public void testDeletePushStatus() { + int partitionCount = 4; + pushStatusStoreWriter.deletePushStatus(storeName, storeVersion, Optional.empty(), partitionCount); + verify(veniceWriterCacheMock).prepareVeniceWriter(storeName); + for (int i = 0; i < partitionCount; i++) { + PushStatusKey statusKey = PushStatusStoreUtils.getPushKey(storeVersion, 0, Optional.empty()); + verify(veniceWriterMock).delete(eq(statusKey), eq(null)); + } + } + + @Test + public void testWriteHeartbeat() { + long heartbeat = 12345L; + PushStatusKey statusKey = PushStatusStoreUtils.getHeartbeatKey(instanceName); + pushStatusStoreWriter.writeHeartbeat(storeName, heartbeat); + 
verify(veniceWriterCacheMock).prepareVeniceWriter(storeName); + verify(veniceWriterMock) + .update(eq(statusKey), eq(getHeartbeatRecord(heartbeat)), eq(valueSchemaId), eq(derivedSchemaId), eq(null)); } } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PushStatusStoreTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PushStatusStoreTest.java index fab2603934..aca9f9b83e 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PushStatusStoreTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PushStatusStoreTest.java @@ -42,7 +42,11 @@ import com.linkedin.venice.pubsub.api.PubSubTopic; import com.linkedin.venice.pushmonitor.ExecutionStatus; import com.linkedin.venice.pushstatushelper.PushStatusStoreReader; -import com.linkedin.venice.pushstatushelper.PushStatusStoreRecordDeleter; +import com.linkedin.venice.pushstatushelper.PushStatusStoreWriter; +import com.linkedin.venice.schema.SchemaEntry; +import com.linkedin.venice.schema.writecompute.DerivedSchemaEntry; +import com.linkedin.venice.schema.writecompute.WriteComputeSchemaConverter; +import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; import com.linkedin.venice.utils.DataProviderUtils; import com.linkedin.venice.utils.PropertyBuilder; import com.linkedin.venice.utils.TestUtils; @@ -54,6 +58,7 @@ import java.util.Optional; import java.util.Properties; import java.util.concurrent.TimeUnit; +import org.apache.avro.Schema; import org.apache.avro.util.Utf8; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -264,12 +269,20 @@ public void testIncrementalPushStatusReadingFromPushStatusStoreInController() th response = controllerClient.queryJobStatus(job.getTopicToMonitor(), job.getIncrementalPushVersion()); assertEquals(response.getStatus(), 
ExecutionStatus.END_OF_INCREMENTAL_PUSH_RECEIVED.name()); - PushStatusStoreRecordDeleter statusStoreDeleter = new PushStatusStoreRecordDeleter( - cluster.getLeaderVeniceController().getVeniceHelixAdmin().getVeniceWriterFactory()); + int valueSchemaId = AvroProtocolDefinition.PUSH_STATUS_SYSTEM_SCHEMA_STORE.getCurrentProtocolVersion(); + Schema valueSchema = AvroProtocolDefinition.PUSH_STATUS_SYSTEM_SCHEMA_STORE.getCurrentProtocolVersionSchema(); + Schema updateSchema = WriteComputeSchemaConverter.getInstance().convertFromValueRecordSchema(valueSchema); + SchemaEntry valueSchemaEntry = new SchemaEntry(valueSchemaId, valueSchema); + DerivedSchemaEntry updateSchemaEntry = new DerivedSchemaEntry(valueSchemaId, 1, updateSchema); + PushStatusStoreWriter statusStoreWriter = new PushStatusStoreWriter( + cluster.getLeaderVeniceController().getVeniceHelixAdmin().getVeniceWriterFactory(), + "dummyInstance", + valueSchemaEntry, + updateSchemaEntry); // After deleting the inc push status belonging to just one partition we should expect // SOIP from the controller since other partition has replicas with EOIP status - statusStoreDeleter.deletePartitionIncrementalPushStatus(storeName, 1, incPushVersion.get(), 1).get(); + statusStoreWriter.deletePartitionIncrementalPushStatus(storeName, 1, incPushVersion.get(), 1).get(); TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, true, () -> { // N.B.: Even though we block on the deleter's future, that only means the delete message is persisted into // Kafka, but querying the system store may still yield a stale result, hence the need for retrying. 
@@ -279,7 +292,7 @@ public void testIncrementalPushStatusReadingFromPushStatusStoreInController() th }); // expect NOT_CREATED when statuses of all partitions are not available in the push status store - statusStoreDeleter.deletePartitionIncrementalPushStatus(storeName, 1, incPushVersion.get(), 0).get(); + statusStoreWriter.deletePartitionIncrementalPushStatus(storeName, 1, incPushVersion.get(), 0).get(); TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, true, () -> { JobStatusQueryResponse jobStatusQueryResponse = controllerClient.queryJobStatus(job.getTopicToMonitor(), job.getIncrementalPushVersion()); diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/Admin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/Admin.java index 0d1b31338e..d99aaeb25d 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/Admin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/Admin.java @@ -28,7 +28,6 @@ import com.linkedin.venice.pubsub.PubSubConsumerAdapterFactory; import com.linkedin.venice.pushmonitor.ExecutionStatus; import com.linkedin.venice.pushstatushelper.PushStatusStoreReader; -import com.linkedin.venice.pushstatushelper.PushStatusStoreRecordDeleter; import com.linkedin.venice.pushstatushelper.PushStatusStoreWriter; import com.linkedin.venice.schema.GeneratedSchemaID; import com.linkedin.venice.schema.SchemaEntry; @@ -749,11 +748,6 @@ void updateRoutersClusterConfig( MetaStoreReader getMetaStoreReader(); - /** - * Return {@link PushStatusStoreRecordDeleter}. - */ - Optional getPushStatusStoreRecordDeleter(); - /** Get a list of clusters this controller is a leader of. * @return a list of clusters this controller is a leader of. 
*/ @@ -948,9 +942,9 @@ default void startInstanceMonitor(String clusterName) { default void clearInstanceMonitor(String clusterName) { } - Optional getPushStatusStoreReader(); + PushStatusStoreReader getPushStatusStoreReader(); - Optional getPushStatusStoreWriter(); + PushStatusStoreWriter getPushStatusStoreWriter(); /** * Send a heartbeat timestamp to targeted system store. diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/HelixVeniceClusterResources.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/HelixVeniceClusterResources.java index bda7689993..9724523826 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/HelixVeniceClusterResources.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/HelixVeniceClusterResources.java @@ -166,7 +166,7 @@ public HelixVeniceClusterResources( getActiveActiveRealTimeSourceKafkaURLs(config), helixAdminClient, config, - admin.getPushStatusStoreReader().orElse(null), + admin.getPushStatusStoreReader(), metricsRepository); this.leakedPushStatusCleanUpService = new LeakedPushStatusCleanUpService( diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/UserSystemStoreLifeCycleHelper.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/UserSystemStoreLifeCycleHelper.java index 60f6f5563a..1b43fb97a6 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/UserSystemStoreLifeCycleHelper.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/UserSystemStoreLifeCycleHelper.java @@ -11,7 +11,6 @@ import com.linkedin.venice.meta.Store; import com.linkedin.venice.meta.Version; import com.linkedin.venice.pushmonitor.PushMonitorDelegator; -import com.linkedin.venice.pushstatushelper.PushStatusStoreRecordDeleter; import com.linkedin.venice.system.store.MetaStoreWriter; import java.util.ArrayList; import 
java.util.Arrays; @@ -129,7 +128,6 @@ public static void deleteSystemStore( String systemStoreName, boolean isStoreMigrating, MetaStoreWriter metaStoreWriter, - Optional pushStatusStoreRecordDeleter, Logger LOGGER) { LOGGER.info("Start deleting system store: {}", systemStoreName); admin.deleteAllVersionsInStore(clusterName, systemStoreName); @@ -141,9 +139,8 @@ public static void deleteSystemStore( metaStoreWriter.removeMetaStoreWriter(systemStoreName); break; case DAVINCI_PUSH_STATUS_STORE: - pushStatusStoreRecordDeleter.ifPresent( - deleter -> deleter.removePushStatusStoreVeniceWriter( - DAVINCI_PUSH_STATUS_STORE.extractRegularStoreName(systemStoreName))); + admin.getPushStatusStoreWriter() + .removePushStatusStoreVeniceWriter(DAVINCI_PUSH_STATUS_STORE.extractRegularStoreName(systemStoreName)); break; case BATCH_JOB_HEARTBEAT_STORE: // TODO: do we need to do any clean up here? HEARTBEAT_STORE is not coupled with any specific user store. @@ -172,7 +169,6 @@ public static void maybeDeleteSystemStoresForUserStore( String clusterName, Store userStore, MetaStoreWriter metaStoreWriter, - Optional pushStatusStoreRecordDeleter, Logger LOGGER) { if (userStore.isDaVinciPushStatusStoreEnabled()) { deleteSystemStore( @@ -183,7 +179,6 @@ public static void maybeDeleteSystemStoresForUserStore( DAVINCI_PUSH_STATUS_STORE.getSystemStoreName(userStore.getName()), userStore.isMigrating(), metaStoreWriter, - pushStatusStoreRecordDeleter, LOGGER); } // We must delete meta system store at the end as deleting other system store will try to send update to meta system @@ -197,7 +192,6 @@ public static void maybeDeleteSystemStoresForUserStore( VeniceSystemStoreType.META_STORE.getSystemStoreName(userStore.getName()), userStore.isMigrating(), metaStoreWriter, - pushStatusStoreRecordDeleter, LOGGER); } } diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerConfig.java 
b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerConfig.java index 5361cb984a..8207469a54 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerConfig.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerConfig.java @@ -475,7 +475,7 @@ public VeniceControllerConfig(VeniceProperties props) { this.daVinciPushStatusScanMaxOfflineInstance = props.getInt(DAVINCI_PUSH_STATUS_SCAN_MAX_OFFLINE_INSTANCE, 10); this.zkSharedDaVinciPushStatusSystemSchemaStoreAutoCreationEnabled = - props.getBoolean(CONTROLLER_ZK_SHARED_DAVINCI_PUSH_STATUS_SYSTEM_SCHEMA_STORE_AUTO_CREATION_ENABLED, false); + props.getBoolean(CONTROLLER_ZK_SHARED_DAVINCI_PUSH_STATUS_SYSTEM_SCHEMA_STORE_AUTO_CREATION_ENABLED, true); this.systemStoreAclSynchronizationDelayMs = props.getLong(CONTROLLER_SYSTEM_STORE_ACL_SYNCHRONIZATION_DELAY_MS, TimeUnit.HOURS.toMillis(1)); this.regionName = RegionUtils.getLocalRegionName(props, parent); diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java index afe72bf595..076c324302 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java +++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java @@ -5,7 +5,6 @@ import static com.linkedin.venice.ConfigKeys.KAFKA_MIN_IN_SYNC_REPLICAS; import static com.linkedin.venice.ConfigKeys.KAFKA_OVER_SSL; import static com.linkedin.venice.ConfigKeys.KAFKA_REPLICATION_FACTOR; -import static com.linkedin.venice.ConfigKeys.PUSH_STATUS_STORE_DERIVED_SCHEMA_ID; import static com.linkedin.venice.ConfigKeys.SSL_KAFKA_BOOTSTRAP_SERVERS; import static com.linkedin.venice.ConfigKeys.SSL_TO_KAFKA_LEGACY; import static 
com.linkedin.venice.controller.UserSystemStoreLifeCycleHelper.AUTO_META_SYSTEM_STORE_PUSH_ID_PREFIX; @@ -165,7 +164,6 @@ import com.linkedin.venice.pushmonitor.PushStatusDecider; import com.linkedin.venice.pushmonitor.StatusSnapshot; import com.linkedin.venice.pushstatushelper.PushStatusStoreReader; -import com.linkedin.venice.pushstatushelper.PushStatusStoreRecordDeleter; import com.linkedin.venice.pushstatushelper.PushStatusStoreWriter; import com.linkedin.venice.schema.AvroSchemaParseUtils; import com.linkedin.venice.schema.GeneratedSchemaID; @@ -207,6 +205,7 @@ import com.linkedin.venice.utils.VeniceProperties; import com.linkedin.venice.utils.concurrent.ConcurrencyUtils; import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; +import com.linkedin.venice.utils.lazy.Lazy; import com.linkedin.venice.utils.locks.AutoCloseableLock; import com.linkedin.venice.views.VeniceView; import com.linkedin.venice.views.ViewUtils; @@ -350,9 +349,8 @@ public class VeniceHelixAdmin implements Admin, StoreCleaner { private final String coloLeaderClusterName; private final Optional sslFactory; private final String pushJobStatusStoreClusterName; - private final Optional pushStatusStoreReader; - private final Optional pushStatusStoreWriter; - private final Optional pushStatusStoreDeleter; + private final PushStatusStoreReader pushStatusStoreReader; + private final Lazy pushStatusStoreWriter; private final SharedHelixReadOnlyZKSharedSystemStoreRepository zkSharedSystemStoreRepository; private final SharedHelixReadOnlyZKSharedSchemaRepository zkSharedSchemaRepository; private final MetaStoreWriter metaStoreWriter; @@ -527,24 +525,6 @@ public VeniceHelixAdmin( isControllerClusterHAAS = commonConfig.isControllerClusterLeaderHAAS(); coloLeaderClusterName = commonConfig.getClusterName(); pushJobStatusStoreClusterName = commonConfig.getPushJobStatusStoreClusterName(); - // TODO: We need to consider removing this config, as push status store is rolled out everywhere. 
- if (commonConfig.isDaVinciPushStatusStoreEnabled()) { - pushStatusStoreReader = Optional.of( - new PushStatusStoreReader( - d2Client, - commonConfig.getClusterDiscoveryD2ServiceName(), - commonConfig.getPushStatusStoreHeartbeatExpirationTimeInSeconds())); - pushStatusStoreWriter = Optional.of( - new PushStatusStoreWriter( - veniceWriterFactory, - controllerName, - commonConfig.getProps().getInt(PUSH_STATUS_STORE_DERIVED_SCHEMA_ID, 1))); - pushStatusStoreDeleter = Optional.of(new PushStatusStoreRecordDeleter(veniceWriterFactory)); - } else { - pushStatusStoreReader = Optional.empty(); - pushStatusStoreWriter = Optional.empty(); - pushStatusStoreDeleter = Optional.empty(); - } usePushStatusStoreToReadServerIncrementalPushStatus = commonConfig.usePushStatusStoreForIncrementalPush(); zkSharedSystemStoreRepository = new SharedHelixReadOnlyZKSharedSystemStoreRepository( @@ -566,6 +546,17 @@ public VeniceHelixAdmin( commonConfig.getMetaStoreWriterCloseTimeoutInMS(), commonConfig.getMetaStoreWriterCloseConcurrency()); metaStoreReader = new MetaStoreReader(d2Client, commonConfig.getClusterDiscoveryD2ServiceName()); + pushStatusStoreReader = new PushStatusStoreReader( + d2Client, + commonConfig.getClusterDiscoveryD2ServiceName(), + commonConfig.getPushStatusStoreHeartbeatExpirationTimeInSeconds()); + pushStatusStoreWriter = Lazy.of(() -> { + String pushStatusStoreName = VeniceSystemStoreType.DAVINCI_PUSH_STATUS_STORE.getZkSharedStoreName(); + SchemaEntry valueSchemaEntry = zkSharedSchemaRepository.getSupersetOrLatestValueSchema(pushStatusStoreName); + DerivedSchemaEntry updateSchemaEntry = + zkSharedSchemaRepository.getLatestDerivedSchema(pushStatusStoreName, valueSchemaEntry.getId()); + return new PushStatusStoreWriter(veniceWriterFactory, controllerName, valueSchemaEntry, updateSchemaEntry); + }); clusterToLiveClusterConfigRepo = new VeniceConcurrentHashMap<>(); dataRecoveryManager = new DataRecoveryManager( @@ -1043,7 +1034,6 @@ private void deleteStore( clusterName, 
store, metaStoreWriter, - pushStatusStoreDeleter, LOGGER); if (isForcedDelete) { @@ -3165,13 +3155,12 @@ private void deleteOneStoreVersion(String clusterName, String storeName, int ver } cleanUpViewResources(new Properties(), store, deletedVersion.get().getNumber()); } - if (store.isDaVinciPushStatusStoreEnabled() && pushStatusStoreDeleter.isPresent()) { - pushStatusStoreDeleter.get() - .deletePushStatus( - storeName, - deletedVersion.get().getNumber(), - Optional.empty(), - deletedVersion.get().getPartitionCount()); + if (store.isDaVinciPushStatusStoreEnabled()) { + getPushStatusStoreWriter().deletePushStatus( + storeName, + deletedVersion.get().getNumber(), + Optional.empty(), + deletedVersion.get().getPartitionCount()); } } PubSubTopic rtTopic = pubSubTopicRepository.getTopic(Version.composeRealTimeTopic(storeName)); @@ -5501,12 +5490,11 @@ public OfflinePushStatusInfo getOffLinePushStatus( // if status is not SOIP remove incremental push version from the supposedlyOngoingIncrementalPushVersions if (incrementalPushVersion.isPresent() && (status == ExecutionStatus.END_OF_INCREMENTAL_PUSH_RECEIVED || status == ExecutionStatus.NOT_CREATED) - && usePushStatusStoreToReadServerIncrementalPushStatus && pushStatusStoreWriter.isPresent()) { - pushStatusStoreWriter.get() - .removeFromSupposedlyOngoingIncrementalPushVersions( - store.getName(), - versionNumber, - incrementalPushVersion.get()); + && usePushStatusStoreToReadServerIncrementalPushStatus) { + getPushStatusStoreWriter().removeFromSupposedlyOngoingIncrementalPushVersions( + store.getName(), + versionNumber, + incrementalPushVersion.get()); } return list.get(0); } @@ -5521,14 +5509,11 @@ private ExecutionStatusWithDetails getIncrementalPushStatus( if (!usePushStatusStoreToReadServerIncrementalPushStatus) { return monitor.getIncrementalPushStatusAndDetails(kafkaTopic, incrementalPushVersion, cvRepo); } - if (!pushStatusStoreReader.isPresent()) { - throw new VeniceException("Cannot read server incremental push 
status from the status store."); - } return monitor.getIncrementalPushStatusFromPushStatusStore( kafkaTopic, incrementalPushVersion, cvRepo, - pushStatusStoreReader.get()); + getPushStatusStoreReader()); } private OfflinePushStatusInfo getOfflinePushStatusInfo( @@ -5584,7 +5569,7 @@ private OfflinePushStatusInfo getOfflinePushStatusInfo( if (store.getVersion(versionNumber).isPresent()) { Version version = store.getVersion(versionNumber).get(); ExecutionStatusWithDetails daVinciStatusAndDetails = PushMonitorUtils.getDaVinciPushStatusAndDetails( - pushStatusStoreReader.orElse(null), + getPushStatusStoreReader(), version.kafkaTopicName(), version.getPartitionCount(), incrementalPushVersion, @@ -6640,6 +6625,9 @@ Store checkPreConditionForUpdateStoreMetadata(String clusterName, String storeNa */ @Override public void close() { + Utils.closeQuietlyWithErrorLogged(getPushStatusStoreReader()); + pushStatusStoreWriter.ifPresent(PushStatusStoreWriter::close); + helixManager.disconnect(); Utils.closeQuietlyWithErrorLogged(zkSharedSystemStoreRepository); Utils.closeQuietlyWithErrorLogged(zkSharedSchemaRepository); @@ -6650,9 +6638,6 @@ public void close() { participantMessageWriterMap.clear(); dataRecoveryManager.close(); Utils.closeQuietlyWithErrorLogged(topicManagerRepository); - pushStatusStoreReader.ifPresent(PushStatusStoreReader::close); - pushStatusStoreWriter.ifPresent(PushStatusStoreWriter::close); - pushStatusStoreDeleter.ifPresent(PushStatusStoreRecordDeleter::close); Utils.closeQuietlyWithErrorLogged(pushJobDetailsStoreClient); Utils.closeQuietlyWithErrorLogged(livenessHeartbeatStoreClient); clusterControllerClientPerColoMap.forEach( @@ -7376,14 +7361,6 @@ public MetaStoreReader getMetaStoreReader() { return metaStoreReader; } - /** - * @see Admin#getPushStatusStoreRecordDeleter() - */ - @Override - public Optional getPushStatusStoreRecordDeleter() { - return pushStatusStoreDeleter; - } - /** * @see Admin#getEmergencySourceRegion(String) */ @@ -7889,13 +7866,13 @@ 
public void removeStoreFromGraveyard(String clusterName, String storeName) { } @Override - public Optional<PushStatusStoreReader> getPushStatusStoreReader() { + public PushStatusStoreReader getPushStatusStoreReader() { return pushStatusStoreReader; } @Override - public Optional<PushStatusStoreWriter> getPushStatusStoreWriter() { - return pushStatusStoreWriter; + public PushStatusStoreWriter getPushStatusStoreWriter() { + return pushStatusStoreWriter.get(); } @Override @@ -7904,8 +7881,7 @@ public void sendHeartbeatToSystemStore(String clusterName, String storeName, lon String userStoreName = systemStoreType.extractRegularStoreName(storeName); long currentTimestamp = System.currentTimeMillis(); if (VeniceSystemStoreType.DAVINCI_PUSH_STATUS_STORE.equals(systemStoreType)) { - // Push status store is fully rolled out in controller. It will be cleaned up to become non-optional argument. - getPushStatusStoreWriter().get().writeHeartbeat(userStoreName, currentTimestamp); + getPushStatusStoreWriter().writeHeartbeat(userStoreName, currentTimestamp); } else { getMetaStoreWriter().writeHeartbeat(userStoreName, currentTimestamp); } @@ -7919,7 +7895,7 @@ public long getHeartbeatFromSystemStore(String clusterName, String systemStoreNa return RetryUtils.executeWithMaxRetriesAndFixedAttemptDuration(() -> { long retrievedTimestamp; if (systemStoreType == VeniceSystemStoreType.DAVINCI_PUSH_STATUS_STORE) { - retrievedTimestamp = getPushStatusStoreReader().get() + retrievedTimestamp = getPushStatusStoreReader() .getHeartbeat(userStoreName, PushStatusStoreUtils.CONTROLLER_HEARTBEAT_INSTANCE_NAME); } else { retrievedTimestamp = getMetaStoreReader().getHeartbeat(userStoreName); diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java index 32549b42e3..fdc43e6e77 100644 --- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java +++ 
b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java @@ -188,7 +188,6 @@ import com.linkedin.venice.pubsub.api.PubSubTopic; import com.linkedin.venice.pushmonitor.ExecutionStatus; import com.linkedin.venice.pushstatushelper.PushStatusStoreReader; -import com.linkedin.venice.pushstatushelper.PushStatusStoreRecordDeleter; import com.linkedin.venice.pushstatushelper.PushStatusStoreWriter; import com.linkedin.venice.schema.AvroSchemaParseUtils; import com.linkedin.venice.schema.GeneratedSchemaID; @@ -4757,14 +4756,6 @@ public MetaStoreReader getMetaStoreReader() { return getVeniceHelixAdmin().getMetaStoreReader(); } - /** - * @see Admin#getPushStatusStoreRecordDeleter() - */ - @Override - public Optional<PushStatusStoreRecordDeleter> getPushStatusStoreRecordDeleter() { - return getVeniceHelixAdmin().getPushStatusStoreRecordDeleter(); - } - /** * @see Admin#getEmergencySourceRegion(String) */ @@ -5235,13 +5226,13 @@ public void removeStoreFromGraveyard(String clusterName, String storeName) { } @Override - public Optional<PushStatusStoreReader> getPushStatusStoreReader() { + public PushStatusStoreReader getPushStatusStoreReader() { throw new VeniceUnsupportedOperationException("Parent controller does not have Da Vinci push status store reader"); } @Override - public Optional<PushStatusStoreWriter> getPushStatusStoreWriter() { - throw new VeniceUnsupportedOperationException("Parent controller does not have Da Vinci push status store writer"); + public PushStatusStoreWriter getPushStatusStoreWriter() { + return getVeniceHelixAdmin().getPushStatusStoreWriter(); } @Override diff --git a/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java b/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java index de192e8c30..1eae3be015 100644 --- a/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java +++ b/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java @@ -458,6 +458,7 @@ private List 
createServices() { storageMetadataService, veniceConfigLoader, metadataRepo, + schemaRepo, metricsRepository, clusterConfig.getZookeeperAddress(), clusterConfig.getClusterName(),