Commit a7e072f3 authored by Megvii Engine Team

Imported from upstream.

GitOrigin-RevId: a990ae4f2618215d3c4d0ca1c949619e5c26327b
---
Language: Cpp
BasedOnStyle: LLVM
AccessModifierOffset: -4
AlignAfterOpenBracket: Align
AlignConsecutiveAssignments: false
AlignConsecutiveDeclarations: false
AlignEscapedNewlinesLeft: true
AlignOperands: true
AlignTrailingComments: true
AllowAllParametersOfDeclarationOnNextLine: true
AllowShortBlocksOnASingleLine: false
AllowShortCaseLabelsOnASingleLine: false
AllowShortFunctionsOnASingleLine: Inline
AllowShortIfStatementsOnASingleLine: false
AllowShortLoopsOnASingleLine: false
AlwaysBreakAfterDefinitionReturnType: None
AlwaysBreakAfterReturnType: None
AlwaysBreakBeforeMultilineStrings: true
AlwaysBreakTemplateDeclarations: true
BinPackArguments: true
BinPackParameters: true
BraceWrapping:
AfterClass: false
AfterControlStatement: false
AfterEnum: false
AfterFunction: false
AfterNamespace: false
AfterObjCDeclaration: false
AfterStruct: false
AfterUnion: false
BeforeCatch: false
BeforeElse: false
IndentBraces: false
BreakBeforeBinaryOperators: None
BreakBeforeBraces: Attach
BreakBeforeTernaryOperators: true
BreakConstructorInitializersBeforeComma: false
BreakAfterJavaFieldAnnotations: false
BreakStringLiterals: true
ColumnLimit: 80
CommentPragmas: '^ IWYU pragma:'
CompactNamespaces: false
ConstructorInitializerAllOnOneLineOrOnePerLine: true
ConstructorInitializerIndentWidth: 8
ContinuationIndentWidth: 8
Cpp11BracedListStyle: true
DerivePointerAlignment: false
DisableFormat: false
ExperimentalAutoDetectBinPacking: false
ForEachMacros: [ foreach, Q_FOREACH, BOOST_FOREACH ]
IncludeCategories:
- Regex: '^<.*\.h>'
Priority: 1
- Regex: '^<.*'
Priority: 2
- Regex: '.*'
Priority: 3
IncludeIsMainRegex: '([-_](test|unittest))?$'
IndentCaseLabels: true
IndentWidth: 4
IndentWrappedFunctionNames: false
JavaScriptQuotes: Leave
JavaScriptWrapImports: true
KeepEmptyLinesAtTheStartOfBlocks: false
MacroBlockBegin: ''
MacroBlockEnd: ''
MaxEmptyLinesToKeep: 1
NamespaceIndentation: None
ObjCBlockIndentWidth: 2
ObjCSpaceAfterProperty: false
ObjCSpaceBeforeProtocolList: false
PenaltyBreakBeforeFirstCallParameter: 1
PenaltyBreakComment: 300
PenaltyBreakFirstLessLess: 120
PenaltyBreakString: 1000
PenaltyExcessCharacter: 1000000
PenaltyReturnTypeOnItsOwnLine: 200
PointerAlignment: Left
ReflowComments: true
SortIncludes: true
SpaceAfterCStyleCast: false
SpaceBeforeAssignmentOperators: true
SpaceBeforeParens: ControlStatements
SpaceInEmptyParentheses: false
SpacesBeforeTrailingComments: 2
SpacesInAngles: false
SpacesInContainerLiterals: true
SpacesInCStyleCastParentheses: false
SpacesInParentheses: false
SpacesInSquareBrackets: false
Standard: Auto
TabWidth: 8
UseTab: Never
...
build
.vscode
cmake_minimum_required(VERSION 3.9.0)
project(MegRay VERSION 0.1.0)
set(CMAKE_CXX_STANDARD 14)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_CXX_EXTENSIONS OFF)
set(CMAKE_POSITION_INDEPENDENT_CODE ON)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall")
if(NOT MSVC)
set(CMAKE_CXX_ARCHIVE_CREATE "<CMAKE_AR> Dqc <TARGET> <LINK_FLAGS> <OBJECTS>")
set(CMAKE_CXX_ARCHIVE_APPEND "<CMAKE_AR> Dq <TARGET> <LINK_FLAGS> <OBJECTS>")
set(CMAKE_CXX_ARCHIVE_FINISH "<CMAKE_RANLIB> -D <TARGET>")
endif()
include(CheckLanguage)
check_language(CUDA)
if(NOT CMAKE_CUDA_COMPILER)
message(FATAL_ERROR "CUDA compiler not found in PATH")
endif()
enable_language(CUDA)
set(CMAKE_CUDA_STANDARD 14)
set(CMAKE_CUDA_STANDARD_REQUIRED ON)
set(THREADS_PREFER_PTHREAD_FLAG ON)
find_package(Threads)
if(${CMAKE_THREAD_LIBS_INIT} STREQUAL "-pthread")
set_property(TARGET Threads::Threads
PROPERTY INTERFACE_COMPILE_OPTIONS "$<$<COMPILE_LANGUAGE:CUDA>:-Xcompiler=-pthread>"
"$<$<NOT:$<COMPILE_LANGUAGE:CUDA>>:-pthread>")
endif()
include_directories(${CMAKE_CUDA_TOOLKIT_INCLUDE_DIRECTORIES})
link_directories(${CMAKE_CUDA_HOST_IMPLICIT_LINK_DIRECTORIES})
set(CMAKE_CUDA_FLAGS_DEBUG "-O0 -g")
set(CMAKE_CUDA_FLAGS_RELEASE "-O3")
set(CMAKE_CUDA_FLAGS_RELWITHDEBINFO "-O3 -g")
set(CMAKE_CUDA_FLAGS_MINSIZEREL "-Os")
set(CMAKE_CUDA_FLAGS "-Xcompiler -Wall,-Wextra,-Werror -Xfatbin -compress-all")
set(CMAKE_CUDA_HOST_COMPILER ${CMAKE_CXX_COMPILER})
find_program(CCACHE_BIN ccache)
if(CCACHE_BIN)
set(CMAKE_CXX_COMPILER_LAUNCHER ${CCACHE_BIN})
if(NOT ${CMAKE_VERSION} VERSION_LESS "3.10.0")
message("-- Using ccache as CMAKE_CUDA_COMPILER_LAUNCHER")
set(CMAKE_CUDA_COMPILER_LAUNCHER ${CCACHE_BIN})
endif()
endif()
option(MEGRAY_CUDA_USE_STATIC "Enable MegRay CUDA static linking." ON)
set(MEGRAY_CUDA_GENCODE "" CACHE STRING "Overwrite -gencode specifications for CUDA")
set(CMAKE_CUDA_FLAGS "${CMAKE_CUDA_FLAGS} -DMEGDNN_THREADS_512=0")
if(${CMAKE_CUDA_COMPILER_VERSION} VERSION_GREATER "10.0.0" OR ${CMAKE_CUDA_COMPILER_VERSION} VERSION_EQUAL "10.0.0")
set(MEGRAY_CUDA_GENCODE "${MEGRAY_CUDA_GENCODE} -gencode arch=compute_52,code=sm_52")
set(MEGRAY_CUDA_GENCODE "${MEGRAY_CUDA_GENCODE} -gencode arch=compute_60,code=sm_60")
set(MEGRAY_CUDA_GENCODE "${MEGRAY_CUDA_GENCODE} -gencode arch=compute_61,code=sm_61")
set(MEGRAY_CUDA_GENCODE "${MEGRAY_CUDA_GENCODE} -gencode arch=compute_70,code=sm_70")
set(MEGRAY_CUDA_GENCODE "${MEGRAY_CUDA_GENCODE} -gencode arch=compute_75,code=sm_75")
set(MEGRAY_CUDA_GENCODE "${MEGRAY_CUDA_GENCODE} -gencode arch=compute_75,code=compute_75")
elseif(${CMAKE_CUDA_COMPILER_VERSION} VERSION_GREATER "9.0.0" OR ${CMAKE_CUDA_COMPILER_VERSION} VERSION_EQUAL "9.0.0")
set(MEGRAY_CUDA_GENCODE "${MEGRAY_CUDA_GENCODE} -gencode arch=compute_52,code=sm_52")
set(MEGRAY_CUDA_GENCODE "${MEGRAY_CUDA_GENCODE} -gencode arch=compute_60,code=sm_60")
set(MEGRAY_CUDA_GENCODE "${MEGRAY_CUDA_GENCODE} -gencode arch=compute_61,code=sm_61")
set(MEGRAY_CUDA_GENCODE "${MEGRAY_CUDA_GENCODE} -gencode arch=compute_70,code=sm_70")
set(MEGRAY_CUDA_GENCODE "${MEGRAY_CUDA_GENCODE} -gencode arch=compute_70,code=compute_70")
else()
set(MEGRAY_CUDA_GENCODE "${MEGRAY_CUDA_GENCODE} -gencode arch=compute_35,code=sm_35")
set(MEGRAY_CUDA_GENCODE "${MEGRAY_CUDA_GENCODE} -gencode arch=compute_52,code=sm_52")
set(MEGRAY_CUDA_GENCODE "${MEGRAY_CUDA_GENCODE} -gencode arch=compute_60,code=sm_60")
set(MEGRAY_CUDA_GENCODE "${MEGRAY_CUDA_GENCODE} -gencode arch=compute_61,code=sm_61")
set(MEGRAY_CUDA_GENCODE "${MEGRAY_CUDA_GENCODE} -gencode arch=compute_61,code=compute_61")
endif()
if(MEGRAY_CUDA_USE_STATIC)
list(APPEND CUDA_LIBS cudart_static)
else()
list(APPEND CUDA_LIBS cudart)
endif()
include(cmake/ucx.cmake)
include(cmake/nccl.cmake)
option(MEGRAY_TEST "Enable test for MegRay." OFF)
if(MEGRAY_TEST)
include(cmake/gtest.cmake)
add_subdirectory(test)
endif()
include_directories("${CMAKE_CURRRENT_BINARY_DIR}/src")
file(GLOB_RECURSE SRC_FILES "src/*.cpp")
file(GLOB_RECURSE CUDA_SRC_FILES "src/*.cu")
add_library(megray STATIC ${SRC_FILES} ${CUDA_SRC_FILES})
target_link_libraries(megray libucx libnccl ${CUDA_LIBS})
target_include_directories(megray INTERFACE src)
target_compile_options(megray PRIVATE
"$<$<COMPILE_LANGUAGE:CUDA>:-Xcompiler=-Wno-unused-parameter>"
"$<$<NOT:$<COMPILE_LANGUAGE:CUDA>>:-Wno-unused-parameter>"
"$<$<COMPILE_LANGUAGE:CUDA>:-Xcompiler=-Wno-unused-varible>"
"$<$<NOT:$<COMPILE_LANGUAGE:CUDA>>:-Wno-unused-variable>"
"$<$<COMPILE_LANGUAGE:CUDA>:-Xcompiler=-Wno-unused-result>"
"$<$<NOT:$<COMPILE_LANGUAGE:CUDA>>:-Wno-unused-result>"
"$<$<COMPILE_LANGUAGE:CUDA>:-Xcompiler=-Wno-unused-but-set-variable>"
"$<$<NOT:$<COMPILE_LANGUAGE:CUDA>>:-Wno-unused-but-set-variable>"
)
# MegRay
MegRay is a cross-platform communication library providing point-to-point and collective communication methods such as send, recv, all\_gather, all\_reduce, reduce\_scatter, reduce and broadcast. In deep learning, these methods can be used to implement distributed training frameworks with both data parallelism and model parallelism. There are currently two backends, nccl and ucx, and only the CUDA platform is supported; support for more platforms will be added in the future.
## Build
0. Prepare third-party repositories.
```
./third_party/prepare.sh
```
1. Make a build directory.
```
mkdir build
cd build
```
2. Generate build configurations with `CMake`.
```
cmake .. -DMEGRAY_TEST=ON
```
3. Build.
```
make
```
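## Usage

A minimal sketch of the API (assuming two CUDA devices, one thread per rank, and the nccl backend); buffer initialization and error handling are elided, and the include path depends on how MegRay is integrated into your project:
```
#include <thread>
#include <vector>
#include "megray.h"

int main() {
    const int nranks = 2;
    const size_t len = 16;
    // create one communicator per rank and collect their unique ids
    std::vector<std::shared_ptr<MegRay::Communicator>> comms(nranks);
    std::vector<std::string> uids(nranks);
    for (int i = 0; i < nranks; i++) {
        comms[i] = MegRay::get_communicator(nranks, i, MegRay::MEGRAY_NCCL);
        uids[i] = comms[i]->get_uid();
    }
    auto worker = [&](int rank) {
        cudaSetDevice(rank);
        comms[rank]->init(uids);  // build the group from all uids
        cudaStream_t stream;
        cudaStreamCreate(&stream);
        auto ctx = MegRay::CudaContext::make(stream);
        void* in = nullptr;
        void* out = nullptr;
        cudaMalloc(&in, len * sizeof(float));
        cudaMalloc(&out, len * sizeof(float));
        // ... fill `in` with device data ...
        comms[rank]->all_reduce(in, out, len, MegRay::MEGRAY_FLOAT32,
                                MegRay::MEGRAY_SUM, ctx);
    };
    std::vector<std::thread> threads;
    for (int i = 0; i < nranks; i++) threads.emplace_back(worker, i);
    for (auto& th : threads) th.join();
    return 0;
}
```
The ucx backend is selected by passing `MegRay::MEGRAY_UCX` instead.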
add_subdirectory(${PROJECT_SOURCE_DIR}/third_party/gtest ${CMAKE_CURRENT_BINARY_DIR}/gtest EXCLUDE_FROM_ALL)
set(NCCL_DIR ${PROJECT_SOURCE_DIR}/third_party/nccl CACHE STRING "nccl directory")
set(NCCL_BUILD_DIR ${PROJECT_BINARY_DIR}/third_party/nccl/build)
set(NCCL_LIBS ${NCCL_BUILD_DIR}/lib/libnccl_static.a)
get_filename_component(CUDA_HOME ${CMAKE_CUDA_COMPILER} DIRECTORY)
get_filename_component(CUDA_HOME ${CUDA_HOME} DIRECTORY)
if(${CMAKE_GENERATOR} STREQUAL "Ninja")
set(MAKE_COMMAND make)
else()
set(MAKE_COMMAND "$(MAKE)")
endif()
add_custom_command(
OUTPUT ${NCCL_LIBS}
COMMAND ${MAKE_COMMAND} src.build NVCC_GENCODE=${MEGRAY_CUDA_GENCODE} BUILDDIR=${NCCL_BUILD_DIR} CUDA_HOME=${CUDA_HOME}
WORKING_DIRECTORY ${NCCL_DIR}
VERBATIM
)
file(MAKE_DIRECTORY ${NCCL_BUILD_DIR}/include)
add_custom_target(nccl DEPENDS ${NCCL_LIBS})
add_library(libnccl STATIC IMPORTED GLOBAL)
add_dependencies(libnccl nccl)
set_target_properties(
libnccl PROPERTIES
IMPORTED_LOCATION ${NCCL_LIBS}
INTERFACE_INCLUDE_DIRECTORIES ${NCCL_BUILD_DIR}/include
)
include(ExternalProject)
option(UCX_WITH_GDRCOPY "Build ucx with gdrcopy" OFF)
if(${CMAKE_GENERATOR} STREQUAL "Ninja")
set(MAKE_COMMAND make)
else()
set(MAKE_COMMAND "$(MAKE)")
endif()
get_filename_component(CUDA_HOME ${CMAKE_CUDA_COMPILER} DIRECTORY)
get_filename_component(CUDA_HOME ${CUDA_HOME} DIRECTORY)
if(UCX_WITH_GDRCOPY)
set(GDRCOPY_DIR ${PROJECT_SOURCE_DIR}/third_party/gdrcopy)
set(GDRCOPY_BUILD_DIR ${PROJECT_BINARY_DIR}/third_party/gdrcopy)
set(GDRCOPY_LIB ${GDRCOPY_BUILD_DIR}/lib64/libgdrapi.so)
ExternalProject_add(
gdrcopy
SOURCE_DIR ${GDRCOPY_DIR}
PREFIX ${GDRCOPY_BUILD_DIR}
CONFIGURE_COMMAND ""
BUILD_COMMAND ${MAKE_COMMAND} -C ${GDRCOPY_DIR} lib CUDA_HOME=${CUDA_HOME}
INSTALL_COMMAND ${MAKE_COMMAND} -C ${GDRCOPY_DIR} lib_install PREFIX="" DESTDIR=${GDRCOPY_BUILD_DIR}
BUILD_BYPRODUCTS ${GDRCOPY_LIB}
)
ExternalProject_Add_Step(
gdrcopy
clean
COMMAND make clean
WORKING_DIRECTORY <SOURCE_DIR>
DEPENDEES install
)
set(GDRCOPY_INC ${GDRCOPY_BUILD_DIR}/include)
file(MAKE_DIRECTORY ${GDRCOPY_INC})
add_library(libgdrcopy SHARED IMPORTED GLOBAL)
add_dependencies(libgdrcopy gdrcopy)
set_target_properties(
libgdrcopy PROPERTIES
IMPORTED_LOCATION ${GDRCOPY_LIB}
INTERFACE_INCLUDE_DIRECTORIES ${GDRCOPY_INC}
)
list(APPEND UCX_CONFIGURE_ARGS --with-gdrcopy=${GDRCOPY_BUILD_DIR})
endif()
set(UCX_DIR ${PROJECT_SOURCE_DIR}/third_party/ucx)
set(UCX_BUILD_DIR ${PROJECT_BINARY_DIR}/third_party/ucx)
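# parse the libucx_so_version triple (current:revision:age) from UCX's configure.ac;
# its first field is the shared-library major version used in the imported .so names below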
file(STRINGS ${UCX_DIR}/configure.ac UCX_SO_LINE REGEX "\\[libucx_so_version\\]")
string(REGEX MATCH "([0-9]+):([0-9]+):([0-9]+)" _ ${UCX_SO_LINE})
set(UCX_SO_VER_MAJOR ${CMAKE_MATCH_1})
set(UCX_SO_VER_MINOR ${CMAKE_MATCH_2})
set(UCX_SO_VER_PATCH ${CMAKE_MATCH_3})
set(UCP_LIB ${UCX_BUILD_DIR}/lib/libucp.so.${UCX_SO_VER_MAJOR})
set(UCT_LIB ${UCX_BUILD_DIR}/lib/libuct.so.${UCX_SO_VER_MAJOR})
set(UCS_LIB ${UCX_BUILD_DIR}/lib/libucs.so.${UCX_SO_VER_MAJOR})
set(UCM_LIB ${UCX_BUILD_DIR}/lib/libucm.so.${UCX_SO_VER_MAJOR})
ExternalProject_add(
ucx
SOURCE_DIR ${UCX_DIR}
PREFIX ${UCX_BUILD_DIR}
CONFIGURE_COMMAND ${UCX_DIR}/configure --enable-mt --with-pic --disable-static
--with-cuda --disable-numa --prefix=${UCX_BUILD_DIR} ${UCX_CONFIGURE_ARGS}
BUILD_COMMAND ${MAKE_COMMAND} all
INSTALL_COMMAND ${MAKE_COMMAND} install
BUILD_BYPRODUCTS ${UCP_LIB} ${UCT_LIB} ${UCS_LIB} ${UCM_LIB}
)
ExternalProject_Add_Step(
ucx
autogen
COMMAND ./autogen.sh
WORKING_DIRECTORY <SOURCE_DIR>
DEPENDERS configure
)
if(UCX_WITH_GDRCOPY)
add_dependencies(ucx gdrcopy)
endif()
set(UCX_INC ${UCX_BUILD_DIR}/include)
file(MAKE_DIRECTORY ${UCX_INC})
add_library(libucp SHARED IMPORTED GLOBAL)
add_dependencies(libucp ucx)
set_target_properties(
libucp PROPERTIES
IMPORTED_LOCATION ${UCP_LIB}
)
add_library(libuct SHARED IMPORTED GLOBAL)
add_dependencies(libuct ucx)
set_target_properties(
libuct PROPERTIES
IMPORTED_LOCATION ${UCT_LIB}
)
add_library(libucs SHARED IMPORTED GLOBAL)
add_dependencies(libucs ucx)
set_target_properties(
libucs PROPERTIES
IMPORTED_LOCATION ${UCS_LIB}
)
add_library(libucm SHARED IMPORTED GLOBAL)
add_dependencies(libucm ucx)
set_target_properties(
libucm PROPERTIES
IMPORTED_LOCATION ${UCM_LIB}
)
add_library(libucx INTERFACE)
target_link_libraries(libucx INTERFACE libucp libuct libucs libucm)
if(UCX_WITH_GDRCOPY)
target_link_libraries(libucx INTERFACE libgdrcopy)
endif()
target_include_directories(libucx INTERFACE ${UCX_INC})
/**
* \file src/common.cpp
* MegRay is Licensed under the Apache License, Version 2.0 (the "License")
*
* Copyright (c) 2014-2020 Megvii Inc. All rights reserved.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT ARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
#include "common.h"
#include <cstdint>
namespace MegRay {
size_t get_dtype_size(DType dtype) {
switch (dtype) {
case MEGRAY_INT8:
case MEGRAY_UINT8:
return 1;
case MEGRAY_FLOAT16:
return 2;
case MEGRAY_INT32:
case MEGRAY_UINT32:
case MEGRAY_FLOAT32:
return 4;
case MEGRAY_INT64:
case MEGRAY_UINT64:
case MEGRAY_FLOAT64:
return 8;
default:
MEGRAY_THROW("unknown dtype");
}
}
} // namespace MegRay
/**
* \file src/common.h
* MegRay is Licensed under the Apache License, Version 2.0 (the "License")
*
* Copyright (c) 2014-2020 Megvii Inc. All rights reserved.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT ARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
#pragma once
#include "cuda_runtime.h"
#include "debug.h"
namespace MegRay {
typedef enum {
MEGRAY_OK = 0,
MEGRAY_CUDA_ERR = 1,
MEGRAY_NCCL_ERR = 2,
MEGRAY_UCX_ERR = 3,
MEGRAY_NOT_IMPLEMENTED = 4
} Status;
#define MEGRAY_CHECK(expr) \
do { \
Status status = (expr); \
if (status != MEGRAY_OK) { \
MEGRAY_ERROR("error [%d]", status); \
return status; \
} \
} while (0)
#define CUDA_CHECK(expr) \
do { \
cudaError_t status = (expr); \
if (status != cudaSuccess) { \
MEGRAY_ERROR("cuda error [%d]: %s", status, \
cudaGetErrorString(status)); \
return MEGRAY_CUDA_ERR; \
} \
} while (0)
#define CUDA_ASSERT(expr) \
do { \
cudaError_t status = (expr); \
if (status != cudaSuccess) { \
MEGRAY_ERROR("cuda error [%d]: %s", status, \
cudaGetErrorString(status)); \
MEGRAY_THROW("cuda error"); \
} \
} while (0)
typedef enum {
MEGRAY_NCCL = 0,
MEGRAY_UCX = 1,
} Backend;
typedef enum {
MEGRAY_INT8 = 0,
MEGRAY_UINT8 = 1,
MEGRAY_INT32 = 2,
MEGRAY_UINT32 = 3,
MEGRAY_INT64 = 4,
MEGRAY_UINT64 = 5,
MEGRAY_FLOAT16 = 6,
MEGRAY_FLOAT32 = 7,
MEGRAY_FLOAT64 = 8
} DType;
size_t get_dtype_size(DType dtype);
typedef enum {
MEGRAY_SUM = 0,
MEGRAY_MAX = 1,
MEGRAY_MIN = 2
} ReduceOp;
} // namespace MegRay
/**
* \file src/communicator.cpp
* MegRay is Licensed under the Apache License, Version 2.0 (the "License")
*
* Copyright (c) 2014-2020 Megvii Inc. All rights reserved.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT ARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
#include "communicator.h"
#include "nccl/communicator.h"
#include "ucx/communicator.h"
namespace MegRay {
std::shared_ptr<Communicator> get_communicator(uint32_t nranks, uint32_t rank, Backend backend) {
std::shared_ptr<Communicator> comm;
switch (backend) {
case MEGRAY_NCCL:
comm = std::make_shared<NcclCommunicator>(nranks, rank);
break;
case MEGRAY_UCX:
comm = std::make_shared<UcxCommunicator>(nranks, rank);
break;
default:
MEGRAY_THROW("unknown backend");
}
return comm;
}
} // namespace MegRay
/**
* \file src/communicator.h
* MegRay is Licensed under the Apache License, Version 2.0 (the "License")
*
* Copyright (c) 2014-2020 Megvii Inc. All rights reserved.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT ARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
#pragma once
#include <memory>
#include <string>
#include <vector>
#include "common.h"
#include "context.h"
namespace MegRay {
/*!
* abstract class of MegRay main interface
* MegRay communicator corresponds to a nccl communicator or a ucp worker
* providing send/recv and collective communication methods
*/
class Communicator {
public:
// construct a MegRay communicator with the rank of this process
// and the number of all ranks
Communicator(uint32_t nranks, uint32_t rank) : m_nranks(nranks), m_rank(rank) {}
// get the number of all ranks
uint32_t nranks() { return m_nranks; }
// get the rank of this process
uint32_t rank() { return m_rank; }
// get the unique id of this communicator
virtual std::string get_uid() = 0;
// build a group with unique ids of all communicators in the group
virtual Status init(const std::vector<std::string>& uids) = 0;
// send data to another communicator in the group
virtual Status send(const void* sendbuff, size_t len, uint32_t rank,
std::shared_ptr<Context> ctx) = 0;
// receive data from another communicator in the group
virtual Status recv(void* recvbuf, size_t len, uint32_t rank,
std::shared_ptr<Context> ctx) = 0;
// the length of sendbuff = sendlen
// the length of recvbuff = sendlen * m_nranks
virtual Status all_gather(const void* sendbuff, void* recvbuff, size_t sendlen,
DType dtype, std::shared_ptr<Context> ctx) = 0;
// the length of sendbuff = the length of recvbuff = len
virtual Status all_reduce(const void* sendbuff, void* recvbuff, size_t len,
DType dtype, ReduceOp op, std::shared_ptr<Context> ctx) = 0;
// the length of sendbuff = recvlen * m_nranks
// the length of recvbuff = recvlen
virtual Status reduce_scatter(const void* sendbuff, void* recvbuff, size_t recvlen,
DType dtype, ReduceOp op, std::shared_ptr<Context> ctx) = 0;
// the length of sendbuff = the length of recvbuff = len
virtual Status broadcast(const void* sendbuff, void* recvbuff, size_t len,
DType dtype, uint32_t root, std::shared_ptr<Context> ctx) = 0;
// the length of sendbuff = the length of recvbuff = len
virtual Status reduce(const void* sendbuff, void* recvbuff, size_t len,
DType dtype, ReduceOp op, uint32_t root, std::shared_ptr<Context> ctx) = 0;
protected:
uint32_t m_nranks;
uint32_t m_rank;
};
/*!
* get a communicator implemented with nccl or ucx
* return std::shared_ptr<NcclCommunicator> or std::shared_ptr<UcxCommunicator>
*/
std::shared_ptr<Communicator> get_communicator(uint32_t nranks, uint32_t rank, Backend backend);
} // namespace MegRay
/**
* \file src/context.h
* MegRay is Licensed under the Apache License, Version 2.0 (the "License")
*
* Copyright (c) 2014-2020 Megvii Inc. All rights reserved.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT ARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
#pragma once
#include "cuda_runtime.h"
namespace MegRay {
typedef enum {
MEGRAY_CTX_DEFAULT = 0,
MEGRAY_CTX_CUDA = 1
} ContextType;
/*!
* MegRay context is an abstraction of communication contexts (e.g. cuda stream)
* on different platforms, a context should be passed as a parameter when
* a communicator operation is called
*/
class Context {
public:
Context() = default;
virtual ContextType type() const {
return MEGRAY_CTX_DEFAULT;
}
static std::shared_ptr<Context> make() {
return std::make_shared<Context>();
}
};
/*!
* CudaContext is a wrapper of cuda stream
*/
class CudaContext : public Context {
public:
CudaContext() = delete;
CudaContext(cudaStream_t stream) : m_stream(stream) {}
ContextType type() const override {
return MEGRAY_CTX_CUDA;
}
static std::shared_ptr<CudaContext> make(cudaStream_t stream) {
return std::make_shared<CudaContext>(stream);
}
cudaStream_t get_stream() const { return m_stream; }
private:
cudaStream_t m_stream;
};
} // namespace MegRay
/**
* \file src/debug.cpp
* MegRay is Licensed under the Apache License, Version 2.0 (the "License")
*
* Copyright (c) 2014-2020 Megvii Inc. All rights reserved.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT ARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
#include "debug.h"
#include <cstdarg>
namespace MegRay {
void MEGRAY_LOG(const char* level, const char* file, int line, const char* fmt, ...) {
va_list ap;
va_start(ap, fmt);
fprintf(stderr, "[%s]\t%s:%d, ", level, file, line);
vfprintf(stderr, fmt, ap);
fprintf(stderr, "\n");
va_end(ap);
}
} // namespace MegRay
/**
* \file src/debug.h
* MegRay is Licensed under the Apache License, Version 2.0 (the "License")
*
* Copyright (c) 2014-2020 Megvii Inc. All rights reserved.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT ARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
#pragma once
#include <cstdio>
#include <stdexcept>
#include <string>
namespace MegRay {
typedef enum {
DEBUG = 0,
INFO = 1,
WARN = 2,
ERROR = 3
} LogLevel;
#ifndef MEGRAY_LOG_LEVEL
#define MEGRAY_LOG_LEVEL 2
#endif
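// the macros below log only when the level is >= MEGRAY_LOG_LEVEL (default 2, i.e. WARN);
// e.g. compiling with -DMEGRAY_LOG_LEVEL=0 also enables DEBUG and INFO output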
void MEGRAY_LOG(const char* level, const char* file, int line, const char* fmt, ...);
#define MEGRAY_DEBUG(fmt...) \
do { \
if (MegRay::LogLevel::DEBUG >= MEGRAY_LOG_LEVEL) { \
MegRay::MEGRAY_LOG("DEBUG", __FILE__, __LINE__, fmt); \
} \
} while (0)
#define MEGRAY_INFO(fmt...) \
do { \
if (MegRay::LogLevel::INFO >= MEGRAY_LOG_LEVEL) { \
MegRay::MEGRAY_LOG("INFO", __FILE__, __LINE__, fmt); \
} \
} while (0)
#define MEGRAY_WARN(fmt...) \
do { \
if (MegRay::LogLevel::WARN >= MEGRAY_LOG_LEVEL) { \
MegRay::MEGRAY_LOG("WARN", __FILE__, __LINE__, fmt); \
} \
} while (0)
#define MEGRAY_ERROR(fmt...) \
do { \
if (MegRay::LogLevel::ERROR >= MEGRAY_LOG_LEVEL) { \
MegRay::MEGRAY_LOG("ERROR", __FILE__, __LINE__, fmt); \
} \
} while (0)
class Exception : public std::runtime_error {
public:
Exception() = default;
explicit Exception(const std::string& msg) : std::runtime_error(msg) {}
};
#define MEGRAY_THROW(message) throw MegRay::Exception(message)
#define MEGRAY_ASSERT(expr, fmt...) \
do { \
if (!(expr)) { \
MEGRAY_ERROR("assertion failed: %s", #expr); \
MEGRAY_ERROR(fmt); \
MEGRAY_THROW("assertion failed"); \
} \
} while (0)
} // namespace MegRay
/**
* \file src/megray.h
* MegRay is Licensed under the Apache License, Version 2.0 (the "License")
*
* Copyright (c) 2014-2020 Megvii Inc. All rights reserved.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT ARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
#pragma once
#include "communicator.h"
/**
* \file src/nccl/communicator.cpp
* MegRay is Licensed under the Apache License, Version 2.0 (the "License")
*
* Copyright (c) 2014-2020 Megvii Inc. All rights reserved.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT ARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
#include "communicator.h"
#include <string.h>
#include "utils.h"
namespace MegRay {
NcclCommunicator::NcclCommunicator(int nranks, int rank) :
Communicator(nranks, rank), m_inited(false) {
NCCL_ASSERT(ncclGetUniqueId(&m_uid));
}
NcclCommunicator::~NcclCommunicator() {
if (m_inited) {
ncclCommDestroy(m_comm);
}
}
std::string NcclCommunicator::get_uid() {
// serialize ncclUniqueId into a string
return std::string(m_uid.internal, NCCL_UNIQUE_ID_BYTES);
}
Status NcclCommunicator::init(const std::vector<std::string>& uids) {
MEGRAY_ASSERT(uids.size() == m_nranks, "incorrect size of uids");
// only use unique id of rank 0 for initialization
const std::string uid = uids[0];
MEGRAY_ASSERT(uid.size() == NCCL_UNIQUE_ID_BYTES, "invalid uid");
memcpy(m_uid.internal, uid.data(), NCCL_UNIQUE_ID_BYTES);
// initialize nccl communicator
NCCL_CHECK(ncclCommInitRank(&m_comm, m_nranks, m_uid, m_rank));
m_inited = true;
return MEGRAY_OK;
}
Status NcclCommunicator::send(const void* sendbuff, size_t len, uint32_t rank,
std::shared_ptr<Context> ctx) {
// derived from base class, not implemented
MEGRAY_THROW("not implemented");
return MEGRAY_NOT_IMPLEMENTED;
}
Status NcclCommunicator::recv(void* recvbuf, size_t len, uint32_t rank,
std::shared_ptr<Context> ctx) {
// derived from base class, not implemented
MEGRAY_THROW("not implemented");
return MEGRAY_NOT_IMPLEMENTED;
}
Status NcclCommunicator::all_gather(const void* sendbuff, void* recvbuff, size_t sendlen,
DType dtype, std::shared_ptr<Context> ctx) {
// check context type and get cuda stream
MEGRAY_ASSERT(ctx->type() == MEGRAY_CTX_CUDA, "only cuda context supported");
cudaStream_t stream = static_cast<CudaContext*>(ctx.get())->get_stream();
// perform all gather synchronously
NCCL_CHECK(ncclAllGather(sendbuff, recvbuff, sendlen, get_nccl_dtype(dtype),
m_comm, stream));
CUDA_CHECK(cudaStreamSynchronize(stream));
return MEGRAY_OK;
}
Status NcclCommunicator::all_reduce(const void* sendbuff, void* recvbuff, size_t len,
DType dtype, ReduceOp op, std::shared_ptr<Context> ctx) {
// check context type and get cuda stream
MEGRAY_ASSERT(ctx->type() == MEGRAY_CTX_CUDA, "only cuda context supported");
cudaStream_t stream = static_cast<CudaContext*>(ctx.get())->get_stream();
// perform all reduce synchronously
NCCL_CHECK(ncclAllReduce(sendbuff, recvbuff, len, get_nccl_dtype(dtype),
get_nccl_reduce_op(op), m_comm, stream));
CUDA_CHECK(cudaStreamSynchronize(stream));
return MEGRAY_OK;
}
Status NcclCommunicator::reduce_scatter(const void* sendbuff, void* recvbuff, size_t recvlen,
DType dtype, ReduceOp op, std::shared_ptr<Context> ctx) {
// check context type and get cuda stream
MEGRAY_ASSERT(ctx->type() == MEGRAY_CTX_CUDA, "only cuda context supported");
cudaStream_t stream = static_cast<CudaContext*>(ctx.get())->get_stream();
// perform reduce scatter synchronously
NCCL_CHECK(ncclReduceScatter(sendbuff, recvbuff, recvlen, get_nccl_dtype(dtype),
get_nccl_reduce_op(op), m_comm, stream));
CUDA_CHECK(cudaStreamSynchronize(stream));
return MEGRAY_OK;
}
Status NcclCommunicator::broadcast(const void* sendbuff, void* recvbuff, size_t len,
DType dtype, uint32_t root, std::shared_ptr<Context> ctx) {
// check context type and get cuda stream
MEGRAY_ASSERT(ctx->type() == MEGRAY_CTX_CUDA, "only cuda context supported");
cudaStream_t stream = static_cast<CudaContext*>(ctx.get())->get_stream();
// perform broadcast synchronously
NCCL_CHECK(ncclBroadcast(sendbuff, recvbuff, len, get_nccl_dtype(dtype), root,
m_comm, stream));
CUDA_CHECK(cudaStreamSynchronize(stream));
return MEGRAY_OK;
}
Status NcclCommunicator::reduce(const void* sendbuff, void* recvbuff, size_t len,
DType dtype, ReduceOp op, uint32_t root, std::shared_ptr<Context> ctx) {
// check context type and get cuda stream
MEGRAY_ASSERT(ctx->type() == MEGRAY_CTX_CUDA, "only cuda context supported");
cudaStream_t stream = static_cast<CudaContext*>(ctx.get())->get_stream();
// perform reduce synchronously
NCCL_CHECK(ncclReduce(sendbuff, recvbuff, len, get_nccl_dtype(dtype),
get_nccl_reduce_op(op), root, m_comm, stream));
CUDA_CHECK(cudaStreamSynchronize(stream));
return MEGRAY_OK;
}
} // namespace MegRay
/**
* \file src/nccl/communicator.h
* MegRay is Licensed under the Apache License, Version 2.0 (the "License")
*
* Copyright (c) 2014-2020 Megvii Inc. All rights reserved.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT ARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
#pragma once
#include <string>
#include "nccl.h"
#include "../communicator.h"
namespace MegRay {
/*!
* a wrapper of ncclComm_t with MegRay interface
* collective communications are performed synchronously
*/
class NcclCommunicator : public Communicator {
public:
NcclCommunicator(int nranks, int rank);
~NcclCommunicator();
// get a serialized string of ncclUniqueId
std::string get_uid() override;
Status init(const std::vector<std::string>& uids) override;
// not implemented, use ucx communicator instead
Status send(const void* sendbuff, size_t len, uint32_t rank,
std::shared_ptr<Context> ctx) override;
// not implemented, use ucx communicator instead
Status recv(void* recvbuf, size_t len, uint32_t rank,
std::shared_ptr<Context> ctx) override;
Status all_gather(const void* sendbuff, void* recvbuff, size_t sendlen,
DType dtype, std::shared_ptr<Context> ctx) override;
Status all_reduce(const void* sendbuff, void* recvbuff, size_t len,
DType dtype, ReduceOp op, std::shared_ptr<Context> ctx) override;
Status reduce_scatter(const void* sendbuff, void* recvbuff, size_t recvlen,
DType dtype, ReduceOp op, std::shared_ptr<Context> ctx) override;
Status broadcast(const void* sendbuff, void* recvbuff, size_t len,
DType dtype, uint32_t root, std::shared_ptr<Context> ctx) override;
Status reduce(const void* sendbuff, void* recvbuff, size_t len,
DType dtype, ReduceOp op, uint32_t root, std::shared_ptr<Context> ctx) override;
private:
ncclUniqueId m_uid;
ncclComm_t m_comm;
bool m_inited;
};
} // namespace MegRay
/**
* \file src/nccl/utils.cpp
* MegRay is Licensed under the Apache License, Version 2.0 (the "License")
*
* Copyright (c) 2014-2020 Megvii Inc. All rights reserved.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT ARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
#include "utils.h"
namespace MegRay {
ncclDataType_t get_nccl_dtype(const DType dtype) {
switch (dtype) {
case MEGRAY_INT8:
return ncclInt8;
case MEGRAY_UINT8:
return ncclUint8;
case MEGRAY_INT32:
return ncclInt32;
case MEGRAY_UINT32:
return ncclUint32;
case MEGRAY_INT64:
return ncclInt64;
case MEGRAY_UINT64:
return ncclUint64;
case MEGRAY_FLOAT16:
return ncclFloat16;
case MEGRAY_FLOAT32:
return ncclFloat32;
case MEGRAY_FLOAT64:
return ncclFloat64;
default:
MEGRAY_THROW("unknown dtype");
}
}
ncclRedOp_t get_nccl_reduce_op(const ReduceOp red_op) {
switch (red_op) {
case MEGRAY_SUM:
return ncclSum;
case MEGRAY_MAX:
return ncclMax;
case MEGRAY_MIN:
return ncclMin;
default:
MEGRAY_THROW("unknown reduce op");
}
}
} // namespace MegRay
/**
* \file src/nccl/utils.h
* MegRay is Licensed under the Apache License, Version 2.0 (the "License")
*
* Copyright (c) 2014-2020 Megvii Inc. All rights reserved.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT ARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
#pragma once
#include <iostream>
#include "nccl.h"
#include "../common.h"
namespace MegRay {
#define NCCL_CHECK(expr) \
do { \
ncclResult_t result = (expr); \
if (result != ncclSuccess) { \
MEGRAY_ERROR("nccl error [%d]: %s", result, \
ncclGetErrorString(result)); \
return MEGRAY_NCCL_ERR; \
} \
} while (0)
#define NCCL_ASSERT(expr) \
do { \
ncclResult_t result = (expr); \
if (result != ncclSuccess) { \
MEGRAY_ERROR("nccl error [%d]: %s", result, \
ncclGetErrorString(result)); \
MEGRAY_THROW("nccl error"); \
} \
} while (0)
ncclDataType_t get_nccl_dtype(const DType dtype);
ncclRedOp_t get_nccl_reduce_op(const ReduceOp red_op);
} // namespace MegRay
/**
* \file src/ucx/all_gather.cpp
* MegRay is Licensed under the Apache License, Version 2.0 (the "License")
*
* Copyright (c) 2014-2020 Megvii Inc. All rights reserved.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT ARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
#include "communicator.h"
#include "utils.h"
namespace MegRay {
Status UcxCommunicator::all_gather(const void* sendbuff, void* recvbuff, size_t sendlen,
DType dtype, std::shared_ptr<Context> ctx) {
// get cuda stream
MEGRAY_ASSERT(ctx->type() == MEGRAY_CTX_CUDA, "only cuda context supported");
cudaStream_t stream = static_cast<CudaContext*>(ctx.get())->get_stream();
CUDA_CHECK(cudaStreamSynchronize(stream));
// copy local data
size_t size = get_dtype_size(dtype);
CUDA_CHECK(cudaMemcpy((char*)recvbuff + m_rank * sendlen * size, sendbuff,
sendlen * size, cudaMemcpyDeviceToDevice));
uint32_t r_rank = (m_rank + 1) % m_nranks;
uint32_t l_rank = (m_rank + m_nranks - 1) % m_nranks;
char sync_send, sync_recv;
// ring all gather
// each round all nodes pass the msg to next node and next node saves a copy
// i-th part of msg passes from i-th node to i+1 to i+2 and finally to (i-1)-th node
// after nranks - 1 rounds every node has all msg
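// e.g. with 3 ranks: in round 0, rank i sends part i to rank i+1 and receives part i-1;
// in round 1 it sends part i-1 and receives part i-2, so every rank ends up with parts 0..2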
for (size_t i = 0; i < m_nranks - 1; i++) {
uint32_t send_rank = ring_sub(m_rank, i, m_nranks);
uint32_t recv_rank = ring_sub(m_rank, i + 1, m_nranks);
size_t recvlen = sendlen * size;
// pass (rank - i)-th part to next node
// recv (rank - i - 1)-th part from previous node
MEGRAY_CHECK(_send((char*)recvbuff + send_rank * recvlen, recvlen, r_rank));
MEGRAY_CHECK(_recv((char*)recvbuff + recv_rank * recvlen, recvlen, l_rank));
MEGRAY_CHECK(_flush());
// synchronization
MEGRAY_CHECK(_send(&sync_send, sizeof(char), l_rank));
MEGRAY_CHECK(_recv(&sync_recv, sizeof(char), r_rank));
MEGRAY_CHECK(_flush());
}
return MEGRAY_OK;
}
} // namespace MegRay
/**
* \file src/ucx/all_reduce.cpp
* MegRay is Licensed under the Apache License, Version 2.0 (the "License")
*
* Copyright (c) 2014-2020 Megvii Inc. All rights reserved.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT ARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
#include "communicator.h"
#include <vector>
#include "utils.h"
namespace MegRay {
Status UcxCommunicator::all_reduce(const void* sendbuff, void* recvbuff, size_t len,
DType dtype, ReduceOp op, std::shared_ptr<Context> ctx) {
// get cuda stream
MEGRAY_ASSERT(ctx->type() == MEGRAY_CTX_CUDA, "only cuda context supported");
cudaStream_t stream = static_cast<CudaContext*>(ctx.get())->get_stream();
CUDA_CHECK(cudaStreamSynchronize(stream));
// compute chunk sizes
size_t quotient = len / m_nranks;
size_t remainder = len % m_nranks;
std::vector<size_t> chunk_sizes(m_nranks, quotient);
for (size_t i = 0; i < remainder; i++) {
chunk_sizes[i]++;
}
// allocate workspace for recv, chunk_0 is the largest
size_t size = get_dtype_size(dtype);
void* workspace;
CUDA_CHECK(cudaMalloc(&workspace, chunk_sizes[0] * size));
CUDA_CHECK(cudaMemcpy(recvbuff, sendbuff, len * size, cudaMemcpyDeviceToDevice));
// compute chunk offsets
std::vector<size_t> offsets(m_nranks, 0);
for (size_t i = 1; i < m_nranks; i++) {
offsets[i] = offsets[i - 1] + chunk_sizes[i - 1] * size;
}
uint32_t r_rank = (m_rank + 1) % m_nranks;
uint32_t l_rank = (m_rank + m_nranks - 1) % m_nranks;
char sync_send, sync_recv;
// step 1: all reduce chunks
// split data into n chunks, reduce i-th chunk data at (i-1)-th node
// pass and add i-th chunk from i-th node to i+1 to i+2 finally to (i-1)-th node
// at last i-th node has the sum of (i+1)-th chunk data
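// e.g. with 3 ranks: in round 0, rank i sends chunk i and adds the incoming chunk i-1;
// in round 1 it sends chunk i-1 and adds chunk i-2, leaving rank i with the full sum of chunk i+1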
for (uint32_t i = 0; i < m_nranks - 1; i++) {
uint32_t send_chunk = ring_sub(m_rank, i, m_nranks);
uint32_t recv_chunk = ring_sub(m_rank, i + 1, m_nranks);
size_t send_offset = offsets[send_chunk];
size_t recv_offset = offsets[recv_chunk];
MEGRAY_CHECK(_send((char*)recvbuff + send_offset, chunk_sizes[send_chunk] * size, r_rank));
MEGRAY_CHECK(_recv((char*)workspace, chunk_sizes[recv_chunk] * size, l_rank));
MEGRAY_CHECK(_flush());
_reduce((char*)recvbuff + recv_offset, (char*)workspace,
(char*)recvbuff + recv_offset, chunk_sizes[recv_chunk], dtype, op,
stream);
CUDA_CHECK(cudaStreamSynchronize(stream));
MEGRAY_CHECK(_send(&sync_send, sizeof(char), l_rank));
MEGRAY_CHECK(_recv(&sync_recv, sizeof(char), r_rank));
MEGRAY_CHECK(_flush());
}
// step 2: ring allgather
// each round all nodes pass the msg to next node and next node saves it
// i-th part of msg passes from i-th node to i+1 to i+2 and finally to (i-1)-th node
// after n-1 rounds every node has all msg
for (uint32_t i = 0; i < m_nranks - 1; i++) {
uint32_t send_chunk = ring_sub(m_rank + 1, i, m_nranks);
uint32_t recv_chunk = ring_sub(m_rank, i, m_nranks);
MEGRAY_CHECK(_send((char*)recvbuff + offsets[send_chunk], chunk_sizes[send_chunk] * size, r_rank));
MEGRAY_CHECK(_recv((char*)recvbuff + offsets[recv_chunk], chunk_sizes[recv_chunk] * size, l_rank));
MEGRAY_CHECK(_flush());
MEGRAY_CHECK(_send(&sync_send, sizeof(char), l_rank));
MEGRAY_CHECK(_recv(&sync_recv, sizeof(char), r_rank));
MEGRAY_CHECK(_flush());
}
// the reduced result is already in recvbuff, free workspace
CUDA_CHECK(cudaFree(workspace));
return MEGRAY_OK;
}
} // namespace MegRay
/**
* \file src/ucx/broadcast.cpp
* MegRay is Licensed under the Apache License, Version 2.0 (the "License")
*
* Copyright (c) 2014-2020 Megvii Inc. All rights reserved.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT ARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
#include "communicator.h"
#include "utils.h"
namespace MegRay {
Status UcxCommunicator::broadcast(const void* sendbuff, void* recvbuff, size_t len,
DType dtype, uint32_t root, std::shared_ptr<Context> ctx) {
// get cuda stream
MEGRAY_ASSERT(ctx->type() == MEGRAY_CTX_CUDA, "only cuda context supported");
cudaStream_t stream = static_cast<CudaContext*>(ctx.get())->get_stream();
CUDA_CHECK(cudaStreamSynchronize(stream));
size_t size = get_dtype_size(dtype);
if (m_rank == root) {
CUDA_CHECK(cudaMemcpy(recvbuff, sendbuff, len * size,
cudaMemcpyDeviceToDevice));
}
// offset to make sure virtual_root is 0
auto virtual_rank = ring_sub(m_rank, root, m_nranks);
// we need d rounds to broadcast data
size_t d = 0, t = m_nranks - 1;
while (t > 0) {
++d;
t >>= 1;
}
// begin with one node with msg
// on each round every node with msg sends to one node without msg
// on the 1st round, node B0000000 sends msg to node B1000000
// on the i-th round, node Bxxx0000 sends msg to node Bxxx1000
// on the last round, node Bxxxxxx0 sends msg to node Bxxxxxx1
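// e.g. with 8 ranks (virtual numbering, root 0): round 0 sends 0->4,
// round 1 sends 0->2 and 4->6, round 2 sends 0->1, 2->3, 4->5 and 6->7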
int mask = (1 << d) - 1;
for(int i = d - 1; i >= 0; -- i) {
int bit = 1 << i;
mask = mask ^ bit;
if ((virtual_rank & mask) == 0) {
if ((virtual_rank & bit) == 0) {
auto virtual_dest = virtual_rank ^ bit;
auto actual_dest = ring_add(virtual_dest, root, m_nranks);
if (virtual_dest < m_nranks){ // valid dest
MEGRAY_CHECK(_send(recvbuff, len * size, actual_dest));
MEGRAY_CHECK(_flush());
CUDA_CHECK(cudaStreamSynchronize(stream));
}
} else {
auto virtual_src = virtual_rank ^ bit;
auto actual_src = ring_add(virtual_src, root, m_nranks);
if (virtual_src < m_nranks){ // valid src
MEGRAY_CHECK(_recv(recvbuff, len * size, actual_src));
MEGRAY_CHECK(_flush());
CUDA_CHECK(cudaStreamSynchronize(stream));
}
}
}
}
return MEGRAY_OK;
}
} // namespace MegRay
/**
* \file src/ucx/communicator.cpp
* MegRay is Licensed under the Apache License, Version 2.0 (the "License")
*
* Copyright (c) 2014-2020 Megvii Inc. All rights reserved.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT ARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
#include "communicator.h"
#include <cstring>
namespace MegRay {
// handle returned by non-blocking ucp requests
// the "completed" flag is set in the callback functions
struct Request {
int completed;
};
// init request handler
static void request_init(void* request) {
static_cast<Request*>(request)->completed = 0;
}
// send callback, set flag "completed" to 1
static void send_cb(void* request, ucs_status_t status) {
((Request*)request)->completed = 1;
}
// receive callback, set flag "completed" to 1
static void recv_cb(void* request, ucs_status_t status, ucp_tag_recv_info_t* info) {
((Request*)request)->completed = 1;
}
UcxCommunicator::UcxCommunicator(int nranks, int rank) :
Communicator(nranks, rank), m_inited(false) {
const char* env = "UCX_MEMTYPE_CACHE=n";
putenv((char*)env);
// set ucp context params
ucp_params_t ucp_params;
memset(&ucp_params, 0, sizeof(ucp_params));
ucp_params.field_mask = UCP_PARAM_FIELD_FEATURES |
UCP_PARAM_FIELD_REQUEST_SIZE |
UCP_PARAM_FIELD_REQUEST_INIT;
ucp_params.features = UCP_FEATURE_TAG | UCP_FEATURE_RMA | UCP_FEATURE_WAKEUP;
ucp_params.request_size = sizeof(Request);
ucp_params.request_init = request_init;
// init ucp context
ucs_status_t status;
status = ucp_init(&ucp_params, nullptr, &m_context);
MEGRAY_ASSERT(status == UCS_OK, "failed to init ucp context");
// set ucp worker params
ucp_worker_params_t worker_params;
memset(&worker_params, 0, sizeof(worker_params));
worker_params.field_mask = UCP_WORKER_PARAM_FIELD_THREAD_MODE;
worker_params.thread_mode = UCS_THREAD_MODE_SINGLE;
// create ucp worker
status = ucp_worker_create(m_context, &worker_params, &m_worker);
MEGRAY_ASSERT(status == UCS_OK, "failed to create ucp worker");
}
UcxCommunicator::~UcxCommunicator() {
// destroy ucp worker and cleanup ucp context
ucp_worker_destroy(m_worker);
ucp_cleanup(m_context);
}
std::string UcxCommunicator::get_uid() {
size_t addr_len;
ucp_address_t* addr;
// get ucp worker address
ucs_status_t status = ucp_worker_get_address(m_worker, &addr, &addr_len);
MEGRAY_ASSERT(status == UCS_OK, "failed to get ucp worker address");
// copy bytes to a string
std::string uid((char*)addr, addr_len);
ucp_worker_release_address(m_worker, addr);
return uid;
}
Status UcxCommunicator::init(const std::vector<std::string>& uids) {
MEGRAY_ASSERT(uids.size() == m_nranks, "incorrect size of uids");
m_eps.resize(m_nranks);
// set endpoint params
ucp_ep_params_t ep_params;
ep_params.field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS;
ucs_status_t status;
for (size_t i = 0; i < m_nranks; i++) {
if (i == m_rank) continue;
// set endpoint address
ep_params.address = reinterpret_cast<const ucp_address_t*>(uids[i].data());
// create ucp endpoint
status = ucp_ep_create(m_worker, &ep_params, &m_eps[i]);
MEGRAY_ASSERT(status == UCS_OK, "failed to create ucp endpoint");
}
return MEGRAY_OK;
}
Status UcxCommunicator::send(const void* sendbuff, size_t len, uint32_t rank,
std::shared_ptr<Context> ctx) {
char sync;
MEGRAY_CHECK(_send(sendbuff, len, rank));
MEGRAY_CHECK(_recv(&sync, sizeof(char), rank)); // synchronize
MEGRAY_CHECK(_flush());
return MEGRAY_OK;
}
Status UcxCommunicator::recv(void* recvbuff, size_t len, uint32_t rank,
std::shared_ptr<Context> ctx) {
char sync;
MEGRAY_CHECK(_recv(recvbuff, len, rank));
MEGRAY_CHECK(_send(&sync, sizeof(char), rank)); // synchronize
MEGRAY_CHECK(_flush());
return MEGRAY_OK;
}
Status UcxCommunicator::_send(const void* sendbuff, size_t len, uint32_t rank) {
MEGRAY_ASSERT(rank != m_rank, "invalid send rank");
std::lock_guard<std::mutex> lock(m_requests_mtx);
// submit non-blocking send request to ucp
void* ptr = ucp_tag_send_nb(m_eps[rank], sendbuff, len,
ucp_dt_make_contig(1), m_rank, send_cb);
if (UCS_PTR_IS_PTR(ptr)) {
m_requests.push_back(ptr); // send request is pending
} else if (UCS_PTR_STATUS(ptr) != UCS_OK) {
return MEGRAY_UCX_ERR;
}
return MEGRAY_OK;
}
Status UcxCommunicator::_recv(void* recvbuff, size_t len, uint32_t rank) {
MEGRAY_ASSERT(rank != m_rank, "invalid recv rank");
std::lock_guard<std::mutex> lock(m_requests_mtx);
// submit non-blocking receive request to ucp
// mask 0xffffffff means matching every bit of uint32
void* ptr = ucp_tag_recv_nb(m_worker, recvbuff, len,
ucp_dt_make_contig(1), rank, 0xffffffff, recv_cb);
if (UCS_PTR_IS_PTR(ptr)) {
m_requests.push_back(ptr); // receive request is pending
} else if (UCS_PTR_STATUS(ptr) != UCS_OK) {
return MEGRAY_UCX_ERR;
}
return MEGRAY_OK;
}
Status UcxCommunicator::_flush() {
std::lock_guard<std::mutex> lock(m_requests_mtx);
for (size_t i = 0; i < m_requests.size(); i++) {
Request* req = (Request*)(m_requests[i]);
// check flag "completed" of request handler
while (req->completed == 0) {
ucp_worker_progress(m_worker);
}
// release request handler
req->completed = 0;
ucp_request_release(req);
}
m_requests.clear();
return MEGRAY_OK;
}
} // namespace MegRay
/**
* \file src/ucx/communicator.h
* MegRay is Licensed under the Apache License, Version 2.0 (the "License")
*
* Copyright (c) 2014-2020 Megvii Inc. All rights reserved.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT ARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
#pragma once
#include <mutex>
#include <vector>
#include <ucp/api/ucp.h>
#include "../communicator.h"
namespace MegRay {
/*!
* simple implementation of collective communications using ucp api
* a ucx communicator corresponds to a ucp worker
*/
class UcxCommunicator : public Communicator {
public:
UcxCommunicator(int nranks, int rank);
~UcxCommunicator();
// get a serialized string of ucp worker address
std::string get_uid() override;
Status init(const std::vector<std::string>& uids) override;
Status send(const void* sendbuff, size_t len, uint32_t rank,
std::shared_ptr<Context> ctx) override;
Status recv(void* recvbuff, size_t len, uint32_t rank,
std::shared_ptr<Context> ctx) override;
Status all_gather(const void* sendbuff, void* recvbuff, size_t sendlen,
DType dtype, std::shared_ptr<Context> ctx) override;
Status all_reduce(const void* sendbuff, void* recvbuff, size_t len,
DType dtype, ReduceOp op, std::shared_ptr<Context> ctx) override;
Status reduce_scatter(const void* sendbuff, void* recvbuff, size_t recvlen,
DType dtype, ReduceOp op, std::shared_ptr<Context> ctx) override;
Status broadcast(const void* sendbuff, void* recvbuff, size_t len,
DType dtype, uint32_t root, std::shared_ptr<Context> ctx) override;
Status reduce(const void* sendbuff, void* recvbuff, size_t len,
DType dtype, ReduceOp op, uint32_t root, std::shared_ptr<Context> ctx) override;
private:
// internal non-blocking send method
Status _send(const void* sendbuff, size_t len, uint32_t rank);
// internal non-blocking receive method
Status _recv(void* recvbuff, size_t len, uint32_t rank);
// flush _send and _recv requests
Status _flush();
// launch cuda kernel for reduce operations
void _reduce(void* i0, void* i1, void* o, size_t len, DType dtype,
ReduceOp op, cudaStream_t stream);
ucp_context_h m_context;
ucp_worker_h m_worker;
bool m_inited;
std::vector<ucp_ep_h> m_eps; // ucp endpoints
std::vector<void*> m_requests;
std::mutex m_requests_mtx;
};
} // namespace MegRay
/**
* \file src/ucx/kernel.cu
* MegRay is Licensed under the Apache License, Version 2.0 (the "License")
*
* Copyright (c) 2014-2020 Megvii Inc. All rights reserved.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT ARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
#include "communicator.h"
namespace MegRay {
template <typename T>
__global__ void reduce_sum_kernel(T* i0, T* i1, T* o, size_t len) {
size_t i = blockDim.x * blockIdx.x + threadIdx.x;
if (i < len) {
o[i] = i0[i] + i1[i];
}
}
template <typename T>
__global__ void reduce_max_kernel(T* i0, T* i1, T* o, size_t len) {
size_t i = blockDim.x * blockIdx.x + threadIdx.x;
if (i < len) {
o[i] = (i0[i] > i1[i]) ? i0[i] : i1[i];
}
}
template <typename T>
__global__ void reduce_min_kernel(T* i0, T* i1, T* o, size_t len) {
size_t i = blockDim.x * blockIdx.x + threadIdx.x;
if (i < len) {
o[i] = (i0[i] < i1[i]) ? i0[i] : i1[i];
}
}
template <typename T>
void reduce_helper(T* i0, T* i1, T* o, size_t len, ReduceOp op,
cudaStream_t stream) {
size_t block_dim = 512;
size_t grid_dim = (len + block_dim - 1) / block_dim;
switch (op) {
case MEGRAY_SUM:
reduce_sum_kernel<T><<<grid_dim, block_dim, 0, stream>>>(i0, i1, o, len);
break;
case MEGRAY_MAX:
reduce_max_kernel<T><<<grid_dim, block_dim, 0, stream>>>(i0, i1, o, len);
break;
case MEGRAY_MIN:
reduce_min_kernel<T><<<grid_dim, block_dim, 0, stream>>>(i0, i1, o, len);
break;
default:
MEGRAY_THROW("unknown reduce op");
}
}
void UcxCommunicator::_reduce(void* i0, void* i1, void* o, size_t len,
DType dtype, ReduceOp op, cudaStream_t stream) {
switch (dtype) {
case MEGRAY_INT8:
reduce_helper<int8_t>((int8_t*)i0, (int8_t*)i1, (int8_t*)o,
len, op, stream);
break;
case MEGRAY_UINT8:
reduce_helper<uint8_t>((uint8_t*)i0, (uint8_t*)i1, (uint8_t*)o,
len, op, stream);
break;
case MEGRAY_INT32:
reduce_helper<int32_t>((int32_t*)i0, (int32_t*)i1, (int32_t*)o,
len, op, stream);
break;
case MEGRAY_UINT32:
reduce_helper<uint32_t>((uint32_t*)i0, (uint32_t*)i1, (uint32_t*)o,
len, op, stream);
break;
case MEGRAY_INT64:
reduce_helper<int64_t>((int64_t*)i0, (int64_t*)i1, (int64_t*)o,
len, op, stream);
break;
case MEGRAY_UINT64:
reduce_helper<uint64_t>((uint64_t*)i0, (uint64_t*)i1, (uint64_t*)o,
len, op, stream);
break;
case MEGRAY_FLOAT32:
reduce_helper<float>((float*)i0, (float*)i1, (float*)o,
len, op, stream);
break;
case MEGRAY_FLOAT64:
reduce_helper<double>((double*)i0, (double*)i1, (double*)o,
len, op, stream);
break;
default:
MEGRAY_THROW("unknown dtype");
}
}
} // namespace MegRay
/**
* \file src/ucx/reduce.cpp
* MegRay is Licensed under the Apache License, Version 2.0 (the "License")
*
* Copyright (c) 2014-2020 Megvii Inc. All rights reserved.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT ARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
#include "communicator.h"
#include "utils.h"
namespace MegRay {
Status UcxCommunicator::reduce(const void* sendbuff, void* recvbuff, size_t len,
DType dtype, ReduceOp op, uint32_t root, std::shared_ptr<Context> ctx) {
// get cuda stream
MEGRAY_ASSERT(ctx->type() == MEGRAY_CTX_CUDA, "only cuda context supported");
cudaStream_t stream = static_cast<CudaContext*>(ctx.get())->get_stream();
CUDA_CHECK(cudaStreamSynchronize(stream));
// allocate workspace
size_t size = get_dtype_size(dtype);
void* workspace;
CUDA_CHECK(cudaMalloc(&workspace, 2 * len * size));
// init lbuffer and rbuffer
char* lbuffer = (char*)workspace;
char* rbuffer = (char*)workspace + len * size;
CUDA_CHECK(cudaMemcpy(rbuffer, sendbuff, len * size, cudaMemcpyDeviceToDevice));
// offset to make sure virtual_root is 0
auto virtual_rank = ring_sub(m_rank, root, m_nranks);
// we need d rounds to reduce data
size_t d = 0, t = m_nranks - 1;
while (t > 0) {
++d;
t >>= 1;
}
// on each round half nodes send msg to the other half
// on the 1st round, node Bxxxxxx1 sends msg to node Bxxxxxx0
// on the i-th round, node Bxxx1000 sends msg to node Bxxx0000
// on the last round, node B1000000 sends msg to node B0000000
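// e.g. with 8 ranks (virtual numbering, root 0): round 0 reduces 1->0, 3->2, 5->4, 7->6,
// round 1 reduces 2->0 and 6->4, round 2 reduces 4->0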
int mask = 0;
for(size_t i = 0; i < d; i++) {
int bit = 1 << i;
if ((virtual_rank & mask) == 0) {
if ((virtual_rank & bit) != 0) {
auto virtual_dest = virtual_rank ^ bit;
auto actual_dest = ring_add(virtual_dest, root, m_nranks);
if (virtual_dest < m_nranks){ // valid dest
MEGRAY_CHECK(_send(rbuffer, len * size, actual_dest));
MEGRAY_CHECK(_flush());
CUDA_CHECK(cudaStreamSynchronize(stream));
}
} else {
auto virtual_src = virtual_rank ^ bit;
auto actual_src = ring_add(virtual_src, root, m_nranks);
if (virtual_src < m_nranks){ // valid src
MEGRAY_CHECK(_recv(lbuffer, len * size, actual_src));
MEGRAY_CHECK(_flush());
_reduce(lbuffer, rbuffer, rbuffer, len, dtype, op, stream);
CUDA_CHECK(cudaStreamSynchronize(stream));
}
}
}
mask = mask ^ bit;
}
if (m_rank == root) {
CUDA_CHECK(cudaMemcpy(recvbuff, rbuffer, len * size, cudaMemcpyDeviceToDevice));
}
CUDA_CHECK(cudaFree(workspace));
return MEGRAY_OK;
}
} // namespace MegRay
/**
* \file src/ucx/reduce_scatter.cpp
* MegRay is Licensed under the Apache License, Version 2.0 (the "License")
*
* Copyright (c) 2014-2020 Megvii Inc. All rights reserved.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT ARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
#include "communicator.h"
#include "utils.h"
namespace MegRay {
Status UcxCommunicator::reduce_scatter(const void* sendbuff, void* recvbuff, size_t recvlen,
DType dtype, ReduceOp op, std::shared_ptr<Context> ctx) {
// get cuda stream
MEGRAY_ASSERT(ctx->type() == MEGRAY_CTX_CUDA, "only cuda context supported");
cudaStream_t stream = static_cast<CudaContext*>(ctx.get())->get_stream();
CUDA_CHECK(cudaStreamSynchronize(stream));
// allocate lbuffer and rbuffer
size_t size = get_dtype_size(dtype);
char* lbuffer;
char* rbuffer;
CUDA_CHECK(cudaMalloc(&lbuffer, recvlen * size));
CUDA_CHECK(cudaMalloc(&rbuffer, recvlen * m_nranks * size));
CUDA_CHECK(cudaMemcpy(rbuffer, sendbuff, recvlen * m_nranks * size, cudaMemcpyDeviceToDevice));
// pass and add (i-1)-th part from i-th node to i+1 to i+2 finally to (i-1)-th node
// at last i-th node has the sum of i-th part data
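// e.g. with 3 ranks: in round 1, rank i sends part i-1 and adds the incoming part i-2;
// in round 2 it sends part i-2 and adds part i, leaving rank i with the full sum of part i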
size_t lrank = ring_sub(m_rank, 1, m_nranks);
size_t rrank = ring_add(m_rank, 1, m_nranks);
for (size_t i = 1; i < m_nranks; ++i) {
size_t send_offset = recvlen * size * ring_sub(m_rank, i, m_nranks);
size_t recv_offset =
recvlen * size * ring_sub(m_rank, i + 1, m_nranks);
MEGRAY_CHECK(_send(rbuffer + send_offset, recvlen * size, rrank));
MEGRAY_CHECK(_recv(lbuffer, recvlen * size, lrank));
MEGRAY_CHECK(_flush());
_reduce(lbuffer, rbuffer + recv_offset, rbuffer + recv_offset,
recvlen, dtype, op, stream);
CUDA_CHECK(cudaStreamSynchronize(stream));
}
size_t offset = recvlen * size * m_rank;
CUDA_CHECK(cudaMemcpy(recvbuff, rbuffer + offset, recvlen * size, cudaMemcpyDeviceToDevice));
CUDA_CHECK(cudaFree(lbuffer));
CUDA_CHECK(cudaFree(rbuffer));
return MEGRAY_OK;
}
} // namespace MegRay
/**
* \file src/ucx/utils.h
* MegRay is Licensed under the Apache License, Version 2.0 (the "License")
*
* Copyright (c) 2014-2020 Megvii Inc. All rights reserved.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT ARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
#pragma once
#include <cstdint>
namespace MegRay {
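// modular helpers for the ring and tree algorithms,
// e.g. ring_add(3, 2, 4) == 1 and ring_sub(0, 1, 4) == 3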
inline uint32_t ring_add(uint32_t n, uint32_t delta, uint32_t m) {
return (n + delta) % m;
}
inline uint32_t ring_sub(uint32_t n, uint32_t delta, uint32_t m) {
return (n + m - delta % m) % m;
}
} // namespace MegRay
file(GLOB_RECURSE SOURCES "*.cpp")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS}")
add_executable(megray_test ${SOURCES})
target_link_libraries(megray_test megray libnccl libucp cudart gtest pthread dl rt)
/**
* \file test/test_base.h
* MegRay is Licensed under the Apache License, Version 2.0 (the "License")
*
* Copyright (c) 2014-2020 Megvii Inc. All rights reserved.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT ARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
#pragma once
#include <thread>
#include <vector>
#include <gtest/gtest.h>
#include "../src/megray.h"
template <typename T>
void run_test(int nranks, MegRay::Backend backend,
std::vector<std::vector<T>>& inputs,
std::vector<std::vector<T>>& expect_outputs, MegRay::DType dtype,
std::function<void(std::shared_ptr<MegRay::Communicator>,
std::vector<std::string>&, int,
std::vector<T>&, std::vector<T>&)>
main_func) {
std::vector<std::shared_ptr<MegRay::Communicator>> comms(nranks);
std::vector<std::string> uids(nranks);
std::vector<std::vector<T>> outputs(nranks);
for (int i = 0; i < nranks; i++) {
comms[i] = MegRay::get_communicator(nranks, i, backend);
uids[i] = comms[i]->get_uid();
outputs[i].resize(expect_outputs[i].size());
}
std::vector<std::thread> threads;
for (int i = 0; i < nranks; i++) {
threads.push_back(std::thread(main_func, comms[i], std::ref(uids), i,
std::ref(inputs[i]),
std::ref(outputs[i])));
}
for (int i = 0; i < nranks; i++) {
threads[i].join();
}
for (int i = 0; i < nranks; i++) {
for (size_t j = 0; j < expect_outputs[i].size(); j++) {
ASSERT_FLOAT_EQ(expect_outputs[i][j], outputs[i][j]);
}
}
}
/**
* \file test/test_main.cpp
* MegRay is Licensed under the Apache License, Version 2.0 (the "License")
*
* Copyright (c) 2014-2020 Megvii Inc. All rights reserved.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT ARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
#include <gtest/gtest.h>
#include "../src/megray.h"
int main(int argc, char* argv[]) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
/**
* \file test/test_opr.cpp
* MegRay is Licensed under the Apache License, Version 2.0 (the "License")
*
* Copyright (c) 2014-2020 Megvii Inc. All rights reserved.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT ARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
#include <algorithm>
#include <iostream>
#include <limits>
#include <string>
#include <thread>
#include <vector>
#include <gtest/gtest.h>
#include "../src/megray.h"
#include "test_base.h"
TEST(TestNcclCommunicator, Init) {
const int nranks = 3;
std::vector<std::shared_ptr<MegRay::Communicator>> comms(nranks);
std::vector<std::string> uids(nranks);
for (size_t i = 0; i < nranks; i++) {
comms[i] = MegRay::get_communicator(nranks, i, MegRay::MEGRAY_NCCL);
uids[i] = comms[i]->get_uid();
}
auto run = [&](int rank) { comms[rank]->init(uids); };
std::vector<std::thread> threads;
for (size_t i = 0; i < nranks; i++) {
threads.push_back(std::thread(run, i));
}
for (size_t i = 0; i < nranks; i++) {
threads[i].join();
}
}
TEST(TestUcxCommunicator, Init) {
const int nranks = 3;
std::vector<std::shared_ptr<MegRay::Communicator>> comms(nranks);
std::vector<std::string> uids(nranks);
for (int i = 0; i < nranks; i++) {
comms[i] = MegRay::get_communicator(nranks, i, MegRay::MEGRAY_UCX);
uids[i] = comms[i]->get_uid();
}
auto run = [&](int rank) {
cudaSetDevice(rank);
comms[rank]->init(uids);
};
std::vector<std::thread> threads;
for (int i = 0; i < nranks; i++) {
threads.push_back(std::thread(run, i));
}
for (int i = 0; i < nranks; i++) {
threads[i].join();
}
}
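// Rank 0 copies a test string into a device buffer and sends it to rank 1
// over the UCX backend; rank 1 receives into device memory, copies it back
// to the host, and the round-tripped string must equal the original message.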
TEST(TestOpr, SendRecv) {
auto send_comm = MegRay::get_communicator(2, 0, MegRay::MEGRAY_UCX);
auto recv_comm = MegRay::get_communicator(2, 1, MegRay::MEGRAY_UCX);
std::vector<std::string> uids(2);
uids[0] = send_comm->get_uid();
uids[1] = recv_comm->get_uid();
std::string msg("test_message");
size_t len = msg.size();
std::string output;
auto sender = [&]() {
CUDA_ASSERT(cudaSetDevice(0));
send_comm->init(uids);
cudaStream_t stream;
CUDA_ASSERT(cudaStreamCreate(&stream));
auto ctx = MegRay::CudaContext::make(stream);
void* ptr;
CUDA_ASSERT(cudaMalloc(&ptr, len));
CUDA_ASSERT(cudaMemcpy(ptr, msg.data(), len, cudaMemcpyHostToDevice));
send_comm->send(ptr, len, 1, ctx);
CUDA_ASSERT(cudaStreamSynchronize(stream));
};
auto receiver = [&]() {
CUDA_ASSERT(cudaSetDevice(1));
recv_comm->init(uids);
cudaStream_t stream;
CUDA_ASSERT(cudaStreamCreate(&stream));
auto ctx = MegRay::CudaContext::make(stream);
void* ptr;
CUDA_ASSERT(cudaMalloc(&ptr, len));
recv_comm->recv(ptr, len, 0, ctx);
CUDA_ASSERT(cudaStreamSynchronize(stream));
char* outbuff = new char[len];
CUDA_ASSERT(cudaMemcpy(outbuff, ptr, len, cudaMemcpyDeviceToHost));
output = std::string(outbuff, len);
delete[] outbuff;
};
std::thread send_th(sender);
std::thread recv_th(receiver);
send_th.join();
recv_th.join();
ASSERT_EQ(msg, output);
}
TEST(TestOpr, AllGather) {
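// Every rank contributes sendlen floats; after all_gather each rank should
// hold all chunks concatenated in rank order, i.e.
// output[i * sendlen + j] == inputs[i][j].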
const int nranks = 3;
const size_t sendlen = 10;
std::vector<std::vector<float>> inputs(nranks, std::vector<float>(sendlen));
std::vector<std::vector<float>> outputs(
nranks, std::vector<float>(nranks * sendlen));
for (size_t j = 0; j < sendlen; j++) {
for (size_t i = 0; i < nranks; i++) {
inputs[i][j] = 1.0 * (i + 1) * (j + 1);
for (int k = 0; k < nranks; k++) {
outputs[k][i * sendlen + j] = inputs[i][j];
}
}
}
auto run = [nranks, sendlen](std::shared_ptr<MegRay::Communicator> comm,
std::vector<std::string>& uids, int rank,
std::vector<float>& input,
std::vector<float>& output) -> void {
CUDA_ASSERT(cudaSetDevice(rank));
comm->init(uids);
cudaStream_t stream;
CUDA_ASSERT(cudaStreamCreate(&stream));
auto ctx = MegRay::CudaContext::make(stream);
void *in_ptr, *out_ptr;
CUDA_ASSERT(cudaMalloc(&in_ptr, sendlen * sizeof(float)));
CUDA_ASSERT(cudaMalloc(&out_ptr, sendlen * nranks * sizeof(float)));
CUDA_ASSERT(cudaMemcpy(in_ptr, input.data(), sendlen * sizeof(float),
cudaMemcpyHostToDevice));
int ret = comm->all_gather(in_ptr, out_ptr, sendlen,
MegRay::MEGRAY_FLOAT32, ctx);
ASSERT_EQ(ret, 0);
CUDA_ASSERT(cudaStreamSynchronize(stream));
CUDA_ASSERT(cudaMemcpy(output.data(), out_ptr,
nranks * sendlen * sizeof(float),
cudaMemcpyDeviceToHost));
};
run_test<float>(nranks, MegRay::MEGRAY_NCCL, inputs, outputs,
MegRay::MEGRAY_FLOAT32, run);
run_test<float>(nranks, MegRay::MEGRAY_UCX, inputs, outputs,
MegRay::MEGRAY_FLOAT32, run);
}
TEST(TestOpr, AllReduce) {
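// Every rank should end up with the elementwise reduction (SUM / MAX / MIN)
// of all ranks' inputs.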
const int nranks = 3;
const size_t len = 10;
std::vector<std::vector<float>> inputs(nranks, std::vector<float>(len));
std::vector<std::vector<float>> expected_outputs(nranks,
std::vector<float>(len));
auto reduce_func = [nranks, len](MegRay::ReduceOp op) {
auto run = [nranks, len, op](std::shared_ptr<MegRay::Communicator> comm,
std::vector<std::string>& uids, int rank,
std::vector<float>& input,
std::vector<float>& output) {
CUDA_ASSERT(cudaSetDevice(rank));
comm->init(uids);
cudaStream_t stream;
CUDA_ASSERT(cudaStreamCreate(&stream));
auto ctx = MegRay::CudaContext::make(stream);
void *in_ptr, *out_ptr;
CUDA_ASSERT(cudaMalloc(&in_ptr, len * sizeof(float)));
CUDA_ASSERT(cudaMalloc(&out_ptr, len * sizeof(float)));
CUDA_ASSERT(cudaMemcpy(in_ptr, input.data(), len * sizeof(float),
cudaMemcpyHostToDevice));
int ret = comm->all_reduce(in_ptr, out_ptr, len,
MegRay::MEGRAY_FLOAT32, op, ctx);
ASSERT_EQ(ret, 0);
CUDA_ASSERT(cudaStreamSynchronize(stream));
CUDA_ASSERT(cudaMemcpy(output.data(), out_ptr, len * sizeof(float),
cudaMemcpyDeviceToHost));
};
return run;
};
for (size_t j = 0; j < len; j++) {
float sum = 0;
for (size_t i = 0; i < nranks; i++) {
inputs[i][j] = 1.0 * (i + 1) * (j + 1);
sum += inputs[i][j];
}
for (size_t i = 0; i < nranks; i++) {
expected_outputs[i][j] = sum;
}
}
run_test<float>(nranks, MegRay::MEGRAY_NCCL, inputs, expected_outputs,
MegRay::MEGRAY_FLOAT32, reduce_func(MegRay::MEGRAY_SUM));
run_test<float>(nranks, MegRay::MEGRAY_UCX, inputs, expected_outputs,
MegRay::MEGRAY_FLOAT32, reduce_func(MegRay::MEGRAY_SUM));
for (size_t j = 0; j < len; j++) {
float max_val = std::numeric_limits<float>::lowest();
for (size_t i = 0; i < nranks; i++) {
inputs[i][j] = 1.0 * (i + 1) * (j + 1);
max_val = std::max(max_val, inputs[i][j]);
}
for (size_t i = 0; i < nranks; i++) {
expected_outputs[i][j] = max_val;
}
}
run_test<float>(nranks, MegRay::MEGRAY_NCCL, inputs, expected_outputs,
MegRay::MEGRAY_FLOAT32, reduce_func(MegRay::MEGRAY_MAX));
run_test<float>(nranks, MegRay::MEGRAY_UCX, inputs, expected_outputs,
MegRay::MEGRAY_FLOAT32, reduce_func(MegRay::MEGRAY_MAX));
for (size_t j = 0; j < len; j++) {
float min_val = std::numeric_limits<float>::max();
for (size_t i = 0; i < nranks; i++) {
inputs[i][j] = 1.0 * (i + 1) * (j + 1);
min_val = std::min(min_val, inputs[i][j]);
}
for (size_t i = 0; i < nranks; i++) {
expected_outputs[i][j] = min_val;
}
}
run_test<float>(nranks, MegRay::MEGRAY_NCCL, inputs, expected_outputs,
MegRay::MEGRAY_FLOAT32, reduce_func(MegRay::MEGRAY_MIN));
run_test<float>(nranks, MegRay::MEGRAY_UCX, inputs, expected_outputs,
MegRay::MEGRAY_FLOAT32, reduce_func(MegRay::MEGRAY_MIN));
}
TEST(TestOpr, ReduceScatterSum) {
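// The nranks * recvlen inputs are reduced elementwise across ranks and then
// scattered: rank k should receive the reduced elements
// [k * recvlen, (k + 1) * recvlen).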
const int nranks = 3;
const size_t recvlen = 10;
std::vector<std::vector<float>> inputs(
nranks, std::vector<float>(nranks * recvlen));
std::vector<std::vector<float>> expected_outputs(
nranks, std::vector<float>(recvlen));
auto reduce_func = [nranks, recvlen](MegRay::ReduceOp op) {
auto run = [nranks, recvlen,
op](std::shared_ptr<MegRay::Communicator> comm,
std::vector<std::string>& uids, int rank,
std::vector<float>& input, std::vector<float>& output) {
CUDA_ASSERT(cudaSetDevice(rank));
comm->init(uids);
cudaStream_t stream;
CUDA_ASSERT(cudaStreamCreate(&stream));
auto ctx = MegRay::CudaContext::make(stream);
void *in_ptr, *out_ptr;
CUDA_ASSERT(cudaMalloc(&in_ptr, nranks * recvlen * sizeof(float)));
CUDA_ASSERT(cudaMalloc(&out_ptr, recvlen * sizeof(float)));
CUDA_ASSERT(cudaMemcpy(in_ptr, input.data(),
nranks * recvlen * sizeof(float),
cudaMemcpyHostToDevice));
int ret = comm->reduce_scatter(in_ptr, out_ptr, recvlen,
MegRay::MEGRAY_FLOAT32, op, ctx);
ASSERT_EQ(ret, 0);
CUDA_ASSERT(cudaStreamSynchronize(stream));
CUDA_ASSERT(cudaMemcpy(output.data(), out_ptr,
recvlen * sizeof(float),
cudaMemcpyDeviceToHost));
};
return run;
};
for (int k = 0; k < nranks; k++) {
for (size_t j = 0; j < recvlen; j++) {
float sum = 0;
for (size_t i = 0; i < nranks; i++) {
int m = k * recvlen + j;
inputs[i][m] = 1.0 * (i + 1) * (m + 1);
sum += inputs[i][m];
}
expected_outputs[k][j] = sum;
}
}
run_test<float>(nranks, MegRay::MEGRAY_NCCL, inputs, expected_outputs,
MegRay::MEGRAY_FLOAT32, reduce_func(MegRay::MEGRAY_SUM));
run_test<float>(nranks, MegRay::MEGRAY_UCX, inputs, expected_outputs,
MegRay::MEGRAY_FLOAT32, reduce_func(MegRay::MEGRAY_SUM));
for (int k = 0; k < nranks; k++) {
for (size_t j = 0; j < recvlen; j++) {
float max_val = std::numeric_limits<float>::lowest();
for (size_t i = 0; i < nranks; i++) {
int m = k * recvlen + j;
inputs[i][m] = 1.0 * (i + 1) * (m + 1);
max_val = std::max(inputs[i][m], max_val);
}
expected_outputs[k][j] = max_val;
}
}
run_test<float>(nranks, MegRay::MEGRAY_NCCL, inputs, expected_outputs,
MegRay::MEGRAY_FLOAT32, reduce_func(MegRay::MEGRAY_MAX));
run_test<float>(nranks, MegRay::MEGRAY_UCX, inputs, expected_outputs,
MegRay::MEGRAY_FLOAT32, reduce_func(MegRay::MEGRAY_MAX));
for (int k = 0; k < nranks; k++) {
for (size_t j = 0; j < recvlen; j++) {
float min_val = std::numeric_limits<float>::max();
for (size_t i = 0; i < nranks; i++) {
int m = k * recvlen + j;
inputs[i][m] = 1.0 * (i + 1) * (m + 1);
min_val = std::min(inputs[i][m], min_val);
}
expected_outputs[k][j] = min_val;
}
}
run_test<float>(nranks, MegRay::MEGRAY_NCCL, inputs, expected_outputs,
MegRay::MEGRAY_FLOAT32, reduce_func(MegRay::MEGRAY_MIN));
run_test<float>(nranks, MegRay::MEGRAY_UCX, inputs, expected_outputs,
MegRay::MEGRAY_FLOAT32, reduce_func(MegRay::MEGRAY_MIN));
}
TEST(TestOpr, Broadcast) {
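// Every rank should receive a copy of the root rank's input buffer.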
const int nranks = 3;
const int root = 1;
const size_t len = 10;
std::vector<std::vector<float>> inputs(nranks, std::vector<float>(len));
std::vector<std::vector<float>> outputs(nranks, std::vector<float>(len));
for (size_t j = 0; j < len; j++) {
for (size_t i = 0; i < nranks; i++) {
inputs[i][j] = 1.0 * (i + 1) * (j + 1);
}
for (size_t i = 0; i < nranks; i++) {
outputs[i][j] = inputs[root][j];
}
}
auto run = [nranks, root, len](std::shared_ptr<MegRay::Communicator> comm,
std::vector<std::string>& uids, int rank,
std::vector<float>& input,
std::vector<float>& output) {
CUDA_ASSERT(cudaSetDevice(rank));
comm->init(uids);
cudaStream_t stream;
CUDA_ASSERT(cudaStreamCreate(&stream));
auto ctx = MegRay::CudaContext::make(stream);
void *in_ptr, *out_ptr;
CUDA_ASSERT(cudaMalloc(&in_ptr, len * sizeof(float)));
CUDA_ASSERT(cudaMalloc(&out_ptr, len * sizeof(float)));
CUDA_ASSERT(cudaMemcpy(in_ptr, input.data(), len * sizeof(float),
cudaMemcpyHostToDevice));
int ret = comm->broadcast(in_ptr, out_ptr, len, MegRay::MEGRAY_FLOAT32,
root, ctx);
ASSERT_EQ(ret, 0);
CUDA_ASSERT(cudaStreamSynchronize(stream));
CUDA_ASSERT(cudaMemcpy(output.data(), out_ptr, len * sizeof(float),
cudaMemcpyDeviceToHost));
};
run_test<float>(nranks, MegRay::MEGRAY_NCCL, inputs, outputs,
MegRay::MEGRAY_FLOAT32, run);
run_test<float>(nranks, MegRay::MEGRAY_UCX, inputs, outputs,
MegRay::MEGRAY_FLOAT32, run);
}
TEST(TestOpr, ReduceSum) {
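// Only the root rank receives the reduced buffer, so the expected outputs
// stay empty for all other ranks.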
const int nranks = 3;
const int root = 1;
const size_t len = 10;
std::vector<std::vector<float>> inputs(nranks, std::vector<float>(len));
std::vector<std::vector<float>> expected_outputs(nranks);
expected_outputs[root].resize(len);
auto reduce_func = [nranks, root, len](MegRay::ReduceOp op) {
auto run = [nranks, root, len,
op](std::shared_ptr<MegRay::Communicator> comm,
std::vector<std::string>& uids, int rank,
std::vector<float>& input, std::vector<float>& output) {
CUDA_ASSERT(cudaSetDevice(rank));
comm->init(uids);
cudaStream_t stream;
CUDA_ASSERT(cudaStreamCreate(&stream));
auto ctx = MegRay::CudaContext::make(stream);
void *in_ptr, *out_ptr = nullptr;
CUDA_ASSERT(cudaMalloc(&in_ptr, len * sizeof(float)));
if (rank == root) {
CUDA_ASSERT(cudaMalloc(&out_ptr, len * sizeof(float)));
}
CUDA_ASSERT(cudaMemcpy(in_ptr, input.data(), len * sizeof(float),
cudaMemcpyHostToDevice));
int ret = comm->reduce(in_ptr, out_ptr, len, MegRay::MEGRAY_FLOAT32,
op, root, ctx);
ASSERT_EQ(ret, 0);
CUDA_ASSERT(cudaStreamSynchronize(stream));
if (rank == root) {
CUDA_ASSERT(cudaMemcpy(output.data(), out_ptr,
len * sizeof(float),
cudaMemcpyDeviceToHost));
}
};
return run;
};
for (size_t j = 0; j < len; j++) {
float sum = 0;
for (size_t i = 0; i < nranks; i++) {
inputs[i][j] = 1.0 * (i + 1) * (j + 1);
sum += inputs[i][j];
}
expected_outputs[root][j] = sum;
}
run_test<float>(nranks, MegRay::MEGRAY_NCCL, inputs, expected_outputs,
MegRay::MEGRAY_FLOAT32, reduce_func(MegRay::MEGRAY_SUM));
run_test<float>(nranks, MegRay::MEGRAY_UCX, inputs, expected_outputs,
MegRay::MEGRAY_FLOAT32, reduce_func(MegRay::MEGRAY_SUM));
for (size_t j = 0; j < len; j++) {
float max_val = std::numeric_limits<float>::lowest();
for (size_t i = 0; i < nranks; i++) {
inputs[i][j] = 1.0 * (i + 1) * (j + 1);
max_val = std::max(inputs[i][j], max_val);
}
expected_outputs[root][j] = max_val;
}
run_test<float>(nranks, MegRay::MEGRAY_NCCL, inputs, expected_outputs,
MegRay::MEGRAY_FLOAT32, reduce_func(MegRay::MEGRAY_MAX));
run_test<float>(nranks, MegRay::MEGRAY_UCX, inputs, expected_outputs,
MegRay::MEGRAY_FLOAT32, reduce_func(MegRay::MEGRAY_MAX));
for (size_t j = 0; j < len; j++) {
float min_val = std::numeric_limits<float>::max();
for (size_t i = 0; i < nranks; i++) {
inputs[i][j] = 1.0 * (i + 1) * (j + 1);
min_val = std::min(inputs[i][j], min_val);
}
expected_outputs[root][j] = min_val;
}
run_test<float>(nranks, MegRay::MEGRAY_NCCL, inputs, expected_outputs,
MegRay::MEGRAY_FLOAT32, reduce_func(MegRay::MEGRAY_MIN));
run_test<float>(nranks, MegRay::MEGRAY_UCX, inputs, expected_outputs,
MegRay::MEGRAY_FLOAT32, reduce_func(MegRay::MEGRAY_MIN));
}
#!/bin/bash -e
cd $(dirname $0)
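# fetch the third-party dependencies (gdrcopy, gtest, nccl, ucx) pinned as
# git submodules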
git submodule sync
git submodule update --init gdrcopy
git submodule update --init gtest
git submodule update --init nccl
git submodule update --init ucx