diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java index 7efbe8a1b3f..dec69d77e87 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainer.java @@ -33,10 +33,12 @@ import org.apache.hadoop.hdds.utils.db.RocksDatabase.ColumnFamily; import org.apache.hadoop.hdds.utils.db.Table; import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.container.ContainerTestHelper; import org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion; import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml; import org.apache.hadoop.ozone.container.common.interfaces.DBHandle; +import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy; import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration; import org.apache.hadoop.ozone.container.common.utils.DatanodeStoreCache; import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil; @@ -61,6 +63,7 @@ import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import org.junit.jupiter.api.Assertions; import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; @@ -88,7 +91,11 @@ import java.util.stream.IntStream; import java.util.stream.Stream; +import static java.util.Collections.singletonList; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DB_PROFILE; +import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V2; +import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V3; +import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.CONTAINER_SCHEMA_V3_ENABLED; import static org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil.isSameSchemaVersion; import static org.apache.hadoop.ozone.container.replication.CopyContainerCompression.NO_COMPRESSION; import static org.apache.ratis.util.Preconditions.assertTrue; @@ -885,4 +892,111 @@ public void testIsEmptyContainerStateWhileImportWithoutBlock() Assert.assertTrue(container.getContainerData().isEmpty()); } } + + /** + * Test import schema V2 replica to V3 enabled HddsVolume. + */ + @Test + public void testImportV2ReplicaToV3HddsVolume() throws Exception { + final String testDir = GenericTestUtils.getTempPath( + TestKeyValueContainer.class.getSimpleName() + "-" + + UUID.randomUUID()); + try { + testMixedSchemaImport(testDir, false); + } finally { + FileUtils.deleteDirectory(new File(testDir)); + } + } + + /** + * Test import schema V3 replica to V3 disabled HddsVolume. + */ + @Test + public void testImportV3ReplicaToV2HddsVolume() throws Exception { + final String testDir = GenericTestUtils.getTempPath( + TestKeyValueContainer.class.getSimpleName() + "-" + + UUID.randomUUID()); + try { + testMixedSchemaImport(testDir, true); + } finally { + FileUtils.deleteDirectory(new File(testDir)); + } + } + + private void testMixedSchemaImport(String dir, + boolean schemaV3Enabled) throws IOException { + OzoneConfiguration conf = new OzoneConfiguration(); + String scmId = UUID.randomUUID().toString(); + UUID datanodeId = UUID.randomUUID(); + final String dir1 = dir + (schemaV3Enabled ? "/v3" : "/v2"); + + // create HddsVolume + VolumeChoosingPolicy volumeChoosingPolicy = + mock(RoundRobinVolumeChoosingPolicy.class); + HddsVolume hddsVolume1 = new HddsVolume.Builder(dir1) + .conf(conf).datanodeUuid(datanodeId.toString()).build(); + conf.setBoolean(CONTAINER_SCHEMA_V3_ENABLED, schemaV3Enabled); + StorageVolumeUtil.checkVolume(hddsVolume1, scmId, scmId, conf, null, null); + Mockito.when(volumeChoosingPolicy.chooseVolume(anyList(), anyLong())) + .thenReturn(hddsVolume1); + MutableVolumeSet volumeSet = mock(MutableVolumeSet.class); + + // create container + long containerId = 1; + KeyValueContainerData data = new KeyValueContainerData(containerId, + ContainerLayoutVersion.FILE_PER_BLOCK, + ContainerTestHelper.CONTAINER_MAX_SIZE, UUID.randomUUID().toString(), + UUID.randomUUID().toString()); + KeyValueContainer container = new KeyValueContainer(data, conf); + container.create(volumeSet, volumeChoosingPolicy, scmId); + long pendingDeleteBlockCount = 20; + try (DBHandle meta = BlockUtils.getDB(data, conf)) { + Table metadataTable = meta.getStore().getMetadataTable(); + metadataTable.put(data.getPendingDeleteBlockCountKey(), + pendingDeleteBlockCount); + } + container.close(); + + // verify container schema + if (schemaV3Enabled) { + Assert.assertEquals(SCHEMA_V3, + container.getContainerData().getSchemaVersion()); + } else { + Assert.assertEquals(SCHEMA_V2, + container.getContainerData().getSchemaVersion()); + } + + //export container + TarContainerPacker packer = new TarContainerPacker(NO_COMPRESSION); + File file1 = new File(dir1 + "/" + containerId); + if (!file1.createNewFile()) { + Assertions.fail("Failed to create file " + file1.getAbsolutePath()); + } + try (FileOutputStream fos = new FileOutputStream(file1)) { + container.exportContainerData(fos, packer); + } + + // create new HddsVolume + conf.setBoolean(CONTAINER_SCHEMA_V3_ENABLED, !schemaV3Enabled); + final String dir2 = dir + (schemaV3Enabled ? "/v2" : "/v3"); + HddsVolume hddsVolume2 = new HddsVolume.Builder(dir2) + .conf(conf).datanodeUuid(datanodeId.toString()).build(); + StorageVolumeUtil.checkVolume(hddsVolume2, scmId, scmId, conf, null, null); + Mockito.when(volumeChoosingPolicy.chooseVolume(anyList(), anyLong())) + .thenReturn(hddsVolume2); + Mockito.when(volumeSet.getVolumesList()).thenReturn( + singletonList(hddsVolume2)); + + // import container to new HddsVolume + KeyValueContainer importedContainer = new KeyValueContainer(data, conf); + importedContainer.populatePathFields(scmId, hddsVolume2); + try (FileInputStream fio = new FileInputStream(file1)) { + importedContainer.importContainerData(fio, packer); + } + + Assert.assertEquals(schemaV3Enabled ? SCHEMA_V3 : SCHEMA_V2, + importedContainer.getContainerData().getSchemaVersion()); + Assert.assertEquals(pendingDeleteBlockCount, + importedContainer.getContainerData().getNumPendingDeletionBlocks()); + } } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java index f38c62615db..739a4f59653 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java @@ -18,8 +18,6 @@ package org.apache.hadoop.ozone.container.replication; -import java.io.File; -import java.io.FileOutputStream; import java.io.IOException; import java.nio.file.Path; import java.nio.file.Paths; @@ -35,44 +33,31 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; -import org.apache.commons.io.FileUtils; import org.apache.hadoop.hdds.client.ECReplicationConfig; import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReplicationCommandPriority; import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl; -import org.apache.hadoop.ozone.container.ContainerTestHelper; -import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; -import org.apache.hadoop.ozone.container.common.interfaces.DBHandle; -import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy; import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration; -import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil; import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet; import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; -import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy; import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionCommandInfo; import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionCoordinatorTask; import org.apache.hadoop.ozone.container.keyvalue.ContainerLayoutTestInfo; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; -import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler; -import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker; -import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils; -import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController; import org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand; import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; import org.apache.ozone.test.GenericTestUtils; @@ -90,22 +75,14 @@ import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService; import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; -import static java.util.Collections.singletonMap; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_MAINTENANCE; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE; -import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V2; -import static org.apache.hadoop.ozone.OzoneConsts.SCHEMA_V3; -import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.CONTAINER_SCHEMA_V3_ENABLED; import static org.apache.hadoop.ozone.container.replication.AbstractReplicationTask.Status.DONE; -import static org.apache.hadoop.ozone.container.replication.CopyContainerCompression.NO_COMPRESSION; import static org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand.fromSources; import static org.junit.jupiter.api.Assertions.fail; import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReplicationCommandPriority.LOW; import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReplicationCommandPriority.NORMAL; -import static org.mockito.ArgumentMatchers.anyList; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.Mockito.mock; /** * Test the replication supervisor. @@ -483,149 +460,6 @@ public void testPriorityOrdering() throws InterruptedException { supervisor.getInFlightReplications(BlockingTask.class)); } - /** - * Test replicate schema V2 replica to V3 enabled datanode. - */ - @Test - public void testV2ToV3Replication() throws Exception { - final String testDir = GenericTestUtils.getTempPath( - TestReplicationSupervisor.class.getSimpleName() + "-" - + UUID.randomUUID()); - try { - testMixedSchemaReplication(testDir, false); - } finally { - FileUtils.deleteDirectory(new File(testDir)); - } - } - - /** - * Test replicate schema V3 replica to V3 disabled datanode. - */ - @Test - public void testV3ToV2Replication() throws Exception { - final String testDir = GenericTestUtils.getTempPath( - TestReplicationSupervisor.class.getSimpleName() + "-" - + UUID.randomUUID()); - try { - testMixedSchemaReplication(testDir, true); - } finally { - FileUtils.deleteDirectory(new File(testDir)); - } - } - - private void testMixedSchemaReplication(String dir, - boolean schemaV3Enabled) throws IOException, InterruptedException, - TimeoutException { - OzoneConfiguration conf = new OzoneConfiguration(); - String scmId = UUID.randomUUID().toString(); - UUID datanodeId = UUID.randomUUID(); - final String dir1 = dir + (schemaV3Enabled ? "/v3" : "/v2"); - - // create HddsVolume - VolumeChoosingPolicy volumeChoosingPolicy = - mock(RoundRobinVolumeChoosingPolicy.class); - HddsVolume hddsVolume1 = new HddsVolume.Builder(dir1) - .conf(conf).datanodeUuid(datanodeId.toString()).build(); - conf.setBoolean(CONTAINER_SCHEMA_V3_ENABLED, schemaV3Enabled); - StorageVolumeUtil.checkVolume(hddsVolume1, scmId, scmId, conf, null, null); - Mockito.when(volumeChoosingPolicy.chooseVolume(anyList(), anyLong())) - .thenReturn(hddsVolume1); - MutableVolumeSet volumeSet = mock(MutableVolumeSet.class); - - // create container - long containerId = 1; - KeyValueContainerData data = new KeyValueContainerData(containerId, - ContainerLayoutVersion.FILE_PER_BLOCK, - ContainerTestHelper.CONTAINER_MAX_SIZE, UUID.randomUUID().toString(), - UUID.randomUUID().toString()); - KeyValueContainer container = new KeyValueContainer(data, conf); - container.create(volumeSet, volumeChoosingPolicy, scmId); - try (DBHandle meta = BlockUtils.getDB(data, conf)) { - data.resetPendingDeleteBlockCount(meta); - } - container.close(); - - // verify container schema - if (schemaV3Enabled) { - Assert.assertEquals(SCHEMA_V3, - container.getContainerData().getSchemaVersion()); - } else { - Assert.assertEquals(SCHEMA_V2, - container.getContainerData().getSchemaVersion()); - } - - //export container - TarContainerPacker packer = new TarContainerPacker(NO_COMPRESSION); - File file1 = new File(dir1 + "/" + containerId); - if (!file1.createNewFile()) { - fail("Failed to create file " + file1.getAbsolutePath()); - } - try (FileOutputStream fos = new FileOutputStream(file1)) { - container.exportContainerData(fos, packer); - } - - // create new HddsVolume - conf.setBoolean(CONTAINER_SCHEMA_V3_ENABLED, !schemaV3Enabled); - final String dir2 = dir + (schemaV3Enabled ? "/v2" : "/v3"); - HddsVolume hddsVolume2 = new HddsVolume.Builder(dir2) - .conf(conf).datanodeUuid(datanodeId.toString()).build(); - StorageVolumeUtil.checkVolume(hddsVolume2, scmId, scmId, conf, null, null); - Mockito.when(volumeChoosingPolicy.chooseVolume(anyList(), anyLong())) - .thenReturn(hddsVolume2); - Mockito.when(volumeSet.getVolumesList()).thenReturn( - singletonList(hddsVolume2)); - - // import container to new HddsVolume - SimpleContainerDownloader moc = - Mockito.mock(SimpleContainerDownloader.class); - Path tarFile = file1.getAbsoluteFile().toPath(); - Mockito.when( - moc.getContainerDataFromReplicas(Mockito.anyLong(), Mockito.anyList(), - Mockito.any(Path.class), Mockito.any())) - .thenReturn(tarFile); - - // create DownloadAndImportReplicator - ContainerSet containerSet = new ContainerSet(1000); - Assert.assertEquals(0, containerSet.getContainerMap().size()); - ContainerMetrics metrics = ContainerMetrics.create(conf); - KeyValueHandler keyValueHandler = - new KeyValueHandler(conf, datanodeId.toString(), containerSet, - volumeSet, metrics, c -> { }); - keyValueHandler.setClusterID(scmId); - ContainerController controller = new ContainerController(containerSet, - singletonMap(ContainerProtos.ContainerType.KeyValueContainer, - keyValueHandler)); - ContainerImporter importer = - new ContainerImporter(conf, containerSet, controller, volumeSet); - ContainerReplicator replicator = - new DownloadAndImportReplicator(conf, containerSet, importer, moc); - - // create replication task and execute - ReplicateContainerCommand cmd = - ReplicateContainerCommand.forTest(containerId); - cmd.setTerm(1); - replicatorRef.set(replicator); - GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer - .captureLogs(DownloadAndImportReplicator.LOG); - ReplicationSupervisor supervisor = ReplicationSupervisor.newBuilder() - .stateContext(context) - .executor(newDirectExecutorService()) - .clock(clock) - .build(); - supervisor.addTask(createTask(1L)); - - // verify replication succeed and check replicated container schema - GenericTestUtils.waitFor(() - -> 1 == supervisor.getReplicationSuccessCount(), 100, 3000); - GenericTestUtils.waitFor(() - -> logCapturer.getOutput() - .contains("Container 1 is replicated successfully"), 100, 3000); - KeyValueContainerData containerData = (KeyValueContainerData) - containerSet.getContainer(containerId).getContainerData(); - Assert.assertEquals(schemaV3Enabled ? SCHEMA_V3 : SCHEMA_V2, - containerData.getSchemaVersion()); - } - private static class BlockingTask extends AbstractReplicationTask { private final CountDownLatch runningLatch;