Skip to content

Commit

Permalink
Add a new UT
Browse files Browse the repository at this point in the history
  • Loading branch information
maobaolong committed Nov 17, 2024
1 parent 58f4644 commit 8f8173d
Show file tree
Hide file tree
Showing 2 changed files with 239 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.test;

import java.io.File;
import java.util.List;
import java.util.Map;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.Files;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.roaringbitmap.longlong.Roaring64NavigableMap;

import org.apache.uniffle.client.TestUtils;
import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.impl.ShuffleReadClientImpl;
import org.apache.uniffle.client.impl.ShuffleWriteClientImpl;
import org.apache.uniffle.client.response.SendShuffleDataResult;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleDataSegment;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.server.ShuffleDataReadEvent;
import org.apache.uniffle.server.ShuffleServer;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.server.storage.Raid0LocalStorageManager;
import org.apache.uniffle.storage.common.LocalStorage;
import org.apache.uniffle.storage.util.StorageType;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

/** This class is to test the {@link Raid0LocalStorageManager}. */
public class ShuffleClientWithLocalMultiDiskTest extends ShuffleReadWriteBase {

private static ShuffleServerInfo shuffleServerInfo;
private ShuffleWriteClientImpl shuffleWriteClientImpl;

@BeforeAll
public static void setupShuffleServers() throws Exception {
CoordinatorConf coordinatorConf = getCoordinatorConf();
createCoordinatorServer(coordinatorConf);

ShuffleServerConf shuffleServerConf = getShuffleServerConf(ServerType.GRPC);
shuffleServerConf.setLong(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 4000000);
File tmpDir = Files.createTempDir();
tmpDir.deleteOnExit();
File dataDir1 = new File(tmpDir, "data1");
File dataDir2 = new File(tmpDir, "data2");
String basePath = dataDir1.getAbsolutePath() + "," + dataDir2.getAbsolutePath();
System.out.println("base: " + basePath);
shuffleServerConf.setString(RssBaseConf.RSS_STORAGE_TYPE.key(), StorageType.LOCALFILE.name());
shuffleServerConf.setBoolean(RssBaseConf.RSS_TEST_MODE_ENABLE.key(), true);
shuffleServerConf.setString(RssBaseConf.RSS_STORAGE_BASE_PATH.key(), basePath);
shuffleServerConf.setLong(ShuffleServerConf.DISK_CAPACITY, 10 * 1024 * 1024);
shuffleServerConf.set(
ShuffleServerConf.SERVER_LOCAL_STORAGE_MANAGER_CLASS,
Raid0LocalStorageManager.class.getName());
createShuffleServer(shuffleServerConf);
startServers();
shuffleServerInfo =
new ShuffleServerInfo(
"127.0.0.1-20001",
grpcShuffleServers.get(0).getIp(),
grpcShuffleServers.get(0).getGrpcPort());
}

@BeforeEach
public void createClient() {
shuffleWriteClientImpl =
new ShuffleWriteClientImpl(
ShuffleClientFactory.newWriteBuilder()
.clientType(ClientType.GRPC.name())
.retryMax(3)
.retryIntervalMax(1000)
.heartBeatThreadNum(1)
.replica(1)
.replicaWrite(1)
.replicaRead(1)
.replicaSkipEnabled(true)
.dataTransferPoolSize(1)
.dataCommitPoolSize(1)
.unregisterThreadPoolSize(10)
.unregisterRequestTimeSec(10));
}

@AfterEach
public void closeClient() {
shuffleWriteClientImpl.close();
}

@Test
public void testClientRemoteReadFromMultipleDisk() throws Exception {
String appId = "testClientRemoteReadFromMultipleDisk_appId";
shuffleWriteClientImpl.registerShuffle(
shuffleServerInfo,
appId,
0,
Lists.newArrayList(new PartitionRange(0, 0)),
new RemoteStorageInfo(""),
ShuffleDataDistributionType.NORMAL,
1);

Map<Long, byte[]> expectedData = Maps.newHashMap();
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();

// First committing, blocks will be written to one disk
List<ShuffleBlockInfo> blocks =
createShuffleBlockList(
0, 0, 0, 3, 25, blockIdBitmap, expectedData, Lists.newArrayList(shuffleServerInfo));
SendShuffleDataResult result =
shuffleWriteClientImpl.sendShuffleData(appId, blocks, () -> false);
assertEquals(0, result.getFailedBlockIds().size());
assertEquals(3, result.getSuccessBlockIds().size());

boolean commitResult =
shuffleWriteClientImpl.sendCommit(Sets.newHashSet(shuffleServerInfo), appId, 0, 1);
assertTrue(commitResult);

// Mark one storage reaching high watermark, it should switch another storage for next writing
ShuffleServer shuffleServer = grpcShuffleServers.get(0);
ShuffleDataReadEvent readEvent = new ShuffleDataReadEvent(appId, 0, 0, 0, 0);
LocalStorage storage1 =
(LocalStorage) shuffleServer.getStorageManager().selectStorage(readEvent);
storage1.getMetaData().setSize(20 * 1024 * 1024);

blocks =
createShuffleBlockList(
0, 0, 0, 3, 25, blockIdBitmap, expectedData, Lists.newArrayList(shuffleServerInfo));
result = shuffleWriteClientImpl.sendShuffleData(appId, blocks, () -> false);
assertEquals(0, result.getFailedBlockIds().size());
assertEquals(3, result.getSuccessBlockIds().size());
commitResult =
shuffleWriteClientImpl.sendCommit(Sets.newHashSet(shuffleServerInfo), appId, 0, 1);
assertTrue(commitResult);

readEvent = new ShuffleDataReadEvent(appId, 0, 0, 0, ShuffleDataSegment.wrapOffset(0, 1));
LocalStorage storage2 =
(LocalStorage) shuffleServer.getStorageManager().selectStorage(readEvent);
assertNotEquals(storage1, storage2);

ShuffleReadClientImpl readClient =
ShuffleClientFactory.newReadBuilder()
.clientType(ClientType.GRPC)
.storageType(StorageType.LOCALFILE.name())
.appId(appId)
.shuffleId(0)
.partitionId(0)
.indexReadLimit(100)
.partitionNumPerRange(1)
.partitionNum(1)
.readBufferSize(1000)
.basePath("")
.blockIdBitmap(blockIdBitmap)
.taskIdBitmap(Roaring64NavigableMap.bitmapOf(0))
.shuffleServerInfoList(Lists.newArrayList(shuffleServerInfo))
.build();

TestUtils.validateResult(readClient, expectedData);
readClient.checkProcessedBlockIds();
readClient.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,20 @@
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleDataSegment;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.common.util.BlockId;
import org.apache.uniffle.common.util.BlockIdLayout;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.server.ShuffleDataReadEvent;
import org.apache.uniffle.server.ShuffleServer;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.storage.common.LocalStorage;
import org.apache.uniffle.storage.util.StorageType;

import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
Expand Down Expand Up @@ -497,6 +502,50 @@ public void readTest10(boolean isNettyMode) throws Exception {
readClient.close();
}

@ParameterizedTest
@MethodSource("isNettyModeProvider")
public void testClientRemoteReadFromMultipleDisk(boolean isNettyMode) {
String testAppId = "testClientRemoteReadFromMultipleDisk_appId";
registerApp(testAppId, Lists.newArrayList(new PartitionRange(0, 0)), isNettyMode);

// Send shuffle data
Map<Long, byte[]> expectedData = Maps.newHashMap();
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);

List<ShuffleBlockInfo> blocks =
createShuffleBlockList(0, 0, 0, 5, 30, blockIdBitmap, expectedData, mockSSI);
sendTestData(testAppId, blocks, isNettyMode);

List<ShuffleServer> shuffleServers = isNettyMode ? nettyShuffleServers : grpcShuffleServers;
// Mark one storage reaching high watermark, it should switch another storage for next writing
ShuffleServer shuffleServer = shuffleServers.get(0);
ShuffleDataReadEvent readEvent = new ShuffleDataReadEvent(testAppId, 0, 0, 0, 0);
LocalStorage storage1 =
(LocalStorage) shuffleServer.getStorageManager().selectStorage(readEvent);
storage1.getMetaData().setSize(20 * 1024 * 1024);

blocks = createShuffleBlockList(0, 0, 0, 3, 25, blockIdBitmap, expectedData, mockSSI);
sendTestData(testAppId, blocks, isNettyMode);

readEvent = new ShuffleDataReadEvent(testAppId, 0, 0, 0, ShuffleDataSegment.wrapOffset(0, 1));
LocalStorage storage2 =
(LocalStorage) shuffleServer.getStorageManager().selectStorage(readEvent);
assertNotEquals(storage1, storage2);

// unexpected taskAttemptId should be filtered
ShuffleReadClientImpl readClient =
baseReadBuilder(isNettyMode)
.appId(testAppId)
.blockIdBitmap(blockIdBitmap)
.taskIdBitmap(taskIdBitmap)
.build();

validateResult(readClient, expectedData);
readClient.checkProcessedBlockIds();
readClient.close();
}

protected void registerApp(
String testAppId, List<PartitionRange> partitionRanges, boolean isNettyMode) {
ShuffleServerGrpcClient shuffleServerClient =
Expand Down

0 comments on commit 8f8173d

Please sign in to comment.