diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index eeda759ff18ccb86ce6a585fe41cb972ea3ae295..e718b32cb6c48d11e73600509a17db107f438708 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -23,7 +23,7 @@ repos: - id: clang-format-with-version-check name: clang-format description: Format files with ClangFormat. - entry: bash ./.clang_format.hook -i + entry: bash ./tools/codestyle/clang_format.hook -i language: system files: \.(c|cc|cxx|cpp|cu|h|hpp|hxx|proto)$ - repo: local @@ -52,7 +52,7 @@ repos: hooks: - id: copyright_checker name: copyright_checker - entry: python ./.copyright.hook + entry: python ./tools/codestyle/copyright.hook language: system files: \.(c|cc|cxx|cpp|cu|h|hpp|hxx|proto|py)$ exclude: (?!.*third_party)^.*$ | (?!.*book)^.*$ diff --git a/paddle/contrib/inference/high_level_api.md b/paddle/contrib/inference/high_level_api.md new file mode 100644 index 0000000000000000000000000000000000000000..563b696143de9cbf67db38048bbd2f7c11b3a66e --- /dev/null +++ b/paddle/contrib/inference/high_level_api.md @@ -0,0 +1,59 @@ +# Inference High-level APIs +This document describes the high-level inference APIs one can use to easily deploy a Paddle model for an application. + +The APIs are described in `paddle_inference_api.h`, just one header file, and two libaries `libpaddle_fluid.so` and `libpaddle_fluid_api.so` are needed. + +## PaddleTensor +We provide the `PaddleTensor` data structure is to give a general tensor interface. + +The definition is + +```c++ +struct PaddleTensor { + std::string name; // variable name. + std::vector shape; + PaddleBuf data; // blob of data. + PaddleDType dtype; +}; +``` + +The data is stored in a continuous memory `PaddleBuf`, and tensor's data type is specified by a `PaddleDType`. +The `name` field is used to specify the name of input variable, +that is important when there are multiple inputs and need to distiuish which variable to set. + +## engine +The inference APIs has two different underlying implementation, currently there are two valid engines: + +- the native engine, which is consists of the native operators and framework, +- the Anakin engine, which is a Anakin library embeded. + +The native engine takes a native Paddle model as input, and supports any model that trained by Paddle, +but the Anakin engine can only take the Anakin model as input(user need to manully transform the format first) and currently not all Paddle models are supported. + +```c++ +enum class PaddleEngineKind { + kNative = 0, // Use the native Fluid facility. + kAnakin, // Use Anakin for inference. +}; +``` + +## PaddlePredictor and how to create one +The main interface is `PaddlePredictor`, there are following methods + +- `bool Run(const std::vector& inputs, std::vector* output_data)` + - take inputs and output `output_data` +- `Clone` to clone a predictor from an existing one, with model parameter shared. + +There is a factory method to help create a predictor, and the user takes the ownership of this object. + +```c++ +template +std::unique_ptr CreatePaddlePredictor(const ConfigT& config); +``` + +By specifying the engine kind and config, one can get an specific implementation. + +## Reference + +- [paddle_inference_api.h](./paddle_inference_api.h) +- [demos](./demo) diff --git a/paddle/contrib/inference/paddle_inference_api.h b/paddle/contrib/inference/paddle_inference_api.h index bd4530fcf9518cb3bf06179d8f60a1dde38ff7dd..38e3cc21413b9ab715b84f278f00b9df23cb7682 100644 --- a/paddle/contrib/inference/paddle_inference_api.h +++ b/paddle/contrib/inference/paddle_inference_api.h @@ -109,8 +109,7 @@ class PaddlePredictor { // The common configs for all the predictors. struct Config { - std::string model_dir; // path to the model directory. - bool enable_engine{false}; // Enable to execute (part of) the model on + std::string model_dir; // path to the model directory. }; }; diff --git a/paddle/fluid/framework/details/multi_devices_graph_builder.cc b/paddle/fluid/framework/details/multi_devices_graph_builder.cc index 78356cb1be3bd089c26dde663275e2c8109df951..99dcaf27134e879fa85f57e5a675382442e9edf2 100644 --- a/paddle/fluid/framework/details/multi_devices_graph_builder.cc +++ b/paddle/fluid/framework/details/multi_devices_graph_builder.cc @@ -57,6 +57,7 @@ MultiDevSSAGraphBuilder::MultiDevSSAGraphBuilder( for (auto &p : params) { grad_names_.insert(GradVarName(p)); } + balance_vars_.resize(places_.size(), 0); } void MultiDevSSAGraphBuilder::CreateOpHandleIOs(SSAGraph *result, @@ -140,11 +141,30 @@ bool MultiDevSSAGraphBuilder::IsDistTrainOp( checker(op.InputArgumentNames(), recv_vars); } +size_t MultiDevSSAGraphBuilder::GetAppropriateDeviceID( + const std::vector &var_names) const { + int64_t numel_sum = 0; + for (auto var_name : var_names) { + auto var_desc = all_vars_.at(var_name); + PADDLE_ENFORCE_NOT_NULL(var_desc); + auto dim = framework::make_ddim(var_desc->GetShape()); + int64_t numel = framework::product(dim); + PADDLE_ENFORCE_GT(numel, 0); + numel_sum += numel; + } + + auto smallest = + std::min_element(std::begin(balance_vars_), std::end(balance_vars_)); + size_t dev_id = + static_cast(std::distance(std::begin(balance_vars_), smallest)); + balance_vars_[dev_id] += numel_sum; + return dev_id; +} + std::unique_ptr MultiDevSSAGraphBuilder::Build( const ProgramDesc &program) const { - std::unordered_map all_vars; for (auto *var : program.Block(0).AllVars()) { - all_vars[var->Name()] = var; + all_vars_.emplace(var->Name(), var); } auto graph = new SSAGraph(); @@ -161,35 +181,16 @@ std::unique_ptr MultiDevSSAGraphBuilder::Build( auto send_vars = FindDistTrainSendVars(program); auto recv_vars = FindDistTrainRecvVars(program); - std::vector> var_name_on_devices; std::vector> bcast_var_name_set; - var_name_on_devices.resize(places_.size()); bcast_var_name_set.resize(places_.size()); size_t cur_device_id = 0; - std::vector balance_grads(places_.size(), 0); - - auto get_appropriate_dev = [&](std::string &g_name) -> size_t { - auto var_desc = all_vars.at(g_name); - PADDLE_ENFORCE_NOT_NULL(var_desc); - auto dim = framework::make_ddim(var_desc->GetShape()); - int64_t numel = framework::product(dim); - PADDLE_ENFORCE_GE(numel, 0); - auto smallest = - std::min_element(std::begin(balance_grads), std::end(balance_grads)); - size_t dev_id = - static_cast(std::distance(std::begin(balance_grads), smallest)); - balance_grads[dev_id] += numel; - return dev_id; - }; - bool is_forwarding = true; + for (auto *op : program.Block(0).AllOps()) { if (boost::get( op->GetAttr(OpProtoAndCheckerMaker::OpRoleAttrName())) == static_cast(OpRole::kRPC)) { - // append rpc op if program is distributed trainer main program. - // always use the first device CreateRPCOp(&result, *op); } else if (IsDistTrainOp(*op, send_vars, recv_vars)) { CreateDistTrainOp(&result, *op); @@ -199,15 +200,19 @@ std::unique_ptr MultiDevSSAGraphBuilder::Build( BuildStrategy::GradientScaleStrategy::kCustomized) { CreateScaleLossGradOp(&result); } + // This assumes the backward generating code will ensure IsScaleLossOp + // is true only for the op that scale the final scalar loss. + // It also assumes backward op will always follow the forward op in + // the block. is_forwarding = false; } else { - int op_dev_id = GetOpDeviceID(var_name_on_devices, *op); + int op_dev_id = GetOpDeviceID(*op); if (op_dev_id == -1) { // var on all device CreateComputationalOps(&result, *op, places_.size()); } else { CreateComputationalOp(&result, *op, op_dev_id); for (auto &var_name : op->OutputArgumentNames()) { - var_name_on_devices[op_dev_id].emplace(var_name); + var_name_on_devices_.emplace(var_name, op_dev_id); } } if (!is_forwarding && places_.size() > 1) { @@ -230,19 +235,22 @@ std::unique_ptr MultiDevSSAGraphBuilder::Build( switch (strategy_.reduce_) { case BuildStrategy::ReduceStrategy::kReduce: - cur_device_id = get_appropriate_dev(g_name); + cur_device_id = GetAppropriateDeviceID({g_name}); CreateReduceOp(&result, g_name, cur_device_id); - var_name_on_devices[cur_device_id].emplace(g_name); + var_name_on_devices_.emplace(g_name, cur_device_id); bcast_var_name_set[cur_device_id].emplace(p_name); break; case BuildStrategy::ReduceStrategy::kAllReduce: - if (IsSparseGradient(all_vars, g_name)) { + if (IsSparseGradient(g_name)) { CreateReduceOp(&result, g_name, 0); CreateBroadcastOp(&result, g_name, 0); } else { InsertAllReduceOp(&result, g_name); } break; + default: + LOG(FATAL) << "Unknown reduce strategy "; + break; } } } catch (boost::bad_get e) { @@ -261,7 +269,7 @@ std::unique_ptr MultiDevSSAGraphBuilder::Build( } /* Dependency graph has been constructed. However, there are still data - harzaeds need to be handled. + hazards need to be handled. */ PolishGraphToSupportDataHazards(&result); @@ -273,11 +281,9 @@ std::unique_ptr MultiDevSSAGraphBuilder::Build( return std::unique_ptr(graph); } -bool MultiDevSSAGraphBuilder::IsSparseGradient( - const std::unordered_map &all_vars, - const std::string &og) const { - PADDLE_ENFORCE(all_vars.count(og) != 0); - if (all_vars.at(og)->GetType() == proto::VarType::SELECTED_ROWS) { +bool MultiDevSSAGraphBuilder::IsSparseGradient(const std::string &og) const { + PADDLE_ENFORCE(all_vars_.count(og) != 0); + if (all_vars_.at(og)->GetType() == proto::VarType::SELECTED_ROWS) { return true; } return false; @@ -363,24 +369,23 @@ bool MultiDevSSAGraphBuilder::IsParameterGradientOnce( return is_pg_once; } -int MultiDevSSAGraphBuilder::GetOpDeviceID( - const std::vector> &var_name_on_devices, - const OpDesc &op) const { +int MultiDevSSAGraphBuilder::GetOpDeviceID(const OpDesc &op) const { if (strategy_.reduce_ != BuildStrategy::ReduceStrategy::kReduce) { return -1; } - int var_dev_id = -1; - for (auto &var_name : op.InputArgumentNames()) { - if (var_dev_id != -1) break; - for (size_t i = 0; i < var_name_on_devices.size(); ++i) { - if (var_name_on_devices[i].count(var_name)) { - var_dev_id = static_cast(i); - break; - } + for (auto &varname : op.InputArgumentNames()) { + int dev_id = GetVarDeviceID(varname); + if (dev_id != -1) { + return dev_id; } } - return var_dev_id; + return -1; +} + +int MultiDevSSAGraphBuilder::GetVarDeviceID(const std::string &varname) const { + auto got = var_name_on_devices_.find(varname); + return got == var_name_on_devices_.end() ? -1 : got->second; } void MultiDevSSAGraphBuilder::CreateScaleLossGradOp(SSAGraph *result) const { @@ -449,6 +454,8 @@ VarHandle *MultiDevSSAGraphBuilder::CreateReduceOp(SSAGraph *result, return var; } +// Find the first occurence of `prev_op_name` and make current `op` depend +// on it. void MultiDevSSAGraphBuilder::ConnectOp(SSAGraph *result, OpHandleBase *op, const std::string &prev_op_name) const { for (auto &prev_op : result->ops_) { @@ -463,16 +470,66 @@ void MultiDevSSAGraphBuilder::ConnectOp(SSAGraph *result, OpHandleBase *op, void MultiDevSSAGraphBuilder::CreateDistTrainOp(SSAGraph *result, const OpDesc &op) const { - CreateComputationalOp(result, op, 0); + int op_dev_id = -1; + if (op.Type() == "split_byref") { + op_dev_id = GetVarDeviceID(op.InputArgumentNames()[0]); + if (strategy_.reduce_ == BuildStrategy::ReduceStrategy::kAllReduce) { + op_dev_id = GetAppropriateDeviceID(op.InputArgumentNames()); + for (auto &varname : op.InputArgumentNames()) { + var_name_on_devices_.emplace(varname, op_dev_id); + } + } + for (auto &varname : op.OutputArgumentNames()) { + var_name_on_devices_.emplace(varname, op_dev_id); + } + } else if (op.Type() == "concat") { + op_dev_id = GetVarDeviceID(op.InputArgumentNames()[0]); + } else { + PADDLE_ENFORCE( + "the distribute training related op should be in [split_byref, " + "concat]."); + } + + PADDLE_ENFORCE(op_dev_id != -1, + "can not find right place for distributed op: %s", op.Type()); + + CreateComputationalOp(result, op, op_dev_id); if (op.Type() == "concat") { ConnectOp(result, result->ops_.back().get(), "fetch_barrier"); } } +// Create RPC related op handles that connects its in ops and out ops. void MultiDevSSAGraphBuilder::CreateRPCOp(SSAGraph *result, const OpDesc &op) const { - result->ops_.emplace_back( - new RPCOpHandle(op, local_scopes_[0], op.Type(), places_[0])); + int op_dev_id = -1; + if (op.Type() == "send") { + op_dev_id = GetVarDeviceID(op.InputArgumentNames()[0]); + // the variable name which contains .block means it was splited by + // split_byref op + // so that we can balance the variable blocks to all the pserver instances. + if (strategy_.reduce_ == BuildStrategy::ReduceStrategy::kAllReduce && + op.InputArgumentNames()[0].find(".block") == std::string::npos) { + op_dev_id = GetAppropriateDeviceID(op.InputArgumentNames()); + for (auto &varname : op.InputArgumentNames()) { + var_name_on_devices_.emplace(varname, op_dev_id); + } + } + } else if (op.Type() == "recv") { + op_dev_id = GetAppropriateDeviceID(op.OutputArgumentNames()); + for (auto &varname : op.OutputArgumentNames()) { + var_name_on_devices_.emplace(varname, op_dev_id); + } + } else { + // send_barrier and fetch_barrier op can be scheduled on device 0 + op_dev_id = 0; + } + + PADDLE_ENFORCE(op_dev_id != -1, "can not find the right place for rpc op: %s", + op.Type()); + + result->ops_.emplace_back(new RPCOpHandle(op, local_scopes_[op_dev_id], + op.Type(), places_[op_dev_id])); if (op.Type() == "send_barrier") { ConnectOp(result, result->ops_.back().get(), "send"); @@ -488,9 +545,7 @@ void MultiDevSSAGraphBuilder::CreateRPCOp(SSAGraph *result, "send, send_barrier. recv, fetch_barrier]"); } - // TODO(Yancey1989): schedule rpc op on different place may - // increate throughput - CreateOpHandleIOs(result, op, 0); + CreateOpHandleIOs(result, op, op_dev_id); } bool MultiDevSSAGraphBuilder::IsScaleLossOp(const OpDesc &op) const { diff --git a/paddle/fluid/framework/details/multi_devices_graph_builder.h b/paddle/fluid/framework/details/multi_devices_graph_builder.h index 78581755fe4890800636944d6cd89875a852cc19..eb1c07630ab665a90d76b810a421cffb0ce673c2 100644 --- a/paddle/fluid/framework/details/multi_devices_graph_builder.h +++ b/paddle/fluid/framework/details/multi_devices_graph_builder.h @@ -47,10 +47,11 @@ class MultiDevSSAGraphBuilder : public SSAGraphBuilder { #endif std::unique_ptr Build(const ProgramDesc &program) const override; + int GetVarDeviceID(const std::string &varname) const; private: void CreateOpHandleIOs(SSAGraph *result, const OpDesc &op, - size_t place_id) const; + size_t device_id) const; private: std::string loss_var_name_; @@ -96,21 +97,23 @@ class MultiDevSSAGraphBuilder : public SSAGraphBuilder { const std::string &og, std::unordered_set *og_has_been_broadcast) const; - int GetOpDeviceID( - const std::vector> &var_name_on_devices, - const OpDesc &op) const; + int GetOpDeviceID(const OpDesc &op) const; void InsertAllReduceOp(SSAGraph *result, const std::string &og) const; void CreateBroadcastOp(SSAGraph *result, const std::string &p_name, size_t src_dev_id) const; - bool IsSparseGradient( - const std::unordered_map &all_vars, - const std::string &og) const; + bool IsSparseGradient(const std::string &og) const; + + size_t GetAppropriateDeviceID( + const std::vector &var_names) const; private: BuildStrategy strategy_; + mutable std::unordered_map all_vars_; + mutable std::unordered_map var_name_on_devices_; + mutable std::vector balance_vars_; void SetCommunicationContext(OpHandleBase *op_handle, const platform::Place &p) const; diff --git a/paddle/fluid/framework/details/ssa_graph_builder.h b/paddle/fluid/framework/details/ssa_graph_builder.h index 5fc12a44b51fae26e5a8f5fdba952d3879e82d0f..9eb23c46264f9036f009b0ae9aeeb34ec70c0e53 100644 --- a/paddle/fluid/framework/details/ssa_graph_builder.h +++ b/paddle/fluid/framework/details/ssa_graph_builder.h @@ -30,6 +30,7 @@ class SSAGraphBuilder { SSAGraphBuilder() {} virtual ~SSAGraphBuilder() {} virtual std::unique_ptr Build(const ProgramDesc &program) const = 0; + virtual int GetVarDeviceID(const std::string &var_name) const { return -1; } DISABLE_COPY_AND_ASSIGN(SSAGraphBuilder); diff --git a/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc b/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc index 6c5098ce85b784a3edcf8f48d2cc828aabd8e161..b1706eb12d080364d04108c7ef4da31e1e7c1deb 100644 --- a/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc +++ b/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc @@ -96,6 +96,7 @@ FeedFetchList ThreadedSSAGraphExecutor::Run( auto cur_ready_vars = ready_vars.PopAll(1, &timeout); if (timeout) { + std::lock_guard l(exception_mu_); if (exception_) { auto exp = *exception_; exception_.reset(); @@ -199,6 +200,7 @@ void ThreadedSSAGraphExecutor::RunOp( ready_var_q->Extend(op->Outputs()); VLOG(10) << op << " " << op->Name() << "Signal posted"; } catch (platform::EnforceNotMet ex) { + std::lock_guard l(exception_mu_); exception_.reset(new platform::EnforceNotMet(ex)); } catch (...) { LOG(FATAL) << "Unknown exception catched"; diff --git a/paddle/fluid/framework/details/threaded_ssa_graph_executor.h b/paddle/fluid/framework/details/threaded_ssa_graph_executor.h index 4a2075f1cccb3211316567197da56c01d26f35ce..90430be996758364387b552019762d9c2e9dfe45 100644 --- a/paddle/fluid/framework/details/threaded_ssa_graph_executor.h +++ b/paddle/fluid/framework/details/threaded_ssa_graph_executor.h @@ -56,6 +56,7 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor { std::vector local_scopes_; std::vector places_; platform::DeviceContextPool fetch_ctxs_; + std::mutex exception_mu_; std::unique_ptr exception_; std::atomic running_ops_; diff --git a/paddle/fluid/framework/executor.cc b/paddle/fluid/framework/executor.cc index f1eccb351ef68697c748814efb2987041b0da8d9..ae98fccc9600a2a75f12fa516c982bec0ef13f9f 100644 --- a/paddle/fluid/framework/executor.cc +++ b/paddle/fluid/framework/executor.cc @@ -21,7 +21,7 @@ limitations under the License. */ #include "paddle/fluid/framework/op_registry.h" #include "paddle/fluid/framework/reader.h" #ifdef PADDLE_WITH_DISTRIBUTE -#include "paddle/fluid/operators/detail/grpc_client.h" +#include "paddle/fluid/operators/distributed/grpc_client.h" #endif #include "paddle/fluid/platform/place.h" #include "paddle/fluid/platform/profiler.h" @@ -49,8 +49,8 @@ Executor::Executor(const platform::Place& place) : place_(place) {} #ifdef PADDLE_WITH_DISTRIBUTE void Executor::Complete() { - ::paddle::operators::detail::RPCClient::GetInstance< - ::paddle::operators::detail::GRPCClient>() + ::paddle::operators::distributed::RPCClient::GetInstance< + ::paddle::operators::distributed::GRPCClient>() ->SendComplete(); } #endif diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc index 9406c6155da860c90739bddac1e81403b094e619..d478865fa8f24c653a4185cabd05747a5410ceaa 100644 --- a/paddle/fluid/framework/parallel_executor.cc +++ b/paddle/fluid/framework/parallel_executor.cc @@ -110,7 +110,6 @@ ParallelExecutor::ParallelExecutor( // Step 3. Convert main_program to SSA form and dependency graph. Also, insert // ncclOp - details::SSAGraphBuilderFactory builder_factory( member_->places_, loss_var_name, params, member_->local_scopes_, build_strategy); @@ -122,9 +121,10 @@ ParallelExecutor::ParallelExecutor( #endif } + builder_ = std::move(builder_factory.Create()); member_->executor_.reset(new details::ThreadedSSAGraphExecutor( exec_strategy, member_->local_scopes_, places, - builder_factory.Create()->Build(main_program))); + builder_->Build(main_program))); member_->executor_.reset(new details::ScopeBufferedSSAGraphExecutor( exec_strategy, member_->local_scopes_, std::move(var_infos), @@ -133,10 +133,22 @@ ParallelExecutor::ParallelExecutor( void ParallelExecutor::BCastParamsToGPUs( const std::unordered_set &vars) const { - auto *main_scope = member_->local_scopes_[0]; + // the the initialize bcast, all vars would be bcast from device(0), otherwise + // bcast from the specified device. + bool initialize = builder_.get() == nullptr ? true : false; for (auto &var : vars) { - auto *main_var = main_scope->FindVar(var); + int var_dev_id = + builder_.get() == nullptr ? -1 : builder_->GetVarDeviceID(var); + if (!initialize && var_dev_id == -1) continue; + + framework::Variable *main_var = nullptr; + if (initialize) { + main_var = member_->local_scopes_[0]->FindVar(var); + } else { + main_var = member_->local_scopes_[var_dev_id]->FindVar(var); + } + if (main_var == nullptr || !main_var->IsType()) { continue; } @@ -151,7 +163,8 @@ void ParallelExecutor::BCastParamsToGPUs( for (size_t i = 0; i < member_->places_.size(); ++i) { auto place = member_->places_[i]; void *buffer; - if (i == 0) { + + if ((initialize && i == 0) || (!initialize && i == var_dev_id)) { buffer = const_cast(main_tensor.data()); } else { auto local_scope = member_->local_scopes_[i]; diff --git a/paddle/fluid/framework/parallel_executor.h b/paddle/fluid/framework/parallel_executor.h index 5247e790649e76567f4527d54499d6e95dac5c27..058f83f07c26224e3180d140630c08a24c40cd80 100644 --- a/paddle/fluid/framework/parallel_executor.h +++ b/paddle/fluid/framework/parallel_executor.h @@ -19,12 +19,14 @@ limitations under the License. */ #include #include #include "paddle/fluid/framework/details/execution_strategy.h" +#include "paddle/fluid/framework/details/multi_devices_graph_builder.h" #include "paddle/fluid/framework/executor.h" #include "paddle/fluid/framework/op_info.h" #include "paddle/fluid/framework/program_desc.h" #include "paddle/fluid/framework/scope.h" #include "paddle/fluid/framework/tensor.h" #include "paddle/fluid/platform/device_context.h" + namespace paddle { namespace framework { @@ -68,6 +70,7 @@ class ParallelExecutor { private: ParallelExecutorPrivate *member_; + std::unique_ptr builder_; }; } // namespace framework diff --git a/paddle/fluid/operators/CMakeLists.txt b/paddle/fluid/operators/CMakeLists.txt index d6a36eff09c7f70803d3be619b26d16660da1ec2..d3988ae16d7d4ceccaf01503c6200066f2fa4073 100644 --- a/paddle/fluid/operators/CMakeLists.txt +++ b/paddle/fluid/operators/CMakeLists.txt @@ -184,9 +184,9 @@ else() set(DEPS_OPS ${DEPS_OPS} nccl_op) endif() -add_subdirectory(detail) if(WITH_DISTRIBUTE) - + add_subdirectory(distributed) + set(DISTRIBUTE_DEPS "") if(WITH_GRPC) set(DISTRIBUTE_DEPS sendrecvop_grpc grpc++_unsecure grpc_unsecure gpr cares zlib protobuf) @@ -195,18 +195,11 @@ if(WITH_DISTRIBUTE) endif() set(DISTRIBUTE_COMPILE_FLAGS "-Wno-non-virtual-dtor -Wno-error=non-virtual-dtor -Wno-error=delete-non-virtual-dtor") - op_library(prefetch_op DEPS ${DISTRIBUTE_DEPS}) - set_source_files_properties(prefetch_op.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) - op_library(recv_op DEPS ${DISTRIBUTE_DEPS}) - set_source_files_properties(recv_op.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) - op_library(listen_and_serv_op DEPS ${DISTRIBUTE_DEPS}) - set_source_files_properties(listen_and_serv_op.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) - op_library(send_op DEPS ${DISTRIBUTE_DEPS}) - set_source_files_properties(send_op.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) - op_library(send_barrier_op DEPS ${DISTRIBUTE_DEPS}) - op_library(fetch_barrier_op DEPS ${DISTRIBUTE_DEPS}) - set_source_files_properties(send_barrier_op.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) - set_source_files_properties(fetch_barrier_op.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) + foreach(dist_op "prefetch_op" "listen_and_serv_op" "send_op" "recv_op" "send_barrier_op" "fetch_barrier_op") + op_library(${dist_op} DEPS ${DISTRIBUTE_DEPS}) + set_source_files_properties(${dist_op}.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) + endforeach() + #set_source_files_properties(send_recv_op_test.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) #cc_test(test_send_recv SRCS send_recv_op_test.cc DEPS prefetch_op send_op # listen_and_serv_op sum_op executor SERIAL) diff --git a/paddle/fluid/operators/detail/macros.h b/paddle/fluid/operators/detail/macros.h index da1de72dad00db3ffe609e17bd198ef0a56bbfcd..b9e385994efcea0388756e8bd780ebfc719ed08d 100644 --- a/paddle/fluid/operators/detail/macros.h +++ b/paddle/fluid/operators/detail/macros.h @@ -15,13 +15,13 @@ #pragma once #ifdef PADDLE_WITH_GRPC -#include "paddle/fluid/operators/detail/grpc_client.h" -#include "paddle/fluid/operators/detail/grpc_server.h" -#define RPCSERVER_T detail::AsyncGRPCServer -#define RPCCLIENT_T detail::GRPCClient +#include "paddle/fluid/operators/distributed/grpc_client.h" +#include "paddle/fluid/operators/distributed/grpc_server.h" +#define RPCSERVER_T distributed::AsyncGRPCServer +#define RPCCLIENT_T distributed::GRPCClient #else -#include "paddle/fluid/operators/detail/brpc_client.h" -#include "paddle/fluid/operators/detail/brpc_server.h" -#define RPCSERVER_T detail::AsyncBRPCServer -#define RPCCLIENT_T detail::BRPCClient +#include "paddle/fluid/operators/distributed/brpc_client.h" +#include "paddle/fluid/operators/distributed/brpc_server.h" +#define RPCSERVER_T distributed::AsyncBRPCServer +#define RPCCLIENT_T distributed::BRPCClient #endif diff --git a/paddle/fluid/operators/detail/CMakeLists.txt b/paddle/fluid/operators/distributed/CMakeLists.txt similarity index 97% rename from paddle/fluid/operators/detail/CMakeLists.txt rename to paddle/fluid/operators/distributed/CMakeLists.txt index abc5aad0430e71928a441c9488dda16dfdd63b9c..312f80e09077f21a47985c1c936c2ac41c292ead 100644 --- a/paddle/fluid/operators/detail/CMakeLists.txt +++ b/paddle/fluid/operators/distributed/CMakeLists.txt @@ -1,8 +1,3 @@ -if(NOT WITH_DISTRIBUTE) - return() -endif() - - if(WITH_GRPC) grpc_library(sendrecvop_grpc SRCS bytebuffer_stream.cc sendrecvop_utils.cc grpc_client.cc request_handler_impl.cc rpc_client.cc rpc_server.cc grpc_server.cc variable_response.cc PROTO send_recv.proto DEPS lod_tensor diff --git a/paddle/fluid/operators/detail/brpc_client.cc b/paddle/fluid/operators/distributed/brpc_client.cc similarity index 98% rename from paddle/fluid/operators/detail/brpc_client.cc rename to paddle/fluid/operators/distributed/brpc_client.cc index 9a4e410f1d83e93883438fae116c38eb60787673..b394c678fb6503eb73a1e11e6feb814251e9e940 100644 --- a/paddle/fluid/operators/detail/brpc_client.cc +++ b/paddle/fluid/operators/distributed/brpc_client.cc @@ -12,12 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include "paddle/fluid/operators/detail/brpc_client.h" +#include "paddle/fluid/operators/distributed/brpc_client.h" #include "paddle/fluid/framework/threadpool.h" namespace paddle { namespace operators { -namespace detail { +namespace distributed { DEFINE_int32(brpc_channel_num, 24, "Number of channels to send requests connected to one server"); @@ -175,6 +175,6 @@ ChannelQueuePtr BRPCClient::GetChannel(const std::string& ep) { return q; } -} // namespace detail +} // namespace distributed } // namespace operators } // namespace paddle diff --git a/paddle/fluid/operators/detail/brpc_client.h b/paddle/fluid/operators/distributed/brpc_client.h similarity index 94% rename from paddle/fluid/operators/detail/brpc_client.h rename to paddle/fluid/operators/distributed/brpc_client.h index 1e953ea431d51a9586bfd0b352c7f27d079ff1a8..34f140687f91d866536f5e2b647c7445a6624736 100644 --- a/paddle/fluid/operators/detail/brpc_client.h +++ b/paddle/fluid/operators/distributed/brpc_client.h @@ -31,13 +31,13 @@ limitations under the License. */ #include "paddle/fluid/framework/lod_tensor.h" #include "paddle/fluid/framework/scope.h" #include "paddle/fluid/framework/selected_rows.h" -#include "paddle/fluid/operators/detail/rpc_client.h" -#include "paddle/fluid/operators/detail/send_recv.pb.h" +#include "paddle/fluid/operators/distributed/rpc_client.h" +#include "paddle/fluid/operators/distributed/send_recv.pb.h" #include "paddle/fluid/platform/macros.h" // for DISABLE_COPY_AND_ASSIGN namespace paddle { namespace operators { -namespace detail { +namespace distributed { struct ChannelContext { brpc::Channel channel; @@ -95,6 +95,6 @@ class BRPCClient : public RPCClient { DISABLE_COPY_AND_ASSIGN(BRPCClient); }; -} // namespace detail +} // namespace distributed } // namespace operators } // namespace paddle diff --git a/paddle/fluid/operators/detail/brpc_server.cc b/paddle/fluid/operators/distributed/brpc_server.cc similarity index 86% rename from paddle/fluid/operators/detail/brpc_server.cc rename to paddle/fluid/operators/distributed/brpc_server.cc index 2170abe679f9ededff3b53e3139e56f8aad227cb..862167f02084cfe81db1c0936bbfb0415fa85721 100644 --- a/paddle/fluid/operators/detail/brpc_server.cc +++ b/paddle/fluid/operators/distributed/brpc_server.cc @@ -12,13 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include "paddle/fluid/operators/detail/brpc_server.h" -#include "paddle/fluid/operators/detail/request_handler.h" +#include "paddle/fluid/operators/distributed/brpc_server.h" +#include "paddle/fluid/operators/distributed/request_handler.h" namespace sendrecv { typedef std::unordered_map + paddle::operators::distributed::RequestHandler*> HandlerMap; class BRPCServiceImpl : public SendRecvService { @@ -27,17 +27,17 @@ class BRPCServiceImpl : public SendRecvService { : request_send_h_(nullptr), request_get_h_(nullptr), request_prefetch_h_(nullptr) { - auto it = rpc_call_map.find(paddle::operators::detail::kRequestSend); + auto it = rpc_call_map.find(paddle::operators::distributed::kRequestSend); if (it != rpc_call_map.end()) { request_send_h_ = it->second; } - it = rpc_call_map.find(paddle::operators::detail::kRequestSend); + it = rpc_call_map.find(paddle::operators::distributed::kRequestSend); if (it != rpc_call_map.end()) { request_get_h_ = it->second; } - it = rpc_call_map.find(paddle::operators::detail::kRequestPrefetch); + it = rpc_call_map.find(paddle::operators::distributed::kRequestPrefetch); if (it != rpc_call_map.end()) { request_prefetch_h_ = it->second; } @@ -88,15 +88,15 @@ class BRPCServiceImpl : public SendRecvService { } private: - paddle::operators::detail::RequestHandler* request_send_h_; - paddle::operators::detail::RequestHandler* request_get_h_; - paddle::operators::detail::RequestHandler* request_prefetch_h_; + paddle::operators::distributed::RequestHandler* request_send_h_; + paddle::operators::distributed::RequestHandler* request_get_h_; + paddle::operators::distributed::RequestHandler* request_prefetch_h_; }; } // namespace sendrecv namespace paddle { namespace operators { -namespace detail { +namespace distributed { void AsyncBRPCServer::StartServer() { // Instance of your service. @@ -139,6 +139,6 @@ void AsyncBRPCServer::WaitServerReady() { VLOG(3) << "AsyncGRPCServer WaitSeverReady"; } -}; // namespace detail +}; // namespace distributed }; // namespace operators }; // namespace paddle diff --git a/paddle/fluid/operators/detail/brpc_server.h b/paddle/fluid/operators/distributed/brpc_server.h similarity index 88% rename from paddle/fluid/operators/detail/brpc_server.h rename to paddle/fluid/operators/distributed/brpc_server.h index 0105c8074a46849031d8fa9c21a5507a982ec3c3..85a7ad0dfe843dad483d43631b69a79d75211ce9 100644 --- a/paddle/fluid/operators/detail/brpc_server.h +++ b/paddle/fluid/operators/distributed/brpc_server.h @@ -19,12 +19,12 @@ limitations under the License. */ #include #include "brpc/server.h" -#include "paddle/fluid/operators/detail/rpc_server.h" -#include "paddle/fluid/operators/detail/send_recv.pb.h" +#include "paddle/fluid/operators/distributed/rpc_server.h" +#include "paddle/fluid/operators/distributed/send_recv.pb.h" namespace paddle { namespace operators { -namespace detail { +namespace distributed { class AsyncBRPCServer final : public RPCServer { public: @@ -48,6 +48,6 @@ class AsyncBRPCServer final : public RPCServer { int ready_; }; -}; // namespace detail +}; // namespace distributed }; // namespace operators }; // namespace paddle diff --git a/paddle/fluid/operators/detail/bytebuffer_stream.cc b/paddle/fluid/operators/distributed/bytebuffer_stream.cc similarity index 94% rename from paddle/fluid/operators/detail/bytebuffer_stream.cc rename to paddle/fluid/operators/distributed/bytebuffer_stream.cc index a14171563edb0ac9a22b7ae493c965de3efb7823..6e91b447db838c9095432eda22e9e1171e938d31 100644 --- a/paddle/fluid/operators/detail/bytebuffer_stream.cc +++ b/paddle/fluid/operators/distributed/bytebuffer_stream.cc @@ -17,11 +17,11 @@ limitations under the License. */ // file and did some modifications so that we can send gRPC // requests without too much copying of the tensor data. -#include "paddle/fluid/operators/detail/bytebuffer_stream.h" +#include "paddle/fluid/operators/distributed/bytebuffer_stream.h" namespace paddle { namespace operators { -namespace detail { +namespace distributed { GrpcByteBufferSource::GrpcByteBufferSource() {} @@ -83,6 +83,6 @@ google::protobuf::int64 GrpcByteBufferSource::ByteCount() const { return byte_count_; } -} // namespace detail +} // namespace distributed } // namespace operators } // namespace paddle diff --git a/paddle/fluid/operators/detail/bytebuffer_stream.h b/paddle/fluid/operators/distributed/bytebuffer_stream.h similarity index 99% rename from paddle/fluid/operators/detail/bytebuffer_stream.h rename to paddle/fluid/operators/distributed/bytebuffer_stream.h index 054dd4ff294414cca55d7e033f2c5403bbb85526..e7de172c79c30761483b5d96f5bad19860208832 100644 --- a/paddle/fluid/operators/detail/bytebuffer_stream.h +++ b/paddle/fluid/operators/distributed/bytebuffer_stream.h @@ -106,7 +106,7 @@ class GrpcBufferReader final namespace paddle { namespace operators { -namespace detail { +namespace distributed { // Source provides a way for a particular RPC implementation to provide // received data to ParseFrom. class Source { @@ -183,6 +183,6 @@ class GrpcByteSource : public Source { char space_[sizeof(Reader)]; }; -} // namespace detail +} // namespace distributed } // namespace operators } // namespace paddle diff --git a/paddle/fluid/operators/detail/grpc_client.cc b/paddle/fluid/operators/distributed/grpc_client.cc similarity index 97% rename from paddle/fluid/operators/detail/grpc_client.cc rename to paddle/fluid/operators/distributed/grpc_client.cc index ea004f7cd340030e61571825941a50e89735ef05..65d63784c1486f3304c053a2bd3c58b8b30eda2f 100644 --- a/paddle/fluid/operators/detail/grpc_client.cc +++ b/paddle/fluid/operators/distributed/grpc_client.cc @@ -12,19 +12,19 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ -#include "paddle/fluid/operators/detail/grpc_client.h" +#include "paddle/fluid/operators/distributed/grpc_client.h" #include #include #include "paddle/fluid/framework/threadpool.h" -#include "paddle/fluid/operators/detail/request_handler.h" +#include "paddle/fluid/operators/distributed/request_handler.h" #include "paddle/fluid/platform/profiler.h" namespace paddle { namespace operators { -namespace detail { +namespace distributed { void GRPCClient::InitImpl() { InitEventLoop(); } @@ -276,6 +276,6 @@ std::shared_ptr GRPCClient::GetChannel(const std::string& ep) { return ch; } -} // namespace detail +} // namespace distributed } // namespace operators } // namespace paddle diff --git a/paddle/fluid/operators/detail/grpc_client.h b/paddle/fluid/operators/distributed/grpc_client.h similarity index 97% rename from paddle/fluid/operators/detail/grpc_client.h rename to paddle/fluid/operators/distributed/grpc_client.h index 44000c028b499d9ad1a0e0dd40a5e287cd61d143..a6efa7dfd11e87caafa6109391f133b0233d58dd 100644 --- a/paddle/fluid/operators/detail/grpc_client.h +++ b/paddle/fluid/operators/distributed/grpc_client.h @@ -38,13 +38,13 @@ limitations under the License. */ #include "paddle/fluid/framework/lod_tensor.h" #include "paddle/fluid/framework/scope.h" #include "paddle/fluid/framework/selected_rows.h" -#include "paddle/fluid/operators/detail/rpc_client.h" -#include "paddle/fluid/operators/detail/sendrecvop_utils.h" +#include "paddle/fluid/operators/distributed/rpc_client.h" +#include "paddle/fluid/operators/distributed/sendrecvop_utils.h" #include "paddle/fluid/platform/macros.h" // for DISABLE_COPY_AND_ASSIGN namespace paddle { namespace operators { -namespace detail { +namespace distributed { struct VarHandle { std::string ep; @@ -226,6 +226,6 @@ class GRPCClient : public RPCClient { DISABLE_COPY_AND_ASSIGN(GRPCClient); }; -} // namespace detail +} // namespace distributed } // namespace operators } // namespace paddle diff --git a/paddle/fluid/operators/detail/grpc_serde_test.cc b/paddle/fluid/operators/distributed/grpc_serde_test.cc similarity index 93% rename from paddle/fluid/operators/detail/grpc_serde_test.cc rename to paddle/fluid/operators/distributed/grpc_serde_test.cc index 15892295e6901fe649788c9e34604008fc8cbdfa..3d107b533bcb7bfef3f9b13ec99afbd579a62e52 100644 --- a/paddle/fluid/operators/detail/grpc_serde_test.cc +++ b/paddle/fluid/operators/distributed/grpc_serde_test.cc @@ -21,8 +21,8 @@ limitations under the License. */ #include "paddle/fluid/framework/lod_tensor.h" #include "paddle/fluid/framework/tensor_util.h" #include "paddle/fluid/framework/variable.h" -#include "paddle/fluid/operators/detail/sendrecvop_utils.h" -#include "paddle/fluid/operators/detail/variable_response.h" +#include "paddle/fluid/operators/distributed/sendrecvop_utils.h" +#include "paddle/fluid/operators/distributed/variable_response.h" #include "paddle/fluid/operators/math/math_function.h" #include "paddle/fluid/platform/place.h" #include "paddle/fluid/string/printf.h" @@ -50,7 +50,7 @@ void RunSerdeTestSelectedRows(platform::Place place) { for (int i = 0; i < 564; ++i) rows->push_back(i); ::grpc::ByteBuffer msg; - operators::detail::SerializeToByteBuffer("myvar", &var, ctx, &msg); + operators::distributed::SerializeToByteBuffer("myvar", &var, ctx, &msg); EXPECT_GT(msg.Length(), static_cast(0)); // deserialize @@ -81,10 +81,10 @@ void RunSerdeTestSelectedRows(platform::Place place) { // deserialize zero-copy // framework::Variable var2; - // operators::detail::DeserializeFromByteBuffer(msg, ctx, &var2); + // operators::distributed::DeserializeFromByteBuffer(msg, ctx, &var2); framework::Scope scope; scope.Var("myvar"); - operators::detail::VariableResponse resp(&scope, &ctx); + operators::distributed::VariableResponse resp(&scope, &ctx); EXPECT_EQ(resp.Parse(msg), 0); framework::Variable* var2 = resp.GetVar(); @@ -128,7 +128,7 @@ void RunTestLodTensor(platform::Place place, int from_type = 0) { math::set_constant(ctx, tensor, 31.9); ::grpc::ByteBuffer msg; - operators::detail::SerializeToByteBuffer("myvar", &var, ctx, &msg); + operators::distributed::SerializeToByteBuffer("myvar", &var, ctx, &msg); EXPECT_GT(msg.Length(), static_cast(0)); // deserialize @@ -171,7 +171,7 @@ void RunTestLodTensor(platform::Place place, int from_type = 0) { // deserialize zero-copy framework::Scope scope; scope.Var("myvar"); - operators::detail::VariableResponse resp(&scope, &ctx); + operators::distributed::VariableResponse resp(&scope, &ctx); if (from_type == 0) { EXPECT_EQ(resp.Parse(msg), 0); } else { diff --git a/paddle/fluid/operators/detail/grpc_server.cc b/paddle/fluid/operators/distributed/grpc_server.cc similarity index 96% rename from paddle/fluid/operators/detail/grpc_server.cc rename to paddle/fluid/operators/distributed/grpc_server.cc index 5a87258901c6563fe793d4041f344011a56d9a01..707f665a29d83d7cdf6e4e80624f2402a7b0a2e7 100644 --- a/paddle/fluid/operators/detail/grpc_server.cc +++ b/paddle/fluid/operators/distributed/grpc_server.cc @@ -15,13 +15,13 @@ limitations under the License. */ #include #include -#include "paddle/fluid/operators/detail/grpc_server.h" +#include "paddle/fluid/operators/distributed/grpc_server.h" using ::grpc::ServerAsyncResponseWriter; namespace paddle { namespace operators { -namespace detail { +namespace distributed { enum CallStatus { PROCESS = 0, FINISH }; // reference: @@ -74,7 +74,7 @@ class RequestSend final : public RequestBase { request_.reset(new VariableResponse(request_handler->scope(), request_handler->dev_ctx(), !request_handler->sync_mode())); - int method_id = static_cast(detail::GrpcMethod::kSendVariable); + int method_id = static_cast(distributed::GrpcMethod::kSendVariable); service_->RequestAsyncUnary( method_id, &ctx_, request_.get(), &responder_, cq_, cq_, reinterpret_cast(static_cast(req_id))); @@ -106,7 +106,7 @@ class RequestGet final : public RequestBase { ::grpc::ServerCompletionQueue* cq, RequestHandler* request_handler, int req_id) : RequestBase(service, cq, request_handler, req_id), responder_(&ctx_) { - auto method_id = static_cast(detail::GrpcMethod::kGetVariable); + auto method_id = static_cast(distributed::GrpcMethod::kGetVariable); service_->RequestAsyncUnary( method_id, &ctx_, &request_, &responder_, cq_, cq_, reinterpret_cast(static_cast(req_id))); @@ -150,7 +150,8 @@ class RequestPrefetch final : public RequestBase { local_scope_(nullptr) { request_.reset(new VariableResponse(request_handler->scope(), request_handler->dev_ctx(), true)); - int method_id = static_cast(detail::GrpcMethod::kPrefetchVariable); + int method_id = + static_cast(distributed::GrpcMethod::kPrefetchVariable); service_->RequestAsyncUnary( method_id, &ctx_, request_.get(), &responder_, cq_, cq_, reinterpret_cast(static_cast(req_id))); @@ -354,6 +355,6 @@ void AsyncGRPCServer::HandleRequest( } } -} // namespace detail +} // namespace distributed } // namespace operators } // namespace paddle diff --git a/paddle/fluid/operators/detail/grpc_server.h b/paddle/fluid/operators/distributed/grpc_server.h similarity index 85% rename from paddle/fluid/operators/detail/grpc_server.h rename to paddle/fluid/operators/distributed/grpc_server.h index f1db7590f6f14d5d44acc12453861a446e278cd2..d2524f5e65db6dedab78f45e17380359b58a3d11 100644 --- a/paddle/fluid/operators/detail/grpc_server.h +++ b/paddle/fluid/operators/distributed/grpc_server.h @@ -29,17 +29,17 @@ limitations under the License. */ #include "paddle/fluid/framework/scope.h" #include "paddle/fluid/framework/selected_rows.h" #include "paddle/fluid/framework/var_type.h" -#include "paddle/fluid/operators/detail/grpc_service.h" -#include "paddle/fluid/operators/detail/request_handler.h" -#include "paddle/fluid/operators/detail/rpc_server.h" -#include "paddle/fluid/operators/detail/send_recv.grpc.pb.h" -#include "paddle/fluid/operators/detail/send_recv.pb.h" -#include "paddle/fluid/operators/detail/sendrecvop_utils.h" +#include "paddle/fluid/operators/distributed/grpc_service.h" +#include "paddle/fluid/operators/distributed/request_handler.h" +#include "paddle/fluid/operators/distributed/rpc_server.h" +#include "paddle/fluid/operators/distributed/send_recv.grpc.pb.h" +#include "paddle/fluid/operators/distributed/send_recv.pb.h" +#include "paddle/fluid/operators/distributed/sendrecvop_utils.h" #include "paddle/fluid/platform/profiler.h" namespace paddle { namespace operators { -namespace detail { +namespace distributed { class RequestBase; @@ -84,6 +84,6 @@ class AsyncGRPCServer final : public RPCServer { std::map> rpc_reqs_; }; -}; // namespace detail +}; // namespace distributed }; // namespace operators }; // namespace paddle diff --git a/paddle/fluid/operators/detail/grpc_service.h b/paddle/fluid/operators/distributed/grpc_service.h similarity index 87% rename from paddle/fluid/operators/detail/grpc_service.h rename to paddle/fluid/operators/distributed/grpc_service.h index e0505c2b9d0903837713d7e0032b01ab091c2e04..141be3e68012743a32e4df5de148a55717f8e9a2 100644 --- a/paddle/fluid/operators/detail/grpc_service.h +++ b/paddle/fluid/operators/distributed/grpc_service.h @@ -23,7 +23,7 @@ #include #include #include -#include "paddle/fluid/operators/detail/variable_response.h" +#include "paddle/fluid/operators/distributed/variable_response.h" #include "paddle/fluid/platform/profiler.h" @@ -42,24 +42,25 @@ class ServerContext; // Support parsing/unparsing of tensorflow::VariableResponse. // Wire-format is identical to RecvVariableResponse. template <> -class SerializationTraits { +class SerializationTraits { public: static Status Serialize( - const paddle::operators::detail::VariableResponse& msg, + const paddle::operators::distributed::VariableResponse& msg, grpc_byte_buffer** bp, bool* own_buffer) { PADDLE_ENFORCE(false, "SerializationTraits::Serialize not implemented!"); return Status(); } - static Status Deserialize(grpc_byte_buffer* buffer, - paddle::operators::detail::VariableResponse* msg, - int max_message_size = INT_MAX) { + static Status Deserialize( + grpc_byte_buffer* buffer, + paddle::operators::distributed::VariableResponse* msg, + int max_message_size = INT_MAX) { if (buffer == nullptr) { return Status(StatusCode::INTERNAL, "No payload"); } Status result = g_core_codegen_interface->ok(); if (result.ok()) { - paddle::operators::detail::GrpcByteSource source(buffer); + paddle::operators::distributed::GrpcByteSource source(buffer); int ret = msg->Parse(&source); if (ret != 0) { result = Status(StatusCode::INTERNAL, "VariableResponse parse error"); @@ -73,7 +74,7 @@ class SerializationTraits { namespace paddle { namespace operators { -namespace detail { +namespace distributed { enum class GrpcMethod { kSendVariable, @@ -118,6 +119,6 @@ class GrpcService final { }; }; -} // namespace detail +} // namespace distributed } // namespace operators } // namespace paddle diff --git a/paddle/fluid/operators/detail/proto_encoder_helper.h b/paddle/fluid/operators/distributed/proto_encoder_helper.h similarity index 98% rename from paddle/fluid/operators/detail/proto_encoder_helper.h rename to paddle/fluid/operators/distributed/proto_encoder_helper.h index d91d054b2507f32d1e948dde33da06a70cabe775..2fab02e32fe18ee04f86a69bb5bae1cbe7c6762c 100644 --- a/paddle/fluid/operators/detail/proto_encoder_helper.h +++ b/paddle/fluid/operators/distributed/proto_encoder_helper.h @@ -26,7 +26,7 @@ limitations under the License. */ namespace paddle { namespace operators { -namespace detail { +namespace distributed { char* EncodeVarint32(char* dst, uint32_t v) { // Operate on characters as unsigneds @@ -144,6 +144,6 @@ class ProtoEncodeHelper { char* limit_; // Just for CHECKs }; -} // namespace detail +} // namespace distributed } // namespace operators } // namespace paddle diff --git a/paddle/fluid/operators/detail/request_handler.h b/paddle/fluid/operators/distributed/request_handler.h similarity index 98% rename from paddle/fluid/operators/detail/request_handler.h rename to paddle/fluid/operators/distributed/request_handler.h index a2d08747d59220d30a5b8fd56074fd2739ae3bab..cf106656aa56c2130d8be8dbe7478c3397f9b9ad 100644 --- a/paddle/fluid/operators/detail/request_handler.h +++ b/paddle/fluid/operators/distributed/request_handler.h @@ -31,7 +31,7 @@ namespace paddle { namespace operators { -namespace detail { +namespace distributed { constexpr char kRequestSend[] = "RequestSend"; constexpr char kRequestGet[] = "RequestGet"; @@ -124,6 +124,6 @@ class RequestHandler { RPCServer* rpc_server_; }; -} // namespace detail +} // namespace distributed } // namespace operators } // namespace paddle diff --git a/paddle/fluid/operators/detail/request_handler_impl.cc b/paddle/fluid/operators/distributed/request_handler_impl.cc similarity index 95% rename from paddle/fluid/operators/detail/request_handler_impl.cc rename to paddle/fluid/operators/distributed/request_handler_impl.cc index 7425bee798cd9ba0af8cd777a6db63862c8a4031..cb78c15c01e8e7f47ec759a75090f9a6b880b493 100644 --- a/paddle/fluid/operators/detail/request_handler_impl.cc +++ b/paddle/fluid/operators/distributed/request_handler_impl.cc @@ -20,12 +20,12 @@ #include "paddle/fluid/framework/lod_tensor.h" #include "paddle/fluid/framework/scope.h" #include "paddle/fluid/framework/selected_rows.h" -#include "paddle/fluid/operators/detail/request_handler_impl.h" -#include "paddle/fluid/operators/detail/rpc_server.h" +#include "paddle/fluid/operators/distributed/request_handler_impl.h" +#include "paddle/fluid/operators/distributed/rpc_server.h" namespace paddle { namespace operators { -namespace detail { +namespace distributed { bool RequestSendHandler::Handle(const std::string& varname, framework::Scope* scope, @@ -119,6 +119,6 @@ bool RequestPrefetchHandler::Handle(const std::string& varname, return true; } -} // namespace detail +} // namespace distributed } // namespace operators } // namespace paddle diff --git a/paddle/fluid/operators/detail/request_handler_impl.h b/paddle/fluid/operators/distributed/request_handler_impl.h similarity index 95% rename from paddle/fluid/operators/detail/request_handler_impl.h rename to paddle/fluid/operators/distributed/request_handler_impl.h index 3f77c09a9598b431d747f1b824615e49d939098e..abbe8778911a21ece3090bc9790d51a3cb31b6d7 100644 --- a/paddle/fluid/operators/detail/request_handler_impl.h +++ b/paddle/fluid/operators/distributed/request_handler_impl.h @@ -28,11 +28,11 @@ #include "paddle/fluid/framework/scope.h" #include "paddle/fluid/framework/selected_rows.h" #include "paddle/fluid/framework/var_type.h" -#include "paddle/fluid/operators/detail/request_handler.h" +#include "paddle/fluid/operators/distributed/request_handler.h" namespace paddle { namespace operators { -namespace detail { +namespace distributed { class RequestSendHandler final : public RequestHandler { public: @@ -66,6 +66,6 @@ class RequestPrefetchHandler final : public RequestHandler { const std::string& out_var_name = "") override; }; -} // namespace detail +} // namespace distributed } // namespace operators } // namespace paddle diff --git a/paddle/fluid/operators/detail/rpc_client.cc b/paddle/fluid/operators/distributed/rpc_client.cc similarity index 88% rename from paddle/fluid/operators/detail/rpc_client.cc rename to paddle/fluid/operators/distributed/rpc_client.cc index 9a791403e3d6b99c5d4de5183e83e1af655d7d4c..c71edf977c18e554c502732e9bf4bb4ea99f8f99 100644 --- a/paddle/fluid/operators/detail/rpc_client.cc +++ b/paddle/fluid/operators/distributed/rpc_client.cc @@ -12,15 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include "paddle/fluid/operators/detail/rpc_client.h" +#include "paddle/fluid/operators/distributed/rpc_client.h" namespace paddle { namespace operators { -namespace detail { +namespace distributed { std::once_flag RPCClient::init_flag_; std::unique_ptr RPCClient::rpc_client_(nullptr); -} // namespace detail +} // namespace distributed } // namespace operators } // namespace paddle diff --git a/paddle/fluid/operators/detail/rpc_client.h b/paddle/fluid/operators/distributed/rpc_client.h similarity index 98% rename from paddle/fluid/operators/detail/rpc_client.h rename to paddle/fluid/operators/distributed/rpc_client.h index 47c6ffb4fd7a002fc0bd8053fb3314a2fbf18fd3..72fa6d940886bc676e9d03d13f12d07772f5f5a7 100644 --- a/paddle/fluid/operators/detail/rpc_client.h +++ b/paddle/fluid/operators/distributed/rpc_client.h @@ -22,7 +22,7 @@ namespace paddle { namespace operators { -namespace detail { +namespace distributed { class RPCClient { public: @@ -84,6 +84,6 @@ class RPCClient { static std::once_flag init_flag_; static std::unique_ptr rpc_client_; }; -} // namespace detail +} // namespace distributed } // namespace operators } // namespace paddle diff --git a/paddle/fluid/operators/detail/rpc_server.cc b/paddle/fluid/operators/distributed/rpc_server.cc similarity index 96% rename from paddle/fluid/operators/detail/rpc_server.cc rename to paddle/fluid/operators/distributed/rpc_server.cc index cd0fe96e2301ee3304fe9a2967df58b9f7072d8d..fa0cb71b3056de92f65139c5402132fc8cbb7a87 100644 --- a/paddle/fluid/operators/detail/rpc_server.cc +++ b/paddle/fluid/operators/distributed/rpc_server.cc @@ -17,11 +17,11 @@ #include #include -#include "paddle/fluid/operators/detail/rpc_server.h" +#include "paddle/fluid/operators/distributed/rpc_server.h" namespace paddle { namespace operators { -namespace detail { +namespace distributed { void RPCServer::ShutDown() { LOG(INFO) << "RPCServer ShutDown "; @@ -112,6 +112,6 @@ void RPCServer::WaitCond(const std::string& rpc_name) { lock, [=] { return (cur_cond_.load() == cond || exit_flag_.load()); }); } -} // namespace detail +} // namespace distributed } // namespace operators } // namespace paddle diff --git a/paddle/fluid/operators/detail/rpc_server.h b/paddle/fluid/operators/distributed/rpc_server.h similarity index 95% rename from paddle/fluid/operators/detail/rpc_server.h rename to paddle/fluid/operators/distributed/rpc_server.h index 2e3342428cb56c34abaca655d5906668cda8f140..cf25e78435bb470b25a46db647ca818571cc83a5 100644 --- a/paddle/fluid/operators/detail/rpc_server.h +++ b/paddle/fluid/operators/distributed/rpc_server.h @@ -19,11 +19,11 @@ #include // NOLINT #include #include -#include "paddle/fluid/operators/detail/request_handler.h" +#include "paddle/fluid/operators/distributed/request_handler.h" namespace paddle { namespace operators { -namespace detail { +namespace distributed { class RPCServer { public: @@ -86,6 +86,6 @@ class RPCServer { friend class RequestHandler; }; -}; // namespace detail +}; // namespace distributed }; // namespace operators }; // namespace paddle diff --git a/paddle/fluid/operators/detail/rpc_server_test.cc b/paddle/fluid/operators/distributed/rpc_server_test.cc similarity index 87% rename from paddle/fluid/operators/detail/rpc_server_test.cc rename to paddle/fluid/operators/distributed/rpc_server_test.cc index 463a7b80cfac280de5afe91ee85caaaf074cef32..a0693cffabcc561b0adfafc2c49027a890dd5efc 100644 --- a/paddle/fluid/operators/detail/rpc_server_test.cc +++ b/paddle/fluid/operators/distributed/rpc_server_test.cc @@ -22,18 +22,18 @@ limitations under the License. */ #include "paddle/fluid/framework/operator.h" #include "paddle/fluid/operators/detail/macros.h" -#include "paddle/fluid/operators/detail/request_handler_impl.h" -#include "paddle/fluid/operators/detail/rpc_client.h" -#include "paddle/fluid/operators/detail/rpc_server.h" +#include "paddle/fluid/operators/distributed/request_handler_impl.h" +#include "paddle/fluid/operators/distributed/rpc_client.h" +#include "paddle/fluid/operators/distributed/rpc_server.h" namespace framework = paddle::framework; namespace platform = paddle::platform; -namespace detail = paddle::operators::detail; +namespace distributed = paddle::operators::distributed; USE_OP(lookup_table); -std::unique_ptr g_rpc_service; -std::unique_ptr g_req_handler; +std::unique_ptr g_rpc_service; +std::unique_ptr g_req_handler; framework::BlockDesc* AppendPrefetchBlcok(framework::ProgramDesc* program) { auto root_block = program->MutableBlock(0); @@ -113,19 +113,21 @@ void StartServer() { g_req_handler->SetScope(&scope); g_req_handler->SetExecutor(&exe); - g_rpc_service->RegisterRPC(detail::kRequestPrefetch, g_req_handler.get()); + g_rpc_service->RegisterRPC(distributed::kRequestPrefetch, + g_req_handler.get()); g_req_handler->SetRPCServer(g_rpc_service.get()); std::thread server_thread( - std::bind(&detail::RPCServer::StartServer, g_rpc_service.get())); + std::bind(&distributed::RPCServer::StartServer, g_rpc_service.get())); server_thread.join(); } TEST(PREFETCH, CPU) { - g_req_handler.reset(new detail::RequestPrefetchHandler(true)); + g_req_handler.reset(new distributed::RequestPrefetchHandler(true)); g_rpc_service.reset(new RPCSERVER_T("127.0.0.1:0", 1)); - detail::RPCClient* client = detail::RPCClient::GetInstance(); + distributed::RPCClient* client = + distributed::RPCClient::GetInstance(); std::thread server_thread(StartServer); g_rpc_service->WaitServerReady(); diff --git a/paddle/fluid/operators/detail/send_recv.proto b/paddle/fluid/operators/distributed/send_recv.proto similarity index 100% rename from paddle/fluid/operators/detail/send_recv.proto rename to paddle/fluid/operators/distributed/send_recv.proto diff --git a/paddle/fluid/operators/detail/sendrecvop_utils.cc b/paddle/fluid/operators/distributed/sendrecvop_utils.cc similarity index 95% rename from paddle/fluid/operators/detail/sendrecvop_utils.cc rename to paddle/fluid/operators/distributed/sendrecvop_utils.cc index 507b465435609a91ebca97dd70b176c3b79bee02..98129d9f1014c39347e3409533f2bc10092611d2 100644 --- a/paddle/fluid/operators/detail/sendrecvop_utils.cc +++ b/paddle/fluid/operators/distributed/sendrecvop_utils.cc @@ -12,7 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ -#include "paddle/fluid/operators/detail/sendrecvop_utils.h" +#include "paddle/fluid/operators/distributed/sendrecvop_utils.h" #ifdef PADDLE_WITH_CUDA #include @@ -23,14 +23,14 @@ limitations under the License. */ #include "google/protobuf/io/coded_stream.h" #include "google/protobuf/io/zero_copy_stream.h" #include "paddle/fluid/framework/data_type.h" -#include "paddle/fluid/operators/detail/bytebuffer_stream.h" -#include "paddle/fluid/operators/detail/proto_encoder_helper.h" -#include "paddle/fluid/operators/detail/variable_response.h" +#include "paddle/fluid/operators/distributed/bytebuffer_stream.h" +#include "paddle/fluid/operators/distributed/proto_encoder_helper.h" +#include "paddle/fluid/operators/distributed/variable_response.h" #include "paddle/fluid/platform/profiler.h" namespace paddle { namespace operators { -namespace detail { +namespace distributed { using VarMsg = sendrecv::VariableMessage; @@ -222,11 +222,11 @@ void DeserializeFromByteBuffer(const ::grpc::ByteBuffer& msg, const platform::DeviceContext& ctx, const framework::Scope* scope, framework::Variable** var) { - operators::detail::VariableResponse resp(scope, &ctx); + operators::distributed::VariableResponse resp(scope, &ctx); PADDLE_ENFORCE(resp.Parse(msg) == 0, "parse bytebuffer to tensor error!"); *var = resp.GetVar(); } -} // namespace detail +} // namespace distributed } // namespace operators } // namespace paddle diff --git a/paddle/fluid/operators/detail/sendrecvop_utils.h b/paddle/fluid/operators/distributed/sendrecvop_utils.h similarity index 92% rename from paddle/fluid/operators/detail/sendrecvop_utils.h rename to paddle/fluid/operators/distributed/sendrecvop_utils.h index bd16bf1dab8d933ffd18b6d6d9e3ce1c7d73029b..fe25e73fa608727ba0bb912a82776b330ec8d83a 100644 --- a/paddle/fluid/operators/detail/sendrecvop_utils.h +++ b/paddle/fluid/operators/distributed/sendrecvop_utils.h @@ -25,12 +25,12 @@ limitations under the License. */ #include "paddle/fluid/framework/tensor_util.h" #include "paddle/fluid/framework/var_type.h" -#include "paddle/fluid/operators/detail/send_recv.grpc.pb.h" -#include "paddle/fluid/operators/detail/send_recv.pb.h" +#include "paddle/fluid/operators/distributed/send_recv.grpc.pb.h" +#include "paddle/fluid/operators/distributed/send_recv.pb.h" namespace paddle { namespace operators { -namespace detail { +namespace distributed { typedef void (*DestroyCallback)(void*); @@ -61,6 +61,6 @@ inline std::type_index ToTypeIndex(sendrecv::VariableMessage::Type type) { } } -} // namespace detail +} // namespace distributed } // namespace operators } // namespace paddle diff --git a/paddle/fluid/operators/detail/variable_response.cc b/paddle/fluid/operators/distributed/variable_response.cc similarity index 96% rename from paddle/fluid/operators/detail/variable_response.cc rename to paddle/fluid/operators/distributed/variable_response.cc index 24cb91a3bb820a0e5d51aaa49154434919080f69..619890b1939be8777b89b94e415a3c2d63376658 100644 --- a/paddle/fluid/operators/detail/variable_response.cc +++ b/paddle/fluid/operators/distributed/variable_response.cc @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include "paddle/fluid/operators/detail/variable_response.h" +#include "paddle/fluid/operators/distributed/variable_response.h" #include #include @@ -22,12 +22,12 @@ #endif #include "paddle/fluid/platform/profiler.h" -#include "paddle/fluid/operators/detail/send_recv.pb.h" -#include "paddle/fluid/operators/detail/sendrecvop_utils.h" +#include "paddle/fluid/operators/distributed/send_recv.pb.h" +#include "paddle/fluid/operators/distributed/sendrecvop_utils.h" namespace paddle { namespace operators { -namespace detail { +namespace distributed { enum WireType { WIRETYPE_VARINT = 0, @@ -158,13 +158,13 @@ bool VariableResponse::CopySelectRowsTensorData( slr->set_height(meta_.slr_height()); auto* tensor = slr->mutable_value(); tensor->Resize(dims); - PADDLE_ENFORCE_EQ( - static_cast(tensor->numel()), - length / framework::SizeOfType( - paddle::operators::detail::ToTypeIndex(meta_.data_type()))); + PADDLE_ENFORCE_EQ(static_cast(tensor->numel()), + length / framework::SizeOfType( + paddle::operators::distributed::ToTypeIndex( + meta_.data_type()))); void* tensor_data = tensor->mutable_data( ctx.GetPlace(), - paddle::operators::detail::ToTypeIndex(meta_.data_type())); + paddle::operators::distributed::ToTypeIndex(meta_.data_type())); if (!ReadRaw(input, ctx, tensor->place(), tensor_data, length)) { return false; @@ -480,6 +480,6 @@ int VariableResponse::Parse(Source* source) { return 0; } -}; // namespace detail +}; // namespace distributed }; // namespace operators }; // namespace paddle diff --git a/paddle/fluid/operators/detail/variable_response.h b/paddle/fluid/operators/distributed/variable_response.h similarity index 92% rename from paddle/fluid/operators/detail/variable_response.h rename to paddle/fluid/operators/distributed/variable_response.h index 69cfd784f8dd4f129f50c6882061e53e8535b949..1db4a0a522654ff2497b8bd9ee1381b5ab64067a 100644 --- a/paddle/fluid/operators/detail/variable_response.h +++ b/paddle/fluid/operators/distributed/variable_response.h @@ -22,17 +22,17 @@ #include "paddle/fluid/framework/selected_rows.h" #include "paddle/fluid/framework/var_type.h" -#include "paddle/fluid/operators/detail/send_recv.grpc.pb.h" -#include "paddle/fluid/operators/detail/send_recv.pb.h" +#include "paddle/fluid/operators/distributed/send_recv.grpc.pb.h" +#include "paddle/fluid/operators/distributed/send_recv.pb.h" #include "google/protobuf/io/coded_stream.h" #include "google/protobuf/io/zero_copy_stream.h" #include "paddle/fluid/framework/tensor.h" -#include "paddle/fluid/operators/detail/bytebuffer_stream.h" +#include "paddle/fluid/operators/distributed/bytebuffer_stream.h" namespace paddle { namespace operators { -namespace detail { +namespace distributed { class VariableResponse { public: @@ -99,6 +99,6 @@ class VariableResponse { sendrecv::VariableMessage meta_; }; -}; // namespace detail +}; // namespace distributed }; // namespace operators }; // namespace paddle diff --git a/paddle/fluid/operators/fetch_barrier_op.cc b/paddle/fluid/operators/fetch_barrier_op.cc index 98b051afb551f373009d2bd3df1a8daa64b7e6c7..02beb80fc8a9f451393dcdd54492c4f88f908497 100644 --- a/paddle/fluid/operators/fetch_barrier_op.cc +++ b/paddle/fluid/operators/fetch_barrier_op.cc @@ -42,8 +42,8 @@ class FetchBarrierOp : public framework::OperatorBase { // For profiling platform::RecordEvent record_event(Type(), &ctx); - detail::RPCClient* rpc_client = - detail::RPCClient::GetInstance(); + distributed::RPCClient* rpc_client = + distributed::RPCClient::GetInstance(); rpc_client->Wait(); diff --git a/paddle/fluid/operators/gen_nccl_id_op.cc b/paddle/fluid/operators/gen_nccl_id_op.cc index f824eee4e7d1ef19c9a38fd5d3369265f9c549a0..697c239e59d158428ae9ba9f7feded19637dff28 100644 --- a/paddle/fluid/operators/gen_nccl_id_op.cc +++ b/paddle/fluid/operators/gen_nccl_id_op.cc @@ -22,7 +22,7 @@ limitations under the License. */ #include "paddle/fluid/framework/op_registry.h" #include "paddle/fluid/framework/threadpool.h" #include "paddle/fluid/operators/detail/macros.h" -#include "paddle/fluid/operators/detail/request_handler_impl.h" +#include "paddle/fluid/operators/distributed/request_handler_impl.h" #include "paddle/fluid/platform/nccl_helper.h" namespace paddle { @@ -60,7 +60,8 @@ class GenNCCLIdOp : public framework::OperatorBase { std::vector endpoint_list = Attr>("endpoint_list"); - detail::RPCClient* client = detail::RPCClient::GetInstance(); + distributed::RPCClient* client = + distributed::RPCClient::GetInstance(); for (auto& ep : endpoint_list) { VLOG(3) << "sending nccl id to " << ep; @@ -80,11 +81,11 @@ class GenNCCLIdOp : public framework::OperatorBase { // NOTE: Can not use unique_ptr here because the default // deleter will call GRPC Server's base class's dtor and // that will cause a wired crash. - detail::RequestSendHandler rpc_h(true); - std::unique_ptr rpc_service( + distributed::RequestSendHandler rpc_h(true); + std::unique_ptr rpc_service( new RPCSERVER_T(endpoint, 1)); - rpc_service->RegisterRPC(detail::kRequestSend, &rpc_h); + rpc_service->RegisterRPC(distributed::kRequestSend, &rpc_h); rpc_h.SetRPCServer(rpc_service.get()); framework::ProgramDesc empty_program; @@ -95,11 +96,11 @@ class GenNCCLIdOp : public framework::OperatorBase { rpc_h.SetExecutor(&executor); std::thread server_thread( - std::bind(&detail::RPCServer::StartServer, rpc_service.get())); + std::bind(&distributed::RPCServer::StartServer, rpc_service.get())); - rpc_service->SetCond(detail::kRequestSend); + rpc_service->SetCond(distributed::kRequestSend); VLOG(3) << "start getting nccl id from trainer 0..."; - rpc_service->WaitBarrier(detail::kRequestSend); + rpc_service->WaitBarrier(distributed::kRequestSend); VLOG(3) << "got nccl id and stop server..."; rpc_service->ShutDown(); VLOG(3) << "rpc server stopped"; diff --git a/paddle/fluid/operators/listen_and_serv_op.cc b/paddle/fluid/operators/listen_and_serv_op.cc index 2fbd62505d7f144c962119600f464788b41abeed..0f2863cc5991224136175813ef93b9bea4bd0712 100644 --- a/paddle/fluid/operators/listen_and_serv_op.cc +++ b/paddle/fluid/operators/listen_and_serv_op.cc @@ -21,14 +21,14 @@ limitations under the License. */ #include "paddle/fluid/operators/detail/macros.h" -#include "paddle/fluid/operators/detail/request_handler_impl.h" +#include "paddle/fluid/operators/distributed/request_handler_impl.h" #include "paddle/fluid/operators/listen_and_serv_op.h" #include "paddle/fluid/platform/profiler.h" namespace paddle { namespace operators { -void RunServer(std::shared_ptr service) { +void RunServer(std::shared_ptr service) { service->StartServer(); VLOG(4) << "RunServer thread end"; } @@ -119,12 +119,12 @@ void ListenAndServOp::RunSyncLoop( while (true) { // Get from multiple trainers, we don't care about the order in which // the gradients arrives, just add suffix 0~n and merge the gradient. - rpc_service_->SetCond(detail::kRequestSend); - rpc_service_->WaitBarrier(detail::kRequestSend); + rpc_service_->SetCond(distributed::kRequestSend); + rpc_service_->WaitBarrier(distributed::kRequestSend); if (rpc_service_->IsExit()) { LOG(WARNING) << "get exit!rpc_processor break!"; - rpc_service_->SetCond(detail::kRequestGet); + rpc_service_->SetCond(distributed::kRequestGet); break; } @@ -152,11 +152,11 @@ void ListenAndServOp::RunSyncLoop( recv_scope); VLOG(2) << "run all blocks spent " << GetTimestamp() - ts << "(ms)"; - rpc_service_->SetCond(detail::kRequestGet); - rpc_service_->WaitBarrier(detail::kRequestGet); + rpc_service_->SetCond(distributed::kRequestGet); + rpc_service_->WaitBarrier(distributed::kRequestGet); rpc_service_->ResetBarrierCounter(); // reset received sparse vars to avoid reuse it in the next mini-batch - dynamic_cast(request_send_handler_.get()) + dynamic_cast(request_send_handler_.get()) ->ResetSparseVarRecorder(); } // while(true) } @@ -213,13 +213,13 @@ void ListenAndServOp::RunAsyncLoop(framework::Executor *executor, } static void FillRequestCtx( - detail::RequestHandler *h, framework::Scope *scope, + distributed::RequestHandler *h, framework::Scope *scope, platform::DeviceContext *dev_ctx, framework::Executor *executor, framework::ProgramDesc *program, std::unordered_map> *prefetch_ctx, - detail::RPCServer *rpc_server) { + distributed::RPCServer *rpc_server) { h->SetScope(scope); h->SetDevCtx(dev_ctx); h->SetExecutor(executor); @@ -247,14 +247,16 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope, rpc_service_.reset(new RPCSERVER_T(endpoint, fan_in)); - request_send_handler_.reset(new detail::RequestSendHandler(sync_mode)); - request_get_handler_.reset(new detail::RequestGetHandler(sync_mode)); + request_send_handler_.reset(new distributed::RequestSendHandler(sync_mode)); + request_get_handler_.reset(new distributed::RequestGetHandler(sync_mode)); request_prefetch_handler_.reset( - new detail::RequestPrefetchHandler(sync_mode)); + new distributed::RequestPrefetchHandler(sync_mode)); - rpc_service_->RegisterRPC(detail::kRequestSend, request_send_handler_.get()); - rpc_service_->RegisterRPC(detail::kRequestGet, request_get_handler_.get()); - rpc_service_->RegisterRPC(detail::kRequestPrefetch, + rpc_service_->RegisterRPC(distributed::kRequestSend, + request_send_handler_.get()); + rpc_service_->RegisterRPC(distributed::kRequestGet, + request_get_handler_.get()); + rpc_service_->RegisterRPC(distributed::kRequestPrefetch, request_prefetch_handler_.get()); auto optimize_blocks = diff --git a/paddle/fluid/operators/listen_and_serv_op.h b/paddle/fluid/operators/listen_and_serv_op.h index 55da61e34a67c2ae859368e1671182807e0ad832..634c1b4f4b541be9f4950a9ef48f944863486705 100644 --- a/paddle/fluid/operators/listen_and_serv_op.h +++ b/paddle/fluid/operators/listen_and_serv_op.h @@ -24,8 +24,8 @@ limitations under the License. */ #include "paddle/fluid/framework/lod_tensor.h" #include "paddle/fluid/framework/op_registry.h" #include "paddle/fluid/framework/threadpool.h" -#include "paddle/fluid/operators/detail/request_handler.h" -#include "paddle/fluid/operators/detail/rpc_server.h" +#include "paddle/fluid/operators/distributed/request_handler.h" +#include "paddle/fluid/operators/distributed/rpc_server.h" namespace paddle { namespace operators { @@ -33,7 +33,7 @@ namespace operators { constexpr char kOptimizeBlocks[] = "optimize_blocks"; constexpr char kPrefetchVarNameToBlockId[] = "prefetch_var_name_to_block_id"; -void RunServer(std::shared_ptr service); +void RunServer(std::shared_ptr service); class ListenAndServOp : public framework::OperatorBase { public: @@ -62,10 +62,11 @@ class ListenAndServOp : public framework::OperatorBase { const platform::Place& dev_place) const override; protected: - mutable std::shared_ptr rpc_service_; - mutable std::shared_ptr request_send_handler_; - mutable std::shared_ptr request_get_handler_; - mutable std::shared_ptr request_prefetch_handler_; + mutable std::shared_ptr rpc_service_; + mutable std::shared_ptr request_send_handler_; + mutable std::shared_ptr request_get_handler_; + mutable std::shared_ptr + request_prefetch_handler_; mutable std::shared_ptr server_thread_; }; diff --git a/paddle/fluid/operators/math/concat.cc b/paddle/fluid/operators/math/concat.cc index 14964fc62af6dd947e49a2054511780a1bb20cf2..55c8a472aca7fe700ef6a3f96bed1496d7b12b80 100644 --- a/paddle/fluid/operators/math/concat.cc +++ b/paddle/fluid/operators/math/concat.cc @@ -93,10 +93,10 @@ class ConcatGradFunctor { auto cpu_place = boost::get(context.GetPlace()); // computation - for (size_t k = 0; k < input_rows; ++k) { + for (int k = 0; k < input_rows; ++k) { const T* src_ptr = input.data() + k * input_cols; int col_idx = 0; - for (int j = 0; j < num; ++j) { + for (size_t j = 0; j < num; ++j) { int col_len = output_cols[j]; auto* out_tensor = outputs->at(j); if (out_tensor != nullptr) { diff --git a/paddle/fluid/operators/parallel_do_op.cc b/paddle/fluid/operators/parallel_do_op.cc index c9744db3d0654ef63357963d9a9a3cb946f56e2d..1012640d5e2052e4f347ad458cea9072a004f334 100644 --- a/paddle/fluid/operators/parallel_do_op.cc +++ b/paddle/fluid/operators/parallel_do_op.cc @@ -295,7 +295,7 @@ class ParallelDoGradOp : public framework::OperatorBase { auto sum_op = framework::OpRegistry::CreateOp( "sum", {{"X", {s, tmp_name}}}, {{"Out", {s}}}, - framework::AttributeMap{{"use_mkldnn", {false}}}); + framework::AttributeMap{}); VLOG(10) << sum_op->DebugStringEx(sub_scopes[0]); sum_op->Run(*sub_scopes[0], places[0]); WaitOnPlace(places[0]); diff --git a/paddle/fluid/operators/prefetch_op.cc b/paddle/fluid/operators/prefetch_op.cc index f71ba84b318c1f8b0604310f3db8a0826124e207..8734282fe496b8e90af19abd5549566d62316fc3 100644 --- a/paddle/fluid/operators/prefetch_op.cc +++ b/paddle/fluid/operators/prefetch_op.cc @@ -41,8 +41,8 @@ class PrefetchOp : public framework::OperatorBase { platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance(); auto& ctx = *pool.Get(place); - detail::RPCClient* rpc_client = - detail::RPCClient::GetInstance(); + distributed::RPCClient* rpc_client = + distributed::RPCClient::GetInstance(); for (size_t i = 0; i < ins.size(); i++) { if (NeedSend(scope, ins[i])) { diff --git a/paddle/fluid/operators/recurrent_op.cc b/paddle/fluid/operators/recurrent_op.cc index 162bfcbb0844d29385d0f8ad5d25a3f8de6bd41b..9c1cee7022a9b9a98f026f7602f0f7badc44a49b 100644 --- a/paddle/fluid/operators/recurrent_op.cc +++ b/paddle/fluid/operators/recurrent_op.cc @@ -429,8 +429,7 @@ class RecurrentGradOp : public RecurrentBase { auto sum_op = framework::OpRegistry::CreateOp( "sum", {{"X", {pg_names[param_id], new_inside_name}}}, - {{"Out", {pg_names[param_id]}}}, - framework::AttributeMap{{"use_mkldnn", {false}}}); + {{"Out", {pg_names[param_id]}}}, framework::AttributeMap{}); sum_op->Run(cur_scope, place); cur_scope.Rename(new_inside_name, inside_grad_name); diff --git a/paddle/fluid/operators/recv_op.cc b/paddle/fluid/operators/recv_op.cc index 15dfb5469bf51330b98d6699fb3ce708222212ed..9854a31f5b10f5ecd940c0d41c2c3e468fc17bad 100644 --- a/paddle/fluid/operators/recv_op.cc +++ b/paddle/fluid/operators/recv_op.cc @@ -43,8 +43,8 @@ class RecvOp : public framework::OperatorBase { // For profiling platform::RecordEvent record_event(Type(), &ctx); - detail::RPCClient* rpc_client = - detail::RPCClient::GetInstance(); + distributed::RPCClient* rpc_client = + distributed::RPCClient::GetInstance(); for (size_t i = 0; i < outs.size(); i++) { VLOG(3) << "getting " << outs[i] << " from " << epmap[i]; diff --git a/paddle/fluid/operators/send_barrier_op.cc b/paddle/fluid/operators/send_barrier_op.cc index c6c975a23ce846464388c72af5d8902144ceb16a..6b4572dcccc21e783f1df0b9bcde11d532ff4ba8 100644 --- a/paddle/fluid/operators/send_barrier_op.cc +++ b/paddle/fluid/operators/send_barrier_op.cc @@ -44,8 +44,8 @@ class SendBarrierOp : public framework::OperatorBase { // For profiling platform::RecordEvent record_event(Type(), &ctx); - detail::RPCClient* rpc_client = - detail::RPCClient::GetInstance(); + distributed::RPCClient* rpc_client = + distributed::RPCClient::GetInstance(); VLOG(3) << "SendBarrierOp sync_mode:" << sync_mode; diff --git a/paddle/fluid/operators/send_op.cc b/paddle/fluid/operators/send_op.cc index 84ec36625314572d16e5c537884b6efec420cc60..0cac329aafa8c4c67cae48ba62a48575f5edba92 100644 --- a/paddle/fluid/operators/send_op.cc +++ b/paddle/fluid/operators/send_op.cc @@ -45,8 +45,8 @@ class SendOp : public framework::OperatorBase { // For profiling platform::RecordEvent record_event(Type(), &ctx); - detail::RPCClient* rpc_client = - detail::RPCClient::GetInstance(); + distributed::RPCClient* rpc_client = + distributed::RPCClient::GetInstance(); for (size_t i = 0; i < ins.size(); i++) { if (NeedSend(scope, ins[i])) { diff --git a/paddle/fluid/operators/sum_mkldnn_op.cc b/paddle/fluid/operators/sum_mkldnn_op.cc deleted file mode 100644 index f78d977760f18c9eb1270e515e68acb208a7c9a4..0000000000000000000000000000000000000000 --- a/paddle/fluid/operators/sum_mkldnn_op.cc +++ /dev/null @@ -1,240 +0,0 @@ -// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/*Licensed under the Apache License, Version 2.0(the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. */ - -#include "mkldnn.hpp" -#include "paddle/fluid/framework/tensor.h" -#include "paddle/fluid/operators/math/selected_rows_functor.h" -#include "paddle/fluid/operators/sum_op.h" -#include "paddle/fluid/platform/device_context.h" -#include "paddle/fluid/platform/mkldnn_helper.h" - -namespace paddle { -namespace operators { - -using paddle::framework::Tensor; -using paddle::platform::MKLDNNDeviceContext; -using paddle::platform::CPUDeviceContext; -using framework::DataLayout; -using mkldnn::memory; -using mkldnn::primitive; -using mkldnn::stream; -using mkldnn::sum; -using mkldnn::reorder; -using platform::to_void_cast; - -template -class SumMKLDNNOpKernel : public paddle::framework::OpKernel { - public: - void Compute(const paddle::framework::ExecutionContext& ctx) const override { - PADDLE_ENFORCE(paddle::platform::is_cpu_place(ctx.GetPlace()), - "It must use CPUPlace."); - auto& dev_ctx = ctx.template device_context(); - const auto& mkldnn_engine = dev_ctx.GetEngine(); - auto in_vars = ctx.MultiInputVar("X"); - - const int N = in_vars.size(); - auto out_var = ctx.OutputVar("Out"); - bool in_place = out_var == in_vars[0]; - - if (out_var->IsType()) { - LoDTensor* output = ctx.Output("Out"); - T* output_data = output->mutable_data(ctx.GetPlace()); - - std::vector dst_tz = framework::vectorize2int(output->dims()); - auto src_tz = dst_tz; - memory::format output_format{memory::format::format_undef}; - std::vector scales; - std::vector srcs_mpd; - std::vector srcs_mem; - - PADDLE_ENFORCE(in_vars[0]->IsType(), - "Input[0] must be LoDTensors"); - auto& input0 = in_vars[0]->Get(); - PADDLE_ENFORCE(input0.layout() == DataLayout::kMKLDNN && - input0.format() != memory::format::format_undef, - "Wrong layout/format for inputs[0]"); - - memory::format input_format = input0.format(); - - if (src_tz.size() == 1 && (input_format == memory::format::nchw || - input_format == memory::format::nhwc)) { - input_format = memory::format::x; - } - if (src_tz.size() == 2 && (input_format == memory::format::nchw || - input_format == memory::format::nhwc)) { - input_format = memory::format::nc; - } - - for (int i = in_place ? 1 : 0; i < N; i++) { - PADDLE_ENFORCE(in_vars[i]->IsType(), - "all inputs must be all LoDTensors"); - auto& input = in_vars[i]->Get(); - PADDLE_ENFORCE(input.layout() == DataLayout::kMKLDNN && - input.format() != memory::format::format_undef, - "Wrong layout/format for inputs"); - - if (input.numel() == 0) { - continue; - } - - const T* input_data = input.data(); - - auto src_md = - memory::desc(src_tz, memory::data_type::f32, input_format); - auto src_mpd = memory::primitive_desc(src_md, mkldnn_engine); - auto src_mem = memory(src_mpd, to_void_cast(input_data)); - srcs_mpd.push_back(src_mpd); - srcs_mem.push_back(src_mem); - scales.push_back(1.0); - } - - auto dst_md = - memory::desc(dst_tz, memory::data_type::f32, memory::format::any); - - auto sum_pd = sum::primitive_desc(dst_md, scales, srcs_mpd); - - std::shared_ptr dst_mem; - if (in_place) { - dst_mem.reset(new memory(sum_pd.dst_primitive_desc())); - } else { - dst_mem.reset(new memory(sum_pd.dst_primitive_desc(), output_data)); - } - std::vector inputs; - for (size_t i = 0; i < srcs_mem.size(); ++i) { - inputs.push_back(srcs_mem[i]); - } - - auto sum_prim = mkldnn::sum(sum_pd, inputs, *dst_mem); - output_format = (memory::format)platform::GetMKLDNNFormat(sum_pd); - - primitive reorder_prim; - std::shared_ptr target_mem; - if (in_place) { - output_format = input_format; - target_mem.reset(new memory( - {{{src_tz}, memory::data_type::f32, output_format}, mkldnn_engine}, - output_data)); - reorder_prim = reorder(*dst_mem, *target_mem); - } - - std::vector pipeline; - pipeline.push_back(sum_prim); - if (in_place) pipeline.push_back(reorder_prim); - stream(stream::kind::eager).submit(pipeline).wait(); - - output->set_layout(DataLayout::kMKLDNN); - output->set_format(output_format); - } else if (out_var->IsType()) { - // TODO(@mozga-intel) Add MKLDNN SelectedRows support - std::unique_ptr in0; - if (in_place) { - // If is in_place, we store the input[0] to in0 - auto& in_sel0 = in_vars[0]->Get(); - auto& rows = in_sel0.rows(); - in0.reset(new framework::SelectedRows(rows, in_sel0.height())); - in0->mutable_value()->ShareDataWith(in_sel0.value()); - } - - auto get_selected_row = [&](size_t i) -> const SelectedRows& { - if (i == 0 && in0) { - return *in0.get(); - } else { - return in_vars[i]->Get(); - } - }; - auto* out = ctx.Output("Out"); - out->mutable_rows()->clear(); - auto* out_value = out->mutable_value(); - - // Runtime InferShape - size_t first_dim = 0; - for (int i = 0; i < N; i++) { - auto& sel_row = get_selected_row(i); - first_dim += sel_row.rows().size(); - } - auto in_dim = - framework::vectorize(get_selected_row(N - 1).value().dims()); - in_dim[0] = static_cast(first_dim); - - out_value->Resize(framework::make_ddim(in_dim)); - - // if all the input sparse vars are empty, no need to - // merge these vars. - if (first_dim == 0UL) { - return; - } - out_value->mutable_data(ctx.GetPlace()); - math::SelectedRowsAddTo functor; - int64_t offset = 0; - for (int i = 0; i < N; i++) { - auto& sel_row = get_selected_row(i); - if (sel_row.rows().size() == 0) { - continue; - } - PADDLE_ENFORCE_EQ(out->height(), sel_row.height()); - functor(ctx.template device_context(), sel_row, - offset, out); - offset += sel_row.value().numel(); - } - } else if (out_var->IsType()) { - // TODO(@mozga-intel) Add MKLDNN LoDTensorArray support - auto& out_array = *out_var->GetMutable(); - for (size_t i = in_place ? 1 : 0; i < in_vars.size(); ++i) { - PADDLE_ENFORCE(in_vars[i]->IsType(), - "Only support all inputs are TensorArray"); - auto& in_array = in_vars[i]->Get(); - - for (size_t i = 0; i < in_array.size(); ++i) { - if (in_array[i].numel() != 0) { - if (i >= out_array.size()) { - out_array.resize(i + 1); - } - if (out_array[i].numel() == 0) { - framework::TensorCopy(in_array[i], in_array[i].place(), - ctx.device_context(), &out_array[i]); - out_array[i].set_lod(in_array[i].lod()); - } else { - PADDLE_ENFORCE(out_array[i].lod() == in_array[i].lod()); - auto in = EigenVector::Flatten(in_array[i]); - auto result = EigenVector::Flatten(out_array[i]); - result.device(*ctx.template device_context() - .eigen_device()) = result + in; - } - } - } - } - } else { - PADDLE_THROW("Unexpected branch, output variable type is %s", - out_var->Type().name()); - } - } -}; - -} // namespace operators -} // namespace paddle - -REGISTER_OP_KERNEL(sum, MKLDNN, ::paddle::platform::CPUPlace, - paddle::operators::SumMKLDNNOpKernel); diff --git a/paddle/fluid/operators/sum_op.cc b/paddle/fluid/operators/sum_op.cc index fe7c7039c7dec714e265ede1b7167fd800ddc2f7..863baba9ea7663d0b21875e0b423dc4a6ce2d59a 100644 --- a/paddle/fluid/operators/sum_op.cc +++ b/paddle/fluid/operators/sum_op.cc @@ -18,10 +18,6 @@ limitations under the License. */ #include "paddle/fluid/framework/var_type_inference.h" #include "paddle/fluid/operators/detail/safe_ref.h" -#ifdef PADDLE_WITH_MKLDNN -#include "paddle/fluid/platform/mkldnn_helper.h" -#endif - namespace paddle { namespace operators { using framework::Tensor; @@ -67,18 +63,6 @@ class SumOp : public framework::OperatorWithKernel { framework::OpKernelType GetExpectedKernelType( const framework::ExecutionContext& ctx) const override { auto x_vars = ctx.MultiInputVar("X"); - - framework::LibraryType library{framework::LibraryType::kPlain}; - framework::DataLayout layout{framework::DataLayout::kAnyLayout}; - -#ifdef PADDLE_WITH_MKLDNN - if (library == framework::LibraryType::kPlain && - platform::CanMKLDNNBeUsed(ctx)) { - library = framework::LibraryType::kMKLDNN; - layout = framework::DataLayout::kMKLDNN; - } -#endif - if (x_vars[0]->IsType()) { int dtype = -1; for (auto& x_var : x_vars) { @@ -96,27 +80,26 @@ class SumOp : public framework::OperatorWithKernel { "Sum operator should have at least one tensor"); return framework::OpKernelType( - static_cast(dtype), ctx.GetPlace(), - layout, library); + static_cast(dtype), + ctx.device_context()); } else if (x_vars[0]->IsType()) { for (auto& var : x_vars) { auto& value = var->Get().value(); if (value.IsInitialized()) { return framework::OpKernelType(framework::ToDataType(value.type()), - ctx.device_context(), layout, library); + ctx.device_context()); } } // if input sparse vars are not initialized, use an default kernel type. return framework::OpKernelType(framework::proto::VarType::FP32, - ctx.device_context(), layout, library); + ctx.device_context()); } else if (x_vars[0]->IsType()) { for (auto& x_var : x_vars) { auto& array = x_var->Get(); for (auto& each : array) { if (each.numel() != 0) { return framework::OpKernelType(framework::ToDataType(each.type()), - ctx.device_context(), layout, - library); + ctx.device_context()); } } } @@ -133,9 +116,6 @@ class SumOpMaker : public framework::OpProtoAndCheckerMaker { AddInput("X", "(vector) The input tensors of sum operator.") .AsDuplicable(); AddOutput("Out", "(Tensor) The output tensor of sum operator.").Reuse("X"); - AddAttr("use_mkldnn", - "(bool, default false) Only used in mkldnn kernel") - .SetDefault(false); AddComment(R"DOC( Sum operator. @@ -152,6 +132,7 @@ class SumOpVarTypeInference : public framework::VarTypeInference { framework::BlockDesc* block) const override { auto& inputs = op_desc.Input("X"); auto var_type = framework::proto::VarType::SELECTED_ROWS; + for (auto& name : op_desc.Input("X")) { VLOG(10) << name << " " << block->FindRecursiveOrCreateVar(name).GetType(); @@ -225,7 +206,6 @@ namespace ops = paddle::operators; REGISTER_OPERATOR(sum, ops::SumOp, ops::SumOpMaker, ops::SumGradMaker, ops::SumOpVarTypeInference); - REGISTER_OP_CPU_KERNEL( sum, ops::SumKernel, ops::SumKernel, diff --git a/paddle/fluid/operators/test_send_nccl_id.cc b/paddle/fluid/operators/test_send_nccl_id.cc index 5015b1005569ba70b147ebb795243e24ab81ea5c..e2b7b6b8e447381229e4ad594b7974bc0aa159d5 100644 --- a/paddle/fluid/operators/test_send_nccl_id.cc +++ b/paddle/fluid/operators/test_send_nccl_id.cc @@ -21,7 +21,7 @@ limitations under the License. */ #include "paddle/fluid/framework/operator.h" #include "paddle/fluid/framework/program_desc.h" #include "paddle/fluid/operators/detail/macros.h" -#include "paddle/fluid/operators/detail/request_handler_impl.h" +#include "paddle/fluid/operators/distributed/request_handler_impl.h" #include "paddle/fluid/operators/listen_and_serv_op.h" #include "paddle/fluid/operators/math/math_function.h" #include "paddle/fluid/operators/math/selected_rows_functor.h" @@ -37,11 +37,11 @@ USE_NO_KERNEL_OP(listen_and_serv); namespace f = paddle::framework; namespace p = paddle::platform; namespace m = paddle::operators::math; -namespace detail = paddle::operators::detail; +namespace distributed = paddle::operators::distributed; namespace string = paddle::string; -std::unique_ptr g_rpc_service; -std::unique_ptr g_req_handler; +std::unique_ptr g_rpc_service; +std::unique_ptr g_req_handler; void StartServer() { f::Scope scope; @@ -57,14 +57,14 @@ void StartServer() { g_req_handler->SetProgram(&empty_program); g_req_handler->SetExecutor(&executor); - g_rpc_service->RegisterRPC(detail::kRequestSend, g_req_handler.get()); + g_rpc_service->RegisterRPC(distributed::kRequestSend, g_req_handler.get()); g_req_handler->SetRPCServer(g_rpc_service.get()); std::thread server_thread( - std::bind(&detail::RPCServer::StartServer, g_rpc_service.get())); + std::bind(&distributed::RPCServer::StartServer, g_rpc_service.get())); - g_rpc_service->SetCond(detail::kRequestSend); - g_rpc_service->WaitBarrier(detail::kRequestSend); + g_rpc_service->SetCond(distributed::kRequestSend); + g_rpc_service->WaitBarrier(distributed::kRequestSend); LOG(INFO) << "got nccl id and stop server..."; g_rpc_service->ShutDown(); @@ -72,7 +72,7 @@ void StartServer() { } TEST(SendNcclId, RPCServer) { - g_req_handler.reset(new detail::RequestSendHandler(true)); + g_req_handler.reset(new distributed::RequestSendHandler(true)); g_rpc_service.reset(new RPCSERVER_T("127.0.0.1:0", 1)); std::thread server_thread(StartServer); @@ -91,7 +91,8 @@ TEST(SendNcclId, RPCServer) { std::string ep = string::Sprintf("127.0.0.1:%d", port); - detail::RPCClient* client = detail::RPCClient::GetInstance(); + distributed::RPCClient* client = + distributed::RPCClient::GetInstance(); LOG(INFO) << "connect to server" << ep; client->AsyncSendVar(ep, dev_ctx, scope, NCCL_ID_VARNAME); diff --git a/paddle/fluid/operators/while_op.cc b/paddle/fluid/operators/while_op.cc index f440058e8db2024f5c8a0129db3af87a80d6e551..175c3ac5d79f24e47d21417df8e3eaeb4d5b2335 100644 --- a/paddle/fluid/operators/while_op.cc +++ b/paddle/fluid/operators/while_op.cc @@ -203,11 +203,11 @@ class WhileGradOp : public framework::OperatorBase { ->set_lod(inside_tensor.lod()); } } + auto new_inside_name = cur_scope.Rename(inside_grad_name); auto sum_op = framework::OpRegistry::CreateOp( "sum", {{"X", {pg_names[param_id], new_inside_name}}}, - {{"Out", {pg_names[param_id]}}}, - framework::AttributeMap{{"use_mkldnn", {false}}}); + {{"Out", {pg_names[param_id]}}}, framework::AttributeMap{}); sum_op->Run(cur_scope, dev_place); cur_scope.Rename(new_inside_name, inside_grad_name); } diff --git a/paddle/fluid/platform/mkldnn_helper.h b/paddle/fluid/platform/mkldnn_helper.h index 2689d5e0787e0164bfb8e539399d8a378964e50a..de711b7d23ef01d57a62087c552ea090f01f0386 100644 --- a/paddle/fluid/platform/mkldnn_helper.h +++ b/paddle/fluid/platform/mkldnn_helper.h @@ -99,11 +99,5 @@ inline mkldnn::memory::format GetMKLDNNFormat(const mkldnn::memory memory) { memory.get_primitive_desc().desc().data.format); } -inline mkldnn::memory::format GetMKLDNNFormat( - const mkldnn::sum::primitive_desc& memory) { - return static_cast( - memory.dst_primitive_desc().desc().data.format); -} - } // namespace platform } // namespace paddle diff --git a/python/paddle/fluid/backward.py b/python/paddle/fluid/backward.py index 4faa06303170488d0de2fda4c1461cfe2d623d35..f7bbc98fe1f41c3a049692d36915d0a722779416 100644 --- a/python/paddle/fluid/backward.py +++ b/python/paddle/fluid/backward.py @@ -132,9 +132,9 @@ def _addup_repetitive_outputs_(op_descs): for idx, op_desc in enumerate(op_descs): for var_name in op_desc.input_arg_names(): if len(renamed_vars[var_name]) > 1: - pending_sum_ops.append((_create_op_desc_( - "sum", {"X": renamed_vars[var_name]}, {"Out": [var_name]}, - {"use_mkldnn": False}), idx)) + pending_sum_ops.append( + (_create_op_desc_("sum", {"X": renamed_vars[var_name]}, + {"Out": [var_name]}, {}), idx)) renamed_vars[var_name] = [var_name] for var_name in op_desc.output_arg_names(): if var_name == core.empty_var_name( @@ -161,9 +161,8 @@ def _addup_repetitive_outputs_(op_descs): renamed_vars[var_name].append(new_name) for var_name, inputs in renamed_vars.iteritems(): if len(inputs) > 1: - pending_sum_ops.append( - (_create_op_desc_("sum", {"X": inputs}, {"Out": [var_name]}, - {"use_mkldnn": False}), len(op_descs))) + pending_sum_ops.append((_create_op_desc_( + "sum", {"X": inputs}, {"Out": [var_name]}, {}), len(op_descs))) # sum_op descs are sorted according to their insert position for p in reversed(pending_sum_ops): op_descs.insert(p[1], p[0]) diff --git a/python/paddle/fluid/evaluator.py b/python/paddle/fluid/evaluator.py index 7c6ad6f27dcfd7040f79c72c01413c8cc84a28ba..00ba1a0457583d1cc1fa7136ebd51e9ced167832 100644 --- a/python/paddle/fluid/evaluator.py +++ b/python/paddle/fluid/evaluator.py @@ -41,7 +41,12 @@ def _clone_var_(block, var): class Evaluator(object): """ - Base Class for all evaluators + Warning: better to use the fluid.metrics.* things, more + flexible support via pure Python and Operator, and decoupled + with executor. Short doc are intended to urge new user + start from Metrics. + + Base Class for all evaluators. Args: name(str): The name of evaluator. such as, "accuracy". Used for generate @@ -69,6 +74,10 @@ class Evaluator(object): def reset(self, executor, reset_program=None): """ reset metric states at the begin of each pass/user specified batch + + Args: + executor(Executor|ParallelExecutor): a executor for executing the reset_program + reset_program(Program): a single Program for reset process """ if reset_program is None: reset_program = Program() @@ -85,15 +94,16 @@ class Evaluator(object): def eval(self, executor, eval_program=None): """ Evaluate the statistics merged by multiple mini-batches. + Args: + executor(Executor|ParallelExecutor): a executor for executing the eval_program + eval_program(Program): a single Program for eval process """ raise NotImplementedError() - def create_state(self, suffix, dtype, shape): + def _create_state(self, suffix, dtype, shape): """ Create state variable. - NOTE: It is not a public API. - Args: suffix(str): the state suffix. dtype(str|core.VarDesc.VarType): the state data type @@ -113,9 +123,35 @@ class Evaluator(object): class ChunkEvaluator(Evaluator): """ + Warning: This would be deprecated in the future. Please use fluid.metrics.ChunkEvaluator + instead. + Accumulate counter numbers output by chunk_eval from mini-batches and compute the precision recall and F1-score using the accumulated counter numbers. + For some basics of chunking, please refer to + 'Chunking with Support Vector Machines '. + + Args: + input (Variable): prediction output of the network. + label (Variable): label of the test data set. + chunk_scheme (str): can be IOB/IOE/IOBES and IO. See the chunk_eval op for details. + num_chunk_types (int): the number of chunk type. + excluded_chunk_types (list): A list including chunk type ids, indicating chunk types that are not counted. + + Returns: + tuple: tuple containing: precision, recall, f1_score + + Examples: + .. code-block:: python + + exe = fluid.executor(place) + evaluator = fluid.Evaluator.ChunkEvaluator(input, label) + for epoch in PASS_NUM: + evaluator.reset(exe) + for data in batches: + loss = exe.run(fetch_list=[cost]) + distance, instance_error = distance_evaluator.eval(exe) """ def __init__( @@ -130,11 +166,11 @@ class ChunkEvaluator(Evaluator): if main_program.current_block().idx != 0: raise ValueError("You can only invoke Evaluator in root block") - self.num_infer_chunks = self.create_state( + self.num_infer_chunks = self._create_state( dtype='int64', shape=[1], suffix='num_infer_chunks') - self.num_label_chunks = self.create_state( + self.num_label_chunks = self._create_state( dtype='int64', shape=[1], suffix='num_label_chunks') - self.num_correct_chunks = self.create_state( + self.num_correct_chunks = self._create_state( dtype='int64', shape=[1], suffix='num_correct_chunks') precision, recall, f1_score, num_infer_chunks, num_label_chunks, num_correct_chunks = layers.chunk_eval( input=input, @@ -178,6 +214,8 @@ class ChunkEvaluator(Evaluator): class EditDistance(Evaluator): """ + Warning: This would be deprecated in the future. Please use fluid.metrics.EditDistance + instead. Accumulate edit distance sum and sequence number from mini-batches and compute the average edit_distance and instance error of all batches. @@ -188,15 +226,16 @@ class EditDistance(Evaluator): ignored_tokens(list of int): Tokens that should be removed before calculating edit distance. - Example: + Examples: + .. code-block:: python - exe = fluid.executor(place) - distance_evaluator = fluid.Evaluator.EditDistance(input, label) - for epoch in PASS_NUM: - distance_evaluator.reset(exe) - for data in batches: - loss = exe.run(fetch_list=[cost]) - distance, instance_error = distance_evaluator.eval(exe) + exe = fluid.executor(place) + distance_evaluator = fluid.Evaluator.EditDistance(input, label) + for epoch in PASS_NUM: + distance_evaluator.reset(exe) + for data in batches: + loss = exe.run(fetch_list=[cost]) + distance, instance_error = distance_evaluator.eval(exe) In the above example: 'distance' is the average of the edit distance in a pass. @@ -210,11 +249,11 @@ class EditDistance(Evaluator): if main_program.current_block().idx != 0: raise ValueError("You can only invoke Evaluator in root block") - self.total_distance = self.create_state( + self.total_distance = self._create_state( dtype='float32', shape=[1], suffix='total_distance') - self.seq_num = self.create_state( + self.seq_num = self._create_state( dtype='int64', shape=[1], suffix='seq_num') - self.instance_error = self.create_state( + self.instance_error = self._create_state( dtype='int64', shape=[1], suffix='instance_error') distances, seq_num = layers.edit_distance( input=input, label=label, ignored_tokens=ignored_tokens) @@ -256,9 +295,10 @@ class EditDistance(Evaluator): class DetectionMAP(Evaluator): """ + Warning: This would be deprecated in the future. Please use fluid.metrics.DetectionMAP + instead. Calculate the detection mean average precision (mAP). - TODO (Dang Qingqing): update the following doc. The general steps are as follows: 1. calculate the true positive and false positive according to the input of detection and labels. @@ -293,17 +333,18 @@ class DetectionMAP(Evaluator): - 11point: the 11-point interpolated average precision. - integral: the natural integral of the precision-recall curve. - Example: + Examples: + .. code-block:: python - exe = fluid.executor(place) - map_evaluator = fluid.Evaluator.DetectionMAP(input, - gt_label, gt_box, gt_difficult) - cur_map, accum_map = map_evaluator.get_map_var() - fetch = [cost, cur_map, accum_map] - for epoch in PASS_NUM: - map_evaluator.reset(exe) - for data in batches: - loss, cur_map_v, accum_map_v = exe.run(fetch_list=fetch) + exe = fluid.executor(place) + map_evaluator = fluid.Evaluator.DetectionMAP(input, + gt_label, gt_box, gt_difficult) + cur_map, accum_map = map_evaluator.get_map_var() + fetch = [cost, cur_map, accum_map] + for epoch in PASS_NUM: + map_evaluator.reset(exe) + for data in batches: + loss, cur_map_v, accum_map_v = exe.run(fetch_list=fetch) In the above example: @@ -340,9 +381,10 @@ class DetectionMAP(Evaluator): evaluate_difficult=evaluate_difficult, ap_version=ap_version) - self.create_state(dtype='int32', shape=None, suffix='accum_pos_count') - self.create_state(dtype='float32', shape=None, suffix='accum_true_pos') - self.create_state(dtype='float32', shape=None, suffix='accum_false_pos') + self._create_state(dtype='int32', shape=None, suffix='accum_pos_count') + self._create_state(dtype='float32', shape=None, suffix='accum_true_pos') + self._create_state( + dtype='float32', shape=None, suffix='accum_false_pos') self.has_state = None var = self.helper.create_variable( diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py index 159b0ca39eed547e4f3448e7ebf4807299d465b2..dc275674618ee147dad2e32c7db29132ab55eb29 100644 --- a/python/paddle/fluid/executor.py +++ b/python/paddle/fluid/executor.py @@ -18,7 +18,7 @@ from framework import Program, default_main_program, Variable from . import core __all__ = [ - 'Executor', 'global_scope', 'scope_guard', 'switch_scope', 'fetch_var' + 'Executor', 'global_scope', 'scope_guard', '_switch_scope', 'fetch_var' ] g_scope = core.Scope() @@ -35,7 +35,7 @@ def global_scope(): return g_scope -def switch_scope(scope): +def _switch_scope(scope): global g_scope ex = g_scope g_scope = scope @@ -57,12 +57,27 @@ def scope_guard(scope): Args: scope: The new global/default scope. """ - ex = switch_scope(scope) + ex = _switch_scope(scope) yield - switch_scope(ex) + _switch_scope(ex) def as_numpy(tensor): + """ + Convert a Tensor to a numpy.ndarray, its only support Tensor without LoD information. + For higher dimensional sequence data, please use LoDTensor directly. + Examples: + >>> import paddle.fluid as fluid + >>> outs = executor.run(...) + >>> np_outs = map(lambda x: as_numpy(x), outs) + >>> ... + + Args: + tensor(Variable): a instance of Tensor + + Returns: + numpy.ndarray + """ if isinstance(tensor, list): return [as_numpy(t) for t in tensor] assert isinstance(tensor, core.LoDTensor) @@ -186,7 +201,7 @@ def fetch_var(name, scope=None, return_numpy=True): return tensor -def get_program_cache_key(feed, fetch_list): +def _get_program_cache_key(feed, fetch_list): feed_var_names = feed.keys() def to_name_str(var): @@ -205,6 +220,25 @@ def get_program_cache_key(feed, fetch_list): class Executor(object): + """ + An Executor in Python, only support the single-GPU running. For multi-cards, please refer to + ParallelExecutor. + Python executor takes a program, add feed operators and fetch operators to this program according + to feed map and fetch_list. Feed map provides input data for the program. fetch_list provides + the variables(or names) that user want to get after program run. Note: the executor will run all + operators in the program but not only the operators dependent by the fetch_list. + It store the global variables into the global scope, and create a local scope for the temporary + variables. The local scope contents will be discarded after every minibatch forward/backward finished. + But the global scope variables will be persistent through different runs. + All of ops in program will be running in sequence. + + Args: + place(core.CPUPlace|core.CUDAPlace(n)): indicate the executor run on which device + + Note: For debugging complicated network in parallel-GPUs, you can test it on the executor. + They has the exactly same arguments, and expected the same results. + """ + def __init__(self, place): self.place = place p = core.Place() @@ -213,6 +247,23 @@ class Executor(object): self.program_caches = dict() def as_lodtensor(self, data): + """ + Convert numpy.ndarray to Tensor, its only support Tensor without LoD information. + For higher dimensional sequence data, please use LoDTensor directly. + + Examples: + >>> import paddle.fluid as fluid + >>> exe = fluid.executor(fluid.CPUPlace()) + >>> data = np.array(size=(100, 200, 300)) + >>> np_outs = map(lambda x: exe.as_lodtensor(x), data) + >>> ... + + Args: + data(numpy.ndarray): a instance of array + + Returns: + LoDTensor + """ if isinstance(data, list): raise RuntimeError("Some of your feed data hold LoD information. \ They can not be completely cast from a list of Python \ @@ -304,23 +355,47 @@ class Executor(object): scope=None, return_numpy=True, use_program_cache=False): - """ Run program by this Executor. Feed data by feed map, fetch result by fetch_list. - + """ + Run program by this Executor. Feed data by feed map, fetch result by fetch_list. Python executor takes a program, add feed operators and fetch operators to this program according to feed map and fetch_list. Feed map provides input data for the program. fetch_list provides - the variables(or names) that user want to get after program run. Note: the executor will run all + the variables(or names) that user want to get after program run. + + Note: the executor will run all operators in the program but not only the operators dependent by the fetch_list - :param program: the program that need to run, if not provied, then default_main_program will be used. - :param feed: feed variable map, e.g. {"image": ImageData, "label": LableData} - :param fetch_list: a list of variable or variable names that user want to get, run will return them according - to this list. - :param feed_var_name: the name for the input variable of feed Operator. - :param fetch_var_name: the name for the output variable of feed Operator. - :param scope: the scope used to run this program, you can switch it to different scope. default is global_scope - :param return_numpy: if convert the fetched tensor to numpy - :param use_program_cache: set use_program_cache to true if program not changed compare to the last step. - :return: result according to fetch_list. + Args: + program(Program): the program that need to run, if not provied, then default_main_program will be used. + feed(dict): feed variable map, e.g. {"image": ImageData, "label": LableData} + fetch_list(list): a list of variable or variable names that user want to get, run will return them according to this list. + feed_var_name(str): the name for the input variable of feed Operator. + fetch_var_name(str): the name for the output variable of fetch Operator. + scope(Scope): the scope used to run this program, you can switch it to different scope. default is global_scope + return_numpy(bool): if convert the fetched tensor to numpy + use_program_cache(bool): set use_program_cache to true if program not changed compare to the last step. + + Returns: + + list(numpy.array): fetch result according to fetch_list. + + + Examples: + + >>> data = layers.data(name='X', shape=[1], dtype='float32') + >>> hidden = layers.fc(input=data, size=10) + >>> layers.assign(hidden, out) + >>> loss = layers.mean(out) + >>> adam = fluid.optimizer.Adam() + >>> adam.minimize(loss) + + >>> cpu = core.CPUPlace() + >>> exe = Executor(cpu) + >>> exe.run(default_startup_program()) + + >>> x = numpy.random.random(size=(10, 1)).astype('float32') + >>> outs = exe.run( + >>> feed={'X': x}, + >>> fetch_list=[loss.name]) """ if feed is None: feed = {} @@ -341,7 +416,7 @@ class Executor(object): if scope is None: scope = global_scope() - cache_key = get_program_cache_key(feed, fetch_list) + cache_key = _get_program_cache_key(feed, fetch_list) if use_program_cache: cached_program = self._get_program_cache(cache_key) if cached_program is None: diff --git a/python/paddle/fluid/layers/__init__.py b/python/paddle/fluid/layers/__init__.py index a568f61dcb2da976baa7847ae26281a34d6f88dd..cd1492da24d5e9d09a9eaac0b1b9c7aaffac6250 100644 --- a/python/paddle/fluid/layers/__init__.py +++ b/python/paddle/fluid/layers/__init__.py @@ -28,8 +28,8 @@ import math_op_patch from math_op_patch import * import detection from detection import * -import metric -from metric import * +import metric_op +from metric_op import * from learning_rate_scheduler import * __all__ = [] @@ -41,5 +41,5 @@ __all__ += control_flow.__all__ __all__ += ops.__all__ __all__ += device.__all__ __all__ += detection.__all__ -__all__ += metric.__all__ +__all__ += metric_op.__all__ __all__ += learning_rate_scheduler.__all__ diff --git a/python/paddle/fluid/layers/metric.py b/python/paddle/fluid/layers/metric_op.py similarity index 99% rename from python/paddle/fluid/layers/metric.py rename to python/paddle/fluid/layers/metric_op.py index 58de1b6b9fe17a24203e93de6780190b9fc6b3e7..99e82fdd04282177fae63f1fb94b5e32d41c612e 100644 --- a/python/paddle/fluid/layers/metric.py +++ b/python/paddle/fluid/layers/metric_op.py @@ -126,7 +126,7 @@ def auc(input, label, curve='ROC', num_thresholds=200): topk_out, topk_indices = nn.topk(input, k=k) auc_out = helper.create_tmp_variable(dtype="float32") helper.append_op( - type="accuracy", + type="auc", inputs={ "Out": [topk_out], "Indices": [topk_indices], diff --git a/python/paddle/fluid/layers/nn.py b/python/paddle/fluid/layers/nn.py index 2979ff3057a78ac3074cbb43b7a32966212073f6..787054a91c42a16c88e48f706b5a4ae430a54823 100644 --- a/python/paddle/fluid/layers/nn.py +++ b/python/paddle/fluid/layers/nn.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. """ -All layers just related to the neural network. +All layers just related to the neural network. """ from ..layer_helper import LayerHelper @@ -109,14 +109,14 @@ def fc(input, """ **Fully Connected Layer** - This function creates a fully connected layer in the network. It can take - multiple tensors as its inputs. It creates a variable called weights for - each input tensor, which represents a fully connected weight matrix from - each input unit to each output unit. The fully connected layer multiplies - each input tensor with its coresponding weight to produce an output Tensor. - If multiple input tensors are given, the results of multiple multiplications - will be sumed up. If bias_attr is not None, a bias variable will be created - and added to the output. Finally, if activation is not None, it will be applied + This function creates a fully connected layer in the network. It can take + multiple tensors as its inputs. It creates a variable called weights for + each input tensor, which represents a fully connected weight matrix from + each input unit to each output unit. The fully connected layer multiplies + each input tensor with its coresponding weight to produce an output Tensor. + If multiple input tensors are given, the results of multiple multiplications + will be sumed up. If bias_attr is not None, a bias variable will be created + and added to the output. Finally, if activation is not None, it will be applied to the output as well. This process can be formulated as follows: @@ -198,10 +198,7 @@ def fc(input, else: pre_bias = helper.create_tmp_variable(dtype) helper.append_op( - type="sum", - inputs={"X": mul_results}, - outputs={"Out": pre_bias}, - attrs={"use_mkldnn": use_mkldnn}) + type="sum", inputs={"X": mul_results}, outputs={"Out": pre_bias}) # add bias pre_activation = helper.append_bias_op(pre_bias, dim_start=num_flatten_dims) # add activation @@ -850,7 +847,7 @@ def crf_decoding(input, param_attr, label=None): Returns: Variable: ${viterbi_path_comment} - + Examples: .. code-block:: python @@ -1088,7 +1085,7 @@ def chunk_eval(input, Here is a NER example of labeling for these tagging schemes: .. code-block:: python - + ====== ====== ====== ===== == ============ ===== ===== ===== == ========= Li Ming works at Agricultural Bank of China in Beijing. ====== ====== ====== ===== == ============ ===== ===== ===== == ========= @@ -1114,7 +1111,7 @@ def chunk_eval(input, is the num of chunk types, and `tag_type` get its value from the following table. .. code-block:: python - + Scheme Begin Inside End Single plain 0 - - - IOB 0 1 - - @@ -1150,7 +1147,7 @@ def chunk_eval(input, tuple: tuple containing: precision, recall, f1_score, num_infer_chunks, num_label_chunks, num_correct_chunks - + Examples: .. code-block:: python @@ -1250,7 +1247,7 @@ def sequence_softmax(input, param_attr=None, bias_attr=None, use_cudnn=True): """ This function computes the softmax activation among all time-steps for each sequence. The dimension of each time-step should be 1. Thus, the shape of - input Tensor can be either :math:`[N, 1]` or :math:`[N]`, where :math:`N` + input Tensor can be either :math:`[N, 1]` or :math:`[N]`, where :math:`N` is the sum of the length of all sequences. For i-th sequence in a mini-batch: @@ -1270,7 +1267,7 @@ def sequence_softmax(input, param_attr=None, bias_attr=None, use_cudnn=True): param_attr (ParamAttr|None): attributes for parameter use_cudnn (bool): Use cudnn kernel or not, it is valid only when the cudnn \ library is installed. Default: True - + Returns: Variable: output of sequence_softmax @@ -1831,11 +1828,11 @@ def pool2d(input, ${comment} Args: - input (Variable): The input tensor of pooling operator. The format of - input tensor is NCHW, where N is batch size, C is - the number of channels, H is the height of the + input (Variable): The input tensor of pooling operator. The format of + input tensor is NCHW, where N is batch size, C is + the number of channels, H is the height of the feature, and W is the width of the feature. - pool_size (int): The side length of pooling windows. All pooling + pool_size (int): The side length of pooling windows. All pooling windows are squares with pool_size on a side. pool_type: ${pooling_type_comment} pool_stride (int): stride of the pooling layer. @@ -1844,7 +1841,7 @@ def pool2d(input, use_cudnn: ${use_cudnn_comment} ceil_mode: ${ceil_mode_comment} use_mkldnn: ${use_mkldnn_comment} - name (str|None): A name for this layer(optional). If set None, the + name (str|None): A name for this layer(optional). If set None, the layer will be named automatically. Returns: @@ -1862,10 +1859,10 @@ def pool2d(input, data = fluid.layers.data( name='data', shape=[3, 32, 32], dtype='float32') conv2d = fluid.layers.pool2d( - input=data, - pool_size=2, - pool_type='max', - pool_stride=1, + input=data, + pool_size=2, + pool_type='max', + pool_stride=1, global_pooling=False) """ if pool_type not in ["max", "avg"]: @@ -2230,14 +2227,14 @@ def beam_search_decode(ids, scores, name=None): This layers is to pack the output of beam search layer into sentences and associated scores. It is usually called after the beam search layer. Typically, the output of beam search layer is a tensor of selected ids, with - a tensor of the score of each id. Beam search layer's output ids, however, - are generated directly during the tree search, and they are stacked by each - level of the search tree. Thus we need to reorganize them into sentences, + a tensor of the score of each id. Beam search layer's output ids, however, + are generated directly during the tree search, and they are stacked by each + level of the search tree. Thus we need to reorganize them into sentences, based on the score of each id. This layer takes the output of beam search layer as input and repack them into sentences. Args: - ids (Variable): The selected ids, output of beam search layer. + ids (Variable): The selected ids, output of beam search layer. scores (Variable): The associated scores of the ids, out put of beam search layer. name (str): The name of this layer. It is optional. @@ -2245,7 +2242,7 @@ def beam_search_decode(ids, scores, name=None): Returns: tuple(Variable): a tuple of two output tensors: sentence_ids, sentence_scores. sentence_ids is a tensor with shape [size, length], where size is the - beam size of beam search, and length is the length of each sentence. + beam size of beam search, and length is the length of each sentence. Note that the length of sentences may vary. sentence_scores is a tensor with the same shape as sentence_ids. @@ -2922,7 +2919,7 @@ def reduce_mean(input, dim=None, keep_dim=False, name=None): `None`, compute the mean over all elements of :attr:`input` and return a variable with a single element, otherwise it must be in the range :math:`[-rank(input), rank(input))`. If - :math:`dim[i] < 0`, the dimension to reduce is + :math:`dim[i] < 0`, the dimension to reduce is :math:`rank(input) + dim[i]`. keep_dim (bool): Whether to reserve the reduced dimension in the output Tensor. The result tensor will have one fewer dimension @@ -3393,16 +3390,16 @@ def topk(input, k, name=None): Args: input(Variable): The input variable which can be a vector or Tensor with higher rank. - k(int): The number of top elements to look for along the last dimension + k(int): The number of top elements to look for along the last dimension of input. name(str|None): A name for this layer(optional). If set None, the layer - will be named automatically. + will be named automatically. Default: None Returns: - Tuple[Variable]: A tuple with two elements. Each element is a Variable. - The first one is k largest elements along each last - dimensional slice. The second one is indices of values + Tuple[Variable]: A tuple with two elements. Each element is a Variable. + The first one is k largest elements along each last + dimensional slice. The second one is indices of values within the last dimension of input. Raises: @@ -3597,15 +3594,15 @@ def warpctc(input, label, blank=0, norm_by_times=False): It's shape is [Lp, num_classes + 1], where Lp is the sum of all input sequences' length and num_classes is the true number of classes. (not including the blank label). - label (Variable): The ground truth of variable-length sequence, + label (Variable): The ground truth of variable-length sequence, which is a 2-D Tensor with LoD information. It is of the shape [Lg, 1], where Lg is th sum of all labels' length. blank (int, default 0): The blank label index of Connectionist Temporal Classification (CTC) loss, which is in the half-opened interval [0, num_classes + 1). - norm_by_times(bool, default false): Whether to normalize the gradients - by the number of time-step, which is also the sequence's length. - There is no need to normalize the gradients if warpctc layer was + norm_by_times(bool, default false): Whether to normalize the gradients + by the number of time-step, which is also the sequence's length. + There is no need to normalize the gradients if warpctc layer was follewed by a mean_op. Returns: @@ -3711,8 +3708,8 @@ def nce(input, input (Variable): input variable. label (Variable): label. num_total_classes (int):${num_total_classes_comment} - sample_weight (Variable|None): A Variable of shape [batch_size, 1] - storing a weight for each sample. The default weight for each + sample_weight (Variable|None): A Variable of shape [batch_size, 1] + storing a weight for each sample. The default weight for each sample is 1.0. param_attr (ParamAttr|None): attributes for parameter bias_attr (ParamAttr|None): attributes for bias @@ -4102,7 +4099,7 @@ def smooth_l1(x, y, inside_weight=None, outside_weight=None, sigma=None): This layer computes the smooth L1 loss for Variable :attr:`x` and :attr:`y`. It takes the first dimension of :attr:`x` and :attr:`y` as batch size. For each instance, it computes the smooth L1 loss element by element first - and then sums all the losses. So the shape of ouput Variable is + and then sums all the losses. So the shape of ouput Variable is [batch_size, 1]. Args: @@ -4111,14 +4108,14 @@ def smooth_l1(x, y, inside_weight=None, outside_weight=None, sigma=None): y (Variable): A tensor with rank at least 2. The target value of smooth L1 loss op with same shape as :attr:`x`. inside_weight (Variable|None): A tensor with rank at least 2. This - input is optional and should have same shape with :attr:`x`. If - provided, the result of (:attr:`x` - :attr:`y`) will be multiplied + input is optional and should have same shape with :attr:`x`. If + provided, the result of (:attr:`x` - :attr:`y`) will be multiplied by this tensor element by element. outside_weight (Variable|None): A tensor with rank at least 2. This - input is optional and should have same shape with :attr:`x`. If - provided, the out smooth L1 loss will be multiplied by this tensor + input is optional and should have same shape with :attr:`x`. If + provided, the out smooth L1 loss will be multiplied by this tensor element by element. - sigma (float|None): Hyper parameter of smooth L1 loss layer. A float + sigma (float|None): Hyper parameter of smooth L1 loss layer. A float scalar with default value 1.0. Returns: @@ -4164,7 +4161,7 @@ def one_hot(input, depth): Examples: .. code-block:: python - + label = layers.data(name="label", shape=[1], dtype="float32") one_hot_label = layers.one_hot(input=label, depth=10) """ @@ -4318,10 +4315,10 @@ def reshape(x, shape, actual_shape=None, act=None, inplace=True, name=None): def lod_reset(x, y=None, target_lod=None): """ Set LoD of :attr:`x` to a new one specified by :attr:`y` or - :attr:`target_lod`. When :attr:`y` provided, :attr:`y.lod` would be - considered as target LoD first, otherwise :attr:`y.data` would be - considered as target LoD. If :attr:`y` is not provided, target LoD should - be specified by :attr:`target_lod`. If target LoD is specified by + :attr:`target_lod`. When :attr:`y` provided, :attr:`y.lod` would be + considered as target LoD first, otherwise :attr:`y.data` would be + considered as target LoD. If :attr:`y` is not provided, target LoD should + be specified by :attr:`target_lod`. If target LoD is specified by :attr:`Y.data` or :attr:`target_lod`, only one level LoD is supported. .. code-block:: text @@ -4375,7 +4372,7 @@ def lod_reset(x, y=None, target_lod=None): Args: x (Variable): Input variable which could be a Tensor or LodTensor. - y (Variable|None): If provided, output's LoD would be derived + y (Variable|None): If provided, output's LoD would be derived from :attr:`y`. target_lod (list|tuple|None): One level LoD which should be considered as target LoD when :attr:`y` not provided. @@ -4691,7 +4688,7 @@ def image_resize(input, """ **Resize a Batch of Images** - The input must be a tensor of the shape (num_batches, channels, in_h, in_w), + The input must be a tensor of the shape (num_batches, channels, in_h, in_w), and the resizing only applies on the last two dimensions(hight and width). Supporting resample methods: @@ -4787,9 +4784,9 @@ def resize_bilinear(input, out_shape=None, scale=None, name=None): def image_resize_short(input, out_short_len, resample='BILINEAR'): """ - Resize a batch of images. The short edge of input images will be - resized to the given 'out_short_len'. The long edge of input images - will be resized proportionately to make images' length-width ratio + Resize a batch of images. The short edge of input images will be + resized to the given 'out_short_len'. The long edge of input images + will be resized proportionately to make images' length-width ratio constant. Args: @@ -4822,7 +4819,7 @@ def gather(input, index): """ **Gather Layer** - Output is obtained by gathering entries of the outer-most dimension + Output is obtained by gathering entries of the outer-most dimension of X indexed by `index` and concatenate them together. .. math:: @@ -4847,7 +4844,7 @@ def gather(input, index): [5, 6]] Args: - input (Variable): The source input with rank>=1. + input (Variable): The source input with rank>=1. index (Variable): The index input with rank=1. Returns: @@ -4883,7 +4880,7 @@ def random_crop(x, shape, seed=None): Returns: ${out_comment} - + Examples: >>> img = fluid.layers.data("img", [3, 256, 256]) >>> cropped_img = fluid.layers.random_crop(img, shape=[3, 224, 224]) @@ -4929,7 +4926,7 @@ def log(x): Out = \\ln(x) Args: - x (Variable): Input tensor. + x (Variable): Input tensor. Returns: Variable: The natural log of the input tensor computed element-wise. @@ -4958,7 +4955,7 @@ def relu(x): Out = \\max(0, x) Args: - x (Variable): The input tensor. + x (Variable): The input tensor. Returns: Variable: The output tensor with the same shape as input. @@ -4979,15 +4976,15 @@ def relu(x): def mean_iou(input, label, num_classes): """ Mean Intersection-Over-Union is a common evaluation metric for - semantic image segmentation, which first computes the IOU for each - semantic class and then computes the average over classes. - IOU is defined as follows: - + semantic image segmentation, which first computes the IOU for each + semantic class and then computes the average over classes. + IOU is defined as follows: + .. math:: IOU = \\frac{true\_positiv}{(true\_positive + false\_positive + false\_negative)}. - The predictions are accumulated in a confusion matrix and mean-IOU + The predictions are accumulated in a confusion matrix and mean-IOU is then calculated from it. @@ -5000,12 +4997,12 @@ def mean_iou(input, label, num_classes): Returns: mean_iou (Variable): A Tensor representing the mean intersection-over-union with shape [1]. out_wrong(Variable): A Tensor with shape [num_classes]. The wrong numbers of each class. - out_correct(Variable): A Tensor with shape [num_classes]. The correct numbers of each class. + out_correct(Variable): A Tensor with shape [num_classes]. The correct numbers of each class. Examples: .. code-block:: python - + iou, wrongs, corrects = fluid.layers.mean_iou(predict, label, num_classes) """ helper = LayerHelper('mean_iou', **locals()) diff --git a/python/paddle/fluid/layers/tensor.py b/python/paddle/fluid/layers/tensor.py index b7a8bff30d3baffb7ec4d67a9bf6f5b00e3aa983..149e77b52415025e78fbf4ee641151880b415fb0 100644 --- a/python/paddle/fluid/layers/tensor.py +++ b/python/paddle/fluid/layers/tensor.py @@ -230,11 +230,7 @@ def sums(input, out=None): helper = LayerHelper('sum', **locals()) if out is None: out = helper.create_tmp_variable(dtype=helper.input_dtype()) - helper.append_op( - type='sum', - inputs={'X': input}, - outputs={'Out': out}, - attrs={'use_mkldnn': False}) + helper.append_op(type='sum', inputs={'X': input}, outputs={'Out': out}) return out @@ -384,7 +380,7 @@ def argmin(x, axis=0): """ **argmin** - This function computes the indices of the min elements + This function computes the indices of the min elements of the input tensor's element along the provided axis. Args: @@ -399,7 +395,7 @@ def argmin(x, axis=0): .. code-block:: python out = fluid.layers.argmin(x=in, axis=0) - out = fluid.layers.argmin(x=in, axis=-1) + out = fluid.layers.argmin(x=in, axis=-1) """ helper = LayerHelper("arg_min", **locals()) out = helper.create_tmp_variable(VarDesc.VarType.INT64) @@ -415,7 +411,7 @@ def argmax(x, axis=0): """ **argmax** - This function computes the indices of the max elements + This function computes the indices of the max elements of the input tensor's element along the provided axis. Args: @@ -430,7 +426,7 @@ def argmax(x, axis=0): .. code-block:: python out = fluid.layers.argmax(x=in, axis=0) - out = fluid.layers.argmax(x=in, axis=-1) + out = fluid.layers.argmax(x=in, axis=-1) """ helper = LayerHelper("arg_max", **locals()) out = helper.create_tmp_variable(VarDesc.VarType.INT64) @@ -499,9 +495,9 @@ def reverse(x, axis): Args: x(Vairbale): the input to be reversed. - axis(int|tuple|list): Axis that along which order of elements - is reversed. If it is a tuple or a list, reversing - will be apply on each axis in the tuple or list. + axis(int|tuple|list): Axis that along which order of elements + is reversed. If it is a tuple or a list, reversing + will be apply on each axis in the tuple or list. Returns: Variable: The reversed tensor. @@ -532,9 +528,9 @@ def save(x, file_path, overwrite=True): Args: x(variable): The Tensor/LoDTensor to be saved. file_path(str): The file path where the variable will be saved. - overwrite(bool): Whether or not cover the given file when it has already - existed. If it's set 'False' and the file is existed, a runtime - error will be thrown. + overwrite(bool): Whether or not cover the given file when it has already + existed. If it's set 'False' and the file is existed, a runtime + error will be thrown. """ helper = LayerHelper("save", **locals()) helper.append_op( @@ -554,8 +550,8 @@ def save_combine(x, file_path, overwrite=True): a single file. file_path(str): The file path where variables will be saved. overwrite(bool): Whether or not cover the given file when it has already - existed. If it's set 'False' and the file is existed, a runtime - error will be thrown. + existed. If it's set 'False' and the file is existed, a runtime + error will be thrown. Returns: There is no return value. diff --git a/python/paddle/fluid/metrics.py b/python/paddle/fluid/metrics.py index 572475b483ff0341a97a91b6c5309fcf337dacbe..c9cd881979a4ea4b14299ce219be4b5bd1f153fc 100644 --- a/python/paddle/fluid/metrics.py +++ b/python/paddle/fluid/metrics.py @@ -23,6 +23,8 @@ import warnings __all__ = [ 'MetricBase', 'CompositeMetric', + 'Precision', + 'Recall', 'Accuracy', 'ChunkEvaluator', 'EditDistance', @@ -46,33 +48,34 @@ def _is_number_or_matrix_(var): class MetricBase(object): """ - Base Class for all evaluators + Base Class for all Metrics. + MetricBase define a group of interfaces for the + model evaluation methods. Metrics accumulate metric states between + consecutive minibatches, at every minibatch, use update + interface to add current minibatch value to global states. + Use eval to compute accumative metric value from last reset() + or from scratch on. + If you need to custom a new metric, please inherit from MetricBase and + custom implementation. Args: - name(str): The name of evaluator. such as, "accuracy". Used for generate - temporary variable name. - Interface: - Note(*) : the states is the attributes who not has _ prefix. - - get_config(): print current states and configuration - reset(): clear the states. If the Metrics states type is not (int, float, np.ndarray), - Please override this method. - update(): update states at every minibatch - eval(): get metric evaluation in numpy type. + name(str): The name of metric instance. such as, "accuracy". + It needed if you want to distinct different metrics in a model. + """ - def __init__(self, name, **kwargs): + def __init__(self, name): self._name = str(name) if name != None else self.__class__.__name__ - self._kwargs = kwargs if kwargs != None else dict() - self.reset() def __str__(self): return self._name def reset(self): """ - states is the attributes who not has _ prefix. - reset the states of metrics. + reset clear the states of metrics. By default, the states + are the members who do not has _ prefix, reset set them to inital states. + If you violate the implicit name rule, please also custom the reset + interface. """ states = { attr: value @@ -90,61 +93,231 @@ class MetricBase(object): setattr(self, attr, None) def get_config(self): + """ + Get the metric and current states. + The states are the members who do not has "_" prefix. + + Args: + None + + Returns: + dict: a dict of metric and states + """ states = { attr: value for attr, value in self.__dict__.iteritems() if not attr.startswith("_") } - config = copy.deepcopy(self._kwargs) + config = {} config.update({"name": self._name, "states": copy.deepcopy(states)}) return config - def update(self): - raise NotImplementedError() + def update(self, preds, labels): + """ + Updates the metric states at every minibatch. + One user can compute the minibatch metric via pure Python, or + via a c++ operator. + + Args: + preds(numpy.array): the predictions of current minibatch + labels(numpy.array): the labels of current minibatch, if the label is one-hot + or soft-label, should custom the corresponding update rule. + """ + raise NotImplementedError( + "Should not use it directly, please extend it.") def eval(self): - raise NotImplementedError() + """ + Evalute the current metrics based the accumulated states. + + Returns: + float|list(float)|numpy.array: the metrics via Python. + """ + raise NotImplementedError( + "Should not use it directly, please extend it.") class CompositeMetric(MetricBase): """ - Compute multiple metrics in each minibatch. + Composite multiple metrics in one instance. for example, merge F1, accuracy, recall into one Metric. + + Examples: + .. code-block:: python + + labels = fluid.layers.data(name="data", shape=[1], dtype="int32") + data = fluid.layers.data(name="data", shape=[32, 32], dtype="int32") + pred = fluid.layers.fc(input=data, size=1000, act="tanh") + comp = fluid.metrics.CompositeMetric() + acc = fluid.metrics.Precision() + recall = fluid.metrics.Recall() + comp.add_metric(acc) + comp.add_metric(recall) + for pass in range(PASSES): + comp.reset() + for data in train_reader(): + loss, preds, labels = exe.run(fetch_list=[cost, preds, labels]) + comp.update(preds=preds, labels=labels) + numpy_acc, numpy_recall = comp.eval() """ - def __init__(self, name=None, **kwargs): - super(CompositeMetric, self).__init__(name, kwargs) + def __init__(self, name=None): + super(CompositeMetric, self).__init__(name) self._metrics = [] def add_metric(self, metric): + """ + add one metric instance to CompositeMetric. + + Args: + metric: a instance of MetricBase. + """ if not isinstance(metric, MetricBase): raise ValueError("SubMetric should be inherit from MetricBase.") self._metrics.append(metric) + def update(self, preds, labels): + """ + Update every metrics in sequence. + + Args: + preds(numpy.array): the predictions of current minibatch + labels(numpy.array): the labels of current minibatch, if the label is one-hot + or soft-label, should custom the corresponding update rule. + """ + for m in self._metrics: + ans.append(m.update(preds, labels)) + def eval(self): + """ + Evaluate every metrics in sequence. + + Returns: + list(float|numpy.array): a list of metrics value in Python. + """ ans = [] for m in self._metrics: ans.append(m.eval()) return ans +class Precision(MetricBase): + """ + Precision (also called positive predictive value) is the fraction of + relevant instances among the retrieved instances. + https://en.wikipedia.org/wiki/Evaluation_of_binary_classifiers + + Note Precision is different with Accuracy in binary classifiers. + accuracy = true positive / total instances + precision = true positive / all positive instance + + Examples: + .. code-block:: python + + metric = fluid.metrics.Precision() + for pass in range(PASSES): + metric.reset() + for data in train_reader(): + loss, preds, labels = exe.run(fetch_list=[cost, preds, labels]) + metric.update(preds=preds, labels=labels) + numpy_precision = metric.eval() + """ + + def __init__(self, name=None): + super(Precision, self).__init__(name) + self.tp = 0 # true positive + self.fp = 0 # false positive + + def update(self, preds, labels): + if not _is_numpy_(preds): + raise ValueError("The 'preds' must be a numpy ndarray.") + if not _is_numpy_(labels): + raise ValueError("The 'labels' must be a numpy ndarray.") + sample_num = labels[0] + for i in range(sample_num): + pred = preds[i].astype("int32") + label = labels[i] + if label == 1: + if pred == label: + self.tp += 1 + else: + self.fp += 1 + + def eval(self): + ap = self.tp + self.fp + return float(self.tp) / ap if ap != 0 else .0 + + +class Recall(MetricBase): + """ + Recall (also known as sensitivity) is the fraction of + relevant instances that have been retrieved over the + total amount of relevant instances + + https://en.wikipedia.org/wiki/Precision_and_recall + + Examples: + .. code-block:: python + + metric = fluid.metrics.Recall() + for pass in range(PASSES): + metric.reset() + for data in train_reader(): + loss, preds, labels = exe.run(fetch_list=[cost, preds, labels]) + metric.update(preds=preds, labels=labels) + numpy_recall = metric.eval() + """ + + def __init__(self, name=None): + super(Recall, self).__init__(name) + self.tp = 0 # true positive + self.fn = 0 # false negtive + + def update(self, preds, labels): + if not _is_numpy_(preds): + raise ValueError("The 'preds' must be a numpy ndarray.") + if not _is_numpy_(labels): + raise ValueError("The 'labels' must be a numpy ndarray.") + sample_num = labels[0] + for i in range(sample_num): + pred = preds[i].astype("int32") + label = labels[i] + if label == 1: + if pred == label: + self.tp += 1 + else: + if pred != label: + self.fn += 1 + + def eval(self): + recall = self.tp + self.fn + return float(self.tp) / recall if recall != 0 else .0 + + class Accuracy(MetricBase): """ Accumulate the accuracy from minibatches and compute the average accuracy for every pass. + https://en.wikipedia.org/wiki/Accuracy_and_precision Args: name: the metrics name - Example: - minibatch_accuracy = fluid.layers.accuracy(pred, label) - accuracy_evaluator = fluid.metrics.Accuracy() - for epoch in PASS_NUM: - accuracy_evaluator.reset() - for data in batches: - loss = exe.run(fetch_list=[cost, minibatch_accuracy]) - accuracy_evaluator.update(value=minibatch_accuracy, weight=batches) - accuracy = accuracy_evaluator.eval() + Examples: + .. code-block:: python + + labels = fluid.layers.data(name="data", shape=[1], dtype="int32") + data = fluid.layers.data(name="data", shape=[32, 32], dtype="int32") + pred = fluid.layers.fc(input=data, size=1000, act="tanh") + minibatch_accuracy = fluid.layers.accuracy(pred, label) + accuracy_evaluator = fluid.metrics.Accuracy() + for pass in range(PASSES): + accuracy_evaluator.reset() + for data in train_reader(): + batch_size = data[0] + loss = exe.run(fetch_list=[cost, minibatch_accuracy]) + accuracy_evaluator.update(value=minibatch_accuracy, weight=batch_size) + numpy_acc = accuracy_evaluator.eval() """ def __init__(self, name=None): @@ -153,6 +326,13 @@ class Accuracy(MetricBase): self.weight = .0 def update(self, value, weight): + """ + Update minibatch states. + + Args: + value(float|numpy.array): accuracy of one minibatch. + weight(int|float): batch size. + """ if not _is_number_or_matrix_(value): raise ValueError( "The 'value' must be a number(int, float) or a numpy ndarray.") @@ -163,9 +343,8 @@ class Accuracy(MetricBase): def eval(self): if self.weight == 0: - raise ValueError( - "There is no data in Accuracy Metrics. Please check layers.accuracy output has added to Accuracy." - ) + raise ValueError("There is no data in Accuracy Metrics. \ + Please check layers.accuracy output has added to Accuracy.") return self.value / self.weight @@ -174,6 +353,25 @@ class ChunkEvaluator(MetricBase): Accumulate counter numbers output by chunk_eval from mini-batches and compute the precision recall and F1-score using the accumulated counter numbers. + For some basics of chunking, please refer to + 'Chunking with Support Vector Machines '. + ChunkEvalEvaluator computes the precision, recall, and F1-score of chunk detection, + and supports IOB, IOE, IOBES and IO (also known as plain) tagging schemes. + + Examples: + .. code-block:: python + + labels = fluid.layers.data(name="data", shape=[1], dtype="int32") + data = fluid.layers.data(name="data", shape=[32, 32], dtype="int32") + pred = fluid.layers.fc(input=data, size=1000, act="tanh") + precision, recall, f1_score, num_infer_chunks, num_label_chunks, num_correct_chunks = layers.chunk_eval( + input=pred, + label=label) + metric = fluid.metrics.ChunkEvaluator() + for data in train_reader(): + loss, preds, labels = exe.run(fetch_list=[cost, preds, labels]) + metric.update(num_infer_chunks, num_label_chunks, num_correct_chunks) + numpy_precision, numpy_recall, numpy_f1 = metric.eval() """ def __init__(self, name=None): @@ -183,9 +381,17 @@ class ChunkEvaluator(MetricBase): self.num_correct_chunks = 0 def update(self, num_infer_chunks, num_label_chunks, num_correct_chunks): + """ + Update the states based on the layers.chunk_eval() ouputs. + Args: + num_infer_chunks(int|numpy.array): The number of chunks in Inference on the given minibatch. + num_label_chunks(int|numpy.array): The number of chunks in Label on the given mini-batch. + num_correct_chunks(int|float|numpy.array): The number of chunks both in Inference and Label on the + given mini-batch. + """ if not _is_number_or_matrix_(num_infer_chunks): raise ValueError( - "The 'num_infer_chunks' must be a number(int, float) or a numpy ndarray." + "The 'num_infer_chunks' must be a number(int) or a numpy ndarray." ) if not _is_number_or_matrix_(num_label_chunks): raise ValueError( @@ -212,21 +418,28 @@ class ChunkEvaluator(MetricBase): class EditDistance(MetricBase): """ + Edit distance is a way of quantifying how dissimilar two strings + (e.g., words) are to one another by counting the minimum number + of operations required to transform one string into the other. + Refer to https://en.wikipedia.org/wiki/Edit_distance + Accumulate edit distance sum and sequence number from mini-batches and compute the average edit_distance and instance error of all batches. Args: name: the metrics name - Example: - edit_distance_metrics = fluid.layers.edit_distance(input, label) - distance_evaluator = fluid.metrics.EditDistance() - for epoch in PASS_NUM: - distance_evaluator.reset() - for data in batches: - loss = exe.run(fetch_list=[cost] + list(edit_distance_metrics)) - distance_evaluator.update(*edit_distance_metrics) - distance, instance_error = distance_evaluator.eval() + Examples: + .. code-block:: python + + distances, seq_num = fluid.layers.edit_distance(input, label) + distance_evaluator = fluid.metrics.EditDistance() + for epoch in PASS_NUM: + distance_evaluator.reset() + for data in batches: + loss = exe.run(fetch_list=[cost] + list(edit_distance_metrics)) + distance_evaluator.update(distances, seq_num) + distance, instance_error = distance_evaluator.eval() In the above example: 'distance' is the average of the edit distance in a pass. @@ -264,16 +477,38 @@ class EditDistance(MetricBase): class DetectionMAP(MetricBase): """ Calculate the detection mean average precision (mAP). - - TODO (Dang Qingqing): update the following doc. - The general steps are as follows: - 1. calculate the true positive and false positive according to the input - of detection and labels. - 2. calculate mAP value, support two versions: '11 point' and 'integral'. - + mAP is the metric to measure the accuracy of object detectors + like Faster R-CNN, SSD, etc. + It is the average of the maximum precisions at different recall values. Please get more information from the following articles: https://sanchom.wordpress.com/tag/average-precision/ + https://arxiv.org/abs/1512.02325 + + The general steps are as follows: + + 1. calculate the true positive and false positive according to the input + of detection and labels. + 2. calculate mAP value, support two versions: '11 point' and 'integral'. + + Examples: + .. code-block:: python + + pred = fluid.layers.fc(input=data, size=1000, act="tanh") + batch_map = layers.detection_map( + input, + label, + class_num, + background_label, + overlap_threshold=overlap_threshold, + evaluate_difficult=evaluate_difficult, + ap_version=ap_version) + metric = fluid.metrics.DetectionMAP() + for data in train_reader(): + loss, preds, labels = exe.run(fetch_list=[cost, batch_map]) + batch_size = data[0] + metric.update(value=batch_map, weight=batch_size) + numpy_map = metric.eval() """ def __init__(self, name=None): @@ -302,17 +537,18 @@ class DetectionMAP(MetricBase): class Auc(MetricBase): """ - Auc Metrics which adapts to binary classification. - Need to note that auc metrics compute the value via Python natively. + Auc metric adapts to the binary classification. + Refer to https://en.wikipedia.org/wiki/Receiver_operating_characteristic#Area_under_the_curve + Need to note that auc metric compute the value via Python natively. If you concern the speed, please use the fluid.layers.auc instead. The `auc` function creates four local variables, `true_positives`, - `true_negatives`, `false_positives` and `false_negatives` that are used to - compute the AUC. To discretize the AUC curve, a linearly spaced set of - thresholds is used to compute pairs of recall and precision values. The area - under the ROC-curve is therefore computed using the height of the recall - values by the false positive rate, while the area under the PR-curve is the - computed using the height of the precision values by the recall. + `true_negatives`, `false_positives` and `false_negatives` that are used to + compute the AUC. To discretize the AUC curve, a linearly spaced set of + thresholds is used to compute pairs of recall and precision values. The area + under the ROC-curve is therefore computed using the height of the recall + values by the false positive rate, while the area under the PR-curve is the + computed using the height of the precision values by the recall. Args: name: metric name @@ -322,6 +558,16 @@ class Auc(MetricBase): curve. "NOTE: only implement the ROC curve type via Python now." + + Examples: + .. code-block:: python + + pred = fluid.layers.fc(input=data, size=1000, act="tanh") + metric = fluid.metrics.Auc() + for data in train_reader(): + loss, preds, labels = exe.run(fetch_list=[cost, preds, labels]) + metric.update(preds, labels) + numpy_auc = metric.eval() """ def __init__(self, name, curve='ROC', num_thresholds=200): @@ -334,10 +580,10 @@ class Auc(MetricBase): self.tn_list = np.zeros((num_thresholds, )) self.fp_list = np.zeros((num_thresholds, )) - def update(self, labels, predictions, axis=1): + def update(self, preds, labels): if not _is_numpy_(labels): raise ValueError("The 'labels' must be a numpy ndarray.") - if not _is_numpy_(predictions): + if not _is_numpy_(preds): raise ValueError("The 'predictions' must be a numpy ndarray.") kepsilon = 1e-7 # to account for floating point imprecisions diff --git a/python/paddle/fluid/tests/unittests/test_sum_mkldnn_op.py b/python/paddle/fluid/tests/unittests/test_sum_mkldnn_op.py deleted file mode 100644 index 7956897d68a3fb49d62ba696d0b6400b4f909989..0000000000000000000000000000000000000000 --- a/python/paddle/fluid/tests/unittests/test_sum_mkldnn_op.py +++ /dev/null @@ -1,26 +0,0 @@ -# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import unittest - -from test_sum_op import TestSumOp - - -class TestMKLDNN(TestSumOp): - def init_kernel_type(self): - self.use_mkldnn = True - - -if __name__ == '__main__': - unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_sum_op.py b/python/paddle/fluid/tests/unittests/test_sum_op.py index 1d90414e137a70e6265042e24e106fe565802778..2faf5b10647a1fa1d44e4847f017db177ee8808a 100644 --- a/python/paddle/fluid/tests/unittests/test_sum_op.py +++ b/python/paddle/fluid/tests/unittests/test_sum_op.py @@ -20,15 +20,12 @@ from op_test import OpTest class TestSumOp(OpTest): def setUp(self): self.op_type = "sum" - self.use_mkldnn = False - self.init_kernel_type() x0 = np.random.random((3, 4)).astype('float32') x1 = np.random.random((3, 4)).astype('float32') x2 = np.random.random((3, 4)).astype('float32') self.inputs = {"X": [("x0", x0), ("x1", x1), ("x2", x2)]} y = x0 + x1 + x2 self.outputs = {'Out': y} - self.attrs = {'use_mkldnn': self.use_mkldnn} def test_check_output(self): self.check_output() @@ -36,9 +33,6 @@ class TestSumOp(OpTest): def test_check_grad(self): self.check_grad(['x0'], 'Out') - def init_kernel_type(self): - pass - if __name__ == "__main__": unittest.main() diff --git a/python/paddle/fluid/transpiler/distribute_transpiler.py b/python/paddle/fluid/transpiler/distribute_transpiler.py index cf59808b3d2e463239c5785e4e1a727584b6754b..391dddcf3e9c4f4e087cac703eacbf56a8dbf80f 100644 --- a/python/paddle/fluid/transpiler/distribute_transpiler.py +++ b/python/paddle/fluid/transpiler/distribute_transpiler.py @@ -880,8 +880,7 @@ class DistributeTranspiler(object): table_opt_block.append_op( type="sum", inputs={"X": pserver_side_table_grad_list}, - outputs={"Out": [grad_var]}, - attrs={"use_mkldnn": False}) + outputs={"Out": [grad_var]}) else: # in async_mode, for table gradient, it also need to be splited to each parameter server origin_grad_name = grad_var.name @@ -1113,8 +1112,7 @@ class DistributeTranspiler(object): optimize_block.append_op( type="sum", inputs={"X": vars2merge}, - outputs={"Out": merged_var}, - attrs={"use_mkldnn": False}) + outputs={"Out": merged_var}) # TODO(panyx0718): What if it's SELECTED_ROWS. if not merged_var.type == core.VarDesc.VarType.SELECTED_ROWS: optimize_block.append_op( diff --git a/python/paddle/reader/decorator.py b/python/paddle/reader/decorator.py index 1f83cabb8481451736944823be45185deea4f43b..44a6e344630bb35d28ee29078bf8727053a24bef 100644 --- a/python/paddle/reader/decorator.py +++ b/python/paddle/reader/decorator.py @@ -336,7 +336,7 @@ def _buf2lines(buf, line_break="\n"): class PipeReader: """ - PipeReader read data by stream from a command, take it's + PipeReader read data by stream from a command, take it's stdout into a pipe buffer and redirect it to the parser to parse, then yield data as your desired format. @@ -352,7 +352,7 @@ class PipeReader: An example: .. code-block:: python - + def example_reader(): for f in myfiles: pr = PipeReader("cat %s"%f) diff --git a/tools/check_ctest_hung.py b/tools/check_ctest_hung.py new file mode 100644 index 0000000000000000000000000000000000000000..7de76c381b29a1ff8dcf2167f0e861dc261aa47b --- /dev/null +++ b/tools/check_ctest_hung.py @@ -0,0 +1,53 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys +import re + + +def escape(input): + o = input.replace("\n", "") + o = o.replace("\r", "") + return o + + +def main(): + usage = """Usage: +1. Download the Paddle_PR_CI_*.log from TeamCity +2. run: python check_ctest_hung.py Paddle_PR_CI_*.log +3. If there is hung ctest, the result likes: +Diff: set(['test_parallel_executor_crf']) + """ + if len(sys.argv) < 2: + print(usage) + exit(0) + + logfile = sys.argv[1] + started = set() + passed = set() + with open(logfile, "r") as fn: + for l in fn.readlines(): + if l.find("Test ") != -1 and \ + l.find("Passed") != -1: + m = re.search("Test\s+#[0-9]*\:\s([a-z0-9_]+)", escape(l)) + passed.add(m.group(1)) + if l.find("Start ") != -1: + start_parts = escape(l).split(" ") + m = re.search("Start\s+[0-9]+\:\s([a-z0-9_]+)", escape(l)) + started.add(m.group(1)) + print "Diff: ", started - passed + + +if __name__ == "__main__": + main() diff --git a/.clang_format.hook b/tools/codestyle/clang_format.hook similarity index 100% rename from .clang_format.hook rename to tools/codestyle/clang_format.hook diff --git a/.copyright.hook b/tools/codestyle/copyright.hook similarity index 100% rename from .copyright.hook rename to tools/codestyle/copyright.hook