# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from .. import core
from ..framework import convert_np_dtype_to_dtype_, default_main_program, default_startup_program
from ..unique_name import generate as unique_name
# NOTE(review): implicit relative import, which only resolves on Python 2.
# If control_flow is a sibling module of this file, prefer
# `from .control_flow import BlockGuard` -- confirm the package layout first.
from control_flow import BlockGuard
from ..layer_helper import LayerHelper
from ..executor import global_scope

__all__ = [
    'data', 'BlockGuardServ', 'ListenAndServ', 'Send', 'open_recordio_file',
    'open_files', 'read_file', 'create_shuffle_reader',
    'create_double_buffer_reader', 'create_multi_pass_reader'
]


def data(name,
         shape,
         append_batch_size=True,
         dtype='float32',
         lod_level=0,
         type=core.VarDesc.VarType.LOD_TENSOR,
         stop_gradient=True):
    """
    **Data Layer**

    Create a global variable that holds input data. All arguments of this
    function are forwarded to the LayerHelper constructor, and the variable
    itself is created through `helper.create_global_variable`.

    Args:
       name(str): The name of the created variable.
       shape(list|tuple): The shape of the data. It is copied into a list;
           an entry that is None is stored as -1.
       append_batch_size(bool): Whether to prepend -1 (the batch dimension)
           to `shape`. It is ignored (treated as False) when any entry of
           `shape` is None or negative.
       dtype(str): The type of data, e.g. 'float32'.
       lod_level(int): The LoD Level. 0 means the input data is not a
           sequence.
       type(VarType): The output type. By default it is LOD_TENSOR.
       stop_gradient(bool): A boolean that mentions whether gradient should
           flow.

    Returns:
        Variable: The global variable that gives access to the data.

    Examples:
        .. code-block:: python

          data = fluid.layers.data(name='x', shape=[784], dtype='float32')
    """
    # locals() must be captured before any other local name is bound so that
    # only the function arguments reach LayerHelper.
    helper = LayerHelper('data', **locals())
    shape = list(shape)
    for i, dim in enumerate(shape):
        if dim is None:
            shape[i] = -1
            append_batch_size = False
        elif dim < 0:
            # The caller already marked a variable dimension explicitly.
            append_batch_size = False

    if append_batch_size:
        shape = [-1] + shape  # append batch size as -1

    return helper.create_global_variable(
        name=name,
        shape=shape,
        dtype=dtype,
        type=type,
        stop_gradient=stop_gradient,
        lod_level=lod_level)


class BlockGuardServ(BlockGuard):
    """
    BlockGuardServ class.

    BlockGuardServ class is used to create an op with a block in a program.
    On a clean exit of the `with` body it asks the wrapped ListenAndServ to
    append its op (see `ListenAndServ.complete_op`).
    """

    def __init__(self, server):
        """
        Args:
            server(ListenAndServ): The server whose main program is guarded.

        Raises:
            TypeError: If `server` is not a ListenAndServ instance.
        """
        if not (isinstance(server, ListenAndServ)):
            raise TypeError("BlockGuardServ takes a ListenAndServ")
        super(BlockGuardServ, self).__init__(server.helper.main_program)
        self.server = server

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Returning False lets an exception raised inside the `with` body
        # propagate; the op is only appended when the body succeeded.
        if exc_type is not None:
            return False

        self.server.complete_op()
        return super(BlockGuardServ, self).__exit__(exc_type, exc_val, exc_tb)


class ListenAndServ(object):
    """
    ListenAndServ class.

    ListenAndServ class is used to wrap listen_and_serv op to create a server
    which can receive variables from clients and run a block.
    """

    def __init__(self, endpoint, inputs, fan_in=1, optimizer_mode=True):
        """
        Args:
            endpoint: Stored and passed as the 'endpoint' attribute of the
                listen_and_serv op.
            inputs: Stored and passed as input "X" of the listen_and_serv op.
            fan_in(int): Stored and passed as the 'Fanin' attribute.
            optimizer_mode(bool): Selects how `get_params_and_grads` collects
                its results.
        """
        self.helper = LayerHelper("listen_and_serv")
        self.inputs = inputs
        self.outputs = []
        self.endpoint = endpoint
        self.fan_in = fan_in
        # FIXME(typhoonzero): add optimizer_mode is stupid, should make it more
        # general.
        self.optimizer_mode = optimizer_mode

    def do(self):
        """Return a BlockGuardServ to be used in a `with` statement."""
        return BlockGuardServ(self)

    def get_params_and_grads(self):
        """
        Collect parameters and gradients from the ops of the current block.

        Returns:
            tuple: `(params, grads)`, two lists in the same order. In
            optimizer mode they hold the names of the "Param" and "Grad"
            inputs of every op that has both. Otherwise both lists hold the
            parent-block variables of every input of every op.
        """
        main_program = self.helper.main_program
        current_block = main_program.current_block()
        parent_block = self.parent_block()
        # params and grads in the same order.
        params = list()
        grads = list()
        for op in current_block.ops:
            # FIXME(typhoonzero): op.inputs is None if it's cloned.
            if self.optimizer_mode:
                if "Grad" in op.inputs and "Param" in op.inputs:
                    params.append(op.inputs["Param"].name)
                    grads.append(op.inputs["Grad"].name)
            else:
                # simple recv mode, recv operators inputs.
                for iname in op.input_names:
                    for in_var_name in op.input(iname):
                        params.append(parent_block.var(in_var_name))
                        grads.append(parent_block.var(in_var_name))

        return params, grads

    def parent_block(self):
        """Return the parent of the main program's current block."""
        prog = self.helper.main_program
        parent_idx = prog.current_block().parent_idx
        assert parent_idx >= 0
        parent_block = prog.block(parent_idx)
        return parent_block

    def complete_op(self):
        """
        Append the listen_and_serv op to the parent block, with the current
        block attached as its 'OptimizeBlock' attribute.
        """
        main_program = self.helper.main_program
        current_block = main_program.current_block()
        parent_block = self.parent_block()

        parent_block.append_op(
            type='listen_and_serv',
            inputs={"X": self.inputs},
            outputs={},
            attrs={
                'endpoint': self.endpoint,
                'Fanin': self.fan_in,
                'OptimizeBlock': current_block
            })


