From 2440c980888bad44e6d1e5301c24a5683a104458 Mon Sep 17 00:00:00 2001
From: kangguangli
Date: Thu, 16 Mar 2023 14:59:23 +0800
Subject: [PATCH] [Executor] rm Executor._run_parallel (#51616)

* rm Executor._run_parallel

* remove compiledProgram related tests of standaloneExecutor
---
 python/paddle/fluid/compiler.py                |  17 +--
 python/paddle/fluid/executor.py                | 130 +-----------------
 .../fluid/tests/unittests/feed_data_reader.py  |  15 +-
 .../test_standalone_executor.py                |  18 ---
 4 files changed, 11 insertions(+), 169 deletions(-)

diff --git a/python/paddle/fluid/compiler.py b/python/paddle/fluid/compiler.py
index 977fcbdf15..4843b4e569 100644
--- a/python/paddle/fluid/compiler.py
+++ b/python/paddle/fluid/compiler.py
@@ -162,9 +162,7 @@ class CompiledProgram:
         self._place = None
         self._executor = None
         self._compiled = False
-        self._is_data_parallel = False
         self._is_inference = False
-        self._loss_name = None
         self._share_vars_from = None
         self._places = None
         self._build_strategy = build_strategy
@@ -178,9 +176,6 @@ class CompiledProgram:
         Returns:
             self
         """
-        assert (
-            not self._is_data_parallel
-        ), "Cannot compile with both data parallel and inference"
         assert (
             not self._is_inference
         ), "Already compiled with inference, cannot be recompiled."
@@ -204,11 +199,6 @@ class CompiledProgram:
         if self._share_vars_from:
             if scope:
                 sys.stderr.write("share_vars_from is set, scope is ignored.\n")
-            if not self._share_vars_from._is_data_parallel:
-                raise ValueError(
-                    "The shared Program is not data parallel, cannot "
-                    "share variables from it."
-                )
             if self._share_vars_from._executor is None:
                 raise ValueError(
                     "The shared Program is not compiled and executed, so there is no "
@@ -328,7 +318,7 @@ class CompiledProgram:
         return core.ParallelExecutor(
             places,
             self._persistable_vars,
-            self._loss_name if self._loss_name else '',
+            '',
             self._scope,
             self._local_scopes,
             self._exec_strategy,
@@ -364,10 +354,7 @@ class CompiledProgram:
         if self._is_inference:
             self._executor = self._compile_inference()
         else:
-            if self._is_data_parallel:
-                self._places = self._get_places(self._place, self._places)
-            else:
-                self._places = [self._place]
+            self._places = [self._place]
 
             if isinstance(self._place, core.CUDAPlace):
                 use_device = DeviceType.CUDA
diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py
index 49d073d988..2c5713e376 100755
--- a/python/paddle/fluid/executor.py
+++ b/python/paddle/fluid/executor.py
@@ -1295,81 +1295,6 @@ class Executor:
                 del trainer_instance
 
         self._default_executor.close()
 
-    def _run_parallel(
-        self, program, scope, feed, fetch_list, fetch_var_name, return_numpy
-    ):
-        from paddle.optimizer.lr import LRScheduler
-
-        exe = program._executor
-        # TODO(zhenghuihuang): quantization uses Graph in CompiledProgram
-        # instead of program. We will add support for checking Vars in Graph
-        need_check_feed = program._program is not None
-        if need_check_feed:
-            global_block = program._program.global_block()
-        if isinstance(feed, dict):
-            feed_tensor_dict = dict()
-            for feed_name in feed:
-                feed_tensor = feed[feed_name]
-                var = global_block.var(feed_name) if need_check_feed else None
-                if not isinstance(feed_tensor, core.LoDTensor):
-                    # always set to CPU place, since the tensor need to be split
-                    # it is fast in CPU
-                    feed_tensor = _as_lodtensor(
-                        feed[feed_name],
-                        core.CPUPlace(),
-                        var.dtype if var else None,
-                    )
-                if need_check_feed:
-                    check_feed_shape_type(var, feed_tensor, exe.device_count())
-                feed_tensor_dict[feed_name] = feed_tensor
-
-            exe.feed_and_split_tensor_into_local_scopes(feed_tensor_dict)
-
-        elif isinstance(feed, list) or isinstance(feed, tuple):
-            res = list()
-            for i, each in enumerate(feed):
-                if not isinstance(each, dict):
-                    raise TypeError(
-                        "Each element of feed list should be a dict"
-                    )
-                res_dict = dict()
-                for feed_name in each:
-                    tensor = each[feed_name]
-                    var = (
-                        global_block.var(feed_name) if need_check_feed else None
-                    )
-                    if not isinstance(tensor, core.LoDTensor):
-                        tensor = _as_lodtensor(
-                            each[feed_name],
-                            program._places[i],
-                            var.dtype if var else None,
-                        )
-                    if need_check_feed:
-                        check_feed_shape_type(var, tensor)
-                    res_dict[feed_name] = tensor
-                res.append(res_dict)
-
-            exe.feed_tensors_into_local_scopes(res)
-
-        if hasattr(program._program, 'lr_sheduler'):
-            lr_sheduler = program._program.lr_sheduler
-            assert isinstance(lr_sheduler, LRScheduler), "must be LRScheduler"
-            lr_value = lr_sheduler()
-            lr_var = program._program.global_block().vars[lr_sheduler._var_name]
-            lr_tensor = _as_lodtensor(lr_value, core.CPUPlace(), lr_var.dtype)
-            if core.is_cuda_graph_capturing():
-                warnings.warn(
-                    "Caution!!! When capturing CUDA Graph, the learning rate scheduler would not "
-                    "take any effect! Please set the learning rate manually before each batch!"
-                )
-            else:
-                exe.feed_and_split_tensor_into_local_scopes(
-                    {lr_sheduler._var_name: lr_tensor}
-                )
-
-        fetch_var_names = list(map(_to_name_str, fetch_list))
-        tensors = exe.run(fetch_var_names, True)._move_to_list()
-        return as_numpy(tensors) if return_numpy else tensors
 
     def run(
         self,
         program=None,
@@ -1673,23 +1598,7 @@ class Executor:
                 else program._graph
             )
 
-            # Unsupported case 1: data parallel
-            if (
-                compiled_program._is_data_parallel
-                and len(
-                    compiled_program._get_places(
-                        place, compiled_program._places
-                    )
-                )
-                != 1
-            ):
-                warnings.warn(
-                    "Standalone executor is not used for data parallel",
-                    UserWarning,
-                )
-                return False
-
-            # Unsupported case 2: inference
+            # Unsupported case 1: inference
             if compiled_program._is_inference:
                 warnings.warn(
                     "Standalone executor is not used for inference",
@@ -1697,7 +1606,7 @@ class Executor:
                 )
                 return False
 
-            # Unsupported case 3: async mode
+            # Unsupported case 2: async mode
             if (
                 compiled_program._build_strategy is not None
                 and compiled_program._build_strategy.async_mode
@@ -1708,7 +1617,7 @@ class Executor:
                 )
                 return False
 
-            # Unsupported case 4: CUDA Graph
+            # Unsupported case 3: CUDA Graph
             if (
                 compiled_program._build_strategy is not None
                 and compiled_program._build_strategy.allow_cuda_graph_capture
@@ -1803,24 +1712,6 @@ class Executor:
 
         # For backward compatibility, run directly.
         if not compiled:
-            # In distributed training, the compiled program is saved in Program._graph
-            has_compiled_graph = isinstance(
-                program._graph, compiler.CompiledProgram
-            )
-
-            if has_compiled_graph:
-                program._graph._compile(scope, self.place)
-                # _graph in program does not support inference since the _graph is optimized
-                # through optimizer.minimize function and should not be used as inference graph
-                # assert not program._graph._is_inference
-                return self._run_parallel(
-                    program._graph,
-                    scope=scope,
-                    feed=feed,
-                    fetch_list=fetch_list,
-                    fetch_var_name=fetch_var_name,
-                    return_numpy=return_numpy,
-                )
             return self._run_program(
                 program,
                 feed,
@@ -1834,17 +1725,10 @@ class Executor:
                 fetch_list,
                 feed_var_name,
                 fetch_var_name,
                 scope,
                 return_numpy,
                 use_program_cache,
             )
 
         program._compile(scope, self.place)
-        if program._is_inference:
-            return self._run_inference(program._executor, feed)
-        else:
-            return self._run_parallel(
-                program,
-                scope=scope,
-                feed=feed,
-                fetch_list=fetch_list,
-                fetch_var_name=fetch_var_name,
-                return_numpy=return_numpy,
-            )
+        assert (
+            program._is_inference
+        ), f"Program must have _is_inference = True, but get {program._is_inference}"
+        return self._run_inference(program._executor, feed)
 
     def _run_program(
         self,
diff --git a/python/paddle/fluid/tests/unittests/feed_data_reader.py b/python/paddle/fluid/tests/unittests/feed_data_reader.py
index ef2e18a429..355d8f4d0b 100644
--- a/python/paddle/fluid/tests/unittests/feed_data_reader.py
+++ b/python/paddle/fluid/tests/unittests/feed_data_reader.py
@@ -57,19 +57,8 @@ class FeedDataReader:
         assert isinstance(exe, fluid.Executor), "exe must be Executor"
         use_cuda = isinstance(exe.place, fluid.CUDAPlace)
         if isinstance(program, fluid.CompiledProgram):
-            if program._is_data_parallel:
-                use_executor = False
-                if program._places is None:
-                    device_num = (
-                        len(fluid.cuda_places())
-                        if use_cuda
-                        else len(fluid.cpu_places())
-                    )
-                else:
-                    device_num = len(program._places)
-            else:
-                use_executor = True
-                device_num = 1
+            use_executor = True
+            device_num = 1
         else:
             use_executor = True
             device_num = 1
diff --git a/python/paddle/fluid/tests/unittests/standalone_executor/test_standalone_executor.py b/python/paddle/fluid/tests/unittests/standalone_executor/test_standalone_executor.py
index fe61abc533..cfce6784ad 100644
--- a/python/paddle/fluid/tests/unittests/standalone_executor/test_standalone_executor.py
+++ b/python/paddle/fluid/tests/unittests/standalone_executor/test_standalone_executor.py
@@ -283,24 +283,6 @@ class SwitchExecutorInterfaceWithFeed(unittest.TestCase):
             with framework._enable_standalone_executor():
                 self._run(feed[0], add_wrong_fetch=True)
 
-    def test_compiled_program(self):
-        data = np.ones([2, 2], dtype="float32")
-        feed = {"a": data}
-
-        res = self.run_new_executor(feed, use_compiled=True)
-        gt = self.run_raw_executor(feed, use_compiled=True)
-        for x, y in zip(gt, res):
-            np.testing.assert_array_equal(x, y)
-
-    def test_compiled_program_convert_graph_to_program(self):
-        data = np.ones([2, 2], dtype="float32")
-        feed = {"a": data}
-
-        res = self.run_new_executor(feed, use_compiled=True)
-        gt = self.run_raw_executor(feed, use_compiled=True)
-        for x, y in zip(gt, res):
-            np.testing.assert_array_equal(x, y)
-
     def test_empty_program(self):
         program = paddle.static.Program()
         exe = paddle.static.Executor(self.place)
-- 
GitLab
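
Not part of the patch: after this change, a non-inference CompiledProgram no
longer carries data-parallel state, and Executor.run dispatches straight to
_run_program (or the standalone executor) on the single given place. A minimal
sketch of the surviving single-place flow, assuming a Paddle build from around
this commit; the toy network, names, and shapes below are made up for
illustration:

    import numpy as np
    import paddle

    paddle.enable_static()

    main_prog = paddle.static.Program()
    startup_prog = paddle.static.Program()
    with paddle.static.program_guard(main_prog, startup_prog):
        # Tiny illustrative network: one fc layer reduced to a scalar loss.
        x = paddle.static.data(name='x', shape=[None, 2], dtype='float32')
        loss = paddle.mean(paddle.static.nn.fc(x, size=2))

    place = paddle.CPUPlace()
    exe = paddle.static.Executor(place)
    exe.run(startup_prog)

    # CompiledProgram is now always single-place on this path; running it
    # goes through Executor._run_program rather than the removed
    # Executor._run_parallel.
    compiled = paddle.static.CompiledProgram(main_prog)
    out = exe.run(compiled,
                  feed={'x': np.ones([2, 2], dtype='float32')},
                  fetch_list=[loss])
    print(out[0])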