Unverified commit dd436156, authored by Huihuang Zheng, committed by GitHub

Add ParallelExecutor Test for Cond API and Fix PE Checks Shape Bug (#22029)

Parent 95872494
......@@ -379,6 +379,8 @@ ir::Graph *ParallelExecutorPrivate::ApplyMemoryOptimizePass(ir::Graph *graph) {
return graph;
}
size_t ParallelExecutor::DeviceCount() const { return member_->places_.size(); }
std::vector<Scope *> &ParallelExecutor::GetLocalScopes() {
return member_->local_scopes_;
}
......
......@@ -58,6 +58,8 @@ class ParallelExecutor {
~ParallelExecutor();
size_t DeviceCount() const;
std::vector<Scope *> &GetLocalScopes();
void DropLocalExeScopes();
......
......@@ -2175,11 +2175,13 @@ All parameter, weight, gradient are variables in Paddle.
&ParallelExecutor::FeedTensorsIntoLocalScopes)
.def("feed_and_split_tensor_into_local_scopes",
&ParallelExecutor::FeedAndSplitTensorIntoLocalScopes)
.def("run", [](ParallelExecutor &self,
const std::vector<std::string> &fetch_tensors) {
pybind11::gil_scoped_release release;
return self.Run(fetch_tensors);
});
.def("run",
[](ParallelExecutor &self,
const std::vector<std::string> &fetch_tensors) {
pybind11::gil_scoped_release release;
return self.Run(fetch_tensors);
})
.def("device_count", &ParallelExecutor::DeviceCount);
BindFleetWrapper(&m);
BindBoxHelper(&m);
......
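For context, here is a minimal end-to-end sketch of what the new `device_count` binding (backed by the C++ `DeviceCount()` accessor above) enables. The toy network and the two-CPU-place setup are illustrative assumptions, not part of this commit:

```python
import os

import numpy as np
import paddle.fluid as fluid

os.environ['CPU_NUM'] = '2'  # request two CPU places, as the new test does

# Build a tiny program: loss = mean(fc(image)).
main_program, startup_program = fluid.Program(), fluid.Program()
with fluid.program_guard(main_program, startup_program):
    img = fluid.data(name='image', shape=[None, 9], dtype='float32')
    loss = fluid.layers.mean(fluid.layers.fc(input=img, size=1))
    fluid.optimizer.SGD(learning_rate=0.1).minimize(loss)

fluid.Executor(fluid.CPUPlace()).run(startup_program)
exe = fluid.ParallelExecutor(use_cuda=False,
                             main_program=main_program,
                             loss_name=loss.name)

# device_count is backed by the new C++ DeviceCount() accessor.
num_devices = exe.device_count

# Feed one sample per place so the even split works out.
feed_img = np.random.random(size=[num_devices, 9]).astype(np.float32)
loss_value, = exe.run(feed={'image': feed_img}, fetch_list=[loss.name])
```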
......@@ -193,7 +193,7 @@ def dimension_is_compatible_with(first, second):
return True
def check_feed_shape_type(var, feed):
def check_feed_shape_type(var, feed, num_places=1):
"""
Returns True if the variable doesn't require a feed check, or if it is
compatible with the shape and has the same dtype as the feeded value.
......@@ -207,6 +207,8 @@ def check_feed_shape_type(var, feed):
Args:
var (Variable): the Variable object
feed (LoDTensor): the feeded value, which must be a LoDTensor
num_places: an integer indicating the number of places.
ParallelExecutor will divide the data evenly among the devices (CPU/GPU).
Returns:
True if the shape and dtype of the variable are compatible with the feed value
Raises:
......@@ -214,11 +216,18 @@ def check_feed_shape_type(var, feed):
the feed value
"""
if var.desc.need_check_feed():
if not dimension_is_compatible_with(feed.shape(), var.shape):
feed_shape = feed.shape()
if six.PY2:
feed_shape[0] = long(feed_shape[0] /
num_places) if len(feed.lod()) == 0 else -1
else:
feed_shape[0] = int(feed_shape[0] /
num_places) if len(feed.lod()) == 0 else -1
if not dimension_is_compatible_with(feed_shape, var.shape):
raise ValueError(
'The feeded Variable %r should have dimensions = %d, shape = '
'%r, but received feeded shape %r' %
(var.name, len(var.shape), var.shape, feed.shape()))
'%r, but received feeded shape %r on each device' %
(var.name, len(var.shape), var.shape, feed_shape))
if not dtype_is_compatible_with(feed._dtype(), var.dtype):
var_dtype_format = convert_dtype(var.dtype) if isinstance(
var.dtype, core.VarDesc.VarType) else var.dtype
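In effect, the fixed check rescales the batch (first) dimension of the fed tensor by the number of places before comparing it against the variable's declared shape, and skips the division for LoD data, since variable-length sequences cannot be split evenly. A standalone sketch of that rule (Python 3 only; `per_device_feed_shape` is an illustrative name, not a Paddle API):

```python
def per_device_feed_shape(feed_shape, lod, num_places):
    """Shape each device actually receives after the even split."""
    shape = list(feed_shape)
    # LoD (variable-length) data cannot be split evenly, so its batch
    # dimension is treated as unknown (-1); otherwise divide it by the
    # number of places, mirroring ParallelExecutor's even split.
    shape[0] = int(shape[0] / num_places) if len(lod) == 0 else -1
    return shape

# e.g. a [16, 784] feed on 2 places is checked as [8, 784] per device
assert per_device_feed_shape([16, 784], lod=[], num_places=2) == [8, 784]
```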
......@@ -632,7 +641,7 @@ class Executor(object):
feed_tensor.set(feed[feed_name], core.CPUPlace())
if need_check_feed:
var = global_block.var(feed_name)
check_feed_shape_type(var, feed_tensor)
check_feed_shape_type(var, feed_tensor, exe.device_count())
feed_tensor_dict[feed_name] = feed_tensor
exe.feed_and_split_tensor_into_local_scopes(feed_tensor_dict)
......
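The executor path above now passes `exe.device_count()` into the shape check before `feed_and_split_tensor_into_local_scopes` distributes each fed tensor. The actual split happens in C++ on LoDTensors; a hedged numpy illustration of the even division it performs:

```python
import numpy as np

full_batch = np.arange(16 * 3).reshape(16, 3).astype(np.float32)
num_places = 2

# Each local scope receives batch_size / num_places rows.
per_device = np.split(full_batch, num_places, axis=0)
assert all(chunk.shape == (8, 3) for chunk in per_device)
```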
......@@ -15,6 +15,7 @@
from __future__ import print_function
import numpy as np
import os
import unittest
import paddle.fluid as fluid
......@@ -22,7 +23,6 @@ import paddle.fluid.core as core
import paddle.fluid.layers as layers
import paddle.fluid.framework as framework
from paddle.fluid.backward import append_backward
from paddle.fluid.executor import Executor
from paddle.fluid.framework import Program, program_guard
from simple_nets import simple_fc_net_with_inputs, batchnorm_fc_with_inputs
......@@ -329,7 +329,7 @@ class TestCondNestedControlFlow(unittest.TestCase):
class TestCondBackward(unittest.TestCase):
def backward_value_helper(self, cond_func):
def backward_value_helper(self, cond_func, use_cuda, use_parallel_exe):
"""
Helper function that checks whether the calculated backward value is close to dy/dx
"""
......@@ -344,43 +344,78 @@ class TestCondBackward(unittest.TestCase):
i = fluid.data(name="i", shape=[1], dtype='int32')
loss = cond_func(i, img, label)
append_backward(loss)
place = fluid.CUDAPlace(0) if core.is_compiled_with_cuda(
) else fluid.CPUPlace()
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
exe = fluid.Executor(place)
exe.run(startup_program)
num_devices = 1
if use_parallel_exe:
os.environ['CPU_NUM'] = str(2)
exe = fluid.ParallelExecutor(
use_cuda=use_cuda,
main_program=main_program,
loss_name=loss.name)
num_devices = exe.device_count
delta = 0.005
for feed_i in range(0, 10):
feed_img = np.random.random(size=[1, 9]).astype(np.float32)
feed_label = np.random.randint(
low=0, high=10, size=[1, 1], dtype=np.int64)
img_grad, loss_value = exe.run(
main_program,
feed={
'i': np.full((1), feed_i, np.int32),
'image': feed_img,
'label': feed_label
},
fetch_list=[img.grad_name, loss.name])
if use_parallel_exe:
img_grad, loss_value = exe.run(
feed={
'i': np.full((num_devices), feed_i, np.int32),
'image': np.repeat(
feed_img, num_devices, axis=0),
'label': np.repeat(
feed_label, num_devices, axis=0)
},
fetch_list=[img.grad_name, loss.name])
else:
img_grad, loss_value = exe.run(
main_program,
feed={
'i': np.full((1), feed_i, np.int32),
'image': feed_img,
'label': feed_label
},
fetch_list=[img.grad_name, loss.name])
numerical_grad = np.zeros(shape=[1, 9], dtype=np.float32)
numerical_grad = np.zeros(shape=[num_devices, 9], dtype=np.float32)
feed_img_delta = np.copy(feed_img)
for j in range(9):
feed_img_delta[0][j] = feed_img[0][j] + delta
loss_delta = exe.run(main_program,
feed={
'i': np.full((1), feed_i, np.int32),
'image': feed_img_delta,
'label': feed_label
},
fetch_list=[loss.name])
numerical_grad[0][j] = (loss_delta[0] - loss_value[0]) / delta
if use_parallel_exe:
loss_delta = exe.run(feed={
'i': np.full((num_devices), feed_i, np.int32),
'image': np.repeat(
feed_img_delta, num_devices, axis=0),
'label': np.repeat(
feed_label, num_devices, axis=0)
},
fetch_list=[loss.name])
multi_device_grad = (
loss_delta[0] - loss_value[0]) / delta / num_devices
for d in range(num_devices):
numerical_grad[d][j] = multi_device_grad[d]
else:
loss_delta = exe.run(main_program,
feed={
'i': np.full((1), feed_i,
np.int32),
'image': feed_img_delta,
'label': feed_label
},
fetch_list=[loss.name])
numerical_grad[0][j] = (
loss_delta[0] - loss_value[0]) / delta
feed_img_delta[0][j] = feed_img[0][j]
self.assertTrue(
np.isclose(
img_grad, numerical_grad, atol=0.05, rtol=0.05).all())
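The helper verifies the analytic gradient by forward finite differences: it perturbs each input element by `delta` and compares (loss(x+δ) − loss(x)) / δ against the fetched gradient. A generic numpy sketch of that single-device estimate, not tied to Paddle:

```python
import numpy as np

def numeric_grad(f, x, delta=0.005):
    """Forward-difference estimate of df/dx, one element at a time."""
    grad = np.zeros_like(x)
    base = f(x)
    for idx in np.ndindex(*x.shape):
        x_delta = x.copy()
        x_delta[idx] += delta
        grad[idx] = (f(x_delta) - base) / delta
    return grad

# e.g. for f(x) = sum(x**2), the analytic gradient is 2x
x = np.random.random(size=[1, 9]).astype(np.float32)
assert np.allclose(numeric_grad(lambda v: (v ** 2).sum(), x),
                   2 * x, atol=0.05, rtol=0.05)
```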
def add_optimizer_helper(self, cond_func):
def add_optimizer_helper(self, cond_func, use_cuda, use_parallel_exe):
"""
Test that the program is runnable when an optimizer is added
"""
......@@ -394,22 +429,38 @@ class TestCondBackward(unittest.TestCase):
optimizer = fluid.optimizer.SGD(learning_rate=0.1)
optimizer.minimize(loss)
place = fluid.CUDAPlace(0) if core.is_compiled_with_cuda(
) else fluid.CPUPlace()
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
exe = fluid.Executor(place)
exe.run(startup_program)
if use_parallel_exe:
os.environ['CPU_NUM'] = str(2)
exe = fluid.ParallelExecutor(
use_cuda=use_cuda,
main_program=main_program,
loss_name=loss.name)
num_devices = exe.device_count
for feed_i in range(0, 10):
feed_img = np.random.random(size=[16, 784]).astype(np.float32)
feed_label = np.random.randint(
low=0, high=10, size=[16, 1], dtype=np.int64)
exe.run(main_program,
feed={
'i': np.full((1), feed_i, np.int32),
'image': feed_img,
'label': feed_label
},
fetch_list=[loss])
if use_parallel_exe:
exe.run(feed={
'i': np.full((num_devices), feed_i, np.int32),
'image': np.repeat(
feed_img, num_devices, axis=0),
'label': np.repeat(
feed_label, num_devices, axis=0)
},
fetch_list=[loss.name])
else:
exe.run(main_program,
feed={
'i': np.full((1), feed_i, np.int32),
'image': feed_img,
'label': feed_label
},
fetch_list=[loss])
def test_cond_backward(self):
def cond_func(i, img, label):
......@@ -418,8 +469,13 @@ class TestCondBackward(unittest.TestCase):
lambda: simple_fc_net_with_inputs(img, label, class_num=10),
lambda: batchnorm_fc_with_inputs(img, label, class_num=10))
self.backward_value_helper(cond_func)
self.add_optimizer_helper(cond_func)
for use_parallel_exe in [False, True]:
self.backward_value_helper(cond_func,
core.is_compiled_with_cuda(),
use_parallel_exe)
self.add_optimizer_helper(cond_func,
core.is_compiled_with_cuda(),
use_parallel_exe)
def test_half_nested_cond_backward(self):
def branch(i, img, label):
......@@ -434,10 +490,19 @@ class TestCondBackward(unittest.TestCase):
return layers.cond(i < 5, lambda: layers.mean(img),
lambda: branch(i, img, label))
self.backward_value_helper(cond_func_simple_net_at_true)
self.add_optimizer_helper(cond_func_simple_net_at_true)
self.backward_value_helper(cond_func_simple_net_at_false)
self.add_optimizer_helper(cond_func_simple_net_at_false)
for use_parallel_exe in [False, True]:
self.backward_value_helper(cond_func_simple_net_at_true,
core.is_compiled_with_cuda(),
use_parallel_exe)
self.add_optimizer_helper(cond_func_simple_net_at_true,
core.is_compiled_with_cuda(),
use_parallel_exe)
self.backward_value_helper(cond_func_simple_net_at_false,
core.is_compiled_with_cuda(),
use_parallel_exe)
self.add_optimizer_helper(cond_func_simple_net_at_false,
core.is_compiled_with_cuda(),
use_parallel_exe)
def test_nested_cond_backward(self):
def branch(i, img, label, mod_two):
......@@ -453,8 +518,13 @@ class TestCondBackward(unittest.TestCase):
return layers.cond(i < 5, lambda: branch(i, img, label, True),
lambda: branch(i, img, label, False))
self.backward_value_helper(cond_func)
self.add_optimizer_helper(cond_func)
for use_parallel_exe in [False, True]:
self.backward_value_helper(cond_func,
core.is_compiled_with_cuda(),
use_parallel_exe)
self.add_optimizer_helper(cond_func,
core.is_compiled_with_cuda(),
use_parallel_exe)
if __name__ == '__main__':
......
......@@ -36,17 +36,21 @@ class TestFeedData(unittest.TestCase):
def setUp(self):
self.hidden_sizes = [25, 20, 15]
self.base_batch_size = 10
self.data_batch_size = 10
self.class_num = 10
self.iterations = 5
def _get_batch_size(self, use_cuda, use_parallel_executor):
batch_size_times = 1
if use_parallel_executor:
batch_size_times = core.get_cuda_device_count(
) if use_cuda else int(
os.environ.get('CPU_NUM', multiprocessing.cpu_count()))
return self.base_batch_size * batch_size_times
def _get_device_count(self, use_cuda):
return core.get_cuda_device_count() if use_cuda else int(
os.environ.get('CPU_NUM', multiprocessing.cpu_count()))
def _get_feed_batch_size(self, use_cuda, use_parallel_executor):
"""
Returns the actual feeded data size. We should multiply it by the
number of devices when using ParallelExecutor
"""
return self.data_batch_size * self._get_device_count(
use_cuda) if use_parallel_executor else self.data_batch_size
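Worked numbers for the helper above, assuming two CPU places (`CPU_NUM=2`) and no CUDA:

```python
data_batch_size = 10                              # per-device batch from setUp
device_count = 2                                  # from CPU_NUM
feed_batch_size = data_batch_size * device_count  # 20 samples fed in total
```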
def _simple_fc_net(self, in_size, label_size, class_num, hidden_sizes):
in_data = fluid.data(name="data", dtype='float32', shape=in_size)
......@@ -79,18 +83,17 @@ class TestFeedData(unittest.TestCase):
use_parallel_executor)
self._test_feed_data_contains_neg_one(use_cuda,
use_parallel_executor)
self._test_feed_lod_tensor(use_cuda, use_parallel_executor)
# Test the exception message when the feed is erroneous
batch_size = self._get_batch_size(use_cuda,
use_parallel_executor)
if six.PY2:
in_shape_tuple = (long(-1), long(3), long(4), long(8))
feed_shape_list = [
long(batch_size), long(3), long(4), long(5)
error_shape_list = [
long(self.data_batch_size), long(3), long(4), long(5)
]
else:
in_shape_tuple = (-1, 3, 4, 8)
feed_shape_list = [batch_size, 3, 4, 5]
error_shape_list = [self.data_batch_size, 3, 4, 5]
with self.assertRaises(ValueError) as shape_mismatch_err:
self._test_feed_data_shape_mismatch(use_cuda,
......@@ -98,9 +101,9 @@ class TestFeedData(unittest.TestCase):
self.assertEqual(
str(shape_mismatch_err.exception),
"The feeded Variable %r should have dimensions = %r, "
"shape = %r, but received feeded shape %r" %
"shape = %r, but received feeded shape %r on each device" %
(u'data', len(in_shape_tuple), in_shape_tuple,
feed_shape_list))
error_shape_list))
with self.assertRaises(ValueError) as dtype_mismatch_err:
self._test_feed_data_dtype_mismatch(use_cuda,
......@@ -111,18 +114,20 @@ class TestFeedData(unittest.TestCase):
"received 'float64'" % (u'label'))
def _test_feed_data_dtype_mismatch(self, use_cuda, use_parallel_executor):
batch_size = self._get_batch_size(use_cuda, use_parallel_executor)
in_size = [batch_size, 3, 4, 5]
feed_batch_size = self._get_feed_batch_size(use_cuda,
use_parallel_executor)
in_size = [self.data_batch_size, 3, 4, 5]
feed_in_data = np.random.uniform(
size=[batch_size, 3, 4, 5]).astype(np.float32)
label_size = [batch_size, 1]
size=[feed_batch_size, 3, 4, 5]).astype(np.float32)
label_size = [self.data_batch_size, 1]
feed_label = np.random.randint(
low=0, high=self.class_num, size=[batch_size, 1]).astype(np.float64)
low=0, high=self.class_num,
size=[feed_batch_size, 1]).astype(np.float64)
self._feed_data_in_executor(in_size, label_size, feed_in_data,
feed_label, use_cuda, use_parallel_executor)
def _test_feed_data_shape_mismatch(self, use_cuda, use_parallel_executor):
batch_size = self._get_batch_size(use_cuda, use_parallel_executor)
batch_size = self._get_feed_batch_size(use_cuda, use_parallel_executor)
in_size = [None, 3, 4, 8]
feed_in_data = np.random.uniform(
size=[batch_size, 3, 4, 5]).astype(np.float32)
......@@ -133,7 +138,7 @@ class TestFeedData(unittest.TestCase):
feed_label, use_cuda, use_parallel_executor)
def _test_feed_data_contains_neg_one(self, use_cuda, use_parallel_executor):
batch_size = self._get_batch_size(use_cuda, use_parallel_executor)
batch_size = self._get_feed_batch_size(use_cuda, use_parallel_executor)
in_size = [-1, 3, 4, 5]
feed_in_data = np.random.uniform(
size=[batch_size, 3, 4, 5]).astype(np.float32)
......@@ -144,15 +149,43 @@ class TestFeedData(unittest.TestCase):
feed_label, use_cuda, use_parallel_executor)
def _test_feed_data_match_shape_type(self, use_cuda, use_parallel_executor):
batch_size = self._get_batch_size(use_cuda, use_parallel_executor)
in_size = [batch_size, 3, 4, 5]
feed_in_data = np.random.uniform(size=in_size).astype(np.float32)
label_size = [batch_size, 1]
feed_batch_size = self._get_feed_batch_size(use_cuda,
use_parallel_executor)
in_size = [self.data_batch_size, 3, 4, 5]
feed_in_data = np.random.uniform(
size=[feed_batch_size, 3, 4, 5]).astype(np.float32)
label_size = [self.data_batch_size, 1]
feed_label = np.random.randint(
low=0, high=self.class_num, size=label_size).astype(np.int64)
low=0, high=self.class_num,
size=[feed_batch_size, 1]).astype(np.int64)
self._feed_data_in_executor(in_size, label_size, feed_in_data,
feed_label, use_cuda, use_parallel_executor)
def _test_feed_lod_tensor(self, use_cuda, use_parallel_executor):
device_count = self._get_device_count(use_cuda)
in_size = [device_count, 3, 4, 5]
sequence_lengths = [range(1, device_count + 1)]
# sum from 1 to device_count
sum_length = int((device_count + 1) * device_count / 2)
feed_in_data = np.random.uniform(
size=[sum_length, 3, 4, 5]).astype(np.float32)
feed_data_tensor = fluid.LoDTensor()
feed_data_tensor.set(feed_in_data, fluid.CPUPlace())
feed_data_tensor.set_recursive_sequence_lengths(sequence_lengths)
label_size = [device_count, 1]
feed_label_tensor = fluid.LoDTensor()
feed_label = np.random.randint(
low=0, high=self.class_num, size=[sum_length, 1]).astype(np.int64)
feed_label_tensor.set(feed_label, fluid.CPUPlace())
feed_label_tensor.set_recursive_sequence_lengths(sequence_lengths)
self._feed_data_in_executor(in_size, label_size, feed_data_tensor,
feed_label_tensor, use_cuda,
use_parallel_executor)
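Worked example of the LoD feed constructed above, assuming `device_count == 2`: the sequence lengths are [[1, 2]], so the tensor's outer dimension is 1 + 2 = 3, and because the tensor carries LoD information the shape check treats its batch dimension as -1 rather than dividing it by the number of places:

```python
import numpy as np
import paddle.fluid as fluid

device_count = 2
sequence_lengths = [list(range(1, device_count + 1))]  # [[1, 2]]
sum_length = (device_count + 1) * device_count // 2    # 1 + 2 == 3

feed_data_tensor = fluid.LoDTensor()
feed_data_tensor.set(
    np.random.uniform(size=[sum_length, 3, 4, 5]).astype(np.float32),
    fluid.CPUPlace())
feed_data_tensor.set_recursive_sequence_lengths(sequence_lengths)
assert feed_data_tensor.recursive_sequence_lengths() == [[1, 2]]
```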
def _feed_data_in_executor(self, in_size, label_size, feed_in_data,
feed_label, use_cuda, use_parallel_executor):
......