diff --git a/paddle/fluid/framework/details/CMakeLists.txt b/paddle/fluid/framework/details/CMakeLists.txt
index 6ec60700f0e17d81c036627c979f36da8fb25426..c4305f477d0f27e502c32a32aa4a090c129fa834 100644
--- a/paddle/fluid/framework/details/CMakeLists.txt
+++ b/paddle/fluid/framework/details/CMakeLists.txt
@@ -87,6 +87,12 @@ cc_library(fast_threaded_ssa_graph_executor SRCS fast_threaded_ssa_graph_executo
         DEPS fetch_op_handle ssa_graph_executor scope simple_threadpool device_context)
 cc_test(fused_broadcast_op_test SRCS fused_broadcast_op_handle_test.cc DEPS fused_broadcast_op_handle)
 
+if(WITH_NGRAPH)
+  set(NGRAPH_BS_DEPS ngraph)
+else()
+  set(NGRAPH_BS_DEPS)
+endif()
+
 cc_library(build_strategy SRCS build_strategy.cc DEPS
         graph_viz_pass multi_devices_graph_pass
         multi_devices_graph_print_pass multi_devices_graph_check_pass
@@ -94,4 +100,5 @@ cc_library(build_strategy SRCS build_strategy.cc DEPS
         fuse_relu_depthwise_conv_pass
         lock_free_optimize_pass
         coalesce_grad_tensor_pass fuse_all_reduce_op_pass backward_optimizer_op_deps_pass
-        fuse_adam_op_pass fuse_sgd_op_pass fuse_momentum_op_pass)
+        fuse_adam_op_pass fuse_sgd_op_pass fuse_momentum_op_pass
+        ${NGRAPH_BS_DEPS})
diff --git a/paddle/fluid/framework/details/build_strategy.cc b/paddle/fluid/framework/details/build_strategy.cc
index 68e8e631041a5445fb1ec5f0e1d233be639180cf..b43f3b43eb5c7f56edcbddc9aee74b0ac036cb1a 100644
--- a/paddle/fluid/framework/details/build_strategy.cc
+++ b/paddle/fluid/framework/details/build_strategy.cc
@@ -27,6 +27,7 @@ limitations under the License. */
 #include "paddle/fluid/framework/ir/multi_devices_graph_pass/multi_devices_graph_pass.h"
 
 DECLARE_bool(use_mkldnn);
+DECLARE_bool(use_ngraph);
 
 namespace paddle {
 namespace framework {
@@ -53,6 +54,8 @@
                         "sequential_execution_pass");
     AppendPassWithCheck(strategy_.sync_batch_norm_, "sync_batch_norm_pass");
 
+    AppendPassToUseNgraph("ngraph_subgraph_pass");
+
     AppendOpFusePasses();
 
     AppendPrintGraphPass("graph_viz_pass", "_fused_graph");
@@ -220,6 +223,22 @@
 #endif
   }
 
+  void AppendPassToUseNgraph(const std::string &pass_name) {
+#ifdef PADDLE_WITH_NGRAPH
+    if (FLAGS_use_ngraph) {
+      if (strategy_.reduce_ != BuildStrategy::ReduceStrategy::kAllReduce) {
+        LOG(WARNING) << "Currently ngraph_subgraph_pass only works under "
+                        "AllReduce, please set FLAGS_use_ngraph=false.";
+      } else {
+        AppendPass(pass_name);
+      }
+    }
+#else
+    PADDLE_ENFORCE_NE(FLAGS_use_ngraph, true,
+                      "Please compile with NGRAPH first to use NGRAPH");
+#endif
+  }
+
  private:
  BuildStrategy strategy_;
};
@@ -360,3 +379,6 @@ USE_PASS(runtime_context_cache_pass);
 #ifdef PADDLE_WITH_MKLDNN
 USE_PASS(mkldnn_placement_pass);
 #endif
+#ifdef PADDLE_WITH_NGRAPH
+USE_PASS(ngraph_subgraph_pass);
+#endif
diff --git a/paddle/fluid/framework/executor.cc b/paddle/fluid/framework/executor.cc
index 29f2de5de0699ff4bda5deecae4e9e02ed74f150..7c2467310fc3ecaa863dc15b96f2d4b88f281dfb 100644
--- a/paddle/fluid/framework/executor.cc
+++ b/paddle/fluid/framework/executor.cc
@@ -39,11 +39,11 @@ limitations under the License. */
 
 #ifdef PADDLE_WITH_NGRAPH
 #include "paddle/fluid/operators/ngraph/ngraph_engine.h"
-DEFINE_bool(use_ngraph, false, "Use NGRAPH to run");
 #endif
 
 DECLARE_bool(benchmark);
 DEFINE_bool(use_mkldnn, false, "Use MKLDNN to run");
+DEFINE_bool(use_ngraph, false, "Use NGRAPH to run");
 
 namespace paddle {
 namespace framework {
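Note on the executor.cc hunk above: `DEFINE_bool(use_ngraph, ...)` moves out of the `#ifdef PADDLE_WITH_NGRAPH` block so that the `DECLARE_bool(use_ngraph)` sites added in build_strategy.cc (and later in subgraph_detector.cc) always find a definition to link against, even in builds without nGraph. A minimal two-file sketch of this gflags pattern (file and flag names here are illustrative, not from the patch):

```cpp
// flag_owner.cc -- exactly one translation unit defines the flag.
#include <gflags/gflags.h>
DEFINE_bool(use_feature, false, "Toggle the optional feature");

// flag_user.cc -- any other translation unit declares it and reads
// the generated FLAGS_use_feature variable.
#include <gflags/gflags.h>
DECLARE_bool(use_feature);

bool FeatureEnabled() { return FLAGS_use_feature; }
```

If the definition stayed inside the `#ifdef`, every `DECLARE_bool(use_ngraph)` user would fail to link whenever `PADDLE_WITH_NGRAPH` is off.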
diff --git a/paddle/fluid/framework/ir/ngraph_subgraph_pass.cc b/paddle/fluid/framework/ir/ngraph_subgraph_pass.cc
index 4664d13d1e4428c1a6655d658ba4215dbcfa6824..6198fab7dcaf7cce229532e50c34e516c1697ba4 100644
--- a/paddle/fluid/framework/ir/ngraph_subgraph_pass.cc
+++ b/paddle/fluid/framework/ir/ngraph_subgraph_pass.cc
@@ -47,8 +47,8 @@ std::string GenerateEngineKey(const std::set<std::string> &engine_inputs,
   return engine_key;
 }
 
-void NgraphSubgraphPass::ApplyImpl(ir::Graph *graph) const {
-  PADDLE_ENFORCE(graph);
+void NgraphSubgraphPass::ApplyImpl(Graph *graph) const {
+  PADDLE_ENFORCE_NOT_NULL(graph);
   FusePassBase::Init("ngraph_subgraph_pass", graph);
 
   std::unordered_set<const Node *> nodes2delete;
@@ -66,15 +66,13 @@ void NgraphSubgraphPass::ApplyImpl(ir::Graph *graph) const {
     if (node->IsOp() && !ANAT::Agent(node).subgraph()->empty()) {
       OpDesc *op_desc = node->Op();
       op_desc->SetType("ngraph_engine");
-      for (auto it = ANAT::Agent(node).subgraph()->begin();
-           it != ANAT::Agent(node).subgraph()->end(); ++it) {
-      }
 
       CreateNgraphEngineOp(node, graph);
 
       std::unordered_set<const Node *> nodes2remove(
           ANAT::Agent(node).subgraph()->begin(),
           ANAT::Agent(node).subgraph()->end());
+
       GraphSafeRemoveNodes(graph, nodes2remove);
     }
   }
@@ -85,70 +83,100 @@ void NgraphSubgraphPass::ApplyImpl(ir::Graph *graph) const {
       nodes2remove.insert(node);
     }
   }
+
   framework::ir::GraphSafeRemoveNodes(graph, nodes2remove);
-
-  std::vector<Node *> nodes = ir::TopologySortOperations(*graph);
+  // std::vector<Node *> nodes = ir::TopologySortOperations(*graph);
 }
 
-void NgraphSubgraphPass::CreateNgraphEngineOp(framework::ir::Node *node,
-                                              Graph *graph) const {
-  auto *op_desc = node->Op();
+bool IsValid(std::string name) {
+  return name.find(Node::kControlDepVarName) == std::string::npos;
+}
+
+void UpdateNgraphIO(Node *node, Graph *graph,
+                    std::vector<std::string> *input_names,
+                    std::vector<std::string> *output_names) {
+  bool is_test = true, has_fetch = false;
+  for (Node *node : graph->Nodes()) {
+    if (node->IsOp() && node->Name().find("_grad") != std::string::npos) {
+      is_test = false;
+    }
+    if (node->IsVar() && node->Var()) {
+      for (auto out : node->outputs) {
+        if (out->Name() == "fetch") has_fetch = true;
+      }
+    }
+  }
+  if (is_test && has_fetch) {
+    for (auto *x : node->inputs) {
+      (*input_names).emplace_back(x->Name());
+    }
+    for (auto *x : node->outputs) {
+      (*output_names).emplace_back(x->Name());
+    }
+    return;
+  }
+
   auto &subgraph = *ANAT::Agent(node).subgraph();
-  PADDLE_ENFORCE(!subgraph.empty());
+  std::unordered_set<std::string> inputs;
+  std::unordered_set<std::string> outputs;
+  for (auto *node : subgraph) {
+    for (auto in : node->inputs) {
+      auto name = in->Name();
+      if (!IsValid(name)) continue;
+      if (!outputs.count(name) && !inputs.count(name)) {
+        (*input_names).emplace_back(name);
+        inputs.insert(name);
+      }
+    }
+    for (auto out : node->outputs) {
+      auto name = out->Name();
+      if (!IsValid(name)) continue;
+      outputs.insert(name);
+      (*output_names).emplace_back(name);
+    }
+  }
+}
 
-  framework::ProgramDesc *program_desc =
-      Get<framework::ProgramDesc *>("program");
-  const framework::BlockDesc &main_block =
-      program_desc->Block(framework::kRootBlockIndex);
-  framework::BlockDesc *new_block = program_desc->AppendBlock(main_block);
+void NgraphSubgraphPass::CreateNgraphEngineOp(Node *node, Graph *graph) const {
+  auto &subgraph = *ANAT::Agent(node).subgraph();
+  PADDLE_ENFORCE_NE(subgraph.empty(), true, "subgraph cannot be empty");
 
   framework::proto::BlockDesc block_proto;
   framework::BlockDesc block_desc(nullptr, &block_proto);
   block_desc.Proto()->set_parent_idx(-1);
   block_desc.Proto()->set_idx(0);
 
   for (auto *node : subgraph) {
-    auto *new_block_op = new_block->AppendOp();
     auto *op = block_desc.AppendOp();
-    *new_block_op->Proto() = *node->Op()->Proto();
     *op->Proto() = *node->Op()->Proto();
   }
-
-  std::set<std::string> input_names;
-  std::set<std::string> input_names_with_id;
-  for (auto *x : node->inputs) {
-    input_names.insert(x->Name());
-    input_names_with_id.insert(x->Name() + std::to_string(x->id()));
-  }
-  op_desc->SetInput(
-      "Xs", std::vector<std::string>(input_names.begin(), input_names.end()));
-
-  std::set<std::string> output_names;
-  std::set<std::string> output_names_with_id;
-
-  for (auto *x : node->outputs) {
-    output_names.insert(x->Name());
-    output_names_with_id.insert(x->Name() + std::to_string(x->id()));
-  }
-  op_desc->SetOutput(
-      "Ys", std::vector<std::string>(output_names.begin(), output_names.end()));
 
   auto *vars = block_desc.Proto()->mutable_vars();
-  for (framework::ir::Node *node : graph->Nodes()) {
+  for (Node *node : graph->Nodes()) {
     if (node->IsVar() && node->Var()) {
       *vars->Add() = *node->Var()->Proto();
     }
   }
+  PADDLE_ENFORCE_NE(block_desc.Proto()->vars().empty(), true,
+                    "the block has no var-desc");
 
-  PADDLE_ENFORCE(!block_desc.Proto()->vars().empty(),
-                 "the block has no var-desc");
-
-  op_desc->SetType("ngraph_engine");
+  std::vector<std::string> input_names;
+  std::vector<std::string> output_names;
+  UpdateNgraphIO(node, graph, &input_names, &output_names);
+  auto *op_desc = node->Op();
+  op_desc->SetInput(
+      "Xs", std::vector<std::string>(input_names.begin(), input_names.end()));
+  op_desc->SetOutput(
+      "Ys", std::vector<std::string>(output_names.begin(), output_names.end()));
 
   int sgs = subgraph.size();
-  std::string engine_key = GenerateEngineKey(
-      input_names_with_id, output_names_with_id, std::to_string(sgs));
+  std::string subgraph_str = block_desc.Proto()->SerializeAsString();
+  std::string engine_key =
+      std::to_string(std::hash<std::string>()(subgraph_str));
   std::vector<int> interval{0, sgs};
+  op_desc->SetType("ngraph_engine");
   op_desc->SetAttr("interval", interval);
-  op_desc->SetAttr("graph", block_desc.Proto()->SerializeAsString());
+  op_desc->SetAttr("graph", subgraph_str);
   op_desc->SetAttr("engine_key", engine_key);
+  op_desc->SetAttr("op_role", 0);
 }
 
 }  // namespace ir
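A note on the key change above: the engine key is no longer assembled by `GenerateEngineKey` from input/output names and node ids; it is now derived from the serialized `BlockDesc` of the subgraph via `std::hash<std::string>`. A small self-contained sketch of this keying scheme (a plain string stands in for the serialized proto; `std::hash` values are only guaranteed stable within a single process run, which suffices for in-process engine lookup):

```cpp
#include <functional>
#include <iostream>
#include <string>

// Derive an engine key from an opaque serialized graph, mirroring the
// std::hash-based scheme in the patch. Hash values may differ across
// platforms or executions, so keys like this should only identify
// engines within one running process.
std::string MakeEngineKey(const std::string &serialized_graph) {
  return std::to_string(std::hash<std::string>()(serialized_graph));
}

int main() {
  std::string proto_bytes = "pretend-serialized-blockdesc";  // stand-in
  std::cout << MakeEngineKey(proto_bytes) << "\n";
  return 0;
}
```

Two subgraphs get the same key exactly when their serialized block protos are byte-identical, so the key also serves as a cheap structural-equality check when caching engines.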
*/ #include "paddle/fluid/framework/ir/graph_pattern_detector.h" #include "paddle/fluid/framework/ir/node.h" +DECLARE_bool(use_ngraph); + namespace paddle { namespace inference { namespace analysis { @@ -398,6 +400,11 @@ void RemoveIntermediateOutputInSubgraph(const std::vector &subgraph, } } + // In use for ngraph subgraph pass for parallel executor, + // this will remove all nodes, bypass this and let ngraph + // subgraph pass to process outputs + if (FLAGS_use_ngraph && valid_output.size() == 0) return; + outputs->assign(valid_output.begin(), valid_output.end()); } diff --git a/python/paddle/fluid/tests/unittests/ngraph/test_parallel_executor_ngraph.py b/python/paddle/fluid/tests/unittests/ngraph/test_parallel_executor_ngraph.py new file mode 100644 index 0000000000000000000000000000000000000000..3afe2b99083df320834c39f2cfef58b63bc22c9e --- /dev/null +++ b/python/paddle/fluid/tests/unittests/ngraph/test_parallel_executor_ngraph.py @@ -0,0 +1,87 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function + +from paddle.fluid.tests.unittests.simple_nets import simple_fc_net +import paddle.fluid as fluid +import paddle.fluid.core as core +from paddle.fluid import compiler +import numpy as np +import unittest +import os +import sys +import math + + +class TestPallelExecutorNgraph(unittest.TestCase): + def check_network_convergence(self, build_strategy=None): + os.environ['CPU_NUM'] = str(2) + main = fluid.Program() + startup = fluid.Program() + with fluid.program_guard(main, startup): + loss = simple_fc_net() + test_program = main.clone(for_test=True) + + opt = fluid.optimizer.Adam(learning_rate=0.001) + opt.minimize(loss) + + batch_size = 32 + image = np.random.normal(size=(batch_size, 784)).astype('float32') + label = np.random.randint(0, 10, (batch_size, 1), dtype="int64") + + place = fluid.CPUPlace() + exe = fluid.Executor(place) + exe.run(startup) + feed_dict = {'image': image, 'label': label} + + train_cp = compiler.CompiledProgram(main).with_data_parallel( + loss_name=loss.name, build_strategy=build_strategy) + test_cp = compiler.CompiledProgram(test_program).with_data_parallel( + loss_name=loss.name, + build_strategy=build_strategy, + share_vars_from=train_cp) + + for i in range(5): + _ = exe.run(train_cp, fetch_list=[loss.name], feed=feed_dict) + test_loss, = exe.run(test_cp, + fetch_list=[loss.name], + feed=feed_dict) + train_loss = exe.run(train_cp, + fetch_list=[loss.name], + feed=feed_dict) + + avg_test_loss_val = np.array(test_loss).mean() + if math.isnan(float(avg_test_loss_val)): + sys.exit("got NaN loss, testing failed.") + + avg_train_loss_val = np.array(train_loss).mean() + if math.isnan(float(avg_train_loss_val)): + sys.exit("got NaN loss, training failed.") + + self.assertTrue( + np.allclose( + train_loss, test_loss, atol=1e-8), + "Train loss: " + str(train_loss) + "\n Test loss:" + + str(test_loss)) + + def test_parallel_testing(self): + build_strategy = fluid.BuildStrategy() + 
diff --git a/python/paddle/fluid/tests/unittests/ngraph/test_parallel_executor_ngraph.py b/python/paddle/fluid/tests/unittests/ngraph/test_parallel_executor_ngraph.py
new file mode 100644
index 0000000000000000000000000000000000000000..3afe2b99083df320834c39f2cfef58b63bc22c9e
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/ngraph/test_parallel_executor_ngraph.py
@@ -0,0 +1,87 @@
+# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+
+from paddle.fluid.tests.unittests.simple_nets import simple_fc_net
+import paddle.fluid as fluid
+import paddle.fluid.core as core
+from paddle.fluid import compiler
+import numpy as np
+import unittest
+import os
+import sys
+import math
+
+
+class TestParallelExecutorNgraph(unittest.TestCase):
+    def check_network_convergence(self, build_strategy=None):
+        os.environ['CPU_NUM'] = str(2)
+        main = fluid.Program()
+        startup = fluid.Program()
+        with fluid.program_guard(main, startup):
+            loss = simple_fc_net()
+            test_program = main.clone(for_test=True)
+
+            opt = fluid.optimizer.Adam(learning_rate=0.001)
+            opt.minimize(loss)
+
+            batch_size = 32
+            image = np.random.normal(size=(batch_size, 784)).astype('float32')
+            label = np.random.randint(0, 10, (batch_size, 1), dtype="int64")
+
+            place = fluid.CPUPlace()
+            exe = fluid.Executor(place)
+            exe.run(startup)
+            feed_dict = {'image': image, 'label': label}
+
+            train_cp = compiler.CompiledProgram(main).with_data_parallel(
+                loss_name=loss.name, build_strategy=build_strategy)
+            test_cp = compiler.CompiledProgram(test_program).with_data_parallel(
+                loss_name=loss.name,
+                build_strategy=build_strategy,
+                share_vars_from=train_cp)
+
+            for i in range(5):
+                _ = exe.run(train_cp, fetch_list=[loss.name], feed=feed_dict)
+            test_loss, = exe.run(test_cp,
+                                 fetch_list=[loss.name],
+                                 feed=feed_dict)
+            train_loss = exe.run(train_cp,
+                                 fetch_list=[loss.name],
+                                 feed=feed_dict)
+
+            avg_test_loss_val = np.array(test_loss).mean()
+            if math.isnan(float(avg_test_loss_val)):
+                sys.exit("got NaN loss, testing failed.")
+
+            avg_train_loss_val = np.array(train_loss).mean()
+            if math.isnan(float(avg_train_loss_val)):
+                sys.exit("got NaN loss, training failed.")
+
+            self.assertTrue(
+                np.allclose(
+                    train_loss, test_loss, atol=1e-8),
+                "Train loss: " + str(train_loss) + "\n Test loss:" +
+                str(test_loss))
+
+    def test_parallel_testing(self):
+        build_strategy = fluid.BuildStrategy()
+        build_strategy.enable_inplace = False
+        build_strategy.memory_optimize = False
+        self.check_network_convergence(build_strategy=build_strategy)
+
+
+if __name__ == '__main__':
+    unittest.main()
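Usage note: to actually exercise the new pass, this test would typically be run from a `WITH_NGRAPH=ON` build with the flag exported in the environment (e.g. `FLAGS_use_ngraph=true`), assuming `use_ngraph` is in fluid's gflags whitelist; without the flag, `build_strategy` simply skips `ngraph_subgraph_pass` and the test degenerates to the plain CPU parallel-executor path.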