Unverified commit b08c91ab, authored by wangxiaoning, committed by GitHub

[fluid clean] clean fluid.transpiler API (#50375)

* move ascend_transpiler

* move transpiler.collective

* remove checkport

* fix

* fix import

* fix import

* add init

* fix

* fix

* fix
Parent: fe698fd4
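In short, call sites that imported these helpers from paddle.fluid switch to the new locations shown in the hunks below. A minimal sketch of the path mapping (old paths in comments; all module names are taken from this diff):

# old: from paddle.fluid.transpiler import ascend_transpiler
from paddle.distributed.transpiler import ascend_transpiler

# old: from paddle.fluid.transpiler.collective import SingleProcessMultiThread, MultiThread
from paddle.distributed.transpiler.collective import SingleProcessMultiThread, MultiThread

# old: from paddle.fluid.transpiler.details.checkport import wait_server_ready
from paddle.distributed.fleet.base.private_helper_function import wait_server_ready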
...@@ -261,7 +261,7 @@ class AscendOptimizer(Optimizer):
        from paddle.distributed import fleet
        if auto_dp and fleet.world_size() > 1:
-            from paddle.fluid.transpiler import ascend_transpiler
+            from paddle.distributed.transpiler import ascend_transpiler
            t = ascend_transpiler.AscendTranspiler(
                startup_program, loss.block.program
......
...@@ -131,7 +131,7 @@ class ParameterServerOptimizer(MetaOptimizerBase):
        _startup = worker.fake_init_ops_pass(_startup, compiled_config)
        if use_ps_gpu:
            _main = worker.ps_gpu_pass(_main)
-            from paddle.fluid.transpiler.collective import (
+            from paddle.distributed.transpiler.collective import (
                SingleProcessMultiThread,
            )
......
...@@ -19,7 +19,6 @@ from _collections import defaultdict
import paddle
import paddle.fluid.framework as framework
from paddle.distributed.passes.pass_base import PassBase, register_pass
-from paddle.fluid.transpiler.collective import SingleProcessMultiThread
from paddle.framework import core
from paddle.static import Parameter, Program
...@@ -843,6 +842,8 @@ class PsTranspilePass(PassBase):
        return True
    def _apply_single_impl(self, main_program, startup_program, pass_ctx):
+        from ..transpiler.collective import SingleProcessMultiThread
        attrs = pass_ctx._attrs
        t = SingleProcessMultiThread()
        env = get_dist_env()
......
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
...@@ -12,8 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from paddle.framework import core
from . import collective
-from .. import core
OpRole = core.op_proto_and_checker_maker.OpRole
from paddle.distributed import fleet
......
...@@ -12,21 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import sys
-import math
-from functools import reduce
import os
-import collections
-import logging
-import numpy as np
+from paddle.distributed.fleet.base.private_helper_function import (
+    wait_server_ready,
+)
+from paddle.fluid import unique_name
+from paddle.framework import core
+from paddle.static import default_main_program, default_startup_program
-from .. import core, unique_name
-from ..framework import Program, default_main_program, default_startup_program
-from .details import wait_server_ready
-__all__ = ['GradAllReduce', 'LocalSGD', 'MultiThread']
OpRole = core.op_proto_and_checker_maker.OpRole
...@@ -540,7 +533,7 @@ class SingleProcessMultiThread(GradAllReduce):
        for idx, op in reversed(list(enumerate(block.ops))):
            if not self._is_backward_op(op):
                continue
-            if not self.op_role_var_key in op.attr_names:
+            if self.op_role_var_key not in op.attr_names:
                continue
            op_role_var = op.all_attrs()[self.op_role_var_key]
            if len(op_role_var) == 0:
......
...@@ -57,7 +57,6 @@ from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distribu
    StrategyFactory,
)
-from paddle.fluid.transpiler.details.checkport import wait_server_ready
from paddle.fluid.incubate.fleet.parameter_server.mode import PSMode
from paddle.fluid.incubate.fleet.base.fleet_base import DistributedOptimizer
......
...@@ -1230,7 +1230,7 @@ class DownpourOptimizer(DistributedOptimizer):
        fleet._main_programs = programs
        fleet._scopes = scopes
        if opt_info["use_ps_gpu"]:
-            from paddle.fluid.transpiler.collective import MultiThread
+            from paddle.distributed.transpiler.collective import MultiThread
            # check start program
            if program_mode not in [
......
...@@ -17,8 +17,8 @@ import unittest
import paddle
import paddle.fluid as fluid
import paddle.fluid.core as core
+from paddle.distributed.transpiler import collective
from paddle.fluid.layers.nn import _pull_box_sparse
-from paddle.fluid.transpiler import collective
class TestTranspile(unittest.TestCase):
......
...@@ -243,7 +243,9 @@ class TestSPMT(unittest.TestCase):
        print("===main_program====")
        print(main_program)
        print("===main_program====")
-        from paddle.fluid.transpiler.collective import SingleProcessMultiThread
+        from paddle.distributed.transpiler.collective import (
+            SingleProcessMultiThread,
+        )
        t = SingleProcessMultiThread()
        env = self.get_dist_env()
......
...@@ -14,5 +14,4 @@
from .program_utils import *
from .ufind import *
-from .checkport import *
from .vars_distributed import *
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import time
import socket
from contextlib import closing
def wait_server_ready(endpoints):
    """
    Wait until parameter servers are ready, use connect_ex to detect
    port readiness.

    Args:
        endpoints (list): endpoint string list, like:
                          ["127.0.0.1:8080", "127.0.0.1:8081"]

    Examples:
        .. code-block:: python

            wait_server_ready(["127.0.0.1:8080", "127.0.0.1:8081"])
    """
    assert not isinstance(endpoints, str)
    while True:
        all_ok = True
        not_ready_endpoints = []
        for ep in endpoints:
            ip_port = ep.split(":")
            with closing(
                socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            ) as sock:
                sock.settimeout(2)
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                if hasattr(socket, 'SO_REUSEPORT'):
                    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
                # connect_ex returns 0 only when the endpoint accepts the connection
                result = sock.connect_ex((ip_port[0], int(ip_port[1])))
                if result != 0:
                    all_ok = False
                    not_ready_endpoints.append(ep)
        if not all_ok:
            sys.stderr.write("server not ready, wait 3 sec to retry...\n")
            sys.stderr.write(
                "not ready endpoints:" + str(not_ready_endpoints) + "\n"
            )
            sys.stderr.flush()
            time.sleep(3)
        else:
            break
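The checkport helper above is deleted by this commit, and the remaining call sites (see the distribute_transpiler hunks below) switch to the equivalent helper under fleet. A minimal usage sketch, reusing the endpoint list from the old docstring:

from paddle.distributed.fleet.base.private_helper_function import (
    wait_server_ready,
)

# Block until both parameter-server endpoints accept TCP connections,
# retrying until they come up.
wait_server_ready(["127.0.0.1:8080", "127.0.0.1:8081"])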
...@@ -48,9 +48,8 @@ from ..framework import (
    Parameter,
    grad_var_name,
)
-from .details import wait_server_ready, UnionFind, VarStruct, VarsDistributed
+from .details import UnionFind, VarStruct, VarsDistributed
from .details import delete_ops, find_op_by_output_arg
-from . import collective
LOOKUP_TABLE_TYPE = ["lookup_table", "lookup_table_v2"]
LOOKUP_TABLE_GRAD_TYPE = ["lookup_table_grad", "lookup_table_v2_grad"]
...@@ -372,6 +371,10 @@ class DistributeTranspiler:
        startup_program=None,
        wait_port=True,
    ):
+        from paddle.distributed.fleet.base.private_helper_function import (
+            wait_server_ready,
+        )
        if not startup_program:
            startup_program = default_startup_program()
        if trainer_id >= 0:
...@@ -431,6 +434,8 @@ class DistributeTranspiler:
        main_program=None,
        wait_port=True,
    ):
+        from paddle.distributed.transpiler import collective
        if isinstance(trainers, str):
            endpoints = trainers.split(",")
        elif isinstance(trainers, list):
...@@ -1124,6 +1129,9 @@ WIKI: https://github.com/PaddlePaddle/Fleet/blob/develop/markdown_doc/transpiler
        """
        # remove optimize ops and add a send op to main_program
        # FIXME(typhoonzero): Also ops like clip_gradient, lrn_decay?
+        from paddle.distributed.fleet.base.private_helper_function import (
+            wait_server_ready,
+        )
        self._delete_trainer_optimizer(is_startup=True)
        sparse_table_names = self._get_sparse_table_names()
......