diff --git a/paddle/fluid/API.spec b/paddle/fluid/API.spec index 5f90d43ca387ed70c0fb9fce19bc84e084ee24c2..6119e3050b54cab6089b559171d3ecff1b95a76c 100644 --- a/paddle/fluid/API.spec +++ b/paddle/fluid/API.spec @@ -55,9 +55,10 @@ paddle.fluid.Inferencer.__init__ ArgSpec(args=['self', 'infer_func', 'param_path paddle.fluid.Inferencer.infer ArgSpec(args=['self', 'inputs', 'return_numpy'], varargs=None, keywords=None, defaults=(True,)) paddle.fluid.DistributeTranspiler.__init__ ArgSpec(args=['self', 'config'], varargs=None, keywords=None, defaults=(None,)) paddle.fluid.DistributeTranspiler.get_pserver_program ArgSpec(args=['self', 'endpoint'], varargs=None, keywords=None, defaults=None) -paddle.fluid.DistributeTranspiler.get_startup_program ArgSpec(args=['self', 'endpoint', 'pserver_program', 'startup_program'], varargs=None, keywords=None, defaults=(None,)) +paddle.fluid.DistributeTranspiler.get_pserver_programs ArgSpec(args=['self', 'endpoint'], varargs=None, keywords=None, defaults=None) +paddle.fluid.DistributeTranspiler.get_startup_program ArgSpec(args=['self', 'endpoint', 'pserver_program', 'startup_program'], varargs=None, keywords=None, defaults=(None, None)) paddle.fluid.DistributeTranspiler.get_trainer_program ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None) -paddle.fluid.DistributeTranspiler.transpile ArgSpec(args=['self', 'trainer_id', 'program', 'pservers', 'trainers', 'sync_mode'], varargs=None, keywords=None, defaults=(None, '127.0.0.1:6174', 1, True)) +paddle.fluid.DistributeTranspiler.transpile ArgSpec(args=['self', 'trainer_id', 'program', 'pservers', 'trainers', 'sync_mode', 'startup_program'], varargs=None, keywords=None, defaults=(None, '127.0.0.1:6174', 1, True, None)) paddle.fluid.InferenceTranspiler.__init__ paddle.fluid.InferenceTranspiler.transpile ArgSpec(args=['self', 'program', 'place', 'scope'], varargs=None, keywords=None, defaults=(None,)) paddle.fluid.memory_optimize ArgSpec(args=['input_program', 'skip_opt_set', 'print_log', 'level'], varargs=None, keywords=None, defaults=(None, False, 0)) @@ -329,9 +330,10 @@ paddle.fluid.contrib.BeamSearchDecoder.update_array ArgSpec(args=['self', 'array paddle.fluid.contrib.memory_usage ArgSpec(args=['program', 'batch_size'], varargs=None, keywords=None, defaults=None) paddle.fluid.transpiler.DistributeTranspiler.__init__ ArgSpec(args=['self', 'config'], varargs=None, keywords=None, defaults=(None,)) paddle.fluid.transpiler.DistributeTranspiler.get_pserver_program ArgSpec(args=['self', 'endpoint'], varargs=None, keywords=None, defaults=None) -paddle.fluid.transpiler.DistributeTranspiler.get_startup_program ArgSpec(args=['self', 'endpoint', 'pserver_program', 'startup_program'], varargs=None, keywords=None, defaults=(None,)) +paddle.fluid.transpiler.DistributeTranspiler.get_pserver_programs ArgSpec(args=['self', 'endpoint'], varargs=None, keywords=None, defaults=None) +paddle.fluid.transpiler.DistributeTranspiler.get_startup_program ArgSpec(args=['self', 'endpoint', 'pserver_program', 'startup_program'], varargs=None, keywords=None, defaults=(None, None)) paddle.fluid.transpiler.DistributeTranspiler.get_trainer_program ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None) -paddle.fluid.transpiler.DistributeTranspiler.transpile ArgSpec(args=['self', 'trainer_id', 'program', 'pservers', 'trainers', 'sync_mode'], varargs=None, keywords=None, defaults=(None, '127.0.0.1:6174', 1, True)) +paddle.fluid.transpiler.DistributeTranspiler.transpile ArgSpec(args=['self', 'trainer_id', 'program', 'pservers', 'trainers', 'sync_mode', 'startup_program'], varargs=None, keywords=None, defaults=(None, '127.0.0.1:6174', 1, True, None)) paddle.fluid.transpiler.InferenceTranspiler.__init__ paddle.fluid.transpiler.InferenceTranspiler.transpile ArgSpec(args=['self', 'program', 'place', 'scope'], varargs=None, keywords=None, defaults=(None,)) paddle.fluid.transpiler.memory_optimize ArgSpec(args=['input_program', 'skip_opt_set', 'print_log', 'level'], varargs=None, keywords=None, defaults=(None, False, 0)) diff --git a/paddle/fluid/framework/details/multi_devices_graph_pass.cc b/paddle/fluid/framework/details/multi_devices_graph_pass.cc index 02e89d18a8dcdd90372e65645ee30c607ae50727..0bfff745493d069e948e6d277ec2bbfb0673a70b 100644 --- a/paddle/fluid/framework/details/multi_devices_graph_pass.cc +++ b/paddle/fluid/framework/details/multi_devices_graph_pass.cc @@ -736,7 +736,7 @@ void MultiDevSSAGraphBuilder::CreateDistTrainOp(ir::Graph *result, .emplace(varname, op_dev_id); } } else { - PADDLE_ENFORCE( + PADDLE_THROW( "the distribute training related op should be in [split_byref, " "concat]."); } @@ -746,17 +746,26 @@ void MultiDevSSAGraphBuilder::CreateDistTrainOp(ir::Graph *result, node->Op()->Type()); CreateComputationalOp(result, node, op_dev_id); - if (node->Op()->Type() == "concat") { - ConnectOp(result, result->Get(kGraphOps).back().get(), - "fetch_barrier"); +} + +void SetOpInputsAllPlaces(ir::Graph *result, ir::Node *node, int num_places) { + auto *op_handle = result->Get(kGraphOps).back().get(); + for (ir::Node *input : node->inputs) { + VarHandle *var = nullptr; + for (int place_offset = 0; place_offset < num_places; ++place_offset) { + auto &var_holders = result->Get(kGraphVars)[place_offset]; + auto &var_holder = var_holders[input->Name()]; + if (!var_holder.empty()) { + var = var_holder.rbegin()->get(); + op_handle->AddInput(var); + } + } } } // Create RPC related op handles that connects its in ops and out ops. void MultiDevSSAGraphBuilder::CreateRPCOp(ir::Graph *result, ir::Node *node) const { - // FIXME(typhoonzero): Cleanup this deps for both sync mode and async mode - // put them into transpiler. int op_dev_id = -1; if (node->Op()->Type() == "send") { // TODO(paddle-dev): getting the first var is not safe. @@ -791,8 +800,6 @@ void MultiDevSSAGraphBuilder::CreateRPCOp(ir::Graph *result, } auto recv_param_grad = boost::get>( node->Op()->GetAttr(OpProtoAndCheckerMaker::OpRoleVarAttrName())); - // FIXME(typhoonzero): assume each recv op output one param - // Use the same place as send. if (recv_param_grad.size() == 2U) { op_dev_id = GetVarDeviceID(*result, recv_param_grad[1]); VLOG(10) << "recv param " << recv_param_grad[0] @@ -806,34 +813,44 @@ void MultiDevSSAGraphBuilder::CreateRPCOp(ir::Graph *result, .emplace(varname, op_dev_id); } } else { - // send_barrier and fetch_barrier op can be scheduled on device 0 + // send_barrier, fetch_barrier will run on place 0; op_dev_id = 0; } PADDLE_ENFORCE(op_dev_id != -1, "can not find the right place for rpc op: %s", node->Op()->Type()); - result->Get(kGraphOps).emplace_back(new RPCOpHandle( result->CreateOpNode(node->Op()), *node->Op(), local_scopes_[op_dev_id], node->Op()->Type(), places_[op_dev_id])); - // TODO(panyx0718): This might not be needed anymore. - if (node->Op()->Type() == "send_barrier") { - ConnectOp(result, result->Get(kGraphOps).back().get(), "send"); - } else if (node->Op()->Type() == "recv") { - ConnectOp(result, result->Get(kGraphOps).back().get(), - "send_barrier"); - } else if (node->Op()->Type() == "fetch_barrier") { - ConnectOp(result, result->Get(kGraphOps).back().get(), "recv"); - } else if (node->Op()->Type() == "send") { - // do nothing + if (node->Op()->Type() == "send") { + CreateOpHandleIOs(result, node, op_dev_id); } else { - PADDLE_THROW( - "rpc op should be in [" - "send, send_barrier. recv, fetch_barrier]"); - } + // send_barrier, recv, fetch_barrier's inputs are deps var, get them from + // all places + auto p = places_[op_dev_id]; + auto *op_handle = result->Get(kGraphOps).back().get(); + op_handle->SetDeviceContext(p, + platform::DeviceContextPool::Instance().Get(p)); - CreateOpHandleIOs(result, node, op_dev_id); + SetOpInputsAllPlaces(result, node, places_.size()); + for (ir::Node *output : node->outputs) { + int outvar_dev_id = op_dev_id; + if (node->Op()->Type() == "fetch_barrier") { + outvar_dev_id = GetVarDeviceID(*result, output->Name()); + PADDLE_ENFORCE_NE(outvar_dev_id, -1); + } + p = places_[outvar_dev_id]; + ir::Node *new_node = nullptr; + if (output->Var()) { + new_node = result->CreateVarNode(output->Var()); + } else { + new_node = + result->CreateEmptyNode(output->Name(), ir::Node::Type::kVariable); + } + CreateOpOutput(result, op_handle, new_node, p, outvar_dev_id); + } + } } bool MultiDevSSAGraphBuilder::IsScaleLossOp(ir::Node *node) const { diff --git a/paddle/fluid/framework/ir/graph.cc b/paddle/fluid/framework/ir/graph.cc index 2a6bf4ac230df81b38751000bf4b663f24984db3..39b0f2f038380b4631728e28031511205c4b40f2 100644 --- a/paddle/fluid/framework/ir/graph.cc +++ b/paddle/fluid/framework/ir/graph.cc @@ -132,63 +132,6 @@ Graph::Graph(const ProgramDesc &program) : program_(program) { } } - std::vector send_ops; - ir::Node *send_bar = nullptr; - std::vector recv_ops; - ir::Node *fetch_bar = nullptr; - for (ir::Node *node : Nodes()) { - if (node->Name() == "send") { - send_ops.push_back(node); - } else if (node->Name() == "send_barrier") { - PADDLE_ENFORCE(!send_bar, "only has one send barrier"); - send_bar = node; - } else if (node->Name() == "recv") { - recv_ops.push_back(node); - } else if (node->Name() == "fetch_barrier") { - PADDLE_ENFORCE(!fetch_bar, "only has one fetch barrier"); - fetch_bar = node; - } - } - if (send_bar) { - for (ir::Node *send : send_ops) { - ir::Node *dep_var = CreateControlDepVar(); - send->outputs.push_back(dep_var); - dep_var->inputs.push_back(send); - send_bar->inputs.push_back(dep_var); - dep_var->outputs.push_back(send_bar); - } - for (ir::Node *recv : recv_ops) { - ir::Node *dep_var = CreateControlDepVar(); - recv->inputs.push_back(dep_var); - dep_var->outputs.push_back(recv); - send_bar->outputs.push_back(dep_var); - dep_var->inputs.push_back(send_bar); - } - } - if (fetch_bar) { - for (ir::Node *recv : recv_ops) { - ir::Node *dep_var = CreateControlDepVar(); - recv->outputs.push_back(dep_var); - dep_var->inputs.push_back(recv); - fetch_bar->inputs.push_back(dep_var); - dep_var->outputs.push_back(fetch_bar); - } - } - - std::vector send_vars = FindDistTrainSendVars(send_ops); - std::vector recv_vars = FindDistTrainRecvVars(recv_ops); - for (ir::Node *node : Nodes()) { - if (IsDistTrainOp(node, send_vars, recv_vars)) { - if (fetch_bar && node->Name() == "concat") { - ir::Node *dep_var = CreateControlDepVar(); - fetch_bar->outputs.push_back(dep_var); - dep_var->inputs.push_back(fetch_bar); - node->inputs.push_back(dep_var); - dep_var->outputs.push_back(node); - } - } - } - /** * We should handle write after read(WAR) and write after write(WAW) here. * Because some of the operators of the program can be executed parallelly. diff --git a/paddle/fluid/operators/fetch_barrier_op.cc b/paddle/fluid/operators/fetch_barrier_op.cc index d9cd956dfdff3d009d38ee5088f5396080580483..9d7ac7ab6194593747548fac3cefc8d4ed3058d8 100644 --- a/paddle/fluid/operators/fetch_barrier_op.cc +++ b/paddle/fluid/operators/fetch_barrier_op.cc @@ -52,6 +52,8 @@ class FetchBarrierOp : public framework::OperatorBase { class FetchBarrierOpMaker : public framework::OpProtoAndCheckerMaker { public: void Make() { + AddOutput("Out", "(Any) Dummy outputs, used for control dependency") + .AsDuplicable(); AddComment(R"DOC( SendBarrier operator diff --git a/paddle/fluid/operators/send_barrier_op.cc b/paddle/fluid/operators/send_barrier_op.cc index 14b07649c416ff1b671fc9b5ee4eb956b44570c5..40404295266899c6ac2f7b1e08fdf7db40958794 100644 --- a/paddle/fluid/operators/send_barrier_op.cc +++ b/paddle/fluid/operators/send_barrier_op.cc @@ -56,6 +56,10 @@ class SendBarrierOp : public framework::OperatorBase { class SendBarrierOpMaker : public framework::OpProtoAndCheckerMaker { public: void Make() { + AddInput("X", "(Any) Dummy inputs, used for control dependency") + .AsDuplicable(); + AddOutput("Out", "(Any) Dummy outputs, used for control dependency") + .AsDuplicable(); AddComment(R"DOC( SendBarrier operator diff --git a/python/paddle/fluid/layers/io.py b/python/paddle/fluid/layers/io.py index b03ee514f50f9a8c1425bd5b1d409b58ed62351a..0cf7aaef4ab75ca6976465d1b404004a9f2f64c5 100644 --- a/python/paddle/fluid/layers/io.py +++ b/python/paddle/fluid/layers/io.py @@ -246,7 +246,11 @@ def Send(endpoints, send_vars, dummy_output=None, sync=True): rpc_op_role_name: core.op_proto_and_checker_maker.OpRole.RPC }) if sync: - helper.append_op(type="send_barrier", attrs={"endpoints": endpoints}) + helper.append_op( + type="send_barrier", + inputs={"X": dummy_output}, + outputs={"Out": []}, + attrs={"endpoints": endpoints}) def Recv(endpoints, get_vars, dummy_input=None, sync=True): @@ -282,7 +286,10 @@ def Recv(endpoints, get_vars, dummy_input=None, sync=True): attrs={"endpoints": endpoints, "epmap": epmap}) if sync: - helper.append_op(type="fetch_barrier", attrs={"endpoints": endpoints}) + helper.append_op( + type="fetch_barrier", + outputs={"Out": get_vars}, + attrs={"endpoints": endpoints}) return get_vars diff --git a/python/paddle/fluid/tests/unittests/dist_se_resnext.py b/python/paddle/fluid/tests/unittests/dist_se_resnext.py index b0ee6ff9f5941b090e663dac0122cfb575f2f442..5440a8c7fe7a5411c17bb6a9ea9f4e50c85d6796 100644 --- a/python/paddle/fluid/tests/unittests/dist_se_resnext.py +++ b/python/paddle/fluid/tests/unittests/dist_se_resnext.py @@ -130,7 +130,12 @@ class SE_ResNeXt(): input=conv, pool_size=7, pool_type='avg', global_pooling=True) drop = fluid.layers.dropout(x=pool, dropout_prob=0.2) stdv = 1.0 / math.sqrt(drop.shape[1] * 1.0) - out = fluid.layers.fc(input=drop, size=class_dim, act='softmax') + out = fluid.layers.fc( + input=drop, + size=class_dim, + act='softmax', + param_attr=fluid.ParamAttr( + initializer=fluid.initializer.Constant(value=0.05))) return out def shortcut(self, input, ch_out, stride): @@ -180,7 +185,7 @@ class SE_ResNeXt(): act=None, # avoid pserver CPU init differs from GPU param_attr=fluid.ParamAttr( - initializer=fluid.initializer.Constant()), + initializer=fluid.initializer.Constant(value=0.05)), bias_attr=False) return fluid.layers.batch_norm(input=conv, act=act) @@ -188,13 +193,19 @@ class SE_ResNeXt(): pool = fluid.layers.pool2d( input=input, pool_size=0, pool_type='avg', global_pooling=True) stdv = 1.0 / math.sqrt(pool.shape[1] * 1.0) - squeeze = fluid.layers.fc(input=pool, - size=num_channels // reduction_ratio, - act='relu') + squeeze = fluid.layers.fc( + input=pool, + size=num_channels // reduction_ratio, + param_attr=fluid.ParamAttr( + initializer=fluid.initializer.Constant(value=0.05)), + act='relu') stdv = 1.0 / math.sqrt(squeeze.shape[1] * 1.0) - excitation = fluid.layers.fc(input=squeeze, - size=num_channels, - act='sigmoid') + excitation = fluid.layers.fc( + input=squeeze, + size=num_channels, + param_attr=fluid.ParamAttr( + initializer=fluid.initializer.Constant(value=0.05)), + act='sigmoid') scale = fluid.layers.elementwise_mul(x=input, y=excitation, axis=0) return scale diff --git a/python/paddle/fluid/tests/unittests/dist_transformer.py b/python/paddle/fluid/tests/unittests/dist_transformer.py index ab4c5c3f368333ac42781aebd495579b5c26f388..7abfa0a4be0dec9fe251704e22dfef1f932e7c5b 100644 --- a/python/paddle/fluid/tests/unittests/dist_transformer.py +++ b/python/paddle/fluid/tests/unittests/dist_transformer.py @@ -18,54 +18,129 @@ import numpy as np import argparse import time import math +import os +import sys +import six +import argparse +import ast +import multiprocessing +import time +from functools import partial +from os.path import expanduser +import glob +import random +import tarfile import paddle import paddle.fluid as fluid +import paddle.fluid.layers as layers from paddle.fluid import core -import os -import sys -import six -import transformer_model -import paddle.dataset.wmt16 as wmt16 +from test_dist_base import TestDistRunnerBase, runtime_main +from paddle.compat import long_type + +import hashlib + +from paddle.fluid.transpiler.details import program_to_code + +const_para_attr = fluid.ParamAttr(initializer=fluid.initializer.Constant(0.001)) +const_bias_attr = const_para_attr # Fix seed for test fluid.default_startup_program().random_seed = 1 fluid.default_main_program().random_seed = 1 -WMT16_RECORDIO_FILE = "/tmp/wmt16.recordio" +#from transformer_config import ModelHyperParams, TrainTaskConfig, merge_cfg_from_list +class TrainTaskConfig(object): + # only support GPU currently + use_gpu = True + # the epoch number to train. + pass_num = 1 + # the number of sequences contained in a mini-batch. + # deprecated, set batch_size in args. + batch_size = 20 + # the hyper parameters for Adam optimizer. + # This static learning_rate will be multiplied to the LearningRateScheduler + # derived learning rate the to get the final learning rate. + learning_rate = 1 + beta1 = 0.9 + beta2 = 0.98 + eps = 1e-9 + # the parameters for learning rate scheduling. + warmup_steps = 4000 + # the weight used to mix up the ground-truth distribution and the fixed + # uniform distribution in label smoothing when training. + # Set this as zero if label smoothing is not wanted. + label_smooth_eps = 0.1 + # the directory for saving trained models. + model_dir = "trained_models" + # the directory for saving checkpoints. + ckpt_dir = "trained_ckpts" + # the directory for loading checkpoint. + # If provided, continue training from the checkpoint. + ckpt_path = None + # the parameter to initialize the learning rate scheduler. + # It should be provided if use checkpoints, since the checkpoint doesn't + # include the training step counter currently. + start_step = 0 -class ModelHyperParams(object): - # Dictionary size for source and target language. This model directly uses - # paddle.dataset.wmt16 in which , and token has - # alreay been added, but the token is not added. Transformer requires - # sequences in a mini-batch are padded to have the same length. A token is - # added into the original dictionary in paddle.dateset.wmt16. + check_acc = True - # size of source word dictionary. - src_vocab_size = 10000 - # index for token in source language. - src_pad_idx = src_vocab_size + data_path = expanduser("~") + ( + "/.cache/paddle/dataset/test_dist_transformer/") + src_vocab_fpath = data_path + "vocab.bpe.32000" + trg_vocab_fpath = data_path + "vocab.bpe.32000" + train_file_pattern = data_path + "train.tok.clean.bpe.32000.en-de" + val_file_pattern = data_path + "newstest2013.tok.bpe.32000.en-de" + pool_size = 2000 + sort_type = None + local = True + shuffle = False + shuffle_batch = False + special_token = ['', '', ''] + token_delimiter = ' ' + use_token_batch = False - # size of target word dictionay - trg_vocab_size = 10000 - # index for token in target language. - trg_pad_idx = trg_vocab_size - # position value corresponding to the token. - pos_pad_idx = 0 +class InferTaskConfig(object): + use_gpu = True + # the number of examples in one run for sequence generation. + batch_size = 10 + # the parameters for beam search. + beam_size = 5 + max_out_len = 256 + # the number of decoded sentences to output. + n_best = 1 + # the flags indicating whether to output the special tokens. + output_bos = False + output_eos = False + output_unk = True + # the directory for loading the trained model. + model_path = "trained_models/pass_1.infer.model" - # max length of sequences. It should plus 1 to include position - # padding token for position encoding. - max_length = 50 +class ModelHyperParams(object): + # These following five vocabularies related configurations will be set + # automatically according to the passed vocabulary path and special tokens. + # size of source word dictionary. + src_vocab_size = 10000 + # size of target word dictionay + trg_vocab_size = 10000 + # index for token + bos_idx = 0 + # index for token + eos_idx = 1 + # index for token + unk_idx = 2 + # max length of sequences deciding the size of position encoding table. + # Start from 1 and count start and end tokens in. + max_length = 256 # the dimension for word embeddings, which is also the last dimension of # the input and output of multi-head attention, position-wise feed-forward # networks, encoder and decoder. - d_model = 512 # size of the hidden layer in position-wise feed-forward networks. - d_inner_hid = 1024 + d_inner_hid = 2048 # the dimension that keys are projected to for dot-product attention. d_key = 64 # the dimension that values are projected to for dot-product attention. @@ -75,210 +150,1577 @@ class ModelHyperParams(object): # number of sub-layers to be stacked in the encoder and decoder. n_layer = 6 # dropout rate used by all dropout layers. - dropout = 0.1 + dropout = 0.0 # no random + # random seed used in dropout for CE. + dropout_seed = None + # the flag indicating whether to share embedding and softmax weights. + # vocabularies in source and target should be same for weight sharing. + weight_sharing = True + + +def merge_cfg_from_list(cfg_list, g_cfgs): + """ + Set the above global configurations using the cfg_list. + """ + assert len(cfg_list) % 2 == 0 + for key, value in zip(cfg_list[0::2], cfg_list[1::2]): + for g_cfg in g_cfgs: + if hasattr(g_cfg, key): + try: + value = eval(value) + except Exception: # for file path + pass + setattr(g_cfg, key, value) + break + + +# The placeholder for batch_size in compile time. Must be -1 currently to be +# consistent with some ops' infer-shape output in compile time, such as the +# sequence_expand op used in beamsearch decoder. +batch_size = -1 +# The placeholder for squence length in compile time. +seq_len = ModelHyperParams.max_length +# Here list the data shapes and data types of all inputs. +# The shapes here act as placeholder and are set to pass the infer-shape in +# compile time. +input_descs = { + # The actual data shape of src_word is: + # [batch_size * max_src_len_in_batch, 1] + "src_word": [(batch_size, seq_len, long_type(1)), "int64", 2], + # The actual data shape of src_pos is: + # [batch_size * max_src_len_in_batch, 1] + "src_pos": [(batch_size, seq_len, long_type(1)), "int64"], + # This input is used to remove attention weights on paddings in the + # encoder. + # The actual data shape of src_slf_attn_bias is: + # [batch_size, n_head, max_src_len_in_batch, max_src_len_in_batch] + "src_slf_attn_bias": [(batch_size, ModelHyperParams.n_head, seq_len, + seq_len), "float32"], + # The actual data shape of trg_word is: + # [batch_size * max_trg_len_in_batch, 1] + "trg_word": [(batch_size, seq_len, long_type(1)), "int64", + 2], # lod_level is only used in fast decoder. + # The actual data shape of trg_pos is: + # [batch_size * max_trg_len_in_batch, 1] + "trg_pos": [(batch_size, seq_len, long_type(1)), "int64"], + # This input is used to remove attention weights on paddings and + # subsequent words in the decoder. + # The actual data shape of trg_slf_attn_bias is: + # [batch_size, n_head, max_trg_len_in_batch, max_trg_len_in_batch] + "trg_slf_attn_bias": [(batch_size, ModelHyperParams.n_head, seq_len, + seq_len), "float32"], + # This input is used to remove attention weights on paddings of the source + # input in the encoder-decoder attention. + # The actual data shape of trg_src_attn_bias is: + # [batch_size, n_head, max_trg_len_in_batch, max_src_len_in_batch] + "trg_src_attn_bias": [(batch_size, ModelHyperParams.n_head, seq_len, + seq_len), "float32"], + # This input is used in independent decoder program for inference. + # The actual data shape of enc_output is: + # [batch_size, max_src_len_in_batch, d_model] + "enc_output": [(batch_size, seq_len, ModelHyperParams.d_model), "float32"], + # The actual data shape of label_word is: + # [batch_size * max_trg_len_in_batch, 1] + "lbl_word": [(batch_size * seq_len, long_type(1)), "int64"], + # This input is used to mask out the loss of paddding tokens. + # The actual data shape of label_weight is: + # [batch_size * max_trg_len_in_batch, 1] + "lbl_weight": [(batch_size * seq_len, long_type(1)), "float32"], + # These inputs are used to change the shape tensor in beam-search decoder. + "trg_slf_attn_pre_softmax_shape_delta": [(long_type(2), ), "int32"], + "trg_slf_attn_post_softmax_shape_delta": [(long_type(4), ), "int32"], + "init_score": [(batch_size, long_type(1)), "float32"], +} + +# Names of word embedding table which might be reused for weight sharing. +word_emb_param_names = ( + "src_word_emb_table", + "trg_word_emb_table", ) +# Names of position encoding table which will be initialized externally. +pos_enc_param_names = ( + "src_pos_enc_table", + "trg_pos_enc_table", ) +# separated inputs for different usages. +encoder_data_input_fields = ( + "src_word", + "src_pos", + "src_slf_attn_bias", ) +decoder_data_input_fields = ( + "trg_word", + "trg_pos", + "trg_slf_attn_bias", + "trg_src_attn_bias", + "enc_output", ) +label_data_input_fields = ( + "lbl_word", + "lbl_weight", ) +# In fast decoder, trg_pos (only containing the current time step) is generated +# by ops and trg_slf_attn_bias is not needed. +fast_decoder_data_input_fields = ( + "trg_word", + "init_score", + "trg_src_attn_bias", ) + +# fast_decoder_util_input_fields = ( +# "trg_slf_attn_pre_softmax_shape_delta", +# "trg_slf_attn_post_softmax_shape_delta", ) + +#from optim import LearningRateScheduler +class LearningRateScheduler(object): + """ + Wrapper for learning rate scheduling as described in the Transformer paper. + LearningRateScheduler adapts the learning rate externally and the adapted + learning rate will be feeded into the main_program as input data. + """ + + def __init__(self, + d_model, + warmup_steps, + learning_rate=0.001, + current_steps=0, + name="learning_rate"): + self.current_steps = current_steps + self.warmup_steps = warmup_steps + self.d_model = d_model + self.static_lr = learning_rate + self.learning_rate = layers.create_global_var( + name=name, + shape=[1], + value=float(learning_rate), + dtype="float32", + persistable=True) + + def update_learning_rate(self): + self.current_steps += 1 + lr_value = np.power(self.d_model, -0.5) * np.min([ + np.power(self.current_steps, -0.5), + np.power(self.warmup_steps, -1.5) * self.current_steps + ]) * self.static_lr + return np.array([lr_value], dtype="float32") -def prepare_batch_input(insts, src_pad_idx, trg_pad_idx, n_head): + +#from transformer_train import train_loop +def pad_batch_data(insts, + pad_idx, + n_head, + is_target=False, + is_label=False, + return_attn_bias=True, + return_max_len=True, + return_num_token=False): """ Pad the instances to the max sequence length in batch, and generate the - corresponding position data and attention bias. Then, convert the numpy - data to tensors and return a dict mapping names to tensors. + corresponding position data and attention bias. """ + return_list = [] + max_len = max(len(inst) for inst in insts) + num_token = reduce(lambda x, y: x + y, + [len(inst) for inst in insts]) if return_num_token else 0 + # Any token included in dict can be used to pad, since the paddings' loss + # will be masked out by weights and make no effect on parameter gradients. + inst_data = np.array( + [inst + [pad_idx] * (max_len - len(inst)) for inst in insts]) + return_list += [inst_data.astype("int64").reshape([-1, 1])] + if is_label: # label weight + inst_weight = np.array( + [[1.] * len(inst) + [0.] * (max_len - len(inst)) for inst in insts]) + return_list += [inst_weight.astype("float32").reshape([-1, 1])] + else: # position data + inst_pos = np.array([ + range(1, len(inst) + 1) + [0] * (max_len - len(inst)) + for inst in insts + ]) + return_list += [inst_pos.astype("int64").reshape([-1, 1])] + if return_attn_bias: + if is_target: + # This is used to avoid attention on paddings and subsequent + # words. + slf_attn_bias_data = np.ones((inst_data.shape[0], max_len, max_len)) + slf_attn_bias_data = np.triu(slf_attn_bias_data, + 1).reshape([-1, 1, max_len, max_len]) + slf_attn_bias_data = np.tile(slf_attn_bias_data, + [1, n_head, 1, 1]) * [-1e9] + else: + # This is used to avoid attention on paddings. + slf_attn_bias_data = np.array([[0] * len(inst) + [-1e9] * + (max_len - len(inst)) + for inst in insts]) + slf_attn_bias_data = np.tile( + slf_attn_bias_data.reshape([-1, 1, 1, max_len]), + [1, n_head, max_len, 1]) + return_list += [slf_attn_bias_data.astype("float32")] + if return_max_len: + return_list += [max_len] + if return_num_token: + return_list += [num_token] + return return_list if len(return_list) > 1 else return_list[0] + + +def prepare_batch_input(insts, data_input_names, src_pad_idx, trg_pad_idx, + n_head, d_model): + """ + Put all padded data needed by training into a dict. + """ + src_word, src_pos, src_slf_attn_bias, src_max_len = pad_batch_data( + [inst[0] for inst in insts], src_pad_idx, n_head, is_target=False) + src_word = src_word.reshape(-1, src_max_len, 1) + src_pos = src_pos.reshape(-1, src_max_len, 1) + trg_word, trg_pos, trg_slf_attn_bias, trg_max_len = pad_batch_data( + [inst[1] for inst in insts], trg_pad_idx, n_head, is_target=True) + trg_word = trg_word.reshape(-1, trg_max_len, 1) + trg_pos = trg_pos.reshape(-1, trg_max_len, 1) - def __pad_batch_data(insts, - pad_idx, - is_target=False, - return_pos=True, - return_attn_bias=True, - return_max_len=True): - """ - Pad the instances to the max sequence length in batch, and generate the - corresponding position data and attention bias. - """ - return_list = [] - max_len = max(len(inst) for inst in insts) - inst_data = np.array( - [inst + [pad_idx] * (max_len - len(inst)) for inst in insts]) - return_list += [inst_data.astype("int64").reshape([-1, 1])] - if return_pos: - inst_pos = np.array([[ - pos_i + 1 if w_i != pad_idx else 0 - for pos_i, w_i in enumerate(inst) - ] for inst in inst_data]) - - return_list += [inst_pos.astype("int64").reshape([-1, 1])] - if return_attn_bias: - if is_target: - # This is used to avoid attention on paddings and subsequent - # words. - slf_attn_bias_data = np.ones((inst_data.shape[0], max_len, - max_len)) - slf_attn_bias_data = np.triu(slf_attn_bias_data, 1).reshape( - [-1, 1, max_len, max_len]) - slf_attn_bias_data = np.tile(slf_attn_bias_data, - [1, n_head, 1, 1]) * [-1e9] - else: - # This is used to avoid attention on paddings. - slf_attn_bias_data = np.array([[0] * len(inst) + [-1e9] * - (max_len - len(inst)) - for inst in insts]) - slf_attn_bias_data = np.tile( - slf_attn_bias_data.reshape([-1, 1, 1, max_len]), - [1, n_head, max_len, 1]) - return_list += [slf_attn_bias_data.astype("float32")] - if return_max_len: - return_list += [max_len] - return return_list if len(return_list) > 1 else return_list[0] - - src_word, src_pos, src_slf_attn_bias, src_max_len = __pad_batch_data( - [inst[0] for inst in insts], src_pad_idx, is_target=False) - trg_word, trg_pos, trg_slf_attn_bias, trg_max_len = __pad_batch_data( - [inst[1] for inst in insts], trg_pad_idx, is_target=True) trg_src_attn_bias = np.tile(src_slf_attn_bias[:, :, ::src_max_len, :], [1, 1, trg_max_len, 1]).astype("float32") - lbl_word = __pad_batch_data([inst[2] for inst in insts], trg_pad_idx, False, - False, False, False) - lbl_weight = (lbl_word != trg_pad_idx).astype("float32").reshape([-1, 1]) + lbl_word, lbl_weight, num_token = pad_batch_data( + [inst[2] for inst in insts], + trg_pad_idx, + n_head, + is_target=False, + is_label=True, + return_attn_bias=False, + return_max_len=False, + return_num_token=True) + + data_input_dict = dict( + zip(data_input_names, [ + src_word, src_pos, src_slf_attn_bias, trg_word, trg_pos, + trg_slf_attn_bias, trg_src_attn_bias, lbl_word, lbl_weight + ])) + return data_input_dict, np.asarray([num_token], dtype="float32") + + +def read_multiple(reader, count, clip_last=True): + """ + Stack data from reader for multi-devices. + """ + + def __impl__(): + res = [] + for item in reader(): + res.append(item) + if len(res) == count: + yield res + res = [] + if len(res) == count: + yield res + elif not clip_last: + data = [] + for item in res: + data += item + if len(data) > count: + inst_num_per_part = len(data) // count + yield [ + data[inst_num_per_part * i:inst_num_per_part * (i + 1)] + for i in range(count) + ] + + return __impl__ + + +def split_data(data, num_part): + """ + Split data for each device. + """ + if len(data) == num_part: + return data + data = data[0] + inst_num_per_part = len(data) // num_part return [ - src_word, src_pos, trg_word, trg_pos, src_slf_attn_bias, - trg_slf_attn_bias, trg_src_attn_bias, lbl_word, lbl_weight + data[inst_num_per_part * i:inst_num_per_part * (i + 1)] + for i in range(num_part) ] -def transformer(use_feed): - assert not use_feed, "transfomer doesn't support feed yet" - return transformer_model.transformer( - ModelHyperParams.src_vocab_size + 1, - ModelHyperParams.trg_vocab_size + 1, ModelHyperParams.max_length + 1, - ModelHyperParams.n_layer, ModelHyperParams.n_head, - ModelHyperParams.d_key, ModelHyperParams.d_value, - ModelHyperParams.d_model, ModelHyperParams.d_inner_hid, - ModelHyperParams.dropout, ModelHyperParams.src_pad_idx, - ModelHyperParams.trg_pad_idx, ModelHyperParams.pos_pad_idx) - - -def get_model(): - avg_cost = transformer(use_feed=False) - optimizer = fluid.optimizer.Adam() - optimizer.minimize(avg_cost) - fluid.memory_optimize(fluid.default_main_program()) - return avg_cost - - -def get_transpiler(trainer_id, main_program, pserver_endpoints, trainers): - t = fluid.DistributeTranspiler() - t.transpile( - trainer_id=trainer_id, - program=main_program, - pservers=pserver_endpoints, - trainers=trainers) - return t - - -class DistTransformer2x2(object): - def run_pserver(self, pserver_endpoints, trainers, current_endpoint, - trainer_id): - get_model() - t = get_transpiler(trainer_id, - fluid.default_main_program(), pserver_endpoints, - trainers) - pserver_prog = t.get_pserver_program(current_endpoint) - startup_prog = t.get_startup_program(current_endpoint, pserver_prog) +def test_context(train_progm, avg_cost, train_exe, dev_count, data_input_names, + sum_cost, token_num): + # Context to do validation. + test_program = train_progm.clone() + with fluid.program_guard(test_program): + test_program = fluid.io.get_inference_program([avg_cost]) + + val_data = DataReader( + src_vocab_fpath=TrainTaskConfig.src_vocab_fpath, + trg_vocab_fpath=TrainTaskConfig.trg_vocab_fpath, + fpattern=TrainTaskConfig.val_file_pattern, + token_delimiter=TrainTaskConfig.token_delimiter, + use_token_batch=TrainTaskConfig.use_token_batch, + batch_size=TrainTaskConfig.batch_size * + (1 if TrainTaskConfig.use_token_batch else dev_count), + pool_size=TrainTaskConfig.pool_size, + sort_type=TrainTaskConfig.sort_type, + start_mark=TrainTaskConfig.special_token[0], + end_mark=TrainTaskConfig.special_token[1], + unk_mark=TrainTaskConfig.special_token[2], + # count start and end tokens out + max_length=ModelHyperParams.max_length - 2, + clip_last_batch=False, + shuffle=False, + shuffle_batch=False) + + build_strategy = fluid.BuildStrategy() + + strategy = fluid.ExecutionStrategy() + strategy.num_threads = 1 + + test_exe = fluid.ParallelExecutor( + use_cuda=TrainTaskConfig.use_gpu, + main_program=test_program, + share_vars_from=train_exe, + build_strategy=build_strategy, + exec_strategy=strategy) + + def test(exe=test_exe): + test_total_cost = 0 + test_total_token = 0 + test_data = read_multiple( + reader=val_data.batch_generator, + count=dev_count if TrainTaskConfig.use_token_batch else 1) + for batch_id, data in enumerate(test_data()): + feed_list = [] + for place_id, data_buffer in enumerate( + split_data( + data, num_part=dev_count)): + data_input_dict, _ = prepare_batch_input( + data_buffer, data_input_names, ModelHyperParams.eos_idx, + ModelHyperParams.eos_idx, ModelHyperParams.n_head, + ModelHyperParams.d_model) + feed_list.append(data_input_dict) + + outs = exe.run(feed=feed_list, + fetch_list=[sum_cost.name, token_num.name]) + sum_cost_val, token_num_val = np.array(outs[0]), np.array(outs[1]) + test_total_cost += sum_cost_val.sum() + test_total_token += token_num_val.sum() + test_avg_cost = test_total_cost / test_total_token + test_ppl = np.exp([min(test_avg_cost, 100)]) + return test_avg_cost, test_ppl + + return test + + +def train_loop(exe, train_progm, dev_count, sum_cost, avg_cost, lr_scheduler, + token_num, predict): + # Initialize the parameters. + if TrainTaskConfig.ckpt_path: + lr_scheduler.current_steps = TrainTaskConfig.start_step + else: + exe.run(fluid.framework.default_startup_program()) + + train_data = DataReader( + src_vocab_fpath=TrainTaskConfig.src_vocab_fpath, + trg_vocab_fpath=TrainTaskConfig.trg_vocab_fpath, + fpattern=TrainTaskConfig.train_file_pattern, + token_delimiter=TrainTaskConfig.token_delimiter, + use_token_batch=TrainTaskConfig.use_token_batch, + batch_size=TrainTaskConfig.batch_size * + (1 if TrainTaskConfig.use_token_batch else dev_count), + pool_size=TrainTaskConfig.pool_size, + sort_type=TrainTaskConfig.sort_type, + shuffle=TrainTaskConfig.shuffle, + shuffle_batch=TrainTaskConfig.shuffle_batch, + start_mark=TrainTaskConfig.special_token[0], + end_mark=TrainTaskConfig.special_token[1], + unk_mark=TrainTaskConfig.special_token[2], + # count start and end tokens out + max_length=ModelHyperParams.max_length - 2, + clip_last_batch=False) + train_data = read_multiple( + reader=train_data.batch_generator, + count=dev_count if TrainTaskConfig.use_token_batch else 1) + + build_strategy = fluid.BuildStrategy() + # Since the token number differs among devices, customize gradient scale to + # use token average cost among multi-devices. and the gradient scale is + # `1 / token_number` for average cost. + build_strategy.gradient_scale_strategy = fluid.BuildStrategy.GradientScaleStrategy.Customized + + strategy = fluid.ExecutionStrategy() + strategy.num_threads = 1 + + train_exe = fluid.ParallelExecutor( + use_cuda=TrainTaskConfig.use_gpu, + loss_name=sum_cost.name, + main_program=train_progm, + build_strategy=build_strategy, + exec_strategy=strategy) + + data_input_names = encoder_data_input_fields + decoder_data_input_fields[: + -1] + label_data_input_fields + + if TrainTaskConfig.val_file_pattern is not None: + test = test_context(train_progm, avg_cost, train_exe, dev_count, + data_input_names, sum_cost, token_num) + + # the best cross-entropy value with label smoothing + loss_normalizer = -((1. - TrainTaskConfig.label_smooth_eps) * np.log( + (1. - TrainTaskConfig.label_smooth_eps + )) + TrainTaskConfig.label_smooth_eps * + np.log(TrainTaskConfig.label_smooth_eps / ( + ModelHyperParams.trg_vocab_size - 1) + 1e-20)) + init = False + for pass_id in xrange(TrainTaskConfig.pass_num): + pass_start_time = time.time() + for batch_id, data in enumerate(train_data()): + if batch_id >= 5: + break + + feed_list = [] + total_num_token = 0 + + #if TrainTaskConfig.local: + # lr_rate = lr_scheduler.update_learning_rate() + #for place_id, data_buffer in enumerate( + # split_data( + # data, num_part=dev_count)): + + if TrainTaskConfig.local: + lr_rate = lr_scheduler.update_learning_rate() + + for place_id, data_buffer in enumerate( + split_data( + data, num_part=dev_count)): + data_input_dict, num_token = prepare_batch_input( + data_buffer, data_input_names, ModelHyperParams.eos_idx, + ModelHyperParams.eos_idx, ModelHyperParams.n_head, + ModelHyperParams.d_model) + total_num_token += num_token + feed_kv_pairs = data_input_dict.items() + if TrainTaskConfig.local: + feed_kv_pairs += { + lr_scheduler.learning_rate.name: lr_rate + }.items() + feed_list.append(dict(feed_kv_pairs)) + + if not init: + for pos_enc_param_name in pos_enc_param_names: + pos_enc = position_encoding_init( + ModelHyperParams.max_length + 1, + ModelHyperParams.d_model) + feed_list[place_id][pos_enc_param_name] = pos_enc + + if not TrainTaskConfig.check_acc: + for feed_dict in feed_list: + feed_dict[sum_cost.name + "@GRAD"] = 1. / total_num_token + else: + b = 100 * TrainTaskConfig.batch_size + a = np.asarray([b], dtype="float32") + for feed_dict in feed_list: + feed_dict[sum_cost.name + "@GRAD"] = 1. / a + + outs = train_exe.run(fetch_list=[sum_cost.name, token_num.name], + feed=feed_list) + + sum_cost_val, token_num_val = np.array(outs[0]), np.array(outs[1]) + total_sum_cost = sum_cost_val.sum() + total_token_num = token_num_val.sum() + total_avg_cost = total_sum_cost / total_token_num + + init = True + + # Validate and save the model for inference. + if TrainTaskConfig.val_file_pattern is not None: + val_avg_cost, val_ppl = test() + print("[%f]" % val_avg_cost) + else: + assert (False) + + +#import transformer_reader as reader +class SortType(object): + GLOBAL = 'global' + POOL = 'pool' + NONE = "none" + + +class Converter(object): + def __init__(self, vocab, beg, end, unk, delimiter): + self._vocab = vocab + self._beg = beg + self._end = end + self._unk = unk + self._delimiter = delimiter + + def __call__(self, sentence): + return [self._beg] + [ + self._vocab.get(w, self._unk) + for w in sentence.split(self._delimiter) + ] + [self._end] + + +class ComposedConverter(object): + def __init__(self, converters): + self._converters = converters + + def __call__(self, parallel_sentence): + return [ + self._converters[i](parallel_sentence[i]) + for i in range(len(self._converters)) + ] + + +class SentenceBatchCreator(object): + def __init__(self, batch_size): + self.batch = [] + self._batch_size = batch_size + + def append(self, info): + self.batch.append(info) + if len(self.batch) == self._batch_size: + tmp = self.batch + self.batch = [] + return tmp + + +class TokenBatchCreator(object): + def __init__(self, batch_size): + self.batch = [] + self.max_len = -1 + self._batch_size = batch_size + + def append(self, info): + cur_len = info.max_len + max_len = max(self.max_len, cur_len) + if max_len * (len(self.batch) + 1) > self._batch_size: + result = self.batch + self.batch = [info] + self.max_len = cur_len + return result + else: + self.max_len = max_len + self.batch.append(info) + + +class SampleInfo(object): + def __init__(self, i, max_len, min_len): + self.i = i + self.min_len = min_len + self.max_len = max_len + + +class MinMaxFilter(object): + def __init__(self, max_len, min_len, underlying_creator): + self._min_len = min_len + self._max_len = max_len + self._creator = underlying_creator + + def append(self, info): + if info.max_len > self._max_len or info.min_len < self._min_len: + return + else: + return self._creator.append(info) + + @property + def batch(self): + return self._creator.batch + + +class DataReader(object): + """ + The data reader loads all data from files and produces batches of data + in the way corresponding to settings. + + An example of returning a generator producing data batches whose data + is shuffled in each pass and sorted in each pool: + + ``` + train_data = DataReader( + src_vocab_fpath='data/src_vocab_file', + trg_vocab_fpath='data/trg_vocab_file', + fpattern='data/part-*', + use_token_batch=True, + batch_size=2000, + pool_size=10000, + sort_type=SortType.POOL, + shuffle=True, + shuffle_batch=True, + start_mark='', + end_mark='', + unk_mark='', + clip_last_batch=False).batch_generator + ``` + + :param src_vocab_fpath: The path of vocabulary file of source language. + :type src_vocab_fpath: basestring + :param trg_vocab_fpath: The path of vocabulary file of target language. + :type trg_vocab_fpath: basestring + :param fpattern: The pattern to match data files. + :type fpattern: basestring + :param batch_size: The number of sequences contained in a mini-batch. + or the maximum number of tokens (include paddings) contained in a + mini-batch. + :type batch_size: int + :param pool_size: The size of pool buffer. + :type pool_size: int + :param sort_type: The grain to sort by length: 'global' for all + instances; 'pool' for instances in pool; 'none' for no sort. + :type sort_type: basestring + :param clip_last_batch: Whether to clip the last uncompleted batch. + :type clip_last_batch: bool + :param tar_fname: The data file in tar if fpattern matches a tar file. + :type tar_fname: basestring + :param min_length: The minimum length used to filt sequences. + :type min_length: int + :param max_length: The maximum length used to filt sequences. + :type max_length: int + :param shuffle: Whether to shuffle all instances. + :type shuffle: bool + :param shuffle_batch: Whether to shuffle the generated batches. + :type shuffle_batch: bool + :param use_token_batch: Whether to produce batch data according to + token number. + :type use_token_batch: bool + :param field_delimiter: The delimiter used to split source and target in + each line of data file. + :type field_delimiter: basestring + :param token_delimiter: The delimiter used to split tokens in source or + target sentences. + :type token_delimiter: basestring + :param start_mark: The token representing for the beginning of + sentences in dictionary. + :type start_mark: basestring + :param end_mark: The token representing for the end of sentences + in dictionary. + :type end_mark: basestring + :param unk_mark: The token representing for unknown word in dictionary. + :type unk_mark: basestring + :param seed: The seed for random. + :type seed: int + """ + + def __init__(self, + src_vocab_fpath, + trg_vocab_fpath, + fpattern, + batch_size, + pool_size, + sort_type=SortType.GLOBAL, + clip_last_batch=True, + tar_fname=None, + min_length=0, + max_length=100, + shuffle=True, + shuffle_batch=False, + use_token_batch=False, + field_delimiter="\t", + token_delimiter=" ", + start_mark="", + end_mark="", + unk_mark="", + seed=0): + self._src_vocab = self.load_dict(src_vocab_fpath) + self._only_src = True + if trg_vocab_fpath is not None: + self._trg_vocab = self.load_dict(trg_vocab_fpath) + self._only_src = False + self._pool_size = pool_size + self._batch_size = batch_size + self._use_token_batch = use_token_batch + self._sort_type = sort_type + self._clip_last_batch = clip_last_batch + self._shuffle = shuffle + self._shuffle_batch = shuffle_batch + self._min_length = min_length + self._max_length = max_length + self._field_delimiter = field_delimiter + self._token_delimiter = token_delimiter + self.load_src_trg_ids(end_mark, fpattern, start_mark, tar_fname, + unk_mark) + self._random = random.Random(x=seed) + + def load_src_trg_ids(self, end_mark, fpattern, start_mark, tar_fname, + unk_mark): + converters = [ + Converter( + vocab=self._src_vocab, + beg=self._src_vocab[start_mark], + end=self._src_vocab[end_mark], + unk=self._src_vocab[unk_mark], + delimiter=self._token_delimiter) + ] + if not self._only_src: + converters.append( + Converter( + vocab=self._trg_vocab, + beg=self._trg_vocab[start_mark], + end=self._trg_vocab[end_mark], + unk=self._trg_vocab[unk_mark], + delimiter=self._token_delimiter)) + + converters = ComposedConverter(converters) + + self._src_seq_ids = [] + self._trg_seq_ids = None if self._only_src else [] + self._sample_infos = [] + + for i, line in enumerate(self._load_lines(fpattern, tar_fname)): + src_trg_ids = converters(line) + self._src_seq_ids.append(src_trg_ids[0]) + lens = [len(src_trg_ids[0])] + if not self._only_src: + self._trg_seq_ids.append(src_trg_ids[1]) + lens.append(len(src_trg_ids[1])) + self._sample_infos.append(SampleInfo(i, max(lens), min(lens))) + + def _load_lines(self, fpattern, tar_fname): + fpaths = glob.glob(fpattern) + + if len(fpaths) == 1 and tarfile.is_tarfile(fpaths[0]): + if tar_fname is None: + raise Exception("If tar file provided, please set tar_fname.") + + f = tarfile.open(fpaths[0], "r") + for line in f.extractfile(tar_fname): + fields = line.strip("\n").split(self._field_delimiter) + if (not self._only_src and len(fields) == 2) or ( + self._only_src and len(fields) == 1): + yield fields + else: + for fpath in fpaths: + if not os.path.isfile(fpath): + raise IOError("Invalid file: %s" % fpath) + + with open(fpath, "r") as f: + for line in f: + fields = line.strip("\n").split(self._field_delimiter) + if (not self._only_src and len(fields) == 2) or ( + self._only_src and len(fields) == 1): + yield fields + + @staticmethod + def load_dict(dict_path, reverse=False): + word_dict = {} + with open(dict_path, "r") as fdict: + for idx, line in enumerate(fdict): + if reverse: + word_dict[idx] = line.strip("\n") + else: + word_dict[line.strip("\n")] = idx + return word_dict + + def batch_generator(self): + # global sort or global shuffle + if self._sort_type == SortType.GLOBAL: + infos = sorted( + self._sample_infos, key=lambda x: x.max_len, reverse=True) + else: + if self._shuffle: + infos = self._sample_infos + self._random.shuffle(infos) + else: + infos = self._sample_infos + + if self._sort_type == SortType.POOL: + for i in range(0, len(infos), self._pool_size): + infos[i:i + self._pool_size] = sorted( + infos[i:i + self._pool_size], key=lambda x: x.max_len) + + # concat batch + batches = [] + batch_creator = TokenBatchCreator( + self._batch_size + ) if self._use_token_batch else SentenceBatchCreator(self._batch_size) + batch_creator = MinMaxFilter(self._max_length, self._min_length, + batch_creator) + + for info in infos: + batch = batch_creator.append(info) + if batch is not None: + batches.append(batch) + + if not self._clip_last_batch and len(batch_creator.batch) != 0: + batches.append(batch_creator.batch) + + if self._shuffle_batch: + self._random.shuffle(batches) + + for batch in batches: + batch_ids = [info.i for info in batch] + + if self._only_src: + yield [[self._src_seq_ids[idx]] for idx in batch_ids] + else: + yield [(self._src_seq_ids[idx], self._trg_seq_ids[idx][:-1], + self._trg_seq_ids[idx][1:]) for idx in batch_ids] + + +#from transformer_model import transformer +def position_encoding_init(n_position, d_pos_vec): + """ + Generate the initial values for the sinusoid position encoding table. + """ + position_enc = np.array([[ + pos / np.power(10000, 2 * (j // 2) / d_pos_vec) + for j in range(d_pos_vec) + ] if pos != 0 else np.zeros(d_pos_vec) for pos in range(n_position)]) + position_enc[1:, 0::2] = np.sin(position_enc[1:, 0::2]) # dim 2i + position_enc[1:, 1::2] = np.cos(position_enc[1:, 1::2]) # dim 2i+1 + return position_enc.astype("float32") + + +def multi_head_attention(queries, + keys, + values, + attn_bias, + d_key, + d_value, + d_model, + n_head=1, + dropout_rate=0., + cache=None): + """ + Multi-Head Attention. Note that attn_bias is added to the logit before + computing softmax activiation to mask certain selected positions so that + they will not considered in attention weights. + """ + if not (len(queries.shape) == len(keys.shape) == len(values.shape) == 3): + raise ValueError( + "Inputs: quries, keys and values should all be 3-D tensors.") + + def __compute_qkv(queries, keys, values, n_head, d_key, d_value): + """ + Add linear projection to queries, keys, and values. + """ + q = layers.fc(input=queries, + size=d_key * n_head, + num_flatten_dims=2, + param_attr=const_para_attr, + bias_attr=const_bias_attr) + k = layers.fc(input=keys, + size=d_key * n_head, + num_flatten_dims=2, + param_attr=const_para_attr, + bias_attr=const_bias_attr) + v = layers.fc(input=values, + size=d_value * n_head, + num_flatten_dims=2, + param_attr=const_para_attr, + bias_attr=const_bias_attr) + return q, k, v + + def __split_heads(x, n_head): + """ + Reshape the last dimension of inpunt tensor x so that it becomes two + dimensions and then transpose. Specifically, input a tensor with shape + [bs, max_sequence_length, n_head * hidden_dim] then output a tensor + with shape [bs, n_head, max_sequence_length, hidden_dim]. + """ + if n_head == 1: + return x + + hidden_size = x.shape[-1] + # The value 0 in shape attr means copying the corresponding dimension + # size of the input as the output dimension size. + reshaped = layers.reshape( + x=x, shape=[0, 0, n_head, hidden_size // n_head]) + + # permuate the dimensions into: + # [batch_size, n_head, max_sequence_len, hidden_size_per_head] + return layers.transpose(x=reshaped, perm=[0, 2, 1, 3]) + + def __combine_heads(x): + """ + Transpose and then reshape the last two dimensions of inpunt tensor x + so that it becomes one dimension, which is reverse to __split_heads. + """ + if len(x.shape) == 3: return x + if len(x.shape) != 4: + raise ValueError("Input(x) should be a 4-D Tensor.") + + trans_x = layers.transpose(x, perm=[0, 2, 1, 3]) + # The value 0 in shape attr means copying the corresponding dimension + # size of the input as the output dimension size. + return layers.reshape( + x=trans_x, + shape=map(int, [0, 0, trans_x.shape[2] * trans_x.shape[3]])) + + def scaled_dot_product_attention(q, k, v, attn_bias, d_model, dropout_rate): + """ + Scaled Dot-Product Attention + """ + scaled_q = layers.scale(x=q, scale=d_model**-0.5) + product = layers.matmul(x=scaled_q, y=k, transpose_y=True) + if attn_bias: + product += attn_bias + weights = layers.softmax(product) + if dropout_rate: + weights = layers.dropout( + weights, + dropout_prob=dropout_rate, + seed=ModelHyperParams.dropout_seed, + is_test=False) + out = layers.matmul(weights, v) + return out + + q, k, v = __compute_qkv(queries, keys, values, n_head, d_key, d_value) + + if cache is not None: # use cache and concat time steps + k = cache["k"] = layers.concat([cache["k"], k], axis=1) + v = cache["v"] = layers.concat([cache["v"], v], axis=1) + + q = __split_heads(q, n_head) + k = __split_heads(k, n_head) + v = __split_heads(v, n_head) + + ctx_multiheads = scaled_dot_product_attention(q, k, v, attn_bias, d_model, + dropout_rate) + + out = __combine_heads(ctx_multiheads) + + # Project back to the model size. + proj_out = layers.fc(input=out, + size=d_model, + num_flatten_dims=2, + param_attr=const_para_attr, + bias_attr=const_bias_attr) + return proj_out + + +def positionwise_feed_forward(x, d_inner_hid, d_hid): + """ + Position-wise Feed-Forward Networks. + This module consists of two linear transformations with a ReLU activation + in between, which is applied to each position separately and identically. + """ + hidden = layers.fc(input=x, + size=d_inner_hid, + num_flatten_dims=2, + act="relu", + param_attr=const_para_attr, + bias_attr=const_bias_attr) + out = layers.fc(input=hidden, + size=d_hid, + num_flatten_dims=2, + param_attr=const_para_attr, + bias_attr=const_bias_attr) + return out + + +def pre_post_process_layer(prev_out, out, process_cmd, dropout_rate=0.): + """ + Add residual connection, layer normalization and droput to the out tensor + optionally according to the value of process_cmd. + This will be used before or after multi-head attention and position-wise + feed-forward networks. + """ + for cmd in process_cmd: + if cmd == "a": # add residual connection + out = out + prev_out if prev_out else out + elif cmd == "n": # add layer normalization + out = layers.layer_norm( + out, + begin_norm_axis=len(out.shape) - 1, + param_attr=fluid.initializer.Constant(1.), + bias_attr=fluid.initializer.Constant(0.)) + elif cmd == "d": # add dropout + if dropout_rate: + out = layers.dropout( + out, + dropout_prob=dropout_rate, + seed=ModelHyperParams.dropout_seed, + is_test=False) + return out + + +pre_process_layer = partial(pre_post_process_layer, None) +post_process_layer = pre_post_process_layer + + +def prepare_encoder(src_word, + src_pos, + src_vocab_size, + src_emb_dim, + src_max_len, + dropout_rate=0., + word_emb_param_name=None, + pos_enc_param_name=None): + """Add word embeddings and position encodings. + The output tensor has a shape of: + [batch_size, max_src_length_in_batch, d_model]. + This module is used at the bottom of the encoder stacks. + """ + if TrainTaskConfig.check_acc: + src_word_emb = layers.embedding( + src_word, + size=[src_vocab_size, src_emb_dim], + param_attr=fluid.ParamAttr( + name=word_emb_param_name, + initializer=fluid.initializer.ConstantInitializer(0.001))) + else: + src_word_emb = layers.embedding( + src_word, + size=[src_vocab_size, src_emb_dim], + param_attr=fluid.ParamAttr( + name=word_emb_param_name, + initializer=fluid.initializer.Normal(0., src_emb_dim**-0.5))) + + src_word_emb = layers.scale(x=src_word_emb, scale=src_emb_dim**0.5) + src_pos_enc = layers.embedding( + src_pos, + size=[src_max_len, src_emb_dim], + param_attr=fluid.ParamAttr( + name=pos_enc_param_name, + trainable=False, + initializer=fluid.initializer.ConstantInitializer(0.001))) + enc_input = src_word_emb + src_pos_enc + return layers.dropout( + enc_input, + dropout_prob=dropout_rate, + seed=ModelHyperParams.dropout_seed, + is_test=False) if dropout_rate else enc_input + + +prepare_encoder = partial( + prepare_encoder, pos_enc_param_name=pos_enc_param_names[0]) +prepare_decoder = partial( + prepare_encoder, pos_enc_param_name=pos_enc_param_names[1]) + + +def encoder_layer(enc_input, + attn_bias, + n_head, + d_key, + d_value, + d_model, + d_inner_hid, + dropout_rate=0.): + """The encoder layers that can be stacked to form a deep encoder. + This module consits of a multi-head (self) attention followed by + position-wise feed-forward networks and both the two components companied + with the post_process_layer to add residual connection, layer normalization + and droput. + """ + attn_output = multi_head_attention(enc_input, enc_input, enc_input, + attn_bias, d_key, d_value, d_model, + n_head, dropout_rate) + attn_output = post_process_layer(enc_input, attn_output, "dan", + dropout_rate) + ffd_output = positionwise_feed_forward(attn_output, d_inner_hid, d_model) + return post_process_layer(attn_output, ffd_output, "dan", dropout_rate) + + +def encoder(enc_input, + attn_bias, + n_layer, + n_head, + d_key, + d_value, + d_model, + d_inner_hid, + dropout_rate=0.): + """ + The encoder is composed of a stack of identical layers returned by calling + encoder_layer. + """ + for i in range(n_layer): + enc_output = encoder_layer(enc_input, attn_bias, n_head, d_key, d_value, + d_model, d_inner_hid, dropout_rate) + enc_input = enc_output + return enc_output + + +def decoder_layer(dec_input, + enc_output, + slf_attn_bias, + dec_enc_attn_bias, + n_head, + d_key, + d_value, + d_model, + d_inner_hid, + dropout_rate=0., + cache=None): + """ The layer to be stacked in decoder part. + The structure of this module is similar to that in the encoder part except + a multi-head attention is added to implement encoder-decoder attention. + """ + slf_attn_output = multi_head_attention( + dec_input, + dec_input, + dec_input, + slf_attn_bias, + d_key, + d_value, + d_model, + n_head, + dropout_rate, + cache, ) + slf_attn_output = post_process_layer( + dec_input, + slf_attn_output, + "dan", # residual connection + dropout + layer normalization + dropout_rate, ) + enc_attn_output = multi_head_attention( + slf_attn_output, + enc_output, + enc_output, + dec_enc_attn_bias, + d_key, + d_value, + d_model, + n_head, + dropout_rate, ) + enc_attn_output = post_process_layer( + slf_attn_output, + enc_attn_output, + "dan", # residual connection + dropout + layer normalization + dropout_rate, ) + ffd_output = positionwise_feed_forward( + enc_attn_output, + d_inner_hid, + d_model, ) + dec_output = post_process_layer( + enc_attn_output, + ffd_output, + "dan", # residual connection + dropout + layer normalization + dropout_rate, ) + return dec_output + + +def decoder(dec_input, + enc_output, + dec_slf_attn_bias, + dec_enc_attn_bias, + n_layer, + n_head, + d_key, + d_value, + d_model, + d_inner_hid, + dropout_rate=0., + caches=None): + """ + The decoder is composed of a stack of identical decoder_layer layers. + """ + for i in range(n_layer): + cache = None + if caches is not None: + cache = caches[i] + + dec_output = decoder_layer( + dec_input, + enc_output, + dec_slf_attn_bias, + dec_enc_attn_bias, + n_head, + d_key, + d_value, + d_model, + d_inner_hid, + dropout_rate, + cache=cache) + dec_input = dec_output + return dec_output + + +def make_all_inputs(input_fields): + """ + Define the input data layers for the transformer model. + """ + inputs = [] + for input_field in input_fields: + input_var = layers.data( + name=input_field, + shape=input_descs[input_field][0], + dtype=input_descs[input_field][1], + lod_level=input_descs[input_field][2] + if len(input_descs[input_field]) == 3 else 0, + append_batch_size=False) + inputs.append(input_var) + return inputs + + +def transformer( + src_vocab_size, + trg_vocab_size, + max_length, + n_layer, + n_head, + d_key, + d_value, + d_model, + d_inner_hid, + dropout_rate, + weight_sharing, + label_smooth_eps, ): + if weight_sharing: + assert src_vocab_size == src_vocab_size, ( + "Vocabularies in source and target should be same for weight sharing." + ) + enc_inputs = make_all_inputs(encoder_data_input_fields) + + enc_output = wrap_encoder( + src_vocab_size, + max_length, + n_layer, + n_head, + d_key, + d_value, + d_model, + d_inner_hid, + dropout_rate, + weight_sharing, + enc_inputs, ) + + dec_inputs = make_all_inputs(decoder_data_input_fields[:-1]) + + predict = wrap_decoder( + trg_vocab_size, + max_length, + n_layer, + n_head, + d_key, + d_value, + d_model, + d_inner_hid, + dropout_rate, + weight_sharing, + dec_inputs, + enc_output, ) + + # Padding index do not contribute to the total loss. The weights is used to + # cancel padding index in calculating the loss. + label, weights = make_all_inputs(label_data_input_fields) + if label_smooth_eps: + label = layers.label_smooth( + label=layers.one_hot( + input=label, depth=trg_vocab_size), + epsilon=label_smooth_eps) + + cost = layers.softmax_with_cross_entropy( + logits=layers.reshape( + predict, shape=[-1, trg_vocab_size]), + label=label, + soft_label=True if label_smooth_eps else False) + weighted_cost = cost * weights + sum_cost = layers.reduce_sum(weighted_cost) + token_num = layers.reduce_sum(weights) + avg_cost = sum_cost / token_num + avg_cost.stop_gradient = True + return sum_cost, avg_cost, predict, token_num + + +def wrap_encoder(src_vocab_size, + max_length, + n_layer, + n_head, + d_key, + d_value, + d_model, + d_inner_hid, + dropout_rate, + weight_sharing, + enc_inputs=None): + """ + The wrapper assembles together all needed layers for the encoder. + """ + if enc_inputs is None: + # This is used to implement independent encoder program in inference. + src_word, src_pos, src_slf_attn_bias = \ + make_all_inputs(encoder_data_input_fields) + else: + src_word, src_pos, src_slf_attn_bias = \ + enc_inputs + enc_input = prepare_encoder( + src_word, + src_pos, + src_vocab_size, + d_model, + max_length, + dropout_rate, + word_emb_param_name=word_emb_param_names[0]) + enc_output = encoder(enc_input, src_slf_attn_bias, n_layer, n_head, d_key, + d_value, d_model, d_inner_hid, dropout_rate) + return enc_output + + +def wrap_decoder(trg_vocab_size, + max_length, + n_layer, + n_head, + d_key, + d_value, + d_model, + d_inner_hid, + dropout_rate, + weight_sharing, + dec_inputs=None, + enc_output=None, + caches=None): + """ + The wrapper assembles together all needed layers for the decoder. + """ + if dec_inputs is None: + # This is used to implement independent decoder program in inference. + trg_word, trg_pos, trg_slf_attn_bias, trg_src_attn_bias, \ + enc_output = make_all_inputs( + decoder_data_input_fields + decoder_util_input_fields) + else: + trg_word, trg_pos, trg_slf_attn_bias, trg_src_attn_bias = dec_inputs + + dec_input = prepare_decoder( + trg_word, + trg_pos, + trg_vocab_size, + d_model, + max_length, + dropout_rate, + word_emb_param_name=word_emb_param_names[0] + if weight_sharing else word_emb_param_names[1]) + dec_output = decoder( + dec_input, + enc_output, + trg_slf_attn_bias, + trg_src_attn_bias, + n_layer, + n_head, + d_key, + d_value, + d_model, + d_inner_hid, + dropout_rate, + caches=caches) + # Return logits for training and probs for inference. + if weight_sharing: + predict = layers.matmul( + x=dec_output, + y=fluid.get_var(word_emb_param_names[0]), + transpose_y=True) + else: + predict = layers.fc(input=dec_output, + size=trg_vocab_size, + num_flatten_dims=2, + param_attr=const_para_attr, + bias_attr=const_bias_attr) + if dec_inputs is None: + predict = layers.softmax(predict) + return predict + + +def fast_decode( + src_vocab_size, + trg_vocab_size, + max_in_len, + n_layer, + n_head, + d_key, + d_value, + d_model, + d_inner_hid, + dropout_rate, + weight_sharing, + beam_size, + max_out_len, + eos_idx, ): + """ + Use beam search to decode. Caches will be used to store states of history + steps which can make the decoding faster. + """ + enc_output = wrap_encoder(src_vocab_size, max_in_len, n_layer, n_head, + d_key, d_value, d_model, d_inner_hid, + dropout_rate, weight_sharing) + start_tokens, init_scores, trg_src_attn_bias = \ + make_all_inputs(fast_decoder_data_input_fields ) + + def beam_search(): + max_len = layers.fill_constant( + shape=[1], dtype=start_tokens.dtype, value=max_out_len) + step_idx = layers.fill_constant( + shape=[1], dtype=start_tokens.dtype, value=0) + cond = layers.less_than(x=step_idx, y=max_len) + while_op = layers.While(cond) + # array states will be stored for each step. + ids = layers.array_write( + layers.reshape(start_tokens, (-1, 1)), step_idx) + scores = layers.array_write(init_scores, step_idx) + # cell states will be overwrited at each step. + # caches contains states of history steps to reduce redundant + # computation in decoder. + caches = [{ + "k": layers.fill_constant_batch_size_like( + input=start_tokens, + shape=[-1, 0, d_model], + dtype=enc_output.dtype, + value=0), + "v": layers.fill_constant_batch_size_like( + input=start_tokens, + shape=[-1, 0, d_model], + dtype=enc_output.dtype, + value=0) + } for i in range(n_layer)] + with while_op.block(): + pre_ids = layers.array_read(array=ids, i=step_idx) + pre_ids = layers.reshape(pre_ids, (-1, 1, 1)) + pre_scores = layers.array_read(array=scores, i=step_idx) + # sequence_expand can gather sequences according to lod thus can be + # used in beam search to sift states corresponding to selected ids. + pre_src_attn_bias = layers.sequence_expand( + x=trg_src_attn_bias, y=pre_scores) + pre_enc_output = layers.sequence_expand(x=enc_output, y=pre_scores) + pre_caches = [{ + "k": layers.sequence_expand( + x=cache["k"], y=pre_scores), + "v": layers.sequence_expand( + x=cache["v"], y=pre_scores), + } for cache in caches] + pre_pos = layers.elementwise_mul( + x=layers.fill_constant_batch_size_like( + input=pre_enc_output, # cann't use pre_ids here since it has lod + value=1, + shape=[-1, 1, 1], + dtype=pre_ids.dtype), + y=layers.increment( + x=step_idx, value=1.0, in_place=False), + axis=0) + logits = wrap_decoder( + trg_vocab_size, + max_in_len, + n_layer, + n_head, + d_key, + d_value, + d_model, + d_inner_hid, + dropout_rate, + weight_sharing, + dec_inputs=(pre_ids, pre_pos, None, pre_src_attn_bias), + enc_output=pre_enc_output, + caches=pre_caches) + logits = layers.reshape(logits, (-1, trg_vocab_size)) + + topk_scores, topk_indices = layers.topk( + input=layers.softmax(logits), k=beam_size) + accu_scores = layers.elementwise_add( + x=layers.log(topk_scores), + y=layers.reshape( + pre_scores, shape=[-1]), + axis=0) + # beam_search op uses lod to distinguish branches. + topk_indices = layers.lod_reset(topk_indices, pre_ids) + selected_ids, selected_scores = layers.beam_search( + pre_ids=pre_ids, + pre_scores=pre_scores, + ids=topk_indices, + scores=accu_scores, + beam_size=beam_size, + end_id=eos_idx) + + layers.increment(x=step_idx, value=1.0, in_place=True) + # update states + layers.array_write(selected_ids, i=step_idx, array=ids) + layers.array_write(selected_scores, i=step_idx, array=scores) + layers.assign(pre_src_attn_bias, trg_src_attn_bias) + layers.assign(pre_enc_output, enc_output) + for i in range(n_layer): + layers.assign(pre_caches[i]["k"], caches[i]["k"]) + layers.assign(pre_caches[i]["v"], caches[i]["v"]) + length_cond = layers.less_than(x=step_idx, y=max_len) + finish_cond = layers.logical_not(layers.is_empty(x=selected_ids)) + layers.logical_and(x=length_cond, y=finish_cond, out=cond) + + finished_ids, finished_scores = layers.beam_search_decode( + ids, scores, beam_size=beam_size, end_id=eos_idx) + return finished_ids, finished_scores + + finished_ids, finished_scores = beam_search() + return finished_ids, finished_scores + + +def get_model(is_dist, is_async): + sum_cost, avg_cost, predict, token_num = transformer( + ModelHyperParams.src_vocab_size, ModelHyperParams.trg_vocab_size, + ModelHyperParams.max_length + 1, ModelHyperParams.n_layer, + ModelHyperParams.n_head, ModelHyperParams.d_key, + ModelHyperParams.d_value, ModelHyperParams.d_model, + ModelHyperParams.d_inner_hid, ModelHyperParams.dropout, + ModelHyperParams.weight_sharing, TrainTaskConfig.label_smooth_eps) + + local_lr_scheduler = LearningRateScheduler(ModelHyperParams.d_model, + TrainTaskConfig.warmup_steps, + TrainTaskConfig.learning_rate) + + if not is_dist: + optimizer = fluid.optimizer.Adam( + learning_rate=local_lr_scheduler.learning_rate, + beta1=TrainTaskConfig.beta1, + beta2=TrainTaskConfig.beta2, + epsilon=TrainTaskConfig.eps) + optimizer.minimize(sum_cost) + elif is_async: + optimizer = fluid.optimizer.SGD(0.003) + optimizer.minimize(sum_cost) + else: + lr_decay = fluid.layers\ + .learning_rate_scheduler\ + .noam_decay(ModelHyperParams.d_model, + TrainTaskConfig.warmup_steps) + + optimizer = fluid.optimizer.Adam( + learning_rate=lr_decay, + beta1=TrainTaskConfig.beta1, + beta2=TrainTaskConfig.beta2, + epsilon=TrainTaskConfig.eps) + optimizer.minimize(sum_cost) + + return sum_cost, avg_cost, predict, token_num, local_lr_scheduler + + +def update_args(): + src_dict = DataReader.load_dict(TrainTaskConfig.src_vocab_fpath) + trg_dict = DataReader.load_dict(TrainTaskConfig.trg_vocab_fpath) + dict_args = [ + "src_vocab_size", str(len(src_dict)), "trg_vocab_size", + str(len(trg_dict)), "bos_idx", + str(src_dict[TrainTaskConfig.special_token[0]]), "eos_idx", + str(src_dict[TrainTaskConfig.special_token[1]]), "unk_idx", + str(src_dict[TrainTaskConfig.special_token[2]]) + ] + merge_cfg_from_list(dict_args, [TrainTaskConfig, ModelHyperParams]) + + +class DistTransformer2x2(TestDistRunnerBase): + def run_pserver(self, args): + get_model(True, not args.sync_mode) + t = self.get_transpiler(args.trainer_id, + fluid.default_main_program(), args.endpoints, + args.trainers, args.sync_mode) + pserver_prog = t.get_pserver_program(args.current_endpoint) + startup_prog = t.get_startup_program(args.current_endpoint, + pserver_prog) place = fluid.CPUPlace() exe = fluid.Executor(place) exe.run(startup_prog) exe.run(pserver_prog) - def _wait_ps_ready(self, pid): - retry_times = 20 - while True: - assert retry_times >= 0, "wait ps ready failed" - time.sleep(3) - print("waiting ps ready: ", pid) - try: - # the listen_and_serv_op would touch a file which contains the listen port - # on the /tmp directory until it was ready to process all the RPC call. - os.stat("/tmp/paddle.%d.port" % pid) - return - except os.error: - retry_times -= 1 - - def run_trainer(self, place, endpoints, trainer_id, trainers, is_dist=True): - avg_cost = get_model() - if is_dist: - t = get_transpiler(trainer_id, - fluid.default_main_program(), endpoints, - trainers) + def run_trainer(self, place, args): + + sum_cost, avg_cost, predict, token_num, local_lr_scheduler = get_model( + args.is_dist, not args.sync_mode) + + if args.is_dist: + t = self.get_transpiler(args.trainer_id, + fluid.default_main_program(), + args.endpoints, args.trainers, + args.sync_mode) trainer_prog = t.get_trainer_program() + TrainTaskConfig.batch_size = 10 + TrainTaskConfig.train_file_pattern = TrainTaskConfig.data_path + "train.tok.clean.bpe.32000.en-de.train_{}".format( + args.trainer_id) else: + TrainTaskConfig.batch_size = 20 trainer_prog = fluid.default_main_program() startup_exe = fluid.Executor(place) - startup_exe.run(fluid.default_startup_program()) - - strategy = fluid.ExecutionStrategy() - strategy.num_threads = 1 - strategy.allow_op_delay = False - exe = fluid.ParallelExecutor( - True, loss_name=avg_cost.name, exec_strategy=strategy) - - first_loss, = exe.run(fetch_list=[avg_cost.name]) - print(first_loss) - for i in six.moves.xrange(5): - _ = exe.run(fetch_list=[avg_cost.name]) - last_loss, = exe.run(fetch_list=[avg_cost.name]) - print(last_loss) - - -def main(role="pserver", - endpoints="127.0.0.1:9123", - trainer_id=0, - current_endpoint="127.0.0.1:9123", - trainers=1, - is_dist=True): - - reader = paddle.batch( - wmt16.train(ModelHyperParams.src_vocab_size, - ModelHyperParams.trg_vocab_size), - batch_size=transformer_model.batch_size) - - with fluid.recordio_writer.create_recordio_writer( - WMT16_RECORDIO_FILE) as writer: - for batch in reader(): - for tensor in prepare_batch_input( - batch, ModelHyperParams.src_pad_idx, - ModelHyperParams.trg_pad_idx, ModelHyperParams.n_head): - t = fluid.LoDTensor() - t.set(tensor, fluid.CPUPlace()) - writer.append_tensor(t) - writer.complete_append_tensor() - - model = DistTransformer2x2() - if role == "pserver": - model.run_pserver(endpoints, trainers, current_endpoint, trainer_id) - else: - p = fluid.CUDAPlace(0) if core.is_compiled_with_cuda( - ) else fluid.CPUPlace() - model.run_trainer(p, endpoints, trainer_id, trainers, is_dist) + + TrainTaskConfig.local = not args.is_dist + + train_loop(startup_exe, trainer_prog, 1, sum_cost, avg_cost, + local_lr_scheduler, token_num, predict) if __name__ == "__main__": - if len(sys.argv) != 7: - print( - "Usage: python dist_transformer.py [pserver/trainer] [endpoints] [trainer_id] [current_endpoint] [trainers] [is_dist]" - ) - role = sys.argv[1] - endpoints = sys.argv[2] - trainer_id = int(sys.argv[3]) - current_endpoint = sys.argv[4] - trainers = int(sys.argv[5]) - is_dist = True if sys.argv[6] == "TRUE" else False - main( - role=role, - endpoints=endpoints, - trainer_id=trainer_id, - current_endpoint=current_endpoint, - trainers=trainers, - is_dist=is_dist) + update_args() + runtime_main(DistTransformer2x2) diff --git a/python/paddle/fluid/tests/unittests/dist_word2vec.py b/python/paddle/fluid/tests/unittests/dist_word2vec.py index 0ad994a258c04cabc807823b7d2a8ae8bb62ab2c..f3e740fc7027a4a562b836c3113b87d55062c185 100644 --- a/python/paddle/fluid/tests/unittests/dist_word2vec.py +++ b/python/paddle/fluid/tests/unittests/dist_word2vec.py @@ -49,28 +49,32 @@ class TestDistWord2vec2x2(TestDistRunnerBase): dtype='float32', is_sparse=IS_SPARSE, param_attr=fluid.ParamAttr( - name='shared_w', initializer=fluid.initializer.Constant())) + name='shared_w', + initializer=fluid.initializer.Constant(value=0.1))) embed_second = fluid.layers.embedding( input=words[1], size=[dict_size, EMBED_SIZE], dtype='float32', is_sparse=IS_SPARSE, param_attr=fluid.ParamAttr( - name='shared_w', initializer=fluid.initializer.Constant())) + name='shared_w', + initializer=fluid.initializer.Constant(value=0.1))) embed_third = fluid.layers.embedding( input=words[2], size=[dict_size, EMBED_SIZE], dtype='float32', is_sparse=IS_SPARSE, param_attr=fluid.ParamAttr( - name='shared_w', initializer=fluid.initializer.Constant())) + name='shared_w', + initializer=fluid.initializer.Constant(value=0.1))) embed_forth = fluid.layers.embedding( input=words[3], size=[dict_size, EMBED_SIZE], dtype='float32', is_sparse=IS_SPARSE, param_attr=fluid.ParamAttr( - name='shared_w', initializer=fluid.initializer.Constant())) + name='shared_w', + initializer=fluid.initializer.Constant(value=0.1))) concat_embed = fluid.layers.concat( input=[embed_first, embed_second, embed_third, embed_forth], @@ -80,13 +84,13 @@ class TestDistWord2vec2x2(TestDistRunnerBase): size=HIDDEN_SIZE, act='sigmoid', param_attr=fluid.ParamAttr( - initializer=fluid.initializer.Constant())) + initializer=fluid.initializer.Constant(value=0.1))) predict_word = fluid.layers.fc( input=hidden1, size=dict_size, act='softmax', param_attr=fluid.ParamAttr( - initializer=fluid.initializer.Constant())) + initializer=fluid.initializer.Constant(value=0.1))) cost = fluid.layers.cross_entropy( input=predict_word, label=words[4]) avg_cost = fluid.layers.mean(cost) diff --git a/python/paddle/fluid/tests/unittests/test_dist_base.py b/python/paddle/fluid/tests/unittests/test_dist_base.py index 4c71181d0d736bd1a8796b2d38ed1667557e3db8..f730a17477d2c1d1edc7ee55becc632d96c0e260 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_base.py +++ b/python/paddle/fluid/tests/unittests/test_dist_base.py @@ -21,7 +21,7 @@ import sys import six import signal import subprocess -import six +import argparse class TestDistRunnerBase(object): @@ -30,7 +30,7 @@ class TestDistRunnerBase(object): "get_model should be implemented by child classes.") def get_transpiler(self, trainer_id, main_program, pserver_endpoints, - trainers): + trainers, sync_mode): # NOTE: import fluid until runtime, or else forking processes will cause error. import paddle import paddle.fluid as fluid @@ -39,33 +39,35 @@ class TestDistRunnerBase(object): trainer_id=trainer_id, program=main_program, pservers=pserver_endpoints, - trainers=trainers) + trainers=trainers, + sync_mode=sync_mode) return t - def run_pserver(self, pserver_endpoints, trainers, current_endpoint, - trainer_id): + def run_pserver(self, args): import paddle import paddle.fluid as fluid self.get_model(batch_size=2) - t = self.get_transpiler(trainer_id, - fluid.default_main_program(), pserver_endpoints, - trainers) - pserver_prog = t.get_pserver_program(current_endpoint) - startup_prog = t.get_startup_program(current_endpoint, pserver_prog) + t = self.get_transpiler(args.trainer_id, + fluid.default_main_program(), args.endpoints, + args.trainers, args.sync_mode) + pserver_prog = t.get_pserver_program(args.current_endpoint) + startup_prog = t.get_startup_program(args.current_endpoint, + pserver_prog) place = fluid.CPUPlace() exe = fluid.Executor(place) exe.run(startup_prog) exe.run(pserver_prog) - def run_trainer(self, place, endpoints, trainer_id, trainers, is_dist=True): + def run_trainer(self, place, args): import paddle import paddle.fluid as fluid test_program, avg_cost, train_reader, test_reader, batch_acc, predict = \ self.get_model(batch_size=2) - if is_dist: - t = self.get_transpiler(trainer_id, - fluid.default_main_program(), endpoints, - trainers) + if args.is_dist: + t = self.get_transpiler(args.trainer_id, + fluid.default_main_program(), + args.endpoints, args.trainers, + args.sync_mode) trainer_prog = t.get_trainer_program() else: trainer_prog = fluid.default_main_program() @@ -76,8 +78,18 @@ class TestDistRunnerBase(object): strategy = fluid.ExecutionStrategy() strategy.num_threads = 1 strategy.allow_op_delay = False + build_stra = fluid.BuildStrategy() + + if args.use_reduce: + build_stra.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce + else: + build_stra.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.AllReduce + exe = fluid.ParallelExecutor( - True, loss_name=avg_cost.name, exec_strategy=strategy) + True, + loss_name=avg_cost.name, + exec_strategy=strategy, + build_strategy=build_stra) feed_var_list = [ var for var in trainer_prog.global_block().vars.values() @@ -106,45 +118,64 @@ def runtime_main(test_class): import paddle.fluid as fluid import paddle.fluid.core as core - if len(sys.argv) != 7: - print( - "Usage: python dist_se_resnext.py [pserver/trainer] [endpoints] [trainer_id] [current_endpoint] [trainers] [is_dist]" - ) - role = sys.argv[1] - endpoints = sys.argv[2] - trainer_id = int(sys.argv[3]) - current_endpoint = sys.argv[4] - trainers = int(sys.argv[5]) - is_dist = True if sys.argv[6] == "TRUE" else False + parser = argparse.ArgumentParser(description='Run dist test.') + parser.add_argument( + '--role', type=str, required=True, choices=['pserver', 'trainer']) + parser.add_argument('--endpoints', type=str, required=False, default="") + parser.add_argument('--is_dist', action='store_true') + parser.add_argument('--trainer_id', type=int, required=False, default=0) + parser.add_argument('--trainers', type=int, required=False, default=1) + parser.add_argument( + '--current_endpoint', type=str, required=False, default="") + parser.add_argument('--sync_mode', action='store_true') + parser.add_argument('--mem_opt', action='store_true') + parser.add_argument('--use_reduce', action='store_true') + + args = parser.parse_args() model = test_class() - if role == "pserver": - model.run_pserver(endpoints, trainers, current_endpoint, trainer_id) + if args.role == "pserver": + model.run_pserver(args) else: p = fluid.CUDAPlace(0) if core.is_compiled_with_cuda( ) else fluid.CPUPlace() - model.run_trainer(p, endpoints, trainer_id, trainers, is_dist) + model.run_trainer(p, args) import paddle.compat as cpt class TestDistBase(unittest.TestCase): + def _setup_config(self): + raise NotImplementedError("tests should have _setup_config implemented") + def setUp(self): self._trainers = 2 self._pservers = 2 self._ps_endpoints = "127.0.0.1:9123,127.0.0.1:9124" self._python_interp = "python" + self._sync_mode = True + self._mem_opt = False + self._use_reduce = False + self._setup_config() def start_pserver(self, model_file, check_error_log): ps0_ep, ps1_ep = self._ps_endpoints.split(",") - ps0_cmd = "%s %s pserver %s 0 %s %d TRUE" % \ + ps_cmd = "%s %s --role pserver --endpoints %s --trainer_id 0 --current_endpoint %s --trainers %d --is_dist" + ps0_cmd = ps_cmd % \ (self._python_interp, model_file, self._ps_endpoints, ps0_ep, self._trainers) - ps1_cmd = "%s %s pserver %s 0 %s %d TRUE" % \ + ps1_cmd = ps_cmd % \ (self._python_interp, model_file, self._ps_endpoints, ps1_ep, self._trainers) + if self._sync_mode: + ps0_cmd += " --sync_mode" + ps1_cmd += " --sync_mode" + if self._mem_opt: + ps0_cmd += " --mem_opt" + ps1_cmd += " --mem_opt" + ps0_pipe = subprocess.PIPE ps1_pipe = subprocess.PIPE if check_error_log: @@ -195,9 +226,7 @@ class TestDistBase(unittest.TestCase): # Run local to get a base line env_local = {"CUDA_VISIBLE_DEVICES": "0"} env_local.update(required_envs) - local_cmd = "%s %s trainer %s 0 %s %d FLASE" % \ - (self._python_interp, model_file, - "127.0.0.1:1234", "127.0.0.1:1234", 1) + local_cmd = "%s %s --role trainer" % (self._python_interp, model_file) if not check_error_log: local_proc = subprocess.Popen( local_cmd.split(" "), @@ -226,12 +255,23 @@ class TestDistBase(unittest.TestCase): self._wait_ps_ready(ps1.pid) ps0_ep, ps1_ep = self._ps_endpoints.split(",") - tr0_cmd = "%s %s trainer %s 0 %s %d TRUE" % \ - (self._python_interp, model_file, self._ps_endpoints, ps0_ep, - self._trainers) - tr1_cmd = "%s %s trainer %s 1 %s %d TRUE" % \ - (self._python_interp, model_file, self._ps_endpoints, ps1_ep, - self._trainers) + tr_cmd = "%s %s --role trainer --endpoints %s --trainer_id %d --current_endpoint %s --trainers %d --is_dist" + tr0_cmd = tr_cmd % \ + (self._python_interp, model_file, self._ps_endpoints, + 0, ps0_ep, self._trainers) + tr1_cmd = tr_cmd % \ + (self._python_interp, model_file, self._ps_endpoints, + 1, ps1_ep, self._trainers) + + if self._sync_mode: + tr0_cmd += " --sync_mode" + tr1_cmd += " --sync_mode" + if self._mem_opt: + tr0_cmd += " --mem_opt" + tr1_cmd += " --mem_opt" + if self._use_reduce: + tr0_cmd += " --use_reduce" + tr1_cmd += " --use_reduce" env0 = {"CUDA_VISIBLE_DEVICES": "0"} env1 = {"CUDA_VISIBLE_DEVICES": "1"} @@ -282,6 +322,10 @@ class TestDistBase(unittest.TestCase): # FIXME: use terminate() instead of sigkill. os.kill(ps0.pid, signal.SIGKILL) os.kill(ps1.pid, signal.SIGKILL) + ps0.terminate() + ps1.terminate() + ps0.wait() + ps1.wait() FNULL.close() self.assertAlmostEqual(local_first_loss, dist_first_loss, delta=delta) diff --git a/python/paddle/fluid/tests/unittests/test_dist_mnist.py b/python/paddle/fluid/tests/unittests/test_dist_mnist.py index 4ec68d411b0f0e9ae89b107914e8fd844a19228b..59a137c18c9435ef5c5772d0cc08f197c1d86603 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_mnist.py +++ b/python/paddle/fluid/tests/unittests/test_dist_mnist.py @@ -17,10 +17,51 @@ import unittest from test_dist_base import TestDistBase -class TestDistSeResneXt2x2(TestDistBase): +class TestDistMnist2x2(TestDistBase): + def _setup_config(self): + self._sync_mode = True + self._use_reduce = False + + def test_se_resnext(self): + self.check_with_place("dist_mnist.py", delta=1e-7) + + +class TestDistMnist2x2WithMemopt(TestDistBase): + def _setup_config(self): + self._sync_mode = True + self._mem_opt = True + def test_se_resnext(self): self.check_with_place("dist_mnist.py", delta=1e-7) +class TestDistMnistAsync(TestDistBase): + def _setup_config(self): + self._sync_mode = False + self._use_reduce = False + + def test_se_resnext(self): + self.check_with_place("dist_mnist.py", delta=200) + + +# FIXME(typhoonzero): enable these tests once we have 4 +# 4 GPUs on CI machine, and the base class should be updated. +# +# class TestDistMnist2x2ReduceMode(TestDistBase): +# def _setup_config(self): +# self._sync_mode = True +# self._use_reduce = True + +# def test_se_resnext(self): +# self.check_with_place("dist_mnist.py", delta=1e-7) + +# class TestDistMnistAsyncReduceMode(TestDistBase): +# def _setup_config(self): +# self._sync_mode = False +# self._use_reduce = True + +# def test_se_resnext(self): +# self.check_with_place("dist_mnist.py", delta=200) + if __name__ == "__main__": unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_dist_se_resnext.py b/python/paddle/fluid/tests/unittests/test_dist_se_resnext.py index 16525f6fdb60a90a44a628fb0648f4130218c102..c0e9fa38e7d1eadd89eff9a8ba4442f888b8120e 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_se_resnext.py +++ b/python/paddle/fluid/tests/unittests/test_dist_se_resnext.py @@ -18,9 +18,20 @@ from test_dist_base import TestDistBase class TestDistSeResneXt2x2(TestDistBase): + def _setup_config(self): + self._sync_mode = True + def test_se_resnext(self): self.check_with_place("dist_se_resnext.py", delta=1e-7) +class TestDistSeResneXt2x2Async(TestDistBase): + def _setup_config(self): + self._sync_mode = False + + def test_se_resnext(self): + self.check_with_place("dist_se_resnext.py", delta=100) + + if __name__ == "__main__": unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_dist_train.py b/python/paddle/fluid/tests/unittests/test_dist_train.py index 9581abdf394d738470d32ae609838832077ee519..083525ccf54d389b60c4aaa9f8c6223f07c773cd 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_train.py +++ b/python/paddle/fluid/tests/unittests/test_dist_train.py @@ -100,7 +100,7 @@ class TestSendOp(unittest.TestCase): main.global_block().append_op( type="fetch_barrier", inputs={}, - outputs={}, + outputs={"Out": []}, attrs={ "endpoints": ["127.0.0.1:{0}".format(port)], RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE diff --git a/python/paddle/fluid/tests/unittests/test_dist_transformer.py b/python/paddle/fluid/tests/unittests/test_dist_transformer.py index 313207ff9ce054f81322224cb6ceafaaf25bbedf..a8e6ce4cfe18384e405f1602429628914d2c2e00 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_transformer.py +++ b/python/paddle/fluid/tests/unittests/test_dist_transformer.py @@ -15,14 +15,55 @@ from __future__ import print_function import unittest +import paddle from test_dist_base import TestDistBase -class TestDistTransformer2x2(TestDistBase): +def download_files(): + url_prefix = 'http://paddle-unittest-data.cdn.bcebos.com/dist_transformer/' + vocab_url = url_prefix + 'vocab.bpe.32000' + vocab_md5 = 'a86d345ca6e27f6591d0dccb1b9be853' + paddle.dataset.common.download(vocab_url, 'test_dist_transformer', + vocab_md5) + + local_train_url = url_prefix + 'train.tok.clean.bpe.32000.en-de' + local_train_md5 = '033eb02b9449e6dd823f050782ac8914' + paddle.dataset.common.download(local_train_url, 'test_dist_transformer', + local_train_md5) + + train0_url = url_prefix + 'train.tok.clean.bpe.32000.en-de.train_0' + train0_md5 = 'ddce7f602f352a0405267285379a38b1' + paddle.dataset.common.download(train0_url, 'test_dist_transformer', + train0_md5) + + train1_url = url_prefix + 'train.tok.clean.bpe.32000.en-de.train_1' + train1_md5 = '8757798200180285b1a619cd7f408747' + paddle.dataset.common.download(train1_url, 'test_dist_transformer', + train1_md5) + + test_url = url_prefix + 'newstest2013.tok.bpe.32000.en-de' + test_md5 = '9dd74a266dbdb25314183899f269b4a2' + paddle.dataset.common.download(test_url, 'test_dist_transformer', test_md5) + + +class TestDistTransformer2x2Sync(TestDistBase): + def _setup_config(self): + self._sync_mode = True + + def test_transformer(self): + download_files() + #Note: loss on test dataset of the first 5 batch are: + # 10.518872, 10.518871, 10.518868, 10.518862, 10.518855 + self.check_with_place("dist_transformer.py", delta=1e-7) + + +class TestDistTransformer2x2Async(TestDistBase): + def _setup_config(self): + self._sync_mode = False + def test_transformer(self): - # TODO(paddle-dev): check if the delta is OK. - # Usually start around ~8000 and converge to ~5000 - self.check_with_place("dist_transformer.py", delta=400) + download_files() + self.check_with_place("dist_transformer.py", delta=1.0) if __name__ == "__main__": diff --git a/python/paddle/fluid/tests/unittests/test_dist_word2vec.py b/python/paddle/fluid/tests/unittests/test_dist_word2vec.py index e43992c488d35d1b3f670e13650d420b0498eeec..9a3e92e8d775a37e0c24ee1bcc5435628d61bb91 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_word2vec.py +++ b/python/paddle/fluid/tests/unittests/test_dist_word2vec.py @@ -18,8 +18,19 @@ from test_dist_base import TestDistBase class TestDistSeResneXt2x2(TestDistBase): + def _setup_config(self): + self._sync_mode = True + + def test_se_resnext(self): + self.check_with_place("dist_word2vec.py", delta=1e-4) + + +class TestDistSeResneXt2x2Async(TestDistBase): + def _setup_config(self): + self._sync_mode = False + def test_se_resnext(self): - self.check_with_place("dist_word2vec.py", delta=1e-7) + self.check_with_place("dist_word2vec.py", delta=1) if __name__ == "__main__": diff --git a/python/paddle/fluid/transpiler/details/program_utils.py b/python/paddle/fluid/transpiler/details/program_utils.py index 640dbf4bbed58edf746456419af18c75241fa03c..4261cccdc2e9baaac21b83c22e2090854378c67d 100644 --- a/python/paddle/fluid/transpiler/details/program_utils.py +++ b/python/paddle/fluid/transpiler/details/program_utils.py @@ -39,3 +39,136 @@ def find_op_by_output_arg(block, arg_name): if arg_name in op.output_arg_names: return index return -1 + + +def get_indent_space(indent, space_num=4): + ret = "" + for i in range(0, indent * space_num): + ret += " " + + return ret + + +def variable_to_code(var): + """ + Get readable codes of fluid variable. + Args: + var: A fluid operator. + Returns: + string: The formatted string. + """ + if var.type == core.VarDesc.VarType.SELECTED_ROWS or var.type == core.VarDesc.VarType.LOD_TENSOR: + var_str = "{name} : fluid.{type}.shape{shape}.astype({dtype})".\ + format(i="{", e="}", name=var.name, type=var.type, shape=var.shape, dtype=var.dtype) + else: + var_str = "{name} : fluid.{type})".\ + format(i="{", e="}", name=var.name, type=var.type) + + if type(var) == paddle.fluid.framework.Parameter: + if var.trainable: + var_str = "trainable parameter " + var_str + else: + var_str = "parameter " + var_str + else: + var_str = "var " + var_str + + if var.persistable: + var_str = "persist " + var_str + + return var_str + + +def op_to_code(op): + """ + Get readable codes of fluid operator. + Args: + op: A fluid operator. + Returns: + string: The foramtted string. + """ + + outputs_str = "{" + for i in range(0, len(op.output_names)): + outputs_str += "{name}=".format(name=op.output_names[i]) + o = op.output(op.output_names[i]) + outputs_str += "{value}".format(value=o) + if i != len(op.output_names) - 1: + outputs_str += ", " + outputs_str += "}" + + inputs_str = "{" + for i in range(0, len(op.input_names)): + inputs_str += "{name}=".format(name=op.input_names[i]) + o = op.input(op.input_names[i]) + inputs_str += "{value}".format(value=o) + + if i != len(op.input_names) - 1: + inputs_str += ", " + inputs_str += "}" + + attrs_str = "" + for i in range(0, len(op.attr_names)): + name = op.attr_names[i] + + attr_type = op.desc.attr_type(name) + if attr_type == core.AttrType.BLOCK: + a = "{name} = block[{value}]".format( + name=name, type=attr_type, value=op.block_attr_id(name)) + attrs_str += a + continue + + if attr_type == core.AttrType.BLOCKS: + a = "{name} = blocks{value}".format( + name=name, type=attr_type, value=op.blocks_attr_ids(name)) + attrs_str += a + continue + + a = "{name} = {value}".format( + name=name, type=attr_type, value=op.desc.attr(name)) + attrs_str += a + if i != len(op.attr_names) - 1: + attrs_str += ", " + + if outputs_str != "{}": + op_str = "{outputs} = {op_type}(inputs={inputs}, {attrs})".\ + format(outputs = outputs_str, op_type=op.type, inputs=inputs_str, attrs=attrs_str) + else: + op_str = "{op_type}(inputs={inputs}, {attrs})".\ + format(op_type=op.type, inputs=inputs_str, attrs=attrs_str) + return op_str + + +def block_to_code(block, block_idx): + indent = 0 + + print("{0}{1} // block {2}".format( + get_indent_space(indent), '{', block_idx)) + + indent += 1 + # sort all vars + all_vars = sorted(block.vars.iteritems(), key=lambda x: x[0]) + for var in all_vars: + print("{}{}".format(get_indent_space(indent), variable_to_code(var[1]))) + + if len(all_vars) > 0: + print("") + + for op in block.ops: + print("{}{}".format(get_indent_space(indent), op_to_code(op))) + indent -= 1 + + print("{0}{1}".format(get_indent_space(indent), '}')) + + +def program_to_code(prog): + """ + Print readable codes of fluid program. + Args: + prog : A fluid program. + An example result like bellow: + https://github.com/PaddlePaddle/Paddle/pull/12673 + """ + block_idx = 0 + for block in prog.blocks: + block_to_code(block, block_idx) + block_idx += 1 diff --git a/python/paddle/fluid/transpiler/distribute_transpiler.py b/python/paddle/fluid/transpiler/distribute_transpiler.py index ca9521d33a6d1a2a92a30bace4632e5414e60a66..f5ffd66d861a09874a98223e754b6c7d06af717c 100644 --- a/python/paddle/fluid/transpiler/distribute_transpiler.py +++ b/python/paddle/fluid/transpiler/distribute_transpiler.py @@ -31,9 +31,10 @@ Steps to transpile pserver: """ import math -import random +import sys import numpy as np import collections +import random from .ps_dispatcher import RoundRobin, HashName, PSDispatcher from .. import core, framework @@ -181,7 +182,8 @@ class DistributeTranspiler(object): program=None, pservers="127.0.0.1:6174", trainers=1, - sync_mode=True): + sync_mode=True, + startup_program=None): """ Run the transpiler. @@ -194,13 +196,17 @@ class DistributeTranspiler(object): list. trainers (int): number of trainers in the distributed job. sync_mode (bool): Do sync training or not, default is True. + startup_program (Program|None): startup_program to transpile, + default is fluid.default_main_program(). """ if program is None: program = default_main_program() + if startup_program is None: + startup_program = default_startup_program() self.origin_program = program - self.origin_startup_program = default_startup_program().clone() + self.startup_program = startup_program + self.origin_startup_program = self.startup_program.clone() - self.startup_program = default_startup_program() self.trainer_num = trainers self.sync_mode = sync_mode self.trainer_id = trainer_id @@ -260,6 +266,10 @@ class DistributeTranspiler(object): name=framework.generate_control_dev_var_name()) grad_name_to_send_dummy_out[grad_varname] = dummy_output + # get send op_role_var, if not splited, the grad should have .trainer suffix + # if splited, grad should be the original grad var name (split_by_ref and send + # will be on the same place). ParallelExecutor + # will use op_role_var to get expected device place to run this op. program.global_block()._insert_op( index=index + 1, type="send", @@ -268,18 +278,23 @@ class DistributeTranspiler(object): attrs={ "epmap": eplist, RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE, - OP_ROLE_VAR_ATTR_NAME: - [self.grad_name_to_param_name[grad_varname], grad_varname], + OP_ROLE_VAR_ATTR_NAME: [ + self.grad_name_to_param_name[grad_varname], + splited_grad_varname + ], "sync_mode": not self.sync_mode, }) for _, var in enumerate(splited_vars): send_vars.append(var) if self.sync_mode: + send_barrier_out = program.global_block().create_var( + name=framework.generate_control_dev_var_name()) + input_deps = grad_name_to_send_dummy_out.values() program.global_block().append_op( type="send_barrier", - inputs={}, - outputs={}, + inputs={"X": input_deps}, + outputs={"Out": send_barrier_out}, attrs={ "endpoints": pserver_endpoints, RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE @@ -297,32 +312,46 @@ class DistributeTranspiler(object): self.param_grad_ep_mapping[ep]["grads"].append(send_vars[i]) # step4: Concat the parameters splits together after recv. + all_recv_outputs = [] for param_varname, splited_var in six.iteritems(self.param_var_mapping): eps = [] for var in splited_var: index = [v.name for v in recv_vars].index(var.name) eps.append(eplist[index]) - grad_send_dummy_out = grad_name_to_send_dummy_out[ - self.param_name_to_grad_name[param_varname]] + if self.sync_mode: + recv_dep_in = send_barrier_out + else: + # connect deps to send op in async mode + recv_dep_in = grad_name_to_send_dummy_out[ + self.param_name_to_grad_name[param_varname]] + all_recv_outputs.extend(splited_var) + # get recv op_role_var, if not splited, the grad should have .trainer suffix + # if splited, grad should be the original grad var name. ParallelExecutor + # will use op_role_var to get expected device place to run this op. + orig_grad_name = self.param_name_to_grad_name[param_varname] + recv_op_role_var_name = orig_grad_name + splited_trainer_grad = self.grad_var_mapping[orig_grad_name] + if len(splited_trainer_grad) == 1: + recv_op_role_var_name = splited_trainer_grad[0].name + program.global_block().append_op( type="recv", - inputs={"X": [grad_send_dummy_out]}, + inputs={"X": [recv_dep_in]}, outputs={"Out": splited_var}, attrs={ "epmap": eps, RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE, - OP_ROLE_VAR_ATTR_NAME: [ - param_varname, - self.param_name_to_grad_name[param_varname] - ], + OP_ROLE_VAR_ATTR_NAME: + [param_varname, recv_op_role_var_name], "sync_mode": not self.sync_mode }) if self.sync_mode: + # form a WAW dependency program.global_block().append_op( type="fetch_barrier", inputs={}, - outputs={}, + outputs={"Out": all_recv_outputs}, attrs={ "endpoints": pserver_endpoints, RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE @@ -359,21 +388,18 @@ class DistributeTranspiler(object): return self.origin_program - def _get_trainer_startup_program(self, - recv_vars, - eplist, - startup_program=None): + def _get_trainer_startup_program(self, recv_vars, eplist): """ Get transpiled trainer side startup program. Args: - startup_program(Program): Startup program. + recv_vars (list): Variable list to recv for current trainer_id + eplist (list): A list of strings indicating Returns: Program: trainer side startup program. """ - if startup_program is None: - startup_program = self.startup_program + startup_program = self.startup_program # FIXME(gongwb): delete not need ops. # note that: some parameter is not trainable and those ops can't be deleted. @@ -406,10 +432,12 @@ class DistributeTranspiler(object): RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE }) + fetch_barrier_out = startup_program.global_block().create_var( + name=framework.generate_control_dev_var_name()) startup_program.global_block().append_op( type="fetch_barrier", inputs={}, - outputs={}, + outputs={"Out": fetch_barrier_out}, attrs={ "endpoints": self.pserver_endpoints, RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE @@ -419,7 +447,18 @@ class DistributeTranspiler(object): #add concat ops to merge splited parameters received from parameter servers. if len(splited_var) <= 1: continue - orig_param = startup_program.global_block().vars[varname] + # NOTE: if enable memory optimization, origin vars maybe removed. + if startup_program.global_block().vars.has_key(varname): + orig_param = startup_program.global_block().vars[varname] + else: + origin_param_var = self.origin_program.global_block().vars[ + varname] + orig_param = startup_program.global_block().create_var( + name=varname, + persistable=origin_param_var.persistable, + type=origin_param_var.type, + dtype=origin_param_var.dtype, + shape=origin_param_var.shape) startup_program.global_block().append_op( type="concat", inputs={"X": splited_var}, @@ -442,7 +481,9 @@ class DistributeTranspiler(object): # NOTE: assume blocks of the same variable is not distributed # on the same pserver, only change param/grad varnames for # trainers to fetch. - + sys.stderr.write("get_pserver_program() is deprecated, call\ + get_pserver_programs() to get pserver main and startup\ + in a single call.") # step1 pserver_program = Program() pserver_program.random_seed = self.origin_program.random_seed @@ -626,32 +667,58 @@ class DistributeTranspiler(object): attrs=attrs) pserver_program._sync_with_cpp() + # save pserver program to generate pserver side startup relatively. + self.pserver_program = pserver_program return pserver_program + def get_pserver_programs(self, endpoint): + """ + Get pserver side main program and startup program for distributed training. + + Args: + endpoint (str): current pserver endpoint. + + Returns: + tuple: (main_program, startup_program), of type "Program" + """ + pserver_prog = self.get_pserver_program(endpoint) + pserver_startup = self.get_startup_program(endpoint) + return pserver_prog, pserver_startup + def get_startup_program(self, endpoint, - pserver_program, + pserver_program=None, startup_program=None): """ + **Deprecated** + Get startup program for current parameter server. Modify operator input variables if there are variables that were split to several blocks. Args: endpoint (str): current pserver endpoint. - pserver_program (Program): call get_pserver_program first and - pass the result here. - startup_program (Program): if pass None, will use - default_startup_program + pserver_program (Program): deprecated, call get_pserver_program first. + startup_program (Program): deprecated, should pass startup_program + when initalizing Returns: Program: parameter server side startup program. """ + sys.stderr.write("get_startup_program() is deprecated, call\ + get_pserver_programs() to get pserver main and startup\ + in a single call.") + if pserver_program != None: + sys.stderr.write("passing pserver_program to get_startup_program()\ + is deprecated, you can use new API get_pserver_programs() to\ + get both pserver main program and startup program.") + if startup_program != None: + sys.stderr.write("passing startup_program to get_startup_program()\ + is deprecated, use fluid.program_guard() or pass this argument\ + to transpile() call.") + s_prog = Program() - if not startup_program: - orig_s_prog = default_startup_program() - else: - orig_s_prog = startup_program + orig_s_prog = self.startup_program s_prog.random_seed = orig_s_prog.random_seed params = self.param_grad_ep_mapping[endpoint]["params"]