Commit a0bb18be authored by Qiao Longfei

Merge branch 'add-async-ssa-graph-executor' of ssh://github.com/jacquesqiao/Paddle into add-async-ssa-graph-executor-communicator
......@@ -11,7 +11,7 @@ paddle.fluid.default_main_program (ArgSpec(args=[], varargs=None, keywords=None,
paddle.fluid.program_guard (ArgSpec(args=['main_program', 'startup_program'], varargs=None, keywords=None, defaults=(None,)), ('document', 'b54f403e57825a1592aece03afe3afb6'))
paddle.fluid.name_scope (ArgSpec(args=['prefix'], varargs=None, keywords=None, defaults=(None,)), ('document', '0ef753f5cec69fef9ae6ad8b867b33a2'))
paddle.fluid.Executor.__init__ (ArgSpec(args=['self', 'place'], varargs=None, keywords=None, defaults=None), ('document', '6adf97f83acf6453d4a6a4b1070f3754'))
paddle.fluid.Executor.close (ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None), ('document', '78e512cabeda9c7f42cb7c7e88967ae7'))
paddle.fluid.Executor.close (ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None), ('document', 'f5369953dd0c443961cf79f7a00e1a03'))
paddle.fluid.Executor.run (ArgSpec(args=['self', 'program', 'feed', 'fetch_list', 'feed_var_name', 'fetch_var_name', 'scope', 'return_numpy', 'use_program_cache'], varargs=None, keywords=None, defaults=(None, None, None, 'feed', 'fetch', None, True, False)), ('document', 'aba8093edebf2d5c869b735b92811e45'))
paddle.fluid.global_scope (ArgSpec(args=[], varargs=None, keywords=None, defaults=None), ('document', 'e148d3ab1ed8edf3e928212a375959c0'))
paddle.fluid.scope_guard (ArgSpec(args=['scope'], varargs=None, keywords=None, defaults=None), ('document', 'b94d1f6bcc29c4fb58fc0058561250c2'))
......
......@@ -14,7 +14,9 @@
#pragma once
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "ThreadPool.h"
......
......@@ -16,6 +16,7 @@ limitations under the License. */
#include <glog/logging.h>
#include <memory>
#include <utility>
#include "paddle/fluid/framework/details/memory_optimize_helper.h"
#include "paddle/fluid/framework/details/multi_devices_graph_pass.h"
......
......@@ -14,6 +14,7 @@
#pragma once
#include <memory>
#include <string>
#include <vector>
......
......@@ -14,6 +14,7 @@
#pragma once
#include <memory>
#include <string>
#include "glog/logging.h"
......
......@@ -13,7 +13,10 @@
// limitations under the License.
#include <algorithm>
#include <fstream>
#include <memory>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
......@@ -167,10 +170,6 @@ std::unique_ptr<ir::Graph> MultiDevSSAGraphBuilderBase::ApplyImpl(
bool is_forwarding = true;
bool insert_collection_ops = NeedCollectiveOps();
if (strategy_.async_mode_) {
// async mode does not need to merge gradients
insert_collection_ops = false;
}
for (ir::Node *node : sorted_ops) {
if (DealWithSpecialOp(&result, node)) {
......@@ -749,10 +748,6 @@ bool DistSSAGraphBuilder::DealWithSpecialOp(ir::Graph *result,
ir::Node *node) const {
bool insert_op = false;
if (OpHaveRole(*node, OpRole::kRPC)) {
// in async_mode, each graph will send its own gradient.
if (strategy_.async_mode_ && node->Op()->Type() == "send") {
return false;
}
int op_dev_id = CreateRPCOp(result, node);
PADDLE_ENFORCE(op_dev_id != -1,
"Can not schedule the RPC operator to the right place.");
......@@ -768,11 +763,6 @@ bool DistSSAGraphBuilder::DealWithSpecialOp(ir::Graph *result,
insert_op = true;
need_broadcast_var_ = true;
} else if (OpHaveRole(*node, OpRole::kDist)) {
// in async_mode, each graph sends its own gradient, so there is no need to
// merge gradients.
if (strategy_.async_mode_ && node->Op()->Type() != "concat") {
return false;
}
int op_dev_id = CreateDistTrainOp(result, node);
if (node->Op()->Type() == "concat") {
// the input(block of parameter) of concat is on different device,
......@@ -844,7 +834,7 @@ int DistSSAGraphBuilder::CreateRPCOp(ir::Graph *result, ir::Node *node) const {
}
auto recv_param_grad = boost::get<std::vector<std::string>>(
node->Op()->GetAttr(OpProtoAndCheckerMaker::OpRoleVarAttrName()));
if (recv_param_grad.size() == 2U && !strategy_.async_mode_) {
if (recv_param_grad.size() == 2U) {
op_dev_id = GetVarDeviceID(recv_param_grad[1]);
VLOG(10) << "recv param " << recv_param_grad[0]
<< " get grad place: " << recv_param_grad[1]
......
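Review note: both hunks above special-case strategy_.async_mode_. Per the inline comments, in async mode each trainer graph sends its own gradients, so the builder skips the gradient-merge/collective path. Below is a minimal, hedged Python sketch of that decision only; it is a plain illustration, not the actual C++ pass, and all names in it are made up.

# Hedged sketch: illustrates the build-time decision encoded above, nothing more.
def plan_gradient_ops(async_mode, grad_name, num_devices):
    # async mode: every device graph pushes its own gradient independently
    if async_mode:
        return ["send(%s) on dev%d" % (grad_name, d) for d in range(num_devices)]
    # sync mode: merge gradients across devices first, then send once
    return ["merge_grad(%s)" % grad_name, "send(%s)" % grad_name]

print(plan_gradient_ops(True, "w@GRAD", 2))   # two independent sends
print(plan_gradient_ops(False, "w@GRAD", 2))  # merge, then a single send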
......@@ -14,7 +14,10 @@
#pragma once
#include <memory>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
......
......@@ -16,7 +16,9 @@
#include <deque>
#include <list>
#include <memory>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
......
......@@ -16,6 +16,7 @@ limitations under the License. */
#include <algorithm>
#include <string>
#include <tuple>
#include <utility>
#include <vector>
#include "paddle/fluid/framework/ir/graph_helper.h"
......
......@@ -14,6 +14,7 @@ limitations under the License. */
#pragma once
#include <memory>
#include <string>
#include <unordered_map>
#include <unordered_set>
......
......@@ -16,6 +16,7 @@
#include <memory>
#include <unordered_set>
#include <utility>
#include <vector>
#include "paddle/fluid/framework/ddim.h"
......
......@@ -127,6 +127,12 @@ void Conv2DTransposeOpMaker::Make() {
"output feature channels,"
"H is the height of the filter, and W is the width of the filter. "
"We enforce groups number == 1 in the convolution transpose scenario.");
AddInput("Bias",
"(Tensor) Bias to be added to each output of filter application."
"The format of output tensor is X (one-dimensional) of size equal"
"to the number of output channels. Only used with MKL-DNN.")
.AsDispensable();
AddOutput("Output",
"(Tensor) The output tensor of convolution transpose operator. "
"The format of output tensor is also NCHW.");
......
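Review note: the new dispensable Bias input is a per-output-channel vector that the MKL-DNN kernel can fold into the transposed convolution. A hedged NumPy sketch of the reference semantics follows (shapes are illustrative; it mirrors the conv2d_bias_naive helper in the test diff below).

# Hedged sketch: per-channel bias added to an NCHW output via broadcasting.
import numpy as np

out = np.zeros((2, 6, 5, 5), dtype=np.float32)  # N, C, H, W
bias = np.arange(6, dtype=np.float32)           # one value per output channel

out_with_bias = out + bias.reshape(1, -1, 1, 1)  # broadcast over N, H, W
assert out_with_bias[0, 3, 0, 0] == bias[3]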
......@@ -16,6 +16,7 @@
#include <condition_variable> // NOLINT
#include <deque>
#include <utility>
#include "paddle/fluid/platform/enforce.h"
......
......@@ -15,6 +15,7 @@
#pragma once
#include <memory>
#include <utility>
#include <vector>
#include "paddle/fluid/framework/ddim.h"
......
......@@ -104,6 +104,7 @@ class ParallelExecutor(object):
main_program = main_program if main_program is not None \
else framework.default_main_program()
self._compiled_program = compiler.CompiledProgram(main_program)
self._compiled_program.with_data_parallel(
loss_name=loss_name,
......
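Review note: with this change ParallelExecutor builds a CompiledProgram internally and hands it to with_data_parallel. A hedged sketch of the roughly equivalent user-facing path follows; network construction is elided and "loss" is a placeholder variable name, so this shows the shape of the API rather than a complete program.

# Hedged sketch: the direct CompiledProgram route that ParallelExecutor now wraps.
import paddle.fluid as fluid
from paddle.fluid import compiler

main_program = fluid.default_main_program()
compiled = compiler.CompiledProgram(main_program).with_data_parallel(
    loss_name="loss")  # placeholder; use the real loss variable's name

exe = fluid.Executor(fluid.CPUPlace())
exe.run(fluid.default_startup_program())
# exe.run(compiled, feed={...}, fetch_list=[...])  # feed/fetch elided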
......@@ -15,36 +15,22 @@
from __future__ import print_function
import unittest
import numpy as np
import paddle.fluid.core as core
from paddle.fluid.tests.unittests.op_test import OpTest
from paddle.fluid.tests.unittests.test_conv2d_transpose_op import TestConv2dTransposeOp, TestWithPad, TestWithStride
from paddle.fluid.tests.unittests.test_conv2d_transpose_op import conv2dtranspose_forward_naive, TestConv2dTransposeOp
class TestMKLDNN(TestConv2dTransposeOp):
    def init_op_type(self):
        self.is_test = True
        self.use_mkldnn = True
        self.data_format = "NCHW"
        self.op_type = "conv2d_transpose"
        self._cpu_only = True

    def test_check_grad(self):
        return

    def test_check_grad_no_input(self):
        return

    def test_check_grad_no_filter(self):
        return


class TestMKLDNNWithPad(TestWithPad):
    def init_op_type(self):
        self.is_test = True
        self.use_mkldnn = True
        self.data_format = "NCHW"
        self.op_type = "conv2d_transpose"
        self._cpu_only = True


def conv2d_bias_naive(out, bias):
    _, out_c, _, _ = out.shape

    for l in range(out_c):
        out[:, l, :, :] = out[:, l, :, :] + bias[l]
    return out


class TestConv2dTransposeMKLDNNOp(TestConv2dTransposeOp):
    def test_check_grad(self):
        return
......@@ -54,24 +40,64 @@ class TestMKLDNNWithPad(TestWithPad):
    def test_check_grad_no_filter(self):
        return


class TestMKLDNNWithStride(TestWithStride):
    def init_op_type(self):
        self.is_test = True
        self.use_mkldnn = True
        self.data_format = "NCHW"
        self.op_type = "conv2d_transpose"
        self._cpu_only = True

    def test_check_grad(self):
        return

    def test_check_grad_no_input(self):
        return

    def test_check_grad_no_filter(self):
        return

    def init_test_case(self):
        self.use_mkldnn = True
        self.is_test = True
        self.pad = [0, 0]
        self.fuse_bias = False
        self.bias_size = None
        self.fuse_relu = False
        self.stride = [1, 1]
        self.dilations = [1, 1]
        self.input_size = [2, 3, 5, 5]  # NCHW
        f_c = self.input_size[1]
        self.filter_size = [f_c, 6, 3, 3]
        self.groups = 1

    def setUp(self):
        TestConv2dTransposeOp.setUp(self)

        output = self.outputs['Output']

        if self.fuse_bias and self.bias_size is not None:
            bias = np.random.random(self.bias_size).astype(self.dtype)
            output = conv2d_bias_naive(output, bias)
            output = output.astype(self.dtype)
            self.attrs['fuse_bias'] = self.fuse_bias
            self.inputs['Bias'] = OpTest.np_dtype_to_fluid_dtype(bias)

        if self.fuse_relu:
            output = np.maximum(output, 0).astype(self.dtype)

        self.attrs['fuse_bias'] = self.fuse_bias
        self.attrs['fuse_relu'] = self.fuse_relu
        self.outputs['Output'] = output


class TestMKLDNNFuseBias(TestConv2dTransposeMKLDNNOp):
    def init_test_case(self):
        TestConv2dTransposeMKLDNNOp.init_test_case(self)
        self.pad = [1, 1]
        self.fuse_bias = True
        self.bias_size = [6]


class TestMKLDNNWithPad(TestConv2dTransposeMKLDNNOp):
    def init_test_case(self):
        TestConv2dTransposeMKLDNNOp.init_test_case(self)
        self.pad = [1, 1]
        self.input_size = [2, 3, 10, 10]


class TestMKLDNNWithStride(TestConv2dTransposeMKLDNNOp):
    def init_test_case(self):
        TestConv2dTransposeMKLDNNOp.init_test_case(self)
        self.pad = [1, 1]
        self.stride = [2, 2]
        self.input_size = [2, 3, 6, 6]  # NCHW


if __name__ == '__main__':
    unittest.main()
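Review note: the subclasses above only vary init_test_case. As a hedged sketch that is not part of this commit, one more subclass in the same pattern could exercise the fuse_relu branch of setUp; the class name is hypothetical and it would live in the same test module.

# Hedged sketch, same module as the classes above; not part of this commit.
class TestMKLDNNFuseRelu(TestConv2dTransposeMKLDNNOp):
    def init_test_case(self):
        TestConv2dTransposeMKLDNNOp.init_test_case(self)
        self.pad = [1, 1]
        self.fuse_relu = True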
......@@ -178,8 +178,8 @@ class TestAsyncSSAGraphExecutor(unittest.TestCase):
main_program=fluid.Program(),
startup_program=fluid.Program()):
test()
assert int(step_list[0] / 2) == int(step_list[1])
assert int(step_list[1] / 2) == int(step_list[2])
assert abs(int(step_list[0] / 2) - int(step_list[1])) < 5
assert abs(int(step_list[1] / 2) - int(step_list[2])) < 5
if __name__ == "__main__":
......
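Review note: the asserts are relaxed from exact halving to a small slack, presumably because asynchronous execution makes the recorded step counts nondeterministic. A hedged illustration of the tolerance pattern follows (helper name and slack value are made up).

# Hedged sketch of the relaxed check used above.
def roughly_halved(prev_steps, cur_steps, slack=5):
    return abs(prev_steps // 2 - cur_steps) < slack

assert roughly_halved(100, 52)       # within the slack
assert not roughly_halved(100, 80)   # clearly not halved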