diff --git a/paddle/fluid/operators/distributed/parameter_recv.cc b/paddle/fluid/operators/distributed/parameter_recv.cc index e5b486d1218e4989de3e7f4030a1150e857f9a5d..2664a89ed6d7345a8c7c014f789a9e817d75ad28 100644 --- a/paddle/fluid/operators/distributed/parameter_recv.cc +++ b/paddle/fluid/operators/distributed/parameter_recv.cc @@ -27,6 +27,7 @@ #include "paddle/fluid/operators/distributed/rpc_client.h" #include "paddle/fluid/operators/distributed/variable_response.h" #include "paddle/fluid/operators/distributed_ops/send_recv_util.h" +#include "paddle/fluid/operators/strided_memcpy.h" namespace paddle { namespace operators { @@ -39,11 +40,10 @@ using DDim = framework::DDim; template void ParameterRecv::operator()(const std::string &var_name, - const std::vector &send_varnames, + const std::vector &recv_varnames, const std::vector &epmap, - const std::vector &height_sections, const framework::ExecutionContext &ctx, - const framework::Scope &scope, bool sync) { + const framework::Scope &scope) { framework::Scope *local_scope = scope.NewTmpScope(); platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance(); @@ -53,118 +53,41 @@ void ParameterRecv::operator()(const std::string &var_name, distributed::RPCClient::GetInstance( ctx.Attr("trainer_id")); - auto *send_var = scope.FindVar(var_name); - size_t out_num = send_varnames.size(); - if (send_var->IsType()) { - if (out_num > 1) { - auto &send_tensor = send_var->Get(); - auto &send_tensor_dims = send_tensor.dims(); - std::vector outs_dims; - outs_dims.reserve(out_num); - - // infer output shape - PADDLE_ENFORCE_EQ(height_sections.size(), out_num, - "tensor split sections size" - "should be equal to output size."); - for (size_t i = 0; i < out_num; ++i) { - auto dim = send_tensor_dims; - dim[0] = height_sections[i]; - outs_dims.push_back(dim); - } - - // create output var in local scope - size_t row_offset = 0; - for (auto i = 0; i < out_num; ++i) { - framework::Tensor *out = local_scope->Var(send_varnames[i]) - ->GetMutable(); - *out = send_tensor.Slice(row_offset, row_offset + outs_dims[i][0]); - row_offset += outs_dims[i][0]; - } + auto *recv_var = scope.FindVar(var_name); + + std::vector recved_tensors; + + // recv all vars to local scope + if (recv_var->IsType()) { + std::vector rets; + for (size_t i = 0; i < recv_varnames.size(); i++) { + auto &recv_var_name = recv_varnames[i]; + framework::Tensor *t = + local_scope->Var(recv_var_name)->GetMutable(); + recved_tensors.push_back(t); + VLOG(3) << "recv " << recv_var_name << " from " << epmap[i]; + rets.push_back(rpc_client->AsyncGetVar(epmap[i], cpu_ctx, *local_scope, + recv_var_name, recv_var_name)); } - } else if (send_var->IsType()) { - auto &send_slr = send_var->Get(); - auto abs_sections = ToAbsoluteSection(height_sections); - - auto send_rows = send_slr.rows(); - std::vector> outs_rows_idx; - std::vector> outs_dense_idx; - - outs_rows_idx.resize(out_num); - outs_dense_idx.resize(out_num); - - auto row_numel = send_slr.value().numel() / send_slr.value().dims()[0]; - auto src = send_slr.value().data(); - - // create output var in local scope - std::vector outs; - for (auto &name : send_varnames) { - auto *out = local_scope->Var(name)->GetMutable(); - outs.push_back(out); - } - - // split rows index into output sparse vars - for (size_t i = 0; i < send_rows.size(); ++i) { - int out_idx = FindOutIdx(send_rows[i], abs_sections); - outs_rows_idx[out_idx].push_back(send_rows[i]); - outs_dense_idx[out_idx].push_back(i); - } - auto place = ctx.GetPlace(); - - for (size_t i = 0; i < outs_rows_idx.size(); ++i) { - auto rows_idx = outs_rows_idx[i]; - outs[i]->set_height(height_sections[i]); - auto dims = send_slr.GetCompleteDims(); - dims[0] = rows_idx.size(); - outs[i]->mutable_value()->mutable_data(dims, send_slr.place()); - outs[i]->mutable_rows()->clear(); - if (rows_idx.size() > 0) { - for (auto idx : rows_idx) { - outs[i]->mutable_rows()->push_back(idx - abs_sections[i]); - } - auto dst = outs[i]->mutable_value()->mutable_data(ctx.GetPlace()); - for (size_t j = 0; j < rows_idx.size(); j++) { - if (platform::is_cpu_place(place)) { - memory::Copy( - platform::CPUPlace(), dst + j * row_numel, platform::CPUPlace(), - src + outs_dense_idx[i][j] * row_numel, sizeof(T) * row_numel); - } else { -#ifdef PADDLE_WITH_CUDA - auto stream = ctx.cuda_device_context().stream(); - memory::Copy(platform::CUDAPlace(), dst + j * row_numel, - platform::CUDAPlace(), - src + outs_dense_idx[i][j] * row_numel, - sizeof(T) * row_numel, stream); -#else - PADDLE_THROW("Paddle is not compiled with GPU"); -#endif - } - } - } - PADDLE_ENFORCE_EQ(rows_idx.size(), outs[i]->rows().size(), - "rows should has the same size with tensor dim 0"); + for (size_t i = 0; i < rets.size(); i++) { + PADDLE_ENFORCE(rets[i]->Wait(), "internal error in RPCClient"); } - } else { PADDLE_THROW("unsupported var type to send!"); } - std::vector rets; - for (size_t i = 0; i < send_varnames.size(); i++) { - auto &send_var_name = send_varnames[i]; - auto &endpoint = epmap[i]; - if (NeedSend(*local_scope, send_var_name)) { - VLOG(3) << "sending " << send_var_name << " to " << endpoint; - rets.push_back(rpc_client->AsyncSendVar(endpoint, cpu_ctx, *local_scope, - send_var_name)); - } else { - VLOG(3) << "don't send non-initialized variable: " << send_varnames[i]; - } - } - - // note!! only support sync send now - if (true || sync) { - for (size_t i = 0; i < rets.size(); i++) { - PADDLE_ENFORCE(rets[i]->Wait(), "internal error in RPCClient"); + // concat recved tensor into one var + { + size_t output_offset = 0; + framework::Tensor *recv_tensor = + recv_var->GetMutable(); + for (auto *in : recved_tensors) { + auto in_stride = framework::stride_numel(in->dims()); + auto out_stride = framework::stride_numel(recv_tensor->dims()); + StridedNumelCopyWithAxis( + ctx.device_context(), 0, recv_tensor->data() + output_offset, + out_stride, in->data(), in_stride, in_stride[0]); + output_offset += in_stride[0]; } } diff --git a/paddle/fluid/operators/distributed/parameter_recv.h b/paddle/fluid/operators/distributed/parameter_recv.h index 817115e2d1e0adeaf40623a3eb83e900899ec4ea..bc6f5f5adf26a399b9adc001fb767c20fbd95f5e 100644 --- a/paddle/fluid/operators/distributed/parameter_recv.h +++ b/paddle/fluid/operators/distributed/parameter_recv.h @@ -26,11 +26,10 @@ namespace distributed { template struct ParameterRecv { void operator()(const std::string &var_name, - const std::vector &send_varnames, + const std::vector &recv_varnames, const std::vector &epmap, - const std::vector &height_sections, const framework::ExecutionContext &context, - const framework::Scope &scope, bool sync); + const framework::Scope &scope); }; }; // namespace distributed