From b08c91ab7a4bac787bdf4437f75a3ed7585f405d Mon Sep 17 00:00:00 2001
From: wangxiaoning <71813629+wangxn12138@users.noreply.github.com>
Date: Wed, 15 Feb 2023 12:17:41 +0800
Subject: [PATCH] [fluid clean] clean fluid.transpiler API (#50375)

* move ascend_transpiler

* move transpiler.collective

* remove checkport

* fix

* fix import

* fix import

* add init

* fix

* fix

* fix
---
 .../ascend/ascend_optimizer.py                |  2 +-
 .../parameter_server_optimizer.py             |  2 +-
 .../distributed/passes/ps_trainer_pass.py     |  3 +-
 .../paddle/distributed/transpiler/__init__.py | 13 ++++
 .../transpiler/ascend_transpiler.py           |  3 +-
 .../transpiler/collective.py                  | 21 +++----
 .../distribute_transpiler/__init__.py         |  1 -
 .../fleet/parameter_server/pslib/__init__.py  |  2 +-
 .../fluid/tests/unittests/test_boxps.py       |  2 +-
 .../tests/unittests/test_dist_fleet_spmt.py   |  4 +-
 .../fluid/transpiler/details/__init__.py      |  1 -
 .../fluid/transpiler/details/checkport.py     | 61 -------------------
 .../fluid/transpiler/distribute_transpiler.py | 12 +++-
 13 files changed, 41 insertions(+), 86 deletions(-)
 create mode 100644 python/paddle/distributed/transpiler/__init__.py
 rename python/paddle/{fluid => distributed}/transpiler/ascend_transpiler.py (98%)
 rename python/paddle/{fluid => distributed}/transpiler/collective.py (98%)
 delete mode 100644 python/paddle/fluid/transpiler/details/checkport.py

diff --git a/python/paddle/distributed/fleet/meta_optimizers/ascend/ascend_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/ascend/ascend_optimizer.py
index 8e88a213b54..b7d22882c82 100644
--- a/python/paddle/distributed/fleet/meta_optimizers/ascend/ascend_optimizer.py
+++ b/python/paddle/distributed/fleet/meta_optimizers/ascend/ascend_optimizer.py
@@ -261,7 +261,7 @@ class AscendOptimizer(Optimizer):
         from paddle.distributed import fleet
 
         if auto_dp and fleet.world_size() > 1:
-            from paddle.fluid.transpiler import ascend_transpiler
+            from paddle.distributed.transpiler import ascend_transpiler
 
             t = ascend_transpiler.AscendTranspiler(
                 startup_program, loss.block.program
diff --git a/python/paddle/distributed/fleet/meta_optimizers/parameter_server_optimizer.py b/python/paddle/distributed/fleet/meta_optimizers/parameter_server_optimizer.py
index bf18f9316de..18bca1679d0 100644
--- a/python/paddle/distributed/fleet/meta_optimizers/parameter_server_optimizer.py
+++ b/python/paddle/distributed/fleet/meta_optimizers/parameter_server_optimizer.py
@@ -131,7 +131,7 @@ class ParameterServerOptimizer(MetaOptimizerBase):
             _startup = worker.fake_init_ops_pass(_startup, compiled_config)
             if use_ps_gpu:
                 _main = worker.ps_gpu_pass(_main)
-                from paddle.fluid.transpiler.collective import (
+                from paddle.distributed.transpiler.collective import (
                     SingleProcessMultiThread,
                 )
 
diff --git a/python/paddle/distributed/passes/ps_trainer_pass.py b/python/paddle/distributed/passes/ps_trainer_pass.py
index f25ede7f05e..b6a3bdd12f3 100755
--- a/python/paddle/distributed/passes/ps_trainer_pass.py
+++ b/python/paddle/distributed/passes/ps_trainer_pass.py
@@ -19,7 +19,6 @@ from _collections import defaultdict
 
 import paddle
 import paddle.fluid.framework as framework
 from paddle.distributed.passes.pass_base import PassBase, register_pass
-from paddle.fluid.transpiler.collective import SingleProcessMultiThread
 from paddle.framework import core
 from paddle.static import Parameter, Program
@@ -843,6 +842,8 @@ class PsTranspilePass(PassBase):
         return True
 
     def _apply_single_impl(self, main_program, startup_program, pass_ctx):
+        from ..transpiler.collective import SingleProcessMultiThread
+
         attrs = pass_ctx._attrs
         t = SingleProcessMultiThread()
         env = get_dist_env()
diff --git a/python/paddle/distributed/transpiler/__init__.py b/python/paddle/distributed/transpiler/__init__.py
new file mode 100644
index 00000000000..eca2dce114b
--- /dev/null
+++ b/python/paddle/distributed/transpiler/__init__.py
@@ -0,0 +1,13 @@
+# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/python/paddle/fluid/transpiler/ascend_transpiler.py b/python/paddle/distributed/transpiler/ascend_transpiler.py
similarity index 98%
rename from python/paddle/fluid/transpiler/ascend_transpiler.py
rename to python/paddle/distributed/transpiler/ascend_transpiler.py
index 0b8a7b34163..da8a5fb558d 100644
--- a/python/paddle/fluid/transpiler/ascend_transpiler.py
+++ b/python/paddle/distributed/transpiler/ascend_transpiler.py
@@ -12,8 +12,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from paddle.framework import core
+
 from . import collective
-from .. import core
 
 OpRole = core.op_proto_and_checker_maker.OpRole
 from paddle.distributed import fleet
diff --git a/python/paddle/fluid/transpiler/collective.py b/python/paddle/distributed/transpiler/collective.py
similarity index 98%
rename from python/paddle/fluid/transpiler/collective.py
rename to python/paddle/distributed/transpiler/collective.py
index 04bd68d2571..db07a1887d4 100644
--- a/python/paddle/fluid/transpiler/collective.py
+++ b/python/paddle/distributed/transpiler/collective.py
@@ -12,21 +12,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import sys
-import math
-from functools import reduce
 import os
-import collections
-import logging
-
-import numpy as np
-
-from .. import core, unique_name
-from ..framework import Program, default_main_program, default_startup_program
-from .details import wait_server_ready
-
-__all__ = ['GradAllReduce', 'LocalSGD', 'MultiThread']
+
+from paddle.distributed.fleet.base.private_helper_function import (
+    wait_server_ready,
+)
+from paddle.fluid import unique_name
+from paddle.framework import core
+from paddle.static import default_main_program, default_startup_program
 
 OpRole = core.op_proto_and_checker_maker.OpRole
@@ -540,7 +533,7 @@ class SingleProcessMultiThread(GradAllReduce):
         for idx, op in reversed(list(enumerate(block.ops))):
             if not self._is_backward_op(op):
                 continue
-            if not self.op_role_var_key in op.attr_names:
+            if self.op_role_var_key not in op.attr_names:
                 continue
             op_role_var = op.all_attrs()[self.op_role_var_key]
             if len(op_role_var) == 0:
diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py b/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py
index b78378560ad..7d058a72f46 100644
--- a/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py
+++ b/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/__init__.py
@@ -57,7 +57,6 @@ from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distribu
     StrategyFactory,
 )
 
-from paddle.fluid.transpiler.details.checkport import wait_server_ready
 from paddle.fluid.incubate.fleet.parameter_server.mode import PSMode
 from paddle.fluid.incubate.fleet.base.fleet_base import DistributedOptimizer
 
diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py
index 9d1a06ef09a..9bdfa35ab34 100644
--- a/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py
+++ b/python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py
@@ -1230,7 +1230,7 @@ class DownpourOptimizer(DistributedOptimizer):
         fleet._main_programs = programs
         fleet._scopes = scopes
         if opt_info["use_ps_gpu"]:
-            from paddle.fluid.transpiler.collective import MultiThread
+            from paddle.distributed.transpiler.collective import MultiThread
 
             # check start program
             if program_mode not in [
diff --git a/python/paddle/fluid/tests/unittests/test_boxps.py b/python/paddle/fluid/tests/unittests/test_boxps.py
index b6a1a845aa3..636502eed9c 100644
--- a/python/paddle/fluid/tests/unittests/test_boxps.py
+++ b/python/paddle/fluid/tests/unittests/test_boxps.py
@@ -17,8 +17,8 @@ import unittest
 import paddle
 import paddle.fluid as fluid
 import paddle.fluid.core as core
+from paddle.distributed.transpiler import collective
 from paddle.fluid.layers.nn import _pull_box_sparse
-from paddle.fluid.transpiler import collective
 
 
 class TestTranspile(unittest.TestCase):
diff --git a/python/paddle/fluid/tests/unittests/test_dist_fleet_spmt.py b/python/paddle/fluid/tests/unittests/test_dist_fleet_spmt.py
index 65dbacff5f9..61b91e08a33 100644
--- a/python/paddle/fluid/tests/unittests/test_dist_fleet_spmt.py
+++ b/python/paddle/fluid/tests/unittests/test_dist_fleet_spmt.py
@@ -243,7 +243,9 @@ class TestSPMT(unittest.TestCase):
         print("===main_program====")
         print(main_program)
         print("===main_program====")
-        from paddle.fluid.transpiler.collective import SingleProcessMultiThread
+        from paddle.distributed.transpiler.collective import (
+            SingleProcessMultiThread,
+        )
 
         t = SingleProcessMultiThread()
         env = self.get_dist_env()
diff --git a/python/paddle/fluid/transpiler/details/__init__.py b/python/paddle/fluid/transpiler/details/__init__.py
index 10de0e95852..61e6ee2fb91 100644
--- a/python/paddle/fluid/transpiler/details/__init__.py
+++ b/python/paddle/fluid/transpiler/details/__init__.py
@@ -14,5 +14,4 @@
 
 from .program_utils import *
 from .ufind import *
-from .checkport import *
 from .vars_distributed import *
diff --git a/python/paddle/fluid/transpiler/details/checkport.py b/python/paddle/fluid/transpiler/details/checkport.py
deleted file mode 100644
index c0d8cdc9145..00000000000
--- a/python/paddle/fluid/transpiler/details/checkport.py
+++ /dev/null
@@ -1,61 +0,0 @@
-# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import sys
-import time
-import socket
-from contextlib import closing
-
-
-def wait_server_ready(endpoints):
-    """
-    Wait until parameter servers are ready, use connext_ex to detect
-    port readiness.
-
-    Args:
-        endpoints (list): endpoints string list, like:
-        ["127.0.0.1:8080", "127.0.0.1:8081"]
-
-    Examples:
-        .. code-block:: python
-
-            wait_server_ready(["127.0.0.1:8080", "127.0.0.1:8081"])
-    """
-    assert not isinstance(endpoints, str)
-    while True:
-        all_ok = True
-        not_ready_endpoints = []
-        for ep in endpoints:
-            ip_port = ep.split(":")
-            with closing(
-                socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-            ) as sock:
-                sock.settimeout(2)
-                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-                if hasattr(socket, 'SO_REUSEPORT'):
-                    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
-
-                result = sock.connect_ex((ip_port[0], int(ip_port[1])))
-                if result != 0:
-                    all_ok = False
-                    not_ready_endpoints.append(ep)
-        if not all_ok:
-            sys.stderr.write("server not ready, wait 3 sec to retry...\n")
-            sys.stderr.write(
-                "not ready endpoints:" + str(not_ready_endpoints) + "\n"
-            )
-            sys.stderr.flush()
-            time.sleep(3)
-        else:
-            break
diff --git a/python/paddle/fluid/transpiler/distribute_transpiler.py b/python/paddle/fluid/transpiler/distribute_transpiler.py
index ac64448ea51..7bd092e8484 100644
--- a/python/paddle/fluid/transpiler/distribute_transpiler.py
+++ b/python/paddle/fluid/transpiler/distribute_transpiler.py
@@ -48,9 +48,8 @@ from ..framework import (
     Parameter,
     grad_var_name,
 )
-from .details import wait_server_ready, UnionFind, VarStruct, VarsDistributed
+from .details import UnionFind, VarStruct, VarsDistributed
 from .details import delete_ops, find_op_by_output_arg
-from . import collective
 
 LOOKUP_TABLE_TYPE = ["lookup_table", "lookup_table_v2"]
 LOOKUP_TABLE_GRAD_TYPE = ["lookup_table_grad", "lookup_table_v2_grad"]
@@ -372,6 +371,10 @@ class DistributeTranspiler:
         startup_program=None,
         wait_port=True,
     ):
+        from paddle.distributed.fleet.base.private_helper_function import (
+            wait_server_ready,
+        )
+
         if not startup_program:
             startup_program = default_startup_program()
         if trainer_id >= 0:
@@ -431,6 +434,8 @@ class DistributeTranspiler:
         main_program=None,
         wait_port=True,
     ):
+        from paddle.distributed.transpiler import collective
+
         if isinstance(trainers, str):
             endpoints = trainers.split(",")
         elif isinstance(trainers, list):
@@ -1124,6 +1129,9 @@ WIKI: https://github.com/PaddlePaddle/Fleet/blob/develop/markdown_doc/transpiler
         """
         # remove optimize ops and add a send op to main_program
         # FIXME(typhoonzero): Also ops like clip_gradient, lrn_decay?
+        from paddle.distributed.fleet.base.private_helper_function import (
+            wait_server_ready,
+        )
 
         self._delete_trainer_optimizer(is_startup=True)
         sparse_table_names = self._get_sparse_table_names()
-- 
GitLab
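
For context, a minimal usage sketch of the relocated helpers, assuming a Paddle build with this patch applied; the endpoint list below is a hypothetical example, not a real deployment:

    # Sketch only: exercises the import paths introduced by this patch.
    from paddle.distributed.fleet.base.private_helper_function import (
        wait_server_ready,  # replaces paddle.fluid.transpiler.details.checkport
    )
    from paddle.distributed.transpiler.collective import SingleProcessMultiThread

    # Block until both (hypothetical) parameter-server endpoints accept
    # TCP connections, then build the transpiler the same way the updated
    # callers in this patch do.
    wait_server_ready(["127.0.0.1:8080", "127.0.0.1:8081"])
    t = SingleProcessMultiThread()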