diff --git a/paddle/fluid/recordio/chunk.cc b/paddle/fluid/recordio/chunk.cc
index 4fed4889776f96049d76a2d741931bce5105cd83..13d059f844aebb847282a80a43e4cbdad71d7fa0 100644
--- a/paddle/fluid/recordio/chunk.cc
+++ b/paddle/fluid/recordio/chunk.cc
@@ -24,13 +24,21 @@ namespace paddle {
 namespace recordio {
 constexpr size_t kMaxBufSize = 1024;
 
+/**
+ * Read a stream using a fixed-size buffer.
+ * @param in input stream
+ * @param limit read at most `limit` bytes from the stream; 0 means no limit.
+ * @param callback a function object with (const char* buf, size_t size) -> void
+ * as its type.
+ */
 template <typename Callback>
 static void ReadStreamByBuf(std::istream& in, size_t limit, Callback callback) {
   char buf[kMaxBufSize];
   std::streamsize actual_size;
   size_t counter = 0;
   size_t actual_max;
-  while (!in.eof() || (limit != 0 && counter >= limit)) {
+  while (!in.eof() ||
+         (limit != 0 && counter >= limit)) {  // End of file or limit reached
     actual_max =
         limit != 0 ? std::min(limit - counter, kMaxBufSize) : kMaxBufSize;
     in.read(buf, actual_max);
@@ -46,10 +54,17 @@ static void ReadStreamByBuf(std::istream& in, size_t limit, Callback callback) {
   in.clear();  // unset eof state
 }
 
+/**
+ * Copy stream `in` into stream `os`.
+ */
 static void PipeStream(std::istream& in, std::ostream& os) {
   ReadStreamByBuf(
       in, 0, [&os](const char* buf, size_t len) { os.write(buf, len); });
 }
+
+/**
+ * Calculate CRC32 from an input stream.
+ */
 static uint32_t Crc32Stream(std::istream& in, size_t limit = 0) {
   uint32_t crc = static_cast<uint32_t>(crc32(0, nullptr, 0));
   ReadStreamByBuf(in, limit, [&crc](const char* buf, size_t len) {
@@ -89,7 +104,9 @@ bool Chunk::Write(std::ostream& os, Compressor ct) const {
     compressed_stream.reset();
   }
 
-  uint32_t len = static_cast<uint32_t>(sout.str().size());
+  sout.seekg(0, std::ios::end);
+  uint32_t len = static_cast<uint32_t>(sout.tellg());
+  sout.seekg(0, std::ios::beg);
   uint32_t crc = Crc32Stream(sout);
   Header hdr(static_cast<uint32_t>(records_.size()), crc, ct, len);
   hdr.Write(os);
diff --git a/python/paddle/fluid/layers/io.py b/python/paddle/fluid/layers/io.py
index 5d8d76c404fe81964bd8fdb3df6ea8d95a4f6c15..641cee3bd921bfc03c8aa0ebad102ea82ef55097 100644
--- a/python/paddle/fluid/layers/io.py
+++ b/python/paddle/fluid/layers/io.py
@@ -272,7 +272,7 @@ def read_file(file_obj):
     out = [
         helper.create_tmp_variable(
             stop_gradient=True, dtype='float32')
-        for i in range(len(file_obj.desc.shapes()))
+        for _ in range(len(file_obj.desc.shapes()))
     ]
     helper.append_op(
         type='read', inputs={'Reader': [file_obj]}, outputs={'Out': out})
diff --git a/python/paddle/fluid/recordio_writer.py b/python/paddle/fluid/recordio_writer.py
index 12debb639ef7c5c3762ce7c6267b644691ba7d05..9735df8c06113230af9695f76a7589ea9f50e527 100644
--- a/python/paddle/fluid/recordio_writer.py
+++ b/python/paddle/fluid/recordio_writer.py
@@ -13,33 +13,18 @@
 # limitations under the License.
 import core
+import contextlib
+
+__all__ = ['convert_reader_to_recordio_file']
 
 
-class RecordIOWriter(object):
-    def __init__(self,
-                 filename,
-                 compressor=core.RecordIOWriter.Compressor.Snappy,
-                 max_num_records=1000):
-        self.filename = filename
-        self.compressor = compressor
-        self.max_num_records = max_num_records
-        self.writer = None
-
-    def __enter__(self):
-        self.writer = core.RecordIOWriter(self.filename, self.compressor,
-                                          self.max_num_records)
-
-    def __exit__(self, exc_type, exc_val, exc_tb):
-        if exc_type is not None:
-            return False
-        else:
-            self.writer.close()
-
-    def append_tensor(self, tensor):
-        self.writer.append_tensor(tensor)
-
-    def complete_append_tensor(self):
-        self.writer.complete_append_tensor()
+@contextlib.contextmanager
+def create_recordio_writer(filename,
+                           compressor=core.RecordIOWriter.Compressor.Snappy,
+                           max_num_records=1000):
+    writer = core.RecordIOWriter(filename, compressor, max_num_records)
+    yield writer
+    writer.close()
 
 
 def convert_reader_to_recordio_file(
@@ -49,14 +34,12 @@ def convert_reader_to_recordio_file(
         compressor=core.RecordIOWriter.Compressor.Snappy,
         max_num_records=1000,
         feed_order=None):
-    writer = RecordIOWriter(filename, compressor, max_num_records)
-    with writer:
+    if feed_order is None:
+        feed_order = feeder.feed_names
+    with create_recordio_writer(filename, compressor,
+                                max_num_records) as writer:
         for batch in reader_creator():
             res = feeder.feed(batch)
-            if feed_order is None:
-                for each in res:
-                    writer.append_tensor(res[each])
-            else:
-                for each in feed_order:
-                    writer.append_tensor(res[each])
+            for each in feed_order:
+                writer.append_tensor(res[each])
             writer.complete_append_tensor()
diff --git a/python/paddle/fluid/tests/unittests/.gitignore b/python/paddle/fluid/tests/unittests/.gitignore
new file mode 100644
index 0000000000000000000000000000000000000000..6b3fc2a83c649c28d21c9a8a0b35c2f2fa04f269
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/.gitignore
@@ -0,0 +1 @@
+mnist.recordio
diff --git a/python/paddle/fluid/tests/unittests/test_recordio_reader.py b/python/paddle/fluid/tests/unittests/test_recordio_reader.py
index de4c314cd0d29f85d74a593b5b5a16ebcc46c7a8..1a135fcdd0b54b5e8fa876d7016a7a08bb7fdcb1 100644
--- a/python/paddle/fluid/tests/unittests/test_recordio_reader.py
+++ b/python/paddle/fluid/tests/unittests/test_recordio_reader.py
@@ -20,22 +20,21 @@ import paddle.v2 as paddle
 
 class TestRecordIO(unittest.TestCase):
     def setUp(self):
+        # Convert mnist to recordio file
         with fluid.program_guard(fluid.Program()):
             reader = paddle.batch(mnist.train(), batch_size=32)
             feeder = fluid.DataFeeder(
-                feed_list=[
+                feed_list=[  # order is image and label
                     fluid.layers.data(
-                        name='image', shape=[784]), fluid.layers.data(
-                            name='label', shape=[1], dtype='int64')
+                        name='image', shape=[784]),
+                    fluid.layers.data(
+                        name='label', shape=[1], dtype='int64'),
                 ],
                 place=fluid.CPUPlace())
             fluid.recordio_writer.convert_reader_to_recordio_file(
-                './mnist.recordio',
-                reader,
-                feeder,
-                feed_order=['image', 'label'])
+                './mnist.recordio', reader, feeder)
 
-    def testMain(self):
+    def test_main(self):
         data_file = fluid.layers.open_recordio_file(
             './mnist.recordio',
             shapes=[[-1, 784], [-1, 1]],
@@ -48,9 +47,12 @@ class TestRecordIO(unittest.TestCase):
         loss = fluid.layers.cross_entropy(input=prediction, label=label)
         avg_loss = fluid.layers.mean(loss)
 
-        fluid.optimizer.SGD(learning_rate=1e-3).minimize(avg_loss)
+        fluid.optimizer.Adam(learning_rate=1e-3).minimize(avg_loss)
 
         exe = fluid.Executor(fluid.CPUPlace())
         exe.run(fluid.default_startup_program())
-        avg_loss_np, = exe.run(fetch_list=[avg_loss])
-        print avg_loss_np
+        avg_loss_np = []
+        for _ in xrange(100):  # train on 100 mini-batches
+            tmp, = exe.run(fetch_list=[avg_loss])
+            avg_loss_np.append(tmp)
+        self.assertLess(avg_loss_np[-1], avg_loss_np[0])
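
For reference, a minimal usage sketch (not part of the patch) of the new create_recordio_writer context manager used directly, doing by hand what convert_reader_to_recordio_file now does internally. The feed list and the mnist reader mirror the unit test above; everything else is illustrative.

    import paddle.fluid as fluid
    import paddle.v2 as paddle
    import paddle.v2.dataset.mnist as mnist

    with fluid.program_guard(fluid.Program()):
        # Same feed_list as the unit test; the declared order is image, label.
        feeder = fluid.DataFeeder(
            feed_list=[
                fluid.layers.data(name='image', shape=[784]),
                fluid.layers.data(name='label', shape=[1], dtype='int64'),
            ],
            place=fluid.CPUPlace())
        reader = paddle.batch(mnist.train(), batch_size=32)

        # Equivalent to convert_reader_to_recordio_file('./mnist.recordio',
        # reader, feeder): with feed_order omitted, it now defaults to
        # feeder.feed_names.
        with fluid.recordio_writer.create_recordio_writer(
                './mnist.recordio') as writer:
            for batch in reader():
                res = feeder.feed(batch)           # dict: feed name -> tensor
                for name in feeder.feed_names:     # keep declared feed order
                    writer.append_tensor(res[name])
                writer.complete_append_tensor()    # seal one record per batch

Note that because create_recordio_writer is a plain @contextlib.contextmanager, writer.close() runs only when the with block exits normally; on an exception the writer is left unclosed, which matches the removed class's __exit__, which returned False without calling close().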