diff --git a/.azure-pipelines/nccl-api-test.yaml b/.azure-pipelines/nccl-api-test.yaml
index e3d537fe4..c9e2fcc1c 100644
--- a/.azure-pipelines/nccl-api-test.yaml
+++ b/.azure-pipelines/nccl-api-test.yaml
@@ -156,6 +156,24 @@ jobs:
           mpirun -np 8 --bind-to numa --allow-run-as-root -x LD_PRELOAD=/root/mscclpp/build/apps/nccl/libmscclpp_nccl.so -x NCCL_DEBUG=WARN -x MSCCLPP_EXECUTION_PLAN_DIR=/root/mscclpp/msccl-users/execution-files /root/nccl-tests/build/all_reduce_perf -b 1K -e 1G -f 2 -d half -G 20 -w 10 -n 20"'
       workingDirectory: '$(System.DefaultWorkingDirectory)'
 
+  - task: Bash@3
+    name: RunNcclGatherTest
+    displayName: Run NCCL Allgather Test
+    inputs:
+      targetType: 'inline'
+      script: |
+        set -e
+        HOSTFILE=$(System.DefaultWorkingDirectory)/mscclpp/test/deploy/hostfile_ci
+        ROOT_DIR=$(System.DefaultWorkingDirectory)/mscclpp
+        SSH_OPTION="StrictHostKeyChecking=no"
+        KeyFilePath=${SSHKEYFILE_SECUREFILEPATH}
+        parallel-ssh -i -t 0 -h ${HOSTFILE} -x "-i ${KeyFilePath}" \
+          -O $SSH_OPTION 'sudo docker exec -t mscclpp-test bash -c "\
+          cd /root/mscclpp; \
+          mpirun -np 8 --bind-to numa --allow-run-as-root -x LD_PRELOAD=/root/mscclpp/build/apps/nccl/libmscclpp_nccl.so -x NCCL_DEBUG=WARN -x MSCCLPP_EXECUTION_PLAN_DIR=/root/mscclpp/msccl-users/execution-files /root/nccl-tests/build/all_gather_perf -b 1K -e 1G -f 2 -d half -G 20 -w 10 -n 20"'
+      workingDirectory: '$(System.DefaultWorkingDirectory)'
+
+
   - task: AzureCLI@2
     name: StopVMSS
     displayName: Deallocate VMSS
diff --git a/src/executor/execution_plan.cc b/src/executor/execution_plan.cc
index 144fb4174..162c3ef1c 100644
--- a/src/executor/execution_plan.cc
+++ b/src/executor/execution_plan.cc
@@ -165,8 +165,9 @@ std::vector<BufferType> ExecutionPlan::Impl::getConnectedBufferTypes(int rank) c
   }
   return std::vector<BufferType>(bufferTypes.begin(), bufferTypes.end());
 }
+
 size_t ExecutionPlan::Impl::getScratchBufferSize(int rank, size_t inputSize, size_t outputSize) const {
-  size_t sizePerRank;
+  size_t sizePerRank = 0;
   if (this->inputChunks.at(rank) != 0)
     sizePerRank = inputSize / this->inputChunks.at(rank);
   else if (this->outputChunks.at(rank) != 0)
@@ -179,6 +180,26 @@ size_t ExecutionPlan::Impl::getScratchBufferSize(int rank, size_t inputSize, siz
   }
   return sizePerRank * this->scratchChunks.at(rank);
 }
+
+// Returns the scratch buffer size for `rank` when the plan is driven at its maximum message size.
+// When maxMessageSize equals the numeric-limits maximum, the largest size_t is returned instead (no cap).
+// Throws mscclpp::Error (ExecutorError) if the rank has neither input chunks nor output chunks.
+size_t ExecutionPlan::Impl::getMaxScratchBufferSize(int rank) const {
+  if (this->maxMessageSize == std::numeric_limits<uint64_t>::max()) {
+    return std::numeric_limits<size_t>::max();
+  }
+  size_t sizePerChunk = 0;
+  if (this->inputChunks.at(rank) != 0)
+    sizePerChunk = maxMessageSize / this->inputChunks.at(rank);
+  else if (this->outputChunks.at(rank) != 0)
+    sizePerChunk = maxMessageSize / this->outputChunks.at(rank);
+  else
+    throw mscclpp::Error("Output or Input chunks must be greater than 0", mscclpp::ErrorCode::ExecutorError);
+
+  return this->getScratchBufferSize(rank, sizePerChunk * this->inputChunks.at(rank),
+                                    sizePerChunk * this->outputChunks.at(rank));
+}
+
 std::vector<Operation> ExecutionPlan::Impl::getOperations(int rank, int threadblock) const {
   return this->operations.at(rank)[threadblock];
 }
