From 8f8173d8f43bfd86c54eca6650c009420540f40f Mon Sep 17 00:00:00 2001
From: baoloongmao
Date: Sun, 17 Nov 2024 14:48:43 +0800
Subject: [PATCH] Add a new UT for reading shuffle data spread across multiple local disks

---
 .../ShuffleClientWithLocalMultiDiskTest.java  | 190 ++++++++++++++++++
 .../test/SparkClientWithLocalTest.java        |  49 +++++
 2 files changed, 239 insertions(+)
 create mode 100644 integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleClientWithLocalMultiDiskTest.java

diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleClientWithLocalMultiDiskTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleClientWithLocalMultiDiskTest.java
new file mode 100644
index 0000000000..586bcb87d1
--- /dev/null
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleClientWithLocalMultiDiskTest.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.test;
+
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.io.Files;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
+
+import org.apache.uniffle.client.TestUtils;
+import org.apache.uniffle.client.factory.ShuffleClientFactory;
+import org.apache.uniffle.client.impl.ShuffleReadClientImpl;
+import org.apache.uniffle.client.impl.ShuffleWriteClientImpl;
+import org.apache.uniffle.client.response.SendShuffleDataResult;
+import org.apache.uniffle.common.ClientType;
+import org.apache.uniffle.common.PartitionRange;
+import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.common.ShuffleBlockInfo;
+import org.apache.uniffle.common.ShuffleDataDistributionType;
+import org.apache.uniffle.common.ShuffleDataSegment;
+import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.rpc.ServerType;
+import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.server.ShuffleDataReadEvent;
+import org.apache.uniffle.server.ShuffleServer;
+import org.apache.uniffle.server.ShuffleServerConf;
+import org.apache.uniffle.server.storage.Raid0LocalStorageManager;
+import org.apache.uniffle.storage.common.LocalStorage;
+import org.apache.uniffle.storage.util.StorageType;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** This class tests the {@link Raid0LocalStorageManager}. */
+public class ShuffleClientWithLocalMultiDiskTest extends ShuffleReadWriteBase {
+
+  private static ShuffleServerInfo shuffleServerInfo;
+  private ShuffleWriteClientImpl shuffleWriteClientImpl;
+
+  @BeforeAll
+  public static void setupShuffleServers() throws Exception {
+    CoordinatorConf coordinatorConf = getCoordinatorConf();
+    createCoordinatorServer(coordinatorConf);
+
+    ShuffleServerConf shuffleServerConf = getShuffleServerConf(ServerType.GRPC);
+    shuffleServerConf.setLong(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 4000000);
+    File tmpDir = Files.createTempDir();
+    tmpDir.deleteOnExit();
+    File dataDir1 = new File(tmpDir, "data1");
+    File dataDir2 = new File(tmpDir, "data2");
+    String basePath = dataDir1.getAbsolutePath() + "," + dataDir2.getAbsolutePath();
+    System.out.println("base: " + basePath);
+    shuffleServerConf.setString(RssBaseConf.RSS_STORAGE_TYPE.key(), StorageType.LOCALFILE.name());
+    shuffleServerConf.setBoolean(RssBaseConf.RSS_TEST_MODE_ENABLE.key(), true);
+    shuffleServerConf.setString(RssBaseConf.RSS_STORAGE_BASE_PATH.key(), basePath);
+    shuffleServerConf.setLong(ShuffleServerConf.DISK_CAPACITY, 10 * 1024 * 1024);
+    shuffleServerConf.set(
+        ShuffleServerConf.SERVER_LOCAL_STORAGE_MANAGER_CLASS,
+        Raid0LocalStorageManager.class.getName());
+    createShuffleServer(shuffleServerConf);
+    startServers();
+    shuffleServerInfo =
+        new ShuffleServerInfo(
+            "127.0.0.1-20001",
+            grpcShuffleServers.get(0).getIp(),
+            grpcShuffleServers.get(0).getGrpcPort());
+  }
+
+  @BeforeEach
+  public void createClient() {
+    shuffleWriteClientImpl =
+        new ShuffleWriteClientImpl(
+            ShuffleClientFactory.newWriteBuilder()
+                .clientType(ClientType.GRPC.name())
+                .retryMax(3)
+                .retryIntervalMax(1000)
+                .heartBeatThreadNum(1)
+                .replica(1)
+                .replicaWrite(1)
+                .replicaRead(1)
+                .replicaSkipEnabled(true)
+                .dataTransferPoolSize(1)
+                .dataCommitPoolSize(1)
+                .unregisterThreadPoolSize(10)
+                .unregisterRequestTimeSec(10));
+  }
+
+  @AfterEach
+  public void closeClient() {
+    shuffleWriteClientImpl.close();
+  }
+
+  @Test
+  public void testClientRemoteReadFromMultipleDisk() throws Exception {
+    String appId = "testClientRemoteReadFromMultipleDisk_appId";
+    shuffleWriteClientImpl.registerShuffle(
+        shuffleServerInfo,
+        appId,
+        0,
+        Lists.newArrayList(new PartitionRange(0, 0)),
+        new RemoteStorageInfo(""),
+        ShuffleDataDistributionType.NORMAL,
+        1);
+
+    Map<Long, byte[]> expectedData = Maps.newHashMap();
+    Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+
+    // First commit: all blocks will be written to one disk
+    List<ShuffleBlockInfo> blocks =
+        createShuffleBlockList(
+            0, 0, 0, 3, 25, blockIdBitmap, expectedData, Lists.newArrayList(shuffleServerInfo));
+    SendShuffleDataResult result =
+        shuffleWriteClientImpl.sendShuffleData(appId, blocks, () -> false);
+    assertEquals(0, result.getFailedBlockIds().size());
+    assertEquals(3, result.getSuccessBlockIds().size());
+
+    boolean commitResult =
+        shuffleWriteClientImpl.sendCommit(Sets.newHashSet(shuffleServerInfo), appId, 0, 1);
+    assertTrue(commitResult);
+
+    // Mark one storage as reaching the high watermark so the next write switches to another one
+    ShuffleServer shuffleServer = grpcShuffleServers.get(0);
+    ShuffleDataReadEvent readEvent = new ShuffleDataReadEvent(appId, 0, 0, 0, 0);
+    LocalStorage storage1 =
+        (LocalStorage) shuffleServer.getStorageManager().selectStorage(readEvent);
+    storage1.getMetaData().setSize(20 * 1024 * 1024);
+
+    blocks =
+        createShuffleBlockList(
+            0, 0, 0, 3, 25, blockIdBitmap, expectedData, Lists.newArrayList(shuffleServerInfo));
+    result = shuffleWriteClientImpl.sendShuffleData(appId, blocks, () -> false);
+    assertEquals(0, result.getFailedBlockIds().size());
+    assertEquals(3, result.getSuccessBlockIds().size());
+    commitResult =
+        shuffleWriteClientImpl.sendCommit(Sets.newHashSet(shuffleServerInfo), appId, 0, 1);
+    assertTrue(commitResult);
+
+    readEvent = new ShuffleDataReadEvent(appId, 0, 0, 0, ShuffleDataSegment.wrapOffset(0, 1));
+    LocalStorage storage2 =
+        (LocalStorage) shuffleServer.getStorageManager().selectStorage(readEvent);
+    assertNotEquals(storage1, storage2);
+
+    ShuffleReadClientImpl readClient =
+        ShuffleClientFactory.newReadBuilder()
+            .clientType(ClientType.GRPC)
+            .storageType(StorageType.LOCALFILE.name())
+            .appId(appId)
+            .shuffleId(0)
+            .partitionId(0)
+            .indexReadLimit(100)
+            .partitionNumPerRange(1)
+            .partitionNum(1)
+            .readBufferSize(1000)
+            .basePath("")
+            .blockIdBitmap(blockIdBitmap)
+            .taskIdBitmap(Roaring64NavigableMap.bitmapOf(0))
+            .shuffleServerInfoList(Lists.newArrayList(shuffleServerInfo))
+            .build();
+
+    TestUtils.validateResult(readClient, expectedData);
+    readClient.checkProcessedBlockIds();
+    readClient.close();
+  }
+}
diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
index 11e60540ee..c3e0afad63 100644
--- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
+++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
@@ -46,15 +46,20 @@
 import org.apache.uniffle.common.ClientType;
 import org.apache.uniffle.common.PartitionRange;
 import org.apache.uniffle.common.ShuffleBlockInfo;
