未验证 提交 29337f4e 编写于 作者: Z Zeng Jinle 提交者: GitHub

fix conflict of inferne partial feed with gpu parallel ssa graph executor, test=develop (#23400)

上级 8cf2c38c
...@@ -105,38 +105,36 @@ static size_t GetUniqueDeviceIdOfOp(const details::OpHandleBase &op) { ...@@ -105,38 +105,36 @@ static size_t GetUniqueDeviceIdOfOp(const details::OpHandleBase &op) {
return dev_idx; return dev_idx;
} }
/** static bool IsDataParallelInferenceGraphImpl(
* This function tries to separate the original graph into multiple graphs, in const ir::Graph &graph,
* which each graph would only run on single device. This is usually used to std::unordered_map<details::OpHandleBase *, size_t> *p_op_to_dev_idx,
* separate a data-parallel inference graph to multiple graphs on each device. size_t *p_place_num) {
* auto &place_num = *p_place_num;
* The graph can be separated into multiple single device graphs if and only if: auto &op_to_dev_idx = *p_op_to_dev_idx;
*
* - the graph does not contain any ops related to multi-devices communication, auto clear_result = [&] {
* such as allreduce, send, recv, sync_batch_norm, etc. place_num = 0;
* op_to_dev_idx.clear();
* - ops on different devices do not depend on each other. That is to say, the return false;
* graph has several disconnected sub-graphs. };
*/
std::vector<std::unique_ptr<ir::Graph>> TrySeparateToMultipleSingleDeviceGraphs( clear_result();
ir::Graph *graph) {
// If sub-block contains multi-devices ops, we cannot separate // If sub-block contains multi-devices ops, we cannot separate
if (ContainMultiDeviceOp(graph->OriginProgram(), 1)) { if (ContainMultiDeviceOp(graph.OriginProgram(), 1)) {
return {}; return clear_result();
} }
size_t place_num = 0; auto op_handles = ir::FilterByNodeWrapper<OpHandleBase>(graph);
auto op_handles = ir::FilterByNodeWrapper<OpHandleBase>(*graph);
if (op_handles.empty()) { if (op_handles.empty()) {
return {}; return clear_result();
} }
std::unordered_map<details::OpHandleBase *, size_t> op_to_dev_idx;
for (auto &op : op_handles) { for (auto &op : op_handles) {
auto dev_idx = GetUniqueDeviceIdOfOp(*op); auto dev_idx = GetUniqueDeviceIdOfOp(*op);
if (dev_idx == kUndefinedDevIdx) { if (dev_idx == kUndefinedDevIdx) {
VLOG(10) << "Op " << op->Name() << " is not determined"; VLOG(10) << "Op " << op->Name() << " is not determined";
return {}; return clear_result();
} }
place_num = std::max(place_num, dev_idx + 1); place_num = std::max(place_num, dev_idx + 1);
op_to_dev_idx[op] = dev_idx; op_to_dev_idx[op] = dev_idx;
...@@ -148,7 +146,7 @@ std::vector<std::unique_ptr<ir::Graph>> TrySeparateToMultipleSingleDeviceGraphs( ...@@ -148,7 +146,7 @@ std::vector<std::unique_ptr<ir::Graph>> TrySeparateToMultipleSingleDeviceGraphs(
if (in_var->GeneratedOp()) { if (in_var->GeneratedOp()) {
auto iter = op_to_dev_idx.find(in_var->GeneratedOp()); auto iter = op_to_dev_idx.find(in_var->GeneratedOp());
if (iter == op_to_dev_idx.end() || iter->second != dev_idx) { if (iter == op_to_dev_idx.end() || iter->second != dev_idx) {
return {}; return clear_result();
} }
} }
} }
...@@ -157,7 +155,7 @@ std::vector<std::unique_ptr<ir::Graph>> TrySeparateToMultipleSingleDeviceGraphs( ...@@ -157,7 +155,7 @@ std::vector<std::unique_ptr<ir::Graph>> TrySeparateToMultipleSingleDeviceGraphs(
for (auto &pending_op : out_var->PendingOps()) { for (auto &pending_op : out_var->PendingOps()) {
auto iter = op_to_dev_idx.find(pending_op); auto iter = op_to_dev_idx.find(pending_op);
if (iter == op_to_dev_idx.end() || iter->second != dev_idx) { if (iter == op_to_dev_idx.end() || iter->second != dev_idx) {
return {}; return clear_result();
} }
} }
} }
...@@ -171,6 +169,36 @@ std::vector<std::unique_ptr<ir::Graph>> TrySeparateToMultipleSingleDeviceGraphs( ...@@ -171,6 +169,36 @@ std::vector<std::unique_ptr<ir::Graph>> TrySeparateToMultipleSingleDeviceGraphs(
"issue at https://github.com/PaddlePaddle/Paddle/issues/new. And " "issue at https://github.com/PaddlePaddle/Paddle/issues/new. And "
"we will resolve it with high priority.")); "we will resolve it with high priority."));
return true;
}
bool IsDataParallelInferenceGraph(const ir::Graph &graph) {
size_t place_num;
std::unordered_map<details::OpHandleBase *, size_t> op_to_dev_idx;
return IsDataParallelInferenceGraphImpl(graph, &op_to_dev_idx, &place_num);
}
/**
* This function tries to separate the original graph into multiple graphs, in
* which each graph would only run on single device. This is usually used to
* separate a data-parallel inference graph to multiple graphs on each device.
*
* The graph can be separated into multiple single device graphs if and only if:
*
* - the graph does not contain any ops related to multi-devices communication,
* such as allreduce, send, recv, sync_batch_norm, etc.
*
* - ops on different devices do not depend on each other. That is to say, the
* graph has several disconnected sub-graphs.
*/
std::vector<std::unique_ptr<ir::Graph>> TrySeparateToMultipleSingleDeviceGraphs(
ir::Graph *graph) {
size_t place_num;
std::unordered_map<details::OpHandleBase *, size_t> op_to_dev_idx;
if (!IsDataParallelInferenceGraphImpl(*graph, &op_to_dev_idx, &place_num)) {
return {};
}
if (place_num == 1) { if (place_num == 1) {
return {}; return {};
} }
...@@ -182,8 +210,10 @@ std::vector<std::unique_ptr<ir::Graph>> TrySeparateToMultipleSingleDeviceGraphs( ...@@ -182,8 +210,10 @@ std::vector<std::unique_ptr<ir::Graph>> TrySeparateToMultipleSingleDeviceGraphs(
g->Set(kGraphDepVars, new GraphDepVars()); g->Set(kGraphDepVars, new GraphDepVars());
} }
for (auto &op : op_handles) { for (auto &pair : op_to_dev_idx) {
auto dev_idx = op_to_dev_idx.at(op); auto *op = pair.first;
auto dev_idx = pair.second;
auto *ret_graph = graphs[dev_idx].get(); auto *ret_graph = graphs[dev_idx].get();
auto &ret_vars = ret_graph->Get<GraphVars>(kGraphVars)[0]; auto &ret_vars = ret_graph->Get<GraphVars>(kGraphVars)[0];
auto &ret_dummy_vars = ret_graph->Get<GraphDepVars>(kGraphDepVars); auto &ret_dummy_vars = ret_graph->Get<GraphDepVars>(kGraphDepVars);
......
...@@ -101,6 +101,8 @@ inline std::vector<std::string> GetOpRoleVarsOrEmpty(const OpDesc &op) { ...@@ -101,6 +101,8 @@ inline std::vector<std::string> GetOpRoleVarsOrEmpty(const OpDesc &op) {
return boost::get<std::vector<std::string>>(iter->second); return boost::get<std::vector<std::string>>(iter->second);
} }
bool IsDataParallelInferenceGraph(const ir::Graph &graph);
std::vector<std::unique_ptr<ir::Graph>> TrySeparateToMultipleSingleDeviceGraphs( std::vector<std::unique_ptr<ir::Graph>> TrySeparateToMultipleSingleDeviceGraphs(
ir::Graph *graph); ir::Graph *graph);
......
...@@ -647,11 +647,22 @@ ParallelExecutor::ParallelExecutor(const std::vector<platform::Place> &places, ...@@ -647,11 +647,22 @@ ParallelExecutor::ParallelExecutor(const std::vector<platform::Place> &places,
#ifdef PADDLE_WITH_CUDA #ifdef PADDLE_WITH_CUDA
// TODO(Yancey1989): Remove passing in the main_program when // TODO(Yancey1989): Remove passing in the main_program when
// allreduce_seq_pass doesn't need it as the attr. // allreduce_seq_pass doesn't need it as the attr.
bool is_inference = details::IsDataParallelInferenceGraph(*graph);
bool has_drop_last_read_op = details::HasDropLastReadOp(*graph);
auto *pg_exe = new details::ParallelSSAGraphExecutor( auto *pg_exe = new details::ParallelSSAGraphExecutor(
exec_strategy, member_->local_scopes_, member_->local_exec_scopes_, exec_strategy, member_->local_scopes_, member_->local_exec_scopes_,
member_->places_, graph); member_->places_, graph);
final_graphs = pg_exe->Graphs(); final_graphs = pg_exe->Graphs();
member_->executor_.reset(pg_exe); member_->executor_.reset(pg_exe);
if (is_inference && member_->places_.size() > 1) {
member_->inference_executor_ = pg_exe;
if (!has_drop_last_read_op) {
VLOG(5) << "Enable partial feed support in inference phase";
pg_exe->EnablePartialFeedSupport();
}
}
#else #else
PADDLE_THROW( PADDLE_THROW(
"Paddle should be compiled with CUDA for ParallelGraph Execution."); "Paddle should be compiled with CUDA for ParallelGraph Execution.");
......
...@@ -357,6 +357,7 @@ set_tests_properties(test_parallel_executor_test_while_train test_parallel_execu ...@@ -357,6 +357,7 @@ set_tests_properties(test_parallel_executor_test_while_train test_parallel_execu
test_optimizer_in_control_flow test_dataloader_keep_order test_optimizer_in_control_flow test_dataloader_keep_order
test_dataloader_unkeep_order test_dataloader_unkeep_order
test_parallel_executor_inference_feed_partial_data test_parallel_executor_inference_feed_partial_data
test_parallel_ssa_graph_inference_feed_partial_data
test_fetch_unmerged test_fetch_unmerged
test_buffer_shared_memory_reuse_pass PROPERTIES LABELS "RUN_TYPE=DIST") test_buffer_shared_memory_reuse_pass PROPERTIES LABELS "RUN_TYPE=DIST")
......
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle.fluid as fluid
import unittest
fluid.core.globals()['FLAGS_enable_parallel_graph'] = 1
from test_parallel_executor_inference_feed_partial_data import *
if __name__ == '__main__':
unittest.main()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册