diff --git a/ambry-api/src/main/java/com/github/ambry/server/StoreManager.java b/ambry-api/src/main/java/com/github/ambry/server/StoreManager.java index 997b231a62..ff7d492945 100644 --- a/ambry-api/src/main/java/com/github/ambry/server/StoreManager.java +++ b/ambry-api/src/main/java/com/github/ambry/server/StoreManager.java @@ -30,6 +30,13 @@ */ public interface StoreManager { + /** + * Add a new BlobStore for FileCopy based replication with given {@link ReplicaId}. + * @param replica the {@link ReplicaId} of the {@link Store} which would be added. + * @return {@code true} if adding store was successful. {@code false} if not. + */ + boolean addBlobStoreForFileCopy(ReplicaId replica); + /** * Add a new BlobStore with given {@link ReplicaId}. * @param replica the {@link ReplicaId} of the {@link Store} which would be added. @@ -44,6 +51,12 @@ public interface StoreManager { */ boolean addFileStore(ReplicaId replicaId); + /** + * Build state after filecopy is completed + * @param replica the {@link ReplicaId} of the {@link Store} for which store needs to be built + */ + void buildStateForFileCopy(ReplicaId replica); + /** * Remove store from storage manager. * @param id the {@link PartitionId} associated with store diff --git a/ambry-cloud/src/main/java/com/github/ambry/cloud/CloudStorageManager.java b/ambry-cloud/src/main/java/com/github/ambry/cloud/CloudStorageManager.java index ce33006769..09c0a786ce 100644 --- a/ambry-cloud/src/main/java/com/github/ambry/cloud/CloudStorageManager.java +++ b/ambry-cloud/src/main/java/com/github/ambry/cloud/CloudStorageManager.java @@ -57,10 +57,19 @@ public CloudStorageManager(VerifiableProperties properties, VcrMetrics vcrMetric lock = new ReentrantReadWriteLock(); } + @Override + public boolean addBlobStoreForFileCopy(ReplicaId replica) { + throw new UnsupportedOperationException("Method not supported"); + } + @Override public boolean addBlobStore(ReplicaId replica) { return createAndStartBlobStoreIfAbsent(replica.getPartitionId()) != null; } + @Override + public void buildStateForFileCopy(ReplicaId replica){ + throw new UnsupportedOperationException("Method not supported"); + } /** * Returning false because this will not be used as part of CloudStorageManager Implementation. diff --git a/ambry-server/src/test/java/com/github/ambry/server/MockStorageManager.java b/ambry-server/src/test/java/com/github/ambry/server/MockStorageManager.java index 82e59ba397..7b336dd7b5 100644 --- a/ambry-server/src/test/java/com/github/ambry/server/MockStorageManager.java +++ b/ambry-server/src/test/java/com/github/ambry/server/MockStorageManager.java @@ -626,6 +626,15 @@ public boolean startBlobStore(PartitionId id) { return returnValueOfStartingBlobStore; } + @Override + public void buildStateForFileCopy(ReplicaId replica) { + throw new UnsupportedOperationException("Method not supported"); + } + @Override + public boolean addBlobStoreForFileCopy(ReplicaId replica) { + throw new UnsupportedOperationException("Method not supported"); + } + @Override public boolean addBlobStore(ReplicaId id) { updatePartitionToDiskManager(id); diff --git a/ambry-store/src/main/java/com/github/ambry/store/DiskManager.java b/ambry-store/src/main/java/com/github/ambry/store/DiskManager.java index 0ee8489ca3..d6412b0b31 100644 --- a/ambry-store/src/main/java/com/github/ambry/store/DiskManager.java +++ b/ambry-store/src/main/java/com/github/ambry/store/DiskManager.java @@ -420,6 +420,43 @@ boolean controlCompactionForBlobStore(PartitionId id, boolean enabled) { return succeed; } + + /** + * Add a new BlobStore with given {@link ReplicaId}. + * @param replica the {@link ReplicaId} of the {@link Store} which would be added. + * @return {@code true} if adding store was successful. {@code false} if not. + */ + boolean addBlobStoreForFileCopy(ReplicaId replica) { + rwLock.writeLock().lock(); + boolean succeed = false; + try { + if (!running) { + logger.error("Failed to add {} because disk manager is not running", replica.getPartitionId()); + } else { + // Directory is already created in prefilecopy steps. So no directory cleanup required. + BlobStore store = new BlobStore(replica, storeConfig, scheduler, longLivedTaskScheduler, this, diskIOScheduler, + diskSpaceAllocator, storeMainMetrics, storeUnderCompactionMetrics, keyFactory, recovery, hardDelete, + replicaStatusDelegates, time, accountService, null, indexPersistScheduler); + store.start(); + // add store into CompactionManager + compactionManager.addBlobStore(store); + // add new created store into in-memory data structures. + stores.put(replica.getPartitionId(), store); + // create a bootstrap-in-progress file to distinguish it from regular stores (the file will be checked during + // BOOTSTRAP -> STANDBY transition) + createBootstrapFileIfAbsent(replica); + logger.info("New store for partitionId {} is successfully added into DiskManager.", replica.getPartitionId()); + succeed = true; + } + } catch (Exception e) { + logger.error("Failed to start new added store for partitionId {} for FileCopy based replication", replica.getPartitionId(), + e); + } finally { + rwLock.writeLock().unlock(); + } + return succeed; + } + /** * Add a new BlobStore with given {@link ReplicaId}. * @param replica the {@link ReplicaId} of the {@link Store} which would be added. @@ -460,7 +497,7 @@ boolean addBlobStore(ReplicaId replica) { // create a bootstrap-in-progress file to distinguish it from regular stores (the file will be checked during // BOOTSTRAP -> STANDBY transition) createBootstrapFileIfAbsent(replica); - logger.info("New store is successfully added into DiskManager."); + logger.info("New store is successfully added into DiskManager for partitionId {}.", replica.getPartitionId()); succeed = true; } } catch (Exception e) { diff --git a/ambry-store/src/main/java/com/github/ambry/store/StorageManager.java b/ambry-store/src/main/java/com/github/ambry/store/StorageManager.java index 0059d06b6a..1f1956082e 100644 --- a/ambry-store/src/main/java/com/github/ambry/store/StorageManager.java +++ b/ambry-store/src/main/java/com/github/ambry/store/StorageManager.java @@ -536,6 +536,28 @@ DiskManager addDisk(DiskId diskId) { }); } + /** + * Add a new store to the storage manager for file copy based replication post filecopy is completed. + * @param replica the {@link ReplicaId} of the {@link Store} which would be added. + * @return + */ + @Override + public boolean addBlobStoreForFileCopy(ReplicaId replica) { + if (!partitionToDiskManager.containsKey(replica.getPartitionId())) { + logger.info("PartitionId {} doesn't exist in storage manager during state build, rejecting adding store request", replica.getPartitionId()); + return false; + } + // We don't require addDisk since DiskManager is already started during initialization of StorageManager as part + // of prefilecopy steps. We will fetch it from partitionToDiskManager map. + DiskManager diskManager = partitionToDiskManager.get(replica.getPartitionId()); + if (diskManager == null || !diskManager.addBlobStoreForFileCopy(replica)) { + logger.error("Failed to add new store into DiskManager"); + return false; + } + logger.info("New store is successfully added into StorageManager"); + return true; + } + @Override public boolean addBlobStore(ReplicaId replica) { if (partitionToDiskManager.containsKey(replica.getPartitionId())) { @@ -553,11 +575,51 @@ public boolean addBlobStore(ReplicaId replica) { return true; } + /** + * Build inmemory state for file copy based replication post filecopy is completed. + * @param replica the {@link ReplicaId} of the {@link Store} for which store needs to be built + */ @Override public boolean addFileStore(ReplicaId replicaId) { //TODO: Implementation To Be added. return false; } + public void buildStateForFileCopy(ReplicaId replica){ + if (replica == null) { + logger.error("ReplicaId is null"); + throw new StateTransitionException("ReplicaId null is not found in clustermap for " + currentNode, ReplicaNotFound); + } + PartitionId partitionId = replica.getPartitionId(); + + if (!addBlobStoreForFileCopy(replica)){ + // We have decreased the available disk space in HelixClusterManager#getDiskForBootstrapReplica. Increase it + // back since addition of store failed. + replica.getDiskId().increaseAvailableSpaceInBytes(replica.getCapacityInBytes()); + if (!clusterMap.isDataNodeInFullAutoMode(currentNode)) { + logger.error("Failed to add store for replica {} into storage manager", partitionId.getId()); + throw new StateTransitionException("Failed to add store for replica " + partitionId.getId() + " into storage manager", + ReplicaOperationFailure); + } else { + logger.info("Failed to add store for replica {} at location {}. Cleanup and raise StateTransitionException", + partitionId.getId(), replica.getReplicaPath()); + // This will remove the reserved space from diskSpaceAllocator + tryRemoveFailedBootstrapBlobStore(replica); + // Throwing StateTransitionException here since we cannot retry adding BlobStore since Filecopy has copied data + // into the selected disk itself. Hence, putting the replica into ERROR state via StateTransitionException + throw new StateTransitionException("Failed to add store for replica " + partitionId.getId() + " into storage manager", + ReplicaOperationFailure); + } + } + Store store = getStore(replica.getPartitionId(), false); + // Only update store state if this is a state transition for primary participant. Since replication Manager + // which eventually moves this state to STANDBY/LEADER only listens to primary participant, store state gets + // stuck in BOOTSTRAP if this is updated by second participant listener too + ReplicaState currentState = store.getCurrentState(); + if (currentState != ReplicaState.LEADER && currentState != ReplicaState.STANDBY) { + // Only set the current state to BOOTSTRAP when it's not LEADER or STANDBY + store.setCurrentState(ReplicaState.BOOTSTRAP); + } + } /** * If a bootstrap replica fails, try to remove all the files and directories associated with it. diff --git a/ambry-store/src/test/java/com/github/ambry/store/StorageManagerTest.java b/ambry-store/src/test/java/com/github/ambry/store/StorageManagerTest.java index b2e86d054a..5ea3fcfd2c 100644 --- a/ambry-store/src/test/java/com/github/ambry/store/StorageManagerTest.java +++ b/ambry-store/src/test/java/com/github/ambry/store/StorageManagerTest.java @@ -68,6 +68,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -269,6 +270,132 @@ public void scheduleAndControlCompactionTest() throws Exception { shutdownAndAssertStoresInaccessible(storageManager, replicas); } + /** + * Helper util to add blobs to a given Store + * @param store store to add blob to. + * @param size size in bytes of the randomized blob + * @param expiresAtMs expiry in milliseconds to be set for the blob + * @return {@link MockId} the mock id of the blob added + * @throws StoreException + */ + public MockId addRandomBlobToStore(Store store, long size, long expiresAtMs) throws StoreException { + final Random random = new Random(); + short accountId = Utils.getRandomShort(TestUtils.RANDOM); + short containerId = Utils.getRandomShort(TestUtils.RANDOM); + short lifeVersion = MessageInfo.LIFE_VERSION_FROM_FRONTEND; + + MockId id = new MockId(TestUtils.getRandomString(MOCK_ID_STRING_LENGTH), accountId, containerId); + long crc = random.nextLong(); + MessageInfo info = + new MessageInfo(id, size, false, false, false, expiresAtMs, crc, id.getAccountId(), id.getContainerId(), + Utils.Infinite_Time, lifeVersion); + ByteBuffer buffer = ByteBuffer.wrap(TestUtils.getRandomBytes((int) size)); + store.put(new MockMessageWriteSet(Collections.singletonList(info), Collections.singletonList(buffer))); + return id; + } + + private StorageManager initializeStorageManagerForStateBuildTests(int newMountPathIndex) throws Exception { + generateConfigs(true, false); + MockDataNodeId localNode = clusterMap.getDataNodes().get(0); + // add new MountPath to local node + File f = File.createTempFile("ambry", ".tmp"); + File mountFile = + new File(f.getParent(), "mountpathfile" + MockClusterMap.PLAIN_TEXT_PORT_START_NUMBER + newMountPathIndex); + MockClusterMap.deleteFileOrDirectory(mountFile); + assertTrue("Couldn't create mount path directory", mountFile.mkdir()); + localNode.addMountPaths(Collections.singletonList(mountFile.getAbsolutePath())); + + StorageManager storageManager = createStorageManager(localNode, metricRegistry, null); + storageManager.start(); + return storageManager; + } + + // TODO: Add additional negative tests for StateBuild exception handling. + /** + * Test buildStateForFileCopy with newly created {@link ReplicaId}. + * @throws Exception + */ + @Test + public void buildStateForFileCopyTest() throws Exception { + int newMountPathIndex = 3; + int newPartitionId = 803; + StorageManager storageManager = initializeStorageManagerForStateBuildTests(newMountPathIndex); + PartitionId newPartition = + new MockPartitionId(newPartitionId, MockClusterMap.DEFAULT_PARTITION_CLASS, clusterMap.getDataNodes(), newMountPathIndex); + MockDataNodeId localNode = clusterMap.getDataNodes().get(0); + // test add store onto a new disk, which should succeed + assertTrue("Add new store should succeed", storageManager.addBlobStore(newPartition.getReplicaIds().get(0))); + assertNotNull("The store shouldn't be null because new store is successfully added", + storageManager.getStore(newPartition, false)); + DiskManager dm = storageManager.getDiskManager(newPartition); + Store store = dm.getStore(newPartition, false); + MockId id1 = addRandomBlobToStore(store, 100, Utils.Infinite_Time); + MockId id2 = addRandomBlobToStore(store, 200, Utils.Infinite_Time); + + // Shutdown store and try to build state using buildStateForFileCopy assuming state has to be built for the same + // store with 2 blobs present on the partition's file on disk. + store.shutdown(); + storageManager.buildStateForFileCopy(newPartition.getReplicaIds().get(0)); + dm = storageManager.getDiskManager(newPartition); + store = dm.getStore(newPartition, false); + assertNotNull(store.get(Collections.singletonList(id1), EnumSet.noneOf(StoreGetOptions.class))); + assertNotNull(store.get(Collections.singletonList(id2), EnumSet.noneOf(StoreGetOptions.class))); + } + + + /** + * Test buildStateForFileCopy with newly created {@link ReplicaId} for failure to add an already started blob store. + * @throws Exception + */ + @Test + public void buildStateForFileCopyDuplicateBlobStoreFailureTest() throws Exception { + int newMountPathIndex = 3; + int newPartitionId = 803; + StorageManager storageManager = initializeStorageManagerForStateBuildTests(newMountPathIndex); + + PartitionId newPartition = + new MockPartitionId(newPartitionId, MockClusterMap.DEFAULT_PARTITION_CLASS, clusterMap.getDataNodes(), newMountPathIndex); + + // test add store onto a new disk, which should succeed + assertTrue("Add new store should succeed", storageManager.addBlobStore(newPartition.getReplicaIds().get(0))); + assertNotNull("The store shouldn't be null because new store is successfully added", + storageManager.getStore(newPartition, false)); + // Attempting to add store via addBlobStoreForFileCopy should fail since the newPartition already has store started. + assertFalse("Add store which is already existing should fail", storageManager.addBlobStoreForFileCopy(newPartition.getReplicaIds().get(0))); + storageManager.getStore(newPartition, false).shutdown(); + + // Testing flow where addBlobStoreForFileCopy is called before addBlobStore + // test add store onto a new disk, which should succeed + assertTrue("Add store using addBlobStoreForFileCopy should succeed", storageManager.addBlobStoreForFileCopy(newPartition.getReplicaIds().get(0))); + assertNotNull("The store shouldn't be null because new store is successfully added", + storageManager.getStore(newPartition, false)); + // This should fail since the newPartition already has store started. + assertFalse("Add the duplicate store using addBlobStore should fail", storageManager.addBlobStore(newPartition.getReplicaIds().get(0))); + storageManager.getStore(newPartition, false).shutdown(); + } + + /** + * Test buildStateForFileCopy with {@link ReplicaId} as null. + * @throws Exception + */ + @Test(expected = StateTransitionException.class) + public void buildStateForFileCopyReplicaNullFailureTest() throws Exception { + int newMountPathIndex = 3; + int newPartitionId = 803; + StorageManager storageManager = initializeStorageManagerForStateBuildTests(newMountPathIndex); + + PartitionId newPartition = + new MockPartitionId(newPartitionId, MockClusterMap.DEFAULT_PARTITION_CLASS, clusterMap.getDataNodes(), newMountPathIndex); + // test add store onto a new disk, which should succeed + assertTrue("Add new store should succeed", storageManager.addBlobStore(newPartition.getReplicaIds().get(0))); + assertNotNull("The store shouldn't be null because new store is successfully added", + storageManager.getStore(newPartition, false)); + // This should fail since the newPartition already has store started. + assertFalse("Add store which is already existing should fail", storageManager.addBlobStoreForFileCopy(newPartition.getReplicaIds().get(0))); + storageManager.buildStateForFileCopy(null); + } + + /** * Test add new BlobStore with given {@link ReplicaId}. */