Skip to content

Commit

Permalink
[server][dvc] Open store before bootstrapping from blob transfer. (#1290
Browse files Browse the repository at this point in the history
)
  • Loading branch information
jingy-li authored Nov 12, 2024
1 parent 931638f commit 5752ad4
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,10 @@ public void startConsumption(VeniceStoreVersionConfig storeConfig, int partition
LOGGER.info("Retrieving storage engine for store {} partition {}", storeVersion, partition);
Pair<Store, Version> storeAndVersion =
Utils.waitStoreVersionOrThrow(storeVersion, getStoreIngestionService().getMetadataRepo());
Supplier<StoreVersionState> svsSupplier = () -> storageMetadataService.getStoreVersionState(storeVersion);
syncStoreVersionConfig(storeAndVersion.getFirst(), storeConfig);

Runnable runnable = () -> {
Supplier<StoreVersionState> svsSupplier = () -> storageMetadataService.getStoreVersionState(storeVersion);
syncStoreVersionConfig(storeAndVersion.getFirst(), storeConfig);
AbstractStorageEngine storageEngine =
storageService.openStoreForNewPartition(storeConfig, partition, svsSupplier);
topicStorageEngineReferenceMap.compute(storeVersion, (key, storageEngineAtomicReference) -> {
Expand All @@ -79,6 +80,7 @@ public void startConsumption(VeniceStoreVersionConfig storeConfig, int partition
if (!storeAndVersion.getFirst().isBlobTransferEnabled() || blobTransferManager == null) {
runnable.run();
} else {
storageService.openStore(storeConfig, svsSupplier);
CompletionStage<Void> bootstrapFuture =
bootstrapFromBlobs(storeAndVersion.getFirst(), storeAndVersion.getSecond().getNumber(), partition);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public class MetaDataHandler extends SimpleChannelInboundHandler<HttpRequest> {
static final String REQUEST_ERROR_STORE_NOT_FOUND_IN_CLUSTER = "Store: %s could not be found in cluster: %s";

static final String REQUEST_BLOB_DISCOVERY_ERROR_INVALID_SETTINGS =
"Blob Discovery: blob transfer is not enabled or store: %s is not a batch-only store";
"Blob Discovery: blob transfer is not enabled for store: %s";

static final String REQUEST_BLOB_DISCOVERY_MISSING_QUERY_PARAMS =
"Blob Discovery: missing storeName:%s, storeVersion:%s, or storePartition:%s";
Expand Down Expand Up @@ -553,7 +553,7 @@ private void handleBlobDiscovery(ChannelHandlerContext ctx, VenicePathParserHelp
return;
}

if (!store.isBlobTransferEnabled() || store.isHybrid()) {
if (!store.isBlobTransferEnabled()) {
byte[] errBody = (String.format(REQUEST_BLOB_DISCOVERY_ERROR_INVALID_SETTINGS, storeName)).getBytes();
setupResponseAndFlush(FORBIDDEN, errBody, false, ctx);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1556,7 +1556,7 @@ public void testHandleBlobDiscoverySettings(Boolean isBlobTransferEnabled, Boole
storeRepository,
pushStatusStoreReader);

if (isBlobTransferEnabled && !isHybrid) {
if (isBlobTransferEnabled) {
Assert.assertEquals(response.status(), HttpResponseStatus.OK);
Assert.assertEquals(response.headers().get(CONTENT_TYPE), JSON);
} else {
Expand Down

0 comments on commit 5752ad4

Please sign in to comment.