未验证 提交 33965527 编写于 作者: C chengduo 提交者: GitHub

Add unit test for fuse all reduce (#16354)

* refine fused_all_reduce_op

* add unit test in test_parallel_executor_seresnext
test=develop
上级 18a0f6d9
...@@ -112,19 +112,20 @@ void FusedAllReduceOpHandle::RunImpl() { ...@@ -112,19 +112,20 @@ void FusedAllReduceOpHandle::RunImpl() {
}); });
for (size_t k = 1; k < g_tensor.size(); ++k) { for (size_t k = 1; k < g_tensor.size(); ++k) {
const void *pre_address = g_tensor.at(k - 1).second->data<void>(); const void *cur_address = g_tensor.at(k - 1).second->data<void>();
int64_t len = g_tensor.at(k - 1).second->numel(); int64_t len = g_tensor.at(k - 1).second->numel();
auto offset = len * framework::SizeOfType(dtype); auto offset = len * framework::SizeOfType(dtype);
void *next_address = reinterpret_cast<void *>( void *infer_next_address = reinterpret_cast<void *>(
reinterpret_cast<uintptr_t>(pre_address) + offset); reinterpret_cast<uintptr_t>(cur_address) + offset);
const void *cur_address = g_tensor.at(k).second->data<void>(); const void *next_address = g_tensor.at(k).second->data<void>();
VLOG(10) << k << ", "
<< " pre_address(" << g_tensor.at(k - 1).first VLOG(10) << string::Sprintf(
<< "): " << pre_address << ", cur_address(" "Input[%d](%s) address: 0X%02x, Input[%d](%s) address: 0X%02x, Infer "
<< g_tensor.at(k).first << "): " << cur_address "input[%d] address: 0X%02x. The offset: %d",
<< ", offset:" << offset << ", " << next_address << ", " k - 1, g_tensor.at(k - 1).first, cur_address, g_tensor.at(k).first, k,
<< cur_address; next_address, k, infer_next_address, offset);
PADDLE_ENFORCE_EQ(next_address, cur_address); PADDLE_ENFORCE_EQ(infer_next_address, next_address,
"The address is not consistent.");
} }
} }
......
...@@ -13,6 +13,9 @@ ...@@ -13,6 +13,9 @@
# limitations under the License. # limitations under the License.
from __future__ import print_function from __future__ import print_function
import os
os.environ['FLAGS_fuse_parameter_memory_size'] = "131072"
os.environ['FLAGS_fuse_parameter_groups_size'] = "3"
import paddle.fluid as fluid import paddle.fluid as fluid
import paddle.fluid.layers.ops as ops import paddle.fluid.layers.ops as ops
...@@ -22,7 +25,6 @@ import paddle.fluid.core as core ...@@ -22,7 +25,6 @@ import paddle.fluid.core as core
from parallel_executor_test_base import TestParallelExecutorBase from parallel_executor_test_base import TestParallelExecutorBase
import unittest import unittest
import math import math
import os
import numpy as np import numpy as np
# FIXME(zcd): If the neural net has dropout_op, the output of ParallelExecutor # FIXME(zcd): If the neural net has dropout_op, the output of ParallelExecutor
...@@ -312,17 +314,59 @@ class TestResnet(TestParallelExecutorBase): ...@@ -312,17 +314,59 @@ class TestResnet(TestParallelExecutorBase):
self.assertAlmostEquals( self.assertAlmostEquals(
np.mean(parallel_last_loss), single_last_loss[0], delta=delta2) np.mean(parallel_last_loss), single_last_loss[0], delta=delta2)
def _compare_with_fused_all_reduce(self,
model,
use_cuda,
iter=20,
delta2=1e-5):
if use_cuda and not core.is_compiled_with_cuda():
return
global remove_bn
remove_bn = True
img, label = self._init_data(batch_size=batch_size)
all_reduce_first_loss, all_reduce_last_loss = self.check_network_convergence(
model,
feed_dict={"image": img,
"label": label},
iter=iter,
batch_size=batch_size,
use_cuda=use_cuda,
fuse_all_reduce_ops=False,
optimizer=optimizer)
reduce_first_loss, reduce_last_loss = self.check_network_convergence(
model,
feed_dict={"image": img,
"label": label},
iter=iter,
batch_size=batch_size,
use_cuda=use_cuda,
fuse_all_reduce_ops=True,
optimizer=optimizer)
for loss in zip(all_reduce_first_loss, reduce_first_loss):
self.assertAlmostEquals(loss[0], loss[1], delta=1e-5)
for loss in zip(all_reduce_last_loss, reduce_last_loss):
self.assertAlmostEquals(loss[0], loss[1], delta=delta2)
def test_seresnext_with_learning_rate_decay(self): def test_seresnext_with_learning_rate_decay(self):
self._check_resnet_convergence(model=SE_ResNeXt50Small, use_cuda=True) self._check_resnet_convergence(model=SE_ResNeXt50Small, use_cuda=True)
self._check_resnet_convergence( self._check_resnet_convergence(
model=SE_ResNeXt50Small, use_cuda=False, iter=2, delta2=1e-3) model=SE_ResNeXt50Small, use_cuda=False, iter=2, delta2=1e-3)
def test_seresnext_with_new_strategy(self): def test_seresnext_with_reduce(self):
self._compare_reduce_and_allreduce( self._compare_reduce_and_allreduce(
model=SE_ResNeXt50Small, use_cuda=True, delta2=1e-2) model=SE_ResNeXt50Small, use_cuda=True, delta2=1e-2)
self._compare_reduce_and_allreduce( self._compare_reduce_and_allreduce(
model=SE_ResNeXt50Small, use_cuda=False, iter=5) model=SE_ResNeXt50Small, use_cuda=False, iter=5)
def test_seresnext_with_fused_all_reduce(self):
self._compare_with_fused_all_reduce(
model=SE_ResNeXt50Small, use_cuda=True, delta2=1e-3)
self._compare_with_fused_all_reduce(
model=SE_ResNeXt50Small, use_cuda=False, iter=2, delta2=1e-3)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册