Commit b92a9355 authored by: phlrain

Merge branch 'develop' of https://github.com/PaddlePaddle/models into add_lstm_rnn_lm_new

# DNN-based Click-Through Rate Prediction Model
## Introduction
This example implements the DNN part of the model proposed in the following paper:
```text
@inproceedings{guo2017deepfm,
title={DeepFM: A Factorization-Machine based Neural Network for CTR Prediction},
author={Huifeng Guo, Ruiming Tang, Yunming Ye, Zhenguo Li and Xiuqiang He},
booktitle={the Twenty-Sixth International Joint Conference on Artificial Intelligence (IJCAI)},
pages={1725--1731},
year={2017}
}
```
## Environment
Install PaddlePaddle Fluid first, then run:
```shell
pip install -r requirements.txt
```
## Dataset
This example uses the Criteo dataset from the [Display Advertising Challenge](https://www.kaggle.com/c/criteo-display-ad-challenge/) hosted by Kaggle.
Each row contains the features of one ad impression, and the first column is a label indicating whether the ad was clicked. There are 39 features in total: 13 take integer values and the other 26 are categorical. The test set has no labels.
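The raw file is tab-separated; in the training file the label column comes first, followed by the 13 integer features and the 26 categorical features, with empty fields standing for missing values. The sketch below is only an illustration of this layout and is not part of the released scripts:
```python
# Illustrative sketch: split one raw Criteo training line into label, integer
# and categorical fields. Assumes the tab-separated layout described above;
# empty fields mean the value is missing.
def parse_raw_line(line):
    fields = line.rstrip('\n').split('\t')
    label = int(fields[0])
    int_feats = [int(v) if v != '' else None for v in fields[1:14]]
    cat_feats = fields[14:40]  # raw category strings, possibly empty
    return label, int_feats, cat_feats
```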
Download the dataset:
```bash
cd data && ./download.sh && cd ..
```
## Model
This example implements only the DNN part of the model described in the DeepFM paper; the full DeepFM model will be provided in a separate example.
## Data Preparation
To preprocess the raw dataset, the integer features are min-max normalized to [0, 1] and the categorical features are one-hot encoded. The raw dataset is split into two parts: 90% is used for training and the remaining 10% for validation during training.
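A minimal sketch of the min-max step for a single integer feature, assuming the per-feature minimum and maximum have already been collected over the training file (preprocess.py additionally clips values at a 95%-quantile point before collecting these statistics):
```python
# Sketch of min-max normalization for one integer feature.
# feat_min and feat_max are the per-feature statistics collected from the training file.
def min_max(value, feat_min, feat_max):
    if value == '':
        return 0.0  # missing integer values map to 0
    return (float(value) - feat_min) / (feat_max - feat_min)
```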
## Training
The command line options for training can be listed with `python train.py -h`.
### Local training
```bash
python train.py \
--train_data_path data/raw/train.txt \
2>&1 | tee train.log
```
After pass 1, batch 40000, the test AUC is 0.801178 and the test cost is 0.445196.
### Distributed training
Launch a distributed training job with 2 trainers and 2 pservers on the local machine. In the distributed setting the training data is sharded by trainer id, so the trainers work on disjoint subsets of the data, which improves training efficiency; a minimal sketch of this sharding follows the command below.
```bash
sh cluster_train.sh
```
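The sharding rule itself is simple; the sketch below shows the idea, while the released reader.py applies the same modulo filter on the line index inside its file reader:
```python
# Sketch: decide whether a training line belongs to this trainer,
# so that trainers read disjoint subsets of the data.
def belongs_to_trainer(line_idx, trainer_id, trainer_num):
    return line_idx % trainer_num == trainer_id
```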
## Inference
The command line options for inference can be listed with `python infer.py -h`.
To run inference on the test set:
```bash
python infer.py \
--model_path models/pass-0/ \
--data_path data/raw/valid.txt
```
Note: only the AUC printed in the last log line of infer.py is the overall AUC for the whole prediction file.
## Run cluster training on Baidu Cloud
1. Follow the guide [Launch Fluid distributed training on Baidu Cloud](https://github.com/PaddlePaddle/FluidDoc/blob/develop/doc/fluid/user_guides/howto/training/train_on_baidu_cloud_cn.rst) to set up a CPU cluster on Baidu Cloud.
1. Preprocess the training data with preprocess.py to generate train.txt.
1. Split train.txt into as many parts as there are machines in the cluster and place one part on each machine.
1. Start the distributed training job with the commands from the `Distributed training` section above.
# DNN for Click-Through Rate Prediction
## Introduction
This model implements the DNN part proposed in the following paper:
```text
@inproceedings{guo2017deepfm,
title={DeepFM: A Factorization-Machine based Neural Network for CTR Prediction},
author={Huifeng Guo, Ruiming Tang, Yunming Ye, Zhenguo Li and Xiuqiang He},
booktitle={the Twenty-Sixth International Joint Conference on Artificial Intelligence (IJCAI)},
pages={1725--1731},
year={2017}
}
```
DeepFM combines factorization machines and deep neural networks to model
both low-order and high-order feature interactions. For details of
factorization machines, please refer to the paper [factorization
machines](https://www.csie.ntu.edu.tw/~b97053/paper/Rendle2010FM.pdf).
## Environment
You should install PaddlePaddle Fluid first, and run:
```shell
pip install -r requirements.txt
```
## Dataset
This example uses the Criteo dataset, which was used for the [Display Advertising
Challenge](https://www.kaggle.com/c/criteo-display-ad-challenge/) hosted by Kaggle.
Each row contains the features of one ad impression, and the first column is a label
indicating whether the ad was clicked. There are 39 features in total: 13 take
integer values and the other 26 are categorical. The labels of the test set are omitted.
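For local training, the data reader used by train.py (reader.py, included later in this repository) does not one-hot encode the raw categorical strings; instead, each value is hashed together with its column index into a fixed id space whose size is `--sparse_feature_dim` (1000001 by default). A minimal sketch of that mapping:
```python
# Sketch of the hashing trick used in reader.py: combine the column index with the raw
# categorical value and hash the pair into a fixed-size sparse id space.
def categorical_id(col_idx, value, sparse_feature_dim=1000001):
    return hash("%d_%s" % (col_idx, value)) % sparse_feature_dim
```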
Download dataset:
```bash
cd data && ./download.sh && cd ..
```
## Model
This demo implements only the DNN part of the model described in the DeepFM paper;
the full DeepFM model will be provided in a separate example.
## Data Preprocessing
To preprocess the raw dataset, the integer features are clipped and then min-max
normalized to [0, 1], and the categorical features are one-hot encoded. The raw
training dataset is split so that 90% is used for training and the other
10% is used for validation during training. In reader.py, the training data is the first
90% of the lines in train.txt and the validation data is the rest.
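preprocess.py combines the 26 per-column vocabularies into a single sparse id space by adding a per-column offset, so that all one-hot features live in one vector. A minimal sketch of that offset computation (`dict_sizes` stands for the per-column vocabulary sizes built by `CategoryDictGenerator`):
```python
# Sketch: turn per-column category ids into ids in one global sparse space.
# dict_sizes[i] is the vocabulary size of categorical column i.
def build_offsets(dict_sizes):
    offsets = [0]
    for size in dict_sizes[:-1]:
        offsets.append(offsets[-1] + size)
    return offsets

def global_id(col_idx, local_id, offsets):
    return offsets[col_idx] + local_id
```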
## Train
The command line options for training can be listed by `python train.py -h`.
### Local Train:
```bash
python train.py \
--train_data_path data/raw/train.txt \
2>&1 | tee train.log
```
After pass 1, batch 40000, the test AUC is `0.801178` and the test
cost is `0.445196`.
### Distributed Train
Run a distributed training job with 2 pservers and 2 trainers on a single machine.
In the distributed setting, the training data is sharded by trainer_id so that the
trainers do not read overlapping data.
```bash
sh cluster_train.sh
```
## Infer
The command line options for inference can be listed by `python infer.py -h`.
To run inference on the test dataset:
```bash
python infer.py \
--model_path models/ \
--data_path data/raw/train.txt
```
Note: the AUC value in the last log line is the overall AUC for the whole test dataset. Here train.txt is split inside reader.py so that the validation data does not overlap with the training data.
## Train on Baidu Cloud
1. Please prepare some CPU machines on Baidu Cloud following the steps in [train_on_baidu_cloud](https://github.com/PaddlePaddle/FluidDoc/blob/develop/doc/fluid/user_guides/howto/training/train_on_baidu_cloud_cn.rst)
1. Prepare dataset using preprocess.py.
1. Split train.txt into trainer_num parts and put one part on each machine.
1. Start the distributed training job with the commands in the `Distributed Train` section above.
#!/bin/bash
# start pserver0
python train.py \
--train_data_path /paddle/data/train.txt \
--is_local 0 \
--role pserver \
--endpoints 127.0.0.1:6000,127.0.0.1:6001 \
--current_endpoint 127.0.0.1:6000 \
--trainers 2 \
> pserver0.log 2>&1 &
# start pserver1
python train.py \
--train_data_path /paddle/data/train.txt \
--is_local 0 \
--role pserver \
--endpoints 127.0.0.1:6000,127.0.0.1:6001 \
--current_endpoint 127.0.0.1:6001 \
--trainers 2 \
> pserver1.log 2>&1 &
# start trainer0
python train.py \
--train_data_path /paddle/data/train.txt \
--is_local 0 \
--role trainer \
--endpoints 127.0.0.1:6000,127.0.0.1:6001 \
--trainers 2 \
--trainer_id 0 \
> trainer0.log 2>&1 &
# start trainer1
python train.py \
--train_data_path /paddle/data/train.txt \
--is_local 0 \
--role trainer \
--endpoints 127.0.0.1:6000,127.0.0.1:6001 \
--trainers 2 \
--trainer_id 1 \
> trainer1.log 2>&1 &
#!/bin/bash
wget --no-check-certificate https://s3-eu-west-1.amazonaws.com/criteo-labs/dac.tar.gz
tar zxf dac.tar.gz
rm -f dac.tar.gz
mkdir raw
mv ./*.txt raw/
import argparse
import logging
import numpy as np
# disable gpu training for this example
import os
os.environ["CUDA_VISIBLE_DEVICES"] = ""
import paddle
import paddle.fluid as fluid
import reader
from network_conf import ctr_dnn_model
logging.basicConfig(
format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)
def parse_args():
parser = argparse.ArgumentParser(description="PaddlePaddle DeepFM example")
parser.add_argument(
'--model_path',
type=str,
required=True,
help="The path of model parameters gz file")
parser.add_argument(
'--data_path',
type=str,
required=True,
help="The path of the dataset to infer")
parser.add_argument(
'--embedding_size',
type=int,
default=10,
help="The size for embedding layer (default:10)")
parser.add_argument(
'--sparse_feature_dim',
type=int,
default=1000001,
help="The size of the sparse feature hashing space (default:1000001)")
parser.add_argument(
'--batch_size',
type=int,
default=1000,
help="The size of mini-batch (default:1000)")
return parser.parse_args()
def infer():
args = parse_args()
place = fluid.CPUPlace()
inference_scope = fluid.core.Scope()
dataset = reader.CriteoDataset(args.sparse_feature_dim)
test_reader = paddle.batch(dataset.test([args.data_path]), batch_size=args.batch_size)
startup_program = fluid.framework.Program()
test_program = fluid.framework.Program()
with fluid.framework.program_guard(test_program, startup_program):
loss, data_list, auc_var, batch_auc_var = ctr_dnn_model(args.embedding_size, args.sparse_feature_dim)
exe = fluid.Executor(place)
feeder = fluid.DataFeeder(feed_list=data_list, place=place)
with fluid.scope_guard(inference_scope):
[inference_program, _, fetch_targets] = fluid.io.load_inference_model(args.model_path, exe)
def set_zero(var_name):
param = inference_scope.var(var_name).get_tensor()
param_array = np.zeros(param._get_dims()).astype("int64")
param.set(param_array, place)
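# Reset the accumulator variables created by fluid.layers.auc so that the test AUC
# starts from zero instead of reusing values stored with the loaded model.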
auc_states_names = ['_generated_var_2', '_generated_var_3']
for name in auc_states_names:
set_zero(name)
for batch_id, data in enumerate(test_reader()):
loss_val, auc_val = exe.run(inference_program,
feed=feeder.feed(data),
fetch_list=fetch_targets)
if batch_id % 100 == 0:
logger.info("TEST --> batch: {} loss: {} auc: {}".format(batch_id, loss_val/args.batch_size, auc_val))
if __name__ == '__main__':
infer()
import paddle.fluid as fluid
import math
dense_feature_dim = 13
def ctr_dnn_model(embedding_size, sparse_feature_dim):
dense_input = fluid.layers.data(
name="dense_input", shape=[dense_feature_dim], dtype='float32')
sparse_input_ids = [
fluid.layers.data(
name="C" + str(i), shape=[1], lod_level=1, dtype='int64')
for i in range(1, 27)
]
def embedding_layer(input):
return fluid.layers.embedding(
input=input,
size=[sparse_feature_dim, embedding_size],
param_attr=fluid.ParamAttr(name="SparseFeatFactors", initializer=fluid.initializer.Normal(scale=1/math.sqrt(sparse_feature_dim))))
sparse_embed_seq = list(map(embedding_layer, sparse_input_ids))
concated = fluid.layers.concat(sparse_embed_seq + [dense_input], axis=1)
fc1 = fluid.layers.fc(input=concated, size=400, act='relu',
param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(scale=1/math.sqrt(concated.shape[1]))))
fc2 = fluid.layers.fc(input=fc1, size=400, act='relu',
param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(scale=1/math.sqrt(fc1.shape[1]))))
fc3 = fluid.layers.fc(input=fc2, size=400, act='relu',
param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(scale=1/math.sqrt(fc2.shape[1]))))
predict = fluid.layers.fc(input=fc3, size=2, act='softmax',
param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(scale=1/math.sqrt(fc3.shape[1]))))
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
data_list = [dense_input] + sparse_input_ids + [label]
cost = fluid.layers.cross_entropy(input=predict, label=label)
avg_cost = fluid.layers.reduce_sum(cost)
accuracy = fluid.layers.accuracy(input=predict, label=label)
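# auc_var accumulates the AUC over all batches seen so far, while batch_auc_var is a
# sliding-window AUC over the most recent slide_steps batches.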
auc_var, batch_auc_var, auc_states = fluid.layers.auc(input=predict, label=label, num_thresholds=2**12, slide_steps=20)
return avg_cost, data_list, auc_var, batch_auc_var
"""
Preprocess Criteo dataset. This dataset was used for the Display Advertising
Challenge (https://www.kaggle.com/c/criteo-display-ad-challenge).
"""
import os
import sys
import click
import random
import collections
# There are 13 integer features and 26 categorical features
continous_features = range(1, 14)
categorial_features = range(14, 40)
# Clip integer features. The clip point for each integer feature
# is derived from the 95% quantile of the total values in each feature
continous_clip = [20, 600, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50]
class CategoryDictGenerator:
"""
Generate dictionary for each of the categorical features
"""
def __init__(self, num_feature):
self.dicts = []
self.num_feature = num_feature
for i in range(0, num_feature):
self.dicts.append(collections.defaultdict(int))
def build(self, datafile, categorial_features, cutoff=0):
with open(datafile, 'r') as f:
for line in f:
features = line.rstrip('\n').split('\t')
for i in range(0, self.num_feature):
if features[categorial_features[i]] != '':
self.dicts[i][features[categorial_features[i]]] += 1
for i in range(0, self.num_feature):
self.dicts[i] = filter(lambda x: x[1] >= cutoff,
self.dicts[i].items())
self.dicts[i] = sorted(self.dicts[i], key=lambda x: (-x[1], x[0]))
vocabs, _ = list(zip(*self.dicts[i]))
self.dicts[i] = dict(zip(vocabs, range(1, len(vocabs) + 1)))
self.dicts[i]['<unk>'] = 0
def gen(self, idx, key):
if key not in self.dicts[idx]:
res = self.dicts[idx]['<unk>']
else:
res = self.dicts[idx][key]
return res
def dicts_sizes(self):
return map(len, self.dicts)
class ContinuousFeatureGenerator:
"""
Normalize the integer features to [0, 1] by min-max normalization
"""
def __init__(self, num_feature):
self.num_feature = num_feature
self.min = [sys.maxint] * num_feature
self.max = [-sys.maxint] * num_feature
def build(self, datafile, continous_features):
with open(datafile, 'r') as f:
for line in f:
features = line.rstrip('\n').split('\t')
for i in range(0, self.num_feature):
val = features[continous_features[i]]
if val != '':
val = int(val)
if val > continous_clip[i]:
val = continous_clip[i]
self.min[i] = min(self.min[i], val)
self.max[i] = max(self.max[i], val)
def gen(self, idx, val):
if val == '':
return 0.0
val = float(val)
return (val - self.min[idx]) / (self.max[idx] - self.min[idx])
@click.command("preprocess")
@click.option("--datadir", type=str, help="Path to raw criteo dataset")
@click.option("--outdir", type=str, help="Path to save the processed data")
def preprocess(datadir, outdir):
"""
All 13 integer features are normalized to continuous values and these continuous
features are combined into one vector with dimension of 13.
Each of the 26 categorical features are one-hot encoded and all the one-hot
vectors are combined into one sparse binary vector.
"""
dists = ContinuousFeatureGenerator(len(continous_features))
dists.build(os.path.join(datadir, 'train.txt'), continous_features)
dicts = CategoryDictGenerator(len(categorial_features))
dicts.build(
os.path.join(datadir, 'train.txt'), categorial_features, cutoff=200)
dict_sizes = dicts.dicts_sizes()
categorial_feature_offset = [0]
for i in range(1, len(categorial_features)):
offset = categorial_feature_offset[i - 1] + dict_sizes[i - 1]
categorial_feature_offset.append(offset)
random.seed(0)
# 90% of the data are used for training, and 10% of the data are used
# for validation.
with open(os.path.join(outdir, 'train.txt'), 'w') as out_train:
with open(os.path.join(outdir, 'valid.txt'), 'w') as out_valid:
with open(os.path.join(datadir, 'train.txt'), 'r') as f:
for line in f:
features = line.rstrip('\n').split('\t')
continous_vals = []
for i in range(0, len(continous_features)):
val = dists.gen(i, features[continous_features[i]])
continous_vals.append("{0:.6f}".format(val).rstrip('0')
.rstrip('.'))
categorial_vals = []
for i in range(0, len(categorial_features)):
val = dicts.gen(i, features[categorial_features[
i]]) + categorial_feature_offset[i]
categorial_vals.append(str(val))
continous_vals = ','.join(continous_vals)
categorial_vals = ','.join(categorial_vals)
label = features[0]
if random.randint(0, 9999) % 10 != 0:
out_train.write('\t'.join(
[continous_vals, categorial_vals, label]) + '\n')
else:
out_valid.write('\t'.join(
[continous_vals, categorial_vals, label]) + '\n')
with open(os.path.join(outdir, 'test.txt'), 'w') as out:
with open(os.path.join(datadir, 'test.txt'), 'r') as f:
for line in f:
features = line.rstrip('\n').split('\t')
continous_vals = []
for i in range(0, len(continous_features)):
val = dists.gen(i, features[continous_features[i] - 1])
continous_vals.append("{0:.6f}".format(val).rstrip('0')
.rstrip('.'))
categorial_vals = []
for i in range(0, len(categorial_features)):
val = dicts.gen(i, features[categorial_features[
i] - 1]) + categorial_feature_offset[i]
categorial_vals.append(str(val))
continous_vals = ','.join(continous_vals)
categorial_vals = ','.join(categorial_vals)
out.write('\t'.join([continous_vals, categorial_vals]) + '\n')
if __name__ == "__main__":
preprocess()
class Dataset:
def __init__(self):
pass
class CriteoDataset(Dataset):
def __init__(self, sparse_feature_dim):
self.cont_min_ = [0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
self.cont_max_ = [20, 600, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50]
self.cont_diff_ = [20, 603, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50]
self.hash_dim_ = sparse_feature_dim
# here, training data are lines with line_index < train_idx_
self.train_idx_ = 41256555
self.continuous_range_ = range(1, 14)
self.categorical_range_ = range(14, 40)
def _reader_creator(self, file_list, is_train, trainer_num, trainer_id):
def reader():
for file in file_list:
with open(file, 'r') as f:
line_idx = 0
for line in f:
line_idx += 1
if is_train and line_idx > self.train_idx_:
continue
elif not is_train and line_idx <= self.train_idx_:
continue
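# Shard lines across trainers by line index so that each trainer reads a disjoint
# subset; trainer_id is the 1-based id that train.py passes in for distributed runs.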
if trainer_id > 0 and line_idx % trainer_num != trainer_id % trainer_num:
continue
features = line.rstrip('\n').split('\t')
dense_feature = []
sparse_feature = []
for idx in self.continuous_range_:
if features[idx] == '':
dense_feature.append(0.0)
else:
dense_feature.append((float(features[idx]) - self.cont_min_[idx - 1]) / self.cont_diff_[idx - 1])
for idx in self.categorical_range_:
sparse_feature.append([hash("%d_%s" % (idx, features[idx])) % self.hash_dim_])
label = [int(features[0])]
yield [dense_feature] + sparse_feature + [label]
return reader
def train(self, file_list, trainer_num, trainer_id):
return self._reader_creator(file_list, True, trainer_num, trainer_id)
def test(self, file_list):
return self._reader_creator(file_list, False, 1, -1)
def infer(self, file_list):
return self._reader_creator(file_list, False, 1, -1)
from __future__ import print_function
import argparse
import logging
import os
# disable gpu training for this example
os.environ["CUDA_VISIBLE_DEVICES"] = ""
import paddle
import paddle.fluid as fluid
import reader
from network_conf import ctr_dnn_model
logging.basicConfig(
format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)
def parse_args():
parser = argparse.ArgumentParser(description="PaddlePaddle CTR example")
parser.add_argument(
'--train_data_path',
type=str,
default='./data/raw/train.txt',
help="The path of training dataset")
parser.add_argument(
'--test_data_path',
type=str,
default='./data/raw/valid.txt',
help="The path of testing dataset")
parser.add_argument(
'--batch_size',
type=int,
default=1000,
help="The size of mini-batch (default:1000)")
parser.add_argument(
'--embedding_size',
type=int,
default=10,
help="The size for embedding layer (default:10)")
parser.add_argument(
'--num_passes',
type=int,
default=10,
help="The number of passes to train (default: 10)")
parser.add_argument(
'--model_output_dir',
type=str,
default='models',
help='The path for model to store (default: models)')
parser.add_argument(
'--sparse_feature_dim',
type=int,
default=1000001,
help='sparse feature hashing space for index processing')
parser.add_argument(
'--is_local',
type=int,
default=1,
help='Local train or distributed train (default: 1)')
# the following arguments are used for distributed training; if is_local == 0, they must be set
parser.add_argument(
'--role',
type=str,
default='pserver', # trainer or pserver
help='Role in the distributed job: trainer or pserver (default: pserver)')
parser.add_argument(
'--endpoints',
type=str,
default='127.0.0.1:6000',
help='The pserver endpoints, like: 127.0.0.1:6000,127.0.0.1:6001')
parser.add_argument(
'--current_endpoint',
type=str,
default='127.0.0.1:6000',
help='The endpoint of the current pserver (default: 127.0.0.1:6000)')
parser.add_argument(
'--trainer_id',
type=int,
default=0,
help='The id of this trainer, starting from 0 (default: 0)')
parser.add_argument(
'--trainers',
type=int,
default=1,
help='The number of trainers (default: 1)')
return parser.parse_args()
def train_loop(args, train_program, data_list, loss, auc_var, batch_auc_var,
trainer_num, trainer_id):
dataset = reader.CriteoDataset(args.sparse_feature_dim)
train_reader = paddle.batch(
paddle.reader.shuffle(
dataset.train([args.train_data_path], trainer_num, trainer_id),
buf_size=args.batch_size * 100),
batch_size=args.batch_size)
place = fluid.CPUPlace()
feeder = fluid.DataFeeder(feed_list=data_list, place=place)
data_name_list = [var.name for var in data_list]
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
for pass_id in range(args.num_passes):
for batch_id, data in enumerate(train_reader()):
loss_val, auc_val, batch_auc_val = exe.run(
train_program,
feed=feeder.feed(data),
fetch_list=[loss, auc_var, batch_auc_var]
)
logger.info("TRAIN --> pass: {} batch: {} loss: {} auc: {}, batch_auc: {}"
.format(pass_id, batch_id, loss_val/args.batch_size, auc_val, batch_auc_val))
if batch_id % 1000 == 0 and batch_id != 0:
model_dir = args.model_output_dir + '/batch-' + str(batch_id)
if args.trainer_id == 0:
fluid.io.save_inference_model(model_dir, data_name_list, [loss, auc_var], exe)
model_dir = args.model_output_dir + '/pass-' + str(pass_id)
if args.trainer_id == 0:
fluid.io.save_inference_model(model_dir, data_name_list, [loss, auc_var], exe)
def train():
args = parse_args()
if not os.path.isdir(args.model_output_dir):
os.mkdir(args.model_output_dir)
loss, data_list, auc_var, batch_auc_var = ctr_dnn_model(args.embedding_size, args.sparse_feature_dim)
optimizer = fluid.optimizer.Adam(learning_rate=1e-4)
optimizer.minimize(loss)
if args.is_local:
logger.info("run local training")
main_program = fluid.default_main_program()
train_loop(args, main_program, data_list, loss, auc_var, batch_auc_var, 1, -1)
else:
logger.info("run dist training")
t = fluid.DistributeTranspiler()
t.transpile(args.trainer_id, pservers=args.endpoints, trainers=args.trainers)
if args.role == "pserver":
logger.info("run pserver")
prog = t.get_pserver_program(args.current_endpoint)
startup = t.get_startup_program(args.current_endpoint, pserver_program=prog)
exe = fluid.Executor(fluid.CPUPlace())
exe.run(startup)
exe.run(prog)
elif args.role == "trainer":
logger.info("run trainer")
train_prog = t.get_trainer_program()
train_loop(args, train_prog, data_list, loss, auc_var, batch_auc_var,
args.trainers, args.trainer_id + 1)
if __name__ == '__main__':
train()
......@@ -74,14 +74,16 @@ python convert_format.py
```
## Training
GPU environment, default configuration
Run `CUDA_VISIBLE_DEVICES=0 python train.py train_file test_file` to start training the model.
```python
CUDA_VISIBLE_DEVICES=0 python train.py small_train.txt small_test.file
'--use_cuda 1' means training on GPU, and the default is CPU; '--parallel 1' means training with multiple cards, and the default is a single card.
GPU environment
Run `CUDA_VISIBLE_DEVICES=0 python train.py train_file test_file --use_cuda 1` to start training the model.
```
CUDA_VISIBLE_DEVICES=0 python train.py small_train.txt small_test.txt --use_cuda 1
```
CPU environment
Run `python train.py train_file test_file` to start training the model.
```python
```
python train.py small_train.txt small_test.txt
```
......@@ -100,8 +102,8 @@ python train.py small_train.txt small_test.txt
base_lr=0.01, # base learning rate
batch_size=batch_size,
pass_num=10, # the number of passes for training
use_cuda=True, # whether to use GPU card
parallel=False, # whether to be parallel
use_cuda=use_cuda, # whether to use GPU card
parallel=parallel, # whether to be parallel
model_dir="model_recall20", # directory to save model
init_low_bound=-0.1, # uniform parameter initialization lower bound
init_high_bound=0.1) # uniform parameter initialization upper bound
......@@ -198,9 +200,9 @@ model saved in model_recall20/epoch_1
```
## Inference
Run `CUDA_VISIBLE_DEVICES=0 python infer.py model_dir start_epoch last_epoch(inclusive) train_file test_file` to start inference, where start_epoch specifies the first epoch to predict and last_epoch specifies the last one, for example
Run `CUDA_VISIBLE_DEVICES=0 python infer.py model_dir start_epoch last_epoch(inclusive) train_file test_file` to start inference. start_epoch specifies the first epoch to predict and last_epoch specifies the last one (inclusive), for example
```python
CUDA_VISIBLE_DEVICES=0 python infer.py model 1 10 small_train.txt small_test.txt# prediction from epoch 1 to epoch 10 small_train.txt small_test.txt
CUDA_VISIBLE_DEVICES=0 python infer.py model 1 10 small_train.txt small_test.txt
```
## Sample prediction results
......
......@@ -17,7 +17,8 @@ def parse_args():
parser = argparse.ArgumentParser("gru4rec benchmark.")
parser.add_argument('train_file')
parser.add_argument('test_file')
parser.add_argument('--use_cuda', help='whether use gpu')
parser.add_argument('--parallel', help='whether parallel')
parser.add_argument(
'--enable_ce',
action='store_true',
......@@ -120,7 +121,7 @@ def train(train_reader,
fetch_list = [avg_cost.name]
for pass_idx in six.moves.xrange(pass_num):
epoch_idx = pass_idx + 1
print "epoch_%d start" % epoch_idx
print("epoch_%d start" % epoch_idx)
t0 = time.time()
i = 0
......@@ -182,6 +183,9 @@ def train_net():
args = parse_args()
train_file = args.train_file
test_file = args.test_file
use_cuda = True if args.use_cuda else False
parallel = True if args.parallel else False
print("use_cuda:", use_cuda, "parallel:", parallel)
batch_size = 50
vocab, train_reader, test_reader = utils.prepare_data(
train_file, test_file,batch_size=batch_size * get_cards(args),\
......@@ -194,8 +198,8 @@ def train_net():
base_lr=0.01,
batch_size=batch_size,
pass_num=10,
use_cuda=True,
parallel=False,
use_cuda=use_cuda,
parallel=parallel,
model_dir="model_recall20",
init_low_bound=-0.1,
init_high_bound=0.1)
......
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v1.2.3
hooks:
- id: trailing-whitespace
# Multi-view Simnet for Personalized Recommendation
## Introduction
In personalized recommendation scenarios, the item list shown to a user is usually produced by a personalized matching model. In the real world, a user may have features from many views, such as the user id, age, and the click history of items. An item, for example a news article, also has features from multiple views such as the news title and news category. The multi-view Simnet model is a unified model that fuses multiple feature views of both the user and the recommended items and learns personalized matching. Models of this kind are used in many industrial scenarios, for example in Baidu's Feed products.
## Dataset
For now this project uses a machine-generated synthetic dataset to illustrate the multi-view Simnet model; real-world datasets will be added gradually and the model will be validated on them.
## Model
This project aims to provide a personalized matching model built with Paddle. The multi-view Simnet model contains several encoder modules, each applied to a different feature view. Currently the project provides a Bag-of-Embedding encoder, a Temporal-Convolutional encoder, and a Gated-Recurrent-Unit encoder; encoders that work well for sparse features will be added gradually. The model is currently trained with pairwise ranking: for a related user-item pair, a random item is sampled as the negative example for ranking.
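A minimal sketch of the pairwise hinge loss this corresponds to (nets.py builds it from elementwise fluid ops with a margin of 0.1):
```python
# Sketch of the pairwise hinge ranking loss used in nets.py: push the cosine similarity
# of the positive pair above that of the negative pair by at least the margin.
def hinge_rank_loss(cos_pos, cos_neg, margin=0.1):
    return max(0.0, margin - cos_pos + cos_neg)
```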
## Training
Train with the command below; the full list of options can be listed with `python train.py -h`.
```bash
python train.py
```
## Future work
- Multiple types of pairwise loss functions will be added to this project. For different feature views, the user-item matching relation can be jointly optimized with different loss functions. The whole model will be validated on real-world data.
- An inference tool will be added.
- A Parallel Executor option will be added.
- Distributed training support will be added.
# Multi-view Simnet for Personalized Recommendation
## Introduction
In a personalized recommendation scenario, a user is often provided with several items by a personalized interest-matching model. In real-world applications, a user may have multiple views of features, say user id, age, click history of items, and search queries. An item, e.g. a news article, may also have multiple views of features such as the news title, news category, and the images in the news. Multi-view Simnet is a matching model that combines users' and items' multiple views of features into one unified model. The model can be used in many industrial products such as Baidu's feed news. The model is adapted from the paper A Multi-View Deep Learning (MV-DNN) Approach for Cross Domain User Modeling in Recommendation Systems, WWW 2015. The difference between our model and MV-DNN is that we also consider multiple feature views of users.
## Dataset
Currently, a synthetic dataset is provided for proof of concept, and we aim to add more real-world datasets to this project in the future.
## Model
This project aims to provide a practical example of personalized matching with Paddle. The model provides several encoder modules for different views of features. Currently a Bag-of-Embedding encoder, a Temporal-Convolutional encoder, and a Gated-Recurrent-Unit encoder are provided; more encoders that are practical for the sparse features commonly used in recommender systems will be added. The training algorithm is pairwise ranking: given a positive user-item pair, a negative item with multiple views is sampled for ranking.
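A minimal sketch of how the encoders are composed, following what train.py does with its command line arguments (the encoder type strings come from `SimpleEncoderFactory` in nets.py):
```python
# Sketch: build one encoder per feature view with the factory from nets.py and hand
# them to the model; train.py assembles the same objects from command line arguments.
from nets import MultiviewSimnet, SimpleEncoderFactory

factory = SimpleEncoderFactory()
query_encoders = [factory.create("bow", 128)]   # one query slot, bag-of-embedding encoder
title_encoders = [factory.create("gru", 128)]   # one title slot, GRU encoder

# arguments: sparse feature hashing space, embedding dim, hidden size
model = MultiviewSimnet(1000001, 128, 128)
model.set_query_encoder(query_encoders)
model.set_title_encoder(title_encoders)
all_slots, avg_cost, correct = model.train_net()
```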
## Train
The command line options for training can be listed by `python train.py -h`
```bash
python train.py
```
## Future work
- Multiple types of pairwise loss will be added to this project. For different views of features between a user and an item, multiple losses will be supported. The model will be verified on real-world datasets.
- An inference tool will be added.
- Parallel Executor will be added in this project
- Distributed Training will be added
#Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle.fluid as fluid
import paddle.fluid.layers.nn as nn
import paddle.fluid.layers.tensor as tensor
import paddle.fluid.layers.control_flow as cf
import paddle.fluid.layers.io as io
class BowEncoder(object):
""" bow-encoder """
def __init__(self):
self.param_name = ""
def forward(self, emb):
return nn.sequence_pool(input=emb, pool_type='sum')
class CNNEncoder(object):
""" cnn-encoder"""
def __init__(self,
param_name="cnn.w",
win_size=3,
ksize=128,
act='tanh',
pool_type='max'):
self.param_name = param_name
self.win_size = win_size
self.ksize = ksize
self.act = act
self.pool_type = pool_type
def forward(self, emb):
return fluid.nets.sequence_conv_pool(
input=emb,
num_filters=self.ksize,
filter_size=self.win_size,
act=self.act,
pool_type=self.pool_type,
attr=self.param_name)
class GrnnEncoder(object):
""" grnn-encoder """
def __init__(self, param_name="grnn.w", hidden_size=128):
self.param_name = param_name
self.hidden_size = hidden_size
def forward(self, emb):
fc0 = nn.fc(input=emb, size=self.hidden_size * 3)
gru_h = nn.dynamic_gru(
input=fc0,
size=self.hidden_size,
is_reverse=False,
attr=self.param_name)
return nn.sequence_pool(input=gru_h, pool_type='max')
'''this is a very simple Encoder factory
most default argument values are used'''
class SimpleEncoderFactory(object):
def __init__(self):
pass
''' create an encoder through create function '''
def create(self, enc_type, enc_hid_size):
if enc_type == "bow":
bow_encode = BowEncoder()
return bow_encode
elif enc_type == "cnn":
cnn_encode = CNNEncoder(ksize=enc_hid_size)
return cnn_encode
elif enc_type == "gru":
rnn_encode = GrnnEncoder(hidden_size=enc_hid_size)
return rnn_encode
class MultiviewSimnet(object):
""" multi-view simnet """
def __init__(self, embedding_size, embedding_dim, hidden_size):
self.embedding_size = embedding_size
self.embedding_dim = embedding_dim
self.emb_shape = [self.embedding_size, self.embedding_dim]
self.hidden_size = hidden_size
self.margin = 0.1
def set_query_encoder(self, encoders):
self.query_encoders = encoders
def set_title_encoder(self, encoders):
self.title_encoders = encoders
def get_correct(self, x, y):
less = tensor.cast(cf.less_than(x, y), dtype='float32')
correct = nn.reduce_sum(less)
return correct
def train_net(self):
# input fields for query, pos_title, neg_title
q_slots = [
io.data(
name="q%d" % i, shape=[1], lod_level=1, dtype='int64')
for i in range(len(self.query_encoders))
]
pt_slots = [
io.data(
name="pt%d" % i, shape=[1], lod_level=1, dtype='int64')
for i in range(len(self.title_encoders))
]
nt_slots = [
io.data(
name="nt%d" % i, shape=[1], lod_level=1, dtype='int64')
for i in range(len(self.title_encoders))
]
# lookup embedding for each slot
q_embs = [
nn.embedding(
input=query, size=self.emb_shape, param_attr="emb.w")
for query in q_slots
]
pt_embs = [
nn.embedding(
input=title, size=self.emb_shape, param_attr="emb.w")
for title in pt_slots
]
nt_embs = [
nn.embedding(
input=title, size=self.emb_shape, param_attr="emb.w")
for title in nt_slots
]
# encode each embedding field with encoder
q_encodes = [
self.query_encoders[i].forward(emb) for i, emb in enumerate(q_embs)
]
pt_encodes = [
self.title_encoders[i].forward(emb) for i, emb in enumerate(pt_embs)
]
nt_encodes = [
self.title_encoders[i].forward(emb) for i, emb in enumerate(nt_embs)
]
# concat multi view for query, pos_title, neg_title
q_concat = nn.concat(q_encodes)
pt_concat = nn.concat(pt_encodes)
nt_concat = nn.concat(nt_encodes)
# projection of hidden layer
q_hid = nn.fc(q_concat, size=self.hidden_size, param_attr='q_fc.w')
pt_hid = nn.fc(pt_concat, size=self.hidden_size, param_attr='t_fc.w')
nt_hid = nn.fc(nt_concat, size=self.hidden_size, param_attr='t_fc.w')
# cosine of hidden layers
cos_pos = nn.cos_sim(q_hid, pt_hid)
cos_neg = nn.cos_sim(q_hid, nt_hid)
# pairwise hinge_loss
loss_part1 = nn.elementwise_sub(
tensor.fill_constant_batch_size_like(
input=cos_pos,
shape=[-1, 1],
value=self.margin,
dtype='float32'),
cos_pos)
loss_part2 = nn.elementwise_add(loss_part1, cos_neg)
loss_part3 = nn.elementwise_max(
tensor.fill_constant_batch_size_like(
input=loss_part2, shape=[-1, 1], value=0.0, dtype='float32'),
loss_part2)
avg_cost = nn.mean(loss_part3)
correct = self.get_correct(cos_pos, cos_neg)
return q_slots + pt_slots + nt_slots, avg_cost, correct
def pred_net(self, query_fields, pos_title_fields, neg_title_fields):
q_slots = [
io.data(
name="q%d" % i, shape=[1], lod_level=1, dtype='int64')
for i in range(len(self.query_encoders))
]
pt_slots = [
io.data(
name="pt%d" % i, shape=[1], lod_level=1, dtype='int64')
for i in range(len(self.title_encoders))
]
# lookup embedding for each slot
q_embs = [
nn.embedding(
input=query, size=self.emb_shape, param_attr="emb.w")
for query in q_slots
]
pt_embs = [
nn.embedding(
input=title, size=self.emb_shape, param_attr="emb.w")
for title in pt_slots
]
# encode each embedding field with encoder
q_encodes = [
self.query_encoders[i].forward(emb) for i, emb in enumerate(q_embs)
]
pt_encodes = [
self.title_encoders[i].forward(emb) for i, emb in enumerate(pt_embs)
]
# concat multi view for query, pos_title, neg_title
q_concat = nn.concat(q_encodes)
pt_concat = nn.concat(pt_encodes)
# projection of hidden layer
q_hid = nn.fc(q_concat, size=self.hidden_size, param_attr='q_fc.w')
pt_hid = nn.fc(pt_concat, size=self.hidden_size, param_attr='t_fc.w')
# cosine of hidden layers
cos = nn.cos_sim(q_hid, pt_hid)
return cos
#Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import random
class Dataset:
def __init__(self):
pass
class SyntheticDataset(Dataset):
def __init__(self, sparse_feature_dim, query_slot_num, title_slot_num):
# ids are randomly generated
self.ids_per_slot = 10
self.sparse_feature_dim = sparse_feature_dim
self.query_slot_num = query_slot_num
self.title_slot_num = title_slot_num
self.dataset_size = 10000
def _reader_creator(self, is_train):
def generate_ids(num, space):
return [random.randint(0, space - 1) for i in range(num)]
def reader():
for i in range(self.dataset_size):
query_slots = []
pos_title_slots = []
neg_title_slots = []
for i in range(self.query_slot_num):
qslot = generate_ids(self.ids_per_slot,
self.sparse_feature_dim)
query_slots.append(qslot)
for i in range(self.title_slot_num):
pt_slot = generate_ids(self.ids_per_slot,
self.sparse_feature_dim)
pos_title_slots.append(pt_slot)
if is_train:
for i in range(self.title_slot_num):
nt_slot = generate_ids(self.ids_per_slot,
self.sparse_feature_dim)
neg_title_slots.append(nt_slot)
yield query_slots + pos_title_slots + neg_title_slots
else:
yield query_slots + pos_title_slots
return reader
def train(self):
return self._reader_creator(True)
def valid(self):
return self._reader_creator(True)
def test(self):
return self._reader_creator(False)
#Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
import time
import six
import numpy as np
import math
import argparse
import logging
import paddle.fluid as fluid
import paddle
import time
import reader as reader
from nets import MultiviewSimnet, SimpleEncoderFactory
logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)
def parse_args():
parser = argparse.ArgumentParser("multi-view simnet")
parser.add_argument("--train_file", type=str, help="Training file")
parser.add_argument("--valid_file", type=str, help="Validation file")
parser.add_argument(
"--epochs", type=int, default=10, help="Number of epochs for training")
parser.add_argument(
"--model_output_dir",
type=str,
default='model_output',
help="Model output folder")
parser.add_argument(
"--query_slots", type=int, default=1, help="Number of query slots")
parser.add_argument(
"--title_slots", type=int, default=1, help="Number of title slots")
parser.add_argument(
"--query_encoder",
type=str,
default="bow",
help="Encoder module for slot encoding")
parser.add_argument(
"--title_encoder",
type=str,
default="bow",
help="Encoder module for slot encoding")
parser.add_argument(
"--query_encode_dim",
type=int,
default=128,
help="Dimension of query encoder output")
parser.add_argument(
"--title_encode_dim",
type=int,
default=128,
help="Dimension of title encoder output")
parser.add_argument(
"--batch_size", type=int, default=128, help="Batch size for training")
parser.add_argument(
"--embedding_dim",
type=int,
default=128,
help="Default Dimension of Embedding")
parser.add_argument(
"--sparse_feature_dim",
type=int,
default=1000001,
help="Sparse feature hashing space"
"for index processing")
parser.add_argument(
"--hidden_size", type=int, default=128, help="Hidden dim")
return parser.parse_args()
def start_train(args):
dataset = reader.SyntheticDataset(args.sparse_feature_dim, args.query_slots,
args.title_slots)
train_reader = paddle.batch(
paddle.reader.shuffle(
dataset.train(), buf_size=args.batch_size * 100),
batch_size=args.batch_size)
place = fluid.CPUPlace()
factory = SimpleEncoderFactory()
query_encoders = [
factory.create(args.query_encoder, args.query_encode_dim)
for i in range(args.query_slots)
]
title_encoders = [
factory.create(args.title_encoder, args.title_encode_dim)
for i in range(args.title_slots)
]
m_simnet = MultiviewSimnet(args.sparse_feature_dim, args.embedding_dim,
args.hidden_size)
m_simnet.set_query_encoder(query_encoders)
m_simnet.set_title_encoder(title_encoders)
all_slots, avg_cost, correct = m_simnet.train_net()
optimizer = fluid.optimizer.Adam(learning_rate=1e-4)
optimizer.minimize(avg_cost)
startup_program = fluid.default_startup_program()
loop_program = fluid.default_main_program()
feeder = fluid.DataFeeder(feed_list=all_slots, place=place)
exe = fluid.Executor(place)
exe.run(startup_program)
for pass_id in range(args.epochs):
for batch_id, data in enumerate(train_reader()):
loss_val, correct_val = exe.run(loop_program,
feed=feeder.feed(data),
fetch_list=[avg_cost, correct])
logger.info("TRAIN --> pass: {} batch_id: {} avg_cost: {}, acc: {}"
.format(pass_id, batch_id, loss_val,
float(correct_val) / args.batch_size))
fluid.io.save_inference_model(args.model_output_dir,
[var.name for var in all_slots],
[avg_cost, correct], exe)
def main():
args = parse_args()
start_train(args)
if __name__ == "__main__":
main()
......@@ -5,6 +5,7 @@ python -u ../train_and_evaluate.py --use_cuda \
--ext_eval \
--word_emb_init ./data/word_embedding.pkl \
--save_path ./models \
--use_pyreader \
--batch_size 256 \
--vocab_size 172130 \
--channel1_num 16 \
......
......@@ -15,45 +15,85 @@ class Net(object):
self._stack_num = stack_num
self._channel1_num = channel1_num
self._channel2_num = channel2_num
self._feed_names = []
self.word_emb_name = "shared_word_emb"
self.use_stack_op = True
self.use_mask_cache = True
self.use_sparse_embedding = True
def set_word_embedding(self, word_emb, place):
word_emb_param = fluid.global_scope().find_var(
self.word_emb_name).get_tensor()
word_emb_param.set(word_emb, place)
def create_network(self):
mask_cache = dict() if self.use_mask_cache else None
turns_data = []
def create_py_reader(self, capacity, name):
# turns ids
shapes = [[-1, self._max_turn_len, 1]
for i in six.moves.xrange(self._max_turn_num)]
dtypes = ["int32" for i in six.moves.xrange(self._max_turn_num)]
# turns mask
shapes += [[-1, self._max_turn_len, 1]
for i in six.moves.xrange(self._max_turn_num)]
dtypes += ["float32" for i in six.moves.xrange(self._max_turn_num)]
# response ids, response mask, label
shapes += [[-1, self._max_turn_len, 1], [-1, self._max_turn_len, 1],
[-1, 1]]
dtypes += ["int32", "float32", "float32"]
py_reader = fluid.layers.py_reader(
capacity=capacity,
shapes=shapes,
lod_levels=[0] * (2 * self._max_turn_num + 3),
dtypes=dtypes,
name=name,
use_double_buffer=True)
data_vars = fluid.layers.read_file(py_reader)
self.turns_data = data_vars[0:self._max_turn_num]
self.turns_mask = data_vars[self._max_turn_num:2 * self._max_turn_num]
self.response = data_vars[-3]
self.response_mask = data_vars[-2]
self.label = data_vars[-1]
return py_reader
def create_data_layers(self):
self._feed_names = []
self.turns_data = []
for i in six.moves.xrange(self._max_turn_num):
name = "turn_%d" % i
turn = fluid.layers.data(
name="turn_%d" % i,
shape=[self._max_turn_len, 1],
dtype="int32")
turns_data.append(turn)
name=name, shape=[self._max_turn_len, 1], dtype="int32")
self.turns_data.append(turn)
self._feed_names.append(name)
turns_mask = []
self.turns_mask = []
for i in six.moves.xrange(self._max_turn_num):
name = "turn_mask_%d" % i
turn_mask = fluid.layers.data(
name="turn_mask_%d" % i,
shape=[self._max_turn_len, 1],
dtype="float32")
turns_mask.append(turn_mask)
name=name, shape=[self._max_turn_len, 1], dtype="float32")
self.turns_mask.append(turn_mask)
self._feed_names.append(name)
response = fluid.layers.data(
self.response = fluid.layers.data(
name="response", shape=[self._max_turn_len, 1], dtype="int32")
response_mask = fluid.layers.data(
self.response_mask = fluid.layers.data(
name="response_mask",
shape=[self._max_turn_len, 1],
dtype="float32")
label = fluid.layers.data(name="label", shape=[1], dtype="float32")
self.label = fluid.layers.data(name="label", shape=[1], dtype="float32")
self._feed_names += ["response", "response_mask", "label"]
def get_feed_names(self):
return self._feed_names
def set_word_embedding(self, word_emb, place):
word_emb_param = fluid.global_scope().find_var(
self.word_emb_name).get_tensor()
word_emb_param.set(word_emb, place)
def create_network(self):
mask_cache = dict() if self.use_mask_cache else None
response_emb = fluid.layers.embedding(
input=response,
input=self.response,
size=[self._vocab_size + 1, self._emb_size],
is_sparse=self.use_sparse_embedding,
param_attr=fluid.ParamAttr(
......@@ -71,8 +111,8 @@ class Net(object):
key=Hr,
value=Hr,
d_key=self._emb_size,
q_mask=response_mask,
k_mask=response_mask,
q_mask=self.response_mask,
k_mask=self.response_mask,
mask_cache=mask_cache)
Hr_stack.append(Hr)
......@@ -80,7 +120,7 @@ class Net(object):
sim_turns = []
for t in six.moves.xrange(self._max_turn_num):
Hu = fluid.layers.embedding(
input=turns_data[t],
input=self.turns_data[t],
size=[self._vocab_size + 1, self._emb_size],
is_sparse=self.use_sparse_embedding,
param_attr=fluid.ParamAttr(
......@@ -96,8 +136,8 @@ class Net(object):
key=Hu,
value=Hu,
d_key=self._emb_size,
q_mask=turns_mask[t],
k_mask=turns_mask[t],
q_mask=self.turns_mask[t],
k_mask=self.turns_mask[t],
mask_cache=mask_cache)
Hu_stack.append(Hu)
......@@ -111,8 +151,8 @@ class Net(object):
key=Hr_stack[index],
value=Hr_stack[index],
d_key=self._emb_size,
q_mask=turns_mask[t],
k_mask=response_mask,
q_mask=self.turns_mask[t],
k_mask=self.response_mask,
mask_cache=mask_cache)
r_a_t = layers.block(
name="r_attend_t_" + str(index),
......@@ -120,8 +160,8 @@ class Net(object):
key=Hu_stack[index],
value=Hu_stack[index],
d_key=self._emb_size,
q_mask=response_mask,
k_mask=turns_mask[t],
q_mask=self.response_mask,
k_mask=self.turns_mask[t],
mask_cache=mask_cache)
t_a_r_stack.append(t_a_r)
......@@ -158,5 +198,5 @@ class Net(object):
sim = fluid.layers.concat(input=sim_turns, axis=2)
final_info = layers.cnn_3d(sim, self._channel1_num, self._channel2_num)
loss, logits = layers.loss(final_info, label)
loss, logits = layers.loss(final_info, self.label)
return loss, logits
......@@ -7,7 +7,7 @@ import multiprocessing
import paddle
import paddle.fluid as fluid
import utils.reader as reader
from utils.util import print_arguments
from utils.util import print_arguments, mkdir
try:
import cPickle as pickle #python 2
......@@ -49,6 +49,10 @@ def parse_args():
'--use_cuda',
action='store_true',
help='If set, use cuda for training.')
parser.add_argument(
'--use_pyreader',
action='store_true',
help='If set, use pyreader for reading data.')
parser.add_argument(
'--ext_eval',
action='store_true',
......@@ -105,7 +109,75 @@ def parse_args():
#yapf: enable
def evaluate(score_path, result_file_path):
if args.ext_eval:
import utils.douban_evaluation as eva
else:
import utils.evaluation as eva
#write evaluation result
result = eva.evaluate(score_path)
with open(result_file_path, 'w') as out_file:
for p_at in result:
out_file.write(str(p_at) + '\n')
print('finish evaluation')
print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
def test_with_feed(exe, program, feed_names, fetch_list, score_path, batches,
batch_num, dev_count):
score_file = open(score_path, 'w')
for it in six.moves.xrange(batch_num // dev_count):
feed_list = []
for dev in six.moves.xrange(dev_count):
val_index = it * dev_count + dev
batch_data = reader.make_one_batch_input(batches, val_index)
feed_dict = dict(zip(feed_names, batch_data))
feed_list.append(feed_dict)
predicts = exe.run(feed=feed_list, fetch_list=fetch_list)
scores = np.array(predicts[0])
for dev in six.moves.xrange(dev_count):
val_index = it * dev_count + dev
for i in six.moves.xrange(args.batch_size):
score_file.write(
str(scores[args.batch_size * dev + i][0]) + '\t' + str(
batches["label"][val_index][i]) + '\n')
score_file.close()
def test_with_pyreader(exe, program, pyreader, fetch_list, score_path, batches,
batch_num, dev_count):
def data_provider():
for index in six.moves.xrange(batch_num):
yield reader.make_one_batch_input(batches, index)
score_file = open(score_path, 'w')
pyreader.decorate_tensor_provider(data_provider)
it = 0
pyreader.start()
while True:
try:
predicts = exe.run(fetch_list=fetch_list)
scores = np.array(predicts[0])
for dev in six.moves.xrange(dev_count):
val_index = it * dev_count + dev
for i in six.moves.xrange(args.batch_size):
score_file.write(
str(scores[args.batch_size * dev + i][0]) + '\t' + str(
batches["label"][val_index][i]) + '\n')
it += 1
except fluid.core.EOFException:
pyreader.reset()
break
score_file.close()
def train(args):
if not os.path.exists(args.save_path):
os.makedirs(args.save_path)
# data data_config
data_conf = {
"batch_size": args.batch_size,
......@@ -117,27 +189,47 @@ def train(args):
dam = Net(args.max_turn_num, args.max_turn_len, args.vocab_size,
args.emb_size, args.stack_num, args.channel1_num,
args.channel2_num)
loss, logits = dam.create_network()
loss.persistable = True
logits.persistable = True
train_program = fluid.default_main_program()
test_program = fluid.default_main_program().clone(for_test=True)
# gradient clipping
fluid.clip.set_gradient_clip(clip=fluid.clip.GradientClipByValue(
max=1.0, min=-1.0))
optimizer = fluid.optimizer.Adam(
learning_rate=fluid.layers.exponential_decay(
learning_rate=args.learning_rate,
decay_steps=400,
decay_rate=0.9,
staircase=True))
optimizer.minimize(loss)
fluid.memory_optimize(train_program)
train_program = fluid.Program()
train_startup = fluid.Program()
with fluid.program_guard(train_program, train_startup):
with fluid.unique_name.guard():
if args.use_pyreader:
train_pyreader = dam.create_py_reader(
capacity=10, name='train_reader')
else:
dam.create_data_layers()
loss, logits = dam.create_network()
loss.persistable = True
logits.persistable = True
# gradient clipping
fluid.clip.set_gradient_clip(clip=fluid.clip.GradientClipByValue(
max=1.0, min=-1.0))
optimizer = fluid.optimizer.Adam(
learning_rate=fluid.layers.exponential_decay(
learning_rate=args.learning_rate,
decay_steps=400,
decay_rate=0.9,
staircase=True))
optimizer.minimize(loss)
fluid.memory_optimize(train_program)
test_program = fluid.Program()
test_startup = fluid.Program()
with fluid.program_guard(test_program, test_startup):
with fluid.unique_name.guard():
if args.use_pyreader:
test_pyreader = dam.create_py_reader(
capacity=10, name='test_reader')
else:
dam.create_data_layers()
loss, logits = dam.create_network()
loss.persistable = True
logits.persistable = True
test_program = test_program.clone(for_test=True)
if args.use_cuda:
place = fluid.CUDAPlace(0)
......@@ -152,7 +244,8 @@ def train(args):
program=train_program, batch_size=args.batch_size))
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
exe.run(train_startup)
exe.run(test_startup)
train_exe = fluid.ParallelExecutor(
use_cuda=args.use_cuda, loss_name=loss.name, main_program=train_program)
......@@ -162,11 +255,6 @@ def train(args):
main_program=test_program,
share_vars_from=train_exe)
if args.ext_eval:
import utils.douban_evaluation as eva
else:
import utils.evaluation as eva
if args.word_emb_init is not None:
print("start loading word embedding init ...")
if six.PY2:
......@@ -199,17 +287,15 @@ def train(args):
print("begin model training ...")
print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
step = 0
for epoch in six.moves.xrange(args.num_scan_data):
shuffle_train = reader.unison_shuffle(train_data)
train_batches = reader.build_batches(shuffle_train, data_conf)
# train on one epoch data by feeding
def train_with_feed(step):
ave_cost = 0.0
for it in six.moves.xrange(batch_num // dev_count):
feed_list = []
for dev in six.moves.xrange(dev_count):
index = it * dev_count + dev
feed_dict = reader.make_one_batch_input(train_batches, index)
batch_data = reader.make_one_batch_input(train_batches, index)
feed_dict = dict(zip(dam.get_feed_names(), batch_data))
feed_list.append(feed_dict)
cost = train_exe.run(feed=feed_list, fetch_list=[loss.name])
......@@ -226,41 +312,73 @@ def train(args):
print("Save model at step %d ... " % step)
print(time.strftime('%Y-%m-%d %H:%M:%S',
time.localtime(time.time())))
fluid.io.save_persistables(exe, save_path)
fluid.io.save_persistables(exe, save_path, train_program)
score_path = os.path.join(args.save_path, 'score.' + str(step))
score_file = open(score_path, 'w')
for it in six.moves.xrange(val_batch_num // dev_count):
feed_list = []
for dev in six.moves.xrange(dev_count):
val_index = it * dev_count + dev
feed_dict = reader.make_one_batch_input(val_batches,
val_index)
feed_list.append(feed_dict)
predicts = test_exe.run(feed=feed_list,
fetch_list=[logits.name])
scores = np.array(predicts[0])
for dev in six.moves.xrange(dev_count):
val_index = it * dev_count + dev
for i in six.moves.xrange(args.batch_size):
score_file.write(
str(scores[args.batch_size * dev + i][0]) + '\t'
+ str(val_batches["label"][val_index][
i]) + '\n')
score_file.close()
#write evaluation result
result = eva.evaluate(score_path)
test_with_feed(test_exe, test_program,
dam.get_feed_names(), [logits.name], score_path,
val_batches, val_batch_num, dev_count)
result_file_path = os.path.join(args.save_path,
'result.' + str(step))
with open(result_file_path, 'w') as out_file:
for p_at in result:
out_file.write(str(p_at) + '\n')
print('finish evaluation')
print(time.strftime('%Y-%m-%d %H:%M:%S',
time.localtime(time.time())))
evaluate(score_path, result_file_path)
return step
# train on one epoch with pyreader
def train_with_pyreader(step):
def data_provider():
for index in six.moves.xrange(batch_num):
yield reader.make_one_batch_input(train_batches, index)
train_pyreader.decorate_tensor_provider(data_provider)
ave_cost = 0.0
train_pyreader.start()
while True:
try:
cost = train_exe.run(fetch_list=[loss.name])
ave_cost += np.array(cost[0]).mean()
step = step + 1
if step % print_step == 0:
print("processed: [" + str(step * dev_count * 1.0 /
batch_num) + "] ave loss: [" +
str(ave_cost / print_step) + "]")
ave_cost = 0.0
if (args.save_path is not None) and (step % save_step == 0):
save_path = os.path.join(args.save_path,
"step_" + str(step))
print("Save model at step %d ... " % step)
print(time.strftime('%Y-%m-%d %H:%M:%S',
time.localtime(time.time())))
fluid.io.save_persistables(exe, save_path, train_program)
score_path = os.path.join(args.save_path,
'score.' + str(step))
test_with_pyreader(test_exe, test_program, test_pyreader,
[logits.name], score_path, val_batches,
val_batch_num, dev_count)
result_file_path = os.path.join(args.save_path,
'result.' + str(step))
evaluate(score_path, result_file_path)
except fluid.core.EOFException:
train_pyreader.reset()
break
return step
# train over different epoches
global_step = 0
for epoch in six.moves.xrange(args.num_scan_data):
shuffle_train = reader.unison_shuffle(train_data)
train_batches = reader.build_batches(shuffle_train, data_conf)
if args.use_pyreader:
global_step = train_with_pyreader(global_step)
else:
global_step = train_with_feed(global_step)
if __name__ == '__main__':
......
......@@ -4,6 +4,7 @@ python -u ../train_and_evaluate.py --use_cuda \
--data_path ./data/data.pkl \
--word_emb_init ./data/word_embedding.pkl \
--save_path ./models \
--use_pyreader \
--batch_size 256 \
--vocab_size 434512 \
--emb_size 200 \
......
......@@ -202,30 +202,30 @@ def make_one_batch_input(data_batches, index):
every_turn_len[:, i] for i in six.moves.xrange(max_turn_num)
]
feed_dict = {}
feed_list = []
for i, turn in enumerate(turns_list):
feed_dict["turn_%d" % i] = turn
feed_dict["turn_%d" % i] = np.expand_dims(
feed_dict["turn_%d" % i], axis=-1)
turn = np.expand_dims(turn, axis=-1)
feed_list.append(turn)
for i, turn_len in enumerate(every_turn_len_list):
feed_dict["turn_mask_%d" % i] = np.ones(
(batch_size, max_turn_len, 1)).astype("float32")
turn_mask = np.ones((batch_size, max_turn_len, 1)).astype("float32")
for row in six.moves.xrange(batch_size):
feed_dict["turn_mask_%d" % i][row, turn_len[row]:, 0] = 0
turn_mask[row, turn_len[row]:, 0] = 0
feed_list.append(turn_mask)
feed_dict["response"] = response
feed_dict["response"] = np.expand_dims(feed_dict["response"], axis=-1)
response = np.expand_dims(response, axis=-1)
feed_list.append(response)
feed_dict["response_mask"] = np.ones(
(batch_size, max_turn_len, 1)).astype("float32")
response_mask = np.ones((batch_size, max_turn_len, 1)).astype("float32")
for row in six.moves.xrange(batch_size):
feed_dict["response_mask"][row, response_len[row]:, 0] = 0
response_mask[row, response_len[row]:, 0] = 0
feed_list.append(response_mask)
feed_dict["label"] = np.array([data_batches["label"][index]]).reshape(
label = np.array([data_batches["label"][index]]).reshape(
[-1, 1]).astype("float32")
feed_list.append(label)
return feed_dict
return feed_list
if __name__ == '__main__':
......
......@@ -7,7 +7,6 @@
- [Introduction](#introduction)
- [Data preparation](#data-preparation)
- [Training](#training)
- [Finetuning](#finetuning)
- [Evaluation](#evaluation)
- [Inference and Visualization](#inference-and-visualization)
- [Appendix](#appendix)
......@@ -24,10 +23,10 @@ Running sample code in this directory requires PaddelPaddle Fluid v.1.0.0 and la
Faster RCNN model
</p>
1. Base conv layerAs a CNN objective dection, Faster RCNN extract feature maps using a basic convolutional network. The feature maps then can be shared by RPN and fc layers. This sampel uses [ResNet-50](https://arxiv.org/abs/1512.03385) as base conv layer.
2. Region Proposal Network (RPN)RPN generates proposals for detection。This block generates anchors by a set of size and ratio and classifies anchors into fore-ground and back-ground by softmax. Then refine anchors to obtain more precise proposals using box regression.
3. RoI pooling。This layer takes feature maps and proposals as input. The proposals are mapped to feature maps and pooled to the same size. The output are sent to fc layers for classification and regression.
4. Detection layerUsing the output of roi pooling to compute the class and locatoin of each proposal in two fc layers.
1. Base conv layer. As a CNN-based object detector, Faster RCNN extracts feature maps with a base convolutional network. These feature maps are then shared by the RPN and the fc layers. This sample uses [ResNet-50](https://arxiv.org/abs/1512.03385) as the base conv layer.
2. Region Proposal Network (RPN). The RPN generates proposals for detection. It derives anchors from a set of sizes and aspect ratios, classifies each anchor as foreground or background with softmax, and then refines the anchors with box regression to obtain more precise proposals.
3. RoI Align. This layer takes feature maps and proposals as input, maps the proposals onto the feature maps, and pools them to the same size. The output is sent to fc layers for classification and regression. Either RoIAlign or RoIPool can be used in this layer; the choice is set via roi\_func in config.py, as sketched below.
4. Detection layer. The output of RoI pooling is fed into two fc layers to compute the class and location of each proposal.
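A minimal sketch of how such a RoI-function switch could look. The helper name, pooled size, and spatial scale below are illustrative assumptions rather than the repository's actual config.py, and the `fluid.layers.roi_pool`/`roi_align` argument names follow the Fluid layer API as commonly documented and may differ across versions:

```python
import paddle.fluid as fluid

def roi_feature(feature_map, rois, roi_func="RoIAlign"):
    # Hypothetical helper: pooled size and spatial_scale are placeholder values.
    if roi_func == "RoIPool":
        return fluid.layers.roi_pool(
            input=feature_map, rois=rois,
            pooled_height=14, pooled_width=14, spatial_scale=1. / 16)
    elif roi_func == "RoIAlign":
        return fluid.layers.roi_align(
            input=feature_map, rois=rois,
            pooled_height=14, pooled_width=14, spatial_scale=1. / 16)
    raise ValueError("roi_func must be 'RoIPool' or 'RoIAlign'")
```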
## Data preparation
......@@ -42,10 +41,9 @@ Train the model on [MS-COCO dataset](http://cocodataset.org/#download), download
After data preparation, one can start the training step by:
python train.py \
--max_size=1333 \
--scales=[800] \
--batch_size=8 \
--model_save_dir=output/
--model_save_dir=output/ \
--pretrained_model=${path_to_pretrain_model} \
--data_dir=${path_to_data}
- Set ```export CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7``` to specify 8 GPUs for training.
- For more help on arguments:
......@@ -83,7 +81,7 @@ To train the model, [cocoapi](https://github.com/cocodataset/cocoapi) is needed.
**model configuration:**
* Use RoIPooling.
* RoIAlign and RoIPool are both supported; they are used in separate configurations.
* NMS threshold=0.7. During training, pre\_nms=12000, post\_nms=2000; during test, pre\_nms=6000, post\_nms=1000.
* In generating proposal labels, fg\_fraction=0.25, fg\_thresh=0.5, bg\_thresh\_hi=0.5, bg\_thresh\_lo=0.0.
* In rpn target assignment, rpn\_fg\_fraction=0.5, rpn\_positive\_overlap=0.7, rpn\_negative\_overlap=0.3.
......@@ -102,20 +100,10 @@ Training result is shown as below:
<img src="image/train_loss.jpg" height=500 width=650 hspace='10'/> <br />
Faster RCNN train loss
</p>
* Fluid all padding: Each image padding to 1333\*1333.
* Fluid minibatch padding: Images in one batch padding to the same size. This method is same as detectron.
* Fluid no padding: Images without padding.
## Finetuning
Finetuning is to finetune model weights in a specific task by loading pretrained weights. After initializing ```pretrained_model```, one can finetune a model as:
python train.py
--max_size=1333 \
--scales=800 \
--pretrained_model=${path_to_pretrain_model} \
--batch_size= 8\
--model_save_dir=output/
* Fluid RoIPool minibatch padding: use RoIPool; images in one batch are padded to the same size. This is the same strategy as Detectron.
* Fluid RoIPool no padding: use RoIPool; images are not padded.
* Fluid RoIAlign no padding: use RoIAlign; images are not padded.
## Evaluation
......@@ -126,26 +114,28 @@ Evaluation is to evaluate the performance of a trained model. This sample provid
python eval_coco_map.py \
--dataset=coco2017 \
--pretrained_mode=${path_to_pretrain_model} \
--batch_size=1 \
--nms_threshold=0.5 \
--score_threshold=0.05
- Set ```export CUDA_VISIBLE_DEVICES=0``` to specify one GPU for evaluation.
Evaluation results are shown below:
<p align="center">
<img src="image/mAP.jpg" height=500 width=650 hspace='10'/> <br />
Faster RCNN mAP
</p>
| Model | Batch size | Max iteration | mAP |
| :------------------------------ | :------------: | :-------------------:|------: |
| Detectron | 8 | 180000 | 0.315 |
| Fluid minibatch padding | 8 | 180000 | 0.314 |
| Fluid all padding | 8 | 180000 | 0.308 |
| Fluid no padding |8 | 180000 | 0.316 |
* Fluid all padding: Each image padding to 1333\*1333.
* Fluid minibatch padding: Images in one batch padding to the same size. This method is same as detectron.
* Fluid no padding: Images without padding.
| Model | RoI function | Batch size | Max iteration | mAP |
| :--------------- | :--------: | :------------: | :------------------: |------: |
| Detectron_RoIPool | RoIPool | 8 | 180000 | 0.315 |
| Fluid RoIPool minibatch padding | RoIPool | 8 | 180000 | 0.314 |
| Fluid RoIPool no padding | RoIPool | 8 | 180000 | 0.316 |
| Detectron_RoIAlign | RoIAlign | 8 | 180000 | 0.346 |
| Fluid RoIAlign no padding | RoIAlign | 8 | 180000 | 0.344 |
* Fluid RoIPool minibatch padding: use RoIPool; images in one batch are padded to the same size. This is the same strategy as Detectron.
* Fluid RoIPool no padding: use RoIPool; images are not padded.
* Fluid RoIAlign no padding: use RoIAlign; images are not padded.
## Inference and Visualization
......
......@@ -7,7 +7,6 @@
- [Introduction](#简介)
- [Data preparation](#数据准备)
- [Training](#模型训练)
- [Finetuning](#参数微调)
- [Evaluation](#模型评估)
- [Inference and Visualization](#模型推断及可视化)
- [Appendix](#附录)
......@@ -26,7 +25,7 @@ Faster RCNN 目标检测模型
1. Base convolution layer. As a CNN-based object detection method, Faster RCNN first extracts feature maps from the image with a base convolutional network. These feature maps are shared by the subsequent RPN and fully connected layers. This example uses [ResNet-50](https://arxiv.org/abs/1512.03385) as the base convolution layer.
2. Region Proposal Network (RPN). The RPN generates candidate regions (proposals). It derives a set of anchors from fixed sizes and aspect ratios, classifies each anchor as foreground or background with softmax, and then refines the anchors with bounding-box regression to obtain precise proposals.
3. RoI pooling. This layer takes the feature maps and proposals as input, maps the proposals onto the feature maps, pools them into region features of a uniform size, and feeds them to fully connected layers for classification.
3. RoI Align. This layer takes the feature maps and proposals as input, maps the proposals onto the feature maps, pools them into region features of a uniform size, and feeds them to fully connected layers for classification. Either RoIPool or RoIAlign can be used here; the choice is set via roi\_func in config.py.
4. Detection layer. The region features are used to predict the class of each proposal, and a second bounding-box regression produces the final, precise location of the detection box.
## 数据准备
......@@ -41,11 +40,9 @@ Faster RCNN 目标检测模型
After data preparation, training can be started as follows:
python train.py \
--max_size=1333 \
--scales=[800] \
--batch_size=8 \
--model_save_dir=output/ \
--pretrained_model=${path_to_pretrain_model} \
--data_dir=${path_to_data}
- Set export CUDA\_VISIBLE\_DEVICES=0,1,2,3,4,5,6,7 to train with 8 GPUs.
- For a list of optional arguments:
......@@ -74,11 +71,11 @@ Faster RCNN 目标检测模型
# not to install the COCO API into global site-packages
python2 setup.py install --user
**Data reader notes:** The data reader is defined in reader.py. Every image is scaled so that its shorter side equals `scales`; if the longer side then exceeds `max_size`, the image is scaled again so that the longer side equals `max_iter`. Horizontal flipping is applied during training. Padding the images within a batch to the same size is supported.
**Data reader notes:** The data reader is defined in reader.py. Every image is scaled so that its shorter side equals `scales`; if the longer side then exceeds `max_size`, the image is scaled again so that the longer side equals `max_size`. Horizontal flipping is applied during training. Padding the images within a batch to the same size is supported.
**Model configuration:**
* Use RoIPool.
* RoIAlign and RoIPool are both supported; they are used in separate configurations.
* During training, pre\_nms=12000 and post\_nms=2000; during testing, pre\_nms=6000 and post\_nms=1000. The NMS threshold is 0.7.
* In generating proposal labels, fg\_fraction=0.25, fg\_thresh=0.5, bg\_thresh\_hi=0.5, bg\_thresh\_lo=0.0.
* In RPN anchor target assignment, rpn\_fg\_fraction=0.5, rpn\_positive\_overlap=0.7, rpn\_negative\_overlap=0.3.
......@@ -89,9 +86,10 @@ Faster RCNN 目标检测模型
<img src="image/train_loss.jpg" height=500 width=650 hspace='10'/> <br />
Faster RCNN training loss
</p>
* Fluid all padding: every image is padded to 1333\*1333.
* Fluid minibatch padding: images in one batch are padded to the same size. This is the same strategy as Detectron.
* Fluid no padding: images are not padded.
* Fluid RoIPool minibatch padding: use RoIPool; images in one batch are padded to the same size. This is the same strategy as Detectron.
* Fluid RoIPool no padding: use RoIPool; images are not padded.
* Fluid RoIAlign no padding: use RoIAlign; images are not padded.
**Training strategy:**
......@@ -110,26 +108,31 @@ Faster RCNN 训练loss
python eval_coco_map.py \
--dataset=coco2017 \
--pretrained_mode=${path_to_pretrain_model} \
--batch_size=1 \
--nms_threshold=0.5 \
--score_threshold=0.05
- Set export CUDA\_VISIBLE\_DEVICES=0 to evaluate on a single GPU.
The evaluation results are shown below:
<p align="center">
<img src="image/mAP.jpg" height=500 width=650 hspace='10'/> <br />
Faster RCNN mAP
</p>
| Model | Batch size | Max iteration | mAP |
| :------------------------------ | :------------: | :------------------: |------: |
| Detectron | 8 | 180000 | 0.315 |
| Fluid minibatch padding | 8 | 180000 | 0.314 |
| Fluid all padding | 8 | 180000 | 0.308 |
| Fluid no padding |8 | 180000 | 0.316 |
| Model | RoI function | Batch size | Max iteration | mAP |
| :--------------- | :--------: | :------------: | :------------------: |------: |
| Detectron RoIPool | RoIPool | 8 | 180000 | 0.315 |
| Fluid RoIPool minibatch padding | RoIPool | 8 | 180000 | 0.314 |
| Fluid RoIPool no padding | RoIPool | 8 | 180000 | 0.316 |
| Detectron RoIAlign | RoIAlign | 8 | 180000 | 0.346 |
| Fluid RoIAlign no padding | RoIAlign | 8 | 180000 | 0.344 |
* Fluid all padding: every image is padded to 1333\*1333.
* Fluid minibatch padding: images in one batch are padded to the same size. This is the same strategy as Detectron.
* Fluid no padding: images are not padded.
* Fluid RoIPool minibatch padding: use RoIPool; images in one batch are padded to the same size. This is the same strategy as Detectron.
* Fluid RoIPool no padding: use RoIPool; images are not padded.
* Fluid RoIAlign no padding: use RoIAlign; images are not padded.
## Inference and Visualization
......
fluid/faster_rcnn/image/mAP.jpg (binary image replaced: 41.0 KB → 201.1 KB)
......@@ -269,7 +269,7 @@ class FasterRCNN(object):
x=rpn_cls_score_reshape, shape=(0, -1, 1))
rpn_bbox_pred_reshape = fluid.layers.reshape(
x=rpn_bbox_pred_reshape, shape=(0, -1, 4))
score_pred, loc_pred, score_tgt, loc_tgt = \
score_pred, loc_pred, score_tgt, loc_tgt, bbox_weight = \
fluid.layers.rpn_target_assign(
bbox_pred=rpn_bbox_pred_reshape,
cls_logits=rpn_cls_score_reshape,
......@@ -290,7 +290,12 @@ class FasterRCNN(object):
rpn_cls_loss = fluid.layers.reduce_mean(
rpn_cls_loss, name='loss_rpn_cls')
rpn_reg_loss = fluid.layers.smooth_l1(x=loc_pred, y=loc_tgt, sigma=3.0)
rpn_reg_loss = fluid.layers.smooth_l1(
x=loc_pred,
y=loc_tgt,
sigma=3.0,
inside_weight=bbox_weight,
outside_weight=bbox_weight)
rpn_reg_loss = fluid.layers.reduce_sum(
rpn_reg_loss, name='loss_rpn_bbox')
score_shape = fluid.layers.shape(score_tgt)
......
......@@ -52,7 +52,7 @@ def train():
boundaries = cfg.lr_steps
gamma = cfg.lr_gamma
step_num = len(lr_steps)
step_num = len(cfg.lr_steps)
values = [learning_rate * (gamma**i) for i in range(step_num + 1)]
optimizer = fluid.optimizer.Momentum(
......
......@@ -102,6 +102,7 @@ def coco(mode,
roidb_perm.rotate(-1)
if roidb_cur >= len(roidbs):
roidb_perm = deque(np.random.permutation(roidbs))
roidb_cur = 0
im, gt_boxes, gt_classes, is_crowd, im_info, im_id = roidb_reader(
roidb, mode)
if gt_boxes.shape[0] == 0:
......
......@@ -61,7 +61,7 @@ def train():
boundaries = cfg.lr_steps
gamma = cfg.lr_gamma
step_num = len(lr_steps)
step_num = len(cfg.lr_steps)
values = [learning_rate * (gamma**i) for i in range(step_num + 1)]
optimizer = fluid.optimizer.Momentum(
......
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
__all__ = ['parse_args', ]
BENCHMARK_MODELS = [
"ResNet50", "ResNet101", "ResNet152"
]
def parse_args():
parser = argparse.ArgumentParser('Distributed Image Classification Training.')
parser.add_argument(
'--model',
type=str,
choices=BENCHMARK_MODELS,
default='resnet',
help='The model to run benchmark with.')
parser.add_argument(
'--batch_size', type=int, default=32, help='The minibatch size.')
# args related to learning rate
parser.add_argument(
'--learning_rate', type=float, default=0.001, help='The learning rate.')
# TODO(wuyi): add "--use_fake_data" option back.
parser.add_argument(
'--skip_batch_num',
type=int,
default=5,
help='The number of initial minibatches to skip, for a better performance test'
)
parser.add_argument(
'--iterations', type=int, default=80, help='The number of minibatches.')
parser.add_argument(
'--pass_num', type=int, default=100, help='The number of passes.')
parser.add_argument(
'--data_format',
type=str,
default='NCHW',
choices=['NCHW', 'NHWC'],
help='The data format, currently only NCHW is supported.')
parser.add_argument(
'--device',
type=str,
default='GPU',
choices=['CPU', 'GPU'],
help='The device type.')
parser.add_argument(
'--gpus',
type=int,
default=1,
help='If gpus > 1, will use ParallelExecutor to run, else use Executor.')
# this option is available only for vgg and resnet.
parser.add_argument(
'--cpus',
type=int,
default=1,
help='If cpus > 1, will set ParallelExecutor to use multiple threads.')
parser.add_argument(
'--data_set',
type=str,
default='flowers',
choices=['cifar10', 'flowers', 'imagenet'],
help='Optional dataset for benchmark.')
parser.add_argument(
'--no_test',
action='store_true',
help='If set, do not test the testset during training.')
parser.add_argument(
'--memory_optimize',
action='store_true',
help='If set, optimize runtime memory before start.')
parser.add_argument(
'--update_method',
type=str,
default='local',
choices=['local', 'pserver', 'nccl2'],
help='Choose parameter update method, can be local, pserver, nccl2.')
parser.add_argument(
'--no_split_var',
action='store_true',
default=False,
help='Whether split variables into blocks when update_method is pserver')
parser.add_argument(
'--async_mode',
action='store_true',
default=False,
help='Whether start pserver in async mode to support ASGD')
parser.add_argument(
'--no_random',
action='store_true',
help='If set, keep the random seed and do not shuffle the data.')
parser.add_argument(
'--reduce_strategy',
type=str,
choices=['reduce', 'all_reduce'],
default='all_reduce',
help='Specify the reduce strategy, can be reduce, all_reduce')
parser.add_argument(
'--data_dir',
type=str,
default="../data/ILSVRC2012",
help="The ImageNet dataset root dir."
)
args = parser.parse_args()
return args
......@@ -26,9 +26,84 @@ import six
import sys
sys.path.append("..")
import models
from args import *
from reader import train, val
def parse_args():
parser = argparse.ArgumentParser('Distributed Image Classification Training.')
parser.add_argument(
'--model',
type=str,
default='DistResNet',
help='The model to run.')
parser.add_argument(
'--batch_size', type=int, default=32, help='The minibatch size per device.')
parser.add_argument(
'--multi_batch_repeat', type=int, default=1, help='Batch merge repeats.')
parser.add_argument(
'--learning_rate', type=float, default=0.1, help='The learning rate.')
parser.add_argument(
'--pass_num', type=int, default=90, help='The number of passes.')
parser.add_argument(
'--data_format',
type=str,
default='NCHW',
choices=['NCHW', 'NHWC'],
help='The data format, currently only NCHW is supported.')
parser.add_argument(
'--device',
type=str,
default='GPU',
choices=['CPU', 'GPU'],
help='The device type.')
parser.add_argument(
'--gpus',
type=int,
default=1,
help='If gpus > 1, will use ParallelExecutor to run, else use Executor.')
parser.add_argument(
'--cpus',
type=int,
default=1,
help='If cpus > 1, will set ParallelExecutor to use multiple threads.')
parser.add_argument(
'--no_test',
action='store_true',
help='If set, do not test the testset during training.')
parser.add_argument(
'--memory_optimize',
action='store_true',
help='If set, optimize runtime memory before start.')
parser.add_argument(
'--update_method',
type=str,
default='local',
choices=['local', 'pserver', 'nccl2'],
help='Choose parameter update method, can be local, pserver, nccl2.')
parser.add_argument(
'--no_split_var',
action='store_true',
default=False,
help='Whether split variables into blocks when update_method is pserver')
parser.add_argument(
'--async_mode',
action='store_true',
default=False,
help='Whether start pserver in async mode to support ASGD')
parser.add_argument(
'--reduce_strategy',
type=str,
choices=['reduce', 'all_reduce'],
default='all_reduce',
help='Specify the reduce strategy, can be reduce, all_reduce')
parser.add_argument(
'--data_dir',
type=str,
default="../data/ILSVRC2012",
help="The ImageNet dataset root dir."
)
args = parser.parse_args()
return args
def get_model(args, is_train, main_prog, startup_prog):
pyreader = None
class_dim = 1000
......@@ -51,7 +126,7 @@ def get_model(args, is_train, main_prog, startup_prog):
name="train_reader" if is_train else "test_reader",
use_double_buffer=True)
input, label = fluid.layers.read_file(pyreader)
model_def = models.__dict__[args.model]()
model_def = models.__dict__[args.model](layers=50, is_train=is_train)
predict = model_def.net(input, class_dim=class_dim)
cost = fluid.layers.cross_entropy(input=predict, label=label)
......@@ -60,89 +135,64 @@ def get_model(args, is_train, main_prog, startup_prog):
batch_acc1 = fluid.layers.accuracy(input=predict, label=label, k=1)
batch_acc5 = fluid.layers.accuracy(input=predict, label=label, k=5)
# configure optimize
optimizer = None
if is_train:
start_lr = args.learning_rate
# n * worker * repeat
end_lr = args.learning_rate * trainer_count * args.multi_batch_repeat
total_images = 1281167 / trainer_count
step = int(total_images / (args.batch_size * args.gpus) + 1)
epochs = [30, 60, 90]
step = int(total_images / (args.batch_size * args.gpus * args.multi_batch_repeat) + 1)
warmup_steps = step * 5 # warmup 5 passes
epochs = [30, 60, 80]
bd = [step * e for e in epochs]
base_lr = args.learning_rate
base_lr = end_lr
lr = []
lr = [base_lr * (0.1**i) for i in range(len(bd) + 1)]
optimizer = fluid.optimizer.Momentum(
learning_rate=fluid.layers.piecewise_decay(
boundaries=bd, values=lr),
learning_rate=models.learning_rate.lr_warmup(
fluid.layers.piecewise_decay(
boundaries=bd, values=lr),
warmup_steps, start_lr, end_lr),
momentum=0.9,
regularization=fluid.regularizer.L2Decay(1e-4))
optimizer.minimize(avg_cost)
if args.memory_optimize:
fluid.memory_optimize(main_prog)
batched_reader = None
pyreader.decorate_paddle_reader(
paddle.batch(
reader if args.no_random else paddle.reader.shuffle(
reader, buf_size=5120),
reader,
batch_size=args.batch_size))
return avg_cost, optimizer, [batch_acc1,
batch_acc5], batched_reader, pyreader
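As a worked example of the schedule assembled in get_model above (with assumed cluster sizes, not recommended settings), the target rate scales with the trainer count and batch-merge repeats, warmup lasts five passes, and piecewise decay kicks in at epochs 30/60/80:

```python
# Assumed setup: 4 trainers, 8 GPUs each, batch_size=32, multi_batch_repeat=1.
trainer_count, gpus, batch_size, repeat = 4, 8, 32, 1
learning_rate = 0.1

end_lr = learning_rate * trainer_count * repeat              # 0.4, the post-warmup rate
total_images = 1281167 / trainer_count                       # ImageNet images per trainer
step = int(total_images / (batch_size * gpus * repeat) + 1)  # 1252 steps per pass
warmup_steps = step * 5                                      # warm up over 5 passes
bd = [step * e for e in [30, 60, 80]]                        # piecewise-decay boundaries
lr = [end_lr * (0.1 ** i) for i in range(len(bd) + 1)]       # ~[0.4, 0.04, 0.004, 0.0004]
print(step, warmup_steps, bd, lr)
```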
def append_nccl2_prepare(trainer_id, startup_prog):
if trainer_id >= 0:
# append gen_nccl_id at the end of startup program
trainer_id = int(os.getenv("PADDLE_TRAINER_ID"))
port = os.getenv("PADDLE_PSERVER_PORT")
worker_ips = os.getenv("PADDLE_TRAINER_IPS")
worker_endpoints = []
for ip in worker_ips.split(","):
worker_endpoints.append(':'.join([ip, port]))
num_trainers = len(worker_endpoints)
current_endpoint = os.getenv("PADDLE_CURRENT_IP") + ":" + port
worker_endpoints.remove(current_endpoint)
nccl_id_var = startup_prog.global_block().create_var(
name="NCCLID",
persistable=True,
type=fluid.core.VarDesc.VarType.RAW)
startup_prog.global_block().append_op(
type="gen_nccl_id",
inputs={},
outputs={"NCCLID": nccl_id_var},
attrs={
"endpoint": current_endpoint,
"endpoint_list": worker_endpoints,
"trainer_id": trainer_id
})
return nccl_id_var, num_trainers, trainer_id
else:
raise Exception("must set positive PADDLE_TRAINER_ID env variables for "
"nccl-based dist train.")
trainer_id = int(os.getenv("PADDLE_TRAINER_ID"))
port = os.getenv("PADDLE_PSERVER_PORT")
worker_ips = os.getenv("PADDLE_TRAINER_IPS")
worker_endpoints = []
for ip in worker_ips.split(","):
worker_endpoints.append(':'.join([ip, port]))
current_endpoint = os.getenv("PADDLE_CURRENT_IP") + ":" + port
config = fluid.DistributeTranspilerConfig()
config.mode = "nccl2"
t = fluid.DistributeTranspiler(config=config)
t.transpile(trainer_id, trainers=','.join(worker_endpoints),
current_endpoint=current_endpoint,
startup_program=startup_prog)
def dist_transpile(trainer_id, args, train_prog, startup_prog):
if trainer_id < 0:
return None, None
# the port of all pservers, needed by both trainer and pserver
port = os.getenv("PADDLE_PSERVER_PORT", "6174")
# comma separated ips of all pservers, needed by trainer and
# pserver
pserver_ips = os.getenv("PADDLE_PSERVER_IPS", "")
eplist = []
for ip in pserver_ips.split(","):
eplist.append(':'.join([ip, port]))
pserver_endpoints = ",".join(eplist)
# total number of workers/trainers in the job, needed by
# trainer and pserver
trainers = int(os.getenv("PADDLE_TRAINERS"))
# the IP of the local machine, needed by pserver only
current_endpoint = os.getenv("PADDLE_CURRENT_IP", "") + ":" + port
# the role, should be either PSERVER or TRAINER
training_role = os.getenv("PADDLE_TRAINING_ROLE")
config = fluid.DistributeTranspilerConfig()
......@@ -150,8 +200,6 @@ def dist_transpile(trainer_id, args, train_prog, startup_prog):
t = fluid.DistributeTranspiler(config=config)
t.transpile(
trainer_id,
# NOTE: *MUST* use train_prog, for we are using with guard to
# generate different program for train and test.
program=train_prog,
pservers=pserver_endpoints,
trainers=trainers,
......@@ -171,7 +219,7 @@ def dist_transpile(trainer_id, args, train_prog, startup_prog):
)
def test_parallel(exe, test_args, args, test_prog, feeder):
def test_parallel(exe, test_args, args, test_prog):
acc_evaluators = []
for i in six.moves.xrange(len(test_args[2])):
acc_evaluators.append(fluid.metrics.Accuracy())
......@@ -190,13 +238,10 @@ def test_parallel(exe, test_args, args, test_prog, feeder):
return [e.eval() for e in acc_evaluators]
# NOTE: only need to benchmark using parallelexe
def train_parallel(train_args, test_args, args, train_prog, test_prog,
startup_prog, nccl_id_var, num_trainers, trainer_id):
over_all_start = time.time()
place = core.CPUPlace() if args.device == 'CPU' else core.CUDAPlace(0)
feeder = None
if nccl_id_var and trainer_id == 0:
#FIXME(wuyi): wait other trainer to start listening
......@@ -237,31 +282,27 @@ def train_parallel(train_args, test_args, args, train_prog, test_prog,
if args.update_method == "pserver":
test_scope = None
else:
# NOTE: use an empty scope to avoid test exe using NCCLID
test_scope = fluid.Scope()
test_exe = fluid.ParallelExecutor(
True, main_program=test_prog, share_vars_from=exe)
True, main_program=test_prog, share_vars_from=exe,
scope=test_scope)
pyreader = train_args[4]
for pass_id in range(args.pass_num):
num_samples = 0
iters = 0
start_time = time.time()
batch_id = 0
pyreader.start()
while True:
if iters == args.iterations:
break
if iters == args.skip_batch_num:
start_time = time.time()
num_samples = 0
fetch_list = [avg_loss.name]
acc_name_list = [v.name for v in train_args[2]]
fetch_list.extend(acc_name_list)
try:
fetch_ret = exe.run(fetch_list)
if batch_id % 30 == 0:
fetch_ret = exe.run(fetch_list)
else:
fetch_ret = exe.run([])
except fluid.core.EOFException as eof:
break
except fluid.core.EnforceNotMet as ex:
......@@ -269,20 +310,17 @@ def train_parallel(train_args, test_args, args, train_prog, test_prog,
break
num_samples += args.batch_size * args.gpus
iters += 1
if batch_id % 1 == 0:
if batch_id % 30 == 0:
fetched_data = [np.mean(np.array(d)) for d in fetch_ret]
print("Pass %d, batch %d, loss %s, accucacys: %s" %
(pass_id, batch_id, fetched_data[0], fetched_data[1:]))
batch_id += 1
print_train_time(start_time, time.time(), num_samples)
pyreader.reset() # reset reader handle
pyreader.reset()
if not args.no_test and test_args[2]:
test_feeder = None
test_ret = test_parallel(test_exe, test_args, args, test_prog,
test_feeder)
test_ret = test_parallel(test_exe, test_args, args, test_prog)
print("Pass: %d, Test Accuracy: %s\n" %
(pass_id, [np.mean(np.array(v)) for v in test_ret]))
......@@ -316,8 +354,6 @@ def main():
args = parse_args()
print_arguments(args)
print_paddle_envs()
if args.no_random:
fluid.default_startup_program().random_seed = 1
# the unique trainer id, starting from 0, needed by trainer
# only
......
......@@ -3,6 +3,8 @@ from .mobilenet import MobileNet
from .googlenet import GoogleNet
from .vgg import VGG11, VGG13, VGG16, VGG19
from .resnet import ResNet50, ResNet101, ResNet152
from .resnet_dist import DistResNet
from .inception_v4 import InceptionV4
from .se_resnext import SE_ResNeXt50_32x4d, SE_ResNeXt101_32x4d, SE_ResNeXt152_32x4d
from .dpn import DPN68, DPN92, DPN98, DPN107, DPN131
import learning_rate
......@@ -20,3 +20,31 @@ def cosine_decay(learning_rate, step_each_epoch, epochs=120):
decayed_lr = learning_rate * \
(ops.cos(epoch * (math.pi / epochs)) + 1)/2
return decayed_lr
def lr_warmup(learning_rate, warmup_steps, start_lr, end_lr):
""" Applies linear learning rate warmup for distributed training
Argument learning_rate can be float or a Variable
lr = lr + (warmup_rate * step / warmup_steps)
"""
assert(isinstance(end_lr, float))
assert(isinstance(start_lr, float))
linear_step = end_lr - start_lr
with fluid.default_main_program()._lr_schedule_guard():
lr = fluid.layers.tensor.create_global_var(
shape=[1],
value=0.0,
dtype='float32',
persistable=True,
name="learning_rate_warmup")
global_step = fluid.layers.learning_rate_scheduler._decay_step_counter()
with fluid.layers.control_flow.Switch() as switch:
with switch.case(global_step < warmup_steps):
decayed_lr = start_lr + linear_step * (global_step / warmup_steps)
fluid.layers.tensor.assign(decayed_lr, lr)
with switch.default():
fluid.layers.tensor.assign(learning_rate, lr)
return lr
\ No newline at end of file
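A framework-free sketch of the warmup rule implemented above: the rate rises linearly from start_lr to end_lr over warmup_steps, after which the wrapped schedule (piecewise decay in the caller) takes over. The standalone function below only illustrates the arithmetic:

```python
def warmup_lr(step, warmup_steps, start_lr, end_lr, after_warmup_lr):
    # Linear ramp during warmup, then whatever the wrapped schedule returns.
    if step < warmup_steps:
        return start_lr + (end_lr - start_lr) * step / warmup_steps
    return after_warmup_lr

# e.g. warming up from 0.1 to 0.8 over 5 steps
print([round(warmup_lr(s, 5, 0.1, 0.8, 0.8), 2) for s in range(7)])
# [0.1, 0.24, 0.38, 0.52, 0.66, 0.8, 0.8]
```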
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import paddle
import paddle.fluid as fluid
import math
__all__ = ["DistResNet"]
train_parameters = {
"input_size": [3, 224, 224],
"input_mean": [0.485, 0.456, 0.406],
"input_std": [0.229, 0.224, 0.225],
"learning_strategy": {
"name": "piecewise_decay",
"batch_size": 256,
"epochs": [30, 60, 90],
"steps": [0.1, 0.01, 0.001, 0.0001]
}
}
class DistResNet():
def __init__(self, layers=50, is_train=True):
self.params = train_parameters
self.layers = layers
self.is_train = is_train
self.weight_decay = 1e-4
def net(self, input, class_dim=1000):
layers = self.layers
supported_layers = [50, 101, 152]
assert layers in supported_layers, \
"supported layers are {} but input layer is {}".format(supported_layers, layers)
if layers == 50:
depth = [3, 4, 6, 3]
elif layers == 101:
depth = [3, 4, 23, 3]
elif layers == 152:
depth = [3, 8, 36, 3]
num_filters = [64, 128, 256, 512]
conv = self.conv_bn_layer(
input=input, num_filters=64, filter_size=7, stride=2, act='relu')
conv = fluid.layers.pool2d(
input=conv,
pool_size=3,
pool_stride=2,
pool_padding=1,
pool_type='max')
for block in range(len(depth)):
for i in range(depth[block]):
conv = self.bottleneck_block(
input=conv,
num_filters=num_filters[block],
stride=2 if i == 0 and block != 0 else 1)
pool = fluid.layers.pool2d(
input=conv, pool_size=7, pool_type='avg', global_pooling=True)
stdv = 1.0 / math.sqrt(pool.shape[1] * 1.0)
out = fluid.layers.fc(input=pool,
size=class_dim,
act='softmax',
param_attr=fluid.param_attr.ParamAttr(
initializer=fluid.initializer.Uniform(-stdv,
stdv),
regularizer=fluid.regularizer.L2Decay(self.weight_decay)),
bias_attr=fluid.ParamAttr(
regularizer=fluid.regularizer.L2Decay(self.weight_decay))
)
return out
def conv_bn_layer(self,
input,
num_filters,
filter_size,
stride=1,
groups=1,
act=None,
bn_init_value=1.0):
conv = fluid.layers.conv2d(
input=input,
num_filters=num_filters,
filter_size=filter_size,
stride=stride,
padding=(filter_size - 1) // 2,
groups=groups,
act=None,
bias_attr=False,
param_attr=fluid.ParamAttr(regularizer=fluid.regularizer.L2Decay(self.weight_decay)))
return fluid.layers.batch_norm(
input=conv, act=act, is_test=not self.is_train,
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.Constant(bn_init_value),
regularizer=None))
def shortcut(self, input, ch_out, stride):
ch_in = input.shape[1]
if ch_in != ch_out or stride != 1:
return self.conv_bn_layer(input, ch_out, 1, stride)
else:
return input
def bottleneck_block(self, input, num_filters, stride):
conv0 = self.conv_bn_layer(
input=input, num_filters=num_filters, filter_size=1, act='relu')
conv1 = self.conv_bn_layer(
input=conv0,
num_filters=num_filters,
filter_size=3,
stride=stride,
act='relu')
# NOTE: default bias is 0.0 already
conv2 = self.conv_bn_layer(
input=conv1, num_filters=num_filters * 4, filter_size=1, act=None, bn_init_value=0.0)
short = self.shortcut(input, num_filters * 4, stride)
return fluid.layers.elementwise_add(x=short, y=conv2, act='relu')
......@@ -132,61 +132,44 @@ class BRCDataset(object):
'passage_token_ids': [],
'passage_length': [],
'start_id': [],
'end_id': []
'end_id': [],
'passage_num': []
}
max_passage_num = max(
[len(sample['passages']) for sample in batch_data['raw_data']])
#max_passage_num = min(self.max_p_num, max_passage_num)
max_passage_num = self.max_p_num
max_passage_num = min(self.max_p_num, max_passage_num)
for sidx, sample in enumerate(batch_data['raw_data']):
count = 0
for pidx in range(max_passage_num):
if pidx < len(sample['passages']):
count += 1
batch_data['question_token_ids'].append(sample[
'question_token_ids'])
'question_token_ids'][0:self.max_q_len])
batch_data['question_length'].append(
len(sample['question_token_ids']))
min(len(sample['question_token_ids']), self.max_q_len))
passage_token_ids = sample['passages'][pidx][
'passage_token_ids']
'passage_token_ids'][0:self.max_p_len]
batch_data['passage_token_ids'].append(passage_token_ids)
batch_data['passage_length'].append(
min(len(passage_token_ids), self.max_p_len))
else:
batch_data['question_token_ids'].append([])
batch_data['question_length'].append(0)
batch_data['passage_token_ids'].append([])
batch_data['passage_length'].append(0)
batch_data, padded_p_len, padded_q_len = self._dynamic_padding(
batch_data, pad_id)
for sample in batch_data['raw_data']:
# record the start passage index of current doc
passade_idx_offset = sum(batch_data['passage_num'])
batch_data['passage_num'].append(count)
gold_passage_offset = 0
if 'answer_passages' in sample and len(sample['answer_passages']):
gold_passage_offset = padded_p_len * sample['answer_passages'][
0]
batch_data['start_id'].append(gold_passage_offset + sample[
'answer_spans'][0][0])
batch_data['end_id'].append(gold_passage_offset + sample[
'answer_spans'][0][1])
for i in range(sample['answer_passages'][0]):
gold_passage_offset += len(batch_data['passage_token_ids'][
passade_idx_offset + i])
start_id = min(sample['answer_spans'][0][0], self.max_p_len)
end_id = min(sample['answer_spans'][0][1], self.max_p_len)
batch_data['start_id'].append(gold_passage_offset + start_id)
batch_data['end_id'].append(gold_passage_offset + end_id)
else:
# fake span for some samples, only valid for testing
batch_data['start_id'].append(0)
batch_data['end_id'].append(0)
return batch_data
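A toy illustration (with made-up token ids) of the offset arithmetic above: since passages are no longer padded to a common length, the gold answer span is shifted by the combined length of the passages that precede the gold one within the sample.

```python
# One sample with three passages; the answer lives in the third passage (index 2).
passage_token_ids = [[11, 12, 13], [21, 22], [31, 32, 33, 34]]
answer_passage, answer_span = 2, (1, 2)

gold_passage_offset = sum(len(p) for p in passage_token_ids[:answer_passage])  # 3 + 2 = 5
start_id = gold_passage_offset + answer_span[0]
end_id = gold_passage_offset + answer_span[1]
print(start_id, end_id)  # 6 7
```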
def _dynamic_padding(self, batch_data, pad_id):
"""
Dynamically pads the batch_data with pad_id
"""
pad_p_len = min(self.max_p_len, max(batch_data['passage_length']))
pad_q_len = min(self.max_q_len, max(batch_data['question_length']))
batch_data['passage_token_ids'] = [
(ids + [pad_id] * (pad_p_len - len(ids)))[:pad_p_len]
for ids in batch_data['passage_token_ids']
]
batch_data['question_token_ids'] = [
(ids + [pad_id] * (pad_q_len - len(ids)))[:pad_q_len]
for ids in batch_data['question_token_ids']
]
return batch_data, pad_p_len, pad_q_len
def word_iter(self, set_name=None):
"""
Iterates over all the words in the dataset
......
......@@ -68,16 +68,23 @@ def bi_lstm_encoder(input_seq, gate_size, para_name, args):
return encoder_out
def encoder(input_name, para_name, shape, hidden_size, args):
def get_data(input_name, lod_level, args):
input_ids = layers.data(
name=input_name, shape=[1], dtype='int64', lod_level=1)
name=input_name, shape=[1], dtype='int64', lod_level=lod_level)
return input_ids
def embedding(input_ids, shape, args):
input_embedding = layers.embedding(
input=input_ids,
size=shape,
dtype='float32',
is_sparse=True,
param_attr=fluid.ParamAttr(name='embedding_para'))
return input_embedding
def encoder(input_embedding, para_name, hidden_size, args):
encoder_out = bi_lstm_encoder(
input_seq=input_embedding,
gate_size=hidden_size,
......@@ -259,40 +266,41 @@ def fusion(g, args):
def rc_model(hidden_size, vocab, args):
emb_shape = [vocab.size(), vocab.embed_dim]
start_labels = layers.data(
name="start_lables", shape=[1], dtype='float32', lod_level=1)
end_labels = layers.data(
name="end_lables", shape=[1], dtype='float32', lod_level=1)
# stage 1:encode
p_ids_names = []
q_ids_names = []
ms = []
gs = []
qs = []
for i in range(args.doc_num):
p_ids_name = "pids_%d" % i
p_ids_names.append(p_ids_name)
p_enc_i = encoder(p_ids_name, 'p_enc', emb_shape, hidden_size, args)
q_ids_name = "qids_%d" % i
q_ids_names.append(q_ids_name)
q_enc_i = encoder(q_ids_name, 'q_enc', emb_shape, hidden_size, args)
q_id0 = get_data('q_id0', 1, args)
q_ids = get_data('q_ids', 2, args)
p_ids_name = 'p_ids'
p_ids = get_data('p_ids', 2, args)
p_embs = embedding(p_ids, emb_shape, args)
q_embs = embedding(q_ids, emb_shape, args)
drnn = layers.DynamicRNN()
with drnn.block():
p_emb = drnn.step_input(p_embs)
q_emb = drnn.step_input(q_embs)
p_enc = encoder(p_emb, 'p_enc', hidden_size, args)
q_enc = encoder(q_emb, 'q_enc', hidden_size, args)
# stage 2:match
g_i = attn_flow(q_enc_i, p_enc_i, p_ids_name, args)
g_i = attn_flow(q_enc, p_enc, p_ids_name, args)
# stage 3:fusion
m_i = fusion(g_i, args)
ms.append(m_i)
gs.append(g_i)
qs.append(q_enc_i)
m = layers.sequence_concat(input=ms)
g = layers.sequence_concat(input=gs)
q_vec = layers.sequence_concat(input=qs)
drnn.output(m_i, q_enc)
ms, q_encs = drnn()
p_vec = layers.lod_reset(x=ms, y=start_labels)
q_vec = layers.lod_reset(x=q_encs, y=q_id0)
# stage 4:decode
start_probs, end_probs = point_network_decoder(
p_vec=m, q_vec=q_vec, hidden_size=hidden_size, args=args)
start_labels = layers.data(
name="start_lables", shape=[1], dtype='float32', lod_level=1)
end_labels = layers.data(
name="end_lables", shape=[1], dtype='float32', lod_level=1)
p_vec=p_vec, q_vec=q_vec, hidden_size=hidden_size, args=args)
cost0 = layers.sequence_pool(
layers.cross_entropy(
......@@ -308,5 +316,5 @@ def rc_model(hidden_size, vocab, args):
cost = cost0 + cost1
cost.persistable = True
feeding_list = q_ids_names + ["start_lables", "end_lables"] + p_ids_names
return cost, start_probs, end_probs, feeding_list
feeding_list = ["q_ids", "start_lables", "end_lables", "p_ids", "q_id0"]
return cost, start_probs, end_probs, ms, feeding_list
......@@ -46,22 +46,32 @@ from vocab import Vocab
def prepare_batch_input(insts, args):
doc_num = args.doc_num
batch_size = len(insts['raw_data'])
inst_num = len(insts['passage_num'])
if batch_size != inst_num:
print("data error %d, %d" % (batch_size, inst_num))
return None
new_insts = []
passage_idx = 0
for i in range(batch_size):
p_len = 0
p_id = []
q_id = []
p_ids = []
q_ids = []
p_len = 0
for j in range(i * doc_num, (i + 1) * doc_num):
p_ids.append(insts['passage_token_ids'][j])
p_id = p_id + insts['passage_token_ids'][j]
q_ids.append(insts['question_token_ids'][j])
q_id = q_id + insts['question_token_ids'][j]
q_id = []
p_id_r = []
p_ids_r = []
q_ids_r = []
q_id_r = []
for j in range(insts['passage_num'][i]):
p_ids.append(insts['passage_token_ids'][passage_idx + j])
p_id = p_id + insts['passage_token_ids'][passage_idx + j]
q_ids.append(insts['question_token_ids'][passage_idx + j])
q_id = q_id + insts['question_token_ids'][passage_idx + j]
passage_idx += insts['passage_num'][i]
p_len = len(p_id)
def _get_label(idx, ref_len):
......@@ -72,11 +82,46 @@ def prepare_batch_input(insts, args):
start_label = _get_label(insts['start_id'][i], p_len)
end_label = _get_label(insts['end_id'][i], p_len)
new_inst = q_ids + [start_label, end_label] + p_ids
new_inst = [q_ids, start_label, end_label, p_ids, q_id]
new_insts.append(new_inst)
return new_insts
def batch_reader(batch_list, args):
res = []
for batch in batch_list:
res.append(prepare_batch_input(batch, args))
return res
def read_multiple(reader, count, clip_last=True):
"""
Stack data from reader for multi-devices.
"""
def __impl__():
res = []
for item in reader():
res.append(item)
if len(res) == count:
yield res
res = []
if len(res) == count:
yield res
elif not clip_last:
data = []
for item in res:
data += item
if len(data) > count:
inst_num_per_part = len(data) // count
yield [
data[inst_num_per_part * i:inst_num_per_part * (i + 1)]
for i in range(count)
]
return __impl__
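A small usage sketch of read_multiple with a toy reader (assumed here to yield plain integers): consecutive items are grouped so each device gets one batch per step, and an incomplete trailing group is dropped when clip_last is True.

```python
def toy_reader():
    for i in range(5):
        yield i

multi_reader = read_multiple(toy_reader, count=2, clip_last=True)
print(list(multi_reader()))  # [[0, 1], [2, 3]] -- the leftover item 4 is clipped
```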
def LodTensor_Array(lod_tensor):
lod = lod_tensor.lod()
array = np.array(lod_tensor)
......@@ -103,7 +148,7 @@ def print_para(train_prog, train_exe, logger, args):
logger.info("total param num: {0}".format(num_sum))
def find_best_answer_for_passage(start_probs, end_probs, passage_len, args):
def find_best_answer_for_passage(start_probs, end_probs, passage_len):
"""
Finds the best answer with the maximum start_prob * end_prob from a single passage
"""
......@@ -125,7 +170,7 @@ def find_best_answer_for_passage(start_probs, end_probs, passage_len, args):
return (best_start, best_end), max_prob
def find_best_answer(sample, start_prob, end_prob, padded_p_len, args):
def find_best_answer_for_inst(sample, start_prob, end_prob, inst_lod):
"""
Finds the best answer for a sample given start_prob and end_prob for each position.
This will call find_best_answer_for_passage because there are multiple passages in a sample
......@@ -134,11 +179,16 @@ def find_best_answer(sample, start_prob, end_prob, padded_p_len, args):
for p_idx, passage in enumerate(sample['passages']):
if p_idx >= args.max_p_num:
continue
if len(start_prob) != len(end_prob):
logger.info('error: {}'.format(sample['question']))
continue
passage_start = inst_lod[p_idx] - inst_lod[0]
passage_end = inst_lod[p_idx + 1] - inst_lod[0]
passage_len = passage_end - passage_start
passage_len = min(args.max_p_len, len(passage['passage_tokens']))
answer_span, score = find_best_answer_for_passage(
start_prob[p_idx * padded_p_len:(p_idx + 1) * padded_p_len],
end_prob[p_idx * padded_p_len:(p_idx + 1) * padded_p_len],
passage_len, args)
start_prob[passage_start:passage_end],
end_prob[passage_start:passage_end], passage_len)
if score > best_score:
best_score = score
best_p_idx = p_idx
......@@ -148,11 +198,11 @@ def find_best_answer(sample, start_prob, end_prob, padded_p_len, args):
else:
best_answer = ''.join(sample['passages'][best_p_idx]['passage_tokens'][
best_span[0]:best_span[1] + 1])
return best_answer
return best_answer, best_span
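The span search these helpers rely on picks the (start, end) pair with the largest start_prob * end_prob. A minimal sketch of that idea, ignoring the maximum answer length cap and not the repository implementation itself:

```python
import numpy as np

def best_span(start_prob, end_prob):
    # Exhaustive search over all start <= end pairs; fine for short passages.
    best, best_score = (0, 0), -1.0
    for s in range(len(start_prob)):
        for e in range(s, len(end_prob)):
            score = start_prob[s] * end_prob[e]
            if score > best_score:
                best, best_score = (s, e), score
    return best, best_score

print(best_span(np.array([0.1, 0.7, 0.2]), np.array([0.2, 0.3, 0.5])))
# ((1, 2), 0.35)
```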
def validation(inference_program, avg_cost, s_probs, e_probs, feed_order, place,
vocab, brc_data, logger, args):
def validation(inference_program, avg_cost, s_probs, e_probs, match, feed_order,
place, dev_count, vocab, brc_data, logger, args):
"""
"""
......@@ -165,6 +215,8 @@ def validation(inference_program, avg_cost, s_probs, e_probs, feed_order, place,
# Use test set as validation each pass
total_loss = 0.0
count = 0
n_batch_cnt = 0
n_batch_loss = 0.0
pred_answers, ref_answers = [], []
val_feed_list = [
inference_program.global_block().var(var_name)
......@@ -172,55 +224,80 @@ def validation(inference_program, avg_cost, s_probs, e_probs, feed_order, place,
]
val_feeder = fluid.DataFeeder(val_feed_list, place)
pad_id = vocab.get_id(vocab.pad_token)
dev_batches = brc_data.gen_mini_batches(
'dev', args.batch_size, pad_id, shuffle=False)
dev_reader = lambda:brc_data.gen_mini_batches('dev', args.batch_size, pad_id, shuffle=False)
dev_reader = read_multiple(dev_reader, dev_count)
for batch_id, batch in enumerate(dev_batches, 1):
feed_data = prepare_batch_input(batch, args)
for batch_id, batch_list in enumerate(dev_reader(), 1):
feed_data = batch_reader(batch_list, args)
val_fetch_outs = parallel_executor.run(
feed=val_feeder.feed(feed_data),
fetch_list=[avg_cost.name, s_probs.name, e_probs.name],
feed=list(val_feeder.feed_parallel(feed_data, dev_count)),
fetch_list=[avg_cost.name, s_probs.name, e_probs.name, match.name],
return_numpy=False)
total_loss += np.array(val_fetch_outs[0])[0]
start_probs = LodTensor_Array(val_fetch_outs[1])
end_probs = LodTensor_Array(val_fetch_outs[2])
count += len(batch['raw_data'])
padded_p_len = len(batch['passage_token_ids'][0])
for sample, start_prob, end_prob in zip(batch['raw_data'], start_probs,
end_probs):
best_answer = find_best_answer(sample, start_prob, end_prob,
padded_p_len, args)
pred_answers.append({
'question_id': sample['question_id'],
'question_type': sample['question_type'],
'answers': [best_answer],
'entity_answers': [[]],
'yesno_answers': []
})
if 'answers' in sample:
ref_answers.append({
total_loss += np.array(val_fetch_outs[0]).sum()
start_probs_m = LodTensor_Array(val_fetch_outs[1])
end_probs_m = LodTensor_Array(val_fetch_outs[2])
match_lod = val_fetch_outs[3].lod()
count += len(np.array(val_fetch_outs[0]))
n_batch_cnt += len(np.array(val_fetch_outs[0]))
n_batch_loss += np.array(val_fetch_outs[0]).sum()
log_every_n_batch = args.log_interval
if log_every_n_batch > 0 and batch_id % log_every_n_batch == 0:
logger.info('Average dev loss from batch {} to {} is {}'.format(
batch_id - log_every_n_batch + 1, batch_id, "%.10f" % (
n_batch_loss / n_batch_cnt)))
n_batch_loss = 0.0
n_batch_cnt = 0
for idx, batch in enumerate(batch_list):
#one batch
batch_size = len(batch['raw_data'])
batch_range = match_lod[0][idx * batch_size:(idx + 1) * batch_size +
1]
batch_lod = [[batch_range[x], batch_range[x + 1]]
for x in range(len(batch_range[:-1]))]
start_prob_batch = start_probs_m[idx * batch_size:(idx + 1) *
batch_size]
end_prob_batch = end_probs_m[idx * batch_size:(idx + 1) *
batch_size]
for sample, start_prob_inst, end_prob_inst, inst_range in zip(
batch['raw_data'], start_prob_batch, end_prob_batch,
batch_lod):
#one instance
inst_lod = match_lod[1][inst_range[0]:inst_range[1] + 1]
best_answer, best_span = find_best_answer_for_inst(
sample, start_prob_inst, end_prob_inst, inst_lod)
pred = {
'question_id': sample['question_id'],
'question_type': sample['question_type'],
'answers': sample['answers'],
'answers': [best_answer],
'entity_answers': [[]],
'yesno_answers': []
})
if args.result_dir is not None and args.result_name is not None:
'yesno_answers': [best_span]
}
pred_answers.append(pred)
if 'answers' in sample:
ref = {
'question_id': sample['question_id'],
'question_type': sample['question_type'],
'answers': sample['answers'],
'entity_answers': [[]],
'yesno_answers': []
}
ref_answers.append(ref)
result_dir = args.result_dir
result_prefix = args.result_name
if result_dir is not None and result_prefix is not None:
if not os.path.exists(args.result_dir):
os.makedirs(args.result_dir)
result_file = os.path.join(args.result_dir, args.result_name + '.json')
result_file = os.path.join(result_dir, result_prefix + '.json')
with open(result_file, 'w') as fout:
for pred_answer in pred_answers:
fout.write(json.dumps(pred_answer, ensure_ascii=False) + '\n')
logger.info('Saving {} results to {}'.format(args.result_name,
logger.info('Saving {} results to {}'.format(result_prefix,
result_file))
ave_loss = 1.0 * total_loss / count
# compute the bleu and rouge scores if reference answers is provided
if len(ref_answers) > 0:
pred_dict, ref_dict = {}, {}
......@@ -250,6 +327,13 @@ def train(logger, args):
brc_data.convert_to_ids(vocab)
logger.info('Initialize the model...')
if not args.use_gpu:
place = fluid.CPUPlace()
dev_count = int(os.environ.get('CPU_NUM', multiprocessing.cpu_count()))
else:
place = fluid.CUDAPlace(0)
dev_count = fluid.core.get_cuda_device_count()
# build model
main_program = fluid.Program()
startup_prog = fluid.Program()
......@@ -257,7 +341,7 @@ def train(logger, args):
startup_prog.random_seed = args.random_seed
with fluid.program_guard(main_program, startup_prog):
with fluid.unique_name.guard():
avg_cost, s_probs, e_probs, feed_order = rc_model.rc_model(
avg_cost, s_probs, e_probs, match, feed_order = rc_model.rc_model(
args.hidden_size, vocab, args)
# clone from default main program and use it as the validation program
inference_program = main_program.clone(for_test=True)
......@@ -314,20 +398,21 @@ def train(logger, args):
for pass_id in range(1, args.pass_num + 1):
pass_start_time = time.time()
pad_id = vocab.get_id(vocab.pad_token)
train_batches = brc_data.gen_mini_batches(
'train', args.batch_size, pad_id, shuffle=True)
train_reader = lambda:brc_data.gen_mini_batches('train', args.batch_size, pad_id, shuffle=False)
train_reader = read_multiple(train_reader, dev_count)
log_every_n_batch, n_batch_loss = args.log_interval, 0
total_num, total_loss = 0, 0
for batch_id, batch in enumerate(train_batches, 1):
input_data_dict = prepare_batch_input(batch, args)
for batch_id, batch_list in enumerate(train_reader(), 1):
feed_data = batch_reader(batch_list, args)
fetch_outs = parallel_executor.run(
feed=feeder.feed(input_data_dict),
feed=list(feeder.feed_parallel(feed_data, dev_count)),
fetch_list=[avg_cost.name],
return_numpy=False)
cost_train = np.array(fetch_outs[0])[0]
total_num += len(batch['raw_data'])
cost_train = np.array(fetch_outs[0]).mean()
total_num += args.batch_size * dev_count
n_batch_loss += cost_train
total_loss += cost_train * len(batch['raw_data'])
total_loss += cost_train * args.batch_size * dev_count
if log_every_n_batch > 0 and batch_id % log_every_n_batch == 0:
print_para(main_program, parallel_executor, logger,
args)
......@@ -337,19 +422,23 @@ def train(logger, args):
"%.10f" % (n_batch_loss / log_every_n_batch)))
n_batch_loss = 0
if args.dev_interval > 0 and batch_id % args.dev_interval == 0:
eval_loss, bleu_rouge = validation(
inference_program, avg_cost, s_probs, e_probs,
feed_order, place, vocab, brc_data, logger, args)
logger.info('Dev eval loss {}'.format(eval_loss))
logger.info('Dev eval result: {}'.format(bleu_rouge))
if brc_data.dev_set is not None:
eval_loss, bleu_rouge = validation(
inference_program, avg_cost, s_probs, e_probs,
match, feed_order, place, dev_count, vocab,
brc_data, logger, args)
logger.info('Dev eval loss {}'.format(eval_loss))
logger.info('Dev eval result: {}'.format(
bleu_rouge))
pass_end_time = time.time()
logger.info('Evaluating the model after epoch {}'.format(
pass_id))
if brc_data.dev_set is not None:
eval_loss, bleu_rouge = validation(
inference_program, avg_cost, s_probs, e_probs,
feed_order, place, vocab, brc_data, logger, args)
inference_program, avg_cost, s_probs, e_probs, match,
feed_order, place, dev_count, vocab, brc_data, logger,
args)
logger.info('Dev eval loss {}'.format(eval_loss))
logger.info('Dev eval result: {}'.format(bleu_rouge))
else:
......@@ -389,10 +478,17 @@ def evaluate(logger, args):
startup_prog.random_seed = args.random_seed
with fluid.program_guard(main_program, startup_prog):
with fluid.unique_name.guard():
avg_cost, s_probs, e_probs, feed_order = rc_model.rc_model(
avg_cost, s_probs, e_probs, match, feed_order = rc_model.rc_model(
args.hidden_size, vocab, args)
# initialize parameters
place = core.CUDAPlace(0) if args.use_gpu else core.CPUPlace()
if not args.use_gpu:
place = fluid.CPUPlace()
dev_count = int(
os.environ.get('CPU_NUM', multiprocessing.cpu_count()))
else:
place = fluid.CUDAPlace(0)
dev_count = fluid.core.get_cuda_device_count()
exe = Executor(place)
if args.load_dir:
logger.info('load from {}'.format(args.load_dir))
......@@ -402,17 +498,10 @@ def evaluate(logger, args):
logger.error('No model file to load ...')
return
# prepare data
feed_list = [
main_program.global_block().var(var_name)
for var_name in feed_order
]
feeder = fluid.DataFeeder(feed_list, place)
inference_program = main_program.clone(for_test=True)
eval_loss, bleu_rouge = validation(
inference_program, avg_cost, s_probs, e_probs, feed_order,
place, vocab, brc_data, logger, args)
place, dev_count, vocab, brc_data, logger, args)
logger.info('Dev eval loss {}'.format(eval_loss))
logger.info('Dev eval result: {}'.format(bleu_rouge))
logger.info('Predicted answers are saved to {}'.format(
......@@ -438,10 +527,17 @@ def predict(logger, args):
startup_prog.random_seed = args.random_seed
with fluid.program_guard(main_program, startup_prog):
with fluid.unique_name.guard():
avg_cost, s_probs, e_probs, feed_order = rc_model.rc_model(
avg_cost, s_probs, e_probs, match, feed_order = rc_model.rc_model(
args.hidden_size, vocab, args)
# initialize parameters
place = core.CUDAPlace(0) if args.use_gpu else core.CPUPlace()
if not args.use_gpu:
place = fluid.CPUPlace()
dev_count = int(
os.environ.get('CPU_NUM', multiprocessing.cpu_count()))
else:
place = fluid.CUDAPlace(0)
dev_count = fluid.core.get_cuda_device_count()
exe = Executor(place)
if args.load_dir:
logger.info('load from {}'.format(args.load_dir))
......@@ -451,17 +547,10 @@ def predict(logger, args):
logger.error('No model file to load ...')
return
# prepare data
feed_list = [
main_program.global_block().var(var_name)
for var_name in feed_order
]
feeder = fluid.DataFeeder(feed_list, place)
inference_program = main_program.clone(for_test=True)
eval_loss, bleu_rouge = validation(
inference_program, avg_cost, s_probs, e_probs, feed_order,
place, vocab, brc_data, logger, args)
inference_program, avg_cost, s_probs, e_probs, match,
feed_order, place, dev_count, vocab, brc_data, logger, args)
def prepare(logger, args):
......
export CUDA_VISIBLE_DEVICES=1
export CUDA_VISIBLE_DEVICES=0
python run.py \
--trainset 'data/preprocessed/trainset/search.train.json' \
'data/preprocessed/trainset/zhidao.train.json' \
......@@ -11,11 +11,12 @@ python run.py \
--save_dir ./models \
--pass_num 10 \
--learning_rate 0.001 \
--batch_size 8 \
--batch_size 32 \
--embed_size 300 \
--hidden_size 150 \
--max_p_num 5 \
--max_p_len 500 \
--max_q_len 60 \
--max_a_len 200 \
--weight_decay 0.0 \
--drop_rate 0.2 $@\
#!/bin/bash
set -x
unset http_proxy
unset https_proxy
#pserver
export TRAINING_ROLE=PSERVER
export PADDLE_PORT=30134
export PADDLE_PSERVERS=127.0.0.1
export PADDLE_IS_LOCAL=0
export PADDLE_INIT_TRAINER_COUNT=1
export POD_IP=127.0.0.1
export PADDLE_TRAINER_ID=0
export PADDLE_TRAINERS_NUM=1
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/lib64/:/usr/local/lib/:/workspace/brpc
export PYTHONPATH=$PYTHONPATH:/paddle/build/build_reader_RelWithDebInfo_gpu/python
#GLOG_v=7 GLOG_logtostderr=1
CUDA_VISIBLE_DEVICES=4,5,6,7 python -u train.py \
--src_vocab_fpath 'cluster_test_data_en_fr/thirdparty/vocab.wordpiece.en-fr' \
--trg_vocab_fpath 'cluster_test_data_en_fr/thirdparty/vocab.wordpiece.en-fr' \
--special_token '<s>' '<e>' '<unk>' \
--token_delimiter '\x01' \
--train_file_pattern 'cluster_test_data_en_fr/train/train.wordpiece.en-fr.0' \
--val_file_pattern 'cluster_test_data_en_fr/thirdparty/newstest2014.wordpiece.en-fr' \
--use_token_batch True \
--batch_size 3200 \
--sort_type pool \
--pool_size 200000 \
--local False > pserver.log 2>&1 &
pserver_pid=$(echo $!)
echo $pserver_pid
sleep 30s
#trainer
export TRAINING_ROLE=TRAINER
export PADDLE_PORT=30134
export PADDLE_PSERVERS=127.0.0.1
export PADDLE_IS_LOCAL=0
export PADDLE_INIT_TRAINER_COUNT=1
export POD_IP=127.0.0.1
export PADDLE_TRAINER_ID=0
export PADDLE_TRAINERS_NUM=1
CUDA_VISIBLE_DEVICES=4,5,6,7 python -u train.py \
--src_vocab_fpath 'cluster_test_data_en_fr/thirdparty/vocab.wordpiece.en-fr' \
--trg_vocab_fpath 'cluster_test_data_en_fr/thirdparty/vocab.wordpiece.en-fr' \
--special_token '<s>' '<e>' '<unk>' \
--token_delimiter '\x01' \
--train_file_pattern 'cluster_test_data_en_fr/train/train.wordpiece.en-fr.0' \
--val_file_pattern 'cluster_test_data_en_fr/thirdparty/newstest2014.wordpiece.en-fr' \
--use_token_batch True \
--batch_size 3200 \
--sort_type pool \
--pool_size 200000 \
--local False > trainer.log 2>&1 &
#sleep 80
#kill -9 $pserver_pid
......@@ -80,7 +80,7 @@ def multi_head_attention(queries,
# The value 0 in shape attr means copying the corresponding dimension
# size of the input as the output dimension size.
reshaped = layers.reshape(
x=x, shape=[0, 0, n_head, hidden_size // n_head])
x=x, shape=[0, 0, n_head, hidden_size // n_head], inplace=True)
# permuate the dimensions into:
# [batch_size, n_head, max_sequence_len, hidden_size_per_head]
......@@ -99,7 +99,9 @@ def multi_head_attention(queries,
# The value 0 in shape attr means copying the corresponding dimension
# size of the input as the output dimension size.
return layers.reshape(
x=trans_x, shape=[0, 0, trans_x.shape[2] * trans_x.shape[3]])
x=trans_x,
shape=[0, 0, trans_x.shape[2] * trans_x.shape[3]],
inplace=True)
def scaled_dot_product_attention(q, k, v, attn_bias, d_key, dropout_rate):
"""
......@@ -523,8 +525,7 @@ def transformer(src_vocab_size,
epsilon=label_smooth_eps)
cost = layers.softmax_with_cross_entropy(
logits=layers.reshape(
predict, shape=[-1, trg_vocab_size]),
logits=predict,
label=label,
soft_label=True if label_smooth_eps else False)
weighted_cost = cost * weights
......@@ -637,6 +638,9 @@ def wrap_decoder(trg_vocab_size,
preprocess_cmd,
postprocess_cmd,
caches=caches)
# Reshape to 2D tensor to use GEMM instead of BatchedGEMM
dec_output = layers.reshape(
dec_output, shape=[-1, dec_output.shape[-1]], inplace=True)
if weight_sharing:
predict = layers.matmul(
x=dec_output,
......@@ -751,7 +755,6 @@ def fast_decode(
dec_inputs=(pre_ids, pre_pos, None, pre_src_attn_bias),
enc_output=pre_enc_output,
caches=pre_caches)
logits = layers.reshape(logits, (-1, trg_vocab_size))
topk_scores, topk_indices = layers.topk(
input=layers.softmax(logits), k=beam_size)
......
import argparse
import ast
import contextlib
import multiprocessing
import os
import six
......@@ -79,8 +80,7 @@ def parse_args():
type=lambda x: str(x.encode().decode("unicode-escape")),
default=" ",
help="The delimiter used to split tokens in source or target sentences. "
"For EN-DE BPE data we provided, use spaces as token delimiter. "
"For EN-FR wordpiece data we provided, use '\x01' as token delimiter.")
"For EN-DE BPE data we provided, use spaces as token delimiter.")
parser.add_argument(
"--use_mem_opt",
type=ast.literal_eval,
......@@ -98,9 +98,14 @@ def parse_args():
help="The iteration number to run in profiling.")
parser.add_argument(
"--use_parallel_exe",
type=bool,
type=ast.literal_eval,
default=False,
help="The flag indicating whether to use ParallelExecutor.")
parser.add_argument(
"--profile_ops",
type=ast.literal_eval,
default=True,
help="The flag indicating whether to profile operators.")
parser.add_argument(
'opts',
help='See config.py for all options',
......@@ -125,6 +130,8 @@ def parse_args():
def main(args):
train_prog = fluid.Program()
startup_prog = fluid.Program()
train_prog.random_seed = 1000
startup_prog.random_seed = 1000
with fluid.program_guard(train_prog, startup_prog):
with fluid.unique_name.guard():
sum_cost, avg_cost, predict, token_num, pyreader = transformer(
......@@ -243,24 +250,33 @@ def main(args):
if args.use_py_reader:
pyreader.reset()
pyreader.start()
break
return reader_time, run_time
@contextlib.contextmanager
def profile_context(profile=True):
if profile:
with profiler.profiler('All', 'total', '/tmp/profile_file'):
yield
else:
yield
# start-up
init_flag = True
run(1)
run(5)
init_flag = False
# profiling
start = time.time()
# currently only support profiling on one device
with profiler.profiler('All', 'total', '/tmp/profile_file'):
with profile_context(args.profile_ops):
reader_time, run_time = run(args.iter_num)
end = time.time()
total_time = end - start
print("Total time: {0}, reader time: {1} s, run time: {2} s".format(
total_time, np.sum(reader_time), np.sum(run_time)))
print(
"Total time: {0}, reader time: {1} s, run time: {2} s, step number: {3}".
format(total_time, np.sum(reader_time), np.sum(run_time),
args.iter_num))
if __name__ == "__main__":
......
......@@ -297,9 +297,14 @@ class DataReader(object):
infos = self._sample_infos
if self._sort_type == SortType.POOL:
reverse = True
for i in range(0, len(infos), self._pool_size):
# to avoid placing short next to long sentences
reverse = not reverse
infos[i:i + self._pool_size] = sorted(
infos[i:i + self._pool_size], key=lambda x: x.max_len)
infos[i:i + self._pool_size],
key=lambda x: x.max_len,
reverse=reverse)
# concat batch
batches = []
......
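A toy run of the alternating pool sort added above (made-up lengths): each pool is sorted by length, and the sort direction flips from pool to pool so a very short sample never lands right next to a very long one at a pool boundary.

```python
lens = [5, 1, 9, 2, 8, 4, 7, 3, 6]
pool_size, reverse = 3, True
for i in range(0, len(lens), pool_size):
    reverse = not reverse
    lens[i:i + pool_size] = sorted(lens[i:i + pool_size], reverse=reverse)
print(lens)  # [1, 5, 9, 8, 4, 2, 3, 6, 7]
```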
import argparse
import ast
import copy
import logging
import multiprocessing
import os
import six
import sys
import time
import numpy as np
import paddle.fluid as fluid
from paddle.fluid.transpiler.details import program_to_code
import reader
from config import *
......@@ -97,6 +101,11 @@ def parse_args():
default='GPU',
choices=['CPU', 'GPU'],
help="The device type.")
parser.add_argument(
'--update_method',
choices=("pserver", "nccl2"),
default="pserver",
help='Update method.')
parser.add_argument(
'--sync', type=ast.literal_eval, default=True, help="sync mode.")
parser.add_argument(
......@@ -115,6 +124,11 @@ def parse_args():
type=ast.literal_eval,
default=True,
help="The flag indicating whether to use py_reader.")
parser.add_argument(
"--fetch_steps",
type=int,
default=100,
help="The frequency to fetch and print output.")
args = parser.parse_args()
# Append args related to dict
......@@ -131,6 +145,26 @@ def parse_args():
return args
def append_nccl2_prepare(startup_prog, trainer_id, worker_endpoints,
current_endpoint):
assert (trainer_id >= 0 and len(worker_endpoints) > 1 and
current_endpoint in worker_endpoints)
eps = copy.deepcopy(worker_endpoints)
eps.remove(current_endpoint)
nccl_id_var = startup_prog.global_block().create_var(
name="NCCLID", persistable=True, type=fluid.core.VarDesc.VarType.RAW)
startup_prog.global_block().append_op(
type="gen_nccl_id",
inputs={},
outputs={"NCCLID": nccl_id_var},
attrs={
"endpoint": current_endpoint,
"endpoint_list": eps,
"trainer_id": trainer_id
})
return nccl_id_var
def pad_batch_data(insts,
pad_idx,
n_head,
......@@ -370,7 +404,7 @@ def test_context(exe, train_exe, dev_count):
TrainTaskConfig.label_smooth_eps,
use_py_reader=args.use_py_reader,
is_test=True)
test_prog = test_prog.clone(for_test=True)
test_data = prepare_data_generator(
args, is_test=True, count=dev_count, pyreader=pyreader)
......@@ -410,15 +444,25 @@ def test_context(exe, train_exe, dev_count):
return test
def train_loop(exe, train_prog, startup_prog, dev_count, sum_cost, avg_cost,
token_num, predict, pyreader):
def train_loop(exe,
train_prog,
startup_prog,
dev_count,
sum_cost,
avg_cost,
token_num,
predict,
pyreader,
nccl2_num_trainers=1,
nccl2_trainer_id=0):
# Initialize the parameters.
if TrainTaskConfig.ckpt_path:
fluid.io.load_persistables(exe, TrainTaskConfig.ckpt_path)
else:
print("init fluid.framework.default_startup_program")
logging.info("init fluid.framework.default_startup_program")
exe.run(startup_prog)
logging.info("begin reader")
train_data = prepare_data_generator(
args, is_test=False, count=dev_count, pyreader=pyreader)
......@@ -431,12 +475,16 @@ def train_loop(exe, train_prog, startup_prog, dev_count, sum_cost, avg_cost,
# use token average cost among multi-devices. and the gradient scale is
# `1 / token_number` for average cost.
# build_strategy.gradient_scale_strategy = fluid.BuildStrategy.GradientScaleStrategy.Customized
logging.info("begin executor")
train_exe = fluid.ParallelExecutor(
use_cuda=TrainTaskConfig.use_gpu,
loss_name=avg_cost.name,
main_program=train_prog,
build_strategy=build_strategy,
exec_strategy=exec_strategy)
exec_strategy=exec_strategy,
num_trainers=nccl2_num_trainers,
trainer_id=nccl2_trainer_id)
if args.val_file_pattern is not None:
test = test_context(exe, train_exe, dev_count)
......@@ -450,6 +498,8 @@ def train_loop(exe, train_prog, startup_prog, dev_count, sum_cost, avg_cost,
step_idx = 0
init_flag = True
logging.info("begin train")
for pass_id in six.moves.xrange(TrainTaskConfig.pass_num):
pass_start_time = time.time()
......@@ -464,25 +514,38 @@ def train_loop(exe, train_prog, startup_prog, dev_count, sum_cost, avg_cost,
try:
feed_dict_list = prepare_feed_dict_list(data_generator,
init_flag, dev_count)
outs = train_exe.run(
fetch_list=[sum_cost.name, token_num.name],
fetch_list=[sum_cost.name, token_num.name]
if step_idx % args.fetch_steps == 0 else [],
feed=feed_dict_list)
sum_cost_val, token_num_val = np.array(outs[0]), np.array(outs[
1])
# sum the cost from multi-devices
total_sum_cost = sum_cost_val.sum()
total_token_num = token_num_val.sum()
total_avg_cost = total_sum_cost / total_token_num
print("step_idx: %d, epoch: %d, batch: %d, avg loss: %f, "
"normalized loss: %f, ppl: %f" %
(step_idx, pass_id, batch_id, total_avg_cost,
total_avg_cost - loss_normalizer,
np.exp([min(total_avg_cost, 100)])))
if step_idx % int(TrainTaskConfig.
save_freq) == TrainTaskConfig.save_freq - 1:
if step_idx % args.fetch_steps == 0:
sum_cost_val, token_num_val = np.array(outs[0]), np.array(
outs[1])
# sum the cost from multi-devices
total_sum_cost = sum_cost_val.sum()
total_token_num = token_num_val.sum()
total_avg_cost = total_sum_cost / total_token_num
if step_idx == 0:
logging.info(
"step_idx: %d, epoch: %d, batch: %d, avg loss: %f, "
"normalized loss: %f, ppl: %f" %
(step_idx, pass_id, batch_id, total_avg_cost,
total_avg_cost - loss_normalizer,
np.exp([min(total_avg_cost, 100)])))
avg_batch_time = time.time()
else:
logging.info(
"step_idx: %d, epoch: %d, batch: %d, avg loss: %f, "
"normalized loss: %f, ppl: %f, speed: %.2f step/s" %
(step_idx, pass_id, batch_id, total_avg_cost,
total_avg_cost - loss_normalizer,
np.exp([min(total_avg_cost, 100)]),
args.fetch_steps / (time.time() - avg_batch_time)))
avg_batch_time = time.time()
if step_idx % TrainTaskConfig.save_freq == 0 and step_idx > 0:
fluid.io.save_persistables(
exe,
os.path.join(TrainTaskConfig.ckpt_dir,
......@@ -492,6 +555,7 @@ def train_loop(exe, train_prog, startup_prog, dev_count, sum_cost, avg_cost,
os.path.join(TrainTaskConfig.model_dir,
"iter_" + str(step_idx) + ".infer.model"),
train_prog)
init_flag = False
batch_id += 1
step_idx += 1
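The change above only fetches `sum_cost` and `token_num` every `--fetch_steps` iterations; pulling tensors back to the host on every step forces extra synchronization, so skipping the fetch keeps the devices busy and lets the log also report steps per second. A minimal sketch of this periodic-fetch pattern, with a hypothetical `run_step()` standing in for `train_exe.run`:

```python
import time
import numpy as np

def train(iter_num, fetch_steps, run_step):
    # `run_step(fetch)` is a hypothetical stand-in for train_exe.run: it returns
    # [sum_cost, token_num] arrays when fetch=True and an empty list otherwise.
    last_time = time.time()
    for step_idx in range(iter_num):
        outs = run_step(fetch=(step_idx % fetch_steps == 0))
        if step_idx % fetch_steps == 0:
            sum_cost, token_num = np.array(outs[0]), np.array(outs[1])
            avg_cost = sum_cost.sum() / token_num.sum()  # token-averaged loss
            speed = fetch_steps / (time.time() - last_time) if step_idx else 0.0
            print("step %d, avg loss %.4f, speed %.2f step/s"
                  % (step_idx, avg_cost, speed))
            last_time = time.time()
```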
......@@ -505,13 +569,13 @@ def train_loop(exe, train_prog, startup_prog, dev_count, sum_cost, avg_cost,
# Validate and save the persistable.
if args.val_file_pattern is not None:
val_avg_cost, val_ppl = test()
print(
logging.info(
"epoch: %d, val avg loss: %f, val normalized loss: %f, val ppl: %f,"
" consumed %fs" % (pass_id, val_avg_cost,
val_avg_cost - loss_normalizer, val_ppl,
time_consumed))
else:
print("epoch: %d, consumed %fs" % (pass_id, time_consumed))
logging.info("epoch: %d, consumed %fs" % (pass_id, time_consumed))
if not args.enable_ce:
fluid.io.save_persistables(
exe,
......@@ -531,7 +595,7 @@ def train(args):
is_local = os.getenv("PADDLE_IS_LOCAL", "1")
if is_local == '0':
args.local = False
print(args)
logging.info(args)
if args.device == 'CPU':
TrainTaskConfig.use_gpu = False
......@@ -576,15 +640,21 @@ def train(args):
use_py_reader=args.use_py_reader,
is_test=False)
if args.local:
optimizer = None
if args.sync:
lr_decay = fluid.layers.learning_rate_scheduler.noam_decay(
ModelHyperParams.d_model, TrainTaskConfig.warmup_steps)
logging.info("before adam")
with fluid.default_main_program()._lr_schedule_guard():
learning_rate = lr_decay * TrainTaskConfig.learning_rate
optimizer = fluid.optimizer.Adam(
learning_rate=lr_decay * TrainTaskConfig.learning_rate,
learning_rate=learning_rate,
beta1=TrainTaskConfig.beta1,
beta2=TrainTaskConfig.beta2,
epsilon=TrainTaskConfig.eps)
elif args.sync == False:
else:
optimizer = fluid.optimizer.SGD(0.003)
optimizer.minimize(avg_cost)
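The synchronous branch keeps the Noam schedule used by the Transformer: the learning rate grows linearly for `warmup_steps` steps and then decays with the inverse square root of the step number, scaled by `d_model^{-0.5}`. A small NumPy-free sketch of the same schedule (the Fluid `noam_decay` layer computes the equivalent value inside the program):

```python
def noam_lr(step, d_model, warmup_steps, base_lr=1.0):
    # lr = base_lr * d_model^{-0.5} * min(step^{-0.5}, step * warmup_steps^{-1.5})
    step = max(step, 1)
    return base_lr * d_model ** -0.5 * min(step ** -0.5,
                                           step * warmup_steps ** -1.5)

# The rate peaks roughly at step == warmup_steps, e.g. d_model=512, warmup=8000:
print([round(noam_lr(s, 512, 8000), 6) for s in (1, 4000, 8000, 100000)])
```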
......@@ -592,10 +662,32 @@ def train(args):
fluid.memory_optimize(train_prog)
if args.local:
print("local start_up:")
logging.info("local start_up:")
train_loop(exe, train_prog, startup_prog, dev_count, sum_cost, avg_cost,
token_num, predict, pyreader)
else:
if args.update_method == "nccl2":
trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
port = os.getenv("PADDLE_PORT")
worker_ips = os.getenv("PADDLE_TRAINERS")
worker_endpoints = []
for ip in worker_ips.split(","):
worker_endpoints.append(':'.join([ip, port]))
trainers_num = len(worker_endpoints)
current_endpoint = os.getenv("POD_IP") + ":" + port
if trainer_id == 0:
logging.info("train_id == 0, sleep 60s")
time.sleep(60)
logging.info("trainers_num:{}".format(trainers_num))
logging.info("worker_endpoints:{}".format(worker_endpoints))
logging.info("current_endpoint:{}".format(current_endpoint))
append_nccl2_prepare(startup_prog, trainer_id, worker_endpoints,
current_endpoint)
train_loop(exe, train_prog, startup_prog, dev_count, sum_cost,
avg_cost, token_num, predict, pyreader, trainers_num,
trainer_id)
return
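In NCCL2 mode the trainer list is reconstructed entirely from environment variables, so a two-machine job would typically export values like the ones below (the addresses are illustrative; the variable names match the ones read above):

```python
import os

# Illustrative environment for trainer 0 of a two-machine NCCL2 job.
os.environ.update({
    "PADDLE_TRAINER_ID": "0",
    "PADDLE_PORT": "6170",
    "PADDLE_TRAINERS": "192.168.0.1,192.168.0.2",
    "POD_IP": "192.168.0.1",
})

port = os.getenv("PADDLE_PORT")
worker_endpoints = [ip + ":" + port
                    for ip in os.getenv("PADDLE_TRAINERS").split(",")]
current_endpoint = os.getenv("POD_IP") + ":" + port
print(worker_endpoints, current_endpoint)
# ['192.168.0.1:6170', '192.168.0.2:6170'] 192.168.0.1:6170
```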
port = os.getenv("PADDLE_PORT", "6174")
pserver_ips = os.getenv("PADDLE_PSERVERS") # ip,ip...
eplist = []
......@@ -605,6 +697,13 @@ def train(args):
trainers = int(os.getenv("PADDLE_TRAINERS_NUM", "0"))
current_endpoint = os.getenv("POD_IP") + ":" + port
trainer_id = int(os.getenv("PADDLE_TRAINER_ID"))
logging.info("pserver_endpoints:{}".format(pserver_endpoints))
logging.info("current_endpoint:{}".format(current_endpoint))
logging.info("trainer_id:{}".format(trainer_id))
logging.info("pserver_ips:{}".format(pserver_ips))
logging.info("port:{}".format(port))
t = fluid.DistributeTranspiler()
t.transpile(
trainer_id,
......@@ -614,32 +713,34 @@ def train(args):
startup_program=startup_prog)
if training_role == "PSERVER":
logging.info("distributed: pserver started")
current_endpoint = os.getenv("POD_IP") + ":" + os.getenv(
"PADDLE_PORT")
if not current_endpoint:
print("need env SERVER_ENDPOINT")
logging.critical("need env SERVER_ENDPOINT")
exit(1)
pserver_prog = t.get_pserver_program(current_endpoint)
pserver_startup = t.get_startup_program(current_endpoint,
pserver_prog)
print("psserver begin run")
with open('pserver_startup.desc', 'w') as f:
f.write(str(pserver_startup))
with open('pserver_prog.desc', 'w') as f:
f.write(str(pserver_prog))
exe.run(pserver_startup)
exe.run(pserver_prog)
elif training_role == "TRAINER":
logging.info("distributed: trainer started")
trainer_prog = t.get_trainer_program()
with open('trainer_prog.desc', 'w') as f:
f.write(str(trainer_prog))
train_loop(exe, train_prog, startup_prog, dev_count, sum_cost,
avg_cost, token_num, predict, pyreader)
else:
print("environment var TRAINER_ROLE should be TRAINER os PSERVER")
logging.critical(
"environment var TRAINER_ROLE should be TRAINER os PSERVER")
exit(1)
if __name__ == "__main__":
LOG_FORMAT = "[%(asctime)s %(levelname)s %(filename)s:%(lineno)d] %(message)s"
logging.basicConfig(
stream=sys.stdout, level=logging.DEBUG, format=LOG_FORMAT)
args = parse_args()
train(args)
......@@ -17,12 +17,12 @@
## Introduction and model details
The PaddlePaddle v2 version of [named entity recognition](https://github.com/PaddlePaddle/models/blob/develop/sequence_tagging_for_ner/README.md) already gives a fairly detailed introduction to the NER task, so it is not repeated in this example.
The PaddlePaddle v2 version of [named entity recognition](https://github.com/PaddlePaddle/models/blob/develop/legacy/sequence_tagging_for_ner/README.md) already gives a fairly detailed introduction to the NER task, so it is not repeated in this example.
For the model we keep the v2 network structure; the only difference is that we use an LSTM in place of the original RNN.
## Obtaining the data
To obtain the complete dataset, please follow the instructions in the PaddlePaddle v2 [named entity recognition](https://github.com/PaddlePaddle/models/blob/develop/sequence_tagging_for_ner/README.md) section. The sample data for this example can also be fetched by running data/download.sh.
To obtain the complete dataset, please follow the instructions in the PaddlePaddle v2 [named entity recognition](https://github.com/PaddlePaddle/models/blob/develop/legacy/sequence_tagging_for_ner/README.md) section. The sample data for this example can also be fetched by running data/download.sh.
## Training
......
......@@ -31,8 +31,8 @@ from pretrained_word2vec import Glove840B_300D
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument('--model_name', type=str, default='cdssm', help="Which model to train")
parser.add_argument('--config', type=str, default='cdssm.cdssm_base', help="The global config setting")
parser.add_argument('--model_name', type=str, default='cdssmNet', help="Which model to train")
parser.add_argument('--config', type=str, default='cdssm_base', help="The global config setting")
DATA_DIR = os.path.join(os.path.expanduser('~'), '.cache/paddle/dataset')
......@@ -87,8 +87,8 @@ def evaluate(epoch_id, exe, inference_program, dev_reader, test_reader, fetch_li
def train_and_evaluate(train_reader,
test_reader,
dev_reader,
test_reader,
network,
optimizer,
global_config,
......@@ -246,7 +246,10 @@ def main():
# use cuda or not
if not global_config.has_member('use_cuda'):
global_config.use_cuda = 'CUDA_VISIBLE_DEVICES' in os.environ
if 'CUDA_VISIBLE_DEVICES' in os.environ and os.environ['CUDA_VISIBLE_DEVICES'] != '':
global_config.use_cuda = True
else:
global_config.use_cuda = False
global_config.list_config()
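The fix above stops treating an empty `CUDA_VISIBLE_DEVICES` as a request for GPU: when the variable is present but empty, the job has explicitly been given no visible device. A small sketch of the corrected check in isolation:

```python
import os

def detect_use_cuda():
    # Use GPU only when CUDA_VISIBLE_DEVICES is set to a non-empty value.
    return os.environ.get("CUDA_VISIBLE_DEVICES", "") != ""

os.environ["CUDA_VISIBLE_DEVICES"] = ""
assert detect_use_cuda() is False   # set but empty: no GPU
os.environ["CUDA_VISIBLE_DEVICES"] = "1"
assert detect_use_cuda() is True    # at least one visible device
```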
......
source ~/mapingshuo/.bash_mapingshuo_fluid
export CUDA_VISIBLE_DEVICES=1
fluid train_and_evaluate.py \
--model_name=cdssmNet \
--config=cdssm_base
#fluid train_and_evaluate.py \
# --model_name=DecAttNet \
# --config=decatt_glove
#fluid train_and_evaluate.py \
# --model_name=DecAttNet \
# --config=decatt_word
#fluid train_and_evaluate.py \
# --model_name=ESIMNet \
# --config=esim_seq