diff --git a/cmake/inference_lib.cmake b/cmake/inference_lib.cmake index cc758019827b9a5416a801e4da43d754d4492a73..06a7ae56827d5afe857ed0a98092210917a52430 100644 --- a/cmake/inference_lib.cmake +++ b/cmake/inference_lib.cmake @@ -70,6 +70,12 @@ copy(glog_lib DSTS ${dst_dir} ${dst_dir}/lib ) +set(dst_dir "${CMAKE_INSTALL_PREFIX}/third_party/boost/") +copy(boost_lib + SRCS ${BOOST_INCLUDE_DIR}/boost + DSTS ${dst_dir} +) + if(NOT PROTOBUF_FOUND) set(dst_dir "${CMAKE_INSTALL_PREFIX}/third_party/install/protobuf") copy(protobuf_lib diff --git a/doc/fluid/design/dist_train/async_update.md b/doc/fluid/design/dist_train/async_update.md index 6a0835b761b69030ba30697e6e8863928efbf57f..248d2ec18dafdecac9184527638754b6ba4d85b8 100644 --- a/doc/fluid/design/dist_train/async_update.md +++ b/doc/fluid/design/dist_train/async_update.md @@ -4,34 +4,37 @@ For the typical synchronous distributed training, some significant steps are as follows: -1. A Trainer will compute the gradients and SEND them to the Parameter Server(PServer) nodes. -1. After the PServer node received gradients came from all the Trainers, It will aggregate the +1. A trainer process will compute the gradients and **send** them to the parameter server (PS) nodes. +1. After the PS node received gradients came from all the Trainers, It will aggregate the gradient variables for the same parameter into one gradient variable and then apply the aggregated gradient to the respective parameter, finally using an optimize algorithms(SGD, Monument...) to update the parameters. -1. The Trainer would wait for the PServers finished the optimize stage, and GET the parameters from PServer, +1. The Trainer would wait for the PS finished the optimize stage, and GET the parameters from PS, so all the Trainers would get the same parameters. -In the synchronously distributed training, there should be a `Barrier` to synchronise the -parameters after the optimizing stage. The performance of a distributed training job would -depend on the slowest node if there were hundreds or thousands of training nodes in a -Job, the performance of synchronously distributed training might be very poor because of -the slow node. So this design doc would introduce an approach to implement -*asynchronously* distributed training in PaddlePaddle Fluid. +In Synchronous Distributed Training, there is a **barrier** on each PS to wait until all trainers processes +have completed running current mini-batch. After that, all trainers can continue to run the next +mini-batch. So, we can find that the overall performance of Synchronous Distributed Training depends +on the slowest node. + +In Asynchronous Distributed Training, we don't need to wait for a global mini-bach, the optimizer on +the PS will run immediately when the gradient is uploaded to the PS from one trainer. This mode would +train such models that achieve scaling, better throughput. In this design doc, we will introduce how to +implement the Asynchronous Distributed Training base on PaddlePaddle Fluid. ## Design -As the figure above, we describe a global view of asynchronously update process and use +As the figure above, we describe a global view of the asynchronous update process and use the parameter `w1` as an example to introduce the steps: 1. For each gradient variables, they may distribute on different GPU card and aggregate them while they are all calculated. -1. Split the gradient variable into multiple blocks according to the number of PServer +1. Split the gradient variable into multiple blocks according to the number of PS instances and then send them. -1. PServer would run an `Optimize Block` using a specified optimize algorithm to update +1. PS would run an `Optimize Block` using a specified optimize algorithm to update the specified parameter. -1. The trainer will fetch latest parameter from PServer before running forward Op which depends +1. The trainer will fetch the latest parameter from PS before running forward Op which depends on the specified parameter. 1. Broadcast the received variable into multiple GPU cards and continue to run the next mini-batch. @@ -40,8 +43,8 @@ mini-batch. - For the multiple devices distributed training, we need to aggregate the gradient variables which placed on different devices firstly and then schedule a `SendVars` Operator to -send the gradient variables to the multiple PServer instances. -- Schedule `FetchVars` operator to fetch the latest parameter from PServer before running +send the gradient variables to the multiple PS instances. +- Schedule `FetchVars` operator to fetch the latest parameter from PS before running the forward ops. - There could be a large number of gradient variables to be sent, so we need to use another thread pool(IO Threadpool) whose a number of the schedulable threads is larger than the diff --git a/paddle/fluid/framework/CMakeLists.txt b/paddle/fluid/framework/CMakeLists.txt index ab71e0e63ce18e4f221a046eeb2c39499c1c3816..ed1e70c6460b513c1d2e1add18ac037f71d36944 100644 --- a/paddle/fluid/framework/CMakeLists.txt +++ b/paddle/fluid/framework/CMakeLists.txt @@ -5,11 +5,11 @@ proto_library(framework_proto SRCS framework.proto) cc_library(ddim SRCS ddim.cc DEPS eigen3 boost) cc_test(ddim_test SRCS ddim_test.cc DEPS ddim) nv_test(dim_test SRCS dim_test.cu DEPS ddim) - +cc_library(data_type SRCS data_type.cc DEPS framework_proto ddim device_context) if(WITH_GPU) - nv_library(tensor SRCS tensor.cc tensor_util.cu DEPS ddim place memory device_context framework_proto) + nv_library(tensor SRCS tensor.cc tensor_util.cu DEPS place memory data_type) else() - cc_library(tensor SRCS tensor.cc tensor_util.cc DEPS ddim place memory device_context framework_proto) + cc_library(tensor SRCS tensor.cc tensor_util.cc DEPS place memory data_type) endif() cc_test(tensor_test SRCS tensor_test.cc DEPS tensor) diff --git a/paddle/fluid/framework/data_type.cc b/paddle/fluid/framework/data_type.cc new file mode 100644 index 0000000000000000000000000000000000000000..b9c90cb0c32f337ba82ce1eaa5b43199540491ef --- /dev/null +++ b/paddle/fluid/framework/data_type.cc @@ -0,0 +1,101 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "paddle/fluid/framework/data_type.h" +#include +#include +#include + +namespace paddle { +namespace framework { + +struct DataTypeMap { + std::unordered_map cpp_to_proto_; + std::unordered_map proto_to_cpp_; + std::unordered_map proto_to_str_; + std::unordered_map cpp_to_size_; +}; + +static DataTypeMap* InitDataTypeMap(); +static DataTypeMap& gDataTypeMap() { + static DataTypeMap* g_data_type_map_ = InitDataTypeMap(); + return *g_data_type_map_; +} + +template +static inline void RegisterType(DataTypeMap* map, + proto::VarType::Type proto_type, + const std::string& name) { + map->proto_to_cpp_.emplace(static_cast(proto_type), typeid(T)); + map->cpp_to_proto_.emplace(typeid(T), proto_type); + map->proto_to_str_.emplace(static_cast(proto_type), name); + map->cpp_to_size_.emplace(typeid(T), sizeof(T)); +} + +static DataTypeMap* InitDataTypeMap() { + auto retv = new DataTypeMap(); + +#define RegType(cc_type, proto_type) \ + RegisterType(retv, proto_type, #cc_type) + + // NOTE: Add your customize type here. + RegType(platform::float16, proto::VarType::FP16); + RegType(float, proto::VarType::FP32); + RegType(double, proto::VarType::FP64); + RegType(int, proto::VarType::INT32); + RegType(int64_t, proto::VarType::INT64); + RegType(bool, proto::VarType::BOOL); + RegType(size_t, proto::VarType::SIZE_T); + RegType(int16_t, proto::VarType::INT16); + +#undef RegType + return retv; +} + +proto::VarType::Type ToDataType(std::type_index type) { + auto it = gDataTypeMap().cpp_to_proto_.find(type); + if (it != gDataTypeMap().cpp_to_proto_.end()) { + return it->second; + } + PADDLE_THROW("Not support %s as tensor type", type.name()); +} + +std::type_index ToTypeIndex(proto::VarType::Type type) { + auto it = gDataTypeMap().proto_to_cpp_.find(static_cast(type)); + if (it != gDataTypeMap().proto_to_cpp_.end()) { + return it->second; + } + PADDLE_THROW("Not support proto::VarType::Type(%d) as tensor type", + static_cast(type)); +} + +std::string DataTypeToString(const proto::VarType::Type type) { + auto it = gDataTypeMap().proto_to_str_.find(static_cast(type)); + if (it != gDataTypeMap().proto_to_str_.end()) { + return it->second; + } + PADDLE_THROW("Not support proto::VarType::Type(%d) as tensor type", + static_cast(type)); +} + +size_t SizeOfType(std::type_index type) { + auto it = gDataTypeMap().cpp_to_size_.find(type); + if (it != gDataTypeMap().cpp_to_size_.end()) { + return it->second; + } + PADDLE_THROW("Not support %s as tensor type", type.name()); +} + +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/framework/data_type.h b/paddle/fluid/framework/data_type.h index 2a528eb3aa562568c92059250f2c9bc5a75ec103..4b9f572ec5f1cda71c8b8dd8fae54b42e9f16f7a 100644 --- a/paddle/fluid/framework/data_type.h +++ b/paddle/fluid/framework/data_type.h @@ -17,51 +17,14 @@ limitations under the License. */ #include #include "paddle/fluid/framework/framework.pb.h" #include "paddle/fluid/platform/enforce.h" + #include "paddle/fluid/platform/float16.h" namespace paddle { namespace framework { -inline proto::VarType::Type ToDataType(std::type_index type) { - if (typeid(platform::float16).hash_code() == type.hash_code()) { - return proto::VarType::FP16; - } else if (typeid(const float).hash_code() == type.hash_code()) { - // CPPLint complains Using C-style cast. Use static_cast() instead - // One fix to this is to replace float with const float because - // typeid(T) == typeid(const T) - // http://en.cppreference.com/w/cpp/language/typeid - return proto::VarType::FP32; - } else if (typeid(const double).hash_code() == type.hash_code()) { - return proto::VarType::FP64; - } else if (typeid(const int).hash_code() == type.hash_code()) { - return proto::VarType::INT32; - } else if (typeid(const int64_t).hash_code() == type.hash_code()) { - return proto::VarType::INT64; - } else if (typeid(const bool).hash_code() == type.hash_code()) { - return proto::VarType::BOOL; - } else { - PADDLE_THROW("Not supported"); - } -} - -inline std::type_index ToTypeIndex(proto::VarType::Type type) { - switch (type) { - case proto::VarType::FP16: - return typeid(platform::float16); - case proto::VarType::FP32: - return typeid(float); - case proto::VarType::FP64: - return typeid(double); - case proto::VarType::INT32: - return typeid(int); - case proto::VarType::INT64: - return typeid(int64_t); - case proto::VarType::BOOL: - return typeid(bool); - default: - PADDLE_THROW("Not support type %d", type); - } -} +extern proto::VarType::Type ToDataType(std::type_index type); +extern std::type_index ToTypeIndex(proto::VarType::Type type); template inline void VisitDataType(proto::VarType::Type type, Visitor visitor) { @@ -89,32 +52,12 @@ inline void VisitDataType(proto::VarType::Type type, Visitor visitor) { } } -inline std::string DataTypeToString(const proto::VarType::Type type) { - switch (type) { - case proto::VarType::FP16: - return "float16"; - case proto::VarType::FP32: - return "float32"; - case proto::VarType::FP64: - return "float64"; - case proto::VarType::INT16: - return "int16"; - case proto::VarType::INT32: - return "int32"; - case proto::VarType::INT64: - return "int64"; - case proto::VarType::BOOL: - return "bool"; - default: - PADDLE_THROW("Not support type %d", type); - } -} - +extern std::string DataTypeToString(const proto::VarType::Type type); +extern size_t SizeOfType(std::type_index type); inline std::ostream& operator<<(std::ostream& out, const proto::VarType::Type& type) { out << DataTypeToString(type); return out; } - } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/details/build_strategy.h b/paddle/fluid/framework/details/build_strategy.h new file mode 100644 index 0000000000000000000000000000000000000000..91bdfe6134ffbd1404336c9d6d1222a505084b2b --- /dev/null +++ b/paddle/fluid/framework/details/build_strategy.h @@ -0,0 +1,36 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +namespace paddle { +namespace framework { +namespace details { + +struct BuildStrategy { + enum class ReduceStrategy { kAllReduce = 0, kReduce = 1 }; + + enum class GradientScaleStrategy { + kCoeffNumDevice = 0, + kOne = 1, + kCustomized = 2, + }; + + ReduceStrategy reduce_{ReduceStrategy::kAllReduce}; + GradientScaleStrategy gradient_scale_{GradientScaleStrategy::kCoeffNumDevice}; +}; + +} // namespace details +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/framework/details/execution_strategy.h b/paddle/fluid/framework/details/execution_strategy.h new file mode 100644 index 0000000000000000000000000000000000000000..e8d510ec955602b5a3f73ca06caa121886eb150b --- /dev/null +++ b/paddle/fluid/framework/details/execution_strategy.h @@ -0,0 +1,29 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +namespace paddle { +namespace framework { +namespace details { + +struct ExecutionStrategy { + size_t num_threads_{0}; + bool use_event_{true}; + bool allow_op_delay_{false}; +}; + +} // namespace details +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/framework/details/multi_devices_graph_builder.cc b/paddle/fluid/framework/details/multi_devices_graph_builder.cc index 428efb4ace819f7711a369c1a6ff2792cc725e3a..7aae514094cab674511a4e88cc642c1d96df83d7 100644 --- a/paddle/fluid/framework/details/multi_devices_graph_builder.cc +++ b/paddle/fluid/framework/details/multi_devices_graph_builder.cc @@ -37,31 +37,26 @@ MultiDevSSAGraphBuilder::MultiDevSSAGraphBuilder( const std::string &loss_var_name, const std::unordered_set ¶ms, const std::vector &local_scopes, - platform::NCCLContextMap *nccl_ctxs, bool use_default_grad_scale, - bool balance_parameter_opt_between_cards) + platform::NCCLContextMap *nccl_ctxs, const BuildStrategy &strategy) : loss_var_name_(loss_var_name), places_(places), local_scopes_(local_scopes), nccl_ctxs_(nccl_ctxs), - balance_parameter_opt_between_cards_( - balance_parameter_opt_between_cards) { + strategy_(strategy) { #else MultiDevSSAGraphBuilder::MultiDevSSAGraphBuilder( const std::vector &places, const std::string &loss_var_name, const std::unordered_set ¶ms, - const std::vector &local_scopes, bool use_default_grad_scale, - bool balance_parameter_opt_between_cards) + const std::vector &local_scopes, const BuildStrategy &strategy) : loss_var_name_(loss_var_name), places_(places), local_scopes_(local_scopes), - balance_parameter_opt_between_cards_( - balance_parameter_opt_between_cards) { + strategy_(strategy) { #endif for (auto &p : params) { grad_names_.insert(GradVarName(p)); } - use_default_grad_scale_ = use_default_grad_scale; } void MultiDevSSAGraphBuilder::CreateOpHandleIOs(SSAGraph *result, @@ -146,7 +141,8 @@ std::unique_ptr MultiDevSSAGraphBuilder::Build( CreateComputationalOps(&result, *op, 1); } else if (IsScaleLossOp(*op)) { // user can customize loss@grad if not use_default_grad_scale_ - if (use_default_grad_scale_) { + if (strategy_.gradient_scale_ != + BuildStrategy::GradientScaleStrategy::kCustomized) { CreateScaleLossGradOp(&result); } is_forwarding = false; @@ -168,21 +164,23 @@ std::unique_ptr MultiDevSSAGraphBuilder::Build( static_cast(OpRole::kBackward))) { auto &backward_vars = boost::get>( op->GetAttr(OpProtoAndCheckerMaker::OpRoleVarAttrName())); - for (auto &og : backward_vars) { - if (balance_parameter_opt_between_cards_) { - CreateReduceOp(&result, og, cur_device_id); - var_name_on_devices[cur_device_id].emplace(og); - bcast_var_name_set[cur_device_id].emplace( - og.substr(0, og.size() - strlen(kGradVarSuffix))); - cur_device_id = (cur_device_id + 1) % places_.size(); - } else { - if (IsSparseGradient(var_types, og)) { - CreateReduceOp(&result, og, 0); - CreateBroadcastOp(&result, og, 0); - } else { - InsertNCCLAllReduceOp(&result, og); - } + switch (strategy_.reduce_) { + case BuildStrategy::ReduceStrategy::kReduce: + CreateReduceOp(&result, og, cur_device_id); + var_name_on_devices[cur_device_id].emplace(og); + bcast_var_name_set[cur_device_id].emplace( + og.substr(0, og.size() - strlen(kGradVarSuffix))); + cur_device_id = (cur_device_id + 1) % places_.size(); + break; + case BuildStrategy::ReduceStrategy::kAllReduce: + if (IsSparseGradient(var_types, og)) { + CreateReduceOp(&result, og, 0); + CreateBroadcastOp(&result, og, 0); + } else { + InsertNCCLAllReduceOp(&result, og); + } + break; } } } @@ -308,7 +306,7 @@ bool MultiDevSSAGraphBuilder::IsParameterGradientOnce( int MultiDevSSAGraphBuilder::GetOpDeviceID( const std::vector> &var_name_on_devices, const OpDesc &op) const { - if (!balance_parameter_opt_between_cards_) { + if (strategy_.reduce_ != BuildStrategy::ReduceStrategy::kReduce) { return -1; } diff --git a/paddle/fluid/framework/details/multi_devices_graph_builder.h b/paddle/fluid/framework/details/multi_devices_graph_builder.h index 3a3e9e3b8538f52962e6a5ccd1a177e58d6c2f6b..4f708521884247fc013f0ae336ab683c3fe7ef2f 100644 --- a/paddle/fluid/framework/details/multi_devices_graph_builder.h +++ b/paddle/fluid/framework/details/multi_devices_graph_builder.h @@ -17,6 +17,7 @@ #include #include +#include "paddle/fluid/framework/details/build_strategy.h" #include "paddle/fluid/framework/details/ssa_graph_builder.h" namespace paddle { @@ -36,15 +37,13 @@ class MultiDevSSAGraphBuilder : public SSAGraphBuilder { const std::unordered_set ¶ms, const std::vector &local_scopes, platform::NCCLContextMap *nccl_ctxs, - bool use_default_grad_scale, - bool balance_parameter_opt_between_cards); + const BuildStrategy &strategy); #else MultiDevSSAGraphBuilder(const std::vector &places, const std::string &loss_var_name, const std::unordered_set ¶ms, const std::vector &local_scopes, - bool use_default_grad_scale, - bool balance_parameter_opt_between_cards); + const BuildStrategy &strategy); #endif std::unique_ptr Build(const ProgramDesc &program) const override; @@ -62,8 +61,6 @@ class MultiDevSSAGraphBuilder : public SSAGraphBuilder { #ifdef PADDLE_WITH_CUDA platform::NCCLContextMap *nccl_ctxs_; #endif - bool balance_parameter_opt_between_cards_; - bool use_default_grad_scale_; bool IsScaleLossOp(const OpDesc &op) const; @@ -105,6 +102,9 @@ class MultiDevSSAGraphBuilder : public SSAGraphBuilder { bool IsSparseGradient( const std::unordered_map &var_types, const std::string &og) const; + + private: + BuildStrategy strategy_; }; } // namespace details } // namespace framework diff --git a/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc b/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc index e90523ebe8dc720d10034e3af9b0e51bb7a2fde9..ef263d82c5ec93f0673eb0ac70e4fb02904bff13 100644 --- a/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc +++ b/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc @@ -18,18 +18,17 @@ namespace paddle { namespace framework { namespace details { ThreadedSSAGraphExecutor::ThreadedSSAGraphExecutor( - size_t num_threads, bool use_event, - const std::vector &local_scopes, + const ExecutionStrategy &strategy, const std::vector &local_scopes, const std::vector &places, - std::unique_ptr &&graph, bool allow_op_delay) + std::unique_ptr &&graph) : SSAGraphExecutor(std::move(graph)), - pool_(num_threads >= 2 ? new ::ThreadPool(num_threads) : nullptr), + pool_(strategy.num_threads_ >= 2 ? new ::ThreadPool(strategy.num_threads_) + : nullptr), local_scopes_(local_scopes), places_(places), fetch_ctxs_(places), - use_event_(use_event), running_ops_(0), - allow_op_delay_(allow_op_delay) {} + strategy_(strategy) {} FeedFetchList ThreadedSSAGraphExecutor::Run( const std::vector &fetch_tensors) { @@ -86,7 +85,7 @@ FeedFetchList ThreadedSSAGraphExecutor::Run( // // NOTE: DelayedOps have a lower priority. It will be scheduled after all // ready_ops have been performed. - if (ready_ops.empty() && allow_op_delay_ && running_ops_ == 0) { + if (ready_ops.empty() && strategy_.allow_op_delay_ && running_ops_ == 0) { run_all_ops(delayed_ops); } else { run_all_ops(ready_ops); @@ -113,7 +112,7 @@ FeedFetchList ThreadedSSAGraphExecutor::Run( auto &deps = pending_ops[op]; --deps; if (deps == 0) { - if (op->IsMultiDeviceTransfer() && allow_op_delay_) { + if (op->IsMultiDeviceTransfer() && strategy_.allow_op_delay_) { delayed_ops.insert(op); } else { ready_ops.insert(op); @@ -191,7 +190,7 @@ void ThreadedSSAGraphExecutor::RunOp( auto op_run = [ready_var_q, op, this] { try { VLOG(10) << op << " " << op->Name() << " : " << op->DebugString(); - op->Run(use_event_); + op->Run(strategy_.use_event_); VLOG(10) << op << " " << op->Name() << " Done "; running_ops_--; ready_var_q->Extend(op->Outputs()); diff --git a/paddle/fluid/framework/details/threaded_ssa_graph_executor.h b/paddle/fluid/framework/details/threaded_ssa_graph_executor.h index f18a88526b3238220fc56fd07299643d32c8b58b..1f7f88d75218e757e4555ad093f3cd6558f624dd 100644 --- a/paddle/fluid/framework/details/threaded_ssa_graph_executor.h +++ b/paddle/fluid/framework/details/threaded_ssa_graph_executor.h @@ -23,6 +23,7 @@ #include #include "ThreadPool.h" // ThreadPool in thrird party #include "paddle/fluid/framework/blocking_queue.h" +#include "paddle/fluid/framework/details/execution_strategy.h" #include "paddle/fluid/framework/details/fetch_op_handle.h" #include "paddle/fluid/framework/details/ssa_graph_executor.h" @@ -34,11 +35,10 @@ namespace details { class ThreadedSSAGraphExecutor : public SSAGraphExecutor { public: - ThreadedSSAGraphExecutor(size_t num_threads, bool use_event, + ThreadedSSAGraphExecutor(const ExecutionStrategy &strategy, const std::vector &local_scopes, const std::vector &places, - std::unique_ptr &&graph, - bool allow_op_delay); + std::unique_ptr &&graph); // Run a SSAGraph by a thread pool // Use topological sort algorithm @@ -55,10 +55,8 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor { std::vector local_scopes_; std::vector places_; platform::DeviceContextPool fetch_ctxs_; - const bool use_event_; std::unique_ptr exception_; std::atomic running_ops_; - bool allow_op_delay_; void InsertPendingOp(std::unordered_map *pending_ops, OpHandleBase *op_instance) const; @@ -74,6 +72,9 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor { std::unordered_map *pending_ops, std::unordered_set *pending_vars, BlockingQueue *ready_vars, FeedFetchList *fetch_data); + + private: + ExecutionStrategy strategy_; }; } // namespace details diff --git a/paddle/fluid/framework/framework.proto b/paddle/fluid/framework/framework.proto index 96f53dc1bc8747e1b8ea84166614f98ff363ae5e..d2558f111f49139b33f921f7260b41830279edc8 100644 --- a/paddle/fluid/framework/framework.proto +++ b/paddle/fluid/framework/framework.proto @@ -101,6 +101,8 @@ message VarType { FP16 = 4; FP32 = 5; FP64 = 6; + // Tensor is used in C++. + SIZE_T = 19; // Other types that may need additional descriptions LOD_TENSOR = 7; diff --git a/paddle/fluid/framework/op_kernel_type_test.cc b/paddle/fluid/framework/op_kernel_type_test.cc index d37ce149ce3df63692b41289bb03448d54e392f5..db95861c510b52a5b52229541434e6437d3fb9f4 100644 --- a/paddle/fluid/framework/op_kernel_type_test.cc +++ b/paddle/fluid/framework/op_kernel_type_test.cc @@ -27,7 +27,7 @@ TEST(OpKernelType, ToString) { LibraryType::kCUDNN); ASSERT_EQ(paddle::framework::KernelTypeToString(op_kernel_type), - "data_type[float32]:data_layout[NCHW]:place[CPUPlace]:library_type[" + "data_type[float]:data_layout[NCHW]:place[CPUPlace]:library_type[" "CUDNN]"); } diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc index 95e807c0afa45bc4f4feb84d450b2d0584bc3b28..50c3468d556bfe05d6c41906cf35cb671f711b1e 100644 --- a/paddle/fluid/framework/parallel_executor.cc +++ b/paddle/fluid/framework/parallel_executor.cc @@ -52,13 +52,12 @@ std::vector &ParallelExecutor::GetLocalScopes() { } ParallelExecutor::ParallelExecutor( - size_t num_threads, bool use_event, const std::vector &places, const std::unordered_set ¶ms, const std::unordered_set &bcast_vars, const ProgramDesc &main_program, const std::string &loss_var_name, - Scope *scope, const std::vector &local_scopes, bool allow_op_delay, - bool use_default_grad_scale, bool balance_parameter_opt_between_cards, + Scope *scope, const std::vector &local_scopes, + const ExecutionStrategy &exec_strategy, const BuildStrategy &build_strategy, size_t num_trainers, size_t trainer_id) : member_(new ParallelExecutorPrivate(places)) { member_->global_scope_ = scope; @@ -100,18 +99,16 @@ ParallelExecutor::ParallelExecutor( #ifdef PADDLE_WITH_CUDA details::MultiDevSSAGraphBuilder builder( member_->places_, loss_var_name, params, member_->local_scopes_, - member_->nccl_ctxs_.get(), use_default_grad_scale, - balance_parameter_opt_between_cards); + member_->nccl_ctxs_.get(), build_strategy); #else - details::MultiDevSSAGraphBuilder builder( - member_->places_, loss_var_name, params, member_->local_scopes_, - use_default_grad_scale, balance_parameter_opt_between_cards); + details::MultiDevSSAGraphBuilder builder(member_->places_, loss_var_name, + params, member_->local_scopes_, + build_strategy); #endif auto graph = builder.Build(main_program); member_->executor_.reset(new details::ThreadedSSAGraphExecutor( - num_threads, use_event, member_->local_scopes_, places, std::move(graph), - allow_op_delay)); + exec_strategy, member_->local_scopes_, places, std::move(graph))); // Step 3. Create vars in each scope; for (auto *var : main_program.Block(0).AllVars()) { diff --git a/paddle/fluid/framework/parallel_executor.h b/paddle/fluid/framework/parallel_executor.h index 9e279876cfeef20a1921f8bd1c27046a477b9f56..5247e790649e76567f4527d54499d6e95dac5c27 100644 --- a/paddle/fluid/framework/parallel_executor.h +++ b/paddle/fluid/framework/parallel_executor.h @@ -14,57 +14,60 @@ limitations under the License. */ #pragma once +#include #include #include #include +#include "paddle/fluid/framework/details/execution_strategy.h" #include "paddle/fluid/framework/executor.h" #include "paddle/fluid/framework/op_info.h" #include "paddle/fluid/framework/program_desc.h" #include "paddle/fluid/framework/scope.h" #include "paddle/fluid/framework/tensor.h" #include "paddle/fluid/platform/device_context.h" - namespace paddle { namespace framework { class ParallelExecutorPrivate; +using details::BuildStrategy; +using details::ExecutionStrategy; + class ParallelExecutor { DISABLE_COPY_AND_ASSIGN(ParallelExecutor); public: - explicit ParallelExecutor(size_t num_threads, bool use_event, - const std::vector& places, - const std::unordered_set& params, - const std::unordered_set& bcast_vars, - const ProgramDesc& main_program, - const std::string& loss_var_name, Scope* scope, - const std::vector& local_scopes, - bool allow_op_delay, bool use_default_grad_scale, - bool balance_parameter_opt_between_cards, + explicit ParallelExecutor(const std::vector &places, + const std::unordered_set ¶ms, + const std::unordered_set &bcast_vars, + const ProgramDesc &main_program, + const std::string &loss_var_name, Scope *scope, + const std::vector &local_scopes, + const ExecutionStrategy &exec_strategy, + const BuildStrategy &build_strategy, size_t num_trainers = 1, size_t trainer_id = 0); ~ParallelExecutor(); - std::vector& GetLocalScopes(); + std::vector &GetLocalScopes(); /** * Feed tensors to local scopes. The size of tensors should be equal to the * size of local scopes. */ void FeedTensorsIntoLocalScopes( - const std::vector>& tensors); + const std::vector> &tensors); void FeedAndSplitTensorIntoLocalScopes( - const std::unordered_map& tensors); + const std::unordered_map &tensors); - void Run(const std::vector& fetch_tensors, - const std::string& fetched_var_name); + void Run(const std::vector &fetch_tensors, + const std::string &fetched_var_name); - void BCastParamsToGPUs(const std::unordered_set& vars) const; + void BCastParamsToGPUs(const std::unordered_set &vars) const; private: - ParallelExecutorPrivate* member_; + ParallelExecutorPrivate *member_; }; } // namespace framework diff --git a/paddle/fluid/framework/tensor_impl.h b/paddle/fluid/framework/tensor_impl.h index f49d1a47a325b2aac6185073203df124be18b54d..0a1db7758bd9ec0dac133efcbf495de1d690021d 100644 --- a/paddle/fluid/framework/tensor_impl.h +++ b/paddle/fluid/framework/tensor_impl.h @@ -13,54 +13,14 @@ See the License for the specific language governing permissions and limitations under the License. */ #pragma once +#include "paddle/fluid/framework/data_type.h" #include "paddle/fluid/memory/memcpy.h" #include "paddle/fluid/platform/enforce.h" #include "paddle/fluid/platform/float16.h" namespace paddle { namespace framework { - -template -struct SizeOfTypeFunctor; - -template -struct SizeOfTypeFunctor { - size_t operator()(std::type_index type) const { - if (typeid(T).hash_code() == type.hash_code()) { - return sizeof(T); - } else { - return 0UL; - } - } -}; - -template <> -struct SizeOfTypeFunctor<> { - size_t operator()(std::type_index type) const { return 0UL; } -}; - -template -struct SizeOfTypeFunctor { - size_t operator()(std::type_index type) const { - SizeOfTypeFunctor head; - size_t head_size = head(type); - if (head_size != 0) { - return head_size; - } - SizeOfTypeFunctor tail; - return tail(type); - } -}; - -static inline size_t SizeOfType(std::type_index type) { - SizeOfTypeFunctor - functor; - size_t size = functor(type); - PADDLE_ENFORCE(size != 0UL, "Cannot get size of type %s", type.name()); - return size; -} - +extern size_t SizeOfType(std::type_index type); inline void Tensor::check_memory_size() const { PADDLE_ENFORCE_NOT_NULL( holder_, "Tensor holds no memory. Call Tensor::mutable_data first."); diff --git a/paddle/fluid/inference/analysis/device.h b/paddle/fluid/inference/analysis/device.h index 4423af842d28566fea419b8099efc3bda33787f4..585c9923291e5f9cb6e50dbc4bcd28c374191048 100644 --- a/paddle/fluid/inference/analysis/device.h +++ b/paddle/fluid/inference/analysis/device.h @@ -11,6 +11,7 @@ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ +#pragma once namespace paddle { namespace inference { diff --git a/paddle/fluid/inference/analysis/node.h b/paddle/fluid/inference/analysis/node.h index 59ba977798481684114d1189056be00bbb7777cf..07cb7669f98237399c4165947a03c67ce2a86aa8 100644 --- a/paddle/fluid/inference/analysis/node.h +++ b/paddle/fluid/inference/analysis/node.h @@ -19,6 +19,7 @@ limitations under the License. */ */ #pragma once +#include #include #include #include diff --git a/paddle/fluid/operators/detail/grpc_server.cc b/paddle/fluid/operators/detail/grpc_server.cc index e6ee28ea8d920ef80fead258a9bd0d5f6762c879..d09f8479b765ad26cc202bfdb2692828213c7956 100644 --- a/paddle/fluid/operators/detail/grpc_server.cc +++ b/paddle/fluid/operators/detail/grpc_server.cc @@ -306,7 +306,7 @@ void AsyncGRPCServer::TryToRegisterNewPrefetchOne() { } RequestPrefetch* prefetch = new RequestPrefetch(&service_, cq_prefetch_.get(), sync_mode_, scope_, - dev_ctx_, executor_, program_, prefetch_ctx_); + dev_ctx_, executor_, program_, prefetch_ctx_.get()); VLOG(4) << "Create RequestPrefetch status:" << prefetch->Status(); } diff --git a/paddle/fluid/operators/detail/grpc_server.h b/paddle/fluid/operators/detail/grpc_server.h index 18f1bc53d0f561f412a5bbbe018bc3d427ac9ef9..238aaa29634a7eff65429c27aa3538a185723eb2 100644 --- a/paddle/fluid/operators/detail/grpc_server.h +++ b/paddle/fluid/operators/detail/grpc_server.h @@ -64,8 +64,9 @@ class AsyncGRPCServer final { void SetExecutor(framework::Executor *executor) { executor_ = executor; } - void SetPrefetchPreparedCtx(framework::ExecutorPrepareContext *prepared) { - prefetch_ctx_ = prepared; + void SetPrefetchPreparedCtx( + std::unique_ptr prepared) { + prefetch_ctx_.reset(prepared.release()); } int GetSelectedPort() const { return selected_port_; } @@ -116,7 +117,7 @@ class AsyncGRPCServer final { std::unique_ptr t_get_; std::unique_ptr t_prefetch_; - framework::ExecutorPrepareContext *prefetch_ctx_; + std::unique_ptr prefetch_ctx_; framework::ProgramDesc *program_; framework::Executor *executor_; int selected_port_; diff --git a/paddle/fluid/operators/detail/grpc_server_test.cc b/paddle/fluid/operators/detail/grpc_server_test.cc index 25b95d608d10d6e456d5f563ce9fbe35d812cb0f..b8db0ad987cdfaec1fc9236c3f26e88891376dce 100644 --- a/paddle/fluid/operators/detail/grpc_server_test.cc +++ b/paddle/fluid/operators/detail/grpc_server_test.cc @@ -100,7 +100,7 @@ void StartServer(const std::string& endpoint) { InitTensorsOnServer(&scope, &place, 10); rpc_service_->SetProgram(&program); - rpc_service_->SetPrefetchPreparedCtx(prepared.get()); + rpc_service_->SetPrefetchPreparedCtx(std::move(prepared)); rpc_service_->SetDevCtx(&ctx); rpc_service_->SetScope(&scope); rpc_service_->SetExecutor(&exe); diff --git a/paddle/fluid/operators/listen_and_serv_op.cc b/paddle/fluid/operators/listen_and_serv_op.cc index a29e0cd52cfccf242a6490822234045e6eb66c0f..abc88d3eb1514e159f4a880f44ecc0c0960a73d9 100644 --- a/paddle/fluid/operators/listen_and_serv_op.cc +++ b/paddle/fluid/operators/listen_and_serv_op.cc @@ -322,8 +322,7 @@ void ListenAndServOp::RunImpl(const framework::Scope &scope, // prepare for prefetch VLOG(3) << "prefetch block id is " << prefetch_block->ID(); auto prefetch_prepared = executor.Prepare(*program, prefetch_block->ID()); - rpc_service_->SetPrefetchPreparedCtx(prefetch_prepared.get()); - prefetch_prepared.release(); + rpc_service_->SetPrefetchPreparedCtx(std::move(prefetch_prepared)); // start the server listening after all member initialized. server_thread_.reset(new std::thread(RunServer, rpc_service_)); diff --git a/paddle/fluid/operators/load_combine_op.cc b/paddle/fluid/operators/load_combine_op.cc index b5522dd246f250f02d69c0ba749ae6043eb810d6..0522a94195786c767194ec727d982a60451e7c62 100644 --- a/paddle/fluid/operators/load_combine_op.cc +++ b/paddle/fluid/operators/load_combine_op.cc @@ -12,7 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ #include - +#include "paddle/fluid/framework/data_type_transform.h" #include "paddle/fluid/framework/op_registry.h" #include "paddle/fluid/platform/device_context.h" @@ -31,6 +31,7 @@ class LoadCombineOp : public framework::OperatorBase { void RunImpl(const framework::Scope &scope, const platform::Place &place) const override { auto filename = Attr("file_path"); + auto load_as_fp16 = Attr("load_as_fp16"); std::ifstream fin(filename); PADDLE_ENFORCE(static_cast(fin), @@ -59,17 +60,25 @@ class LoadCombineOp : public framework::OperatorBase { // Get data from fin to tensor DeserializeFromStream(fin, tensor, dev_ctx); - if (platform::is_gpu_place(place)) { - // copy CPU to GPU - framework::LoDTensor cpu_tensor; - cpu_tensor.ShareDataWith(*tensor); - cpu_tensor.set_lod(tensor->lod()); - - // reset tensor + auto in_dtype = framework::ToDataType(tensor->type()); + auto out_dtype = + load_as_fp16 ? framework::proto::VarType::FP16 : in_dtype; + + if (in_dtype != out_dtype) { + // convert to float16 tensor + auto in_kernel_type = framework::OpKernelType(in_dtype, place); + auto out_kernel_type = framework::OpKernelType(out_dtype, place); + framework::LoDTensor fp16_tensor; + // copy LoD info to the new tensor + fp16_tensor.set_lod(tensor->lod()); + framework::TransDataType(in_kernel_type, out_kernel_type, *tensor, + &fp16_tensor); + + // reset output tensor out_var->Clear(); tensor = out_var->GetMutable(); - tensor->set_lod(cpu_tensor.lod()); - TensorCopy(cpu_tensor, place, dev_ctx, tensor); + tensor->set_lod(fp16_tensor.lod()); + tensor->ShareDataWith(fp16_tensor); } } } @@ -82,6 +91,13 @@ class LoadCombineOpProtoMaker : public framework::OpProtoAndCheckerMaker { "Out", "(vector) The output LoDTensors that will be read from the input file.") .AsDuplicable(); + AddAttr( + "load_as_fp16", + "(boolean, default false)" + "If true, the tensor will be first loaded and then " + "converted to float16 data type. Otherwise, the tensor will be " + "directly loaded without data type conversion.") + .SetDefault(false); AddAttr("file_path", "(string) " "LoDTensors will be loaded from \"file_path\".") diff --git a/paddle/fluid/operators/save_load_combine_op_test.cc b/paddle/fluid/operators/save_load_combine_op_test.cc index 47618c51d98eb9f58988f82c0aee0083565d81a6..4743e0d9499b111d8baa921dbb245431713fd7a8 100644 --- a/paddle/fluid/operators/save_load_combine_op_test.cc +++ b/paddle/fluid/operators/save_load_combine_op_test.cc @@ -139,8 +139,9 @@ TEST(SaveLoadCombineOp, CPU) { CheckValues(expect4, actual4, expect_lod4, actual_lod4, numel4); } -// FP16 version of SaveLoadCombineOp Test -TEST(SaveLoadCombineFP16Op, CPU) { +// FP16 version of SaveLoadCombineOp Test, only altering the saving aspect +// to save as FP16. +TEST(SaveCombineFP16Op, CPU) { paddle::framework::Scope scope; paddle::platform::CPUPlace place; @@ -169,7 +170,7 @@ TEST(SaveLoadCombineFP16Op, CPU) { 20, 50, lod4, "test_var4", place, &scope, &expect_lod4); // Set attributes - std::string filename = "check_tensor_fp16.ls"; + std::string filename = "check_tensor_fp16_save.ls"; paddle::framework::AttributeMap attrs; attrs.insert({"file_path", std::string(filename)}); attrs.insert({"save_as_fp16", true}); @@ -216,6 +217,89 @@ TEST(SaveLoadCombineFP16Op, CPU) { actual_lod4, numel4); } +// FP16 version of SaveLoadCombineOp Test, only altering the loading aspect +// to load tensors with FP16 precision. +TEST(LoadCombineFP16Op, CPU) { + paddle::framework::Scope scope; + paddle::platform::CPUPlace place; + + std::vector lod1 = {0, 1, 2, 3, 10}; + int numel1 = 100; + paddle::framework::LoD expect_lod1; + float* expect1 = CreateForSaveCombineOp( + 10, 10, lod1, "test_var1", place, &scope, &expect_lod1); + + std::vector lod2 = {0, 2, 5, 10}; + int numel2 = 200; + paddle::framework::LoD expect_lod2; + float* expect2 = CreateForSaveCombineOp( + 10, 20, lod2, "test_var2", place, &scope, &expect_lod2); + + std::vector lod3 = {0, 20}; + int numel3 = 4000; + paddle::framework::LoD expect_lod3; + float* expect3 = CreateForSaveCombineOp( + 20, 200, lod3, "test_var3", place, &scope, &expect_lod3); + + std::vector lod4 = {0, 1, 20}; + int numel4 = 1000; + paddle::framework::LoD expect_lod4; + float* expect4 = CreateForSaveCombineOp( + 20, 50, lod4, "test_var4", place, &scope, &expect_lod4); + + // Set attributes + std::string filename = "check_tensor_fp16_load.ls"; + paddle::framework::AttributeMap attrs; + attrs.insert({"file_path", std::string(filename)}); + + // Run the save_combine_op + auto save_combine_op = paddle::framework::OpRegistry::CreateOp( + "save_combine", + {{"X", {"test_var1", "test_var2", "test_var3", "test_var4"}}}, {}, attrs); + save_combine_op->Run(scope, place); + + // Set up output vars + auto load_var1 = scope.Var("out_var1"); + auto load_var2 = scope.Var("out_var2"); + auto load_var3 = scope.Var("out_var3"); + auto load_var4 = scope.Var("out_var4"); + + attrs.insert({"load_as_fp16", true}); + // Run the load_combine_op + auto load_combine_op = paddle::framework::OpRegistry::CreateOp( + "load_combine", {}, + {{"Out", {"out_var1", "out_var2", "out_var3", "out_var4"}}}, attrs); + load_combine_op->Run(scope, place); + + auto* target1 = load_var1->GetMutable(); + auto* target2 = load_var2->GetMutable(); + auto* target3 = load_var3->GetMutable(); + auto* target4 = load_var4->GetMutable(); + + paddle::framework::LoD actual_lod1, actual_lod2, actual_lod3, actual_lod4; + paddle::platform::float16* actual1 = + GetValuesAfterLoadCombineOp(target1, scope, + &actual_lod1); + paddle::platform::float16* actual2 = + GetValuesAfterLoadCombineOp(target2, scope, + &actual_lod2); + paddle::platform::float16* actual3 = + GetValuesAfterLoadCombineOp(target3, scope, + &actual_lod3); + paddle::platform::float16* actual4 = + GetValuesAfterLoadCombineOp(target4, scope, + &actual_lod4); + + CheckValues(expect1, actual1, expect_lod1, + actual_lod1, numel1); + CheckValues(expect2, actual2, expect_lod2, + actual_lod2, numel2); + CheckValues(expect3, actual3, expect_lod3, + actual_lod3, numel3); + CheckValues(expect4, actual4, expect_lod4, + actual_lod4, numel4); +} + // Test with original SaveLoadTest TEST(SaveLoadTestWithCombineOp, CPU) { paddle::framework::Scope scope; diff --git a/paddle/fluid/platform/nccl_helper.h b/paddle/fluid/platform/nccl_helper.h index e30c1a9ebf08365a9856fb32b1ce5790869e2b33..09367889a9517956ad01ad2847c31e2633cc643d 100644 --- a/paddle/fluid/platform/nccl_helper.h +++ b/paddle/fluid/platform/nccl_helper.h @@ -53,7 +53,7 @@ class NCCLGroupGuard { } inline ~NCCLGroupGuard() { - PADDLE_ENFORCE(dynload::ncclGroupEnd()); + CHECK_EQ(dynload::ncclGroupEnd(), ncclSuccess); NCCLMutex().unlock(); } }; diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc index b62291a99d34457dd17bf2bcafc1fc611419f086..50a1c07251b5bc4e7cc27de63f5457d3f94daef5 100644 --- a/paddle/fluid/pybind/pybind.cc +++ b/paddle/fluid/pybind/pybind.cc @@ -494,23 +494,61 @@ All parameter, weight, gradient are variables in Paddle. m.def("disable_profiler", platform::DisableProfiler); m.def("reset_profiler", platform::ResetProfiler); - py::class_(m, "ParallelExecutor") - .def("__init__", - [](ParallelExecutor &self, size_t num_threads, bool use_event, - const std::vector &places, - const std::unordered_set ¶ms, - const std::unordered_set &bcast_vars, - const ProgramDesc &main_program, const std::string &loss_var_name, - Scope *scope, std::vector &local_scopes, - bool allow_op_delay, bool use_default_grad_scale, - bool balance_parameter_opt_between_cards, size_t num_trainers, - size_t trainer_id) { - new (&self) ParallelExecutor( - num_threads, use_event, places, params, bcast_vars, - main_program, loss_var_name, scope, local_scopes, - allow_op_delay, use_default_grad_scale, - balance_parameter_opt_between_cards, num_trainers, trainer_id); - }) + // -- python binds for parallel executor. + py::class_ pe(m, "ParallelExecutor"); + py::class_(pe, "ExecutionStrategy") + .def(py::init()) + .def_property( + "num_threads", + [](const ExecutionStrategy &self) { return self.num_threads_; }, + [](ExecutionStrategy &self, size_t num_threads) { + self.num_threads_ = num_threads; + }) + .def_property( + "use_event", + [](const ExecutionStrategy &self) { return self.use_event_; }, + [](ExecutionStrategy &self, bool use_event) { + self.use_event_ = use_event; + }) + .def_property( + "allow_op_delay", + [](const ExecutionStrategy &self) { return self.allow_op_delay_; }, + [](ExecutionStrategy &self, bool allow_op_delay) { + self.allow_op_delay_ = allow_op_delay; + }); + py::class_ build_strategy(pe, "BuildStrategy"); + + py::enum_(build_strategy, "ReduceStrategy") + .value("Reduce", BuildStrategy::ReduceStrategy::kReduce) + .value("AllReduce", BuildStrategy::ReduceStrategy::kAllReduce); + py::enum_(build_strategy, + "GradientScaleStrategy") + .value("CoeffNumDevice", + BuildStrategy::GradientScaleStrategy::kCoeffNumDevice) + .value("One", BuildStrategy::GradientScaleStrategy::kOne) + .value("Customized", BuildStrategy::GradientScaleStrategy::kCustomized); + + build_strategy.def(py::init()) + .def_property( + "reduce_strategy", + [](const BuildStrategy &self) { return self.reduce_; }, + [](BuildStrategy &self, BuildStrategy::ReduceStrategy strategy) { + self.reduce_ = strategy; + }) + .def_property( + "gradient_scale_strategy", + [](const BuildStrategy &self) { return self.gradient_scale_; }, + [](BuildStrategy &self, + BuildStrategy::GradientScaleStrategy strategy) { + self.gradient_scale_ = strategy; + }); + + pe.def(py::init &, + const std::unordered_set &, + const std::unordered_set &, const ProgramDesc &, + const std::string &, Scope *, std::vector &, + const ExecutionStrategy &, const BuildStrategy &, size_t, + size_t>()) .def("bcast_params", &ParallelExecutor::BCastParamsToGPUs) // NOTE: even we return a vec* to Python use reference policy. // We still cannot get local_scope from this vector, since the element diff --git a/python/paddle/fluid/__init__.py b/python/paddle/fluid/__init__.py index c8a435748dc5b51bf9e57b5b597e1422f0380e8e..67aa5ec9979dbe3fcdb037e38ad94329d294cdcc 100644 --- a/python/paddle/fluid/__init__.py +++ b/python/paddle/fluid/__init__.py @@ -44,42 +44,44 @@ import transpiler from param_attr import ParamAttr, WeightNormParamAttr from data_feeder import DataFeeder from core import LoDTensor, CPUPlace, CUDAPlace, CUDAPinnedPlace -from transpiler import DistributeTranspiler, SimpleDistributeTranspiler, InferenceTranspiler, memory_optimize, release_memory +from transpiler import DistributeTranspiler, SimpleDistributeTranspiler, \ + InferenceTranspiler, memory_optimize, release_memory from concurrency import (Go, make_channel, channel_send, channel_recv, channel_close, Select) import clip import profiler import unique_name import recordio_writer -from parallel_executor import ParallelExecutor +import parallel_executor +from parallel_executor import * Tensor = LoDTensor -__all__ = framework.__all__ + executor.__all__ + concurrency.__all__ +\ - trainer.__all__ + inferencer.__all__ + transpiler.__all__ + [ - 'io', - 'initializer', - 'layers', - 'transpiler' - 'nets', - 'optimizer', - 'learning_rate_decay', - 'backward', - 'regularizer', - 'LoDTensor', - 'CPUPlace', - 'CUDAPlace', - 'CUDAPinnedPlace', - 'Tensor', - 'ParamAttr', - 'WeightNormParamAttr', - 'DataFeeder', - 'clip', - 'profiler', - 'unique_name', - 'recordio_writer', - 'ParallelExecutor', -] +__all__ = framework.__all__ + executor.__all__ + concurrency.__all__ + \ + trainer.__all__ + inferencer.__all__ + transpiler.__all__ + \ + parallel_executor.__all__ + [ + 'io', + 'initializer', + 'layers', + 'transpiler' + 'nets', + 'optimizer', + 'learning_rate_decay', + 'backward', + 'regularizer', + 'LoDTensor', + 'CPUPlace', + 'CUDAPlace', + 'CUDAPinnedPlace', + 'Tensor', + 'ParamAttr', + 'WeightNormParamAttr', + 'DataFeeder', + 'clip', + 'profiler', + 'unique_name', + 'recordio_writer', + ] def __bootstrap__(): diff --git a/python/paddle/fluid/backward.py b/python/paddle/fluid/backward.py index fea509874d22144c21e7d482f2622c91ee70e3a7..d90e27822239c518ee7f7cfa9397e275e60f9c99 100644 --- a/python/paddle/fluid/backward.py +++ b/python/paddle/fluid/backward.py @@ -498,6 +498,8 @@ def append_backward(loss, parameter_list=None, no_grad_set=None, program.current_block_idx = current_block_idx program.sync_with_cpp() + # FIXME(zcd): prevent loss.grad optimized by mem_opt. + loss.block.var(_append_grad_suffix_(loss.name)).persistable = True if parameter_list is not None: parameters = parameter_list diff --git a/python/paddle/fluid/inferencer.py b/python/paddle/fluid/inferencer.py index 1b8b9c07622dce80245c69484bb6abf737ffca83..56c008d1af70f4b5f6169ebe5174b08fcf8bc722 100644 --- a/python/paddle/fluid/inferencer.py +++ b/python/paddle/fluid/inferencer.py @@ -13,29 +13,35 @@ # limitations under the License. import core -import framework + import executor +import framework import io +import unique_name from trainer import check_and_get_place __all__ = ['Inferencer', ] class Inferencer(object): - def __init__(self, param_path, place=None): + def __init__(self, infer_func, param_path, place=None): """ - :param param_path: the path where the inference model is saved by fluid.io.save_inference_model + :param infer_func: a function that will return predict Variable + :param param_path: the path where the inference model is saved by fluid.io.save_params :param place: place to do the inference """ self.param_path = param_path self.scope = core.Scope() + self.inference_program = framework.Program() + with framework.program_guard(self.inference_program): + with unique_name.guard(): + self.predict_var = infer_func() + self.exe = executor.Executor(check_and_get_place(place)) with executor.scope_guard(self.scope): # load params from param_path into scope - [self.inference_program, _, - self.fetch_targets] = io.load_inference_model( - executor=self.exe, dirname=param_path) + io.load_params(self.exe, param_path, self.inference_program) def infer(self, inputs, return_numpy=True): """ @@ -51,7 +57,7 @@ class Inferencer(object): with executor.scope_guard(self.scope): results = self.exe.run(self.inference_program, feed=inputs, - fetch_list=self.fetch_targets, + fetch_list=[self.predict_var], return_numpy=return_numpy) return results diff --git a/python/paddle/fluid/parallel_executor.py b/python/paddle/fluid/parallel_executor.py index 7358c4b60e87893b9c04e3da2221dfb69d3ba0c7..3117dfe00c7a3df1035c439dc31b81e67781d0cc 100644 --- a/python/paddle/fluid/parallel_executor.py +++ b/python/paddle/fluid/parallel_executor.py @@ -19,7 +19,10 @@ import executor import warnings import sys -__all__ = ['ParallelExecutor'] +__all__ = ['ParallelExecutor', 'ExecutionStrategy', 'BuildStrategy'] + +ExecutionStrategy = core.ParallelExecutor.ExecutionStrategy +BuildStrategy = core.ParallelExecutor.BuildStrategy class ParallelExecutor(object): @@ -27,13 +30,12 @@ class ParallelExecutor(object): use_cuda, loss_name=None, main_program=None, - num_threads=None, - allow_op_delay=False, share_vars_from=None, - use_default_grad_scale=True, - balance_parameter_opt_between_cards=False, + exec_strategy=None, + build_strategy=None, num_trainers=1, - trainer_id=0): + trainer_id=0, + **kwargs): """ ParallelExecutor can run program in parallel. @@ -42,21 +44,8 @@ class ParallelExecutor(object): loss_name(str, default None): The loss name must set in training. main_program(Program, default None): The program that need to run, if not provided, then default_main_program will be used. - num_threads(int, default None): How many threads are used for - training. - allow_op_delay(bool, default False): Whether to delay and buffer - some operators together for scheduling or not, which may - improve performance in some cases, default False. share_vars_from(ParallelExecutor, default None): If provied, it will share variables from the specified ParallelExecutor. - use_default_grad_scale(bool, default True): If set True, a default - scale value equal to `1./device_count` would be multiplied to - gradients of each device and scaled gradients would be - aggregated. Otherwise, a customized scale value should be fed - to the network. - balance_parameter_opt_between_cards(bool, default True): Whether - updating different gradients on different cards. Currently, it - is not recommended. num_trainers(int, default 1): If greater than 1, NCCL will be initialized with multpile rank of nodes, each node should have same number of GPUs. Distributed training will be enabled then. @@ -83,6 +72,25 @@ class ParallelExecutor(object): train_loss, = train_exe.run([loss.name], feed=feed_dict) test_loss, = test_exe.run([loss.name], feed=feed_dict) """ + if len(kwargs) != 0: + err_msg = "" + for key in kwargs: + if key in dir(ExecutionStrategy): + err_msg += \ + "Setting {0} by constructor is deprecated. Use " \ + "strategy=ExecutionStrategy(); strategy.{0}=xxx; " \ + "pe=ParallelExecutor(exec_strategy=strategy) " \ + "instead.\n ".format(key) + elif key in dir(BuildStrategy): + err_msg += \ + "Setting {0} by constructor is deprecated. Use " \ + "strategy=BuildStrategy(); See help(" \ + "paddle.fluid.ParallelExecutor.BuildStrategy) \n".format( + key) + else: + err_msg += "Setting {0} by constructor is deprecated. Use strategy.\n".format( + key) + raise ValueError(err_msg) self._places = [] self._act_places = [] @@ -100,15 +108,25 @@ class ParallelExecutor(object): self._places.append(p) assert self._places, "no place for execution" - if num_threads is None: + if exec_strategy is None: + exec_strategy = ExecutionStrategy() + if use_cuda: + exec_strategy.use_event = True + else: + exec_strategy.use_event = False + + if exec_strategy.num_threads == 0: if use_cuda: # Experiments on se-resnext shows that too many threads hurt # performance. Worth tunning for other models in the future. - num_threads = len(self._places) * 2 + exec_strategy.num_threads = len(self._places) * 2 else: - num_threads = min( + exec_strategy.num_threads = min( len(self._places) * 2, multiprocessing.cpu_count()) + if build_strategy is None: + build_strategy = BuildStrategy() + main = main_program main = main if main else framework.default_main_program() scope = executor.global_scope() @@ -127,23 +145,14 @@ class ParallelExecutor(object): ] self.executor = core.ParallelExecutor( - num_threads, - True if use_cuda else False, # use_event self._places, set([ p.name for p in main.global_block().iter_parameters() if not p.stop_gradient ]), - set(self.persistable_vars), - main.desc, - loss_name if loss_name else '', - scope, - local_scopes, - allow_op_delay, - use_default_grad_scale, - balance_parameter_opt_between_cards, - num_trainers, - trainer_id) + set(self.persistable_vars), main.desc, loss_name + if loss_name else '', scope, local_scopes, exec_strategy, + build_strategy, num_trainers, trainer_id) self.scope = scope def run(self, fetch_list, feed=None, feed_dict=None): diff --git a/python/paddle/fluid/tests/book/high-level-api/fit_a_line/test_fit_a_line.py b/python/paddle/fluid/tests/book/high-level-api/fit_a_line/test_fit_a_line.py index 8c9bbb52d769282460c571ebc51d5eff18de3114..fbcf2a282f6421a546723a1d429c59fb304a0cc2 100644 --- a/python/paddle/fluid/tests/book/high-level-api/fit_a_line/test_fit_a_line.py +++ b/python/paddle/fluid/tests/book/high-level-api/fit_a_line/test_fit_a_line.py @@ -48,12 +48,11 @@ def linear(): return avg_loss -def train(use_cuda, save_dirname): +def train(use_cuda, train_program, save_dirname): place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() trainer = fluid.Trainer( - train_func=linear, - infer_func=inference_program, + train_func=train_program, place=place, optimizer=fluid.optimizer.SGD(learning_rate=0.001)) @@ -72,11 +71,7 @@ def train(use_cuda, save_dirname): ''' if float(test_metrics[0]) < 20.0: if save_dirname is not None: - # NOT clear yet - # fluid.io.save_inference_model(save_dirname, ['x'], [y_predict]) - # trainer.save_params(save_dirname) - # https://github.com/PaddlePaddle/Paddle/pull/10445 - trainer.save_inference_model(save_dirname) + trainer.save_params(save_dirname) return trainer.train( @@ -87,12 +82,13 @@ def train(use_cuda, save_dirname): # infer -def infer(use_cuda, save_dirname=None): +def infer(use_cuda, inference_program, save_dirname=None): if save_dirname is None: return place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() - inferencer = fluid.Inferencer(param_path=save_dirname, place=place) + inferencer = fluid.Inferencer( + infer_func=inference_program, param_path=save_dirname, place=place) batch_size = 10 tensor_x = numpy.random.uniform(0, 10, [batch_size, 13]).astype("float32") @@ -108,8 +104,8 @@ def main(use_cuda): # Directory for saving the trained model save_dirname = "fit_a_line.inference.model" - train(use_cuda, save_dirname) - infer(use_cuda, save_dirname) + train(use_cuda, linear, save_dirname) + infer(use_cuda, inference_program, save_dirname) class TestFitALine(unittest.TestCase): diff --git a/python/paddle/fluid/tests/book/high-level-api/recognize_digits/test_recognize_digits_conv.py b/python/paddle/fluid/tests/book/high-level-api/recognize_digits/test_recognize_digits_conv.py index 1f91f471f22f7c8fafc80672e660ddccf6f7cc4a..420e6e6e42adc22508c414f2c2d1ba93aedd4753 100644 --- a/python/paddle/fluid/tests/book/high-level-api/recognize_digits/test_recognize_digits_conv.py +++ b/python/paddle/fluid/tests/book/high-level-api/recognize_digits/test_recognize_digits_conv.py @@ -53,48 +53,40 @@ def train_program(): predict = inference_program() cost = fluid.layers.cross_entropy(input=predict, label=label) avg_cost = fluid.layers.mean(cost) - # acc = fluid.layers.accuracy(input=predict, label=label) - # return avg_cost, acc - return avg_cost + acc = fluid.layers.accuracy(input=predict, label=label) + return [avg_cost, acc] -def train(use_cuda, save_dirname): +def train(use_cuda, train_program, save_dirname): place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() optimizer = fluid.optimizer.Adam(learning_rate=0.001) trainer = fluid.Trainer( - train_func=train_program, - infer_func=inference_program, - place=place, - optimizer=optimizer) + train_func=train_program, place=place, optimizer=optimizer) def event_handler(event): if isinstance(event, fluid.EndEpochEvent): - # if (event.epoch + 1) % 10 == 0: - # trainer.save_params(save_dirname) - trainer.save_inference_model(save_dirname) - - # TODO: Uncomment this part once we are sure that .train is working - # test_reader = paddle.batch( - # paddle.dataset.mnist.test(), batch_size=BATCH_SIZE) - # test_metrics = trainer.test(reader=test_reader) - # avg_cost_set = test_metrics[0] - # acc_set = test_metrics[1] - # - # # get test acc and loss - # acc = numpy.array(acc_set).mean() - # avg_cost = numpy.array(avg_cost_set).mean() - # - # print("avg_cost: %s" % avg_cost) - # print("acc : %s" % acc) - # - # if float(acc) > 0.2: # Smaller value to increase CI speed - # trainer.save_params(save_dirname) - # else: - # print('BatchID {0}, Test Loss {1:0.2}, Acc {2:0.2}'.format( - # event.epoch + 1, float(avg_cost), float(acc))) - # if math.isnan(float(avg_cost)): - # sys.exit("got NaN loss, training failed.") + test_reader = paddle.batch( + paddle.dataset.mnist.test(), batch_size=BATCH_SIZE) + test_metrics = trainer.test( + reader=test_reader, feed_order=['img', 'label']) + avg_cost_set = test_metrics[0] + acc_set = test_metrics[1] + + # get test acc and loss + acc = numpy.array(acc_set).mean() + avg_cost = numpy.array(avg_cost_set).mean() + + print("avg_cost: %s" % avg_cost) + print("acc : %s" % acc) + + if float(acc) > 0.2: # Smaller value to increase CI speed + trainer.save_params(save_dirname) + else: + print('BatchID {0}, Test Loss {1:0.2}, Acc {2:0.2}'.format( + event.epoch + 1, float(avg_cost), float(acc))) + if math.isnan(float(avg_cost)): + sys.exit("got NaN loss, training failed.") train_reader = paddle.batch( paddle.reader.shuffle( @@ -108,10 +100,11 @@ def train(use_cuda, save_dirname): feed_order=['img', 'label']) -def infer(use_cuda, save_dirname=None): +def infer(use_cuda, inference_program, save_dirname=None): place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() - inferencer = fluid.Inferencer(param_path=save_dirname, place=place) + inferencer = fluid.Inferencer( + infer_func=inference_program, param_path=save_dirname, place=place) batch_size = 1 tensor_img = numpy.random.uniform(-1.0, 1.0, @@ -126,8 +119,14 @@ def main(use_cuda): save_dirname = "recognize_digits_conv.inference.model" # call train() with is_local argument to run distributed train - train(use_cuda=use_cuda, save_dirname=save_dirname) - infer(use_cuda=use_cuda, save_dirname=save_dirname) + train( + use_cuda=use_cuda, + train_program=train_program, + save_dirname=save_dirname) + infer( + use_cuda=use_cuda, + inference_program=inference_program, + save_dirname=save_dirname) if __name__ == '__main__': diff --git a/python/paddle/fluid/tests/book/high-level-api/recognize_digits/test_recognize_digits_mlp.py b/python/paddle/fluid/tests/book/high-level-api/recognize_digits/test_recognize_digits_mlp.py index f072d70abdba5077bf0ac2ff6ff972e24c8226f5..9427a772f54fb58ca1f50ed792cccf5d8f9b3d84 100644 --- a/python/paddle/fluid/tests/book/high-level-api/recognize_digits/test_recognize_digits_mlp.py +++ b/python/paddle/fluid/tests/book/high-level-api/recognize_digits/test_recognize_digits_mlp.py @@ -40,47 +40,40 @@ def train_program(): predict = inference_program() cost = fluid.layers.cross_entropy(input=predict, label=label) avg_cost = fluid.layers.mean(cost) - # acc = fluid.layers.accuracy(input=predict, label=label) - # return avg_cost, acc - return avg_cost + acc = fluid.layers.accuracy(input=predict, label=label) + return [avg_cost, acc] -def train(use_cuda, save_dirname): +def train(use_cuda, train_program, save_dirname): place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() optimizer = fluid.optimizer.Adam(learning_rate=0.001) trainer = fluid.Trainer( - train_func=train_program, - infer_func=inference_program, - place=place, - optimizer=optimizer) + train_func=train_program, place=place, optimizer=optimizer) def event_handler(event): if isinstance(event, fluid.EndEpochEvent): - # if (event.epoch + 1) % 10 == 0: - trainer.save_inference_model(save_dirname) - - # TODO: Uncomment this part once we are sure that .train is working - # test_reader = paddle.batch( - # paddle.dataset.mnist.test(), batch_size=BATCH_SIZE) - # test_metrics = trainer.test(reader=test_reader) - # avg_cost_set = test_metrics[0] - # acc_set = test_metrics[1] - # - # # get test acc and loss - # acc = numpy.array(acc_set).mean() - # avg_cost = numpy.array(avg_cost_set).mean() - # - # print("avg_cost: %s" % avg_cost) - # print("acc : %s" % acc) - # - # if float(acc) > 0.2: # Smaller value to increase CI speed - # trainer.save_params(save_dirname) - # else: - # print('BatchID {0}, Test Loss {1:0.2}, Acc {2:0.2}'.format( - # event.epoch + 1, float(avg_cost), float(acc))) - # if math.isnan(float(avg_cost)): - # sys.exit("got NaN loss, training failed.") + test_reader = paddle.batch( + paddle.dataset.mnist.test(), batch_size=BATCH_SIZE) + test_metrics = trainer.test( + reader=test_reader, feed_order=['img', 'label']) + avg_cost_set = test_metrics[0] + acc_set = test_metrics[1] + + # get test acc and loss + acc = numpy.array(acc_set).mean() + avg_cost = numpy.array(avg_cost_set).mean() + + print("avg_cost: %s" % avg_cost) + print("acc : %s" % acc) + + if float(acc) > 0.2: # Smaller value to increase CI speed + trainer.save_params(save_dirname) + else: + print('BatchID {0}, Test Loss {1:0.2}, Acc {2:0.2}'.format( + event.epoch + 1, float(avg_cost), float(acc))) + if math.isnan(float(avg_cost)): + sys.exit("got NaN loss, training failed.") train_reader = paddle.batch( paddle.reader.shuffle( @@ -94,10 +87,11 @@ def train(use_cuda, save_dirname): feed_order=['img', 'label']) -def infer(use_cuda, save_dirname=None): +def infer(use_cuda, inference_program, save_dirname=None): place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() - inferencer = fluid.Inferencer(param_path=save_dirname, place=place) + inferencer = fluid.Inferencer( + infer_func=inference_program, param_path=save_dirname, place=place) batch_size = 1 tensor_img = numpy.random.uniform(-1.0, 1.0, @@ -112,8 +106,14 @@ def main(use_cuda): save_dirname = "recognize_digits_mlp.inference.model" # call train() with is_local argument to run distributed train - train(use_cuda=use_cuda, save_dirname=save_dirname) - infer(use_cuda=use_cuda, save_dirname=save_dirname) + train( + use_cuda=use_cuda, + train_program=train_program, + save_dirname=save_dirname) + infer( + use_cuda=use_cuda, + inference_program=inference_program, + save_dirname=save_dirname) if __name__ == '__main__': diff --git a/python/paddle/fluid/tests/book/high-level-api/word2vec/no_test_word2vec_new_api.py b/python/paddle/fluid/tests/book/high-level-api/word2vec/no_test_word2vec_new_api.py index 00ba4acf88b1b543c256a48709d7f514ebbe1e8c..4f861e5aaeca7ce0f73450c09f9ddc1ed7417469 100644 --- a/python/paddle/fluid/tests/book/high-level-api/word2vec/no_test_word2vec_new_api.py +++ b/python/paddle/fluid/tests/book/high-level-api/word2vec/no_test_word2vec_new_api.py @@ -90,7 +90,7 @@ def train_program(is_sparse): return avg_cost -def train(use_cuda, is_sparse, save_path): +def train(use_cuda, train_program, save_path): train_reader = paddle.batch( paddle.dataset.imikolov.train(word_dict, N), BATCH_SIZE) test_reader = paddle.batch( @@ -105,23 +105,21 @@ def train(use_cuda, is_sparse, save_path): print("loss= ", avg_cost) if avg_cost < 5.0: - trainer.save_inference_model(save_path) + trainer.save_params(save_path) return if math.isnan(avg_cost): sys.exit("got NaN loss, training failed.") trainer = fluid.Trainer( - partial(train_program, is_sparse), - partial(inference_program, is_sparse), - fluid.optimizer.SGD(learning_rate=0.001), - place=place) + train_program, fluid.optimizer.SGD(learning_rate=0.001), place=place) trainer.train( reader=train_reader, num_epochs=1, event_handler=event_handler) -def infer(use_cuda, is_sparse, save_path): +def infer(use_cuda, inference_program, save_path): place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() - inferencer = fluid.Inferencer(param_path=save_path, place=place) + inferencer = fluid.Inferencer( + infer_func=inference_program, param_path=save_path, place=place) lod = [0, 1] first_word = create_random_lodtensor(lod, place, low=0, high=dict_size - 1) @@ -144,9 +142,9 @@ def main(use_cuda, is_sparse): if use_cuda and not fluid.core.is_compiled_with_cuda(): return - save_path = "word2vec.inference.model" - train(use_cuda, is_sparse, save_path) - infer(use_cuda, is_sparse, save_path) + save_path = "word2vec.params" + train(use_cuda, partial(train_program, is_sparse), save_path) + infer(use_cuda, partial(inference_program, is_sparse), save_path) if __name__ == '__main__': diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor.py b/python/paddle/fluid/tests/unittests/test_parallel_executor.py index a3be1a8db68c0d9d46746e70d95342447c35e237..6dc016487fd81a9292f94042a20b7356bc50abe1 100644 --- a/python/paddle/fluid/tests/unittests/test_parallel_executor.py +++ b/python/paddle/fluid/tests/unittests/test_parallel_executor.py @@ -232,14 +232,18 @@ class TestParallelExecutorBase(unittest.TestCase): place = fluid.CUDAPlace(0) startup_exe = fluid.Executor(place) startup_exe.run(startup) + exec_strategy = fluid.ExecutionStrategy() + exec_strategy.allow_op_delay = allow_op_delay + + build_strategy = fluid.BuildStrategy() + build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce if balance_parameter_opt_between_cards else fluid.BuildStrategy.ReduceStrategy.AllReduce if use_parallel_executor: exe = fluid.ParallelExecutor( True, loss_name=loss.name, - allow_op_delay=allow_op_delay, - balance_parameter_opt_between_cards=balance_parameter_opt_between_cards - ) + exec_strategy=exec_strategy, + build_strategy=build_strategy) else: exe = fluid.Executor(place=place) @@ -548,7 +552,7 @@ class TestTransformer(TestParallelExecutorBase): class ParallelExecutorTestingDuringTraining(unittest.TestCase): - def check_network_convergence(self, balance_parameter_opt_between_cards): + def check_network_convergence(self, build_strategy=None): main = fluid.Program() startup = fluid.Program() with fluid.program_guard(main, startup): @@ -571,15 +575,13 @@ class ParallelExecutorTestingDuringTraining(unittest.TestCase): use_cuda=True, loss_name=loss.name, main_program=main, - balance_parameter_opt_between_cards=balance_parameter_opt_between_cards - ) + build_strategy=build_strategy) test_exe = fluid.ParallelExecutor( use_cuda=True, main_program=test_program, share_vars_from=train_exe, - balance_parameter_opt_between_cards=balance_parameter_opt_between_cards - ) + build_strategy=build_strategy) for i in xrange(5): test_loss, = test_exe.run([loss.name], feed=feed_dict) @@ -594,10 +596,14 @@ class ParallelExecutorTestingDuringTraining(unittest.TestCase): str(test_loss)) def test_parallel_testing(self): - self.check_network_convergence(False) + build_strategy = fluid.BuildStrategy() + build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.AllReduce + self.check_network_convergence(build_strategy) def test_parallel_testing_with_new_strategy(self): - self.check_network_convergence(True) + build_strategy = fluid.BuildStrategy() + build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce + self.check_network_convergence(build_strategy) import paddle.dataset.conll05 as conll05 @@ -617,7 +623,7 @@ embedding_name = 'emb' def db_lstm(word, predicate, ctx_n2, ctx_n1, ctx_0, ctx_p1, ctx_p2, mark, - is_sparse, balance_parameter_opt_between_cards, **ignored): + is_sparse, **ignored): # 8 features predicate_embedding = fluid.layers.embedding( input=predicate, @@ -686,9 +692,7 @@ def db_lstm(word, predicate, ctx_n2, ctx_n1, ctx_0, ctx_p1, ctx_p2, mark, class TestCRFModel(unittest.TestCase): - def check_network_convergence(self, - is_sparse, - balance_parameter_opt_between_cards=False): + def check_network_convergence(self, is_sparse, build_strategy=None): main = fluid.Program() startup = fluid.Program() with fluid.program_guard(main, startup): @@ -739,8 +743,7 @@ class TestCRFModel(unittest.TestCase): pe = fluid.ParallelExecutor( use_cuda=True, loss_name=avg_cost.name, - balance_parameter_opt_between_cards=balance_parameter_opt_between_cards - ) + build_strategy=build_strategy) feeder = fluid.DataFeeder( feed_list=[ @@ -756,19 +759,29 @@ class TestCRFModel(unittest.TestCase): pe.run(feed=feeder.feed(cur_batch), fetch_list=[avg_cost.name]))[0] - def test_update_sparse_parameter(self): - self.check_network_convergence(is_sparse=True) + def test_update_sparse_parameter_all_reduce(self): + build_strategy = fluid.BuildStrategy() + build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.AllReduce + self.check_network_convergence( + is_sparse=True, build_strategy=build_strategy) - def test_update_dense_parameter(self): - self.check_network_convergence(is_sparse=False) + def test_update_dense_parameter_all_reduce(self): + build_strategy = fluid.BuildStrategy() + build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.AllReduce + self.check_network_convergence( + is_sparse=False, build_strategy=build_strategy) - def test_update_sparse_parameter_with_new_strategy(self): + def test_update_sparse_parameter_reduce(self): + build_strategy = fluid.BuildStrategy() + build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce self.check_network_convergence( - is_sparse=False, balance_parameter_opt_between_cards=True) + is_sparse=False, build_strategy=build_strategy) - def test_update_dense_parameter_with_new_strategy(self): + def test_update_dense_parameter_reduce(self): + build_strategy = fluid.BuildStrategy() + build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce self.check_network_convergence( - is_sparse=False, balance_parameter_opt_between_cards=True) + is_sparse=False, build_strategy=build_strategy) # test fetch all the variables of global_block @@ -836,7 +849,8 @@ class TestFetchOp(unittest.TestCase): assert not math.isnan(np.sum(ret[i])) and \ not math.isinf(np.sum(ret[i])) - def test_update_sparse_parameter(self): + @unittest.skip("this test is buggy") + def test_feed(self): tst_reader = paddle.batch(flowers.test(use_xmap=False), batch_size=16) tst_reader_iter = tst_reader() diff --git a/python/paddle/fluid/tests/unittests/test_split_var.py b/python/paddle/fluid/tests/unittests/test_split_var.py index 79d387f0066672058d1640f4e5fd28ed8913fe4c..0c5e8901b903375c7d4de32943e657b205d8fae9 100644 --- a/python/paddle/fluid/tests/unittests/test_split_var.py +++ b/python/paddle/fluid/tests/unittests/test_split_var.py @@ -21,15 +21,7 @@ import random class TestSplitVar(unittest.TestCase): - def test_check_output(self): - # split below shapes to 10 servers - shapes = [[3, 5], [1024], [28, 784], [8, 1020], [800, 10]] - expected_sizes = [ - [15], [1024], - [2352, 2352, 2352, 2352, 2352, 2352, 2352, 2352, 2352, 784], - [2040, 2040, 2040, 2040], - [1150, 1150, 1150, 1150, 1150, 1150, 1100] - ] + def check_split_output(self, shapes, expected_sizes, min_size): var_list = [] program = fluid.Program() for shape in shapes: @@ -39,7 +31,7 @@ class TestSplitVar(unittest.TestCase): # dtype=core.VarDesc.VarType.LOD_TENSOR, shape=shape) var_list.append(var) - blocks = split_dense_variable(var_list, 10) + blocks = split_dense_variable(var_list, 10, min_size) all_sizes = [] for s in expected_sizes: for s2 in s: @@ -48,6 +40,25 @@ class TestSplitVar(unittest.TestCase): varname, block_id, size = block_str.split(":") self.assertEqual(int(size), all_sizes[i]) + def test_1k(self): + shapes = [[3, 5], [1024], [28, 784], [8, 1020], [800, 10]] + expected_sizes = [ + [15], [1024], + [2352, 2352, 2352, 2352, 2352, 2352, 2352, 2352, 2352, 784], + [2040, 2040, 2040, 2040], + [1150, 1150, 1150, 1150, 1150, 1150, 1100] + ] + + self.check_split_output(shapes, expected_sizes, 1024) + + def test_check_output_8k(self): + shapes = [[3, 5], [1024], [28, 784], [8, 1020], [800, 10], + [6, 33, 33, 33]] + expected_sizes = [[15], [1024], [10976, 10976], [8160], [8000], + [35937, 35937, 35937, 35937, 35937, 35937]] + + self.check_split_output(shapes, expected_sizes, 8192) + if __name__ == '__main__': unittest.main() diff --git a/python/paddle/fluid/trainer.py b/python/paddle/fluid/trainer.py index 67d8be82d5fa850a526f92753a086dac0cab5e2c..c24662ac2114c286b1c50286fea1b65cf7c1b3a8 100644 --- a/python/paddle/fluid/trainer.py +++ b/python/paddle/fluid/trainer.py @@ -92,19 +92,13 @@ class Trainer(object): place: The device place of this trainer. """ - def __init__(self, - train_func, - infer_func, - optimizer, - param_path=None, - place=None): + def __init__(self, train_func, optimizer, param_path=None, place=None): # 1. we need to generate a framework.Program by calling # program_func. Reference: fluid.program_guard in # test_word2vec.py if not isinstance(optimizer, opt_module.Optimizer): raise TypeError("The optimizer should be an instance of Optimizer") - self.infer_func = infer_func self.scope = core.Scope() self.startup_program = framework.Program() @@ -178,9 +172,9 @@ class Trainer(object): def train(self, num_epochs, event_handler, - reader=None, - parallel=False, - feed_order=None): + reader, + feed_order, + parallel=False): """ Train the model. @@ -208,7 +202,7 @@ class Trainer(object): self._train_by_executor(num_epochs, event_handler, reader, feed_order) - def test(self, reader, feed_order=None): + def test(self, reader, feed_order): """ Test the model on given test data @@ -226,15 +220,6 @@ class Trainer(object): exe = executor.Executor(self.place) io.save_persistables(exe, dirname=param_path) - def save_inference_model(self, model_path): - inference_program = framework.Program() - with framework.program_guard(inference_program): - with unique_name.guard(): - predict_var = self.infer_func() - predict_var = self.train_program.block(0).var(predict_var.name) - exe = executor.Executor(self.place) - io.save_inference_model(model_path, [], [predict_var], exe) - @contextlib.contextmanager def _prog_and_scope_guard(self): with framework.program_guard( @@ -291,12 +276,7 @@ def build_feed_var_list(program, feed_order): if not isinstance(program, framework.Program): raise TypeError("The 'program' should be an object of Program") - if feed_order is None: - feed_var_list = [ - var for var in program.global_block().vars.itervalues() - if var.is_data - ] - elif isinstance(feed_order, list): + if isinstance(feed_order, list): feed_var_list = [ program.global_block().var(var_name) for var_name in feed_order ] diff --git a/python/paddle/fluid/transpiler/distribute_transpiler.py b/python/paddle/fluid/transpiler/distribute_transpiler.py index a323f8d03613e7c4149812daab4ccb57fb940a7e..42ff0a9eb1112ed5709749e3867794c80be8f1d1 100644 --- a/python/paddle/fluid/transpiler/distribute_transpiler.py +++ b/python/paddle/fluid/transpiler/distribute_transpiler.py @@ -93,30 +93,33 @@ def same_or_split_var(p_name, var_name): return p_name == var_name or p_name.startswith(var_name + ".block") -def split_dense_variable(var_list, - pserver_count, - min_block_size=1024, - max_block_size=1048576): +def split_dense_variable(var_list, service_count, min_block_size=8192): """ - We may need to split dense tensor to one or more blocks and put - them equally onto parameter server. One block is a sub-tensor - aligned by dim[0] of the tensor. - - We need to have a minimal block size so that the calculations in - the parameter server side can gain better performance. By default - minimum block size is 1024. The max block size is used to prevent - very large blocks that may cause send error. - :return: A list of VarBlocks. Each VarBlock specifies a shard of - the var. + We may need to split dense tensor to one or more blocks and put + them equally onto parameter server. One block is a sub-tensor + aligned by dim[0] of the tensor. + + We need to have a minimal block size so that the calculations in + the parameter server side can gain better performance. By default + minimum block size 8K elements (maybe 16bit or 32bit or 64bit). + + Args: + var_list (list): List of variables. + service_count (int): Numel of pserver services. A pserver may have two + or more listening ports. + min_block_size (int): Minimum splitted block size. + Returns: + blocks (list[(varname, block_id, current_block_size)]): A list + of VarBlocks. Each VarBlock specifies a shard of the var. """ blocks = [] for var in var_list: - split_count = pserver_count + split_count = service_count var_numel = reduce(lambda x, y: x * y, var.shape) max_pserver_count = int(math.floor(var_numel / float(min_block_size))) if max_pserver_count == 0: max_pserver_count = 1 - if max_pserver_count < pserver_count: + if max_pserver_count < service_count: split_count = max_pserver_count block_size = int(math.ceil(var_numel / float(split_count))) @@ -270,6 +273,7 @@ class DistributeTranspiler: grad_var_mapping = self._append_split_op(program, grad_blocks) param_var_mapping = self._create_vars_from_blocklist(program, param_blocks) + # step3: Add gradients as send op inputs and parameters as send # op outputs. send_inputs = [] @@ -277,9 +281,11 @@ class DistributeTranspiler: for b in grad_blocks: # append by order varname, block_id, _ = b.split(":") send_inputs.append(grad_var_mapping[varname][int(block_id)]) + for b in param_blocks: varname, block_id, _ = b.split(":") send_outputs.append(param_var_mapping[varname][int(block_id)]) + # let send_op know which endpoint to send which var to, eplist has the same # order as send_inputs. eplist = split_method(send_inputs, pserver_endpoints) @@ -751,9 +757,18 @@ class DistributeTranspiler: Create vars for each split. NOTE: only grads need to be named for different trainers, use add_trainer_suffix to rename the grad vars. - :return: A dict mapping from original var name to each var split. + Args: + program (ProgramDesc): ProgramDesc which gradients blong. + block_list (list[(varname, block_id, block_size)]): List of gradient blocks. + add_trainer_suffix (Bool): Add trainer suffix to new variable's name if set True. + Returns: + var_mapping (dict(varname->[new_varname_variable])):A dict mapping + from original var name to each var split. """ + + # varname->[(block_id, current_block_size)] block_map = dict() + var_mapping = dict() for block_str in block_list: varname, offset, size = block_str.split(":") @@ -824,7 +839,16 @@ class DistributeTranspiler: persistable=persistable) def _append_split_op(self, program, gradblocks): - # Split variables that need to be split and append respective ops + """ + Split variables that need to be split and append respective ops + Args: + program (ProgramDesc): ProgramDesc that gradients blong. + gradblocks (list[(varname, block_id, block_size)]): List of gradient blocks. + Returns: + var_mapping (dict(varname->[new_splitted_variable])):A dict mapping + from original var name to each var split. + """ + add_suffix = False if self.trainer_num > 1: add_suffix = True @@ -1148,6 +1172,12 @@ class DistributeTranspiler: return lr_ops def _get_optimize_pass(self): + """ + Get optimizer operators, paramters and gradients from origin_program + Returns: + opt_ops (list): optimize operators. + params_grads (dict): paramter->gradient. + """ block = self.origin_program.global_block() opt_ops = [] params_grads = []