def Send(endpoints, send_vars, get_vars):
    """
    Send layer

    Args:
        endpoints: comma separated IP:PORT pairs in the order
                   of send_vars to send
        send_vars: vars to send
        get_vars: vars to get from server after send completes.

    Send variables to the server side, and get vars from server
    side when server have finished running server side program.
    """
    assert isinstance(send_vars, list)
    assert isinstance(get_vars, list)

    # epmap keeps one endpoint per variable (in order); endpoints is the
    # de-duplicated set of those endpoints.
    epmap = endpoints.split(",")
    endpoints = list(set(epmap))

    helper = LayerHelper("Send", **locals())
    rpc_client_var = default_main_program().global_block().create_var(
        name="RPC_CLIENT_VAR",
        persistable=True,
        type=core.VarDesc.VarType.RAW)

    helper.append_op(
        type="send",
        inputs={"X": send_vars},
        outputs={"Out": get_vars,
                 "RPCClient": rpc_client_var},
        attrs={"endpoints": endpoints,
               "epmap": epmap})


def Recv(endpoints, get_vars):
    """
    Recv layer

    Args:
        endpoints: comma separated IP:PORT pairs in the order
                   of get_vars to receive
        get_vars: vars to get from the server.

    Appends a "recv" op whose input "X" and output "Out" are both
    `get_vars`.
    """
    # The original also asserted on `send_vars`, which is not a parameter of
    # this function and made every call fail with NameError.
    assert isinstance(get_vars, list)

    # epmap keeps one endpoint per variable (in order); endpoints is the
    # de-duplicated set of those endpoints.
    epmap = endpoints.split(",")
    endpoints = list(set(epmap))

    helper = LayerHelper("Recv", **locals())
    helper.append_op(
        type="recv",
        inputs={"X": get_vars},
        outputs={"Out": get_vars},
        attrs={"endpoints": endpoints,
               "epmap": epmap})


def monkey_patch_reader_methods(reader):
    """
    Attach `eof()` and `reset()` callables to a reader variable.

    Both callables look the reader up by name in the global scope each time
    they are invoked. The variable is also marked `stop_gradient` and
    `persistable`.

    Args:
        reader: The reader variable to patch; it is modified in place.

    Returns:
        The same `reader` object.
    """

    def __get_reader__():
        scope = global_scope()
        var = scope.find_var(reader.name)
        return var.get_reader()

    def eof():
        return not __get_reader__().has_next()

    def reset():
        return __get_reader__().reset()

    reader.eof = eof
    reader.reset = reset
    reader.stop_gradient = True
    reader.persistable = True
    return reader


def _copy_reader_var_(block, var):
    """
    Create in `block` a READER variable with the same name, shapes and dtypes
    as `var`, mark it persistable and patch it with `eof()` / `reset()`.
    """
    new_var = block.create_var(name=var.name, type=core.VarDesc.VarType.READER)
    new_var.desc.set_shapes(var.desc.shapes())
    new_var.desc.set_dtypes(var.desc.dtypes())
    new_var.persistable = True
    return monkey_patch_reader_methods(new_var)


