Commit 35744e7b authored by Yu Yang

Polish code

Parent 22bb262a
......@@ -20,6 +20,12 @@ limitations under the License. */
namespace paddle {
namespace framework {
#ifdef PADDLE_WITH_CUDA
// FIXME: CHECK the return value of x;
#define NCCL_INVOKE(x) x
#endif
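Note: the FIXME above leaves the NCCL return code unchecked. A minimal sketch of what a checked variant could look like; PADDLE_ENFORCE and the dynload wrapper platform::dynload::ncclGetErrorString are assumed to be available here, neither is introduced by this commit:

    // Hypothetical checked variant of NCCL_INVOKE: fail fast when an NCCL
    // call does not return ncclSuccess.
    #define NCCL_INVOKE_CHECKED(x)                                            \
      do {                                                                    \
        ncclResult_t nccl_result = (x);                                       \
        PADDLE_ENFORCE(nccl_result == ncclSuccess, "NCCL error: %s",          \
                       platform::dynload::ncclGetErrorString(nccl_result));   \
      } while (0)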
struct OpHandle;
struct VarHandle {
......@@ -71,9 +77,51 @@ class ParallelExecutorPrivate {
std::unordered_map<platform::Place, Scope *, platform::PlaceHash>
local_scopes_;
std::unordered_map<platform::Place, platform::CUDADeviceContext,
platform::PlaceHash>
dev_ctxs_;
#ifdef PADDLE_WITH_CUDA
struct NCCLContext {
std::unique_ptr<platform::CUDADeviceContext> ctx_;
ncclComm_t comm;
explicit NCCLContext(int dev_id) {
ctx_.reset(new platform::CUDADeviceContext(platform::CUDAPlace(dev_id)));
}
cudaStream_t stream() const { return ctx_->stream(); }
int device_id() const {
return boost::get<platform::CUDAPlace>(ctx_->GetPlace()).device;
}
static void InitNCCLContext(std::map<int, NCCLContext> &contexts) {
std::vector<ncclComm_t> comms;
std::vector<int> devs;
comms.resize(contexts.size());
devs.reserve(contexts.size());
for (auto &ctx : contexts) {
devs.push_back(ctx.first);
}
NCCL_INVOKE(platform::dynload::ncclCommInitAll(
&comms[0], static_cast<int>(contexts.size()), &devs[0]));
int i = 0;
for (auto &ctx : contexts) {
ctx.second.comm = comms[i++];
}
}
};
std::map<int, NCCLContext> communication_streams_;
NCCLContext &GetNCCLCtx(platform::Place p) {
int dev_id = boost::get<platform::CUDAPlace>(p).device;
return communication_streams_.at(dev_id);
}
#endif
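Note: for orientation, a hypothetical usage sketch of the struct above; the two device ids are assumptions for illustration, not part of this change:

    // Build one NCCLContext per visible device, then create all the
    // communicators with a single ncclCommInitAll call.
    std::map<int, ParallelExecutorPrivate::NCCLContext> ctxs;
    ctxs.emplace(0, ParallelExecutorPrivate::NCCLContext(0));
    ctxs.emplace(1, ParallelExecutorPrivate::NCCLContext(1));
    ParallelExecutorPrivate::NCCLContext::InitNCCLContext(ctxs);
    // Each context now owns a CUDA stream plus an initialized ncclComm_t;
    // per-device lookup works as in GetNCCLCtx above (map keyed by dev_id).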
platform::Place main_place_;
std::unordered_map<platform::Place,
......@@ -107,8 +155,10 @@ ParallelExecutor::ParallelExecutor(
member_->main_place_ = places[0];
// Bcast Parameters to all GPUs
if (platform::is_gpu_place(member_->main_place_) &&
member_->local_scopes_.size() != 1) { // Is CUDA
BuildNCCLCommunicator();
BCastParamsToGPUs(startup_program);
}
// Startup Program has been run. All local scopes have correct parameters.
......@@ -241,20 +291,20 @@ VarHandle *ParallelExecutor::GetVarHandle(const std::string &each_var_name,
void ParallelExecutor::BCastParamsToGPUs(
const ProgramDesc &startup_program) const {
#ifdef PADDLE_WITH_CUDA
auto *main_scope = member_->local_scopes_[member_->main_place_];
for (auto *var_desc : startup_program.Block(0).AllVars()) {
if (var_desc->GetType() == proto::VarType::LOD_TENSOR) {
auto &main_tensor =
main_scope->FindVar(var_desc->Name())->Get<LoDTensor>();
ncclDataType_t data_type = ToNCCLDataType(main_tensor.type());
auto &dims = main_tensor.dims();
size_t numel = main_tensor.numel();
std::vector<std::pair<void *, ParallelExecutorPrivate::NCCLContext *>>
mems;
mems.emplace_back(const_cast<void *>(main_tensor.data<void>()),
&member_->GetNCCLCtx(member_->main_place_));
for (auto &pair : member_->local_scopes_) {
if (pair.first == member_->main_place_) {
......@@ -265,8 +315,7 @@ void ParallelExecutor::BCastParamsToGPUs(
auto *t = local_scope->Var(var_desc->Name())->GetMutable<LoDTensor>();
t->Resize(dims);
mems.emplace_back(t->mutable_data(pair.first, main_tensor.type()),
&member_->GetNCCLCtx(member_->main_place_));
}
// TODO(yy): Invoke ncclBCast here. mems, numel, data_type. The mems[0]
......@@ -274,17 +323,26 @@ void ParallelExecutor::BCastParamsToGPUs(
(void)(data_type);
(void)(numel);
}
}
#else
PADDLE_THROW("Not compiled with CUDA");
#endif
}
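Note: the TODO above defers the actual broadcast. A hedged sketch of one possible shape for it, assuming the dynload wrappers ncclGroupStart/ncclBcast/ncclGroupEnd exist and that mems[0]'s device maps to rank 0 in every communicator:

    // Sketch only: broadcast mems[0]'s buffer to every other copy in one
    // NCCL group; numel and data_type are shared by all entries.
    platform::dynload::ncclGroupStart();
    for (auto &mem : mems) {
      auto &nccl_ctx = *mem.second;
      platform::dynload::ncclBcast(mem.first, numel, data_type, 0 /* root */,
                                   nccl_ctx.comm, nccl_ctx.stream());
    }
    platform::dynload::ncclGroupEnd();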
void ParallelExecutor::BuildNCCLCommunicator() const {
#ifdef PADDLE_WITH_CUDA
for (auto &place_pair : member_->local_scopes_) {
auto place = place_pair.first;
int dev_id = boost::get<platform::CUDAPlace>(place).device;
member_->communication_streams_.emplace(
dev_id, ParallelExecutorPrivate::NCCLContext(dev_id));
}
ParallelExecutorPrivate::NCCLContext::InitNCCLContext(
member_->communication_streams_);
#endif
}
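Note: nothing in this change releases the communicators created above. A minimal cleanup sketch, assuming a ~ParallelExecutorPrivate destructor is the right place and that a dynload wrapper for ncclCommDestroy is available:

    // Hypothetical teardown, e.g. in ~ParallelExecutorPrivate():
    for (auto &ctx : communication_streams_) {
      platform::dynload::ncclCommDestroy(ctx.second.comm);
    }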
std::vector<LoDTensor> ParallelExecutor::Run(
......
......@@ -55,6 +55,8 @@ class ParallelExecutor {
void ConstructDependencyGraph(const std::unordered_set<std::string>& params,
const ProgramDesc& main_program,
const std::string& loss_var_name) const;
void BuildNCCLCommunicator() const;
};
} // namespace framework
......
......@@ -35,7 +35,7 @@ class ParallelExecutor(unittest.TestCase):
adam = fluid.optimizer.Adam()
adam.minimize(loss)
act_places = []
for each in [fluid.CUDAPlace(0)]:
p = fluid.core.Place()
p.set_place(each)
act_places.append(p)
......