diff --git a/paddle/fluid/framework/details/reduce_and_gather.h b/paddle/fluid/framework/details/reduce_and_gather.h index c0cd873a1d83fa8c2c7b7cd5acfaad9949bcff7d..e28264eb32756f77ef5baed3dff77ba9f0943160 100644 --- a/paddle/fluid/framework/details/reduce_and_gather.h +++ b/paddle/fluid/framework/details/reduce_and_gather.h @@ -35,14 +35,16 @@ struct ReduceLoDTensor { PADDLE_ENFORCE(!src_tensors_.empty()); auto &t0 = *src_tensors_[0]; PADDLE_ENFORCE_NE(t0.numel(), 0); + dst_tensor_.Resize(t0.dims()); T *dst = dst_tensor_.mutable_data(platform::CPUPlace()); - if (dst != t0.data()) { - std::copy(t0.data(), t0.data() + t0.numel(), dst); - } - for (size_t i = 1; i < src_tensors_.size(); ++i) { + for (size_t i = 0; i < src_tensors_.size(); ++i) { auto &t = *src_tensors_[i]; + if (dst == t.data()) { + continue; + } + PADDLE_ENFORCE_EQ(t.dims(), t0.dims()); PADDLE_ENFORCE_EQ(t.type(), t0.type()); std::transform(t.data(), t.data() + t.numel(), dst, dst, diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_mnist.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_mnist.py index 4580ab85e235e9f73d138058ac6a9aa3beb49a54..d999ca8d3ca0e7c1a0fa0482ff7ce816cd8b5da7 100644 --- a/python/paddle/fluid/tests/unittests/test_parallel_executor_mnist.py +++ b/python/paddle/fluid/tests/unittests/test_parallel_executor_mnist.py @@ -102,6 +102,16 @@ class TestMNIST(TestParallelExecutorBase): fluid.recordio_writer.convert_reader_to_recordio_file( MNIST_RECORDIO_FILE, reader, feeder) + def _init_data(self, random=True): + np.random.seed(5) + if random: + img = np.random.random(size=[32, 784]).astype(np.float32) + else: + img = np.ones(shape=[32, 784], dtype='float32') + label = np.ones(shape=[32, 1], dtype='int64') + return img, label + + # simple_fc def check_simple_fc_convergence(self, use_cuda, use_reduce=False): if use_cuda and not core.is_compiled_with_cuda(): return @@ -109,8 +119,8 @@ class TestMNIST(TestParallelExecutorBase): self.check_network_convergence( simple_fc_net, use_cuda=use_cuda, allow_op_delay=True) - img = np.zeros(shape=[32, 784], dtype='float32') - label = np.ones(shape=[32, 1], dtype='int64') + img, label = self._init_data() + self.check_network_convergence( simple_fc_net, feed_dict={"image": img, @@ -118,6 +128,37 @@ class TestMNIST(TestParallelExecutorBase): use_cuda=use_cuda, use_reduce=use_reduce) + def check_simple_fc_convergence_with_Reduce(self, use_cuda): + if use_cuda and not core.is_compiled_with_cuda(): + return + self.check_network_convergence( + simple_fc_net, use_cuda=use_cuda, use_reduce=True) + self.check_network_convergence( + simple_fc_net, + use_cuda=use_cuda, + allow_op_delay=True, + use_reduce=True) + + img, label = self._init_data() + + all_reduce_first_loss, all_reduce_last_loss = self.check_network_convergence( + simple_fc_net, + feed_dict={"image": img, + "label": label}, + use_cuda=use_cuda, + use_reduce=False) + reduce_first_loss, reduce_last_loss = self.check_network_convergence( + simple_fc_net, + feed_dict={"image": img, + "label": label}, + use_cuda=use_cuda, + use_reduce=True) + + for loss in zip(all_reduce_first_loss, reduce_first_loss): + self.assertAlmostEquals(loss[0], loss[1], delta=1e-6) + for loss in zip(all_reduce_last_loss, reduce_last_loss): + self.assertAlmostEquals(loss[0], loss[1], delta=1e-6) + def test_simple_fc(self): # use_cuda self.check_simple_fc_convergence(True) @@ -125,14 +166,15 @@ class TestMNIST(TestParallelExecutorBase): def test_simple_fc_with_new_strategy(self): # use_cuda, use_reduce - self.check_simple_fc_convergence(True, True) - self.check_simple_fc_convergence(False, True) + self.check_simple_fc_convergence_with_Reduce(True) + self.check_simple_fc_convergence_with_Reduce(False) - def check_simple_fc_parallel_accuracy(self, use_cuda, use_reduce=False): + def check_simple_fc_parallel_accuracy(self, use_cuda): if use_cuda and not core.is_compiled_with_cuda(): return - img = np.zeros(shape=[32, 784], dtype='float32') - label = np.ones(shape=[32, 1], dtype='int64') + + img, label = self._init_data(random=False) + single_first_loss, single_last_loss = self.check_network_convergence( method=simple_fc_net, seed=1000, @@ -146,8 +188,7 @@ class TestMNIST(TestParallelExecutorBase): feed_dict={"image": img, "label": label}, use_cuda=use_cuda, - use_parallel_executor=True, - use_reduce=use_reduce) + use_parallel_executor=True) for p_f in parallel_first_loss: self.assertAlmostEquals(p_f, single_first_loss[0], delta=1e-6) @@ -158,32 +199,53 @@ class TestMNIST(TestParallelExecutorBase): self.check_simple_fc_parallel_accuracy(True) self.check_simple_fc_parallel_accuracy(False) - def test_simple_fc_parallel_accuracy_with_new_strategy(self): - # use_cuda, use_reduce - self.check_simple_fc_parallel_accuracy(True, True) - self.check_simple_fc_parallel_accuracy(False, True) - - def check_batchnorm_fc_convergence(self, use_cuda, use_reduce=False): + def check_batchnorm_fc_convergence(self, use_cuda): if use_cuda and not core.is_compiled_with_cuda(): return + self.check_network_convergence(fc_with_batchnorm, use_cuda=use_cuda) - img = np.zeros(shape=[32, 784], dtype='float32') - label = np.ones(shape=[32, 1], dtype='int64') + + img, label = self._init_data() + + self.check_network_convergence( + fc_with_batchnorm, + feed_dict={"image": img, + "label": label}, + use_cuda=use_cuda) + + def check_batchnorm_fc_convergence_use_reduce(self, use_cuda): + if use_cuda and not core.is_compiled_with_cuda(): + return self.check_network_convergence( + fc_with_batchnorm, use_cuda=use_cuda, use_reduce=True) + + img, label = self._init_data() + + all_reduce_first_loss, all_reduce_last_loss = self.check_network_convergence( fc_with_batchnorm, feed_dict={"image": img, "label": label}, use_cuda=use_cuda, - use_reduce=use_reduce) + use_reduce=False) + reduce_first_loss, reduce_last_loss = self.check_network_convergence( + fc_with_batchnorm, + feed_dict={"image": img, + "label": label}, + use_cuda=use_cuda, + use_reduce=True) + + for loss in zip(all_reduce_first_loss, reduce_first_loss): + self.assertAlmostEquals(loss[0], loss[1], delta=1e-6) + for loss in zip(all_reduce_last_loss, reduce_last_loss): + self.assertAlmostEquals(loss[0], loss[1], delta=1e-4) def test_batchnorm_fc(self): self.check_batchnorm_fc_convergence(True) self.check_batchnorm_fc_convergence(False) def test_batchnorm_fc_with_new_strategy(self): - # use_cuda, use_reduce - self.check_batchnorm_fc_convergence(True, True) - self.check_batchnorm_fc_convergence(False, True) + self.check_batchnorm_fc_convergence_use_reduce(True) + self.check_batchnorm_fc_convergence_use_reduce(False) if __name__ == '__main__':