提交 203027ca 编写于 作者: S sneaxiy

test=develop

上级 cf1944af
......@@ -69,7 +69,7 @@ struct BuildStrategy {
bool enable_data_balance_{false};
bool enable_sequential_execution_{true};
bool enable_sequential_execution_{false};
bool fuse_broadcast_op_{false};
......
......@@ -13,6 +13,7 @@
// limitations under the License.
#include "paddle/fluid/framework/details/sequential_execution_pass.h"
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>
......@@ -29,6 +30,15 @@ static bool IsSameOpDesc(OpDesc *op1, OpDesc *op2) {
std::unique_ptr<ir::Graph> SequentialExecutionPass::ApplyImpl(
std::unique_ptr<ir::Graph> graph) const {
// FIXME(zjl): Insert dependencies between some distributed ops may cause
// the multi_devices_graph_pass fails. So we skip these ops here.
// Indeed, maybe we should not insert dependencies between these ops
// casually, which may cause deadlock easily.
// We should add more skipped distributed ops when found errors in
// multi_devices_graph_pass
static std::unordered_set<std::string> skip_dist_ops{
"send", "recv", "send_barrier", "fetch_barrier"};
auto &ops = Get<const std::vector<OpDesc *>>(kAllOpDescs);
std::vector<ir::Node *> op_node_list;
op_node_list.reserve(ops.size());
......@@ -73,7 +83,9 @@ std::unique_ptr<ir::Graph> SequentialExecutionPass::ApplyImpl(
}
}
ready_ops.erase(found_node);
op_node_list.push_back(found_node);
if (skip_dist_ops.count(op_desc->Type()) == 0) {
op_node_list.push_back(found_node);
}
}
for (size_t i = 1; i < op_node_list.size(); ++i) {
......
......@@ -40,7 +40,8 @@ class TestParallelExecutorBase(unittest.TestCase):
use_reduce=False,
fuse_elewise_add_act_ops=False,
optimizer=fluid.optimizer.Adam,
use_fast_executor=False):
use_fast_executor=False,
enable_sequential_execution=False):
def run_executor(exe, feed, fetch_list, program=None):
if isinstance(exe, fluid.ParallelExecutor):
res = exe.run(fetch_list=fetch_list, feed=feed)
......@@ -80,6 +81,7 @@ class TestParallelExecutorBase(unittest.TestCase):
build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce \
if use_reduce else fluid.BuildStrategy.ReduceStrategy.AllReduce
build_strategy.fuse_elewise_add_act_ops = fuse_elewise_add_act_ops
build_strategy.enable_sequential_execution = enable_sequential_execution
if use_parallel_executor:
exe = fluid.ParallelExecutor(
......
......@@ -232,6 +232,46 @@ class TestResnet(TestParallelExecutorBase):
for loss in zip(all_reduce_last_loss, reduce_last_loss):
self.assertAlmostEquals(loss[0], loss[1], delta=delta2)
if not use_cuda:
return
all_reduce_first_loss_seq, all_reduce_last_loss_seq = self.check_network_convergence(
model,
feed_dict={"image": img,
"label": label},
iter=iter,
batch_size=batch_size,
use_cuda=use_cuda,
use_reduce=False,
optimizer=optimizer,
enable_sequential_execution=True)
reduce_first_loss_seq, reduce_last_loss_seq = self.check_network_convergence(
model,
feed_dict={"image": img,
"label": label},
iter=iter,
batch_size=batch_size,
use_cuda=use_cuda,
use_reduce=True,
optimizer=optimizer,
enable_sequential_execution=True)
for loss in zip(all_reduce_first_loss, all_reduce_first_loss_seq):
self.assertAlmostEquals(loss[0], loss[1], delta=1e-6)
for loss in zip(all_reduce_last_loss, all_reduce_last_loss_seq):
self.assertAlmostEquals(loss[0], loss[1], delta=delta2)
for loss in zip(reduce_first_loss, reduce_first_loss_seq):
self.assertAlmostEquals(loss[0], loss[1], delta=1e-6)
for loss in zip(reduce_last_loss, reduce_last_loss_seq):
self.assertAlmostEquals(loss[0], loss[1], delta=delta2)
for loss in zip(all_reduce_first_loss_seq, reduce_first_loss_seq):
self.assertAlmostEquals(loss[0], loss[1], delta=1e-6)
for loss in zip(all_reduce_last_loss_seq, reduce_last_loss_seq):
self.assertAlmostEquals(loss[0], loss[1], delta=delta2)
def _check_resnet_convergence(self,
model,
use_cuda=True,
......
......@@ -173,6 +173,8 @@ class TestTransformer(TestParallelExecutorBase):
def test_main(self):
if core.is_compiled_with_cuda():
self.check_network_convergence(transformer, use_cuda=True)
self.check_network_convergence(
transformer, use_cuda=True, enable_sequential_execution=True)
self.check_network_convergence(transformer, use_cuda=False, iter=5)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册