Skip to content

Commit

Permalink
State Build implementation for File Copy based replication (#2954)
Browse files Browse the repository at this point in the history
* Added state build changes for StoreManager interface

* State build unsupported operation handling for CloudStorageManager

* StoreManager methods handling for MockStorageManager

* Added handling for addition of new blob store for filecopy based replication

* State build implementation changes for StorageManager

* Added tests for state build for filecopy based replication

* Failure tests added and minor refactoring

* PR comments changes

---------

Co-authored-by: Jai Balani <[email protected]>
  • Loading branch information
piyujai and Jai Balani authored Dec 23, 2024
1 parent 398db0b commit 81a7b6c
Show file tree
Hide file tree
Showing 6 changed files with 258 additions and 1 deletion.
13 changes: 13 additions & 0 deletions ambry-api/src/main/java/com/github/ambry/server/StoreManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@
*/
public interface StoreManager {

/**
* Add a new BlobStore for FileCopy based replication with given {@link ReplicaId}.
* @param replica the {@link ReplicaId} of the {@link Store} which would be added.
* @return {@code true} if adding store was successful. {@code false} if not.
*/
boolean addBlobStoreForFileCopy(ReplicaId replica);

/**
* Add a new BlobStore with given {@link ReplicaId}.
* @param replica the {@link ReplicaId} of the {@link Store} which would be added.
Expand All @@ -44,6 +51,12 @@ public interface StoreManager {
*/
boolean addFileStore(ReplicaId replicaId);

/**
* Build state after filecopy is completed
* @param replica the {@link ReplicaId} of the {@link Store} for which store needs to be built
*/
void buildStateForFileCopy(ReplicaId replica);

/**
* Remove store from storage manager.
* @param id the {@link PartitionId} associated with store
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,19 @@ public CloudStorageManager(VerifiableProperties properties, VcrMetrics vcrMetric
lock = new ReentrantReadWriteLock();
}

@Override
public boolean addBlobStoreForFileCopy(ReplicaId replica) {
throw new UnsupportedOperationException("Method not supported");
}

@Override
public boolean addBlobStore(ReplicaId replica) {
return createAndStartBlobStoreIfAbsent(replica.getPartitionId()) != null;
}
@Override
public void buildStateForFileCopy(ReplicaId replica){
throw new UnsupportedOperationException("Method not supported");
}

/**
* Returning false because this will not be used as part of CloudStorageManager Implementation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,15 @@ public boolean startBlobStore(PartitionId id) {
return returnValueOfStartingBlobStore;
}

@Override
public void buildStateForFileCopy(ReplicaId replica) {
throw new UnsupportedOperationException("Method not supported");
}
@Override
public boolean addBlobStoreForFileCopy(ReplicaId replica) {
throw new UnsupportedOperationException("Method not supported");
}

@Override
public boolean addBlobStore(ReplicaId id) {
updatePartitionToDiskManager(id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,43 @@ boolean controlCompactionForBlobStore(PartitionId id, boolean enabled) {
return succeed;
}


/**
* Add a new BlobStore with given {@link ReplicaId}.
* @param replica the {@link ReplicaId} of the {@link Store} which would be added.
* @return {@code true} if adding store was successful. {@code false} if not.
*/
boolean addBlobStoreForFileCopy(ReplicaId replica) {
rwLock.writeLock().lock();
boolean succeed = false;
try {
if (!running) {
logger.error("Failed to add {} because disk manager is not running", replica.getPartitionId());
} else {
// Directory is already created in prefilecopy steps. So no directory cleanup required.
BlobStore store = new BlobStore(replica, storeConfig, scheduler, longLivedTaskScheduler, this, diskIOScheduler,
diskSpaceAllocator, storeMainMetrics, storeUnderCompactionMetrics, keyFactory, recovery, hardDelete,
replicaStatusDelegates, time, accountService, null, indexPersistScheduler);
store.start();
// add store into CompactionManager
compactionManager.addBlobStore(store);
// add new created store into in-memory data structures.
stores.put(replica.getPartitionId(), store);
// create a bootstrap-in-progress file to distinguish it from regular stores (the file will be checked during
// BOOTSTRAP -> STANDBY transition)
createBootstrapFileIfAbsent(replica);
logger.info("New store for partitionId {} is successfully added into DiskManager.", replica.getPartitionId());
succeed = true;
}
} catch (Exception e) {
logger.error("Failed to start new added store for partitionId {} for FileCopy based replication", replica.getPartitionId(),
e);
} finally {
rwLock.writeLock().unlock();
}
return succeed;
}

/**
* Add a new BlobStore with given {@link ReplicaId}.
* @param replica the {@link ReplicaId} of the {@link Store} which would be added.
Expand Down Expand Up @@ -460,7 +497,7 @@ boolean addBlobStore(ReplicaId replica) {
// create a bootstrap-in-progress file to distinguish it from regular stores (the file will be checked during
// BOOTSTRAP -> STANDBY transition)
createBootstrapFileIfAbsent(replica);
logger.info("New store is successfully added into DiskManager.");
logger.info("New store is successfully added into DiskManager for partitionId {}.", replica.getPartitionId());
succeed = true;
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,28 @@ DiskManager addDisk(DiskId diskId) {
});
}

/**
* Add a new store to the storage manager for file copy based replication post filecopy is completed.
* @param replica the {@link ReplicaId} of the {@link Store} which would be added.
* @return
*/
@Override
public boolean addBlobStoreForFileCopy(ReplicaId replica) {
if (!partitionToDiskManager.containsKey(replica.getPartitionId())) {
logger.info("PartitionId {} doesn't exist in storage manager during state build, rejecting adding store request", replica.getPartitionId());
return false;
}
// We don't require addDisk since DiskManager is already started during initialization of StorageManager as part
// of prefilecopy steps. We will fetch it from partitionToDiskManager map.
DiskManager diskManager = partitionToDiskManager.get(replica.getPartitionId());
if (diskManager == null || !diskManager.addBlobStoreForFileCopy(replica)) {
logger.error("Failed to add new store into DiskManager");
return false;
}
logger.info("New store is successfully added into StorageManager");
return true;
}

@Override
public boolean addBlobStore(ReplicaId replica) {
if (partitionToDiskManager.containsKey(replica.getPartitionId())) {
Expand All @@ -553,11 +575,51 @@ public boolean addBlobStore(ReplicaId replica) {
return true;
}

/**
* Build inmemory state for file copy based replication post filecopy is completed.
* @param replica the {@link ReplicaId} of the {@link Store} for which store needs to be built
*/
@Override
public boolean addFileStore(ReplicaId replicaId) {
//TODO: Implementation To Be added.
return false;
}
public void buildStateForFileCopy(ReplicaId replica){
if (replica == null) {
logger.error("ReplicaId is null");
throw new StateTransitionException("ReplicaId null is not found in clustermap for " + currentNode, ReplicaNotFound);
}
PartitionId partitionId = replica.getPartitionId();

if (!addBlobStoreForFileCopy(replica)){
// We have decreased the available disk space in HelixClusterManager#getDiskForBootstrapReplica. Increase it
// back since addition of store failed.
replica.getDiskId().increaseAvailableSpaceInBytes(replica.getCapacityInBytes());
if (!clusterMap.isDataNodeInFullAutoMode(currentNode)) {
logger.error("Failed to add store for replica {} into storage manager", partitionId.getId());
throw new StateTransitionException("Failed to add store for replica " + partitionId.getId() + " into storage manager",
ReplicaOperationFailure);
} else {
logger.info("Failed to add store for replica {} at location {}. Cleanup and raise StateTransitionException",
partitionId.getId(), replica.getReplicaPath());
// This will remove the reserved space from diskSpaceAllocator
tryRemoveFailedBootstrapBlobStore(replica);
// Throwing StateTransitionException here since we cannot retry adding BlobStore since Filecopy has copied data
// into the selected disk itself. Hence, putting the replica into ERROR state via StateTransitionException
throw new StateTransitionException("Failed to add store for replica " + partitionId.getId() + " into storage manager",
ReplicaOperationFailure);
}
}
Store store = getStore(replica.getPartitionId(), false);
// Only update store state if this is a state transition for primary participant. Since replication Manager
// which eventually moves this state to STANDBY/LEADER only listens to primary participant, store state gets
// stuck in BOOTSTRAP if this is updated by second participant listener too
ReplicaState currentState = store.getCurrentState();
if (currentState != ReplicaState.LEADER && currentState != ReplicaState.STANDBY) {
// Only set the current state to BOOTSTRAP when it's not LEADER or STANDBY
store.setCurrentState(ReplicaState.BOOTSTRAP);
}
}

/**
* If a bootstrap replica fails, try to remove all the files and directories associated with it.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
Expand Down Expand Up @@ -269,6 +270,132 @@ public void scheduleAndControlCompactionTest() throws Exception {
shutdownAndAssertStoresInaccessible(storageManager, replicas);
}

/**
* Helper util to add blobs to a given Store
* @param store store to add blob to.
* @param size size in bytes of the randomized blob
* @param expiresAtMs expiry in milliseconds to be set for the blob
* @return {@link MockId} the mock id of the blob added
* @throws StoreException
*/
public MockId addRandomBlobToStore(Store store, long size, long expiresAtMs) throws StoreException {
final Random random = new Random();
short accountId = Utils.getRandomShort(TestUtils.RANDOM);
short containerId = Utils.getRandomShort(TestUtils.RANDOM);
short lifeVersion = MessageInfo.LIFE_VERSION_FROM_FRONTEND;

MockId id = new MockId(TestUtils.getRandomString(MOCK_ID_STRING_LENGTH), accountId, containerId);
long crc = random.nextLong();
MessageInfo info =
new MessageInfo(id, size, false, false, false, expiresAtMs, crc, id.getAccountId(), id.getContainerId(),
Utils.Infinite_Time, lifeVersion);
ByteBuffer buffer = ByteBuffer.wrap(TestUtils.getRandomBytes((int) size));
store.put(new MockMessageWriteSet(Collections.singletonList(info), Collections.singletonList(buffer)));
return id;
}

private StorageManager initializeStorageManagerForStateBuildTests(int newMountPathIndex) throws Exception {
generateConfigs(true, false);
MockDataNodeId localNode = clusterMap.getDataNodes().get(0);
// add new MountPath to local node
File f = File.createTempFile("ambry", ".tmp");
File mountFile =
new File(f.getParent(), "mountpathfile" + MockClusterMap.PLAIN_TEXT_PORT_START_NUMBER + newMountPathIndex);
MockClusterMap.deleteFileOrDirectory(mountFile);
assertTrue("Couldn't create mount path directory", mountFile.mkdir());
localNode.addMountPaths(Collections.singletonList(mountFile.getAbsolutePath()));

StorageManager storageManager = createStorageManager(localNode, metricRegistry, null);
storageManager.start();
return storageManager;
}

// TODO: Add additional negative tests for StateBuild exception handling.
/**
* Test buildStateForFileCopy with newly created {@link ReplicaId}.
* @throws Exception
*/
@Test
public void buildStateForFileCopyTest() throws Exception {
int newMountPathIndex = 3;
int newPartitionId = 803;
StorageManager storageManager = initializeStorageManagerForStateBuildTests(newMountPathIndex);
PartitionId newPartition =
new MockPartitionId(newPartitionId, MockClusterMap.DEFAULT_PARTITION_CLASS, clusterMap.getDataNodes(), newMountPathIndex);
MockDataNodeId localNode = clusterMap.getDataNodes().get(0);
// test add store onto a new disk, which should succeed
assertTrue("Add new store should succeed", storageManager.addBlobStore(newPartition.getReplicaIds().get(0)));
assertNotNull("The store shouldn't be null because new store is successfully added",
storageManager.getStore(newPartition, false));
DiskManager dm = storageManager.getDiskManager(newPartition);
Store store = dm.getStore(newPartition, false);
MockId id1 = addRandomBlobToStore(store, 100, Utils.Infinite_Time);
MockId id2 = addRandomBlobToStore(store, 200, Utils.Infinite_Time);

// Shutdown store and try to build state using buildStateForFileCopy assuming state has to be built for the same
// store with 2 blobs present on the partition's file on disk.
store.shutdown();
storageManager.buildStateForFileCopy(newPartition.getReplicaIds().get(0));
dm = storageManager.getDiskManager(newPartition);
store = dm.getStore(newPartition, false);
assertNotNull(store.get(Collections.singletonList(id1), EnumSet.noneOf(StoreGetOptions.class)));
assertNotNull(store.get(Collections.singletonList(id2), EnumSet.noneOf(StoreGetOptions.class)));
}


/**
* Test buildStateForFileCopy with newly created {@link ReplicaId} for failure to add an already started blob store.
* @throws Exception
*/
@Test
public void buildStateForFileCopyDuplicateBlobStoreFailureTest() throws Exception {
int newMountPathIndex = 3;
int newPartitionId = 803;
StorageManager storageManager = initializeStorageManagerForStateBuildTests(newMountPathIndex);

PartitionId newPartition =
new MockPartitionId(newPartitionId, MockClusterMap.DEFAULT_PARTITION_CLASS, clusterMap.getDataNodes(), newMountPathIndex);

// test add store onto a new disk, which should succeed
assertTrue("Add new store should succeed", storageManager.addBlobStore(newPartition.getReplicaIds().get(0)));
assertNotNull("The store shouldn't be null because new store is successfully added",
storageManager.getStore(newPartition, false));
// Attempting to add store via addBlobStoreForFileCopy should fail since the newPartition already has store started.
assertFalse("Add store which is already existing should fail", storageManager.addBlobStoreForFileCopy(newPartition.getReplicaIds().get(0)));
storageManager.getStore(newPartition, false).shutdown();

// Testing flow where addBlobStoreForFileCopy is called before addBlobStore
// test add store onto a new disk, which should succeed
assertTrue("Add store using addBlobStoreForFileCopy should succeed", storageManager.addBlobStoreForFileCopy(newPartition.getReplicaIds().get(0)));
assertNotNull("The store shouldn't be null because new store is successfully added",
storageManager.getStore(newPartition, false));
// This should fail since the newPartition already has store started.
assertFalse("Add the duplicate store using addBlobStore should fail", storageManager.addBlobStore(newPartition.getReplicaIds().get(0)));
storageManager.getStore(newPartition, false).shutdown();
}

/**
* Test buildStateForFileCopy with {@link ReplicaId} as null.
* @throws Exception
*/
@Test(expected = StateTransitionException.class)
public void buildStateForFileCopyReplicaNullFailureTest() throws Exception {
int newMountPathIndex = 3;
int newPartitionId = 803;
StorageManager storageManager = initializeStorageManagerForStateBuildTests(newMountPathIndex);

PartitionId newPartition =
new MockPartitionId(newPartitionId, MockClusterMap.DEFAULT_PARTITION_CLASS, clusterMap.getDataNodes(), newMountPathIndex);
// test add store onto a new disk, which should succeed
assertTrue("Add new store should succeed", storageManager.addBlobStore(newPartition.getReplicaIds().get(0)));
assertNotNull("The store shouldn't be null because new store is successfully added",
storageManager.getStore(newPartition, false));
// This should fail since the newPartition already has store started.
assertFalse("Add store which is already existing should fail", storageManager.addBlobStoreForFileCopy(newPartition.getReplicaIds().get(0)));
storageManager.buildStateForFileCopy(null);
}


/**
* Test add new BlobStore with given {@link ReplicaId}.
*/
Expand Down

0 comments on commit 81a7b6c

Please sign in to comment.