diff --git a/fluid/deep_attention_matching_net/douban/train.sh b/fluid/deep_attention_matching_net/douban/train.sh index 3c18df7dc1bccb0485b814cf519f23259ed52144..6ed91319a7880f00a1f8b202ebe057ca1145f615 100644 --- a/fluid/deep_attention_matching_net/douban/train.sh +++ b/fluid/deep_attention_matching_net/douban/train.sh @@ -5,6 +5,7 @@ python -u ../train_and_evaluate.py --use_cuda \ --ext_eval \ --word_emb_init ./data/word_embedding.pkl \ --save_path ./models \ + --use_pyreader \ --batch_size 256 \ --vocab_size 172130 \ --channel1_num 16 \ diff --git a/fluid/deep_attention_matching_net/model.py b/fluid/deep_attention_matching_net/model.py index e0c3f478237bf8b9a467c812fa7bb7589e864c92..537722a419038b2832e9f7234b3fe3c08baa2bf8 100644 --- a/fluid/deep_attention_matching_net/model.py +++ b/fluid/deep_attention_matching_net/model.py @@ -15,45 +15,85 @@ class Net(object): self._stack_num = stack_num self._channel1_num = channel1_num self._channel2_num = channel2_num + self._feed_names = [] self.word_emb_name = "shared_word_emb" self.use_stack_op = True self.use_mask_cache = True self.use_sparse_embedding = True - def set_word_embedding(self, word_emb, place): - word_emb_param = fluid.global_scope().find_var( - self.word_emb_name).get_tensor() - word_emb_param.set(word_emb, place) - - def create_network(self): - mask_cache = dict() if self.use_mask_cache else None - - turns_data = [] + def create_py_reader(self, capacity, name): + # turns ids + shapes = [[-1, self._max_turn_len, 1] + for i in six.moves.xrange(self._max_turn_num)] + dtypes = ["int32" for i in six.moves.xrange(self._max_turn_num)] + # turns mask + shapes += [[-1, self._max_turn_len, 1] + for i in six.moves.xrange(self._max_turn_num)] + dtypes += ["float32" for i in six.moves.xrange(self._max_turn_num)] + + # response ids, response mask, label + shapes += [[-1, self._max_turn_len, 1], [-1, self._max_turn_len, 1], + [-1, 1]] + dtypes += ["int32", "float32", "float32"] + + py_reader = fluid.layers.py_reader( + capacity=capacity, + shapes=shapes, + lod_levels=[0] * (2 * self._max_turn_num + 3), + dtypes=dtypes, + name=name, + use_double_buffer=True) + + data_vars = fluid.layers.read_file(py_reader) + + self.turns_data = data_vars[0:self._max_turn_num] + self.turns_mask = data_vars[self._max_turn_num:2 * self._max_turn_num] + self.response = data_vars[-3] + self.response_mask = data_vars[-2] + self.label = data_vars[-1] + return py_reader + + def create_data_layers(self): + self._feed_names = [] + + self.turns_data = [] for i in six.moves.xrange(self._max_turn_num): + name = "turn_%d" % i turn = fluid.layers.data( - name="turn_%d" % i, - shape=[self._max_turn_len, 1], - dtype="int32") - turns_data.append(turn) + name=name, shape=[self._max_turn_len, 1], dtype="int32") + self.turns_data.append(turn) + self._feed_names.append(name) - turns_mask = [] + self.turns_mask = [] for i in six.moves.xrange(self._max_turn_num): + name = "turn_mask_%d" % i turn_mask = fluid.layers.data( - name="turn_mask_%d" % i, - shape=[self._max_turn_len, 1], - dtype="float32") - turns_mask.append(turn_mask) + name=name, shape=[self._max_turn_len, 1], dtype="float32") + self.turns_mask.append(turn_mask) + self._feed_names.append(name) - response = fluid.layers.data( + self.response = fluid.layers.data( name="response", shape=[self._max_turn_len, 1], dtype="int32") - response_mask = fluid.layers.data( + self.response_mask = fluid.layers.data( name="response_mask", shape=[self._max_turn_len, 1], dtype="float32") - label = fluid.layers.data(name="label", 
shape=[1], dtype="float32") + self.label = fluid.layers.data(name="label", shape=[1], dtype="float32") + self._feed_names += ["response", "response_mask", "label"] + + def get_feed_names(self): + return self._feed_names + + def set_word_embedding(self, word_emb, place): + word_emb_param = fluid.global_scope().find_var( + self.word_emb_name).get_tensor() + word_emb_param.set(word_emb, place) + + def create_network(self): + mask_cache = dict() if self.use_mask_cache else None response_emb = fluid.layers.embedding( - input=response, + input=self.response, size=[self._vocab_size + 1, self._emb_size], is_sparse=self.use_sparse_embedding, param_attr=fluid.ParamAttr( @@ -71,8 +111,8 @@ class Net(object): key=Hr, value=Hr, d_key=self._emb_size, - q_mask=response_mask, - k_mask=response_mask, + q_mask=self.response_mask, + k_mask=self.response_mask, mask_cache=mask_cache) Hr_stack.append(Hr) @@ -80,7 +120,7 @@ class Net(object): sim_turns = [] for t in six.moves.xrange(self._max_turn_num): Hu = fluid.layers.embedding( - input=turns_data[t], + input=self.turns_data[t], size=[self._vocab_size + 1, self._emb_size], is_sparse=self.use_sparse_embedding, param_attr=fluid.ParamAttr( @@ -96,8 +136,8 @@ class Net(object): key=Hu, value=Hu, d_key=self._emb_size, - q_mask=turns_mask[t], - k_mask=turns_mask[t], + q_mask=self.turns_mask[t], + k_mask=self.turns_mask[t], mask_cache=mask_cache) Hu_stack.append(Hu) @@ -111,8 +151,8 @@ class Net(object): key=Hr_stack[index], value=Hr_stack[index], d_key=self._emb_size, - q_mask=turns_mask[t], - k_mask=response_mask, + q_mask=self.turns_mask[t], + k_mask=self.response_mask, mask_cache=mask_cache) r_a_t = layers.block( name="r_attend_t_" + str(index), @@ -120,8 +160,8 @@ class Net(object): key=Hu_stack[index], value=Hu_stack[index], d_key=self._emb_size, - q_mask=response_mask, - k_mask=turns_mask[t], + q_mask=self.response_mask, + k_mask=self.turns_mask[t], mask_cache=mask_cache) t_a_r_stack.append(t_a_r) @@ -158,5 +198,5 @@ class Net(object): sim = fluid.layers.concat(input=sim_turns, axis=2) final_info = layers.cnn_3d(sim, self._channel1_num, self._channel2_num) - loss, logits = layers.loss(final_info, label) + loss, logits = layers.loss(final_info, self.label) return loss, logits diff --git a/fluid/deep_attention_matching_net/train_and_evaluate.py b/fluid/deep_attention_matching_net/train_and_evaluate.py index d4f8374c66938985a946f5cf2da6d1b6e391f37c..bd701904d6445be22d8bcb0634b78ad59713cc21 100644 --- a/fluid/deep_attention_matching_net/train_and_evaluate.py +++ b/fluid/deep_attention_matching_net/train_and_evaluate.py @@ -7,7 +7,7 @@ import multiprocessing import paddle import paddle.fluid as fluid import utils.reader as reader -from utils.util import print_arguments +from utils.util import print_arguments, mkdir try: import cPickle as pickle #python 2 @@ -49,6 +49,10 @@ def parse_args(): '--use_cuda', action='store_true', help='If set, use cuda for training.') + parser.add_argument( + '--use_pyreader', + action='store_true', + help='If set, use pyreader for reading data.') parser.add_argument( '--ext_eval', action='store_true', @@ -105,7 +109,75 @@ def parse_args(): #yapf: enable +def evaluate(score_path, result_file_path): + if args.ext_eval: + import utils.douban_evaluation as eva + else: + import utils.evaluation as eva + #write evaluation result + result = eva.evaluate(score_path) + with open(result_file_path, 'w') as out_file: + for p_at in result: + out_file.write(str(p_at) + '\n') + print('finish evaluation') + print(time.strftime('%Y-%m-%d %H:%M:%S', 
time.localtime(time.time()))) + + +def test_with_feed(exe, program, feed_names, fetch_list, score_path, batches, + batch_num, dev_count): + score_file = open(score_path, 'w') + for it in six.moves.xrange(batch_num // dev_count): + feed_list = [] + for dev in six.moves.xrange(dev_count): + val_index = it * dev_count + dev + batch_data = reader.make_one_batch_input(batches, val_index) + feed_dict = dict(zip(feed_names, batch_data)) + feed_list.append(feed_dict) + + predicts = exe.run(feed=feed_list, fetch_list=fetch_list) + + scores = np.array(predicts[0]) + for dev in six.moves.xrange(dev_count): + val_index = it * dev_count + dev + for i in six.moves.xrange(args.batch_size): + score_file.write( + str(scores[args.batch_size * dev + i][0]) + '\t' + str( + batches["label"][val_index][i]) + '\n') + score_file.close() + + +def test_with_pyreader(exe, program, pyreader, fetch_list, score_path, batches, + batch_num, dev_count): + def data_provider(): + for index in six.moves.xrange(batch_num): + yield reader.make_one_batch_input(batches, index) + + score_file = open(score_path, 'w') + pyreader.decorate_tensor_provider(data_provider) + it = 0 + pyreader.start() + while True: + try: + predicts = exe.run(fetch_list=fetch_list) + + scores = np.array(predicts[0]) + for dev in six.moves.xrange(dev_count): + val_index = it * dev_count + dev + for i in six.moves.xrange(args.batch_size): + score_file.write( + str(scores[args.batch_size * dev + i][0]) + '\t' + str( + batches["label"][val_index][i]) + '\n') + it += 1 + except fluid.core.EOFException: + pyreader.reset() + break + score_file.close() + + def train(args): + if not os.path.exists(args.save_path): + os.makedirs(args.save_path) + # data data_config data_conf = { "batch_size": args.batch_size, @@ -117,27 +189,47 @@ def train(args): dam = Net(args.max_turn_num, args.max_turn_len, args.vocab_size, args.emb_size, args.stack_num, args.channel1_num, args.channel2_num) - loss, logits = dam.create_network() - loss.persistable = True - logits.persistable = True - - train_program = fluid.default_main_program() - test_program = fluid.default_main_program().clone(for_test=True) - - # gradient clipping - fluid.clip.set_gradient_clip(clip=fluid.clip.GradientClipByValue( - max=1.0, min=-1.0)) - - optimizer = fluid.optimizer.Adam( - learning_rate=fluid.layers.exponential_decay( - learning_rate=args.learning_rate, - decay_steps=400, - decay_rate=0.9, - staircase=True)) - optimizer.minimize(loss) - - fluid.memory_optimize(train_program) + train_program = fluid.Program() + train_startup = fluid.Program() + with fluid.program_guard(train_program, train_startup): + with fluid.unique_name.guard(): + if args.use_pyreader: + train_pyreader = dam.create_py_reader( + capacity=10, name='train_reader') + else: + dam.create_data_layers() + loss, logits = dam.create_network() + loss.persistable = True + logits.persistable = True + # gradient clipping + fluid.clip.set_gradient_clip(clip=fluid.clip.GradientClipByValue( + max=1.0, min=-1.0)) + + optimizer = fluid.optimizer.Adam( + learning_rate=fluid.layers.exponential_decay( + learning_rate=args.learning_rate, + decay_steps=400, + decay_rate=0.9, + staircase=True)) + optimizer.minimize(loss) + fluid.memory_optimize(train_program) + + test_program = fluid.Program() + test_startup = fluid.Program() + with fluid.program_guard(test_program, test_startup): + with fluid.unique_name.guard(): + if args.use_pyreader: + test_pyreader = dam.create_py_reader( + capacity=10, name='test_reader') + else: + dam.create_data_layers() + + loss, 
logits = dam.create_network() + loss.persistable = True + logits.persistable = True + + test_program = test_program.clone(for_test=True) if args.use_cuda: place = fluid.CUDAPlace(0) @@ -152,7 +244,8 @@ def train(args): program=train_program, batch_size=args.batch_size)) exe = fluid.Executor(place) - exe.run(fluid.default_startup_program()) + exe.run(train_startup) + exe.run(test_startup) train_exe = fluid.ParallelExecutor( use_cuda=args.use_cuda, loss_name=loss.name, main_program=train_program) @@ -162,11 +255,6 @@ def train(args): main_program=test_program, share_vars_from=train_exe) - if args.ext_eval: - import utils.douban_evaluation as eva - else: - import utils.evaluation as eva - if args.word_emb_init is not None: print("start loading word embedding init ...") if six.PY2: @@ -199,17 +287,15 @@ def train(args): print("begin model training ...") print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))) - step = 0 - for epoch in six.moves.xrange(args.num_scan_data): - shuffle_train = reader.unison_shuffle(train_data) - train_batches = reader.build_batches(shuffle_train, data_conf) - + # train on one epoch data by feeding + def train_with_feed(step): ave_cost = 0.0 for it in six.moves.xrange(batch_num // dev_count): feed_list = [] for dev in six.moves.xrange(dev_count): index = it * dev_count + dev - feed_dict = reader.make_one_batch_input(train_batches, index) + batch_data = reader.make_one_batch_input(train_batches, index) + feed_dict = dict(zip(dam.get_feed_names(), batch_data)) feed_list.append(feed_dict) cost = train_exe.run(feed=feed_list, fetch_list=[loss.name]) @@ -226,41 +312,73 @@ def train(args): print("Save model at step %d ... " % step) print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))) - fluid.io.save_persistables(exe, save_path) + fluid.io.save_persistables(exe, save_path, train_program) score_path = os.path.join(args.save_path, 'score.' + str(step)) - score_file = open(score_path, 'w') - for it in six.moves.xrange(val_batch_num // dev_count): - feed_list = [] - for dev in six.moves.xrange(dev_count): - val_index = it * dev_count + dev - feed_dict = reader.make_one_batch_input(val_batches, - val_index) - feed_list.append(feed_dict) - - predicts = test_exe.run(feed=feed_list, - fetch_list=[logits.name]) - - scores = np.array(predicts[0]) - for dev in six.moves.xrange(dev_count): - val_index = it * dev_count + dev - for i in six.moves.xrange(args.batch_size): - score_file.write( - str(scores[args.batch_size * dev + i][0]) + '\t' - + str(val_batches["label"][val_index][ - i]) + '\n') - score_file.close() - - #write evaluation result - result = eva.evaluate(score_path) + test_with_feed(test_exe, test_program, + dam.get_feed_names(), [logits.name], score_path, + val_batches, val_batch_num, dev_count) + result_file_path = os.path.join(args.save_path, 'result.' 
+ str(step)) - with open(result_file_path, 'w') as out_file: - for p_at in result: - out_file.write(str(p_at) + '\n') - print('finish evaluation') - print(time.strftime('%Y-%m-%d %H:%M:%S', - time.localtime(time.time()))) + evaluate(score_path, result_file_path) + return step + + # train on one epoch with pyreader + def train_with_pyreader(step): + def data_provider(): + for index in six.moves.xrange(batch_num): + yield reader.make_one_batch_input(train_batches, index) + + train_pyreader.decorate_tensor_provider(data_provider) + + ave_cost = 0.0 + train_pyreader.start() + while True: + try: + cost = train_exe.run(fetch_list=[loss.name]) + + ave_cost += np.array(cost[0]).mean() + step = step + 1 + if step % print_step == 0: + print("processed: [" + str(step * dev_count * 1.0 / + batch_num) + "] ave loss: [" + + str(ave_cost / print_step) + "]") + ave_cost = 0.0 + + if (args.save_path is not None) and (step % save_step == 0): + save_path = os.path.join(args.save_path, + "step_" + str(step)) + print("Save model at step %d ... " % step) + print(time.strftime('%Y-%m-%d %H:%M:%S', + time.localtime(time.time()))) + fluid.io.save_persistables(exe, save_path, train_program) + + score_path = os.path.join(args.save_path, + 'score.' + str(step)) + test_with_pyreader(test_exe, test_program, test_pyreader, + [logits.name], score_path, val_batches, + val_batch_num, dev_count) + + result_file_path = os.path.join(args.save_path, + 'result.' + str(step)) + evaluate(score_path, result_file_path) + + except fluid.core.EOFException: + train_pyreader.reset() + break + return step + + # train over different epoches + global_step = 0 + for epoch in six.moves.xrange(args.num_scan_data): + shuffle_train = reader.unison_shuffle(train_data) + train_batches = reader.build_batches(shuffle_train, data_conf) + + if args.use_pyreader: + global_step = train_with_pyreader(global_step) + else: + global_step = train_with_feed(global_step) if __name__ == '__main__': diff --git a/fluid/deep_attention_matching_net/ubuntu/train.sh b/fluid/deep_attention_matching_net/ubuntu/train.sh index 451e017aaa76c68cfca1e0d82cc64ea43ba61ae9..66ebc2e62f66276f808ac36dfad31470950fd9b9 100644 --- a/fluid/deep_attention_matching_net/ubuntu/train.sh +++ b/fluid/deep_attention_matching_net/ubuntu/train.sh @@ -4,6 +4,7 @@ python -u ../train_and_evaluate.py --use_cuda \ --data_path ./data/data.pkl \ --word_emb_init ./data/word_embedding.pkl \ --save_path ./models \ + --use_pyreader \ --batch_size 256 \ --vocab_size 434512 \ --emb_size 200 \ diff --git a/fluid/deep_attention_matching_net/utils/reader.py b/fluid/deep_attention_matching_net/utils/reader.py index af687a6eb35b2ca4d88eff064190025a284c27c9..96b4bfd71658c037076c4c64a3cccccb467c5e9e 100644 --- a/fluid/deep_attention_matching_net/utils/reader.py +++ b/fluid/deep_attention_matching_net/utils/reader.py @@ -202,30 +202,30 @@ def make_one_batch_input(data_batches, index): every_turn_len[:, i] for i in six.moves.xrange(max_turn_num) ] - feed_dict = {} + feed_list = [] for i, turn in enumerate(turns_list): - feed_dict["turn_%d" % i] = turn - feed_dict["turn_%d" % i] = np.expand_dims( - feed_dict["turn_%d" % i], axis=-1) + turn = np.expand_dims(turn, axis=-1) + feed_list.append(turn) for i, turn_len in enumerate(every_turn_len_list): - feed_dict["turn_mask_%d" % i] = np.ones( - (batch_size, max_turn_len, 1)).astype("float32") + turn_mask = np.ones((batch_size, max_turn_len, 1)).astype("float32") for row in six.moves.xrange(batch_size): - feed_dict["turn_mask_%d" % i][row, turn_len[row]:, 0] = 0 + 
turn_mask[row, turn_len[row]:, 0] = 0 + feed_list.append(turn_mask) - feed_dict["response"] = response - feed_dict["response"] = np.expand_dims(feed_dict["response"], axis=-1) + response = np.expand_dims(response, axis=-1) + feed_list.append(response) - feed_dict["response_mask"] = np.ones( - (batch_size, max_turn_len, 1)).astype("float32") + response_mask = np.ones((batch_size, max_turn_len, 1)).astype("float32") for row in six.moves.xrange(batch_size): - feed_dict["response_mask"][row, response_len[row]:, 0] = 0 + response_mask[row, response_len[row]:, 0] = 0 + feed_list.append(response_mask) - feed_dict["label"] = np.array([data_batches["label"][index]]).reshape( + label = np.array([data_batches["label"][index]]).reshape( [-1, 1]).astype("float32") + feed_list.append(label) - return feed_dict + return feed_list if __name__ == '__main__': diff --git a/fluid/image_classification/dist_train/args.py b/fluid/image_classification/dist_train/args.py deleted file mode 100644 index fff9fd11a194abf05e49b76fb89125036b2c893d..0000000000000000000000000000000000000000 --- a/fluid/image_classification/dist_train/args.py +++ /dev/null @@ -1,118 +0,0 @@ -# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import argparse - -__all__ = ['parse_args', ] - -BENCHMARK_MODELS = [ - "ResNet50", "ResNet101", "ResNet152" -] - - -def parse_args(): - parser = argparse.ArgumentParser('Distributed Image Classification Training.') - parser.add_argument( - '--model', - type=str, - choices=BENCHMARK_MODELS, - default='resnet', - help='The model to run benchmark with.') - parser.add_argument( - '--batch_size', type=int, default=32, help='The minibatch size.') - # args related to learning rate - parser.add_argument( - '--learning_rate', type=float, default=0.001, help='The learning rate.') - # TODO(wuyi): add "--use_fake_data" option back. - parser.add_argument( - '--skip_batch_num', - type=int, - default=5, - help='The first num of minibatch num to skip, for better performance test' - ) - parser.add_argument( - '--iterations', type=int, default=80, help='The number of minibatches.') - parser.add_argument( - '--pass_num', type=int, default=100, help='The number of passes.') - parser.add_argument( - '--data_format', - type=str, - default='NCHW', - choices=['NCHW', 'NHWC'], - help='The data data_format, now only support NCHW.') - parser.add_argument( - '--device', - type=str, - default='GPU', - choices=['CPU', 'GPU'], - help='The device type.') - parser.add_argument( - '--gpus', - type=int, - default=1, - help='If gpus > 1, will use ParallelExecutor to run, else use Executor.') - # this option is available only for vgg and resnet. 
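[Reviewer note on the deep_attention_matching_net reader rework above, before this args.py removal] `make_one_batch_input` now returns a positional list instead of a name-keyed dict, so both the py_reader provider and the plain feed path depend on the field order matching `Net.get_feed_names()`: turn ids, turn masks, response, response mask, label. A numpy-only sketch of that ordering contract may help reviewers; the sizes and helper names below are illustrative placeholders, not the real configuration.

```python
import numpy as np

# Placeholder sizes for illustration; real values come from the
# --max_turn_num / --max_turn_len / --batch_size arguments.
MAX_TURN_NUM, MAX_TURN_LEN, BATCH_SIZE = 9, 50, 4

def fake_batch_input():
    # Same field order that create_data_layers() registers in _feed_names:
    # turns, turn masks, response, response mask, label.
    turns = [np.zeros((BATCH_SIZE, MAX_TURN_LEN, 1), dtype="int32")
             for _ in range(MAX_TURN_NUM)]
    turn_masks = [np.ones((BATCH_SIZE, MAX_TURN_LEN, 1), dtype="float32")
                  for _ in range(MAX_TURN_NUM)]
    response = np.zeros((BATCH_SIZE, MAX_TURN_LEN, 1), dtype="int32")
    response_mask = np.ones((BATCH_SIZE, MAX_TURN_LEN, 1), dtype="float32")
    label = np.zeros((BATCH_SIZE, 1), dtype="float32")
    return turns + turn_masks + [response, response_mask, label]

feed_names = (["turn_%d" % i for i in range(MAX_TURN_NUM)]
              + ["turn_mask_%d" % i for i in range(MAX_TURN_NUM)]
              + ["response", "response_mask", "label"])

# Feed-dict path: mirrors what train_with_feed() does with get_feed_names().
feed_dict = dict(zip(feed_names, fake_batch_input()))
assert len(feed_dict) == 2 * MAX_TURN_NUM + 3

# py_reader path: the data provider yields the same positional list as-is.
def data_provider():
    for _ in range(2):  # two dummy batches
        yield fake_batch_input()
```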
- parser.add_argument( - '--cpus', - type=int, - default=1, - help='If cpus > 1, will set ParallelExecutor to use multiple threads.') - parser.add_argument( - '--data_set', - type=str, - default='flowers', - choices=['cifar10', 'flowers', 'imagenet'], - help='Optional dataset for benchmark.') - parser.add_argument( - '--no_test', - action='store_true', - help='If set, do not test the testset during training.') - parser.add_argument( - '--memory_optimize', - action='store_true', - help='If set, optimize runtime memory before start.') - parser.add_argument( - '--update_method', - type=str, - default='local', - choices=['local', 'pserver', 'nccl2'], - help='Choose parameter update method, can be local, pserver, nccl2.') - parser.add_argument( - '--no_split_var', - action='store_true', - default=False, - help='Whether split variables into blocks when update_method is pserver') - parser.add_argument( - '--async_mode', - action='store_true', - default=False, - help='Whether start pserver in async mode to support ASGD') - parser.add_argument( - '--no_random', - action='store_true', - help='If set, keep the random seed and do not shuffle the data.') - parser.add_argument( - '--reduce_strategy', - type=str, - choices=['reduce', 'all_reduce'], - default='all_reduce', - help='Specify the reduce strategy, can be reduce, all_reduce') - parser.add_argument( - '--data_dir', - type=str, - default="../data/ILSVRC2012", - help="The ImageNet dataset root dir." - ) - args = parser.parse_args() - return args diff --git a/fluid/image_classification/dist_train/dist_train.py b/fluid/image_classification/dist_train/dist_train.py index 127aff6e527ded5475cbe7d785d268baa6df43d8..160bfb95ac4cdb38083891b9e5f3e76d5371fc06 100644 --- a/fluid/image_classification/dist_train/dist_train.py +++ b/fluid/image_classification/dist_train/dist_train.py @@ -26,9 +26,84 @@ import six import sys sys.path.append("..") import models -from args import * from reader import train, val +def parse_args(): + parser = argparse.ArgumentParser('Distributed Image Classification Training.') + parser.add_argument( + '--model', + type=str, + default='DistResNet', + help='The model to run.') + parser.add_argument( + '--batch_size', type=int, default=32, help='The minibatch size per device.') + parser.add_argument( + '--multi_batch_repeat', type=int, default=1, help='Batch merge repeats.') + parser.add_argument( + '--learning_rate', type=float, default=0.1, help='The learning rate.') + parser.add_argument( + '--pass_num', type=int, default=90, help='The number of passes.') + parser.add_argument( + '--data_format', + type=str, + default='NCHW', + choices=['NCHW', 'NHWC'], + help='The data data_format, now only support NCHW.') + parser.add_argument( + '--device', + type=str, + default='GPU', + choices=['CPU', 'GPU'], + help='The device type.') + parser.add_argument( + '--gpus', + type=int, + default=1, + help='If gpus > 1, will use ParallelExecutor to run, else use Executor.') + parser.add_argument( + '--cpus', + type=int, + default=1, + help='If cpus > 1, will set ParallelExecutor to use multiple threads.') + parser.add_argument( + '--no_test', + action='store_true', + help='If set, do not test the testset during training.') + parser.add_argument( + '--memory_optimize', + action='store_true', + help='If set, optimize runtime memory before start.') + parser.add_argument( + '--update_method', + type=str, + default='local', + choices=['local', 'pserver', 'nccl2'], + help='Choose parameter update method, can be local, pserver, nccl2.') + 
parser.add_argument( + '--no_split_var', + action='store_true', + default=False, + help='Whether split variables into blocks when update_method is pserver') + parser.add_argument( + '--async_mode', + action='store_true', + default=False, + help='Whether start pserver in async mode to support ASGD') + parser.add_argument( + '--reduce_strategy', + type=str, + choices=['reduce', 'all_reduce'], + default='all_reduce', + help='Specify the reduce strategy, can be reduce, all_reduce') + parser.add_argument( + '--data_dir', + type=str, + default="../data/ILSVRC2012", + help="The ImageNet dataset root dir." + ) + args = parser.parse_args() + return args + def get_model(args, is_train, main_prog, startup_prog): pyreader = None class_dim = 1000 @@ -51,7 +126,7 @@ def get_model(args, is_train, main_prog, startup_prog): name="train_reader" if is_train else "test_reader", use_double_buffer=True) input, label = fluid.layers.read_file(pyreader) - model_def = models.__dict__[args.model]() + model_def = models.__dict__[args.model](layers=50, is_train=is_train) predict = model_def.net(input, class_dim=class_dim) cost = fluid.layers.cross_entropy(input=predict, label=label) @@ -60,89 +135,64 @@ def get_model(args, is_train, main_prog, startup_prog): batch_acc1 = fluid.layers.accuracy(input=predict, label=label, k=1) batch_acc5 = fluid.layers.accuracy(input=predict, label=label, k=5) - # configure optimize optimizer = None if is_train: - + start_lr = args.learning_rate + # n * worker * repeat + end_lr = args.learning_rate * trainer_count * args.multi_batch_repeat total_images = 1281167 / trainer_count - - step = int(total_images / (args.batch_size * args.gpus) + 1) - epochs = [30, 60, 90] + step = int(total_images / (args.batch_size * args.gpus * args.multi_batch_repeat) + 1) + warmup_steps = step * 5 # warmup 5 passes + epochs = [30, 60, 80] bd = [step * e for e in epochs] - base_lr = args.learning_rate + base_lr = end_lr lr = [] lr = [base_lr * (0.1**i) for i in range(len(bd) + 1)] + optimizer = fluid.optimizer.Momentum( - learning_rate=fluid.layers.piecewise_decay( - boundaries=bd, values=lr), + learning_rate=models.learning_rate.lr_warmup( + fluid.layers.piecewise_decay( + boundaries=bd, values=lr), + warmup_steps, start_lr, end_lr), momentum=0.9, regularization=fluid.regularizer.L2Decay(1e-4)) optimizer.minimize(avg_cost) - if args.memory_optimize: - fluid.memory_optimize(main_prog) - batched_reader = None pyreader.decorate_paddle_reader( paddle.batch( - reader if args.no_random else paddle.reader.shuffle( - reader, buf_size=5120), + reader, batch_size=args.batch_size)) return avg_cost, optimizer, [batch_acc1, batch_acc5], batched_reader, pyreader def append_nccl2_prepare(trainer_id, startup_prog): - if trainer_id >= 0: - # append gen_nccl_id at the end of startup program - trainer_id = int(os.getenv("PADDLE_TRAINER_ID")) - port = os.getenv("PADDLE_PSERVER_PORT") - worker_ips = os.getenv("PADDLE_TRAINER_IPS") - worker_endpoints = [] - for ip in worker_ips.split(","): - worker_endpoints.append(':'.join([ip, port])) - num_trainers = len(worker_endpoints) - current_endpoint = os.getenv("PADDLE_CURRENT_IP") + ":" + port - worker_endpoints.remove(current_endpoint) - - nccl_id_var = startup_prog.global_block().create_var( - name="NCCLID", - persistable=True, - type=fluid.core.VarDesc.VarType.RAW) - startup_prog.global_block().append_op( - type="gen_nccl_id", - inputs={}, - outputs={"NCCLID": nccl_id_var}, - attrs={ - "endpoint": current_endpoint, - "endpoint_list": worker_endpoints, - "trainer_id": trainer_id - 
}) - return nccl_id_var, num_trainers, trainer_id - else: - raise Exception("must set positive PADDLE_TRAINER_ID env variables for " - "nccl-based dist train.") + trainer_id = int(os.getenv("PADDLE_TRAINER_ID")) + port = os.getenv("PADDLE_PSERVER_PORT") + worker_ips = os.getenv("PADDLE_TRAINER_IPS") + worker_endpoints = [] + for ip in worker_ips.split(","): + worker_endpoints.append(':'.join([ip, port])) + current_endpoint = os.getenv("PADDLE_CURRENT_IP") + ":" + port + + config = fluid.DistributeTranspilerConfig() + config.mode = "nccl2" + t = fluid.DistributeTranspiler(config=config) + t.transpile(trainer_id, trainers=','.join(worker_endpoints), + current_endpoint=current_endpoint, + startup_program=startup_prog) def dist_transpile(trainer_id, args, train_prog, startup_prog): - if trainer_id < 0: - return None, None - - # the port of all pservers, needed by both trainer and pserver port = os.getenv("PADDLE_PSERVER_PORT", "6174") - # comma separated ips of all pservers, needed by trainer and - # pserver pserver_ips = os.getenv("PADDLE_PSERVER_IPS", "") eplist = [] for ip in pserver_ips.split(","): eplist.append(':'.join([ip, port])) pserver_endpoints = ",".join(eplist) - # total number of workers/trainers in the job, needed by - # trainer and pserver trainers = int(os.getenv("PADDLE_TRAINERS")) - # the IP of the local machine, needed by pserver only current_endpoint = os.getenv("PADDLE_CURRENT_IP", "") + ":" + port - # the role, should be either PSERVER or TRAINER training_role = os.getenv("PADDLE_TRAINING_ROLE") config = fluid.DistributeTranspilerConfig() @@ -150,8 +200,6 @@ def dist_transpile(trainer_id, args, train_prog, startup_prog): t = fluid.DistributeTranspiler(config=config) t.transpile( trainer_id, - # NOTE: *MUST* use train_prog, for we are using with guard to - # generate different program for train and test. 
program=train_prog, pservers=pserver_endpoints, trainers=trainers, @@ -171,7 +219,7 @@ def dist_transpile(trainer_id, args, train_prog, startup_prog): ) -def test_parallel(exe, test_args, args, test_prog, feeder): +def test_parallel(exe, test_args, args, test_prog): acc_evaluators = [] for i in six.moves.xrange(len(test_args[2])): acc_evaluators.append(fluid.metrics.Accuracy()) @@ -190,13 +238,10 @@ def test_parallel(exe, test_args, args, test_prog, feeder): return [e.eval() for e in acc_evaluators] - -# NOTE: only need to benchmark using parallelexe def train_parallel(train_args, test_args, args, train_prog, test_prog, startup_prog, nccl_id_var, num_trainers, trainer_id): over_all_start = time.time() place = core.CPUPlace() if args.device == 'CPU' else core.CUDAPlace(0) - feeder = None if nccl_id_var and trainer_id == 0: #FIXME(wuyi): wait other trainer to start listening @@ -237,31 +282,27 @@ def train_parallel(train_args, test_args, args, train_prog, test_prog, if args.update_method == "pserver": test_scope = None else: - # NOTE: use an empty scope to avoid test exe using NCCLID test_scope = fluid.Scope() test_exe = fluid.ParallelExecutor( - True, main_program=test_prog, share_vars_from=exe) + True, main_program=test_prog, share_vars_from=exe, + scope=test_scope) pyreader = train_args[4] for pass_id in range(args.pass_num): num_samples = 0 - iters = 0 start_time = time.time() batch_id = 0 pyreader.start() while True: - if iters == args.iterations: - break - - if iters == args.skip_batch_num: - start_time = time.time() - num_samples = 0 fetch_list = [avg_loss.name] acc_name_list = [v.name for v in train_args[2]] fetch_list.extend(acc_name_list) try: - fetch_ret = exe.run(fetch_list) + if batch_id % 30 == 0: + fetch_ret = exe.run(fetch_list) + else: + fetch_ret = exe.run([]) except fluid.core.EOFException as eof: break except fluid.core.EnforceNotMet as ex: @@ -269,20 +310,17 @@ def train_parallel(train_args, test_args, args, train_prog, test_prog, break num_samples += args.batch_size * args.gpus - iters += 1 - if batch_id % 1 == 0: + if batch_id % 30 == 0: fetched_data = [np.mean(np.array(d)) for d in fetch_ret] print("Pass %d, batch %d, loss %s, accucacys: %s" % (pass_id, batch_id, fetched_data[0], fetched_data[1:])) batch_id += 1 print_train_time(start_time, time.time(), num_samples) - pyreader.reset() # reset reader handle + pyreader.reset() if not args.no_test and test_args[2]: - test_feeder = None - test_ret = test_parallel(test_exe, test_args, args, test_prog, - test_feeder) + test_ret = test_parallel(test_exe, test_args, args, test_prog) print("Pass: %d, Test Accuracy: %s\n" % (pass_id, [np.mean(np.array(v)) for v in test_ret])) @@ -316,8 +354,6 @@ def main(): args = parse_args() print_arguments(args) print_paddle_envs() - if args.no_random: - fluid.default_startup_program().random_seed = 1 # the unique trainer id, starting from 0, needed by trainer # only diff --git a/fluid/image_classification/models/__init__.py b/fluid/image_classification/models/__init__.py index 34134fd01e8e5db99abdc375bc1aa94dcfe8b567..f43275b6c674e4d9772e480bd9ec480c75c447d1 100644 --- a/fluid/image_classification/models/__init__.py +++ b/fluid/image_classification/models/__init__.py @@ -3,6 +3,8 @@ from .mobilenet import MobileNet from .googlenet import GoogleNet from .vgg import VGG11, VGG13, VGG16, VGG19 from .resnet import ResNet50, ResNet101, ResNet152 +from .resnet_dist import DistResNet from .inception_v4 import InceptionV4 from .se_resnext import SE_ResNeXt50_32x4d, SE_ResNeXt101_32x4d, 
SE_ResNeXt152_32x4d from .dpn import DPN68, DPN92, DPN98, DPN107, DPN131 +import learning_rate diff --git a/fluid/image_classification/models/learning_rate.py b/fluid/image_classification/models/learning_rate.py index a385a931c0cfc68abd9ee61c009226e14b0be8b4..01922eb3a490320c58a6a1a68da7c1479882379b 100644 --- a/fluid/image_classification/models/learning_rate.py +++ b/fluid/image_classification/models/learning_rate.py @@ -20,3 +20,31 @@ def cosine_decay(learning_rate, step_each_epoch, epochs=120): decayed_lr = learning_rate * \ (ops.cos(epoch * (math.pi / epochs)) + 1)/2 return decayed_lr + + +def lr_warmup(learning_rate, warmup_steps, start_lr, end_lr): + """ Applies linear learning rate warmup for distributed training + Argument learning_rate can be float or a Variable + lr = lr + (warmup_rate * step / warmup_steps) + """ + assert(isinstance(end_lr, float)) + assert(isinstance(start_lr, float)) + linear_step = end_lr - start_lr + with fluid.default_main_program()._lr_schedule_guard(): + lr = fluid.layers.tensor.create_global_var( + shape=[1], + value=0.0, + dtype='float32', + persistable=True, + name="learning_rate_warmup") + + global_step = fluid.layers.learning_rate_scheduler._decay_step_counter() + + with fluid.layers.control_flow.Switch() as switch: + with switch.case(global_step < warmup_steps): + decayed_lr = start_lr + linear_step * (global_step / warmup_steps) + fluid.layers.tensor.assign(decayed_lr, lr) + with switch.default(): + fluid.layers.tensor.assign(learning_rate, lr) + + return lr \ No newline at end of file diff --git a/fluid/image_classification/models/resnet_dist.py b/fluid/image_classification/models/resnet_dist.py new file mode 100644 index 0000000000000000000000000000000000000000..2dab3e6111d8d02df44233c0440a8ea9ce74faa5 --- /dev/null +++ b/fluid/image_classification/models/resnet_dist.py @@ -0,0 +1,121 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function +import paddle +import paddle.fluid as fluid +import math + +__all__ = ["DistResNet"] + +train_parameters = { + "input_size": [3, 224, 224], + "input_mean": [0.485, 0.456, 0.406], + "input_std": [0.229, 0.224, 0.225], + "learning_strategy": { + "name": "piecewise_decay", + "batch_size": 256, + "epochs": [30, 60, 90], + "steps": [0.1, 0.01, 0.001, 0.0001] + } +} + + +class DistResNet(): + def __init__(self, layers=50, is_train=True): + self.params = train_parameters + self.layers = layers + self.is_train = is_train + self.weight_decay = 1e-4 + + def net(self, input, class_dim=1000): + layers = self.layers + supported_layers = [50, 101, 152] + assert layers in supported_layers, \ + "supported layers are {} but input layer is {}".format(supported_layers, layers) + + if layers == 50: + depth = [3, 4, 6, 3] + elif layers == 101: + depth = [3, 4, 23, 3] + elif layers == 152: + depth = [3, 8, 36, 3] + num_filters = [64, 128, 256, 512] + + conv = self.conv_bn_layer( + input=input, num_filters=64, filter_size=7, stride=2, act='relu') + conv = fluid.layers.pool2d( + input=conv, + pool_size=3, + pool_stride=2, + pool_padding=1, + pool_type='max') + + for block in range(len(depth)): + for i in range(depth[block]): + conv = self.bottleneck_block( + input=conv, + num_filters=num_filters[block], + stride=2 if i == 0 and block != 0 else 1) + + pool = fluid.layers.pool2d( + input=conv, pool_size=7, pool_type='avg', global_pooling=True) + stdv = 1.0 / math.sqrt(pool.shape[1] * 1.0) + out = fluid.layers.fc(input=pool, + size=class_dim, + act='softmax', + 
param_attr=fluid.param_attr.ParamAttr( + initializer=fluid.initializer.Uniform(-stdv, + stdv), + regularizer=fluid.regularizer.L2Decay(self.weight_decay)), + bias_attr=fluid.ParamAttr( + regularizer=fluid.regularizer.L2Decay(self.weight_decay)) + ) + return out + + def conv_bn_layer(self, + input, + num_filters, + filter_size, + stride=1, + groups=1, + act=None, + bn_init_value=1.0): + conv = fluid.layers.conv2d( + input=input, + num_filters=num_filters, + filter_size=filter_size, + stride=stride, + padding=(filter_size - 1) // 2, + groups=groups, + act=None, + bias_attr=False, + param_attr=fluid.ParamAttr(regularizer=fluid.regularizer.L2Decay(self.weight_decay))) + return fluid.layers.batch_norm( + input=conv, act=act, is_test=not self.is_train, + param_attr=fluid.ParamAttr( + initializer=fluid.initializer.Constant(bn_init_value), + regularizer=None)) + + def shortcut(self, input, ch_out, stride): + ch_in = input.shape[1] + if ch_in != ch_out or stride != 1: + return self.conv_bn_layer(input, ch_out, 1, stride) + else: + return input + + def bottleneck_block(self, input, num_filters, stride): + conv0 = self.conv_bn_layer( + input=input, num_filters=num_filters, filter_size=1, act='relu') + conv1 = self.conv_bn_layer( + input=conv0, + num_filters=num_filters, + filter_size=3, + stride=stride, + act='relu') + # NOTE: default bias is 0.0 already + conv2 = self.conv_bn_layer( + input=conv1, num_filters=num_filters * 4, filter_size=1, act=None, bn_init_value=0.0) + + short = self.shortcut(input, num_filters * 4, stride) + + return fluid.layers.elementwise_add(x=short, y=conv2, act='relu') diff --git a/fluid/machine_reading_comprehension/dataset.py b/fluid/machine_reading_comprehension/dataset.py index 7a5cea1860745e63e15727bb0cc45733b4e2c8fa..7e50b7e0e2996b58e56e8e725549502bb1dfbe52 100644 --- a/fluid/machine_reading_comprehension/dataset.py +++ b/fluid/machine_reading_comprehension/dataset.py @@ -132,61 +132,44 @@ class BRCDataset(object): 'passage_token_ids': [], 'passage_length': [], 'start_id': [], - 'end_id': [] + 'end_id': [], + 'passage_num': [] } max_passage_num = max( [len(sample['passages']) for sample in batch_data['raw_data']]) - #max_passage_num = min(self.max_p_num, max_passage_num) - max_passage_num = self.max_p_num + max_passage_num = min(self.max_p_num, max_passage_num) for sidx, sample in enumerate(batch_data['raw_data']): + count = 0 for pidx in range(max_passage_num): if pidx < len(sample['passages']): + count += 1 batch_data['question_token_ids'].append(sample[ - 'question_token_ids']) + 'question_token_ids'][0:self.max_q_len]) batch_data['question_length'].append( - len(sample['question_token_ids'])) + min(len(sample['question_token_ids']), self.max_q_len)) passage_token_ids = sample['passages'][pidx][ - 'passage_token_ids'] + 'passage_token_ids'][0:self.max_p_len] batch_data['passage_token_ids'].append(passage_token_ids) batch_data['passage_length'].append( min(len(passage_token_ids), self.max_p_len)) - else: - batch_data['question_token_ids'].append([]) - batch_data['question_length'].append(0) - batch_data['passage_token_ids'].append([]) - batch_data['passage_length'].append(0) - batch_data, padded_p_len, padded_q_len = self._dynamic_padding( - batch_data, pad_id) - for sample in batch_data['raw_data']: + # record the start passage index of current doc + passade_idx_offset = sum(batch_data['passage_num']) + batch_data['passage_num'].append(count) + gold_passage_offset = 0 if 'answer_passages' in sample and len(sample['answer_passages']): - gold_passage_offset = 
padded_p_len * sample['answer_passages'][ - 0] - batch_data['start_id'].append(gold_passage_offset + sample[ - 'answer_spans'][0][0]) - batch_data['end_id'].append(gold_passage_offset + sample[ - 'answer_spans'][0][1]) + for i in range(sample['answer_passages'][0]): + gold_passage_offset += len(batch_data['passage_token_ids'][ + passade_idx_offset + i]) + start_id = min(sample['answer_spans'][0][0], self.max_p_len) + end_id = min(sample['answer_spans'][0][1], self.max_p_len) + batch_data['start_id'].append(gold_passage_offset + start_id) + batch_data['end_id'].append(gold_passage_offset + end_id) else: # fake span for some samples, only valid for testing batch_data['start_id'].append(0) batch_data['end_id'].append(0) return batch_data - def _dynamic_padding(self, batch_data, pad_id): - """ - Dynamically pads the batch_data with pad_id - """ - pad_p_len = min(self.max_p_len, max(batch_data['passage_length'])) - pad_q_len = min(self.max_q_len, max(batch_data['question_length'])) - batch_data['passage_token_ids'] = [ - (ids + [pad_id] * (pad_p_len - len(ids)))[:pad_p_len] - for ids in batch_data['passage_token_ids'] - ] - batch_data['question_token_ids'] = [ - (ids + [pad_id] * (pad_q_len - len(ids)))[:pad_q_len] - for ids in batch_data['question_token_ids'] - ] - return batch_data, pad_p_len, pad_q_len - def word_iter(self, set_name=None): """ Iterates over all the words in the dataset diff --git a/fluid/machine_reading_comprehension/rc_model.py b/fluid/machine_reading_comprehension/rc_model.py index 11d5b5d91d734a82d687a09b587ad614d0f03fff..932ccd9cafd1772a2b7f4e20867aa51eee74c9c5 100644 --- a/fluid/machine_reading_comprehension/rc_model.py +++ b/fluid/machine_reading_comprehension/rc_model.py @@ -68,16 +68,23 @@ def bi_lstm_encoder(input_seq, gate_size, para_name, args): return encoder_out -def encoder(input_name, para_name, shape, hidden_size, args): +def get_data(input_name, lod_level, args): input_ids = layers.data( - name=input_name, shape=[1], dtype='int64', lod_level=1) + name=input_name, shape=[1], dtype='int64', lod_level=lod_level) + return input_ids + + +def embedding(input_ids, shape, args): input_embedding = layers.embedding( input=input_ids, size=shape, dtype='float32', is_sparse=True, param_attr=fluid.ParamAttr(name='embedding_para')) + return input_embedding + +def encoder(input_embedding, para_name, hidden_size, args): encoder_out = bi_lstm_encoder( input_seq=input_embedding, gate_size=hidden_size, @@ -259,40 +266,41 @@ def fusion(g, args): def rc_model(hidden_size, vocab, args): emb_shape = [vocab.size(), vocab.embed_dim] + start_labels = layers.data( + name="start_lables", shape=[1], dtype='float32', lod_level=1) + end_labels = layers.data( + name="end_lables", shape=[1], dtype='float32', lod_level=1) + # stage 1:encode - p_ids_names = [] - q_ids_names = [] - ms = [] - gs = [] - qs = [] - for i in range(args.doc_num): - p_ids_name = "pids_%d" % i - p_ids_names.append(p_ids_name) - p_enc_i = encoder(p_ids_name, 'p_enc', emb_shape, hidden_size, args) - - q_ids_name = "qids_%d" % i - q_ids_names.append(q_ids_name) - q_enc_i = encoder(q_ids_name, 'q_enc', emb_shape, hidden_size, args) + q_id0 = get_data('q_id0', 1, args) + + q_ids = get_data('q_ids', 2, args) + p_ids_name = 'p_ids' + + p_ids = get_data('p_ids', 2, args) + p_embs = embedding(p_ids, emb_shape, args) + q_embs = embedding(q_ids, emb_shape, args) + drnn = layers.DynamicRNN() + with drnn.block(): + p_emb = drnn.step_input(p_embs) + q_emb = drnn.step_input(q_embs) + + p_enc = encoder(p_emb, 'p_enc', hidden_size, 
args) + q_enc = encoder(q_emb, 'q_enc', hidden_size, args) # stage 2:match - g_i = attn_flow(q_enc_i, p_enc_i, p_ids_name, args) + g_i = attn_flow(q_enc, p_enc, p_ids_name, args) # stage 3:fusion m_i = fusion(g_i, args) - ms.append(m_i) - gs.append(g_i) - qs.append(q_enc_i) - m = layers.sequence_concat(input=ms) - g = layers.sequence_concat(input=gs) - q_vec = layers.sequence_concat(input=qs) + drnn.output(m_i, q_enc) + + ms, q_encs = drnn() + p_vec = layers.lod_reset(x=ms, y=start_labels) + q_vec = layers.lod_reset(x=q_encs, y=q_id0) # stage 4:decode start_probs, end_probs = point_network_decoder( - p_vec=m, q_vec=q_vec, hidden_size=hidden_size, args=args) - - start_labels = layers.data( - name="start_lables", shape=[1], dtype='float32', lod_level=1) - end_labels = layers.data( - name="end_lables", shape=[1], dtype='float32', lod_level=1) + p_vec=p_vec, q_vec=q_vec, hidden_size=hidden_size, args=args) cost0 = layers.sequence_pool( layers.cross_entropy( @@ -308,5 +316,5 @@ def rc_model(hidden_size, vocab, args): cost = cost0 + cost1 cost.persistable = True - feeding_list = q_ids_names + ["start_lables", "end_lables"] + p_ids_names - return cost, start_probs, end_probs, feeding_list + feeding_list = ["q_ids", "start_lables", "end_lables", "p_ids", "q_id0"] + return cost, start_probs, end_probs, ms, feeding_list diff --git a/fluid/machine_reading_comprehension/run.py b/fluid/machine_reading_comprehension/run.py index 1b68d79f284104097da6b208aee4d763c4f3dcbc..0ab05b9052c8d54d036d625ccd05f743ae47781a 100644 --- a/fluid/machine_reading_comprehension/run.py +++ b/fluid/machine_reading_comprehension/run.py @@ -46,22 +46,32 @@ from vocab import Vocab def prepare_batch_input(insts, args): - doc_num = args.doc_num - batch_size = len(insts['raw_data']) + inst_num = len(insts['passage_num']) + if batch_size != inst_num: + print("data error %d, %d" % (batch_size, inst_num)) + return None new_insts = [] + passage_idx = 0 for i in range(batch_size): + p_len = 0 p_id = [] - q_id = [] p_ids = [] q_ids = [] - p_len = 0 - for j in range(i * doc_num, (i + 1) * doc_num): - p_ids.append(insts['passage_token_ids'][j]) - p_id = p_id + insts['passage_token_ids'][j] - q_ids.append(insts['question_token_ids'][j]) - q_id = q_id + insts['question_token_ids'][j] + q_id = [] + p_id_r = [] + p_ids_r = [] + q_ids_r = [] + q_id_r = [] + + for j in range(insts['passage_num'][i]): + p_ids.append(insts['passage_token_ids'][passage_idx + j]) + p_id = p_id + insts['passage_token_ids'][passage_idx + j] + q_ids.append(insts['question_token_ids'][passage_idx + j]) + q_id = q_id + insts['question_token_ids'][passage_idx + j] + + passage_idx += insts['passage_num'][i] p_len = len(p_id) def _get_label(idx, ref_len): @@ -72,11 +82,46 @@ def prepare_batch_input(insts, args): start_label = _get_label(insts['start_id'][i], p_len) end_label = _get_label(insts['end_id'][i], p_len) - new_inst = q_ids + [start_label, end_label] + p_ids + new_inst = [q_ids, start_label, end_label, p_ids, q_id] new_insts.append(new_inst) return new_insts +def batch_reader(batch_list, args): + res = [] + for batch in batch_list: + res.append(prepare_batch_input(batch, args)) + return res + + +def read_multiple(reader, count, clip_last=True): + """ + Stack data from reader for multi-devices. 
+ """ + + def __impl__(): + res = [] + for item in reader(): + res.append(item) + if len(res) == count: + yield res + res = [] + if len(res) == count: + yield res + elif not clip_last: + data = [] + for item in res: + data += item + if len(data) > count: + inst_num_per_part = len(data) // count + yield [ + data[inst_num_per_part * i:inst_num_per_part * (i + 1)] + for i in range(count) + ] + + return __impl__ + + def LodTensor_Array(lod_tensor): lod = lod_tensor.lod() array = np.array(lod_tensor) @@ -103,7 +148,7 @@ def print_para(train_prog, train_exe, logger, args): logger.info("total param num: {0}".format(num_sum)) -def find_best_answer_for_passage(start_probs, end_probs, passage_len, args): +def find_best_answer_for_passage(start_probs, end_probs, passage_len): """ Finds the best answer with the maximum start_prob * end_prob from a single passage """ @@ -125,7 +170,7 @@ def find_best_answer_for_passage(start_probs, end_probs, passage_len, args): return (best_start, best_end), max_prob -def find_best_answer(sample, start_prob, end_prob, padded_p_len, args): +def find_best_answer_for_inst(sample, start_prob, end_prob, inst_lod): """ Finds the best answer for a sample given start_prob and end_prob for each position. This will call find_best_answer_for_passage because there are multiple passages in a sample @@ -134,11 +179,16 @@ def find_best_answer(sample, start_prob, end_prob, padded_p_len, args): for p_idx, passage in enumerate(sample['passages']): if p_idx >= args.max_p_num: continue + if len(start_prob) != len(end_prob): + logger.info('error: {}'.format(sample['question'])) + continue + passage_start = inst_lod[p_idx] - inst_lod[0] + passage_end = inst_lod[p_idx + 1] - inst_lod[0] + passage_len = passage_end - passage_start passage_len = min(args.max_p_len, len(passage['passage_tokens'])) answer_span, score = find_best_answer_for_passage( - start_prob[p_idx * padded_p_len:(p_idx + 1) * padded_p_len], - end_prob[p_idx * padded_p_len:(p_idx + 1) * padded_p_len], - passage_len, args) + start_prob[passage_start:passage_end], + end_prob[passage_start:passage_end], passage_len) if score > best_score: best_score = score best_p_idx = p_idx @@ -148,11 +198,11 @@ def find_best_answer(sample, start_prob, end_prob, padded_p_len, args): else: best_answer = ''.join(sample['passages'][best_p_idx]['passage_tokens'][ best_span[0]:best_span[1] + 1]) - return best_answer + return best_answer, best_span -def validation(inference_program, avg_cost, s_probs, e_probs, feed_order, place, - vocab, brc_data, logger, args): +def validation(inference_program, avg_cost, s_probs, e_probs, match, feed_order, + place, dev_count, vocab, brc_data, logger, args): """ """ @@ -165,6 +215,8 @@ def validation(inference_program, avg_cost, s_probs, e_probs, feed_order, place, # Use test set as validation each pass total_loss = 0.0 count = 0 + n_batch_cnt = 0 + n_batch_loss = 0.0 pred_answers, ref_answers = [], [] val_feed_list = [ inference_program.global_block().var(var_name) @@ -172,55 +224,80 @@ def validation(inference_program, avg_cost, s_probs, e_probs, feed_order, place, ] val_feeder = fluid.DataFeeder(val_feed_list, place) pad_id = vocab.get_id(vocab.pad_token) - dev_batches = brc_data.gen_mini_batches( - 'dev', args.batch_size, pad_id, shuffle=False) + dev_reader = lambda:brc_data.gen_mini_batches('dev', args.batch_size, pad_id, shuffle=False) + dev_reader = read_multiple(dev_reader, dev_count) - for batch_id, batch in enumerate(dev_batches, 1): - feed_data = prepare_batch_input(batch, args) + for batch_id, 
batch_list in enumerate(dev_reader(), 1): + feed_data = batch_reader(batch_list, args) val_fetch_outs = parallel_executor.run( - feed=val_feeder.feed(feed_data), - fetch_list=[avg_cost.name, s_probs.name, e_probs.name], + feed=list(val_feeder.feed_parallel(feed_data, dev_count)), + fetch_list=[avg_cost.name, s_probs.name, e_probs.name, match.name], return_numpy=False) - - total_loss += np.array(val_fetch_outs[0])[0] - - start_probs = LodTensor_Array(val_fetch_outs[1]) - end_probs = LodTensor_Array(val_fetch_outs[2]) - count += len(batch['raw_data']) - - padded_p_len = len(batch['passage_token_ids'][0]) - for sample, start_prob, end_prob in zip(batch['raw_data'], start_probs, - end_probs): - - best_answer = find_best_answer(sample, start_prob, end_prob, - padded_p_len, args) - pred_answers.append({ - 'question_id': sample['question_id'], - 'question_type': sample['question_type'], - 'answers': [best_answer], - 'entity_answers': [[]], - 'yesno_answers': [] - }) - if 'answers' in sample: - ref_answers.append({ + total_loss += np.array(val_fetch_outs[0]).sum() + start_probs_m = LodTensor_Array(val_fetch_outs[1]) + end_probs_m = LodTensor_Array(val_fetch_outs[2]) + match_lod = val_fetch_outs[3].lod() + count += len(np.array(val_fetch_outs[0])) + + n_batch_cnt += len(np.array(val_fetch_outs[0])) + n_batch_loss += np.array(val_fetch_outs[0]).sum() + log_every_n_batch = args.log_interval + if log_every_n_batch > 0 and batch_id % log_every_n_batch == 0: + logger.info('Average dev loss from batch {} to {} is {}'.format( + batch_id - log_every_n_batch + 1, batch_id, "%.10f" % ( + n_batch_loss / n_batch_cnt))) + n_batch_loss = 0.0 + n_batch_cnt = 0 + + for idx, batch in enumerate(batch_list): + #one batch + batch_size = len(batch['raw_data']) + batch_range = match_lod[0][idx * batch_size:(idx + 1) * batch_size + + 1] + batch_lod = [[batch_range[x], batch_range[x + 1]] + for x in range(len(batch_range[:-1]))] + start_prob_batch = start_probs_m[idx * batch_size:(idx + 1) * + batch_size] + end_prob_batch = end_probs_m[idx * batch_size:(idx + 1) * + batch_size] + for sample, start_prob_inst, end_prob_inst, inst_range in zip( + batch['raw_data'], start_prob_batch, end_prob_batch, + batch_lod): + #one instance + inst_lod = match_lod[1][inst_range[0]:inst_range[1] + 1] + best_answer, best_span = find_best_answer_for_inst( + sample, start_prob_inst, end_prob_inst, inst_lod) + pred = { 'question_id': sample['question_id'], 'question_type': sample['question_type'], - 'answers': sample['answers'], + 'answers': [best_answer], 'entity_answers': [[]], - 'yesno_answers': [] - }) - if args.result_dir is not None and args.result_name is not None: + 'yesno_answers': [best_span] + } + pred_answers.append(pred) + if 'answers' in sample: + ref = { + 'question_id': sample['question_id'], + 'question_type': sample['question_type'], + 'answers': sample['answers'], + 'entity_answers': [[]], + 'yesno_answers': [] + } + ref_answers.append(ref) + + result_dir = args.result_dir + result_prefix = args.result_name + if result_dir is not None and result_prefix is not None: if not os.path.exists(args.result_dir): os.makedirs(args.result_dir) - result_file = os.path.join(args.result_dir, args.result_name + '.json') + result_file = os.path.join(result_dir, result_prefix + 'json') with open(result_file, 'w') as fout: for pred_answer in pred_answers: fout.write(json.dumps(pred_answer, ensure_ascii=False) + '\n') - logger.info('Saving {} results to {}'.format(args.result_name, + logger.info('Saving {} results to {}'.format(result_prefix, 
result_file)) ave_loss = 1.0 * total_loss / count - # compute the bleu and rouge scores if reference answers is provided if len(ref_answers) > 0: pred_dict, ref_dict = {}, {} @@ -250,6 +327,13 @@ def train(logger, args): brc_data.convert_to_ids(vocab) logger.info('Initialize the model...') + if not args.use_gpu: + place = fluid.CPUPlace() + dev_count = int(os.environ.get('CPU_NUM', multiprocessing.cpu_count())) + else: + place = fluid.CUDAPlace(0) + dev_count = fluid.core.get_cuda_device_count() + # build model main_program = fluid.Program() startup_prog = fluid.Program() @@ -257,7 +341,7 @@ def train(logger, args): startup_prog.random_seed = args.random_seed with fluid.program_guard(main_program, startup_prog): with fluid.unique_name.guard(): - avg_cost, s_probs, e_probs, feed_order = rc_model.rc_model( + avg_cost, s_probs, e_probs, match, feed_order = rc_model.rc_model( args.hidden_size, vocab, args) # clone from default main program and use it as the validation program inference_program = main_program.clone(for_test=True) @@ -314,20 +398,21 @@ def train(logger, args): for pass_id in range(1, args.pass_num + 1): pass_start_time = time.time() pad_id = vocab.get_id(vocab.pad_token) - train_batches = brc_data.gen_mini_batches( - 'train', args.batch_size, pad_id, shuffle=True) + train_reader = lambda:brc_data.gen_mini_batches('train', args.batch_size, pad_id, shuffle=False) + train_reader = read_multiple(train_reader, dev_count) log_every_n_batch, n_batch_loss = args.log_interval, 0 total_num, total_loss = 0, 0 - for batch_id, batch in enumerate(train_batches, 1): - input_data_dict = prepare_batch_input(batch, args) + for batch_id, batch_list in enumerate(train_reader(), 1): + feed_data = batch_reader(batch_list, args) fetch_outs = parallel_executor.run( - feed=feeder.feed(input_data_dict), + feed=list(feeder.feed_parallel(feed_data, dev_count)), fetch_list=[avg_cost.name], return_numpy=False) - cost_train = np.array(fetch_outs[0])[0] - total_num += len(batch['raw_data']) + cost_train = np.array(fetch_outs[0]).mean() + total_num += args.batch_size * dev_count n_batch_loss += cost_train - total_loss += cost_train * len(batch['raw_data']) + total_loss += cost_train * args.batch_size * dev_count + if log_every_n_batch > 0 and batch_id % log_every_n_batch == 0: print_para(main_program, parallel_executor, logger, args) @@ -337,19 +422,23 @@ def train(logger, args): "%.10f" % (n_batch_loss / log_every_n_batch))) n_batch_loss = 0 if args.dev_interval > 0 and batch_id % args.dev_interval == 0: - eval_loss, bleu_rouge = validation( - inference_program, avg_cost, s_probs, e_probs, - feed_order, place, vocab, brc_data, logger, args) - logger.info('Dev eval loss {}'.format(eval_loss)) - logger.info('Dev eval result: {}'.format(bleu_rouge)) + if brc_data.dev_set is not None: + eval_loss, bleu_rouge = validation( + inference_program, avg_cost, s_probs, e_probs, + match, feed_order, place, dev_count, vocab, + brc_data, logger, args) + logger.info('Dev eval loss {}'.format(eval_loss)) + logger.info('Dev eval result: {}'.format( + bleu_rouge)) pass_end_time = time.time() logger.info('Evaluating the model after epoch {}'.format( pass_id)) if brc_data.dev_set is not None: eval_loss, bleu_rouge = validation( - inference_program, avg_cost, s_probs, e_probs, - feed_order, place, vocab, brc_data, logger, args) + inference_program, avg_cost, s_probs, e_probs, match, + feed_order, place, dev_count, vocab, brc_data, logger, + args) logger.info('Dev eval loss {}'.format(eval_loss)) logger.info('Dev eval result: 
{}'.format(bleu_rouge)) else: @@ -389,10 +478,17 @@ def evaluate(logger, args): startup_prog.random_seed = args.random_seed with fluid.program_guard(main_program, startup_prog): with fluid.unique_name.guard(): - avg_cost, s_probs, e_probs, feed_order = rc_model.rc_model( + avg_cost, s_probs, e_probs, match, feed_order = rc_model.rc_model( args.hidden_size, vocab, args) # initialize parameters - place = core.CUDAPlace(0) if args.use_gpu else core.CPUPlace() + if not args.use_gpu: + place = fluid.CPUPlace() + dev_count = int( + os.environ.get('CPU_NUM', multiprocessing.cpu_count())) + else: + place = fluid.CUDAPlace(0) + dev_count = fluid.core.get_cuda_device_count() + exe = Executor(place) if args.load_dir: logger.info('load from {}'.format(args.load_dir)) @@ -402,17 +498,10 @@ def evaluate(logger, args): logger.error('No model file to load ...') return - # prepare data - feed_list = [ - main_program.global_block().var(var_name) - for var_name in feed_order - ] - feeder = fluid.DataFeeder(feed_list, place) - inference_program = main_program.clone(for_test=True) eval_loss, bleu_rouge = validation( inference_program, avg_cost, s_probs, e_probs, feed_order, - place, vocab, brc_data, logger, args) + place, dev_count, vocab, brc_data, logger, args) logger.info('Dev eval loss {}'.format(eval_loss)) logger.info('Dev eval result: {}'.format(bleu_rouge)) logger.info('Predicted answers are saved to {}'.format( @@ -438,10 +527,17 @@ def predict(logger, args): startup_prog.random_seed = args.random_seed with fluid.program_guard(main_program, startup_prog): with fluid.unique_name.guard(): - avg_cost, s_probs, e_probs, feed_order = rc_model.rc_model( + avg_cost, s_probs, e_probs, match, feed_order = rc_model.rc_model( args.hidden_size, vocab, args) # initialize parameters - place = core.CUDAPlace(0) if args.use_gpu else core.CPUPlace() + if not args.use_gpu: + place = fluid.CPUPlace() + dev_count = int( + os.environ.get('CPU_NUM', multiprocessing.cpu_count())) + else: + place = fluid.CUDAPlace(0) + dev_count = fluid.core.get_cuda_device_count() + exe = Executor(place) if args.load_dir: logger.info('load from {}'.format(args.load_dir)) @@ -451,17 +547,10 @@ def predict(logger, args): logger.error('No model file to load ...') return - # prepare data - feed_list = [ - main_program.global_block().var(var_name) - for var_name in feed_order - ] - feeder = fluid.DataFeeder(feed_list, place) - inference_program = main_program.clone(for_test=True) eval_loss, bleu_rouge = validation( - inference_program, avg_cost, s_probs, e_probs, feed_order, - place, vocab, brc_data, logger, args) + inference_program, avg_cost, s_probs, e_probs, match, + feed_order, place, dev_count, vocab, brc_data, logger, args) def prepare(logger, args): diff --git a/fluid/machine_reading_comprehension/run.sh b/fluid/machine_reading_comprehension/run.sh index 4bcab2beecba4a7951f09ee500f40fa947738365..a8241d05c2b6dca7915f2f6164f26e3e5938ab73 100644 --- a/fluid/machine_reading_comprehension/run.sh +++ b/fluid/machine_reading_comprehension/run.sh @@ -1,4 +1,4 @@ -export CUDA_VISIBLE_DEVICES=1 +export CUDA_VISIBLE_DEVICES=0 python run.py \ --trainset 'data/preprocessed/trainset/search.train.json' \ 'data/preprocessed/trainset/zhidao.train.json' \ @@ -11,11 +11,12 @@ python run.py \ --save_dir ./models \ --pass_num 10 \ --learning_rate 0.001 \ ---batch_size 8 \ +--batch_size 32 \ --embed_size 300 \ --hidden_size 150 \ --max_p_num 5 \ --max_p_len 500 \ --max_q_len 60 \ --max_a_len 200 \ +--weight_decay 0.0 \ --drop_rate 0.2 $@\
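[Reviewer note on the learning-rate changes in fluid/image_classification] The new `lr_warmup` ramps linearly from `start_lr` (the per-worker base LR) to `end_lr` (the base LR scaled by trainer count and `--multi_batch_repeat`) over five epochs, then hands off to the existing piecewise decay at epochs 30/60/80, with steps per epoch derived from `total_images / (batch_size * gpus * multi_batch_repeat)`. A plain-Python reference of that schedule, useful for sanity-checking configurations, is sketched below; the trainer count, repeat factor, and steps per epoch are made-up values, not defaults.

```python
def lr_at_step(step, start_lr, end_lr, warmup_steps, boundaries, values):
    """Pure-Python reference for the schedule wired up in get_model():
    linear warmup from start_lr to end_lr over warmup_steps, then
    piecewise decay (values[i] applies to steps before boundaries[i],
    values[-1] afterwards)."""
    if step < warmup_steps:
        return start_lr + (end_lr - start_lr) * step / float(warmup_steps)
    for b, v in zip(boundaries, values):
        if step < b:
            return v
    return values[-1]

# Illustrative sizes: 4 trainers, no batch merging, 1000 steps per epoch.
trainer_count, repeat, base_lr = 4, 1, 0.1
step_per_epoch = 1000
end_lr = base_lr * trainer_count * repeat          # linear-scaling rule
warmup_steps = 5 * step_per_epoch                  # warmup 5 passes
bd = [step_per_epoch * e for e in [30, 60, 80]]
lr = [end_lr * (0.1 ** i) for i in range(len(bd) + 1)]

print(lr_at_step(2500, base_lr, end_lr, warmup_steps, bd, lr))   # 0.25, mid-warmup
print(lr_at_step(65000, base_lr, end_lr, warmup_steps, bd, lr))  # 0.004 = end_lr * 0.1**2
```

Only `start_lr`, `end_lr`, and `warmup_steps` are consumed by the fluid implementation in models/learning_rate.py; the piecewise boundaries and values are passed in as the wrapped `learning_rate` variable, exactly as in the optimizer setup in dist_train.py.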