diff --git a/paddle/fluid/CMakeLists.txt b/paddle/fluid/CMakeLists.txt
index 7405ef17d3e01e4837412c96385e13a9e605a75b..d725763b01d5953985f8e090605f68a8419b5498 100644
--- a/paddle/fluid/CMakeLists.txt
+++ b/paddle/fluid/CMakeLists.txt
@@ -5,3 +5,4 @@ add_subdirectory(operators)
 add_subdirectory(pybind)
 add_subdirectory(inference)
 add_subdirectory(string)
+add_subdirectory(recordio)
diff --git a/paddle/fluid/recordio/CMakeLists.txt b/paddle/fluid/recordio/CMakeLists.txt
new file mode 100644
index 0000000000000000000000000000000000000000..37c3214ff8673fee7a417de9b19af6df6ab57569
--- /dev/null
+++ b/paddle/fluid/recordio/CMakeLists.txt
@@ -0,0 +1,2 @@
+cc_library(header SRCS header.cc)
+cc_test(header_test SRCS header_test.cc DEPS header)
diff --git a/paddle/fluid/recordio/chunk.h b/paddle/fluid/recordio/chunk.h
new file mode 100644
index 0000000000000000000000000000000000000000..77c0ae81b74207549e3a1b9319fc918bba4ee12e
--- /dev/null
+++ b/paddle/fluid/recordio/chunk.h
@@ -0,0 +1,119 @@
+// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <sstream>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "paddle/fluid/recordio/header.h"
+
+// Chunk
+// a chunk contains the Header and optionally compressed records.
+namespace paddle {
+namespace recordio {
+
+// Chunk buffers records in memory and converts them to and from the on-disk
+// layout: a Header followed by the (optionally compressed) payload. The
+// payload is a sequence of <uint32 length><record bytes> entries; the length
+// is stored as raw bytes in host byte order.
+class Chunk {
+public:
+  Chunk() = default;
+
+  // Appends a copy of the `length` bytes starting at `record`.
+  void Add(const char* record, size_t length);
+  // Appends a copy of `s`.
+  void Add(const std::string& s);
+
+  // Writes the Header and the packed records to `os`.
+  // Returns false when there is nothing to write, when the packed payload
+  // exceeds kDefaultMaxChunkSize, when `ct` is not supported yet, or when the
+  // stream reports an error after writing.
+  bool Dump(std::ostream& os, Compressor ct);
+
+  // Seeks to `offset` (from the beginning of `iss`), reads one chunk and
+  // appends its records. Malformed input sets failbit on `iss`; records
+  // decoded before the error was detected are kept.
+  void Parse(std::istream& iss, int64_t offset);
+
+  // Returns a copy of the i-th record. `i` is not range-checked.
+  const std::string Record(int i) const { return records_[i]; }
+
+private:
+  std::vector<std::string> records_;
+  // Sum of the sizes of all buffered records. Must start at zero because
+  // Add() and Parse() only ever increment it.
+  size_t num_bytes_{0};
+};
+
+// Compresses `data` with `ct` into `buffer`, which must be able to hold at
+// least data.size() bytes. Returns the number of bytes stored in `buffer`, or
+// 0 when `ct` is not supported yet.
+size_t CompressData(const std::string& data, Compressor ct, char* buffer);
+
+// Decompresses the first `size` bytes of `buffer` in place according to `c`.
+// Returns the size of the decompressed data, or 0 when `c` is not supported.
+uint32_t DeflateData(char* buffer, uint32_t size, Compressor c);
+
+// implementation
+// Everything below is `inline` because this header may be included from more
+// than one translation unit.
+
+inline void Chunk::Add(const std::string& s) {
+  num_bytes_ += s.size();
+  // `s` is a const reference, so it can only be copied, never moved from.
+  records_.emplace_back(s);
+}
+
+inline void Chunk::Add(const char* record, size_t length) {
+  Add(std::string(record, length));
+}
+
+inline bool Chunk::Dump(std::ostream& os, Compressor ct) {
+  if (records_.empty()) return false;
+
+  // TODO(dzhwinter):
+  // we pack the records into one buffer, then compress into another one.
+  // This can be optimized if it turns out to be the bottleneck.
+  std::string payload;
+  payload.reserve(num_bytes_ + records_.size() * sizeof(uint32_t));
+  for (const auto& record : records_) {
+    // Fixed-width binary length: a textual length would be ambiguous as soon
+    // as a record itself starts with a digit.
+    const uint32_t len = static_cast<uint32_t>(record.size());
+    payload.append(reinterpret_cast<const char*>(&len), sizeof(len));
+    payload.append(record);
+  }
+  // The scratch buffer below has a fixed size; refuse anything larger.
+  if (payload.size() > kDefaultMaxChunkSize) return false;
+
+  std::unique_ptr<char[]> buffer(new char[kDefaultMaxChunkSize]);
+  const size_t compressed = CompressData(payload, ct, buffer.get());
+  // The payload is never empty here, so 0 means "unsupported compressor".
+  if (compressed == 0) return false;
+
+  // TODO(dzhwinter): crc32 checksum
+  const uint32_t checksum = static_cast<uint32_t>(compressed);
+
+  Header hdr(static_cast<uint32_t>(records_.size()), checksum, ct,
+             static_cast<uint32_t>(compressed));
+  hdr.Write(os);
+  os.write(buffer.get(), static_cast<std::streamsize>(compressed));
+  return os.good();
+}
+
+inline void Chunk::Parse(std::istream& iss, int64_t offset) {
+  iss.seekg(offset, iss.beg);
+  Header hdr;
+  hdr.Parse(iss);
+  // A corrupt header must not make us read past the fixed-size buffer.
+  if (!iss || hdr.CompressSize() > kDefaultMaxChunkSize) {
+    iss.setstate(std::ios::failbit);
+    return;
+  }
+
+  std::unique_ptr<char[]> buffer(new char[kDefaultMaxChunkSize]);
+  iss.read(buffer.get(), static_cast<std::streamsize>(hdr.CompressSize()));
+  if (!iss) return;  // short read: read() has already set failbit
+
+  // TODO(dzhwinter): checksum
+  const uint32_t deflated_size =
+      DeflateData(buffer.get(), hdr.CompressSize(), hdr.CompressType());
+
+  const char* cursor = buffer.get();
+  const char* const end = cursor + deflated_size;
+  for (uint32_t i = 0; i < hdr.NumRecords(); ++i) {
+    uint32_t len = 0;
+    if (static_cast<size_t>(end - cursor) < sizeof(len)) {
+      iss.setstate(std::ios::failbit);
+      return;
+    }
+    std::memcpy(&len, cursor, sizeof(len));
+    cursor += sizeof(len);
+    // The declared length must fit in what is left of the payload.
+    if (static_cast<size_t>(end - cursor) < len) {
+      iss.setstate(std::ios::failbit);
+      return;
+    }
+    records_.emplace_back(cursor, len);
+    num_bytes_ += len;
+    cursor += len;
+  }
+}
+
+inline size_t CompressData(const std::string& data, Compressor ct,
+                           char* buffer) {
+  switch (ct) {
+    case Compressor::kNoCompress:
+      std::memcpy(buffer, data.data(), data.size());
+      return data.size();
+    case Compressor::kSnappy:
+    case Compressor::kGzip:
+      // TODO(dzhwinter): not implemented yet.
+      break;
+  }
+  return 0;
+}
+
+inline uint32_t DeflateData(char* buffer, uint32_t size, Compressor c) {
+  (void)buffer;  // only touched once a real decompressor is wired in
+  switch (c) {
+    case Compressor::kNoCompress:
+      // The stored bytes already are the payload.
+      return size;
+    case Compressor::kSnappy:
+      // TODO(dzhwinter): enable once snappy is available:
+      //   snappy::Uncompress(buffer, size, &uncompressed);
+      //   deflated_size = uncompressed.size();
+      //   memcpy(buffer, uncompressed.data(), uncompressed.size());
+    case Compressor::kGzip:
+      break;
+  }
+  return 0;
+}
+
+}  // namespace recordio
+}  // namespace paddle
diff --git a/paddle/fluid/recordio/header.cc b/paddle/fluid/recordio/header.cc
new file mode 100644
index 0000000000000000000000000000000000000000..c82d05c3a2573bb8dbfcfaf40d80727695beb556
--- /dev/null
+++ b/paddle/fluid/recordio/header.cc
@@ -0,0 +1,81 @@
+// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "paddle/fluid/recordio/header.h"
+
+#include <cstdint>
+#include <istream>
+#include <ostream>
+
+namespace paddle {
+namespace recordio {
+
+// Creates the header of an empty, uncompressed chunk.
+Header::Header()
+    : num_records_(0),
+      checksum_(0),
+      compressor_(Compressor::kNoCompress),
+      compress_size_(0) {}
+
+// Creates a header from explicit field values:
+//   num - number of records, sum - checksum, c - compressor,
+//   cs  - size of the (compressed) payload in bytes.
+Header::Header(uint32_t num, uint32_t sum, Compressor c, uint32_t cs)
+    : num_records_(num), checksum_(sum), compressor_(c), compress_size_(cs) {}
+
+// Reads the four 32-bit fields from `iss` in the order Write() emits them.
+// If the stream cannot deliver all 16 bytes the header is left unchanged and
+// the stream keeps the failbit set by read().
+void Header::Parse(std::istream& iss) {
+  uint32_t fields[4] = {0, 0, 0, 0};
+  iss.read(reinterpret_cast<char*>(fields), sizeof(fields));
+  if (!iss) return;
+
+  num_records_ = fields[0];
+  checksum_ = fields[1];
+  // Go through an integer instead of reading raw bytes into the enum object.
+  compressor_ = static_cast<Compressor>(fields[2]);
+  compress_size_ = fields[3];
+}
+
+// Writes the four fields to `os` as raw 32-bit words in host byte order:
+// num_records, checksum, compressor, compress_size.
+void Header::Write(std::ostream& os) {
+  const uint32_t fields[4] = {num_records_, checksum_,
+                              static_cast<uint32_t>(compressor_),
+                              compress_size_};
+  os.write(reinterpret_cast<const char*>(fields), sizeof(fields));
+}
+
+// Human-readable form for logging. The fields are separated so that, e.g.,
+// (1, 23) and (12, 3) do not print the same text.
+std::ostream& operator<<(std::ostream& os, Header h) {
+  os << "Header: " << h.NumRecords() << ", " << h.Checksum() << ", "
+     << static_cast<uint32_t>(h.CompressType()) << ", " << h.CompressSize();
+  return os;
+}
+
+// bool operator==(Header l, Header r) {
+//   return num_records_ == rhs.NumRecords() &&
+//     checksum_ == rhs.Checksum() &&
+//     compressor_ == rhs.CompressType() &&
+//     compress_size_ == rhs.CompressSize();
+// }
+
+// Two headers are equal when all four fields match.
+bool operator==(Header l, Header r) {
+  if (l.NumRecords() != r.NumRecords()) return false;
+  if (l.Checksum() != r.Checksum()) return false;
+  if (l.CompressType() != r.CompressType()) return false;
+  return l.CompressSize() == r.CompressSize();
+}
+
+// size_t CompressData(const std::string& os, Compressor ct, char* buffer) {
+//   size_t compress_size = 0;
+
+//   // std::unique_ptr<char[]> buffer(new char[kDefaultMaxChunkSize]);
+//   // std::string compressed;
+//   compress_size =os.size();
+//   memcpy(buffer, os.c_str(), compress_size);
+//   return compress_size;
+// }
+
+}  // namespace recordio
+}  // namespace paddle
diff --git a/paddle/fluid/recordio/header.h b/paddle/fluid/recordio/header.h
new file mode 100644
index 0000000000000000000000000000000000000000..92c040617dba2e31b74aefd747be59aaba4302dd
--- /dev/null
+++ b/paddle/fluid/recordio/header.h
@@ -0,0 +1,66 @@
+// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <cstddef>
+#include <cstdint>
+#include <sstream>
+
+namespace paddle {
+namespace recordio {
+
+// Default maximum size of a chunk, in bytes (32 MiB).
+constexpr size_t kDefaultMaxChunkSize = 32 * 1024 * 1024;
+// MagicNumber for memory checking
+constexpr uint32_t kMagicNumber = 0x01020304;
+
+enum class Compressor {
+  // NoCompression means writing raw chunk data into files.
+  // With other choices, chunks are compressed before being written.
+  kNoCompress = 0,
+  // Snappy had been the default compressing algorithm widely
+  // used in Google. It compromises between speed and
+  // compression ratio.
+  kSnappy = 1,
+  // Gzip is a well-known compression algorithm. It is
+  // recommended only if you are looking for compression ratio.
+  kGzip = 2,
+};
+
+// The serialized header stores the compressor as one 32-bit word.
+static_assert(sizeof(Compressor) == sizeof(uint32_t),
+              "Compressor must be 32 bits wide to match the header layout");
+
+// Header is the metadata of Chunk
+class Header {
+public:
+  // Zero records, zero checksum, kNoCompress, zero size.
+  Header();
+  // num: number of records, sum: checksum, ct: compressor,
+  // cs: size in bytes of the (compressed) payload.
+  Header(uint32_t num, uint32_t sum, Compressor ct, uint32_t cs);
+
+  // Serializes the four fields to `os` in binary form.
+  void Write(std::ostream& os);
+  // Fills the four fields from the binary form produced by Write().
+  void Parse(std::istream& iss);
+
+  uint32_t NumRecords() const { return num_records_; }
+  uint32_t Checksum() const { return checksum_; }
+  Compressor CompressType() const { return compressor_; }
+  uint32_t CompressSize() const { return compress_size_; }
+
+private:
+  uint32_t num_records_;
+  uint32_t checksum_;
+  Compressor compressor_;
+  uint32_t compress_size_;
+};
+
+// Allow Header Loggable
+std::ostream& operator<<(std::ostream& os, Header h);
+// Field-wise comparison.
+bool operator==(Header l, Header r);
+
+}  // namespace recordio
+}  // namespace paddle
diff --git a/paddle/fluid/recordio/header_test.cc b/paddle/fluid/recordio/header_test.cc
new file mode 100644
index 0000000000000000000000000000000000000000..ae8201ab00a2e6156b35b45f9c07e2b1f032bc39
--- /dev/null
+++ b/paddle/fluid/recordio/header_test.cc
@@ -0,0 +1,45 @@
+// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "paddle/fluid/recordio/header.h"
+
+#include <sstream>
+#include <string>
+
+#include "gtest/gtest.h"
+
+using paddle::recordio::Compressor;
+using paddle::recordio::Header;
+
+// A header written in binary form must parse back to an equal header and
+// re-serialize to the same bytes.
+TEST(Recordio, ChunkHead) {
+  Header hdr(0, 1, Compressor::kGzip, 3);
+  std::ostringstream oss;
+  hdr.Write(oss);
+
+  std::istringstream iss(oss.str());
+  Header hdr2;
+  hdr2.Parse(iss);
+  EXPECT_TRUE(hdr == hdr2);
+
+  std::ostringstream oss2;
+  hdr2.Write(oss2);
+  // Compare as std::string: the serialized header is binary and contains NUL
+  // bytes, so a C-string comparison would stop at the first one.
+  EXPECT_EQ(oss2.str(), oss.str());
+}
+
+// operator<< is for logging: equal headers print the same non-empty text and
+// headers that differ print different text.
+TEST(Recordio, Stream) {
+  Header hdr(0, 1, static_cast<Compressor>(2), 3);
+  Header same(0, 1, Compressor::kGzip, 3);
+  Header other(7, 1, Compressor::kGzip, 3);
+
+  std::ostringstream oss1;
+  oss1 << hdr;
+  std::ostringstream oss2;
+  oss2 << same;
+  std::ostringstream oss3;
+  oss3 << other;
+
+  EXPECT_FALSE(oss1.str().empty());
+  EXPECT_EQ(oss1.str(), oss2.str());
+  EXPECT_NE(oss1.str(), oss3.str());
+}
diff --git a/paddle/fluid/recordio/range_scanner.h b/paddle/fluid/recordio/range_scanner.h
new file mode 100644
index 0000000000000000000000000000000000000000..44b1b49abc2bf1bb969da8a65fc49526d4a7c17e
--- /dev/null
+++ b/paddle/fluid/recordio/range_scanner.h
@@ -0,0 +1,69 @@
+// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <cstddef>
+#include <cstdint>
+#include <istream>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "paddle/fluid/recordio/chunk.h"
+
+namespace paddle {
+namespace recordio {
+
+// Index maps a file-wide record number to the chunk that holds the record.
+class Index {
+public:
+  int NumRecords() const { return num_records_; }
+
+  // Locate returns the index of chunk that contains the given record,
+  // and the record index within the chunk. It returns (-1, -1) if the
+  // record is out of range (this includes negative indices).
+  void Locate(int record_idx, std::pair<int, int>* out) const {
+    if (record_idx >= 0) {
+      const size_t target = static_cast<size_t>(record_idx);
+      size_t first = 0;  // file-wide index of the first record of chunk i
+      for (size_t i = 0; i < chunk_lens_.size(); ++i) {
+        const size_t next = first + chunk_lens_[i];
+        if (target < next) {
+          out->first = static_cast<int>(i);
+          out->second = static_cast<int>(target - first);
+          return;
+        }
+        first = next;
+      }
+    }
+    out->first = -1;
+    out->second = -1;
+  }
+
+private:
+  // NOTE(review): presumably the byte offset of every chunk in the file;
+  // nothing in this header reads it yet - confirm the element type.
+  std::vector<int64_t> chunk_offsets_;
+  // Number of records stored in each chunk (this is what Locate() sums).
+  std::vector<uint32_t> chunk_lens_;
+  // Returned by NumRecords().
+  int num_records_{0};
+  // NOTE(review): not used in this header; meaning and element type are to
+  // be confirmed.
+  std::vector<int> chunk_records_;
+};
+
+// RangeScanner
+// creates a scanner that sequentially reads records in a range of the file
+// described by `idx`.
+// NOTE(review): the constructor takes (start, end); an earlier version of
+// this comment described [start, start+len) with "start < 0 scans from the
+// beginning, len < 0 scans till the end of file" - confirm which contract
+// the implementation follows.
+class RangeScanner {
+public:
+  // The stream is taken by reference: std::istream cannot be copied.
+  RangeScanner(std::istream& is, Index idx, int start, int end);
+  bool Scan();
+  const std::string Record();
+
+private:
+  std::istream stream_;
+  Index index_;
+  int start_, end_, cur_;
+  int chunk_index_;
+  std::unique_ptr<Chunk> chunk_;
+};
+
+}  // namespace recordio
+}  // namespace paddle
diff --git a/paddle/fluid/recordio/scanner.h b/paddle/fluid/recordio/scanner.h
new file mode 100644
index 0000000000000000000000000000000000000000..8bcdea3c6fe590062e6f9ab6b9f778ec42556897
--- /dev/null
+++ b/paddle/fluid/recordio/scanner.h
@@ -0,0 +1,44 @@
+// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <cstdio>
+#include <string>
+#include <vector>
+
+#include "paddle/fluid/recordio/range_scanner.h"
+
+namespace paddle {
+namespace recordio {
+
+// Scanner
+// Reads records from one or more files, one file at a time.
+// NOTE(review): only the declaration lives here; the description above is
+// inferred from the members (paths_, cur_file_, NextFile) - confirm against
+// the implementation.
+class Scanner {
+public:
+  // NOTE(review): `paths` presumably names the file(s) to scan; the accepted
+  // syntax is defined by the implementation.
+  explicit Scanner(const char* paths);
+
+  // The scanner holds a raw FILE* and a raw RangeScanner*; copying it would
+  // leave two objects referring to the same handles.
+  Scanner(const Scanner&) = delete;
+  Scanner& operator=(const Scanner&) = delete;
+
+  const std::string Record();
+  bool Scan();
+  void Close();
+
+private:
+  bool NextFile();
+  int Err() { return err_; }
+
+private:
+  // The defaults below only guarantee that no member is ever read while
+  // indeterminate; the constructor is free to override them.
+  std::vector<std::string> paths_;
+  FILE* cur_file_{nullptr};
+  RangeScanner* cur_scanner_{nullptr};
+  int path_idx_{0};
+  bool end_{false};
+  int err_{0};
+};
+
+}  // namespace recordio
+}  // namespace paddle
diff --git a/paddle/fluid/recordio/writer.cc b/paddle/fluid/recordio/writer.cc
new file mode 100644
index 0000000000000000000000000000000000000000..9383199889d5ec14d7a02656f2f01eda636a5728
--- /dev/null
+++ b/paddle/fluid/recordio/writer.cc
@@ -0,0 +1,45 @@
+// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "paddle/fluid/recordio/writer.h"
+
+#include <string>
+
+namespace paddle {
+namespace recordio {
+
+// Uncompressed chunks of the default maximum size.
+Writer::Writer(std::ostream& os)
+    : Writer(os, static_cast<int>(kDefaultMaxChunkSize),
+             static_cast<int>(Compressor::kNoCompress)) {}
+
+Writer::Writer(std::ostream& os, int maxChunkSize, int compressor)
+    : stream_(os.rdbuf()),
+      chunk_(new Chunk),
+      max_chunk_size_(maxChunkSize),
+      compressor_(compressor) {
+  // stream_ shares the buffer of `os`; make sure it starts with a clean
+  // rdstate, because eofbit is what marks a closed writer.
+  stream_.clear();
+}
+
+size_t Writer::Write(const std::string& buf) {
+  // Close() sets eofbit, and a failed stream cannot take more data either.
+  if (!stream_.good()) return 0;
+
+  chunk_->Add(buf);
+  ++pending_records_;
+  pending_bytes_ += buf.size();
+
+  // A non-positive limit means "one chunk per record".
+  const size_t limit =
+      max_chunk_size_ > 0 ? static_cast<size_t>(max_chunk_size_) : 0;
+  if (pending_bytes_ >= limit) Flush();
+  return buf.size();
+}
+
+size_t Writer::Write(std::string&& buf) {
+  // Chunk::Add only accepts a const reference, so the record is copied
+  // either way; `buf` is an lvalue here and selects the overload above.
+  return Write(buf);
+}
+
+size_t Writer::Write(const char* buf, size_t length) {
+  return Write(std::string(buf, length));
+}
+
+void Writer::Flush() {
+  if (pending_records_ == 0) return;
+  // Surface a failed dump through the stream state instead of dropping the
+  // records silently; later Write() calls then return 0.
+  if (!chunk_->Dump(stream_, static_cast<Compressor>(compressor_))) {
+    stream_.setstate(std::ios::failbit);
+  }
+  chunk_.reset(new Chunk);
+  pending_records_ = 0;
+  pending_bytes_ = 0;
+}
+
+void Writer::Close() {
+  if (stream_.eof()) return;  // already closed
+  Flush();
+  stream_.flush();
+  stream_.setstate(std::ios::eofbit);
+}
+
+}  // namespace recordio
+}  // namespace paddle
diff --git a/paddle/fluid/recordio/writer.h b/paddle/fluid/recordio/writer.h
new file mode 100644
index 0000000000000000000000000000000000000000..49b86a6a28a819c238c7d3132c6d58610e71d431
--- /dev/null
+++ b/paddle/fluid/recordio/writer.h
@@ -0,0 +1,56 @@
+// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+#include <cstddef>
+#include <memory>
+#include <ostream>
+#include <string>
+
+#include "paddle/fluid/platform/macros.h"  // for DISABLE COPY ASSIGN
+#include "paddle/fluid/recordio/chunk.h"
+#include "paddle/fluid/recordio/header.h"
+
+namespace paddle {
+namespace recordio {
+
+// Writer creates a RecordIO file.
+// Records are buffered in a chunk; the chunk is written to the stream when
+// it reaches the size limit and when Close() is called. Call Close() before
+// destroying the writer, otherwise buffered records are not written.
+class Writer {
+public:
+  // Writes uncompressed chunks of at most kDefaultMaxChunkSize bytes.
+  explicit Writer(std::ostream& os);
+  // maxChunkSize: number of record bytes after which a chunk is written
+  //               (non-positive: every record becomes its own chunk).
+  // c:            a Compressor value, passed as int.
+  Writer(std::ostream& os, int maxChunkSize, int c);
+
+  // Writes a record and returns the number of bytes accepted. It returns 0
+  // if Close has been called or the stream is in a failed state (an empty
+  // record also yields 0).
+  size_t Write(const char* buf, size_t length);
+  size_t Write(const std::string& buf);
+  size_t Write(std::string&& buf);
+
+  // Close flushes the current chunk and makes the writer invalid.
+  void Close();
+
+  // Streams `val` straight to the underlying stream, bypassing the chunk
+  // buffer.
+  template <typename T>
+  Writer& operator<<(const T& val) {
+    stream_ << val;
+    return *this;
+  }
+
+private:
+  // Writes the buffered records as one chunk and starts a new, empty chunk.
+  void Flush();
+
+  // Set rdstate to mark a closed writer
+  std::ostream stream_;
+  std::unique_ptr<Chunk> chunk_;
+  // total records size, excluding metadata, before compression.
+  int max_chunk_size_;
+  int compressor_;
+  // Records and record bytes added since the last flush.
+  size_t pending_records_{0};
+  size_t pending_bytes_{0};
+  DISABLE_COPY_AND_ASSIGN(Writer);
+};
+
+}  // namespace recordio
+}  // namespace paddle