未验证 提交 39278731 编写于 作者: K kangguangli 提交者: GitHub

[Executor] remove run_program branch (#52471)

* remove run_program

* remove FLAGS_USE_STANDALONE_EXECUTOR
上级 47c740e7
......@@ -473,9 +473,7 @@ class OptimizationTuner:
parent_env = copy.copy(os.environ.copy())
# env flags need for profile
new_env = {
"FLAGS_USE_STANDALONE_EXECUTOR": "False",
}
new_env = {}
new_env.update(parent_env)
# TODO if any rank hang or fail, kill all processes
......
......@@ -34,7 +34,6 @@ from paddle.distributed.auto_parallel.utils import (
ring_id_to_process_group,
)
from paddle.distributed.fleet.meta_optimizers.common import OP_ROLE_KEY, OpRole
from paddle.fluid.executor import _is_enable_standalone_executor
from paddle.static import default_main_program
from paddle.utils import unique_name
......@@ -97,7 +96,6 @@ class DataParallelOptimizationPass(PassBase):
self.global_rank = int(self.get_attr("global_rank"))
self.use_sharding = self.get_attr("use_sharding")
self.coalesce_prefix = 'coalesce_grad'
if _is_enable_standalone_executor():
self.gradient_sync_stream = "gradient_sync_stream"
with paddle.static.program_guard(main_program, startup_program):
......@@ -316,7 +314,6 @@ class DataParallelOptimizationPass(PassBase):
def _calc_wait_comms(self):
if _is_enable_standalone_executor():
return
block = default_main_program().global_block()
......@@ -602,7 +599,7 @@ class DataParallelOptimizationPass(PassBase):
# multiple stream executor(standalone exe). This function just for standalone exe. Refactor here
# in future when only one executor stay.
if not _is_enable_standalone_executor() or len(grad_groups) == 0:
if len(grad_groups) == 0:
return
block = default_main_program().global_block()
......
......@@ -18,7 +18,6 @@ import numpy as np
import paddle
from paddle.distributed.fleet.meta_optimizers.common import OP_ROLE_KEY, OpRole
from paddle.fluid.executor import _is_enable_standalone_executor
from ..auto_parallel.dist_attribute import OperatorDistAttr, TensorDistAttr
from ..auto_parallel.operators.common import (
......@@ -460,10 +459,7 @@ class ClipGradByGloblNormPass(PassBase):
)
self.clip_helper._init_dist_attr(allreduce_op)
if (
_is_enable_standalone_executor()
and insert_leaf_fill_constant_node
):
if insert_leaf_fill_constant_node:
# NOTE add naive deps for global norm sync in graph exe
j = idx - 1
......
......@@ -35,7 +35,6 @@ from paddle.distributed.auto_parallel.utils import (
set_var_dist_attr,
)
from paddle.distributed.fleet.meta_optimizers.sharding.utils import get_var_size
from paddle.fluid.executor import _is_enable_standalone_executor
from paddle.framework import core
from paddle.static import default_main_program, default_startup_program
from paddle.utils import unique_name
......@@ -1168,7 +1167,7 @@ class ShardingPass(PassBase):
P.S. this overlap pass is ONLY adapted for standalone executor (graph based) and stream awared allocator.
"""
if not _is_enable_standalone_executor() or (not self.enable_overlap):
if not self.enable_overlap:
return
self.grad_comm_group_stream_pairs = []
......
......@@ -21,7 +21,6 @@ from paddle.distributed.auto_parallel.utils import (
OpRole,
insert_dependencies_for_vars,
)
from paddle.fluid.executor import _is_enable_standalone_executor
from .auto_parallel_sharding import ShardingPass, _supported_optimizer_type
from .pass_base import PassBase, register_pass
......@@ -70,9 +69,7 @@ class AutoParalSupplementDepPass(PassBase):
def _apply_single_impl(self, main_program, startup_program, context):
# TODO general this pass for all case.
if not _is_enable_standalone_executor or not _sharding_pass_applied(
context
):
if not _sharding_pass_applied(context):
return
self._dist_context = self.get_attr("dist_context", None)
......
......@@ -493,14 +493,6 @@ def _to_name_str(var):
return _to_str(var)
def _is_enable_standalone_executor():
return (
framework._enable_standalone_executor_ is None
or framework._enable_standalone_executor_
in [1, '1', True, 'True', 'true']
)
def _is_dy2st_enable_standalone_executor():
return framework._dy2st_enable_standalone_executor_ in [
1,
......@@ -1004,8 +996,6 @@ class Executor:
"__auto_checkpoint_executor__"
)
# NOTE: Whether to use experimental executor `StandaloneExecutor`.
self._enable_interpreter_core = _is_enable_standalone_executor()
self._executor_cache = _ExecutorCache()
self._fleet_executor = None
......@@ -1605,9 +1595,7 @@ class Executor:
return True
if self._enable_interpreter_core and _can_use_interpreter_core(
program, self.place
):
if _can_use_interpreter_core(program, self.place):
if feed is None:
feed = {}
......@@ -1685,132 +1673,12 @@ class Executor:
acp._auto_checkpoint(self, program)
# For backward compatibility, run directly.
if not compiled:
return self._run_program(
program,
feed=feed,
fetch_list=fetch_list,
feed_var_name=feed_var_name,
fetch_var_name=fetch_var_name,
scope=scope,
return_numpy=return_numpy,
use_program_cache=use_program_cache,
)
program._compile(scope, self.place)
assert (
program._is_inference
), f"Program must have _is_inference = True, but get {program._is_inference}"
return self._run_inference(program._executor, feed)
def _run_program(
self,
program,
feed,
fetch_list,
feed_var_name,
fetch_var_name,
scope,
return_numpy,
use_program_cache,
):
from paddle.optimizer.lr import LRScheduler
if feed is None:
feed = {}
elif isinstance(feed, (list, tuple)):
assert len(feed) == 1, "Not compiled with data parallel"
feed = feed[0]
if not isinstance(feed, dict):
raise TypeError(
"feed requires dict as its Parameter. But you passed in %s"
% (type(feed))
)
assert program is not None, "The program should not be Empty"
if not isinstance(program, Program):
raise TypeError(
"Executor requires Program as its Parameter. But you passed in %s"
% (type(program))
)
if not isinstance(fetch_var_name, str):
raise TypeError(
"The name of fetch variable requires string as its Parameter. But you passed in %s"
% (type(fetch_var_name))
)
if use_program_cache:
cache_key = _get_strong_program_cache_key(program, feed, fetch_list)
cached_program = self._get_program_cache(cache_key)
cached_ctx = self._get_ctx_cache(cache_key)
cached_scope = self._get_scope_cache(cache_key)
if cached_program is None:
cached_program = _add_feed_fetch_ops(
program=program,
feed=feed,
fetch_list=fetch_list,
feed_var_name=feed_var_name,
fetch_var_name=fetch_var_name,
)
self._add_program_cache(cache_key, cached_program)
fetch_list_str = list(map(_to_name_str, fetch_list))
cached_ctx = self._default_executor.prepare(
cached_program.desc, 0, fetch_list_str, False
)
# currently, we cache program, vars, sub_scope here
# we suppose that in a life cycle of training, a user
# will not create many programs. So, here the basic
# rule of caching is to cache all unseen (program, var, scope)
# when a user use use_program_cache.
cached_scope = scope.new_scope()
self._default_executor.create_variables(
cached_program.desc, cached_scope, 0
)
self._add_ctx_cache(cache_key, cached_ctx)
self._add_scope_cache(cache_key, cached_scope)
program = cached_program
ctx = cached_ctx
scope = cached_scope
else:
program = _add_feed_fetch_ops(
program=program,
feed=feed,
fetch_list=fetch_list,
feed_var_name=feed_var_name,
fetch_var_name=fetch_var_name,
)
self._feed_data(program, feed, feed_var_name, scope)
if hasattr(program, 'lr_schedulerr'):
assert isinstance(
program.lr_scheduler, LRScheduler
), "must be LRScheduler"
lr_scheduler = program.lr_scheduler
lr_value = lr_scheduler()
lr_var = program.global_block().vars[lr_scheduler._var_name]
data = np.array([lr_value]).astype(convert_dtype(lr_var.dtype))
tensor = core.get_variable_tensor(scope, lr_scheduler._var_name)
tensor.set(data, self.place)
if not use_program_cache:
self._default_executor.run(
program.desc, scope, 0, True, True, [fetch_var_name]
)
else:
self._default_executor.run_prepared_ctx(
ctx, scope, False, False, False
)
arr = scope.find_var(fetch_var_name).get_fetch_list()
tensors = arr._move_to_list()
if return_numpy:
return as_numpy(tensors)
else:
return tensors
def _run_inference(self, exe, feed):
return exe.run(feed)
......
......@@ -116,9 +116,7 @@ _already_patch_eager_tensor = False
_already_patch_varbase = False
_current_cuda_graph_mode = None
_global_flags_ = core.globals()
_enable_standalone_executor_ = os.environ.get(
'FLAGS_USE_STANDALONE_EXECUTOR', None
)
_dy2st_enable_standalone_executor_ = os.environ.get(
'FLAGS_DY2ST_USE_STANDALONE_EXECUTOR', 1
)
......@@ -270,17 +268,6 @@ ipu_index_attr_name = 'ipu_index'
ipu_stage_attr_name = 'ipu_stage'
@signature_safe_contextmanager
def _enable_standalone_executor(enable=True):
global _enable_standalone_executor_
original_ = _enable_standalone_executor_
_enable_standalone_executor_ = enable
try:
yield
finally:
_enable_standalone_executor_ = original_
@signature_safe_contextmanager
def ipu_shard_guard(index=-1, stage=-1):
"""
......
......@@ -1150,25 +1150,6 @@ if(WITH_GLOO)
PROPERTIES TIMEOUT 120)
endif()
if($ENV{USE_STANDALONE_EXECUTOR})
# these test will fail in some server due to PR#42149, temporarily set it use old executor.
set_tests_properties(test_apply_pass_to_program
PROPERTIES ENVIRONMENT FLAGS_USE_STANDALONE_EXECUTOR=0)
set_tests_properties(test_buffer_shared_memory_reuse_pass
PROPERTIES ENVIRONMENT FLAGS_USE_STANDALONE_EXECUTOR=0)
set_tests_properties(
test_buffer_shared_memory_reuse_pass_and_fuse_optimization_op_pass
PROPERTIES ENVIRONMENT FLAGS_USE_STANDALONE_EXECUTOR=0)
set_tests_properties(test_imperative_optimizer
PROPERTIES ENVIRONMENT FLAGS_USE_STANDALONE_EXECUTOR=0)
set_tests_properties(test_imperative_star_gan_with_gradient_penalty
PROPERTIES ENVIRONMENT FLAGS_USE_STANDALONE_EXECUTOR=0)
set_tests_properties(test_switch_autotune
PROPERTIES ENVIRONMENT FLAGS_USE_STANDALONE_EXECUTOR=0)
set_tests_properties(test_imperative_mnist_sorted_gradient
PROPERTIES ENVIRONMENT FLAGS_USE_STANDALONE_EXECUTOR=0)
endif()
set(TEST_CINN_OPS
test_softmax_op
test_expand_v2_op
......
......@@ -201,9 +201,7 @@ if(WITH_GPU AND TENSORRT_FOUND)
set_tests_properties(test_trt_conv3d_op PROPERTIES TIMEOUT 60)
set_tests_properties(test_trt_conv3d_transpose_op PROPERTIES TIMEOUT 60)
set_tests_properties(test_trt_nearest_interp_v2_op PROPERTIES TIMEOUT 30)
set_tests_properties(
test_trt_multiclass_nms3_op PROPERTIES TIMEOUT 60 ENVIRONMENT
FLAGS_USE_STANDALONE_EXECUTOR=0)
set_tests_properties(test_trt_multiclass_nms3_op PROPERTIES TIMEOUT 60)
if(WITH_MKLDNN
AND TENSORRT_FOUND
......
......@@ -20,10 +20,6 @@ import paddle
from paddle import _legacy_C_ops
from paddle.fluid import core
from paddle.fluid.dygraph.base import switch_to_static_graph
from paddle.fluid.executor import (
_is_dy2st_enable_standalone_executor,
_is_enable_standalone_executor,
)
from paddle.fluid.framework import Variable
......@@ -140,10 +136,7 @@ class TestRunProgram(unittest.TestCase):
[out.name + '@GRAD'],
]
use_interpretorcore = (
_is_enable_standalone_executor()
and _is_dy2st_enable_standalone_executor()
)
use_interpretorcore = True
attrs.extend(('use_interpretorcore', use_interpretorcore))
if use_interpretorcore:
attrs.extend(
......
......@@ -21,10 +21,6 @@ import paddle
from paddle import _legacy_C_ops, fluid
from paddle.fluid import core, framework
from paddle.fluid.dygraph.base import switch_to_static_graph
from paddle.fluid.executor import (
_is_dy2st_enable_standalone_executor,
_is_enable_standalone_executor,
)
from paddle.fluid.framework import global_var
paddle.enable_static()
......@@ -240,10 +236,7 @@ class RunProgramOpTest(unittest.TestCase):
self.program_desc, self.fwd_op_num, len(outputs['Out'])
)
use_interpretorcore = (
_is_enable_standalone_executor()
and _is_dy2st_enable_standalone_executor()
)
use_interpretorcore = True
self.attrs.extend(('use_interpretorcore', use_interpretorcore))
if use_interpretorcore:
self.attrs.extend(
......@@ -292,10 +285,7 @@ class RunProgramOpTest(unittest.TestCase):
self.program_desc, self.fwd_op_num, len(outputs['Out'])
)
use_interpretorcore = (
_is_enable_standalone_executor()
and _is_dy2st_enable_standalone_executor()
)
use_interpretorcore = True
self.attrs.extend(('use_interpretorcore', use_interpretorcore))
if use_interpretorcore:
self.attrs.extend(
......
......@@ -21,10 +21,6 @@ import paddle
from paddle import _legacy_C_ops
from paddle.fluid import backward, core, framework, unique_name
from paddle.fluid.dygraph.base import switch_to_static_graph
from paddle.fluid.executor import (
_is_dy2st_enable_standalone_executor,
_is_enable_standalone_executor,
)
from paddle.fluid.framework import OpProtoHolder, _non_static_mode
from paddle.jit.dy2static.partial_program import (
LazyInitialized,
......@@ -976,10 +972,7 @@ def _run_dygraph(instance, input, program_holder):
)
)
use_interpretorcore = (
_is_enable_standalone_executor()
and _is_dy2st_enable_standalone_executor()
)
use_interpretorcore = True
attrs.extend(('use_interpretorcore', use_interpretorcore))
if use_interpretorcore:
attrs.extend(
......
......@@ -11,15 +11,6 @@ if(WITH_TESTING)
set_tests_properties(test_custom_relu_op_jit PROPERTIES TIMEOUT 180)
set_tests_properties(test_custom_relu_model PROPERTIES TIMEOUT 180)
set_tests_properties(test_context_pool PROPERTIES TIMEOUT 180)
if($ENV{USE_STANDALONE_EXECUTOR})
# these test will fail in some server due to PR#42149, temporarily set it use old executor.
set_tests_properties(
test_custom_relu_op_setup PROPERTIES ENVIRONMENT
FLAGS_USE_STANDALONE_EXECUTOR=0)
set_tests_properties(
test_custom_relu_model PROPERTIES ENVIRONMENT
FLAGS_USE_STANDALONE_EXECUTOR=0)
endif()
endif()
if(WITH_GPU AND WITH_DISTRIBUTE)
......
......@@ -17,7 +17,7 @@ import unittest
import numpy as np
import paddle
from paddle.fluid import core, framework
from paddle.fluid import core
from paddle.fluid.framework import Program, program_guard
paddle.enable_static()
......@@ -25,7 +25,7 @@ paddle.enable_static()
# test the compatibility of new executor: run old
# and new executor twice and check the result.
# please override the _get_feeds() and build_prgram()
# please override the _get_feeds() and build_prgram(), run_dygraph_once()
class TestCompatibility(unittest.TestCase):
def setUp(self):
self.place = (
......@@ -78,26 +78,53 @@ class TestCompatibility(unittest.TestCase):
ret.append(exe.run(main_program, feed=feed, fetch_list=fetch_vars))
return ret
def run_raw_executor(self, feed):
with framework._enable_standalone_executor(False):
out = self._run(feed)
def run_dygraph_once(self, feed):
x = paddle.tensor.fill_constant(shape=[1], dtype='float32', value=0.1)
y = paddle.tensor.fill_constant(shape=[1], dtype='float32', value=0.23)
if x < y:
out = [
paddle.tensor.fill_constant(
shape=[1, 2], dtype='int32', value=1
).numpy(),
paddle.tensor.fill_constant(
shape=[2, 3], dtype='bool', value=True
).numpy(),
]
else:
out = [
paddle.tensor.fill_constant(
shape=[3, 4], dtype='float32', value=3
).numpy(),
paddle.tensor.fill_constant(
shape=[4, 5], dtype='int64', value=2
).numpy(),
]
return out
def run_dygraph(self, feed):
ret = []
for _ in range(self.iter_run):
ret.append(self.run_dygraph_once(feed))
return ret
def run_new_executor(self, feed):
with framework._enable_standalone_executor(True):
out = self._run(feed)
return out
def test_with_feed(self):
feed = self._get_feed()
paddle.enable_static()
res = self.run_new_executor(feed)
gt = self.run_raw_executor(feed)
paddle.disable_static()
gt = self.run_dygraph(feed)
for x, y in zip(gt, res):
if isinstance(x, list):
for tx, ty in zip(x, y):
np.testing.assert_array_equal(tx, ty)
elif isinstance(x, np.ndarray):
np.testing.assert_array_equal(tx, ty)
np.testing.assert_array_equal(x, y)
else:
raise Exception("Not Implement!")
......@@ -129,6 +156,12 @@ class TestWhile(TestCompatibility):
exe = paddle.static.Executor(paddle.CPUPlace())
return main_program, startup_program, i
def run_dygraph_once(self, feed):
i = 1
while i < 10:
i = i + 1
return [i]
if __name__ == "__main__":
unittest.main()
......@@ -23,7 +23,7 @@ import unittest
import numpy as np
import paddle
from paddle.fluid import core, framework
from paddle.fluid import core
from paddle.fluid.core import StandaloneExecutor
from paddle.profiler import profiler
......@@ -143,7 +143,6 @@ class ExecutorStatisticsTestCase(unittest.TestCase):
scope = paddle.static.Scope()
with paddle.static.scope_guard(scope):
with framework._enable_standalone_executor(enable):
exe = paddle.static.Executor(self.place)
helper_profiler = profiler.Profiler(
targets=[profiler.ProfilerTarget.CPU], scheduler=(1, 2)
......@@ -183,7 +182,6 @@ class MultiStreamModelTestCase(unittest.TestCase):
paddle.seed(2020)
main_program, startup_program, fetch_list = build_program()
with framework._enable_standalone_executor(use_new_executor):
scope = core.Scope()
exe = paddle.static.Executor(self.place)
outs = []
......@@ -249,8 +247,27 @@ class SwitchExecutorInterfaceWithFeed(unittest.TestCase):
return outs
def run_raw_executor(self, feed, use_compiled=False):
with framework._enable_standalone_executor(False):
def run_dygraph(self, feed):
def run_once(is_double):
paddle.seed(2020)
a = feed['a']
a = paddle.to_tensor(a, dtype='float32')
b = paddle.ones([2, 2]) * 2
t = paddle.nn.Linear(2, 2)(a)
c = t + b
if is_double:
c = c + c
return c.numpy()
out1 = []
for i in range(self.iter_run):
out1.append(run_once(False))
out2 = []
for i in range(self.iter_run):
out2.append(run_once(True))
return [out1, out2]
def run_new_executor(self, feed, use_compiled=False):
# run construct program 1
out1 = self._run(
feed, use_str=False, is_double=False, use_compiled=use_compiled
......@@ -262,17 +279,14 @@ class SwitchExecutorInterfaceWithFeed(unittest.TestCase):
return [out1, out2]
def run_new_executor(self, feed, use_compiled=False):
with framework._enable_standalone_executor():
out = self.run_raw_executor(feed, use_compiled=use_compiled)
return out
def test_with_feed(self):
data = np.ones([2, 2], dtype="float32")
feed = {"a": data, 'fake_input': data}
with paddle.fluid.framework._static_guard():
res = self.run_new_executor(feed)
gt = self.run_raw_executor(feed)
with paddle.fluid.dygraph.guard():
gt = self.run_dygraph(feed)
for x, y in zip(gt, res):
np.testing.assert_array_equal(x, y)
......@@ -280,7 +294,6 @@ class SwitchExecutorInterfaceWithFeed(unittest.TestCase):
feed = [{'a': np.ones([2, 2], dtype="float32")}]
with self.assertRaises(TypeError):
with framework._enable_standalone_executor():
self._run(feed[0], add_wrong_fetch=True)
def test_empty_program(self):
......@@ -291,7 +304,6 @@ class SwitchExecutorInterfaceWithFeed(unittest.TestCase):
for i in range(10):
print(i, flush=1)
with framework._enable_standalone_executor():
out = exe.run(program, feed=None)
......@@ -328,7 +340,6 @@ class TestException(unittest.TestCase):
return out
def run_new_executor(self, feed):
with framework._enable_standalone_executor():
out = self._run(feed)
return out
......@@ -399,8 +410,6 @@ class TestInplaceApiWithDataTransform(unittest.TestCase):
with paddle.fluid.device_guard("cpu"):
x = paddle.increment(x)
exe = paddle.static.Executor(paddle.CUDAPlace(0))
with framework._enable_standalone_executor():
for i in range(10):
(a,) = exe.run(
paddle.static.default_main_program(), fetch_list=[x]
......
......@@ -39,6 +39,14 @@ class TestMultiplyWrite(TestCompatibility):
paddle.assign(inp2, out)
return main_program, startup_program, out
def run_dygraph_once(self, feed):
out = paddle.full((1,), 1)
inp1 = paddle.full((1,), 2)
inp2 = paddle.full((1,), 3)
paddle.assign(inp1, out)
paddle.assign(inp2, out)
return [out.numpy()]
def setUp(self):
self.place = paddle.CPUPlace()
self.iter_run = 5
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册