From 54ada9449ea6b9aa60faa58f4894c5533e9df399 Mon Sep 17 00:00:00 2001 From: Yu Yang Date: Tue, 24 Apr 2018 12:53:39 +0800 Subject: [PATCH] Add demo for recordio train/test and parallel executor --- .../details/threaded_ssa_graph_executor.cc | 4 +- paddle/fluid/framework/parallel_executor.cc | 2 +- .../reader/create_threaded_reader_op.cc | 21 +-- python/paddle/fluid/layers/io.py | 10 +- .../tests/demo/text_classification/.gitignore | 1 + .../convert_data_to_recordio.py | 59 +++++++ .../tests/demo/text_classification/train.py | 148 ++++++++++++++++++ 7 files changed, 220 insertions(+), 25 deletions(-) create mode 100644 python/paddle/fluid/tests/demo/text_classification/.gitignore create mode 100644 python/paddle/fluid/tests/demo/text_classification/convert_data_to_recordio.py create mode 100644 python/paddle/fluid/tests/demo/text_classification/train.py diff --git a/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc b/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc index 3b7d6160730..5e6ed5cb7cd 100644 --- a/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc +++ b/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc @@ -140,7 +140,9 @@ FeedFetchList ThreadedSSAGraphExecutor::Run( if (timeout) { if (exception_) { - throw * exception_; + auto exp = *exception_; + exception_.reset(); + throw exp; } else { continue; } diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc index a673fa52880..de644e85199 100644 --- a/paddle/fluid/framework/parallel_executor.cc +++ b/paddle/fluid/framework/parallel_executor.cc @@ -74,7 +74,7 @@ ParallelExecutor::ParallelExecutor( member_->own_local_scope = false; PADDLE_ENFORCE_EQ(member_->places_.size(), local_scopes.size()); for (size_t i = 0; i < member_->places_.size(); ++i) { - member_->local_scopes_.emplace_back(local_scopes[i]); + member_->local_scopes_.emplace_back(&local_scopes[i]->NewScope()); } } diff --git a/paddle/fluid/operators/reader/create_threaded_reader_op.cc b/paddle/fluid/operators/reader/create_threaded_reader_op.cc index cbf709d5e73..1cb9bd36455 100644 --- a/paddle/fluid/operators/reader/create_threaded_reader_op.cc +++ b/paddle/fluid/operators/reader/create_threaded_reader_op.cc @@ -21,26 +21,16 @@ namespace reader { class ThreadedReader : public framework::DecoratedReader { public: - ThreadedReader(ReaderBase* reader, bool safe_mode) - : DecoratedReader(reader), safe_mode_(safe_mode) {} + explicit ThreadedReader(ReaderBase* reader) : DecoratedReader(reader) {} void ReadNext(std::vector* out) override { std::lock_guard lock(mutex_); reader_->ReadNext(out); } - void ReInit() override { - if (safe_mode_) { - PADDLE_THROW( - "ThreadedReader::ReInit() is disabled when 'safe_mode' is true."); - } - VLOG(5) << "ThreadedReader::ReInit() is invoked! It might be buggy in " - "multi-thread environment."; - reader_->ReInit(); - } + void ReInit() override { reader_->ReInit(); } private: - bool safe_mode_; std::mutex mutex_; }; @@ -58,8 +48,7 @@ class CreateThreadedReaderOp : public framework::OperatorBase { } const auto& underlying_reader = scope.FindVar(Input("UnderlyingReader")) ->Get(); - bool safe_mode = Attr("safe_mode"); - out->Reset(new ThreadedReader(underlying_reader.Get(), safe_mode)); + out->Reset(new ThreadedReader(underlying_reader.Get())); } }; @@ -67,10 +56,6 @@ class CreateThreadedReaderOpMaker : public DecoratedReaderMakerBase { public: CreateThreadedReaderOpMaker(OpProto* op_proto, OpAttrChecker* op_checker) : DecoratedReaderMakerBase(op_proto, op_checker) { - AddAttr("safe_mode", - "When 'safe_mode' is true, 'ReInit()' is disabled to avoid " - "unexpected bugs in multi-thread environment.") - .SetDefault(true); AddComment(R"DOC( CreateThreadedReader Operator diff --git a/python/paddle/fluid/layers/io.py b/python/paddle/fluid/layers/io.py index 34382fb9fec..cc71c2136a6 100644 --- a/python/paddle/fluid/layers/io.py +++ b/python/paddle/fluid/layers/io.py @@ -457,8 +457,8 @@ def __create_shared_decorated_reader__(op_type, reader, attrs): return monkey_patch_reader_methods(main_prog_var) -def __create_unshared_decorated_reader__(op_type, reader, attrs): - new_reader_name = unique_name(op_type) +def __create_unshared_decorated_reader__(op_type, reader, attrs, name=None): + new_reader_name = name if name is not None else unique_name(op_type) main_blk = default_main_program().current_block() new_reader = main_blk.create_var(name=new_reader_name) main_blk.append_op( @@ -481,12 +481,12 @@ def batch(reader, batch_size): 'create_batch_reader', reader, {'batch_size': int(batch_size)}) -def double_buffer(reader, place=None): +def double_buffer(reader, place=None, name=None): attrs = dict() if place is not None: attrs['place'] = str(place).upper() - return __create_unshared_decorated_reader__('create_double_buffer_reader', - reader, attrs) + return __create_unshared_decorated_reader__( + 'create_double_buffer_reader', reader, attrs, name=name) def multi_pass(reader, pass_num): diff --git a/python/paddle/fluid/tests/demo/text_classification/.gitignore b/python/paddle/fluid/tests/demo/text_classification/.gitignore new file mode 100644 index 00000000000..780d05b9466 --- /dev/null +++ b/python/paddle/fluid/tests/demo/text_classification/.gitignore @@ -0,0 +1 @@ +*.recordio diff --git a/python/paddle/fluid/tests/demo/text_classification/convert_data_to_recordio.py b/python/paddle/fluid/tests/demo/text_classification/convert_data_to_recordio.py new file mode 100644 index 00000000000..9425d472a48 --- /dev/null +++ b/python/paddle/fluid/tests/demo/text_classification/convert_data_to_recordio.py @@ -0,0 +1,59 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys +import paddle.fluid as fluid +import paddle.v2 as paddle + + +def load_vocab(filename): + """ + load vocabulary + """ + vocab = {} + with open(filename) as f: + wid = 0 + for line in f: + vocab[line.strip()] = wid + wid += 1 + return vocab + + +# load word dict with paddle inner function +word_dict = load_vocab(sys.argv[1]) +word_dict[""] = len(word_dict) +print "Dict dim = ", len(word_dict) + +# input text data +data = fluid.layers.data(name="words", shape=[1], dtype="int64", lod_level=1) + +# label data +label = fluid.layers.data(name="label", shape=[1], dtype="int64") +# like placeholder +feeder = fluid.DataFeeder(feed_list=[data, label], place=fluid.CPUPlace()) + +# train data set +BATCH_SIZE = 128 +train_reader = paddle.batch( + paddle.reader.shuffle( + paddle.dataset.imdb.train(word_dict), buf_size=10000), + batch_size=BATCH_SIZE) + +test_reader = paddle.batch( + paddle.dataset.imdb.test(word_dict), batch_size=BATCH_SIZE) + +fluid.recordio_writer.convert_reader_to_recordio_file( + "train.recordio", feeder=feeder, reader_creator=train_reader) +fluid.recordio_writer.convert_reader_to_recordio_file( + "test.recordio", feeder=feeder, reader_creator=test_reader) diff --git a/python/paddle/fluid/tests/demo/text_classification/train.py b/python/paddle/fluid/tests/demo/text_classification/train.py new file mode 100644 index 00000000000..e408684c6e0 --- /dev/null +++ b/python/paddle/fluid/tests/demo/text_classification/train.py @@ -0,0 +1,148 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import paddle.fluid as fluid +import numpy +import sys + +TRAIN_FILES = ['train.recordio'] +TEST_FILES = ['test.recordio'] + +DICT_DIM = 89528 + +# embedding dim +emb_dim = 128 + +# hidden dim +hid_dim = 128 + +# hidden dim2 +hid_dim2 = 96 + +# class num +class_dim = 2 + + +def network_cfg(is_train, pass_num=100): + with fluid.unique_name.guard(): + train_file_obj = fluid.layers.open_files( + filenames=TRAIN_FILES, + pass_num=pass_num, + shapes=[[-1, 1], [-1, 1]], + lod_levels=[1, 0], + dtypes=['int64', 'int64'], + thread_num=1) + + test_file_obj = fluid.layers.open_files( + filenames=TEST_FILES, + pass_num=1, + shapes=[[-1, 1], [-1, 1]], + lod_levels=[1, 0], + dtypes=['int64', 'int64'], + thread_num=1) + + if is_train: + file_obj = fluid.layers.shuffle(train_file_obj, buffer_size=1000) + else: + file_obj = test_file_obj + + file_obj = fluid.layers.double_buffer( + file_obj, + name="train_double_buffer" if is_train else 'test_double_buffer') + + data, label = fluid.layers.read_file(file_obj) + + emb = fluid.layers.embedding(input=data, size=[DICT_DIM, emb_dim]) + + # sequence conv with window size = 3 + win_size = 3 + conv_3 = fluid.nets.sequence_conv_pool( + input=emb, + num_filters=hid_dim, + filter_size=win_size, + act="tanh", + pool_type="max") + + # fc layer after conv + fc_1 = fluid.layers.fc(input=[conv_3], size=hid_dim2) + + # probability of each class + prediction = fluid.layers.fc(input=[fc_1], + size=class_dim, + act="softmax") + # cross entropy loss + cost = fluid.layers.cross_entropy(input=prediction, label=label) + + # mean loss + avg_cost = fluid.layers.mean(x=cost) + acc = fluid.layers.accuracy(input=prediction, label=label) + + if is_train: + # SGD optimizer + sgd_optimizer = fluid.optimizer.Adagrad(learning_rate=0.01) + sgd_optimizer.minimize(avg_cost) + + return { + 'loss': avg_cost, + 'log': [avg_cost, acc], + 'file': train_file_obj if is_train else test_file_obj + } + + +def main(): + train = fluid.Program() + startup = fluid.Program() + + with fluid.program_guard(train, startup): + train_args = network_cfg(is_train=True) + + test = fluid.Program() + + with fluid.program_guard(test, fluid.Program()): + test_args = network_cfg(is_train=False) + + # startup + place = fluid.CUDAPlace(0) + exe = fluid.Executor(place=place) + exe.run(startup) + + train_exe = fluid.ParallelExecutor( + use_cuda=True, loss_name=train_args['loss'].name, main_program=train) + + fetch_var_list = [var.name for var in train_args['log']] + for i in xrange(sys.maxint): + result = map(numpy.array, + train_exe.run(fetch_list=fetch_var_list + if i % 1000 == 0 else [])) + if len(result) != 0: + print 'Train: ', result + + if i % 1000 == 0: + test_exe = fluid.ParallelExecutor( + use_cuda=True, main_program=test, share_vars_from=train_exe) + loss = [] + acc = [] + try: + while True: + loss_np, acc_np = map( + numpy.array, test_exe.run(fetch_list=fetch_var_list)) + loss.append(loss_np[0]) + acc.append(acc_np[0]) + except: + test_args['file'].reset() + print 'TEST: ', numpy.mean(loss), numpy.mean(acc) + + +if __name__ == '__main__': + main() -- GitLab