Unverified commit 19001c6c, authored by kangguangli, committed by GitHub

[with_data_parallel][part10] remove with_data_parallel in parallel_executor.py (#51369)

* remove with_data_parallel

* remove multidevice-optimizer-in-controlflow checks and fix ci
Parent 7ee3eba9
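This commit drops the `with_data_parallel` call path: `ParallelExecutor` now passes `build_strategy` straight to the `CompiledProgram` constructor, and the affected tests run their programs through the plain single-process `Executor`. A minimal before/after sketch of the migration, assuming a static-graph program (`main_program` here is just the default program; this is not the full `ParallelExecutor` wiring):

```python
import paddle
from paddle import static

paddle.enable_static()

main_program = static.default_main_program()
build_strategy = static.BuildStrategy()

# Before this commit, data parallelism was attached after compilation, e.g.:
#   compiled = static.CompiledProgram(main_program).with_data_parallel(
#       loss_name=loss.name, build_strategy=build_strategy)
# After it, build_strategy goes to the constructor and there is no
# multi-device replication step.
compiled = static.CompiledProgram(main_program, build_strategy=build_strategy)
```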
...
@@ -531,16 +531,6 @@ class CompiledProgram:
         else:
             self._places = [self._place]
 
-        # Todo(liym27):If optimizer is used in control flow,
-        # training on multi-places is not supported now, will
-        # be supported later.
-        if len(self._places) > 1 and _has_optimizer_in_control_flow(
-            self._program
-        ):
-            raise NotImplementedError(
-                "If optimizer is used in control flow, "
-                "training on multi-places is not supported now."
-            )
         if isinstance(self._place, core.CUDAPlace):
             use_device = DeviceType.CUDA
         elif isinstance(self._place, core.XPUPlace):
...
...
@@ -189,21 +189,14 @@ class ParallelExecutor:
             else framework.default_main_program()
         )
 
-        self._compiled_program = compiler.CompiledProgram(main_program)
+        self._compiled_program = compiler.CompiledProgram(
+            main_program, build_strategy=build_strategy
+        )
 
         if share_vars_from:
             assert isinstance(
                 share_vars_from, ParallelExecutor
             ), "The share_vars_from should be ParallelExecutor."
 
-        self._compiled_program.with_data_parallel(
-            loss_name=loss_name,
-            build_strategy=build_strategy,
-            exec_strategy=exec_strategy,
-            share_vars_from=share_vars_from._compiled_program
-            if share_vars_from
-            else None,
-        )
-
         self._place = core.CUDAPlace(0) if use_cuda else core.CPUPlace()
         self._exe = executor.Executor(self._place)
...
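With the `with_data_parallel` step gone, the tests below drive each program through a plain `Executor`, passing `main_program` plus `feed`/`fetch_list` on every `run` call. A self-contained sketch of that pattern (toy program; names are illustrative, not taken from the tests):

```python
import numpy as np
import paddle
from paddle import static

paddle.enable_static()

main_program = static.Program()
startup_program = static.Program()
with static.program_guard(main_program, startup_program):
    image = static.data('image', shape=[1, 9], dtype='float32')
    loss = paddle.mean(image)

exe = static.Executor(paddle.CPUPlace())
exe.run(startup_program)  # initialize parameters (none in this toy program)
(loss_value,) = exe.run(
    main_program,
    feed={'image': np.ones([1, 9], dtype=np.float32)},
    fetch_list=[loss],
)
print(loss_value)  # mean of ones -> 1.0
```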
...
@@ -12,7 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import os
 import unittest
 
 import numpy as np
...
@@ -605,7 +604,7 @@ class TestCondNestedControlFlow(unittest.TestCase):
 
 
 class TestCondBackward(unittest.TestCase):
-    def backward_value_helper(self, cond_func, use_cuda, use_parallel_exe):
+    def backward_value_helper(self, cond_func, use_cuda):
         """
         Helper function that compares calculated backward value is close to dy/dx
         """
...
@@ -626,14 +625,6 @@ class TestCondBackward(unittest.TestCase):
         exe.run(startup_program)
 
         num_devices = 1
-        if use_parallel_exe:
-            os.environ['CPU_NUM'] = str(2)
-            exe = fluid.ParallelExecutor(
-                use_cuda=use_cuda,
-                main_program=main_program,
-                loss_name=loss.name,
-            )
-            num_devices = exe.device_count
 
         delta = 0.005
         for feed_i in range(0, 10):
...
@@ -641,65 +632,37 @@ class TestCondBackward(unittest.TestCase):
             feed_label = np.random.randint(
                 low=0, high=10, size=[1, 1], dtype=np.int64
             )
-            if use_parallel_exe:
-                img_grad, loss_value = exe.run(
-                    feed={
-                        'i': np.full((num_devices), feed_i, np.int32),
-                        'image': np.repeat(feed_img, num_devices, axis=0),
-                        'label': np.repeat(feed_label, num_devices, axis=0),
-                    },
-                    fetch_list=[img.grad_name, loss.name],
-                )
-            else:
-                img_grad, loss_value = exe.run(
-                    main_program,
-                    feed={
-                        'i': np.full((1), feed_i, np.int32),
-                        'image': feed_img,
-                        'label': feed_label,
-                    },
-                    fetch_list=[img.grad_name, loss.name],
-                )
+            img_grad, loss_value = exe.run(
+                main_program,
+                feed={
+                    'i': np.full((1), feed_i, np.int32),
+                    'image': feed_img,
+                    'label': feed_label,
+                },
+                fetch_list=[img.grad_name, loss.name],
+            )
 
             numerical_grad = np.zeros(shape=[num_devices, 9], dtype=np.float32)
             feed_img_delta = np.copy(feed_img)
             for j in range(9):
                 feed_img_delta[0][j] = feed_img[0][j] + delta
-                if use_parallel_exe:
-                    loss_delta = exe.run(
-                        feed={
-                            'i': np.full((num_devices), feed_i, np.int32),
-                            'image': np.repeat(
-                                feed_img_delta, num_devices, axis=0
-                            ),
-                            'label': np.repeat(feed_label, num_devices, axis=0),
-                        },
-                        fetch_list=[loss.name],
-                    )
-                    multi_device_grad = (
-                        (loss_delta[0] - loss_value[0]) / delta / num_devices
-                    )
-                    for d in range(num_devices):
-                        numerical_grad[d][j] = multi_device_grad[d]
-                else:
-                    loss_delta = exe.run(
-                        main_program,
-                        feed={
-                            'i': np.full((1), feed_i, np.int32),
-                            'image': feed_img_delta,
-                            'label': feed_label,
-                        },
-                        fetch_list=[loss.name],
-                    )
-                    numerical_grad[0][j] = (
-                        loss_delta[0] - loss_value[0]
-                    ) / delta
+                loss_delta = exe.run(
+                    main_program,
+                    feed={
+                        'i': np.full((1), feed_i, np.int32),
+                        'image': feed_img_delta,
+                        'label': feed_label,
+                    },
+                    fetch_list=[loss.name],
+                )
+                numerical_grad[0][j] = (loss_delta[0] - loss_value[0]) / delta
                 feed_img_delta[0][j] = feed_img[0][j]
             np.testing.assert_allclose(
                 img_grad, numerical_grad, rtol=0.05, atol=0.05
             )
 
-    def add_optimizer_helper(self, cond_func, use_cuda, use_parallel_exe):
+    def add_optimizer_helper(self, cond_func, use_cuda):
         """
         Test that program is runnable when add optimizer
         """
...
@@ -716,39 +679,21 @@ class TestCondBackward(unittest.TestCase):
         place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
         exe = fluid.Executor(place)
         exe.run(startup_program)
-        if use_parallel_exe:
-            os.environ['CPU_NUM'] = str(2)
-            exe = fluid.ParallelExecutor(
-                use_cuda=use_cuda,
-                main_program=main_program,
-                loss_name=loss.name,
-            )
-            num_devices = exe.device_count
 
         for feed_i in range(0, 10):
             feed_img = np.random.random(size=[16, 784]).astype(np.float32)
             feed_label = np.random.randint(
                 low=0, high=10, size=[16, 1], dtype=np.int64
             )
-            if use_parallel_exe:
-                exe.run(
-                    feed={
-                        'i': np.full((num_devices), feed_i, np.int32),
-                        'image': np.repeat(feed_img, num_devices, axis=0),
-                        'label': np.repeat(feed_label, num_devices, axis=0),
-                    },
-                    fetch_list=[loss.name],
-                )
-            else:
-                exe.run(
-                    main_program,
-                    feed={
-                        'i': np.full((1), feed_i, np.int32),
-                        'image': feed_img,
-                        'label': feed_label,
-                    },
-                    fetch_list=[loss],
-                )
+            exe.run(
+                main_program,
+                feed={
+                    'i': np.full((1), feed_i, np.int32),
+                    'image': feed_img,
+                    'label': feed_label,
+                },
+                fetch_list=[loss],
+            )
 
     def test_cond_backward(self):
...
@@ -762,19 +707,8 @@ class TestCondBackward(unittest.TestCase):
             lambda: batchnorm_fc_with_inputs(img, label, class_num=10),
         )
 
-        for use_parallel_exe in [False, True]:
-            if use_parallel_exe and os.name == "nt":
-                print(
-                    "Skip use_parallel_exe=True in Windows because of flaky test when using PE under old Windows machine"
-                )
-                continue
-            self.backward_value_helper(
-                cond_func, core.is_compiled_with_cuda(), use_parallel_exe
-            )
-            self.add_optimizer_helper(
-                cond_func, core.is_compiled_with_cuda(), use_parallel_exe
-            )
+        self.backward_value_helper(cond_func, core.is_compiled_with_cuda())
+        self.add_optimizer_helper(cond_func, core.is_compiled_with_cuda())
 
     def test_half_nested_cond_backward(self):
         paddle.enable_static()
...
@@ -796,33 +730,22 @@ class TestCondBackward(unittest.TestCase):
             i < 5, lambda: paddle.mean(img), lambda: branch(i, img, label)
         )
 
-        for use_parallel_exe in [False, True]:
-            if use_parallel_exe and os.name == "nt":
-                print(
-                    "Skip use_parallel_exe=True in Windows because of flaky test when using PE under old Windows machine"
-                )
-                continue
-
-            self.backward_value_helper(
-                cond_func_simple_net_at_true,
-                core.is_compiled_with_cuda(),
-                use_parallel_exe,
-            )
-            self.add_optimizer_helper(
-                cond_func_simple_net_at_true,
-                core.is_compiled_with_cuda(),
-                use_parallel_exe,
-            )
-            self.backward_value_helper(
-                cond_func_simple_net_at_false,
-                core.is_compiled_with_cuda(),
-                use_parallel_exe,
-            )
-            self.add_optimizer_helper(
-                cond_func_simple_net_at_false,
-                core.is_compiled_with_cuda(),
-                use_parallel_exe,
-            )
+        self.backward_value_helper(
+            cond_func_simple_net_at_true,
+            core.is_compiled_with_cuda(),
+        )
+        self.add_optimizer_helper(
+            cond_func_simple_net_at_true,
+            core.is_compiled_with_cuda(),
+        )
+        self.backward_value_helper(
+            cond_func_simple_net_at_false,
+            core.is_compiled_with_cuda(),
+        )
+        self.add_optimizer_helper(
+            cond_func_simple_net_at_false,
+            core.is_compiled_with_cuda(),
+        )
 
     def test_nested_cond_backward(self):
         paddle.enable_static()
...
@@ -845,18 +768,8 @@ class TestCondBackward(unittest.TestCase):
             lambda: branch(i, img, label, False),
         )
 
-        for use_parallel_exe in [False, True]:
-            if use_parallel_exe and os.name == "nt":
-                print(
-                    "Skip use_parallel_exe=True in Windows because of flaky test when using PE under old Windows machine"
-                )
-                continue
-            self.backward_value_helper(
-                cond_func, core.is_compiled_with_cuda(), use_parallel_exe
-            )
-            self.add_optimizer_helper(
-                cond_func, core.is_compiled_with_cuda(), use_parallel_exe
-            )
+        self.backward_value_helper(cond_func, core.is_compiled_with_cuda())
+        self.add_optimizer_helper(cond_func, core.is_compiled_with_cuda())
 
 
 class TestCondWithError(unittest.TestCase):
...
...
@@ -12,7 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import os
 import unittest
 
 import numpy as np
...
@@ -251,67 +250,5 @@ class TestMultiTask(unittest.TestCase):
         np.testing.assert_allclose(loss_1, loss_2, rtol=1e-05)
 
 
-class TestMultiOptimizersMultiCardsError(unittest.TestCase):
-    def test_error(self):
-        startup_program = Program()
-        main_program = Program()
-        use_cuda = core.is_compiled_with_cuda()
-        with program_guard(main_program, startup_program):
-
-            def fn_1(opt, avg_loss):
-                opt.minimize(avg_loss)
-
-            def fn_2(opt, avg_loss):
-                opt.minimize(avg_loss)
-
-            x = paddle.static.data("X", [-1, 10], 'float32')
-            hidden = paddle.static.nn.fc(x, 5)
-            avg_loss = paddle.mean(hidden)
-
-            adam = optimizer.Adam(learning_rate=LR)
-            sgd = optimizer.SGD(learning_rate=LR)
-
-            cond = layers.fill_constant([1], 'bool', True)
-
-            paddle.static.nn.case(
-                [(cond, lambda: fn_1(adam, avg_loss))],
-                lambda: fn_2(sgd, avg_loss),
-            )
-
-        cpu_place = fluid.CPUPlace()
-        cuda_place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
-
-        for place in [cpu_place, cuda_place]:
-            exe = fluid.Executor(place)
-            exe.run(startup_program)
-
-            np.random.seed(SEED)
-
-            # NOTE(liym27):
-            # This test needs to run in multi cards to test NotImplementedError.
-            # Here, move this test from RUN_TYPE=DIST in tests/unittests/CMakeList.txt,
-            # to use multi cards ** only on CPU ** not GPU to reduce CI time.
-            os.environ['CPU_NUM'] = str(2)
-
-            pe_exe = fluid.ParallelExecutor(
-                use_cuda=use_cuda,
-                main_program=main_program,
-                loss_name=avg_loss.name,
-            )
-            num_devices = pe_exe.device_count
-
-            def not_implemented_error():
-                pe_exe.run(
-                    feed={
-                        'X': np.random.random(size=[64, 10]).astype('float32'),
-                    },
-                    fetch_list=[avg_loss.name],
-                )
-
-            if num_devices > 1:
-                self.assertRaises(NotImplementedError, not_implemented_error)
-
-
 if __name__ == '__main__':
     unittest.main()