+import org.apache.uniffle.common.ShuffleDataSegment;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.rpc.ServerType;
 import org.apache.uniffle.common.util.BlockId;
 import org.apache.uniffle.common.util.BlockIdLayout;
 import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.server.ShuffleDataReadEvent;
+import org.apache.uniffle.server.ShuffleServer;
 import org.apache.uniffle.server.ShuffleServerConf;
+import org.apache.uniffle.storage.common.LocalStorage;
 import org.apache.uniffle.storage.util.StorageType;
 
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
@@ -497,6 +502,50 @@ public void readTest10(boolean isNettyMode) throws Exception {
     readClient.close();
   }
 
+  @ParameterizedTest
+  @MethodSource("isNettyModeProvider")
+  public void testClientRemoteReadFromMultipleDisk(boolean isNettyMode) {
+    String testAppId = "testClientRemoteReadFromMultipleDisk_appId";
+    registerApp(testAppId, Lists.newArrayList(new PartitionRange(0, 0)), isNettyMode);
+
+    // Send shuffle data
+    Map<Long, byte[]> expectedData = Maps.newHashMap();
+    Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
+    Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
+
+    List<ShuffleBlockInfo> blocks =
+        createShuffleBlockList(0, 0, 0, 5, 30, blockIdBitmap, expectedData, mockSSI);
+    sendTestData(testAppId, blocks, isNettyMode);
+
+    List<ShuffleServer> shuffleServers = isNettyMode ? nettyShuffleServers : grpcShuffleServers;
+    // Mark one storage as reaching the high watermark so the next write switches to another one
+    ShuffleServer shuffleServer = shuffleServers.get(0);
+    ShuffleDataReadEvent readEvent = new ShuffleDataReadEvent(testAppId, 0, 0, 0, 0);
+    LocalStorage storage1 =
+        (LocalStorage) shuffleServer.getStorageManager().selectStorage(readEvent);
+    storage1.getMetaData().setSize(20 * 1024 * 1024);
+
+    blocks = createShuffleBlockList(0, 0, 0, 3, 25, blockIdBitmap, expectedData, mockSSI);
+    sendTestData(testAppId, blocks, isNettyMode);
+
+    readEvent = new ShuffleDataReadEvent(testAppId, 0, 0, 0, ShuffleDataSegment.wrapOffset(0, 1));
+    LocalStorage storage2 =
+        (LocalStorage) shuffleServer.getStorageManager().selectStorage(readEvent);
+    assertNotEquals(storage1, storage2);
+
+    // Read back and verify the data spread across both storages
+    ShuffleReadClientImpl readClient =
+        baseReadBuilder(isNettyMode)
+            .appId(testAppId)
+            .blockIdBitmap(blockIdBitmap)
+            .taskIdBitmap(taskIdBitmap)
+            .build();
+
+    validateResult(readClient, expectedData);
+    readClient.checkProcessedBlockIds();
+    readClient.close();
+  }
+
   protected void registerApp(
       String testAppId, List<PartitionRange> partitionRanges, boolean isNettyMode) {
     ShuffleServerGrpcClient shuffleServerClient =