Unverified Commit 8997bb63 authored by guru4elephant, committed by GitHub

Merge pull request #4 from PaddlePaddle/develop

merge from remote
......@@ -5,6 +5,7 @@ python -u ../train_and_evaluate.py --use_cuda \
--ext_eval \
--word_emb_init ./data/word_embedding.pkl \
--save_path ./models \
--use_pyreader \
--batch_size 256 \
--vocab_size 172130 \
--channel1_num 16 \
......
......@@ -15,45 +15,85 @@ class Net(object):
self._stack_num = stack_num
self._channel1_num = channel1_num
self._channel2_num = channel2_num
self._feed_names = []
self.word_emb_name = "shared_word_emb"
self.use_stack_op = True
self.use_mask_cache = True
self.use_sparse_embedding = True
def set_word_embedding(self, word_emb, place):
word_emb_param = fluid.global_scope().find_var(
self.word_emb_name).get_tensor()
word_emb_param.set(word_emb, place)
def create_network(self):
mask_cache = dict() if self.use_mask_cache else None
turns_data = []
def create_py_reader(self, capacity, name):
# turns ids
shapes = [[-1, self._max_turn_len, 1]
for i in six.moves.xrange(self._max_turn_num)]
dtypes = ["int32" for i in six.moves.xrange(self._max_turn_num)]
# turns mask
shapes += [[-1, self._max_turn_len, 1]
for i in six.moves.xrange(self._max_turn_num)]
dtypes += ["float32" for i in six.moves.xrange(self._max_turn_num)]
# response ids, response mask, label
shapes += [[-1, self._max_turn_len, 1], [-1, self._max_turn_len, 1],
[-1, 1]]
dtypes += ["int32", "float32", "float32"]
py_reader = fluid.layers.py_reader(
capacity=capacity,
shapes=shapes,
lod_levels=[0] * (2 * self._max_turn_num + 3),
dtypes=dtypes,
name=name,
use_double_buffer=True)
data_vars = fluid.layers.read_file(py_reader)
self.turns_data = data_vars[0:self._max_turn_num]
self.turns_mask = data_vars[self._max_turn_num:2 * self._max_turn_num]
self.response = data_vars[-3]
self.response_mask = data_vars[-2]
self.label = data_vars[-1]
return py_reader
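Once built, a py_reader is driven with the start/reset protocol instead of an explicit feed dict; the graph pulls batches from the reader's queue. A minimal hedged sketch of the driver loop (batch_generator is a hypothetical callable yielding lists of numpy arrays in the declared order: turn ids, turn masks, response, response mask, label):

py_reader.decorate_tensor_provider(batch_generator)
py_reader.start()
while True:
    try:
        # no feed= argument; batches flow in through the reader queue
        loss_val, = exe.run(fetch_list=[loss.name])
    except fluid.core.EOFException:
        py_reader.reset()  # rearm the queue before the next pass
        break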
def create_data_layers(self):
self._feed_names = []
self.turns_data = []
for i in six.moves.xrange(self._max_turn_num):
name = "turn_%d" % i
turn = fluid.layers.data(
name="turn_%d" % i,
shape=[self._max_turn_len, 1],
dtype="int32")
turns_data.append(turn)
name=name, shape=[self._max_turn_len, 1], dtype="int32")
self.turns_data.append(turn)
self._feed_names.append(name)
turns_mask = []
self.turns_mask = []
for i in six.moves.xrange(self._max_turn_num):
name = "turn_mask_%d" % i
turn_mask = fluid.layers.data(
name="turn_mask_%d" % i,
shape=[self._max_turn_len, 1],
dtype="float32")
turns_mask.append(turn_mask)
name=name, shape=[self._max_turn_len, 1], dtype="float32")
self.turns_mask.append(turn_mask)
self._feed_names.append(name)
response = fluid.layers.data(
self.response = fluid.layers.data(
name="response", shape=[self._max_turn_len, 1], dtype="int32")
response_mask = fluid.layers.data(
self.response_mask = fluid.layers.data(
name="response_mask",
shape=[self._max_turn_len, 1],
dtype="float32")
label = fluid.layers.data(name="label", shape=[1], dtype="float32")
self.label = fluid.layers.data(name="label", shape=[1], dtype="float32")
self._feed_names += ["response", "response_mask", "label"]
def get_feed_names(self):
return self._feed_names
def set_word_embedding(self, word_emb, place):
word_emb_param = fluid.global_scope().find_var(
self.word_emb_name).get_tensor()
word_emb_param.set(word_emb, place)
def create_network(self):
mask_cache = dict() if self.use_mask_cache else None
response_emb = fluid.layers.embedding(
input=response,
input=self.response,
size=[self._vocab_size + 1, self._emb_size],
is_sparse=self.use_sparse_embedding,
param_attr=fluid.ParamAttr(
......@@ -71,8 +111,8 @@ class Net(object):
key=Hr,
value=Hr,
d_key=self._emb_size,
q_mask=response_mask,
k_mask=response_mask,
q_mask=self.response_mask,
k_mask=self.response_mask,
mask_cache=mask_cache)
Hr_stack.append(Hr)
......@@ -80,7 +120,7 @@ class Net(object):
sim_turns = []
for t in six.moves.xrange(self._max_turn_num):
Hu = fluid.layers.embedding(
input=turns_data[t],
input=self.turns_data[t],
size=[self._vocab_size + 1, self._emb_size],
is_sparse=self.use_sparse_embedding,
param_attr=fluid.ParamAttr(
......@@ -96,8 +136,8 @@ class Net(object):
key=Hu,
value=Hu,
d_key=self._emb_size,
q_mask=turns_mask[t],
k_mask=turns_mask[t],
q_mask=self.turns_mask[t],
k_mask=self.turns_mask[t],
mask_cache=mask_cache)
Hu_stack.append(Hu)
......@@ -111,8 +151,8 @@ class Net(object):
key=Hr_stack[index],
value=Hr_stack[index],
d_key=self._emb_size,
q_mask=turns_mask[t],
k_mask=response_mask,
q_mask=self.turns_mask[t],
k_mask=self.response_mask,
mask_cache=mask_cache)
r_a_t = layers.block(
name="r_attend_t_" + str(index),
......@@ -120,8 +160,8 @@ class Net(object):
key=Hu_stack[index],
value=Hu_stack[index],
d_key=self._emb_size,
q_mask=response_mask,
k_mask=turns_mask[t],
q_mask=self.response_mask,
k_mask=self.turns_mask[t],
mask_cache=mask_cache)
t_a_r_stack.append(t_a_r)
......@@ -158,5 +198,5 @@ class Net(object):
sim = fluid.layers.concat(input=sim_turns, axis=2)
final_info = layers.cnn_3d(sim, self._channel1_num, self._channel2_num)
loss, logits = layers.loss(final_info, label)
loss, logits = layers.loss(final_info, self.label)
return loss, logits
......@@ -7,7 +7,7 @@ import multiprocessing
import paddle
import paddle.fluid as fluid
import utils.reader as reader
from utils.util import print_arguments
from utils.util import print_arguments, mkdir
try:
import cPickle as pickle #python 2
......@@ -49,6 +49,10 @@ def parse_args():
'--use_cuda',
action='store_true',
help='If set, use cuda for training.')
parser.add_argument(
'--use_pyreader',
action='store_true',
help='If set, use pyreader for reading data.')
parser.add_argument(
'--ext_eval',
action='store_true',
......@@ -105,7 +109,75 @@ def parse_args():
#yapf: enable
def evaluate(score_path, result_file_path):
if args.ext_eval:
import utils.douban_evaluation as eva
else:
import utils.evaluation as eva
#write evaluation result
result = eva.evaluate(score_path)
with open(result_file_path, 'w') as out_file:
for p_at in result:
out_file.write(str(p_at) + '\n')
print('finish evaluation')
print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
def test_with_feed(exe, program, feed_names, fetch_list, score_path, batches,
batch_num, dev_count):
score_file = open(score_path, 'w')
for it in six.moves.xrange(batch_num // dev_count):
feed_list = []
for dev in six.moves.xrange(dev_count):
val_index = it * dev_count + dev
batch_data = reader.make_one_batch_input(batches, val_index)
feed_dict = dict(zip(feed_names, batch_data))
feed_list.append(feed_dict)
predicts = exe.run(feed=feed_list, fetch_list=fetch_list)
scores = np.array(predicts[0])
for dev in six.moves.xrange(dev_count):
val_index = it * dev_count + dev
for i in six.moves.xrange(args.batch_size):
score_file.write(
str(scores[args.batch_size * dev + i][0]) + '\t' + str(
batches["label"][val_index][i]) + '\n')
score_file.close()
def test_with_pyreader(exe, program, pyreader, fetch_list, score_path, batches,
batch_num, dev_count):
def data_provider():
for index in six.moves.xrange(batch_num):
yield reader.make_one_batch_input(batches, index)
score_file = open(score_path, 'w')
pyreader.decorate_tensor_provider(data_provider)
it = 0
pyreader.start()
while True:
try:
predicts = exe.run(fetch_list=fetch_list)
scores = np.array(predicts[0])
for dev in six.moves.xrange(dev_count):
val_index = it * dev_count + dev
for i in six.moves.xrange(args.batch_size):
score_file.write(
str(scores[args.batch_size * dev + i][0]) + '\t' + str(
batches["label"][val_index][i]) + '\n')
it += 1
except fluid.core.EOFException:
pyreader.reset()
break
score_file.close()
def train(args):
if not os.path.exists(args.save_path):
os.makedirs(args.save_path)
# data config
data_conf = {
"batch_size": args.batch_size,
......@@ -117,14 +189,19 @@ def train(args):
dam = Net(args.max_turn_num, args.max_turn_len, args.vocab_size,
args.emb_size, args.stack_num, args.channel1_num,
args.channel2_num)
loss, logits = dam.create_network()
train_program = fluid.Program()
train_startup = fluid.Program()
with fluid.program_guard(train_program, train_startup):
with fluid.unique_name.guard():
if args.use_pyreader:
train_pyreader = dam.create_py_reader(
capacity=10, name='train_reader')
else:
dam.create_data_layers()
loss, logits = dam.create_network()
loss.persistable = True
logits.persistable = True
train_program = fluid.default_main_program()
test_program = fluid.default_main_program().clone(for_test=True)
# gradient clipping
fluid.clip.set_gradient_clip(clip=fluid.clip.GradientClipByValue(
max=1.0, min=-1.0))
......@@ -136,9 +213,24 @@ def train(args):
decay_rate=0.9,
staircase=True))
optimizer.minimize(loss)
fluid.memory_optimize(train_program)
test_program = fluid.Program()
test_startup = fluid.Program()
with fluid.program_guard(test_program, test_startup):
with fluid.unique_name.guard():
if args.use_pyreader:
test_pyreader = dam.create_py_reader(
capacity=10, name='test_reader')
else:
dam.create_data_layers()
loss, logits = dam.create_network()
loss.persistable = True
logits.persistable = True
test_program = test_program.clone(for_test=True)
if args.use_cuda:
place = fluid.CUDAPlace(0)
dev_count = fluid.core.get_cuda_device_count()
......@@ -152,7 +244,8 @@ def train(args):
program=train_program, batch_size=args.batch_size))
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
exe.run(train_startup)
exe.run(test_startup)
train_exe = fluid.ParallelExecutor(
use_cuda=args.use_cuda, loss_name=loss.name, main_program=train_program)
......@@ -162,11 +255,6 @@ def train(args):
main_program=test_program,
share_vars_from=train_exe)
if args.ext_eval:
import utils.douban_evaluation as eva
else:
import utils.evaluation as eva
if args.word_emb_init is not None:
print("start loading word embedding init ...")
if six.PY2:
......@@ -199,17 +287,15 @@ def train(args):
print("begin model training ...")
print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
step = 0
for epoch in six.moves.xrange(args.num_scan_data):
shuffle_train = reader.unison_shuffle(train_data)
train_batches = reader.build_batches(shuffle_train, data_conf)
# train one epoch by feeding data
def train_with_feed(step):
ave_cost = 0.0
for it in six.moves.xrange(batch_num // dev_count):
feed_list = []
for dev in six.moves.xrange(dev_count):
index = it * dev_count + dev
feed_dict = reader.make_one_batch_input(train_batches, index)
batch_data = reader.make_one_batch_input(train_batches, index)
feed_dict = dict(zip(dam.get_feed_names(), batch_data))
feed_list.append(feed_dict)
cost = train_exe.run(feed=feed_list, fetch_list=[loss.name])
......@@ -226,41 +312,73 @@ def train(args):
print("Save model at step %d ... " % step)
print(time.strftime('%Y-%m-%d %H:%M:%S',
time.localtime(time.time())))
fluid.io.save_persistables(exe, save_path)
fluid.io.save_persistables(exe, save_path, train_program)
score_path = os.path.join(args.save_path, 'score.' + str(step))
score_file = open(score_path, 'w')
for it in six.moves.xrange(val_batch_num // dev_count):
feed_list = []
for dev in six.moves.xrange(dev_count):
val_index = it * dev_count + dev
feed_dict = reader.make_one_batch_input(val_batches,
val_index)
feed_list.append(feed_dict)
test_with_feed(test_exe, test_program,
dam.get_feed_names(), [logits.name], score_path,
val_batches, val_batch_num, dev_count)
predicts = test_exe.run(feed=feed_list,
fetch_list=[logits.name])
scores = np.array(predicts[0])
for dev in six.moves.xrange(dev_count):
val_index = it * dev_count + dev
for i in six.moves.xrange(args.batch_size):
score_file.write(
str(scores[args.batch_size * dev + i][0]) + '\t'
+ str(val_batches["label"][val_index][
i]) + '\n')
score_file.close()
#write evaluation result
result = eva.evaluate(score_path)
result_file_path = os.path.join(args.save_path,
'result.' + str(step))
with open(result_file_path, 'w') as out_file:
for p_at in result:
out_file.write(str(p_at) + '\n')
print('finish evaluation')
evaluate(score_path, result_file_path)
return step
# train on one epoch with pyreader
def train_with_pyreader(step):
def data_provider():
for index in six.moves.xrange(batch_num):
yield reader.make_one_batch_input(train_batches, index)
train_pyreader.decorate_tensor_provider(data_provider)
ave_cost = 0.0
train_pyreader.start()
while True:
try:
cost = train_exe.run(fetch_list=[loss.name])
ave_cost += np.array(cost[0]).mean()
step = step + 1
if step % print_step == 0:
print("processed: [" + str(step * dev_count * 1.0 /
batch_num) + "] ave loss: [" +
str(ave_cost / print_step) + "]")
ave_cost = 0.0
if (args.save_path is not None) and (step % save_step == 0):
save_path = os.path.join(args.save_path,
"step_" + str(step))
print("Save model at step %d ... " % step)
print(time.strftime('%Y-%m-%d %H:%M:%S',
time.localtime(time.time())))
fluid.io.save_persistables(exe, save_path, train_program)
score_path = os.path.join(args.save_path,
'score.' + str(step))
test_with_pyreader(test_exe, test_program, test_pyreader,
[logits.name], score_path, val_batches,
val_batch_num, dev_count)
result_file_path = os.path.join(args.save_path,
'result.' + str(step))
evaluate(score_path, result_file_path)
except fluid.core.EOFException:
train_pyreader.reset()
break
return step
# train over different epochs
global_step = 0
for epoch in six.moves.xrange(args.num_scan_data):
shuffle_train = reader.unison_shuffle(train_data)
train_batches = reader.build_batches(shuffle_train, data_conf)
if args.use_pyreader:
global_step = train_with_pyreader(global_step)
else:
global_step = train_with_feed(global_step)
if __name__ == '__main__':
......
......@@ -4,6 +4,7 @@ python -u ../train_and_evaluate.py --use_cuda \
--data_path ./data/data.pkl \
--word_emb_init ./data/word_embedding.pkl \
--save_path ./models \
--use_pyreader \
--batch_size 256 \
--vocab_size 434512 \
--emb_size 200 \
......
......@@ -202,30 +202,30 @@ def make_one_batch_input(data_batches, index):
every_turn_len[:, i] for i in six.moves.xrange(max_turn_num)
]
feed_dict = {}
feed_list = []
for i, turn in enumerate(turns_list):
feed_dict["turn_%d" % i] = turn
feed_dict["turn_%d" % i] = np.expand_dims(
feed_dict["turn_%d" % i], axis=-1)
turn = np.expand_dims(turn, axis=-1)
feed_list.append(turn)
for i, turn_len in enumerate(every_turn_len_list):
feed_dict["turn_mask_%d" % i] = np.ones(
(batch_size, max_turn_len, 1)).astype("float32")
turn_mask = np.ones((batch_size, max_turn_len, 1)).astype("float32")
for row in six.moves.xrange(batch_size):
feed_dict["turn_mask_%d" % i][row, turn_len[row]:, 0] = 0
turn_mask[row, turn_len[row]:, 0] = 0
feed_list.append(turn_mask)
feed_dict["response"] = response
feed_dict["response"] = np.expand_dims(feed_dict["response"], axis=-1)
response = np.expand_dims(response, axis=-1)
feed_list.append(response)
feed_dict["response_mask"] = np.ones(
(batch_size, max_turn_len, 1)).astype("float32")
response_mask = np.ones((batch_size, max_turn_len, 1)).astype("float32")
for row in six.moves.xrange(batch_size):
feed_dict["response_mask"][row, response_len[row]:, 0] = 0
response_mask[row, response_len[row]:, 0] = 0
feed_list.append(response_mask)
feed_dict["label"] = np.array([data_batches["label"][index]]).reshape(
label = np.array([data_batches["label"][index]]).reshape(
[-1, 1]).astype("float32")
feed_list.append(label)
return feed_dict
return feed_list
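Since make_one_batch_input now returns a positional list instead of a dict, both consumers depend on ordering: the feed path re-attaches names with dict(zip(...)), and the pyreader path hands the list straight to the tensor provider, where it must match the shapes and dtypes declared in create_py_reader. A hedged sketch of the invariant (feed_names as returned by Net.get_feed_names()):

batch_data = make_one_batch_input(batches, index)
# order: turn ids, turn masks, response, response mask, label
assert len(batch_data) == 2 * max_turn_num + 3
feed_dict = dict(zip(feed_names, batch_data))  # feed path only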
if __name__ == '__main__':
......
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
__all__ = ['parse_args', ]
BENCHMARK_MODELS = [
"ResNet50", "ResNet101", "ResNet152"
]
def parse_args():
parser = argparse.ArgumentParser('Distributed Image Classification Training.')
parser.add_argument(
'--model',
type=str,
choices=BENCHMARK_MODELS,
default='ResNet50',
help='The model to run benchmark with.')
parser.add_argument(
'--batch_size', type=int, default=32, help='The minibatch size.')
# args related to learning rate
parser.add_argument(
'--learning_rate', type=float, default=0.001, help='The learning rate.')
# TODO(wuyi): add "--use_fake_data" option back.
parser.add_argument(
'--skip_batch_num',
type=int,
default=5,
help='The number of initial minibatches to skip, for better performance measurement.'
)
parser.add_argument(
'--iterations', type=int, default=80, help='The number of minibatches.')
parser.add_argument(
'--pass_num', type=int, default=100, help='The number of passes.')
parser.add_argument(
'--data_format',
type=str,
default='NCHW',
choices=['NCHW', 'NHWC'],
help='The data format; currently only NCHW is supported.')
parser.add_argument(
'--device',
type=str,
default='GPU',
choices=['CPU', 'GPU'],
help='The device type.')
parser.add_argument(
'--gpus',
type=int,
default=1,
help='If gpus > 1, use ParallelExecutor; otherwise use Executor.')
# this option is available only for vgg and resnet.
parser.add_argument(
'--cpus',
type=int,
default=1,
help='If cpus > 1, set ParallelExecutor to use multiple threads.')
parser.add_argument(
'--data_set',
type=str,
default='flowers',
choices=['cifar10', 'flowers', 'imagenet'],
help='Optional dataset for benchmark.')
parser.add_argument(
'--no_test',
action='store_true',
help='If set, do not test the testset during training.')
parser.add_argument(
'--memory_optimize',
action='store_true',
help='If set, optimize runtime memory before start.')
parser.add_argument(
'--update_method',
type=str,
default='local',
choices=['local', 'pserver', 'nccl2'],
help='Choose parameter update method, can be local, pserver, nccl2.')
parser.add_argument(
'--no_split_var',
action='store_true',
default=False,
help='Whether to split variables into blocks when update_method is pserver.')
parser.add_argument(
'--async_mode',
action='store_true',
default=False,
help='Whether to start the pserver in async mode to support ASGD.')
parser.add_argument(
'--no_random',
action='store_true',
help='If set, keep the random seed and do not shuffle the data.')
parser.add_argument(
'--reduce_strategy',
type=str,
choices=['reduce', 'all_reduce'],
default='all_reduce',
help='Specify the reduce strategy, can be reduce, all_reduce')
parser.add_argument(
'--data_dir',
type=str,
default="../data/ILSVRC2012",
help="The ImageNet dataset root dir."
)
args = parser.parse_args()
return args
......@@ -26,9 +26,84 @@ import six
import sys
sys.path.append("..")
import models
from args import *
from reader import train, val
def parse_args():
parser = argparse.ArgumentParser('Distributed Image Classification Training.')
parser.add_argument(
'--model',
type=str,
default='DistResNet',
help='The model to run.')
parser.add_argument(
'--batch_size', type=int, default=32, help='The minibatch size per device.')
parser.add_argument(
'--multi_batch_repeat', type=int, default=1, help='Batch merge repeats.')
parser.add_argument(
'--learning_rate', type=float, default=0.1, help='The learning rate.')
parser.add_argument(
'--pass_num', type=int, default=90, help='The number of passes.')
parser.add_argument(
'--data_format',
type=str,
default='NCHW',
choices=['NCHW', 'NHWC'],
help='The data format; currently only NCHW is supported.')
parser.add_argument(
'--device',
type=str,
default='GPU',
choices=['CPU', 'GPU'],
help='The device type.')
parser.add_argument(
'--gpus',
type=int,
default=1,
help='If gpus > 1, use ParallelExecutor; otherwise use Executor.')
parser.add_argument(
'--cpus',
type=int,
default=1,
help='If cpus > 1, set ParallelExecutor to use multiple threads.')
parser.add_argument(
'--no_test',
action='store_true',
help='If set, do not test the testset during training.')
parser.add_argument(
'--memory_optimize',
action='store_true',
help='If set, optimize runtime memory before start.')
parser.add_argument(
'--update_method',
type=str,
default='local',
choices=['local', 'pserver', 'nccl2'],
help='Choose parameter update method, can be local, pserver, nccl2.')
parser.add_argument(
'--no_split_var',
action='store_true',
default=False,
help='Whether to split variables into blocks when update_method is pserver.')
parser.add_argument(
'--async_mode',
action='store_true',
default=False,
help='Whether to start the pserver in async mode to support ASGD.')
parser.add_argument(
'--reduce_strategy',
type=str,
choices=['reduce', 'all_reduce'],
default='all_reduce',
help='Specify the reduce strategy, can be reduce, all_reduce')
parser.add_argument(
'--data_dir',
type=str,
default="../data/ILSVRC2012",
help="The ImageNet dataset root dir."
)
args = parser.parse_args()
return args
def get_model(args, is_train, main_prog, startup_prog):
pyreader = None
class_dim = 1000
......@@ -51,7 +126,7 @@ def get_model(args, is_train, main_prog, startup_prog):
name="train_reader" if is_train else "test_reader",
use_double_buffer=True)
input, label = fluid.layers.read_file(pyreader)
model_def = models.__dict__[args.model]()
model_def = models.__dict__[args.model](layers=50, is_train=is_train)
predict = model_def.net(input, class_dim=class_dim)
cost = fluid.layers.cross_entropy(input=predict, label=label)
......@@ -60,89 +135,64 @@ def get_model(args, is_train, main_prog, startup_prog):
batch_acc1 = fluid.layers.accuracy(input=predict, label=label, k=1)
batch_acc5 = fluid.layers.accuracy(input=predict, label=label, k=5)
# configure optimizer
optimizer = None
if is_train:
start_lr = args.learning_rate
# end lr is scaled up: base lr * trainer count * batch-merge repeats
end_lr = args.learning_rate * trainer_count * args.multi_batch_repeat
total_images = 1281167 / trainer_count
step = int(total_images / (args.batch_size * args.gpus) + 1)
epochs = [30, 60, 90]
step = int(total_images / (args.batch_size * args.gpus * args.multi_batch_repeat) + 1)
warmup_steps = step * 5 # warmup 5 passes
epochs = [30, 60, 80]
bd = [step * e for e in epochs]
base_lr = args.learning_rate
base_lr = end_lr
lr = []
lr = [base_lr * (0.1**i) for i in range(len(bd) + 1)]
optimizer = fluid.optimizer.Momentum(
learning_rate=fluid.layers.piecewise_decay(
learning_rate=models.learning_rate.lr_warmup(
fluid.layers.piecewise_decay(
boundaries=bd, values=lr),
warmup_steps, start_lr, end_lr),
momentum=0.9,
regularization=fluid.regularizer.L2Decay(1e-4))
optimizer.minimize(avg_cost)
if args.memory_optimize:
fluid.memory_optimize(main_prog)
batched_reader = None
pyreader.decorate_paddle_reader(
paddle.batch(
reader if args.no_random else paddle.reader.shuffle(
reader, buf_size=5120),
reader,
batch_size=args.batch_size))
return avg_cost, optimizer, [batch_acc1,
batch_acc5], batched_reader, pyreader
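The schedule arithmetic above derives steps-per-pass from the per-trainer share of ImageNet (1,281,167 images) and the effective global batch, batch_size * gpus * multi_batch_repeat. A hedged worked example with illustrative values:

trainer_count = 2
batch_size, gpus, repeat = 32, 4, 2
total_images = 1281167 // trainer_count                      # 640583 per trainer
step = int(total_images / (batch_size * gpus * repeat) + 1)  # 2503 steps per pass
warmup_steps = step * 5                                      # warm up over 5 passes
bd = [step * e for e in [30, 60, 80]]                        # piecewise decay boundaries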
def append_nccl2_prepare(trainer_id, startup_prog):
if trainer_id >= 0:
# append gen_nccl_id at the end of startup program
trainer_id = int(os.getenv("PADDLE_TRAINER_ID"))
port = os.getenv("PADDLE_PSERVER_PORT")
worker_ips = os.getenv("PADDLE_TRAINER_IPS")
worker_endpoints = []
for ip in worker_ips.split(","):
worker_endpoints.append(':'.join([ip, port]))
num_trainers = len(worker_endpoints)
current_endpoint = os.getenv("PADDLE_CURRENT_IP") + ":" + port
worker_endpoints.remove(current_endpoint)
nccl_id_var = startup_prog.global_block().create_var(
name="NCCLID",
persistable=True,
type=fluid.core.VarDesc.VarType.RAW)
startup_prog.global_block().append_op(
type="gen_nccl_id",
inputs={},
outputs={"NCCLID": nccl_id_var},
attrs={
"endpoint": current_endpoint,
"endpoint_list": worker_endpoints,
"trainer_id": trainer_id
})
return nccl_id_var, num_trainers, trainer_id
else:
raise Exception("must set positive PADDLE_TRAINER_ID env variables for "
"nccl-based dist train.")
config = fluid.DistributeTranspilerConfig()
config.mode = "nccl2"
t = fluid.DistributeTranspiler(config=config)
t.transpile(trainer_id, trainers=','.join(worker_endpoints),
current_endpoint=current_endpoint,
startup_program=startup_prog)
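append_nccl2_prepare reads every endpoint from environment variables, so a launcher only has to export a consistent set on each node before starting the trainer. A hedged example for a two-trainer job (addresses are illustrative):

import os

# Hedged sketch: values a launcher might export for trainer 0 of 2.
os.environ["PADDLE_TRAINER_ID"] = "0"  # 0 on the first node, 1 on the second
os.environ["PADDLE_PSERVER_PORT"] = "6174"
os.environ["PADDLE_TRAINER_IPS"] = "192.168.0.1,192.168.0.2"
os.environ["PADDLE_CURRENT_IP"] = "192.168.0.1"  # this node's own address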
def dist_transpile(trainer_id, args, train_prog, startup_prog):
if trainer_id < 0:
return None, None
# the port of all pservers, needed by both trainer and pserver
port = os.getenv("PADDLE_PSERVER_PORT", "6174")
# comma separated ips of all pservers, needed by trainer and
# pserver
pserver_ips = os.getenv("PADDLE_PSERVER_IPS", "")
eplist = []
for ip in pserver_ips.split(","):
eplist.append(':'.join([ip, port]))
pserver_endpoints = ",".join(eplist)
# total number of workers/trainers in the job, needed by
# trainer and pserver
trainers = int(os.getenv("PADDLE_TRAINERS"))
# the IP of the local machine, needed by pserver only
current_endpoint = os.getenv("PADDLE_CURRENT_IP", "") + ":" + port
# the role, should be either PSERVER or TRAINER
training_role = os.getenv("PADDLE_TRAINING_ROLE")
config = fluid.DistributeTranspilerConfig()
......@@ -150,8 +200,6 @@ def dist_transpile(trainer_id, args, train_prog, startup_prog):
t = fluid.DistributeTranspiler(config=config)
t.transpile(
trainer_id,
# NOTE: *MUST* use train_prog, for we are using with guard to
# generate different program for train and test.
program=train_prog,
pservers=pserver_endpoints,
trainers=trainers,
......@@ -171,7 +219,7 @@ def dist_transpile(trainer_id, args, train_prog, startup_prog):
)
def test_parallel(exe, test_args, args, test_prog, feeder):
def test_parallel(exe, test_args, args, test_prog):
acc_evaluators = []
for i in six.moves.xrange(len(test_args[2])):
acc_evaluators.append(fluid.metrics.Accuracy())
......@@ -190,13 +238,10 @@ def test_parallel(exe, test_args, args, test_prog, feeder):
return [e.eval() for e in acc_evaluators]
# NOTE: only need to benchmark using ParallelExecutor
def train_parallel(train_args, test_args, args, train_prog, test_prog,
startup_prog, nccl_id_var, num_trainers, trainer_id):
over_all_start = time.time()
place = core.CPUPlace() if args.device == 'CPU' else core.CUDAPlace(0)
feeder = None
if nccl_id_var and trainer_id == 0:
#FIXME(wuyi): wait other trainer to start listening
......@@ -237,31 +282,27 @@ def train_parallel(train_args, test_args, args, train_prog, test_prog,
if args.update_method == "pserver":
test_scope = None
else:
# NOTE: use an empty scope to avoid test exe using NCCLID
test_scope = fluid.Scope()
test_exe = fluid.ParallelExecutor(
True, main_program=test_prog, share_vars_from=exe)
True, main_program=test_prog, share_vars_from=exe,
scope=test_scope)
pyreader = train_args[4]
for pass_id in range(args.pass_num):
num_samples = 0
iters = 0
start_time = time.time()
batch_id = 0
pyreader.start()
while True:
if iters == args.iterations:
break
if iters == args.skip_batch_num:
start_time = time.time()
num_samples = 0
fetch_list = [avg_loss.name]
acc_name_list = [v.name for v in train_args[2]]
fetch_list.extend(acc_name_list)
try:
if batch_id % 30 == 0:
fetch_ret = exe.run(fetch_list)
else:
fetch_ret = exe.run([])
except fluid.core.EOFException as eof:
break
except fluid.core.EnforceNotMet as ex:
......@@ -269,20 +310,17 @@ def train_parallel(train_args, test_args, args, train_prog, test_prog,
break
num_samples += args.batch_size * args.gpus
iters += 1
if batch_id % 1 == 0:
if batch_id % 30 == 0:
fetched_data = [np.mean(np.array(d)) for d in fetch_ret]
print("Pass %d, batch %d, loss %s, accucacys: %s" %
(pass_id, batch_id, fetched_data[0], fetched_data[1:]))
batch_id += 1
print_train_time(start_time, time.time(), num_samples)
pyreader.reset() # reset reader handle
pyreader.reset()
if not args.no_test and test_args[2]:
test_feeder = None
test_ret = test_parallel(test_exe, test_args, args, test_prog,
test_feeder)
test_ret = test_parallel(test_exe, test_args, args, test_prog)
print("Pass: %d, Test Accuracy: %s\n" %
(pass_id, [np.mean(np.array(v)) for v in test_ret]))
......@@ -316,8 +354,6 @@ def main():
args = parse_args()
print_arguments(args)
print_paddle_envs()
if args.no_random:
fluid.default_startup_program().random_seed = 1
# the unique trainer id, starting from 0, needed by trainer
# only
......
......@@ -3,6 +3,8 @@ from .mobilenet import MobileNet
from .googlenet import GoogleNet
from .vgg import VGG11, VGG13, VGG16, VGG19
from .resnet import ResNet50, ResNet101, ResNet152
from .resnet_dist import DistResNet
from .inception_v4 import InceptionV4
from .se_resnext import SE_ResNeXt50_32x4d, SE_ResNeXt101_32x4d, SE_ResNeXt152_32x4d
from .dpn import DPN68, DPN92, DPN98, DPN107, DPN131
import learning_rate
......@@ -20,3 +20,31 @@ def cosine_decay(learning_rate, step_each_epoch, epochs=120):
decayed_lr = learning_rate * \
(ops.cos(epoch * (math.pi / epochs)) + 1)/2
return decayed_lr
def lr_warmup(learning_rate, warmup_steps, start_lr, end_lr):
""" Applies linear learning rate warmup for distributed training
Argument learning_rate can be float or a Variable
lr = start_lr + (end_lr - start_lr) * step / warmup_steps
"""
assert(isinstance(end_lr, float))
assert(isinstance(start_lr, float))
linear_step = end_lr - start_lr
with fluid.default_main_program()._lr_schedule_guard():
lr = fluid.layers.tensor.create_global_var(
shape=[1],
value=0.0,
dtype='float32',
persistable=True,
name="learning_rate_warmup")
global_step = fluid.layers.learning_rate_scheduler._decay_step_counter()
with fluid.layers.control_flow.Switch() as switch:
with switch.case(global_step < warmup_steps):
decayed_lr = start_lr + linear_step * (global_step / warmup_steps)
fluid.layers.tensor.assign(decayed_lr, lr)
with switch.default():
fluid.layers.tensor.assign(learning_rate, lr)
return lr
\ No newline at end of file
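Numerically, the warmed-up rate climbs linearly from start_lr to end_lr over warmup_steps, after which the wrapped piecewise decay takes over. A hedged pure-Python illustration of the switch branch above:

start_lr, end_lr, warmup_steps = 0.1, 0.4, 1000.0
for step in (0, 250, 500, 1000):
    if step < warmup_steps:
        lr = start_lr + (end_lr - start_lr) * step / warmup_steps
    else:
        lr = end_lr  # the piecewise_decay schedule applies from here on
    print(step, lr)  # 0.1, 0.175, 0.25, 0.4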
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import paddle
import paddle.fluid as fluid
import math
__all__ = ["DistResNet"]
train_parameters = {
"input_size": [3, 224, 224],
"input_mean": [0.485, 0.456, 0.406],
"input_std": [0.229, 0.224, 0.225],
"learning_strategy": {
"name": "piecewise_decay",
"batch_size": 256,
"epochs": [30, 60, 90],
"steps": [0.1, 0.01, 0.001, 0.0001]
}
}
class DistResNet():
def __init__(self, layers=50, is_train=True):
self.params = train_parameters
self.layers = layers
self.is_train = is_train
self.weight_decay = 1e-4
def net(self, input, class_dim=1000):
layers = self.layers
supported_layers = [50, 101, 152]
assert layers in supported_layers, \
"supported layers are {} but input layer is {}".format(supported_layers, layers)
if layers == 50:
depth = [3, 4, 6, 3]
elif layers == 101:
depth = [3, 4, 23, 3]
elif layers == 152:
depth = [3, 8, 36, 3]
num_filters = [64, 128, 256, 512]
conv = self.conv_bn_layer(
input=input, num_filters=64, filter_size=7, stride=2, act='relu')
conv = fluid.layers.pool2d(
input=conv,
pool_size=3,
pool_stride=2,
pool_padding=1,
pool_type='max')
for block in range(len(depth)):
for i in range(depth[block]):
conv = self.bottleneck_block(
input=conv,
num_filters=num_filters[block],
stride=2 if i == 0 and block != 0 else 1)
pool = fluid.layers.pool2d(
input=conv, pool_size=7, pool_type='avg', global_pooling=True)
stdv = 1.0 / math.sqrt(pool.shape[1] * 1.0)
out = fluid.layers.fc(input=pool,
size=class_dim,
act='softmax',
param_attr=fluid.param_attr.ParamAttr(
initializer=fluid.initializer.Uniform(-stdv,
stdv),
regularizer=fluid.regularizer.L2Decay(self.weight_decay)),
bias_attr=fluid.ParamAttr(
regularizer=fluid.regularizer.L2Decay(self.weight_decay))
)
return out
def conv_bn_layer(self,
input,
num_filters,
filter_size,
stride=1,
groups=1,
act=None,
bn_init_value=1.0):
conv = fluid.layers.conv2d(
input=input,
num_filters=num_filters,
filter_size=filter_size,
stride=stride,
padding=(filter_size - 1) // 2,
groups=groups,
act=None,
bias_attr=False,
param_attr=fluid.ParamAttr(regularizer=fluid.regularizer.L2Decay(self.weight_decay)))
return fluid.layers.batch_norm(
input=conv, act=act, is_test=not self.is_train,
param_attr=fluid.ParamAttr(
initializer=fluid.initializer.Constant(bn_init_value),
regularizer=None))
def shortcut(self, input, ch_out, stride):
ch_in = input.shape[1]
if ch_in != ch_out or stride != 1:
return self.conv_bn_layer(input, ch_out, 1, stride)
else:
return input
def bottleneck_block(self, input, num_filters, stride):
conv0 = self.conv_bn_layer(
input=input, num_filters=num_filters, filter_size=1, act='relu')
conv1 = self.conv_bn_layer(
input=conv0,
num_filters=num_filters,
filter_size=3,
stride=stride,
act='relu')
# NOTE: default bias is 0.0 already
conv2 = self.conv_bn_layer(
input=conv1, num_filters=num_filters * 4, filter_size=1, act=None, bn_init_value=0.0)
short = self.shortcut(input, num_filters * 4, stride)
return fluid.layers.elementwise_add(x=short, y=conv2, act='relu')
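get_model selects this class via models.__dict__[args.model](layers=50, is_train=is_train). A minimal hedged construction sketch on the [3, 224, 224] input that train_parameters declares:

import paddle.fluid as fluid

image = fluid.layers.data(name='image', shape=[3, 224, 224], dtype='float32')
model = DistResNet(layers=50, is_train=True)
logits = model.net(image, class_dim=1000)  # softmax over 1000 ImageNet classes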
......@@ -132,61 +132,44 @@ class BRCDataset(object):
'passage_token_ids': [],
'passage_length': [],
'start_id': [],
'end_id': []
'end_id': [],
'passage_num': []
}
max_passage_num = max(
[len(sample['passages']) for sample in batch_data['raw_data']])
#max_passage_num = min(self.max_p_num, max_passage_num)
max_passage_num = self.max_p_num
max_passage_num = min(self.max_p_num, max_passage_num)
for sidx, sample in enumerate(batch_data['raw_data']):
count = 0
for pidx in range(max_passage_num):
if pidx < len(sample['passages']):
count += 1
batch_data['question_token_ids'].append(sample[
'question_token_ids'])
'question_token_ids'][0:self.max_q_len])
batch_data['question_length'].append(
len(sample['question_token_ids']))
min(len(sample['question_token_ids']), self.max_q_len))
passage_token_ids = sample['passages'][pidx][
'passage_token_ids']
'passage_token_ids'][0:self.max_p_len]
batch_data['passage_token_ids'].append(passage_token_ids)
batch_data['passage_length'].append(
min(len(passage_token_ids), self.max_p_len))
else:
batch_data['question_token_ids'].append([])
batch_data['question_length'].append(0)
batch_data['passage_token_ids'].append([])
batch_data['passage_length'].append(0)
batch_data, padded_p_len, padded_q_len = self._dynamic_padding(
batch_data, pad_id)
for sample in batch_data['raw_data']:
# record the start passage index of current doc
passage_idx_offset = sum(batch_data['passage_num'])
batch_data['passage_num'].append(count)
gold_passage_offset = 0
if 'answer_passages' in sample and len(sample['answer_passages']):
gold_passage_offset = padded_p_len * sample['answer_passages'][
0]
batch_data['start_id'].append(gold_passage_offset + sample[
'answer_spans'][0][0])
batch_data['end_id'].append(gold_passage_offset + sample[
'answer_spans'][0][1])
for i in range(sample['answer_passages'][0]):
gold_passage_offset += len(batch_data['passage_token_ids'][
passage_idx_offset + i])
start_id = min(sample['answer_spans'][0][0], self.max_p_len)
end_id = min(sample['answer_spans'][0][1], self.max_p_len)
batch_data['start_id'].append(gold_passage_offset + start_id)
batch_data['end_id'].append(gold_passage_offset + end_id)
else:
# fake span for some samples, only valid for testing
batch_data['start_id'].append(0)
batch_data['end_id'].append(0)
return batch_data
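The rewritten span bookkeeping converts a per-passage answer span into coordinates over the concatenated passages the model actually scores: gold_passage_offset accumulates the true (unpadded) lengths of the passages that precede the gold one. A hedged worked example:

# Hedged example: two passages of length 120 and 80, answer in passage 1
# at span (5, 9) within that passage.
passage_lens = [120, 80]
answer_passage = 1
gold_passage_offset = sum(passage_lens[:answer_passage])  # 120
start_id = gold_passage_offset + 5  # 125 in concatenated coordinates
end_id = gold_passage_offset + 9    # 129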
def _dynamic_padding(self, batch_data, pad_id):
"""
Dynamically pads the batch_data with pad_id
"""
pad_p_len = min(self.max_p_len, max(batch_data['passage_length']))
pad_q_len = min(self.max_q_len, max(batch_data['question_length']))
batch_data['passage_token_ids'] = [
(ids + [pad_id] * (pad_p_len - len(ids)))[:pad_p_len]
for ids in batch_data['passage_token_ids']
]
batch_data['question_token_ids'] = [
(ids + [pad_id] * (pad_q_len - len(ids)))[:pad_q_len]
for ids in batch_data['question_token_ids']
]
return batch_data, pad_p_len, pad_q_len
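Note that _dynamic_padding clips as well as pads: every sequence comes out at exactly min(max_len, longest-in-batch). A hedged pure-Python illustration:

pad_id, pad_p_len = 0, 4
ids_batch = [[7, 8], [1, 2, 3, 4, 5]]
padded = [(ids + [pad_id] * (pad_p_len - len(ids)))[:pad_p_len]
          for ids in ids_batch]
print(padded)  # [[7, 8, 0, 0], [1, 2, 3, 4]]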
def word_iter(self, set_name=None):
"""
Iterates over all the words in the dataset
......
......@@ -68,16 +68,23 @@ def bi_lstm_encoder(input_seq, gate_size, para_name, args):
return encoder_out
def encoder(input_name, para_name, shape, hidden_size, args):
def get_data(input_name, lod_level, args):
input_ids = layers.data(
name=input_name, shape=[1], dtype='int64', lod_level=1)
name=input_name, shape=[1], dtype='int64', lod_level=lod_level)
return input_ids
def embedding(input_ids, shape, args):
input_embedding = layers.embedding(
input=input_ids,
size=shape,
dtype='float32',
is_sparse=True,
param_attr=fluid.ParamAttr(name='embedding_para'))
return input_embedding
def encoder(input_embedding, para_name, hidden_size, args):
encoder_out = bi_lstm_encoder(
input_seq=input_embedding,
gate_size=hidden_size,
......@@ -259,40 +266,41 @@ def fusion(g, args):
def rc_model(hidden_size, vocab, args):
emb_shape = [vocab.size(), vocab.embed_dim]
start_labels = layers.data(
name="start_lables", shape=[1], dtype='float32', lod_level=1)
end_labels = layers.data(
name="end_lables", shape=[1], dtype='float32', lod_level=1)
# stage 1:encode
p_ids_names = []
q_ids_names = []
ms = []
gs = []
qs = []
for i in range(args.doc_num):
p_ids_name = "pids_%d" % i
p_ids_names.append(p_ids_name)
p_enc_i = encoder(p_ids_name, 'p_enc', emb_shape, hidden_size, args)
q_ids_name = "qids_%d" % i
q_ids_names.append(q_ids_name)
q_enc_i = encoder(q_ids_name, 'q_enc', emb_shape, hidden_size, args)
q_id0 = get_data('q_id0', 1, args)
q_ids = get_data('q_ids', 2, args)
p_ids_name = 'p_ids'
p_ids = get_data('p_ids', 2, args)
p_embs = embedding(p_ids, emb_shape, args)
q_embs = embedding(q_ids, emb_shape, args)
drnn = layers.DynamicRNN()
with drnn.block():
p_emb = drnn.step_input(p_embs)
q_emb = drnn.step_input(q_embs)
p_enc = encoder(p_emb, 'p_enc', hidden_size, args)
q_enc = encoder(q_emb, 'q_enc', hidden_size, args)
# stage 2:match
g_i = attn_flow(q_enc_i, p_enc_i, p_ids_name, args)
g_i = attn_flow(q_enc, p_enc, p_ids_name, args)
# stage 3:fusion
m_i = fusion(g_i, args)
ms.append(m_i)
gs.append(g_i)
qs.append(q_enc_i)
m = layers.sequence_concat(input=ms)
g = layers.sequence_concat(input=gs)
q_vec = layers.sequence_concat(input=qs)
drnn.output(m_i, q_enc)
ms, q_encs = drnn()
p_vec = layers.lod_reset(x=ms, y=start_labels)
q_vec = layers.lod_reset(x=q_encs, y=q_id0)
# stage 4:decode
start_probs, end_probs = point_network_decoder(
p_vec=m, q_vec=q_vec, hidden_size=hidden_size, args=args)
start_labels = layers.data(
name="start_lables", shape=[1], dtype='float32', lod_level=1)
end_labels = layers.data(
name="end_lables", shape=[1], dtype='float32', lod_level=1)
p_vec=p_vec, q_vec=q_vec, hidden_size=hidden_size, args=args)
cost0 = layers.sequence_pool(
layers.cross_entropy(
......@@ -308,5 +316,5 @@ def rc_model(hidden_size, vocab, args):
cost = cost0 + cost1
cost.persistable = True
feeding_list = q_ids_names + ["start_lables", "end_lables"] + p_ids_names
return cost, start_probs, end_probs, feeding_list
feeding_list = ["q_ids", "start_lables", "end_lables", "p_ids", "q_id0"]
return cost, start_probs, end_probs, ms, feeding_list
......@@ -46,22 +46,32 @@ from vocab import Vocab
def prepare_batch_input(insts, args):
doc_num = args.doc_num
batch_size = len(insts['raw_data'])
inst_num = len(insts['passage_num'])
if batch_size != inst_num:
print("data error %d, %d" % (batch_size, inst_num))
return None
new_insts = []
passage_idx = 0
for i in range(batch_size):
p_len = 0
p_id = []
q_id = []
p_ids = []
q_ids = []
p_len = 0
for j in range(i * doc_num, (i + 1) * doc_num):
p_ids.append(insts['passage_token_ids'][j])
p_id = p_id + insts['passage_token_ids'][j]
q_ids.append(insts['question_token_ids'][j])
q_id = q_id + insts['question_token_ids'][j]
q_id = []
p_id_r = []
p_ids_r = []
q_ids_r = []
q_id_r = []
for j in range(insts['passage_num'][i]):
p_ids.append(insts['passage_token_ids'][passage_idx + j])
p_id = p_id + insts['passage_token_ids'][passage_idx + j]
q_ids.append(insts['question_token_ids'][passage_idx + j])
q_id = q_id + insts['question_token_ids'][passage_idx + j]
passage_idx += insts['passage_num'][i]
p_len = len(p_id)
def _get_label(idx, ref_len):
......@@ -72,11 +82,46 @@ def prepare_batch_input(insts, args):
start_label = _get_label(insts['start_id'][i], p_len)
end_label = _get_label(insts['end_id'][i], p_len)
new_inst = q_ids + [start_label, end_label] + p_ids
new_inst = [q_ids, start_label, end_label, p_ids, q_id]
new_insts.append(new_inst)
return new_insts
def batch_reader(batch_list, args):
res = []
for batch in batch_list:
res.append(prepare_batch_input(batch, args))
return res
def read_multiple(reader, count, clip_last=True):
"""
Stack data from reader for multi-devices.
"""
def __impl__():
res = []
for item in reader():
res.append(item)
if len(res) == count:
yield res
res = []
if len(res) == count:
yield res
elif not clip_last:
data = []
for item in res:
data += item
if len(data) > count:
inst_num_per_part = len(data) // count
yield [
data[inst_num_per_part * i:inst_num_per_part * (i + 1)]
for i in range(count)
]
return __impl__
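read_multiple groups consecutive batches so that each device receives one per run call; with clip_last=True a trailing partial group is silently dropped. A hedged toy illustration:

def toy_reader():
    for i in range(5):
        yield [i]  # five single-item batches

multi = read_multiple(toy_reader, count=2)
print(list(multi()))  # [[[0], [1]], [[2], [3]]] -- the fifth batch is clipped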
def LodTensor_Array(lod_tensor):
lod = lod_tensor.lod()
array = np.array(lod_tensor)
......@@ -103,7 +148,7 @@ def print_para(train_prog, train_exe, logger, args):
logger.info("total param num: {0}".format(num_sum))
def find_best_answer_for_passage(start_probs, end_probs, passage_len, args):
def find_best_answer_for_passage(start_probs, end_probs, passage_len):
"""
Finds the best answer with the maximum start_prob * end_prob from a single passage
"""
......@@ -125,7 +170,7 @@ def find_best_answer_for_passage(start_probs, end_probs, passage_len, args):
return (best_start, best_end), max_prob
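The search maximizes start_prob[i] * end_prob[j] over spans with i <= j, bounded by the maximum answer length. A hedged brute-force equivalent (max_a_len mirrors the --max_a_len run flag; the repository version may bound the loops differently):

def best_span(start_probs, end_probs, passage_len, max_a_len=200):
    best, best_score = (0, 0), -1.0
    for i in range(passage_len):
        for j in range(i, min(passage_len, i + max_a_len)):
            score = start_probs[i] * end_probs[j]
            if score > best_score:
                best, best_score = (i, j), score
    return best, best_score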
def find_best_answer(sample, start_prob, end_prob, padded_p_len, args):
def find_best_answer_for_inst(sample, start_prob, end_prob, inst_lod):
"""
Finds the best answer for a sample given start_prob and end_prob for each position.
This will call find_best_answer_for_passage because there are multiple passages in a sample
......@@ -134,11 +179,16 @@ def find_best_answer(sample, start_prob, end_prob, padded_p_len, args):
for p_idx, passage in enumerate(sample['passages']):
if p_idx >= args.max_p_num:
continue
if len(start_prob) != len(end_prob):
logger.info('error: {}'.format(sample['question']))
continue
passage_start = inst_lod[p_idx] - inst_lod[0]
passage_end = inst_lod[p_idx + 1] - inst_lod[0]
passage_len = passage_end - passage_start
passage_len = min(args.max_p_len, len(passage['passage_tokens']))
answer_span, score = find_best_answer_for_passage(
start_prob[p_idx * padded_p_len:(p_idx + 1) * padded_p_len],
end_prob[p_idx * padded_p_len:(p_idx + 1) * padded_p_len],
passage_len, args)
start_prob[passage_start:passage_end],
end_prob[passage_start:passage_end], passage_len)
if score > best_score:
best_score = score
best_p_idx = p_idx
......@@ -148,11 +198,11 @@ def find_best_answer(sample, start_prob, end_prob, padded_p_len, args):
else:
best_answer = ''.join(sample['passages'][best_p_idx]['passage_tokens'][
best_span[0]:best_span[1] + 1])
return best_answer
return best_answer, best_span
def validation(inference_program, avg_cost, s_probs, e_probs, feed_order, place,
vocab, brc_data, logger, args):
def validation(inference_program, avg_cost, s_probs, e_probs, match, feed_order,
place, dev_count, vocab, brc_data, logger, args):
"""
"""
......@@ -165,6 +215,8 @@ def validation(inference_program, avg_cost, s_probs, e_probs, feed_order, place,
# Use test set as validation each pass
total_loss = 0.0
count = 0
n_batch_cnt = 0
n_batch_loss = 0.0
pred_answers, ref_answers = [], []
val_feed_list = [
inference_program.global_block().var(var_name)
......@@ -172,55 +224,80 @@ def validation(inference_program, avg_cost, s_probs, e_probs, feed_order, place,
]
val_feeder = fluid.DataFeeder(val_feed_list, place)
pad_id = vocab.get_id(vocab.pad_token)
dev_batches = brc_data.gen_mini_batches(
'dev', args.batch_size, pad_id, shuffle=False)
dev_reader = lambda:brc_data.gen_mini_batches('dev', args.batch_size, pad_id, shuffle=False)
dev_reader = read_multiple(dev_reader, dev_count)
for batch_id, batch in enumerate(dev_batches, 1):
feed_data = prepare_batch_input(batch, args)
for batch_id, batch_list in enumerate(dev_reader(), 1):
feed_data = batch_reader(batch_list, args)
val_fetch_outs = parallel_executor.run(
feed=val_feeder.feed(feed_data),
fetch_list=[avg_cost.name, s_probs.name, e_probs.name],
feed=list(val_feeder.feed_parallel(feed_data, dev_count)),
fetch_list=[avg_cost.name, s_probs.name, e_probs.name, match.name],
return_numpy=False)
total_loss += np.array(val_fetch_outs[0])[0]
start_probs = LodTensor_Array(val_fetch_outs[1])
end_probs = LodTensor_Array(val_fetch_outs[2])
count += len(batch['raw_data'])
padded_p_len = len(batch['passage_token_ids'][0])
for sample, start_prob, end_prob in zip(batch['raw_data'], start_probs,
end_probs):
best_answer = find_best_answer(sample, start_prob, end_prob,
padded_p_len, args)
pred_answers.append({
total_loss += np.array(val_fetch_outs[0]).sum()
start_probs_m = LodTensor_Array(val_fetch_outs[1])
end_probs_m = LodTensor_Array(val_fetch_outs[2])
match_lod = val_fetch_outs[3].lod()
count += len(np.array(val_fetch_outs[0]))
n_batch_cnt += len(np.array(val_fetch_outs[0]))
n_batch_loss += np.array(val_fetch_outs[0]).sum()
log_every_n_batch = args.log_interval
if log_every_n_batch > 0 and batch_id % log_every_n_batch == 0:
logger.info('Average dev loss from batch {} to {} is {}'.format(
batch_id - log_every_n_batch + 1, batch_id, "%.10f" % (
n_batch_loss / n_batch_cnt)))
n_batch_loss = 0.0
n_batch_cnt = 0
for idx, batch in enumerate(batch_list):
#one batch
batch_size = len(batch['raw_data'])
batch_range = match_lod[0][idx * batch_size:(idx + 1) * batch_size +
1]
batch_lod = [[batch_range[x], batch_range[x + 1]]
for x in range(len(batch_range[:-1]))]
start_prob_batch = start_probs_m[idx * batch_size:(idx + 1) *
batch_size]
end_prob_batch = end_probs_m[idx * batch_size:(idx + 1) *
batch_size]
for sample, start_prob_inst, end_prob_inst, inst_range in zip(
batch['raw_data'], start_prob_batch, end_prob_batch,
batch_lod):
#one instance
inst_lod = match_lod[1][inst_range[0]:inst_range[1] + 1]
best_answer, best_span = find_best_answer_for_inst(
sample, start_prob_inst, end_prob_inst, inst_lod)
pred = {
'question_id': sample['question_id'],
'question_type': sample['question_type'],
'answers': [best_answer],
'entity_answers': [[]],
'yesno_answers': []
})
'yesno_answers': [best_span]
}
pred_answers.append(pred)
if 'answers' in sample:
ref_answers.append({
ref = {
'question_id': sample['question_id'],
'question_type': sample['question_type'],
'answers': sample['answers'],
'entity_answers': [[]],
'yesno_answers': []
})
if args.result_dir is not None and args.result_name is not None:
}
ref_answers.append(ref)
result_dir = args.result_dir
result_prefix = args.result_name
if result_dir is not None and result_prefix is not None:
if not os.path.exists(args.result_dir):
os.makedirs(args.result_dir)
result_file = os.path.join(args.result_dir, args.result_name + '.json')
result_file = os.path.join(result_dir, result_prefix + '.json')
with open(result_file, 'w') as fout:
for pred_answer in pred_answers:
fout.write(json.dumps(pred_answer, ensure_ascii=False) + '\n')
logger.info('Saving {} results to {}'.format(args.result_name,
logger.info('Saving {} results to {}'.format(result_prefix,
result_file))
ave_loss = 1.0 * total_loss / count
# compute the bleu and rouge scores if reference answers is provided
if len(ref_answers) > 0:
pred_dict, ref_dict = {}, {}
......@@ -250,6 +327,13 @@ def train(logger, args):
brc_data.convert_to_ids(vocab)
logger.info('Initialize the model...')
if not args.use_gpu:
place = fluid.CPUPlace()
dev_count = int(os.environ.get('CPU_NUM', multiprocessing.cpu_count()))
else:
place = fluid.CUDAPlace(0)
dev_count = fluid.core.get_cuda_device_count()
# build model
main_program = fluid.Program()
startup_prog = fluid.Program()
......@@ -257,7 +341,7 @@ def train(logger, args):
startup_prog.random_seed = args.random_seed
with fluid.program_guard(main_program, startup_prog):
with fluid.unique_name.guard():
avg_cost, s_probs, e_probs, feed_order = rc_model.rc_model(
avg_cost, s_probs, e_probs, match, feed_order = rc_model.rc_model(
args.hidden_size, vocab, args)
# clone from default main program and use it as the validation program
inference_program = main_program.clone(for_test=True)
......@@ -314,20 +398,21 @@ def train(logger, args):
for pass_id in range(1, args.pass_num + 1):
pass_start_time = time.time()
pad_id = vocab.get_id(vocab.pad_token)
train_batches = brc_data.gen_mini_batches(
'train', args.batch_size, pad_id, shuffle=True)
train_reader = lambda:brc_data.gen_mini_batches('train', args.batch_size, pad_id, shuffle=False)
train_reader = read_multiple(train_reader, dev_count)
log_every_n_batch, n_batch_loss = args.log_interval, 0
total_num, total_loss = 0, 0
for batch_id, batch in enumerate(train_batches, 1):
input_data_dict = prepare_batch_input(batch, args)
for batch_id, batch_list in enumerate(train_reader(), 1):
feed_data = batch_reader(batch_list, args)
fetch_outs = parallel_executor.run(
feed=feeder.feed(input_data_dict),
feed=list(feeder.feed_parallel(feed_data, dev_count)),
fetch_list=[avg_cost.name],
return_numpy=False)
cost_train = np.array(fetch_outs[0])[0]
total_num += len(batch['raw_data'])
cost_train = np.array(fetch_outs[0]).mean()
total_num += args.batch_size * dev_count
n_batch_loss += cost_train
total_loss += cost_train * len(batch['raw_data'])
total_loss += cost_train * args.batch_size * dev_count
if log_every_n_batch > 0 and batch_id % log_every_n_batch == 0:
print_para(main_program, parallel_executor, logger,
args)
......@@ -337,19 +422,23 @@ def train(logger, args):
"%.10f" % (n_batch_loss / log_every_n_batch)))
n_batch_loss = 0
if args.dev_interval > 0 and batch_id % args.dev_interval == 0:
if brc_data.dev_set is not None:
eval_loss, bleu_rouge = validation(
inference_program, avg_cost, s_probs, e_probs,
feed_order, place, vocab, brc_data, logger, args)
match, feed_order, place, dev_count, vocab,
brc_data, logger, args)
logger.info('Dev eval loss {}'.format(eval_loss))
logger.info('Dev eval result: {}'.format(bleu_rouge))
logger.info('Dev eval result: {}'.format(
bleu_rouge))
pass_end_time = time.time()
logger.info('Evaluating the model after epoch {}'.format(
pass_id))
if brc_data.dev_set is not None:
eval_loss, bleu_rouge = validation(
inference_program, avg_cost, s_probs, e_probs,
feed_order, place, vocab, brc_data, logger, args)
inference_program, avg_cost, s_probs, e_probs, match,
feed_order, place, dev_count, vocab, brc_data, logger,
args)
logger.info('Dev eval loss {}'.format(eval_loss))
logger.info('Dev eval result: {}'.format(bleu_rouge))
else:
......@@ -389,10 +478,17 @@ def evaluate(logger, args):
startup_prog.random_seed = args.random_seed
with fluid.program_guard(main_program, startup_prog):
with fluid.unique_name.guard():
avg_cost, s_probs, e_probs, feed_order = rc_model.rc_model(
avg_cost, s_probs, e_probs, match, feed_order = rc_model.rc_model(
args.hidden_size, vocab, args)
# initialize parameters
place = core.CUDAPlace(0) if args.use_gpu else core.CPUPlace()
if not args.use_gpu:
place = fluid.CPUPlace()
dev_count = int(
os.environ.get('CPU_NUM', multiprocessing.cpu_count()))
else:
place = fluid.CUDAPlace(0)
dev_count = fluid.core.get_cuda_device_count()
exe = Executor(place)
if args.load_dir:
logger.info('load from {}'.format(args.load_dir))
......@@ -402,17 +498,10 @@ def evaluate(logger, args):
logger.error('No model file to load ...')
return
# prepare data
feed_list = [
main_program.global_block().var(var_name)
for var_name in feed_order
]
feeder = fluid.DataFeeder(feed_list, place)
inference_program = main_program.clone(for_test=True)
eval_loss, bleu_rouge = validation(
inference_program, avg_cost, s_probs, e_probs, match, feed_order,
place, vocab, brc_data, logger, args)
place, dev_count, vocab, brc_data, logger, args)
logger.info('Dev eval loss {}'.format(eval_loss))
logger.info('Dev eval result: {}'.format(bleu_rouge))
logger.info('Predicted answers are saved to {}'.format(
......@@ -438,10 +527,17 @@ def predict(logger, args):
startup_prog.random_seed = args.random_seed
with fluid.program_guard(main_program, startup_prog):
with fluid.unique_name.guard():
avg_cost, s_probs, e_probs, feed_order = rc_model.rc_model(
avg_cost, s_probs, e_probs, match, feed_order = rc_model.rc_model(
args.hidden_size, vocab, args)
# initialize parameters
place = core.CUDAPlace(0) if args.use_gpu else core.CPUPlace()
if not args.use_gpu:
place = fluid.CPUPlace()
dev_count = int(
os.environ.get('CPU_NUM', multiprocessing.cpu_count()))
else:
place = fluid.CUDAPlace(0)
dev_count = fluid.core.get_cuda_device_count()
exe = Executor(place)
if args.load_dir:
logger.info('load from {}'.format(args.load_dir))
......@@ -451,17 +547,10 @@ def predict(logger, args):
logger.error('No model file to load ...')
return
# prepare data
feed_list = [
main_program.global_block().var(var_name)
for var_name in feed_order
]
feeder = fluid.DataFeeder(feed_list, place)
inference_program = main_program.clone(for_test=True)
eval_loss, bleu_rouge = validation(
inference_program, avg_cost, s_probs, e_probs, feed_order,
place, vocab, brc_data, logger, args)
inference_program, avg_cost, s_probs, e_probs, match,
feed_order, place, dev_count, vocab, brc_data, logger, args)
def prepare(logger, args):
......
export CUDA_VISIBLE_DEVICES=1
export CUDA_VISIBLE_DEVICES=0
python run.py \
--trainset 'data/preprocessed/trainset/search.train.json' \
'data/preprocessed/trainset/zhidao.train.json' \
......@@ -11,11 +11,12 @@ python run.py \
--save_dir ./models \
--pass_num 10 \
--learning_rate 0.001 \
--batch_size 8 \
--batch_size 32 \
--embed_size 300 \
--hidden_size 150 \
--max_p_num 5 \
--max_p_len 500 \
--max_q_len 60 \
--max_a_len 200 \
--weight_decay 0.0 \
--drop_rate 0.2 $@