From 48c280d3b8c26f836d66fcdc9d55295b3ab5b330 Mon Sep 17 00:00:00 2001
From: 123malin
Date: Fri, 24 Apr 2020 10:48:23 +0800
Subject: [PATCH] add shuffle_batch (#4569)

---
 PaddleRec/word2vec/README.md        |  3 +-
 PaddleRec/word2vec/cluster_train.py | 44 +++++++++------
 PaddleRec/word2vec/net.py           | 83 +++++++++++++++++++++++++++++
 PaddleRec/word2vec/train.py         | 44 +++++++++------
 PaddleRec/word2vec/utils.py         |  7 ++-
 5 files changed, 148 insertions(+), 33 deletions(-)

diff --git a/PaddleRec/word2vec/README.md b/PaddleRec/word2vec/README.md
index 581c81ac..eae86615 100644
--- a/PaddleRec/word2vec/README.md
+++ b/PaddleRec/word2vec/README.md
@@ -20,7 +20,7 @@
 ## Introduction
 This example implements the skip-gram variant of the word2vec model.
 
-**All models in this model zoo currently require PaddlePaddle 1.6 or later, or a suitable develop build.**
+**All models in this model zoo currently require PaddlePaddle 1.6 or later, or a suitable develop build. Using the shuffle_batch feature additionally requires PaddlePaddle 1.7 or later.**
 
 Users are also encouraged to refer to the [IPython Notebook demo](https://aistudio.baidu.com/aistudio/projectDetail/124377)
 
@@ -102,6 +102,7 @@ OPENBLAS_NUM_THREADS=1 CPU_NUM=5 python train.py --train_data_dir data/convert_t
 ```bash
 sh cluster_train.sh
 ```
+To enable the shuffle_batch feature, add `--with_shuffle_batch` to the command. To simulate distributed multi-machine training on a single machine, edit `cluster_train.sh` and add `--with_shuffle_batch` to each node's launch command.
 
 ## Prediction
 The test set can be downloaded with the following command:
diff --git a/PaddleRec/word2vec/cluster_train.py b/PaddleRec/word2vec/cluster_train.py
index 45cd9aec..11054ce3 100644
--- a/PaddleRec/word2vec/cluster_train.py
+++ b/PaddleRec/word2vec/cluster_train.py
@@ -10,7 +10,7 @@ import paddle
 import paddle.fluid as fluid
 import six
 import reader
-from net import skip_gram_word2vec
+from net import skip_gram_word2vec, skip_gram_word2vec_shuffle_batch
 
 logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s')
 logger = logging.getLogger("fluid")
@@ -100,6 +100,12 @@ def parse_args():
         type=int,
         default=1,
         help='The number of trainers (default: 1)')
+    parser.add_argument(
+        '--with_shuffle_batch',
+        action='store_true',
+        required=False,
+        default=False,
+        help='whether negative samples come from the shuffle_batch op (default: False)')
 
     return parser.parse_args()
 
@@ -134,11 +140,7 @@ def convert_python_to_tensor(weight, batch_size, sample_reader):
     return __reader__
 
 
-def train_loop(args, train_program, reader, data_loader, loss, trainer_id,
-               weight):
-
-    data_loader.set_batch_generator(
-        convert_python_to_tensor(weight, args.batch_size, reader.train()))
+def train_loop(args, train_program, data_loader, loss, trainer_id):
 
     place = fluid.CPUPlace()
     exe = fluid.Executor(place)
@@ -207,14 +209,26 @@ def train(args):
                                      filelist, 0, 1)
 
     logger.info("dict_size: {}".format(word2vec_reader.dict_size))
-    np_power = np.power(np.array(word2vec_reader.id_frequencys), 0.75)
-    id_frequencys_pow = np_power / np_power.sum()
+
+    if args.with_shuffle_batch:
+        loss, data_loader = skip_gram_word2vec_shuffle_batch(
+            word2vec_reader.dict_size,
+            args.embedding_size,
+            is_sparse=args.is_sparse,
+            neg_num=args.nce_num)
+        data_loader.set_sample_generator(word2vec_reader.train(), batch_size=args.batch_size, drop_last=True)
+    else:
+        np_power = np.power(np.array(word2vec_reader.id_frequencys), 0.75)
+        id_frequencys_pow = np_power / np_power.sum()
+
+        loss, data_loader = skip_gram_word2vec(
+            word2vec_reader.dict_size,
+            args.embedding_size,
+            is_sparse=args.is_sparse,
+            neg_num=args.nce_num)
 
-    loss, data_loader = skip_gram_word2vec(
-        word2vec_reader.dict_size,
-        args.embedding_size,
-        is_sparse=args.is_sparse,
-        neg_num=args.nce_num)
+        data_loader.set_batch_generator(
+            convert_python_to_tensor(id_frequencys_pow, args.batch_size, word2vec_reader.train()))
 
     optimizer = fluid.optimizer.SGD(
         learning_rate=fluid.layers.exponential_decay(
@@ -241,8 +255,8 @@ def train(args):
     elif args.role == "trainer":
         print("run trainer")
         train_loop(args,
-                   t.get_trainer_program(), word2vec_reader, data_loader, loss,
-                   args.trainer_id, id_frequencys_pow)
+                   t.get_trainer_program(), data_loader, loss,
+                   args.trainer_id)
 
 
 if __name__ == '__main__':
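The two branches above feed the DataLoader differently. The default path builds the negative-sampling distribution on the Python side: raw word frequencies are raised to the 0.75 power and normalized, and the reader draws negatives from that smoothed distribution. Below is a minimal NumPy-only sketch of that scheme; `draw_negatives` is a hypothetical helper approximating what the Python-side reader (`convert_python_to_tensor`, whose body is not shown in this hunk) is expected to do per sample, and the toy frequency list stands in for `word2vec_reader.id_frequencys`.

```python
import numpy as np

def negative_sampling_weights(id_frequencys):
    # Smooth the unigram distribution: raising frequencies to the 0.75 power
    # gives rare words relatively more probability mass.
    np_power = np.power(np.array(id_frequencys), 0.75)
    return np_power / np_power.sum()

def draw_negatives(weights, neg_num, rng=np.random):
    # Hypothetical helper: sample neg_num negative word ids from the
    # smoothed distribution.
    return rng.choice(len(weights), size=neg_num, p=weights)

weights = negative_sampling_weights([100, 40, 10, 3, 1])  # toy frequencies
print(draw_negatives(weights, neg_num=5))
```

The shuffle_batch path skips this step entirely: the loader only feeds (input_word, true_word) pairs via `set_sample_generator`, and negatives are generated inside the graph, which is why `id_frequencys_pow` no longer needs to be threaded through `train_loop`.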
diff --git a/PaddleRec/word2vec/net.py b/PaddleRec/word2vec/net.py
index 54d29378..3e2ab33b 100644
--- a/PaddleRec/word2vec/net.py
+++ b/PaddleRec/word2vec/net.py
@@ -20,6 +20,89 @@ import numpy as np
 import paddle.fluid as fluid
 
 
+def skip_gram_word2vec_shuffle_batch(dict_size, embedding_size, is_sparse=False, neg_num=5):
+
+    words = []
+    input_word = fluid.data(name="input_word", shape=[None, 1], dtype='int64')
+    true_word = fluid.data(name='true_label', shape=[None, 1], dtype='int64')
+
+    words.append(input_word)
+    words.append(true_word)
+
+    data_loader = fluid.io.DataLoader.from_generator(
+        capacity=64, feed_list=words, iterable=False)
+
+    init_width = 0.5 / embedding_size
+    input_emb = fluid.embedding(
+        input=words[0],
+        is_sparse=is_sparse,
+        size=[dict_size, embedding_size],
+        param_attr=fluid.ParamAttr(
+            name='emb',
+            initializer=fluid.initializer.Uniform(-init_width, init_width)))
+
+    true_emb_w = fluid.embedding(
+        input=words[1],
+        is_sparse=is_sparse,
+        size=[dict_size, embedding_size],
+        param_attr=fluid.ParamAttr(
+            name='emb_w', initializer=fluid.initializer.Constant(value=0.0)))
+
+    true_emb_b = fluid.embedding(
+        input=words[1],
+        is_sparse=is_sparse,
+        size=[dict_size, 1],
+        param_attr=fluid.ParamAttr(
+            name='emb_b', initializer=fluid.initializer.Constant(value=0.0)))
+
+    input_emb = fluid.layers.squeeze(input=input_emb, axes=[1])
+    true_emb_w = fluid.layers.squeeze(input=true_emb_w, axes=[1])
+    true_emb_b = fluid.layers.squeeze(input=true_emb_b, axes=[1])
+
+    # add shuffle_batch after embedding.
+    neg_emb_w_list = []
+    for i in range(neg_num):
+        neg_emb_w_list.append(
+            fluid.contrib.layers.shuffle_batch(true_emb_w))  # shuffle true_word
+    neg_emb_w = fluid.layers.concat(neg_emb_w_list, axis=0)
+    neg_emb_w_re = fluid.layers.reshape(
+        neg_emb_w, shape=[-1, neg_num, embedding_size])
+
+    neg_emb_b_list = []
+    for i in range(neg_num):
+        neg_emb_b_list.append(
+            fluid.contrib.layers.shuffle_batch(true_emb_b))  # shuffle true_word
+    neg_emb_b = fluid.layers.concat(neg_emb_b_list, axis=0)
+    neg_emb_b_vec = fluid.layers.reshape(neg_emb_b, shape=[-1, neg_num])
+
+    true_logits = fluid.layers.elementwise_add(
+        fluid.layers.reduce_sum(
+            fluid.layers.elementwise_mul(input_emb, true_emb_w),
+            dim=1,
+            keep_dim=True),
+        true_emb_b)
+
+    input_emb_re = fluid.layers.reshape(
+        input_emb, shape=[-1, 1, embedding_size])
+    neg_matmul = fluid.layers.matmul(input_emb_re, neg_emb_w_re, transpose_y=True)
+    neg_matmul_re = fluid.layers.reshape(neg_matmul, shape=[-1, neg_num])
+    neg_logits = fluid.layers.elementwise_add(neg_matmul_re, neg_emb_b_vec)
+
+    # NCE loss
+    label_ones = fluid.layers.fill_constant_batch_size_like(
+        true_logits, shape=[-1, 1], value=1.0, dtype='float32')
+    label_zeros = fluid.layers.fill_constant_batch_size_like(
+        true_logits, shape=[-1, neg_num], value=0.0, dtype='float32')
+
+    true_xent = fluid.layers.sigmoid_cross_entropy_with_logits(true_logits,
+                                                               label_ones)
+    neg_xent = fluid.layers.sigmoid_cross_entropy_with_logits(neg_logits,
+                                                              label_zeros)
+    cost = fluid.layers.elementwise_add(
+        fluid.layers.reduce_sum(true_xent, dim=1),
+        fluid.layers.reduce_sum(neg_xent, dim=1))
+    avg_cost = fluid.layers.reduce_mean(cost)
+
+    return avg_cost, data_loader
+
+
 def skip_gram_word2vec(dict_size, embedding_size, is_sparse=False, neg_num=5):
 
     words = []
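The new graph's central trick is `fluid.contrib.layers.shuffle_batch`: instead of sampling negatives from a global distribution, each of the `neg_num` negative sets is a random permutation of the batch's own true-word embeddings, so every example's positive doubles as another example's negative. A minimal NumPy emulation of that construction (the `shuffle_batch` helper below imitates the op's observable effect; it is not the Paddle kernel):

```python
import numpy as np

def shuffle_batch(x, rng=np.random):
    # Emulate the op: randomly permute rows along the batch dimension.
    return x[rng.permutation(x.shape[0])]

batch_size, embedding_size, neg_num = 4, 3, 5
true_emb_w = np.arange(batch_size * embedding_size,
                       dtype=np.float32).reshape(batch_size, embedding_size)

# Mirror the graph: neg_num shuffled copies, concatenated along the batch
# axis, then regrouped to [batch_size, neg_num, embedding_size].
neg_emb_w = np.concatenate(
    [shuffle_batch(true_emb_w) for _ in range(neg_num)], axis=0)
neg_emb_w_re = neg_emb_w.reshape(-1, neg_num, embedding_size)
print(neg_emb_w_re.shape)  # (4, 5, 3)
```

Note that the concat-then-reshape regroups rows across the shuffled copies rather than stacking one copy per negative slot, but since each copy is an independent random permutation, every example still ends up with `neg_num` random in-batch negatives; a side effect is that a word can occasionally be drawn as its own negative.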
diff --git a/PaddleRec/word2vec/train.py b/PaddleRec/word2vec/train.py
index 16de84d9..df77c4bd 100644
--- a/PaddleRec/word2vec/train.py
+++ b/PaddleRec/word2vec/train.py
@@ -10,7 +10,7 @@ import paddle
 import paddle.fluid as fluid
 import six
 import reader
-from net import skip_gram_word2vec
+from net import skip_gram_word2vec, skip_gram_word2vec_shuffle_batch
 import utils
 import sys
 
@@ -84,6 +84,12 @@ def parse_args():
         required=False,
         default=False,
         help='whether to print training speed (default: False)')
+    parser.add_argument(
+        '--with_shuffle_batch',
+        action='store_true',
+        required=False,
+        default=False,
+        help='whether negative samples come from the shuffle_batch op (default: False)')
     parser.add_argument(
         '--enable_ce',
         action='store_true',
         help='If set, run the task with continuous evaluation logs.')
@@ -121,10 +127,7 @@ def convert_python_to_tensor(weight, batch_size, sample_reader):
     return __reader__
 
 
-def train_loop(args, train_program, reader, data_loader, loss, trainer_id,
-               weight):
-    data_loader.set_batch_generator(
-        convert_python_to_tensor(weight, args.batch_size, reader.train()))
+def train_loop(args, train_program, data_loader, loss, trainer_id):
 
     place = fluid.CPUPlace()
     exe = fluid.Executor(place)
@@ -211,14 +214,26 @@ def train(args):
                                      filelist, 0, 1)
 
     logger.info("dict_size: {}".format(word2vec_reader.dict_size))
-    np_power = np.power(np.array(word2vec_reader.id_frequencys), 0.75)
-    id_frequencys_pow = np_power / np_power.sum()
 
-    loss, data_loader = skip_gram_word2vec(
-        word2vec_reader.dict_size,
-        args.embedding_size,
-        is_sparse=args.is_sparse,
-        neg_num=args.nce_num)
+    if args.with_shuffle_batch:
+        loss, data_loader = skip_gram_word2vec_shuffle_batch(
+            word2vec_reader.dict_size,
+            args.embedding_size,
+            is_sparse=args.is_sparse,
+            neg_num=args.nce_num)
+        data_loader.set_sample_generator(word2vec_reader.train(), batch_size=args.batch_size, drop_last=True)
+    else:
+        np_power = np.power(np.array(word2vec_reader.id_frequencys), 0.75)
+        id_frequencys_pow = np_power / np_power.sum()
+
+        loss, data_loader = skip_gram_word2vec(
+            word2vec_reader.dict_size,
+            args.embedding_size,
+            is_sparse=args.is_sparse,
+            neg_num=args.nce_num)
+
+        data_loader.set_batch_generator(
+            convert_python_to_tensor(id_frequencys_pow, args.batch_size, word2vec_reader.train()))
 
     optimizer = fluid.optimizer.SGD(
         learning_rate=fluid.layers.exponential_decay(
@@ -232,11 +247,10 @@ def train(args):
     # do local training
     logger.info("run local training")
     main_program = fluid.default_main_program()
-    train_loop(args, main_program, word2vec_reader, data_loader, loss, 0,
-               id_frequencys_pow)
+    train_loop(args, main_program, data_loader, loss, 0)
 
 
 if __name__ == '__main__':
-    utils.check_version()
     args = parse_args()
+    utils.check_version(args.with_shuffle_batch)
     train(args)
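Both graph builders end in the same negative-sampling (NCE-style) objective: a sigmoid cross-entropy that pushes each example's true logit toward 1 and its `neg_num` negative logits toward 0. A minimal NumPy sketch of that arithmetic, with random arrays standing in for the embedding lookups (shapes follow the graph code in net.py):

```python
import numpy as np

def sigmoid_xent(logits, labels):
    # Numerically stable sigmoid cross-entropy with logits.
    return (np.maximum(logits, 0) - logits * labels
            + np.log1p(np.exp(-np.abs(logits))))

batch_size, embedding_size, neg_num = 4, 3, 5
rng = np.random.default_rng(0)
input_emb = rng.normal(size=(batch_size, embedding_size))   # center words
true_emb_w = rng.normal(size=(batch_size, embedding_size))  # true-word weights
true_emb_b = rng.normal(size=(batch_size, 1))               # true-word biases
neg_emb_w = rng.normal(size=(batch_size, neg_num, embedding_size))
neg_emb_b = rng.normal(size=(batch_size, neg_num))

# true_logits: dot(input, true) + bias; neg_logits: batched matmul + bias.
true_logits = (input_emb * true_emb_w).sum(axis=1, keepdims=True) + true_emb_b
neg_logits = np.einsum('be,bne->bn', input_emb, neg_emb_w) + neg_emb_b

cost = (sigmoid_xent(true_logits, 1.0).sum(axis=1)
        + sigmoid_xent(neg_logits, 0.0).sum(axis=1))
print(cost.mean())  # avg_cost
```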
diff --git a/PaddleRec/word2vec/utils.py b/PaddleRec/word2vec/utils.py
index c09e30d7..2b02e04a 100644
--- a/PaddleRec/word2vec/utils.py
+++ b/PaddleRec/word2vec/utils.py
@@ -27,7 +27,7 @@ def prepare_data(file_dir, dict_path, batch_size):
     return vocab_size, reader, i2w
 
 
-def check_version():
+def check_version(with_shuffle_batch=False):
     """
     Log an error and exit when the installed version of
     paddlepaddle is not satisfied.
@@ -37,7 +37,10 @@ def check_version():
           "Please make sure the version is good with your code."
 
     try:
-        fluid.require_version('1.6.0')
+        if with_shuffle_batch:
+            fluid.require_version('1.7.0')
+        else:
+            fluid.require_version('1.6.0')
     except Exception as e:
         logger.error(err)
         sys.exit(1)
-- 
GitLab