Unverified commit 00877381 authored by qizhaoaoe, committed by GitHub

fluid clean: remove fluid.ir and fluid.io (#51167)

* fluid clean: move fluid.ir to framework.ir and move some funcs from fluid.layers.io to incubate.

* delete fluid.ir
Parent 3f991128
@@ -18,9 +18,9 @@ import os
import paddle
from paddle.fluid import compiler
from paddle.fluid.framework import in_dygraph_mode
from paddle.fluid.ir import apply_build_strategy
from paddle.fluid.wrapped_decorator import wrap_decorator
from paddle.framework import _global_flags
from paddle.framework.ir import apply_build_strategy
from .base import topology as tp
from .base.distributed_strategy import DistributedStrategy
......
@@ -12,245 +12,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import multiprocessing
import os
import sys
import threading
from ..data_feeder import DataFeeder
from .control_flow import BlockGuard
from .. import core
from ..executor import global_scope
from ..framework import (
convert_np_dtype_to_dtype_,
default_main_program,
default_startup_program,
program_guard,
Program,
Variable,
)
from ..layer_helper import LayerHelper
from ..unique_name import generate as unique_name
import logging
from ..data_feeder import check_dtype, check_type
from paddle.fluid.framework import static_only
from ..framework import (
_get_paddle_place,
_current_expected_place,
_set_expected_place,
)
__all__ = []
class BlockGuardServ(BlockGuard):
"""
    BlockGuardServ class.

    BlockGuardServ is used to create an op with an attached block in a program.
"""
def __init__(self, server):
if not (isinstance(server, ListenAndServ)):
raise TypeError("BlockGuardServ takes a ListenAndServ")
super().__init__(server.helper.main_program)
self.server = server
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is not None:
return False
self.server.complete_op()
return super().__exit__(exc_type, exc_val, exc_tb)
class ListenAndServ:
"""
**ListenAndServ Layer**
ListenAndServ is used to create a rpc server bind and listen
on specific TCP port, this server will run the sub-block when
received variables from clients.
Args:
endpoint(string): IP:port string which the server will listen on.
inputs(list): a list of variables that the server will get from clients.
fan_in(int): how many client are expected to report to this server, default: 1.
optimizer_mode(bool): whether to run the server as a parameter server, default: True.
Examples:
.. code-block:: python
import paddle.fluid as fluid
import paddle
with fluid.program_guard(main):
serv = layers.ListenAndServ(
"127.0.0.1:6170", ["X"], optimizer_mode=False)
with serv.do():
x = paddle.static.data(
shape=[32, 32],
dtype='float32',
name="X")
paddle.nn.initializer.Constant(value=1.0)(x, main.global_block())
paddle.scale(x=x, scale=10.0, out=out_var)
exe = fluid.Executor(place)
exe.run(main)
"""
def __init__(self, endpoint, inputs, fan_in=1, optimizer_mode=True):
self.helper = LayerHelper("listen_and_serv")
self.inputs = inputs
self.outputs = []
self.endpoint = endpoint
self.fan_in = fan_in
        # FIXME(typhoonzero): adding optimizer_mode like this is crude; it
        # should be made more general.
self.optimizer_mode = optimizer_mode
def do(self):
return BlockGuardServ(self)
def get_params_and_grads(self):
main_program = self.helper.main_program
current_block = main_program.current_block()
parent_block = self.parent_block()
# params and grads in the same order.
params = list()
grads = list()
for op in current_block.ops:
# FIXME(typhoonzero): op.inputs is None if it's cloned.
if self.optimizer_mode:
if "Grad" in op.inputs and "Param" in op.inputs:
params.append(op.inputs["Param"].name)
grads.append(op.inputs["Grad"].name)
else:
                # simple recv mode: use the recv operators' inputs.
for iname in op.input_names:
for in_var_name in op.input(iname):
params.append(parent_block.var(in_var_name))
grads.append(parent_block.var(in_var_name))
return params, grads
def parent_block(self):
prog = self.helper.main_program
parent_idx = prog.current_block().parent_idx
assert parent_idx >= 0
parent_block = prog.block(parent_idx)
return parent_block
def complete_op(self):
from ..incubate.fleet.parameter_server.mode import DistributedMode
main_program = self.helper.main_program
current_block = main_program.current_block()
parent_block = self.parent_block()
parent_block.append_op(
type='listen_and_serv',
inputs={"X": self.inputs},
outputs={},
attrs={
'endpoint': self.endpoint,
'Fanin': self.fan_in,
'optimize_blocks': [
current_block
                ],  # multiple optimize blocks are not supported in layers
                'distributed_mode': DistributedMode.SYNC,  # async mode is not supported in layers yet
'grad_to_block_id': [""],
},
)
def Send(endpoints, send_vars, dummy_output=None, sync=True):
"""
Send variables to the server side, and get vars from server
side when server have finished running server side program.
Args:
endpoints (str): comma separated IP:PORT pairs in the order
of send_vars to send
send_vars (list): variables to send to server
sync (bool): whether to wait the request finish
"""
assert type(send_vars) == list
if dummy_output is None:
dummy_output = []
elif isinstance(dummy_output, Variable):
dummy_output = [dummy_output]
assert type(dummy_output) == list
epmap = endpoints.split(",")
endpoints = list(set(epmap))
helper = LayerHelper("Send", **locals())
rpc_op_role_name = core.op_proto_and_checker_maker.kOpRoleAttrName()
helper.append_op(
type="send",
inputs={"X": send_vars},
outputs={"Out": dummy_output},
attrs={
"endpoints": endpoints,
"epmap": epmap,
rpc_op_role_name: core.op_proto_and_checker_maker.OpRole.RPC,
},
)
if sync:
helper.append_op(
type="send_barrier",
inputs={"X": dummy_output},
outputs={"Out": []},
attrs={"endpoints": endpoints},
)
def Recv(endpoints, get_vars, dummy_input=None, sync=True):
"""
Receive variables from server side
Args:
endpoints (str): comma separated IP:PORT pairs in the order
of send_vars to send
get_vars (list): vars to get from server after send completes.
sync (bool): whether to wait the request finish
Returns:
list: list of received variables
"""
assert type(get_vars) == list
if dummy_input is None:
dummy_input = []
elif isinstance(dummy_input, Variable):
dummy_input = [dummy_input]
assert type(dummy_input) == list
epmap = endpoints.split(",")
endpoints = list(set(epmap))
helper = LayerHelper("Recv", **locals())
helper.append_op(
type="recv",
inputs={"X": dummy_input},
outputs={"Out": get_vars},
attrs={"endpoints": endpoints, "epmap": epmap},
)
if sync:
helper.append_op(
type="fetch_barrier",
outputs={"Out": get_vars},
attrs={"endpoints": endpoints},
)
return get_vars
def monkey_patch_reader_methods(reader):
def __get_reader__():
scope = global_scope()
......
@@ -19,7 +19,7 @@ import numpy as np
import paddle
import paddle.fluid as fluid
from paddle.fluid.framework import _apply_pass
from paddle.fluid.ir import apply_build_strategy
from paddle.framework.ir import apply_build_strategy
from paddle.nn import CrossEntropyLoss
from paddle.vision.models import resnet50
......
@@ -25,7 +25,7 @@ import paddle
import paddle.fluid as fluid
import paddle.fluid.layers.ops as ops
from paddle.fluid import core
from paddle.fluid.layers.io import ListenAndServ, Recv, Send
from paddle.incubate.nn.layer.io import ListenAndServ, Recv, Send
RPC_OP_ROLE_ATTR_NAME = (
op_role_attr_name
......
@@ -12,12 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import inspect
from os import path
import paddle
from ..fluid.framework import _apply_pass
from . import core
from .framework import _apply_pass
def get_data_vars(program):
......
# Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ....fluid.framework import Variable
from ....fluid.layers.control_flow import BlockGuard
from ....framework import LayerHelper, core
class BlockGuardServ(BlockGuard):
"""
    BlockGuardServ class.

    BlockGuardServ is used to create an op with an attached block in a program.
"""
def __init__(self, server):
if not (isinstance(server, ListenAndServ)):
raise TypeError("BlockGuardServ takes a ListenAndServ")
super().__init__(server.helper.main_program)
self.server = server
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is not None:
return False
self.server.complete_op()
return super().__exit__(exc_type, exc_val, exc_tb)
class ListenAndServ:
"""
**ListenAndServ Layer**
ListenAndServ is used to create a rpc server bind and listen
on specific TCP port, this server will run the sub-block when
received variables from clients.
Args:
endpoint(string): IP:port string which the server will listen on.
inputs(list): a list of variables that the server will get from clients.
fan_in(int): how many client are expected to report to this server, default: 1.
optimizer_mode(bool): whether to run the server as a parameter server, default: True.
Examples:
.. code-block:: python
import paddle.fluid as fluid
import paddle
with fluid.program_guard(main):
serv = layers.ListenAndServ(
"127.0.0.1:6170", ["X"], optimizer_mode=False)
with serv.do():
x = paddle.static.data(
shape=[32, 32],
dtype='float32',
name="X")
paddle.nn.initializer.Constant(value=1.0)(x, main.global_block())
paddle.scale(x=x, scale=10.0, out=out_var)
exe = fluid.Executor(place)
exe.run(main)
"""
def __init__(self, endpoint, inputs, fan_in=1, optimizer_mode=True):
self.helper = LayerHelper("listen_and_serv")
self.inputs = inputs
self.outputs = []
self.endpoint = endpoint
self.fan_in = fan_in
        # FIXME(typhoonzero): adding optimizer_mode like this is crude; it
        # should be made more general.
self.optimizer_mode = optimizer_mode
def do(self):
return BlockGuardServ(self)
def get_params_and_grads(self):
main_program = self.helper.main_program
current_block = main_program.current_block()
parent_block = self.parent_block()
# params and grads in the same order.
params = list()
grads = list()
for op in current_block.ops:
# FIXME(typhoonzero): op.inputs is None if it's cloned.
if self.optimizer_mode:
if "Grad" in op.inputs and "Param" in op.inputs:
params.append(op.inputs["Param"].name)
grads.append(op.inputs["Grad"].name)
else:
                # simple recv mode: use the recv operators' inputs.
for iname in op.input_names:
for in_var_name in op.input(iname):
params.append(parent_block.var(in_var_name))
grads.append(parent_block.var(in_var_name))
return params, grads
def parent_block(self):
prog = self.helper.main_program
parent_idx = prog.current_block().parent_idx
assert parent_idx >= 0
parent_block = prog.block(parent_idx)
return parent_block
def complete_op(self):
from paddle.incubate.fleet.parameter_server.mode import DistributedMode
main_program = self.helper.main_program
current_block = main_program.current_block()
parent_block = self.parent_block()
parent_block.append_op(
type='listen_and_serv',
inputs={"X": self.inputs},
outputs={},
attrs={
'endpoint': self.endpoint,
'Fanin': self.fan_in,
'optimize_blocks': [
current_block
                ],  # multiple optimize blocks are not supported in layers
                'distributed_mode': DistributedMode.SYNC,  # async mode is not supported in layers yet
'grad_to_block_id': [""],
},
)
def Send(endpoints, send_vars, dummy_output=None, sync=True):
"""
Send variables to the server side, and get vars from server
side when server have finished running server side program.
Args:
endpoints (str): comma separated IP:PORT pairs in the order
of send_vars to send
send_vars (list): variables to send to server
sync (bool): whether to wait the request finish
"""
assert type(send_vars) == list
if dummy_output is None:
dummy_output = []
elif isinstance(dummy_output, Variable):
dummy_output = [dummy_output]
assert type(dummy_output) == list
epmap = endpoints.split(",")
endpoints = list(set(epmap))
helper = LayerHelper("Send", **locals())
rpc_op_role_name = core.op_proto_and_checker_maker.kOpRoleAttrName()
helper.append_op(
type="send",
inputs={"X": send_vars},
outputs={"Out": dummy_output},
attrs={
"endpoints": endpoints,
"epmap": epmap,
rpc_op_role_name: core.op_proto_and_checker_maker.OpRole.RPC,
},
)
if sync:
helper.append_op(
type="send_barrier",
inputs={"X": dummy_output},
outputs={"Out": []},
attrs={"endpoints": endpoints},
)
def Recv(endpoints, get_vars, dummy_input=None, sync=True):
"""
Receive variables from server side
Args:
endpoints (str): comma separated IP:PORT pairs in the order
of send_vars to send
get_vars (list): vars to get from server after send completes.
sync (bool): whether to wait the request finish
Returns:
list: list of received variables
"""
assert type(get_vars) == list
if dummy_input is None:
dummy_input = []
elif isinstance(dummy_input, Variable):
dummy_input = [dummy_input]
assert type(dummy_input) == list
epmap = endpoints.split(",")
endpoints = list(set(epmap))
helper = LayerHelper("Recv", **locals())
helper.append_op(
type="recv",
inputs={"X": dummy_input},
outputs={"Out": get_vars},
attrs={"endpoints": endpoints, "epmap": epmap},
)
if sync:
helper.append_op(
type="fetch_barrier",
outputs={"Out": get_vars},
attrs={"endpoints": endpoints},
)
return get_vars