diff --git a/paddle/fluid/operators/distributed/parameter_prefetch.cc b/paddle/fluid/operators/distributed/parameter_prefetch.cc index 9db53451535efe3e7706ddc5aab8e366dc2f6e8e..36f4f0eefddc5eacf12d2e64d154370d61caf88b 100644 --- a/paddle/fluid/operators/distributed/parameter_prefetch.cc +++ b/paddle/fluid/operators/distributed/parameter_prefetch.cc @@ -59,14 +59,13 @@ static std::vector ToAbsoluteSection( } static std::vector> SplitIds( - const std::string& id_name, const std::vector& height_section, - framework::Scope* scope) { - auto& id_tensor = scope->FindVar(id_name)->Get(); - auto* id_data = id_tensor.data(); + const std::vector& ids_vector, + const std::vector& height_section, framework::Scope* scope) { std::set all_ids; - for (size_t i = 0; i < id_tensor.numel(); ++i) { - all_ids.insert(id_data[i]); + for (auto id : ids_vector) { + all_ids.insert(id); } + auto abs_sections = ToAbsoluteSection(height_section); std::vector> splited_ids; splited_ids.resize(height_section.size() + 1); @@ -78,7 +77,7 @@ static std::vector> SplitIds( } static void SplitIdsIntoMultipleVarsBySection( - const std::string& id_name, const std::vector& in_var_names, + const std::vector& in_var_names, const std::vector& height_section, const std::vector>& splited_ids, framework::Scope* scope) { @@ -99,8 +98,8 @@ static void SplitIdsIntoMultipleVarsBySection( } static void MergeMultipleVarsIntoOneBySection( - const std::string& id_name, const std::string& out_name, - const std::vector& out_var_names, + const std::string& id_name, const std::vector& ids_vector, + const std::string& out_name, const std::vector& out_var_names, const std::vector& height_section, const std::vector>& splited_ids, const framework::ExecutionContext& context, framework::Scope* scope) { @@ -109,16 +108,20 @@ static void MergeMultipleVarsIntoOneBySection( auto cpu_place = platform::CPUPlace(); auto abs_sections = ToAbsoluteSection(height_section); - auto& id_tensor = scope->FindVar(id_name)->Get(); - auto* id_data = id_tensor.data(); std::unordered_map> id_to_offset; - for (size_t i = 0; i < id_tensor.numel(); ++i) { - id_to_offset[id_data[i]].push_back(i); + for (size_t i = 0; i < ids_vector.size(); ++i) { + id_to_offset[ids_vector[i]].push_back(i); } + auto& id_tensor = scope->FindVar(id_name)->Get(); auto* out_tensor = scope->FindVar(out_name)->GetMutable(); - auto* out_tensor_data = out_tensor->mutable_data(context.GetPlace()); + auto* out_tensor_data = out_tensor->mutable_data(id_tensor.place()); + + bool is_on_cpu_place = true; + if (!platform::is_cpu_place(id_tensor.place())) { + is_on_cpu_place = false; + } for (size_t section_idx = 0; section_idx < out_var_names.size(); ++section_idx) { @@ -140,9 +143,20 @@ static void MergeMultipleVarsIntoOneBySection( auto& offsets = id_to_offset[origin_id]; for (auto& offset : offsets) { // should support GPU tensor - memory::Copy(cpu_place, out_tensor_data + offset * row_numel, - cpu_place, out_var_data + i * row_numel, - sizeof(float) * row_numel); + if (is_on_cpu_place) { + memory::Copy(cpu_place, out_tensor_data + offset * row_numel, + cpu_place, out_var_data + i * row_numel, + sizeof(float) * row_numel); + } else { +#ifndef PADDLE_WITH_CUDA + PADDLE_THROW("paddle is not compiled with CUDA!"); +#else + memory::Copy(boost::get(id_tensor.place()), + out_tensor_data + offset * row_numel, cpu_place, + out_var_data + i * row_numel, + sizeof(float) * row_numel); +#endif + } } } } else { @@ -159,7 +173,7 @@ void prefetch(const std::string& id_name, const std::string& out_name, auto& local_scope = context.scope().NewScope(); platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance(); - auto& ctx = *pool.Get(context.GetPlace()); + auto& cpu_ctx = *pool.Get(platform::CPUPlace()); distributed::RPCClient* rpc_client = distributed::RPCClient::GetInstance( @@ -172,9 +186,34 @@ void prefetch(const std::string& id_name, const std::string& out_name, out_var_names.push_back(out_name + "@" + epmap[i]); } - auto splited_ids = SplitIds(id_name, height_sections, &local_scope); - SplitIdsIntoMultipleVarsBySection(id_name, in_var_names, height_sections, - splited_ids, &local_scope); + auto& id_tensor = local_scope.FindVar(id_name)->Get(); + std::vector ids_vector; + if (platform::is_cpu_place(id_tensor.place())) { + auto* id_data = id_tensor.data(); + for (size_t i = 0; i < id_tensor.numel(); ++i) { + ids_vector.push_back(id_data[i]); + } + } else { +#ifndef PADDLE_WITH_CUDA + PADDLE_THROW("paddle is not compiled with CUDA!"); +#else + auto cpu_place = platform::CPUPlace(); + framework::Tensor cpu_tensor; + auto* cpu_tensor_data = + cpu_tensor.mutable_data(id_tensor.dims(), cpu_place); + memory::Copy(cpu_place, cpu_tensor_data, + boost::get(id_tensor.place()), + id_tensor.data(), + sizeof(int64_t) * id_tensor.numel()); + for (size_t i = 0; i < id_tensor.numel(); ++i) { + ids_vector.push_back(cpu_tensor_data[i]); + } +#endif + } + + auto splited_ids = SplitIds(ids_vector, height_sections, &local_scope); + SplitIdsIntoMultipleVarsBySection(in_var_names, height_sections, splited_ids, + &local_scope); // create output var in local scope for (auto& name : out_var_names) { @@ -187,7 +226,7 @@ void prefetch(const std::string& id_name, const std::string& out_name, VLOG(3) << "sending " << in_var_names[i] << " to " << epmap[i] << " to get " << out_var_names[i] << " back"; rets.push_back(rpc_client->AsyncPrefetchVar( - epmap[i], ctx, local_scope, in_var_names[i], out_var_names[i], + epmap[i], cpu_ctx, local_scope, in_var_names[i], out_var_names[i], table_names[i])); } else { VLOG(3) << "don't send no-initialied variable: " << out_var_names[i]; @@ -198,9 +237,9 @@ void prefetch(const std::string& id_name, const std::string& out_name, PADDLE_ENFORCE(rets[i]->Wait(), "internal error in RPCClient"); } - MergeMultipleVarsIntoOneBySection(id_name, out_name, out_var_names, - height_sections, splited_ids, context, - &local_scope); + MergeMultipleVarsIntoOneBySection(id_name, ids_vector, out_name, + out_var_names, height_sections, splited_ids, + context, &local_scope); context.scope().DeleteScope(&local_scope); } diff --git a/python/paddle/fluid/tests/unittests/test_lookup_remote_table_op.py b/python/paddle/fluid/tests/unittests/test_lookup_remote_table_op.py index 01e9eaf3c8f5e59b43b8603ac05be6190fbf9f57..b46e61d2efd101cf6ed13ec8f638f941190f6a85 100644 --- a/python/paddle/fluid/tests/unittests/test_lookup_remote_table_op.py +++ b/python/paddle/fluid/tests/unittests/test_lookup_remote_table_op.py @@ -184,8 +184,8 @@ class TestListenAndServOp(unittest.TestCase): port1 = self._get_pserver_port(p1.pid) places = [core.CPUPlace()] - # if core.is_compiled_with_cuda(): - # places.append(core.CUDAPlace(0)) + if core.is_compiled_with_cuda(): + places.append(core.CUDAPlace(0)) for place in places: self._run_lookup_table_op_one_pserver(place, port0)