From 65c7368400319b4c5ea93c47ec761b1c29b7ca0f Mon Sep 17 00:00:00 2001 From: tangwei12 Date: Wed, 28 Aug 2019 14:04:40 +0800 Subject: [PATCH] Fix the correctness of async mode at distributed training (#18863) * fix correctness of the communicator * fix a bug in send thread when sending var context is empty, test=develop * add lookup_table_prefetch_op and prefetch optimize, test=develop * remove remote prefetch GPU supported * word2vec force with CPU, test=develop * test dist remote lookup table force with CPU, test=develop --- .../multi_devices_graph_pass.h | 7 - paddle/fluid/framework/operator.h | 2 + .../operators/distributed/communicator.cc | 52 ++-- .../operators/distributed/communicator.h | 1 + .../distributed/parameter_prefetch.cc | 237 ++++++++++-------- .../distributed/parameter_prefetch.h | 63 ++--- .../distributed/request_handler_impl.cc | 37 +-- .../distributed_lookup_table_op.cc | 166 ++++++++++++ .../fluid/operators/hierarchical_sigmoid_op.h | 8 +- paddle/fluid/operators/lookup_table_op.cu | 61 ++--- paddle/fluid/operators/lookup_table_op.h | 11 +- paddle/fluid/operators/nce_op.h | 5 +- .../distribute_transpiler/__init__.py | 10 +- .../paddle/fluid/tests/test_communicator.py | 39 +++ .../fluid/tests/unittests/test_dist_ctr.py | 15 +- .../tests/unittests/test_dist_fleet_ctr.py | 12 + .../tests/unittests/test_dist_word2vec.py | 3 + .../unittests/test_lookup_remote_table_op.py | 2 - .../fluid/transpiler/distribute_transpiler.py | 68 +++-- 19 files changed, 509 insertions(+), 290 deletions(-) create mode 100644 paddle/fluid/operators/distributed_ops/distributed_lookup_table_op.cc diff --git a/paddle/fluid/framework/ir/multi_devices_graph_pass/multi_devices_graph_pass.h b/paddle/fluid/framework/ir/multi_devices_graph_pass/multi_devices_graph_pass.h index 9b36d231081..cd94c3063ac 100644 --- a/paddle/fluid/framework/ir/multi_devices_graph_pass/multi_devices_graph_pass.h +++ b/paddle/fluid/framework/ir/multi_devices_graph_pass/multi_devices_graph_pass.h @@ -133,13 +133,6 @@ class AsyncSSAGraphBuilder : public MultiDevSSAGraphBuilderBase { VLOG(1) << "set recv op do_not_run to true"; node->Op()->SetAttr("do_not_run", 1); node->Op()->Flush(); - } else if (node->Name() == "lookup_table" || node->Name() == "nce" || - node->Name() == "hierarchical_sigmoid") { - // in async_mode, we do not need remote prefetch, because communicator - // will do async parameter recv. 
- VLOG(1) << "set " << node->Name() << " op remote_prefetch to false"; - node->Op()->SetAttr("remote_prefetch", false); - node->Op()->Flush(); } return false; } diff --git a/paddle/fluid/framework/operator.h b/paddle/fluid/framework/operator.h index 07e7abd5b29..037d2e41b36 100644 --- a/paddle/fluid/framework/operator.h +++ b/paddle/fluid/framework/operator.h @@ -248,6 +248,8 @@ class ExecutionContext { return op_.Attr(name); } + bool HasAttr(const std::string& name) const { return op_.HasAttr(name); } + bool HasInput(const std::string& name) const; bool HasOutput(const std::string& name) const; diff --git a/paddle/fluid/operators/distributed/communicator.cc b/paddle/fluid/operators/distributed/communicator.cc index a7a761fa39a..eeab787cc31 100644 --- a/paddle/fluid/operators/distributed/communicator.cc +++ b/paddle/fluid/operators/distributed/communicator.cc @@ -76,14 +76,26 @@ Communicator::Communicator(const RpcCtxMap &send_varname_to_ctx, VLOG(0) << "communicator_fake_rpc: " << FLAGS_communicator_fake_rpc; VLOG(0) << "communicator_merge_sparse_grad: " << FLAGS_communicator_merge_sparse_grad; - send_scope_.reset(new Scope()); - for (auto &iter : send_varname_to_ctx_) { - send_varname_to_queue_[iter.first] = - std::make_shared>>( - FLAGS_communicator_send_queue_size); + + if (send_varname_to_ctx.size() == 0) { + VLOG(0) << "nothing need to be send, will not start send_thread"; + } else { + send_scope_.reset(new Scope()); + for (auto &iter : send_varname_to_ctx_) { + send_varname_to_queue_[iter.first] = + std::make_shared>>( + FLAGS_communicator_send_queue_size); + } + send_threadpool_.reset( + new ::ThreadPool(FLAGS_communicator_thread_pool_size)); + } + + if (recv_varname_to_ctx.size() == 0) { + VLOG(0) << "nothing need to be received, will not start recv_thread"; + } else { + recv_threadpool_.reset( + new ::ThreadPool(FLAGS_communicator_thread_pool_size)); } - send_threadpool_.reset(new ::ThreadPool(FLAGS_communicator_thread_pool_size)); - recv_threadpool_.reset(new ::ThreadPool(FLAGS_communicator_thread_pool_size)); } Communicator::~Communicator() { @@ -160,18 +172,28 @@ void Communicator::SendThread() { task_f.wait(); } auto after_run_send_graph = GetCurrentUS(); - auto send_graph_use_time = after_run_send_graph - before_run_send_graph; - if (send_graph_use_time > 100) { - VLOG(1) << "run send graph use time " - << after_run_send_graph - before_run_send_graph; - } - if (!FLAGS_communicator_independent_recv_thread) { - RecvAll(); - } + + VLOG(3) << "run send graph use time " + << after_run_send_graph - before_run_send_graph; + RecvNonIndependent(); } VLOG(0) << "communicator stopped, send thread exit"; } +void Communicator::RecvNonIndependent() { + if (!FLAGS_communicator_independent_recv_thread) { + return; + } + + auto grad_num = grad_num_.load(); + if (grad_num > 0) { + RecvAll(); + grad_num_.store(0); + } else { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } +} + void Communicator::RecvAll() { VLOG(3) << "parallel run recv graph"; if (!running_) return; diff --git a/paddle/fluid/operators/distributed/communicator.h b/paddle/fluid/operators/distributed/communicator.h index 6db02fc8402..b79d6f7020c 100644 --- a/paddle/fluid/operators/distributed/communicator.h +++ b/paddle/fluid/operators/distributed/communicator.h @@ -175,6 +175,7 @@ class Communicator { private: // recv all parameter void RecvAll(); + void RecvNonIndependent(); void SendThread(); void RecvThread(); diff --git a/paddle/fluid/operators/distributed/parameter_prefetch.cc 
diff --git a/paddle/fluid/operators/distributed/parameter_prefetch.cc b/paddle/fluid/operators/distributed/parameter_prefetch.cc
index 0e8d877e08c..de2c37d8056 100644
--- a/paddle/fluid/operators/distributed/parameter_prefetch.cc
+++ b/paddle/fluid/operators/distributed/parameter_prefetch.cc
@@ -12,10 +12,12 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#include <algorithm>
 #include <memory>
 #include <set>
 #include <string>
 #include <unordered_map>
+#include <unordered_set>
 #include <vector>
 
 #include "paddle/fluid/operators/distributed/parameter_prefetch.h"
@@ -78,45 +80,64 @@ static void SplitIdsIntoMultipleVarsBySection(
   }
 }
 
-static void MergeMultipleVarsIntoOneBySection(
-    const std::string& id_name, const std::vector<int64_t>& ids_vector,
-    const std::string& out_name, const std::vector<std::string>& out_var_names,
-    const std::vector<int64_t>& height_section,
-    const std::vector<std::vector<int64_t>>& splited_ids,
-    const framework::ExecutionContext& context, framework::Scope* scope,
-    platform::DeviceContext* actual_ctx) {
-  PADDLE_ENFORCE_EQ(out_var_names.size(), height_section.size(), "");
+typedef std::vector<std::pair<std::string, std::string>> TableAndEndpoints;
 
-  auto cpu_place = platform::CPUPlace();
+void prefetch_core(
+    const std::vector<int64_t>& ids, const TableAndEndpoints& tables,
+    const std::vector<int64_t>& height_sections,
+    const framework::ExecutionContext& context, const framework::Scope& scope,
+    std::unordered_map<int64_t, std::vector<float>>* recved_vec_map) {
+  platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance();
+  auto& actual_ctx = *pool.Get(context.GetPlace());
 
-  auto abs_sections = ToAbsoluteSection(height_section);
-  std::unordered_map<int64_t, std::vector<size_t>> id_to_offset;
-  for (size_t i = 0; i < ids_vector.size(); ++i) {
-    id_to_offset[ids_vector[i]].push_back(i);
+  std::unique_ptr<framework::Scope> local_scope = scope.NewTmpScope();
+
+  std::vector<std::string> in_var_names;
+  std::vector<std::string> out_var_names;
+  for (size_t i = 0; i < tables.size(); ++i) {
+    in_var_names.push_back("prefetch_send@" + tables[i].second);
+    out_var_names.push_back("prefetch_recv@" + tables[i].second);
   }
 
-  auto& id_tensor = scope->FindVar(id_name)->Get<framework::LoDTensor>();
-  auto* out_tensor =
-      scope->FindVar(out_name)->GetMutable<framework::LoDTensor>();
+  auto splited_ids = SplitIds(ids, height_sections);
+  SplitIdsIntoMultipleVarsBySection(in_var_names, height_sections, splited_ids,
+                                    local_scope.get());
+
+  // create output var in local scope
+  for (auto& name : out_var_names) {
+    local_scope->Var(name)->GetMutable<framework::LoDTensor>();
+  }
 
-  PADDLE_ENFORCE_GT(
-      out_tensor->numel(), 0,
-      "When calling this method, the LoDTensor's numel must larger than zero. "
-      "Please check LoDTensor::Resize has been called first.");
" - "Please check LoDTensor::Resize has been called first."); + distributed::RPCClient* rpc_client = + distributed::RPCClient::GetInstance( + context.Attr("trainer_id")); - auto* out_tensor_data = out_tensor->mutable_data(id_tensor.place()); + std::vector rets; + for (size_t i = 0; i < in_var_names.size(); i++) { + if (NeedSend(*local_scope.get(), in_var_names[i])) { + VLOG(3) << "sending " << in_var_names[i] << " to " << tables[i].second + << " to get " << out_var_names[i] << " back"; + rets.push_back(rpc_client->AsyncPrefetchVar( + tables[i].second, actual_ctx, *local_scope.get(), in_var_names[i], + out_var_names[i], tables[i].first)); + } else { + VLOG(3) << "don't send no-initialied variable: " << out_var_names[i]; + } + } - bool is_on_cpu_place = true; - if (!platform::is_cpu_place(id_tensor.place())) { - is_on_cpu_place = false; + for (size_t i = 0; i < rets.size(); i++) { + PADDLE_ENFORCE(rets[i]->Wait(), "internal error in RPCClient"); } + PADDLE_ENFORCE_EQ(out_var_names.size(), height_sections.size(), ""); + + auto abs_sections = ToAbsoluteSection(height_sections); for (size_t section_idx = 0; section_idx < out_var_names.size(); ++section_idx) { auto& ids_in_this_section = splited_ids[section_idx]; if (!ids_in_this_section.empty()) { - auto& prefetch_out_var = - scope->Var(out_var_names[section_idx])->Get(); + auto& prefetch_out_var = local_scope->Var(out_var_names[section_idx]) + ->Get(); const auto* out_var_data = prefetch_out_var.data(); auto& dims = prefetch_out_var.dims(); @@ -128,26 +149,9 @@ static void MergeMultipleVarsIntoOneBySection( for (int64_t i = 0; i < dims[0]; ++i) { auto id = ids_in_this_section[i]; auto origin_id = id + abs_sections[section_idx]; - auto& offsets = id_to_offset[origin_id]; - for (auto& offset : offsets) { - // should support GPU tensor - if (is_on_cpu_place) { - memory::Copy(cpu_place, out_tensor_data + offset * row_numel, - cpu_place, out_var_data + i * row_numel, - sizeof(float) * row_numel); - } else { -#ifndef PADDLE_WITH_CUDA - PADDLE_THROW("paddle is not compiled with CUDA!"); -#else - auto stream = - static_cast(actual_ctx)->stream(); - memory::Copy(boost::get(id_tensor.place()), - out_tensor_data + offset * row_numel, cpu_place, - out_var_data + i * row_numel, - sizeof(float) * row_numel, stream); -#endif - } - } + std::vector vecs(row_numel); + std::copy_n(out_var_data + i * row_numel, row_numel, vecs.begin()); + (*recved_vec_map)[origin_id] = vecs; } } else { VLOG(3) << "ids in this section is empty"; @@ -156,84 +160,107 @@ static void MergeMultipleVarsIntoOneBySection( } void prefetch(const std::string& id_name, const std::string& out_name, + const std::string& persistable_var_name, const bool backfill, const std::vector& table_names, - const std::vector& epmap, + const std::vector& endpoints, const std::vector& height_sections, const framework::ExecutionContext& context, const framework::Scope& scope) { - std::unique_ptr local_scope = scope.NewTmpScope(); - - platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance(); - auto& cpu_ctx = *pool.Get(platform::CPUPlace()); - auto& actual_ctx = *pool.Get(context.GetPlace()); - - distributed::RPCClient* rpc_client = - distributed::RPCClient::GetInstance( - context.Attr("trainer_id")); + prefetchs({id_name}, {out_name}, persistable_var_name, backfill, table_names, + endpoints, height_sections, context, scope); +} - std::vector in_var_names; - std::vector out_var_names; - for (size_t i = 0; i < epmap.size(); ++i) { - in_var_names.push_back(id_name + "@" + epmap[i]); - 
-    out_var_names.push_back(out_name + "@" + epmap[i]);
+void prefetchs(const std::vector<std::string>& id_var_names,
+               const std::vector<std::string>& out_var_names,
+               const std::string& persistable_var_name, const bool backfill,
+               const std::vector<std::string>& table_names,
+               const std::vector<std::string>& endpoints,
+               const std::vector<int64_t>& height_sections,
+               const framework::ExecutionContext& context,
+               const framework::Scope& scope) {
+  PADDLE_ENFORCE_GT(id_var_names.size(), 0, "");
+  PADDLE_ENFORCE_EQ(id_var_names.size(), out_var_names.size(), "");
+  PADDLE_ENFORCE_EQ(table_names.size(), endpoints.size(), "");
+  PADDLE_ENFORCE_EQ(table_names.size(), height_sections.size(), "");
+
+  auto* reconstruct_var =
+      scope.FindVar(persistable_var_name)->GetMutable<framework::LoDTensor>();
+  const auto vec_dim_1 = reconstruct_var->dims()[1];
+
+  const auto place =
+      scope.FindVar(id_var_names[0])->Get<framework::LoDTensor>().place();
+
+  if (!platform::is_cpu_place(place)) {
+    PADDLE_THROW("multi prefetch only support CPU currently");
   }
 
-  auto& id_tensor = scope.FindVar(id_name)->Get<framework::LoDTensor>();
-  std::vector<int64_t> ids_vector;
-  if (platform::is_cpu_place(id_tensor.place())) {
+  std::vector<std::vector<int64_t>> ids_group;
+  std::vector<int64_t> ids_union;
+  std::vector<framework::LoD> ids_lods;
+  TableAndEndpoints tables;
+
+  for (auto& id_name : id_var_names) {
+    auto& id_tensor = scope.FindVar(id_name)->Get<framework::LoDTensor>();
     auto* id_data = id_tensor.data<int64_t>();
+    std::vector<int64_t> ids;
+
     for (int64_t i = 0; i < id_tensor.numel(); ++i) {
-      ids_vector.push_back(id_data[i]);
-    }
-  } else {
-#ifndef PADDLE_WITH_CUDA
-    PADDLE_THROW("paddle is not compiled with CUDA!");
-#else
-    auto cpu_place = platform::CPUPlace();
-    framework::LoDTensor cpu_tensor;
-    auto* cpu_tensor_data =
-        cpu_tensor.mutable_data<int64_t>(id_tensor.dims(), cpu_place);
-    auto stream =
-        static_cast<platform::CUDADeviceContext*>(&actual_ctx)->stream();
-    memory::Copy(cpu_place, cpu_tensor_data,
-                 boost::get<platform::CUDAPlace>(id_tensor.place()),
-                 id_tensor.data<int64_t>(), sizeof(int64_t) * id_tensor.numel(),
-                 stream);
-    for (int64_t i = 0; i < cpu_tensor.numel(); ++i) {
-      ids_vector.push_back(cpu_tensor_data[i]);
+      ids.push_back(id_data[i]);
+      ids_union.push_back(id_data[i]);
     }
-#endif
+    ids_group.push_back(ids);
+    ids_lods.push_back(id_tensor.lod());
   }
 
-  auto splited_ids = SplitIds(ids_vector, height_sections);
-  SplitIdsIntoMultipleVarsBySection(in_var_names, height_sections, splited_ids,
-                                    local_scope.get());
+  std::unordered_set<int64_t> s(ids_union.begin(), ids_union.end());
+  ids_union.assign(s.begin(), s.end());
 
-  // create output var in local scope
-  for (auto& name : out_var_names) {
-    local_scope->Var(name)->GetMutable<framework::LoDTensor>();
+  for (size_t i = 0; i < table_names.size(); i++) {
+    tables.push_back(std::make_pair(table_names[i], endpoints[i]));
   }
 
-  std::vector<distributed::VarHandlePtr> rets;
-  for (size_t i = 0; i < in_var_names.size(); i++) {
-    if (NeedSend(*local_scope.get(), in_var_names[i])) {
-      VLOG(3) << "sending " << in_var_names[i] << " to " << epmap[i]
-              << " to get " << out_var_names[i] << " back";
-      rets.push_back(rpc_client->AsyncPrefetchVar(
-          epmap[i], cpu_ctx, *local_scope.get(), in_var_names[i],
-          out_var_names[i], table_names[i]));
-    } else {
-      VLOG(3) << "don't send no-initialied variable: " << out_var_names[i];
-    }
+  std::unordered_map<int64_t, std::vector<float>> recved_vec_map;
+  prefetch_core(ids_union, tables, height_sections, context, scope,
+                &recved_vec_map);
+
+  auto padding_idx = distributed::kNoPadding;
+
+  if (context.HasAttr("padding_idx")) {
+    padding_idx = context.Attr<int64_t>("padding_idx");
   }
 
-  for (size_t i = 0; i < rets.size(); i++) {
-    PADDLE_ENFORCE(rets[i]->Wait(), "internal error in RPCClient");
+  // copy vectors to out vars
+  for (size_t i = 0; i < out_var_names.size(); i++) {
+    auto& ids = ids_group[i];
+    auto* out_t =
+        scope.FindVar(out_var_names[i])->GetMutable<framework::LoDTensor>();
+    out_t->Resize(
+        framework::make_ddim({static_cast<int64_t>(ids.size()), vec_dim_1}));
+    out_t->set_lod(ids_lods[i]);
+
+    auto* out_d = out_t->mutable_data<float>(place);
+
+    for (size_t idx = 0; idx < ids.size(); idx++) {
+      const auto& id = ids[idx];
+
+      if (padding_idx != distributed::kNoPadding && id == padding_idx) {
+        memset(out_d + idx * vec_dim_1, 0, sizeof(float) * vec_dim_1);
+      } else {
+        std::copy_n(recved_vec_map[id].begin(), vec_dim_1,
+                    out_d + idx * vec_dim_1);
+      }
+    }
   }
 
-  MergeMultipleVarsIntoOneBySection(id_name, ids_vector, out_name,
-                                    out_var_names, height_sections,
-                                    splited_ids, context, local_scope.get(),
-                                    &actual_ctx);
+  if (backfill) {
+    VLOG(3) << "backfill persistable var's id with vecs";
+
+    auto* reconstruct_d = reconstruct_var->data<float>();
+    for (auto& id : ids_union) {
+      std::copy(recved_vec_map[id].begin(), recved_vec_map[id].end(),
+                reconstruct_d + id * vec_dim_1);
+    }
+  }
 }
 
 };  // namespace distributed
diff --git a/paddle/fluid/operators/distributed/parameter_prefetch.h b/paddle/fluid/operators/distributed/parameter_prefetch.h
index 0429ec4415d..a531c87f57c 100644
--- a/paddle/fluid/operators/distributed/parameter_prefetch.h
+++ b/paddle/fluid/operators/distributed/parameter_prefetch.h
@@ -15,6 +15,7 @@
 #pragma once
 
 #include <string>
+#include <unordered_map>
 #include <vector>
 
 #include "paddle/fluid/framework/operator.h"
@@ -23,61 +24,25 @@ namespace paddle {
 namespace operators {
 namespace distributed {
 
+constexpr int64_t kNoPadding = -1;
+
+void prefetchs(const std::vector<std::string>& id_var_names,
+               const std::vector<std::string>& out_var_names,
+               const std::string& persistable_var_name, const bool backfill,
+               const std::vector<std::string>& table_names,
+               const std::vector<std::string>& endpoints,
+               const std::vector<int64_t>& height_sections,
+               const framework::ExecutionContext& context,
+               const framework::Scope& scope);
+
 void prefetch(const std::string& id_name, const std::string& out_name,
+              const std::string& persistable_var_name, const bool backfill,
               const std::vector<std::string>& table_names,
-              const std::vector<std::string>& epmap,
+              const std::vector<std::string>& endpoints,
              const std::vector<int64_t>& height_sections,
               const framework::ExecutionContext& context,
               const framework::Scope& scope);
 
-template <typename T>
-void prefetch_with_reconstruct(const std::string& id_name,
-                               const std::string& out_name,
-                               const std::vector<std::string>& table_names,
-                               const std::vector<std::string>& epmap,
-                               const std::vector<int64_t>& height_sections,
-                               const framework::ExecutionContext& context,
-                               const framework::Scope& scope,
-                               framework::LoDTensor* original) {
-  prefetch(id_name, out_name, table_names, epmap, height_sections, context,
-           scope);
-  auto& out = scope.FindVar(out_name)->Get<framework::LoDTensor>();
-  auto& ids = scope.FindVar(id_name)->Get<framework::LoDTensor>();
-  auto* original_value = original->data<T>();
-  auto* out_value = out.data<T>();
-  size_t original_width = original->numel() / original->dims()[0];
-
-  bool is_on_cpu_place = true;
-  if (!platform::is_cpu_place(ids.place())) {
-    is_on_cpu_place = false;
-  }
-  if (is_on_cpu_place) {
-    for (int64_t i = 0; i < ids.numel(); i++) {
-      const T* out_rows = out_value + original_width * i;
-      T* original_row =
-          original_value + original_width * ids.data<int64_t>()[i];
-      std::memcpy(original_row, out_rows, original_width * sizeof(T));
-    }
-  } else {
-#ifndef PADDLE_WITH_CUDA
-    PADDLE_THROW("paddle is not compiled with CUDA!");
-#else
-    platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance();
-    auto& actual_ctx = *pool.Get(context.GetPlace());
-    for (int64_t i = 0; i < ids.numel(); i++) {
-      const T* out_rows = out_value + original_width * i;
-      T* original_row =
-          original_value + original_width * ids.data<int64_t>()[i];
-      auto stream =
-          static_cast<platform::CUDADeviceContext*>(&actual_ctx)->stream();
-      memory::Copy(boost::get<platform::CUDAPlace>(ids.place()), original_row,
-                   platform::CPUPlace(), out_rows, original_width * sizeof(T),
-                   stream);
-    }
-#endif
-  }
-}
-
 };  // namespace distributed
 };  // namespace operators
 };  // namespace paddle
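The declarations above replace the templated prefetch_with_reconstruct; callers now pass the persistable parameter name plus a backfill flag. A condensed sketch of a caller based only on these declarations (PrefetchEmbedding is a hypothetical helper, and the table names, endpoints, and section heights are illustrative values, not part of the patch):

    #include <string>
    #include <vector>
    #include "paddle/fluid/operators/distributed/parameter_prefetch.h"

    namespace paddle {
    namespace operators {

    void PrefetchEmbedding(const framework::ExecutionContext& ctx,
                           const framework::Scope& scope) {
      std::vector<std::string> id_names = {"Ids0", "Ids1"};
      std::vector<std::string> out_names = {"Out0", "Out1"};
      std::vector<std::string> tables = {"emb.block0", "emb.block1"};
      std::vector<std::string> endpoints = {"127.0.0.1:6174", "127.0.0.1:6175"};
      std::vector<int64_t> sections = {50000, 50000};  // rows per shard

      // backfill=false fills only Out0/Out1; backfill=true would also write
      // the fetched rows back into the persistable parameter "emb".
      distributed::prefetchs(id_names, out_names, /*persistable_var_name=*/"emb",
                             /*backfill=*/false, tables, endpoints, sections,
                             ctx, scope);
    }

    }  // namespace operators
    }  // namespace paddle
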
diff --git a/paddle/fluid/operators/distributed/request_handler_impl.cc b/paddle/fluid/operators/distributed/request_handler_impl.cc
index 876b764a751..c2368ab10eb 100644
--- a/paddle/fluid/operators/distributed/request_handler_impl.cc
+++ b/paddle/fluid/operators/distributed/request_handler_impl.cc
@@ -116,42 +116,7 @@ bool RequestGetHandler::Handle(const std::string& varname,
         VLOG(3) << "copying " << varname << " to " << param_bak_name;
         framework::TensorCopy(t_orig, dev_ctx_->GetPlace(), t);
       }
-      if (AsyncSparseParamUpdateRecorder::GetInstance()->HasParam(varname) &&
-          !table_name.empty()) {
-        std::vector<int64_t> updated_rows;
-        AsyncSparseParamUpdateRecorder::GetInstance()->GetAndClear(
-            varname, trainer_id, &updated_rows);
-        if (VLOG_IS_ON(3)) {
-          std::ostringstream sstream;
-          sstream << "[";
-          for (auto& row_id : updated_rows) {
-            sstream << row_id << ", ";
-          }
-          sstream << "]";
-          VLOG(3) << "updated_rows size: " << updated_rows.size() << " "
-                  << sstream.str();
-        }
-        auto& origin_tensor =
-            scope_->FindVar(varname)->Get<framework::LoDTensor>();
-        auto* origin_tensor_data = origin_tensor.data<float>();
-        auto& dims = origin_tensor.dims();
-        *outvar = scope->Var();
-        auto* out_slr = (*outvar)->GetMutable<framework::SelectedRows>();
-        out_slr->set_rows(updated_rows);
-        out_slr->set_height(dims[0]);
-        auto out_dims = framework::make_ddim(
-            {static_cast<int64_t>(updated_rows.size()), dims[1]});
-        auto* data = out_slr->mutable_value()->mutable_data<float>(
-            out_dims, origin_tensor.place());
-        auto width = dims[1];
-        for (auto i = 0; i < updated_rows.size(); ++i) {
-          PADDLE_ENFORCE_LT(updated_rows[i], dims[0]);
-          memcpy(data + i * width, origin_tensor_data + updated_rows[i] * width,
-                 sizeof(float) * width);
-        }
-      } else {
-        *outvar = scope_->FindVar(varname);
-      }
+      *outvar = scope_->FindVar(varname);
     }
   }
   return true;
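For reference, the branch deleted above assembled a SelectedRows delta holding only the rows recorded by AsyncSparseParamUpdateRecorder; the handler now always returns the full parameter. A simplified, std-only sketch of what that delta gather did (GatherRows is a hypothetical stand-in, not Paddle API):

    #include <cstdint>
    #include <cstring>
    #include <vector>

    // Gather only the updated rows of a dense [height x width] parameter into
    // a compact row-major buffer, one output row per entry of updated_rows.
    std::vector<float> GatherRows(const std::vector<float>& param, int64_t width,
                                  const std::vector<int64_t>& updated_rows) {
      std::vector<float> out(updated_rows.size() * width);
      for (size_t i = 0; i < updated_rows.size(); ++i) {
        std::memcpy(out.data() + i * width,
                    param.data() + updated_rows[i] * width,
                    sizeof(float) * width);
      }
      return out;
    }
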
diff --git a/paddle/fluid/operators/distributed_ops/distributed_lookup_table_op.cc b/paddle/fluid/operators/distributed_ops/distributed_lookup_table_op.cc
new file mode 100644
index 00000000000..3e354791ea9
--- /dev/null
+++ b/paddle/fluid/operators/distributed_ops/distributed_lookup_table_op.cc
@@ -0,0 +1,166 @@
+/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License. */
+
+#include <vector>
+
+#include "paddle/fluid/framework/data_type.h"
+#include "paddle/fluid/framework/op_registry.h"
+#include "paddle/fluid/operators/distributed/parameter_prefetch.h"
+#include "paddle/fluid/operators/math/math_function.h"
+
+namespace paddle {
+namespace operators {
+
+class DistributedLookupTableOp : public framework::OperatorWithKernel {
+ public:
+  using framework::OperatorWithKernel::OperatorWithKernel;
+
+  void InferShape(framework::InferShapeContext *ctx) const override {
+    PADDLE_ENFORCE(ctx->HasInputs("Ids"),
+                   "Input(Ids) of LookupTableOp should not be null.");
+    PADDLE_ENFORCE(ctx->HasInput("W"),
+                   "Input(W) of LookupTableOp should not be null.");
+    PADDLE_ENFORCE(ctx->HasOutputs("Outputs"),
+                   "Output(Outs) of LookupTableOp should not be null.");
+
+    auto ids_dims = ctx->GetInputsDim("Ids");
+    auto table_dims = ctx->GetInputDim("W");
+
+    PADDLE_ENFORCE_EQ(table_dims.size(), 2,
+                      "Only 2 dimensions of the 'Embedding' are supported.");
+
+    for (auto &ids_dim : ids_dims) {
+      PADDLE_ENFORCE_EQ(ids_dim.size(), 2,
+                        "The dimension of the 'Ids' tensor must be 2.");
+      PADDLE_ENFORCE_EQ(ids_dim[1], 1,
+                        "The last dimension of the 'Ids' tensor must be 1.");
+    }
+
+    auto lookup_tables =
+        ctx->Attrs().Get<std::vector<std::string>>("table_names");
+    auto height_sections =
+        ctx->Attrs().Get<std::vector<int64_t>>("height_sections");
+    auto endpoints = ctx->Attrs().Get<std::vector<std::string>>("endpoints");
+
+    PADDLE_ENFORCE(lookup_tables.size() == height_sections.size() &&
+                       lookup_tables.size() == endpoints.size() &&
+                       lookup_tables.size() != 0,
+                   "Attrs lookup_tables/height_sections/endpoints must have "
+                   "the same size and can not be 0.");
+
+    auto outputs_dims = std::vector<framework::DDim>();
+
+    for (auto &ids_dim : ids_dims) {
+      outputs_dims.push_back(
+          framework::make_ddim({ids_dim[0], table_dims[1]}));
+    }
+
+    ctx->SetOutputsDim("Outputs", outputs_dims);
+    ctx->ShareLoD("Ids", /*->*/ "Outputs");
+  }
+
+ protected:
+  framework::OpKernelType GetExpectedKernelType(
+      const framework::ExecutionContext &ctx) const override {
+    auto data_type = framework::GetDataTypeOfVar(ctx.InputVar("W"));
+    return framework::OpKernelType(data_type, ctx.device_context());
+  }
+};
+
+template <typename T>
+class DistributedLookupTableKernel : public framework::OpKernel<T> {
+ public:
+  void Compute(const framework::ExecutionContext &context) const override {
+    auto id_names = context.Inputs("Ids");
+    auto embedding_name = context.Inputs("W").front();
+    auto out_names = context.Outputs("Outputs");
+
+    auto lookup_tables =
+        context.Attr<std::vector<std::string>>("table_names");
+    auto height_sections =
+        context.Attr<std::vector<int64_t>>("height_sections");
+    auto endpoints = context.Attr<std::vector<std::string>>("endpoints");
+
+    operators::distributed::prefetchs(
+        id_names, out_names, embedding_name, false, lookup_tables, endpoints,
+        height_sections, context, context.scope());
+  }
+};
+
+class DistributedLookupTableOpMaker : public framework::OpProtoAndCheckerMaker {
+ public:
+  void Make() override {
+    AddInput("Ids",
+             "(LoDTensor) Ids's type should be LoDTensor. "
+             "The ids to be looked up in W.")
+        .AsDuplicable();
+
+    AddInput("W",
+             "(Tensor) The input represents embedding tensors, "
+             "which is a learnable parameter.");
+
+    AddOutput("Outputs",
+              "(LoDTensor) The lookup results, which have the same type as W.")
+        .AsDuplicable();
+
+    AddAttr<std::vector<std::string>>(
+        "table_names",
+        "(string vector, such as emb_block0, emb_block1) "
+        "The names of the split embedding tables on the parameter servers, "
+        "in the order of the input variables for mapping.")
+        .SetDefault({""});
+
+    AddAttr<std::vector<int64_t>>("height_sections",
+                                  "Height for each output SelectedRows.")
+        .SetDefault(std::vector<int64_t>({}));
SelectedRows.") + .SetDefault(std::vector({})); + + AddAttr>( + "endpoints", + "(string vector, default 127.0.0.1:6164)" + "Server endpoints in the order of input variables for mapping") + .SetDefault({"127.0.0.1:6164"}); + + AddAttr("trainer_id", "trainer id from 0 ~ worker_num.").SetDefault(0); + + AddAttr("padding_idx", + "(int64, default -1) " + "If the value is -1, it makes no effect to lookup. " + "Otherwise the given value indicates padding the output " + "with zeros whenever lookup encounters it in Ids.") + .SetDefault(distributed::kNoPadding); + + AddComment(R"DOC( +Lookup Tablel Prefetch Operator. + +This operator is used to perform lookup on parameter W, +then concatenated into a sparse tensor. + +The type of Ids(Input) is SelectedRows, the rows of Ids contains +the ids to be looked up in W; +if the Id is not in the sparse table, this operator will return a +random value and set the value into the table for the next looking up. + +)DOC"); + } +}; +} // namespace operators +} // namespace paddle + +namespace ops = paddle::operators; + +REGISTER_OPERATOR(distributed_lookup_table, ops::DistributedLookupTableOp, + ops::DistributedLookupTableOpMaker); + +REGISTER_OP_CPU_KERNEL(distributed_lookup_table, + ops::DistributedLookupTableKernel); diff --git a/paddle/fluid/operators/hierarchical_sigmoid_op.h b/paddle/fluid/operators/hierarchical_sigmoid_op.h index a0af514509d..d20a7e96b10 100644 --- a/paddle/fluid/operators/hierarchical_sigmoid_op.h +++ b/paddle/fluid/operators/hierarchical_sigmoid_op.h @@ -97,10 +97,10 @@ class HierarchicalSigmoidOpKernel : public framework::OpKernel { #ifdef PADDLE_WITH_DISTRIBUTE // w_Out is set to used by prefetch, never change it in other cases - auto* w_out = ctx.Output("W_Out"); - operators::distributed::prefetch_with_reconstruct( - "Ids@Prefetch", "W@Prefetch", table_names, epmap, height_sections, - ctx, local_scope, w_out); + auto weight = ctx.Outputs("W_Out").front(); + operators::distributed::prefetch("Ids@Prefetch", "W@Prefetch", weight, + true, table_names, epmap, + height_sections, ctx, local_scope); #else PADDLE_THROW( "paddle is not compiled with distribute support, can not do " diff --git a/paddle/fluid/operators/lookup_table_op.cu b/paddle/fluid/operators/lookup_table_op.cu index cb432e6d3e9..e4add1c746a 100644 --- a/paddle/fluid/operators/lookup_table_op.cu +++ b/paddle/fluid/operators/lookup_table_op.cu @@ -98,46 +98,27 @@ class LookupTableCUDAKernel : public framework::OpKernel { auto id_name = context.Inputs("Ids").front(); auto out_name = context.Outputs("Out").front(); - // for remote prefetch - auto epmap = context.Attr>("epmap"); - auto height_sections = - context.Attr>("height_sections"); - auto table_names = context.Attr>("table_names"); - - if (!epmap.empty()) { -// if epmap is not empty, then the parameter will be fetched from remote -// parameter -// server -#ifdef PADDLE_WITH_DISTRIBUTE - operators::distributed::prefetch(id_name, out_name, table_names, epmap, - height_sections, context, - context.scope()); -#else - PADDLE_THROW( - "paddle is not compiled with distribute support, can not do " - "parameter prefetch!"); -#endif - } else { - size_t N = table_t->dims()[0]; - size_t D = table_t->dims()[1]; - size_t K = ids_t->numel(); - - auto *ids = ids_t->data(); - auto *table = table_t->data(); - auto *output = output_t->mutable_data(context.GetPlace()); - - dim3 threads(128, 8); - dim3 grids(8, 1); - - if (padding_idx == -1) - LookupTable<<< - grids, threads, 0, context.cuda_device_context().stream()>>>( - output, table, ids, N, 
diff --git a/paddle/fluid/operators/hierarchical_sigmoid_op.h b/paddle/fluid/operators/hierarchical_sigmoid_op.h
index a0af514509d..d20a7e96b10 100644
--- a/paddle/fluid/operators/hierarchical_sigmoid_op.h
+++ b/paddle/fluid/operators/hierarchical_sigmoid_op.h
@@ -97,10 +97,10 @@ class HierarchicalSigmoidOpKernel : public framework::OpKernel<T> {
 
 #ifdef PADDLE_WITH_DISTRIBUTE
     // w_Out is set to used by prefetch, never change it in other cases
-    auto* w_out = ctx.Output<framework::LoDTensor>("W_Out");
-    operators::distributed::prefetch_with_reconstruct<T>(
-        "Ids@Prefetch", "W@Prefetch", table_names, epmap, height_sections,
-        ctx, local_scope, w_out);
+    auto weight = ctx.Outputs("W_Out").front();
+    operators::distributed::prefetch("Ids@Prefetch", "W@Prefetch", weight,
+                                     true, table_names, epmap,
+                                     height_sections, ctx, local_scope);
 #else
     PADDLE_THROW(
         "paddle is not compiled with distribute support, can not do "
diff --git a/paddle/fluid/operators/lookup_table_op.cu b/paddle/fluid/operators/lookup_table_op.cu
index cb432e6d3e9..e4add1c746a 100644
--- a/paddle/fluid/operators/lookup_table_op.cu
+++ b/paddle/fluid/operators/lookup_table_op.cu
@@ -98,46 +98,27 @@ class LookupTableCUDAKernel : public framework::OpKernel<T> {
     auto id_name = context.Inputs("Ids").front();
     auto out_name = context.Outputs("Out").front();
 
-    // for remote prefetch
-    auto epmap = context.Attr<std::vector<std::string>>("epmap");
-    auto height_sections =
-        context.Attr<std::vector<int64_t>>("height_sections");
-    auto table_names = context.Attr<std::vector<std::string>>("table_names");
-
-    if (!epmap.empty()) {
-// if epmap is not empty, then the parameter will be fetched from remote
-// parameter
-// server
-#ifdef PADDLE_WITH_DISTRIBUTE
-      operators::distributed::prefetch(id_name, out_name, table_names, epmap,
-                                       height_sections, context,
-                                       context.scope());
-#else
-      PADDLE_THROW(
-          "paddle is not compiled with distribute support, can not do "
-          "parameter prefetch!");
-#endif
-    } else {
-      size_t N = table_t->dims()[0];
-      size_t D = table_t->dims()[1];
-      size_t K = ids_t->numel();
-
-      auto *ids = ids_t->data<int64_t>();
-      auto *table = table_t->data<T>();
-      auto *output = output_t->mutable_data<T>(context.GetPlace());
-
-      dim3 threads(128, 8);
-      dim3 grids(8, 1);
-
-      if (padding_idx == -1)
-        LookupTable<T, 128, 8, 8, false><<<
-            grids, threads, 0, context.cuda_device_context().stream()>>>(
-            output, table, ids, N, K, D, padding_idx);
-      else
-        LookupTable<T, 128, 8, 8, true><<<
-            grids, threads, 0, context.cuda_device_context().stream()>>>(
-            output, table, ids, N, K, D, padding_idx);
-    }
+    size_t N = table_t->dims()[0];
+    size_t D = table_t->dims()[1];
+    size_t K = ids_t->numel();
+
+    auto *ids = ids_t->data<int64_t>();
+    auto *table = table_t->data<T>();
+    auto *output = output_t->mutable_data<T>(context.GetPlace());
+
+    dim3 threads(128, 8);
+    dim3 grids(8, 1);
+
+    if (padding_idx == -1)
+      LookupTable<T, 128, 8, 8, false><<<
+          grids, threads, 0, context.cuda_device_context().stream()>>>(
+          output, table, ids, N, K, D, padding_idx);
+    else
+      LookupTable<T, 128, 8, 8, true><<<
+          grids, threads, 0, context.cuda_device_context().stream()>>>(
+          output, table, ids, N, K, D, padding_idx);
   }
 };
diff --git a/paddle/fluid/operators/lookup_table_op.h b/paddle/fluid/operators/lookup_table_op.h
index b3e48638c6c..4863ed17424 100644
--- a/paddle/fluid/operators/lookup_table_op.h
+++ b/paddle/fluid/operators/lookup_table_op.h
@@ -46,6 +46,7 @@ class LookupTableKernel : public framework::OpKernel<T> {
     auto *table_var = context.InputVar("W");
 
     auto id_name = context.Inputs("Ids").front();
+    auto embedding_name = context.Inputs("W").front();
     auto out_name = context.Outputs("Out").front();
 
     // for remote prefetch
@@ -57,12 +58,12 @@ class LookupTableKernel : public framework::OpKernel<T> {
 
     if (remote_prefetch && !epmap.empty()) {
 // if epmap is not empty, then the parameter will be fetched from remote
-// parameter
-// server
+// parameter server
+
 #ifdef PADDLE_WITH_DISTRIBUTE
-      operators::distributed::prefetch(id_name, out_name, table_names, epmap,
-                                       height_sections, context,
-                                       context.scope());
+      operators::distributed::prefetch(id_name, out_name, embedding_name,
+                                       false, table_names, epmap,
+                                       height_sections, context,
+                                       context.scope());
 #else
       PADDLE_THROW(
           "paddle is not compiled with distribute support, can not do "
diff --git a/paddle/fluid/operators/nce_op.h b/paddle/fluid/operators/nce_op.h
index 12f3118ec77..5665b9f5519 100644
--- a/paddle/fluid/operators/nce_op.h
+++ b/paddle/fluid/operators/nce_op.h
@@ -195,9 +195,10 @@ class NCEKernel : public framework::OpKernel<T> {
       w_tensor->Resize(framework::make_ddim(w_dims));
 
 #ifdef PADDLE_WITH_DISTRIBUTE
+      auto weight = context.Inputs("Weight").front();
       operators::distributed::prefetch("Ids@Prefetch", "Weight@Prefetch",
-                                       table_names, epmap, height_sections,
-                                       context, local_scope);
+                                       weight, false, table_names, epmap,
+                                       height_sections, context, local_scope);
 #else
       PADDLE_THROW(
           "paddle is not compiled with distribute support, can not do "
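Note how the three call sites above differ only in the backfill flag: hierarchical_sigmoid passes true so the fetched rows are also written back into W_Out, while lookup_table and nce pass false. A std-only sketch of that backfill step (Backfill is a hypothetical stand-in for the loop inside prefetchs, not Paddle API):

    #include <algorithm>
    #include <cstdint>
    #include <unordered_map>
    #include <vector>

    // Besides filling the op's output, write each fetched row back into the
    // trainer's local copy of the parameter (what W_Out needs).
    void Backfill(const std::unordered_map<int64_t, std::vector<float>>& recved,
                  const std::vector<int64_t>& ids_union, int64_t width,
                  std::vector<float>* local_param /* height x width */) {
      for (int64_t id : ids_union) {
        const std::vector<float>& row = recved.at(id);
        std::copy(row.begin(), row.end(), local_param->data() + id * width);
      }
    }
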
diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py b/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py
index 8c230c58e32..ce022954f84 100644
--- a/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py
+++ b/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py
@@ -210,11 +210,6 @@ class DistributedTranspiler(Fleet):
         self._transpile_config = config
         self._transpiler = OriginTranspiler(config)
 
-        print("server endpoints")
-        print(fleet.server_endpoints(to_string=True))
-        print("worker index: %d" % fleet.worker_index())
-        print("worker num: %d" % fleet.worker_num())
-
         if self.is_worker():
             self._transpiler.transpile(
                 trainer_id=fleet.worker_index(),
@@ -222,12 +217,11 @@ class DistributedTranspiler(Fleet):
                 trainers=fleet.worker_num(),
                 sync_mode=config.sync_mode)
 
-            wait_port = True
             if isinstance(self._role_maker, MPISymetricRoleMaker):
-                wait_port = False
+                config.wait_port = False
 
             self.main_program = self._transpiler.get_trainer_program(
-                wait_port=wait_port)
+                wait_port=config.wait_port)
             self.startup_program = default_startup_program()
         else:
             self._transpiler.transpile(
diff --git a/python/paddle/fluid/tests/test_communicator.py b/python/paddle/fluid/tests/test_communicator.py
index 24c8c4887ec..42448758bcf 100644
--- a/python/paddle/fluid/tests/test_communicator.py
+++ b/python/paddle/fluid/tests/test_communicator.py
@@ -15,12 +15,51 @@
 from __future__ import print_function
 
 import unittest
+import time
 
 import paddle.fluid as fluid
 from paddle.fluid.communicator import Communicator
 
+import paddle.fluid.incubate.fleet.base.role_maker as role_maker
+from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
+from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspilerConfig
+
 
 class TestCommunicator(unittest.TestCase):
+    def net(self):
+        x = fluid.layers.data(name='x', shape=[13], dtype='float32')
+        y_predict = fluid.layers.fc(input=x, size=1, act=None)
+        y = fluid.layers.data(name='y', shape=[1], dtype='float32')
+
+        cost = fluid.layers.square_error_cost(input=y_predict, label=y)
+        avg_cost = fluid.layers.mean(cost)
+        return avg_cost
+
+    def test_communicator_init_and_start(self):
+        role = role_maker.UserDefinedRoleMaker(
+            current_id=0,
+            role=role_maker.Role.WORKER,
+            worker_num=2,
+            server_endpoints=["127.0.0.1:6001", "127.0.0.1:6002"])
+
+        fleet.init(role)
+        avg_cost = self.net()
+
+        optimizer = fluid.optimizer.SGD(0.01)
+
+        strategy = DistributeTranspilerConfig()
+        strategy.sync_mode = True
+        strategy.wait_port = False
+        optimizer = fleet.distributed_optimizer(optimizer, strategy)
+        optimizer.minimize(avg_cost)
+
+        comm = Communicator(fleet.main_program)
+        comm.start()
+        time.sleep(10)
+        comm.stop()
+
+
+class TestCommunicator2(unittest.TestCase):
     def test_communicator_init_and_start(self):
         prog = fluid.Program()
         comm = Communicator(prog)
diff --git a/python/paddle/fluid/tests/unittests/test_dist_ctr.py b/python/paddle/fluid/tests/unittests/test_dist_ctr.py
index cc11764d559..55234a85731 100644
--- a/python/paddle/fluid/tests/unittests/test_dist_ctr.py
+++ b/python/paddle/fluid/tests/unittests/test_dist_ctr.py
@@ -18,6 +18,18 @@ import unittest
 from test_dist_base import TestDistBase
 
 
+def skip_ci(func):
+    on_ci = bool(int(os.environ.get("SKIP_UNSTABLE_CI", '0')))
+
+    def __func__(*args, **kwargs):
+        if on_ci:
+            return
+        return func(*args, **kwargs)
+
+    return __func__
+
+
+@skip_ci
 class TestDistCTR2x2(TestDistBase):
     def _setup_config(self):
         self._sync_mode = True
@@ -27,6 +39,7 @@ class TestDistCTR2x2(TestDistBase):
         self.check_with_place("dist_ctr.py", delta=1e-7, check_error_log=False)
 
 
+@skip_ci
 class TestDistCTRWithL2Decay2x2(TestDistBase):
     def _setup_config(self):
         self._sync_mode = True
@@ -37,7 +50,7 @@ class TestDistCTRWithL2Decay2x2(TestDistBase):
         self.check_with_place(
             "dist_ctr.py",
             delta=1e-7,
-            check_error_log=False,
+            check_error_log=True,
             need_envs=need_envs)
diff --git a/python/paddle/fluid/tests/unittests/test_dist_fleet_ctr.py b/python/paddle/fluid/tests/unittests/test_dist_fleet_ctr.py
index 5d3c0fbdd0c..9bad641a8cb 100644
--- a/python/paddle/fluid/tests/unittests/test_dist_fleet_ctr.py
+++ b/python/paddle/fluid/tests/unittests/test_dist_fleet_ctr.py
@@ -19,6 +19,18 @@ import unittest
 from test_dist_fleet_base import TestFleetBase
 
 
+def skip_ci(func):
+    on_ci = bool(int(os.environ.get("SKIP_UNSTABLE_CI", '0')))
+
+    def __func__(*args, **kwargs):
+        if on_ci:
+            return
+        return func(*args, **kwargs)
+
+    return __func__
+
+
+@skip_ci
 class TestDistMnist2x2(TestFleetBase):
     def _setup_config(self):
         self._sync_mode = False
diff --git a/python/paddle/fluid/tests/unittests/test_dist_word2vec.py b/python/paddle/fluid/tests/unittests/test_dist_word2vec.py
index b26cbdbea12..4553cb0ffd7 100644
--- a/python/paddle/fluid/tests/unittests/test_dist_word2vec.py
+++ b/python/paddle/fluid/tests/unittests/test_dist_word2vec.py
@@ -20,6 +20,7 @@ from test_dist_base import TestDistBase
 class TestDistW2V2x2(TestDistBase):
     def _setup_config(self):
         self._sync_mode = True
+        self._enforce_place = "CPU"
 
     def test_dist_train(self):
         self.check_with_place("dist_word2vec.py", delta=1e-4)
@@ -29,6 +30,7 @@ class TestDistW2V2x2WithMemOpt(TestDistBase):
     def _setup_config(self):
         self._sync_mode = True
         self._mem_opt = True
+        self._enforce_place = "CPU"
 
     def test_dist_train(self):
         self.check_with_place("dist_word2vec.py", delta=1e-4)
@@ -37,6 +39,7 @@ class TestDistW2V2x2WithMemOpt(TestDistBase):
 class TestDistW2V2x2Async(TestDistBase):
     def _setup_config(self):
         self._sync_mode = False
+        self._enforce_place = "CPU"
 
     def test_dist_train(self):
         self.check_with_place("dist_word2vec.py", delta=100)
diff --git a/python/paddle/fluid/tests/unittests/test_lookup_remote_table_op.py b/python/paddle/fluid/tests/unittests/test_lookup_remote_table_op.py
index 47830fb56b4..7aad9a3bcd5 100644
--- a/python/paddle/fluid/tests/unittests/test_lookup_remote_table_op.py
+++ b/python/paddle/fluid/tests/unittests/test_lookup_remote_table_op.py
@@ -185,8 +185,6 @@ class TestListenAndServOp(unittest.TestCase):
         port1 = self._get_pserver_port(p1.pid)
 
         places = [core.CPUPlace()]
-        if core.is_compiled_with_cuda():
-            places.append(core.CUDAPlace(0))
 
         for place in places:
             self._run_lookup_table_op_one_pserver(place, port0)
diff --git a/python/paddle/fluid/transpiler/distribute_transpiler.py b/python/paddle/fluid/transpiler/distribute_transpiler.py
index c312a36fa64..1a9c1813e47 100644
--- a/python/paddle/fluid/transpiler/distribute_transpiler.py
+++ b/python/paddle/fluid/transpiler/distribute_transpiler.py
@@ -357,14 +357,49 @@ class DistributeTranspiler(object):
                 sparse_update_ops.append(op)
         return sparse_update_ops
 
-    def _update_remote_sparse_update_op(self, param_varname, height_sections,
-                                        endpint_map, table_names):
+    def _update_remote_sparse_update_op(self, program, param_varname,
+                                        height_sections, endpoints,
+                                        table_names):
+
+        ops = []
+        op_type = ""
+
         for op in self.sparse_update_ops:
-            if param_varname in op.input_arg_names:
-                op._set_attr('epmap', endpint_map)
-                op._set_attr('table_names', table_names)
-                op._set_attr('height_sections', height_sections)
-                op._set_attr('trainer_id', self.trainer_id)
+            if param_varname in op.input_arg_names and op_type == "":
+                op_type = op.type
+                ops.append(op)
+
+            elif param_varname in op.input_arg_names and op_type == op.type:
+                ops.append(op)
+
+        if op_type == "lookup_table":
+            all_ops = program.global_block().ops
+            op_idxs = [all_ops.index(op) for op in ops]
+            inputs = [
+                program.global_block().vars[op.input("Ids")[0]] for op in ops
+            ]
+            w = program.global_block().vars[ops[0].input("W")[0]]
+            padding_idx = ops[0].attr("padding_idx")
+            outputs = [
+                program.global_block().vars[op.output("Out")[0]] for op in ops
+            ]
+
+            for idx in op_idxs[::-1]:
+                program.global_block()._remove_op(idx)
+
+            program.global_block()._insert_op(
+                index=op_idxs[0],
+                type="distributed_lookup_table",
+                inputs={"Ids": inputs,
+                        'W': w},
+                outputs={"Outputs": outputs},
+                attrs={
+                    "table_names": table_names,
+                    "height_sections": height_sections,
+                    "endpoints": endpoints,
"padding_idx": padding_idx, + "trainer_id": self.trainer_id + }) def _is_input_of_remote_sparse_update_op(self, param_name): for op in self.sparse_update_ops: @@ -523,17 +558,12 @@ class DistributeTranspiler(object): splited_grad_varname = splited_vars[0].name index = find_op_by_output_arg( program.global_block(), splited_grad_varname, reverse=True) - if splited_vars[0].type == core.VarDesc.VarType.SELECTED_ROWS: - sparse_param_name = self.grad_name_to_param_name[ - grad_varname] - if self._is_input_of_remote_sparse_update_op( - sparse_param_name): - self.sparse_param_to_height_sections[ - sparse_param_name] = [splited_vars[0].shape[0]] + elif len(splited_vars) > 1: orig_var = program.global_block().vars[splited_grad_varname] index = find_op_by_output_arg( program.global_block(), splited_grad_varname, reverse=True) + if not self.config.runtime_split_send_recv: self._insert_split_op(program, orig_var, index, splited_vars) @@ -542,6 +572,13 @@ class DistributeTranspiler(object): AssertionError("Can not insert the send op by original " "variable name :", splited_grad_varname) + if splited_vars[0].type == core.VarDesc.VarType.SELECTED_ROWS: + sparse_param_name = self.grad_name_to_param_name[grad_varname] + if self._is_input_of_remote_sparse_update_op(sparse_param_name): + self.sparse_param_to_height_sections[sparse_param_name] = [ + splited_var.shape[0] for splited_var in splited_vars + ] + dummy_output = program.global_block().create_var( name=framework.generate_control_dev_var_name()) self.grad_name_to_send_dummy_out[grad_varname] = dummy_output @@ -639,7 +676,6 @@ class DistributeTranspiler(object): recv_op_role_var_name = splited_trainer_grad[0].name if param_varname in self.sparse_param_to_height_sections: - for table_name in table_names: distributed_var = self.vars_overview.get_distributed_var_by_slice( table_name) @@ -648,7 +684,7 @@ class DistributeTranspiler(object): height_sections = self.sparse_param_to_height_sections[ param_varname] self._update_remote_sparse_update_op( - param_varname, height_sections, eps, table_names) + program, param_varname, height_sections, eps, table_names) else: recv_varnames = [] if self.config.runtime_split_send_recv: -- GitLab