#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import multiprocessing
import os
import sys
import threading

from ..data_feeder import DataFeeder
from .control_flow import BlockGuard
from .. import core
from ..executor import global_scope
from ..framework import (
    convert_np_dtype_to_dtype_,
    default_main_program,
    default_startup_program,
    program_guard,
    Program,
    Variable,
)
from ..layer_helper import LayerHelper
from ..unique_name import generate as unique_name

import logging
from ..data_feeder import check_dtype, check_type
from paddle.fluid.framework import static_only
from ..framework import (
    _get_paddle_place,
    _current_expected_place,
    _set_expected_place,
)

__all__ = [
    'data',
]


@static_only
def data(
    name,
    shape,
    append_batch_size=True,
    dtype='float32',
    lod_level=0,
    type=core.VarDesc.VarType.LOD_TENSOR,
    stop_gradient=True,
):
    """
    **Data Layer**

    This operator creates a global variable that can be accessed by all the
    following operators in the graph.

    Note:
        :code:`paddle.fluid.layers.data` is deprecated as it will be removed in
        a later version. Please use :code:`paddle.fluid.data` .

        This :code:`paddle.fluid.layers.data` sets the shape and dtype at
        compile time but does NOT check the shape or the dtype of the fed data;
        :code:`paddle.fluid.data` checks the shape and the dtype of data fed
        by Executor or ParallelExecutor during run time.

        To feed variable size inputs, users can feed variable size inputs
        directly to this :code:`paddle.fluid.layers.data` and PaddlePaddle will
        fit the size accordingly, or set -1 on the variable dimension when
        using :code:`paddle.fluid.data` .

        The default :code:`stop_gradient` attribute of the Variable created by
        this API is True, which means the gradient won't be passed backward
        through the data Variable. Set :code:`var.stop_gradient = False` if
        you would like to pass the gradient backward.

    Args:
       name(str): The name/alias of the variable, see :ref:`api_guide_Name`
            for more details.
       shape(list|tuple): List or tuple declaring the shape. If
            :code:`append_batch_size` is True and there is no -1 inside
            :code:`shape`, it should be considered as the shape of each
            sample. Otherwise, it should be considered as the shape of the
            batched data.
       append_batch_size(bool):
          1. If true, it prepends -1 to the shape.
            For example if shape=[1], the resulting shape is [-1, 1]. This will
            be useful to set different batch sizes at run time.
          2. If shape contains -1, such as shape=[1, -1],
            append_batch_size will be enforced to be False (ineffective)
            because PaddlePaddle cannot set more than one unknown number in
            the shape.
       dtype(np.dtype|VarType|str): The type of the data. Supported dtype: bool,
            float16, float32, float64, int8, int16, int32, int64, uint8.
       type(VarType): The output type. Supported types: VarType.LOD_TENSOR,
            VarType.SELECTED_ROWS, VarType.NCCL_ID. Default: VarType.LOD_TENSOR.
       lod_level(int): The LoD level. 0 means the input data is not a sequence.
            Default: 0.
       stop_gradient(bool): A boolean indicating whether gradient should flow.
            Default: True.

    Returns:
        The global variable that gives access to the data.

    Return Type:
        Variable

    Examples:
        .. code-block:: python

          import paddle.fluid as fluid
          data = fluid.layers.data(name='x', shape=[784], dtype='float32')
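
          # A hedged illustration (not part of the original docs): since
          # append_batch_size defaults to True, shape=[784] above becomes
          # [-1, 784], so the batch dimension is set at run time. Pass
          # append_batch_size=False to keep the shape exactly as given.
          image = fluid.layers.data(name='image', shape=[3, 224, 224],
                                    dtype='float32', append_batch_size=False)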
    """
    helper = LayerHelper('data', **locals())

    check_type(name, 'name', (bytes, str), 'data')
    check_type(shape, 'shape', (list, tuple), 'data')

    shape = list(shape)
    for i in range(len(shape)):
        if shape[i] is None:
            shape[i] = -1
            append_batch_size = False
        elif shape[i] < 0:
            append_batch_size = False

    if append_batch_size:
        shape = [-1] + shape  # append batch size as -1

    data_var = helper.create_global_variable(
        name=name,
        shape=shape,
        dtype=dtype,
        type=type,
        stop_gradient=stop_gradient,
        lod_level=lod_level,
        is_data=True,
    )
    return data_var


class BlockGuardServ(BlockGuard):
    """
    BlockGuardServ class.

    BlockGuardServ class is used to create an op with a block in a program.
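
    A sketched usage (hypothetical): a BlockGuardServ is normally obtained
    via :code:`ListenAndServ.do()`:

    .. code-block:: python

        # `serv` is assumed to be an already-constructed ListenAndServ instance
        with serv.do():  # enters a BlockGuardServ
            pass         # ops built here are appended to the server sub-block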
    """

    def __init__(self, server):
        if not isinstance(server, ListenAndServ):
            raise TypeError("BlockGuardServ takes a ListenAndServ")
        super().__init__(server.helper.main_program)
        self.server = server

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is not None:
            return False

        self.server.complete_op()
        return super().__exit__(exc_type, exc_val, exc_tb)


class ListenAndServ:
    """
    **ListenAndServ Layer**

    ListenAndServ is used to create an RPC server that binds and listens
    on a specific TCP port; the server runs the sub-block when it
    receives variables from clients.

    Args:
        endpoint(string): IP:port string which the server will listen on.
        inputs(list): a list of variables that the server will get from clients.
        fan_in(int): how many clients are expected to report to this server, default: 1.
        optimizer_mode(bool): whether to run the server as a parameter server, default: True.

    Examples:
        .. code-block:: python

            import paddle.fluid as fluid
            import paddle
            from paddle.fluid.layers.io import ListenAndServ

            paddle.enable_static()
            main = fluid.Program()
            place = fluid.CPUPlace()
            with fluid.program_guard(main):
                serv = ListenAndServ(
                    "127.0.0.1:6170", ["X"], optimizer_mode=False)
                with serv.do():
                    x = fluid.layers.data(
                        shape=[32, 32],
                        dtype='float32',
                        name="X",
                        append_batch_size=False)
                    fluid.initializer.Constant(value=1.0)(x, main.global_block())
                    out_var = paddle.scale(x=x, scale=10.0)

            exe = fluid.Executor(place)
            exe.run(main)
    """

    def __init__(self, endpoint, inputs, fan_in=1, optimizer_mode=True):
        self.helper = LayerHelper("listen_and_serv")
        self.inputs = inputs
        self.outputs = []
        self.endpoint = endpoint
        self.fan_in = fan_in
        # FIXME(typhoonzero): adding optimizer_mode here is a hack; this should
        # be made more general.
        self.optimizer_mode = optimizer_mode

    def do(self):
        return BlockGuardServ(self)

    def get_params_and_grads(self):
        main_program = self.helper.main_program
        current_block = main_program.current_block()
        parent_block = self.parent_block()
        # params and grads in the same order.
        params = list()
        grads = list()
        for op in current_block.ops:
            # FIXME(typhoonzero): op.inputs is None if it's cloned.
            if self.optimizer_mode:
                if "Grad" in op.inputs and "Param" in op.inputs:
                    params.append(op.inputs["Param"].name)
                    grads.append(op.inputs["Grad"].name)
            else:
                # simple recv mode, recv operators inputs.
                for iname in op.input_names:
                    for in_var_name in op.input(iname):
                        params.append(parent_block.var(in_var_name))
                        grads.append(parent_block.var(in_var_name))

        return params, grads

    def parent_block(self):
        prog = self.helper.main_program
        parent_idx = prog.current_block().parent_idx
        assert parent_idx >= 0
        parent_block = prog.block(parent_idx)
        return parent_block

    def complete_op(self):
        from ..incubate.fleet.parameter_server.mode import DistributedMode

        main_program = self.helper.main_program
        current_block = main_program.current_block()
        parent_block = self.parent_block()

        parent_block.append_op(
            type='listen_and_serv',
            inputs={"X": self.inputs},
            outputs={},
            attrs={
                'endpoint': self.endpoint,
                'Fanin': self.fan_in,
                'optimize_blocks': [
                    current_block
                ],  # multiple optimize blocks are not supported in layers yet
                'distributed_mode': DistributedMode.SYNC,  # async mode is not supported in layers yet
                'grad_to_block_id': [""],
            },
        )


def Send(endpoints, send_vars, dummy_output=None, sync=True):
    """
    Send variables to the server side, and get variables back from the
    server side when the server has finished running the server-side program.

    Args:
        endpoints (str): comma separated IP:PORT pairs, in the order
                   of send_vars to send
        send_vars (list): variables to send to the server
        sync (bool): whether to wait for the request to finish

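    Examples:
        .. code-block:: python

            # A minimal, hypothetical sketch (not part of the original docs):
            # it assumes a listen_and_serv server will be available at the
            # endpoint below, expecting a variable named "X", when the
            # program is executed.
            import paddle
            import paddle.fluid as fluid
            from paddle.fluid.layers.io import Send

            paddle.enable_static()
            x = fluid.layers.data(name='X', shape=[32, 32], dtype='float32',
                                  append_batch_size=False)
            Send("127.0.0.1:6170", [x])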
    """
    assert type(send_vars) == list

    if dummy_output is None:
        dummy_output = []
    elif isinstance(dummy_output, Variable):
        dummy_output = [dummy_output]

    assert type(dummy_output) == list

    epmap = endpoints.split(",")
    endpoints = list(set(epmap))

    helper = LayerHelper("Send", **locals())
    rpc_op_role_name = core.op_proto_and_checker_maker.kOpRoleAttrName()

    helper.append_op(
        type="send",
        inputs={"X": send_vars},
        outputs={"Out": dummy_output},
        attrs={
            "endpoints": endpoints,
            "epmap": epmap,
            rpc_op_role_name: core.op_proto_and_checker_maker.OpRole.RPC,
        },
    )
    if sync:
        helper.append_op(
            type="send_barrier",
            inputs={"X": dummy_output},
            outputs={"Out": []},
            attrs={"endpoints": endpoints},
        )


def Recv(endpoints, get_vars, dummy_input=None, sync=True):
    """
    Receive variables from the server side.

    Args:
        endpoints (str): comma separated IP:PORT pairs, in the order
                   of get_vars to receive
        get_vars (list): variables to get from the server after the send completes.
        sync (bool): whether to wait for the request to finish

    Returns:
        list: list of received variables
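
    Examples:
        .. code-block:: python

            # A minimal, hypothetical sketch (not part of the original docs):
            # it assumes a listen_and_serv server will be available at the
            # endpoint below, sending back a variable named "X", when the
            # program is executed.
            import paddle
            import paddle.fluid as fluid
            from paddle.fluid.layers.io import Recv

            paddle.enable_static()
            x = fluid.layers.data(name='X', shape=[32, 32], dtype='float32',
                                  append_batch_size=False)
            recv_vars = Recv("127.0.0.1:6170", [x])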
    """
    assert type(get_vars) == list

    if dummy_input is None:
        dummy_input = []
    elif isinstance(dummy_input, Variable):
        dummy_input = [dummy_input]

    assert type(dummy_input) == list

    epmap = endpoints.split(",")
    endpoints = list(set(epmap))

    helper = LayerHelper("Recv", **locals())
    helper.append_op(
        type="recv",
        inputs={"X": dummy_input},
        outputs={"Out": get_vars},
        attrs={"endpoints": endpoints, "epmap": epmap},
    )
    if sync:
        helper.append_op(
            type="fetch_barrier",
            outputs={"Out": get_vars},
            attrs={"endpoints": endpoints},
        )
    return get_vars


def monkey_patch_reader_methods(reader):
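    # Attach a Python-side reset() to the reader variable; the underlying C++
    # reader is looked up by name in the global scope. The variable is also
    # marked persistable and stop_gradient.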
    def __get_reader__():
        scope = global_scope()
        var = scope.find_var(reader.name)
        return var.get_reader()

    def reset():
        return __get_reader__().reset()

    reader.reset = reset
    reader.stop_gradient = True
    reader.persistable = True
    return reader


def _copy_reader_var_(block, var):
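    # Mirror a READER variable into `block`, copying shapes, dtypes and LoD
    # levels from the source variable's descriptor and marking it persistable.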
    new_var = block.create_var(name=var.name, type=core.VarDesc.VarType.READER)
    new_var.desc.set_shapes(var.desc.shapes())
    new_var.desc.set_dtypes(var.desc.dtypes())
    new_var.desc.set_lod_levels(var.desc.lod_levels())
    new_var.persistable = True
    return new_var


def _copy_reader_create_op_(block, op):
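    # Re-create `op` inside `block`: every input/output argument name is mapped
    # to the variable of the same name in `block`, and an op of the same type
    # with the same attributes is appended.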
    input_param_names = op.input_names
    new_input_map = {}
    for param_name in input_param_names:
        new_input_map[param_name] = []
        arg_names = op.input(param_name)
        for arg_name in arg_names:
            new_input_map[param_name].append(block.var(arg_name))

    output_param_names = op.output_names
    new_output_map = {}
    for param_name in output_param_names:
        new_output_map[param_name] = []
        arg_names = op.output(param_name)
        for arg_name in arg_names:
            new_output_map[param_name].append(block.var(arg_name))

    new_op = block.append_op(
        type=op.type,
        inputs=new_input_map,
        outputs=new_output_map,
        attrs=op.all_attrs(),
    )
    return new_op


def __create_shared_decorated_reader__(op_type, reader, attrs):
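    # Create the decorated reader in the startup program, then copy the reader
    # variable and its creating op into the main program so both programs refer
    # to the same underlying reader.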
    var_name = unique_name(op_type)
    startup_blk = default_startup_program().current_block()
    startup_var = startup_blk.create_var(name=var_name)
    startop_op = startup_blk.append_op(
        type=op_type,
        inputs={'UnderlyingReader': reader},
        outputs={'Out': [startup_var]},
        attrs=attrs,
    )
    startup_var.persistable = True
    main_prog_block = default_main_program().current_block()
    main_prog_var = _copy_reader_var_(main_prog_block, startup_var)
    _copy_reader_create_op_(main_prog_block, startop_op)
    return monkey_patch_reader_methods(main_prog_var)


def __create_unshared_decorated_reader__(op_type, reader, attrs, name=None):
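    # Unlike the shared variant above, the decorated reader is created directly
    # in the main program and nothing is added to the startup program.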
    new_reader_name = name if name is not None else unique_name(op_type)
    main_blk = default_main_program().current_block()
    new_reader = main_blk.create_var(name=new_reader_name)
    main_blk.append_op(
        type=op_type,
        inputs={'UnderlyingReader': reader},
        outputs={'Out': [new_reader]},
        attrs=attrs,
    )
    return monkey_patch_reader_methods(new_reader)