diff --git a/paddle/fluid/API.spec b/paddle/fluid/API.spec index 5e9901bb87c9a454a393a913b6da6e82266ee719..e3b44499258d288fe5692ca23efe1c4ec234f75c 100644 --- a/paddle/fluid/API.spec +++ b/paddle/fluid/API.spec @@ -208,6 +208,7 @@ paddle.fluid.layers.bilinear_tensor_product ArgSpec(args=['x', 'y', 'size', 'act paddle.fluid.layers.merge_selected_rows ArgSpec(args=['x', 'name'], varargs=None, keywords=None, defaults=(None,)) paddle.fluid.layers.get_tensor_from_selected_rows ArgSpec(args=['x', 'name'], varargs=None, keywords=None, defaults=(None,)) paddle.fluid.layers.lstm ArgSpec(args=['input', 'init_h', 'init_c', 'max_len', 'hidden_size', 'num_layers', 'dropout_prob', 'is_bidirec', 'is_test', 'name', 'default_initializer', 'seed'], varargs=None, keywords=None, defaults=(0.0, False, False, None, None, -1)) +paddle.fluid.layers.py_func ArgSpec(args=['func', 'x', 'out', 'backward_func', 'skip_vars_in_backward_input'], varargs=None, keywords=None, defaults=(None, None)) paddle.fluid.layers.psroi_pool ArgSpec(args=['input', 'rois', 'output_channels', 'spatial_scale', 'pooled_height', 'pooled_width', 'name'], varargs=None, keywords=None, defaults=(None,)) paddle.fluid.layers.huber_loss ArgSpec(args=['input', 'label', 'delta'], varargs=None, keywords=None, defaults=None) paddle.fluid.layers.data ArgSpec(args=['name', 'shape', 'append_batch_size', 'dtype', 'lod_level', 'type', 'stop_gradient'], varargs=None, keywords=None, defaults=(True, 'float32', 0, VarType.LOD_TENSOR, True)) @@ -350,6 +351,22 @@ paddle.fluid.contrib.QuantizeTranspiler.__init__ ArgSpec(args=['self', 'weight_b paddle.fluid.contrib.QuantizeTranspiler.convert_to_int8 ArgSpec(args=['self', 'program', 'place', 'scope'], varargs=None, keywords=None, defaults=(None,)) paddle.fluid.contrib.QuantizeTranspiler.freeze_program ArgSpec(args=['self', 'program', 'place', 'fuse_bn', 'scope'], varargs=None, keywords=None, defaults=(False, None)) paddle.fluid.contrib.QuantizeTranspiler.training_transpile 
ArgSpec(args=['self', 'program', 'startup_program'], varargs=None, keywords=None, defaults=(None, None)) +paddle.fluid.contrib.load_persistables_for_increment ArgSpec(args=['dirname', 'executor', 'program', 'lookup_table_var', 'lookup_table_var_path'], varargs=None, keywords=None, defaults=None) +paddle.fluid.contrib.load_persistables_for_inference ArgSpec(args=['dirname', 'executor', 'program', 'lookup_table_var_name'], varargs=None, keywords=None, defaults=None) +paddle.fluid.contrib.convert_dist_to_sparse_program ArgSpec(args=['program'], varargs=None, keywords=None, defaults=None) +paddle.fluid.contrib.HDFSClient.__init__ ArgSpec(args=['self', 'hadoop_home', 'configs'], varargs=None, keywords=None, defaults=None) +paddle.fluid.contrib.HDFSClient.delete ArgSpec(args=['self', 'hdfs_path'], varargs=None, keywords=None, defaults=None) +paddle.fluid.contrib.HDFSClient.download ArgSpec(args=['self', 'hdfs_path', 'local_path', 'overwrite', 'unzip'], varargs=None, keywords=None, defaults=(False, False)) +paddle.fluid.contrib.HDFSClient.is_dir ArgSpec(args=['self', 'hdfs_path'], varargs=None, keywords=None, defaults=(None,)) +paddle.fluid.contrib.HDFSClient.is_exist ArgSpec(args=['self', 'hdfs_path'], varargs=None, keywords=None, defaults=(None,)) +paddle.fluid.contrib.HDFSClient.ls ArgSpec(args=['self', 'hdfs_path'], varargs=None, keywords=None, defaults=None) +paddle.fluid.contrib.HDFSClient.lsr ArgSpec(args=['self', 'hdfs_path', 'only_file', 'sort'], varargs=None, keywords=None, defaults=(True, True)) +paddle.fluid.contrib.HDFSClient.make_local_dirs ArgSpec(args=['local_path'], varargs=None, keywords=None, defaults=None) +paddle.fluid.contrib.HDFSClient.makedirs ArgSpec(args=['self', 'hdfs_path'], varargs=None, keywords=None, defaults=None) +paddle.fluid.contrib.HDFSClient.rename ArgSpec(args=['self', 'hdfs_src_path', 'hdfs_dst_path', 'overwrite'], varargs=None, keywords=None, defaults=(False,)) +paddle.fluid.contrib.HDFSClient.upload ArgSpec(args=['self', 
'hdfs_path', 'local_path', 'overwrite', 'retry_times'], varargs=None, keywords=None, defaults=(False, 5)) +paddle.fluid.contrib.multi_download ArgSpec(args=['client', 'hdfs_path', 'local_path', 'trainer_id', 'trainers', 'multi_processes'], varargs=None, keywords=None, defaults=(5,)) +paddle.fluid.contrib.multi_upload ArgSpec(args=['client', 'hdfs_path', 'local_path', 'multi_processes', 'overwrite', 'sync'], varargs=None, keywords=None, defaults=(5, False, True)) paddle.fluid.transpiler.DistributeTranspiler.__init__ ArgSpec(args=['self', 'config'], varargs=None, keywords=None, defaults=(None,)) paddle.fluid.transpiler.DistributeTranspiler.get_pserver_program ArgSpec(args=['self', 'endpoint'], varargs=None, keywords=None, defaults=None) paddle.fluid.transpiler.DistributeTranspiler.get_pserver_programs ArgSpec(args=['self', 'endpoint'], varargs=None, keywords=None, defaults=None) diff --git a/paddle/fluid/framework/details/build_strategy.cc b/paddle/fluid/framework/details/build_strategy.cc index 779a9ed52365e66d8141f7e3a1183ef6d7832e4b..389366a8a98c5753268718c49c62c2dffe99c32f 100644 --- a/paddle/fluid/framework/details/build_strategy.cc +++ b/paddle/fluid/framework/details/build_strategy.cc @@ -131,9 +131,7 @@ std::shared_ptr BuildStrategy::CreatePassesFromStrategy( std::unique_ptr BuildStrategy::Apply( const ProgramDesc &main_program, const std::vector &places, - const std::string &loss_var_name, - const std::unordered_set ¶m_names, - const std::vector &local_scopes, + const std::string &loss_var_name, const std::vector &local_scopes, #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) const bool use_cuda, platform::NCCLContextMap *nccl_ctxs) const { #else @@ -149,9 +147,6 @@ std::unique_ptr BuildStrategy::Apply( pass->SetNotOwned>("places", &places); pass->Erase("loss_var_name"); pass->SetNotOwned("loss_var_name", &loss_var_name); - pass->Erase("params"); - pass->SetNotOwned>("params", - ¶m_names); pass->Erase("local_scopes"); pass->SetNotOwned>("local_scopes", 
&local_scopes); diff --git a/paddle/fluid/framework/details/build_strategy.h b/paddle/fluid/framework/details/build_strategy.h index 29396501dc0efedd31a42b77f915fd66c9943985..11db184cb4efe349a340aceb4b7e1e3f4d4b24a5 100644 --- a/paddle/fluid/framework/details/build_strategy.h +++ b/paddle/fluid/framework/details/build_strategy.h @@ -106,16 +106,15 @@ struct BuildStrategy { // Apply the passes built by the pass_builder_. The passes will be // applied to the Program and output an ir::Graph. - std::unique_ptr Apply( - const ProgramDesc &main_program, - const std::vector &places, - const std::string &loss_var_name, - const std::unordered_set ¶m_names, - const std::vector &local_scopes, + std::unique_ptr Apply(const ProgramDesc &main_program, + const std::vector &places, + const std::string &loss_var_name, + const std::vector &local_scopes, #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) - const bool use_cuda, platform::NCCLContextMap *nccl_ctxs) const; + const bool use_cuda, + platform::NCCLContextMap *nccl_ctxs) const; #else - const bool use_cuda) const; + const bool use_cuda) const; #endif private: diff --git a/paddle/fluid/framework/details/multi_devices_graph_pass.cc b/paddle/fluid/framework/details/multi_devices_graph_pass.cc index 8af1d62dea89343ff2d41dd7c6ac837459df7685..036cef1daaae4bcd52ffcd40bc0f74ee3840f3b2 100644 --- a/paddle/fluid/framework/details/multi_devices_graph_pass.cc +++ b/paddle/fluid/framework/details/multi_devices_graph_pass.cc @@ -130,7 +130,6 @@ void AddOutputToLeafOps(ir::Graph *graph) { static const char kLossVarName[] = "loss_var_name"; static const char kPlaces[] = "places"; -static const char kParams[] = "params"; static const char kLocalScopes[] = "local_scopes"; static const char kStrategy[] = "strategy"; static const char kNumTrainers[] = "num_trainers"; @@ -147,9 +146,6 @@ void MultiDevSSAGraphBuilder::Init() const { nccl_ctxs_ = &Get("nccl_ctxs"); #endif - for (auto &p : Get>(kParams)) { - grad_names_.insert(GradVarName(p)); - } 
balance_vars_.resize(places_.size(), 0); if (strategy_.enable_data_balance_ && places_.size() == 1) { LOG(WARNING) << "It is no need to enable data balance when there is only " @@ -896,7 +892,6 @@ REGISTER_PASS(multi_devices_pass, paddle::framework::details::MultiDevSSAGraphBuilder) .RequirePassAttr(paddle::framework::details::kLossVarName) .RequirePassAttr(paddle::framework::details::kPlaces) - .RequirePassAttr(paddle::framework::details::kParams) .RequirePassAttr(paddle::framework::details::kLocalScopes) .RequirePassAttr(paddle::framework::details::kStrategy) .RequirePassAttr(paddle::framework::details::kNumTrainers); diff --git a/paddle/fluid/framework/details/multi_devices_graph_pass.h b/paddle/fluid/framework/details/multi_devices_graph_pass.h index 8e462aec7dc7ce45cad592b89de0b6edde8c9146..0556232aa4754cd123a85a4aa3dce8b3f4c57b08 100644 --- a/paddle/fluid/framework/details/multi_devices_graph_pass.h +++ b/paddle/fluid/framework/details/multi_devices_graph_pass.h @@ -102,7 +102,6 @@ class MultiDevSSAGraphBuilder : public ir::Pass { mutable std::string loss_var_name_; mutable std::vector places_; mutable std::vector local_scopes_; - mutable std::unordered_set grad_names_; mutable BuildStrategy strategy_; mutable std::unordered_map all_vars_; diff --git a/paddle/fluid/framework/op_desc.cc b/paddle/fluid/framework/op_desc.cc index dde642764fa5dfce11edcef51ad1be11be331fbc..2fe1c94ec02e8ff0a4acb81868ba2124ea89e506 100644 --- a/paddle/fluid/framework/op_desc.cc +++ b/paddle/fluid/framework/op_desc.cc @@ -110,22 +110,125 @@ class CompileTimeInferShapeContext : public InferShapeContext { } } + std::vector GetInputVarPtrs( + const std::string &name) override { + const std::vector arg_names = Inputs(name); + std::vector res; + res.reserve(arg_names.size()); + std::transform(arg_names.begin(), arg_names.end(), std::back_inserter(res), + [this](const std::string &name) { + return block_.FindVarRecursive(name); + }); + return res; + } + + std::vector GetOutputVarPtrs( + 
const std::string &name) override { + const std::vector arg_names = Outputs(name); + std::vector res; + res.reserve(arg_names.size()); + std::transform(arg_names.begin(), arg_names.end(), std::back_inserter(res), + [this](const std::string &name) { + return block_.FindVarRecursive(name); + }); + return res; + } + + DDim GetInputDim(const std::string &name) const override { + const std::vector &arg_names = Inputs(name); + PADDLE_ENFORCE_EQ(arg_names.size(), 1UL, + "Input(%s) should hold one element, but now it holds %d", + name, arg_names.size()); + return this->GetDim(arg_names[0]); + } + + std::vector GetInputsDim(const std::string &name) const override { + const std::vector &arg_names = Inputs(name); + return GetDims(arg_names); + } + bool IsRuntime() const override; + std::vector GetInputsVarType( + const std::string &name) const override { + return GetVarTypes(Inputs(name)); + } + + std::vector GetOutputsVarType( + const std::string &name) const override { + return GetVarTypes(Outputs(name)); + } + + void SetOutputDim(const std::string &name, const DDim &dim) override { + auto &arg_names = Outputs(name); + PADDLE_ENFORCE_EQ(arg_names.size(), 1UL, + "Output(%s) should hold one element, but now it holds %d", + name, arg_names.size()); + SetDim(arg_names[0], dim); + } + + void SetOutputsDim(const std::string &name, + const std::vector &dims) override { + auto &names = Outputs(name); + SetDims(names, dims); + } + protected: - proto::VarType::Type GetVarType(const std::string &name) const override; + std::vector GetVarTypes( + const std::vector &names) const { + std::vector retv; + retv.resize(names.size()); + std::transform( + names.begin(), names.end(), retv.begin(), + std::bind(std::mem_fn(&CompileTimeInferShapeContext::GetVarType), this, + std::placeholders::_1)); + return retv; + } + + proto::VarType::Type GetVarType(const std::string &name) const; + + DDim GetDim(const std::string &name) const { + auto var = block_.FindVarRecursive(name); + PADDLE_ENFORCE(var 
!= nullptr, "Cannot find variable %s", name); + DDim res; + try { + auto shape = var->GetShape(); + res = shape.empty() ? make_ddim({0UL}) : make_ddim(shape); + } catch (...) { + VLOG(5) << "GetDim of variable " << name << " error"; + std::rethrow_exception(std::current_exception()); + } + return res; + } - DDim GetDim(const std::string &name) const override; + std::vector GetDims(const std::vector &names) const { + std::vector ret; + ret.reserve(names.size()); + std::transform( + names.begin(), names.end(), std::back_inserter(ret), + [this](const std::string &name) { return this->GetDim(name); }); + return ret; + } + + void SetDim(const std::string &name, const DDim &dim); - void SetDim(const std::string &name, const DDim &dim) override; + void SetDims(const std::vector &names, + const std::vector &dims) { + size_t length = names.size(); + PADDLE_ENFORCE_EQ(length, dims.size()); + for (size_t i = 0; i < length; ++i) { + if (names[i] == framework::kEmptyVarName) { + continue; + } + SetDim(names[i], dims[i]); + } + } std::vector GetRepeatedDims(const std::string &name) const override; void SetRepeatedDims(const std::string &name, const std::vector &dims) override; - InferShapeVarPtr GetVarPtr(const std::string &name) override; - const OpDesc &op_; const BlockDesc &block_; }; @@ -644,20 +747,6 @@ const std::vector &CompileTimeInferShapeContext::Outputs( return op_.Output(name); } -DDim CompileTimeInferShapeContext::GetDim(const std::string &name) const { - auto var = block_.FindVarRecursive(name); - PADDLE_ENFORCE(var != nullptr, "Cannot find variable %s", name); - DDim res; - try { - auto shape = var->GetShape(); - res = shape.empty() ? make_ddim({0UL}) : make_ddim(shape); - } catch (...) 
{ - VLOG(5) << "GetDim of variable " << name << " error"; - std::rethrow_exception(std::current_exception()); - } - return res; -} - std::vector CompileTimeInferShapeContext::GetRepeatedDims( const std::string &name) const { auto var = block_.FindVarRecursive(name); @@ -696,10 +785,5 @@ proto::VarType::Type CompileTimeInferShapeContext::GetVarType( return block_.FindVarRecursive(name)->GetType(); } -InferShapeVarPtr CompileTimeInferShapeContext::GetVarPtr( - const std::string &name) { - return block_.FindVarRecursive(name); -} - } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/op_desc.h b/paddle/fluid/framework/op_desc.h index e8debec7f13706b7fc5a4882d237ee2257e53b7e..d7352c5ee5a63bc8b8023e1d3459c5b9f5fab8a7 100644 --- a/paddle/fluid/framework/op_desc.h +++ b/paddle/fluid/framework/op_desc.h @@ -123,6 +123,8 @@ class OpDesc { BlockDesc *Block() { return this->block_; } + const BlockDesc *Block() const { return this->block_; } + private: template static std::vector MapKeys(const MapType &map) { diff --git a/paddle/fluid/framework/operator.cc b/paddle/fluid/framework/operator.cc index 60c2dc830d4fb51fc0da6b8a37f27b57a811dd7c..67d055f30914eb4c182d480c529f5798186601a1 100644 --- a/paddle/fluid/framework/operator.cc +++ b/paddle/fluid/framework/operator.cc @@ -142,12 +142,14 @@ RuntimeContext::RuntimeContext(const VariableNameMap& innames, const Scope& scope) { for (auto& var_name_item : innames) { std::vector& input_vars = inputs[var_name_item.first]; + input_vars.reserve(var_name_item.second.size()); for (auto& var_name : var_name_item.second) { input_vars.push_back(scope.FindVar(var_name)); } } for (auto& var_name_item : outnames) { std::vector& output_vars = outputs[var_name_item.first]; + output_vars.reserve(var_name_item.second.size()); for (auto& var_name : var_name_item.second) { output_vars.push_back(scope.FindVar(var_name)); } @@ -552,30 +554,28 @@ class RuntimeInferShapeContext : public InferShapeContext { bool 
HasOutput(const std::string& name) const override { // has only one output - const auto& outs = op_.Outputs(); + const auto& outs = ctx_.outputs; auto it = outs.find(name); if (it == outs.end()) { return false; } const auto& out = it->second; - if (out.size() == 0 || out[0] == kEmptyVarName) { + if (out.size() == 0) { return false; } PADDLE_ENFORCE_EQ(out.size(), 1UL, "Output %s should not have more than one outputs", name); - return scope_.FindVar(out[0]) != nullptr; + return out[0] != nullptr; } bool HasInputs(const std::string& name) const override { - if (!op_.HasInputs(name)) { - return false; - } - auto inputs = op_.Inputs(name); - if (inputs.empty()) { + const auto& ins = ctx_.inputs; + auto it = ins.find(name); + if (it == ins.end() || it->second.empty()) { return false; } - for (auto& input : inputs) { - if (scope_.FindVar(input) == nullptr) { + for (auto& input : it->second) { + if (input == nullptr) { return false; } } @@ -583,15 +583,13 @@ class RuntimeInferShapeContext : public InferShapeContext { } bool HasOutputs(const std::string& name) const override { - if (!op_.HasOutputs(name)) { - return false; - } - auto outputs = op_.Outputs(name); - if (outputs.empty()) { + const auto& outs = ctx_.outputs; + auto it = outs.find(name); + if (it == outs.end() || it->second.empty()) { return false; } - for (auto& output : outputs) { - if (scope_.FindVar(output) == nullptr) { + for (auto& output : it->second) { + if (output == nullptr) { return false; } } @@ -612,16 +610,18 @@ class RuntimeInferShapeContext : public InferShapeContext { void ShareDim(const std::string& in, const std::string& out, size_t i = 0, size_t j = 0) override { - PADDLE_ENFORCE_LT(i, Inputs(in).size()); - PADDLE_ENFORCE_LT(j, Outputs(out).size()); - const std::string& input_n = Inputs(in)[i]; - const std::string& output_n = Outputs(out)[j]; + auto in_it = ctx_.inputs.find(in); + auto out_it = ctx_.outputs.find(out); + PADDLE_ENFORCE(in_it != ctx_.inputs.end() && in_it->second.size() > i, + 
"Inputs %s should have %llu argument", in, i); + PADDLE_ENFORCE(out_it != ctx_.outputs.end() && out_it->second.size() > j, + "Outputs %s should have %llu argument", out, j); + + Variable* in_var = in_it->second[i]; + Variable* out_var = out_it->second[j]; - Variable* in_var = scope_.FindVar(input_n); - Variable* out_var = scope_.FindVar(output_n); PADDLE_ENFORCE(in_var->Type() == out_var->Type(), - "The type of %s and %s is not the same.", output_n, - GetDim(input_n)); + "The type of %s and %s is not the same.", in, out); if (in_var->IsType()) { auto& in_sele_rows = in_var->Get(); @@ -642,13 +642,16 @@ class RuntimeInferShapeContext : public InferShapeContext { void ShareLoD(const std::string& in, const std::string& out, size_t i = 0, size_t j = 0) const override { - const std::vector& inputs = Inputs(in); - const std::vector& outputs = Outputs(out); - PADDLE_ENFORCE_LT(i, inputs.size()); - PADDLE_ENFORCE_LT(j, outputs.size()); - Variable* in_var = scope_.FindVar(inputs.at(i)); + auto in_it = ctx_.inputs.find(in); + auto out_it = ctx_.outputs.find(out); + PADDLE_ENFORCE(in_it != ctx_.inputs.end() && in_it->second.size() > i, + "Inputs %s should have %llu argument", in, i); + PADDLE_ENFORCE(out_it != ctx_.outputs.end() && out_it->second.size() > j, + "Outputs %s should have %llu argument", out, j); + + Variable* in_var = in_it->second.at(i); if (!in_var->IsType()) return; - Variable* out_var = scope_.FindVar(outputs.at(j)); + Variable* out_var = out_it->second.at(j); PADDLE_ENFORCE(out_var->IsType(), "The %d-th output of Output(%s) must be LoDTensor.", j, out); auto in_tensor = in_var->Get(); @@ -683,9 +686,64 @@ class RuntimeInferShapeContext : public InferShapeContext { bool IsRuntime() const override { return true; } + // TODO(paddle-dev): Can this be template? 
+ std::vector GetInputVarPtrs( + const std::string& name) override { + const std::vector& vars = InputVars(name); + std::vector res; + res.reserve(vars.size()); + res.insert(res.begin(), vars.begin(), vars.end()); + return res; + } + + std::vector GetOutputVarPtrs( + const std::string& name) override { + const std::vector& vars = OutputVars(name); + std::vector res; + res.reserve(vars.size()); + res.insert(res.begin(), vars.begin(), vars.end()); + return res; + } + + DDim GetInputDim(const std::string& name) const override { + const std::vector& vars = InputVars(name); + PADDLE_ENFORCE_EQ(vars.size(), 1UL, + "Input(%s) should hold one element, but now it holds %d", + name, vars.size()); + return this->GetDim(vars[0]); + } + + std::vector GetInputsDim(const std::string& name) const override { + const std::vector& vars = InputVars(name); + return GetDims(vars); + } + + std::vector GetInputsVarType( + const std::string& name) const override { + return GetVarTypes(InputVars(name)); + } + + std::vector GetOutputsVarType( + const std::string& name) const override { + return GetVarTypes(OutputVars(name)); + } + + void SetOutputDim(const std::string& name, const DDim& dim) override { + auto& vars = OutputVars(name); + PADDLE_ENFORCE_EQ(vars.size(), 1UL, + "Output(%s) should hold one element, but now it holds %d", + name, vars.size()); + SetDim(vars[0], dim); + } + + void SetOutputsDim(const std::string& name, + const std::vector& dims) override { + auto& vars = OutputVars(name); + SetDims(vars, dims); + } + protected: - DDim GetDim(const std::string& name) const override { - Variable* var = scope_.FindVar(name); + DDim GetDim(Variable* var) const { PADDLE_ENFORCE_NOT_NULL(var); if (var->IsType()) { return var->Get().dims(); @@ -693,25 +751,44 @@ class RuntimeInferShapeContext : public InferShapeContext { return var->Get().GetCompleteDims(); } else { PADDLE_THROW( - "Only LoDTensor/SelectedRows support 'GetDim', but Variable %s's " + "Only LoDTensor/SelectedRows support 
'GetDim', but Variables " "type_id is %s.", - name, var->Type().name()); + var->Type().name()); } } + std::vector GetDims(const std::vector& vars) const { + std::vector ret; + ret.reserve(vars.size()); + std::transform(vars.begin(), vars.end(), std::back_inserter(ret), + [this](Variable* var) { return this->GetDim(var); }); + return ret; + } + std::vector GetRepeatedDims(const std::string& name) const override { PADDLE_THROW("Only compile time support this method"); } - void SetDim(const std::string& name, const DDim& dim) override { - Variable* var = scope_.FindVar(name); + void SetDim(Variable* var, const DDim& dim) { if (var->IsType()) { var->GetMutable()->Resize(dim); } else if (var->IsType()) { var->GetMutable()->set_height(dim[0]); } else { - PADDLE_THROW("Variable %s type_id %s, expect LoDTensor/SelectedRows.", - name, var->Type().name()); + PADDLE_THROW("Variable type_id %s, expect LoDTensor/SelectedRows.", + var->Type().name()); + } + } + + void SetDims(const std::vector& vars, + const std::vector& dims) { + size_t length = vars.size(); + PADDLE_ENFORCE_EQ(length, dims.size()); + for (size_t i = 0; i < length; ++i) { + if (vars[i] == nullptr) { + continue; + } + SetDim(vars[i], dims[i]); } } @@ -720,16 +797,36 @@ class RuntimeInferShapeContext : public InferShapeContext { PADDLE_THROW("Only compile time support this method"); } - proto::VarType::Type GetVarType(const std::string& name) const override { - auto* var = scope_.FindVar(name); - return ToVarType(var->Type()); + std::vector GetVarTypes( + const std::vector& vars) const { + std::vector retv; + retv.resize(vars.size()); + std::transform(vars.begin(), vars.end(), retv.begin(), + std::bind(std::mem_fn(&RuntimeInferShapeContext::GetVarType), + this, std::placeholders::_1)); + return retv; } - InferShapeVarPtr GetVarPtr(const std::string& name) override { - return scope_.FindVar(name); + proto::VarType::Type GetVarType(Variable* var) const { + return ToVarType(var->Type()); } private: + const 
std::vector& InputVars(const std::string& name) const { + auto it = ctx_.inputs.find(name); + PADDLE_ENFORCE(it != ctx_.inputs.end(), + "Operator %s does not have the input %s.", op_.Type(), name); + return it->second; + } + + const std::vector& OutputVars(const std::string& name) const { + auto it = ctx_.outputs.find(name); + PADDLE_ENFORCE(it != ctx_.outputs.end(), + "Operator %s does not have the outputs %s.", op_.Type(), + name); + return it->second; + } + const OperatorBase& op_; const Scope& scope_; const RuntimeContext& ctx_; @@ -860,8 +957,7 @@ Scope* OperatorWithKernel::PrepareData( for (size_t i = 0; i < var_name_item.second.size(); ++i) { auto& var_name = var_name_item.second[i]; - auto* var = scope.FindVar(var_name); - input_vars[i] = var; + auto* var = input_vars[i]; // Only tensor can be tranfer to another device. if (var == nullptr || !VarIsTensor(*var)) { diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc index 7e3fe02eaf5560ef03e42c6b82ed338edc30b0ab..a921f469f5e0276884fe194c99b15100a11113dc 100644 --- a/paddle/fluid/framework/parallel_executor.cc +++ b/paddle/fluid/framework/parallel_executor.cc @@ -190,7 +190,6 @@ std::vector &ParallelExecutor::GetLocalScopes() { ParallelExecutor::ParallelExecutor( const std::vector &places, - const std::unordered_set ¶ms, const std::unordered_set &bcast_vars, const ProgramDesc &main_program, const std::string &loss_var_name, Scope *scope, const std::vector &local_scopes, @@ -209,7 +208,7 @@ ParallelExecutor::ParallelExecutor( "the number of places must be greater than 1."); } - // Step 1. Bcast the params to devs. + // Step 1. Bcast the bcast_vars to devs. 
// Create local scopes if (local_scopes.empty()) { member_->own_local_scope_ = true; @@ -249,12 +248,12 @@ ParallelExecutor::ParallelExecutor( // ncclOp #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) std::unique_ptr graph = build_strategy.Apply( - main_program, member_->places_, loss_var_name, params, - member_->local_scopes_, member_->use_cuda_, member_->nccl_ctxs_.get()); + main_program, member_->places_, loss_var_name, member_->local_scopes_, + member_->use_cuda_, member_->nccl_ctxs_.get()); #else std::unique_ptr graph = build_strategy.Apply(main_program, member_->places_, loss_var_name, - params, member_->local_scopes_, member_->use_cuda_); + member_->local_scopes_, member_->use_cuda_); #endif auto max_memory_size = GetEagerDeletionThreshold(); if (max_memory_size >= 0) { diff --git a/paddle/fluid/framework/parallel_executor.h b/paddle/fluid/framework/parallel_executor.h index 1fc17a0d64d50eb70ce66cacd4752a5b96d5e894..5f6c2159aa2d90378ac298a8e56b51a188225d45 100644 --- a/paddle/fluid/framework/parallel_executor.h +++ b/paddle/fluid/framework/parallel_executor.h @@ -41,7 +41,6 @@ class ParallelExecutor { public: explicit ParallelExecutor(const std::vector &places, - const std::unordered_set ¶ms, const std::unordered_set &bcast_vars, const ProgramDesc &main_program, const std::string &loss_var_name, Scope *scope, diff --git a/paddle/fluid/framework/shape_inference.cc b/paddle/fluid/framework/shape_inference.cc index ddff2c7c261746ac9986e79cff3da7e0a9654adc..4ac872ac3d3bf918678f5294a4c35097c3fb18ab 100644 --- a/paddle/fluid/framework/shape_inference.cc +++ b/paddle/fluid/framework/shape_inference.cc @@ -22,20 +22,6 @@ limitations under the License. 
*/ namespace paddle { namespace framework { -DDim InferShapeContext::GetInputDim(const std::string &name) const { - const std::vector &arg_names = Inputs(name); - PADDLE_ENFORCE_EQ(arg_names.size(), 1UL, - "Input(%s) should hold one element, but now it holds %d", - name, arg_names.size()); - return this->GetDim(arg_names[0]); -} - -std::vector InferShapeContext::GetInputsDim( - const std::string &name) const { - const std::vector &arg_names = Inputs(name); - return GetDims(arg_names); -} - std::vector InferShapeContext::GetReaderDims( const std::string &name) const { const std::vector &arg_names = Inputs(name); @@ -46,26 +32,6 @@ std::vector InferShapeContext::GetReaderDims( return this->GetRepeatedDims(arg_names[0]); } -DDim InferShapeContext::GetInputsElementDim(const std::string &name, - int idx) const { - const std::vector &names = Inputs(name); - return this->GetDim(names[idx]); -} - -void InferShapeContext::SetOutputDim(const std::string &name, const DDim &dim) { - auto &arg_names = Outputs(name); - PADDLE_ENFORCE_EQ(arg_names.size(), 1UL, - "Output(%s) should hold one element, but now it holds %d", - name, arg_names.size()); - SetDim(arg_names[0], dim); -} - -void InferShapeContext::SetOutputsDim(const std::string &name, - const std::vector &dims) { - auto &names = Outputs(name); - SetDims(names, dims); -} - void InferShapeContext::SetReaderDims(const std::string &name, const std::vector &dims) { const std::vector &arg_names = Outputs(name); @@ -76,69 +42,5 @@ void InferShapeContext::SetReaderDims(const std::string &name, return this->SetRepeatedDims(arg_names[0], dims); } -std::vector InferShapeContext::GetInputVarPtrs( - const std::string &name) { - const std::vector arg_names = Inputs(name); - std::vector res; - res.reserve(arg_names.size()); - std::transform( - arg_names.begin(), arg_names.end(), std::back_inserter(res), - [this](const std::string &name) { return this->GetVarPtr(name); }); - return res; -} - -std::vector 
InferShapeContext::GetOutputVarPtrs( - const std::string &name) { - const std::vector arg_names = Outputs(name); - std::vector res; - res.reserve(arg_names.size()); - std::transform( - arg_names.begin(), arg_names.end(), std::back_inserter(res), - [this](const std::string &name) { return this->GetVarPtr(name); }); - return res; -} - -std::vector InferShapeContext::GetDims( - const std::vector &names) const { - std::vector ret; - ret.reserve(names.size()); - std::transform( - names.begin(), names.end(), std::back_inserter(ret), - [this](const std::string &name) { return this->GetDim(name); }); - return ret; -} - -void InferShapeContext::SetDims(const std::vector &names, - const std::vector &dims) { - size_t length = names.size(); - PADDLE_ENFORCE_EQ(length, dims.size()); - for (size_t i = 0; i < length; ++i) { - if (names[i] == framework::kEmptyVarName) { - continue; - } - SetDim(names[i], dims[i]); - } -} - -std::vector InferShapeContext::GetInputsVarType( - const std::string &name) const { - return GetVarTypes(Inputs(name)); -} - -std::vector InferShapeContext::GetOutputsVarType( - const std::string &name) const { - return GetVarTypes(Outputs(name)); -} - -std::vector InferShapeContext::GetVarTypes( - const std::vector &names) const { - std::vector retv; - retv.resize(names.size()); - std::transform(names.begin(), names.end(), retv.begin(), - std::bind(std::mem_fn(&InferShapeContext::GetVarType), this, - std::placeholders::_1)); - return retv; -} - } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/shape_inference.h b/paddle/fluid/framework/shape_inference.h index d73cca121e41e68f9fb6548117ed91c5cc1415ca..e0a848273b8d6b50eb1706998e368141a0d1f7f3 100644 --- a/paddle/fluid/framework/shape_inference.h +++ b/paddle/fluid/framework/shape_inference.h @@ -25,6 +25,8 @@ limitations under the License. 
*/ namespace paddle { namespace framework { +class OperatorBase; + using InferShapeVarPtr = boost::variant; class InferShapeContext { @@ -33,22 +35,23 @@ class InferShapeContext { virtual bool HasInput(const std::string &name) const = 0; virtual bool HasOutput(const std::string &name) const = 0; - std::vector GetInputsVarType( - const std::string &name) const; - std::vector GetOutputsVarType( - const std::string &name) const; + virtual std::vector GetInputsVarType( + const std::string &name) const = 0; + virtual std::vector GetOutputsVarType( + const std::string &name) const = 0; virtual bool HasInputs(const std::string &name) const = 0; virtual bool HasOutputs(const std::string &name) const = 0; - DDim GetInputDim(const std::string &name) const; - std::vector GetInputsDim(const std::string &name) const; - std::vector GetReaderDims(const std::string &name) const; - DDim GetInputsElementDim(const std::string &name, int idx) const; + virtual DDim GetInputDim(const std::string &name) const = 0; + virtual std::vector GetInputsDim(const std::string &name) const = 0; + virtual std::vector GetReaderDims(const std::string &name) const; - void SetOutputDim(const std::string &name, const DDim &dim); - void SetOutputsDim(const std::string &name, const std::vector &dims); - void SetReaderDims(const std::string &name, const std::vector &dims); + virtual void SetOutputDim(const std::string &name, const DDim &dim) = 0; + virtual void SetOutputsDim(const std::string &name, + const std::vector &dims) = 0; + virtual void SetReaderDims(const std::string &name, + const std::vector &dims); virtual AttrReader Attrs() const = 0; virtual const std::vector &Inputs( @@ -67,27 +70,15 @@ class InferShapeContext { virtual bool IsRuntime() const = 0; - std::vector GetInputVarPtrs(const std::string &name); - std::vector GetOutputVarPtrs(const std::string &name); - virtual InferShapeVarPtr GetVarPtr(const std::string &name) = 0; - - // Note: In while op, we need this to be public - void 
SetDims(const std::vector &names, - const std::vector &dims); + virtual std::vector GetInputVarPtrs( + const std::string &name) = 0; + virtual std::vector GetOutputVarPtrs( + const std::string &name) = 0; protected: - virtual DDim GetDim(const std::string &name) const = 0; - virtual void SetDim(const std::string &name, const DDim &dim) = 0; virtual std::vector GetRepeatedDims(const std::string &name) const = 0; virtual void SetRepeatedDims(const std::string &name, const std::vector &dims) = 0; - - std::vector GetDims(const std::vector &names) const; - - std::vector GetVarTypes( - const std::vector &names) const; - - virtual proto::VarType::Type GetVarType(const std::string &name) const = 0; }; } // namespace framework diff --git a/paddle/fluid/inference/tests/api/analyzer_dam_tester.cc b/paddle/fluid/inference/tests/api/analyzer_dam_tester.cc index 227e2ff45873fded45899146b97a7bee0c8ad763..12d61d06ce188a2478448373427f2defae5a2524 100644 --- a/paddle/fluid/inference/tests/api/analyzer_dam_tester.cc +++ b/paddle/fluid/inference/tests/api/analyzer_dam_tester.cc @@ -254,5 +254,16 @@ TEST(Analyzer_dam, compare) { compare(); } TEST(Analyzer_dam, compare_mkldnn) { compare(true /* use_mkldnn */); } #endif +// Compare Deterministic result +TEST(Analyzer_dam, compare_determine) { + AnalysisConfig cfg; + SetConfig(&cfg); + + std::vector> input_slots_all; + SetInput(&input_slots_all); + CompareDeterministic(reinterpret_cast(&cfg), + input_slots_all); +} + } // namespace inference } // namespace paddle diff --git a/paddle/fluid/inference/tests/api/analyzer_lac_tester.cc b/paddle/fluid/inference/tests/api/analyzer_lac_tester.cc index 310852e2f7cb284bda3041911d0059b55ee3b477..142801382b4fdeaa63f51390b63cf6db6cb8f60d 100644 --- a/paddle/fluid/inference/tests/api/analyzer_lac_tester.cc +++ b/paddle/fluid/inference/tests/api/analyzer_lac_tester.cc @@ -180,6 +180,17 @@ TEST(Analyzer_LAC, compare) { reinterpret_cast(&cfg), input_slots_all); } +// Compare Deterministic result 
+TEST(Analyzer_LAC, compare_determine) { + AnalysisConfig cfg; + SetConfig(&cfg); + + std::vector> input_slots_all; + SetInput(&input_slots_all); + CompareDeterministic(reinterpret_cast(&cfg), + input_slots_all); +} + } // namespace analysis } // namespace inference } // namespace paddle diff --git a/paddle/fluid/inference/tests/api/analyzer_ner_tester.cc b/paddle/fluid/inference/tests/api/analyzer_ner_tester.cc index 66d85420c5701b1bf308b6850465beb6d8a0b703..f19a2ed59ef2f666393124323ffee2f1e79ccf06 100644 --- a/paddle/fluid/inference/tests/api/analyzer_ner_tester.cc +++ b/paddle/fluid/inference/tests/api/analyzer_ner_tester.cc @@ -179,5 +179,16 @@ TEST(Analyzer_Chinese_ner, compare) { reinterpret_cast(&cfg), input_slots_all); } +// Compare Deterministic result +TEST(Analyzer_Chinese_ner, compare_determine) { + AnalysisConfig cfg; + SetConfig(&cfg); + + std::vector> input_slots_all; + SetInput(&input_slots_all); + CompareDeterministic(reinterpret_cast(&cfg), + input_slots_all); +} + } // namespace inference } // namespace paddle diff --git a/paddle/fluid/inference/tests/api/analyzer_resnet50_tester.cc b/paddle/fluid/inference/tests/api/analyzer_resnet50_tester.cc index abc63577b7913a3c9de7d6c16d8ac3e85ffd7c3c..764ae5ed8506a7ed7dc51a5c36d0dd7e9df925f3 100644 --- a/paddle/fluid/inference/tests/api/analyzer_resnet50_tester.cc +++ b/paddle/fluid/inference/tests/api/analyzer_resnet50_tester.cc @@ -85,6 +85,17 @@ TEST(Analyzer_resnet50, compare) { compare(); } TEST(Analyzer_resnet50, compare_mkldnn) { compare(true /* use_mkldnn */); } #endif +// Compare Deterministic result +TEST(Analyzer_resnet50, compare_determine) { + AnalysisConfig cfg; + SetConfig(&cfg); + + std::vector> input_slots_all; + SetInput(&input_slots_all); + CompareDeterministic(reinterpret_cast(&cfg), + input_slots_all); +} + } // namespace analysis } // namespace inference } // namespace paddle diff --git a/paddle/fluid/inference/tests/api/analyzer_rnn1_tester.cc 
b/paddle/fluid/inference/tests/api/analyzer_rnn1_tester.cc index 1ae2b4b03a1b2a66b3ddc8cb66d9575751a52297..17f4587a5093a2f1cd2d8acc0e17f2129ad36353 100644 --- a/paddle/fluid/inference/tests/api/analyzer_rnn1_tester.cc +++ b/paddle/fluid/inference/tests/api/analyzer_rnn1_tester.cc @@ -265,6 +265,17 @@ TEST(Analyzer_rnn1, compare) { reinterpret_cast(&cfg), input_slots_all); } +// Compare Deterministic result +TEST(Analyzer_rnn1, compare_determine) { + AnalysisConfig cfg; + SetConfig(&cfg); + + std::vector> input_slots_all; + SetInput(&input_slots_all); + CompareDeterministic(reinterpret_cast(&cfg), + input_slots_all); +} + // Test Multi-Thread. TEST(Analyzer_rnn1, multi_thread) { contrib::AnalysisConfig cfg; diff --git a/paddle/fluid/inference/tests/api/analyzer_rnn2_tester.cc b/paddle/fluid/inference/tests/api/analyzer_rnn2_tester.cc index e2985006f0ed858e778bf4737be3aaee0e056021..f8354e76871e7f489fd21f2f74e7402db01845c3 100644 --- a/paddle/fluid/inference/tests/api/analyzer_rnn2_tester.cc +++ b/paddle/fluid/inference/tests/api/analyzer_rnn2_tester.cc @@ -158,5 +158,16 @@ TEST(Analyzer_rnn2, compare) { reinterpret_cast(&cfg), input_slots_all); } +// Compare Deterministic result +TEST(Analyzer_rnn2, compare_determine) { + AnalysisConfig cfg; + SetConfig(&cfg); + + std::vector> input_slots_all; + SetInput(&input_slots_all); + CompareDeterministic(reinterpret_cast(&cfg), + input_slots_all); +} + } // namespace inference } // namespace paddle diff --git a/paddle/fluid/inference/tests/api/analyzer_seq_conv1_tester.cc b/paddle/fluid/inference/tests/api/analyzer_seq_conv1_tester.cc index 858191184a377a26042c98e17d5b8df782575efc..f5082cd60f1ae4e4eaf9dbe59a446ace900ee456 100644 --- a/paddle/fluid/inference/tests/api/analyzer_seq_conv1_tester.cc +++ b/paddle/fluid/inference/tests/api/analyzer_seq_conv1_tester.cc @@ -204,5 +204,16 @@ TEST(Analyzer_seq_conv1, compare) { reinterpret_cast(&cfg), input_slots_all); } +// Compare Deterministic result +TEST(Analyzer_seq_conv1, 
compare_determine) { + AnalysisConfig cfg; + SetConfig(&cfg); + + std::vector> input_slots_all; + SetInput(&input_slots_all); + CompareDeterministic(reinterpret_cast(&cfg), + input_slots_all); +} + } // namespace inference } // namespace paddle diff --git a/paddle/fluid/inference/tests/api/analyzer_text_classification_tester.cc b/paddle/fluid/inference/tests/api/analyzer_text_classification_tester.cc index 34a241f070fdc62d1b1e94835fb1dad405baafa9..79f3c81ade450fa00419b652042b2cfc79b08e4c 100644 --- a/paddle/fluid/inference/tests/api/analyzer_text_classification_tester.cc +++ b/paddle/fluid/inference/tests/api/analyzer_text_classification_tester.cc @@ -106,6 +106,17 @@ TEST(Analyzer_Text_Classification, compare) { reinterpret_cast(&cfg), input_slots_all); } +// Compare Deterministic result +TEST(Analyzer_Text_Classification, compare_determine) { + AnalysisConfig cfg; + SetConfig(&cfg); + + std::vector> input_slots_all; + SetInput(&input_slots_all); + CompareDeterministic(reinterpret_cast(&cfg), + input_slots_all); +} + TEST(Analyzer_Text_Classification, compare_against_embedding_fc_lstm_fused) { AnalysisConfig cfg; SetConfig(&cfg); diff --git a/paddle/fluid/inference/tests/api/analyzer_vis_tester.cc b/paddle/fluid/inference/tests/api/analyzer_vis_tester.cc index a8f7d5c4461964bcb18bc8df24e282ea89264aa8..d73bccefd5fc8a8ad8679b7de3feac50f786daed 100644 --- a/paddle/fluid/inference/tests/api/analyzer_vis_tester.cc +++ b/paddle/fluid/inference/tests/api/analyzer_vis_tester.cc @@ -145,6 +145,17 @@ TEST(Analyzer_vis, compare) { compare(); } TEST(Analyzer_vis, compare_mkldnn) { compare(true /* use_mkldnn */); } #endif +// Compare Deterministic result +TEST(Analyzer_vis, compare_determine) { + AnalysisConfig cfg; + SetConfig(&cfg); + + std::vector> input_slots_all; + SetInput(&input_slots_all); + CompareDeterministic(reinterpret_cast(&cfg), + input_slots_all); +} + } // namespace analysis } // namespace inference } // namespace paddle diff --git 
a/paddle/fluid/inference/tests/api/tester_helper.h b/paddle/fluid/inference/tests/api/tester_helper.h index b07949c196ca1d41bb33a0b0499ebb3204d1be4a..b0c8f395ce05fbfceaec3d8b69367292eca714e4 100644 --- a/paddle/fluid/inference/tests/api/tester_helper.h +++ b/paddle/fluid/inference/tests/api/tester_helper.h @@ -45,6 +45,7 @@ DEFINE_bool(use_analysis, true, "Running the inference program in analysis mode."); DEFINE_bool(record_benchmark, false, "Record benchmark after profiling the model"); +DEFINE_double(accuracy, 1e-3, "Result Accuracy."); DECLARE_bool(profile); DECLARE_int32(paddle_num_threads); @@ -85,7 +86,7 @@ void CompareResult(const std::vector &outputs, float *pdata = static_cast(out.data.data()); float *pdata_ref = static_cast(ref_out.data.data()); for (size_t j = 0; j < size; ++j) { - EXPECT_NEAR(pdata_ref[j], pdata[j], 1e-3); + EXPECT_NEAR(pdata_ref[j], pdata[j], FLAGS_accuracy); } break; } @@ -283,6 +284,26 @@ void TestPrediction(const PaddlePredictor::Config *config, } } +void CompareDeterministic( + const PaddlePredictor::Config *config, + const std::vector> &inputs) { + int batch_size = FLAGS_batch_size; + int num_times = FLAGS_repeat; + auto predictor = CreateTestPredictor(config, FLAGS_use_analysis); + + // warmup run + std::vector warmup_outputs, outputs; + predictor->Run(inputs[0], &warmup_outputs, batch_size); + + // run num_times to Compare Deterministic Result. 
+ for (int i = 0; i < num_times; i++) { + for (size_t j = 0; j < inputs.size(); j++) { + predictor->Run(inputs[j], &outputs, batch_size); + CompareResult(outputs, warmup_outputs); + } + } +} + void CompareNativeAndAnalysis( const PaddlePredictor::Config *config, const std::vector> &inputs) { diff --git a/paddle/fluid/operators/CMakeLists.txt b/paddle/fluid/operators/CMakeLists.txt index d9b0c66e5727e80486423ab065dccf9105775127..301d038546b08e778b779edb6770e7f5642a5a18 100644 --- a/paddle/fluid/operators/CMakeLists.txt +++ b/paddle/fluid/operators/CMakeLists.txt @@ -42,8 +42,7 @@ if (WITH_DISTRIBUTE) SET(OP_PREFETCH_DEPS ${OP_PREFETCH_DEPS} parameter_prefetch) endif() -register_operators(EXCLUDES warpctc_op conv_fusion_op DEPS ${OP_HEADER_DEPS} ${OP_PREFETCH_DEPS}) - +register_operators(EXCLUDES py_func_op warpctc_op conv_fusion_op DEPS ${OP_HEADER_DEPS} ${OP_PREFETCH_DEPS}) # warpctc_op needs cudnn 7 above if (WITH_GPU AND NOT WIN32) @@ -92,4 +91,8 @@ cc_test(save_load_op_test SRCS save_load_op_test.cc DEPS save_op load_op) cc_test(save_load_combine_op_test SRCS save_load_combine_op_test.cc DEPS save_combine_op load_combine_op) nv_test(dropout_op_test SRCS dropout_op_test.cc DEPS dropout_op tensor) +if (WITH_PYTHON) + cc_library(py_func_op SRCS py_func_op.cc DEPS op_registry python pybind) +endif() + set(GLOB_OP_LIB ${OP_LIBRARY} CACHE INTERNAL "Global OP library") diff --git a/paddle/fluid/operators/controlflow/while_op.cc b/paddle/fluid/operators/controlflow/while_op.cc index e91d9ef7765568a842b31ba682dc1b7e0d8ffa08..48800947fd387bf4d84a85e82fdcd7efa3f08de5 100644 --- a/paddle/fluid/operators/controlflow/while_op.cc +++ b/paddle/fluid/operators/controlflow/while_op.cc @@ -399,26 +399,41 @@ class WhileGradOpShapeInference : public framework::InferShapeBase { ctx->HasInputs(kOutputs); ctx->HasInputs(framework::GradVarName(kOutputs)); - auto p_names = ctx->Inputs(kX); auto pg_ig_names = ctx->Outputs(kXGRAD); - auto var_types = ctx->GetInputsVarType(kX); - 
std::vector names_to_set; - std::vector dims_to_set; - for (size_t i = 0; i < p_names.size(); ++i) { + std::vector in_var_ptrs = + ctx->GetInputVarPtrs(kX); + std::vector out_var_ptrs = + ctx->GetOutputVarPtrs(kXGRAD); + PADDLE_ENFORCE(in_var_ptrs.size() == out_var_ptrs.size()); + + for (size_t i = 0; i < in_var_ptrs.size(); ++i) { if (pg_ig_names[i] == framework::kEmptyVarName) { continue; } - auto dims = ctx->GetInputsElementDim(kX, i); - if (var_types[i] == framework::proto::VarType::LOD_TENSOR) { - names_to_set.push_back(pg_ig_names[i]); - dims_to_set.push_back(dims); - } else if (var_types[i] == framework::proto::VarType::LOD_TENSOR_ARRAY) { - // not sure how to set the dim of LOD_TENSOR_ARRAY - names_to_set.push_back(pg_ig_names[i]); - dims_to_set.push_back(dims); + if (ctx->IsRuntime()) { + framework::Variable *in_var = + boost::get(in_var_ptrs[i]); + framework::Variable *out_var = + boost::get(out_var_ptrs[i]); + + auto type = framework::ToVarType(in_var->Type()); + if (type == framework::proto::VarType::LOD_TENSOR) { + out_var->GetMutable()->Resize( + in_var->Get().dims()); + } else if (type == framework::proto::VarType::SELECTED_ROWS) { + out_var->GetMutable()->set_height( + in_var->Get().GetCompleteDims()[0]); + } else if (type == framework::proto::VarType::LOD_TENSOR_ARRAY) { + PADDLE_THROW("WhileGradOp doesn't support type %d", + static_cast(type)); + } + } else { + framework::VarDesc *in_var = + boost::get(in_var_ptrs[i]); + boost::get(out_var_ptrs[i]) + ->SetShape(in_var->GetShape()); } } - ctx->SetDims(names_to_set, dims_to_set); } }; diff --git a/paddle/fluid/operators/conv_mkldnn_op.cc b/paddle/fluid/operators/conv_mkldnn_op.cc index 154ff2bb209bb8f932c06caa319223ccf3314767..8c116c4abfe42296b616dc536821e9be55a8be84 100644 --- a/paddle/fluid/operators/conv_mkldnn_op.cc +++ b/paddle/fluid/operators/conv_mkldnn_op.cc @@ -155,11 +155,14 @@ class ConvMKLDNNOpKernel : public paddle::framework::OpKernel { auto chosen_memory_format = 
platform::data_format_to_memory_format(data_format); - if (is_conv3d) { - chosen_memory_format = - platform::MKLDNNFormatForSize(src_tz.size(), chosen_memory_format); + weights_format = mkldnn::memory::format::any; + // Check the format for user's special output + if (chosen_memory_format != mkldnn::memory::format::any) { + if (is_conv3d) { + chosen_memory_format = + platform::MKLDNNFormatForSize(src_tz.size(), chosen_memory_format); + } } - weights_format = GetWeightsFormat(chosen_memory_format, g, is_conv3d); auto src_md = platform::MKLDNNMemDesc( src_tz, platform::MKLDNNGetDataType(), chosen_memory_format); @@ -435,11 +438,14 @@ class ConvMKLDNNGradOpKernel : public paddle::framework::OpKernel { auto chosen_memory_format = platform::data_format_to_memory_format(data_format); - if (is_conv3d) { - chosen_memory_format = - platform::MKLDNNFormatForSize(src_tz.size(), chosen_memory_format); + weights_format = mkldnn::memory::format::any; + // Check the format for user's special output + if (chosen_memory_format != mkldnn::memory::format::any) { + if (is_conv3d) { + chosen_memory_format = + platform::MKLDNNFormatForSize(src_tz.size(), chosen_memory_format); + } } - weights_format = GetWeightsFormat(chosen_memory_format, g, is_conv3d); auto src_md = platform::MKLDNNMemDesc( src_tz, platform::MKLDNNGetDataType(), chosen_memory_format); diff --git a/paddle/fluid/operators/merge_selected_rows_op.cc b/paddle/fluid/operators/merge_selected_rows_op.cc index 3c15c839554599104d21a5225c078d41735c4a60..50f44c7fc5ec90420d7c38f0f536ff7adb8f9ec4 100644 --- a/paddle/fluid/operators/merge_selected_rows_op.cc +++ b/paddle/fluid/operators/merge_selected_rows_op.cc @@ -26,6 +26,13 @@ class MergeSelectedRowsOp : public framework::OperatorWithKernel { "Input(X) of MergeSelectedRowsOp should not be null."); PADDLE_ENFORCE(ctx->HasOutput("Out"), "Output(Out) of MergeSelectedRowsOp should not be null."); + PADDLE_ENFORCE_EQ(ctx->GetInputsVarType("X").front(), + 
framework::proto::VarType::SELECTED_ROWS, + "Input X only should be SelectedRows."); + PADDLE_ENFORCE_EQ(ctx->GetOutputsVarType("Out").front(), + framework::proto::VarType::SELECTED_ROWS, + "Output Y only should be SelectedRows."); + ctx->ShareDim("X", /*->*/ "Out"); } }; @@ -43,7 +50,28 @@ class MergeSelectedRowsOpMaker : public framework::OpProtoAndCheckerMaker { R"DOC( MergeSelectedRows Operator. -MergeSelectedRows is used to merge the duplicated rows of the input. +MergeSelectedRows is used to merge the duplicated rows of the input. The +output's row has no duplicated, and it's order is incremental. + +Example: + Input: + X.rows is [0, 5, 5, 4, 19] + X.height is 20 + X.value is: + [[1, 1] + [2, 2] + [3, 3] + [4, 4] + [6, 6]] + + Output: + Out.row is [0, 4, 5, 19] + Out.height is 20 + Out.value is: + [[1, 1] + [4, 4] + [5, 5] + [6, 6]] )DOC"); } }; diff --git a/paddle/fluid/operators/py_func_op.cc b/paddle/fluid/operators/py_func_op.cc new file mode 100644 index 0000000000000000000000000000000000000000..a6b1c738af1daff5e3e4b1ac8e537de5adc93b76 --- /dev/null +++ b/paddle/fluid/operators/py_func_op.cc @@ -0,0 +1,313 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include "paddle/fluid/operators/py_func_op.h" +#include +#include +#include +#include "Python.h" +#include "paddle/fluid/framework/op_registry.h" + +namespace paddle { +namespace operators { + +namespace py = ::pybind11; + +static std::vector g_py_callables; + +const char kForwardPythonCallableId[] = "forward_callable_id"; +const char kBackwardPythonCallableId[] = "backward_callable_id"; +const char kPyFuncBackwardSkipVars[] = "backward_skip_vars"; + +size_t AppendPythonCallableObjectAndReturnId(const py::object &py_obj) { + g_py_callables.emplace_back(py_obj); + return g_py_callables.size() - 1; +} + +// Return py::object* instead of py::object +// Returning py::object would cause reference count increasing +// but without GIL, reference count in Python may not be safe +static py::object *GetPythonCallableObject(size_t i) { + PADDLE_ENFORCE_LT(i, g_py_callables.size(), "Invalid python callable id"); + return &g_py_callables[i]; +} + +static std::string PythonFuncDebugString(const py::object &py_callable) { + py::gil_scoped_acquire guard; + std::string wrapper_func_str = py::str(py_callable); + auto inner_func = py_callable.attr("_func"); + std::string inner_func_str = py::str(inner_func); + return inner_func_str + " wrapped by " + wrapper_func_str; +} + +static void CallPythonFunc(py::object *callable, + const std::vector &ins, + std::vector *outs) { + py::gil_scoped_acquire guard; + py::tuple in_args(ins.size()); + for (size_t i = 0; i < ins.size(); ++i) { + in_args[i] = ins[i].IsInitialized() ? 
py::cast(ins[i]) : py::cast(nullptr); + } + + auto ret = (*callable)(*in_args); + auto ret_tuple = py::cast(ret); + size_t ret_num = py::len(ret_tuple); + size_t out_num = outs->size(); + if (UNLIKELY(ret_num != out_num)) { + // Python function has no return values or returns None + // In this case, ret_num = 1 && ret[0] == None && out_num should be 0 + // Otherwise, ret_num must be equal to out_num + PADDLE_ENFORCE( + ret_num == 1 && out_num == 0 && + py::cast(ret_tuple[0]) == nullptr, + "Output number not match. Expected %d, actual %d", out_num, ret_num); + } + + for (size_t i = 0; i < out_num; ++i) { + auto *out = (*outs)[i]; + if (out == nullptr) { + continue; + } + try { + auto *py_out_tensor = py::cast(ret_tuple[i]); + PADDLE_ENFORCE_NOT_NULL(py_out_tensor, + "Output tensor %d should not be nullptr", i); + out->set_lod(py_out_tensor->lod()); + out->ShareDataWith(*py_out_tensor); + } catch (py::cast_error &) { + PADDLE_THROW("The %d-th output must be LoDTensor", i); + } + } +} + +class PyFuncOpVarTypInference : public framework::VarTypeInference { + public: + void operator()(const framework::OpDesc &op, + framework::BlockDesc *block) const override { + auto &outs = op.Outputs(); + bool has_out = (outs.count("Out") > 0 && !outs.at("Out").empty()); + + auto &ins = op.Inputs(); + bool has_in = (ins.count("X") > 0 && !ins.at("X").empty()); + + /** + * X or Out can be empty, so that py_func can be more flexible + * to support Python functions with no input or no output + */ + PADDLE_ENFORCE(has_in || has_out, "Input(X) or Output(Out) must exist"); + + PADDLE_ENFORCE_GE(boost::get(op.GetAttr(kForwardPythonCallableId)), 0, + "Function id cannot be less than 0"); + + if (!has_out) return; + + /** + * Traverse all outputs, check if name of any output ends with @GRAD. 
+ * If found, set its shape, dtype, lod_level, type to be the same as + * the corresponding forward variable + */ + const std::string kGradVarSuffix = framework::kGradVarSuffix; + auto &out_var_names = outs.at("Out"); + for (auto &out_var_name : out_var_names) { + if (out_var_name == framework::kEmptyVarName || + out_var_name.size() < kGradVarSuffix.size()) { + continue; + } + + size_t len = out_var_name.size() - kGradVarSuffix.size(); + if (out_var_name.substr(len) == kGradVarSuffix) { + auto fwd_var_name = out_var_name.substr(0, len); + auto *out_var_desc = block->FindVarRecursive(out_var_name); + auto *fwd_var_desc = block->FindVarRecursive(fwd_var_name); + PADDLE_ENFORCE_NOT_NULL(out_var_desc, "Backward variable %s not found", + out_var_name); + PADDLE_ENFORCE_NOT_NULL(fwd_var_desc, "Forward variable %s not found", + fwd_var_name); + VLOG(10) << "Infer var_desc of Output(" << out_var_name << ") as Input(" + << fwd_var_name << ")"; + out_var_desc->SetShape(fwd_var_desc->GetShape()); + out_var_desc->SetDataType(fwd_var_desc->GetDataType()); + out_var_desc->SetLoDLevel(fwd_var_desc->GetLoDLevel()); + out_var_desc->SetType(fwd_var_desc->GetType()); + } + } + } +}; + +class PyFuncOpShapeInference : public framework::InferShapeBase { + public: + void operator()(framework::InferShapeContext *ctx) const override { + PADDLE_ENFORCE(!ctx->IsRuntime(), + "Infer shape cannot be called in runtime."); + } +}; + +class PyFuncOpMaker : public framework::OpProtoAndCheckerMaker { + public: + void Make() override { + AddInput("X", "Inputs of py_func op.").AsDuplicable(); + AddOutput("Out", "Outputs of py_func op").AsDuplicable(); + AddAttr(kForwardPythonCallableId, + "Index of registered forward Python function.") + .SetDefault(0); + AddAttr(kBackwardPythonCallableId, + "Index of registered backward Python function.") + .SetDefault(-1); + AddAttr>(kPyFuncBackwardSkipVars, + "Unused forward in/out in backward op") + .SetDefault(std::vector()); + AddComment(R"DOC("PyFunc Op")DOC"); 
+ } +}; + +/** + * There are several benefits when backward op of py_func op is + * still py_func op. + * + * - Less codes are needed, since codes of backward is almost + * the same as forward. + * + * - To support high order derivative, so that py_func is + * infinite-order differentiable + */ +class PyFuncOpGradDescMaker : public framework::GradOpDescMakerBase { + private: + static std::string DebugString(const std::vector &strs) { + if (strs.empty()) return ""; + std::string ret = strs[0]; + for (size_t i = 1; i < strs.size(); ++i) { + ret += " "; + ret += strs[i]; + } + return ret; + } + + public: + using framework::GradOpDescMakerBase::GradOpDescMakerBase; + + std::vector> operator()() const override { + auto &fwd_attrs = Attrs(); + // no backward op when backward_id is less than 0 + if (boost::get(fwd_attrs.at(kBackwardPythonCallableId)) < 0) { + return {}; + } + + std::unique_ptr grad_op(new framework::OpDesc()); + grad_op->SetType("py_func"); + + framework::AttributeMap bwd_attrs; + bwd_attrs[kForwardPythonCallableId] = + fwd_attrs.at(kBackwardPythonCallableId); + bwd_attrs[kBackwardPythonCallableId] = -1; + grad_op->SetAttrMap(bwd_attrs); + + // All forward inputs + auto fwd_ins = Input("X"); + // All forward outputs + auto fwd_outs = Output("Out"); + + // For memory reused, some inputs/output in forward part may be not needed + // in backward part. 
Skipping these vars helps to save memory + auto &backward_skip_var_list = boost::get>( + fwd_attrs.at(kPyFuncBackwardSkipVars)); + std::unordered_set backward_skip_var_set( + backward_skip_var_list.begin(), backward_skip_var_list.end()); + std::vector bwd_ins; + bwd_ins.reserve(fwd_ins.size() + fwd_outs.size()); + for (auto &fwd_in : fwd_ins) { + if (backward_skip_var_set.count(fwd_in) == 0) { + bwd_ins.emplace_back(fwd_in); + } + } + + for (auto &fwd_out : fwd_outs) { + if (backward_skip_var_set.count(fwd_out) == 0) { + bwd_ins.emplace_back(fwd_out); + } + } + + // Backward OG cannot be skipped + // But in Python side, if OG is kEmptyVarName, input tensor would be None + auto fwd_out_grads = OutputGrad("Out"); + bwd_ins.reserve(bwd_ins.size() + fwd_out_grads.size()); + bwd_ins.insert(bwd_ins.end(), fwd_out_grads.begin(), fwd_out_grads.end()); + + // Backward IG cannot be skipped + // But in Python side, if IG is not needed, users can just return None + auto bwd_outs = InputGrad("X", false); + + VLOG(10) << "PyFunc Grad Input: " << DebugString(bwd_ins); + VLOG(10) << "PyFunc Grad Output: " << DebugString(bwd_outs); + + grad_op->SetInput("X", bwd_ins); + grad_op->SetOutput("Out", bwd_outs); + + std::vector> ret(1); + ret[0] = std::move(grad_op); + return ret; + } +}; + +class PyFuncOp : public framework::OperatorBase { + public: + using framework::OperatorBase::OperatorBase; + + protected: + void RunImpl(const framework::Scope &scope, + const platform::Place &place) const override { + auto &in_arg_names = Inputs("X"); + auto &out_arg_names = Outputs("Out"); + + std::vector inputs(in_arg_names.size()); + for (size_t i = 0; i < in_arg_names.size(); ++i) { + auto in_var = scope.FindVar(in_arg_names[i]); + // When py_func op is called in backward, in_var may be null + if (in_var == nullptr) { + continue; + } + auto &in_tensor = in_var->Get(); + if (!in_tensor.IsInitialized()) { + continue; + } + if (platform::is_gpu_place(in_tensor.place())) { + 
framework::TensorCopySync(in_tensor, platform::CPUPlace(), &inputs[i]); + } else { + inputs[i].ShareDataWith(in_tensor); + } + inputs[i].set_lod(in_tensor.lod()); + } + + std::vector outputs(out_arg_names.size()); + for (size_t i = 0; i < out_arg_names.size(); ++i) { + auto *out_var = scope.FindVar(out_arg_names[i]); + outputs[i] = + out_var ? out_var->GetMutable() : nullptr; + } + + auto callable_id = static_cast(Attr(kForwardPythonCallableId)); + auto *py_callable = GetPythonCallableObject(callable_id); + VLOG(10) << "Call Python function with id " << callable_id << ": " + << PythonFuncDebugString(*py_callable); + CallPythonFunc(py_callable, inputs, &outputs); + } +}; + +} // namespace operators +} // namespace paddle + +namespace ops = paddle::operators; + +REGISTER_OPERATOR(py_func, ops::PyFuncOp, ops::PyFuncOpMaker, + ops::PyFuncOpVarTypInference, ops::PyFuncOpShapeInference, + ops::PyFuncOpGradDescMaker); diff --git a/paddle/fluid/operators/py_func_op.h b/paddle/fluid/operators/py_func_op.h new file mode 100644 index 0000000000000000000000000000000000000000..4ba06bf59857d8cc4f1c56a52627d8e768ccbf7a --- /dev/null +++ b/paddle/fluid/operators/py_func_op.h @@ -0,0 +1,25 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#pragma once + +#include "pybind11/pybind11.h" + +namespace paddle { +namespace operators { + +size_t AppendPythonCallableObjectAndReturnId(const ::pybind11::object &py_obj); + +} // namespace operators +} // namespace paddle diff --git a/paddle/fluid/operators/transpose_mkldnn_op.cc b/paddle/fluid/operators/transpose_mkldnn_op.cc index 37f1cadc7d2ff248e8b6dcb3f0c8ba09f8ccd8b5..2f133c9e251388e9e78a6a49ca66a45a56eef76e 100644 --- a/paddle/fluid/operators/transpose_mkldnn_op.cc +++ b/paddle/fluid/operators/transpose_mkldnn_op.cc @@ -32,7 +32,7 @@ class TransposeMKLDNNOpKernel : public paddle::framework::OpKernel { const bool is_test = ctx.Attr("is_test"); PADDLE_ENFORCE( is_test == true, - "ConvTransposeMKLDNN works only for inference!. Set is_test = True"); + "TransposeMKLDNN works only for inference!. Set is_test = True"); auto& dev_ctx = ctx.template device_context(); const auto& mkldnn_engine = dev_ctx.GetEngine(); @@ -47,69 +47,24 @@ class TransposeMKLDNNOpKernel : public paddle::framework::OpKernel { return; } - std::vector nchw_axis(ndims, 0); - for (size_t i = 0; i < nchw_axis.size(); ++i) { - nchw_axis[i] = i; - } - std::vector nchw_tz = paddle::framework::vectorize2int(input->dims()); - std::string data_format = ctx.Attr("data_format"); - - auto src_md = - input->format() != mkldnn::memory::format::nchw - ? 
platform::MKLDNNMemDesc(nchw_tz, platform::MKLDNNGetDataType(), - input->format()) - : Axis2MemoryDesc(nchw_tz, nchw_axis); - - this->TransposeKernel(ctx.GetPlace(), Axis2MemoryDesc(nchw_tz, axis), - src_md, output, input_data, nchw_tz, mkldnn_engine); - } - - protected: - mkldnn::memory::desc Axis2MemoryDesc(std::vector& nchw_tz, - std::vector& axis) const { - mkldnn_memory_desc_t mem_fmt; - - mem_fmt.primitive_kind = mkldnn_memory; - mem_fmt.ndims = axis.size(); - for (unsigned int i = 0; i < nchw_tz.size(); ++i) { - mem_fmt.dims[i] = nchw_tz[i]; // logical dimensions (nchw format, - // regardless physical layout) - } - mem_fmt.data_type = mkldnn_f32; - mem_fmt.format = mkldnn_blocked; - - unsigned int total_stride = 1; - for (int i = nchw_tz.size() - 1; i >= 0; --i) { - mem_fmt.layout_desc.blocking.padding_dims[i] = - nchw_tz[i]; // logical dimensions (nchw format, regardless physical - // layout) - mem_fmt.layout_desc.blocking.block_dims[i] = 1; - mem_fmt.layout_desc.blocking.offset_padding_to_data[i] = 0; // no offset - mem_fmt.layout_desc.blocking.strides[0][axis[i]] = total_stride; - mem_fmt.layout_desc.blocking.strides[1][axis[i]] = 1; - total_stride *= nchw_tz[axis[i]]; - } - mem_fmt.layout_desc.blocking.offset_padding = 0; // no initial offset - return mem_fmt; - } - void TransposeKernel(platform::Place place, mkldnn::memory::desc md_o, - mkldnn::memory::desc md_i, Tensor* output, - const T* data_i, std::vector& nchw_dims, - const mkldnn::engine& eng) const { - // Make Memory primitive descriptors - auto mpd_o = mkldnn::memory::primitive_desc(md_o, eng); - auto mpd_i = mkldnn::memory::primitive_desc(md_i, eng); + const std::string key = platform::TransposeMKLDNNHandler::GetHash( + nchw_tz, axis, ctx.op().Output("Out")); - auto data_o = output->mutable_data( - place, paddle::memory::Allocator::kDefault, mpd_o.get_size()); + platform::TransposeMKLDNNHandler handler(nchw_tz, axis, dev_ctx, + mkldnn_engine, key); - auto src = mkldnn::memory(mpd_i, 
(T*)(data_i)); - auto dst = mkldnn::memory(mpd_o, data_o); + auto transpose_src_memory_p = handler.AcquireSrcMemory( + input->format(), platform::to_void_cast(input_data)); + auto transpose_dst_memory_p = + handler.AcquireDstMemory(output, ctx.GetPlace()); + auto transpose_p = handler.AcquireTranspose(transpose_dst_memory_p, + transpose_src_memory_p); - auto r = mkldnn::reorder(src, dst); - mkldnn::stream(mkldnn::stream::kind::eager).submit({r}).wait(); + std::vector pipeline; + pipeline.push_back(*transpose_p); + mkldnn::stream(mkldnn::stream::kind::eager).submit(pipeline).wait(); } }; diff --git a/paddle/fluid/platform/mkldnn_reuse.h b/paddle/fluid/platform/mkldnn_reuse.h index 1c6421f3fa6ffbe7d3c682611def9e87d2fae5b0..23f00406de6190dfef91e259d6af358b5dac1713 100644 --- a/paddle/fluid/platform/mkldnn_reuse.h +++ b/paddle/fluid/platform/mkldnn_reuse.h @@ -197,6 +197,130 @@ class MKLDNNHandler { bool is_reusing_; }; +class TransposeMKLDNNHandler : public MKLDNNHandler { + public: + TransposeMKLDNNHandler(std::vector& dims, std::vector& axis, + const platform::MKLDNNDeviceContext& dev_ctx, + mkldnn::engine engine, const std::string& base_key) + : platform::MKLDNNHandler(dev_ctx, engine, base_key), + dims_(dims), + axis_(axis), + logical_axis_(dims.size(), 0) {} + + std::shared_ptr AcquireSrcMemory( + const mkldnn::memory::format& fmt, void* ptr) { + auto local_key = key_ + "@user_src_mem_p"; + auto mem_p = + std::static_pointer_cast(dev_ctx_.GetBlob(local_key)); + PADDLE_ENFORCE((mem_p != nullptr) || (is_reusing_ == false), + " find mem primitive in device context"); + if (mem_p == nullptr) { + // Make memory descriptor using input format, unless it + // cannot be trusted (nchw) then make up memory fmt manually + for (size_t i = 0; i < logical_axis_.size(); ++i) { + logical_axis_[i] = i; + } + auto src_md = fmt != mkldnn::memory::format::nchw + ? 
platform::MKLDNNMemDesc( + dims_, platform::MKLDNNGetDataType(), fmt) + : Axis2MemoryDesc(dims_, logical_axis_); + mem_p = std::make_shared( + mkldnn::memory::primitive_desc{src_md, engine_}, ptr); + dev_ctx_.SetBlob(local_key, mem_p); + } else { + mem_p->set_data_handle(ptr); + // Mark that reusing happenned. All primitives from operator instance + // should be reused or none of them. So we check consistency + is_reusing_ = true; + } + return mem_p; + } + + std::shared_ptr AcquireDstMemory(framework::Tensor* output, + platform::Place place) { + auto local_key = key_ + "@user_dst_mem_p"; + auto mem_p = + std::static_pointer_cast(dev_ctx_.GetBlob(local_key)); + PADDLE_ENFORCE((mem_p != nullptr) || (is_reusing_ == false), + " find mem primitive in device context"); + if (mem_p == nullptr) { + auto dst_mdp = mkldnn::memory::primitive_desc{ + Axis2MemoryDesc(dims_, axis_), engine_}; + + auto dst_data = output->mutable_data( + place, paddle::memory::Allocator::kDefault, dst_mdp.get_size()); + + mem_p = std::make_shared(dst_mdp, dst_data); + dev_ctx_.SetBlob(local_key, mem_p); + } else { + auto dst_data = output->mutable_data(place); + mem_p->set_data_handle(dst_data); + // Mark that reusing happenned. All primitives from operator instance + // should be reused or none of them. 
So we check consistency + is_reusing_ = true; + } + return mem_p; + } + + std::shared_ptr AcquireTranspose( + std::shared_ptr dst_memory_p, + std::shared_ptr src_memory_p) { + auto prim_key = key_ + "@transpose_p"; + auto transpose_p = + std::static_pointer_cast(dev_ctx_.GetBlob(prim_key)); + PADDLE_ENFORCE((transpose_p != nullptr) || (is_reusing_ == false), + "Fail to find convolution primitive in device context"); + if (transpose_p == nullptr) { + transpose_p = + std::make_shared(*(src_memory_p), *(dst_memory_p)); + dev_ctx_.SetBlob(prim_key, transpose_p); + } else { + is_reusing_ = true; + } + return transpose_p; + } + + static std::string GetHash(std::vector& shape, // NOLINT + std::vector& axis, // NOLINT + const std::string& suffix) { + return dims2str(shape) + dims2str(axis) + suffix; + } + + protected: + mkldnn_memory_desc_t Axis2MemoryDesc(std::vector& nchw_tz, + std::vector& axis) { + mkldnn_memory_desc_t mem_fmt; + + mem_fmt.primitive_kind = mkldnn_memory; + mem_fmt.ndims = axis.size(); + for (unsigned int i = 0; i < nchw_tz.size(); ++i) { + mem_fmt.dims[i] = nchw_tz[i]; // logical dimensions (nchw format, + // regardless physical layout) + } + mem_fmt.data_type = mkldnn_f32; + mem_fmt.format = mkldnn_blocked; + + unsigned int total_stride = 1; + for (int i = nchw_tz.size() - 1; i >= 0; --i) { + mem_fmt.layout_desc.blocking.padding_dims[i] = + nchw_tz[i]; // logical dimensions (nchw format, regardless physical + // layout) + mem_fmt.layout_desc.blocking.block_dims[i] = 1; + mem_fmt.layout_desc.blocking.offset_padding_to_data[i] = 0; // no offset + mem_fmt.layout_desc.blocking.strides[0][axis[i]] = total_stride; + mem_fmt.layout_desc.blocking.strides[1][axis[i]] = 1; + total_stride *= nchw_tz[axis[i]]; + } + mem_fmt.layout_desc.blocking.offset_padding = 0; // no initial offset + return mem_fmt; + } + + private: + std::vector dims_; + std::vector axis_; + std::vector logical_axis_; +}; + template class ConvMKLDNNTemplateHandler : public MKLDNNHandler { 
public: diff --git a/paddle/fluid/pybind/CMakeLists.txt b/paddle/fluid/pybind/CMakeLists.txt index c79d5d9403db613a8cdda59b9874a8b886458357..fb8bcb190bda59e23d118547f451be46c963cce9 100644 --- a/paddle/fluid/pybind/CMakeLists.txt +++ b/paddle/fluid/pybind/CMakeLists.txt @@ -1,5 +1,8 @@ set(PYBIND_DEPS pybind python proto_desc memory executor async_executor prune feed_fetch_method pass_builder parallel_executor profiler layer) +if(WITH_PYTHON) + list(APPEND PYBIND_DEPS py_func_op) +endif() set(PYBIND_SRCS pybind.cc exception.cc protobuf.cc const_value.cc recordio.cc async_executor_py.cc imperative.cc) if(WITH_PYTHON) diff --git a/paddle/fluid/pybind/protobuf.cc b/paddle/fluid/pybind/protobuf.cc index ac406b27b5c77d1d919713bafd24fd8b1e3580f1..4b218fb3a2af0933ea1e87abe20e7e031c32f721 100644 --- a/paddle/fluid/pybind/protobuf.cc +++ b/paddle/fluid/pybind/protobuf.cc @@ -328,7 +328,7 @@ void BindOpDesc(pybind11::module *m) { .def("infer_var_type", &pd::OpDesc::InferVarType) .def("set_is_target", &pd::OpDesc::SetIsTarget) .def("serialize_to_string", SerializeMessage) - .def("block", &pd::OpDesc::Block, + .def("block", [](pd::OpDesc &self) { return self.Block(); }, pybind11::return_value_policy::reference); } diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc index 5e3c663e1f47f6bba4522b65c14546222eb1db72..c108c8275669feeb4c313dc57107adfc632ce61d 100644 --- a/paddle/fluid/pybind/pybind.cc +++ b/paddle/fluid/pybind/pybind.cc @@ -37,6 +37,7 @@ limitations under the License. 
*/ #include "paddle/fluid/imperative/layer.h" #include "paddle/fluid/memory/allocation/allocator_strategy.h" #include "paddle/fluid/operators/activation_op.h" +#include "paddle/fluid/operators/py_func_op.h" #include "paddle/fluid/operators/reader/lod_tensor_blocking_queue.h" #include "paddle/fluid/platform/cpu_info.h" #include "paddle/fluid/platform/enforce.h" @@ -110,6 +111,12 @@ PYBIND11_MODULE(core, m) { BindException(&m); + m.def( + "_append_python_callable_object_and_return_id", + [](py::object py_obj) -> size_t { + return paddle::operators::AppendPythonCallableObjectAndReturnId(py_obj); + }); + py::class_(m, "VarBase", R"DOC()DOC") .def(py::init<>()) .def("_run_backward", @@ -977,7 +984,6 @@ All parameter, weight, gradient are variables in Paddle. cannot be updated after being finalized.)DOC"); pe.def(py::init &, - const std::unordered_set &, const std::unordered_set &, const ProgramDesc &, const std::string &, Scope *, std::vector &, const ExecutionStrategy &, const BuildStrategy &, size_t, diff --git a/python/paddle/fluid/contrib/__init__.py b/python/paddle/fluid/contrib/__init__.py index 3bf2fe5db0cb2126295ebfda822eeac8762dbdb7..ece97b661fd7d60f8822439a84ee4403b9e3d81c 100644 --- a/python/paddle/fluid/contrib/__init__.py +++ b/python/paddle/fluid/contrib/__init__.py @@ -22,9 +22,12 @@ from . import op_frequence from .op_frequence import * from . import quantize from .quantize import * +from . import utils +from .utils import * __all__ = [] __all__ += decoder.__all__ __all__ += memory_usage_calc.__all__ __all__ += op_frequence.__all__ __all__ += quantize.__all__ +__all__ += utils.__all__ diff --git a/python/paddle/fluid/contrib/utils/__init__.py b/python/paddle/fluid/contrib/utils/__init__.py index 20b2cc381aaa1b837ce106410246bc8cedb2fc88..1c1c2fb22709189ca03dc543ca551257c8031c1a 100644 --- a/python/paddle/fluid/contrib/utils/__init__.py +++ b/python/paddle/fluid/contrib/utils/__init__.py @@ -13,10 +13,11 @@ # limitations under the License. 
from __future__ import print_function -#from . import lookup_table_utils -#from .lookup_table_utils import * +from . import lookup_table_utils +from .lookup_table_utils import * from . import hdfs_utils from .hdfs_utils import * -#__all__ = lookup_table_utils.__all__ -__all__ = hdfs_utils.__all__ +__all__ = [] +__all__ += lookup_table_utils.__all__ +__all__ += hdfs_utils.__all__ diff --git a/python/paddle/fluid/contrib/utils/hdfs_utils.py b/python/paddle/fluid/contrib/utils/hdfs_utils.py index baea57ccce0e9ca3a8fab244e43a107a89cfe67d..35ddf97ff2361d8abd34b16761be99990fc3880d 100644 --- a/python/paddle/fluid/contrib/utils/hdfs_utils.py +++ b/python/paddle/fluid/contrib/utils/hdfs_utils.py @@ -14,6 +14,7 @@ """HDFS Utils""" import os +import sys import subprocess import multiprocessing from datetime import datetime @@ -24,7 +25,7 @@ import errno import logging -__all__ = ["HDFSClient", "multi_download"] +__all__ = ["HDFSClient", "multi_download", "multi_upload"] logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s') _logger = logging.getLogger("hdfs_utils") @@ -93,13 +94,15 @@ class HDFSClient(object): def upload(self, hdfs_path, local_path, overwrite=False, retry_times=5): """ - upload the local file to hdfs - Args: - hdfs_path: hdfs path, target path - local_path: local file path, source path - overwrite: will overwrite the original file - retry_times: max times retry to upload - Returns: + upload the local file to hdfs + + Args: + hdfs_path(str): the hdfs file path + local_path(str): the local file path + overwrite(bool|None): will overwrite the file on HDFS or not + retry_times(int|5): retry times + + Returns: True or False """ assert hdfs_path is not None @@ -109,7 +112,7 @@ class HDFSClient(object): _logger.warn( "The Local path: {} is dir and I will support it later, return". 
format(local_path)) - return + return False base = os.path.basename(local_path) if not self.is_exist(hdfs_path): @@ -141,14 +144,16 @@ class HDFSClient(object): def download(self, hdfs_path, local_path, overwrite=False, unzip=False): """ - download from hdfs - Args: - hdfs_path: hdfs path, target path - local_path: local file path, source path - overwrite: will remove original file and overwrite it. - unzip: ignore this param - Returns - True or False + download file from HDFS + + Args: + hdfs_path(str): the hdfs file path + local_path(str): the local file path + overwrite(bool|None): will overwrite the file on HDFS or not + unzip(bool|False): if the download file is compressed by zip, unzip it or not. + + Returns: + True or False """ _logger.info('Downloading %r to %r.', hdfs_path, local_path) _logger.info('Download of %s to %r complete.', hdfs_path, local_path) @@ -188,13 +193,13 @@ class HDFSClient(object): def is_exist(self, hdfs_path=None): """ - whether the remote hdfs path exists? - Args: - hdfs_path: default value(${OUTPUT_PATH}/${SYS_USER_ID}/${SYS_JOB_ID}/tmp) - fs_name: The default values are the same as in the job configuration - fs_ugi: The default values are the same as in the job configuration - Returns: - True or False + whether the remote HDFS path exists + + Args: + hdfs_path(str): the hdfs file path + + Returns: + True or False """ exist_cmd = ['-test', '-e', hdfs_path] returncode, output, errors = self.__run_hdfs_cmd( @@ -211,13 +216,13 @@ class HDFSClient(object): def is_dir(self, hdfs_path=None): """ - whether the remote hdfs path exists? 
- Args: - remote_file_path: default value(${OUTPUT_PATH}/${SYS_USER_ID}/${SYS_JOB_ID}/tmp) - fs_name: The default values are the same as in the job configuration - fs_ugi: The default values are the same as in the job configuration - Returns: - True or False + whether the remote HDFS path is directory + + Args: + hdfs_path(str): the hdfs file path + + Returns: + True or False """ if not self.is_exist(hdfs_path): @@ -237,17 +242,17 @@ class HDFSClient(object): def delete(self, hdfs_path): """ - Remove a file or directory from HDFS. + Remove a file or directory from HDFS. + + whether the remote HDFS path exists Args: - param hdfs_path: HDFS path. - param recursive: Recursively delete files and directories. By default, - this method will raise an :class:`HdfsError` if trying to delete a - non-empty directory. + hdfs_path: HDFS path. + Returns: + True or False This function returns `True` if the deletion was successful and `False` if no file or directory previously existed at `hdfs_path`. - """ _logger.info('Deleting %r.', hdfs_path) @@ -273,16 +278,14 @@ class HDFSClient(object): def rename(self, hdfs_src_path, hdfs_dst_path, overwrite=False): """ - Rename a file or folder. - Args: - :param hdfs_src_path: Source path. - :param hdfs_dst_path: Destination path. If the path already exists and is - a directory, the source will be moved into it. If the path exists and is - a file, or if a parent destination directory is missing, this method will - raise an :class:`HdfsError`. + Move a file or folder on HDFS. + + Args: + hdfs_path(str): HDFS path. + overwrite(bool|False): If the path already exists and overwrite is False, will return False. + Returns: - This function returns `True` if the rename was successful and `False` if - rename was faild. + True or False """ assert hdfs_src_path is not None assert hdfs_dst_path is not None @@ -320,17 +323,20 @@ class HDFSClient(object): raise def makedirs(self, hdfs_path): - """Create a remote directory, recursively if necessary. 
+ """ + Create a remote directory, recursively if necessary. + Args: - :param hdfs_path: Remote path. Intermediate directories will be created - appropriately. + hdfs_path(str): Remote path. Intermediate directories will be created appropriately. + Returns: - True if make a directories was successful, False when make a directiries was failed. + True or False """ _logger.info('Creating directories to %r.', hdfs_path) assert hdfs_path is not None if self.is_exist(hdfs_path): + _logger.error("HDFS path is exist: {}".format(hdfs_path)) return mkdirs_commands = ['-mkdir', hdfs_path] @@ -346,11 +352,13 @@ class HDFSClient(object): def ls(self, hdfs_path): """ - ls a hdfs_path. - Args: - :param hdfs_path: hdfs_path will be ls. + ls directory contents about HDFS hdfs_path + + Args: + hdfs_path(str): Remote HDFS path will be ls. + Returns: - This function returns a `list` that contaion all files in the hdfs_path. + List: a contents list about hdfs_path. """ assert hdfs_path is not None @@ -378,11 +386,15 @@ class HDFSClient(object): def lsr(self, hdfs_path, only_file=True, sort=True): """ - ls a hdfs_path sort by time. - Args: - :param hdfs_path: hdfs_path will be ls. + list directory contents about HDFS hdfs_path recursively + + Args: + hdfs_path(str): Remote HDFS path. + only_file(bool|True): will discard folders. + sort(bool|True): will be sorted by create time. + Returns: - This function returns a `list` that contaion all files sorted by time in the hdfs_path. + List: a contents list about hdfs_path. """ def sort_by_time(v1, v2): @@ -422,21 +434,106 @@ class HDFSClient(object): return ret_lines +def multi_download(client, + hdfs_path, + local_path, + trainer_id, + trainers, + multi_processes=5): + """ + Download files from HDFS using multi process. 
+ + Args: + client(HDFSClient): instance of HDFSClient + hdfs_path(str): path on hdfs + local_path(str): path on local + trainer_id(int): current trainer id + trainers(int): all trainers number + multi_processes(int|5): the download data process at the same time, default=5 + + Returns: + List: + Download files in local folder. + """ + + def __subprocess_download(datas): + for data in datas: + re_path = os.path.relpath(os.path.dirname(data), hdfs_path) + if re_path == os.curdir: + sub_local_re_path = local_path + else: + sub_local_re_path = os.path.join(local_path, re_path) + client.download(data, sub_local_re_path) + + assert isinstance(client, HDFSClient) + + client.make_local_dirs(local_path) + _logger.info("Make local dir {} successfully".format(local_path)) + + all_need_download = client.lsr(hdfs_path, sort=True) + need_download = all_need_download[trainer_id::trainers] + _logger.info("Get {} files From all {} files need to be download from {}". + format(len(need_download), len(all_need_download), hdfs_path)) + + _logger.info("Start {} multi process to download datas".format( + multi_processes)) + procs = [] + for i in range(multi_processes): + process_datas = need_download[i::multi_processes] + p = multiprocessing.Process( + target=__subprocess_download, args=(process_datas, )) + procs.append(p) + p.start() + + # complete the processes + for proc in procs: + proc.join() + + _logger.info("Finish {} multi process to download datas".format( + multi_processes)) + + local_downloads = [] + for data in need_download: + data_name = os.path.basename(data) + re_path = os.path.relpath(os.path.dirname(data), hdfs_path) + if re_path == os.curdir: + local_re_path = os.path.join(local_path, data_name) + else: + local_re_path = os.path.join(local_path, re_path, data_name) + local_downloads.append(local_re_path) + + return local_downloads + + +def getfilelist(path): + rlist = [] + for dir, folder, file in os.walk(path): + for i in file: + t = os.path.join(dir, i) + 
rlist.append(t) + for r in rlist: + print(r) + + def multi_upload(client, hdfs_path, local_path, multi_processes=5, - overwrite=False): + overwrite=False, + sync=True): """ - Upload file to hdfs. + Upload files to HDFS using multi process. + Args: - :param overwrite: will overwrite hdfs file or not - :param multi_processes: the upload data process at the same time, default=5 - :param client: instance of HDFSClient - :param hdfs_path: path on hdfs - :param local_path: path on local + client(HDFSClient): instance of HDFSClient + hdfs_path(str): path on hdfs + local_path(str): path on local + multi_processes(int|5): the upload data process at the same time, default=5 + overwrite(bool|False): will overwrite file on HDFS or not + sync(bool|True): upload files sync or not. + Returns: - + None """ def __subprocess_upload(datas): @@ -446,13 +543,6 @@ def multi_upload(client, client.upload(hdfs_re_path, data, overwrite, retry_times=5) def get_local_files(path): - """ - Get all local files - Args: - path: local file path - Returns: - A list that contation all files in the path. - """ rlist = [] if not os.path.isdir(path): @@ -488,71 +578,6 @@ def multi_upload(client, multi_processes)) -def multi_download(client, - hdfs_path, - local_path, - trainer_id, - trainers, - file_cnt, - multi_processes=5): - """ - multi_download - Args: - :param client: instance of HDFSClient - :param hdfs_path: path on hdfs - :param local_path: path on local - :param trainer_id: current trainer id - :param trainers: all trainers number - :param file_cnt: all file number - :param multi_processes: the download data process at the same time, default=5 - :return: None - Returns: - A list that be downloaded. 
- """ - - def __subprocess_download(datas): - for data in datas: - re_path = os.path.relpath(os.path.dirname(data), hdfs_path) - local_re_path = os.path.join(local_path, re_path) - client.download(data, local_re_path) - - assert isinstance(client, HDFSClient) - - client.make_local_dirs(local_path) - _logger.info("Make local dir {} successfully".format(local_path)) - - all_need_download = client.lsr(hdfs_path, sort=True)[:file_cnt] - need_download = all_need_download[trainer_id::trainers] - _logger.info("Get {} files From all {} files need to be download from {}". - format(len(need_download), len(all_need_download), hdfs_path)) - - _logger.info("Start {} multi process to download datas".format( - multi_processes)) - procs = [] - for i in range(multi_processes): - process_datas = need_download[i::multi_processes] - p = multiprocessing.Process( - target=__subprocess_download, args=(process_datas, )) - procs.append(p) - p.start() - - # complete the processes - for proc in procs: - proc.join() - - _logger.info("Finish {} multi process to download datas".format( - multi_processes)) - - local_downloads = [] - for data in need_download: - data_name = os.path.basename(data) - re_path = os.path.relpath(os.path.dirname(data), hdfs_path) - local_re_path = os.path.join(local_path, re_path, data_name) - local_downloads.append(local_re_path) - - return local_downloads - - if __name__ == "__main__": hadoop_home = "/home/client/hadoop-client/hadoop/" diff --git a/python/paddle/fluid/contrib/utils/lookup_table_utils.py b/python/paddle/fluid/contrib/utils/lookup_table_utils.py index cc2418238f98d8e2b9af0cf4290f6088c11e1b92..20e6328d81cc727340ea4a16012f6ee9967ea1e6 100644 --- a/python/paddle/fluid/contrib/utils/lookup_table_utils.py +++ b/python/paddle/fluid/contrib/utils/lookup_table_utils.py @@ -18,14 +18,12 @@ import os import time import logging -import paddle -import paddle.fluid as fluid from paddle.fluid import core from paddle.fluid import io from paddle.fluid import Program 
__all__ = [ - "load_inference_model", "load_persistable_vars", + "load_persistables_for_increment", "load_persistables_for_inference", "convert_dist_to_sparse_program" ] @@ -80,19 +78,28 @@ def __get_prefetch_op_tuples(main_program): return prefetch_op_tuples -def convert_dist_to_sparse_program(main_program): - if not main_program._distributed_lookup_table: +def convert_dist_to_sparse_program(program): + """ + WARNING: this function will only be used for distributed training with distributed lookup table. + when we train model with distributed lookup table but want to do the local inference, we can use + this function to convert the train program with distributed lookup table to sparse lookup table. + + :param program(Program): the program must be the trainer program, which will be get by the distribute transpiler. + :return: + program: The `program` is a Program, it's the program replace distributed lookup table to sparse lookup table. + """ + if not program._distributed_lookup_table: _logger.warn( "There are no distributed lookup tables need to be converted") return # create table param and grad var in pserver program - origin_emb_var = "{}.origin".format(main_program._distributed_lookup_table) - emb_var = main_program._distributed_lookup_table - main_program.global_block()._rename_var(emb_var, origin_emb_var) - origin_param_var = main_program.global_block().vars[origin_emb_var] + origin_emb_var = "{}.origin".format(program._distributed_lookup_table) + emb_var = program._distributed_lookup_table + program.global_block()._rename_var(emb_var, origin_emb_var) + origin_param_var = program.global_block().vars[origin_emb_var] - param_var = main_program.global_block().create_var( + param_var = program.global_block().create_var( name=emb_var, shape=origin_param_var.shape, dtype=origin_param_var.dtype, @@ -100,28 +107,28 @@ def convert_dist_to_sparse_program(main_program): persistable=True) # parameter must be selected rows 
param_var.desc.set_type(core.VarDesc.VarType.SELECTED_ROWS) - main_program._sync_with_cpp() + program._sync_with_cpp() - prefetch_op_tuples = __get_prefetch_op_tuples(main_program) + prefetch_op_tuples = __get_prefetch_op_tuples(program) split_ids_id = prefetch_op_tuples[0] for idx in range(split_ids_id + 2, split_ids_id - 1, -1): - main_program.global_block()._remove_op(idx) - main_program.desc.flush() + program.global_block()._remove_op(idx) + program.desc.flush() in_out_pairs = zip(prefetch_op_tuples[1], prefetch_op_tuples[2]) for in_out_pair in in_out_pairs: idx = split_ids_id - ids = main_program.global_block().vars[in_out_pair[0]] - out = main_program.global_block().vars[in_out_pair[1]] - __insert_lookup_sparse_table_op(main_program, idx, ids, param_var, out) - main_program.desc.flush() - return main_program + ids = program.global_block().vars[in_out_pair[0]] + out = program.global_block().vars[in_out_pair[1]] + __insert_lookup_sparse_table_op(program, idx, ids, param_var, out) + program.desc.flush() + return program -def load_persistable_vars(executor, dirname, program, lookup_table_var): +def _load_persistable_vars(executor, dirname, program, lookup_table_vars): def _is_checkpoint_var(exclude_fluid_vars=None): """ the checkpoint will not save or load all the variables. @@ -159,8 +166,82 @@ def load_persistable_vars(executor, dirname, program, lookup_table_var): return is_valid - def _load_lookup_table_vars(executor, dirname, main_program, - lookup_table_vars): + io.load_vars( + executor, + dirname=dirname, + main_program=program, + predicate=_is_checkpoint_var(lookup_table_vars), + filename=None) + + +def load_persistables_for_increment(dirname, executor, program, + lookup_table_var, lookup_table_var_path): + """ + WARNING: this function will only be used for distributed training with distributed lookup table. + for increment trainning, the pserver will not only load dense variables, + but also load the suitable lookup table var. 
Because of slice lookup table + var with HASH, we must load the correct slice var. + + + :param dirname(str): The directory path + :param executor(Executor): The executor to run for loading inference model. + :param program(Program): The parameter server program, which will run on Pserver. + :param lookup_table_var: the distributed lookup tables var name. + :param lookup_table_var_path: the the distributed lookup tables var location. + :return: None + """ + + def __load_lookup_table_vars(executor, main_program, lookup_table_var, + lookup_table_var_path): + emb_var = main_program.global_block().var(lookup_table_var) + + load_program = Program() + load_block = load_program.global_block() + load_block.append_op( + type='load', + inputs={}, + outputs={'Out': [emb_var]}, + attrs={'file_path': lookup_table_var_path}) + executor.run(load_program) + + if not os.path.isdir(dirname): + raise ValueError("There is no directory named '%s'", dirname) + + if not os.path.exists(lookup_table_var_path): + raise ValueError("There is no file named '%s'", lookup_table_var_path) + + if not isinstance(program, Program): + raise ValueError("program must be an instance of fluid.Program") + + _logger.info("Start Load Sparse Program With " + "Distributed Lookup Table Vars from {}, time = {}".format( + dirname, time.ctime())) + + _load_persistable_vars(executor, dirname, program, [lookup_table_var]) + __load_lookup_table_vars(executor, program, lookup_table_var, + lookup_table_var_path) + + _logger.info("Finish Load Sparse Program With " + "Distributed Lookup Table Vars from {}, time = {}".format( + dirname, time.ctime())) + + +def load_persistables_for_inference(dirname, executor, program, + lookup_table_var_name): + """ + WARNING: this function will only be used for inference with distributed lookup table. + Inference with distributed lookup table is a little funky, this function will load distributed + lookup table vars into sparse var, can be used in local inference mode. 
+ + :param dirname(str): The directory path + :param executor(Executor): The executor to run for loading inference model. + :param program(Program): The parameter server program, which will run on Pserver. + :param lookup_table_var_name: the distributed lookup tables var name. + :return: None + """ + + def __load_lookup_table_vars(executor, dirname, main_program, + lookup_table_vars): if not os.path.isdir(dirname): raise ValueError("There is no directory named '%s'", dirname) @@ -209,48 +290,34 @@ def load_persistable_vars(executor, dirname, program, lookup_table_var): global_block.append_op(type='delete_var', inputs={'X': sums}) executor.run(convert_program) - _logger.info("Start Load Sparse Program With " - "Distributed Lookup Table Vars from {}, time = {}".format( - dirname, time.ctime())) - - lookup_table_vars = [lookup_table_var] - - io.load_vars( - executor, - dirname=dirname, - main_program=program, - predicate=_is_checkpoint_var(lookup_table_vars), - filename=None) - - _load_lookup_table_vars(executor, dirname, program, lookup_table_vars) - - _logger.info("Finish Load Sparse Program With " - "Distributed Lookup Table Vars from {}, time = {}".format( - dirname, time.ctime())) - - -def load_inference_model(dirname, executor, lookup_table_var_name): if not os.path.isdir(dirname): raise ValueError("There is no directory named '%s'", dirname) - local_model = os.path.join(dirname, model_filename) + if program: + if not isinstance(program, Program): + raise ValueError("program must be an instance of fluid.Program") + else: + local_model = os.path.join(dirname, model_filename) - with open(local_model, "rb") as f: - program_desc_str = f.read() + with open(local_model, "rb") as f: + program_desc_str = f.read() - program = Program.parse_from_string(program_desc_str) + program = Program.parse_from_string(program_desc_str) - if not core._is_program_version_supported(program._version()): - raise ValueError("Unsupported program version: %d\n" % - program._version()) + if 
not core._is_program_version_supported(program._version()): + raise ValueError("Unsupported program version: %d\n" % + program._version()) - # Binary data also need version. - load_persistable_vars(executor, dirname, program, lookup_table_var_name) + _logger.info("Start Load Sparse Program With " + "Distributed Lookup Table Vars from {}, time = {}".format( + dirname, time.ctime())) + + _load_persistable_vars(executor, dirname, program, [lookup_table_var_name]) + __load_lookup_table_vars(executor, dirname, program, + [lookup_table_var_name]) - feed_target_names = program.desc.get_feed_target_names() - fetch_target_names = program.desc.get_fetch_target_names() - fetch_targets = [ - program.global_block().var(name) for name in fetch_target_names - ] + _logger.info("Finish Load Sparse Program With " + "Distributed Lookup Table Vars from {}, time = {}".format( + dirname, time.ctime())) - return [program, feed_target_names, fetch_targets] + return program diff --git a/python/paddle/fluid/layers/nn.py b/python/paddle/fluid/layers/nn.py index d8bc919784bf85538ef092b29ecdd8c88ae910d0..bdfcc8c4e2604fca9e93c5bc35a31a75db2cf78e 100644 --- a/python/paddle/fluid/layers/nn.py +++ b/python/paddle/fluid/layers/nn.py @@ -18,7 +18,9 @@ All layers just related to the neural network. 
from __future__ import print_function import numpy as np +import six import os +import inspect from ..layer_helper import LayerHelper from ..initializer import Normal, Constant from ..framework import Variable, OpProtoHolder @@ -176,6 +178,7 @@ __all__ = [ 'merge_selected_rows', 'get_tensor_from_selected_rows', 'lstm', + 'py_func', 'psroi_pool', 'huber_loss', ] @@ -9327,6 +9330,224 @@ def get_tensor_from_selected_rows(x, name=None): return out +class PyFuncRegistry(object): + _register_funcs = [] + + def __init__(self, func): + if func is None or not callable(func): + raise TypeError('func must be a Python function') + + self._func = func + # find named args using reflection + args = inspect.getargspec(self._func) + if len(args[0]) == 0 and args[1] is None and args[2] is None: + # Function with no inputs + self._named_args = None + else: + self._named_args = args[0] + self._id = core._append_python_callable_object_and_return_id(self) + ''' + Why record self here? + + 1. For debug usage. Users can call + :code:`py_func.registered_func(idx)` method + to find the registered function corresponding + to :code:`idx`. + + 2. For increasing reference count of self. + It seems that to release Python object + whose reference count is 1 would cause + segmentation fault error in C++ side. + May be lack of Python GC in C++ side? 
+ ''' + PyFuncRegistry._register_funcs.append(self) + + @classmethod + def registered_func(cls, idx): + return cls._register_funcs[idx]._func + + @classmethod + def registered_func_num(cls): + return len(cls._register_funcs) + + @property + def id(self): + return self._id + + def __call__(self, *args): + if self._named_args is None: + func_ret = self._func() + else: + kwargs = dict() + idx = 0 + for arg in self._named_args: + kwargs[arg] = args[idx] + idx += 1 + func_ret = self._func(*args[idx:], **kwargs) + + if not isinstance(func_ret, (list, tuple)): + func_ret = (func_ret, ) + + ret = [] + for each_ret in func_ret: + if each_ret is None or isinstance(each_ret, core.LoDTensor): + ret.append(each_ret) + continue + + if not isinstance(each_ret, np.ndarray): + each_ret = np.array(each_ret) + + tensor = core.LoDTensor() + tensor.set(each_ret, core.CPUPlace()) + ret.append(tensor) + + return tuple(ret) + + +@templatedoc() +def py_func(func, x, out, backward_func=None, skip_vars_in_backward_input=None): + """ + PyFunc Operator. + + User can use :code:`py_func` to register operators in Python side. + The inputs of :code:`func` is :code:`LoDTensor` and outputs can be + numpy array or :code:`LoDTensor`. Paddle would call the registered + :code:`func` in forward part, and call :code:`backward_func` in + backward part (if :code:`backward_func` is not None). + + User should set the right data type and shape of :code:`out` before + calling this function. However, data types and shapes of gradients of + :code:`out` and :code:`x` would be inferred automatically. + + Input orders of :code:`backward_func` would be: forward inputs + :code:`x`, forward outputs :code:`out` and backward input gradients of + :code:`out`. If some variables of :code:`out` have no gradient, the input + tensor would be None in Python side. If some variables of :code:`in` have + no gradient, users should return None. + + This function can also be used to debug the running network. 
User can + add a :code:`py_func` operator without output, and print input + :code:`x` inside :code:`func`. + + Args: + func (callable): forward Python function. + x (Variable|list(Variable)|tuple(Variable)): inputs of :code:`func`. + out (Variable|list(Variable)|tuple(Variable)): outputs of :code:`func`. + Paddle cannot infer shapes and data types of :code:`out`. Users + should create :code:`out` beforehand. + backward_func (callable|None): backward Python function. + None means no backward. Default None. + skip_vars_in_backward_input (Variable|list(Variable)|tuple(Variable)): + Variables that are not needed in :code:`backward_func` inputs. + These variables must be any of :code:`x` and :code:`out`. + If set, these vars would not be inputs of :code:`backward_func`, + Only useful when :code:`backward_func` is not None. Default None. + + Returns: + out (Variable|list(Variable)|tuple(Variable)): input :code:`out` + + Examples: + + >>> import paddle.fluid as fluid + >>> import six + >>> + >>> def create_tmp_var(name, dtype, shape): + >>> return fluid.default_main_program().current_block().create_var( + >>> name=name, dtype=dtype, shape=shape) + >>> + >>> # tanh activation has been provided by Paddle C++ op + >>> # Here, we only use tanh to be an example to show the usage + >>> # of py_func + >>> def tanh(x): + >>> return np.tanh(x) + >>> + >>> # forward input x is skipped + >>> def tanh_grad(y, dy): + >>> return np.array(dy) * (1 - np.square(np.array(y))) + >>> + >>> def debug_func(x): + >>> print(x) + >>> + >>> def simple_net(img, label): + >>> hidden = img + >>> for idx in six.moves.range(4): + >>> hidden = fluid.layers.fc(hidden, size=200) + >>> new_hidden = create_tmp_var(name='hidden_{}'.format(idx), + >>> dtype=hidden.dtype, shape=hidden.shape) + >>> + >>> # user-defined layers with forward and backward + >>> hidden = fluid.layers.py_func(func=tanh, x=hidden, + >>> out=new_hidden, backward_func=tanh_grad, + >>> skip_vars_in_backward_input=hidden) + >>> + >>> # 
user-defined debug layers to print variables + >>> fluid.layers.py_func(func=debug_func, x=hidden, out=None) + >>> + >>> prediction = fluid.layers.fc(hidden, size=10, act='softmax') + >>> loss = fluid.layers.cross_entropy(input=prediction, label=label) + >>> return fluid.layers.mean(loss) + """ + helper = LayerHelper('py_func', **locals()) + if x is None: + x = [] + elif isinstance(x, Variable): + x = [x] + elif not isinstance(x, (list, tuple)): + raise TypeError('Input must be Variable/list(Variable)/tuple(Variable)') + + if out is None: + out_list = [] + elif isinstance(out, Variable): + out_list = [out] + elif isinstance(out, (list, tuple)): + out_list = out + else: + raise TypeError( + 'Output must be Variable/list(Variable)/tuple(Variable)') + + fwd_func_id = PyFuncRegistry(func).id + bwd_func_id = PyFuncRegistry( + backward_func).id if backward_func is not None else -1 + + for each_out in out_list: + if len(each_out.shape) == 0: + raise ValueError( + 'Output shapes of py_func op should be provided by users manually' + ) + + backward_skip_vars = set() + if backward_func is not None and skip_vars_in_backward_input is not None: + if isinstance(skip_vars_in_backward_input, Variable): + skip_vars_in_backward_input = [skip_vars_in_backward_input] + + fwd_in_out = [v.name for v in x] + fwd_in_out.extend([v.name for v in out_list]) + fwd_in_out = set(fwd_in_out) + backward_skip_vars = set() + for v in skip_vars_in_backward_input: + if not v.name in fwd_in_out: + raise ValueError( + 'Variable {} is not found in forward inputs and outputs' + .format(v.name)) + backward_skip_vars.add(v.name) + + helper.append_op( + type='py_func', + inputs={'X': x}, + outputs={'Out': out_list}, + attrs={ + 'forward_callable_id': fwd_func_id, + 'backward_callable_id': bwd_func_id, + 'backward_skip_vars': list(backward_skip_vars) + }) + return out + + +# For debug usage +py_func.registered_func = PyFuncRegistry.registered_func +py_func.registered_func_num = 
PyFuncRegistry.registered_func_num + + @templatedoc() def psroi_pool(input, rois, diff --git a/python/paddle/fluid/parallel_executor.py b/python/paddle/fluid/parallel_executor.py index c54c3963a152851f5396c2ba71c28cc09c1cd523..74cf76da951a4cea884c4fdb8591b3d4fb010300 100644 --- a/python/paddle/fluid/parallel_executor.py +++ b/python/paddle/fluid/parallel_executor.py @@ -92,35 +92,27 @@ class ParallelExecutor(object): num_trainers=1, trainer_id=0, scope=None): + # step1: get places, the places are used in run too. self._places = [] - self._act_places = [] if use_cuda: - gpus = [] gpus_env = os.getenv("FLAGS_selected_gpus") if gpus_env: gpus = [int(s) for s in gpus_env.split(",")] else: - for i in six.moves.range(core.get_cuda_device_count()): - gpus.append(i) - for i in gpus: - p = core.Place() - self._act_places.append(core.CUDAPlace(i)) - p.set_place(self._act_places[-1]) - self._places.append(p) + gpus = [ + i for i in six.moves.range(core.get_cuda_device_count()) + ] + self._places = [core.CUDAPlace(i) for i in gpus] else: cpu_num = int( os.environ.get('CPU_NUM', multiprocessing.cpu_count())) - for i in six.moves.range(cpu_num): - p = core.Place() - self._act_places.append(core.CPUPlace()) - p.set_place(self._act_places[-1]) - self._places.append(p) + self._places = [core.CPUPlace() for _ in six.moves.range(cpu_num)] assert self._places, "no place for execution" + # step2: init exec_strategy if exec_strategy is None: exec_strategy = ExecutionStrategy() exec_strategy.use_cuda = use_cuda - if exec_strategy.num_threads == 0: if use_cuda: # Experiments on se-resnext shows that too many threads hurt @@ -131,49 +123,54 @@ class ParallelExecutor(object): os.environ.get('CPU_NUM', multiprocessing.cpu_count())) exec_strategy.num_threads = cpu_num * 2 + # step3: init build_strategy if build_strategy is None: build_strategy = BuildStrategy() - build_strategy.num_trainers = num_trainers build_strategy.trainer_id = trainer_id - main = main_program - main = main if main else 
framework.default_main_program() + # step4: get main_program, scope, local_scopes + main = main_program if main_program \ + else framework.default_main_program() + scope = scope if scope is not None else executor.global_scope() + + if share_vars_from and not isinstance(share_vars_from, + ParallelExecutor): + raise TypeError("share_vars_from must be ParallelExecutor.") + + local_scopes = share_vars_from.executor.local_scopes()\ + if share_vars_from else [] + # step5: check trainers_endpoints, it is used for distribution. trainers_endpoints = main._trainers_endpoints if num_trainers > 1 and trainers_endpoints: assert num_trainers == len( trainers_endpoints), "num_trainers == len(end_points)" build_strategy.trainers_endpoints = trainers_endpoints - if scope == None: - scope = executor.global_scope() - - if share_vars_from and not isinstance(share_vars_from, - ParallelExecutor): - raise TypeError("share_vars_from must be ParallelExecutor.") - - local_scopes = share_vars_from.executor.local_scopes( - ) if share_vars_from else [] - - self.persistable_vars = [ - v.name for v in [ + # step5: get persistable_vars, parameter_vars, places. persistable_vars + # need be broadcast to other local_scope. 
+ persistable_vars = set([ + cpt.to_text(v.name) for v in [ var for var in main.list_vars() if var.persistable and var.type != core.VarDesc.VarType.RAW ] - ] + ]) + + def place_obj(place): + p = core.Place() + p.set_place(place) + return p + places = list(map(place_obj, self._places)) + + # step6: init ParallelExecutor self.executor = core.ParallelExecutor( - self._places, - set([ - cpt.to_text(p.name) - for p in main.global_block().iter_parameters() - if not p.stop_gradient - ]), - set(cpt.to_text(var) for var in self.persistable_vars), main.desc, + places, persistable_vars, main.desc, cpt.to_text(loss_name) if loss_name else six.u(''), scope, local_scopes, exec_strategy, build_strategy, num_trainers, trainer_id) + self.scope = scope def run(self, fetch_list, feed=None, feed_dict=None, return_numpy=True): @@ -261,7 +258,7 @@ class ParallelExecutor(object): self.executor.feed_and_split_tensor_into_local_scopes( feed_tensor_dict) elif isinstance(feed, list) or isinstance(feed, tuple): - if len(feed) != len(self._act_places): + if len(feed) != len(self._places): raise ValueError( "Feed a list of tensor, the list should be the same size as places" ) @@ -277,7 +274,7 @@ class ParallelExecutor(object): tensor = each[feed_name] if not isinstance(tensor, core.LoDTensor): tmp = core.LoDTensor() - tmp.set(tensor, self._act_places[i]) + tmp.set(tensor, self._places[i]) tensor = tmp res_dict[feed_name] = tensor res.append(res_dict) @@ -294,4 +291,4 @@ class ParallelExecutor(object): @property def device_count(self): - return len(self._act_places) + return len(self._places) diff --git a/python/paddle/fluid/tests/unittests/test_conv2d_mkldnn_op.py b/python/paddle/fluid/tests/unittests/test_conv2d_mkldnn_op.py index 1902a9869807ba7ce3f9828c124256cc6752857e..438d45b84033b697c3210acc44392b93bf436df0 100644 --- a/python/paddle/fluid/tests/unittests/test_conv2d_mkldnn_op.py +++ b/python/paddle/fluid/tests/unittests/test_conv2d_mkldnn_op.py @@ -16,7 +16,7 @@ from __future__ import 
print_function import unittest -from test_conv2d_op import TestConv2dOp, TestWithPad, TestWithStride +from test_conv2d_op import TestConv2dOp, TestWithPad, TestWithStride, TestWithGroup, TestWith1x1, TestWithInput1x1Filter1x1 class TestMKLDNN(TestConv2dOp): @@ -37,5 +37,23 @@ class TestMKLDNNWithStride(TestWithStride): self.data_format = "NCHW" +class TestMKLDNNWithGroup(TestWithGroup): + def init_kernel_type(self): + self.use_mkldnn = True + self.data_format = "NCHW" + + +class TestMKLDNNWith1x1(TestWith1x1): + def init_kernel_type(self): + self.use_mkldnn = True + self.data_format = "NCHW" + + +class TestMKLDNNWithInput1x1Filter1x1(TestWithInput1x1Filter1x1): + def init_kernel_type(self): + self.use_mkldnn = True + self.data_format = "NCHW" + + if __name__ == '__main__': unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_get_tensor_from_selected_rows_op.py b/python/paddle/fluid/tests/unittests/test_get_tensor_from_selected_rows_op.py index 021b950b3b6245caecab22d476bbb9d6b6b45c5e..6cd02dad577b681b8c452bdb9574df60ffb4f82e 100644 --- a/python/paddle/fluid/tests/unittests/test_get_tensor_from_selected_rows_op.py +++ b/python/paddle/fluid/tests/unittests/test_get_tensor_from_selected_rows_op.py @@ -29,7 +29,7 @@ class TestGetTensorFromSelectedRows(unittest.TestCase): def check_with_place(self, place): scope = core.Scope() - x_rows = [0, 5, 5, 4, 20] + x_rows = [0, 5, 5, 4, 19] height = 20 row_numel = 2 diff --git a/python/paddle/fluid/tests/unittests/test_merge_selectedrows_op.py b/python/paddle/fluid/tests/unittests/test_merge_selectedrows_op.py index ce64da0478d3997f4889ca942c67e0defac80b45..d2fa344b67ab33a93f92733efd68e896c767bad2 100644 --- a/python/paddle/fluid/tests/unittests/test_merge_selectedrows_op.py +++ b/python/paddle/fluid/tests/unittests/test_merge_selectedrows_op.py @@ -29,8 +29,8 @@ class TestMergeSelectedRows(unittest.TestCase): def check_with_place(self, place): scope = core.Scope() - x_rows = [0, 5, 5, 4, 20] - out_rows = [0, 4, 
5, 20] + x_rows = [0, 5, 5, 4, 19] + out_rows = [0, 4, 5, 19] height = 20 row_numel = 2 diff --git a/python/paddle/fluid/tests/unittests/test_py_func_op.py b/python/paddle/fluid/tests/unittests/test_py_func_op.py new file mode 100644 index 0000000000000000000000000000000000000000..943ad3ed22480193dc51375cdcca5ed36ce35158 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_py_func_op.py @@ -0,0 +1,183 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import os +import paddle.fluid as fluid +import paddle +import unittest +import six +import numpy as np + +dev_cnt = 2 +if fluid.core.is_compiled_with_cuda(): + dev_cnt = fluid.core.get_cuda_device_count() +os.environ['CPU_NUM'] = str(dev_cnt) + + +def dummy_func_with_no_input(): + return float(1.0) + + +def dummy_func_with_no_output(x): + pass + + +def tanh(x): + return np.tanh(x) + + +def tanh_grad(y, dy): + return np.array(dy) * (1 - np.square(np.array(y))) + + +def cross_entropy(logits, labels): + logits = np.array(logits) + labels = np.array(labels) + M = logits.shape[0] + N = logits.shape[1] + ret = np.ndarray([M, 1]).astype(logits.dtype) + for idx in six.moves.range(M): + ret[idx][0] = -np.log(logits[idx][labels[idx][0]]) + return ret + + +def cross_entropy_grad(logits, labels, bwd_dout): + logits = np.array(logits) + labels = np.array(labels) + bwd_dout = np.array(bwd_dout) + M = logits.shape[0] + N = logits.shape[1] + dlogits = np.zeros([M, N]).astype(logits.dtype) + for idx in six.moves.range(M): + dlogits[idx][labels[idx][0]] = -bwd_dout[idx] / logits[idx][labels[idx][ + 0]] + return dlogits, None + + +def simple_fc_net(img, label, use_py_func_op): + hidden = img + for idx in range(4): + hidden = fluid.layers.fc( + hidden, + size=200, + bias_attr=fluid.ParamAttr( + initializer=fluid.initializer.Constant(value=1.0))) + if not use_py_func_op: + hidden = fluid.layers.tanh(hidden) + else: + new_hidden = fluid.default_main_program().current_block( + ).create_var( + name='hidden_{}'.format(idx), + dtype='float32', + shape=hidden.shape) + hidden = fluid.layers.py_func( + func=tanh, + x=hidden, + out=new_hidden, + backward_func=tanh_grad, + skip_vars_in_backward_input=hidden) + + prediction = fluid.layers.fc(hidden, size=10, act='softmax') + if not use_py_func_op: + loss = fluid.layers.cross_entropy(input=prediction, label=label) + else: + loss = fluid.default_main_program().current_block().create_var( + name='loss', dtype='float32', shape=[-1, 1]) + loss = 
fluid.layers.py_func( + func=cross_entropy, + x=[prediction, label], + out=loss, + backward_func=cross_entropy_grad, + skip_vars_in_backward_input=loss) + + dummy_var = fluid.default_main_program().current_block().create_var( + name='test_tmp_var', dtype='float32', shape=[1]) + fluid.layers.py_func( + func=dummy_func_with_no_input, x=None, out=dummy_var) + + fluid.layers.py_func(func=dummy_func_with_no_output, x=loss, out=None) + + loss = fluid.layers.mean(loss) + return loss + + +def reader(): + for _ in six.moves.range(dev_cnt * 100): + yield np.random.random([784]), np.random.random_integers( + size=[1], low=0, high=9) + + +def test_main(use_cuda, use_py_func_op, use_parallel_executor): + if use_cuda and not fluid.core.is_compiled_with_cuda(): + return None + + with fluid.program_guard(fluid.Program(), fluid.Program()): + with fluid.scope_guard(fluid.core.Scope()): + fluid.default_main_program().random_seed = 1 + fluid.default_startup_program().random_seed = 1 + np.random.seed(1) + + img = fluid.layers.data(name='image', shape=[784], dtype='float32') + label = fluid.layers.data(name='label', shape=[1], dtype='int64') + loss = simple_fc_net(img, label, use_py_func_op) + optimizer = fluid.optimizer.SGD(learning_rate=1e-3) + optimizer.minimize(loss) + + place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() + feeder = fluid.DataFeeder(feed_list=[img, label], place=place) + r = paddle.batch(reader, batch_size=10) + + exe = fluid.Executor(place) + exe.run(fluid.default_startup_program()) + if use_parallel_executor: + exe = fluid.ParallelExecutor( + use_cuda=use_cuda, loss_name=loss.name) + fetch_list = [loss.name] + else: + fetch_list = [loss] + + ret = [] + for epoch_id in six.moves.range(2): + for d in r(): + L, = exe.run(feed=feeder.feed(d), fetch_list=fetch_list) + ret.append(L) + + return np.array(ret) + + +class TestPyFuncOpUseExecutor(unittest.TestCase): + def setUp(self): + self.use_parallel_executor = False + + def test_loss_diff(self): + losses = [] + 
 for use_cuda in [True, False]: + for use_py_func_op in [True, False]: + L = test_main(use_cuda, use_py_func_op, + self.use_parallel_executor) + if L is not None: + losses.append(L) + + for idx in six.moves.range(len(losses) - 1): + max_diff = np.max(np.abs(losses[idx] - losses[0])) + self.assertAlmostEqual(max_diff, 0, delta=1e-3) + + +class TestPyFuncOpUseParallelExecutor(TestPyFuncOpUseExecutor): + def setUp(self): + self.use_parallel_executor = True + + +if __name__ == '__main__': + unittest.main() diff --git a/python/setup.py.in b/python/setup.py.in index 22b9537a90e79c2571f61ec0dc156b602df784d6..5d5f2dd0f18cd3e707dca8b9f337f2f2a07d47aa 100644 --- a/python/setup.py.in +++ b/python/setup.py.in @@ -107,9 +107,9 @@ packages=['paddle', 'paddle.fluid.distributed', 'paddle.fluid.layers', 'paddle.fluid.contrib', - 'paddle.fluid.contrib.utils', 'paddle.fluid.contrib.decoder', 'paddle.fluid.contrib.quantize', + 'paddle.fluid.contrib.utils', 'paddle.fluid.transpiler', 'paddle.fluid.transpiler.details']