diff --git a/python/paddle/distributed/fleet/fleet.py b/python/paddle/distributed/fleet/fleet.py
index 55650a5b5f6afa56c34de98229623db185fc2a83..74ee30e349fa9b74b6df48701664f4fd24aff999 100755
--- a/python/paddle/distributed/fleet/fleet.py
+++ b/python/paddle/distributed/fleet/fleet.py
@@ -18,9 +18,9 @@ import os
 import paddle
 from paddle.fluid import compiler
 from paddle.fluid.framework import in_dygraph_mode
-from paddle.fluid.ir import apply_build_strategy
 from paddle.fluid.wrapped_decorator import wrap_decorator
 from paddle.framework import _global_flags
+from paddle.framework.ir import apply_build_strategy
 
 from .base import topology as tp
 from .base.distributed_strategy import DistributedStrategy
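Both apply_build_strategy hunks in this patch only change the import path; the pass-application logic itself moves verbatim from paddle/fluid/ir.py to paddle/framework/ir.py (see the rename hunk further down). A rough caller-side sketch under the new path, assuming the signature apply_build_strategy(main_program, startup_program, build_strategy, pass_attrs) carries over unchanged from paddle.fluid.ir:

    import paddle
    from paddle.framework.ir import apply_build_strategy  # previously paddle.fluid.ir

    paddle.enable_static()

    main = paddle.static.Program()
    startup = paddle.static.Program()
    with paddle.static.program_guard(main, startup):
        x = paddle.static.data(name="x", shape=[None, 784], dtype="float32")
        loss = paddle.mean(paddle.static.nn.fc(x, size=10))
        paddle.optimizer.SGD(learning_rate=0.01).minimize(loss)

    # Rewrite the programs in place with the passes enabled by the strategy.
    strategy = paddle.static.BuildStrategy()
    strategy.fuse_elewise_add_act_ops = True
    apply_build_strategy(main, startup, strategy, {"use_cuda": False})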
diff --git a/python/paddle/fluid/layers/io.py b/python/paddle/fluid/layers/io.py
index eb4d227f914ff8330876525ce9827e74d5e4ab27..d4aa7734aee6fe95cbe1d0bd93fcad4b0b514ad0 100644
--- a/python/paddle/fluid/layers/io.py
+++ b/python/paddle/fluid/layers/io.py
@@ -12,245 +12,18 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import multiprocessing
-import os
-import sys
-import threading
-
-from ..data_feeder import DataFeeder
-from .control_flow import BlockGuard
 from .. import core
 from ..executor import global_scope
 from ..framework import (
-    convert_np_dtype_to_dtype_,
     default_main_program,
     default_startup_program,
-    program_guard,
-    Program,
-    Variable,
 )
-from ..layer_helper import LayerHelper
 from ..unique_name import generate as unique_name
-import logging
-from ..data_feeder import check_dtype, check_type
-from paddle.fluid.framework import static_only
-from ..framework import (
-    _get_paddle_place,
-    _current_expected_place,
-    _set_expected_place,
-)
 
 __all__ = []
 
 
-class BlockGuardServ(BlockGuard):
-    """
-    BlockGuardServ class.
-
-    BlockGuardServ class is used to create an op with a block in a program.
-    """
-
-    def __init__(self, server):
-        if not (isinstance(server, ListenAndServ)):
-            raise TypeError("BlockGuardServ takes a ListenAndServ")
-        super().__init__(server.helper.main_program)
-        self.server = server
-
-    def __exit__(self, exc_type, exc_val, exc_tb):
-        if exc_type is not None:
-            return False
-
-        self.server.complete_op()
-        return super().__exit__(exc_type, exc_val, exc_tb)
-
-
-class ListenAndServ:
-    """
-    **ListenAndServ Layer**
-
-    ListenAndServ is used to create a rpc server bind and listen
-    on specific TCP port, this server will run the sub-block when
-    received variables from clients.
-
-    Args:
-        endpoint(string): IP:port string which the server will listen on.
-        inputs(list): a list of variables that the server will get from clients.
-        fan_in(int): how many client are expected to report to this server, default: 1.
-        optimizer_mode(bool): whether to run the server as a parameter server, default: True.
-
-    Examples:
-        .. code-block:: python
-
-            import paddle.fluid as fluid
-            import paddle
-            with fluid.program_guard(main):
-                serv = layers.ListenAndServ(
-                    "127.0.0.1:6170", ["X"], optimizer_mode=False)
-                with serv.do():
-                    x = paddle.static.data(
-                        shape=[32, 32],
-                        dtype='float32',
-                        name="X")
-                    paddle.nn.initializer.Constant(value=1.0)(x, main.global_block())
-                    paddle.scale(x=x, scale=10.0, out=out_var)
-
-            exe = fluid.Executor(place)
-            exe.run(main)
-    """
-
-    def __init__(self, endpoint, inputs, fan_in=1, optimizer_mode=True):
-        self.helper = LayerHelper("listen_and_serv")
-        self.inputs = inputs
-        self.outputs = []
-        self.endpoint = endpoint
-        self.fan_in = fan_in
-        # FIXME(typhoonzero): add optimizer_mode is stupid, should make it more
-        # general.
-        self.optimizer_mode = optimizer_mode
-
-    def do(self):
-        return BlockGuardServ(self)
-
-    def get_params_and_grads(self):
-        main_program = self.helper.main_program
-        current_block = main_program.current_block()
-        parent_block = self.parent_block()
-        # params and grads in the same order.
-        params = list()
-        grads = list()
-        for op in current_block.ops:
-            # FIXME(typhoonzero): op.inputs is None if it's cloned.
-            if self.optimizer_mode:
-                if "Grad" in op.inputs and "Param" in op.inputs:
-                    params.append(op.inputs["Param"].name)
-                    grads.append(op.inputs["Grad"].name)
-            else:
-                # simple recv mode, recv operators inputs.
-                for iname in op.input_names:
-                    for in_var_name in op.input(iname):
-                        params.append(parent_block.var(in_var_name))
-                        grads.append(parent_block.var(in_var_name))
-
-        return params, grads
-
-    def parent_block(self):
-        prog = self.helper.main_program
-        parent_idx = prog.current_block().parent_idx
-        assert parent_idx >= 0
-        parent_block = prog.block(parent_idx)
-        return parent_block
-
-    def complete_op(self):
-        from ..incubate.fleet.parameter_server.mode import DistributedMode
-
-        main_program = self.helper.main_program
-        current_block = main_program.current_block()
-        parent_block = self.parent_block()
-
-        parent_block.append_op(
-            type='listen_and_serv',
-            inputs={"X": self.inputs},
-            outputs={},
-            attrs={
-                'endpoint': self.endpoint,
-                'Fanin': self.fan_in,
-                'optimize_blocks': [
-                    current_block
-                ],  # did not support multiple optimize blocks in layers
-                'distributed_mode': DistributedMode.SYNC,  # did not support async now in layers
-                'grad_to_block_id': [""],
-            },
-        )
-
-
-def Send(endpoints, send_vars, dummy_output=None, sync=True):
-    """
-    Send variables to the server side, and get vars from server
-    side when server have finished running server side program.
-
-    Args:
-        endpoints (str): comma separated IP:PORT pairs in the order
-                   of send_vars to send
-        send_vars (list): variables to send to server
-        sync (bool): whether to wait the request finish
-
-    """
-    assert type(send_vars) == list
-
-    if dummy_output is None:
-        dummy_output = []
-    elif isinstance(dummy_output, Variable):
-        dummy_output = [dummy_output]
-
-    assert type(dummy_output) == list
-
-    epmap = endpoints.split(",")
-    endpoints = list(set(epmap))
-
-    helper = LayerHelper("Send", **locals())
-    rpc_op_role_name = core.op_proto_and_checker_maker.kOpRoleAttrName()
-
-    helper.append_op(
-        type="send",
-        inputs={"X": send_vars},
-        outputs={"Out": dummy_output},
-        attrs={
-            "endpoints": endpoints,
-            "epmap": epmap,
-            rpc_op_role_name: core.op_proto_and_checker_maker.OpRole.RPC,
-        },
-    )
-    if sync:
-        helper.append_op(
-            type="send_barrier",
-            inputs={"X": dummy_output},
-            outputs={"Out": []},
-            attrs={"endpoints": endpoints},
-        )
-
-
-def Recv(endpoints, get_vars, dummy_input=None, sync=True):
-    """
-    Receive variables from server side
-
-    Args:
-        endpoints (str): comma separated IP:PORT pairs in the order
-                   of send_vars to send
-        get_vars (list): vars to get from server after send completes.
-        sync (bool): whether to wait the request finish
-
-    Returns:
-        list: list of received variables
-    """
-    assert type(get_vars) == list
-
-    if dummy_input is None:
-        dummy_input = []
-    elif isinstance(dummy_input, Variable):
-        dummy_input = [dummy_input]
-
-    assert type(dummy_input) == list
-
-    epmap = endpoints.split(",")
-    endpoints = list(set(epmap))
-
-    helper = LayerHelper("Recv", **locals())
-    helper.append_op(
-        type="recv",
-        inputs={"X": dummy_input},
-        outputs={"Out": get_vars},
-        attrs={"endpoints": endpoints, "epmap": epmap},
-    )
-    if sync:
-        helper.append_op(
-            type="fetch_barrier",
-            outputs={"Out": get_vars},
-            attrs={"endpoints": endpoints},
-        )
-    return get_vars
-
-
 def monkey_patch_reader_methods(reader):
     def __get_reader__():
         scope = global_scope()
diff --git a/python/paddle/fluid/tests/unittests/test_apply_pass_to_program.py b/python/paddle/fluid/tests/unittests/test_apply_pass_to_program.py
index f2a18ea156cbe796b4943e39c7bf1e4af0b208b7..7ec9ae6c43e1cf926a3ce9a08687e100bc5d08ea 100644
--- a/python/paddle/fluid/tests/unittests/test_apply_pass_to_program.py
+++ b/python/paddle/fluid/tests/unittests/test_apply_pass_to_program.py
@@ -19,7 +19,7 @@ import numpy as np
 import paddle
 import paddle.fluid as fluid
 from paddle.fluid.framework import _apply_pass
-from paddle.fluid.ir import apply_build_strategy
+from paddle.framework.ir import apply_build_strategy
 from paddle.nn import CrossEntropyLoss
 from paddle.vision.models import resnet50
 
diff --git a/python/paddle/fluid/tests/unittests/test_dist_train.py b/python/paddle/fluid/tests/unittests/test_dist_train.py
index 548f2bf8a0c83d56a03b345dad24545b775e5a30..f93c229daa81f913fd10454aee63046ac7631c9a 100644
--- a/python/paddle/fluid/tests/unittests/test_dist_train.py
+++ b/python/paddle/fluid/tests/unittests/test_dist_train.py
@@ -25,7 +25,7 @@ import paddle
 import paddle.fluid as fluid
 import paddle.fluid.layers.ops as ops
 from paddle.fluid import core
-from paddle.fluid.layers.io import ListenAndServ, Recv, Send
+from paddle.incubate.nn.layer.io import ListenAndServ, Recv, Send
 
 RPC_OP_ROLE_ATTR_NAME = (
     op_role_attr_name
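Both test hunks above are pure import-path migrations. Downstream code that has to work on releases from before and after this change could fall back at import time, for example (a sketch, assuming only the module path differs between versions):

    try:
        # Location introduced by this patch.
        from paddle.incubate.nn.layer.io import ListenAndServ, Recv, Send
    except ImportError:
        # Older releases expose the same layers under paddle.fluid.
        from paddle.fluid.layers.io import ListenAndServ, Recv, Send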
diff --git a/python/paddle/fluid/ir.py b/python/paddle/framework/ir.py
similarity index 97%
rename from python/paddle/fluid/ir.py
rename to python/paddle/framework/ir.py
index c444b4cedaafa89a049d6dada8414595646028be..544eff024d7502c8d468456e7c1a090e5f08d530 100644
--- a/python/paddle/fluid/ir.py
+++ b/python/paddle/framework/ir.py
@@ -12,12 +12,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import copy
-import inspect
-from os import path
-import paddle
+from ..fluid.framework import _apply_pass
 from . import core
-from .framework import _apply_pass
 
 
 def get_data_vars(program):
diff --git a/python/paddle/incubate/nn/layer/io.py b/python/paddle/incubate/nn/layer/io.py
new file mode 100644
index 0000000000000000000000000000000000000000..8e8522a8659039c33d457cfc2c313d4546e863b2
--- /dev/null
+++ b/python/paddle/incubate/nn/layer/io.py
@@ -0,0 +1,224 @@
+# Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ....fluid.framework import Variable
+from ....fluid.layers.control_flow import BlockGuard
+from ....framework import LayerHelper, core
+
+
+class BlockGuardServ(BlockGuard):
+    """
+    BlockGuardServ class.
+
+    BlockGuardServ class is used to create an op with a block in a program.
+    """
+
+    def __init__(self, server):
+        if not (isinstance(server, ListenAndServ)):
+            raise TypeError("BlockGuardServ takes a ListenAndServ")
+        super().__init__(server.helper.main_program)
+        self.server = server
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        if exc_type is not None:
+            return False
+
+        self.server.complete_op()
+        return super().__exit__(exc_type, exc_val, exc_tb)
+
+
+class ListenAndServ:
+    """
+    **ListenAndServ Layer**
+
+    ListenAndServ is used to create an RPC server that binds and listens
+    on a specific TCP port; the server runs the sub-block when it
+    receives variables from clients.
+
+    Args:
+        endpoint(string): IP:port string which the server will listen on.
+        inputs(list): a list of variables that the server will get from clients.
+        fan_in(int): how many clients are expected to report to this server, default: 1.
+        optimizer_mode(bool): whether to run the server as a parameter server, default: True.
+
+    Examples:
+        .. code-block:: python
+
+            import paddle.fluid as fluid
+            import paddle
+            with fluid.program_guard(main):
+                serv = layers.ListenAndServ(
+                    "127.0.0.1:6170", ["X"], optimizer_mode=False)
+                with serv.do():
+                    x = paddle.static.data(
+                        shape=[32, 32],
+                        dtype='float32',
+                        name="X")
+                    paddle.nn.initializer.Constant(value=1.0)(x, main.global_block())
+                    paddle.scale(x=x, scale=10.0, out=out_var)
+
+            exe = fluid.Executor(place)
+            exe.run(main)
+    """
+
+    def __init__(self, endpoint, inputs, fan_in=1, optimizer_mode=True):
+        self.helper = LayerHelper("listen_and_serv")
+        self.inputs = inputs
+        self.outputs = []
+        self.endpoint = endpoint
+        self.fan_in = fan_in
+        # FIXME(typhoonzero): add optimizer_mode is stupid, should make it more
+        # general.
+        self.optimizer_mode = optimizer_mode
+
+    def do(self):
+        return BlockGuardServ(self)
+
+    def get_params_and_grads(self):
+        main_program = self.helper.main_program
+        current_block = main_program.current_block()
+        parent_block = self.parent_block()
+        # params and grads in the same order.
+        params = list()
+        grads = list()
+        for op in current_block.ops:
+            # FIXME(typhoonzero): op.inputs is None if it's cloned.
+            if self.optimizer_mode:
+                if "Grad" in op.inputs and "Param" in op.inputs:
+                    params.append(op.inputs["Param"].name)
+                    grads.append(op.inputs["Grad"].name)
+            else:
+                # simple recv mode, recv operators inputs.
+                for iname in op.input_names:
+                    for in_var_name in op.input(iname):
+                        params.append(parent_block.var(in_var_name))
+                        grads.append(parent_block.var(in_var_name))
+
+        return params, grads
+
+    def parent_block(self):
+        prog = self.helper.main_program
+        parent_idx = prog.current_block().parent_idx
+        assert parent_idx >= 0
+        parent_block = prog.block(parent_idx)
+        return parent_block
+
+    def complete_op(self):
+        from paddle.incubate.fleet.parameter_server.mode import DistributedMode
+
+        main_program = self.helper.main_program
+        current_block = main_program.current_block()
+        parent_block = self.parent_block()
+
+        parent_block.append_op(
+            type='listen_and_serv',
+            inputs={"X": self.inputs},
+            outputs={},
+            attrs={
+                'endpoint': self.endpoint,
+                'Fanin': self.fan_in,
+                'optimize_blocks': [
+                    current_block
+                ],  # multiple optimize blocks are not supported in layers yet
+                'distributed_mode': DistributedMode.SYNC,  # async mode is not supported in layers yet
+                'grad_to_block_id': [""],
+            },
+        )
+
+
+def Send(endpoints, send_vars, dummy_output=None, sync=True):
+    """
+    Send variables to the server side, and get variables back from the
+    server side once the server has finished running the server-side program.
+
+    Args:
+        endpoints (str): comma-separated IP:PORT pairs, in the same order
+                   as the variables in send_vars
+        send_vars (list): variables to send to the server
+        sync (bool): whether to wait for the request to finish
+
+    """
+    assert type(send_vars) == list
+
+    if dummy_output is None:
+        dummy_output = []
+    elif isinstance(dummy_output, Variable):
+        dummy_output = [dummy_output]
+
+    assert type(dummy_output) == list
+
+    epmap = endpoints.split(",")
+    endpoints = list(set(epmap))
+
+    helper = LayerHelper("Send", **locals())
+    rpc_op_role_name = core.op_proto_and_checker_maker.kOpRoleAttrName()
+
+    helper.append_op(
+        type="send",
+        inputs={"X": send_vars},
+        outputs={"Out": dummy_output},
+        attrs={
+            "endpoints": endpoints,
+            "epmap": epmap,
+            rpc_op_role_name: core.op_proto_and_checker_maker.OpRole.RPC,
+        },
+    )
+    if sync:
+        helper.append_op(
+            type="send_barrier",
+            inputs={"X": dummy_output},
+            outputs={"Out": []},
+            attrs={"endpoints": endpoints},
+        )
+
+
+def Recv(endpoints, get_vars, dummy_input=None, sync=True):
+    """
+    Receive variables from the server side.
+
+    Args:
+        endpoints (str): comma-separated IP:PORT pairs, in the same order
+                   as the variables in get_vars
+        get_vars (list): variables to fetch from the server after the send completes.
+        sync (bool): whether to wait for the request to finish
+
+    Returns:
+        list: list of received variables
+    """
+    assert type(get_vars) == list
+
+    if dummy_input is None:
+        dummy_input = []
+    elif isinstance(dummy_input, Variable):
+        dummy_input = [dummy_input]
+
+    assert type(dummy_input) == list
+
+    epmap = endpoints.split(",")
+    endpoints = list(set(epmap))
+
+    helper = LayerHelper("Recv", **locals())
+    helper.append_op(
+        type="recv",
+        inputs={"X": dummy_input},
+        outputs={"Out": get_vars},
+        attrs={"endpoints": endpoints, "epmap": epmap},
+    )
+    if sync:
+        helper.append_op(
+            type="fetch_barrier",
+            outputs={"Out": get_vars},
+            attrs={"endpoints": endpoints},
+        )
+    return get_vars
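Send and Recv keep their original behavior after the move: Send appends a send op (plus a send_barrier when sync=True) and Recv appends a recv op (plus a fetch_barrier). A rough client-side sketch under the new import path follows; the endpoint and variable names are made up for illustration, and the snippet only builds the program. It assumes a Paddle build that registers the legacy RPC ops, and a listen_and_serv server must be running before the program is executed.

    import paddle
    from paddle.incubate.nn.layer.io import Recv, Send

    paddle.enable_static()

    main = paddle.static.Program()
    startup = paddle.static.Program()
    with paddle.static.program_guard(main, startup):
        x = paddle.static.data(name="x", shape=[32, 32], dtype="float32")
        # Ship `x` to the server and block on the send barrier.
        Send("127.0.0.1:6170", [x], sync=True)
        # Fetch the server-side result back into a fresh variable.
        out = main.global_block().create_var(
            name="x_recv", shape=[32, 32], dtype="float32"
        )
        Recv("127.0.0.1:6170", [out], sync=True)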