From 3d96601b82bc98509f7e5888aca45891f604fe55 Mon Sep 17 00:00:00 2001 From: lilong12 Date: Sat, 4 Jul 2020 06:55:46 +0800 Subject: [PATCH] modify pipeline optimizer to only support the mode of sync pipeline training (#25065) * modify pipeline optimizer, test=develop --- paddle/fluid/API.spec | 3 + python/paddle/fluid/optimizer.py | 1003 +++++++++++++---- .../fluid/tests/unittests/CMakeLists.txt | 2 - .../fluid/tests/unittests/test_boxps.py | 112 -- .../tests/unittests/test_data_norm_op.py | 123 -- .../unittests/test_paddlebox_datafeed.py | 146 --- .../fluid/tests/unittests/test_pipeline.py | 355 +++--- 7 files changed, 931 insertions(+), 813 deletions(-) create mode 100644 paddle/fluid/API.spec delete mode 100644 python/paddle/fluid/tests/unittests/test_paddlebox_datafeed.py diff --git a/paddle/fluid/API.spec b/paddle/fluid/API.spec new file mode 100644 index 00000000000..fc5aa111483 --- /dev/null +++ b/paddle/fluid/API.spec @@ -0,0 +1,3 @@ +paddle.fluid.optimizer.PipelineOptimizer (paddle.fluid.optimizer.PipelineOptimizer, ('document', '2e55a29dbeb874934f7a1a1af3a22b8c')) +paddle.fluid.optimizer.PipelineOptimizer.__init__ (ArgSpec(args=['self', 'optimizer', 'num_microbatches', 'start_cpu_core_id'], varargs=None, keywords=None, defaults=(1, 0)), ('document', '6adf97f83acf6453d4a6a4b1070f3754')) +paddle.fluid.optimizer.PipelineOptimizer.minimize (ArgSpec(args=['self', 'loss', 'startup_program', 'parameter_list', 'no_grad_set'], varargs=None, keywords=None, defaults=(None, None, None)), ('document', '6adf97f83acf6453d4a6a4b1070f3754')) diff --git a/python/paddle/fluid/optimizer.py b/python/paddle/fluid/optimizer.py index 28403284eb8..6b145105132 100644 --- a/python/paddle/fluid/optimizer.py +++ b/python/paddle/fluid/optimizer.py @@ -3633,302 +3633,851 @@ class PipelineOptimizer(object): """ :api_attr: Static Graph - Pipeline Optimizer - - Train with pipeline mode. The program will be split by cut_list. - - If the len of cut_list is k, then the whole program (including \ - backward part) will be split to 2*k-1 sections. - - So the length of place_list and concurrency_list must be also 2*k-1. - - Note: Though the asynchronous mode is applied in pipeline training to speed up, \ - the final performance depends on the training progress of each pipeline heavily. - - And we will try the synchronous mode in the future. + Pipeline Optimizer: Make a program to run as pipeline, that is splitting a + program into multiple sections (sub-programs) and each section run on a + device to enable the training of large scale models and the use of + heterogeneous devices. Meanwhile, all sections run in the stype of pipeline. Args: - optimizer (Optimizer): The based optimizer, such as SGD. - cut_list (list of Variable list): The cut variable of the main_program. - place_list (list of Place): The place where the section will run on. - concurrency_list (list of int): The concurrency degree. - queue_size (int): Each section will consume scopes from its in-scope queue - and produce scopes to out-scope queue. And this parameter - specify the scope queue size. [Optional. Default: 30]. - sync_steps (int): The synchronization steps between different cards. [Optional. Default: 1]. - start_cpu_core_id (int): specify the first cpu core id. [Optional. Default:0]. - + optimizer (Optimizer): The optimizer to use, such as SGD. + num_microbatches (int): Number of microbatches. [Optional. Default:1]. + start_cpu_core_id (int): The first cpu core id to use. [Optional. Default:0]. + Examples: .. code-block:: python import paddle.fluid as fluid import paddle.fluid.layers as layers - x = fluid.layers.data(name='x', shape=[1], dtype='int64', lod_level=0) - y = fluid.layers.data(name='y', shape=[1], dtype='int64', lod_level=0) - emb_x = layers.embedding(input=x, param_attr=fluid.ParamAttr(name="embx"), size=[10,2], is_sparse=False) - emb_y = layers.embedding(input=y, param_attr=fluid.ParamAttr(name="emby",learning_rate=0.9), size=[10,2], is_sparse=False) - concat = layers.concat([emb_x, emb_y], axis=1) - fc = layers.fc(input=concat, name="fc", size=1, num_flatten_dims=1, bias_attr=False) - loss = layers.reduce_mean(fc) + with fluid.device_guard("gpu:0"): + x = fluid.layers.data(name='x', shape=[1], dtype='int64', lod_level=0) + y = fluid.layers.data(name='y', shape=[1], dtype='int64', lod_level=0) + data_loader = fluid.io.DataLoader.from_generator( + feed_list=[x, y], + capacity=64, + use_double_buffer=True, + iterable=False) + + emb_x = layers.embedding(input=x, param_attr=fluid.ParamAttr(name="embx"), size=[10,2], is_sparse=False) + emb_y = layers.embedding(input=y, param_attr=fluid.ParamAttr(name="emby",learning_rate=0.9), size=[10,2], is_sparse=False) + + with fluid.device_guard("gpu:1"): + concat = layers.concat([emb_x, emb_y], axis=1) + fc = layers.fc(input=concat, name="fc", size=1, num_flatten_dims=1, bias_attr=False) + loss = layers.reduce_mean(fc) optimizer = fluid.optimizer.SGD(learning_rate=0.5) - optimizer = fluid.optimizer.PipelineOptimizer(optimizer, - cut_list=[[emb_x, emb_y], [loss]], - place_list=[fluid.CPUPlace(), fluid.CUDAPlace(0), fluid.CPUPlace()], - concurrency_list=[1, 1, 4], - queue_size=2, - sync_steps=1, - ) + optimizer = fluid.optimizer.PipelineOptimizer(optimizer) optimizer.minimize(loss) - place = fluid.CPUPlace() + + def train_reader(): + for _ in range(4): + x = np.random.random(size=[1]).astype('int64') + y = np.random.random(size=[1]).astype('int64') + yield x, y + data_loader.set_sample_generator(train_reader, batch_size=1) + + place = fluid.CUDAPlace(0) exe = fluid.Executor(place) exe.run(fluid.default_startup_program()) + batch_size = 1 filelist = [] # you should set your own filelist, e.g. filelist = ["dataA.txt"] dataset = fluid.DatasetFactory().create_dataset("FileInstantDataset") dataset.set_use_var([x,y]) dataset.set_batch_size(batch_size) dataset.set_filelist(filelist) + data_loader.start() exe.train_from_dataset( - fluid.default_main_program(), - dataset, - thread=2, - debug=False, - fetch_list=[], - fetch_info=[], - print_period=1) + fluid.default_main_program(), + dataset) + data_loader.reset() """ - def __init__(self, - optimizer, - cut_list=None, - place_list=None, - concurrency_list=None, - queue_size=30, - sync_steps=1, - start_cpu_core_id=0): + def __init__(self, optimizer, num_microbatches=1, start_cpu_core_id=0): if framework.in_dygraph_mode(): raise Exception("In dygraph, don't support PipelineOptimizer.") - # TODO: check properties + if not isinstance(optimizer, Optimizer): + raise ValueError("The 'optimizer' parameter for " + "PipelineOptimizer must be an instance of " + "Optimizer, but the given type is {}.".format( + type(optimizer))) self._optimizer = optimizer - self._cut_list = cut_list - self._place_list = place_list - self._concurrency_list = concurrency_list - self._queue_size = queue_size - self._sync_steps = sync_steps + assert num_microbatches >= 1, ( + "num_microbatches must be a positive value.") + self._num_microbatches = num_microbatches + assert start_cpu_core_id >= 0, ( + "start_cpu_core_id must be greater than or equal to 0.") self._start_cpu_core_id = start_cpu_core_id + self._place_list = None + op_maker = core.op_proto_and_checker_maker + self._op_role = op_maker.OpRole + self._op_role_key = op_maker.kOpRoleAttrName() + self._op_role_var_key = op_maker.kOpRoleVarAttrName() + self._op_device_key = op_maker.kOpDeviceAttrName() + self._param_device_map = dict() def _create_vars(self, block, main_program): + # Create vars for block, copied from main_program's global block used_var_set = set() for op_idx in range(block.desc.op_size()): op_desc = block.desc.op(op_idx) vars = op_desc.input_arg_names() + op_desc.output_arg_names() for var in vars: - if var in used_var_set: + # a var whose name contains "blocking_queue" + # only exists in startup program + if var in used_var_set or "_blocking_queue" in var: continue used_var_set.add(var) source_var = main_program.block(0).var(str(var)) - block._clone_variable(source_var, False) + if source_var.type == core.VarDesc.VarType.READER: + block.create_var(name=var, type=core.VarDesc.VarType.READER) + else: + block._clone_variable(source_var, False) + + def _is_loss_grad_op(self, op): + if self._op_role_key not in op.attr_names: + return False + op_role = int(op.all_attrs()[self._op_role_key]) + return op_role & int(self._op_role.Backward) and op_role & int( + self._op_role.Loss) + + def _is_backward_op(self, op): + return self._op_role_key in op.attr_names and int(op.all_attrs()[ + self._op_role_key]) & int(self._op_role.Backward) + + def _is_optimize_op(self, op): + return self._op_role_key in op.attr_names and int(op.all_attrs()[ + self._op_role_key]) & int(self._op_role.Optimize) + + def _is_update_op(self, op): + return 'Param' in op.input_names and 'Grad' in op.input_names and ( + "LearningRate" in op.input_names) - def _extract_section_opt_ops(self, ops, cut_point_name): + def _split_program(self, main_program): """ - Extract opt ops in the given section + Split a program into sections according to devices that ops run on. + + Args: + main_program (Program): the main program """ - output_names = set(cut_point_name) - relevant_op_flags = [True] * len(ops) - for i, op in reversed(list(enumerate(ops))): - if _some_in_set_(op.desc.output_arg_names(), output_names): - for name in op.desc.input_arg_names(): - output_names.add(name) - else: - relevant_op_flags[i] = False + programs = [] + # Map from device to its corresponding section program info + device_program_map = dict() + block = main_program.block(0) + + for op in block.ops: + device = op.attr(self._op_device_key) + + if device not in device_program_map: + program = {"program": Program()} + device_program_map[device] = program + program = device_program_map[device] + op_desc = op.desc + ap_op = program["program"].block(0).desc.append_op() + ap_op.copy_from(op_desc) + + for key in sorted(device_program_map.keys()): + program = device_program_map[key] + program['program']._sync_with_cpp() + programs.append(program) + + return programs - op_path = [ops[i] for i in range(len(ops)) if relevant_op_flags[i]] - return op_path + def _find_post_op(self, ops, cur_op, var_name): + """ + Find the real post op that has variable named var_name as input. - def _find_input_output(self, ops, name, is_forward=True): + Args: + ops (list): A list of ops. + cur_op (Operator): Current operator which has variable named + var_name as output. + var_name (string): Variable name. + """ + post_op = [] + before = True + for op in ops: + if op == cur_op: + before = False + continue + if before: + continue + for in_var_name in op.input_arg_names: + if in_var_name == var_name: + post_op.append(op) + if post_op: + if not len(post_op) == 1: + raise ValueError("Each op can only have one post op.") + return post_op[0] + return None + + def _find_real_prev_op(self, ops, cur_op, var_name): """ - Find the inputs or outputs of a section + Find the real previous op that outputs variable named var_name. + + Args: + ops (list): A list of ops. + cur_op (Operator): Current operator which has variable named + var_name as input. + var_name (string): Variable name. """ - all_set = set() - part_set = set() + prev_op = [] for op in ops: - if is_forward: - part_set.update(op.desc.output_arg_names()) - else: - part_set.update(op.desc.input_arg_names()) - all_set.update(op.desc.output_arg_names()) - all_set.update(op.desc.input_arg_names()) - return all_set - part_set + if op == cur_op: + break + for out_var_name in op.output_arg_names: + if out_var_name == var_name: + prev_op.append(op) + if prev_op: + # A op may have more than one prev op, + # e.g., for 'learning_rate', there may be multiple ops have it as + # output. + return prev_op[-1] + return None + + def _rename_arg(self, op, old_name, new_name): + op_desc = op.desc + if isinstance(op_desc, tuple): + op_desc = op_desc[0] + op_desc._rename_input(old_name, new_name) + op_desc._rename_output(old_name, new_name) + + def _create_var(self, block, ref_var, name): + """ + Create a new var for block, which has the same type, + shape and dtype as ref_var, then rename it with the + name `name`. + """ + new_var = block.create_var( + name=name, + shape=ref_var.shape, + dtype=ref_var.dtype, + type=ref_var.type, + lod_level=ref_var.lod_level, + persistable=False, + is_data=False, + need_check_feed=ref_var.desc.need_check_feed()) + return new_var + + def _get_data_var_info(self, block): + """ + Get all vars whose is_data attribute are true and then rename them. - def _find_persistable_vars(self, ops, whole_parameters): + For PipelineTrainer, all data vars are binded to + minibatch scope, so we have to feed them to the microbatch + to avoid conflicts. The vars feeded to microbatch have to + be renamed. """ - find the persistable input vars in current section + # A map from var name to the renamed name. + raw_name_new_name_map = dict() + # Because we will create vars in block, it is more safe + # to get all var_names before iteration. + var_names = list(block.vars.keys()) + for var_name in var_names: + var = block.var(var_name) + if not var.is_data: + continue + assert var_name not in raw_name_new_name_map, ( + "{} has already been processed.".format(var_name)) + new_name = unique_name.generate(var_name) + raw_name_new_name_map[var_name] = new_name + new_var = self._create_var(block, var, new_name) + new_var.is_data = False + + # map of data to devices that that data on + data_devices_map = dict() + for op in block.ops: + dev_spec = op.attr(self._op_device_key) + for var_name in op.input_arg_names: + if var_name not in raw_name_new_name_map: + continue + if not var_name in data_devices_map: + data_devices_map[var_name] = [] + if not dev_spec in data_devices_map[var_name]: + data_devices_map[var_name].append(dev_spec) + new_name = raw_name_new_name_map[var_name] + #self._rename_arg(op, var_name, new_name) + return data_devices_map, raw_name_new_name_map + + def _rename_var_in_block(self, block, raw_name_new_name_map): """ - res = set() - for op in ops: - vars = op.desc.input_arg_names() - for var in vars: - if var in whole_parameters: - res.add(var) - return res + Rename vars whose names in raw_name_new_name_map to the corresponding + new names. + """ + for op in block.ops: + if op.type == "enqueue" or op.type == "dequeue": + continue + for var_name in op.input_arg_names: + if var_name in raw_name_new_name_map: + new_name = raw_name_new_name_map[var_name] + self._rename_arg(op, var_name, new_name) - def _is_opt_role_op(self, op): - op_maker = core.op_proto_and_checker_maker - optimize_role = core.op_proto_and_checker_maker.OpRole.Optimize - if op_maker.kOpRoleAttrName() in op.attr_names and \ - int(op.all_attrs()[op_maker.kOpRoleAttrName()]) & int(optimize_role) != 0: - return True - return False + def _insert_enq_deq_for_data_var(self, main_block, programs, startup, + devices): + """ + Insert enqueue and dequeue ops for data var - def _is_lr_role_op(self, op): - op_maker = core.op_proto_and_checker_maker - optimize_role = core.op_proto_and_checker_maker.OpRole.LRSched - if op_maker.kOpRoleAttrName() in op.attr_names and \ - int(op.all_attrs()[op_maker.kOpRoleAttrName()]) == int(optimize_role): - return True - return False + Args: + main_block (Block): Global block for main program + programs (dict): Dictionary for section params + startup (Program): Startup program + devices (list): List of devices in the format (dev:dev_index) + """ + main_program = main_block.program + data_devices_map, raw_name_new_name_map = self._get_data_var_info( + main_block) + + first_prog = programs[0]['program'] + first_block = first_prog.block(0) + enqueue_index = 0 + if first_block.ops[0].type == "create_py_reader" or ( + first_block.ops[1].type == "create_py_reader"): + for op in first_block.ops: + if op.type == "read": + enqueue_index += 1 + break + enqueue_index += 1 + first_dev_spec = devices[0] + for var_name in data_devices_map.keys(): + for device in data_devices_map[var_name]: + # step1: generate queue for each pair of data var and device + # that that data on + queue_name = var_name + "_blocking_queue" + queue_name = unique_name.generate(queue_name) + queue_var = startup.block(0).create_var( + name=queue_name, + persistable=True, + type=core.VarDesc.VarType.RAW) + startup.block(0).append_op( + type='queue_generator', + attrs={ + 'names': [queue_name], + 'capacity': self._num_microbatches + }) + main_var = main_block.var(var_name) + assert main_var.is_data + if not var_name in first_block.vars: + self._create_var(first_block, main_var, var_name) + first_block._insert_op( + index=enqueue_index, + type='enqueue', + inputs={'X': first_block.var(var_name)}, + attrs={ + 'queue_name': queue_name, + self._op_device_key: first_dev_spec, + self._op_role_key: self._op_role.Forward + }) + # Get the device that that data on + assert device in devices + prog_index = devices.index(device) + prog = programs[prog_index]['program'] + block = prog.block(0) + index = 0 + if device == first_dev_spec: + index = enqueue_index + 1 + new_name = raw_name_new_name_map[var_name] + source_var = main_program.block(0).var(var_name) + new_var = self._create_var(block, source_var, new_name) + block._insert_op( + index=index, + type='dequeue', + outputs={'Out': [new_var]}, + attrs={ + self._op_device_key: device, + self._op_role_key: self._op_role.Forward, + 'queue_name': queue_name, + }) + self._rename_var_in_block(block, raw_name_new_name_map) + + def _strip_grad_suffix(self, name): + """ + Strip the grad suffix from the given variable name + """ + pos = name.find(core.grad_var_suffix()) + return name[:pos] if pos != -1 else name - def _extract_section_ops(self, ops, cut_point_name): + def _append_grad_suffix(self, name): + """ + Append grad suffix to the given variable name + """ + return name + core.grad_var_suffix() + + def _update_param_device_map(self, params_grads, block): + for param_grad in params_grads: + if not param_grad[0].trainable: continue + param_name = param_grad[0].name + ops = block.ops + for op in ops: + input_arg_names = op.input_arg_names + if param_name in input_arg_names: + self._param_device_map[param_name] = op.attr( + self._op_device_key) + break + + def _add_opdevice_attr_for_regularization_clip(self, block): """ - Extract ops in the given section + Add op_device attribute for regulization and clip ops. """ - output_names = set(cut_point_name) - relevant_op_flags = [True] * len(ops) - for i, op in reversed(list(enumerate(ops))): - if not self._is_opt_role_op(op) and _some_in_set_( - op.desc.output_arg_names(), output_names): - for name in op.desc.input_arg_names(): - output_names.add(name) - elif op.desc.type() == "print" and op.desc.input_arg_names()[ - 0] in output_names: + for op in block.ops: + # role for regularization and clip ops is optimize + if int(op.attr(self._op_role_key)) != int(self._op_role.Optimize): continue - else: - relevant_op_flags[i] = False + if op.has_attr(self._op_device_key) and ( + op.attr(self._op_device_key) != ""): + continue + assert self._op_role_var_key in op.attr_names + op_role_var = op.all_attrs()[self._op_role_var_key] + assert len(op_role_var) == 2 + param_name = block.vars[op_role_var[0]].name + device = self._param_device_map[param_name] + op._set_attr(self._op_device_key, device) - op_path = [ops[i] for i in range(len(ops)) if relevant_op_flags[i]] - return op_path + def _add_default_opdevice_attr(self, block): + """ + 1. Add default op_device attribute for lr-related ops. + The default value is the one that of the first place. + 2. Add default op_device attribute for sum ops added during + backward. For these ops, we set the op_device attribute + as the one of its post op, i.e, which op has the output of the + sum op as an input. + """ + first_devcie = "" + + # Get the device spec of the first place. + # device_spec: 'cpu' for cpu device and 'gpu:id' for gpu device, + # e.g. 'gpu:0', 'gpu:1', etc. + for op in block.ops: + if op.has_attr(self._op_device_key) and ( + op.attr(self._op_device_key) != ""): + first_device = op.attr(self._op_device_key) + break + assert first_device + + # set op_device attr for lr-related ops + lrsched_role = int(self._op_role.LRSched) + for op in block.ops: + if not op.has_attr(self._op_device_key) or ( + op.attr(self._op_device_key) == ""): + if op.type == "sum": + # For sum ops that compute the sum of @RENAMED@ vars + for name in op.desc.input_arg_names(): + assert '@RENAME@' in name + assert len(op.desc.output_arg_names()) == 1 + out_name = op.desc.output_arg_names()[0] + post_op = self._find_post_op(block.ops, op, out_name) + device = post_op.attr(self._op_device_key) + assert device + op._set_attr(self._op_device_key, device) + continue - def _find_section_opt(self, ops, params): - res = self._extract_section_opt_ops(ops, params) - return res + assert op.attr(self._op_role_key) == lrsched_role, ( + "Op whose op_device attr has not been set for pipeline" + " must be of the role LRSched.") + op._set_attr(self._op_device_key, first_device) - def _split_program(self, main_program, cut_list): - programs = [] - block = main_program.block(0) - whole_parameters = [e.name for e in block.all_parameters()] - cut_var_names = [] - cut_len = len(cut_list) - sec_params = [] - for i, cut_vars in enumerate(cut_list[:-1]): - cut_var_names.append([cut_var.name for cut_var in cut_vars]) - for i, cut_vars in reversed(list(enumerate(cut_list[:-1]))): - cut_var_names.append( - [_append_grad_suffix_(cut_var.name) for cut_var in cut_vars]) - if i == 0: - cut_var_names[-1] += [var.name for var in cut_list[-1]] - ops = block.ops[:] - for i, cut_vars in enumerate(cut_var_names): - program = { - "program": Program(), - "input_set": set(), - "output_set": set() - } - cur_ops = self._extract_section_ops(ops, cut_vars) - if i == 0: - for op in ops: - if self._is_lr_role_op(op): - cur_ops.append(op) - #prevent inplace in/out - program["input_set"].update( - self._find_input_output( - cur_ops, [], is_forward=True)) - for e in cur_ops: - ops.remove(e) - - if i < cut_len: - sec_params.append( - self._find_persistable_vars(cur_ops, whole_parameters)) - if i >= cut_len - 1: - opt_ops = self._find_section_opt( - ops, sec_params[2 * cut_len - 2 - i]) - - for e in opt_ops: - ops.remove(e) - cur_ops += opt_ops - - op_descs = [op.desc for op in cur_ops] - for op_desc in op_descs: - ap_op = program["program"].block(0).desc.append_op() - ap_op.copy_from(op_desc) - program["input_set"].update( - self._find_input_output( - cur_ops, cut_vars, is_forward=True)) - program["input_set"].update(sec_params[min(i, 2 * cut_len - 2 - i)]) - program["output_set"].update( - self._find_input_output( - cur_ops, cut_vars, is_forward=False)) - programs.append(program) - program = { - "program": Program(), - "input_set": set(), - "output_set": set() - } - op_descs = [op.desc for op in ops] - for op_desc in op_descs: - ap_op = program["program"].block(0).desc.append_op() - ap_op.copy_from(op_desc) - program["input_set"].update( - [cut_var.name + "@GRAD" for cut_var in cut_list[0]]) - program["input_set"].update( - self._find_input_output( - ops, [], is_forward=True)) - program["input_set"].update(sec_params[0]) - programs.append(program) - inputs = set() - for program in reversed(list(programs)): - output_list = list(program["output_set"]) - for output in output_list: - if output not in inputs: - program["output_set"].remove(output) - inputs.update(program["input_set"]) - return programs + def _check_validation(self, block): + """ + Check whether ops in a block are all validate (i.e., the + op_device attribute has been set). + Then, return all device specifications in order. + """ + device_specs = [] + for op in block.ops: + type = op.type + if not op._has_kernel(type): + assert op.type == "conditional_block" and ( + op.attr(self._op_role_key) == int(self._op_role.LRSched)), ( + "Now, the only supported op without kernel is " + "conditional_block, and its op role must be LRSched.") + assert op.has_attr(self._op_device_key), ( + "op ({}) has no {} attribute.".format(op.type, + self._op_device_key)) + dev_spec = op.attr(self._op_device_key) + assert dev_spec, ("op_device attribute for op " + "{} has not been set.".format(op.type)) + if not dev_spec in device_specs: + device_specs.append(dev_spec) + return device_specs + + def _insert_enq_deq_ops_for_boundaries(self, block, origin_block, + startup_program): + """ + Insert a pair of enqueue and dequeue ops for every two + consecutive ops on different devices. + """ + startup_block = startup_program.global_block() + extra_index = 0 + + # A map from var to device spec where op takes it as input, + # avoiding multiple enqueue and dequeue ops. + var_devspec = dict() + + for index, op in list(enumerate(origin_block.ops)): + cur_device_spec = op.attr(self._op_device_key) + for var_name in op.input_arg_names: + # i.e., lod_tensor_blocking_queue created by DataLoader, + # which only exists in startup program. + if not var_name in origin_block.vars: continue + var = block.var(var_name) + # skip data, because we will process it later + if var.is_data: continue + prev_op = self._find_real_prev_op(origin_block.ops, op, + var_name) + if prev_op is None: + continue + prev_device_spec = prev_op.attr(self._op_device_key) + + if prev_device_spec != cur_device_spec: + if var_name not in var_devspec: + var_devspec[var_name] = [] + if cur_device_spec in var_devspec[var_name]: continue + var_devspec[var_name].append(cur_device_spec) + + queue_name = var_name + "_blocking_queue" + queue_name = unique_name.generate(queue_name) + queue_var = startup_block.create_var( + name=queue_name, + persistable=True, + type=core.VarDesc.VarType.RAW) + startup_block.append_op( + type='queue_generator', + attrs={ + 'names': [queue_name], + 'capacity': self._num_microbatches + }) + op_role = op.all_attrs()[self._op_role_key] + var = block.vars[var_name] + block._insert_op( + index=index + extra_index, + type='enqueue', + inputs={'X': var}, + attrs={ + 'queue_name': queue_name, + self._op_device_key: prev_device_spec, + self._op_role_key: op_role + }) + extra_index += 1 + block._insert_op( + index=index + extra_index, + type='dequeue', + outputs={'Out': [var]}, + attrs={ + self._op_device_key: cur_device_spec, + 'queue_name': queue_name, + self._op_role_key: op_role + }) + extra_index += 1 + + def _add_dequeue_ops_for_optimize(self, block, startup_program): + startup_block = startup_program.global_block() + grad_queue_map = dict() + grad_device_map = dict() + optimize_index = None + grad_names_to_dequeue = [] + + for index, op in reversed(list(enumerate(block.ops))): + device = op.attr(self._op_device_key) + # Optimizer pass + if not self._is_optimize_op(op): + optimize_index = index + 1 + break + if not self._is_update_op(op): continue + assert self._op_role_var_key in op.attr_names + op_role_var = op.all_attrs()[self._op_role_var_key] + assert len(op_role_var) == 2 + grad_name = op_role_var[1] + assert grad_name not in grad_device_map + assert grad_name not in grad_names_to_dequeue + grad_device_map[grad_name] = device + grad_names_to_dequeue.append(grad_name) + + for grad_name in grad_names_to_dequeue: + device = grad_device_map[grad_name] + grad_names = [] + grads = [] + queue_name = grad_name + "_blocking_queue" + queue_name = unique_name.generate(queue_name) + grad_queue_map[grad_name] = queue_name + ref_var = block.vars[grad_name] + queue_var = startup_block.create_var( + name=queue_name, + persistable=True, + type=core.VarDesc.VarType.RAW) + startup_block.append_op( + type='queue_generator', + attrs={ + 'names': [queue_name], + 'capacity': self._num_microbatches + }) + orig_var_name = self._strip_grad_suffix(grad_name) + for _ in range(self._num_microbatches): + u_name = unique_name.generate(orig_var_name) + u_grad_name = self._append_grad_suffix(u_name) + grad_var = self._create_var(block, ref_var, u_grad_name) + grad_names.append(u_grad_name) + grads.append(grad_var) + block._insert_op( + index=optimize_index, + type='dequeue', + outputs={'Out': grads}, + attrs={ + self._op_device_key: device, + 'queue_name': queue_name, + self._op_role_key: self._op_role.Optimize + }) + block._insert_op( + index=optimize_index + 1, + type='sum', + inputs={'X': grad_names}, + outputs={'Out': ref_var}, + attrs={ + self._op_device_key: device, + self._op_role_key: self._op_role.Optimize + }) + return grad_queue_map + + def _insert_enq_deq_ops_for_update(self, block, startup_program): + """ + Insert enqueue and dequeue ops for gradients of parameters. + """ + startup_block = startup_program.global_block() + grad_queue_map = self._add_dequeue_ops_for_optimize(block, + startup_program) + + for index, op in reversed(list(enumerate(block.ops))): + offset = index + device = op.attr(self._op_device_key) + + # Backward pass + if self._is_loss_grad_op(op): + loss_grad_var = block.vars[op.output_arg_names[0]] + scale_factor = self._num_microbatches + block._insert_op( + index=index + 1, + type='scale', + inputs={'X': loss_grad_var}, + outputs={'Out': loss_grad_var}, + attrs={ + 'scale': 1.0 / scale_factor, + self._op_device_key: device, + self._op_role_key: self._op_role.Backward + }) + break + if self._is_backward_op(op) and ( + self._op_role_var_key in op.attr_names): + op_role_var = op.all_attrs()[self._op_role_var_key] + + if len(op_role_var) == 0: + continue + assert len(op_role_var) % 2 == 0 + for i in range(0, len(op_role_var), 2): + grad_name = op_role_var[i + 1] + grad_var = block.vars[grad_name] + assert grad_name in grad_queue_map + queue_name = grad_queue_map[grad_name] + block._insert_op( + index=offset + 1, + type='enqueue', + inputs={'X': block.vars[grad_name]}, + attrs={ + 'queue_name': queue_name, + self._op_device_key: device, + self._op_role_key: self._op_role.Backward + }) + offset += 1 + + def _add_sub_blocks(self, main_block, program_list): + main_program = main_block.program + for prog_info in program_list: + prog = prog_info['program'] + for op in prog.block(0).ops: + if not op.has_attr('sub_block'): + continue + origin_sub_block_id = op.attr('sub_block').id + origin_sub_block = main_program.block(origin_sub_block_id) + new_sub_block = prog._create_block(parent_idx=0) + for op in origin_sub_block.ops: + op_desc = op.desc + ap_op = new_sub_block.desc.append_op() + ap_op.copy_from(op_desc) + new_sub_block._sync_with_cpp() + op._set_attr('sub_block:', new_sub_block) + + def _get_device_info(self, block): + for op in block.ops: + if not op._has_kernel(op.type): continue + op_device = op.attr(self._op_device_key) + return op_device + + def _process_persistable_vars_in_multi_sections(self, main_program, + startup_prog, program_list): + """ + Special Case: process persistable vars that exist in + multiple sections, e.g., shared weight + """ + # var_info = {var_name: [program1, program2...]}, + # persistable var only + var_info = dict() + for prog_info in program_list: + prog = prog_info['program'] + block = prog.block(0) + for var_name in block.vars: + var = block.var(var_name) + if not var.persistable: continue + if not var_name in var_info: + var_info[var_name] = [] + if not prog in var_info[var_name]: + var_info[var_name].append(prog) + for var_name in list(var_info.keys()): + if len(var_info[var_name]) == 1: + var_info.pop(var_name) + + # write_info = {var_name: program}, where program is the only program + # in which the var named var_name is written. + write_info = dict() + for var_name in var_info.keys(): + for prog in var_info[var_name]: + block = prog.block(0) + for op in block.ops: + if op.type == "dequeue": continue + # We have processed lr related vars + if op.attr(self._op_role_key) == int( + self._op_role.Optimize.LRSched): + continue + if var_name in op.desc.output_arg_names(): + assert var_name not in write_info, ( + "two sections write the same var({}): second " + "op {}.".format(var_name, op)) + write_info[var_name] = prog + break + + for var_name in var_info.keys(): + # Case 1: read only variables, no special process + if not var_name in write_info: continue + + # Case 2: one write multiple reads + write_prog = write_info[var_name] + write_block = write_prog.block(0) + write_device = self._get_device_info(write_block) + all_progs = var_info[var_name] + for prog in all_progs: + if prog == write_prog: continue + + queue_name = var_name + "_blocking_queue" + queue_name = unique_name.generate(queue_name) + queue_var = startup_prog.block(0).create_var( + name=queue_name, + persistable=True, + type=core.VarDesc.VarType.RAW) + startup_prog.block(0).append_op( + type='queue_generator', + attrs={ + 'names': [queue_name], + 'capacity': self._num_microbatches + }) + write_block._insert_op( + index=0, + type='enqueue', + inputs={'X': write_block.var(var_name), }, + attrs={ + 'queue_name': queue_name, + self._op_device_key: write_device, + # A trick to make the role LRSched to avoid copy every + # microbatch + self._op_role_key: self._op_role.LRSched + }) + read_block = prog.block(0) + read_device = self._get_device_info(read_block) + read_block._insert_op( + index=0, + type='dequeue', + outputs={'Out': [read_block.var(var_name)]}, + attrs={ + self._op_device_key: read_device, + # A trick to make the role LRSched to avoid copy every + # microbatch + self._op_role_key: self._op_role.LRSched, + 'queue_name': queue_name, + }) def minimize(self, loss, startup_program=None, parameter_list=None, no_grad_set=None): - self._optimizer.minimize(loss, startup_program, parameter_list, - no_grad_set) - program = loss.block.program - if len(self._cut_list) == 0: + main_block = loss.block + if startup_program is None: + startup_program = default_startup_program() + optimize_ops, params_grads = self._optimizer.minimize( + loss, startup_program, parameter_list, no_grad_set) + self._update_param_device_map(params_grads, main_block) + + # Step1: add default op_device attribute for regulization and clip ops + self._add_opdevice_attr_for_regularization_clip(main_block) + + # Step2: add default op_device attribute for ops whose op_device + # attribute have not been set yet. + self._add_default_opdevice_attr(main_block) + device_specs = self._check_validation(main_block) + + # Step3: add enqueue and dequeue ops between section boundaries + origin_prog = main_block.program.clone(for_test=False) + origin_main_block = origin_prog.global_block() + self._insert_enq_deq_ops_for_boundaries(main_block, origin_main_block, + startup_program) + + # Step4: add a pair of enqueue and dequeueN for parameter gradients + self._insert_enq_deq_ops_for_update(main_block, startup_program) + + main_program = main_block.program + + place_list = [] + place_id_list = [] + for dev_spec in device_specs: + if dev_spec == "cpu": + place_list.append(core.CPUPlace()) + place_id_list.append(-1) + elif "gpu" in dev_spec and ":" in dev_spec: + dev_index = dev_spec.split(":")[1] + place_list.append(core.CUDAPlace(int(dev_index))) + place_id_list.append(int(dev_index)) + else: + raise ValueError("Unknown device type: %s", dev_spec) + + # Step5: split program into sections and add pairs of + # enqueue and dequeue ops for data var. + if len(place_list) == 0: program_list = [] - ptmp = {"program": program, "input_set": set(), "output_set": set()} + ptmp = { + "program": main_program, + "input_set": set(), + "output_set": set() + } program_list.append(ptmp) else: - program_list = self._split_program(program, self._cut_list) + program_list = self._split_program(main_program) for p in program_list: - self._create_vars(p["program"].block(0), program) - whole_parameters = [e.name for e in program.block(0).all_parameters()] - param_need_sync = [] - for i, section_p in enumerate(program_list): - if not isinstance(self._place_list[i], core.CUDAPlace): - continue - section_var = [e for e in section_p["program"].block(0).vars] - for p in section_var: - if p in whole_parameters: - param_need_sync.append(p) - program._pipeline_opt = { + self._create_vars(p["program"].block(0), main_program) + self._insert_enq_deq_for_data_var(main_block, program_list, + startup_program, device_specs) + + # Step6: Special Case: process persistable vars that exist in + # multiple sections + self._process_persistable_vars_in_multi_sections( + main_program, startup_program, program_list) + + # Step7: Add sub blocks for section programs + self._add_sub_blocks(main_block, program_list) + + main_program._pipeline_opt = { "trainer": "PipelineTrainer", "device_worker": "Section", "section_program_list": program_list, - "place_list": self._place_list, - "concurrency_list": self._concurrency_list, - "queue_size": self._queue_size, + "place_list": place_list, + "place_id_list": place_id_list, + "sync_steps": -1, + "queue_size": self._num_microbatches, "start_cpu_core_id": self._start_cpu_core_id, - "sync_steps": self._sync_steps, - "param_need_sync": param_need_sync } + return optimize_ops, params_grads, program_list class RecomputeOptimizer(Optimizer): diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index 692c76119ee..d26f2de0340 100644 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -49,7 +49,6 @@ endif() LIST(REMOVE_ITEM TEST_OPS test_conv2d_transpose_op) if(WIN32) LIST(REMOVE_ITEM TEST_OPS test_boxps) - LIST(REMOVE_ITEM TEST_OPS test_paddlebox_datafeed) LIST(REMOVE_ITEM TEST_OPS test_trainer_desc) LIST(REMOVE_ITEM TEST_OPS test_multiprocess_reader_exception) LIST(REMOVE_ITEM TEST_OPS test_avoid_twice_initialization) @@ -89,7 +88,6 @@ endif() if(NOT WITH_GPU OR WIN32) LIST(REMOVE_ITEM TEST_OPS test_pipeline) LIST(REMOVE_ITEM TEST_OPS test_boxps) - LIST(REMOVE_ITEM TEST_OPS test_paddlebox_datafeed) endif() list(REMOVE_ITEM TEST_OPS test_seq_concat_op) # FIXME(helin): https://github.com/PaddlePaddle/Paddle/issues/8290 list(REMOVE_ITEM TEST_OPS test_lstm_unit_op) # # FIXME(qijun) https://github.com/PaddlePaddle/Paddle/issues/5185 diff --git a/python/paddle/fluid/tests/unittests/test_boxps.py b/python/paddle/fluid/tests/unittests/test_boxps.py index a61cc9747e9..0eba0e8f26e 100644 --- a/python/paddle/fluid/tests/unittests/test_boxps.py +++ b/python/paddle/fluid/tests/unittests/test_boxps.py @@ -87,117 +87,5 @@ class TestRunCmd(unittest.TestCase): self.assertTrue(ret2 == 0) -class TestBoxPSPreload(unittest.TestCase): - """ TestCases for BoxPS Preload """ - - def test_boxps_cpu(self): - self.run_boxps_preload(True, True) - self.run_boxps_preload(True, False) - - def test_boxps_gpu(self): - self.run_boxps_preload(False, True) - self.run_boxps_preload(False, False) - - def run_boxps_preload(self, is_cpu=True, random_with_lineid=False): - program = fluid.Program() - with fluid.program_guard(program): - x = fluid.layers.data( - name='x', shape=[1], dtype='int64', lod_level=0) - y = fluid.layers.data( - name='y', shape=[1], dtype='int64', lod_level=0) - z = layers.data(name='z', shape=[1], dtype='int64') - emb_x, emb_y = _pull_box_sparse([x, y], size=2) - emb_xp = _pull_box_sparse(x, size=2) - concat = layers.concat([emb_x, emb_y], axis=1) - fc = layers.fc(input=concat, - name="fc", - size=1, - num_flatten_dims=1, - bias_attr=False) - loss = layers.reduce_mean(fc) - place = fluid.CPUPlace( - ) if is_cpu or not core.is_compiled_with_cuda( - ) else fluid.CUDAPlace(0) - exe = fluid.Executor(place) - batch_size = 100 - - def binary_print(slot, fout): - fout.write(str(len(slot)) + " ") - for e in slot: - fout.write(str(e) + " ") - - batch1 = np.ones( - (batch_size, 2, 1)).astype("int64").reshape(batch_size, 2, 1) - filelist = [] - place_str = "cpu" if is_cpu else "gpu" - for i in range(2): - filelist.append("test_hdfs_" + place_str + "_" + str(i)) - for f in filelist: - with open(f, "w") as fout: - for ins in batch1: - for slot in ins: - binary_print(slot, fout) - fout.write("\n") - - def create_dataset(): - dataset = fluid.DatasetFactory().create_dataset("BoxPSDataset") - dataset.set_date("20190930") - dataset.set_use_var([x, y]) - dataset.set_batch_size(2) - dataset.set_thread(1) - dataset.set_filelist(filelist) - return dataset - - datasets = [] - datasets.append(create_dataset()) - datasets.append(create_dataset()) - optimizer = fluid.optimizer.SGD(learning_rate=0.5) - optimizer = fluid.optimizer.PipelineOptimizer( - optimizer, - cut_list=[], - place_list=[place], - concurrency_list=[1], - queue_size=1, - sync_steps=-1) - optimizer.minimize(loss) - - program._pipeline_opt["dump_fields"] = [ - "fc.tmp_0", "fc.tmp_0@GRAD", "fake_var", "z", - "reduce_mean_3.tmp_0" - ] - # fake_var: not in scope - # z: in scope, but no initialized - # reduce_mean_0.tmp_0, dimension is not right - - program._pipeline_opt["dump_fields_path"] = "./dump_log/" - program._pipeline_opt["dump_param"] = ["fc.w_0"] - program._pipeline_opt["enable_random_dump"] = True - program._pipeline_opt["dump_interval"] = 10 - program._pipeline_opt["random_with_lineid"] = random_with_lineid - - exe.run(fluid.default_startup_program()) - datasets[0].load_into_memory() - datasets[0].begin_pass() - datasets[0].slots_shuffle([]) - datasets[1].preload_into_memory() - exe.train_from_dataset( - program=fluid.default_main_program(), - dataset=datasets[0], - print_period=1) - datasets[0].end_pass(True) - datasets[1].wait_preload_done() - datasets[1].begin_pass() - exe.train_from_dataset( - program=fluid.default_main_program(), - dataset=datasets[1], - print_period=1, - debug=True) - datasets[1].end_pass(False) - for f in filelist: - os.remove(f) - if os.path.isdir("dump_log"): - shutil.rmtree("dump_log") - - if __name__ == '__main__': unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_data_norm_op.py b/python/paddle/fluid/tests/unittests/test_data_norm_op.py index 0b7ed20f4b1..c766cf17f42 100644 --- a/python/paddle/fluid/tests/unittests/test_data_norm_op.py +++ b/python/paddle/fluid/tests/unittests/test_data_norm_op.py @@ -437,129 +437,6 @@ class TestDataNormOpWithSlotDim(OpTest): self.check_grad(['X'], 'Y', no_grad_set=set([])) -class TestDataNormOpWithSyncStats(unittest.TestCase): - """ - test class for data norm op - test forward and backward - """ - - def test_sync_stats(self): - if not core.is_compiled_with_cuda(): - return - if os.name == 'nt': - print( - 'Skip TestDataNormOpWithSyncStats because nccl is not supported on windows' - ) - return - x = fluid.layers.data(name='x', shape=[1], dtype='int64', lod_level=0) - emb = layers.embedding( - input=x, - param_attr=fluid.ParamAttr(name="embx"), - size=[10, 2], - is_sparse=False) - - dn = layers.data_norm( - input=emb, - name="hehe", - epsilon=1e-4, - param_attr={ - "batch_size": 1e4, - "batch_sum": 1e5, - "batch_square": 1e4 - }, - summary_decay_rate=1, - sync_stats=True) #[-1,3] - loss = layers.mean(dn) - - optimizer = fluid.optimizer.SGD(learning_rate=0.5) - optimizer = fluid.optimizer.PipelineOptimizer( - optimizer, - cut_list=[[emb], [loss]], - place_list=[ - fluid.CUDAPlace(0), fluid.CUDAPlace(0), fluid.CPUPlace() - ], - concurrency_list=[1, 1, 1], - queue_size=1, - sync_steps=10000000, ) - - all_p = fluid.default_main_program().global_block().all_parameters() - parameter_without_datanorm = [] - for e in all_p: - if e.name.find("batch_size") != -1 or e.name.find( - "batch_sq") != -1 or e.name.find("batch_sum") != -1: - continue - parameter_without_datanorm.append(e.name) - optimizer.minimize(loss, parameter_list=parameter_without_datanorm) - place = fluid.CUDAPlace(0) - exe = fluid.Executor(place) - #prepare data - batch_size = 1 - - def binary_print(slot, fout): - num = np.int16(len(slot) + 1) - num.tofile(fout) - a = np.int64(batch_size) - a.tofile(fout) - slot.tofile(fout) - - #batch1 = np.array([[0,1], [1,2], [2,3]]).astype("int64").reshape(batch_size,2,1) - #batch2 = np.array([[1,2], [2,3], [3,4]]).astype("int64").reshape(batch_size,2,1) - batch1 = np.ones( - (batch_size, 1)).astype("int64").reshape(batch_size, 1, 1) - batch2 = np.ones( - (batch_size, 1)).astype("int64").reshape(batch_size, 1, 1) - data = [batch1, batch2] - data = [batch1] - filelist = [] - for i in range(2): - filelist.append("test_pipeline_input_" + str(i)) - for f in filelist: - with open(f, "wb") as fout: - for batch_data in data: - for ins in batch_data: - for slot in ins: - binary_print(slot, fout) - - dataset = fluid.DatasetFactory().create_dataset("FileInstantDataset") - dataset.set_use_var([x]) - dataset.set_batch_size(batch_size) - dataset.set_filelist(filelist) - - block = fluid.default_startup_program().global_block() - block.append_op( - type='c_comm_init_all', attrs={'ring_id': 0, - 'devices': [0, 1]}) - with open("main_program", "w") as fout: - fout.write(str(fluid.default_main_program())) - with open("startup_program", "w") as fout: - fout.write(str(fluid.default_startup_program())) - exe.run(fluid.default_startup_program()) - emb_t = fluid.global_scope().find_var("embx").get_tensor() - para = np.ones((10, 2)).astype("float32") - emb_t.set(para, place) - for epoch in range(1): - exe.train_from_dataset( - fluid.default_main_program(), - dataset, - thread=2, - debug=False, - fetch_list=[], - fetch_info=[], - print_period=1) - batch_size = np.array(fluid.global_scope().find_var("hehe.batch_size") - .get_tensor()) - self.assertEqual(batch_size[0], 10002) - b = np.array(fluid.global_scope().find_var("hehe.batch_sum").get_tensor( - )) - self.assertEqual(b[0], 100002) - c = np.array(fluid.global_scope().find_var("hehe.batch_square_sum") - .get_tensor()) - self.assertEqual(c[0], 10162) - - for f in filelist: - os.remove(f) - - class TestDataNormOpErrorr(unittest.TestCase): def test_errors(self): with program_guard(Program(), Program()): diff --git a/python/paddle/fluid/tests/unittests/test_paddlebox_datafeed.py b/python/paddle/fluid/tests/unittests/test_paddlebox_datafeed.py deleted file mode 100644 index 94bc8ff2886..00000000000 --- a/python/paddle/fluid/tests/unittests/test_paddlebox_datafeed.py +++ /dev/null @@ -1,146 +0,0 @@ -# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -from __future__ import print_function -import paddle.fluid as fluid -import paddle.fluid.core as core -import os -import unittest -import paddle.fluid.layers as layers - - -class TestDataFeed(unittest.TestCase): - """ TestBaseCase(Merge PV) """ - - def setUp(self): - self.batch_size = 10 - self.pv_batch_size = 10 - self.enable_pv_merge = True - self.merge_by_sid = True - - def set_data_config(self): - self.dataset = fluid.DatasetFactory().create_dataset("BoxPSDataset") - self.dataset.set_feed_type("PaddleBoxDataFeed") - self.dataset.set_parse_logkey(True) - self.dataset.set_thread(1) - self.dataset.set_enable_pv_merge(self.enable_pv_merge) - self.dataset.set_batch_size(self.batch_size) - if self.enable_pv_merge: - self.dataset.set_merge_by_sid(self.merge_by_sid) - self.dataset.set_rank_offset("rank_offset") - self.dataset.set_pv_batch_size(self.pv_batch_size) - - def test_pboxdatafeed(self): - self.run_dataset(False) - - def test_pboxdatafeed(self): - self.run_dataset(True) - - def run_dataset(self, is_cpu): - x = fluid.layers.data(name='x', shape=[1], dtype='int64', lod_level=0) - y = fluid.layers.data(name='y', shape=[1], dtype='int64', lod_level=0) - rank_offset = fluid.layers.data( - name="rank_offset", - shape=[-1, 7], - dtype="int32", - lod_level=0, - append_batch_size=False) - - emb_x, emb_y = fluid.contrib.layers._pull_box_extended_sparse( - [x, y], size=2, extend_size=128) - concat = layers.concat([emb_x[0], emb_x[1], emb_y[0], emb_y[1]], axis=1) - fc = layers.fc(input=concat, - name="fc", - size=1, - num_flatten_dims=1, - bias_attr=False) - loss = layers.reduce_mean(fc) - place = fluid.CPUPlace() if is_cpu or not core.is_compiled_with_cuda( - ) else fluid.CUDAPlace(0) - exe = fluid.Executor(place) - - with open("test_run_with_dump_a.txt", "w") as f: - data = "1 1702f830eee19501ad7429505f714c1d 1 1 1 9\n" - data += "1 1702f830eee19502ad7429505f714c1d 1 2 1 8\n" - data += "1 1702f830eee19503ad7429505f714c1d 1 3 1 7\n" - data += "1 1702f830eee0de01ad7429505f714c2d 1 4 1 6\n" - data += "1 1702f830eee0df01ad7429505f714c3d 1 5 1 5\n" - data += "1 1702f830eee0df02ad7429505f714c3d 1 6 1 4\n" - f.write(data) - with open("test_run_with_dump_b.txt", "w") as f: - data = "1 1702f830fff22201ad7429505f715c1d 1 1 1 1\n" - data += "1 1702f830fff22202ad7429505f715c1d 1 2 1 2\n" - data += "1 1702f830fff22203ad7429505f715c1d 1 3 1 3\n" - data += "1 1702f830fff22101ad7429505f714ccd 1 4 1 4\n" - data += "1 1702f830fff22102ad7429505f714ccd 1 5 1 5\n" - data += "1 1702f830fff22103ad7429505f714ccd 1 6 1 6\n" - data += "1 1702f830fff22104ad7429505f714ccd 1 6 1 7\n" - f.write(data) - - self.set_data_config() - self.dataset.set_use_var([x, y]) - self.dataset.set_filelist( - ["test_run_with_dump_a.txt", "test_run_with_dump_b.txt"]) - - optimizer = fluid.optimizer.SGD(learning_rate=0.5) - optimizer = fluid.optimizer.PipelineOptimizer( - optimizer, - cut_list=[], - place_list=[place], - concurrency_list=[1], - queue_size=1, - sync_steps=-1) - optimizer.minimize(loss) - exe.run(fluid.default_startup_program()) - self.dataset.set_current_phase(1) - self.dataset.load_into_memory() - self.dataset.preprocess_instance() - self.dataset.begin_pass() - pv_num = self.dataset.get_pv_data_size() - - exe.train_from_dataset( - program=fluid.default_main_program(), - dataset=self.dataset, - print_period=1) - self.dataset.set_current_phase(0) - self.dataset.postprocess_instance() - exe.train_from_dataset( - program=fluid.default_main_program(), - dataset=self.dataset, - print_period=1) - self.dataset.end_pass(True) - os.remove("test_run_with_dump_a.txt") - os.remove("test_run_with_dump_b.txt") - - -class TestDataFeed2(TestDataFeed): - """ TestBaseCase(Merge PV not merge by sid) """ - - def setUp(self): - self.batch_size = 10 - self.pv_batch_size = 10 - self.enable_pv_merge = True - self.merge_by_sid = False - - -class TestDataFeed3(TestDataFeed): - """ TestBaseCase(Not Merge PV) """ - - def setUp(self): - self.batch_size = 10 - self.pv_batch_size = 10 - self.enable_pv_merge = False - - -if __name__ == '__main__': - unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_pipeline.py b/python/paddle/fluid/tests/unittests/test_pipeline.py index dbf14e04757..40a1e0b0248 100644 --- a/python/paddle/fluid/tests/unittests/test_pipeline.py +++ b/python/paddle/fluid/tests/unittests/test_pipeline.py @@ -19,137 +19,162 @@ import numpy as np import os import shutil import unittest - - -class TestPipelineConfig(unittest.TestCase): - """ TestCases for Config in Pipeline Training. """ - - def config(self, filelist_length, pipeline_num, reader_concurrency): - filelist = [] - for i in range(filelist_length): - filelist.append("file" + str(i)) - self.dataset.set_filelist(filelist) - self.pipeline_opt["concurrency_list"][0] = reader_concurrency - self.pipeline_num = pipeline_num - - def helper(self, in_filelist_length, in_pipeline_num, in_reader_concurrency, - out_pipeline_num, out_reader_concurrency, out_dataset_thread): - self.config(in_filelist_length, in_pipeline_num, in_reader_concurrency) - res = self.exe._adjust_pipeline_resource( - self.pipeline_opt, self.dataset, self.pipeline_num) - self.assertEqual(self.pipeline_opt["concurrency_list"][0], - out_reader_concurrency) - self.assertEqual(res, out_pipeline_num) - self.assertEqual(self.dataset.thread_num, out_dataset_thread) - - def test_adjust_pipeline_resource(self): - self.exe = fluid.Executor(fluid.CPUPlace()) - self.dataset = fluid.DatasetFactory().create_dataset( - "FileInstantDataset") - self.pipeline_opt = {"concurrency_list": [0, 1, 2]} - self.pipeline_num = 0 - - self.helper(7, 2, 2, 2, 2, 4) - self.helper(7, 2, 3, 2, 3, 6) - self.helper(7, 2, 4, 2, 3, 6) - - self.helper(8, 2, 3, 2, 3, 6) - self.helper(8, 2, 4, 2, 4, 8) - self.helper(8, 2, 5, 2, 4, 8) - - self.helper(3, 4, 1, 3, 1, 3) - self.helper(3, 4, 2, 3, 1, 3) +import math + + +def conv_bn_layer(input, num_filters, filter_size, stride=1, groups=1, + act=None): + conv = fluid.layers.conv2d( + input=input, + num_filters=num_filters, + filter_size=filter_size, + stride=stride, + padding=(filter_size - 1) // 2, + groups=groups, + act=None, + bias_attr=False) + return fluid.layers.batch_norm( + input=conv, + act=act, ) + + +def shortcut(input, ch_out, stride, is_first): + ch_in = input.shape[1] + if ch_in != ch_out or stride != 1 or is_first == True: + return conv_bn_layer(input, ch_out, 1, stride) + else: + return input + + +def bottleneck_block(input, num_filters, stride): + conv0 = conv_bn_layer( + input=input, num_filters=num_filters, filter_size=1, act='relu') + conv1 = conv_bn_layer( + input=conv0, + num_filters=num_filters, + filter_size=3, + stride=stride, + act='relu') + conv2 = conv_bn_layer( + input=conv1, num_filters=num_filters * 4, filter_size=1, act=None) + + short = shortcut(input, num_filters * 4, stride, is_first=False) + + return fluid.layers.elementwise_add(x=short, y=conv2, act='relu') + + +def basic_block(input, num_filters, stride, is_first): + conv0 = conv_bn_layer( + input=input, + num_filters=num_filters, + filter_size=3, + act='relu', + stride=stride) + conv1 = conv_bn_layer( + input=conv0, num_filters=num_filters, filter_size=3, act=None) + short = shortcut(input, num_filters, stride, is_first) + return fluid.layers.elementwise_add(x=short, y=conv1, act='relu') + + +def build_network(input, layers=50, class_dim=1000): + supported_layers = [18, 34, 50, 101, 152] + assert layers in supported_layers + depth = None + if layers == 18: + depth = [2, 2, 2, 2] + elif layers == 34 or layers == 50: + depth = [3, 4, 6, 3] + elif layers == 101: + depth = [3, 4, 23, 3] + elif layers == 152: + depth = [3, 8, 36, 3] + num_filters = [64, 128, 256, 512] + with fluid.device_guard("cpu"): + conv = conv_bn_layer( + input=input, num_filters=64, filter_size=7, stride=2, act='relu') + conv = fluid.layers.pool2d( + input=conv, + pool_size=3, + pool_stride=2, + pool_padding=1, + pool_type='max') + if layers >= 50: + for block in range(len(depth)): + with fluid.device_guard("cpu"): + for i in range(depth[block]): + conv = bottleneck_block( + input=conv, + num_filters=num_filters[block], + stride=2 if i == 0 and block != 0 else 1) + + with fluid.device_guard("gpu:0"): + pool = fluid.layers.pool2d( + input=conv, pool_size=7, pool_type='avg', global_pooling=True) + stdv = 1.0 / math.sqrt(pool.shape[1] * 1.0) + out = fluid.layers.fc( + input=pool, + size=class_dim, + param_attr=fluid.param_attr.ParamAttr( + initializer=fluid.initializer.Uniform(-stdv, stdv))) + else: + for block in range(len(depth)): + with fluid.device_guard("cpu"): + for i in range(depth[block]): + conv = basic_block( + input=conv, + num_filters=num_filters[block], + stride=2 if i == 0 and block != 0 else 1, + is_first=block == i == 0) + with fluid.device_guard("gpu:0"): + pool = fluid.layers.pool2d( + input=conv, pool_size=7, pool_type='avg', global_pooling=True) + stdv = 1.0 / math.sqrt(pool.shape[1] * 1.0) + out = fluid.layers.fc( + input=pool, + size=class_dim, + param_attr=fluid.param_attr.ParamAttr( + initializer=fluid.initializer.Uniform(-stdv, stdv))) + return out class TestPipeline(unittest.TestCase): """ TestCases for Pipeline Training. """ def test_pipeline(self): - x = fluid.layers.data(name='x', shape=[1], dtype='int64', lod_level=0) - y = fluid.layers.data(name='y', shape=[1], dtype='int64', lod_level=0) - emb_x = layers.embedding( - input=x, - param_attr=fluid.ParamAttr(name="embx"), - size=[10, 2], - is_sparse=False) - emb_y = layers.embedding( - input=y, - param_attr=fluid.ParamAttr( - name="emby", learning_rate=0.9), - size=[10, 2], - is_sparse=False) - - concat = layers.concat([emb_x, emb_y], axis=1) - - fc = layers.fc(input=concat, - name="fc", - size=1, - num_flatten_dims=1, - bias_attr=False) - loss = layers.reduce_mean(fc) - - optimizer = fluid.optimizer.SGD(learning_rate=0.5) + with fluid.device_guard("cpu"): + image = fluid.layers.data( + name="image", shape=[3, 224, 224], dtype="float32") + label = fluid.layers.data(name="label", shape=[1], dtype="int64") + data_loader = fluid.io.DataLoader.from_generator( + feed_list=[image, label], + capacity=64, + use_double_buffer=True, + iterable=False) + fc = build_network(image, layers=50) + with fluid.device_guard("gpu:0"): + out, prob = fluid.layers.softmax_with_cross_entropy( + logits=fc, label=label, return_softmax=True) + loss = fluid.layers.mean(out) + acc_top1 = fluid.layers.accuracy(input=prob, label=label, k=1) + acc_top5 = fluid.layers.accuracy(input=prob, label=label, k=5) + + base_lr = 0.1 + passes = [30, 60, 80, 90] + total_images = 1281167 + steps_per_pass = total_images // 128 + bd = [steps_per_pass * p for p in passes] + lr = [base_lr * (0.1**i) for i in range(len(bd) + 1)] + lr_val = fluid.layers.piecewise_decay(boundaries=bd, values=lr) + optimizer = fluid.optimizer.Momentum( + lr_val, + momentum=0.9, + regularization=fluid.regularizer.L2Decay(1e-4)) optimizer = fluid.optimizer.PipelineOptimizer( - optimizer, - cut_list=[[emb_x, emb_y], [loss]], - place_list=[ - fluid.CPUPlace(), fluid.CUDAPlace(0), fluid.CPUPlace() - ], - concurrency_list=[1, 1, 1], - queue_size=1, - sync_steps=10000000, ) + optimizer, num_microbatches=2) optimizer.minimize(loss) - place = fluid.CPUPlace() - exe = fluid.Executor(place) - exe.run(fluid.default_startup_program()) - #prepare data - batch_size = 100 - - def binary_print(slot, fout): - num = np.int16(len(slot) + 1) - num.tofile(fout) - a = np.int64(batch_size) - a.tofile(fout) - slot.tofile(fout) - - #batch1 = np.array([[0,1], [1,2], [2,3]]).astype("int64").reshape(batch_size,2,1) - #batch2 = np.array([[1,2], [2,3], [3,4]]).astype("int64").reshape(batch_size,2,1) - batch1 = np.ones( - (batch_size, 2, 1)).astype("int64").reshape(batch_size, 2, 1) - batch2 = np.ones( - (batch_size, 2, 1)).astype("int64").reshape(batch_size, 2, 1) - data = [batch1, batch2] - filelist = [] - for i in range(2): - filelist.append("test_pipeline_input_" + str(i)) - for f in filelist: - with open(f, "wb") as fout: - for batch_data in data: - for ins in batch_data: - for slot in ins: - binary_print(slot, fout) - - dataset = fluid.DatasetFactory().create_dataset("FileInstantDataset") - dataset.set_use_var([x, y]) - dataset.set_batch_size(batch_size) - dataset.set_filelist(filelist) - - for epoch in range(1): - exe.train_from_dataset( - fluid.default_main_program(), - dataset, - thread=1, - debug=False, - fetch_list=[], - fetch_info=[], - print_period=1) - - for f in filelist: - os.remove(f) - - def single_section(self, random_dump): - program = fluid.Program() - with fluid.program_guard(program): + + def test_pipeline_noneoptimizer(self): + with fluid.device_guard("gpu:0"): x = fluid.layers.data( name='x', shape=[1], dtype='int64', lod_level=0) y = fluid.layers.data( @@ -159,94 +184,18 @@ class TestPipeline(unittest.TestCase): param_attr=fluid.ParamAttr(name="embx"), size=[10, 2], is_sparse=False) - emb_y = layers.embedding( - input=y, - param_attr=fluid.ParamAttr( - name="emby", learning_rate=0.9), - size=[10, 2], - is_sparse=False) - concat = layers.concat([emb_x, emb_y], axis=1) - - fc = layers.fc(input=concat, + fc = layers.fc(input=emb_x, name="fc", size=1, num_flatten_dims=1, bias_attr=False) loss = layers.reduce_mean(fc) - optimizer = fluid.optimizer.SGD(learning_rate=0.5) + optimizer = fluid.optimizer.SGD(learning_rate=0.5) + with self.assertRaises(ValueError): optimizer = fluid.optimizer.PipelineOptimizer( - optimizer, - cut_list=[], - #place_list=[fluid.CPUPlace()], - place_list=[fluid.CUDAPlace(0)], - concurrency_list=[1], - queue_size=1, - sync_steps=-1) - optimizer.minimize(loss) - - program._pipeline_opt["dump_fields"] = ["fc.tmp_0", "fc.tmp_0@GRAD"] - program._pipeline_opt["dump_fields_path"] = "./dump_log/" - program._pipeline_opt["dump_param"] = ["embx"] - program._pipeline_opt["enable_random_dump"] = random_dump - program._pipeline_opt["dump_interval"] = 10 - program._pipeline_opt["random_with_lineid"] = False - #print(program._pipeline_opt) - place = fluid.CPUPlace() - exe = fluid.Executor(place) - exe.run(fluid.default_startup_program()) - #prepare data - batch_size = 100 - - def binary_print(slot, fout): - num = np.int16(len(slot) + 1) - num.tofile(fout) - a = np.int64(batch_size) - a.tofile(fout) - slot.tofile(fout) - - #batch1 = np.array([[0,1], [1,2], [2,3]]).astype("int64").reshape(batch_size,2,1) - #batch2 = np.array([[1,2], [2,3], [3,4]]).astype("int64").reshape(batch_size,2,1) - batch1 = np.ones( - (batch_size, 2, 1)).astype("int64").reshape(batch_size, 2, 1) - batch2 = np.ones( - (batch_size, 2, 1)).astype("int64").reshape(batch_size, 2, 1) - data = [batch1, batch2] - filelist = [] - for i in range(2): - filelist.append("test_pipeline_input_" + str(i)) - for f in filelist: - with open(f, "wb") as fout: - for batch_data in data: - for ins in batch_data: - for slot in ins: - binary_print(slot, fout) - - dataset = fluid.DatasetFactory().create_dataset( - "FileInstantDataset") - dataset.set_use_var([x, y]) - dataset.set_batch_size(batch_size) - dataset.set_filelist(filelist) - - for epoch in range(1): - exe.train_from_dataset( - fluid.default_main_program(), - dataset, - thread=1, - debug=True, - fetch_list=[], - fetch_info=[], - print_period=1) - - for f in filelist: - os.remove(f) - if os.path.isdir("dump_log"): - shutil.rmtree("dump_log") - - def test_pipeline(self): - self.single_section(True) - self.single_section(False) + dict(), num_microbatches=2) if __name__ == '__main__': -- GitLab