def open_recordio_file(filename, shapes, lod_levels, dtypes):
    """
    Create a reader variable for a RecordIO file.

    A `create_recordio_file_reader` op is appended to the current block of
    the default startup program, and a copy of its output variable is
    created in the current block of the default main program.

    Args:
        filename: Passed as the 'filename' attribute of the op.
        shapes(list): One shape (a sequence of ints) per output; they are
            concatenated into 'shape_concat' and their lengths are recorded
            in 'ranks'.
        lod_levels: Passed as the 'lod_levels' attribute of the op.
        dtypes(list): One dtype per output; each is converted with
            `convert_np_dtype_to_dtype_`.

    Returns:
        The reader variable of the main program, with `eof()` and `reset()`
        attached.
    """
    dtypes = [convert_np_dtype_to_dtype_(dt) for dt in dtypes]
    shape_concat = []
    ranks = []

    for shape in shapes:
        shape_concat.extend(shape)
        ranks.append(len(shape))

    var_name = unique_name('open_recordio_file')

    startup_blk = default_startup_program().current_block()
    startup_var = startup_blk.create_var(name=var_name)
    startup_blk.append_op(
        type='create_recordio_file_reader',
        outputs={'Out': [startup_var]},
        attrs={
            'shape_concat': shape_concat,
            'lod_levels': lod_levels,
            'filename': filename,
            'ranks': ranks
        })

    startup_var.desc.set_dtypes(dtypes)
    startup_var.persistable = True
    return _copy_reader_var_(default_main_program().current_block(),
                             startup_var)


def open_files(filenames, thread_num, shapes, lod_levels, dtypes):
    """
    Create a reader variable over several files.

    An `open_files` op is appended to the current block of the default
    startup program, and a copy of its output variable is created in the
    current block of the default main program.

    Args:
        filenames: Passed as the 'file_names' attribute of the op.
        thread_num: Passed as the 'thread_num' attribute of the op.
        shapes(list): One shape (a sequence of ints) per output; they are
            concatenated into 'shape_concat' and their lengths are recorded
            in 'ranks'.
        lod_levels: Passed as the 'lod_levels' attribute of the op.
        dtypes(list): One dtype per output; each is converted with
            `convert_np_dtype_to_dtype_`.

    Returns:
        The reader variable of the main program, with `eof()` and `reset()`
        attached.
    """
    dtypes = [convert_np_dtype_to_dtype_(dt) for dt in dtypes]
    shape_concat = []
    ranks = []

    for shape in shapes:
        shape_concat.extend(shape)
        ranks.append(len(shape))

    var_name = unique_name('multiple_reader')

    startup_blk = default_startup_program().current_block()
    startup_var = startup_blk.create_var(name=var_name)
    startup_blk.append_op(
        type='open_files',
        outputs={'Out': [startup_var]},
        attrs={
            'shape_concat': shape_concat,
            'lod_levels': lod_levels,
            'ranks': ranks,
            'file_names': filenames,
            'thread_num': thread_num
        })

    startup_var.desc.set_dtypes(dtypes)
    startup_var.persistable = True
    return _copy_reader_var_(default_main_program().current_block(),
                             startup_var)


def __create_decorated_reader__(op_type, reader, attrs):
    """
    Wrap `reader` with the decorating op `op_type`.

    The op is appended to the current block of the default startup program
    with `reader` as its 'UnderlyingReader' input, and a copy of its output
    variable is created in the current block of the default main program.

    Returns:
        The reader variable of the main program, with `eof()` and `reset()`
        attached.
    """
    var_name = unique_name(op_type)
    startup_blk = default_startup_program().current_block()
    startup_var = startup_blk.create_var(name=var_name)
    startup_blk.append_op(
        type=op_type,
        inputs={'UnderlyingReader': reader},
        outputs={'Out': [startup_var]},
        attrs=attrs)
    startup_var.persistable = True
    return _copy_reader_var_(default_main_program().current_block(),
                             startup_var)


def create_shuffle_reader(reader, buffer_size):
    """
    Decorate `reader` with a `create_shuffle_reader` op; `buffer_size` is
    converted to int and passed as the 'buffer_size' attribute.
    """
    return __create_decorated_reader__('create_shuffle_reader', reader,
                                       {'buffer_size': int(buffer_size)})


def create_double_buffer_reader(reader, place=None):
    """
    Decorate `reader` with a `create_double_buffer_reader` op. When `place`
    is given, its upper-cased string form is passed as the 'place' attribute.
    """
    attrs = dict()
    if place is not None:
        attrs['place'] = str(place).upper()
    return __create_decorated_reader__('create_double_buffer_reader', reader,
                                       attrs)


def create_multi_pass_reader(reader, pass_num):
    """
    Decorate `reader` with a `create_multi_pass_reader` op; `pass_num` is
    converted to int and passed as the 'pass_num' attribute.
    """
    return __create_decorated_reader__('create_multi_pass_reader', reader,
                                       {'pass_num': int(pass_num)})


def read_file(file_obj):
    """
    Append a `read` op that reads from the reader variable `file_obj`.

    One temporary output variable is created per shape recorded in
    `file_obj.desc`.

    Returns:
        A single variable when the reader has exactly one output, otherwise
        the list of output variables.
    """
    helper = LayerHelper('read_file')
    out = [
        helper.create_tmp_variable(
            stop_gradient=True, dtype='float32')
        for _ in range(len(file_obj.desc.shapes()))
    ]
    helper.append_op(
        type='read', inputs={'Reader': [file_obj]}, outputs={'Out': out})
    if len(out) == 1:
        return out[0]
    else:
        return out