Commit 3a3cfc2d authored by Qiao Longfei

prefetch support gpu

test=develop
Parent 4b9082a4
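
Note: this commit makes the parameter-prefetch path work when the lookup ids live on a CUDA device. The ids are first copied to the host, split by parameter-server section, sent over RPC using a CPU device context, and the fetched rows are then copied back into an output tensor allocated on the same place as the id tensor. Below is a minimal standalone sketch of the device-to-host id copy, using the raw CUDA runtime rather than Paddle's memory::Copy; the helper and buffer names are illustrative only.

#include <cuda_runtime.h>
#include <cstdint>
#include <vector>

// Sketch: gather int64 lookup ids from device memory into a host vector so
// they can be split per section and serialized for the prefetch RPC.
std::vector<int64_t> CopyIdsToHost(const int64_t* d_ids, size_t n) {
  std::vector<int64_t> ids(n);
  cudaMemcpy(ids.data(), d_ids, n * sizeof(int64_t), cudaMemcpyDeviceToHost);
  return ids;
}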
@@ -59,14 +59,13 @@ static std::vector<int64_t> ToAbsoluteSection(
 }
 
 static std::vector<std::vector<int64_t>> SplitIds(
-    const std::string& id_name, const std::vector<int>& height_section,
-    framework::Scope* scope) {
-  auto& id_tensor = scope->FindVar(id_name)->Get<framework::LoDTensor>();
-  auto* id_data = id_tensor.data<int64_t>();
+    const std::vector<int64_t>& ids_vector,
+    const std::vector<int>& height_section, framework::Scope* scope) {
   std::set<int64_t> all_ids;
-  for (size_t i = 0; i < id_tensor.numel(); ++i) {
-    all_ids.insert(id_data[i]);
+  for (auto id : ids_vector) {
+    all_ids.insert(id);
   }
 
   auto abs_sections = ToAbsoluteSection(height_section);
   std::vector<std::vector<int64_t>> splited_ids;
   splited_ids.resize(height_section.size() + 1);
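
Note: SplitIds now takes the ids as a plain host-side vector instead of reading them out of a scope variable, so the same splitting logic works regardless of where the original tensor lives. A rough sketch of the per-id bucketing it relies on is below; the helper name GetSectionIndex and its exact boundary handling are assumptions, not copied from this file.

#include <cstdint>
#include <cstddef>
#include <vector>

// Sketch (assumed helper): map an id to the section whose absolute start
// offset covers it, e.g. abs_sections = {0, 100, 250}.
size_t GetSectionIndex(int64_t id, const std::vector<int64_t>& abs_sections) {
  for (size_t i = 1; i < abs_sections.size(); ++i) {
    if (id < abs_sections[i]) {
      return i - 1;
    }
  }
  return abs_sections.size() - 1;  // id falls into the last section
}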
@@ -78,7 +77,7 @@ static std::vector<std::vector<int64_t>> SplitIds(
 }
 
 static void SplitIdsIntoMultipleVarsBySection(
-    const std::string& id_name, const std::vector<std::string>& in_var_names,
+    const std::vector<std::string>& in_var_names,
     const std::vector<int>& height_section,
     const std::vector<std::vector<int64_t>>& splited_ids,
     framework::Scope* scope) {
@@ -99,8 +98,8 @@ static void SplitIdsIntoMultipleVarsBySection(
 }
 
 static void MergeMultipleVarsIntoOneBySection(
-    const std::string& id_name, const std::string& out_name,
-    const std::vector<std::string>& out_var_names,
+    const std::string& id_name, const std::vector<int64_t>& ids_vector,
+    const std::string& out_name, const std::vector<std::string>& out_var_names,
     const std::vector<int>& height_section,
     const std::vector<std::vector<int64_t>>& splited_ids,
     const framework::ExecutionContext& context, framework::Scope* scope) {
@@ -109,16 +108,20 @@ static void MergeMultipleVarsIntoOneBySection(
   auto cpu_place = platform::CPUPlace();
   auto abs_sections = ToAbsoluteSection(height_section);
 
-  auto& id_tensor = scope->FindVar(id_name)->Get<framework::LoDTensor>();
-  auto* id_data = id_tensor.data<int64_t>();
   std::unordered_map<int64_t, std::vector<size_t>> id_to_offset;
-  for (size_t i = 0; i < id_tensor.numel(); ++i) {
-    id_to_offset[id_data[i]].push_back(i);
+  for (size_t i = 0; i < ids_vector.size(); ++i) {
+    id_to_offset[ids_vector[i]].push_back(i);
   }
 
+  auto& id_tensor = scope->FindVar(id_name)->Get<framework::LoDTensor>();
   auto* out_tensor =
       scope->FindVar(out_name)->GetMutable<framework::LoDTensor>();
-  auto* out_tensor_data = out_tensor->mutable_data<float>(context.GetPlace());
+  auto* out_tensor_data = out_tensor->mutable_data<float>(id_tensor.place());
+
+  bool is_on_cpu_place = true;
+  if (!platform::is_cpu_place(id_tensor.place())) {
+    is_on_cpu_place = false;
+  }
 
   for (size_t section_idx = 0; section_idx < out_var_names.size();
        ++section_idx) {
@@ -140,9 +143,20 @@ static void MergeMultipleVarsIntoOneBySection(
         auto& offsets = id_to_offset[origin_id];
         for (auto& offset : offsets) {
           // should support GPU tensor
-          memory::Copy(cpu_place, out_tensor_data + offset * row_numel,
-                       cpu_place, out_var_data + i * row_numel,
-                       sizeof(float) * row_numel);
+          if (is_on_cpu_place) {
+            memory::Copy(cpu_place, out_tensor_data + offset * row_numel,
+                         cpu_place, out_var_data + i * row_numel,
+                         sizeof(float) * row_numel);
+          } else {
+#ifndef PADDLE_WITH_CUDA
+            PADDLE_THROW("paddle is not compiled with CUDA!");
+#else
+            memory::Copy(boost::get<platform::CUDAPlace>(id_tensor.place()),
+                         out_tensor_data + offset * row_numel, cpu_place,
+                         out_var_data + i * row_numel,
+                         sizeof(float) * row_numel);
+#endif
+          }
         }
       }
     } else {
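
Note: the merge step scatters each fetched row back to every position where its id appeared in the original id tensor. With GPU support, the output tensor may live in device memory, so the row copy must be a host-to-device transfer instead of a plain host memcpy. A standalone sketch of that branch follows, using cudaMemcpy rather than Paddle's memory::Copy; function and parameter names are illustrative.

#include <cuda_runtime.h>
#include <cstddef>
#include <cstring>

// Sketch: copy one fetched row (row_numel floats) from a host staging buffer
// into the output tensor, which may live on the CPU or on a CUDA device.
void ScatterRow(float* dst, const float* src, size_t row_numel,
                bool dst_on_gpu) {
  if (dst_on_gpu) {
    cudaMemcpy(dst, src, sizeof(float) * row_numel, cudaMemcpyHostToDevice);
  } else {
    std::memcpy(dst, src, sizeof(float) * row_numel);
  }
}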
@@ -159,7 +173,7 @@ void prefetch(const std::string& id_name, const std::string& out_name,
   auto& local_scope = context.scope().NewScope();
 
   platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance();
-  auto& ctx = *pool.Get(context.GetPlace());
+  auto& cpu_ctx = *pool.Get(platform::CPUPlace());
 
   distributed::RPCClient* rpc_client =
       distributed::RPCClient::GetInstance<RPCCLIENT_T>(
@@ -172,9 +186,34 @@ void prefetch(const std::string& id_name, const std::string& out_name,
     out_var_names.push_back(out_name + "@" + epmap[i]);
   }
 
-  auto splited_ids = SplitIds(id_name, height_sections, &local_scope);
-  SplitIdsIntoMultipleVarsBySection(id_name, in_var_names, height_sections,
-                                    splited_ids, &local_scope);
+  auto& id_tensor = local_scope.FindVar(id_name)->Get<framework::LoDTensor>();
+  std::vector<int64_t> ids_vector;
+  if (platform::is_cpu_place(id_tensor.place())) {
+    auto* id_data = id_tensor.data<int64_t>();
+    for (size_t i = 0; i < id_tensor.numel(); ++i) {
+      ids_vector.push_back(id_data[i]);
+    }
+  } else {
+#ifndef PADDLE_WITH_CUDA
+    PADDLE_THROW("paddle is not compiled with CUDA!");
+#else
+    auto cpu_place = platform::CPUPlace();
+    framework::Tensor cpu_tensor;
+    auto* cpu_tensor_data =
+        cpu_tensor.mutable_data<int64_t>(id_tensor.dims(), cpu_place);
+    memory::Copy(cpu_place, cpu_tensor_data,
+                 boost::get<platform::CUDAPlace>(id_tensor.place()),
+                 id_tensor.data<int64_t>(),
+                 sizeof(int64_t) * id_tensor.numel());
+    for (size_t i = 0; i < id_tensor.numel(); ++i) {
+      ids_vector.push_back(cpu_tensor_data[i]);
+    }
+#endif
+  }
+
+  auto splited_ids = SplitIds(ids_vector, height_sections, &local_scope);
+  SplitIdsIntoMultipleVarsBySection(in_var_names, height_sections, splited_ids,
+                                    &local_scope);
 
   // create output var in local scope
   for (auto& name : out_var_names) {
@@ -187,7 +226,7 @@ void prefetch(const std::string& id_name, const std::string& out_name,
       VLOG(3) << "sending " << in_var_names[i] << " to " << epmap[i]
               << " to get " << out_var_names[i] << " back";
       rets.push_back(rpc_client->AsyncPrefetchVar(
-          epmap[i], ctx, local_scope, in_var_names[i], out_var_names[i],
+          epmap[i], cpu_ctx, local_scope, in_var_names[i], out_var_names[i],
           table_names[i]));
     } else {
       VLOG(3) << "don't send no-initialied variable: " << out_var_names[i];
@@ -198,9 +237,9 @@ void prefetch(const std::string& id_name, const std::string& out_name,
     PADDLE_ENFORCE(rets[i]->Wait(), "internal error in RPCClient");
   }
 
-  MergeMultipleVarsIntoOneBySection(id_name, out_name, out_var_names,
-                                    height_sections, splited_ids, context,
-                                    &local_scope);
+  MergeMultipleVarsIntoOneBySection(id_name, ids_vector, out_name,
+                                    out_var_names, height_sections, splited_ids,
+                                    context, &local_scope);
 
   context.scope().DeleteScope(&local_scope);
 }
@@ -184,8 +184,8 @@ class TestListenAndServOp(unittest.TestCase):
         port1 = self._get_pserver_port(p1.pid)
 
         places = [core.CPUPlace()]
-        # if core.is_compiled_with_cuda():
-        #     places.append(core.CUDAPlace(0))
+        if core.is_compiled_with_cuda():
+            places.append(core.CUDAPlace(0))
 
         for place in places:
             self._run_lookup_table_op_one_pserver(place, port0)