#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import multiprocessing
import os
import sys
import threading

from ..data_feeder import DataFeeder
from .control_flow import BlockGuard
from .. import core
from ..executor import global_scope
from ..framework import (
    convert_np_dtype_to_dtype_,
    default_main_program,
    default_startup_program,
    program_guard,
    Program,
    Variable,
)
from ..layer_helper import LayerHelper
from ..unique_name import generate as unique_name

import logging
from ..data_feeder import check_dtype, check_type
from paddle.fluid.framework import static_only
from ..framework import (
    _get_paddle_place,
    _current_expected_place,
    _set_expected_place,
)

__all__ = []


class BlockGuardServ(BlockGuard):
    """
    BlockGuardServ class.

    BlockGuardServ class is used to create an op with a block in a program.
    """

    def __init__(self, server):
        if not (isinstance(server, ListenAndServ)):
            raise TypeError("BlockGuardServ takes a ListenAndServ")
        super().__init__(server.helper.main_program)
        self.server = server

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is not None:
            return False

        self.server.complete_op()
        return super().__exit__(exc_type, exc_val, exc_tb)


class ListenAndServ:
    """
    **ListenAndServ Layer**

    ListenAndServ is used to create an RPC server that binds to and listens
    on a specific TCP port. The server runs the sub-block whenever it
    receives variables from clients.

    Args:
        endpoint(string): IP:port string which the server will listen on.
        inputs(list): a list of variables that the server will get from clients.
        fan_in(int): how many clients are expected to report to this server, default: 1.
        optimizer_mode(bool): whether to run the server as a parameter server, default: True.

    Examples:
        .. code-block:: python

            import paddle
            import paddle.fluid as fluid
            from paddle.fluid.layers.io import ListenAndServ

            paddle.enable_static()
            main = fluid.Program()
            with fluid.program_guard(main):
                serv = ListenAndServ(
                    "127.0.0.1:6170", ["X"], optimizer_mode=False)
                with serv.do():
                    x = paddle.static.data(
                        shape=[32, 32],
                        dtype='float32',
                        name="X")
                    fluid.initializer.Constant(value=1.0)(x, main.global_block())
                    out_var = paddle.scale(x=x, scale=10.0)

            place = fluid.CPUPlace()
            exe = fluid.Executor(place)
            exe.run(main)
    """

    def __init__(self, endpoint, inputs, fan_in=1, optimizer_mode=True):
        self.helper = LayerHelper("listen_and_serv")
        self.inputs = inputs
        self.outputs = []
        self.endpoint = endpoint
        self.fan_in = fan_in
        # FIXME(typhoonzero): adding optimizer_mode is a hack; this should be
        # made more general.
        self.optimizer_mode = optimizer_mode

    def do(self):
        return BlockGuardServ(self)

    def get_params_and_grads(self):
        main_program = self.helper.main_program
        current_block = main_program.current_block()
        parent_block = self.parent_block()
        # params and grads in the same order.
        params = list()
        grads = list()
        for op in current_block.ops:
            # FIXME(typhoonzero): op.inputs is None if it's cloned.
            if self.optimizer_mode:
                if "Grad" in op.inputs and "Param" in op.inputs:
                    params.append(op.inputs["Param"].name)
                    grads.append(op.inputs["Grad"].name)
            else:
                # simple recv mode: collect the inputs of recv operators.
                for iname in op.input_names:
                    for in_var_name in op.input(iname):
                        params.append(parent_block.var(in_var_name))
                        grads.append(parent_block.var(in_var_name))

        return params, grads

    def parent_block(self):
        prog = self.helper.main_program
        parent_idx = prog.current_block().parent_idx
        assert parent_idx >= 0
        parent_block = prog.block(parent_idx)
        return parent_block

    def complete_op(self):
        from ..incubate.fleet.parameter_server.mode import DistributedMode

        main_program = self.helper.main_program
        current_block = main_program.current_block()
        parent_block = self.parent_block()

        parent_block.append_op(
            type='listen_and_serv',
            inputs={"X": self.inputs},
            outputs={},
            attrs={
                'endpoint': self.endpoint,
                'Fanin': self.fan_in,
                'optimize_blocks': [
                    current_block
                ],  # multiple optimize blocks are not supported in layers yet
                'distributed_mode': DistributedMode.SYNC,  # async mode is not supported in layers yet
                'grad_to_block_id': [""],
            },
        )


def Send(endpoints, send_vars, dummy_output=None, sync=True):
    """
    Send variables to the server side, and get variables back from the
    server side once the server has finished running the server-side program.

    Args:
        endpoints (str): comma separated IP:PORT pairs in the order
                   of send_vars to send
        send_vars (list): variables to send to the server
        dummy_output (Variable|list|None): optional variable(s) used as the
                   output of the send op, for dependency tracking
        sync (bool): whether to wait for the request to finish
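
    Examples:
        A minimal sketch (the endpoint and tensor below are placeholders,
        not taken from a real deployment):

        .. code-block:: python

            import paddle
            from paddle.fluid.layers.io import Send

            paddle.enable_static()
            x = paddle.static.data(name="x", shape=[32, 32], dtype='float32')
            Send("127.0.0.1:6170", [x])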

    """
    assert type(send_vars) == list

    if dummy_output is None:
        dummy_output = []
    elif isinstance(dummy_output, Variable):
        dummy_output = [dummy_output]

    assert type(dummy_output) == list

    epmap = endpoints.split(",")
    endpoints = list(set(epmap))

    helper = LayerHelper("Send", **locals())
    rpc_op_role_name = core.op_proto_and_checker_maker.kOpRoleAttrName()

    helper.append_op(
        type="send",
        inputs={"X": send_vars},
        outputs={"Out": dummy_output},
        attrs={
            "endpoints": endpoints,
            "epmap": epmap,
            rpc_op_role_name: core.op_proto_and_checker_maker.OpRole.RPC,
        },
    )
    if sync:
        helper.append_op(
            type="send_barrier",
            inputs={"X": dummy_output},
            outputs={"Out": []},
            attrs={"endpoints": endpoints},
        )


def Recv(endpoints, get_vars, dummy_input=None, sync=True):
    """
    Receive variables from the server side.

    Args:
        endpoints (str): comma separated IP:PORT pairs in the order
                   of get_vars to receive
        get_vars (list): variables to receive from the server after the send completes.
        dummy_input (Variable|list|None): optional variable(s) used as the
                   input of the recv op, for dependency tracking
        sync (bool): whether to wait for the request to finish

    Returns:
        list: list of received variables
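
    Examples:
        A minimal sketch (the endpoint and tensor below are placeholders,
        not taken from a real deployment):

        .. code-block:: python

            import paddle
            from paddle.fluid.layers.io import Recv

            paddle.enable_static()
            x = paddle.static.data(name="x", shape=[32, 32], dtype='float32')
            received = Recv("127.0.0.1:6170", [x])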
    """
    assert type(get_vars) == list

    if dummy_input is None:
        dummy_input = []
    elif isinstance(dummy_input, Variable):
        dummy_input = [dummy_input]

    assert type(dummy_input) == list

    epmap = endpoints.split(",")
    endpoints = list(set(epmap))

    helper = LayerHelper("Recv", **locals())
    helper.append_op(
        type="recv",
        inputs={"X": dummy_input},
        outputs={"Out": get_vars},
        attrs={"endpoints": endpoints, "epmap": epmap},
    )
    if sync:
        helper.append_op(
            type="fetch_barrier",
            outputs={"Out": get_vars},
            attrs={"endpoints": endpoints},
        )
    return get_vars


def monkey_patch_reader_methods(reader):
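    """
    Patch ``reader`` with a ``reset()`` method backed by the C++ reader held
    in the global scope, and mark the variable persistable with gradients
    stopped.
    """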
    def __get_reader__():
        scope = global_scope()
        var = scope.find_var(reader.name)
        return var.get_reader()

    def reset():
        return __get_reader__().reset()

    reader.reset = reset
    reader.stop_gradient = True
    reader.persistable = True
    return reader


def _copy_reader_var_(block, var):
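    """
    Create a READER variable in ``block`` that mirrors ``var``'s name, shapes,
    dtypes and LoD levels.
    """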
    new_var = block.create_var(name=var.name, type=core.VarDesc.VarType.READER)
    new_var.desc.set_shapes(var.desc.shapes())
    new_var.desc.set_dtypes(var.desc.dtypes())
    new_var.desc.set_lod_levels(var.desc.lod_levels())
    new_var.persistable = True
    return new_var


def _copy_reader_create_op_(block, op):
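    """
    Append a copy of ``op`` to ``block``, remapping its input and output
    arguments to the variables defined in ``block``.
    """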
    input_param_names = op.input_names
    new_input_map = {}
    for param_name in input_param_names:
        new_input_map[param_name] = []
        arg_names = op.input(param_name)
        for arg_name in arg_names:
            new_input_map[param_name].append(block.var(arg_name))

    output_param_names = op.output_names
    new_output_map = {}
    for param_name in output_param_names:
        new_output_map[param_name] = []
        arg_names = op.output(param_name)
        for arg_name in arg_names:
            new_output_map[param_name].append(block.var(arg_name))

    new_op = block.append_op(
        type=op.type,
        inputs=new_input_map,
        outputs=new_output_map,
        attrs=op.all_attrs(),
    )
    return new_op


def __create_shared_decorated_reader__(op_type, reader, attrs):
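    """
    Create the decorated reader op in the startup program, then copy the
    reader variable and its creation op into the main program, so the
    underlying reader is created once at startup and reused afterwards.
    """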
    var_name = unique_name(op_type)
    startup_blk = default_startup_program().current_block()
    startup_var = startup_blk.create_var(name=var_name)
    startop_op = startup_blk.append_op(
        type=op_type,
        inputs={'UnderlyingReader': reader},
        outputs={'Out': [startup_var]},
        attrs=attrs,
    )
    startup_var.persistable = True
    main_prog_block = default_main_program().current_block()
    main_prog_var = _copy_reader_var_(main_prog_block, startup_var)
    _copy_reader_create_op_(main_prog_block, startop_op)
    return monkey_patch_reader_methods(main_prog_var)


def __create_unshared_decorated_reader__(op_type, reader, attrs, name=None):
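    """
    Create the decorated reader op directly in the main program, so the
    reader is not shared through the startup program.
    """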
    new_reader_name = name if name is not None else unique_name(op_type)
    main_blk = default_main_program().current_block()
    new_reader = main_blk.create_var(name=new_reader_name)
    main_blk.append_op(
        type=op_type,
        inputs={'UnderlyingReader': reader},
        outputs={'Out': [new_reader]},
        attrs=attrs,
    )
    return monkey_patch_reader_methods(new_reader)