Skip to content

Commit

Permalink
Merge branch 'main' into chhwang/code-coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
chhwang authored Nov 14, 2023
2 parents 18d7c88 + 3521fb0 commit 9745fc3
Show file tree
Hide file tree
Showing 15 changed files with 143 additions and 179 deletions.
28 changes: 14 additions & 14 deletions .azure-pipelines/multi-nodes-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:
curl -L https://github.com/Kitware/CMake/releases/download/v3.26.4/cmake-3.26.4-linux-x86_64.tar.gz -o /tmp/cmake-3.26.4-linux-x86_64.tar.gz
tar xzf /tmp/cmake-3.26.4-linux-x86_64.tar.gz -C /tmp
mkdir build && cd build
MPI_HOME=/usr/local/mpi /tmp/cmake-3.26.4-linux-x86_64/bin/cmake -DCMAKE_BUILD_TYPE=Release ..
MPI_HOME=/usr/local/mpi /tmp/cmake-3.26.4-linux-x86_64/bin/cmake -DCMAKE_BUILD_TYPE=Release -DBYPASS_PEERMEM_CHECK=ON ..
make -j
make pylib-copy
workingDirectory: '$(System.DefaultWorkingDirectory)'
Expand All @@ -53,11 +53,11 @@ jobs:
name: StartVMSS
displayName: Start VMSS
inputs:
azureSubscription: mscclpp
azureSubscription: msccl-it
scriptType: bash
scriptLocation: inlineScript
inlineScript: |
az vmss start --name mscclpp-it-vmss --resource-group msccl-dev
az vmss start --name mscclit-vmss --resource-group msccl-IT
- task: Bash@3
name: DeployTestEnv
Expand All @@ -79,10 +79,10 @@ jobs:
KeyFilePath=${SSHKEYFILE_SECUREFILEPATH}
rm -rf output/*
mkdir -p output
touch output/mscclpp-it-000000
tail -f output/mscclpp-it-000000 &
touch output/mscclit-000000
tail -f output/mscclit-000000 &
CHILD_PID=$!
parallel-ssh -t 0 -H mscclpp-it-000000 -l azureuser -x "-i ${KeyFilePath}" \
parallel-ssh -t 0 -H mscclit-000000 -l azureuser -x "-i ${KeyFilePath}" \
-O $SSH_OPTION -o output 'sudo docker exec -t mscclpp-test bash /root/mscclpp/run_tests.sh mscclpp-test'
kill $CHILD_PID
Expand All @@ -98,10 +98,10 @@ jobs:
KeyFilePath=${SSHKEYFILE_SECUREFILEPATH}
rm -rf output/*
mkdir -p output
touch output/mscclpp-it-000000
tail -f output/mscclpp-it-000000 &
touch output/mscclit-000000
tail -f output/mscclit-000000 &
CHILD_PID=$!
parallel-ssh -t 0 -H mscclpp-it-000000 -l azureuser -x "-i ${KeyFilePath}" \
parallel-ssh -t 0 -H mscclit-000000 -l azureuser -x "-i ${KeyFilePath}" \
-O $SSH_OPTION -o output 'sudo docker exec -t mscclpp-test bash /root/mscclpp/run_tests.sh mp-ut'
kill $CHILD_PID
Expand All @@ -117,10 +117,10 @@ jobs:
KeyFilePath=${SSHKEYFILE_SECUREFILEPATH}
rm -rf output/*
mkdir -p output
touch output/mscclpp-it-000000
tail -f output/mscclpp-it-000000 &
touch output/mscclit-000000
tail -f output/mscclit-000000 &
CHILD_PID=$!
parallel-ssh -t 0 -H mscclpp-it-000000 -l azureuser -x "-i ${KeyFilePath}" \
parallel-ssh -t 0 -H mscclit-000000 -l azureuser -x "-i ${KeyFilePath}" \
-O $SSH_OPTION -o output 'sudo docker exec -t mscclpp-test bash /root/mscclpp/run_tests.sh pytests'
kill $CHILD_PID
Expand All @@ -129,8 +129,8 @@ jobs:
displayName: Deallocate VMSS
condition: always()
inputs:
azureSubscription: mscclpp
azureSubscription: msccl-it
scriptType: bash
scriptLocation: inlineScript
inlineScript: |
az vmss deallocate --name mscclpp-it-vmss --resource-group msccl-dev
az vmss deallocate --name mscclit-vmss --resource-group msccl-IT
5 changes: 0 additions & 5 deletions include/mscclpp/core.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -551,11 +551,6 @@ class NonblockingFuture {
/// @param future The shared future to move.
NonblockingFuture(std::shared_future<T>&& future) : future(std::move(future)) {}

/// Copy constructor.
///
/// @param other The @ref NonblockingFuture to copy.
NonblockingFuture(const NonblockingFuture& other) = default;

/// Check if the value is ready to be retrieved.
///
/// @return True if the value is ready, false otherwise.
Expand Down
4 changes: 2 additions & 2 deletions include/mscclpp/sm_channel_device.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ __forceinline__ __device__ void store<longlong2>(longlong2* p, const longlong2&
template <>
__forceinline__ __device__ void load<int4>(int4& v, const int4* p) {
asm volatile("ld.volatile.global.v4.u32 {%0,%1,%2,%3}, [%4];"
: "=r"(v.w), "=r"(v.x), "=r"(v.y), "=r"(v.z)
: "=r"(v.x), "=r"(v.y), "=r"(v.z), "=r"(v.w)
: "l"(p)
: "memory");
}
Expand All @@ -106,7 +106,7 @@ template <>
__forceinline__ __device__ void store<int4>(int4* p, const int4& v) {
asm volatile("st.volatile.global.v4.u32 [%0], {%1,%2,%3,%4};"
:
: "l"(p), "r"(v.w), "r"(v.x), "r"(v.y), "r"(v.z)
: "l"(p), "r"(v.x), "r"(v.y), "r"(v.z), "r"(v.w)
: "memory");
}

Expand Down
2 changes: 1 addition & 1 deletion python/test/_cpp/proxy_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class MyProxyService {
semaphores_(semaphores),
proxy_([&](mscclpp::ProxyTrigger triggerRaw) { return handleTrigger(triggerRaw); }, [&]() { bindThread(); }) {
int cudaDevice;
cudaGetDevice(&cudaDevice);
MSCCLPP_CUDATHROW(cudaGetDevice(&cudaDevice));
deviceNumaNode_ = mscclpp::getDeviceNumaNode(cudaDevice);
}

Expand Down
46 changes: 16 additions & 30 deletions test/allgather_test_cpp.cu
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,6 @@

static int nranksPerNode = 8;

// Propagate errors up

#define MSCCLPPCHECK(call) \
do { \
mscclppResult_t res = call; \
if (res != mscclppSuccess && res != mscclppInProgress) { \
/* Print the back trace*/ \
printf("Failure at %s:%d -> %s\n", __FILE__, __LINE__, mscclppGetErrorString(res)); \
return res; \
} \
} while (0)