diff --git a/src/executor/executor.cc b/src/executor/executor.cc
index 8a3d50c03..b8e8c6af3 100644
--- a/src/executor/executor.cc
+++ b/src/executor/executor.cc
@@ -140,8 +140,8 @@ struct Executor::Impl {
 
   ExecutionContext setupExecutionContext(int rank, void* sendbuff, void* recvbuff, size_t inputMessageSize,
                                          size_t outputMessageSize, size_t constSrcOffset, size_t constDstOffset,
-                                         size_t sendBufferSize, size_t recvBufferSize, const ExecutionPlan& plan) {
-    ExecutionContextKey key = {sendbuff, recvbuff, sendBufferSize, recvBufferSize, plan.impl_->name};
+                                         size_t sendMemRange, size_t recvMemRange, const ExecutionPlan& plan) {
+    ExecutionContextKey key = {sendbuff, recvbuff, sendMemRange, recvMemRange, plan.impl_->name};
     DeviceExecutionPlanKey devicePlanKey = {inputMessageSize, outputMessageSize, constSrcOffset, constDstOffset};
     if (this->contexts.find(key) != this->contexts.end()) {
       auto& devicePlans = this->contexts[key].deviceExecutionPlans;
@@ -167,7 +167,10 @@ struct Executor::Impl {
     plan.impl_->loadExecutionPlan(inputMessageSize, outputMessageSize, constSrcOffset, constDstOffset);
 
     ExecutionContext context;
-    size_t scratchBufferSize = plan.impl_->getScratchBufferSize(rank, sendBufferSize, recvBufferSize);
+    // Size scratch from the send/recv memory ranges, capped at what the plan's max message size requires.
+    size_t maxScratchBufferSize = plan.impl_->getMaxScratchBufferSize(rank);
+    size_t scratchBufferSize =
+        std::min(plan.impl_->getScratchBufferSize(rank, sendMemRange, recvMemRange), maxScratchBufferSize);
     std::shared_ptr<char> scratchBuffer;
     if (isNvlsSupported()) {
       scratchBuffer = allocSharedPhysicalCuda<char>(scratchBufferSize);
@@ -179,8 +182,8 @@ struct Executor::Impl {
     context.proxyService = std::make_shared<ProxyService>();
     context.nthreadsPerBlock = plan.impl_->getNThreadsPerBlock();
     this->setupConnections(context, rank, plan);
-    this->setupRegisteredMemories(context, sendbuff, recvbuff, sendBufferSize, recvBufferSize, rank, plan);
-    this->setupChannels(context, sendbuff, recvbuff, sendBufferSize, recvBufferSize, rank, plan);
+    this->setupRegisteredMemories(context, sendbuff, recvbuff, sendMemRange, recvMemRange, rank, plan);
+    this->setupChannels(context, sendbuff, recvbuff, sendMemRange, recvMemRange, rank, plan);
     this->setupNvlsChannels(context, sendbuff, recvbuff, rank, plan);
     this->setupDeviceExecutionPlan(context, devicePlanKey, rank, plan);
     context.deviceExecutionPlansBuffers[devicePlanKey] =
@@ -438,16 +441,16 @@ Executor::Executor(std::shared_ptr<Communicator> comm) : impl_(std::make_unique<
 void Executor::execute(int rank, void* sendbuff, void* recvbuff, size_t sendBuffSize,
                        [[maybe_unused]] size_t recvBuffSize, DataType dataType, const ExecutionPlan& plan,
                        cudaStream_t stream, PacketType packetType) {
-  size_t sendBytes, recvBytes;
+  size_t sendMemRange, recvMemRange;
   CUdeviceptr sendBasePtr, recvBasePtr;
-  MSCCLPP_CUTHROW(cuMemGetAddressRange(&sendBasePtr, &sendBytes, (CUdeviceptr)sendbuff));
-  MSCCLPP_CUTHROW(cuMemGetAddressRange(&recvBasePtr, &recvBytes, (CUdeviceptr)recvbuff));
+  MSCCLPP_CUTHROW(cuMemGetAddressRange(&sendBasePtr, &sendMemRange, (CUdeviceptr)sendbuff));
+  MSCCLPP_CUTHROW(cuMemGetAddressRange(&recvBasePtr, &recvMemRange, (CUdeviceptr)recvbuff));
   size_t offsetIn = (char*)sendbuff - (char*)sendBasePtr;
   size_t offsetOut = (char*)recvbuff - (char*)recvBasePtr;
 
   ExecutionContext context =
       this->impl_->setupExecutionContext(rank, (void*)sendBasePtr, (void*)recvBasePtr, sendBuffSize, recvBuffSize,
-                                         offsetIn, offsetOut, sendBytes, recvBytes, plan);
+                                         offsetIn, offsetOut, sendMemRange, recvMemRange, plan);
   this->impl_->launchKernel(context, rank, sendbuff, recvbuff, dataType, stream, packetType);
 }
 
diff --git a/src/include/execution_plan.hpp b/src/include/execution_plan.hpp
index 8d291f45f..3af585508 100644
--- a/src/include/execution_plan.hpp
+++ b/src/include/execution_plan.hpp
@@ -73,6 +73,8 @@ struct ExecutionPlan::Impl {
   std::vector<int> getConnectedPeers(int rank) const;
   std::vector<BufferType> getConnectedBufferTypes(int rank) const;
   size_t getScratchBufferSize(int rank, size_t inputSize, size_t outputSize) const;
+  // Scratch buffer size for `rank` when the plan runs at its maximum message size.
+  size_t getMaxScratchBufferSize(int rank) const;
   std::vector<Operation> getOperations(int rank, int threadblock) const;
   int getThreadblockCount(int rank) const;
   int getNThreadsPerBlock() const;