From e05abab60923c819c16980bdd53bdd684518f75b Mon Sep 17 00:00:00 2001
From: Yancey1989
Date: Thu, 31 May 2018 11:56:59 +0800
Subject: [PATCH] use recordio in dist train

---
 doc/v2/howto/recordio/README.md                | 122 ++++++++++++++++++
 .../details/threaded_ssa_graph_executor.cc     |   6 +-
 .../reader/create_recordio_file_reader_op.cc   |  12 +-
 python/paddle/fluid/layers/io.py               |  20 +--
 python/paddle/fluid/recordio_writer.py         |  35 ++++-
 tools/codestyle/docstring_checker.pyc          | Bin 11769 -> 11769 bytes
 6 files changed, 178 insertions(+), 17 deletions(-)
 create mode 100644 doc/v2/howto/recordio/README.md

diff --git a/doc/v2/howto/recordio/README.md b/doc/v2/howto/recordio/README.md
new file mode 100644
index 0000000000..3f81d54b8e
--- /dev/null
+++ b/doc/v2/howto/recordio/README.md
@@ -0,0 +1,122 @@
+# How to use RecordIO in Fluid
+
+If you want to use RecordIO as your training data format, you need to convert your training data
+into RecordIO files and read them during training. PaddlePaddle Fluid provides interfaces for
+both steps.
+
+## Generate a RecordIO File
+
+Before starting a training job with RecordIO files, convert your training data to the RecordIO
+format with `fluid.recordio_writer.convert_reader_to_recordio_file`, for example:
+
+```python
+    import paddle
+    import paddle.fluid as fluid
+    import paddle.dataset.mnist as mnist
+
+    reader = paddle.batch(mnist.train(), batch_size=1)
+    feeder = fluid.DataFeeder(
+        feed_list=[  # order is image and label
+            fluid.layers.data(
+                name='image', shape=[784]),
+            fluid.layers.data(
+                name='label', shape=[1], dtype='int64'),
+        ],
+        place=fluid.CPUPlace())
+    fluid.recordio_writer.convert_reader_to_recordio_file('./mnist.recordio', reader, feeder)
+```
+
+The code above generates a RecordIO file `./mnist.recordio` on your host.
+
+## Use the RecordIO File in a Local Training Job
+
+PaddlePaddle Fluid provides the interface `fluid.layers.io.open_recordio_files` to load your
+RecordIO file; the returned reader can then be used like a layer in your network configuration,
+for example:
+
+```python
+    data_file = fluid.layers.io.open_recordio_files(
+        filenames="./mnist.recordio",
+        shapes=[(-1, 784), (-1, 1)],
+        lod_levels=[0, 0],
+        dtypes=["float32", "int64"])
+    data_file = fluid.layers.io.batch(data_file, batch_size=4)
+
+    img, label = fluid.layers.io.read_file(data_file)
+    hidden = fluid.layers.fc(input=img, size=100, act='tanh')
+    prediction = fluid.layers.fc(input=hidden, size=10, act='softmax')
+    loss = fluid.layers.cross_entropy(input=prediction, label=label)
+    avg_loss = fluid.layers.mean(loss)
+
+    fluid.optimizer.Adam(learning_rate=1e-3).minimize(avg_loss)
+
+    place = fluid.CPUPlace()
+
+    exe = fluid.Executor(place)
+    exe.run(fluid.default_startup_program())
+    avg_loss_np = []
+
+    # train a pass
+    batch_id = 0
+    while True:
+        tmp, = exe.run(fetch_list=[avg_loss])
+
+        avg_loss_np.append(tmp)
+        print(batch_id)
+        batch_id += 1
+```
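+The reader returned by `open_recordio_files` can also be wrapped with the other reader decorators
+exported from `fluid.layers.io`, such as `shuffle` and `double_buffer`. The snippet below is only
+a sketch: it assumes `shuffle` takes a `buffer_size` argument and that `double_buffer` can be
+applied after batching, so check the signatures in your Fluid version before relying on it:
+
+```python
+    data_file = fluid.layers.io.open_recordio_files(
+        filenames="./mnist.recordio",
+        shapes=[(-1, 784), (-1, 1)],
+        lod_levels=[0, 0],
+        dtypes=["float32", "int64"])
+    # buffer a window of instances and yield them in random order (assumed signature)
+    data_file = fluid.layers.io.shuffle(data_file, buffer_size=64)
+    data_file = fluid.layers.io.batch(data_file, batch_size=4)
+    # overlap reading the next batch with computation (assumed signature)
+    data_file = fluid.layers.io.double_buffer(data_file)
+    img, label = fluid.layers.io.read_file(data_file)
+```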
+## Use the RecordIO Files in Distributed Training
+
+1. Generate multiple RecordIO files
+
+For a distributed training job, you may have multiple trainer nodes, and each trainer node reads
+one or more RecordIO files. You can use the interface
+`fluid.recordio_writer.convert_reader_to_recordio_files` to convert your training data into
+multiple RecordIO files, for example:
+
+```python
+    reader = paddle.batch(mnist.train(), batch_size=1)
+    feeder = fluid.DataFeeder(
+        feed_list=[  # order is image and label
+            fluid.layers.data(
+                name='image', shape=[784]),
+            fluid.layers.data(
+                name='label', shape=[1], dtype='int64'),
+        ],
+        place=fluid.CPUPlace())
+    fluid.recordio_writer.convert_reader_to_recordio_files(
+        filename_suffix='./mnist.recordio',
+        batch_per_file=100,
+        reader_creator=reader,
+        feeder=feeder)
+```
+
+The code above generates multiple RecordIO files on your host, for example:
+
+```bash
+.
+|-- mnist.recordio-00000
+|-- mnist.recordio-00001
+|-- mnist.recordio-00002
+|-- mnist.recordio-00003
+|-- mnist.recordio-00004
+```
+
+2. Read these RecordIO files with `fluid.layers.io.open_recordio_files`
+
+For a distributed training job, the distributed operator system schedules a trainer process on
+each node, and every trainer process reads a part of the whole training data. We usually take the
+following approach to allocate the training data to the trainer processes as uniformly as
+possible (see the worked example after the snippet):
+
+```python
+import glob
+import os
+
+def gen_train_list(file_pattern, trainers, trainer_id):
+    # sort so that every trainer sees the files in the same order
+    file_list = sorted(glob.glob(file_pattern))
+    ret_list = []
+    for idx, f in enumerate(file_list):
+        if idx % trainers == trainer_id:
+            ret_list.append(f)
+    return ret_list
+
+trainers = int(os.getenv("TRAINERS"))
+trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
+data_file = fluid.layers.io.open_recordio_files(
+    filenames=gen_train_list("./mnist.recordio*", trainers, trainer_id),
+    shapes=[(-1, 784), (-1, 1)],
+    lod_levels=[0, 0],
+    dtypes=["float32", "int64"])
+data_file = fluid.layers.io.batch(data_file, batch_size=4)
+```
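+For example, with the five files listed above and two trainers, trainer 0 gets
+`mnist.recordio-00000`, `mnist.recordio-00002` and `mnist.recordio-00004`, while trainer 1 gets
+`mnist.recordio-00001` and `mnist.recordio-00003`. The snippet below is a small sketch that only
+exercises the `gen_train_list` helper defined above; the environment variable values are
+hypothetical and would normally be set by the cluster launcher:
+
+```python
+import os
+
+# pretend to be trainer 0 out of 2 trainers (hypothetical values)
+os.environ["TRAINERS"] = "2"
+os.environ["PADDLE_INIT_TRAINER_ID"] = "0"
+
+trainers = int(os.getenv("TRAINERS"))
+trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
+print(gen_train_list("./mnist.recordio*", trainers, trainer_id))
+# should print ['./mnist.recordio-00000', './mnist.recordio-00002', './mnist.recordio-00004']
+```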
diff --git a/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc b/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc
index 815f739371..335c067dd7 100644
--- a/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc
+++ b/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc
@@ -189,9 +189,11 @@ void ThreadedSSAGraphExecutor::RunOp(
     BlockingQueue<VarHandleBase *> *ready_var_q, details::OpHandleBase *op) {
   auto op_run = [ready_var_q, op, this] {
     try {
-      VLOG(10) << op << " " << op->Name() << " : " << op->DebugString();
+      VLOG(10) << "PE start "
+               << " " << op->Name() << " : " << op->DebugString();
       op->Run(strategy_.use_event_);
-      VLOG(10) << op << " " << op->Name() << " Done ";
+      VLOG(10) << "PE end "
+               << " " << op->Name() << " Done ";
       running_ops_--;
       ready_var_q->Extend(op->Outputs());
       VLOG(10) << op << " " << op->Name() << "Signal posted";
diff --git a/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc b/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc
index 282ec3f36b..6b6d447026 100644
--- a/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc
+++ b/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc
@@ -65,20 +65,22 @@ class CreateRecordIOReaderOp : public framework::OperatorBase {
                       static_cast<int>(shape_concat.size()),
                       "The accumulate of all ranks should be equal to the "
                       "shape concat's length.");
-    std::string filename = Attr<std::string>("filename");
+    auto filenames = Attr<std::vector<std::string>>("filenames");
 
     auto* out = scope.FindVar(Output("Out"))
                     ->template GetMutable<framework::ReaderHolder>();
-
-    out->Reset(new RecordIOFileReader(
-        filename, RestoreShapes(shape_concat, ranks)));
+    for (auto& fn : filenames) {
+      out->Reset(
+          new RecordIOFileReader(fn, RestoreShapes(shape_concat, ranks)));
+    }
   }
 };
 
 class CreateRecordIOReaderOpMaker : public FileReaderMakerBase {
  protected:
   void Apply() override {
-    AddAttr<std::string>("filename", "The filename of record io reader");
+    AddAttr<std::vector<std::string>>("filenames",
+                                      "The filenames of record io reader");
     AddComment(R"DOC(
       CreateRecordIOReader Operator
diff --git a/python/paddle/fluid/layers/io.py b/python/paddle/fluid/layers/io.py
index 8758ac9f94..b9d5582730 100644
--- a/python/paddle/fluid/layers/io.py
+++ b/python/paddle/fluid/layers/io.py
@@ -21,7 +21,7 @@ from ..layer_helper import LayerHelper
 from ..executor import global_scope
 
 __all__ = [
-    'data', 'BlockGuardServ', 'ListenAndServ', 'Send', 'open_recordio_file',
+    'data', 'BlockGuardServ', 'ListenAndServ', 'Send', 'open_recordio_files',
    'open_files', 'read_file', 'shuffle', 'batch', 'double_buffer',
    'random_data_generator', 'Preprocessor'
 ]
@@ -291,12 +291,12 @@ def _copy_reader_create_op_(block, op):
     return new_op
 
 
-def open_recordio_file(filename,
-                       shapes,
-                       lod_levels,
-                       dtypes,
-                       pass_num=1,
-                       for_parallel=True):
+def open_recordio_files(filenames,
+                        shapes,
+                        lod_levels,
+                        dtypes,
+                        pass_num=1,
+                        for_parallel=True):
     """
     Open a RecordIO file
 
@@ -304,7 +304,7 @@ def open_recordio_file(filename,
     Via the Reader Variable, we can get data from the given RecordIO file.
 
     Args:
-        filename(str): The RecordIO file's name.
+        filenames(str|list(str)): The RecordIO file name(s).
         shapes(list): List of tuples which declaring data shapes.
         lod_levels(list): List of ints which declaring data lod_level.
         dtypes(list): List of strs which declaring data type.
@@ -336,6 +336,8 @@ def open_recordio_file(filename,
         ranks.append(len(shape))
 
     var_name = unique_name('open_recordio_file')
+    if isinstance(filenames, str):
+        filenames = [filenames]
 
     startup_blk = default_startup_program().current_block()
     startup_var = startup_blk.create_var(name=var_name)
@@ -345,7 +347,7 @@ def open_recordio_file(filename,
         attrs={
             'shape_concat': shape_concat,
             'lod_levels': lod_levels,
-            'filename': filename,
+            'filenames': filenames,
             'ranks': ranks
         })
diff --git a/python/paddle/fluid/recordio_writer.py b/python/paddle/fluid/recordio_writer.py
index 5accaacd53..7c966cba74 100644
--- a/python/paddle/fluid/recordio_writer.py
+++ b/python/paddle/fluid/recordio_writer.py
@@ -14,7 +14,7 @@
 
 import core
 import contextlib
-
+from ..batch import batch
 __all__ = ['convert_reader_to_recordio_file']
 
 
@@ -46,3 +46,36 @@ def convert_reader_to_recordio_file(
             writer.complete_append_tensor()
             counter += 1
     return counter
+
+
+import paddle
+
+
+def convert_reader_to_recordio_files(
+        filename_suffix,
+        batch_per_file,
+        reader_creator,
+        feeder,
+        compressor=core.RecordIOWriter.Compressor.Snappy,
+        max_num_records=1000,
+        feed_order=None):
+    if feed_order is None:
+        feed_order = feeder.feed_names
+    lines = []
+    f_idx = 0
+    counter = 0
+    for idx, batch in enumerate(reader_creator()):
+        lines.append(batch)
+        # flush one file for every batch_per_file batches
+        if (idx + 1) % batch_per_file == 0:
+            filename = "%s-%05d" % (filename_suffix, f_idx)
+            with create_recordio_writer(filename, compressor,
+                                        max_num_records) as writer:
+                for l in lines:
+                    res = feeder.feed(l)
+                    for each in feed_order:
+                        writer.append_tensor(res[each])
+                    writer.complete_append_tensor()
+                    counter += 1
+            lines = []
+            f_idx += 1
+    if lines:
+        # write the remaining batches into one last, smaller file
+        filename = "%s-%05d" % (filename_suffix, f_idx)
+        with create_recordio_writer(filename, compressor,
+                                    max_num_records) as writer:
+            for l in lines:
+                res = feeder.feed(l)
+                for each in feed_order:
+                    writer.append_tensor(res[each])
+                writer.complete_append_tensor()
+                counter += 1
+    return counter
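A quick way to sanity-check the new `convert_reader_to_recordio_files` helper without MNIST is
sketched below. The `fake_reader`, the file prefix and the expected file count are illustrative
assumptions based on the flush logic in the hunk above (one file per `batch_per_file` batches plus
one smaller file for the remainder), not part of the patch itself:

```python
import glob

import paddle.fluid as fluid


def fake_reader():
    # ten single-instance batches of (image, label)
    for i in range(10):
        yield [([0.0] * 784, 0)]

feeder = fluid.DataFeeder(
    feed_list=[
        fluid.layers.data(name='image', shape=[784]),
        fluid.layers.data(name='label', shape=[1], dtype='int64'),
    ],
    place=fluid.CPUPlace())

fluid.recordio_writer.convert_reader_to_recordio_files(
    filename_suffix='./fake.recordio',
    batch_per_file=4,
    reader_creator=fake_reader,
    feeder=feeder)

# expect ./fake.recordio-00000 and ./fake.recordio-00001 with four batches each,
# plus a smaller ./fake.recordio-00002 holding the remaining two batches
print(sorted(glob.glob("./fake.recordio-*")))
```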
diff --git a/tools/codestyle/docstring_checker.pyc b/tools/codestyle/docstring_checker.pyc
index 1ce612ca2318ccb9b9f28d51cb93ce8e5e1d0680..07e875aec6c9bae8002bde4223348c6a29647b03 100644
GIT binary patch
delta 939