// Check CUDA RT calls
#define CUDACHECK(cmd) \
do { \
Expand All @@ -54,8 +42,7 @@ template <class T>
using DeviceHandle = mscclpp::DeviceHandle<T>;
__constant__ DeviceHandle<mscclpp::SimpleProxyChannel> constProxyChans[16];

__device__ void allgather0(DeviceHandle<mscclpp::SimpleProxyChannel> proxyChan, int rank, int world_size,
int remoteRank, size_t nelemsPerGPU) {
__device__ void allgather0(DeviceHandle<mscclpp::SimpleProxyChannel> proxyChan, int rank, size_t nelemsPerGPU) {
// this allgather is really simple and implemented as an alltoall

// this thread's role is a sender role
Expand All @@ -70,8 +57,8 @@ __device__ void allgather0(DeviceHandle<mscclpp::SimpleProxyChannel> proxyChan,
if ((threadIdx.x % 32) == 0) proxyChan.wait();
}

__device__ void localAllGather(DeviceHandle<mscclpp::SimpleProxyChannel> proxyChan, int rank, int world_size,
int nranksPerNode, int remoteRank, uint64_t offset, uint64_t size) {
__device__ void localAllGather(DeviceHandle<mscclpp::SimpleProxyChannel> proxyChan, int rank, int nranksPerNode,
int remoteRank, uint64_t offset, uint64_t size) {
// this allgather algorithm works as follows:
// Step 1: GPU rank i sends data to GPU rank (i+1) % nranksPerNode
// and waits for data from GPU rank (i-1) % nranksPerNode
Expand All @@ -91,9 +78,9 @@ __device__ void localAllGather(DeviceHandle<mscclpp::SimpleProxyChannel> proxyCh
}
}

__device__ void allgather1(DeviceHandle<mscclpp::SimpleProxyChannel> proxyChan, int rank, int world_size,
int nranksPerNode, int remoteRank, size_t nelemsPerGPU) {
localAllGather(proxyChan, rank, world_size, nranksPerNode, remoteRank, rank * nelemsPerGPU * sizeof(int),
__device__ void allgather1(DeviceHandle<mscclpp::SimpleProxyChannel> proxyChan, int rank, int nranksPerNode,
int remoteRank, size_t nelemsPerGPU) {
localAllGather(proxyChan, rank, nranksPerNode, remoteRank, rank * nelemsPerGPU * sizeof(int),
nelemsPerGPU * sizeof(int));
if (remoteRank / nranksPerNode == rank / nranksPerNode)
if ((threadIdx.x % 32) == 0) proxyChan.flush();
Expand All @@ -116,7 +103,7 @@ __device__ void allgather2(DeviceHandle<mscclpp::SimpleProxyChannel> proxyChan,
// Step 1
// local allgather
if (remoteRank / nranksPerNode == rank / nranksPerNode) {
localAllGather(proxyChan, rank, world_size, nranksPerNode, remoteRank, rank * nelemsPerGPU * sizeof(int),
localAllGather(proxyChan, rank, nranksPerNode, remoteRank, rank * nelemsPerGPU * sizeof(int),
nelemsPerGPU * sizeof(int));
}
// cross-node exchange
Expand All @@ -134,7 +121,7 @@ __device__ void allgather2(DeviceHandle<mscclpp::SimpleProxyChannel> proxyChan,
// local allgather
int otherNghr = (rank + nranksPerNode) % world_size;
if (remoteRank / nranksPerNode == rank / nranksPerNode) {
localAllGather(proxyChan, rank, world_size, nranksPerNode, remoteRank, otherNghr * nelemsPerGPU * sizeof(int),
localAllGather(proxyChan, rank, nranksPerNode, remoteRank, otherNghr * nelemsPerGPU * sizeof(int),
(nelemsPerGPU * (pipelineSize - 1)) / pipelineSize * sizeof(int));
}

Expand All @@ -152,7 +139,7 @@ __device__ void allgather2(DeviceHandle<mscclpp::SimpleProxyChannel> proxyChan,
// Step 3
// local allgather
if (remoteRank / nranksPerNode == rank / nranksPerNode) {
localAllGather(proxyChan, rank, world_size, nranksPerNode, remoteRank,
localAllGather(proxyChan, rank, nranksPerNode, remoteRank,
(otherNghr * nelemsPerGPU + (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize) * sizeof(int),
nelemsPerGPU / pipelineSize * sizeof(int));
}
Expand All @@ -170,9 +157,9 @@ __global__ void kernel(int rank, int world_size, int nranksPerNode, size_t nelem
DeviceHandle<mscclpp::SimpleProxyChannel> proxyChan = constProxyChans[warpId];

if (kernel == 0)
allgather0(proxyChan, rank, world_size, remoteRank, nelemsPerGPU);
allgather0(proxyChan, rank, nelemsPerGPU);
else if (kernel == 1)
allgather1(proxyChan, rank, world_size, nranksPerNode, remoteRank, nelemsPerGPU);
allgather1(proxyChan, rank, nranksPerNode, remoteRank, nelemsPerGPU);
else if (kernel == 2)
allgather2(proxyChan, rank, world_size, nranksPerNode, remoteRank, nelemsPerGPU);
}
Expand Down Expand Up @@ -388,7 +375,6 @@ int main(int argc, const char* argv[]) {
}
ip_port = (char*)parsedArgs["ip_port"].c_str();

int thisNode = rankToNode(rank);
int cudaNum = rankToLocalRank(rank);
CUDACHECK(cudaSetDevice(cudaNum));

Expand Down Expand Up @@ -452,19 +438,19 @@ int main(int argc, const char* argv[]) {
if (rank == 0) printf("Capturing %d iterations of the kernel in a CUDA graph\n", cudagraphiter);
cudaGraph_t graph;
cudaGraphExec_t instance;
cudaStreamBeginCapture(stream, cudaStreamCaptureModeGlobal);
CUDACHECK(cudaStreamBeginCapture(stream, cudaStreamCaptureModeGlobal));
for (int i = 0; i < cudagraphiter; ++i) {
kernel<<<1, 32 * (world_size - 1), 0, stream>>>(rank, world_size, nranksPerNode, nelemsPerGPU, kernelNum);
}
cudaStreamEndCapture(stream, &graph);
cudaGraphInstantiate(&instance, graph, NULL, NULL, 0);
CUDACHECK(cudaStreamEndCapture(stream, &graph));
CUDACHECK(cudaGraphInstantiate(&instance, graph, NULL, NULL, 0));

int cudagraphwarmup = 10;
if (rank == 0)
printf("Warming up %d iterations of the CUDA graph with %d iterations of the kernel\n", cudagraphwarmup,
cudagraphiter);
for (int i = 0; i < cudagraphwarmup; ++i) {
cudaGraphLaunch(instance, stream);
CUDACHECK(cudaGraphLaunch(instance, stream));
}
CUDACHECK(cudaStreamSynchronize(stream));

Expand All @@ -477,7 +463,7 @@ int main(int argc, const char* argv[]) {
double t0, t1, ms, time_in_us;
t0 = getTime();
for (int i = 0; i < cudagraphlaunch; ++i) {
cudaGraphLaunch(instance, stream);
CUDACHECK(cudaGraphLaunch(instance, stream));
}
CUDACHECK(cudaStreamSynchronize(stream));

Expand Down
Loading

0 comments on commit 9745fc3

Please sign in to comment.