diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc
index b6bc41f0b58fd8205331724941eae23b745f1104..7dc13bae4b8faa6e48633b9c74f4b1883789be35 100644
--- a/paddle/fluid/framework/parallel_executor.cc
+++ b/paddle/fluid/framework/parallel_executor.cc
@@ -379,6 +379,8 @@ ir::Graph *ParallelExecutorPrivate::ApplyMemoryOptimizePass(ir::Graph *graph) {
   return graph;
 }
 
+size_t ParallelExecutor::DeviceCount() const { return member_->places_.size(); }
+
 std::vector<Scope *> &ParallelExecutor::GetLocalScopes() {
   return member_->local_scopes_;
 }
diff --git a/paddle/fluid/framework/parallel_executor.h b/paddle/fluid/framework/parallel_executor.h
index 00ac5e134db91836d499cac765d606a19fe0f954..612a58de0778dac4e044e40577c43db3c9e52f6c 100644
--- a/paddle/fluid/framework/parallel_executor.h
+++ b/paddle/fluid/framework/parallel_executor.h
@@ -58,6 +58,8 @@ class ParallelExecutor {
 
   ~ParallelExecutor();
 
+  size_t DeviceCount() const;
+
   std::vector<Scope *> &GetLocalScopes();
 
   void DropLocalExeScopes();
diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc
index a51bff087436988e98b166171bd931e3bdc039a1..77c35466ed9de0412af08bdb80626c72c5b2d4eb 100644
--- a/paddle/fluid/pybind/pybind.cc
+++ b/paddle/fluid/pybind/pybind.cc
@@ -2175,11 +2175,13 @@ All parameter, weight, gradient are variables in Paddle.
            &ParallelExecutor::FeedTensorsIntoLocalScopes)
       .def("feed_and_split_tensor_into_local_scopes",
            &ParallelExecutor::FeedAndSplitTensorIntoLocalScopes)
-      .def("run", [](ParallelExecutor &self,
-                     const std::vector<std::string> &fetch_tensors) {
-        pybind11::gil_scoped_release release;
-        return self.Run(fetch_tensors);
-      });
+      .def("run",
+           [](ParallelExecutor &self,
+              const std::vector<std::string> &fetch_tensors) {
+             pybind11::gil_scoped_release release;
+             return self.Run(fetch_tensors);
+           })
+      .def("device_count", &ParallelExecutor::DeviceCount);
 
   BindFleetWrapper(&m);
   BindBoxHelper(&m);
diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py
index 24ddad1bff0857f81278b86afee76a998b4d5899..f571cc666e5644acb508bbb53ae11c684cf92172 100644
--- a/python/paddle/fluid/executor.py
+++ b/python/paddle/fluid/executor.py
@@ -193,7 +193,7 @@ def dimension_is_compatible_with(first, second):
     return True
 
 
-def check_feed_shape_type(var, feed):
+def check_feed_shape_type(var, feed, num_places=1):
     """
     Returns True if the variable doesn't require feed check or it is
     compatible with the shape and have same dtype as the feeded value.
@@ -207,6 +207,8 @@
     Args:
        var (Variable): the Variable object
        feed (LoDTensor): the feeded value, which must be a LoDTensor
+       num_places: an integer value indicating the number of places.
+           ParallelExecutor will divide data into devices (CPU/GPU) evenly.
    Returns:
        True if the shape and dtype of variable is compatible with the feed value
    Raises:
@@ -214,11 +216,18 @@
            the feed value
    """
    if var.desc.need_check_feed():
-        if not dimension_is_compatible_with(feed.shape(), var.shape):
+        feed_shape = feed.shape()
+        if six.PY2:
+            feed_shape[0] = long(feed_shape[0] /
+                                 num_places) if len(feed.lod()) == 0 else -1
+        else:
+            feed_shape[0] = int(feed_shape[0] /
+                                num_places) if len(feed.lod()) == 0 else -1
+        if not dimension_is_compatible_with(feed_shape, var.shape):
            raise ValueError(
                'The feeded Variable %r should have dimensions = %d, shape = '
-                '%r, but received feeded shape %r' %
-                (var.name, len(var.shape), var.shape, feed.shape()))
+                '%r, but received feeded shape %r on each device' %
+                (var.name, len(var.shape), var.shape, feed_shape))
        if not dtype_is_compatible_with(feed._dtype(), var.dtype):
            var_dtype_format = convert_dtype(var.dtype) if isinstance(
                var.dtype, core.VarDesc.VarType) else var.dtype
@@ -632,7 +641,7 @@
                    feed_tensor.set(feed[feed_name], core.CPUPlace())
                if need_check_feed:
                    var = global_block.var(feed_name)
-                    check_feed_shape_type(var, feed_tensor)
+                    check_feed_shape_type(var, feed_tensor, exe.device_count())
                feed_tensor_dict[feed_name] = feed_tensor
 
            exe.feed_and_split_tensor_into_local_scopes(feed_tensor_dict)
diff --git a/python/paddle/fluid/tests/unittests/test_cond.py b/python/paddle/fluid/tests/unittests/test_cond.py
index 04e72a170a100c24d54bb94fd23b89c4aa890eed..b3632a5f5f3c6df569d872785c177a5bea36b1ac 100644
--- a/python/paddle/fluid/tests/unittests/test_cond.py
+++ b/python/paddle/fluid/tests/unittests/test_cond.py
@@ -15,6 +15,7 @@
 from __future__ import print_function
 
 import numpy as np
+import os
 import unittest
 
 import paddle.fluid as fluid
@@ -22,7 +23,6 @@ import paddle.fluid.core as core
 import paddle.fluid.layers as layers
 import paddle.fluid.framework as framework
 from paddle.fluid.backward import append_backward
-from paddle.fluid.executor import Executor
 from paddle.fluid.framework import Program, program_guard
 from simple_nets import simple_fc_net_with_inputs, batchnorm_fc_with_inputs
 
@@ -329,7 +329,7 @@ class TestCondNestedControlFlow(unittest.TestCase):
 
 
 class TestCondBackward(unittest.TestCase):
-    def backward_value_helper(self, cond_func):
+    def backward_value_helper(self, cond_func, use_cuda, use_parallel_exe):
        """
        Helper function that compares calculated backward value is close to dy/dx
        """
@@ -344,43 +344,78 @@
            i = fluid.data(name="i", shape=[1], dtype='int32')
            loss = cond_func(i, img, label)
            append_backward(loss)
-        place = fluid.CUDAPlace(0) if core.is_compiled_with_cuda(
-        ) else fluid.CPUPlace()
+        place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
        exe = fluid.Executor(place)
        exe.run(startup_program)
 
+        num_devices = 1
+        if use_parallel_exe:
+            os.environ['CPU_NUM'] = str(2)
+            exe = fluid.ParallelExecutor(
+                use_cuda=use_cuda,
+                main_program=main_program,
+                loss_name=loss.name)
+            num_devices = exe.device_count
+
        delta = 0.005
        for feed_i in range(0, 10):
            feed_img = np.random.random(size=[1, 9]).astype(np.float32)
            feed_label = np.random.randint(
                low=0, high=10, size=[1, 1], dtype=np.int64)
-            img_grad, loss_value = exe.run(
-                main_program,
-                feed={
-                    'i': np.full((1), feed_i, np.int32),
-                    'image': feed_img,
-                    'label': feed_label
-                },
-                fetch_list=[img.grad_name, loss.name])
+            if use_parallel_exe:
+                img_grad, loss_value = exe.run(
+                    feed={
+                        'i': np.full((num_devices), feed_i, np.int32),
+                        'image': np.repeat(
+                            feed_img, num_devices, axis=0),
+                        'label': np.repeat(
+                            feed_label, num_devices, axis=0)
+                    },
+                    fetch_list=[img.grad_name, loss.name])
+            else:
+                img_grad, loss_value = exe.run(
+                    main_program,
+                    feed={
+                        'i': np.full((1), feed_i, np.int32),
+                        'image': feed_img,
+                        'label': feed_label
+                    },
+                    fetch_list=[img.grad_name, loss.name])
 
-            numerical_grad = np.zeros(shape=[1, 9], dtype=np.float32)
+            numerical_grad = np.zeros(shape=[num_devices, 9], dtype=np.float32)
            feed_img_delta = np.copy(feed_img)
            for j in range(9):
                feed_img_delta[0][j] = feed_img[0][j] + delta
-                loss_delta = exe.run(main_program,
-                                     feed={
-                                         'i': np.full((1), feed_i, np.int32),
-                                         'image': feed_img_delta,
-                                         'label': feed_label
-                                     },
-                                     fetch_list=[loss.name])
-                numerical_grad[0][j] = (loss_delta[0] - loss_value[0]) / delta
+                if use_parallel_exe:
+                    loss_delta = exe.run(feed={
+                        'i': np.full((num_devices), feed_i, np.int32),
+                        'image': np.repeat(
+                            feed_img_delta, num_devices, axis=0),
+                        'label': np.repeat(
+                            feed_label, num_devices, axis=0)
+                    },
+                                         fetch_list=[loss.name])
+                    multi_device_grad = (
+                        loss_delta[0] - loss_value[0]) / delta / num_devices
+                    for d in range(num_devices):
+                        numerical_grad[d][j] = multi_device_grad[d]
+                else:
+                    loss_delta = exe.run(main_program,
+                                         feed={
+                                             'i': np.full((1), feed_i,
+                                                          np.int32),
+                                             'image': feed_img_delta,
+                                             'label': feed_label
+                                         },
+                                         fetch_list=[loss.name])
+                    numerical_grad[0][j] = (
+                        loss_delta[0] - loss_value[0]) / delta
                feed_img_delta[0][j] = feed_img[0][j]
            self.assertTrue(
                np.isclose(
                    img_grad, numerical_grad, atol=0.05, rtol=0.05).all())
 
-    def add_optimizer_helper(self, cond_func):
+    def add_optimizer_helper(self, cond_func, use_cuda, use_parallel_exe):
        """
        Test that program is runnable when add optimizer
        """
@@ -394,22 +429,38 @@
            optimizer = fluid.optimizer.SGD(learning_rate=0.1)
            optimizer.minimize(loss)
 
-        place = fluid.CUDAPlace(0) if core.is_compiled_with_cuda(
-        ) else fluid.CPUPlace()
+        place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
        exe = fluid.Executor(place)
        exe.run(startup_program)
+        if use_parallel_exe:
+            os.environ['CPU_NUM'] = str(2)
+            exe = fluid.ParallelExecutor(
+                use_cuda=use_cuda,
+                main_program=main_program,
+                loss_name=loss.name)
+            num_devices = exe.device_count
 
        for feed_i in range(0, 10):
            feed_img = np.random.random(size=[16, 784]).astype(np.float32)
            feed_label = np.random.randint(
                low=0, high=10, size=[16, 1], dtype=np.int64)
-            exe.run(main_program,
-                    feed={
-                        'i': np.full((1), feed_i, np.int32),
-                        'image': feed_img,
-                        'label': feed_label
-                    },
-                    fetch_list=[loss])
+            if use_parallel_exe:
+                exe.run(feed={
+                    'i': np.full((num_devices), feed_i, np.int32),
+                    'image': np.repeat(
+                        feed_img, num_devices, axis=0),
+                    'label': np.repeat(
+                        feed_label, num_devices, axis=0)
+                },
+                        fetch_list=[loss.name])
+            else:
+                exe.run(main_program,
+                        feed={
+                            'i': np.full((1), feed_i, np.int32),
+                            'image': feed_img,
+                            'label': feed_label
+                        },
+                        fetch_list=[loss])
 
    def test_cond_backward(self):
        def cond_func(i, img, label):
@@ -418,8 +469,13 @@
                lambda: simple_fc_net_with_inputs(img, label, class_num=10),
                lambda: batchnorm_fc_with_inputs(img, label, class_num=10))
 
-        self.backward_value_helper(cond_func)
-        self.add_optimizer_helper(cond_func)
+        for use_parallel_exe in [False, True]:
+            self.backward_value_helper(cond_func,
+                                       core.is_compiled_with_cuda(),
+                                       use_parallel_exe)
+            self.add_optimizer_helper(cond_func,
+                                      core.is_compiled_with_cuda(),
+                                      use_parallel_exe)
 
    def test_half_nested_cond_backward(self):
        def branch(i, img, label):
@@ -434,10 +490,19 @@
            return layers.cond(i < 5, lambda: layers.mean(img),
                               lambda: branch(i, img, label))
 
-        self.backward_value_helper(cond_func_simple_net_at_true)
-        self.add_optimizer_helper(cond_func_simple_net_at_true)
-        self.backward_value_helper(cond_func_simple_net_at_false)
-        self.add_optimizer_helper(cond_func_simple_net_at_false)
+        for use_parallel_exe in [False, True]:
+            self.backward_value_helper(cond_func_simple_net_at_true,
+                                       core.is_compiled_with_cuda(),
+                                       use_parallel_exe)
+            self.add_optimizer_helper(cond_func_simple_net_at_true,
+                                      core.is_compiled_with_cuda(),
+                                      use_parallel_exe)
+            self.backward_value_helper(cond_func_simple_net_at_false,
+                                       core.is_compiled_with_cuda(),
+                                       use_parallel_exe)
+            self.add_optimizer_helper(cond_func_simple_net_at_false,
+                                      core.is_compiled_with_cuda(),
+                                      use_parallel_exe)
 
    def test_nested_cond_backward(self):
        def branch(i, img, label, mod_two):
@@ -453,8 +518,13 @@
            return layers.cond(i < 5, lambda: branch(i, img, label, True),
                               lambda: branch(i, img, label, False))
 
-        self.backward_value_helper(cond_func)
-        self.add_optimizer_helper(cond_func)
+        for use_parallel_exe in [False, True]:
+            self.backward_value_helper(cond_func,
+                                       core.is_compiled_with_cuda(),
+                                       use_parallel_exe)
+            self.add_optimizer_helper(cond_func,
+                                      core.is_compiled_with_cuda(),
+                                      use_parallel_exe)
 
 
 if __name__ == '__main__':
diff --git a/python/paddle/fluid/tests/unittests/test_feed_data_check_shape_type.py b/python/paddle/fluid/tests/unittests/test_feed_data_check_shape_type.py
index 6cb483992927363acb990cf2f72d625d6c2d3b5f..afd5b52060a2dcd89188cab79156323c000837e9 100644
--- a/python/paddle/fluid/tests/unittests/test_feed_data_check_shape_type.py
+++ b/python/paddle/fluid/tests/unittests/test_feed_data_check_shape_type.py
@@ -36,17 +36,21 @@ class TestFeedData(unittest.TestCase):
 
    def setUp(self):
        self.hidden_sizes = [25, 20, 15]
-        self.base_batch_size = 10
+        self.data_batch_size = 10
        self.class_num = 10
        self.iterations = 5
 
-    def _get_batch_size(self, use_cuda, use_parallel_executor):
-        batch_size_times = 1
-        if use_parallel_executor:
-            batch_size_times = core.get_cuda_device_count(
-            ) if use_cuda else int(
-                os.environ.get('CPU_NUM', multiprocessing.cpu_count()))
-        return self.base_batch_size * batch_size_times
+    def _get_device_count(self, use_cuda):
+        return core.get_cuda_device_count() if use_cuda else int(
+            os.environ.get('CPU_NUM', multiprocessing.cpu_count()))
+
+    def _get_feed_batch_size(self, use_cuda, use_parallel_executor):
+        """
+        Returns the actual feed data size. We should multiply it by the
+        number of devices when using ParallelExecutor.
+        """
+        return self.data_batch_size * self._get_device_count(
+            use_cuda) if use_parallel_executor else self.data_batch_size
 
    def _simple_fc_net(self, in_size, label_size, class_num, hidden_sizes):
        in_data = fluid.data(name="data", dtype='float32', shape=in_size)
@@ -79,18 +83,17 @@
                                                  use_parallel_executor)
            self._test_feed_data_contains_neg_one(use_cuda,
                                                  use_parallel_executor)
+            self._test_feed_lod_tensor(use_cuda, use_parallel_executor)
 
            # Test exception message when feeding with error
-            batch_size = self._get_batch_size(use_cuda,
-                                              use_parallel_executor)
            if six.PY2:
                in_shape_tuple = (long(-1), long(3), long(4), long(8))
-                feed_shape_list = [
-                    long(batch_size), long(3), long(4), long(5)
+                error_shape_list = [
+                    long(self.data_batch_size), long(3), long(4), long(5)
                ]
            else:
                in_shape_tuple = (-1, 3, 4, 8)
-                feed_shape_list = [batch_size, 3, 4, 5]
+                error_shape_list = [self.data_batch_size, 3, 4, 5]
 
            with self.assertRaises(ValueError) as shape_mismatch_err:
                self._test_feed_data_shape_mismatch(use_cuda,
@@ -98,9 +101,9 @@
                                                    use_parallel_executor)
            self.assertEqual(
                str(shape_mismatch_err.exception),
                "The feeded Variable %r should have dimensions = %r, "
-                "shape = %r, but received feeded shape %r" %
+                "shape = %r, but received feeded shape %r on each device" %
                (u'data', len(in_shape_tuple), in_shape_tuple,
-                 feed_shape_list))
+                 error_shape_list))
            with self.assertRaises(ValueError) as dtype_mismatch_err:
                self._test_feed_data_dtype_mismatch(use_cuda,
@@ -111,18 +114,20 @@
                "received 'float64'" % (u'label'))
 
    def _test_feed_data_dtype_mismatch(self, use_cuda, use_parallel_executor):
-        batch_size = self._get_batch_size(use_cuda, use_parallel_executor)
-        in_size = [batch_size, 3, 4, 5]
+        feed_batch_size = self._get_feed_batch_size(use_cuda,
+                                                    use_parallel_executor)
+        in_size = [self.data_batch_size, 3, 4, 5]
        feed_in_data = np.random.uniform(
-            size=[batch_size, 3, 4, 5]).astype(np.float32)
-        label_size = [batch_size, 1]
+            size=[feed_batch_size, 3, 4, 5]).astype(np.float32)
+        label_size = [self.data_batch_size, 1]
        feed_label = np.random.randint(
-            low=0, high=self.class_num, size=[batch_size, 1]).astype(np.float64)
+            low=0, high=self.class_num,
+            size=[feed_batch_size, 1]).astype(np.float64)
        self._feed_data_in_executor(in_size, label_size, feed_in_data,
                                    feed_label, use_cuda, use_parallel_executor)
 
    def _test_feed_data_shape_mismatch(self, use_cuda, use_parallel_executor):
-        batch_size = self._get_batch_size(use_cuda, use_parallel_executor)
+        batch_size = self._get_feed_batch_size(use_cuda, use_parallel_executor)
        in_size = [None, 3, 4, 8]
        feed_in_data = np.random.uniform(
            size=[batch_size, 3, 4, 5]).astype(np.float32)
@@ -133,7 +138,7 @@
                                    feed_label, use_cuda, use_parallel_executor)
 
    def _test_feed_data_contains_neg_one(self, use_cuda, use_parallel_executor):
-        batch_size = self._get_batch_size(use_cuda, use_parallel_executor)
+        batch_size = self._get_feed_batch_size(use_cuda, use_parallel_executor)
        in_size = [-1, 3, 4, 5]
        feed_in_data = np.random.uniform(
            size=[batch_size, 3, 4, 5]).astype(np.float32)
@@ -144,15 +149,43 @@
                                    feed_label, use_cuda, use_parallel_executor)
 
    def _test_feed_data_match_shape_type(self, use_cuda, use_parallel_executor):
-        batch_size = self._get_batch_size(use_cuda, use_parallel_executor)
-        in_size = [batch_size, 3, 4, 5]
-        feed_in_data = np.random.uniform(size=in_size).astype(np.float32)
-        label_size = [batch_size, 1]
+        feed_batch_size = self._get_feed_batch_size(use_cuda,
+                                                    use_parallel_executor)
+        in_size = [self.data_batch_size, 3, 4, 5]
+        feed_in_data = np.random.uniform(
+            size=[feed_batch_size, 3, 4, 5]).astype(np.float32)
+        label_size = [self.data_batch_size, 1]
        feed_label = np.random.randint(
-            low=0, high=self.class_num, size=label_size).astype(np.int64)
+            low=0, high=self.class_num,
+            size=[feed_batch_size, 1]).astype(np.int64)
        self._feed_data_in_executor(in_size, label_size, feed_in_data,
                                    feed_label, use_cuda, use_parallel_executor)
 
+    def _test_feed_lod_tensor(self, use_cuda, use_parallel_executor):
+        device_count = self._get_device_count(use_cuda)
+
+        in_size = [device_count, 3, 4, 5]
+        sequence_lengths = [range(1, device_count + 1)]
+        # sum from 1 to device_count
+        sum_length = int((device_count + 1) * device_count / 2)
+
+        feed_in_data = np.random.uniform(
+            size=[sum_length, 3, 4, 5]).astype(np.float32)
+        feed_data_tensor = fluid.LoDTensor()
+        feed_data_tensor.set(feed_in_data, fluid.CPUPlace())
+        feed_data_tensor.set_recursive_sequence_lengths(sequence_lengths)
+
+        label_size = [device_count, 1]
+        feed_label_tensor = fluid.LoDTensor()
+        feed_label = np.random.randint(
+            low=0, high=self.class_num, size=[sum_length, 1]).astype(np.int64)
+        feed_label_tensor.set(feed_label, fluid.CPUPlace())
+        feed_label_tensor.set_recursive_sequence_lengths(sequence_lengths)
+
+        self._feed_data_in_executor(in_size, label_size, feed_data_tensor,
+                                    feed_label_tensor, use_cuda,
+                                    use_parallel_executor)
+
    def _feed_data_in_executor(self, in_size, label_size, feed_in_data,
                               feed_label, use_cuda, use_parallel_executor):
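The sketch below is not part of the patch. It is a minimal usage example assembled from the tests above, showing how the new device_count binding and the per-device feed-shape check fit together. It assumes the Python fluid.ParallelExecutor wrapper exposes the binding as the device_count attribute (as test_cond.py uses it); the program and variable names (main_program, feed_img, etc.) are local to the example, not part of the patch.

# Usage sketch (not part of the patch): feed a ParallelExecutor with a batch
# of num_devices * per_device_batch rows, mirroring test_cond.py.
import os

import numpy as np
import paddle.fluid as fluid
from paddle.fluid.backward import append_backward

os.environ['CPU_NUM'] = str(2)  # run on 2 CPU places, as the tests do

main_program = fluid.Program()
startup_program = fluid.Program()
with fluid.program_guard(main_program, startup_program):
    image = fluid.data(name='image', shape=[-1, 9], dtype='float32')
    hidden = fluid.layers.fc(input=image, size=10)
    loss = fluid.layers.mean(hidden)
    append_backward(loss)

fluid.Executor(fluid.CPUPlace()).run(startup_program)

exe = fluid.ParallelExecutor(
    use_cuda=False, main_program=main_program, loss_name=loss.name)
num_devices = exe.device_count  # backed by the new C++ DeviceCount()

# check_feed_shape_type() now divides feed_shape[0] by the number of places
# (for feeds without LoD), so the feed must carry num_devices rows for a
# per-device batch of 1; the error message reports the shape "on each device".
feed_img = np.random.random(size=[1, 9]).astype(np.float32)
loss_value = exe.run(
    feed={'image': np.repeat(feed_img, num_devices, axis=0)},
    fetch_list=[loss.name])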