diff --git a/paddle/fluid/recordio/CMakeLists.txt b/paddle/fluid/recordio/CMakeLists.txt index 5d55709b4bbffafefcfef85993ead5f879a2a674..46188e0a5b0247bbe2ae05bd87ede94f4f075e24 100644 --- a/paddle/fluid/recordio/CMakeLists.txt +++ b/paddle/fluid/recordio/CMakeLists.txt @@ -1,5 +1,14 @@ -cc_library(header SRCS header.cc) -cc_test(header_test SRCS header_test.cc DEPS header) +# internal library. cc_library(io SRCS io.cc DEPS stringpiece) cc_test(io_test SRCS io_test.cc DEPS io) +cc_library(header SRCS header.cc DEPS io) +cc_test(header_test SRCS header_test.cc DEPS header) cc_library(chunk SRCS chunk.cc DEPS snappy) +cc_test(chunk_test SRCS chunk_test.cc DEPS chunk) +cc_library(range_scanner SRCS range_scanner.cc DEPS io chunk) +cc_test(range_scanner_test SRCS range_scanner_test.cc DEPS range_scanner) +cc_library(scanner SRCS scanner.cc DEPS range_scanner) +cc_test(scanner_test SRCS scanner_test.cc DEPS scanner) +# exported library. +cc_library(recordio SRCS recordio.cc DEPS scanner chunk header) +cc_test(recordio_test SRCS recordio_test.cc DEPS scanner) diff --git a/paddle/fluid/recordio/chunk.h b/paddle/fluid/recordio/chunk.h index a36c71cf4cce202aa57af74b7633f1555dbe6758..661364cd5d8019ca0b42bfb0d945903303271ac9 100644 --- a/paddle/fluid/recordio/chunk.h +++ b/paddle/fluid/recordio/chunk.h @@ -32,9 +32,10 @@ public: bool Dump(Stream* fo, Compressor ct); void Parse(Stream* fi, size_t offset); size_t NumBytes() { return num_bytes_; } + const std::string Record(int i) { return records_[i]; } private: - std::forward_list records_; + std::forward_list records_; // sum of record lengths in bytes. size_t num_bytes_; DISABLE_COPY_AND_ASSIGN(Chunk); diff --git a/paddle/fluid/recordio/header_test.cc b/paddle/fluid/recordio/header_test.cc index 12e8f14ced426eeac43cb12253252cb3332d7534..d6ab26732901605417e1e9a18130cbe7bad87cba 100644 --- a/paddle/fluid/recordio/header_test.cc +++ b/paddle/fluid/recordio/header_test.cc @@ -22,12 +22,18 @@ using namespace paddle::recordio; TEST(Recordio, ChunkHead) { Header hdr(0, 1, Compressor::kGzip, 3); - Stream* oss = Stream::Open("/tmp/record_1", "w"); - hdr->Write(oss); + { + Stream* oss = Stream::Open("/tmp/record_1", "w"); + hdr.Write(oss); + delete oss; + } - // Stream* iss = Stream::Open("/tmp/record_1", "r"); - // Header hdr2; - // hdr2.Parse(iss); + Header hdr2; + { + Stream* iss = Stream::Open("/tmp/record_1", "r"); + hdr2.Parse(iss); + delete iss; + } - // EXPECT_TRUE(hdr == hdr2); + EXPECT_TRUE(hdr == hdr2); } diff --git a/paddle/fluid/recordio/range_scanner.cc b/paddle/fluid/recordio/range_scanner.cc index 4c0e80e2f88885725a036e56b84c6ce8487b6ab1..faf5078ba90ecbe561c3c2253d54b482c2fb783b 100644 --- a/paddle/fluid/recordio/range_scanner.cc +++ b/paddle/fluid/recordio/range_scanner.cc @@ -17,10 +17,37 @@ namespace paddle { namespace recordio { +void Index::LoadIndex(FileStream* fi) { + int64_t offset = 0; + while (!fi->Eof()) { + Header hdr; + hdr.Parse(fi); + chunk_offsets_.push_back(offset); + chunk_lens_.push_back(hdr.NumRecords()); + chunk_records_.push_back(hdr.NumRecords()); + num_records_ += hdr.NumRecords(); + offset += hdr.CompressSize(); + } +} + Index Index::ChunkIndex(int i) { Index idx; } -RangeScanner::RangeScanner(std::istream is, Index idx, int start, int len) - : stream_(is.rdbuf()), index_(idx) { +std::pair Index::Locate(int record_idx) { + std::pair range(-1, -1); + int sum = 0; + for (size_t i = 0; i < chunk_lens_.size(); ++i) { + int len = static_cast(chunk_lens_[i]); + sum += len; + if (record_idx < sum) { + range.first = static_cast(i); + range.second = record_idx - sum + len; + } + } + return range; +} + +RangeScanner::RangeScanner(Stream* fi, Index idx, int start, int len) + : stream_(fi), index_(idx) { if (start < 0) { start = 0; } @@ -30,16 +57,28 @@ RangeScanner::RangeScanner(std::istream is, Index idx, int start, int len) start_ = start; end_ = start + len; - cur_ = start - 1; + cur_ = start - 1; // The intial status required by Scan chunk_index_ = -1; - // chunk_->reset(new Chunk()); + chunk_.reset(new Chunk); } -bool RangeScanner::Scan() {} +bool RangeScanner::Scan() { + ++cur_; + if (cur_ >= end_) { + return false; + } else { + auto cursor = index_.Locate(cur_); + if (chunk_index_ != cursor.first) { + chunk_index_ = cursor.first; + chunk_->Parse(fi, index_.ChunkOffsets[chunk_index_]); + } + } + return true; +} const std::string RangeScanner::Record() { - // int i = index_.Locate(cur_); - // return chunk_->Record(i); + auto cursor = index_.Locate(cur_); + return chunk_->Record(cursor.second); } } // namespace recordio diff --git a/paddle/fluid/recordio/range_scanner.h b/paddle/fluid/recordio/range_scanner.h index 000a328d77478cb0780161f6842500fe653f526c..043fd8091e8e1ef9c3e1253c8073066070a9d355 100644 --- a/paddle/fluid/recordio/range_scanner.h +++ b/paddle/fluid/recordio/range_scanner.h @@ -14,6 +14,9 @@ #pragma once +#include + +#include "paddle/fluid/recordio/chunk.h" #include "paddle/fluid/recordio/io.h" namespace paddle { @@ -26,29 +29,22 @@ namespace recordio { // for the correct encoding and decoding using Gob. class Index { public: + Index() : num_records_(0) {} + // LoadIndex scans the file and parse chunkOffsets, chunkLens, and len. + void LoadIndex(Stream* fi); + // NumRecords returns the total number of all records in a RecordIO file. int NumRecords() { return num_records_; } // NumChunks returns the total number of chunks in a RecordIO file. int NumChunks() { return chunk_lens_.size(); } // ChunkIndex return the Index of i-th Chunk. int ChunkIndex(int i); + int64_t ChunkOffsets(int i) { return chunk_offsets_[i]; } + // Locate returns the index of chunk that contains the given record, // and the record index within the chunk. It returns (-1, -1) if the // record is out of range. - void Locate(int record_idx, std::pair* out) { - size_t sum = 0; - for (size_t i = 0; i < chunk_lens_.size(); ++i) { - sum += chunk_lens_[i]; - if (static_cast(record_idx) < sum) { - out->first = i; - out->second = record_idx - sum + chunk_lens_[i]; - return; - } - } - // out->swap(std::make_pair(-1, -1)); - out->first = -1; - out->second = -1; - } + std::pair Locate(int record_idx); private: // the offset of each chunk in a file. @@ -62,12 +58,14 @@ private: }; // RangeScanner -// creates a scanner that sequencially reads records in the -// range [start, start+len). If start < 0, it scans from the -// beginning. If len < 0, it scans till the end of file. class RangeScanner { public: + // creates a scanner that sequencially reads records in the + // range [start, start+len). If start < 0, it scans from the + // beginning. If len < 0, it scans till the end of file. RangeScanner(Stream* fi, Index idx, int start, int end); + // Scan moves the cursor forward for one record and loads the chunk + // containing the record if not yet. bool Scan(); const std::string Record(); diff --git a/paddle/fluid/recordio/range_scanner_test.cc b/paddle/fluid/recordio/range_scanner_test.cc new file mode 100644 index 0000000000000000000000000000000000000000..e365efc48b6aabb6d699ddb8099d28872efec2de --- /dev/null +++ b/paddle/fluid/recordio/range_scanner_test.cc @@ -0,0 +1,23 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "paddle/fluid/recordio/range_scanner.h" + +#include "gtest/gtest.h" + +using namespace paddle::recordio; + +TEST(RangeScanner, Recordio) { + Stream* fo = Stream::Open("/tmp/record_range", "w"); +} diff --git a/paddle/fluid/recordio/filesys.h b/paddle/fluid/recordio/recordio.cc similarity index 79% rename from paddle/fluid/recordio/filesys.h rename to paddle/fluid/recordio/recordio.cc index b21702bf3a0dcff282fb06edead46c3bf295602f..f8ed1fedf63998fe6c27ff9e94643aec7f8c7b6e 100644 --- a/paddle/fluid/recordio/filesys.h +++ b/paddle/fluid/recordio/recordio.cc @@ -12,13 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -#pragma once +#include "paddle/fluid/recordio/io.h" +#include "paddle/fluid/string/piece.h" -#include -#include -#include - -class DefaultFileSys { -public: -private: -}; +namespace paddle { +namespace recordio {} // namespace recordio +} // namespace paddle diff --git a/paddle/fluid/recordio/recordio.h b/paddle/fluid/recordio/recordio.h new file mode 100644 index 0000000000000000000000000000000000000000..39ae953ce1a10c70067f4409f9bbf968743428da --- /dev/null +++ b/paddle/fluid/recordio/recordio.h @@ -0,0 +1,20 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +#pragma once + +#include "paddle/fluid/recordio/chunk.h" +#include "paddle/fluid/recordio/header.h" +#include "paddle/fluid/recordio/io.h" +#include "paddle/fluid/recordio/scanner.h" +#include "paddle/fluid/recordio/writer.h" diff --git a/paddle/fluid/recordio/scanner.cc b/paddle/fluid/recordio/scanner.cc index d5464ae9d8df8554ec1f567b7332263ed0d2fd7c..45cf472e9d070a795e0c43fba73ca0235b205efc 100644 --- a/paddle/fluid/recordio/scanner.cc +++ b/paddle/fluid/recordio/scanner.cc @@ -31,7 +31,7 @@ Scanner::Scanner(const char* paths) } bool Scanner::Scan() { - if (err_ == -1 || end_ == true) { + if (end_ == true) { return false; } if (cur_scanner_ == nullptr) { @@ -39,20 +39,30 @@ bool Scanner::Scan() { end_ = true; return false; } - if (err_ == -1) { - return false; - } } if (!cur_scanner_->Scan()) { - if (err_ == -1) { - return false; - } + end_ = true; + cur_file_ = nullptr; + return false; } - return true; } -bool Scanner::NextFile() {} +bool Scanner::NextFile() { + if (path_idx_ >= paths_.size()) { + return false; + } + std::string path = paths_[path_idx_]; + ++path_idx_; + cur_file_ = Stream::Open(path); + if (cur_file_ == nullptr) { + return false; + } + Index idx; + idx.LoadIndex(cur_file_); + cur_scanner_ = RangeScanner(cur_file_, idx, 0, -1); + return true; +} } // namespace recordio } // namespace paddle