Unverified commit 67fc8e93, authored by Ruibiao Chen, committed by GitHub

Enable standalone executor for fleet training (#49293)

* Enable standalone executor for fleet training

* Update code

* Replace use_standalone_executor utils in auto parallel

* Update code

* Disable standalone executor for test_pass_sharding

* Update code

* Set sequential run for auto parallel

* Fix dist_attr bug

* Set sequential run for auto parallel
Parent 44cb3da3
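
For context, a minimal sketch of the executor switch after this change; it assumes only the environment variables visible in this diff (FLAGS_USE_STANDALONE_EXECUTOR from the Executor NOTE further below, and the FLAGS_CONVERT_GRAPH_TO_PROGRAM knob that this commit removes).

```python
# Hedged sketch: after this change, distributed (fleet) programs go through the
# standalone executor by default. The only switch visible in this diff is the
# FLAGS_USE_STANDALONE_EXECUTOR environment variable (see the NOTE in the
# Executor hunk); FLAGS_CONVERT_GRAPH_TO_PROGRAM is no longer consulted.
import os

# Opt out of the standalone executor (using it is the default).
os.environ["FLAGS_USE_STANDALONE_EXECUTOR"] = "0"

# The old opt-in knob removed by this commit; setting it no longer has any effect.
os.environ.pop("FLAGS_CONVERT_GRAPH_TO_PROGRAM", None)
```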
......@@ -288,8 +288,8 @@ class OperatorDistAttr {
std::string impl_type_ = kDefault;
int64_t impl_idx_ = 0;
bool is_recompute_ = false;
std::string execution_stream_;
int64_t scheduling_priority_; // lower value, higher priority, default to 0
std::string execution_stream_ = kDefault;
int64_t scheduling_priority_ = 0; // lower value, higher priority
std::map<std::string, bool> annotated_;
};
......
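
The new in-class initializers above give every op a default stream (kDefault) and scheduling priority 0. Below is a hedged sketch of overriding them per op from Python; the `op.dist_attr.execution_stream` / `op.dist_attr.scheduling_priority` accessors are assumed for illustration and may differ from the actual bindings.

```python
# Hedged sketch (assumed Python accessors): assign ops to a named stream and a
# priority, mirroring the C++ defaults introduced above
# (execution_stream = kDefault, scheduling_priority = 0; lower value runs first).
import paddle

paddle.enable_static()
main_prog = paddle.static.Program()
with paddle.static.program_guard(main_prog):
    x = paddle.static.data(name="x", shape=[4, 8], dtype="float32")
    y = paddle.matmul(x, x, transpose_y=True)

for op in main_prog.global_block().ops:
    op.dist_attr.execution_stream = "gradient_sync_stream"  # stream name reused from the pass below
    op.dist_attr.scheduling_priority = 0                     # lower value, higher priority
```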
......@@ -228,6 +228,8 @@ class Engine:
self.history = None
paddle.framework.set_flags({'FLAGS_new_executor_sequential_run': 1})
def _prepare_data_spec(self, data, split, batch_size):
inputs_spec = []
labels_spec = []
......
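
The Engine hunk above pins FLAGS_new_executor_sequential_run, matching the "Set sequential run for auto parallel" items in the commit message. A small sketch, using only the public set_flags/get_flags API, of reading and toggling that flag outside the Engine:

```python
# Minimal sketch: the Engine pins FLAGS_new_executor_sequential_run so that the
# standalone executor schedules ops in program order for auto parallel. Outside
# the Engine the same flag can be read back and toggled directly.
import paddle

paddle.framework.set_flags({'FLAGS_new_executor_sequential_run': 1})
print(paddle.get_flags(['FLAGS_new_executor_sequential_run']))
# e.g. {'FLAGS_new_executor_sequential_run': True}
```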
......@@ -2337,13 +2337,3 @@ def is_dep_skip_op(op):
return True
return False
def use_standalone_executor():
return os.environ.get('FLAGS_CONVERT_GRAPH_TO_PROGRAM', None) in [
1,
'1',
True,
'True',
'true',
]
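
For clarity, here is the removed helper side by side with its replacement; every call site below switches from this environment-variable check to the private `_is_enable_standalone_executor` helper imported from paddle.fluid.executor.

```python
# The removed helper (an env-var check) next to its replacement, which the
# passes below now import from paddle.fluid.executor.
import os


def use_standalone_executor():  # removed in this commit
    return os.environ.get('FLAGS_CONVERT_GRAPH_TO_PROGRAM', None) in [
        1, '1', True, 'True', 'true',
    ]


from paddle.fluid.executor import _is_enable_standalone_executor

if _is_enable_standalone_executor():
    pass  # take the graph-based, multi-stream code paths
```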
......@@ -27,10 +27,10 @@ from paddle.distributed.auto_parallel.utils import (
is_loss_grad_op,
is_optimize_op,
ring_id_to_process_group,
use_standalone_executor,
)
from paddle.distributed.fleet.meta_optimizers.common import OP_ROLE_KEY, OpRole
from paddle.fluid import unique_name
from paddle.fluid.executor import _is_enable_standalone_executor
from paddle.fluid.framework import default_main_program
from .pass_base import PassBase, PassType, register_pass
......@@ -92,7 +92,7 @@ class DataParallelOptimizationPass(PassBase):
self.global_rank = int(self.get_attr("global_rank"))
self.use_sharding = self.get_attr("use_sharding")
self.coalesce_prefix = 'coalesce_grad'
if use_standalone_executor():
if _is_enable_standalone_executor():
self.gradient_sync_stream = "gradient_sync_stream"
with paddle.static.program_guard(main_program, startup_program):
......@@ -313,7 +313,7 @@ class DataParallelOptimizationPass(PassBase):
def _calc_wait_comms(self):
if use_standalone_executor():
if _is_enable_standalone_executor():
return
block = default_main_program().global_block()
......@@ -546,7 +546,7 @@ class DataParallelOptimizationPass(PassBase):
# multiple stream executor(standalone exe). This function just for standalone exe. Refactor here
# in future when only one executor stay.
if not use_standalone_executor() or len(grad_groups) == 0:
if not _is_enable_standalone_executor() or len(grad_groups) == 0:
return
block = default_main_program().global_block()
......
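
The hunks above key the pass's stream and dependency handling off the standalone-executor check. A hedged sketch of driving the pass, assuming the paddle.distributed.passes new_pass/PassContext API and the registered pass name (both assumptions, not shown in this diff):

```python
# Hedged sketch (pass name and attribute keys assumed from the hunk above):
# build the data-parallel optimization pass with the attributes it reads in
# _apply_single_impl and apply it to a static program pair.
from paddle.distributed.passes import PassContext, new_pass


def apply_dp_optimization(main_program, startup_program, dist_context, rank):
    config = {
        "dist_context": dist_context,  # an auto-parallel DistributedContext
        "global_rank": rank,
        "use_sharding": False,
    }
    dp_pass = new_pass("auto_parallel_data_parallel_optimization", config)
    dp_pass.apply([main_program], [startup_program], PassContext())
```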
......@@ -18,6 +18,7 @@ import numpy as np
import paddle
from paddle.distributed.fleet.meta_optimizers.common import OP_ROLE_KEY, OpRole
from paddle.fluid.executor import _is_enable_standalone_executor
from ..auto_parallel.dist_attribute import OperatorDistAttr, TensorDistAttr
from ..auto_parallel.operators.common import SyncMode
......@@ -29,7 +30,6 @@ from ..auto_parallel.utils import (
insert_dependencies_for_vars,
is_gradient_clip_op,
is_optimize_op,
use_standalone_executor,
)
from .pass_base import PassBase, register_pass
......@@ -378,7 +378,7 @@ class ClipGradByGloblNormPass(PassBase):
self.clip_helper._init_dist_attr(allreduce_op)
if (
use_standalone_executor
_is_enable_standalone_executor()
and insert_leaf_fill_constant_node
):
......
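
Note that the replacement above also adds the call parentheses that the old `use_standalone_executor` reference was missing; without them the condition tests the function object itself, which is always truthy, as the snippet below illustrates.

```python
# Why the added call parentheses matter: a function object is always truthy,
# so testing the bare reference instead of calling it makes the branch constant.
def flag_enabled():
    return False


print(bool(flag_enabled))    # True  -- the function object itself
print(bool(flag_enabled()))  # False -- the intended check
```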
......@@ -33,10 +33,10 @@ from paddle.distributed.auto_parallel.utils import (
is_optimize_op,
naive_set_dist_op_attr_for_program_by_mesh_and_mapping,
set_var_dist_attr,
use_standalone_executor,
)
from paddle.distributed.fleet.meta_optimizers.sharding.utils import get_var_size
from paddle.fluid import unique_name
from paddle.fluid.executor import _is_enable_standalone_executor
from paddle.fluid.framework import default_main_program, default_startup_program
from paddle.framework import core
......@@ -1170,7 +1170,7 @@ class ShardingPass(PassBase):
P.S. this overlap pass is ONLY adapted for the standalone executor (graph based) and the stream-aware allocator.
"""
if not use_standalone_executor() or (not self.enable_overlap):
if not _is_enable_standalone_executor() or (not self.enable_overlap):
return
self.grad_comm_group_stream_pairs = []
......
......@@ -20,8 +20,8 @@ from paddle.distributed.auto_parallel.operators.common import (
from paddle.distributed.auto_parallel.utils import (
OpRole,
insert_dependencies_for_vars,
use_standalone_executor,
)
from paddle.fluid.executor import _is_enable_standalone_executor
from .auto_parallel_sharding import ShardingPass, _supported_optimizer_type
from .pass_base import PassBase, register_pass
......@@ -70,7 +70,9 @@ class AutoParalSupplementDepPass(PassBase):
def _apply_single_impl(self, main_program, startup_program, context):
# TODO: generalize this pass for all cases.
if not use_standalone_executor or not _sharding_pass_applied(context):
if not _is_enable_standalone_executor() or not _sharding_pass_applied(
context
):
return
self._dist_context = self.get_attr("dist_context", None)
......
......@@ -871,19 +871,6 @@ class _ExecutorCache:
use_fetch_v2=True,
)
if (
os.environ.get('FLAGS_CONVERT_GRAPH_TO_PROGRAM', None)
in [1, '1', True, 'True', 'true']
and not program._is_start_up_program_
):
if program.num_blocks > 1:
# If there are multiple blocks in the program, subblock will not be executed with the new executor in temporary
logging.warning("There are more than 1 block in program.")
elif program.num_blocks == 1:
logging.warning("There are 1 block in program.")
else:
logging.warning("There are no block in program.")
# standalone executor will apply buffer_shared_inplace_pass and
# inplace_addto_op_pass to program according to build_strategy
enable_inplace = (
......@@ -1711,10 +1698,6 @@ class Executor:
if core.is_compiled_with_mlu():
return False
use_standalone_executor_for_distribution = os.environ.get(
'FLAGS_CONVERT_GRAPH_TO_PROGRAM', None
) in [1, '1', True, 'True', 'true']
compiled = isinstance(
program, compiler.CompiledProgram
) or isinstance(program._graph, compiler.CompiledProgram)
......@@ -1784,20 +1767,8 @@ class Executor:
UserWarning,
)
return False
# delete this code after supporting fleet
from paddle.distributed.fleet import fleet
if fleet._role_maker is not None:
warnings.warn(
"Standalone executor is not used for fleet", UserWarning
)
return use_standalone_executor_for_distribution
return True
# NOTE: This is an experimental feature. If `export FLAGS_USE_STANDALONE_EXECUTOR=1`,
# use StandaloneExecutor to run the program.
if (
return_merged
and self._enable_interpreter_core
......
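
With the fleet guard removed, distributed programs take the same path as single-process ones, and per the NOTE above the standalone executor is used unless FLAGS_USE_STANDALONE_EXECUTOR is exported as 0. A minimal static-graph run using only the public Executor API:

```python
# Minimal sketch: a plain static-graph run, which now goes through the
# standalone executor by default (export FLAGS_USE_STANDALONE_EXECUTOR=0
# to fall back, per the NOTE above).
import numpy as np
import paddle

paddle.enable_static()
main_prog, startup_prog = paddle.static.Program(), paddle.static.Program()
with paddle.static.program_guard(main_prog, startup_prog):
    x = paddle.static.data(name="x", shape=[2, 4], dtype="float32")
    out = paddle.nn.functional.relu(x)

exe = paddle.static.Executor(paddle.CPUPlace())
exe.run(startup_prog)
(res,) = exe.run(main_prog, feed={"x": np.ones([2, 4], "float32")}, fetch_list=[out])
print(res.shape)  # (2, 4)
```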
......@@ -7,8 +7,6 @@ if(WITH_GPU OR APPLE)
# Compiling shared library will cost some time, but running process is very fast.
set_tests_properties(test_custom_relu_op_setup PROPERTIES TIMEOUT 250)
set_tests_properties(test_custom_relu_op_setup
PROPERTIES ENVIRONMENT FLAGS_CONVERT_GRAPH_TO_PROGRAM=1)
set_tests_properties(test_custom_relu_op_jit PROPERTIES TIMEOUT 180)
set_tests_properties(test_custom_relu_model PROPERTIES TIMEOUT 180)
set_tests_properties(test_context_pool PROPERTIES TIMEOUT 180)
......
......@@ -1251,11 +1251,6 @@ py_test_modules(
test_eager_deletion_padding_rnn_for_interpretercore MODULES
test_eager_deletion_padding_rnn ENVS FLAGS_CONVERT_GRAPH_TO_PROGRAM=true)
set_tests_properties(
test_buffer_shared_memory_reuse_pass
test_buffer_shared_memory_reuse_pass_and_fuse_optimization_op_pass
PROPERTIES ENVIRONMENT FLAGS_CONVERT_GRAPH_TO_PROGRAM=true)
# ExecutionStrategy is deprecated in standalone executor
set_tests_properties(test_parallel_executor_dry_run
PROPERTIES ENVIRONMENT "FLAGS_USE_STANDALONE_EXECUTOR=0")
......
......@@ -18,7 +18,6 @@ import sys
import tempfile
import unittest
os.environ["FLAGS_CONVERT_GRAPH_TO_PROGRAM"] = str(1)
os.environ["FLAGS_add_dependency_for_communication_op"] = 'false'
......
......@@ -31,7 +31,6 @@ class TestFleetGraphExecutionMetaOptimizer(unittest.TestCase):
"PADDLE_TRAINER_ENDPOINTS": "127.0.0.1:36001,127.0.0.1:36002",
"http_proxy": "",
"https_proxy": "",
"FLAGS_CONVERT_GRAPH_TO_PROGRAM": "1",
}
node_b = {
......@@ -41,7 +40,6 @@ class TestFleetGraphExecutionMetaOptimizer(unittest.TestCase):
"PADDLE_TRAINER_ENDPOINTS": "127.0.0.1:36001,127.0.0.1:36002",
"http_proxy": "",
"https_proxy": "",
"FLAGS_CONVERT_GRAPH_TO_PROGRAM": "1",
}
def node_func():
......
......@@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import random
import sys
import unittest
......@@ -123,7 +122,6 @@ class TestDataParallelPassWithScale2(TestDataParallelPassWithScale1):
class TestDataParallelPassWithStandaloneEXE(TestDataParallelPassWithScale1):
def init(self):
if paddle.is_compiled_with_cuda():
os.environ['FLAGS_CONVERT_GRAPH_TO_PROGRAM'] = "1"
paddle.set_flags({'FLAGS_cudnn_deterministic': 1})
self.rtol = 1e-5
self.atol = 1e-8
......
......@@ -1687,7 +1687,6 @@ class TestDistBase(unittest.TestCase):
"http_proxy": "",
"NCCL_P2P_DISABLE": "1",
"NCCL_SHM_DISABLE": "1",
"FLAGS_CONVERT_GRAPH_TO_PROGRAM": "1",
}
if check_error_log:
......