提交 e05abab6 编写于 作者: Y Yancey1989

use recordio in dist train

上级 ccf61b30
# How to use RecordIO in Fluid
If you want to use RecordIO as your training data format, you need to convert to your training data
to RecordIO files and reading them in the process of training, PaddlePaddle Fluid provides some
interface to deal with the RecordIO files.
## Generate RecordIO File
Before start training with RecordIO files, you need to convert your training data
to RecordIO format by `fluid.recordio_writer.convert_reader_to_recordio_file`, the sample codes
as follows:
```python
reader = paddle.batch(mnist.train(), batch_size=1)
feeder = fluid.DataFeeder(
feed_list=[ # order is image and label
fluid.layers.data(
name='image', shape=[784]),
fluid.layers.data(
name='label', shape=[1], dtype='int64'),
],
place=fluid.CPUPlace())
fluid.recordio_writer.convert_reader_to_recordio_file('./mnist.recordio', reader, feeder)
```
The above codes would generate a RecordIO `./mnist.recordio` on your host.
## Use the RecordIO file in a Local Training Job
PaddlePaddle Fluid provides an interface `fluid.layers.io.open_recordio_file` to load your RecordIO file
and then you can use them as a Layer in your network configuration, the sample codes as follows:
```python
data_file = fluid.layers.io.open_recordio_file(
filename="./mnist.recordio",
shapes=[(-1, 784),(-1, 1)],
lod_levels=[0, 0],
dtypes=["float32", "int32"])
data_file = fluid.layers.io.batch(data_file, batch_size=4)
img, label = fluid.layers.io.read_file(data_file)
hidden = fluid.layers.fc(input=img, size=100, act='tanh')
prediction = fluid.layers.fc(input=hidden, size=10, act='softmax')
loss = fluid.layers.cross_entropy(input=prediction, label=label)
avg_loss = fluid.layers.mean(loss)
fluid.optimizer.Adam(learning_rate=1e-3).minimize(avg_loss)
place = fluid.CPUPlace()
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
avg_loss_np = []
# train a pass
batch_id = 0
while True:
tmp, = exe.run(fetch_list=[avg_loss])
avg_loss_np.append(tmp)
print(batch_id)
batch_id += 1
```
## Use the RecordIO files in Distributed Training
1. generate multiple RecordIO files
For a distributed training job, you may have multiple trainer nodes,
and one or more RecordIO files for one trainer node, you can use the interface
`fluid.recordio_writer.convert_reader_to_recordio_files` to convert your training data
into multiple RecordIO files, the sample codes as follows:
```python
reader = paddle.batch(mnist.train(), batch_size=1)
feeder = fluid.DataFeeder(
feed_list=[ # order is image and label
fluid.layers.data(
name='image', shape=[784]),
fluid.layers.data(
name='label', shape=[1], dtype='int64'),
],
place=fluid.CPUPlace())
fluid.recordio_writer.convert_reader_to_recordio_files(
filename_suffix='./mnist.recordio', batch_per_file=100, reader, feeder)
```
The above codes would generate multiple RecordIO files on your host like:
```bash
.
\_mnist.recordio-00000
|-mnist.recordio-00001
|-mnist.recordio-00002
|-mnist.recordio-00003
|-mnist.recordio-00004
```
1. read these RecordIO files with `fluid.layers.io.open_recordio_file`
For a distributed training job, the distributed operator system will schedule trainer process on multiple nodes,
each trainer process reads parts of the whole training data, we usually take the following approach to make the training
data allocated by each trainer process as uniform as possiable:
```python
def gen_train_list(file_pattern, trainers, trainer_id):
file_list = glob.glob(file_pattern)
ret_list = []
for idx, f in enumerate(file_list):
if (idx + trainers) % trainers == trainer_id:
ret_list.append(f)
return ret_list
trainers = int(os.getenv("TRAINERS"))
trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
data_file = fluid.layers.io.open_recordio_file(
filename=gen_train_list("./mnist.recordio*", trainers, trainer_id),
shapes=[(-1, 784),(-1, 1)],
lod_levels=[0, 0],
dtypes=["float32", "int32"])
data_file = fluid.layers.io.batch(data_file, batch_size=4)
```
...@@ -189,9 +189,11 @@ void ThreadedSSAGraphExecutor::RunOp( ...@@ -189,9 +189,11 @@ void ThreadedSSAGraphExecutor::RunOp(
BlockingQueue<VarHandleBase *> *ready_var_q, details::OpHandleBase *op) { BlockingQueue<VarHandleBase *> *ready_var_q, details::OpHandleBase *op) {
auto op_run = [ready_var_q, op, this] { auto op_run = [ready_var_q, op, this] {
try { try {
VLOG(10) << op << " " << op->Name() << " : " << op->DebugString(); VLOG(10) << "PE start "
<< " " << op->Name() << " : " << op->DebugString();
op->Run(strategy_.use_event_); op->Run(strategy_.use_event_);
VLOG(10) << op << " " << op->Name() << " Done "; VLOG(10) << "PE end "
<< " " << op->Name() << " Done ";
running_ops_--; running_ops_--;
ready_var_q->Extend(op->Outputs()); ready_var_q->Extend(op->Outputs());
VLOG(10) << op << " " << op->Name() << "Signal posted"; VLOG(10) << op << " " << op->Name() << "Signal posted";
......
...@@ -65,20 +65,22 @@ class CreateRecordIOReaderOp : public framework::OperatorBase { ...@@ -65,20 +65,22 @@ class CreateRecordIOReaderOp : public framework::OperatorBase {
static_cast<int>(shape_concat.size()), static_cast<int>(shape_concat.size()),
"The accumulate of all ranks should be equal to the " "The accumulate of all ranks should be equal to the "
"shape concat's length."); "shape concat's length.");
std::string filename = Attr<std::string>("filename"); auto filenames = Attr<std::vector<std::string>>("filenames");
auto* out = scope.FindVar(Output("Out")) auto* out = scope.FindVar(Output("Out"))
->template GetMutable<framework::ReaderHolder>(); ->template GetMutable<framework::ReaderHolder>();
for (auto& fn : filenames) {
out->Reset(new RecordIOFileReader<true>( out->Reset(
filename, RestoreShapes(shape_concat, ranks))); new RecordIOFileReader<true>(fn, RestoreShapes(shape_concat, ranks)));
}
} }
}; };
class CreateRecordIOReaderOpMaker : public FileReaderMakerBase { class CreateRecordIOReaderOpMaker : public FileReaderMakerBase {
protected: protected:
void Apply() override { void Apply() override {
AddAttr<std::string>("filename", "The filename of record io reader"); AddAttr<std::vector<std::string>>("filenames",
"The filenames of record io reader");
AddComment(R"DOC( AddComment(R"DOC(
CreateRecordIOReader Operator CreateRecordIOReader Operator
......
...@@ -21,7 +21,7 @@ from ..layer_helper import LayerHelper ...@@ -21,7 +21,7 @@ from ..layer_helper import LayerHelper
from ..executor import global_scope from ..executor import global_scope
__all__ = [ __all__ = [
'data', 'BlockGuardServ', 'ListenAndServ', 'Send', 'open_recordio_file', 'data', 'BlockGuardServ', 'ListenAndServ', 'Send', 'open_recordio_files',
'open_files', 'read_file', 'shuffle', 'batch', 'double_buffer', 'open_files', 'read_file', 'shuffle', 'batch', 'double_buffer',
'random_data_generator', 'Preprocessor' 'random_data_generator', 'Preprocessor'
] ]
...@@ -291,7 +291,7 @@ def _copy_reader_create_op_(block, op): ...@@ -291,7 +291,7 @@ def _copy_reader_create_op_(block, op):
return new_op return new_op
def open_recordio_file(filename, def open_recordio_files(filenames,
shapes, shapes,
lod_levels, lod_levels,
dtypes, dtypes,
...@@ -304,7 +304,7 @@ def open_recordio_file(filename, ...@@ -304,7 +304,7 @@ def open_recordio_file(filename,
Via the Reader Variable, we can get data from the given RecordIO file. Via the Reader Variable, we can get data from the given RecordIO file.
Args: Args:
filename(str): The RecordIO file's name. filename(str) or list(str): The RecordIO file's name.
shapes(list): List of tuples which declaring data shapes. shapes(list): List of tuples which declaring data shapes.
lod_levels(list): List of ints which declaring data lod_level. lod_levels(list): List of ints which declaring data lod_level.
dtypes(list): List of strs which declaring data type. dtypes(list): List of strs which declaring data type.
...@@ -336,6 +336,8 @@ def open_recordio_file(filename, ...@@ -336,6 +336,8 @@ def open_recordio_file(filename,
ranks.append(len(shape)) ranks.append(len(shape))
var_name = unique_name('open_recordio_file') var_name = unique_name('open_recordio_file')
if isinstance(filenames, str):
filenames = [filenames]
startup_blk = default_startup_program().current_block() startup_blk = default_startup_program().current_block()
startup_var = startup_blk.create_var(name=var_name) startup_var = startup_blk.create_var(name=var_name)
...@@ -345,7 +347,7 @@ def open_recordio_file(filename, ...@@ -345,7 +347,7 @@ def open_recordio_file(filename,
attrs={ attrs={
'shape_concat': shape_concat, 'shape_concat': shape_concat,
'lod_levels': lod_levels, 'lod_levels': lod_levels,
'filename': filename, 'filenames': filenames,
'ranks': ranks 'ranks': ranks
}) })
......
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
import core import core
import contextlib import contextlib
from ..batch import batch
__all__ = ['convert_reader_to_recordio_file'] __all__ = ['convert_reader_to_recordio_file']
...@@ -46,3 +46,36 @@ def convert_reader_to_recordio_file( ...@@ -46,3 +46,36 @@ def convert_reader_to_recordio_file(
writer.complete_append_tensor() writer.complete_append_tensor()
counter += 1 counter += 1
return counter return counter
import paddle
def convert_reader_to_recordio_files(
filename_suffix,
batch_per_file,
reader_creator,
feeder,
compressor=core.RecordIOWriter.Compressor.Snappy,
max_num_records=1000,
feed_order=None):
if feed_order is None:
feed_order = feeder.feed_names
lines = []
f_idx = 0
counter = 0
for idx, batch in enumerate(reader_creator()):
lines.append(batch)
if idx >= batch_per_file and idx % batch_per_file == 0:
filename = "%s-%05d" % (filename_suffix, f_idx)
with create_recordio_writer(filename, compressor,
max_num_records) as writer:
for l in lines:
res = feeder.feed(l)
for each in feed_order:
writer.append_tensor(res[each])
writer.complete_append_tensor()
counter += 1
lines = []
f_idx += 1
return counter
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册