From 67fc8e9384188d49c3e5d2829e145b737b633e53 Mon Sep 17 00:00:00 2001 From: Ruibiao Chen Date: Sat, 7 Jan 2023 11:23:03 +0800 Subject: [PATCH] Enable standalone executor for fleet training (#49293) * Enable standalone executor for fleet training * Update code * Replace use_standalone_executor utils in auto parallel * Update code * Diable standalone executor for test_pass_sharding * Update code * Set sequential run for auto parallel * Fix dist_attr bug * Set sequential run for auto parallel --- .../distributed/auto_parallel/dist_attr.h | 4 +-- .../distributed/auto_parallel/engine.py | 2 ++ .../paddle/distributed/auto_parallel/utils.py | 10 ------- ...uto_parallel_data_parallel_optimization.py | 8 ++--- .../passes/auto_parallel_grad_clip.py | 4 +-- .../passes/auto_parallel_sharding.py | 4 +-- ...rallel_supplement_explicit_dependencies.py | 6 ++-- python/paddle/fluid/executor.py | 29 ------------------- .../fluid/tests/custom_op/CMakeLists.txt | 2 -- .../fluid/tests/unittests/CMakeLists.txt | 5 ---- .../test_sharding_with_newexe.py | 1 - .../fleet/test_fleet_graph_executor.py | 2 -- ...arallel_data_parallel_optimization_pass.py | 2 -- .../fluid/tests/unittests/test_dist_base.py | 1 - 14 files changed, 16 insertions(+), 64 deletions(-) diff --git a/paddle/fluid/distributed/auto_parallel/dist_attr.h b/paddle/fluid/distributed/auto_parallel/dist_attr.h index 637c2b05594..b38a21f336a 100644 --- a/paddle/fluid/distributed/auto_parallel/dist_attr.h +++ b/paddle/fluid/distributed/auto_parallel/dist_attr.h @@ -288,8 +288,8 @@ class OperatorDistAttr { std::string impl_type_ = kDefault; int64_t impl_idx_ = 0; bool is_recompute_ = false; - std::string execution_stream_; - int64_t scheduling_priority_; // lower value, higher priority, default to 0 + std::string execution_stream_ = kDefault; + int64_t scheduling_priority_ = 0; // lower value, higher priority std::map annotated_; }; diff --git a/python/paddle/distributed/auto_parallel/engine.py b/python/paddle/distributed/auto_parallel/engine.py index 445cb50c7f6..d8febaf9d51 100644 --- a/python/paddle/distributed/auto_parallel/engine.py +++ b/python/paddle/distributed/auto_parallel/engine.py @@ -228,6 +228,8 @@ class Engine: self.history = None + paddle.framework.set_flags({'FLAGS_new_executor_sequential_run': 1}) + def _prepare_data_spec(self, data, split, batch_size): inputs_spec = [] labels_spec = [] diff --git a/python/paddle/distributed/auto_parallel/utils.py b/python/paddle/distributed/auto_parallel/utils.py index 32874f07691..9eb2f806eab 100644 --- a/python/paddle/distributed/auto_parallel/utils.py +++ b/python/paddle/distributed/auto_parallel/utils.py @@ -2337,13 +2337,3 @@ def is_dep_skip_op(op): return True return False - - -def use_standalone_executor(): - return os.environ.get('FLAGS_CONVERT_GRAPH_TO_PROGRAM', None) in [ - 1, - '1', - True, - 'True', - 'true', - ] diff --git a/python/paddle/distributed/passes/auto_parallel_data_parallel_optimization.py b/python/paddle/distributed/passes/auto_parallel_data_parallel_optimization.py index 2cfa113a7ae..8be8c67bca6 100644 --- a/python/paddle/distributed/passes/auto_parallel_data_parallel_optimization.py +++ b/python/paddle/distributed/passes/auto_parallel_data_parallel_optimization.py @@ -27,10 +27,10 @@ from paddle.distributed.auto_parallel.utils import ( is_loss_grad_op, is_optimize_op, ring_id_to_process_group, - use_standalone_executor, ) from paddle.distributed.fleet.meta_optimizers.common import OP_ROLE_KEY, OpRole from paddle.fluid import unique_name +from paddle.fluid.executor import _is_enable_standalone_executor from paddle.fluid.framework import default_main_program from .pass_base import PassBase, PassType, register_pass @@ -92,7 +92,7 @@ class DataParallelOptimizationPass(PassBase): self.global_rank = int(self.get_attr("global_rank")) self.use_sharding = self.get_attr("use_sharding") self.coalesce_prefix = 'coalesce_grad' - if use_standalone_executor(): + if _is_enable_standalone_executor(): self.gradient_sync_stream = "gradient_sync_stream" with paddle.static.program_guard(main_program, startup_program): @@ -313,7 +313,7 @@ class DataParallelOptimizationPass(PassBase): def _calc_wait_comms(self): - if use_standalone_executor(): + if _is_enable_standalone_executor(): return block = default_main_program().global_block() @@ -546,7 +546,7 @@ class DataParallelOptimizationPass(PassBase): # multiple stream executor(standalone exe). This function just for standalone exe. Refactor here # in future when only one executor stay. - if not use_standalone_executor() or len(grad_groups) == 0: + if not _is_enable_standalone_executor() or len(grad_groups) == 0: return block = default_main_program().global_block() diff --git a/python/paddle/distributed/passes/auto_parallel_grad_clip.py b/python/paddle/distributed/passes/auto_parallel_grad_clip.py index 8503ea94b98..0ff3fbcf951 100644 --- a/python/paddle/distributed/passes/auto_parallel_grad_clip.py +++ b/python/paddle/distributed/passes/auto_parallel_grad_clip.py @@ -18,6 +18,7 @@ import numpy as np import paddle from paddle.distributed.fleet.meta_optimizers.common import OP_ROLE_KEY, OpRole +from paddle.fluid.executor import _is_enable_standalone_executor from ..auto_parallel.dist_attribute import OperatorDistAttr, TensorDistAttr from ..auto_parallel.operators.common import SyncMode @@ -29,7 +30,6 @@ from ..auto_parallel.utils import ( insert_dependencies_for_vars, is_gradient_clip_op, is_optimize_op, - use_standalone_executor, ) from .pass_base import PassBase, register_pass @@ -378,7 +378,7 @@ class ClipGradByGloblNormPass(PassBase): self.clip_helper._init_dist_attr(allreduce_op) if ( - use_standalone_executor + _is_enable_standalone_executor() and insert_leaf_fill_constant_node ): diff --git a/python/paddle/distributed/passes/auto_parallel_sharding.py b/python/paddle/distributed/passes/auto_parallel_sharding.py index 5f68131d042..bb3ebbeaf8f 100644 --- a/python/paddle/distributed/passes/auto_parallel_sharding.py +++ b/python/paddle/distributed/passes/auto_parallel_sharding.py @@ -33,10 +33,10 @@ from paddle.distributed.auto_parallel.utils import ( is_optimize_op, naive_set_dist_op_attr_for_program_by_mesh_and_mapping, set_var_dist_attr, - use_standalone_executor, ) from paddle.distributed.fleet.meta_optimizers.sharding.utils import get_var_size from paddle.fluid import unique_name +from paddle.fluid.executor import _is_enable_standalone_executor from paddle.fluid.framework import default_main_program, default_startup_program from paddle.framework import core @@ -1170,7 +1170,7 @@ class ShardingPass(PassBase): P.S. this overlap pass is ONLY adapted for standalone executor (graph based) and stream awared allocator. """ - if not use_standalone_executor() or (not self.enable_overlap): + if not _is_enable_standalone_executor() or (not self.enable_overlap): return self.grad_comm_group_stream_pairs = [] diff --git a/python/paddle/distributed/passes/auto_parallel_supplement_explicit_dependencies.py b/python/paddle/distributed/passes/auto_parallel_supplement_explicit_dependencies.py index 0650c4c577e..07de0c1dcbf 100644 --- a/python/paddle/distributed/passes/auto_parallel_supplement_explicit_dependencies.py +++ b/python/paddle/distributed/passes/auto_parallel_supplement_explicit_dependencies.py @@ -20,8 +20,8 @@ from paddle.distributed.auto_parallel.operators.common import ( from paddle.distributed.auto_parallel.utils import ( OpRole, insert_dependencies_for_vars, - use_standalone_executor, ) +from paddle.fluid.executor import _is_enable_standalone_executor from .auto_parallel_sharding import ShardingPass, _supported_optimizer_type from .pass_base import PassBase, register_pass @@ -70,7 +70,9 @@ class AutoParalSupplementDepPass(PassBase): def _apply_single_impl(self, main_program, startup_program, context): # TODO general this pass for all case. - if not use_standalone_executor or not _sharding_pass_applied(context): + if not _is_enable_standalone_executor or not _sharding_pass_applied( + context + ): return self._dist_context = self.get_attr("dist_context", None) diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py index b2ce9156691..d5db9a7f72c 100755 --- a/python/paddle/fluid/executor.py +++ b/python/paddle/fluid/executor.py @@ -871,19 +871,6 @@ class _ExecutorCache: use_fetch_v2=True, ) - if ( - os.environ.get('FLAGS_CONVERT_GRAPH_TO_PROGRAM', None) - in [1, '1', True, 'True', 'true'] - and not program._is_start_up_program_ - ): - if program.num_blocks > 1: - # If there are multiple blocks in the program, subblock will not be executed with the new executor in temporary - logging.warning("There are more than 1 block in program.") - elif program.num_blocks == 1: - logging.warning("There are 1 block in program.") - else: - logging.warning("There are no block in program.") - # standalone executor will apply buffer_shared_inplace_pass and # inplace_addto_op_pass to program according to build_strategy enable_inplace = ( @@ -1711,10 +1698,6 @@ class Executor: if core.is_compiled_with_mlu(): return False - use_standalone_executor_for_distribution = os.environ.get( - 'FLAGS_CONVERT_GRAPH_TO_PROGRAM', None - ) in [1, '1', True, 'True', 'true'] - compiled = isinstance( program, compiler.CompiledProgram ) or isinstance(program._graph, compiler.CompiledProgram) @@ -1784,20 +1767,8 @@ class Executor: UserWarning, ) return False - - # delete this code after supporting fleet - from paddle.distributed.fleet import fleet - - if fleet._role_maker is not None: - warnings.warn( - "Standalone executor is not used for fleet", UserWarning - ) - return use_standalone_executor_for_distribution - return True - # NOTE: This is an experimental feature. If `export FLAGS_USE_STANDALONE_EXECUTOR=1 `, - # use StandaloneExecutor to run the program. if ( return_merged and self._enable_interpreter_core diff --git a/python/paddle/fluid/tests/custom_op/CMakeLists.txt b/python/paddle/fluid/tests/custom_op/CMakeLists.txt index 1b2e8b6f868..b939ee43883 100644 --- a/python/paddle/fluid/tests/custom_op/CMakeLists.txt +++ b/python/paddle/fluid/tests/custom_op/CMakeLists.txt @@ -7,8 +7,6 @@ if(WITH_GPU OR APPLE) # Compiling shared library will cost some time, but running process is very fast. set_tests_properties(test_custom_relu_op_setup PROPERTIES TIMEOUT 250) - set_tests_properties(test_custom_relu_op_setup - PROPERTIES ENVIRONMENT FLAGS_CONVERT_GRAPH_TO_PROGRAM=1) set_tests_properties(test_custom_relu_op_jit PROPERTIES TIMEOUT 180) set_tests_properties(test_custom_relu_model PROPERTIES TIMEOUT 180) set_tests_properties(test_context_pool PROPERTIES TIMEOUT 180) diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index b9904cb6f29..173c1b1b5db 100755 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -1251,11 +1251,6 @@ py_test_modules( test_eager_deletion_padding_rnn_for_interpretercore MODULES test_eager_deletion_padding_rnn ENVS FLAGS_CONVERT_GRAPH_TO_PROGRAM=true) -set_tests_properties( - test_buffer_shared_memory_reuse_pass - test_buffer_shared_memory_reuse_pass_and_fuse_optimization_op_pass - PROPERTIES ENVIRONMENT FLAGS_CONVERT_GRAPH_TO_PROGRAM=true) - # ExecutionStrategy is deprecated in standalone executor set_tests_properties(test_parallel_executor_dry_run PROPERTIES ENVIRONMENT "FLAGS_USE_STANDALONE_EXECUTOR=0") diff --git a/python/paddle/fluid/tests/unittests/auto_parallel/test_sharding_with_newexe.py b/python/paddle/fluid/tests/unittests/auto_parallel/test_sharding_with_newexe.py index 91ffae423c3..f71e024f36d 100644 --- a/python/paddle/fluid/tests/unittests/auto_parallel/test_sharding_with_newexe.py +++ b/python/paddle/fluid/tests/unittests/auto_parallel/test_sharding_with_newexe.py @@ -18,7 +18,6 @@ import sys import tempfile import unittest -os.environ["FLAGS_CONVERT_GRAPH_TO_PROGRAM"] = str(1) os.environ["FLAGS_add_dependency_for_communication_op"] = 'false' diff --git a/python/paddle/fluid/tests/unittests/collective/fleet/test_fleet_graph_executor.py b/python/paddle/fluid/tests/unittests/collective/fleet/test_fleet_graph_executor.py index f5eb234945c..58091bd847f 100644 --- a/python/paddle/fluid/tests/unittests/collective/fleet/test_fleet_graph_executor.py +++ b/python/paddle/fluid/tests/unittests/collective/fleet/test_fleet_graph_executor.py @@ -31,7 +31,6 @@ class TestFleetGraphExecutionMetaOptimizer(unittest.TestCase): "PADDLE_TRAINER_ENDPOINTS": "127.0.0.1:36001,127.0.0.1:36002", "http_proxy": "", "https_proxy": "", - "FLAGS_CONVERT_GRAPH_TO_PROGRAM": "1", } node_b = { @@ -41,7 +40,6 @@ class TestFleetGraphExecutionMetaOptimizer(unittest.TestCase): "PADDLE_TRAINER_ENDPOINTS": "127.0.0.1:36001,127.0.0.1:36002", "http_proxy": "", "https_proxy": "", - "FLAGS_CONVERT_GRAPH_TO_PROGRAM": "1", } def node_func(): diff --git a/python/paddle/fluid/tests/unittests/distributed_passes/test_auto_parallel_data_parallel_optimization_pass.py b/python/paddle/fluid/tests/unittests/distributed_passes/test_auto_parallel_data_parallel_optimization_pass.py index 5a6486991dc..d2b5b9fe62b 100644 --- a/python/paddle/fluid/tests/unittests/distributed_passes/test_auto_parallel_data_parallel_optimization_pass.py +++ b/python/paddle/fluid/tests/unittests/distributed_passes/test_auto_parallel_data_parallel_optimization_pass.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import os import random import sys import unittest @@ -123,7 +122,6 @@ class TestDataParallelPassWithScale2(TestDataParallelPassWithScale1): class TestDataParallelPassWithStandaloneEXE(TestDataParallelPassWithScale1): def init(self): if paddle.is_compiled_with_cuda(): - os.environ['FLAGS_CONVERT_GRAPH_TO_PROGRAM'] = "1" paddle.set_flags({'FLAGS_cudnn_deterministic': 1}) self.rtol = 1e-5 self.atol = 1e-8 diff --git a/python/paddle/fluid/tests/unittests/test_dist_base.py b/python/paddle/fluid/tests/unittests/test_dist_base.py index 03eb219a0b5..b93b494a351 100755 --- a/python/paddle/fluid/tests/unittests/test_dist_base.py +++ b/python/paddle/fluid/tests/unittests/test_dist_base.py @@ -1687,7 +1687,6 @@ class TestDistBase(unittest.TestCase): "http_proxy": "", "NCCL_P2P_DISABLE": "1", "NCCL_SHM_DISABLE": "1", - "FLAGS_CONVERT_GRAPH_TO_PROGRAM": "1", } if check_error_log: -- GitLab