diff --git a/paddle/fluid/framework/details/multi_devices_graph_pass.cc b/paddle/fluid/framework/details/multi_devices_graph_pass.cc index 8c98b781301e884d5d5c7d141f3d901d74d51285..359064cbf27ee020642d5b5fdf8eb3d0f81ddcdc 100644 --- a/paddle/fluid/framework/details/multi_devices_graph_pass.cc +++ b/paddle/fluid/framework/details/multi_devices_graph_pass.cc @@ -862,7 +862,7 @@ int MultiDevSSAGraphBuilder::CreateRPCOp( if (node->Op()->Type() == "fetch_barrier") { outvar_dev_id = GetVarDeviceID(*result, output->Name(), *sharded_var_device); - PADDLE_ENFORCE_NE(outvar_dev_id, -1); + PADDLE_ENFORCE_NE(outvar_dev_id, -1, "output name %s", output->Name()); } p = places_[outvar_dev_id]; ir::Node *new_node = nullptr; diff --git a/paddle/fluid/operators/distributed/parameter_prefetch.cc b/paddle/fluid/operators/distributed/parameter_prefetch.cc index 23beabc6e16352d5b2b8fac9ea86af031f29f4af..f409b13f01bb209c4ea2a67c8d121288ab238b4d 100644 --- a/paddle/fluid/operators/distributed/parameter_prefetch.cc +++ b/paddle/fluid/operators/distributed/parameter_prefetch.cc @@ -100,7 +100,7 @@ inline void SplitIdsIntoMultipleVarsBySection( } } -inline void MergeMultipleVarsIntoOnBySection( +inline void MergeMultipleVarsIntoOneBySection( const std::string& id_name, const std::string& out_name, const std::vector& out_var_names, const std::vector& height_section, @@ -125,25 +125,30 @@ inline void MergeMultipleVarsIntoOnBySection( for (size_t section_idx = 0; section_idx < out_var_names.size(); ++section_idx) { auto& ids_in_this_section = splited_ids[section_idx]; - auto& prefetch_out_var = - scope->Var(out_var_names[section_idx])->Get(); - const auto* out_var_data = prefetch_out_var.data(); - auto& dims = prefetch_out_var.dims(); - - PADDLE_ENFORCE_EQ(dims.size(), 2, ""); - PADDLE_ENFORCE_EQ(ids_in_this_section.size(), dims[0]); - - auto row_numel = dims[1]; - - for (size_t i = 0; i < dims[0]; ++i) { - auto id = ids_in_this_section[i]; - auto origin_id = id + abs_sections[section_idx]; - auto& offsets = id_to_offset[origin_id]; - for (auto& offset : offsets) { - // should support GPU tensor - memory::Copy(cpu_place, out_tensor_data + offset * row_numel, cpu_place, - out_var_data + i * row_numel, sizeof(float) * row_numel); + if (!ids_in_this_section.empty()) { + auto& prefetch_out_var = + scope->Var(out_var_names[section_idx])->Get(); + const auto* out_var_data = prefetch_out_var.data(); + auto& dims = prefetch_out_var.dims(); + + PADDLE_ENFORCE_EQ(dims.size(), 2, ""); + PADDLE_ENFORCE_EQ(ids_in_this_section.size(), dims[0]); + + auto row_numel = dims[1]; + + for (size_t i = 0; i < dims[0]; ++i) { + auto id = ids_in_this_section[i]; + auto origin_id = id + abs_sections[section_idx]; + auto& offsets = id_to_offset[origin_id]; + for (auto& offset : offsets) { + // should support GPU tensor + memory::Copy(cpu_place, out_tensor_data + offset * row_numel, + cpu_place, out_var_data + i * row_numel, + sizeof(float) * row_numel); + } } + } else { + VLOG(30) << "ids in this section is empty"; } } } @@ -190,13 +195,14 @@ void prefetch(const std::string& id_name, const std::string& out_name, VLOG(30) << "don't send no-initialied variable: " << out_var_names[i]; } } + for (size_t i = 0; i < rets.size(); i++) { PADDLE_ENFORCE(rets[i]->Wait(), "internal error in RPCClient"); } - MergeMultipleVarsIntoOnBySection(id_name, out_name, out_var_names, - height_sections, splited_ids, context, - &local_scope); + MergeMultipleVarsIntoOneBySection(id_name, out_name, out_var_names, + height_sections, splited_ids, context, + &local_scope); context.scope().DeleteScope(&local_scope); } diff --git a/python/paddle/fluid/transpiler/distribute_transpiler.py b/python/paddle/fluid/transpiler/distribute_transpiler.py index ddf7468cddb863fbf70efe2ea089917b415529d3..59f89e331dac5a10a68cdfd7fc82564ff72d953e 100644 --- a/python/paddle/fluid/transpiler/distribute_transpiler.py +++ b/python/paddle/fluid/transpiler/distribute_transpiler.py @@ -444,7 +444,7 @@ class DistributeTranspiler(object): # connect deps to send op in async mode recv_dep_in = self.grad_name_to_send_dummy_out[ self.param_name_to_grad_name[param_varname]] - all_recv_outputs.extend(splited_var) + # get recv op_role_var, if not splited, the grad should have .trainer suffix # if splited, grad should be the original grad var name. ParallelExecutor # will use op_role_var to get expected device place to run this op. @@ -460,6 +460,7 @@ class DistributeTranspiler(object): self._update_remote_sparse_update_op(param_varname, height_sections, eps) else: + all_recv_outputs.extend(splited_var) program.global_block().append_op( type="recv", inputs={"X": [recv_dep_in]},