diff --git a/cmake/inference_lib.cmake b/cmake/inference_lib.cmake index cc758019827b9a5416a801e4da43d754d4492a73..06a7ae56827d5afe857ed0a98092210917a52430 100644 --- a/cmake/inference_lib.cmake +++ b/cmake/inference_lib.cmake @@ -70,6 +70,12 @@ copy(glog_lib DSTS ${dst_dir} ${dst_dir}/lib ) +set(dst_dir "${CMAKE_INSTALL_PREFIX}/third_party/boost/") +copy(boost_lib + SRCS ${BOOST_INCLUDE_DIR}/boost + DSTS ${dst_dir} +) + if(NOT PROTOBUF_FOUND) set(dst_dir "${CMAKE_INSTALL_PREFIX}/third_party/install/protobuf") copy(protobuf_lib diff --git a/paddle/fluid/framework/details/build_strategy.h b/paddle/fluid/framework/details/build_strategy.h new file mode 100644 index 0000000000000000000000000000000000000000..91bdfe6134ffbd1404336c9d6d1222a505084b2b --- /dev/null +++ b/paddle/fluid/framework/details/build_strategy.h @@ -0,0 +1,36 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +namespace paddle { +namespace framework { +namespace details { + +struct BuildStrategy { + enum class ReduceStrategy { kAllReduce = 0, kReduce = 1 }; + + enum class GradientScaleStrategy { + kCoeffNumDevice = 0, + kOne = 1, + kCustomized = 2, + }; + + ReduceStrategy reduce_{ReduceStrategy::kAllReduce}; + GradientScaleStrategy gradient_scale_{GradientScaleStrategy::kCoeffNumDevice}; +}; + +} // namespace details +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/framework/details/execution_strategy.h b/paddle/fluid/framework/details/execution_strategy.h new file mode 100644 index 0000000000000000000000000000000000000000..e8d510ec955602b5a3f73ca06caa121886eb150b --- /dev/null +++ b/paddle/fluid/framework/details/execution_strategy.h @@ -0,0 +1,29 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +namespace paddle { +namespace framework { +namespace details { + +struct ExecutionStrategy { + size_t num_threads_{0}; + bool use_event_{true}; + bool allow_op_delay_{false}; +}; + +} // namespace details +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/framework/details/multi_devices_graph_builder.cc b/paddle/fluid/framework/details/multi_devices_graph_builder.cc index 4755559f8d0c5b5fdeb6b56a28fff8a32ea7f82f..45bad58145a1144dfabdd3e15b38d972d57b105e 100644 --- a/paddle/fluid/framework/details/multi_devices_graph_builder.cc +++ b/paddle/fluid/framework/details/multi_devices_graph_builder.cc @@ -37,31 +37,26 @@ MultiDevSSAGraphBuilder::MultiDevSSAGraphBuilder( const std::string &loss_var_name, const std::unordered_set ¶ms, const std::vector &local_scopes, - platform::NCCLContextMap *nccl_ctxs, bool use_default_grad_scale, - bool balance_parameter_opt_between_cards) + platform::NCCLContextMap *nccl_ctxs, const BuildStrategy &strategy) : loss_var_name_(loss_var_name), places_(places), local_scopes_(local_scopes), nccl_ctxs_(nccl_ctxs), - balance_parameter_opt_between_cards_( - balance_parameter_opt_between_cards) { + strategy_(strategy) { #else MultiDevSSAGraphBuilder::MultiDevSSAGraphBuilder( const std::vector &places, const std::string &loss_var_name, const std::unordered_set ¶ms, - const std::vector &local_scopes, bool use_default_grad_scale, - bool balance_parameter_opt_between_cards) + const std::vector &local_scopes, const BuildStrategy &strategy) : loss_var_name_(loss_var_name), places_(places), local_scopes_(local_scopes), - balance_parameter_opt_between_cards_( - balance_parameter_opt_between_cards) { + strategy_(strategy) { #endif for (auto &p : params) { grad_names_.insert(GradVarName(p)); } - use_default_grad_scale_ = use_default_grad_scale; } void MultiDevSSAGraphBuilder::CreateOpHandleIOs(SSAGraph *result, @@ -146,7 +141,8 @@ std::unique_ptr MultiDevSSAGraphBuilder::Build( CreateComputationalOps(&result, *op, 1); } else if (IsScaleLossOp(*op)) { // user can customize loss@grad if not use_default_grad_scale_ - if (use_default_grad_scale_) { + if (strategy_.gradient_scale_ != + BuildStrategy::GradientScaleStrategy::kCustomized) { CreateScaleLossGradOp(&result); } is_forwarding = false; @@ -165,19 +161,22 @@ std::unique_ptr MultiDevSSAGraphBuilder::Build( // broadcast, and each gradient is only broadcast once. for (auto &og : op->OutputArgumentNames()) { if (IsParameterGradientOnce(og, &og_has_been_broadcast)) { - if (balance_parameter_opt_between_cards_) { - CreateReduceOp(&result, og, cur_device_id); - var_name_on_devices[cur_device_id].emplace(og); - bcast_var_name_set[cur_device_id].emplace( - og.substr(0, og.size() - strlen(kGradVarSuffix))); - cur_device_id = (cur_device_id + 1) % places_.size(); - } else { - if (IsSparseGradient(var_types, og)) { - CreateReduceOp(&result, og, 0); - CreateBroadcastOp(&result, og, 0); - } else { - InsertNCCLAllReduceOp(&result, og); - } + switch (strategy_.reduce_) { + case BuildStrategy::ReduceStrategy::kReduce: + CreateReduceOp(&result, og, cur_device_id); + var_name_on_devices[cur_device_id].emplace(og); + bcast_var_name_set[cur_device_id].emplace( + og.substr(0, og.size() - strlen(kGradVarSuffix))); + cur_device_id = (cur_device_id + 1) % places_.size(); + break; + case BuildStrategy::ReduceStrategy::kAllReduce: + if (IsSparseGradient(var_types, og)) { + CreateReduceOp(&result, og, 0); + CreateBroadcastOp(&result, og, 0); + } else { + InsertNCCLAllReduceOp(&result, og); + } + break; } } } @@ -303,7 +302,7 @@ bool MultiDevSSAGraphBuilder::IsParameterGradientOnce( int MultiDevSSAGraphBuilder::GetOpDeviceID( const std::vector> &var_name_on_devices, const OpDesc &op) const { - if (!balance_parameter_opt_between_cards_) { + if (strategy_.reduce_ != BuildStrategy::ReduceStrategy::kReduce) { return -1; } diff --git a/paddle/fluid/framework/details/multi_devices_graph_builder.h b/paddle/fluid/framework/details/multi_devices_graph_builder.h index 3a3e9e3b8538f52962e6a5ccd1a177e58d6c2f6b..4f708521884247fc013f0ae336ab683c3fe7ef2f 100644 --- a/paddle/fluid/framework/details/multi_devices_graph_builder.h +++ b/paddle/fluid/framework/details/multi_devices_graph_builder.h @@ -17,6 +17,7 @@ #include #include +#include "paddle/fluid/framework/details/build_strategy.h" #include "paddle/fluid/framework/details/ssa_graph_builder.h" namespace paddle { @@ -36,15 +37,13 @@ class MultiDevSSAGraphBuilder : public SSAGraphBuilder { const std::unordered_set ¶ms, const std::vector &local_scopes, platform::NCCLContextMap *nccl_ctxs, - bool use_default_grad_scale, - bool balance_parameter_opt_between_cards); + const BuildStrategy &strategy); #else MultiDevSSAGraphBuilder(const std::vector &places, const std::string &loss_var_name, const std::unordered_set ¶ms, const std::vector &local_scopes, - bool use_default_grad_scale, - bool balance_parameter_opt_between_cards); + const BuildStrategy &strategy); #endif std::unique_ptr Build(const ProgramDesc &program) const override; @@ -62,8 +61,6 @@ class MultiDevSSAGraphBuilder : public SSAGraphBuilder { #ifdef PADDLE_WITH_CUDA platform::NCCLContextMap *nccl_ctxs_; #endif - bool balance_parameter_opt_between_cards_; - bool use_default_grad_scale_; bool IsScaleLossOp(const OpDesc &op) const; @@ -105,6 +102,9 @@ class MultiDevSSAGraphBuilder : public SSAGraphBuilder { bool IsSparseGradient( const std::unordered_map &var_types, const std::string &og) const; + + private: + BuildStrategy strategy_; }; } // namespace details } // namespace framework diff --git a/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc b/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc index e90523ebe8dc720d10034e3af9b0e51bb7a2fde9..ef263d82c5ec93f0673eb0ac70e4fb02904bff13 100644 --- a/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc +++ b/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc @@ -18,18 +18,17 @@ namespace paddle { namespace framework { namespace details { ThreadedSSAGraphExecutor::ThreadedSSAGraphExecutor( - size_t num_threads, bool use_event, - const std::vector &local_scopes, + const ExecutionStrategy &strategy, const std::vector &local_scopes, const std::vector &places, - std::unique_ptr &&graph, bool allow_op_delay) + std::unique_ptr &&graph) : SSAGraphExecutor(std::move(graph)), - pool_(num_threads >= 2 ? new ::ThreadPool(num_threads) : nullptr), + pool_(strategy.num_threads_ >= 2 ? new ::ThreadPool(strategy.num_threads_) + : nullptr), local_scopes_(local_scopes), places_(places), fetch_ctxs_(places), - use_event_(use_event), running_ops_(0), - allow_op_delay_(allow_op_delay) {} + strategy_(strategy) {} FeedFetchList ThreadedSSAGraphExecutor::Run( const std::vector &fetch_tensors) { @@ -86,7 +85,7 @@ FeedFetchList ThreadedSSAGraphExecutor::Run( // // NOTE: DelayedOps have a lower priority. It will be scheduled after all // ready_ops have been performed. - if (ready_ops.empty() && allow_op_delay_ && running_ops_ == 0) { + if (ready_ops.empty() && strategy_.allow_op_delay_ && running_ops_ == 0) { run_all_ops(delayed_ops); } else { run_all_ops(ready_ops); @@ -113,7 +112,7 @@ FeedFetchList ThreadedSSAGraphExecutor::Run( auto &deps = pending_ops[op]; --deps; if (deps == 0) { - if (op->IsMultiDeviceTransfer() && allow_op_delay_) { + if (op->IsMultiDeviceTransfer() && strategy_.allow_op_delay_) { delayed_ops.insert(op); } else { ready_ops.insert(op); @@ -191,7 +190,7 @@ void ThreadedSSAGraphExecutor::RunOp( auto op_run = [ready_var_q, op, this] { try { VLOG(10) << op << " " << op->Name() << " : " << op->DebugString(); - op->Run(use_event_); + op->Run(strategy_.use_event_); VLOG(10) << op << " " << op->Name() << " Done "; running_ops_--; ready_var_q->Extend(op->Outputs()); diff --git a/paddle/fluid/framework/details/threaded_ssa_graph_executor.h b/paddle/fluid/framework/details/threaded_ssa_graph_executor.h index f18a88526b3238220fc56fd07299643d32c8b58b..1f7f88d75218e757e4555ad093f3cd6558f624dd 100644 --- a/paddle/fluid/framework/details/threaded_ssa_graph_executor.h +++ b/paddle/fluid/framework/details/threaded_ssa_graph_executor.h @@ -23,6 +23,7 @@ #include #include "ThreadPool.h" // ThreadPool in thrird party #include "paddle/fluid/framework/blocking_queue.h" +#include "paddle/fluid/framework/details/execution_strategy.h" #include "paddle/fluid/framework/details/fetch_op_handle.h" #include "paddle/fluid/framework/details/ssa_graph_executor.h" @@ -34,11 +35,10 @@ namespace details { class ThreadedSSAGraphExecutor : public SSAGraphExecutor { public: - ThreadedSSAGraphExecutor(size_t num_threads, bool use_event, + ThreadedSSAGraphExecutor(const ExecutionStrategy &strategy, const std::vector &local_scopes, const std::vector &places, - std::unique_ptr &&graph, - bool allow_op_delay); + std::unique_ptr &&graph); // Run a SSAGraph by a thread pool // Use topological sort algorithm @@ -55,10 +55,8 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor { std::vector local_scopes_; std::vector places_; platform::DeviceContextPool fetch_ctxs_; - const bool use_event_; std::unique_ptr exception_; std::atomic running_ops_; - bool allow_op_delay_; void InsertPendingOp(std::unordered_map *pending_ops, OpHandleBase *op_instance) const; @@ -74,6 +72,9 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor { std::unordered_map *pending_ops, std::unordered_set *pending_vars, BlockingQueue *ready_vars, FeedFetchList *fetch_data); + + private: + ExecutionStrategy strategy_; }; } // namespace details diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc index 95e807c0afa45bc4f4feb84d450b2d0584bc3b28..50c3468d556bfe05d6c41906cf35cb671f711b1e 100644 --- a/paddle/fluid/framework/parallel_executor.cc +++ b/paddle/fluid/framework/parallel_executor.cc @@ -52,13 +52,12 @@ std::vector &ParallelExecutor::GetLocalScopes() { } ParallelExecutor::ParallelExecutor( - size_t num_threads, bool use_event, const std::vector &places, const std::unordered_set ¶ms, const std::unordered_set &bcast_vars, const ProgramDesc &main_program, const std::string &loss_var_name, - Scope *scope, const std::vector &local_scopes, bool allow_op_delay, - bool use_default_grad_scale, bool balance_parameter_opt_between_cards, + Scope *scope, const std::vector &local_scopes, + const ExecutionStrategy &exec_strategy, const BuildStrategy &build_strategy, size_t num_trainers, size_t trainer_id) : member_(new ParallelExecutorPrivate(places)) { member_->global_scope_ = scope; @@ -100,18 +99,16 @@ ParallelExecutor::ParallelExecutor( #ifdef PADDLE_WITH_CUDA details::MultiDevSSAGraphBuilder builder( member_->places_, loss_var_name, params, member_->local_scopes_, - member_->nccl_ctxs_.get(), use_default_grad_scale, - balance_parameter_opt_between_cards); + member_->nccl_ctxs_.get(), build_strategy); #else - details::MultiDevSSAGraphBuilder builder( - member_->places_, loss_var_name, params, member_->local_scopes_, - use_default_grad_scale, balance_parameter_opt_between_cards); + details::MultiDevSSAGraphBuilder builder(member_->places_, loss_var_name, + params, member_->local_scopes_, + build_strategy); #endif auto graph = builder.Build(main_program); member_->executor_.reset(new details::ThreadedSSAGraphExecutor( - num_threads, use_event, member_->local_scopes_, places, std::move(graph), - allow_op_delay)); + exec_strategy, member_->local_scopes_, places, std::move(graph))); // Step 3. Create vars in each scope; for (auto *var : main_program.Block(0).AllVars()) { diff --git a/paddle/fluid/framework/parallel_executor.h b/paddle/fluid/framework/parallel_executor.h index 9e279876cfeef20a1921f8bd1c27046a477b9f56..5247e790649e76567f4527d54499d6e95dac5c27 100644 --- a/paddle/fluid/framework/parallel_executor.h +++ b/paddle/fluid/framework/parallel_executor.h @@ -14,57 +14,60 @@ limitations under the License. */ #pragma once +#include #include #include #include +#include "paddle/fluid/framework/details/execution_strategy.h" #include "paddle/fluid/framework/executor.h" #include "paddle/fluid/framework/op_info.h" #include "paddle/fluid/framework/program_desc.h" #include "paddle/fluid/framework/scope.h" #include "paddle/fluid/framework/tensor.h" #include "paddle/fluid/platform/device_context.h" - namespace paddle { namespace framework { class ParallelExecutorPrivate; +using details::BuildStrategy; +using details::ExecutionStrategy; + class ParallelExecutor { DISABLE_COPY_AND_ASSIGN(ParallelExecutor); public: - explicit ParallelExecutor(size_t num_threads, bool use_event, - const std::vector& places, - const std::unordered_set& params, - const std::unordered_set& bcast_vars, - const ProgramDesc& main_program, - const std::string& loss_var_name, Scope* scope, - const std::vector& local_scopes, - bool allow_op_delay, bool use_default_grad_scale, - bool balance_parameter_opt_between_cards, + explicit ParallelExecutor(const std::vector &places, + const std::unordered_set ¶ms, + const std::unordered_set &bcast_vars, + const ProgramDesc &main_program, + const std::string &loss_var_name, Scope *scope, + const std::vector &local_scopes, + const ExecutionStrategy &exec_strategy, + const BuildStrategy &build_strategy, size_t num_trainers = 1, size_t trainer_id = 0); ~ParallelExecutor(); - std::vector& GetLocalScopes(); + std::vector &GetLocalScopes(); /** * Feed tensors to local scopes. The size of tensors should be equal to the * size of local scopes. */ void FeedTensorsIntoLocalScopes( - const std::vector>& tensors); + const std::vector> &tensors); void FeedAndSplitTensorIntoLocalScopes( - const std::unordered_map& tensors); + const std::unordered_map &tensors); - void Run(const std::vector& fetch_tensors, - const std::string& fetched_var_name); + void Run(const std::vector &fetch_tensors, + const std::string &fetched_var_name); - void BCastParamsToGPUs(const std::unordered_set& vars) const; + void BCastParamsToGPUs(const std::unordered_set &vars) const; private: - ParallelExecutorPrivate* member_; + ParallelExecutorPrivate *member_; }; } // namespace framework diff --git a/paddle/fluid/inference/analysis/device.h b/paddle/fluid/inference/analysis/device.h index 4423af842d28566fea419b8099efc3bda33787f4..585c9923291e5f9cb6e50dbc4bcd28c374191048 100644 --- a/paddle/fluid/inference/analysis/device.h +++ b/paddle/fluid/inference/analysis/device.h @@ -11,6 +11,7 @@ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ +#pragma once namespace paddle { namespace inference { diff --git a/paddle/fluid/inference/analysis/node.h b/paddle/fluid/inference/analysis/node.h index 59ba977798481684114d1189056be00bbb7777cf..07cb7669f98237399c4165947a03c67ce2a86aa8 100644 --- a/paddle/fluid/inference/analysis/node.h +++ b/paddle/fluid/inference/analysis/node.h @@ -19,6 +19,7 @@ limitations under the License. */ */ #pragma once +#include #include #include #include diff --git a/paddle/fluid/operators/load_combine_op.cc b/paddle/fluid/operators/load_combine_op.cc index b5522dd246f250f02d69c0ba749ae6043eb810d6..0522a94195786c767194ec727d982a60451e7c62 100644 --- a/paddle/fluid/operators/load_combine_op.cc +++ b/paddle/fluid/operators/load_combine_op.cc @@ -12,7 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ #include - +#include "paddle/fluid/framework/data_type_transform.h" #include "paddle/fluid/framework/op_registry.h" #include "paddle/fluid/platform/device_context.h" @@ -31,6 +31,7 @@ class LoadCombineOp : public framework::OperatorBase { void RunImpl(const framework::Scope &scope, const platform::Place &place) const override { auto filename = Attr("file_path"); + auto load_as_fp16 = Attr("load_as_fp16"); std::ifstream fin(filename); PADDLE_ENFORCE(static_cast(fin), @@ -59,17 +60,25 @@ class LoadCombineOp : public framework::OperatorBase { // Get data from fin to tensor DeserializeFromStream(fin, tensor, dev_ctx); - if (platform::is_gpu_place(place)) { - // copy CPU to GPU - framework::LoDTensor cpu_tensor; - cpu_tensor.ShareDataWith(*tensor); - cpu_tensor.set_lod(tensor->lod()); - - // reset tensor + auto in_dtype = framework::ToDataType(tensor->type()); + auto out_dtype = + load_as_fp16 ? framework::proto::VarType::FP16 : in_dtype; + + if (in_dtype != out_dtype) { + // convert to float16 tensor + auto in_kernel_type = framework::OpKernelType(in_dtype, place); + auto out_kernel_type = framework::OpKernelType(out_dtype, place); + framework::LoDTensor fp16_tensor; + // copy LoD info to the new tensor + fp16_tensor.set_lod(tensor->lod()); + framework::TransDataType(in_kernel_type, out_kernel_type, *tensor, + &fp16_tensor); + + // reset output tensor out_var->Clear(); tensor = out_var->GetMutable(); - tensor->set_lod(cpu_tensor.lod()); - TensorCopy(cpu_tensor, place, dev_ctx, tensor); + tensor->set_lod(fp16_tensor.lod()); + tensor->ShareDataWith(fp16_tensor); } } } @@ -82,6 +91,13 @@ class LoadCombineOpProtoMaker : public framework::OpProtoAndCheckerMaker { "Out", "(vector) The output LoDTensors that will be read from the input file.") .AsDuplicable(); + AddAttr( + "load_as_fp16", + "(boolean, default false)" + "If true, the tensor will be first loaded and then " + "converted to float16 data type. Otherwise, the tensor will be " + "directly loaded without data type conversion.") + .SetDefault(false); AddAttr("file_path", "(string) " "LoDTensors will be loaded from \"file_path\".") diff --git a/paddle/fluid/operators/save_load_combine_op_test.cc b/paddle/fluid/operators/save_load_combine_op_test.cc index 47618c51d98eb9f58988f82c0aee0083565d81a6..4743e0d9499b111d8baa921dbb245431713fd7a8 100644 --- a/paddle/fluid/operators/save_load_combine_op_test.cc +++ b/paddle/fluid/operators/save_load_combine_op_test.cc @@ -139,8 +139,9 @@ TEST(SaveLoadCombineOp, CPU) { CheckValues(expect4, actual4, expect_lod4, actual_lod4, numel4); } -// FP16 version of SaveLoadCombineOp Test -TEST(SaveLoadCombineFP16Op, CPU) { +// FP16 version of SaveLoadCombineOp Test, only altering the saving aspect +// to save as FP16. +TEST(SaveCombineFP16Op, CPU) { paddle::framework::Scope scope; paddle::platform::CPUPlace place; @@ -169,7 +170,7 @@ TEST(SaveLoadCombineFP16Op, CPU) { 20, 50, lod4, "test_var4", place, &scope, &expect_lod4); // Set attributes - std::string filename = "check_tensor_fp16.ls"; + std::string filename = "check_tensor_fp16_save.ls"; paddle::framework::AttributeMap attrs; attrs.insert({"file_path", std::string(filename)}); attrs.insert({"save_as_fp16", true}); @@ -216,6 +217,89 @@ TEST(SaveLoadCombineFP16Op, CPU) { actual_lod4, numel4); } +// FP16 version of SaveLoadCombineOp Test, only altering the loading aspect +// to load tensors with FP16 precision. +TEST(LoadCombineFP16Op, CPU) { + paddle::framework::Scope scope; + paddle::platform::CPUPlace place; + + std::vector lod1 = {0, 1, 2, 3, 10}; + int numel1 = 100; + paddle::framework::LoD expect_lod1; + float* expect1 = CreateForSaveCombineOp( + 10, 10, lod1, "test_var1", place, &scope, &expect_lod1); + + std::vector lod2 = {0, 2, 5, 10}; + int numel2 = 200; + paddle::framework::LoD expect_lod2; + float* expect2 = CreateForSaveCombineOp( + 10, 20, lod2, "test_var2", place, &scope, &expect_lod2); + + std::vector lod3 = {0, 20}; + int numel3 = 4000; + paddle::framework::LoD expect_lod3; + float* expect3 = CreateForSaveCombineOp( + 20, 200, lod3, "test_var3", place, &scope, &expect_lod3); + + std::vector lod4 = {0, 1, 20}; + int numel4 = 1000; + paddle::framework::LoD expect_lod4; + float* expect4 = CreateForSaveCombineOp( + 20, 50, lod4, "test_var4", place, &scope, &expect_lod4); + + // Set attributes + std::string filename = "check_tensor_fp16_load.ls"; + paddle::framework::AttributeMap attrs; + attrs.insert({"file_path", std::string(filename)}); + + // Run the save_combine_op + auto save_combine_op = paddle::framework::OpRegistry::CreateOp( + "save_combine", + {{"X", {"test_var1", "test_var2", "test_var3", "test_var4"}}}, {}, attrs); + save_combine_op->Run(scope, place); + + // Set up output vars + auto load_var1 = scope.Var("out_var1"); + auto load_var2 = scope.Var("out_var2"); + auto load_var3 = scope.Var("out_var3"); + auto load_var4 = scope.Var("out_var4"); + + attrs.insert({"load_as_fp16", true}); + // Run the load_combine_op + auto load_combine_op = paddle::framework::OpRegistry::CreateOp( + "load_combine", {}, + {{"Out", {"out_var1", "out_var2", "out_var3", "out_var4"}}}, attrs); + load_combine_op->Run(scope, place); + + auto* target1 = load_var1->GetMutable(); + auto* target2 = load_var2->GetMutable(); + auto* target3 = load_var3->GetMutable(); + auto* target4 = load_var4->GetMutable(); + + paddle::framework::LoD actual_lod1, actual_lod2, actual_lod3, actual_lod4; + paddle::platform::float16* actual1 = + GetValuesAfterLoadCombineOp(target1, scope, + &actual_lod1); + paddle::platform::float16* actual2 = + GetValuesAfterLoadCombineOp(target2, scope, + &actual_lod2); + paddle::platform::float16* actual3 = + GetValuesAfterLoadCombineOp(target3, scope, + &actual_lod3); + paddle::platform::float16* actual4 = + GetValuesAfterLoadCombineOp(target4, scope, + &actual_lod4); + + CheckValues(expect1, actual1, expect_lod1, + actual_lod1, numel1); + CheckValues(expect2, actual2, expect_lod2, + actual_lod2, numel2); + CheckValues(expect3, actual3, expect_lod3, + actual_lod3, numel3); + CheckValues(expect4, actual4, expect_lod4, + actual_lod4, numel4); +} + // Test with original SaveLoadTest TEST(SaveLoadTestWithCombineOp, CPU) { paddle::framework::Scope scope; diff --git a/paddle/fluid/platform/nccl_helper.h b/paddle/fluid/platform/nccl_helper.h index e30c1a9ebf08365a9856fb32b1ce5790869e2b33..09367889a9517956ad01ad2847c31e2633cc643d 100644 --- a/paddle/fluid/platform/nccl_helper.h +++ b/paddle/fluid/platform/nccl_helper.h @@ -53,7 +53,7 @@ class NCCLGroupGuard { } inline ~NCCLGroupGuard() { - PADDLE_ENFORCE(dynload::ncclGroupEnd()); + CHECK_EQ(dynload::ncclGroupEnd(), ncclSuccess); NCCLMutex().unlock(); } }; diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc index b62291a99d34457dd17bf2bcafc1fc611419f086..50a1c07251b5bc4e7cc27de63f5457d3f94daef5 100644 --- a/paddle/fluid/pybind/pybind.cc +++ b/paddle/fluid/pybind/pybind.cc @@ -494,23 +494,61 @@ All parameter, weight, gradient are variables in Paddle. m.def("disable_profiler", platform::DisableProfiler); m.def("reset_profiler", platform::ResetProfiler); - py::class_(m, "ParallelExecutor") - .def("__init__", - [](ParallelExecutor &self, size_t num_threads, bool use_event, - const std::vector &places, - const std::unordered_set ¶ms, - const std::unordered_set &bcast_vars, - const ProgramDesc &main_program, const std::string &loss_var_name, - Scope *scope, std::vector &local_scopes, - bool allow_op_delay, bool use_default_grad_scale, - bool balance_parameter_opt_between_cards, size_t num_trainers, - size_t trainer_id) { - new (&self) ParallelExecutor( - num_threads, use_event, places, params, bcast_vars, - main_program, loss_var_name, scope, local_scopes, - allow_op_delay, use_default_grad_scale, - balance_parameter_opt_between_cards, num_trainers, trainer_id); - }) + // -- python binds for parallel executor. + py::class_ pe(m, "ParallelExecutor"); + py::class_(pe, "ExecutionStrategy") + .def(py::init()) + .def_property( + "num_threads", + [](const ExecutionStrategy &self) { return self.num_threads_; }, + [](ExecutionStrategy &self, size_t num_threads) { + self.num_threads_ = num_threads; + }) + .def_property( + "use_event", + [](const ExecutionStrategy &self) { return self.use_event_; }, + [](ExecutionStrategy &self, bool use_event) { + self.use_event_ = use_event; + }) + .def_property( + "allow_op_delay", + [](const ExecutionStrategy &self) { return self.allow_op_delay_; }, + [](ExecutionStrategy &self, bool allow_op_delay) { + self.allow_op_delay_ = allow_op_delay; + }); + py::class_ build_strategy(pe, "BuildStrategy"); + + py::enum_(build_strategy, "ReduceStrategy") + .value("Reduce", BuildStrategy::ReduceStrategy::kReduce) + .value("AllReduce", BuildStrategy::ReduceStrategy::kAllReduce); + py::enum_(build_strategy, + "GradientScaleStrategy") + .value("CoeffNumDevice", + BuildStrategy::GradientScaleStrategy::kCoeffNumDevice) + .value("One", BuildStrategy::GradientScaleStrategy::kOne) + .value("Customized", BuildStrategy::GradientScaleStrategy::kCustomized); + + build_strategy.def(py::init()) + .def_property( + "reduce_strategy", + [](const BuildStrategy &self) { return self.reduce_; }, + [](BuildStrategy &self, BuildStrategy::ReduceStrategy strategy) { + self.reduce_ = strategy; + }) + .def_property( + "gradient_scale_strategy", + [](const BuildStrategy &self) { return self.gradient_scale_; }, + [](BuildStrategy &self, + BuildStrategy::GradientScaleStrategy strategy) { + self.gradient_scale_ = strategy; + }); + + pe.def(py::init &, + const std::unordered_set &, + const std::unordered_set &, const ProgramDesc &, + const std::string &, Scope *, std::vector &, + const ExecutionStrategy &, const BuildStrategy &, size_t, + size_t>()) .def("bcast_params", &ParallelExecutor::BCastParamsToGPUs) // NOTE: even we return a vec* to Python use reference policy. // We still cannot get local_scope from this vector, since the element diff --git a/python/paddle/fluid/__init__.py b/python/paddle/fluid/__init__.py index c8a435748dc5b51bf9e57b5b597e1422f0380e8e..67aa5ec9979dbe3fcdb037e38ad94329d294cdcc 100644 --- a/python/paddle/fluid/__init__.py +++ b/python/paddle/fluid/__init__.py @@ -44,42 +44,44 @@ import transpiler from param_attr import ParamAttr, WeightNormParamAttr from data_feeder import DataFeeder from core import LoDTensor, CPUPlace, CUDAPlace, CUDAPinnedPlace -from transpiler import DistributeTranspiler, SimpleDistributeTranspiler, InferenceTranspiler, memory_optimize, release_memory +from transpiler import DistributeTranspiler, SimpleDistributeTranspiler, \ + InferenceTranspiler, memory_optimize, release_memory from concurrency import (Go, make_channel, channel_send, channel_recv, channel_close, Select) import clip import profiler import unique_name import recordio_writer -from parallel_executor import ParallelExecutor +import parallel_executor +from parallel_executor import * Tensor = LoDTensor -__all__ = framework.__all__ + executor.__all__ + concurrency.__all__ +\ - trainer.__all__ + inferencer.__all__ + transpiler.__all__ + [ - 'io', - 'initializer', - 'layers', - 'transpiler' - 'nets', - 'optimizer', - 'learning_rate_decay', - 'backward', - 'regularizer', - 'LoDTensor', - 'CPUPlace', - 'CUDAPlace', - 'CUDAPinnedPlace', - 'Tensor', - 'ParamAttr', - 'WeightNormParamAttr', - 'DataFeeder', - 'clip', - 'profiler', - 'unique_name', - 'recordio_writer', - 'ParallelExecutor', -] +__all__ = framework.__all__ + executor.__all__ + concurrency.__all__ + \ + trainer.__all__ + inferencer.__all__ + transpiler.__all__ + \ + parallel_executor.__all__ + [ + 'io', + 'initializer', + 'layers', + 'transpiler' + 'nets', + 'optimizer', + 'learning_rate_decay', + 'backward', + 'regularizer', + 'LoDTensor', + 'CPUPlace', + 'CUDAPlace', + 'CUDAPinnedPlace', + 'Tensor', + 'ParamAttr', + 'WeightNormParamAttr', + 'DataFeeder', + 'clip', + 'profiler', + 'unique_name', + 'recordio_writer', + ] def __bootstrap__(): diff --git a/python/paddle/fluid/inferencer.py b/python/paddle/fluid/inferencer.py index 1b8b9c07622dce80245c69484bb6abf737ffca83..56c008d1af70f4b5f6169ebe5174b08fcf8bc722 100644 --- a/python/paddle/fluid/inferencer.py +++ b/python/paddle/fluid/inferencer.py @@ -13,29 +13,35 @@ # limitations under the License. import core -import framework + import executor +import framework import io +import unique_name from trainer import check_and_get_place __all__ = ['Inferencer', ] class Inferencer(object): - def __init__(self, param_path, place=None): + def __init__(self, infer_func, param_path, place=None): """ - :param param_path: the path where the inference model is saved by fluid.io.save_inference_model + :param infer_func: a function that will return predict Variable + :param param_path: the path where the inference model is saved by fluid.io.save_params :param place: place to do the inference """ self.param_path = param_path self.scope = core.Scope() + self.inference_program = framework.Program() + with framework.program_guard(self.inference_program): + with unique_name.guard(): + self.predict_var = infer_func() + self.exe = executor.Executor(check_and_get_place(place)) with executor.scope_guard(self.scope): # load params from param_path into scope - [self.inference_program, _, - self.fetch_targets] = io.load_inference_model( - executor=self.exe, dirname=param_path) + io.load_params(self.exe, param_path, self.inference_program) def infer(self, inputs, return_numpy=True): """ @@ -51,7 +57,7 @@ class Inferencer(object): with executor.scope_guard(self.scope): results = self.exe.run(self.inference_program, feed=inputs, - fetch_list=self.fetch_targets, + fetch_list=[self.predict_var], return_numpy=return_numpy) return results diff --git a/python/paddle/fluid/parallel_executor.py b/python/paddle/fluid/parallel_executor.py index 7358c4b60e87893b9c04e3da2221dfb69d3ba0c7..3117dfe00c7a3df1035c439dc31b81e67781d0cc 100644 --- a/python/paddle/fluid/parallel_executor.py +++ b/python/paddle/fluid/parallel_executor.py @@ -19,7 +19,10 @@ import executor import warnings import sys -__all__ = ['ParallelExecutor'] +__all__ = ['ParallelExecutor', 'ExecutionStrategy', 'BuildStrategy'] + +ExecutionStrategy = core.ParallelExecutor.ExecutionStrategy +BuildStrategy = core.ParallelExecutor.BuildStrategy class ParallelExecutor(object): @@ -27,13 +30,12 @@ class ParallelExecutor(object): use_cuda, loss_name=None, main_program=None, - num_threads=None, - allow_op_delay=False, share_vars_from=None, - use_default_grad_scale=True, - balance_parameter_opt_between_cards=False, + exec_strategy=None, + build_strategy=None, num_trainers=1, - trainer_id=0): + trainer_id=0, + **kwargs): """ ParallelExecutor can run program in parallel. @@ -42,21 +44,8 @@ class ParallelExecutor(object): loss_name(str, default None): The loss name must set in training. main_program(Program, default None): The program that need to run, if not provided, then default_main_program will be used. - num_threads(int, default None): How many threads are used for - training. - allow_op_delay(bool, default False): Whether to delay and buffer - some operators together for scheduling or not, which may - improve performance in some cases, default False. share_vars_from(ParallelExecutor, default None): If provied, it will share variables from the specified ParallelExecutor. - use_default_grad_scale(bool, default True): If set True, a default - scale value equal to `1./device_count` would be multiplied to - gradients of each device and scaled gradients would be - aggregated. Otherwise, a customized scale value should be fed - to the network. - balance_parameter_opt_between_cards(bool, default True): Whether - updating different gradients on different cards. Currently, it - is not recommended. num_trainers(int, default 1): If greater than 1, NCCL will be initialized with multpile rank of nodes, each node should have same number of GPUs. Distributed training will be enabled then. @@ -83,6 +72,25 @@ class ParallelExecutor(object): train_loss, = train_exe.run([loss.name], feed=feed_dict) test_loss, = test_exe.run([loss.name], feed=feed_dict) """ + if len(kwargs) != 0: + err_msg = "" + for key in kwargs: + if key in dir(ExecutionStrategy): + err_msg += \ + "Setting {0} by constructor is deprecated. Use " \ + "strategy=ExecutionStrategy(); strategy.{0}=xxx; " \ + "pe=ParallelExecutor(exec_strategy=strategy) " \ + "instead.\n ".format(key) + elif key in dir(BuildStrategy): + err_msg += \ + "Setting {0} by constructor is deprecated. Use " \ + "strategy=BuildStrategy(); See help(" \ + "paddle.fluid.ParallelExecutor.BuildStrategy) \n".format( + key) + else: + err_msg += "Setting {0} by constructor is deprecated. Use strategy.\n".format( + key) + raise ValueError(err_msg) self._places = [] self._act_places = [] @@ -100,15 +108,25 @@ class ParallelExecutor(object): self._places.append(p) assert self._places, "no place for execution" - if num_threads is None: + if exec_strategy is None: + exec_strategy = ExecutionStrategy() + if use_cuda: + exec_strategy.use_event = True + else: + exec_strategy.use_event = False + + if exec_strategy.num_threads == 0: if use_cuda: # Experiments on se-resnext shows that too many threads hurt # performance. Worth tunning for other models in the future. - num_threads = len(self._places) * 2 + exec_strategy.num_threads = len(self._places) * 2 else: - num_threads = min( + exec_strategy.num_threads = min( len(self._places) * 2, multiprocessing.cpu_count()) + if build_strategy is None: + build_strategy = BuildStrategy() + main = main_program main = main if main else framework.default_main_program() scope = executor.global_scope() @@ -127,23 +145,14 @@ class ParallelExecutor(object): ] self.executor = core.ParallelExecutor( - num_threads, - True if use_cuda else False, # use_event self._places, set([ p.name for p in main.global_block().iter_parameters() if not p.stop_gradient ]), - set(self.persistable_vars), - main.desc, - loss_name if loss_name else '', - scope, - local_scopes, - allow_op_delay, - use_default_grad_scale, - balance_parameter_opt_between_cards, - num_trainers, - trainer_id) + set(self.persistable_vars), main.desc, loss_name + if loss_name else '', scope, local_scopes, exec_strategy, + build_strategy, num_trainers, trainer_id) self.scope = scope def run(self, fetch_list, feed=None, feed_dict=None): diff --git a/python/paddle/fluid/tests/book/high-level-api/fit_a_line/test_fit_a_line.py b/python/paddle/fluid/tests/book/high-level-api/fit_a_line/test_fit_a_line.py index 8c9bbb52d769282460c571ebc51d5eff18de3114..fbcf2a282f6421a546723a1d429c59fb304a0cc2 100644 --- a/python/paddle/fluid/tests/book/high-level-api/fit_a_line/test_fit_a_line.py +++ b/python/paddle/fluid/tests/book/high-level-api/fit_a_line/test_fit_a_line.py @@ -48,12 +48,11 @@ def linear(): return avg_loss -def train(use_cuda, save_dirname): +def train(use_cuda, train_program, save_dirname): place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() trainer = fluid.Trainer( - train_func=linear, - infer_func=inference_program, + train_func=train_program, place=place, optimizer=fluid.optimizer.SGD(learning_rate=0.001)) @@ -72,11 +71,7 @@ def train(use_cuda, save_dirname): ''' if float(test_metrics[0]) < 20.0: if save_dirname is not None: - # NOT clear yet - # fluid.io.save_inference_model(save_dirname, ['x'], [y_predict]) - # trainer.save_params(save_dirname) - # https://github.com/PaddlePaddle/Paddle/pull/10445 - trainer.save_inference_model(save_dirname) + trainer.save_params(save_dirname) return trainer.train( @@ -87,12 +82,13 @@ def train(use_cuda, save_dirname): # infer -def infer(use_cuda, save_dirname=None): +def infer(use_cuda, inference_program, save_dirname=None): if save_dirname is None: return place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() - inferencer = fluid.Inferencer(param_path=save_dirname, place=place) + inferencer = fluid.Inferencer( + infer_func=inference_program, param_path=save_dirname, place=place) batch_size = 10 tensor_x = numpy.random.uniform(0, 10, [batch_size, 13]).astype("float32") @@ -108,8 +104,8 @@ def main(use_cuda): # Directory for saving the trained model save_dirname = "fit_a_line.inference.model" - train(use_cuda, save_dirname) - infer(use_cuda, save_dirname) + train(use_cuda, linear, save_dirname) + infer(use_cuda, inference_program, save_dirname) class TestFitALine(unittest.TestCase): diff --git a/python/paddle/fluid/tests/book/high-level-api/recognize_digits/test_recognize_digits_conv.py b/python/paddle/fluid/tests/book/high-level-api/recognize_digits/test_recognize_digits_conv.py index 1f91f471f22f7c8fafc80672e660ddccf6f7cc4a..420e6e6e42adc22508c414f2c2d1ba93aedd4753 100644 --- a/python/paddle/fluid/tests/book/high-level-api/recognize_digits/test_recognize_digits_conv.py +++ b/python/paddle/fluid/tests/book/high-level-api/recognize_digits/test_recognize_digits_conv.py @@ -53,48 +53,40 @@ def train_program(): predict = inference_program() cost = fluid.layers.cross_entropy(input=predict, label=label) avg_cost = fluid.layers.mean(cost) - # acc = fluid.layers.accuracy(input=predict, label=label) - # return avg_cost, acc - return avg_cost + acc = fluid.layers.accuracy(input=predict, label=label) + return [avg_cost, acc] -def train(use_cuda, save_dirname): +def train(use_cuda, train_program, save_dirname): place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() optimizer = fluid.optimizer.Adam(learning_rate=0.001) trainer = fluid.Trainer( - train_func=train_program, - infer_func=inference_program, - place=place, - optimizer=optimizer) + train_func=train_program, place=place, optimizer=optimizer) def event_handler(event): if isinstance(event, fluid.EndEpochEvent): - # if (event.epoch + 1) % 10 == 0: - # trainer.save_params(save_dirname) - trainer.save_inference_model(save_dirname) - - # TODO: Uncomment this part once we are sure that .train is working - # test_reader = paddle.batch( - # paddle.dataset.mnist.test(), batch_size=BATCH_SIZE) - # test_metrics = trainer.test(reader=test_reader) - # avg_cost_set = test_metrics[0] - # acc_set = test_metrics[1] - # - # # get test acc and loss - # acc = numpy.array(acc_set).mean() - # avg_cost = numpy.array(avg_cost_set).mean() - # - # print("avg_cost: %s" % avg_cost) - # print("acc : %s" % acc) - # - # if float(acc) > 0.2: # Smaller value to increase CI speed - # trainer.save_params(save_dirname) - # else: - # print('BatchID {0}, Test Loss {1:0.2}, Acc {2:0.2}'.format( - # event.epoch + 1, float(avg_cost), float(acc))) - # if math.isnan(float(avg_cost)): - # sys.exit("got NaN loss, training failed.") + test_reader = paddle.batch( + paddle.dataset.mnist.test(), batch_size=BATCH_SIZE) + test_metrics = trainer.test( + reader=test_reader, feed_order=['img', 'label']) + avg_cost_set = test_metrics[0] + acc_set = test_metrics[1] + + # get test acc and loss + acc = numpy.array(acc_set).mean() + avg_cost = numpy.array(avg_cost_set).mean() + + print("avg_cost: %s" % avg_cost) + print("acc : %s" % acc) + + if float(acc) > 0.2: # Smaller value to increase CI speed + trainer.save_params(save_dirname) + else: + print('BatchID {0}, Test Loss {1:0.2}, Acc {2:0.2}'.format( + event.epoch + 1, float(avg_cost), float(acc))) + if math.isnan(float(avg_cost)): + sys.exit("got NaN loss, training failed.") train_reader = paddle.batch( paddle.reader.shuffle( @@ -108,10 +100,11 @@ def train(use_cuda, save_dirname): feed_order=['img', 'label']) -def infer(use_cuda, save_dirname=None): +def infer(use_cuda, inference_program, save_dirname=None): place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() - inferencer = fluid.Inferencer(param_path=save_dirname, place=place) + inferencer = fluid.Inferencer( + infer_func=inference_program, param_path=save_dirname, place=place) batch_size = 1 tensor_img = numpy.random.uniform(-1.0, 1.0, @@ -126,8 +119,14 @@ def main(use_cuda): save_dirname = "recognize_digits_conv.inference.model" # call train() with is_local argument to run distributed train - train(use_cuda=use_cuda, save_dirname=save_dirname) - infer(use_cuda=use_cuda, save_dirname=save_dirname) + train( + use_cuda=use_cuda, + train_program=train_program, + save_dirname=save_dirname) + infer( + use_cuda=use_cuda, + inference_program=inference_program, + save_dirname=save_dirname) if __name__ == '__main__': diff --git a/python/paddle/fluid/tests/book/high-level-api/recognize_digits/test_recognize_digits_mlp.py b/python/paddle/fluid/tests/book/high-level-api/recognize_digits/test_recognize_digits_mlp.py index f072d70abdba5077bf0ac2ff6ff972e24c8226f5..9427a772f54fb58ca1f50ed792cccf5d8f9b3d84 100644 --- a/python/paddle/fluid/tests/book/high-level-api/recognize_digits/test_recognize_digits_mlp.py +++ b/python/paddle/fluid/tests/book/high-level-api/recognize_digits/test_recognize_digits_mlp.py @@ -40,47 +40,40 @@ def train_program(): predict = inference_program() cost = fluid.layers.cross_entropy(input=predict, label=label) avg_cost = fluid.layers.mean(cost) - # acc = fluid.layers.accuracy(input=predict, label=label) - # return avg_cost, acc - return avg_cost + acc = fluid.layers.accuracy(input=predict, label=label) + return [avg_cost, acc] -def train(use_cuda, save_dirname): +def train(use_cuda, train_program, save_dirname): place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() optimizer = fluid.optimizer.Adam(learning_rate=0.001) trainer = fluid.Trainer( - train_func=train_program, - infer_func=inference_program, - place=place, - optimizer=optimizer) + train_func=train_program, place=place, optimizer=optimizer) def event_handler(event): if isinstance(event, fluid.EndEpochEvent): - # if (event.epoch + 1) % 10 == 0: - trainer.save_inference_model(save_dirname) - - # TODO: Uncomment this part once we are sure that .train is working - # test_reader = paddle.batch( - # paddle.dataset.mnist.test(), batch_size=BATCH_SIZE) - # test_metrics = trainer.test(reader=test_reader) - # avg_cost_set = test_metrics[0] - # acc_set = test_metrics[1] - # - # # get test acc and loss - # acc = numpy.array(acc_set).mean() - # avg_cost = numpy.array(avg_cost_set).mean() - # - # print("avg_cost: %s" % avg_cost) - # print("acc : %s" % acc) - # - # if float(acc) > 0.2: # Smaller value to increase CI speed - # trainer.save_params(save_dirname) - # else: - # print('BatchID {0}, Test Loss {1:0.2}, Acc {2:0.2}'.format( - # event.epoch + 1, float(avg_cost), float(acc))) - # if math.isnan(float(avg_cost)): - # sys.exit("got NaN loss, training failed.") + test_reader = paddle.batch( + paddle.dataset.mnist.test(), batch_size=BATCH_SIZE) + test_metrics = trainer.test( + reader=test_reader, feed_order=['img', 'label']) + avg_cost_set = test_metrics[0] + acc_set = test_metrics[1] + + # get test acc and loss + acc = numpy.array(acc_set).mean() + avg_cost = numpy.array(avg_cost_set).mean() + + print("avg_cost: %s" % avg_cost) + print("acc : %s" % acc) + + if float(acc) > 0.2: # Smaller value to increase CI speed + trainer.save_params(save_dirname) + else: + print('BatchID {0}, Test Loss {1:0.2}, Acc {2:0.2}'.format( + event.epoch + 1, float(avg_cost), float(acc))) + if math.isnan(float(avg_cost)): + sys.exit("got NaN loss, training failed.") train_reader = paddle.batch( paddle.reader.shuffle( @@ -94,10 +87,11 @@ def train(use_cuda, save_dirname): feed_order=['img', 'label']) -def infer(use_cuda, save_dirname=None): +def infer(use_cuda, inference_program, save_dirname=None): place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() - inferencer = fluid.Inferencer(param_path=save_dirname, place=place) + inferencer = fluid.Inferencer( + infer_func=inference_program, param_path=save_dirname, place=place) batch_size = 1 tensor_img = numpy.random.uniform(-1.0, 1.0, @@ -112,8 +106,14 @@ def main(use_cuda): save_dirname = "recognize_digits_mlp.inference.model" # call train() with is_local argument to run distributed train - train(use_cuda=use_cuda, save_dirname=save_dirname) - infer(use_cuda=use_cuda, save_dirname=save_dirname) + train( + use_cuda=use_cuda, + train_program=train_program, + save_dirname=save_dirname) + infer( + use_cuda=use_cuda, + inference_program=inference_program, + save_dirname=save_dirname) if __name__ == '__main__': diff --git a/python/paddle/fluid/tests/book/high-level-api/word2vec/no_test_word2vec_new_api.py b/python/paddle/fluid/tests/book/high-level-api/word2vec/no_test_word2vec_new_api.py index 00ba4acf88b1b543c256a48709d7f514ebbe1e8c..4f861e5aaeca7ce0f73450c09f9ddc1ed7417469 100644 --- a/python/paddle/fluid/tests/book/high-level-api/word2vec/no_test_word2vec_new_api.py +++ b/python/paddle/fluid/tests/book/high-level-api/word2vec/no_test_word2vec_new_api.py @@ -90,7 +90,7 @@ def train_program(is_sparse): return avg_cost -def train(use_cuda, is_sparse, save_path): +def train(use_cuda, train_program, save_path): train_reader = paddle.batch( paddle.dataset.imikolov.train(word_dict, N), BATCH_SIZE) test_reader = paddle.batch( @@ -105,23 +105,21 @@ def train(use_cuda, is_sparse, save_path): print("loss= ", avg_cost) if avg_cost < 5.0: - trainer.save_inference_model(save_path) + trainer.save_params(save_path) return if math.isnan(avg_cost): sys.exit("got NaN loss, training failed.") trainer = fluid.Trainer( - partial(train_program, is_sparse), - partial(inference_program, is_sparse), - fluid.optimizer.SGD(learning_rate=0.001), - place=place) + train_program, fluid.optimizer.SGD(learning_rate=0.001), place=place) trainer.train( reader=train_reader, num_epochs=1, event_handler=event_handler) -def infer(use_cuda, is_sparse, save_path): +def infer(use_cuda, inference_program, save_path): place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() - inferencer = fluid.Inferencer(param_path=save_path, place=place) + inferencer = fluid.Inferencer( + infer_func=inference_program, param_path=save_path, place=place) lod = [0, 1] first_word = create_random_lodtensor(lod, place, low=0, high=dict_size - 1) @@ -144,9 +142,9 @@ def main(use_cuda, is_sparse): if use_cuda and not fluid.core.is_compiled_with_cuda(): return - save_path = "word2vec.inference.model" - train(use_cuda, is_sparse, save_path) - infer(use_cuda, is_sparse, save_path) + save_path = "word2vec.params" + train(use_cuda, partial(train_program, is_sparse), save_path) + infer(use_cuda, partial(inference_program, is_sparse), save_path) if __name__ == '__main__': diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor.py b/python/paddle/fluid/tests/unittests/test_parallel_executor.py index 926c6bc28a1bae2bee31eecec95f7951893cbf18..056f9e1781997aa1586d972874b652d5b725fe3f 100644 --- a/python/paddle/fluid/tests/unittests/test_parallel_executor.py +++ b/python/paddle/fluid/tests/unittests/test_parallel_executor.py @@ -232,14 +232,18 @@ class TestParallelExecutorBase(unittest.TestCase): place = fluid.CUDAPlace(0) startup_exe = fluid.Executor(place) startup_exe.run(startup) + exec_strategy = fluid.ExecutionStrategy() + exec_strategy.allow_op_delay = allow_op_delay + + build_strategy = fluid.BuildStrategy() + build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce if balance_parameter_opt_between_cards else fluid.BuildStrategy.ReduceStrategy.AllReduce if use_parallel_executor: exe = fluid.ParallelExecutor( True, loss_name=loss.name, - allow_op_delay=allow_op_delay, - balance_parameter_opt_between_cards=balance_parameter_opt_between_cards - ) + exec_strategy=exec_strategy, + build_strategy=build_strategy) else: exe = fluid.Executor(place=place) @@ -548,7 +552,7 @@ class TestTransformer(TestParallelExecutorBase): class ParallelExecutorTestingDuringTraining(unittest.TestCase): - def check_network_convergence(self, balance_parameter_opt_between_cards): + def check_network_convergence(self, build_strategy=None): main = fluid.Program() startup = fluid.Program() with fluid.program_guard(main, startup): @@ -571,15 +575,13 @@ class ParallelExecutorTestingDuringTraining(unittest.TestCase): use_cuda=True, loss_name=loss.name, main_program=main, - balance_parameter_opt_between_cards=balance_parameter_opt_between_cards - ) + build_strategy=build_strategy) test_exe = fluid.ParallelExecutor( use_cuda=True, main_program=test_program, share_vars_from=train_exe, - balance_parameter_opt_between_cards=balance_parameter_opt_between_cards - ) + build_strategy=build_strategy) for i in xrange(5): test_loss, = test_exe.run([loss.name], feed=feed_dict) @@ -594,10 +596,14 @@ class ParallelExecutorTestingDuringTraining(unittest.TestCase): str(test_loss)) def test_parallel_testing(self): - self.check_network_convergence(False) + build_strategy = fluid.BuildStrategy() + build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.AllReduce + self.check_network_convergence(build_strategy) def test_parallel_testing_with_new_strategy(self): - self.check_network_convergence(True) + build_strategy = fluid.BuildStrategy() + build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce + self.check_network_convergence(build_strategy) import paddle.dataset.conll05 as conll05 @@ -617,7 +623,7 @@ embedding_name = 'emb' def db_lstm(word, predicate, ctx_n2, ctx_n1, ctx_0, ctx_p1, ctx_p2, mark, - is_sparse, balance_parameter_opt_between_cards, **ignored): + is_sparse, **ignored): # 8 features predicate_embedding = fluid.layers.embedding( input=predicate, @@ -686,9 +692,7 @@ def db_lstm(word, predicate, ctx_n2, ctx_n1, ctx_0, ctx_p1, ctx_p2, mark, class TestCRFModel(unittest.TestCase): - def check_network_convergence(self, - is_sparse, - balance_parameter_opt_between_cards=False): + def check_network_convergence(self, is_sparse, build_strategy=None): main = fluid.Program() startup = fluid.Program() with fluid.program_guard(main, startup): @@ -739,8 +743,7 @@ class TestCRFModel(unittest.TestCase): pe = fluid.ParallelExecutor( use_cuda=True, loss_name=avg_cost.name, - balance_parameter_opt_between_cards=balance_parameter_opt_between_cards - ) + build_strategy=build_strategy) feeder = fluid.DataFeeder( feed_list=[ @@ -756,19 +759,29 @@ class TestCRFModel(unittest.TestCase): pe.run(feed=feeder.feed(cur_batch), fetch_list=[avg_cost.name]))[0] - def test_update_sparse_parameter(self): - self.check_network_convergence(is_sparse=True) + def test_update_sparse_parameter_all_reduce(self): + build_strategy = fluid.BuildStrategy() + build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.AllReduce + self.check_network_convergence( + is_sparse=True, build_strategy=build_strategy) - def test_update_dense_parameter(self): - self.check_network_convergence(is_sparse=False) + def test_update_dense_parameter_all_reduce(self): + build_strategy = fluid.BuildStrategy() + build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.AllReduce + self.check_network_convergence( + is_sparse=False, build_strategy=build_strategy) - def test_update_sparse_parameter_with_new_strategy(self): + def test_update_sparse_parameter_reduce(self): + build_strategy = fluid.BuildStrategy() + build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce self.check_network_convergence( - is_sparse=True, balance_parameter_opt_between_cards=True) + is_sparse=True, build_strategy=build_strategy) - def test_update_dense_parameter_with_new_strategy(self): + def test_update_dense_parameter_reduce(self): + build_strategy = fluid.BuildStrategy() + build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce self.check_network_convergence( - is_sparse=False, balance_parameter_opt_between_cards=True) + is_sparse=False, build_strategy=build_strategy) # test fetch all the variables of global_block diff --git a/python/paddle/fluid/trainer.py b/python/paddle/fluid/trainer.py index 67d8be82d5fa850a526f92753a086dac0cab5e2c..c24662ac2114c286b1c50286fea1b65cf7c1b3a8 100644 --- a/python/paddle/fluid/trainer.py +++ b/python/paddle/fluid/trainer.py @@ -92,19 +92,13 @@ class Trainer(object): place: The device place of this trainer. """ - def __init__(self, - train_func, - infer_func, - optimizer, - param_path=None, - place=None): + def __init__(self, train_func, optimizer, param_path=None, place=None): # 1. we need to generate a framework.Program by calling # program_func. Reference: fluid.program_guard in # test_word2vec.py if not isinstance(optimizer, opt_module.Optimizer): raise TypeError("The optimizer should be an instance of Optimizer") - self.infer_func = infer_func self.scope = core.Scope() self.startup_program = framework.Program() @@ -178,9 +172,9 @@ class Trainer(object): def train(self, num_epochs, event_handler, - reader=None, - parallel=False, - feed_order=None): + reader, + feed_order, + parallel=False): """ Train the model. @@ -208,7 +202,7 @@ class Trainer(object): self._train_by_executor(num_epochs, event_handler, reader, feed_order) - def test(self, reader, feed_order=None): + def test(self, reader, feed_order): """ Test the model on given test data @@ -226,15 +220,6 @@ class Trainer(object): exe = executor.Executor(self.place) io.save_persistables(exe, dirname=param_path) - def save_inference_model(self, model_path): - inference_program = framework.Program() - with framework.program_guard(inference_program): - with unique_name.guard(): - predict_var = self.infer_func() - predict_var = self.train_program.block(0).var(predict_var.name) - exe = executor.Executor(self.place) - io.save_inference_model(model_path, [], [predict_var], exe) - @contextlib.contextmanager def _prog_and_scope_guard(self): with framework.program_guard( @@ -291,12 +276,7 @@ def build_feed_var_list(program, feed_order): if not isinstance(program, framework.Program): raise TypeError("The 'program' should be an object of Program") - if feed_order is None: - feed_var_list = [ - var for var in program.global_block().vars.itervalues() - if var.is_data - ] - elif isinstance(feed_order, list): + if isinstance(feed_order, list): feed_var_list = [ program.global_block().var(var_name) for var_name in feed_order ]