diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt
index eaef4eb16fdf505f2b5c5c464088eb65fac36154..e64b9c131eaa72828e8f9ead566c5ca26bd14d11 100755
--- a/python/paddle/fluid/tests/unittests/CMakeLists.txt
+++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt
@@ -43,7 +43,6 @@ list(APPEND MIXED_DIST_TEST_OPS test_fleet_launch_cloud)
 list(APPEND MIXED_DIST_TEST_OPS test_fleet_launch_ascend)
 list(APPEND MIXED_DIST_TEST_OPS test_ascend_group)
 list(APPEND MIXED_DIST_TEST_OPS test_fleet_launch_nproc)
-list(APPEND MIXED_DIST_TEST_OPS test_fleet_api_input)
 list(APPEND MIXED_DIST_TEST_OPS test_fleet_base)
 list(APPEND MIXED_DIST_TEST_OPS test_fleet_base_2)
@@ -602,7 +601,6 @@ if(WITH_DISTRIBUTE)
   add_subdirectory(collective)
   # FIXME(typhoonzero): add these tests back
-  list(REMOVE_ITEM DIST_TEST_OPS "test_dist_transformer")
   list(REMOVE_ITEM DIST_TEST_OPS "test_dist_transpiler")
   # TODO(sandyhouse): fix and add the ut back
@@ -615,7 +613,6 @@ if(WITH_DISTRIBUTE)
   list(REMOVE_ITEM DIST_TEST_OPS "test_dist_ctr")
   list(REMOVE_ITEM DIST_TEST_OPS "test_dist_mnist_lars")
   list(REMOVE_ITEM DIST_TEST_OPS "test_dist_mnist_train")
-  list(REMOVE_ITEM DIST_TEST_OPS "test_dist_save_load")
   list(REMOVE_ITEM DIST_TEST_OPS "test_dist_text_classification")
   list(REMOVE_ITEM DIST_TEST_OPS "test_dist_train")
   list(REMOVE_ITEM DIST_TEST_OPS "test_dist_word2vec")
diff --git a/python/paddle/fluid/tests/unittests/dist_save_load.py b/python/paddle/fluid/tests/unittests/dist_save_load.py
deleted file mode 100644
index faff1ea1e78cba479aa8d2d3ba99b28176e98b9c..0000000000000000000000000000000000000000
--- a/python/paddle/fluid/tests/unittests/dist_save_load.py
+++ /dev/null
@@ -1,222 +0,0 @@
-# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import os
-import pickle
-import sys
-
-import numpy as np
-from dist_simnet_bow import DATA_MD5, DATA_URL, TestDistSimnetBow2x2
-from test_dist_base import RUN_STEP, runtime_main
-
-import paddle
-import paddle.fluid as fluid
-from paddle.fluid import core
-
-
-class TestDistSaveLoad2x2(TestDistSimnetBow2x2):
-    def _load_persistable_vars(self, executor, dirname, program):
-        def _is_checkpoint_var(var):
-            """
-            the checkpoint will not save or load all the variables.
-            var type is FEED_MINIBATCH/FETCH_LIST/RAW or var name ends with @GRAD are discarded.
-
-            : param var(Variable)
-            """
-            if (
-                var.desc.type() == core.VarDesc.VarType.FEED_MINIBATCH
-                or var.desc.type() == core.VarDesc.VarType.FETCH_LIST
-                or var.desc.type() == core.VarDesc.VarType.RAW
-            ):
-                return False
-            # @GRAD are named for gradient variables, checkpoint will not save it.
-            if "@GRAD" in var.name:
-                return False
-            # .trainer_ are named for distribute train variables, checkpoint will not save it.
-            if ".trainer_" in var.name:
-                return False
-
-            # .block is named for distribute train variables, checkpoint will not save it.
- if ".block" in var.name: - return False - - if "tmp_" in var.name: - return False - - return var.persistable - - paddle.static.io.load_vars( - executor, - dirname=dirname, - main_program=program, - predicate=_is_checkpoint_var, - filename=None, - ) - - def run_pserver(self, args): - self.get_model(batch_size=2) - # NOTE: pserver should not call memory optimize - t = self.get_transpiler( - args.trainer_id, - fluid.default_main_program(), - args.endpoints, - args.trainers, - args.sync_mode, - False, - args.current_endpoint, - ) - pserver_prog = t.get_pserver_program(args.current_endpoint) - startup_prog = t.get_startup_program( - args.current_endpoint, pserver_prog - ) - - need_load = bool(int(os.getenv("LOAD", "0"))) - model_dir = os.getenv("MODEL_DIR", "") - - place = fluid.CPUPlace() - exe = fluid.Executor(place) - exe.run(startup_prog) - - if need_load and model_dir: - paddle.distributed.io.load_persistables( - exe, model_dir, pserver_prog - ) - - exe.run(pserver_prog) - - def run_trainer(self, args): - ( - test_program, - avg_cost, - train_reader, - test_reader, - batch_acc, - predict, - ) = self.get_model(batch_size=2) - - if args.update_method == "pserver": - t = self.get_transpiler( - args.trainer_id, - fluid.default_main_program(), - args.endpoints, - args.trainers, - args.sync_mode, - ) - - trainer_prog = t.get_trainer_program() - else: - trainer_prog = fluid.default_main_program() - - if args.use_cuda: - place = fluid.CUDAPlace(0) - else: - place = fluid.CPUPlace() - - startup_exe = fluid.Executor(place) - startup_exe.run(fluid.default_startup_program()) - - strategy = fluid.ExecutionStrategy() - strategy.num_threads = 1 - - build_stra = fluid.BuildStrategy() - - if args.use_reduce: - build_stra.reduce_strategy = ( - fluid.BuildStrategy.ReduceStrategy.Reduce - ) - else: - build_stra.reduce_strategy = ( - fluid.BuildStrategy.ReduceStrategy.AllReduce - ) - - exe = fluid.ParallelExecutor( - args.use_cuda, - loss_name=avg_cost.name, - exec_strategy=strategy, - build_strategy=build_stra, - ) - - feed_var_list = [ - var - for var in trainer_prog.global_block().vars.values() - if var.is_data - ] - - feeder = fluid.DataFeeder(feed_var_list, place) - reader_generator = train_reader() - - def get_data(): - origin_batch = next(reader_generator) - if args.update_method == "pserver" and args.use_reader_alloc: - new_batch = [] - for offset, item in enumerate(origin_batch): - if offset % 2 == args.trainer_id: - new_batch.append(item) - return new_batch - else: - return origin_batch - - need_save = bool(int(os.getenv("SAVE", "0"))) - model_dir = os.getenv("MODEL_DIR", "") - save_mode = os.getenv("SAVE_MODE", "") - - if save_mode == "LOCAL": - if need_save: - for _ in range(RUN_STEP): - (loss,) = exe.run( - fetch_list=[avg_cost.name], feed=feeder.feed(get_data()) - ) - if need_save and model_dir: - paddle.distributed.io.save_persistables( - startup_exe, model_dir, trainer_prog - ) - - var = np.array( - fluid.global_scope().find_var('__fc_b__').get_tensor() - ) - sys.stdout.buffer.write(pickle.dumps(np.ravel(var).tolist())) - - elif save_mode == "DIST": - skip_steps = int(os.getenv("SKIP_STEPS")) - loss = None - if need_save: - for idx in range(8): - (loss,) = exe.run( - fetch_list=[avg_cost.name], feed=feeder.feed(get_data()) - ) - if ( - need_save - and model_dir - and idx == skip_steps - and args.trainer_id == 0 - ): - paddle.distributed.io.save_persistables( - startup_exe, model_dir, trainer_prog - ) - else: - for idx in range(8): - data = get_data() - if idx <= skip_steps: - continue - 
(loss,) = exe.run( - fetch_list=[avg_cost.name], feed=feeder.feed(data) - ) - sys.stdout.buffer.write(pickle.dumps(loss.tolist())) - else: - raise Exception("save_mode must be LOCAL or DIST") - - -if __name__ == "__main__": - paddle.dataset.common.download(DATA_URL, 'simnet', DATA_MD5, "train") - runtime_main(TestDistSaveLoad2x2) diff --git a/python/paddle/fluid/tests/unittests/dist_transformer.py b/python/paddle/fluid/tests/unittests/dist_transformer.py deleted file mode 100644 index 52ffec74cc2b9800cb5d3b31335245202aaf5e51..0000000000000000000000000000000000000000 --- a/python/paddle/fluid/tests/unittests/dist_transformer.py +++ /dev/null @@ -1,2030 +0,0 @@ -# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import functools -import glob -import os -import random -import tarfile -import time -from functools import partial -from os.path import expanduser - -import numpy as np -from test_dist_base import RUN_STEP, TestDistRunnerBase, runtime_main - -import paddle -import paddle.fluid as fluid -import paddle.fluid.layers as layers -import paddle.nn.functional as F - -const_para_attr = fluid.ParamAttr( - initializer=paddle.nn.initializer.Constant(0.001) -) -const_bias_attr = const_para_attr - -# Fix seed for test -fluid.default_startup_program().random_seed = 1 -fluid.default_main_program().random_seed = 1 - - -# from transformer_config import ModelHyperParams, TrainTaskConfig, merge_cfg_from_list -class TrainTaskConfig: - # only support GPU currently - use_gpu = True - # the epoch number to train. - pass_num = 1 - # the number of sequences contained in a mini-batch. - # deprecated, set batch_size in args. - batch_size = 20 - # the hyper parameters for Adam optimizer. - # This static learning_rate will be multiplied to the LearningRateScheduler - # derived learning rate the to get the final learning rate. - learning_rate = 1 - beta1 = 0.9 - beta2 = 0.98 - eps = 1e-9 - # the parameters for learning rate scheduling. - warmup_steps = 4000 - # the weight used to mix up the ground-truth distribution and the fixed - # uniform distribution in label smoothing when training. - # Set this as zero if label smoothing is not wanted. - label_smooth_eps = 0.1 - # the directory for saving trained models. - model_dir = "trained_models" - # the directory for saving checkpoints. - ckpt_dir = "trained_ckpts" - # the directory for loading checkpoint. - # If provided, continue training from the checkpoint. - ckpt_path = None - # the parameter to initialize the learning rate scheduler. - # It should be provided if use checkpoints, since the checkpoint doesn't - # include the training step counter currently. 
- start_step = 0 - - check_acc = True - - data_path = expanduser("~") + ( - "/.cache/paddle/dataset/test_dist_transformer/" - ) - src_vocab_fpath = data_path + "vocab.bpe.32000" - trg_vocab_fpath = data_path + "vocab.bpe.32000" - train_file_pattern = data_path + "train.tok.clean.bpe.32000.en-de" - val_file_pattern = data_path + "newstest2013.tok.bpe.32000.en-de.cut" - pool_size = 2000 - sort_type = None - local = True - shuffle = False - shuffle_batch = False - special_token = ['', '', ''] - token_delimiter = ' ' - use_token_batch = False - - -class InferTaskConfig: - use_gpu = True - # the number of examples in one run for sequence generation. - batch_size = 10 - # the parameters for beam search. - beam_size = 5 - max_out_len = 256 - # the number of decoded sentences to output. - n_best = 1 - # the flags indicating whether to output the special tokens. - output_bos = False - output_eos = False - output_unk = True - # the directory for loading the trained model. - model_path = "trained_models/pass_1.infer.model" - - -class ModelHyperParams: - # These following five vocabularies related configurations will be set - # automatically according to the passed vocabulary path and special tokens. - # size of source word dictionary. - src_vocab_size = 10000 - # size of target word dictionay - trg_vocab_size = 10000 - # index for token - bos_idx = 0 - # index for token - eos_idx = 1 - # index for token - unk_idx = 2 - # max length of sequences deciding the size of position encoding table. - # Start from 1 and count start and end tokens in. - max_length = 256 - # the dimension for word embeddings, which is also the last dimension of - # the input and output of multi-head attention, position-wise feed-forward - # networks, encoder and decoder. - d_model = 512 - # size of the hidden layer in position-wise feed-forward networks. - d_inner_hid = 2048 - # the dimension that keys are projected to for dot-product attention. - d_key = 64 - # the dimension that values are projected to for dot-product attention. - d_value = 64 - # number of head used in multi-head attention. - n_head = 8 - # number of sub-layers to be stacked in the encoder and decoder. - n_layer = 6 - # dropout rate used by all dropout layers. - dropout = 0.0 # no random - # random seed used in dropout for CE. - dropout_seed = None - # the flag indicating whether to share embedding and softmax weights. - # vocabularies in source and target should be same for weight sharing. - weight_sharing = True - - -def merge_cfg_from_list(cfg_list, g_cfgs): - """ - Set the above global configurations using the cfg_list. - """ - assert len(cfg_list) % 2 == 0 - for key, value in zip(cfg_list[0::2], cfg_list[1::2]): - for g_cfg in g_cfgs: - if hasattr(g_cfg, key): - try: - value = eval(value) - except Exception: # for file path - pass - setattr(g_cfg, key, value) - break - - -# The placeholder for batch_size in compile time. Must be -1 currently to be -# consistent with some ops' infer-shape output in compile time, such as the -# sequence_expand op used in beamsearch decoder. -batch_size = -1 -# The placeholder for squence length in compile time. -seq_len = ModelHyperParams.max_length -# Here list the data shapes and data types of all inputs. -# The shapes here act as placeholder and are set to pass the infer-shape in -# compile time. 
-input_descs = { - # The actual data shape of src_word is: - # [batch_size * max_src_len_in_batch, 1] - "src_word": [(batch_size, seq_len, 1), "int64", 2], - # The actual data shape of src_pos is: - # [batch_size * max_src_len_in_batch, 1] - "src_pos": [(batch_size, seq_len, 1), "int64"], - # This input is used to remove attention weights on paddings in the - # encoder. - # The actual data shape of src_slf_attn_bias is: - # [batch_size, n_head, max_src_len_in_batch, max_src_len_in_batch] - "src_slf_attn_bias": [ - (batch_size, ModelHyperParams.n_head, seq_len, seq_len), - "float32", - ], - # The actual data shape of trg_word is: - # [batch_size * max_trg_len_in_batch, 1] - "trg_word": [ - (batch_size, seq_len, 1), - "int64", - 2, - ], # lod_level is only used in fast decoder. - # The actual data shape of trg_pos is: - # [batch_size * max_trg_len_in_batch, 1] - "trg_pos": [(batch_size, seq_len, 1), "int64"], - # This input is used to remove attention weights on paddings and - # subsequent words in the decoder. - # The actual data shape of trg_slf_attn_bias is: - # [batch_size, n_head, max_trg_len_in_batch, max_trg_len_in_batch] - "trg_slf_attn_bias": [ - (batch_size, ModelHyperParams.n_head, seq_len, seq_len), - "float32", - ], - # This input is used to remove attention weights on paddings of the source - # input in the encoder-decoder attention. - # The actual data shape of trg_src_attn_bias is: - # [batch_size, n_head, max_trg_len_in_batch, max_src_len_in_batch] - "trg_src_attn_bias": [ - (batch_size, ModelHyperParams.n_head, seq_len, seq_len), - "float32", - ], - # This input is used in independent decoder program for inference. - # The actual data shape of enc_output is: - # [batch_size, max_src_len_in_batch, d_model] - "enc_output": [(batch_size, seq_len, ModelHyperParams.d_model), "float32"], - # The actual data shape of label_word is: - # [batch_size * max_trg_len_in_batch, 1] - "lbl_word": [(batch_size * seq_len, 1), "int64"], - # This input is used to mask out the loss of padding tokens. - # The actual data shape of label_weight is: - # [batch_size * max_trg_len_in_batch, 1] - "lbl_weight": [(batch_size * seq_len, 1), "float32"], - # These inputs are used to change the shape tensor in beam-search decoder. - "trg_slf_attn_pre_softmax_shape_delta": [(2,), "int32"], - "trg_slf_attn_post_softmax_shape_delta": [(4,), "int32"], - "init_score": [(batch_size, 1), "float32"], -} - -# Names of word embedding table which might be reused for weight sharing. -word_emb_param_names = ( - "src_word_emb_table", - "trg_word_emb_table", -) -# Names of position encoding table which will be initialized externally. -pos_enc_param_names = ( - "src_pos_enc_table", - "trg_pos_enc_table", -) -# separated inputs for different usages. -encoder_data_input_fields = ( - "src_word", - "src_pos", - "src_slf_attn_bias", -) -decoder_data_input_fields = ( - "trg_word", - "trg_pos", - "trg_slf_attn_bias", - "trg_src_attn_bias", - "enc_output", -) -label_data_input_fields = ( - "lbl_word", - "lbl_weight", -) -# In fast decoder, trg_pos (only containing the current time step) is generated -# by ops and trg_slf_attn_bias is not needed. -fast_decoder_data_input_fields = ( - "trg_word", - "init_score", - "trg_src_attn_bias", -) - -# fast_decoder_util_input_fields = ( -# "trg_slf_attn_pre_softmax_shape_delta", -# "trg_slf_attn_post_softmax_shape_delta", ) - - -# from optim import LearningRateScheduler -class LearningRateScheduler: - """ - Wrapper for learning rate scheduling as described in the Transformer paper. 
- LearningRateScheduler adapts the learning rate externally and the adapted - learning rate will be fed into the main_program as input data. - """ - - def __init__( - self, - d_model, - warmup_steps, - learning_rate=0.001, - current_steps=0, - name="learning_rate", - ): - self.current_steps = current_steps - self.warmup_steps = warmup_steps - self.d_model = d_model - self.static_lr = learning_rate - self.learning_rate = layers.create_global_var( - name=name, - shape=[1], - value=float(learning_rate), - dtype="float32", - persistable=True, - ) - - def update_learning_rate(self): - self.current_steps += 1 - lr_value = ( - np.power(self.d_model, -0.5) - * np.min( - [ - np.power(self.current_steps, -0.5), - np.power(self.warmup_steps, -1.5) * self.current_steps, - ] - ) - * self.static_lr - ) - return np.array([lr_value], dtype="float32") - - -# from transformer_train import train_loop -def pad_batch_data( - insts, - pad_idx, - n_head, - is_target=False, - is_label=False, - return_attn_bias=True, - return_max_len=True, - return_num_token=False, -): - """ - Pad the instances to the max sequence length in batch, and generate the - corresponding position data and attention bias. - """ - return_list = [] - max_len = max(len(inst) for inst in insts) - num_token = ( - functools.reduce(lambda x, y: x + y, [len(inst) for inst in insts]) - if return_num_token - else 0 - ) - # Any token included in dict can be used to pad, since the paddings' loss - # will be masked out by weights and make no effect on parameter gradients. - inst_data = np.array( - [inst + [pad_idx] * (max_len - len(inst)) for inst in insts] - ) - return_list += [inst_data.astype("int64").reshape([-1, 1])] - if is_label: # label weight - inst_weight = np.array( - [ - [1.0] * len(inst) + [0.0] * (max_len - len(inst)) - for inst in insts - ] - ) - return_list += [inst_weight.astype("float32").reshape([-1, 1])] - else: # position data - inst_pos = np.array( - [ - list(range(1, len(inst) + 1)) + [0] * (max_len - len(inst)) - for inst in insts - ] - ) - return_list += [inst_pos.astype("int64").reshape([-1, 1])] - if return_attn_bias: - if is_target: - # This is used to avoid attention on paddings and subsequent - # words. - slf_attn_bias_data = np.ones((inst_data.shape[0], max_len, max_len)) - slf_attn_bias_data = np.triu(slf_attn_bias_data, 1).reshape( - [-1, 1, max_len, max_len] - ) - slf_attn_bias_data = np.tile( - slf_attn_bias_data, [1, n_head, 1, 1] - ) * [-1e9] - else: - # This is used to avoid attention on paddings. - slf_attn_bias_data = np.array( - [ - [0] * len(inst) + [-1e9] * (max_len - len(inst)) - for inst in insts - ] - ) - slf_attn_bias_data = np.tile( - slf_attn_bias_data.reshape([-1, 1, 1, max_len]), - [1, n_head, max_len, 1], - ) - return_list += [slf_attn_bias_data.astype("float32")] - if return_max_len: - return_list += [max_len] - if return_num_token: - return_list += [num_token] - return return_list if len(return_list) > 1 else return_list[0] - - -def prepare_batch_input( - insts, data_input_names, src_pad_idx, trg_pad_idx, n_head, d_model -): - """ - Put all padded data needed by training into a dict. 
- """ - src_word, src_pos, src_slf_attn_bias, src_max_len = pad_batch_data( - [inst[0] for inst in insts], src_pad_idx, n_head, is_target=False - ) - src_word = src_word.reshape(-1, src_max_len, 1) - src_pos = src_pos.reshape(-1, src_max_len, 1) - trg_word, trg_pos, trg_slf_attn_bias, trg_max_len = pad_batch_data( - [inst[1] for inst in insts], trg_pad_idx, n_head, is_target=True - ) - trg_word = trg_word.reshape(-1, trg_max_len, 1) - trg_pos = trg_pos.reshape(-1, trg_max_len, 1) - - trg_src_attn_bias = np.tile( - src_slf_attn_bias[:, :, ::src_max_len, :], [1, 1, trg_max_len, 1] - ).astype("float32") - - lbl_word, lbl_weight, num_token = pad_batch_data( - [inst[2] for inst in insts], - trg_pad_idx, - n_head, - is_target=False, - is_label=True, - return_attn_bias=False, - return_max_len=False, - return_num_token=True, - ) - - data_input_dict = dict( - list( - zip( - data_input_names, - [ - src_word, - src_pos, - src_slf_attn_bias, - trg_word, - trg_pos, - trg_slf_attn_bias, - trg_src_attn_bias, - lbl_word, - lbl_weight, - ], - ) - ) - ) - return data_input_dict, np.asarray([num_token], dtype="float32") - - -def read_multiple(reader, count, clip_last=True): - """ - Stack data from reader for multi-devices. - """ - - def __impl__(): - res = [] - for item in reader(): - res.append(item) - if len(res) == count: - yield res - res = [] - if len(res) == count: - yield res - elif not clip_last: - data = [] - for item in res: - data += item - if len(data) > count: - inst_num_per_part = len(data) // count - yield [ - data[inst_num_per_part * i : inst_num_per_part * (i + 1)] - for i in range(count) - ] - - return __impl__ - - -def split_data(data, num_part): - """ - Split data for each device. - """ - if len(data) == num_part: - return data - data = data[0] - inst_num_per_part = len(data) // num_part - return [ - data[inst_num_per_part * i : inst_num_per_part * (i + 1)] - for i in range(num_part) - ] - - -def test_context( - test_program, - avg_cost, - train_exe, - dev_count, - data_input_names, - sum_cost, - token_num, -): - val_data = DataReader( - src_vocab_fpath=TrainTaskConfig.src_vocab_fpath, - trg_vocab_fpath=TrainTaskConfig.trg_vocab_fpath, - fpattern=TrainTaskConfig.val_file_pattern, - token_delimiter=TrainTaskConfig.token_delimiter, - use_token_batch=TrainTaskConfig.use_token_batch, - batch_size=TrainTaskConfig.batch_size - * (1 if TrainTaskConfig.use_token_batch else dev_count), - pool_size=TrainTaskConfig.pool_size, - sort_type=TrainTaskConfig.sort_type, - start_mark=TrainTaskConfig.special_token[0], - end_mark=TrainTaskConfig.special_token[1], - unk_mark=TrainTaskConfig.special_token[2], - # count start and end tokens out - max_length=ModelHyperParams.max_length - 2, - clip_last_batch=False, - shuffle=False, - shuffle_batch=False, - ) - - build_strategy = fluid.BuildStrategy() - - strategy = fluid.ExecutionStrategy() - strategy.num_threads = 1 - - test_exe = fluid.ParallelExecutor( - use_cuda=TrainTaskConfig.use_gpu, - main_program=test_program, - share_vars_from=train_exe, - build_strategy=build_strategy, - exec_strategy=strategy, - ) - - def test(exe=test_exe): - test_total_cost = 0 - test_total_token = 0 - test_data = read_multiple( - reader=val_data.batch_generator, - count=dev_count if TrainTaskConfig.use_token_batch else 1, - ) - for batch_id, data in enumerate(test_data()): - feed_list = [] - for place_id, data_buffer in enumerate( - split_data(data, num_part=dev_count) - ): - data_input_dict, _ = prepare_batch_input( - data_buffer, - data_input_names, - ModelHyperParams.eos_idx, - 
ModelHyperParams.eos_idx, - ModelHyperParams.n_head, - ModelHyperParams.d_model, - ) - feed_list.append(data_input_dict) - - outs = exe.run( - feed=feed_list, fetch_list=[sum_cost.name, token_num.name] - ) - sum_cost_val, token_num_val = np.array(outs[0]), np.array(outs[1]) - test_total_cost += sum_cost_val.sum() - test_total_token += token_num_val.sum() - test_avg_cost = test_total_cost / test_total_token - test_ppl = np.exp([min(test_avg_cost, 100)]) - return test_avg_cost, test_ppl - - return test - - -def train_loop( - exe, - train_progm, - dev_count, - sum_cost, - avg_cost, - lr_scheduler, - token_num, - predict, - test_program, -): - # Initialize the parameters. - if TrainTaskConfig.ckpt_path: - lr_scheduler.current_steps = TrainTaskConfig.start_step - else: - exe.run(fluid.framework.default_startup_program()) - - train_data = DataReader( - src_vocab_fpath=TrainTaskConfig.src_vocab_fpath, - trg_vocab_fpath=TrainTaskConfig.trg_vocab_fpath, - fpattern=TrainTaskConfig.train_file_pattern, - token_delimiter=TrainTaskConfig.token_delimiter, - use_token_batch=TrainTaskConfig.use_token_batch, - batch_size=TrainTaskConfig.batch_size - * (1 if TrainTaskConfig.use_token_batch else dev_count), - pool_size=TrainTaskConfig.pool_size, - sort_type=TrainTaskConfig.sort_type, - shuffle=TrainTaskConfig.shuffle, - shuffle_batch=TrainTaskConfig.shuffle_batch, - start_mark=TrainTaskConfig.special_token[0], - end_mark=TrainTaskConfig.special_token[1], - unk_mark=TrainTaskConfig.special_token[2], - # count start and end tokens out - max_length=ModelHyperParams.max_length - 2, - clip_last_batch=False, - ) - train_data = read_multiple( - reader=train_data.batch_generator, - count=dev_count if TrainTaskConfig.use_token_batch else 1, - ) - - build_strategy = fluid.BuildStrategy() - # Since the token number differs among devices, customize gradient scale to - # use token average cost among multi-devices. and the gradient scale is - # `1 / token_number` for average cost. 
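Aside on the customized gradient scale the deleted train_loop sets up just below: because the number of real (non-padding) tokens differs across batches and devices, averaging the loss per device and then across devices would weight devices unevenly. The code instead fetches the summed cost and seeds the backward pass with sum_cost@GRAD = 1/token_number, which reproduces the true token-level average. A minimal NumPy sketch of that argument (loss values hypothetical):

    import numpy as np

    # Per-token losses on two devices with unequal token counts.
    dev0 = np.array([2.0, 0.5])           # 2 tokens on device 0
    dev1 = np.array([1.5, 1.0, 0.5])      # 3 tokens on device 1
    total_tokens = dev0.size + dev1.size

    # Mean-of-means over-weights the smaller device ...
    per_device_mean = (dev0.mean() + dev1.mean()) / 2.0       # 1.125

    # ... while scaling each device's summed cost by 1/total_tokens
    # recovers the global per-token average, which is what feeding
    # sum_cost@GRAD = 1/token_number achieves for the gradients.
    global_mean = (dev0.sum() + dev1.sum()) / total_tokens    # 1.1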
- build_strategy.gradient_scale_strategy = ( - fluid.BuildStrategy.GradientScaleStrategy.Customized - ) - - strategy = fluid.ExecutionStrategy() - strategy.num_threads = 1 - - train_exe = fluid.ParallelExecutor( - use_cuda=TrainTaskConfig.use_gpu, - loss_name=sum_cost.name, - main_program=train_progm, - build_strategy=build_strategy, - exec_strategy=strategy, - ) - - data_input_names = ( - encoder_data_input_fields - + decoder_data_input_fields[:-1] - + label_data_input_fields - ) - - if TrainTaskConfig.val_file_pattern is not None: - test = test_context( - test_program, - avg_cost, - train_exe, - dev_count, - data_input_names, - sum_cost, - token_num, - ) - - # the best cross-entropy value with label smoothing - loss_normalizer = -( - (1.0 - TrainTaskConfig.label_smooth_eps) - * np.log((1.0 - TrainTaskConfig.label_smooth_eps)) - + TrainTaskConfig.label_smooth_eps - * np.log( - TrainTaskConfig.label_smooth_eps - / (ModelHyperParams.trg_vocab_size - 1) - + 1e-20 - ) - ) - init = False - for pass_id in range(TrainTaskConfig.pass_num): - pass_start_time = time.time() - for batch_id, data in enumerate(train_data()): - if batch_id >= RUN_STEP: - break - - feed_list = [] - total_num_token = 0 - - if TrainTaskConfig.local: - lr_rate = lr_scheduler.update_learning_rate() - - for place_id, data_buffer in enumerate( - split_data(data, num_part=dev_count) - ): - data_input_dict, num_token = prepare_batch_input( - data_buffer, - data_input_names, - ModelHyperParams.eos_idx, - ModelHyperParams.eos_idx, - ModelHyperParams.n_head, - ModelHyperParams.d_model, - ) - total_num_token += num_token - feed_kv_pairs = list(data_input_dict.items()) - if TrainTaskConfig.local: - feed_kv_pairs += list( - {lr_scheduler.learning_rate.name: lr_rate}.items() - ) - feed_list.append(dict(feed_kv_pairs)) - - if not init: - for pos_enc_param_name in pos_enc_param_names: - pos_enc = position_encoding_init( - ModelHyperParams.max_length + 1, - ModelHyperParams.d_model, - ) - feed_list[place_id][pos_enc_param_name] = pos_enc - - if not TrainTaskConfig.check_acc: - for feed_dict in feed_list: - feed_dict[sum_cost.name + "@GRAD"] = 1.0 / total_num_token - else: - b = 100 * TrainTaskConfig.batch_size - a = np.asarray([b], dtype="float32") - for feed_dict in feed_list: - feed_dict[sum_cost.name + "@GRAD"] = 1.0 / a - - outs = train_exe.run( - fetch_list=[sum_cost.name, token_num.name], feed=feed_list - ) - - sum_cost_val, token_num_val = np.array(outs[0]), np.array(outs[1]) - total_sum_cost = sum_cost_val.sum() - total_token_num = token_num_val.sum() - total_avg_cost = total_sum_cost / total_token_num - - init = True - - # Validate and save the model for inference. 
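For reference, the loss_normalizer computed in train_loop above is the entropy of the label-smoothed target distribution — the comment calls it "the best cross-entropy value with label smoothing", i.e. the floor the loss cannot drop below even for a perfect model. A small worked instance (vocabulary size is a stand-in):

    import numpy as np

    eps = 0.1        # TrainTaskConfig.label_smooth_eps
    vocab = 10000    # stand-in for ModelHyperParams.trg_vocab_size

    # Smoothed target: (1 - eps) on the gold token, eps spread uniformly
    # over the remaining vocab - 1 tokens; its entropy bounds the loss.
    loss_normalizer = -(
        (1.0 - eps) * np.log(1.0 - eps)
        + eps * np.log(eps / (vocab - 1) + 1e-20)
    )
    print(round(loss_normalizer, 3))  # 1.246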
- if TrainTaskConfig.val_file_pattern is not None: - val_avg_cost, val_ppl = test() - print("[%f]" % val_avg_cost) - else: - assert False - - -# import transformer_reader as reader -class SortType: - GLOBAL = 'global' - POOL = 'pool' - NONE = "none" - - -class Converter: - def __init__(self, vocab, beg, end, unk, delimiter): - self._vocab = vocab - self._beg = beg - self._end = end - self._unk = unk - self._delimiter = delimiter - - def __call__(self, sentence): - return ( - [self._beg] - + [ - self._vocab.get(w, self._unk) - for w in sentence.split(self._delimiter) - ] - + [self._end] - ) - - -class ComposedConverter: - def __init__(self, converters): - self._converters = converters - - def __call__(self, parallel_sentence): - return [ - self._converters[i](parallel_sentence[i]) - for i in range(len(self._converters)) - ] - - -class SentenceBatchCreator: - def __init__(self, batch_size): - self.batch = [] - self._batch_size = batch_size - - def append(self, info): - self.batch.append(info) - if len(self.batch) == self._batch_size: - tmp = self.batch - self.batch = [] - return tmp - - -class TokenBatchCreator: - def __init__(self, batch_size): - self.batch = [] - self.max_len = -1 - self._batch_size = batch_size - - def append(self, info): - cur_len = info.max_len - max_len = max(self.max_len, cur_len) - if max_len * (len(self.batch) + 1) > self._batch_size: - result = self.batch - self.batch = [info] - self.max_len = cur_len - return result - else: - self.max_len = max_len - self.batch.append(info) - - -class SampleInfo: - def __init__(self, i, max_len, min_len): - self.i = i - self.min_len = min_len - self.max_len = max_len - - -class MinMaxFilter: - def __init__(self, max_len, min_len, underlying_creator): - self._min_len = min_len - self._max_len = max_len - self._creator = underlying_creator - - def append(self, info): - if info.max_len > self._max_len or info.min_len < self._min_len: - return - else: - return self._creator.append(info) - - @property - def batch(self): - return self._creator.batch - - -class DataReader: - """ - The data reader loads all data from files and produces batches of data - in the way corresponding to settings. - - An example of returning a generator producing data batches whose data - is shuffled in each pass and sorted in each pool: - - ``` - train_data = DataReader( - src_vocab_fpath='data/src_vocab_file', - trg_vocab_fpath='data/trg_vocab_file', - fpattern='data/part-*', - use_token_batch=True, - batch_size=2000, - pool_size=10000, - sort_type=SortType.POOL, - shuffle=True, - shuffle_batch=True, - start_mark='', - end_mark='', - unk_mark='', - clip_last_batch=False).batch_generator - ``` - - :param src_vocab_fpath: The path of vocabulary file of source language. - :type src_vocab_fpath: basestring - :param trg_vocab_fpath: The path of vocabulary file of target language. - :type trg_vocab_fpath: basestring - :param fpattern: The pattern to match data files. - :type fpattern: basestring - :param batch_size: The number of sequences contained in a mini-batch. - or the maximum number of tokens (include paddings) contained in a - mini-batch. - :type batch_size: int - :param pool_size: The size of pool buffer. - :type pool_size: int - :param sort_type: The grain to sort by length: 'global' for all - instances; 'pool' for instances in pool; 'none' for no sort. - :type sort_type: basestring - :param clip_last_batch: Whether to clip the last uncompleted batch. - :type clip_last_batch: bool - :param tar_fname: The data file in tar if fpattern matches a tar file. 
- :type tar_fname: basestring - :param min_length: The minimum length used to filt sequences. - :type min_length: int - :param max_length: The maximum length used to filt sequences. - :type max_length: int - :param shuffle: Whether to shuffle all instances. - :type shuffle: bool - :param shuffle_batch: Whether to shuffle the generated batches. - :type shuffle_batch: bool - :param use_token_batch: Whether to produce batch data according to - token number. - :type use_token_batch: bool - :param field_delimiter: The delimiter used to split source and target in - each line of data file. - :type field_delimiter: basestring - :param token_delimiter: The delimiter used to split tokens in source or - target sentences. - :type token_delimiter: basestring - :param start_mark: The token representing for the beginning of - sentences in dictionary. - :type start_mark: basestring - :param end_mark: The token representing for the end of sentences - in dictionary. - :type end_mark: basestring - :param unk_mark: The token representing for unknown word in dictionary. - :type unk_mark: basestring - :param seed: The seed for random. - :type seed: int - """ - - def __init__( - self, - src_vocab_fpath, - trg_vocab_fpath, - fpattern, - batch_size, - pool_size, - sort_type=SortType.GLOBAL, - clip_last_batch=True, - tar_fname=None, - min_length=0, - max_length=100, - shuffle=True, - shuffle_batch=False, - use_token_batch=False, - field_delimiter="\t", - token_delimiter=" ", - start_mark="", - end_mark="", - unk_mark="", - seed=0, - ): - self._src_vocab = self.load_dict(src_vocab_fpath) - self._only_src = True - if trg_vocab_fpath is not None: - self._trg_vocab = self.load_dict(trg_vocab_fpath) - self._only_src = False - self._pool_size = pool_size - self._batch_size = batch_size - self._use_token_batch = use_token_batch - self._sort_type = sort_type - self._clip_last_batch = clip_last_batch - self._shuffle = shuffle - self._shuffle_batch = shuffle_batch - self._min_length = min_length - self._max_length = max_length - self._field_delimiter = field_delimiter - self._token_delimiter = token_delimiter - self.load_src_trg_ids( - end_mark, fpattern, start_mark, tar_fname, unk_mark - ) - self._random = random.Random(x=seed) - - def load_src_trg_ids( - self, end_mark, fpattern, start_mark, tar_fname, unk_mark - ): - converters = [ - Converter( - vocab=self._src_vocab, - beg=self._src_vocab[start_mark], - end=self._src_vocab[end_mark], - unk=self._src_vocab[unk_mark], - delimiter=self._token_delimiter, - ) - ] - if not self._only_src: - converters.append( - Converter( - vocab=self._trg_vocab, - beg=self._trg_vocab[start_mark], - end=self._trg_vocab[end_mark], - unk=self._trg_vocab[unk_mark], - delimiter=self._token_delimiter, - ) - ) - - converters = ComposedConverter(converters) - - self._src_seq_ids = [] - self._trg_seq_ids = None if self._only_src else [] - self._sample_infos = [] - - for i, line in enumerate(self._load_lines(fpattern, tar_fname)): - src_trg_ids = converters(line) - self._src_seq_ids.append(src_trg_ids[0]) - lens = [len(src_trg_ids[0])] - if not self._only_src: - self._trg_seq_ids.append(src_trg_ids[1]) - lens.append(len(src_trg_ids[1])) - self._sample_infos.append(SampleInfo(i, max(lens), min(lens))) - - def _load_lines(self, fpattern, tar_fname): - fpaths = glob.glob(fpattern) - - if len(fpaths) == 1 and tarfile.is_tarfile(fpaths[0]): - if tar_fname is None: - raise Exception("If tar file provided, please set tar_fname.") - - f = tarfile.open(fpaths[0], "r") - for line in f.extractfile(tar_fname): - 
line = line.decode() - fields = line.strip("\n").split(self._field_delimiter) - if (not self._only_src and len(fields) == 2) or ( - self._only_src and len(fields) == 1 - ): - yield fields - else: - for fpath in fpaths: - if not os.path.isfile(fpath): - raise IOError("Invalid file: %s" % fpath) - - with open(fpath, "rb") as f: - for line in f: - line = line.decode() - fields = line.strip("\n").split(self._field_delimiter) - if (not self._only_src and len(fields) == 2) or ( - self._only_src and len(fields) == 1 - ): - yield fields - - @staticmethod - def load_dict(dict_path, reverse=False): - word_dict = {} - with open(dict_path, "rb") as fdict: - for idx, line in enumerate(fdict): - line = line.decode() - if reverse: - word_dict[idx] = line.strip("\n") - else: - word_dict[line.strip("\n")] = idx - return word_dict - - def batch_generator(self): - # global sort or global shuffle - if self._sort_type == SortType.GLOBAL: - infos = sorted( - self._sample_infos, key=lambda x: x.max_len, reverse=True - ) - else: - if self._shuffle: - infos = self._sample_infos - self._random.shuffle(infos) - else: - infos = self._sample_infos - - if self._sort_type == SortType.POOL: - for i in range(0, len(infos), self._pool_size): - infos[i : i + self._pool_size] = sorted( - infos[i : i + self._pool_size], key=lambda x: x.max_len - ) - - # concat batch - batches = [] - batch_creator = ( - TokenBatchCreator(self._batch_size) - if self._use_token_batch - else SentenceBatchCreator(self._batch_size) - ) - batch_creator = MinMaxFilter( - self._max_length, self._min_length, batch_creator - ) - - for info in infos: - batch = batch_creator.append(info) - if batch is not None: - batches.append(batch) - - if not self._clip_last_batch and len(batch_creator.batch) != 0: - batches.append(batch_creator.batch) - - if self._shuffle_batch: - self._random.shuffle(batches) - - for batch in batches: - batch_ids = [info.i for info in batch] - - if self._only_src: - yield [[self._src_seq_ids[idx]] for idx in batch_ids] - else: - yield [ - ( - self._src_seq_ids[idx], - self._trg_seq_ids[idx][:-1], - self._trg_seq_ids[idx][1:], - ) - for idx in batch_ids - ] - - -# from transformer_model import transformer -def position_encoding_init(n_position, d_pos_vec): - """ - Generate the initial values for the sinusoid position encoding table. - """ - position_enc = np.array( - [ - [ - pos / np.power(10000, 2 * (j // 2) / d_pos_vec) - for j in range(d_pos_vec) - ] - if pos != 0 - else np.zeros(d_pos_vec) - for pos in range(n_position) - ] - ) - position_enc[1:, 0::2] = np.sin(position_enc[1:, 0::2]) # dim 2i - position_enc[1:, 1::2] = np.cos(position_enc[1:, 1::2]) # dim 2i+1 - return position_enc.astype("float32") - - -def multi_head_attention( - queries, - keys, - values, - attn_bias, - d_key, - d_value, - d_model, - n_head=1, - dropout_rate=0.0, - cache=None, -): - """ - Multi-Head Attention. Note that attn_bias is added to the logit before - computing softmax activiation to mask certain selected positions so that - they will not considered in attention weights. - """ - if not (len(queries.shape) == len(keys.shape) == len(values.shape) == 3): - raise ValueError( - "Inputs: queries, keys and values should all be 3-D tensors." - ) - - def __compute_qkv(queries, keys, values, n_head, d_key, d_value): - """ - Add linear projection to queries, keys, and values. 
- """ - q = paddle.static.nn.fc( - x=queries, - size=d_key * n_head, - num_flatten_dims=2, - weight_attr=const_para_attr, - bias_attr=const_bias_attr, - ) - k = paddle.static.nn.fc( - x=keys, - size=d_key * n_head, - num_flatten_dims=2, - weight_attr=const_para_attr, - bias_attr=const_bias_attr, - ) - v = paddle.static.nn.fc( - x=values, - size=d_value * n_head, - num_flatten_dims=2, - weight_attr=const_para_attr, - bias_attr=const_bias_attr, - ) - return q, k, v - - def __split_heads(x, n_head): - """ - Reshape the last dimension of input tensor x so that it becomes two - dimensions and then transpose. Specifically, input a tensor with shape - [bs, max_sequence_length, n_head * hidden_dim] then output a tensor - with shape [bs, n_head, max_sequence_length, hidden_dim]. - """ - if n_head == 1: - return x - - hidden_size = x.shape[-1] - # The value 0 in shape attr means copying the corresponding dimension - # size of the input as the output dimension size. - reshaped = paddle.reshape( - x=x, shape=[0, 0, n_head, hidden_size // n_head] - ) - - # permute the dimensions into: - # [batch_size, n_head, max_sequence_len, hidden_size_per_head] - return paddle.transpose(x=reshaped, perm=[0, 2, 1, 3]) - - def __combine_heads(x): - """ - Transpose and then reshape the last two dimensions of input tensor x - so that it becomes one dimension, which is reverse to __split_heads. - """ - if len(x.shape) == 3: - return x - if len(x.shape) != 4: - raise ValueError("Input(x) should be a 4-D Tensor.") - - trans_x = paddle.transpose(x, perm=[0, 2, 1, 3]) - # The value 0 in shape attr means copying the corresponding dimension - # size of the input as the output dimension size. - return paddle.reshape( - x=trans_x, - shape=list(map(int, [0, 0, trans_x.shape[2] * trans_x.shape[3]])), - ) - - def scaled_dot_product_attention(q, k, v, attn_bias, d_model, dropout_rate): - """ - Scaled Dot-Product Attention - """ - scaled_q = paddle.scale(x=q, scale=d_model**-0.5) - product = layers.matmul(x=scaled_q, y=k, transpose_y=True) - if attn_bias: - product += attn_bias - weights = paddle.nn.functional.softmax(product) - if dropout_rate: - weights = layers.dropout( - weights, - dropout_prob=dropout_rate, - seed=ModelHyperParams.dropout_seed, - is_test=False, - ) - out = layers.matmul(weights, v) - return out - - q, k, v = __compute_qkv(queries, keys, values, n_head, d_key, d_value) - - if cache is not None: # use cache and concat time steps - k = cache["k"] = paddle.concat([cache["k"], k], axis=1) - v = cache["v"] = paddle.concat([cache["v"], v], axis=1) - - q = __split_heads(q, n_head) - k = __split_heads(k, n_head) - v = __split_heads(v, n_head) - - ctx_multiheads = scaled_dot_product_attention( - q, k, v, attn_bias, d_model, dropout_rate - ) - - out = __combine_heads(ctx_multiheads) - - # Project back to the model size. - proj_out = paddle.static.nn.fc( - x=out, - size=d_model, - num_flatten_dims=2, - weight_attr=const_para_attr, - bias_attr=const_bias_attr, - ) - return proj_out - - -def positionwise_feed_forward(x, d_inner_hid, d_hid): - """ - Position-wise Feed-Forward Networks. - This module consists of two linear transformations with a ReLU activation - in between, which is applied to each position separately and identically. 
- """ - hidden = paddle.static.nn.fc( - x=x, - size=d_inner_hid, - num_flatten_dims=2, - activation="relu", - weight_attr=const_para_attr, - bias_attr=const_bias_attr, - ) - out = paddle.static.nn.fc( - x=hidden, - size=d_hid, - num_flatten_dims=2, - weight_attr=const_para_attr, - bias_attr=const_bias_attr, - ) - return out - - -def pre_post_process_layer(prev_out, out, process_cmd, dropout_rate=0.0): - """ - Add residual connection, layer normalization and droput to the out tensor - optionally according to the value of process_cmd. - This will be used before or after multi-head attention and position-wise - feed-forward networks. - """ - for cmd in process_cmd: - if cmd == "a": # add residual connection - out = out + prev_out if prev_out else out - elif cmd == "n": # add layer normalization - out = layers.layer_norm( - out, - begin_norm_axis=len(out.shape) - 1, - param_attr=paddle.nn.initializer.Constant(1.0), - bias_attr=paddle.nn.initializer.Constant(0.0), - ) - elif cmd == "d": # add dropout - if dropout_rate: - out = layers.dropout( - out, - dropout_prob=dropout_rate, - seed=ModelHyperParams.dropout_seed, - is_test=False, - ) - return out - - -pre_process_layer = partial(pre_post_process_layer, None) -post_process_layer = pre_post_process_layer - - -def prepare_encoder( - src_word, - src_pos, - src_vocab_size, - src_emb_dim, - src_max_len, - dropout_rate=0.0, - word_emb_param_name=None, - pos_enc_param_name=None, -): - """Add word embeddings and position encodings. - The output tensor has a shape of: - [batch_size, max_src_length_in_batch, d_model]. - This module is used at the bottom of the encoder stacks. - """ - if TrainTaskConfig.check_acc: - src_word_emb = layers.embedding( - src_word, - size=[src_vocab_size, src_emb_dim], - param_attr=fluid.ParamAttr( - name=word_emb_param_name, - initializer=paddle.nn.initializer.Constant(0.001), - ), - ) - else: - src_word_emb = layers.embedding( - src_word, - size=[src_vocab_size, src_emb_dim], - param_attr=fluid.ParamAttr( - name=word_emb_param_name, - initializer=paddle.nn.initializer.Normal( - 0.0, src_emb_dim**-0.5 - ), - ), - ) - - src_word_emb = paddle.scale(x=src_word_emb, scale=src_emb_dim**0.5) - src_pos_enc = layers.embedding( - src_pos, - size=[src_max_len, src_emb_dim], - param_attr=fluid.ParamAttr( - name=pos_enc_param_name, - trainable=False, - initializer=paddle.nn.initializer.Constant(0.001), - ), - ) - src_pos_enc.stop_gradient = True - enc_input = src_word_emb + src_pos_enc - return ( - layers.dropout( - enc_input, - dropout_prob=dropout_rate, - seed=ModelHyperParams.dropout_seed, - is_test=False, - ) - if dropout_rate - else enc_input - ) - - -prepare_encoder = partial( - prepare_encoder, pos_enc_param_name=pos_enc_param_names[0] -) -prepare_decoder = partial( - prepare_encoder, pos_enc_param_name=pos_enc_param_names[1] -) - - -def encoder_layer( - enc_input, - attn_bias, - n_head, - d_key, - d_value, - d_model, - d_inner_hid, - dropout_rate=0.0, -): - """The encoder layers that can be stacked to form a deep encoder. - This module consits of a multi-head (self) attention followed by - position-wise feed-forward networks and both the two components companied - with the post_process_layer to add residual connection, layer normalization - and droput. 
- """ - attn_output = multi_head_attention( - enc_input, - enc_input, - enc_input, - attn_bias, - d_key, - d_value, - d_model, - n_head, - dropout_rate, - ) - attn_output = post_process_layer( - enc_input, attn_output, "dan", dropout_rate - ) - ffd_output = positionwise_feed_forward(attn_output, d_inner_hid, d_model) - return post_process_layer(attn_output, ffd_output, "dan", dropout_rate) - - -def encoder( - enc_input, - attn_bias, - n_layer, - n_head, - d_key, - d_value, - d_model, - d_inner_hid, - dropout_rate=0.0, -): - """ - The encoder is composed of a stack of identical layers returned by calling - encoder_layer. - """ - for i in range(n_layer): - enc_output = encoder_layer( - enc_input, - attn_bias, - n_head, - d_key, - d_value, - d_model, - d_inner_hid, - dropout_rate, - ) - enc_input = enc_output - return enc_output - - -def decoder_layer( - dec_input, - enc_output, - slf_attn_bias, - dec_enc_attn_bias, - n_head, - d_key, - d_value, - d_model, - d_inner_hid, - dropout_rate=0.0, - cache=None, -): - """The layer to be stacked in decoder part. - The structure of this module is similar to that in the encoder part except - a multi-head attention is added to implement encoder-decoder attention. - """ - slf_attn_output = multi_head_attention( - dec_input, - dec_input, - dec_input, - slf_attn_bias, - d_key, - d_value, - d_model, - n_head, - dropout_rate, - cache, - ) - slf_attn_output = post_process_layer( - dec_input, - slf_attn_output, - "dan", # residual connection + dropout + layer normalization - dropout_rate, - ) - enc_attn_output = multi_head_attention( - slf_attn_output, - enc_output, - enc_output, - dec_enc_attn_bias, - d_key, - d_value, - d_model, - n_head, - dropout_rate, - ) - enc_attn_output = post_process_layer( - slf_attn_output, - enc_attn_output, - "dan", # residual connection + dropout + layer normalization - dropout_rate, - ) - ffd_output = positionwise_feed_forward( - enc_attn_output, - d_inner_hid, - d_model, - ) - dec_output = post_process_layer( - enc_attn_output, - ffd_output, - "dan", # residual connection + dropout + layer normalization - dropout_rate, - ) - return dec_output - - -def decoder( - dec_input, - enc_output, - dec_slf_attn_bias, - dec_enc_attn_bias, - n_layer, - n_head, - d_key, - d_value, - d_model, - d_inner_hid, - dropout_rate=0.0, - caches=None, -): - """ - The decoder is composed of a stack of identical decoder_layer layers. - """ - for i in range(n_layer): - cache = None - if caches is not None: - cache = caches[i] - - dec_output = decoder_layer( - dec_input, - enc_output, - dec_slf_attn_bias, - dec_enc_attn_bias, - n_head, - d_key, - d_value, - d_model, - d_inner_hid, - dropout_rate, - cache=cache, - ) - dec_input = dec_output - return dec_output - - -def make_all_inputs(input_fields): - """ - Define the input data layers for the transformer model. - """ - inputs = [] - for input_field in input_fields: - input_var = paddle.static.data( - name=input_field, - shape=input_descs[input_field][0], - dtype=input_descs[input_field][1], - lod_level=input_descs[input_field][2] - if len(input_descs[input_field]) == 3 - else 0, - ) - inputs.append(input_var) - return inputs - - -def transformer( - src_vocab_size, - trg_vocab_size, - max_length, - n_layer, - n_head, - d_key, - d_value, - d_model, - d_inner_hid, - dropout_rate, - weight_sharing, - label_smooth_eps, -): - if weight_sharing: - assert ( - src_vocab_size == src_vocab_size - ), "Vocabularies in source and target should be same for weight sharing." 
- enc_inputs = make_all_inputs(encoder_data_input_fields) - - enc_output = wrap_encoder( - src_vocab_size, - max_length, - n_layer, - n_head, - d_key, - d_value, - d_model, - d_inner_hid, - dropout_rate, - weight_sharing, - enc_inputs, - ) - - dec_inputs = make_all_inputs(decoder_data_input_fields[:-1]) - - predict = wrap_decoder( - trg_vocab_size, - max_length, - n_layer, - n_head, - d_key, - d_value, - d_model, - d_inner_hid, - dropout_rate, - weight_sharing, - dec_inputs, - enc_output, - ) - - # Padding index do not contribute to the total loss. The weights is used to - # cancel padding index in calculating the loss. - label, weights = make_all_inputs(label_data_input_fields) - if label_smooth_eps: - label = F.label_smooth( - label=layers.one_hot(input=label, depth=trg_vocab_size), - epsilon=label_smooth_eps, - ) - - cost = paddle.nn.functional.softmax_with_cross_entropy( - logits=paddle.reshape(predict, shape=[-1, trg_vocab_size]), - label=label, - soft_label=True if label_smooth_eps else False, - ) - weighted_cost = cost * weights - sum_cost = paddle.sum(weighted_cost) - token_num = paddle.sum(weights) - avg_cost = sum_cost / token_num - avg_cost.stop_gradient = True - return sum_cost, avg_cost, predict, token_num - - -def wrap_encoder( - src_vocab_size, - max_length, - n_layer, - n_head, - d_key, - d_value, - d_model, - d_inner_hid, - dropout_rate, - weight_sharing, - enc_inputs=None, -): - """ - The wrapper assembles together all needed layers for the encoder. - """ - if enc_inputs is None: - # This is used to implement independent encoder program in inference. - src_word, src_pos, src_slf_attn_bias = make_all_inputs( - encoder_data_input_fields - ) - else: - src_word, src_pos, src_slf_attn_bias = enc_inputs - enc_input = prepare_encoder( - src_word, - src_pos, - src_vocab_size, - d_model, - max_length, - dropout_rate, - word_emb_param_name=word_emb_param_names[0], - ) - enc_output = encoder( - enc_input, - src_slf_attn_bias, - n_layer, - n_head, - d_key, - d_value, - d_model, - d_inner_hid, - dropout_rate, - ) - return enc_output - - -def wrap_decoder( - trg_vocab_size, - max_length, - n_layer, - n_head, - d_key, - d_value, - d_model, - d_inner_hid, - dropout_rate, - weight_sharing, - dec_inputs=None, - enc_output=None, - caches=None, -): - """ - The wrapper assembles together all needed layers for the decoder. - """ - if dec_inputs is None: - # This is used to implement independent decoder program in inference. - ( - trg_word, - trg_pos, - trg_slf_attn_bias, - trg_src_attn_bias, - enc_output, - ) = make_all_inputs(decoder_data_input_fields) - else: - trg_word, trg_pos, trg_slf_attn_bias, trg_src_attn_bias = dec_inputs - - dec_input = prepare_decoder( - trg_word, - trg_pos, - trg_vocab_size, - d_model, - max_length, - dropout_rate, - word_emb_param_name=word_emb_param_names[0] - if weight_sharing - else word_emb_param_names[1], - ) - dec_output = decoder( - dec_input, - enc_output, - trg_slf_attn_bias, - trg_src_attn_bias, - n_layer, - n_head, - d_key, - d_value, - d_model, - d_inner_hid, - dropout_rate, - caches=caches, - ) - # Return logits for training and probs for inference. 
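The weight_sharing branch below is the tied output projection: instead of a separate FC layer, the decoder output is multiplied by the transposed word-embedding table, so one matrix both embeds input tokens and produces output logits. A NumPy sketch with toy shapes (all sizes hypothetical):

    import numpy as np

    d_model, vocab = 4, 7
    emb_table = np.random.rand(vocab, d_model)   # shared src/trg embedding
    dec_output = np.random.rand(2, 3, d_model)   # [batch, trg_len, d_model]

    # Weight sharing: logits = dec_output @ emb_table.T, the matmul with
    # transpose_y=True in the deleted code.
    logits = dec_output @ emb_table.T            # [batch, trg_len, vocab]
    assert logits.shape == (2, 3, vocab)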
- if weight_sharing: - predict = layers.matmul( - x=dec_output, - y=fluid.framework._get_var(word_emb_param_names[0]), - transpose_y=True, - ) - else: - predict = paddle.static.nn.fc( - x=dec_output, - size=trg_vocab_size, - num_flatten_dims=2, - weight_attr=const_para_attr, - bias_attr=const_bias_attr, - ) - if dec_inputs is None: - predict = paddle.nn.functional.softmax(predict) - return predict - - -def fast_decode( - src_vocab_size, - trg_vocab_size, - max_in_len, - n_layer, - n_head, - d_key, - d_value, - d_model, - d_inner_hid, - dropout_rate, - weight_sharing, - beam_size, - max_out_len, - eos_idx, -): - """ - Use beam search to decode. Caches will be used to store states of history - steps which can make the decoding faster. - """ - enc_output = wrap_encoder( - src_vocab_size, - max_in_len, - n_layer, - n_head, - d_key, - d_value, - d_model, - d_inner_hid, - dropout_rate, - weight_sharing, - ) - start_tokens, init_scores, trg_src_attn_bias = make_all_inputs( - fast_decoder_data_input_fields - ) - - def beam_search(): - max_len = layers.fill_constant( - shape=[1], dtype=start_tokens.dtype, value=max_out_len - ) - step_idx = layers.fill_constant( - shape=[1], dtype=start_tokens.dtype, value=0 - ) - cond = paddle.less_than(x=step_idx, y=max_len) - while_op = layers.While(cond) - # array states will be stored for each step. - ids = layers.array_write( - paddle.reshape(start_tokens, (-1, 1)), step_idx - ) - scores = layers.array_write(init_scores, step_idx) - # cell states will be overwrited at each step. - # caches contains states of history steps to reduce redundant - # computation in decoder. - caches = [ - { - "k": layers.fill_constant_batch_size_like( - input=start_tokens, - shape=[-1, 0, d_model], - dtype=enc_output.dtype, - value=0, - ), - "v": layers.fill_constant_batch_size_like( - input=start_tokens, - shape=[-1, 0, d_model], - dtype=enc_output.dtype, - value=0, - ), - } - for i in range(n_layer) - ] - with while_op.block(): - pre_ids = layers.array_read(array=ids, i=step_idx) - pre_ids = paddle.reshape(pre_ids, (-1, 1, 1)) - pre_scores = layers.array_read(array=scores, i=step_idx) - # sequence_expand can gather sequences according to lod thus can be - # used in beam search to sift states corresponding to selected ids. - pre_src_attn_bias = layers.sequence_expand( - x=trg_src_attn_bias, y=pre_scores - ) - pre_enc_output = layers.sequence_expand(x=enc_output, y=pre_scores) - pre_caches = [ - { - "k": layers.sequence_expand(x=cache["k"], y=pre_scores), - "v": layers.sequence_expand(x=cache["v"], y=pre_scores), - } - for cache in caches - ] - pre_pos = layers.elementwise_mul( - x=layers.fill_constant_batch_size_like( - input=pre_enc_output, # can't use pre_ids here since it has lod - value=1, - shape=[-1, 1, 1], - dtype=pre_ids.dtype, - ), - y=layers.increment(x=step_idx, value=1.0, in_place=False), - axis=0, - ) - logits = wrap_decoder( - trg_vocab_size, - max_in_len, - n_layer, - n_head, - d_key, - d_value, - d_model, - d_inner_hid, - dropout_rate, - weight_sharing, - dec_inputs=(pre_ids, pre_pos, None, pre_src_attn_bias), - enc_output=pre_enc_output, - caches=pre_caches, - ) - logits = paddle.reshape(logits, (-1, trg_vocab_size)) - topk_scores, topk_indices = paddle.topk( - x=paddle.nn.functional.softmax(logits), k=beam_size - ) - accu_scores = layers.elementwise_add( - x=paddle.log(topk_scores), - y=paddle.reshape(pre_scores, shape=[-1]), - axis=0, - ) - # beam_search op uses lod to distinguish branches. 
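The accu_scores computed above add each live prefix's accumulated log-probability to the log-probabilities of its top-k candidate tokens — the standard beam-search scoring rule that the beam_search op then prunes to beam_size survivors. In plain NumPy (the probabilities are stand-ins for the top-k softmax outputs):

    import numpy as np

    beam_size = 5
    pre_scores = np.array([-1.2, -2.3])   # log-prob of two live prefixes
    topk_probs = np.array([[0.4, 0.3, 0.15, 0.1, 0.05],
                           [0.5, 0.2, 0.15, 0.1, 0.05]])

    # score(prefix + token) = score(prefix) + log p(token | prefix)
    accu_scores = np.log(topk_probs) + pre_scores[:, None]
    assert accu_scores.shape == (2, beam_size)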
- topk_indices = layers.lod_reset(topk_indices, pre_ids) - selected_ids, selected_scores = layers.beam_search( - pre_ids=pre_ids, - pre_scores=pre_scores, - ids=topk_indices, - scores=accu_scores, - beam_size=beam_size, - end_id=eos_idx, - ) - - layers.increment(x=step_idx, value=1.0, in_place=True) - # update states - layers.array_write(selected_ids, i=step_idx, array=ids) - layers.array_write(selected_scores, i=step_idx, array=scores) - paddle.assign(pre_src_attn_bias, trg_src_attn_bias) - paddle.assign(pre_enc_output, enc_output) - for i in range(n_layer): - paddle.assign(pre_caches[i]["k"], caches[i]["k"]) - paddle.assign(pre_caches[i]["v"], caches[i]["v"]) - length_cond = paddle.less_than(x=step_idx, y=max_len) - finish_cond = paddle.logical_not(layers.is_empty(x=selected_ids)) - paddle.logical_and(x=length_cond, y=finish_cond, out=cond) - - finished_ids, finished_scores = layers.beam_search_decode( - ids, scores, beam_size=beam_size, end_id=eos_idx - ) - return finished_ids, finished_scores - - finished_ids, finished_scores = beam_search() - return finished_ids, finished_scores - - -def get_model(is_dist, is_async): - sum_cost, avg_cost, predict, token_num = transformer( - ModelHyperParams.src_vocab_size, - ModelHyperParams.trg_vocab_size, - ModelHyperParams.max_length + 1, - ModelHyperParams.n_layer, - ModelHyperParams.n_head, - ModelHyperParams.d_key, - ModelHyperParams.d_value, - ModelHyperParams.d_model, - ModelHyperParams.d_inner_hid, - ModelHyperParams.dropout, - ModelHyperParams.weight_sharing, - TrainTaskConfig.label_smooth_eps, - ) - - local_lr_scheduler = LearningRateScheduler( - ModelHyperParams.d_model, - TrainTaskConfig.warmup_steps, - TrainTaskConfig.learning_rate, - ) - # Context to do validation. - test_program = fluid.default_main_program().clone(for_test=True) - - if not is_dist: - optimizer = fluid.optimizer.Adam( - learning_rate=local_lr_scheduler.learning_rate, - beta1=TrainTaskConfig.beta1, - beta2=TrainTaskConfig.beta2, - epsilon=TrainTaskConfig.eps, - ) - optimizer.minimize(sum_cost) - elif is_async: - optimizer = fluid.optimizer.SGD(0.003) - optimizer.minimize(sum_cost) - else: - lr_decay = fluid.layers.learning_rate_scheduler.noam_decay( - ModelHyperParams.d_model, TrainTaskConfig.warmup_steps - ) - - optimizer = fluid.optimizer.Adam( - learning_rate=lr_decay, - beta1=TrainTaskConfig.beta1, - beta2=TrainTaskConfig.beta2, - epsilon=TrainTaskConfig.eps, - ) - optimizer.minimize(sum_cost) - - return ( - sum_cost, - avg_cost, - predict, - token_num, - local_lr_scheduler, - test_program, - ) - - -def update_args(): - src_dict = DataReader.load_dict(TrainTaskConfig.src_vocab_fpath) - trg_dict = DataReader.load_dict(TrainTaskConfig.trg_vocab_fpath) - dict_args = [ - "src_vocab_size", - str(len(src_dict)), - "trg_vocab_size", - str(len(trg_dict)), - "bos_idx", - str(src_dict[TrainTaskConfig.special_token[0]]), - "eos_idx", - str(src_dict[TrainTaskConfig.special_token[1]]), - "unk_idx", - str(src_dict[TrainTaskConfig.special_token[2]]), - ] - merge_cfg_from_list(dict_args, [TrainTaskConfig, ModelHyperParams]) - - -class DistTransformer2x2(TestDistRunnerBase): - def run_pserver(self, args): - get_model(True, not args.sync_mode) - t = self.get_transpiler( - args.trainer_id, - fluid.default_main_program(), - args.endpoints, - args.trainers, - args.sync_mode, - ) - pserver_prog = t.get_pserver_program(args.current_endpoint) - startup_prog = t.get_startup_program( - args.current_endpoint, pserver_prog - ) - - place = fluid.CPUPlace() - exe = fluid.Executor(place) - 
-        exe.run(startup_prog)
-        exe.run(pserver_prog)
-
-    def run_trainer(self, args):
-        TrainTaskConfig.use_gpu = args.use_cuda
-        (
-            sum_cost,
-            avg_cost,
-            predict,
-            token_num,
-            local_lr_scheduler,
-            test_program,
-        ) = get_model(args.is_dist, not args.sync_mode)
-
-        if args.is_dist:
-            t = self.get_transpiler(
-                args.trainer_id,
-                fluid.default_main_program(),
-                args.endpoints,
-                args.trainers,
-                args.sync_mode,
-            )
-            trainer_prog = t.get_trainer_program()
-            TrainTaskConfig.batch_size = 10
-            TrainTaskConfig.train_file_pattern = (
-                TrainTaskConfig.data_path
-                + "train.tok.clean.bpe.32000.en-de.train_{}".format(
-                    args.trainer_id
-                )
-            )
-        else:
-            TrainTaskConfig.batch_size = 20
-            trainer_prog = fluid.default_main_program()
-
-        if args.use_cuda:
-            place = fluid.CUDAPlace(0)
-        else:
-            place = fluid.CPUPlace()
-
-        startup_exe = fluid.Executor(place)
-
-        TrainTaskConfig.local = not args.is_dist
-
-        train_loop(
-            startup_exe,
-            trainer_prog,
-            1,
-            sum_cost,
-            avg_cost,
-            local_lr_scheduler,
-            token_num,
-            predict,
-            test_program,
-        )
-
-
-if __name__ == "__main__":
-    update_args()
-    runtime_main(DistTransformer2x2)
diff --git a/python/paddle/fluid/tests/unittests/test_dist_save_load.py b/python/paddle/fluid/tests/unittests/test_dist_save_load.py
deleted file mode 100644
index 9eeaf376a5e0cf2aa94460baf7518e437e0a5479..0000000000000000000000000000000000000000
--- a/python/paddle/fluid/tests/unittests/test_dist_save_load.py
+++ /dev/null
@@ -1,173 +0,0 @@
-# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import os
-import shutil
-import tempfile
-import unittest
-
-import numpy as np
-from test_dist_base import TestDistBase
-
-flag_name = os.path.splitext(__file__)[0]
-
-
-class TestDistSaveLoadDense2x2(TestDistBase):
-    def _setup_config(self):
-        self._sync_mode = True
-        self._enforce_place = "CPU"
-
-    def check_with_place(
-        self,
-        model_file,
-        delta=1e-3,
-        check_error_log=False,
-        need_envs={},
-        log_name="",
-    ):
-        required_envs = {
-            "PATH": os.getenv("PATH", ""),
-            "PYTHONPATH": os.getenv("PYTHONPATH", ""),
-            "LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""),
-            "http_proxy": "",
-        }
-
-        required_envs.update(need_envs)
-
-        if check_error_log:
-            required_envs[
-                "GLOG_vmodule"
-            ] = "fused_all_reduce_op_handle=10,all_reduce_op_handle=10,alloc_continuous_space_op=10,fuse_all_reduce_op_pass=10,alloc_continuous_space_for_grad_pass=10,fast_threaded_ssa_graph_executor=10"
-            required_envs["GLOG_logtostderr"] = "1"
-
-        model_dir = tempfile.mkdtemp()
-
-        local_env = {}
-        local_env["SAVE"] = "1"
-        local_env["MODEL_DIR"] = model_dir
-        local_env.update(required_envs)
-
-        cluster_env = {}
-        cluster_env["LOAD"] = "1"
-        cluster_env["MODEL_DIR"] = model_dir
-        cluster_env.update(required_envs)
-
-        local_var = self._run_local(model_file, local_env, check_error_log)
-        tr0_var, tr1_var = self._run_cluster(
-            model_file, cluster_env, check_error_log, log_name=flag_name
-        )
-
-        shutil.rmtree(model_dir)
-
-        local_np = np.array(local_var)
-        train0_np = np.array(tr0_var)
-        train1_np = np.array(tr1_var)
-
-        np.testing.assert_almost_equal(local_np, train0_np, decimal=2)
-        np.testing.assert_almost_equal(local_np, train1_np, decimal=2)
-        np.testing.assert_almost_equal(train0_np, train1_np, decimal=2)
-
-    def test_dist(self):
-        need_envs = {
-            "IS_DISTRIBUTED": '0',
-            "IS_SPARSE": '0',
-            'IS_SELF_CONTAINED_LR': '1',
-            'SAVE_MODE': 'LOCAL',
-        }
-        self.check_with_place(
-            "dist_save_load.py",
-            delta=0,
-            check_error_log=False,
-            need_envs=need_envs,
-        )
-
-
-class TestDistSaveLoadWithPServerStateDense2x2(TestDistBase):
-    def _setup_config(self):
-        self._sync_mode = True
-        self._enforce_place = "CPU"
-
-    def check_with_place(
-        self,
-        model_file,
-        delta=1e-3,
-        check_error_log=False,
-        need_envs={},
-        log_name="",
-    ):
-        required_envs = {
-            "PATH": os.getenv("PATH", ""),
-            "PYTHONPATH": os.getenv("PYTHONPATH", ""),
-            "LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""),
-            "http_proxy": "",
-        }
-
-        required_envs.update(need_envs)
-
-        if check_error_log:
-            required_envs[
-                "GLOG_vmodule"
-            ] = "fused_all_reduce_op_handle=10,all_reduce_op_handle=10,alloc_continuous_space_op=10,fuse_all_reduce_op_pass=10,alloc_continuous_space_for_grad_pass=10,fast_threaded_ssa_graph_executor=10"
-            required_envs["GLOG_logtostderr"] = "1"
-
-        model_dir = tempfile.mkdtemp()
-
-        save_env = {}
-        save_env["SAVE_MODE"] = "DIST"
-        save_env["SAVE"] = "1"
-        save_env["MODEL_DIR"] = model_dir
-        save_env.update(required_envs)
-
-        tr0_var_1, tr1_var_1 = self._run_cluster(
-            model_file, save_env, check_error_log, log_name=flag_name
-        )
-
-        load_env = {}
-        load_env["LOAD"] = "1"
-        load_env["MODEL_DIR"] = model_dir
-        load_env.update(required_envs)
-        tr0_var_2, tr1_var_2 = self._run_cluster(
-            model_file, load_env, check_error_log, log_name=flag_name
-        )
-
-        shutil.rmtree(model_dir)
-
-        train0_1_np = np.array(tr0_var_1)
-        train1_1_np = np.array(tr1_var_1)
-        train0_2_np = np.array(tr0_var_2)
-        train1_2_np = np.array(tr1_var_2)
-
-        np.testing.assert_almost_equal(train0_1_np, train0_2_np, decimal=2)
-        np.testing.assert_almost_equal(train1_1_np, train1_2_np, decimal=2)
-
-    def test_dist(self):
-        need_envs = {
-            "IS_DISTRIBUTED": '0',
-            "IS_SPARSE": '0',
-            'IS_SELF_CONTAINED_LR': '1',
-            'SAVE_MODE': 'DIST',
-            'OPTIMIZER': 'ADAM',
-            'SKIP_STEPS': str(np.random.randint(2, 6)),
-        }
-        self.check_with_place(
-            "dist_save_load.py",
-            delta=0,
-            check_error_log=True,
-            need_envs=need_envs,
-            log_name=flag_name,
-        )
-
-
-if __name__ == "__main__":
-    unittest.main()
diff --git a/python/paddle/fluid/tests/unittests/test_dist_transformer.py b/python/paddle/fluid/tests/unittests/test_dist_transformer.py
deleted file mode 100644
index 6ccebe309df209de2badbe5c37396cc40fee08c1..0000000000000000000000000000000000000000
--- a/python/paddle/fluid/tests/unittests/test_dist_transformer.py
+++ /dev/null
@@ -1,89 +0,0 @@
-# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import os
-import unittest
-
-from test_dist_base import TestDistBase
-
-import paddle
-
-
-def download_files():
-    url_prefix = 'http://paddle-unittest-data.bj.bcebos.com/dist_transformer/'
-    vocab_url = url_prefix + 'vocab.bpe.32000'
-    vocab_md5 = 'a86d345ca6e27f6591d0dccb1b9be853'
-    paddle.dataset.common.download(
-        vocab_url, 'test_dist_transformer', vocab_md5
-    )
-
-    local_train_url = url_prefix + 'train.tok.clean.bpe.32000.en-de'
-    local_train_md5 = '033eb02b9449e6dd823f050782ac8914'
-    paddle.dataset.common.download(
-        local_train_url, 'test_dist_transformer', local_train_md5
-    )
-
-    train0_url = url_prefix + 'train.tok.clean.bpe.32000.en-de.train_0'
-    train0_md5 = 'ddce7f602f352a0405267285379a38b1'
-    paddle.dataset.common.download(
-        train0_url, 'test_dist_transformer', train0_md5
-    )
-
-    train1_url = url_prefix + 'train.tok.clean.bpe.32000.en-de.train_1'
-    train1_md5 = '8757798200180285b1a619cd7f408747'
-    paddle.dataset.common.download(
-        train1_url, 'test_dist_transformer', train1_md5
-    )
-
-    test_url = url_prefix + 'newstest2013.tok.bpe.32000.en-de'
-    test_md5 = '9dd74a266dbdb25314183899f269b4a2'
-    paddle.dataset.common.download(test_url, 'test_dist_transformer', test_md5)
-    # cut test data for faster CI
-    orig_path = os.path.join(
-        paddle.dataset.common.DATA_HOME,
-        "test_dist_transformer",
-        "newstest2013.tok.bpe.32000.en-de",
-    )
-    head_path = os.path.join(
-        paddle.dataset.common.DATA_HOME,
-        "test_dist_transformer",
-        "newstest2013.tok.bpe.32000.en-de.cut",
-    )
-    os.system("head -n10 %s > %s" % (orig_path, head_path))
-
-
-class TestDistTransformer2x2Sync(TestDistBase):
-    def _setup_config(self):
-        self._sync_mode = True
-
-    def test_dist_train(self):
-        download_files()
-        self.check_with_place(
-            "dist_transformer.py", delta=1e-5, check_error_log=False
-        )
-
-
-class TestDistTransformer2x2Async(TestDistBase):
-    def _setup_config(self):
-        self._sync_mode = False
-
-    def test_dist_train(self):
-        download_files()
-        self.check_with_place(
-            "dist_transformer.py", delta=1.0, check_error_log=False
-        )
-
-
-if __name__ == "__main__":
-    unittest.main()
diff --git a/python/paddle/fluid/tests/unittests/test_fleet_api_input.py b/python/paddle/fluid/tests/unittests/test_fleet_api_input.py
deleted file mode 100644
index 316ddce6577f62a3a927925d41673b98e193c135..0000000000000000000000000000000000000000
--- a/python/paddle/fluid/tests/unittests/test_fleet_api_input.py
+++ /dev/null
@@ -1,291 +0,0 @@
-# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import unittest
-
-from dist_fleet_simnet_bow import train_network
-
-import paddle
-import paddle.fluid as fluid
-import paddle.incubate.distributed.fleet.role_maker as role_maker
-from paddle.distributed.transpiler.distribute_transpiler import (
-    DistributeTranspilerConfig,
-)
-from paddle.incubate.distributed.fleet.collective import CollectiveOptimizer
-
-# from paddle.incubate.distributed.fleet.parameter_server import TranspilerOptimizer
-from paddle.incubate.distributed.fleet.parameter_server.distribute_transpiler import (
-    fleet,
-)
-from paddle.incubate.distributed.fleet.role_maker import (
-    Role,
-    UserDefinedCollectiveRoleMaker,
-    UserDefinedRoleMaker,
-)
-
-
-class DistributeTranspilerConfigTest(unittest.TestCase):
-    def set_runtime_split_send_recv(self, config, value):
-        config.runtime_split_send_recv = value
-
-    def set_sync_mode(self, config, value):
-        config.sync_mode = value
-
-    def testConfig(self):
-        config = DistributeTranspilerConfig()
-        self.assertRaises(Exception, self.set_sync_mode, config, None)
-        self.assertRaises(
-            Exception, self.set_runtime_split_send_recv, config, None
-        )
-        self.assertRaises(
-            Exception, self.set_runtime_split_send_recv, config, True
-        )
-        self.set_sync_mode(config, False)
-        self.assertFalse(config.sync_mode)
-        self.set_runtime_split_send_recv(config, True)
-        self.assertRaises(Exception, self.set_sync_mode, config, True)
-
-
-class FleetTest(unittest.TestCase):
-    def testInvalidInputs(self):
-        self.assertRaises(Exception, fleet.split_files, "files")
-        self.assertRaises(Exception, fleet.init, "pserver")
-
-        data = paddle.static.data(name='X', shape=[-1, 1], dtype='float32')
-        hidden = paddle.static.nn.fc(x=data, size=10)
-        loss = paddle.mean(hidden)
-        adam = fluid.optimizer.Adam()
-        adam.minimize(loss)
-        place = fluid.CPUPlace()
-        exe = fluid.Executor(place)
-        pe = fluid.ParallelExecutor(use_cuda=False, loss_name=loss.name)
-        self.assertRaises(
-            Exception,
-            fleet.save_inference_model,
-            dirname='/tmp/',
-            feeded_var_names=['X'],
-            target_vars=[loss],
-            executor=pe,
-        )
-        self.assertRaises(
-            Exception,
-            fleet.save_inference_model,
-            dirname='/tmp/',
-            feeded_var_names=['X'],
-            target_vars=[loss],
-            executor="executor",
-        )
-        compiled_prog = fluid.compiler.CompiledProgram(
-            fluid.default_main_program()
-        )
-        self.assertRaises(
-            Exception,
-            fleet.save_inference_model,
-            dirname='/tmp/',
-            feeded_var_names=['X'],
-            target_vars=[loss],
-            executor=exe,
-            main_program=compiled_prog,
-        )
-        self.assertRaises(
-            Exception, fleet.save_persistables, executor=pe, dirname='/tmp/'
-        )
-        self.assertRaises(
-            Exception,
-            fleet.save_persistables,
-            executor="executor",
-            dirname='/tmp/',
-        )
-        self.assertRaises(
-            Exception,
-            fleet.save_persistables,
-            executor=exe,
-            dirname='/tmp/',
-            main_program=compiled_prog,
-        )
-        # self.assertRaises(Exception, fleet._transpile, "config")
-
-    def set_program(self, avg_cost, strategy):
-        with fluid.scope_guard(fluid.Scope()):
-            optimizer = fluid.optimizer.SGD(0.1)
-            optimizer = fleet.distributed_optimizer(optimizer, strategy)
-            optimizer.minimize(avg_cost)
-
-    def test_init_role(self):
-        role = role_maker.UserDefinedRoleMaker(
-            current_id=0,
-            role=role_maker.Role.SERVER,
-            worker_num=2,
-            server_endpoints=["127.0.0.1:36011", "127.0.0.1:36012"],
-        )
-        # test the optimizer without init(role)
-        # fleet.init(role)
-        batch_size = 128
-        is_sparse = True
-        is_distribute = False
-        strategy = DistributeTranspilerConfig()
-        strategy.sync_mode = False
-        strategy.geo_sgd_mode = True
-        strategy.geo_sgd_need_push_nums = 5
-        avg_cost, _, _, _ = train_network(batch_size, is_distribute, is_sparse)
-
-        self.assertRaises(Exception, self.set_program, avg_cost, strategy)
-
-    def test_transpile(self):
-        role = role_maker.UserDefinedRoleMaker(
-            current_id=0,
-            role=role_maker.Role.SERVER,
-            worker_num=2,
-            server_endpoints=["127.0.0.1:36011", "127.0.0.1:36012"],
-        )
-        # test the optimizer with init(role)
-        fleet.init(role)
-        batch_size = 128
-        is_sparse = True
-        is_distribute = False
-
-        strategy = DistributeTranspilerConfig()
-        strategy.sync_mode = False
-        strategy.runtime_split_send_recv = True
-        avg_cost, _, _, _ = train_network(batch_size, is_distribute, is_sparse)
-
-        self.set_program(avg_cost, strategy)
-        strategy.runtime_split_send_recv = False
-        self.set_program(avg_cost, strategy)
-
-
-"""
-class TranspilerOptimizerTest(unittest.TestCase):
-    def testInvalidInputs(self):
-        self.assertRaises(Exception, TranspilerOptimizer, "Adam", None)
-        self.assertRaises(
-            Exception,
-            TranspilerOptimizer,
-            fluid.optimizer.Adam(0.001),
-            "strategy",
-        )
-
-        transpiler = TranspilerOptimizer(fluid.optimizer.Adam(0.001))
-        self.assertRaises(Exception, transpiler.minimize, loss=[])
-        data = paddle.static.data(name='X', shape=[-1, 1], dtype='float32')
-        hidden = paddle.static.nn.fc(x=data, size=10)
-        loss = paddle.mean(hidden)
-        self.assertRaises(
-            Exception, transpiler.minimize, loss=loss.name, startup_program=[]
-        )
-"""
-
-
-class UserDefinedRoleMakerTest(unittest.TestCase):
-    def createRoleMaker(
-        self,
-        current_id=0,
-        role=Role.WORKER,
-        worker_num=1,
-        server_endpoints=["127.0.0.1:8080"],
-    ):
-        role = UserDefinedRoleMaker(
-            current_id, role, worker_num, server_endpoints
-        )
-
-    def testRoleMaker(self):
-        self.createRoleMaker()
-        # test all invalid server_endpoints
-        self.assertRaises(
-            Exception, self.createRoleMaker, server_endpoints=None
-        )  # server_endpoints must be a list
-        self.assertRaises(
-            Exception, self.createRoleMaker, server_endpoints=[]
-        )  # server_endpoints can't be empty
-        self.assertRaises(
-            Exception, self.createRoleMaker, server_endpoints=[3, []]
-        )  # each element in server_endpoints must be a string
-        self.assertRaises(
-            Exception,
-            self.createRoleMaker,
-            server_endpoints=["127.0.0.1:8080", "127.0.0.1:8080"],
-        )  # elements in server_endpoints can't be duplicated
-        # test all invalid current_id
-        self.assertRaises(
-            Exception, self.createRoleMaker, current_id="0"
-        )  # current_id must be an int
-        self.assertRaises(
-            Exception, self.createRoleMaker, current_id=-1
-        )  # current_id must be greater than or equal to 0
-        self.assertRaises(
-            Exception,
-            self.createRoleMaker,
-            current_id=1,
-            role=Role.SERVER,
-            server_endpoints=["127.0.0.1:8080"],
-        )  # if role is server, current_id must be less than len(server_endpoints)
-        # test all invalid worker_num
-        self.assertRaises(
-            Exception, self.createRoleMaker, worker_num="1"
-        )  # worker_num must be an int
-        self.assertRaises(
-            Exception, self.createRoleMaker, worker_num=0
-        )  # worker_num must be greater than 0
-        # test all invalid role
-        self.assertRaises(
-            Exception, self.createRoleMaker, role=3
-        )  # role must be a Role (Role.WORKER=1, Role.SERVER=2)
-
-
-class UserDefinedCollectiveRoleMakerTest(unittest.TestCase):
-    def createRoleMaker(
-        self, current_id=0, worker_endpoints=["127.0.0.1:8080"]
-    ):
-        role = UserDefinedCollectiveRoleMaker(current_id, worker_endpoints)
-
-    def testRoleMaker(self):
-        self.createRoleMaker()
-        # test all invalid worker_endpoints
-        self.assertRaises(
-            Exception, self.createRoleMaker, worker_endpoints=None
-        )  # worker_endpoints must be a list
-        self.assertRaises(
-            Exception, self.createRoleMaker, worker_endpoints=[]
-        )  # worker_endpoints can't be empty
-        self.assertRaises(
-            Exception, self.createRoleMaker, worker_endpoints=[3, []]
-        )  # each element in worker_endpoints must be a string
-        self.assertRaises(
-            Exception,
-            self.createRoleMaker,
-            worker_endpoints=["127.0.0.1:8080", "127.0.0.1:8080"],
-        )  # elements in worker_endpoints can't be duplicated
-        # test all invalid current_id
-        self.assertRaises(
-            Exception, self.createRoleMaker, current_id="0"
-        )  # current_id must be an int
-        self.assertRaises(
-            Exception, self.createRoleMaker, current_id=-1
-        )  # current_id must be greater than or equal to 0
-        self.assertRaises(
-            Exception,
-            self.createRoleMaker,
-            current_id=1,
-            worker_endpoints=["127.0.0.1:8080"],
-        )  # current_id must be less than len(worker_endpoints)
-
-
-class CollectiveOptimizerTest(unittest.TestCase):
-    def test_ds_as_None(self):
-        optimizer = fluid.optimizer.AdamOptimizer()
-        dist_optimizer = CollectiveOptimizer(optimizer, strategy=None)
-
-
-if __name__ == '__main__':
-    unittest.main()
diff --git a/python/paddle/fluid/tests/unittests/test_fleet_base_2.py b/python/paddle/fluid/tests/unittests/test_fleet_base_2.py
index ee5a84d1e41e810e2dc6a47758fd672e91223eb0..8f55bf1384a8f2aff446c94d6faf5031ae39b31b 100644
--- a/python/paddle/fluid/tests/unittests/test_fleet_base_2.py
+++ b/python/paddle/fluid/tests/unittests/test_fleet_base_2.py
@@ -71,7 +71,6 @@ class TestFleetBase(unittest.TestCase):
         place = fluid.CPUPlace()
         exe = fluid.Executor(place)
         exe.run(paddle.static.default_startup_program())
-        pe = fluid.ParallelExecutor(use_cuda=False, loss_name=avg_cost.name)
         compiled_prog = fluid.compiler.CompiledProgram(
             fluid.default_main_program()
         )