Commit db46778b authored by Yu Yang

Polish codes and comments

Parent 5cb79524
@@ -24,13 +24,21 @@ namespace paddle {
 namespace recordio {
 constexpr size_t kMaxBufSize = 1024;
 
+/**
+ * Read Stream by a fixed sized buffer.
+ * @param in input stream
+ * @param limit read at most `limit` bytes from input stream. 0 means no limit
+ * @param callback A function object with (const char* buf, size_t size) -> void
+ * as its type.
+ */
 template <typename Callback>
 static void ReadStreamByBuf(std::istream& in, size_t limit, Callback callback) {
   char buf[kMaxBufSize];
   std::streamsize actual_size;
   size_t counter = 0;
   size_t actual_max;
-  while (!in.eof() || (limit != 0 && counter >= limit)) {
+  while (!in.eof() ||
+         (limit != 0 && counter >= limit)) {  // End of file or reach limit
     actual_max =
         limit != 0 ? std::min(limit - counter, kMaxBufSize) : kMaxBufSize;
     in.read(buf, actual_max);
@@ -46,10 +54,17 @@ static void ReadStreamByBuf(std::istream& in, size_t limit, Callback callback) {
   in.clear();  // unset eof state
 }
 
+/**
+ * Copy stream in to another stream
+ */
 static void PipeStream(std::istream& in, std::ostream& os) {
   ReadStreamByBuf(
       in, 0, [&os](const char* buf, size_t len) { os.write(buf, len); });
 }
+
+/**
+ * Calculate CRC32 from an input stream.
+ */
 static uint32_t Crc32Stream(std::istream& in, size_t limit = 0) {
   uint32_t crc = static_cast<uint32_t>(crc32(0, nullptr, 0));
   ReadStreamByBuf(in, limit, [&crc](const char* buf, size_t len) {
@@ -89,7 +104,9 @@ bool Chunk::Write(std::ostream& os, Compressor ct) const {
     compressed_stream.reset();
   }
 
-  uint32_t len = static_cast<uint32_t>(sout.str().size());
+  sout.seekg(0, std::ios::end);
+  uint32_t len = static_cast<uint32_t>(sout.tellg());
+  sout.seekg(0, std::ios::beg);
   uint32_t crc = Crc32Stream(sout);
   Header hdr(static_cast<uint32_t>(records_.size()), crc, ct, len);
   hdr.Write(os);
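Aside: the helpers documented above follow one pattern, read the stream in fixed-size buffers and hand each buffer to a callback, which Crc32Stream then uses to fold a CRC32 over a length-limited stream. Below is a minimal Python sketch of that pattern; the names (read_stream_by_buf, crc32_stream, K_MAX_BUF_SIZE) are hypothetical, for illustration only, and are not part of this commit.

import io
import zlib

K_MAX_BUF_SIZE = 1024


def read_stream_by_buf(stream, limit, callback):
    # limit == 0 means "no limit": read until the stream is exhausted.
    counter = 0
    while True:
        max_read = K_MAX_BUF_SIZE if limit == 0 else min(limit - counter,
                                                         K_MAX_BUF_SIZE)
        if max_read <= 0:
            break
        buf = stream.read(max_read)
        if not buf:
            break
        callback(buf)
        counter += len(buf)


def crc32_stream(stream, limit=0):
    # Accumulate CRC32 over the (possibly length-limited) stream contents.
    state = [0]

    def update(buf):
        state[0] = zlib.crc32(buf, state[0]) & 0xffffffff

    read_stream_by_buf(stream, limit, update)
    return state[0]


data = io.BytesIO(b'hello recordio' * 100)
whole_crc = crc32_stream(data)       # CRC of the whole stream
data.seek(0)
prefix_crc = crc32_stream(data, 5)   # CRC of the first 5 bytes only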
@@ -272,7 +272,7 @@ def read_file(file_obj):
     out = [
         helper.create_tmp_variable(
             stop_gradient=True, dtype='float32')
-        for i in range(len(file_obj.desc.shapes()))
+        for _ in range(len(file_obj.desc.shapes()))
     ]
     helper.append_op(
         type='read', inputs={'Reader': [file_obj]}, outputs={'Out': out})
@@ -13,33 +13,18 @@
 # limitations under the License.
 import core
+import contextlib
 
 __all__ = ['convert_reader_to_recordio_file']
 
 
-class RecordIOWriter(object):
-    def __init__(self,
-                 filename,
-                 compressor=core.RecordIOWriter.Compressor.Snappy,
-                 max_num_records=1000):
-        self.filename = filename
-        self.compressor = compressor
-        self.max_num_records = max_num_records
-        self.writer = None
-
-    def __enter__(self):
-        self.writer = core.RecordIOWriter(self.filename, self.compressor,
-                                          self.max_num_records)
-
-    def __exit__(self, exc_type, exc_val, exc_tb):
-        if exc_type is not None:
-            return False
-        else:
-            self.writer.close()
-
-    def append_tensor(self, tensor):
-        self.writer.append_tensor(tensor)
-
-    def complete_append_tensor(self):
-        self.writer.complete_append_tensor()
+@contextlib.contextmanager
+def create_recordio_writer(filename,
+                           compressor=core.RecordIOWriter.Compressor.Snappy,
+                           max_num_records=1000):
+    writer = core.RecordIOWriter(filename, compressor, max_num_records)
+    yield writer
+    writer.close()
 
 
 def convert_reader_to_recordio_file(
@@ -49,14 +34,12 @@ def convert_reader_to_recordio_file(
         compressor=core.RecordIOWriter.Compressor.Snappy,
        max_num_records=1000,
        feed_order=None):
-    writer = RecordIOWriter(filename, compressor, max_num_records)
-    with writer:
+    if feed_order is None:
+        feed_order = feeder.feed_names
+    with create_recordio_writer(filename, compressor,
+                                max_num_records) as writer:
        for batch in reader_creator():
            res = feeder.feed(batch)
-            if feed_order is None:
-                for each in res:
-                    writer.append_tensor(res[each])
-            else:
-                for each in feed_order:
-                    writer.append_tensor(res[each])
+            for each in feed_order:
+                writer.append_tensor(res[each])
            writer.complete_append_tensor()
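This rewrite replaces the hand-written __enter__/__exit__ class with a generator wrapped in contextlib.contextmanager: the code before the yield plays the role of __enter__, the code after it the role of __exit__. Because feed_order now defaults to feeder.feed_names, callers such as the test below no longer need to pass an explicit feed order. A self-contained sketch of the same pattern with a stand-in writer class follows; the names are hypothetical and the fake class only exists so the example runs anywhere, it is not the PaddlePaddle API.

import contextlib


class FakeRecordIOWriter(object):
    # Stand-in for core.RecordIOWriter so the sketch is runnable on its own.
    def __init__(self, filename):
        self.filename = filename
        self.num_tensors = 0

    def append_tensor(self, tensor):
        self.num_tensors += 1

    def complete_append_tensor(self):
        pass

    def close(self):
        print('closed %s after %d tensors' % (self.filename, self.num_tensors))


@contextlib.contextmanager
def create_writer(filename):
    # Code before `yield` acts as __enter__, code after it as __exit__.
    writer = FakeRecordIOWriter(filename)
    yield writer
    writer.close()


with create_writer('demo.recordio') as w:
    w.append_tensor('tensor-0')
    w.append_tensor('tensor-1')
    w.complete_append_tensor()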
@@ -20,22 +20,21 @@ import paddle.v2 as paddle
 
 class TestRecordIO(unittest.TestCase):
     def setUp(self):
+        # Convert mnist to recordio file
         with fluid.program_guard(fluid.Program()):
             reader = paddle.batch(mnist.train(), batch_size=32)
             feeder = fluid.DataFeeder(
-                feed_list=[
+                feed_list=[  # order is image and label
                     fluid.layers.data(
-                        name='image', shape=[784]), fluid.layers.data(
-                            name='label', shape=[1], dtype='int64')
+                        name='image', shape=[784]),
+                    fluid.layers.data(
+                        name='label', shape=[1], dtype='int64'),
                 ],
                 place=fluid.CPUPlace())
             fluid.recordio_writer.convert_reader_to_recordio_file(
-                './mnist.recordio',
-                reader,
-                feeder,
-                feed_order=['image', 'label'])
+                './mnist.recordio', reader, feeder)
 
-    def testMain(self):
+    def test_main(self):
         data_file = fluid.layers.open_recordio_file(
             './mnist.recordio',
             shapes=[[-1, 784], [-1, 1]],
@@ -48,9 +47,12 @@ class TestRecordIO(unittest.TestCase):
         loss = fluid.layers.cross_entropy(input=prediction, label=label)
         avg_loss = fluid.layers.mean(loss)
-        fluid.optimizer.SGD(learning_rate=1e-3).minimize(avg_loss)
+        fluid.optimizer.Adam(learning_rate=1e-3).minimize(avg_loss)
 
         exe = fluid.Executor(fluid.CPUPlace())
         exe.run(fluid.default_startup_program())
-        avg_loss_np, = exe.run(fetch_list=[avg_loss])
-        print avg_loss_np
+        avg_loss_np = []
+        for i in xrange(100):  # train 100 mini-batch
+            tmp, = exe.run(fetch_list=[avg_loss])
+            avg_loss_np.append(tmp)
+        self.assertLess(avg_loss_np[-1], avg_loss_np[0])