From c49791362f39ff4da3610ac161accbcd48347af2 Mon Sep 17 00:00:00 2001 From: Zeng Jinle <32832641+sneaxiy@users.noreply.github.com> Date: Tue, 14 Apr 2020 10:12:50 +0800 Subject: [PATCH] Correct reader device index (#23802) * correct reader device index, test=develop * fix async executor scope var initialization, test=develop --- paddle/fluid/framework/details/CMakeLists.txt | 2 +- .../details/async_ssa_graph_executor.cc | 5 +- .../fluid/framework/details/build_strategy.cc | 9 -- .../multi_devices_graph_pass/CMakeLists.txt | 2 +- .../set_reader_device_info_pass.cc | 101 ------------------ .../set_reader_device_info_utils.cc | 87 +++++++++++++++ .../set_reader_device_info_utils.h | 31 ++++++ paddle/fluid/framework/parallel_executor.cc | 22 ++-- .../test_async_ssa_graph_executor_mnist.py | 7 +- 9 files changed, 137 insertions(+), 129 deletions(-) delete mode 100644 paddle/fluid/framework/ir/multi_devices_graph_pass/set_reader_device_info_pass.cc create mode 100644 paddle/fluid/framework/ir/multi_devices_graph_pass/set_reader_device_info_utils.cc create mode 100644 paddle/fluid/framework/ir/multi_devices_graph_pass/set_reader_device_info_utils.h diff --git a/paddle/fluid/framework/details/CMakeLists.txt b/paddle/fluid/framework/details/CMakeLists.txt index 223a7da7f6..b9535ee493 100644 --- a/paddle/fluid/framework/details/CMakeLists.txt +++ b/paddle/fluid/framework/details/CMakeLists.txt @@ -73,7 +73,7 @@ set(SSA_GRAPH_EXECUTOR_DEPS graph framework_proto eager_deletion_pass buffer_shared_inplace_op_pass buffer_shared_cross_op_memory_reuse_pass - set_reader_device_info_pass + set_reader_device_info_utils add_reader_dependency_pass) cc_library(ssa_graph_executor SRCS ssa_graph_executor.cc DEPS ${SSA_GRAPH_EXECUTOR_DEPS}) diff --git a/paddle/fluid/framework/details/async_ssa_graph_executor.cc b/paddle/fluid/framework/details/async_ssa_graph_executor.cc index c41788cc69..8b7f85912a 100644 --- a/paddle/fluid/framework/details/async_ssa_graph_executor.cc +++ b/paddle/fluid/framework/details/async_ssa_graph_executor.cc @@ -126,8 +126,9 @@ AsyncSSAGraphExecutor::AsyncSSAGraphExecutor( } } - for (size_t i = 0; i < local_scopes_.size(); ++i) { - InitVarsInScope(var_infos_, local_scopes_[i], local_exec_scopes_[i]); + for (size_t i = local_scopes_.size(); i >= 1; --i) { + InitVarsInScope(var_infos_, local_scopes_[i - 1], + local_exec_scopes_[i - 1]); } ProcessGraph(graphs_, local_scopes_[0]); } diff --git a/paddle/fluid/framework/details/build_strategy.cc b/paddle/fluid/framework/details/build_strategy.cc index 97219602cd..ecdb8cc9b8 100644 --- a/paddle/fluid/framework/details/build_strategy.cc +++ b/paddle/fluid/framework/details/build_strategy.cc @@ -64,7 +64,6 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder { AppendAddReaderDependencyPass(); AppendMultiDevPass(); - AppendSetReaderDeviceIndexPass(); AppendMultiGraphOptPasses(); AppendPassToSetMkldnnAttr("mkldnn_placement_pass"); @@ -243,10 +242,6 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder { &strategy_); } - void AppendSetReaderDeviceIndexPass() { - AppendPass("set_reader_device_index_pass"); - } - void AppendPrintGraphPass(const std::string &pass_name, const std::string &debug_file_suffix) { if (!strategy_.debug_graphviz_path_.empty()) { @@ -403,9 +398,6 @@ ir::Graph *BuildStrategy::Apply(ir::Graph *graph, "GPU, skipped."; continue; } - } else if (pass->Type() == "set_reader_device_index_pass") { - pass->Erase(kPlaces); - pass->SetNotOwned>(kPlaces, &places); } VLOG(1) << "Start Apply Pass " << pass->Type(); graph = pass->Apply(graph); @@ -442,7 +434,6 @@ USE_PASS(fuse_sgd_op_pass); USE_PASS(fuse_momentum_op_pass); USE_PASS(fuse_all_reduce_op_pass); USE_PASS(runtime_context_cache_pass); -USE_PASS(set_reader_device_index_pass); USE_PASS(add_reader_dependency_pass); #ifdef PADDLE_WITH_MKLDNN USE_PASS(mkldnn_placement_pass); diff --git a/paddle/fluid/framework/ir/multi_devices_graph_pass/CMakeLists.txt b/paddle/fluid/framework/ir/multi_devices_graph_pass/CMakeLists.txt index e4d247192d..6eab32ab92 100644 --- a/paddle/fluid/framework/ir/multi_devices_graph_pass/CMakeLists.txt +++ b/paddle/fluid/framework/ir/multi_devices_graph_pass/CMakeLists.txt @@ -11,7 +11,7 @@ endif() cc_library(multi_devices_graph_pass SRCS multi_devices_graph_pass.cc DEPS multi_devices_helper computation_op_handle scale_loss_grad_op_handle rpc_op_handle fetch_barrier_op_handle ${ALL_REDUCE_OP_HANDLES} reduce_op_handle broadcast_op_handle fused_broadcast_op_handle) cc_library(sequential_execution_pass SRCS sequential_execution_pass.cc DEPS graph graph_helper pass) -cc_library(set_reader_device_info_pass SRCS set_reader_device_info_pass.cc DEPS graph graph_helper pass multi_devices_graph_pass) +cc_library(set_reader_device_info_utils SRCS set_reader_device_info_utils.cc DEPS graph graph_helper pass multi_devices_graph_pass) cc_library(fuse_all_reduce_op_pass SRCS fuse_all_reduce_op_pass.cc DEPS graph graph_helper fused_all_reduce_op_handle) cc_library(all_reduce_deps_pass SRCS all_reduce_deps_pass.cc DEPS all_reduce_op_handle graph graph_helper pass) diff --git a/paddle/fluid/framework/ir/multi_devices_graph_pass/set_reader_device_info_pass.cc b/paddle/fluid/framework/ir/multi_devices_graph_pass/set_reader_device_info_pass.cc deleted file mode 100644 index abea009f5b..0000000000 --- a/paddle/fluid/framework/ir/multi_devices_graph_pass/set_reader_device_info_pass.cc +++ /dev/null @@ -1,101 +0,0 @@ -// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "paddle/fluid/framework/details/computation_op_handle.h" -#include "paddle/fluid/framework/details/multi_devices_helper.h" -#include "paddle/fluid/framework/ir/graph.h" -#include "paddle/fluid/framework/ir/pass.h" -#include "paddle/fluid/operators/reader/lod_tensor_blocking_queue.h" - -namespace paddle { -namespace framework { -namespace ir { - -static int GetDeviceCountFromPassAttr(const Pass &pass) { - return static_cast( - pass.Get>(details::kPlaces).size()); -} - -static std::unordered_set ReaderOpSet() { - return {"create_py_reader"}; -} - -class InitReaderDeviceCountPass : public Pass { - protected: - void ApplyImpl(Graph *graph) const override { - using QueueHolder = - operators::reader::OrderedMultiDeviceLoDTensorBlockingQueueHolder; - - auto reader_ops = ReaderOpSet(); - auto dev_cnt = GetDeviceCountFromPassAttr(*this); - const auto &scope = Get(details::kGlobalScope); - for (auto &node : graph->Nodes()) { - if (node->IsOp() && node->Op() && - reader_ops.count(node->Op()->Type()) != 0) { - auto queue_name = node->Op()->Input("blocking_queue")[0]; - auto var = scope.FindVar(queue_name); - if (var && var->IsType()) { - VLOG(10) << "Set device count of " << queue_name << " to be " - << dev_cnt; - var->GetMutable()->GetQueue()->SetDeviceCount(dev_cnt); - } - } - } - } -}; - -class SetReaderDeviceIndexPass : public Pass { - protected: - void ApplyImpl(Graph *graph) const override { - auto dev_cnt = GetDeviceCountFromPassAttr(*this); - auto reader_ops = ReaderOpSet(); - size_t found_op_num = 0; - - for (auto &node : graph->Nodes()) { - if (node->IsOp() && node->Op() && - reader_ops.count(node->Op()->Type()) != 0) { - auto &op_handle = dynamic_cast( - node->Wrapper()); - auto *op_desc = node->Op(); - auto &op_base_attrs = - const_cast(op_handle.GetOp()->Attrs()); - int dev_idx = static_cast(op_handle.GetScopeIdx()); - - op_desc->SetAttr("device_index", dev_idx); - op_desc->SetAttr("device_count", dev_cnt); - - op_base_attrs["device_index"] = dev_idx; - op_base_attrs["device_count"] = dev_cnt; - - ++found_op_num; - VLOG(10) << "Found op " << op_desc->Type() << " on device " << dev_idx; - } - } - - VLOG(10) << "Found op number " << found_op_num; - } -}; - -} // namespace ir -} // namespace framework -} // namespace paddle - -REGISTER_PASS(init_reader_device_count_pass, - paddle::framework::ir::InitReaderDeviceCountPass) - .RequirePassAttr(paddle::framework::details::kGlobalScope) - .RequirePassAttr(paddle::framework::details::kPlaces); - -REGISTER_PASS(set_reader_device_index_pass, - paddle::framework::ir::SetReaderDeviceIndexPass) - .RequirePassAttr(paddle::framework::details::kPlaces); diff --git a/paddle/fluid/framework/ir/multi_devices_graph_pass/set_reader_device_info_utils.cc b/paddle/fluid/framework/ir/multi_devices_graph_pass/set_reader_device_info_utils.cc new file mode 100644 index 0000000000..19d95190c6 --- /dev/null +++ b/paddle/fluid/framework/ir/multi_devices_graph_pass/set_reader_device_info_utils.cc @@ -0,0 +1,87 @@ +// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "paddle/fluid/framework/ir/multi_devices_graph_pass/set_reader_device_info_utils.h" +#include +#include +#include +#include "paddle/fluid/framework/details/computation_op_handle.h" +#include "paddle/fluid/framework/details/multi_devices_helper.h" +#include "paddle/fluid/framework/ir/graph.h" +#include "paddle/fluid/framework/ir/pass.h" +#include "paddle/fluid/operators/reader/lod_tensor_blocking_queue.h" + +namespace paddle { +namespace framework { +namespace ir { + +static std::unordered_set ReaderOpSet() { + return {"create_py_reader"}; +} + +void InitReaderQueueDeviceCount(Graph *graph, const Scope &scope, + size_t dev_cnt) { + using QueueHolder = + operators::reader::OrderedMultiDeviceLoDTensorBlockingQueueHolder; + + auto reader_ops = ReaderOpSet(); + for (auto &node : graph->Nodes()) { + if (node->IsOp() && node->Op() && + reader_ops.count(node->Op()->Type()) != 0) { + auto queue_name = node->Op()->Input("blocking_queue")[0]; + auto var = scope.FindVar(queue_name); + if (var && var->IsType()) { + VLOG(10) << "Set device count of " << queue_name << " to be " + << dev_cnt; + var->GetMutable()->GetQueue()->SetDeviceCount(dev_cnt); + } + } + } +} + +void SetReaderOpDeviceInfo(Graph *graph, size_t dev_cnt, size_t dev_idx) { + auto reader_ops = ReaderOpSet(); + size_t found_op_num = 0; + + for (auto &node : graph->Nodes()) { + if (node->IsOp() && node->Op() && + reader_ops.count(node->Op()->Type()) != 0) { + auto &op_handle = dynamic_cast( + node->Wrapper()); + auto *op_desc = node->Op(); + auto &op_base_attrs = + const_cast(op_handle.GetOp()->Attrs()); + int actual_dev_idx = static_cast(op_handle.GetScopeIdx()); + if (dev_idx != -1UL) { + actual_dev_idx = static_cast(dev_idx); + } + + op_desc->SetAttr("device_index", actual_dev_idx); + op_desc->SetAttr("device_count", static_cast(dev_cnt)); + + op_base_attrs["device_index"] = actual_dev_idx; + op_base_attrs["device_count"] = static_cast(dev_cnt); + + ++found_op_num; + VLOG(10) << "Found op " << op_desc->Type() << " on device " + << actual_dev_idx; + } + } + + VLOG(10) << "Found op number " << found_op_num; +} + +} // namespace ir +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/framework/ir/multi_devices_graph_pass/set_reader_device_info_utils.h b/paddle/fluid/framework/ir/multi_devices_graph_pass/set_reader_device_info_utils.h new file mode 100644 index 0000000000..0038790cae --- /dev/null +++ b/paddle/fluid/framework/ir/multi_devices_graph_pass/set_reader_device_info_utils.h @@ -0,0 +1,31 @@ +// Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include "paddle/fluid/framework/ir/graph.h" +#include "paddle/fluid/framework/scope.h" + +namespace paddle { +namespace framework { +namespace ir { + +void InitReaderQueueDeviceCount(Graph *graph, const Scope &scope, + size_t dev_cnt); + +void SetReaderOpDeviceInfo(Graph *graph, size_t dev_cnt, size_t dev_idx = -1UL); + +} // namespace ir +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc index 6be9c5b4b7..738a636569 100644 --- a/paddle/fluid/framework/parallel_executor.cc +++ b/paddle/fluid/framework/parallel_executor.cc @@ -30,6 +30,7 @@ limitations under the License. */ #include "paddle/fluid/framework/ir/graph_helper.h" #include "paddle/fluid/framework/ir/memory_optimize_pass/memory_optimization_var_info.h" #include "paddle/fluid/framework/ir/memory_optimize_pass/reference_count_pass_helper.h" +#include "paddle/fluid/framework/ir/multi_devices_graph_pass/set_reader_device_info_utils.h" #include "paddle/fluid/platform/profiler.h" DECLARE_double(eager_delete_tensor_gb); @@ -81,15 +82,6 @@ class ParallelExecutorPrivate { } } - void InitReaderDeviceCount(ir::Graph *graph) const { - auto pass = - ir::PassRegistry::Instance().Get("init_reader_device_count_pass"); - pass->SetNotOwned(details::kGlobalScope, global_scope_); - pass->SetNotOwned>(details::kPlaces, - &places_); - pass->Apply(graph); - } - void SetHasFeed(size_t dev_idx, bool has_feed = true); bool AllowPartialFeed() const; @@ -456,7 +448,8 @@ ParallelExecutor::ParallelExecutor(const std::vector &places, const BuildStrategy &build_strategy, ir::Graph *graph) : member_(new ParallelExecutorPrivate(places, scope)) { - member_->InitReaderDeviceCount(graph); + ir::InitReaderQueueDeviceCount(graph, *(member_->global_scope_), + member_->places_.size()); member_->use_cuda_ = exec_strategy.use_cuda_; member_->build_strategy_ = build_strategy; member_->use_all_reduce_ = member_->build_strategy_.reduce_ == @@ -733,6 +726,14 @@ ParallelExecutor::ParallelExecutor(const std::vector &places, op->SetLocalExecScopes(scope_map); } } + + if (final_graphs.size() == 1) { + ir::SetReaderOpDeviceInfo(final_graphs[0], member_->places_.size()); + } else { + for (size_t i = 0; i < final_graphs.size(); ++i) { + ir::SetReaderOpDeviceInfo(final_graphs[i], member_->places_.size(), i); + } + } } void ParallelExecutor::BCastParamsToDevices( @@ -1061,4 +1062,3 @@ USE_PASS(reference_count_pass); USE_PASS(eager_deletion_pass); USE_PASS(buffer_shared_inplace_pass); USE_PASS(buffer_shared_cross_op_memory_reuse_pass); -USE_PASS(init_reader_device_count_pass); diff --git a/python/paddle/fluid/tests/unittests/test_async_ssa_graph_executor_mnist.py b/python/paddle/fluid/tests/unittests/test_async_ssa_graph_executor_mnist.py index abc463a0fb..1269735049 100644 --- a/python/paddle/fluid/tests/unittests/test_async_ssa_graph_executor_mnist.py +++ b/python/paddle/fluid/tests/unittests/test_async_ssa_graph_executor_mnist.py @@ -32,12 +32,11 @@ def convolutional_neural_network(use_py_reader): py_reader = None if use_py_reader: - py_reader = fluid.layers.create_py_reader_by_data( + py_reader = fluid.io.DataLoader.from_generator( capacity=64, feed_list=[img, label], - name='py_reader', + iterable=False, use_double_buffer=False) - img, label = fluid.layers.read_file(py_reader) conv_pool_1 = fluid.nets.simple_img_conv_pool( input=img, @@ -144,7 +143,7 @@ def train(use_cuda, thread_num, cpu_num): exec_strategy=exec_strategy) print("declare parallel executor done.") - py_reader.decorate_paddle_reader(train_reader) + py_reader.set_sample_list_generator(train_reader) for pass_id in range(2): step = 0 -- GitLab