提交 9dc69582 编写于 作者: Y Yu Yang

Make recordio simple

上级 fe183415
......@@ -145,6 +145,7 @@ include(external/pybind11) # download pybind11
include(external/cares)
include(external/grpc)
include(external/snappy) # download snappy
include(external/snappystream)
include(cudnn) # set cudnn libraries, must before configure
include(cupti)
......
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
IF(MOBILE_INFERENCE)
return()
ENDIF()
include (ExternalProject)
# NOTE: snappy is needed when linking with recordio
SET(SNAPPYSTREAM_SOURCES_DIR ${THIRD_PARTY_PATH}/snappy_stream)
SET(SNAPPYSTREAM_INSTALL_DIR ${THIRD_PARTY_PATH}/install/snappy_stream)
SET(SNAPPYSTREAM_INCLUDE_DIR "${SNAPPYSTREAM_INSTALL_DIR}/include/" CACHE PATH "snappy stream include directory." FORCE)
ExternalProject_Add(
extern_snappystream
GIT_REPOSITORY "https://github.com/hoxnox/snappystream.git"
GIT_TAG "0.2.8"
PREFIX ${SNAPPYSTREAM_SOURCES_DIR}
UPDATE_COMMAND ""
CMAKE_ARGS -DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER}
-DCMAKE_C_COMPILER=${CMAKE_C_COMPILER}
-DCMAKE_CXX_FLAGS=${CMAKE_CXX_FLAGS}
-DCMAKE_C_FLAGS=${CMAKE_C_FLAGS}
-DCMAKE_INSTALL_PREFIX=${SNAPPY_INSTALL_DIR}
-DCMAKE_INSTALL_LIBDIR=${SNAPPY_INSTALL_DIR}/lib
-DCMAKE_POSITION_INDEPENDENT_CODE=ON
-DCMAKE_BUILD_TYPE=${THIRD_PARTY_BUILD_TYPE}
-DSNAPPY_ROOT=${SNAPPY_INSTALL_DIR}
${EXTERNAL_OPTIONAL_ARGS}
CMAKE_CACHE_ARGS
-DCMAKE_INSTALL_PREFIX:PATH=${SNAPPYSTREAM_INSTALL_DIR}
-DCMAKE_INSTALL_LIBDIR:PATH=${SNAPPYSTREAM_INSTALL_DIR}/lib
-DCMAKE_BUILD_TYPE:STRING=${THIRD_PARTY_BUILD_TYPE}
BUILD_COMMAND make -j8
INSTALL_COMMAND make install
DEPENDS snappy
)
add_library(snappystream STATIC IMPORTED GLOBAL)
set_property(TARGET snappystream PROPERTY IMPORTED_LOCATION
"${SNAPPYSTREAM_INSTALL_DIR}/lib/libsnappystream.a")
include_directories(${SNAPPYSTREAM_INCLUDE_DIR})
add_dependencies(snappystream extern_snappystream)
# internal library.
cc_library(io SRCS io.cc DEPS stringpiece)
cc_test(io_test SRCS io_test.cc DEPS io)
cc_library(header SRCS header.cc DEPS io)
cc_library(header SRCS header.cc)
cc_test(header_test SRCS header_test.cc DEPS header)
cc_library(chunk SRCS chunk.cc DEPS snappy)
cc_library(chunk SRCS chunk.cc DEPS snappystream snappy header zlib)
cc_test(chunk_test SRCS chunk_test.cc DEPS chunk)
cc_library(range_scanner SRCS range_scanner.cc DEPS io chunk)
cc_test(range_scanner_test SRCS range_scanner_test.cc DEPS range_scanner)
cc_library(scanner SRCS scanner.cc DEPS range_scanner)
cc_test(scanner_test SRCS scanner_test.cc DEPS scanner)
# exported library.
cc_library(recordio SRCS recordio.cc DEPS scanner chunk header)
cc_test(recordio_test SRCS recordio_test.cc DEPS scanner)
cc_library(recordio DEPS chunk header)
......@@ -14,97 +14,119 @@
#include "paddle/fluid/recordio/chunk.h"
#include <cstring>
#include <memory>
#include <sstream>
#include <utility>
#include "snappy.h"
#include "paddle/fluid/recordio/crc32.h"
#include "paddle/fluid/platform/enforce.h"
#include "snappystream.hpp"
#include "zlib.h"
namespace paddle {
namespace recordio {
constexpr size_t kMaxBufSize = 1024;
void Chunk::Add(const char* record, size_t length) {
records_.emplace_after(std::string(record, length));
num_bytes_ += s.size() * sizeof(char);
template <typename Callback>
static void ReadStreamByBuf(std::istream& in, int limit, Callback callback) {
char buf[kMaxBufSize];
std::streamsize actual_size;
size_t counter = 0;
do {
auto actual_max =
limit > 0 ? std::min(limit - counter, kMaxBufSize) : kMaxBufSize;
actual_size = in.readsome(buf, actual_max);
if (actual_size == 0) {
break;
}
callback(buf, actual_size);
if (limit > 0) {
counter += actual_size;
}
} while (actual_size == kMaxBufSize);
}
bool Chunk::Dump(Stream* fo, Compressor ct) {
static void PipeStream(std::istream& in, std::ostream& os) {
ReadStreamByBuf(
in, -1, [&os](const char* buf, size_t len) { os.write(buf, len); });
}
static uint32_t Crc32Stream(std::istream& in, int limit = -1) {
auto crc = crc32(0, nullptr, 0);
ReadStreamByBuf(in, limit, [&crc](const char* buf, size_t len) {
crc = crc32(crc, reinterpret_cast<const Bytef*>(buf), len);
});
return crc;
}
bool Chunk::Write(std::ostream& os, Compressor ct) const {
// NOTE(dzhwinter): don't check records.numBytes instead, because
// empty records are allowed.
if (records_.size() == 0) return false;
if (records_.empty()) {
return false;
}
std::stringstream sout;
std::unique_ptr<std::ostream> compressed_stream;
switch (ct) {
case Compressor::kNoCompress:
break;
case Compressor::kSnappy:
compressed_stream.reset(new snappy::oSnappyStream(sout));
break;
default:
PADDLE_THROW("Not implemented");
}
std::ostream& buf_stream = compressed_stream ? *compressed_stream : sout;
// pack the record into consecutive memory for compress
std::ostringstream os;
for (auto& record : records_) {
os.write(record.size(), sizeof(size_t));
os.write(record.data(), static_cast<std::streamsize>(record.size()));
size_t sz = record.size();
buf_stream.write(reinterpret_cast<const char*>(&sz), sizeof(uint32_t))
.write(record.data(), record.size());
}
std::unique_ptr<char[]> buffer(new char[num_bytes_]);
size_t compressed =
CompressData(os.str().c_str(), num_bytes_, ct, buffer.get());
uint32_t checksum = Crc32(buffer.get(), compressed);
Header hdr(records_.size(), checksum, ct, static_cast<uint32_t>(compressed));
hdr.Write(fo);
fo.Write(buffer.get(), compressed);
// clear the content
records_.clear();
num_bytes_ = 0;
if (compressed_stream) {
compressed_stream.reset();
}
auto end_pos = sout.tellg();
sout.seekg(0, std::ios::beg);
uint32_t len = static_cast<uint32_t>(end_pos - sout.tellg());
uint32_t crc = Crc32Stream(sout);
sout.seekg(0, std::ios::beg);
Header hdr(static_cast<uint32_t>(records_.size()), crc, ct, len);
hdr.Write(os);
PipeStream(sout, os);
return true;
}
void Chunk::Parse(Stream* fi, size_t offset) {
fi->Seek(offset);
void Chunk::Parse(std::istream& sin) {
Header hdr;
hdr.Parse(fi);
size_t size = static_cast<size_t>(hdr.CompressSize());
std::unique_ptr<char[]> buffer(new char[size]);
fi->Read(buffer.get(), size);
size_t deflated_size = 0;
snappy::GetUncompressedLength(buffer.get(), size, &deflated_size);
std::unique_ptr<char[]> deflated_buffer(new char[deflated_size]);
DeflateData(buffer.get(), size, hdr.CompressType(), deflated_buffer.get());
std::istringstream deflated(
std::string(deflated_buffer.get(), deflated_size));
for (size_t i = 0; i < hdr.NumRecords(); ++i) {
size_t rs;
deflated.read(&rs, sizeof(size_t));
std::string record(rs, '\0');
deflated.read(&record[0], rs);
records_.emplace_back(record);
num_bytes_ += record.size();
}
}
hdr.Parse(sin);
auto beg_pos = sin.tellg();
auto crc = Crc32Stream(sin, hdr.CompressSize());
PADDLE_ENFORCE_EQ(hdr.Checksum(), crc);
size_t CompressData(const char* in,
size_t in_length,
Compressor ct,
char* out) {
size_t compressd_size = 0;
switch (ct) {
Clear();
sin.seekg(beg_pos, std::ios::beg);
std::unique_ptr<std::istream> compressed_stream;
switch (hdr.CompressType()) {
case Compressor::kNoCompress:
// do nothing
memcpy(out, in, in_length);
compressd_size = in_length;
break;
case Compressor::kSnappy:
snappy::RawCompress(in, in_length, out, &compressd_size);
compressed_stream.reset(new snappy::iSnappyStream(sin));
break;
default:
PADDLE_THROW("Not implemented");
}
return compressd_size;
}
void DeflateData(const char* in, size_t in_length, Compressor ct, char* out) {
switch (c) {
case Compressor::kNoCompress:
memcpy(out, in, in_length);
break;
case Compressor::kSnappy:
snappy::RawUncompress(in, in_length, out);
break;
std::istream& stream = compressed_stream ? *compressed_stream : sin;
for (uint32_t i = 0; i < hdr.NumRecords(); ++i) {
uint32_t rec_len;
stream.read(reinterpret_cast<char*>(&rec_len), sizeof(uint32_t));
std::string buf;
buf.resize(rec_len);
stream.read(&buf[0], rec_len);
Add(buf);
}
}
......
......@@ -13,11 +13,11 @@
// limitations under the License.
#pragma once
#include <forward_list>
#include <string>
#include <vector>
#include "paddle/fluid/platform/macros.h"
#include "paddle/fluid/recordio/header.h"
#include "paddle/fluid/recordio/io.h"
namespace paddle {
namespace recordio {
......@@ -26,16 +26,23 @@ namespace recordio {
class Chunk {
public:
Chunk() : num_bytes_(0) {}
void Add(const char* record, size_t size);
void Add(std::string buf) {
records_.push_back(buf);
num_bytes_ += buf.size();
}
// dump the chunk into w, and clears the chunk and makes it ready for
// the next add invocation.
bool Dump(Stream* fo, Compressor ct);
void Parse(Stream* fi, size_t offset);
bool Write(std::ostream& fo, Compressor ct) const;
void Clear() {
records_.clear();
num_bytes_ = 0;
}
void Parse(std::istream& sin);
size_t NumBytes() { return num_bytes_; }
const std::string Record(int i) { return records_[i]; }
const std::string& Record(int i) const { return records_[i]; }
private:
std::forward_list<const std::string> records_;
std::vector<std::string> records_;
// sum of record lengths in bytes.
size_t num_bytes_;
DISABLE_COPY_AND_ASSIGN(Chunk);
......
......@@ -22,34 +22,28 @@ using namespace paddle::recordio;
TEST(Chunk, SaveLoad) {
Chunk ch;
ch.Add("12345", 6);
ch.Add("123", 4);
{
Stream* fs = Stream::Open("/tmp/record_11", "w");
ch.Dump(fs, Compressor::kNoCompress);
EXPECT_EQ(ch.NumBytes(), 0);
}
{
Stream* fs = Stream::Open("/tmp/record_11", "r");
ch.Parse(fs, 0);
EXPECT_EQ(ch.NumBytes(), 10);
}
ch.Add(std::string("12345", 6));
ch.Add(std::string("123", 4));
std::stringstream ss;
ch.Write(ss, Compressor::kNoCompress);
ch.Clear();
ch.Parse(ss);
ASSERT_EQ(ch.NumBytes(), 10U);
}
TEST(Chunk, Compressor) {
Chunk ch;
ch.Add("12345", 6);
ch.Add("123", 4);
ch.Add("123", 4);
ch.Add("123", 4);
{
Stream* fs = Stream::Open("/tmp/record_12", "w");
ch.Dump(fs, Compressor::kSnappy);
EXPECT_EQ(ch.NumBytes(), 0);
}
{
Stream* fs = Stream::Open("/tmp/record_12", "r");
ch.Parse(fs, 0);
EXPECT_EQ(ch.NumBytes(), 10);
}
ch.Add(std::string("12345", 6));
ch.Add(std::string("123", 4));
ch.Add(std::string("123", 4));
ch.Add(std::string("123", 4));
std::stringstream ss;
ch.Write(ss, Compressor::kSnappy);
std::stringstream ss2;
ch.Write(ss2, Compressor::kNoCompress);
ASSERT_LE(ss.tellp(), ss2.tellp()); // Compress should contain less data;
ch.Clear();
ch.Parse(ss);
ASSERT_EQ(ch.NumBytes(), 18);
}
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// A wrapper on crc library https://github.com/d-bahr/CRCpp
#include <cstdint>
#include "paddle/fluid/recordio/detail/crc.h"
namespace paddle {
namespace recordio {
// usage
// char data[] = "hello,world";
// crc = Crc32(data, 12);
// Assert_EQ(crc, 68a85159);
uint32_t Crc32(const char* data, size_t size) {
return CRC::Calculate(data, size, CRC::CRC_32())
}
} // namespace recordio
} // namespace paddle
此差异已折叠。
......@@ -26,18 +26,18 @@ Header::Header()
Header::Header(uint32_t num, uint32_t sum, Compressor c, uint32_t cs)
: num_records_(num), checksum_(sum), compressor_(c), compress_size_(cs) {}
void Header::Parse(Stream* iss) {
iss->Read(reinterpret_cast<char*>(&num_records_), sizeof(uint32_t));
iss->Read(reinterpret_cast<char*>(&checksum_), sizeof(uint32_t));
iss->Read(reinterpret_cast<char*>(&compressor_), sizeof(uint32_t));
iss->Read(reinterpret_cast<char*>(&compress_size_), sizeof(uint32_t));
void Header::Parse(std::istream& is) {
is.read(reinterpret_cast<char*>(&num_records_), sizeof(uint32_t))
.read(reinterpret_cast<char*>(&checksum_), sizeof(uint32_t))
.read(reinterpret_cast<char*>(&compressor_), sizeof(uint32_t))
.read(reinterpret_cast<char*>(&compress_size_), sizeof(uint32_t));
}
void Header::Write(Stream* os) {
os->Write(reinterpret_cast<char*>(&num_records_), sizeof(uint32_t));
os->Write(reinterpret_cast<char*>(&checksum_), sizeof(uint32_t));
os->Write(reinterpret_cast<char*>(&compressor_), sizeof(uint32_t));
os->Write(reinterpret_cast<char*>(&compress_size_), sizeof(uint32_t));
void Header::Write(std::ostream& os) const {
os.write(reinterpret_cast<const char*>(&num_records_), sizeof(uint32_t))
.write(reinterpret_cast<const char*>(&checksum_), sizeof(uint32_t))
.write(reinterpret_cast<const char*>(&compressor_), sizeof(uint32_t))
.write(reinterpret_cast<const char*>(&compress_size_), sizeof(uint32_t));
}
std::ostream& operator<<(std::ostream& os, Header h) {
......
......@@ -16,8 +16,6 @@
#include <sstream>
#include "paddle/fluid/recordio/io.h"
namespace paddle {
namespace recordio {
......@@ -26,7 +24,7 @@ constexpr size_t kDefaultMaxChunkSize = 32 * 1024 * 1024;
// MagicNumber for memory checking
constexpr uint32_t kMagicNumber = 0x01020304;
enum class Compressor {
enum class Compressor : uint32_t {
// NoCompression means writing raw chunk data into files.
// With other choices, chunks are compressed before written.
kNoCompress = 0,
......@@ -45,8 +43,8 @@ public:
Header();
Header(uint32_t num, uint32_t sum, Compressor ct, uint32_t cs);
void Write(Stream* os);
void Parse(Stream* iss);
void Write(std::ostream& os) const;
void Parse(std::istream& is);
uint32_t NumRecords() const { return num_records_; }
uint32_t Checksum() const { return checksum_; }
......
......@@ -22,18 +22,10 @@ using namespace paddle::recordio;
TEST(Recordio, ChunkHead) {
Header hdr(0, 1, Compressor::kGzip, 3);
{
Stream* oss = Stream::Open("/tmp/record_1", "w");
hdr.Write(oss);
delete oss;
}
std::stringstream ss;
hdr.Write(ss);
ss.seekg(0, std::ios::beg);
Header hdr2;
{
Stream* iss = Stream::Open("/tmp/record_1", "r");
hdr2.Parse(iss);
delete iss;
}
hdr2.Parse(ss);
EXPECT_TRUE(hdr == hdr2);
}
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/recordio/io.h"
#include "paddle/fluid/string/piece.h"
#include <iostream>
namespace paddle {
namespace recordio {
Stream* Stream::Open(const char* filename, const char* mode) {
// Create IOStream for different filesystems
// HDFS: hdfs://tmp/file.txt
// Default: /tmp/file.txt
FILE* fp = nullptr;
if (string::HasPrefix(string::Piece(filename), string::Piece("/"))) {
fp = fopen(filename, mode);
}
return new FileStream(fp);
}
size_t FileStream::Read(void* ptr, size_t size) {
return fread(ptr, 1, size, fp_);
}
void FileStream::Write(const void* ptr, size_t size) {
size_t real = fwrite(ptr, 1, size, fp_);
PADDLE_ENFORCE(real == size, "FileStream write incomplete.");
}
size_t FileStream::Tell() { return ftell(fp_); }
void FileStream::Seek(size_t p) { fseek(fp_, p, SEEK_SET); }
bool FileStream::Eof() { return feof(fp_); }
void FileStream::Close() {
if (fp_ != nullptr) {
fclose(fp_);
fp_ = nullptr;
}
}
} // namespace recordio
} // namespace paddle
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <stdio.h>
#include <string>
#include "paddle/fluid/platform/enforce.h"
#include "paddle/fluid/platform/macros.h" // DISABLE_COPY_ASSIGN
namespace paddle {
namespace recordio {
// Seekable Stream Interface for read and write
class Stream {
public:
virtual ~Stream() {}
virtual size_t Read(void* ptr, size_t size) = 0;
virtual void Write(const void* ptr, size_t size) = 0;
virtual size_t Tell() = 0;
virtual void Seek(size_t p) = 0;
// Create Stream Instance
static Stream* Open(const char* filename, const char* mode);
};
// FileStream
class FileStream : public Stream {
public:
explicit FileStream(FILE* fp) : fp_(fp) {}
~FileStream() { this->Close(); }
size_t Read(void* ptr, size_t size);
void Write(const void* ptr, size_t size);
size_t Tell();
void Seek(size_t p);
bool Eof();
void Close();
private:
FILE* fp_;
DISABLE_COPY_AND_ASSIGN(FileStream);
};
} // namespace recordio
} // namespace paddle
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/recordio/io.h"
#include "gtest/gtest.h"
using namespace paddle::recordio;
TEST(FileStream, IO) {
{
// Write
Stream* fs = Stream::Open("/tmp/record_0", "w");
fs->Write("hello", 6);
delete fs;
}
{
// Read
Stream* fs = Stream::Open("/tmp/record_0", "r+");
char buf[10];
fs->Read(&buf, 6);
EXPECT_STREQ(buf, "hello");
delete fs;
}
}
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/recordio/range_scanner.h"
namespace paddle {
namespace recordio {
void Index::LoadIndex(FileStream* fi) {
int64_t offset = 0;
while (!fi->Eof()) {
Header hdr;
hdr.Parse(fi);
chunk_offsets_.push_back(offset);
chunk_lens_.push_back(hdr.NumRecords());
chunk_records_.push_back(hdr.NumRecords());
num_records_ += hdr.NumRecords();
offset += hdr.CompressSize();
}
}
Index Index::ChunkIndex(int i) { Index idx; }
std::pair<int, int> Index::Locate(int record_idx) {
std::pair<int, int> range(-1, -1);
int sum = 0;
for (size_t i = 0; i < chunk_lens_.size(); ++i) {
int len = static_cast<int>(chunk_lens_[i]);
sum += len;
if (record_idx < sum) {
range.first = static_cast<int>(i);
range.second = record_idx - sum + len;
}
}
return range;
}
RangeScanner::RangeScanner(Stream* fi, Index idx, int start, int len)
: stream_(fi), index_(idx) {
if (start < 0) {
start = 0;
}
if (len < 0 || start + len >= idx.NumRecords()) {
len = idx.NumRecords() - start;
}
start_ = start;
end_ = start + len;
cur_ = start - 1; // The intial status required by Scan
chunk_index_ = -1;
chunk_.reset(new Chunk);
}
bool RangeScanner::Scan() {
++cur_;
if (cur_ >= end_) {
return false;
} else {
auto cursor = index_.Locate(cur_);
if (chunk_index_ != cursor.first) {
chunk_index_ = cursor.first;
chunk_->Parse(fi, index_.ChunkOffsets[chunk_index_]);
}
}
return true;
}
const std::string RangeScanner::Record() {
auto cursor = index_.Locate(cur_);
return chunk_->Record(cursor.second);
}
} // namespace recordio
} // namespace paddle
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <utility>
#include "paddle/fluid/recordio/chunk.h"
#include "paddle/fluid/recordio/io.h"
namespace paddle {
namespace recordio {
// Index consists offsets and sizes of the consequetive chunks in a RecordIO
// file.
//
// Index supports Gob. Every field in the Index needs to be exported
// for the correct encoding and decoding using Gob.
class Index {
public:
Index() : num_records_(0) {}
// LoadIndex scans the file and parse chunkOffsets, chunkLens, and len.
void LoadIndex(Stream* fi);
// NumRecords returns the total number of all records in a RecordIO file.
int NumRecords() { return num_records_; }
// NumChunks returns the total number of chunks in a RecordIO file.
int NumChunks() { return chunk_lens_.size(); }
// ChunkIndex return the Index of i-th Chunk.
int ChunkIndex(int i);
int64_t ChunkOffsets(int i) { return chunk_offsets_[i]; }
// Locate returns the index of chunk that contains the given record,
// and the record index within the chunk. It returns (-1, -1) if the
// record is out of range.
std::pair<int, int> Locate(int record_idx);
private:
// the offset of each chunk in a file.
std::vector<int64_t> chunk_offsets_;
// the length of each chunk in a file.
std::vector<uint32_t> chunk_lens_;
// the numer of all records in a file.
int num_records_;
// the number of records in chunks.
std::vector<int> chunk_records_;
};
// RangeScanner
class RangeScanner {
public:
// creates a scanner that sequencially reads records in the
// range [start, start+len). If start < 0, it scans from the
// beginning. If len < 0, it scans till the end of file.
RangeScanner(Stream* fi, Index idx, int start, int end);
// Scan moves the cursor forward for one record and loads the chunk
// containing the record if not yet.
bool Scan();
const std::string Record();
private:
Stream* fi;
Index index_;
int start_, end_, cur_;
int chunk_index_;
std::unique_ptr<Chunk> chunk_;
};
} // namespace recordio
} // namespace paddle
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/recordio/range_scanner.h"
#include "gtest/gtest.h"
using namespace paddle::recordio;
TEST(RangeScanner, Recordio) {
Stream* fo = Stream::Open("/tmp/record_range", "w");
}
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/recordio/io.h"
#include "paddle/fluid/string/piece.h"
namespace paddle {
namespace recordio {} // namespace recordio
} // namespace paddle
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include "paddle/fluid/recordio/chunk.h"
#include "paddle/fluid/recordio/header.h"
#include "paddle/fluid/recordio/io.h"
#include "paddle/fluid/recordio/scanner.h"
#include "paddle/fluid/recordio/writer.h"
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/recordio/chunk.h"
#include <glob.h> // glob
namespace paddle {
namespace recordio {
Scanner::Scanner(const char* paths)
: cur_file_(nullptr), path_idx_(0), end_(false) {
glob_t glob_result;
glob(paths, GLOB_TILDE, NULL, &glob_result);
for (size_t i = 0; i < glob_result.gl_pathc; ++i) {
paths_.emplace_back(std::string(glob_result.gl_pathv[i]));
}
globfree(&glob_result);
}
bool Scanner::Scan() {
if (end_ == true) {
return false;
}
if (cur_scanner_ == nullptr) {
if (!NextFile()) {
end_ = true;
return false;
}
}
if (!cur_scanner_->Scan()) {
end_ = true;
cur_file_ = nullptr;
return false;
}
return true;
}
bool Scanner::NextFile() {
if (path_idx_ >= paths_.size()) {
return false;
}
std::string path = paths_[path_idx_];
++path_idx_;
cur_file_ = Stream::Open(path);
if (cur_file_ == nullptr) {
return false;
}
Index idx;
idx.LoadIndex(cur_file_);
cur_scanner_ = RangeScanner(cur_file_, idx, 0, -1);
return true;
}
} // namespace recordio
} // namespace paddle
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include "paddle/fluid/recordio/io.h"
namespace paddle {
namespace recordio {
class RangeScanner;
// Scanner is a scanner for multiple recordio files.
class Scanner {
public:
Scanner(const char* paths);
const std::string Record();
bool Scan();
void Close();
bool NextFile();
int Err() { return err_; }
private:
std::vector<std::string> paths_;
Stream* cur_file_;
RangeScanner* cur_scanner_;
int path_idx_;
bool end_;
int err_;
};
} // namespace recordio
} // namespace paddle
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/recordio/scanner.h"
#include "gtest/gtest.h"
using namespace paddle::recordio;
TEST(Scanner, Normal) { Scanner s("/tmp/record_*"); }
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/recordio/writer.h"
namespace paddle {
namespace recordio {
Writer::Writer(Stream* fo) : stream_(fo), max_chunk_size_(0), compressor_(0) {}
Writer::Writer(Stream* fo, int maxChunkSize, int compressor)
: stream_(fo),
max_chunk_size_(maxChunkSize),
compressor_(static_cast<Compressor>(compressor)) {
chunk_.reset(new Chunk);
}
size_t Writer::Write(const char* buf, size_t length) {
if (stream_ == nullptr) {
LOG(WARNING) << "Cannot write since writer had been closed.";
return 0;
}
if ((length + chunk_->NumBytes()) > max_chunk_size_) {
chunk_->Dump(stream_, compressor_);
}
chunk_->Add(buf, length);
return length;
}
// size_t Writer::Write(const char* buf, size_t length) {
// return Write(std::string(buf, length));
// }
// size_t Writer::Write(std::string&& buf) {}
void Writer::Close() {
chunk_->Dump(stream_, compressor_);
stream_ = nullptr;
}
} // namespace recordio
} // namespace paddle
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <memory>
#include <string>
#include "paddle/fluid/recordio/header.h"
#include "paddle/fluid/recordio/io.h"
namespace paddle {
namespace recordio {
// Writer creates a RecordIO file.
class Writer {
public:
Writer(Stream* fo);
Writer(Stream* fo, int maxChunkSize, int c);
// Writes a record. It returns an error if Close has been called.
size_t Write(const char* buf, size_t length);
// Close flushes the current chunk and makes the writer invalid.
void Close();
private:
// Set nullptr to mark a closed writer
Stream* stream_;
// Chunk for store object
std::unique_ptr<Chunk> chunk_;
// total records size, excluding metadata, before compression.
int max_chunk_size_;
// Compressor used for chuck
Compressor compressor_;
DISABLE_COPY_AND_ASSIGN(Writer);
};
} // namespace recordio
} // namespace paddle
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/recordio/writer.h"
#include "gtest/gtest.h"
using namespace paddle::recordio;
TEST(Writer, Normal) {
Stream* fs = Stream::Open("/tmp/record_21", "w");
Writer w(fs);
w.Write("123", 4);
// test exception
w.Close();
EXPECT_ANY_THROW(w.Write("123", 4));
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册