diff --git a/paddle/fluid/framework/details/CMakeLists.txt b/paddle/fluid/framework/details/CMakeLists.txt index aa6b7db5562f8596b1b30a16c0f08fcc433cfcd7..d8bc72e6b2fa38db06cb077ada9d7ec180299e8c 100644 --- a/paddle/fluid/framework/details/CMakeLists.txt +++ b/paddle/fluid/framework/details/CMakeLists.txt @@ -35,13 +35,15 @@ if(WITH_GPU) all_reduce_op_handle reduce_op_handle broadcast_op_handle data_balance_op_handle graph graph_helper pass) endif() +cc_library(sequential_execution_pass SRCS sequential_execution_pass.cc DEPS graph graph_helper pass) + cc_library(multi_devices_graph_pass SRCS multi_devices_graph_pass.cc DEPS multi_devices_helper computation_op_handle scale_loss_grad_op_handle rpc_op_handle all_reduce_op_handle reduce_op_handle broadcast_op_handle data_balance_op_handle fused_broadcast_op_handle) if(WITH_GPU) - cc_library(ssa_graph_executor SRCS ssa_graph_executor.cc DEPS graph framework_proto reference_count_pass) + cc_library(ssa_graph_executor SRCS ssa_graph_executor.cc DEPS graph framework_proto reference_count_pass sequential_execution_pass) else() - cc_library(ssa_graph_executor SRCS ssa_graph_executor.cc DEPS graph framework_proto) + cc_library(ssa_graph_executor SRCS ssa_graph_executor.cc DEPS graph framework_proto sequential_execution_pass) endif() cc_library(threaded_ssa_graph_executor SRCS threaded_ssa_graph_executor.cc DEPS fetch_op_handle ssa_graph_executor scope diff --git a/paddle/fluid/framework/details/build_strategy.cc b/paddle/fluid/framework/details/build_strategy.cc index fefd27fc86fb8dce3311fa580d90f518906dd862..bc19bd36610bf144f163c8ebf582d4afbc6592e3 100644 --- a/paddle/fluid/framework/details/build_strategy.cc +++ b/paddle/fluid/framework/details/build_strategy.cc @@ -16,6 +16,7 @@ limitations under the License. */ #include "paddle/fluid/framework/details/multi_devices_graph_check_pass.h" #include "paddle/fluid/framework/details/multi_devices_graph_print_pass.h" +#include "paddle/fluid/framework/details/sequential_execution_pass.h" #include "paddle/fluid/framework/ir/graph.h" #include "paddle/fluid/framework/ir/graph_viz_pass.h" @@ -27,6 +28,10 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder { public: explicit ParallelExecutorPassBuilder(const BuildStrategy &strategy) : ir::PassBuilder(), strategy_(strategy) { + if (strategy_.enable_sequential_execution_) { + AppendPass("sequential_execution_pass"); + } + // Add a graph viz pass to record a graph. if (!strategy_.debug_graphviz_path_.empty()) { auto viz_pass = AppendPass("graph_viz_pass"); @@ -110,6 +115,11 @@ std::unique_ptr BuildStrategy::Apply( pass->Erase("nccl_ctxs"); pass->SetNotOwned("nccl_ctxs", nctx); #endif + } else if (pass->Type() == "sequential_execution_pass") { + pass->Erase(kAllOpDescs); + pass->Set>( + kAllOpDescs, + new std::vector(main_program.Block(0).AllOps())); } graph = pass->Apply(std::move(graph)); } @@ -125,3 +135,4 @@ USE_PASS(multi_batch_merge_pass); USE_PASS(multi_devices_pass); USE_PASS(multi_devices_check_pass); USE_PASS(multi_devices_print_pass); +USE_PASS(sequential_execution_pass); diff --git a/paddle/fluid/framework/details/build_strategy.h b/paddle/fluid/framework/details/build_strategy.h index f3ffaf6ecd7c4dd99c40fe58ba88c0cbdc14bde7..88459320b0eb6d6c4405bff4c8b13c99aa7edb0d 100644 --- a/paddle/fluid/framework/details/build_strategy.h +++ b/paddle/fluid/framework/details/build_strategy.h @@ -69,6 +69,8 @@ struct BuildStrategy { bool enable_data_balance_{false}; + bool enable_sequential_execution_{false}; + bool fuse_broadcast_op_{false}; // User normally doesn't need to call this API. diff --git a/paddle/fluid/framework/details/sequential_execution_pass.cc b/paddle/fluid/framework/details/sequential_execution_pass.cc new file mode 100644 index 0000000000000000000000000000000000000000..cc2c8bfef9f9f54c2e499467df0d22ce3f69d6b8 --- /dev/null +++ b/paddle/fluid/framework/details/sequential_execution_pass.cc @@ -0,0 +1,109 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "paddle/fluid/framework/details/sequential_execution_pass.h" +#include +#include +#include +#include +#include "paddle/fluid/framework/op_proto_maker.h" + +namespace paddle { +namespace framework { +namespace details { + +static bool IsSameOpDesc(OpDesc *op1, OpDesc *op2) { + return op1->Type() == op2->Type() && op1->Inputs() == op2->Inputs() && + op1->Outputs() == op2->Outputs(); +} + +std::unique_ptr SequentialExecutionPass::ApplyImpl( + std::unique_ptr graph) const { + // FIXME(zjl): Insert dependencies between some distributed ops may cause + // the multi_devices_graph_pass fails. So we skip these ops here. + // Indeed, maybe we should not insert dependencies between these ops + // casually, which may cause deadlock easily. + // We should add more skipped distributed ops when found errors in + // multi_devices_graph_pass + static std::unordered_set skip_dist_ops{ + "send", "recv", "send_barrier", "fetch_barrier"}; + + auto &ops = Get>(kAllOpDescs); + std::vector op_node_list; + op_node_list.reserve(ops.size()); + + std::unordered_map op_deps; + std::unordered_map> pending_ops; + std::unordered_set ready_ops; + + for (ir::Node *node : graph->Nodes()) { + if (!node->IsOp()) continue; + std::unordered_set preceding_ops; + for (auto *in : node->inputs) { + PADDLE_ENFORCE(in->IsVar(), + "Preceding Node of Op Nodes must be Var Node"); + if (in->inputs.empty()) continue; + PADDLE_ENFORCE(in->inputs.size() == 1 && in->inputs[0]->IsOp(), + "Preceding Op Node of Var Node must be unique"); + preceding_ops.insert(in->inputs[0]); + pending_ops[in->inputs[0]].insert(node); + } + op_deps[node] = preceding_ops.size(); + if (preceding_ops.empty()) { + ready_ops.insert(node); + } + } + + for (auto *op_desc : ops) { + ir::Node *found_node = nullptr; + for (auto *node : ready_ops) { + if (IsSameOpDesc(op_desc, node->Op())) { + PADDLE_ENFORCE(found_node == nullptr, + "Found multiple op_desc in graph: %s", op_desc->Type()); + found_node = node; + } + } + + PADDLE_ENFORCE_NOT_NULL(found_node, "Cannot find op_desc in graph: %s", + op_desc->Type()); + for (auto *pending_op : pending_ops[found_node]) { + if (--op_deps.at(pending_op) == 0) { + ready_ops.insert(pending_op); + } + } + ready_ops.erase(found_node); + if (skip_dist_ops.count(op_desc->Type()) == 0) { + op_node_list.push_back(found_node); + } + } + + for (size_t i = 1; i < op_node_list.size(); ++i) { + auto *dep_var = graph->CreateControlDepVar(); + op_node_list[i]->inputs.push_back(dep_var); + op_node_list[i - 1]->outputs.push_back(dep_var); + dep_var->outputs.push_back(op_node_list[i]); + dep_var->inputs.push_back(op_node_list[i - 1]); + VLOG(10) << "Add dependencies between " << op_node_list[i - 1]->Name() + << " and " << op_node_list[i]->Name(); + } + return graph; +} + +} // namespace details +} // namespace framework +} // namespace paddle + +REGISTER_PASS(sequential_execution_pass, + paddle::framework::details::SequentialExecutionPass) + .RequirePassAttr(paddle::framework::details::kAllOpDescs); diff --git a/paddle/fluid/framework/details/sequential_execution_pass.h b/paddle/fluid/framework/details/sequential_execution_pass.h new file mode 100644 index 0000000000000000000000000000000000000000..a04c08bc2eb3bae797d648b30a22a5fee7ba0eaa --- /dev/null +++ b/paddle/fluid/framework/details/sequential_execution_pass.h @@ -0,0 +1,34 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include "paddle/fluid/framework/ir/graph.h" +#include "paddle/fluid/framework/ir/pass.h" + +namespace paddle { +namespace framework { +namespace details { + +constexpr char kAllOpDescs[] = "all_op_descs"; + +class SequentialExecutionPass : public ir::Pass { + protected: + std::unique_ptr ApplyImpl( + std::unique_ptr graph) const override; +}; + +} // namespace details +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc index 5f15a29f4c3e9b1412912fe4723642d1ede60346..7c7b14df6618bd636f3636612486884b573309fb 100644 --- a/paddle/fluid/pybind/pybind.cc +++ b/paddle/fluid/pybind/pybind.cc @@ -821,6 +821,13 @@ All parameter, weight, gradient are variables in Paddle. [](BuildStrategy &self, bool b) { self.enable_data_balance_ = b; }) // FIXME(chengudo): enable_data_balance seems not important + .def_property("enable_sequential_execution", + [](const BuildStrategy &self) { + return self.enable_sequential_execution_; + }, + [](BuildStrategy &self, bool b) { + self.enable_sequential_execution_ = b; + }) .def_property( "fuse_elewise_add_act_ops", [](const BuildStrategy &self) { diff --git a/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py b/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py index ee291fe746f3a1b6ce18df9fb6aa174a89e2eadd..a3fe5e0a0591c8da787e3c2fdb030f3912548316 100644 --- a/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py +++ b/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py @@ -40,7 +40,8 @@ class TestParallelExecutorBase(unittest.TestCase): use_reduce=False, fuse_elewise_add_act_ops=False, optimizer=fluid.optimizer.Adam, - use_fast_executor=False): + use_fast_executor=False, + enable_sequential_execution=False): def run_executor(exe, feed, fetch_list, program=None): if isinstance(exe, fluid.ParallelExecutor): res = exe.run(fetch_list=fetch_list, feed=feed) @@ -80,6 +81,7 @@ class TestParallelExecutorBase(unittest.TestCase): build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce \ if use_reduce else fluid.BuildStrategy.ReduceStrategy.AllReduce build_strategy.fuse_elewise_add_act_ops = fuse_elewise_add_act_ops + build_strategy.enable_sequential_execution = enable_sequential_execution if use_parallel_executor: exe = fluid.ParallelExecutor( diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_seresnext.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_seresnext.py index cc2d692e18430eb48e6e800106eab0c3739d3f53..e7a56bb6386a812e43e5c1b5c08cd0682aa9223a 100644 --- a/python/paddle/fluid/tests/unittests/test_parallel_executor_seresnext.py +++ b/python/paddle/fluid/tests/unittests/test_parallel_executor_seresnext.py @@ -232,6 +232,46 @@ class TestResnet(TestParallelExecutorBase): for loss in zip(all_reduce_last_loss, reduce_last_loss): self.assertAlmostEquals(loss[0], loss[1], delta=delta2) + if not use_cuda: + return + + all_reduce_first_loss_seq, all_reduce_last_loss_seq = self.check_network_convergence( + model, + feed_dict={"image": img, + "label": label}, + iter=iter, + batch_size=batch_size, + use_cuda=use_cuda, + use_reduce=False, + optimizer=optimizer, + enable_sequential_execution=True) + + reduce_first_loss_seq, reduce_last_loss_seq = self.check_network_convergence( + model, + feed_dict={"image": img, + "label": label}, + iter=iter, + batch_size=batch_size, + use_cuda=use_cuda, + use_reduce=True, + optimizer=optimizer, + enable_sequential_execution=True) + + for loss in zip(all_reduce_first_loss, all_reduce_first_loss_seq): + self.assertAlmostEquals(loss[0], loss[1], delta=1e-6) + for loss in zip(all_reduce_last_loss, all_reduce_last_loss_seq): + self.assertAlmostEquals(loss[0], loss[1], delta=delta2) + + for loss in zip(reduce_first_loss, reduce_first_loss_seq): + self.assertAlmostEquals(loss[0], loss[1], delta=1e-6) + for loss in zip(reduce_last_loss, reduce_last_loss_seq): + self.assertAlmostEquals(loss[0], loss[1], delta=delta2) + + for loss in zip(all_reduce_first_loss_seq, reduce_first_loss_seq): + self.assertAlmostEquals(loss[0], loss[1], delta=1e-6) + for loss in zip(all_reduce_last_loss_seq, reduce_last_loss_seq): + self.assertAlmostEquals(loss[0], loss[1], delta=delta2) + def _check_resnet_convergence(self, model, use_cuda=True, diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_transformer.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_transformer.py index a55b2002ed989d4588716202a37aa6f4139825ea..3827743908c1d76931572277323d1dd5ddd05523 100644 --- a/python/paddle/fluid/tests/unittests/test_parallel_executor_transformer.py +++ b/python/paddle/fluid/tests/unittests/test_parallel_executor_transformer.py @@ -173,6 +173,8 @@ class TestTransformer(TestParallelExecutorBase): def test_main(self): if core.is_compiled_with_cuda(): self.check_network_convergence(transformer, use_cuda=True) + self.check_network_convergence( + transformer, use_cuda=True, enable_sequential_execution=True) self.check_network_convergence(transformer, use_cuda=False, iter=5)