Commit 16be2d1: address comments

ChenSammi committed Sep 7, 2023 · 1 parent fd1e80d
Showing 2 changed files with 114 additions and 166 deletions.
TestKeyValueContainer.java
@@ -33,10 +33,12 @@
import org.apache.hadoop.hdds.utils.db.RocksDatabase.ColumnFamily;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
import org.apache.hadoop.ozone.container.common.interfaces.DBHandle;
import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import org.apache.hadoop.ozone.container.common.utils.DatanodeStoreCache;
import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
@@ -61,6 +63,7 @@
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.junit.rules.TemporaryFolder;

import org.junit.runner.RunWith;
@@ -88,7 +91,11 @@
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static java.util.Collections.singletonList;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DB_PROFILE;
import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V2;
import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V3;
import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.CONTAINER_SCHEMA_V3_ENABLED;
import static org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil.isSameSchemaVersion;
import static org.apache.hadoop.ozone.container.replication.CopyContainerCompression.NO_COMPRESSION;
import static org.apache.ratis.util.Preconditions.assertTrue;
@@ -885,4 +892,111 @@ public void testIsEmptyContainerStateWhileImportWithoutBlock()
Assert.assertTrue(container.getContainerData().isEmpty());
}
}

/**
* Test importing a schema V2 replica into a schema V3 enabled HddsVolume.
*/
@Test
public void testImportV2ReplicaToV3HddsVolume() throws Exception {
final String testDir = GenericTestUtils.getTempPath(
TestKeyValueContainer.class.getSimpleName() + "-"
+ UUID.randomUUID());
try {
testMixedSchemaImport(testDir, false);
} finally {
FileUtils.deleteDirectory(new File(testDir));
}
}

/**
* Test importing a schema V3 replica into a schema V3 disabled HddsVolume.
*/
@Test
public void testImportV3ReplicaToV2HddsVolume() throws Exception {
final String testDir = GenericTestUtils.getTempPath(
TestKeyValueContainer.class.getSimpleName() + "-"
+ UUID.randomUUID());
try {
testMixedSchemaImport(testDir, true);
} finally {
FileUtils.deleteDirectory(new File(testDir));
}
}

private void testMixedSchemaImport(String dir,
boolean schemaV3Enabled) throws IOException {
OzoneConfiguration conf = new OzoneConfiguration();
String scmId = UUID.randomUUID().toString();
UUID datanodeId = UUID.randomUUID();
final String dir1 = dir + (schemaV3Enabled ? "/v3" : "/v2");

// create HddsVolume
VolumeChoosingPolicy volumeChoosingPolicy =
mock(RoundRobinVolumeChoosingPolicy.class);
HddsVolume hddsVolume1 = new HddsVolume.Builder(dir1)
.conf(conf).datanodeUuid(datanodeId.toString()).build();
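// Set the schema V3 flag before checkVolume formats the volume; when V3
// is enabled, formatting is also expected to create the per-volume RocksDB.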
conf.setBoolean(CONTAINER_SCHEMA_V3_ENABLED, schemaV3Enabled);
StorageVolumeUtil.checkVolume(hddsVolume1, scmId, scmId, conf, null, null);
Mockito.when(volumeChoosingPolicy.chooseVolume(anyList(), anyLong()))
.thenReturn(hddsVolume1);
MutableVolumeSet volumeSet = mock(MutableVolumeSet.class);

// create container
long containerId = 1;
KeyValueContainerData data = new KeyValueContainerData(containerId,
ContainerLayoutVersion.FILE_PER_BLOCK,
ContainerTestHelper.CONTAINER_MAX_SIZE, UUID.randomUUID().toString(),
UUID.randomUUID().toString());
KeyValueContainer container = new KeyValueContainer(data, conf);
container.create(volumeSet, volumeChoosingPolicy, scmId);
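// seed the container DB with a non-zero pending-delete count so the
// assertions below can verify that metadata survives export and import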
long pendingDeleteBlockCount = 20;
try (DBHandle meta = BlockUtils.getDB(data, conf)) {
Table<String, Long> metadataTable = meta.getStore().getMetadataTable();
metadataTable.put(data.getPendingDeleteBlockCountKey(),
pendingDeleteBlockCount);
}
container.close();

// verify container schema
if (schemaV3Enabled) {
Assert.assertEquals(SCHEMA_V3,
container.getContainerData().getSchemaVersion());
} else {
Assert.assertEquals(SCHEMA_V2,
container.getContainerData().getSchemaVersion());
}

// export container
TarContainerPacker packer = new TarContainerPacker(NO_COMPRESSION);
File file1 = new File(dir1 + "/" + containerId);
if (!file1.createNewFile()) {
Assertions.fail("Failed to create file " + file1.getAbsolutePath());
}
try (FileOutputStream fos = new FileOutputStream(file1)) {
container.exportContainerData(fos, packer);
}

// create new HddsVolume
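// flip the schema V3 setting so the destination volume is formatted with
// the opposite schema version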
conf.setBoolean(CONTAINER_SCHEMA_V3_ENABLED, !schemaV3Enabled);
final String dir2 = dir + (schemaV3Enabled ? "/v2" : "/v3");
HddsVolume hddsVolume2 = new HddsVolume.Builder(dir2)
.conf(conf).datanodeUuid(datanodeId.toString()).build();
StorageVolumeUtil.checkVolume(hddsVolume2, scmId, scmId, conf, null, null);
Mockito.when(volumeChoosingPolicy.chooseVolume(anyList(), anyLong()))
.thenReturn(hddsVolume2);
Mockito.when(volumeSet.getVolumesList()).thenReturn(
singletonList(hddsVolume2));

// import container to new HddsVolume
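// populatePathFields points the container at hddsVolume2 before the
// exported tar stream is unpacked into it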
KeyValueContainer importedContainer = new KeyValueContainer(data, conf);
importedContainer.populatePathFields(scmId, hddsVolume2);
try (FileInputStream fio = new FileInputStream(file1)) {
importedContainer.importContainerData(fio, packer);
}

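// the imported replica keeps its original schema version rather than
// being converted to the destination volume's schema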
Assert.assertEquals(schemaV3Enabled ? SCHEMA_V3 : SCHEMA_V2,
importedContainer.getContainerData().getSchemaVersion());
Assert.assertEquals(pendingDeleteBlockCount,
importedContainer.getContainerData().getNumPendingDeletionBlocks());
}
}
TestReplicationSupervisor.java
@@ -18,8 +18,6 @@

package org.apache.hadoop.ozone.container.replication;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -35,44 +33,31 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReplicationCommandPriority;
import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.interfaces.DBHandle;
import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionCommandInfo;
import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionCoordinatorTask;
import org.apache.hadoop.ozone.container.keyvalue.ContainerLayoutTestInfo;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;

import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
import org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
import org.apache.ozone.test.GenericTestUtils;
@@ -90,22 +75,14 @@
import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_MAINTENANCE;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V2;
import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V3;
import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.CONTAINER_SCHEMA_V3_ENABLED;
import static org.apache.hadoop.ozone.container.replication.AbstractReplicationTask.Status.DONE;
import static org.apache.hadoop.ozone.container.replication.CopyContainerCompression.NO_COMPRESSION;
import static org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand.fromSources;
import static org.junit.jupiter.api.Assertions.fail;
import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReplicationCommandPriority.LOW;
import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReplicationCommandPriority.NORMAL;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;

/**
* Test the replication supervisor.
@@ -483,149 +460,6 @@ public void testPriorityOrdering() throws InterruptedException {
supervisor.getInFlightReplications(BlockingTask.class));
}

/**
* Test replicating a schema V2 replica to a schema V3 enabled datanode.
*/
@Test
public void testV2ToV3Replication() throws Exception {
final String testDir = GenericTestUtils.getTempPath(
TestReplicationSupervisor.class.getSimpleName() + "-"
+ UUID.randomUUID());
try {
testMixedSchemaReplication(testDir, false);
} finally {
FileUtils.deleteDirectory(new File(testDir));
}
}

/**
* Test replicating a schema V3 replica to a schema V3 disabled datanode.
*/
@Test
public void testV3ToV2Replication() throws Exception {
final String testDir = GenericTestUtils.getTempPath(
TestReplicationSupervisor.class.getSimpleName() + "-"
+ UUID.randomUUID());
try {
testMixedSchemaReplication(testDir, true);
} finally {
FileUtils.deleteDirectory(new File(testDir));
}
}

private void testMixedSchemaReplication(String dir,
boolean schemaV3Enabled) throws IOException, InterruptedException,
TimeoutException {
OzoneConfiguration conf = new OzoneConfiguration();
String scmId = UUID.randomUUID().toString();
UUID datanodeId = UUID.randomUUID();
final String dir1 = dir + (schemaV3Enabled ? "/v3" : "/v2");

// create HddsVolume
VolumeChoosingPolicy volumeChoosingPolicy =
mock(RoundRobinVolumeChoosingPolicy.class);
HddsVolume hddsVolume1 = new HddsVolume.Builder(dir1)
.conf(conf).datanodeUuid(datanodeId.toString()).build();
conf.setBoolean(CONTAINER_SCHEMA_V3_ENABLED, schemaV3Enabled);
StorageVolumeUtil.checkVolume(hddsVolume1, scmId, scmId, conf, null, null);
Mockito.when(volumeChoosingPolicy.chooseVolume(anyList(), anyLong()))
.thenReturn(hddsVolume1);
MutableVolumeSet volumeSet = mock(MutableVolumeSet.class);

// create container
long containerId = 1;
KeyValueContainerData data = new KeyValueContainerData(containerId,
ContainerLayoutVersion.FILE_PER_BLOCK,
ContainerTestHelper.CONTAINER_MAX_SIZE, UUID.randomUUID().toString(),
UUID.randomUUID().toString());
KeyValueContainer container = new KeyValueContainer(data, conf);
container.create(volumeSet, volumeChoosingPolicy, scmId);
try (DBHandle meta = BlockUtils.getDB(data, conf)) {
data.resetPendingDeleteBlockCount(meta);
}
container.close();

// verify container schema
if (schemaV3Enabled) {
Assert.assertEquals(SCHEMA_V3,
container.getContainerData().getSchemaVersion());
} else {
Assert.assertEquals(SCHEMA_V2,
container.getContainerData().getSchemaVersion());
}

// export container
TarContainerPacker packer = new TarContainerPacker(NO_COMPRESSION);
File file1 = new File(dir1 + "/" + containerId);
if (!file1.createNewFile()) {
fail("Failed to create file " + file1.getAbsolutePath());
}
try (FileOutputStream fos = new FileOutputStream(file1)) {
container.exportContainerData(fos, packer);
}

// create new HddsVolume
conf.setBoolean(CONTAINER_SCHEMA_V3_ENABLED, !schemaV3Enabled);
final String dir2 = dir + (schemaV3Enabled ? "/v2" : "/v3");
HddsVolume hddsVolume2 = new HddsVolume.Builder(dir2)
.conf(conf).datanodeUuid(datanodeId.toString()).build();
StorageVolumeUtil.checkVolume(hddsVolume2, scmId, scmId, conf, null, null);
Mockito.when(volumeChoosingPolicy.chooseVolume(anyList(), anyLong()))
.thenReturn(hddsVolume2);
Mockito.when(volumeSet.getVolumesList()).thenReturn(
singletonList(hddsVolume2));

// import container to new HddsVolume
SimpleContainerDownloader moc =
Mockito.mock(SimpleContainerDownloader.class);
Path tarFile = file1.getAbsoluteFile().toPath();
Mockito.when(
moc.getContainerDataFromReplicas(Mockito.anyLong(), Mockito.anyList(),
Mockito.any(Path.class), Mockito.any()))
.thenReturn(tarFile);

// create DownloadAndImportReplicator
ContainerSet containerSet = new ContainerSet(1000);
Assert.assertEquals(0, containerSet.getContainerMap().size());
ContainerMetrics metrics = ContainerMetrics.create(conf);
KeyValueHandler keyValueHandler =
new KeyValueHandler(conf, datanodeId.toString(), containerSet,
volumeSet, metrics, c -> { });
keyValueHandler.setClusterID(scmId);
ContainerController controller = new ContainerController(containerSet,
singletonMap(ContainerProtos.ContainerType.KeyValueContainer,
keyValueHandler));
ContainerImporter importer =
new ContainerImporter(conf, containerSet, controller, volumeSet);
ContainerReplicator replicator =
new DownloadAndImportReplicator(conf, containerSet, importer, moc);

// create replication task and execute
ReplicateContainerCommand cmd =
ReplicateContainerCommand.forTest(containerId);
cmd.setTerm(1);
replicatorRef.set(replicator);
GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
.captureLogs(DownloadAndImportReplicator.LOG);
ReplicationSupervisor supervisor = ReplicationSupervisor.newBuilder()
.stateContext(context)
.executor(newDirectExecutorService())
.clock(clock)
.build();
supervisor.addTask(createTask(1L));

// verify replication succeed and check replicated container schema
GenericTestUtils.waitFor(()
-> 1 == supervisor.getReplicationSuccessCount(), 100, 3000);
GenericTestUtils.waitFor(()
-> logCapturer.getOutput()
.contains("Container 1 is replicated successfully"), 100, 3000);
KeyValueContainerData containerData = (KeyValueContainerData)
containerSet.getContainer(containerId).getContainerData();
Assert.assertEquals(schemaV3Enabled ? SCHEMA_V3 : SCHEMA_V2,
containerData.getSchemaVersion());
}

private static class BlockingTask extends AbstractReplicationTask {

private final CountDownLatch runningLatch;
