Skip to content

Commit

Permalink
[da-vinci][server][controller] Fetch latest update schema available f…
Browse files Browse the repository at this point in the history
…or push status writer in all Venice components (#750)

Previously, PushStatusStoreWriter used PUSH_STATUS_SYSTEM_SCHEMA_STORE's current protocol ID as the value schema, relied on a config, PUSH_STATUS_STORE_DERIVED_SCHEMA_ID (default = 1), as the update schema ID, and used a hardcoded specific write-op record as the partial update record.
This is wrong because 1-1 is our legacy schema type, which is a union of [update schema, del op]; the actual static update record is only the first branch of that union. We also found that Fast-Avro fails to deserialize with this schema as the writer schema and 1-3 as the reader schema. Vanilla Avro does not throw an exception, but it does not produce a correct deserialized result either.
This PR fixes the above issue by making all components that use PushStatusStoreWriter fetch the latest value schema and the latest update schema from StoreSchemaFetcher and the schema repository. PushStatusStoreWriter no longer uses the protocol ID / value schema from AvroProtocolDefinition, as it might not be registered. It also no longer relies on a config to find the update protocol ID.

This PR also performs some cleanup:

Remove PushStatusStoreRecordDeleter and merge its functionality into PushStatusStoreWriter.
Make PushStatusStoreWriter/PushStatusStoreReader no longer optional in Admin code as it has been rolled out for more than 2 years.
Some other code cleanup.
  • Loading branch information
sixpluszero authored Nov 15, 2023
1 parent 3cf409e commit 2e708d3
Show file tree
Hide file tree
Showing 19 changed files with 305 additions and 343 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.linkedin.davinci;

import static com.linkedin.venice.ConfigKeys.PUSH_STATUS_STORE_DERIVED_SCHEMA_ID;
import static com.linkedin.venice.ConfigKeys.VALIDATE_VENICE_INTERNAL_SCHEMA_VERSION;
import static java.lang.Thread.currentThread;

Expand All @@ -26,6 +25,7 @@
import com.linkedin.davinci.store.AbstractStorageEngine;
import com.linkedin.davinci.store.cache.backend.ObjectCacheBackend;
import com.linkedin.davinci.store.cache.backend.ObjectCacheConfig;
import com.linkedin.venice.client.schema.StoreSchemaFetcher;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.ClientFactory;
import com.linkedin.venice.common.VeniceSystemStoreType;
Expand All @@ -44,7 +44,9 @@
import com.linkedin.venice.pubsub.PubSubClientsFactory;
import com.linkedin.venice.pushmonitor.ExecutionStatus;
import com.linkedin.venice.pushstatushelper.PushStatusStoreWriter;
import com.linkedin.venice.schema.SchemaEntry;
import com.linkedin.venice.schema.SchemaReader;
import com.linkedin.venice.schema.writecompute.DerivedSchemaEntry;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer;
import com.linkedin.venice.serialization.avro.SchemaPresenceChecker;
Expand Down Expand Up @@ -187,8 +189,16 @@ public DaVinciBackend(
new VeniceWriterFactory(backendProps.toProperties(), pubSubClientsFactory.getProducerAdapterFactory(), null);
String pid = Utils.getPid();
String instanceName = Utils.getHostName() + "_" + (pid == null ? "NA" : pid);
int derivedSchemaID = backendProps.getInt(PUSH_STATUS_STORE_DERIVED_SCHEMA_ID, 1);
pushStatusStoreWriter = new PushStatusStoreWriter(writerFactory, instanceName, derivedSchemaID);

// Fetch latest update schema's protocol ID for Push Status Store from Router.
ClientConfig pushStatusStoreClientConfig = ClientConfig.cloneConfig(clientConfig)
.setStoreName(VeniceSystemStoreType.DAVINCI_PUSH_STATUS_STORE.getZkSharedStoreName());
try (StoreSchemaFetcher schemaFetcher = ClientFactory.createStoreSchemaFetcher(pushStatusStoreClientConfig)) {
SchemaEntry valueSchemaEntry = schemaFetcher.getLatestValueSchemaEntry();
DerivedSchemaEntry updateSchemaEntry = schemaFetcher.getUpdateSchemaEntry(valueSchemaEntry.getId());
pushStatusStoreWriter =
new PushStatusStoreWriter(writerFactory, instanceName, valueSchemaEntry, updateSchemaEntry);
}

SchemaReader kafkaMessageEnvelopeSchemaReader = ClientFactory.getSchemaReader(
ClientConfig.cloneConfig(clientConfig)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package com.linkedin.davinci.helix;

import static com.linkedin.venice.ConfigKeys.PUSH_STATUS_STORE_DERIVED_SCHEMA_ID;

import com.linkedin.davinci.config.VeniceConfigLoader;
import com.linkedin.davinci.config.VeniceServerConfig;
import com.linkedin.davinci.config.VeniceStoreVersionConfig;
Expand All @@ -16,6 +14,7 @@
import com.linkedin.davinci.stats.ParticipantStateTransitionStats;
import com.linkedin.davinci.storage.StorageMetadataService;
import com.linkedin.davinci.storage.StorageService;
import com.linkedin.venice.common.VeniceSystemStoreType;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.helix.HelixAdapterSerializer;
import com.linkedin.venice.helix.HelixInstanceConverter;
Expand All @@ -26,10 +25,13 @@
import com.linkedin.venice.helix.ZkClientFactory;
import com.linkedin.venice.meta.IngestionMode;
import com.linkedin.venice.meta.Instance;
import com.linkedin.venice.meta.ReadOnlySchemaRepository;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
import com.linkedin.venice.pubsub.PubSubProducerAdapterFactory;
import com.linkedin.venice.pushmonitor.KillOfflinePushMessage;
import com.linkedin.venice.pushstatushelper.PushStatusStoreWriter;
import com.linkedin.venice.schema.SchemaEntry;
import com.linkedin.venice.schema.writecompute.DerivedSchemaEntry;
import com.linkedin.venice.service.AbstractVeniceService;
import com.linkedin.venice.stats.HelixMessageChannelStats;
import com.linkedin.venice.status.StatusMessageHandler;
Expand Down Expand Up @@ -76,6 +78,7 @@ public class HelixParticipationService extends AbstractVeniceService
private final StorageService storageService;
private final VeniceConfigLoader veniceConfigLoader;
private final ReadOnlyStoreRepository helixReadOnlyStoreRepository;
private final ReadOnlySchemaRepository helixReadOnlySchemaRepository;
private final MetricsRepository metricsRepository;
private final VeniceIngestionBackend ingestionBackend;
private final CompletableFuture<SafeHelixManager> managerFuture; // complete this future when the manager is connected
Expand All @@ -99,6 +102,7 @@ public HelixParticipationService(
StorageMetadataService storageMetadataService,
VeniceConfigLoader veniceConfigLoader,
ReadOnlyStoreRepository helixReadOnlyStoreRepository,
ReadOnlySchemaRepository helixReadOnlySchemaRepository,
MetricsRepository metricsRepository,
String zkAddress,
String clusterName,
Expand All @@ -113,6 +117,7 @@ public HelixParticipationService(
this.zkAddress = zkAddress;
this.veniceConfigLoader = veniceConfigLoader;
this.helixReadOnlyStoreRepository = helixReadOnlyStoreRepository;
this.helixReadOnlySchemaRepository = helixReadOnlySchemaRepository;
this.metricsRepository = metricsRepository;
this.instance = new Instance(participantName, hostname, port);
this.managerFuture = managerFuture;
Expand Down Expand Up @@ -299,17 +304,20 @@ private void checkBeforeJoinInCluster() {
private void asyncStart() {
zkClient = ZkClientFactory.newZkClient(zkAddress);

// we use push status store for persisting incremental push statuses
VeniceServerConfig veniceServerConfig = veniceConfigLoader.getVeniceServerConfig();
VeniceProperties veniceProperties = veniceServerConfig.getClusterProperties();
PubSubProducerAdapterFactory pubSubProducerAdapterFactory =
veniceServerConfig.getPubSubClientsFactory().getProducerAdapterFactory();
VeniceWriterFactory writerFactory =
new VeniceWriterFactory(veniceProperties.toProperties(), pubSubProducerAdapterFactory, null);
statusStoreWriter = new PushStatusStoreWriter(
writerFactory,
instance.getNodeId(),
veniceProperties.getInt(PUSH_STATUS_STORE_DERIVED_SCHEMA_ID, 1));
String dummyPushStatusStoreName = VeniceSystemStoreType.DAVINCI_PUSH_STATUS_STORE.getSystemStoreName("dummy");
SchemaEntry valueSchemaEntry =
helixReadOnlySchemaRepository.getSupersetOrLatestValueSchema(dummyPushStatusStoreName);
DerivedSchemaEntry updateSchemaEntry =
helixReadOnlySchemaRepository.getLatestDerivedSchema(dummyPushStatusStoreName, valueSchemaEntry.getId());
// We use push status store for persisting incremental push statuses
statusStoreWriter =
new PushStatusStoreWriter(writerFactory, instance.getNodeId(), valueSchemaEntry, updateSchemaEntry);

// Record replica status in Zookeeper.
// Need to be started before connecting to ZK, otherwise some notification will not be sent by this notifier.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import com.linkedin.venice.exceptions.InvalidVeniceSchemaException;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.schema.SchemaData;
import com.linkedin.venice.schema.SchemaEntry;
import com.linkedin.venice.schema.writecompute.DerivedSchemaEntry;
import com.linkedin.venice.utils.ObjectMapperFactory;
import com.linkedin.venice.utils.RetryUtils;
import java.io.IOException;
Expand Down Expand Up @@ -80,6 +82,13 @@ public Schema getLatestValueSchema() {
return latestSchema;
}

/**
 * Fetches the latest value schema of the store this fetcher is bound to and returns it together with its ID.
 *
 * @return a {@link SchemaEntry} built from the ID and schema string carried by the schema response
 * @throws VeniceException if the response does not carry a schema string
 */
@Override
public SchemaEntry getLatestValueSchemaEntry() {
  String latestSchemaRequestPath = TYPE_LATEST_VALUE_SCHEMA + "/" + storeClient.getStoreName();
  SchemaResponse schemaResponse = fetchSingleSchema(latestSchemaRequestPath);
  // Mirror the null-schema guard in fetchSingleSchemaString so a response without a schema string fails here
  // with a clear message instead of surfacing later while the SchemaEntry is being built or used.
  if (schemaResponse.getSchemaStr() == null) {
    throw new VeniceException(
        "Received bad schema response with null schema string from path: " + latestSchemaRequestPath);
  }
  return new SchemaEntry(schemaResponse.getId(), schemaResponse.getSchemaStr());
}

private Schema getLatestValueSchemaFromAllValueSchemas() {
MultiSchemaResponse multiSchemaResponse = fetchAllValueSchemas();
int targetSchemaId = multiSchemaResponse.getSuperSetSchemaId();
Expand Down Expand Up @@ -117,6 +126,17 @@ public Map<Integer, Schema> getAllValueSchemasWithId() {
return schemaSet;
}

/**
 * Fetches the latest update schema registered for the given value schema of the store this fetcher is bound to.
 *
 * @param valueSchemaId ID of the value schema whose update schema is requested
 * @return a {@link DerivedSchemaEntry} built from the value schema ID, derived schema ID and schema string
 *         carried by the schema response
 * @throws VeniceException if the response does not carry a schema string
 */
@Override
public DerivedSchemaEntry getUpdateSchemaEntry(int valueSchemaId) {
  // Fetch the latest update schema for the specified value schema.
  String updateSchemaRequestPath = TYPE_GET_UPDATE_SCHEMA + "/" + storeClient.getStoreName() + "/" + valueSchemaId;
  SchemaResponse updateSchemaResponse = fetchSingleSchema(updateSchemaRequestPath);
  // Mirror the null-schema guard in fetchSingleSchemaString so a response without a schema string fails here
  // with a clear message instead of surfacing later while the DerivedSchemaEntry is being built or used.
  if (updateSchemaResponse.getSchemaStr() == null) {
    throw new VeniceException(
        "Received bad schema response with null schema string from path: " + updateSchemaRequestPath);
  }
  return new DerivedSchemaEntry(
      updateSchemaResponse.getId(),
      updateSchemaResponse.getDerivedSchemaId(),
      updateSchemaResponse.getSchemaStr());
}

@Override
public Schema getUpdateSchema(Schema valueSchema) throws VeniceException {
int valueSchemaId = getValueSchemaId(valueSchema);
Expand Down Expand Up @@ -195,7 +215,7 @@ private MultiSchemaResponse fetchAllValueSchemas() {
return multiSchemaResponse;
}

private String fetchSingleSchemaString(String requestPath) throws VeniceClientException {
SchemaResponse fetchSingleSchema(String requestPath) throws VeniceClientException {
SchemaResponse schemaResponse;
byte[] response = executeRequest(requestPath);
try {
Expand All @@ -208,6 +228,11 @@ private String fetchSingleSchemaString(String requestPath) throws VeniceClientEx
"Received an error while fetching schema from path: " + requestPath + ", error message: "
+ schemaResponse.getError());
}
return schemaResponse;
}

private String fetchSingleSchemaString(String requestPath) throws VeniceClientException {
SchemaResponse schemaResponse = fetchSingleSchema(requestPath);
if (schemaResponse.getSchemaStr() == null) {
throw new VeniceException("Received bad schema response with null schema string");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import com.linkedin.venice.exceptions.InvalidVeniceSchemaException;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.schema.SchemaEntry;
import com.linkedin.venice.schema.writecompute.DerivedSchemaEntry;
import java.io.Closeable;
import java.util.HashSet;
import java.util.Map;
Expand Down Expand Up @@ -55,4 +57,12 @@ default Set<Schema> getAllValueSchemas() {
* Returns the Venice store name this class is associated with.
*/
String getStoreName();

/**
 * Get the latest value schema of the store. Return it in {@link SchemaEntry} format, which carries the schema
 * together with its schema ID.
 */
SchemaEntry getLatestValueSchemaEntry();

/**
 * Get the latest derived schema of a value schema. Return the derived schema in {@link DerivedSchemaEntry} format,
 * which contains Schema and protocol ID.
 *
 * @param valueSchemaId ID of the value schema whose update (derived) schema is requested
 * @return the update schema entry, carrying the value schema ID, the derived schema ID and the schema itself
 */
DerivedSchemaEntry getUpdateSchemaEntry(int valueSchemaId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ public void onCompletion(Optional<Exception> exception) {
* Get the latest value schema known in current store client.
* This function doesn't guarantee it will return the latest schema if you add a new value schema
* when current store client is running.
* @deprecated This method is considered deprecated. Please use {@link StoreSchemaFetcher#getLatestValueSchema()} to fetch
* @deprecated This method is considered deprecated. Please use {@link StoreSchemaFetcher#getLatestValueSchemaEntry()} to fetch
* latest value schema instead.
*/
@Deprecated
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.linkedin.venice.common.VeniceSystemStoreUtils;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.schema.writecompute.WriteComputeSchemaConverter;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
Expand All @@ -24,23 +23,25 @@ public class PushStatusStoreVeniceWriterCache implements AutoCloseable {
private final VeniceWriterFactory writerFactory;
// Local cache of VeniceWriters.
private final Map<String, VeniceWriter> veniceWriters = new VeniceConcurrentHashMap<>();
private final Schema valueSchema;
private final Schema updateSchema;

// writerFactory Used for instantiating VeniceWriter
public PushStatusStoreVeniceWriterCache(VeniceWriterFactory writerFactory) {
public PushStatusStoreVeniceWriterCache(VeniceWriterFactory writerFactory, Schema valueSchema, Schema updateSchema) {
this.writerFactory = writerFactory;
this.valueSchema = valueSchema;
this.updateSchema = updateSchema;
}

public VeniceWriter prepareVeniceWriter(String storeName) {
return veniceWriters.computeIfAbsent(storeName, s -> {
String rtTopic = Version.composeRealTimeTopic(VeniceSystemStoreUtils.getDaVinciPushStatusStoreName(storeName));
Schema valueSchema = AvroProtocolDefinition.PUSH_STATUS_SYSTEM_SCHEMA_STORE.getCurrentProtocolVersionSchema();
Schema writeComputeSchema = WriteComputeSchemaConverter.getInstance().convertFromValueRecordSchema(valueSchema);
VeniceWriterOptions options = new VeniceWriterOptions.Builder(rtTopic)
.setKeySerializer(
new VeniceAvroKafkaSerializer(
AvroProtocolDefinition.PUSH_STATUS_SYSTEM_SCHEMA_STORE_KEY.getCurrentProtocolVersionSchema()))
.setValueSerializer(new VeniceAvroKafkaSerializer(valueSchema))
.setWriteComputeSerializer(new VeniceAvroKafkaSerializer(writeComputeSchema))
.setWriteComputeSerializer(new VeniceAvroKafkaSerializer(updateSchema))
.setChunkingEnabled(false)
.setPartitionCount(1)
.build();
Expand All @@ -57,10 +58,6 @@ public void removeVeniceWriter(String storeName) {
}
}

/**
 * Looks up the writer held in the local cache for the given store; never creates one.
 *
 * @param storeName key into the local writer cache
 * @return the cached {@link VeniceWriter}, or {@code null} when the cache holds no entry for {@code storeName}
 */
public VeniceWriter getVeniceWriterFromMap(String storeName) {
  VeniceWriter cachedWriter = veniceWriters.get(storeName);
  return cachedWriter;
}

@Override
public void close() {
veniceWriters.forEach((k, v) -> {
Expand Down
Loading

0 comments on commit 2e708d3

Please sign in to comment.