Unverified · Commit 2440c980 · authored by kangguangli · committed by GitHub

[Executor] rm Executor._run_parallel (#51616)

* rm Executor._run_parallel

* remove compiledProgram related tests of standaloneExecutor
Parent d850022d
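
Context for the change (not part of the commit itself): with `Executor._run_parallel` removed, a non-inference `CompiledProgram` is always executed through `Executor._run_program` and the standalone executor. Below is a minimal single-device sketch of that surviving path using the public `paddle.static` API; the toy program and tensor values are illustrative assumptions.

```python
import numpy as np
import paddle

paddle.enable_static()

# Build a tiny static program (illustrative only).
main_prog = paddle.static.Program()
startup_prog = paddle.static.Program()
with paddle.static.program_guard(main_prog, startup_prog):
    a = paddle.static.data(name="a", shape=[2, 2], dtype="float32")
    out = paddle.mean(a)

exe = paddle.static.Executor(paddle.CPUPlace())
exe.run(startup_prog)

# After this commit, running a CompiledProgram no longer falls back to the
# ParallelExecutor-based _run_parallel; it goes through the standalone executor.
compiled = paddle.static.CompiledProgram(main_prog)
outs = exe.run(compiled, feed={"a": np.ones([2, 2], dtype="float32")}, fetch_list=[out])
print(outs[0])  # mean of the fed tensor, i.e. 1.0
```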
@@ -162,9 +162,7 @@ class CompiledProgram:
         self._place = None
         self._executor = None
         self._compiled = False
-        self._is_data_parallel = False
         self._is_inference = False
-        self._loss_name = None
         self._share_vars_from = None
         self._places = None
         self._build_strategy = build_strategy
@@ -178,9 +176,6 @@ class CompiledProgram:
         Returns:
             self
         """
-        assert (
-            not self._is_data_parallel
-        ), "Cannot compile with both data parallel and inference"
         assert (
             not self._is_inference
         ), "Already compiled with inference, cannot be recompiled."
@@ -204,11 +199,6 @@ class CompiledProgram:
         if self._share_vars_from:
             if scope:
                 sys.stderr.write("share_vars_from is set, scope is ignored.\n")
-            if not self._share_vars_from._is_data_parallel:
-                raise ValueError(
-                    "The shared Program is not data parallel, cannot "
-                    "share variables from it."
-                )
             if self._share_vars_from._executor is None:
                 raise ValueError(
                     "The shared Program is not compiled and executed, so there is no "
@@ -328,7 +318,7 @@ class CompiledProgram:
         return core.ParallelExecutor(
             places,
             self._persistable_vars,
-            self._loss_name if self._loss_name else '',
+            '',
             self._scope,
             self._local_scopes,
             self._exec_strategy,
@@ -364,10 +354,7 @@ class CompiledProgram:
         if self._is_inference:
             self._executor = self._compile_inference()
         else:
-            if self._is_data_parallel:
-                self._places = self._get_places(self._place, self._places)
-            else:
-                self._places = [self._place]
+            self._places = [self._place]
             if isinstance(self._place, core.CUDAPlace):
                 use_device = DeviceType.CUDA
......
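
A brief, hedged illustration of what the compiler changes above leave behind: `CompiledProgram` no longer tracks `_is_data_parallel` or `_loss_name`, and `_compile()` builds only for the single place supplied by the `Executor`. The toy program and the `BuildStrategy` usage are assumptions for illustration, not part of the diff.

```python
import paddle

paddle.enable_static()

main_prog = paddle.static.Program()
with paddle.static.program_guard(main_prog):
    x = paddle.static.data(name="x", shape=[4, 8], dtype="float32")
    y = paddle.static.nn.fc(x, size=2)

# No loss name and no list of places are passed any more; the single place
# is taken from the Executor when the program is compiled and run.
build_strategy = paddle.static.BuildStrategy()
compiled = paddle.static.CompiledProgram(main_prog, build_strategy=build_strategy)
```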
@@ -1295,81 +1295,6 @@ class Executor:
                 del trainer_instance

             self._default_executor.close()

-    def _run_parallel(
-        self, program, scope, feed, fetch_list, fetch_var_name, return_numpy
-    ):
-        from paddle.optimizer.lr import LRScheduler
-
-        exe = program._executor
-        # TODO(zhenghuihuang): quantization uses Graph in CompiledProgram
-        # instead of program. We will add support for checking Vars in Graph
-        need_check_feed = program._program is not None
-        if need_check_feed:
-            global_block = program._program.global_block()
-        if isinstance(feed, dict):
-            feed_tensor_dict = dict()
-            for feed_name in feed:
-                feed_tensor = feed[feed_name]
-                var = global_block.var(feed_name) if need_check_feed else None
-                if not isinstance(feed_tensor, core.LoDTensor):
-                    # always set to CPU place, since the tensor need to be split
-                    # it is fast in CPU
-                    feed_tensor = _as_lodtensor(
-                        feed[feed_name],
-                        core.CPUPlace(),
-                        var.dtype if var else None,
-                    )
-                if need_check_feed:
-                    check_feed_shape_type(var, feed_tensor, exe.device_count())
-                feed_tensor_dict[feed_name] = feed_tensor
-            exe.feed_and_split_tensor_into_local_scopes(feed_tensor_dict)
-        elif isinstance(feed, list) or isinstance(feed, tuple):
-            res = list()
-            for i, each in enumerate(feed):
-                if not isinstance(each, dict):
-                    raise TypeError(
-                        "Each element of feed list should be a dict"
-                    )
-                res_dict = dict()
-                for feed_name in each:
-                    tensor = each[feed_name]
-                    var = (
-                        global_block.var(feed_name) if need_check_feed else None
-                    )
-                    if not isinstance(tensor, core.LoDTensor):
-                        tensor = _as_lodtensor(
-                            each[feed_name],
-                            program._places[i],
-                            var.dtype if var else None,
-                        )
-                    if need_check_feed:
-                        check_feed_shape_type(var, tensor)
-                    res_dict[feed_name] = tensor
-                res.append(res_dict)
-            exe.feed_tensors_into_local_scopes(res)
-
-        if hasattr(program._program, 'lr_sheduler'):
-            lr_sheduler = program._program.lr_sheduler
-            assert isinstance(lr_sheduler, LRScheduler), "must be LRScheduler"
-            lr_value = lr_sheduler()
-            lr_var = program._program.global_block().vars[lr_sheduler._var_name]
-            lr_tensor = _as_lodtensor(lr_value, core.CPUPlace(), lr_var.dtype)
-            if core.is_cuda_graph_capturing():
-                warnings.warn(
-                    "Caution!!! When capturing CUDA Graph, the learning rate scheduler would not "
-                    "take any effect! Please set the learning rate manually before each batch!"
-                )
-            else:
-                exe.feed_and_split_tensor_into_local_scopes(
-                    {lr_sheduler._var_name: lr_tensor}
-                )
-
-        fetch_var_names = list(map(_to_name_str, fetch_list))
-        tensors = exe.run(fetch_var_names, True)._move_to_list()
-        return as_numpy(tensors) if return_numpy else tensors
-
     def run(
         self,
         program=None,
@@ -1673,23 +1598,7 @@ class Executor:
                    else program._graph
                )

-                # Unsupported case 1: data parallel
-                if (
-                    compiled_program._is_data_parallel
-                    and len(
-                        compiled_program._get_places(
-                            place, compiled_program._places
-                        )
-                    )
-                    != 1
-                ):
-                    warnings.warn(
-                        "Standalone executor is not used for data parallel",
-                        UserWarning,
-                    )
-                    return False
-
-                # Unsupported case 2: inference
+                # Unsupported case 1: inference
                 if compiled_program._is_inference:
                     warnings.warn(
                         "Standalone executor is not used for inference",
@@ -1697,7 +1606,7 @@ class Executor:
                     )
                     return False

-                # Unsupported case 3: async mode
+                # Unsupported case 2: async mode
                 if (
                     compiled_program._build_strategy is not None
                     and compiled_program._build_strategy.async_mode
@@ -1708,7 +1617,7 @@ class Executor:
                     )
                     return False

-                # Unsupported case 4: CUDA Graph
+                # Unsupported case 3: CUDA Graph
                 if (
                     compiled_program._build_strategy is not None
                     and compiled_program._build_strategy.allow_cuda_graph_capture
@@ -1803,24 +1712,6 @@ class Executor:
             # For backward compatibility, run directly.
             if not compiled:
-                # In distributed training, the compiled program is saved in Program._graph
-                has_compiled_graph = isinstance(
-                    program._graph, compiler.CompiledProgram
-                )
-
-                if has_compiled_graph:
-                    program._graph._compile(scope, self.place)
-                    # _graph in program does not support inference since the _graph is optimized
-                    # through optimizer.minimize function and should not be used as inference graph
-                    # assert not program._graph._is_inference
-                    return self._run_parallel(
-                        program._graph,
-                        scope=scope,
-                        feed=feed,
-                        fetch_list=fetch_list,
-                        fetch_var_name=fetch_var_name,
-                        return_numpy=return_numpy,
-                    )
-
                 return self._run_program(
                     program,
@@ -1834,17 +1725,10 @@ class Executor:
             )

         program._compile(scope, self.place)
-        if program._is_inference:
-            return self._run_inference(program._executor, feed)
-        else:
-            return self._run_parallel(
-                program,
-                scope=scope,
-                feed=feed,
-                fetch_list=fetch_list,
-                fetch_var_name=fetch_var_name,
-                return_numpy=return_numpy,
-            )
+        assert (
+            program._is_inference
+        ), f"Program must have _is_inference = True, but get {program._is_inference}"
+        return self._run_inference(program._executor, feed)

     def _run_program(
         self,
......
@@ -57,19 +57,8 @@ class FeedDataReader:
         assert isinstance(exe, fluid.Executor), "exe must be Executor"
         use_cuda = isinstance(exe.place, fluid.CUDAPlace)
         if isinstance(program, fluid.CompiledProgram):
-            if program._is_data_parallel:
-                use_executor = False
-                if program._places is None:
-                    device_num = (
-                        len(fluid.cuda_places())
-                        if use_cuda
-                        else len(fluid.cpu_places())
-                    )
-                else:
-                    device_num = len(program._places)
-            else:
-                use_executor = True
-                device_num = 1
+            use_executor = True
+            device_num = 1
         else:
             use_executor = True
             device_num = 1
......
@@ -283,24 +283,6 @@ class SwitchExecutorInterfaceWithFeed(unittest.TestCase):
         with framework._enable_standalone_executor():
             self._run(feed[0], add_wrong_fetch=True)

-    def test_compiled_program(self):
-        data = np.ones([2, 2], dtype="float32")
-        feed = {"a": data}
-        res = self.run_new_executor(feed, use_compiled=True)
-        gt = self.run_raw_executor(feed, use_compiled=True)
-        for x, y in zip(gt, res):
-            np.testing.assert_array_equal(x, y)
-
-    def test_compiled_program_convert_graph_to_program(self):
-        data = np.ones([2, 2], dtype="float32")
-        feed = {"a": data}
-        res = self.run_new_executor(feed, use_compiled=True)
-        gt = self.run_raw_executor(feed, use_compiled=True)
-        for x, y in zip(gt, res):
-            np.testing.assert_array_equal(x, y)
-
     def test_empty_program(self):
         program = paddle.static.Program()
         exe = paddle.static.Executor(self.place)
......