提交 7016979c 编写于 作者: D dongzhihong

"add crc32 encoder"

上级 69c79911
...@@ -2,3 +2,4 @@ cc_library(header SRCS header.cc) ...@@ -2,3 +2,4 @@ cc_library(header SRCS header.cc)
cc_test(header_test SRCS header_test.cc DEPS header) cc_test(header_test SRCS header_test.cc DEPS header)
cc_library(io SRCS io.cc DEPS stringpiece) cc_library(io SRCS io.cc DEPS stringpiece)
cc_test(io_test SRCS io_test.cc DEPS io) cc_test(io_test SRCS io_test.cc DEPS io)
cc_library(chunk SRCS chunk.cc DEPS snappy)
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/recordio/chunk.h"
#include <cstring>
#include <sstream>
#include <utility>
#include "snappy.h"
#include "paddle/fluid/recordio/crc32.h"
namespace paddle {
namespace recordio {
void Chunk::Add(const char* record, size_t length) {
records_.emplace_after(std::move(s));
num_bytes_ += s.size() * sizeof(char);
}
bool Chunk::Dump(Stream* fo, Compressor ct) {
// NOTE(dzhwinter): don't check records.numBytes instead, because
// empty records are allowed.
if (records_.size() == 0) return false;
// pack the record into consecutive memory for compress
std::ostringstream os;
for (auto& record : records_) {
os.write(record.size(), sizeof(size_t));
os.write(record.data(), static_cast<std::streamsize>(record.size()));
}
std::unique_ptr<char[]> buffer(new char[kDefaultMaxChunkSize]);
size_t compressed =
CompressData(os.str().c_str(), num_bytes_, ct, buffer.get());
uint32_t checksum = Crc32(buffer.get(), compressed);
Header hdr(records_.size(), checksum, ct, static_cast<uint32_t>(compressed));
hdr.Write(fo);
fo.Write(buffer.get(), compressed);
return true;
}
void Chunk::Parse(Stream* fi, size_t offset) {
fi->Seek(offset);
Header hdr;
hdr.Parse(fi);
std::unique_ptr<char[]> buffer(new char[kDefaultMaxChunkSize]);
fi->Read(buffer.get(), static_cast<size_t>(hdr.CompressSize()));
uint32_t deflated_size =
DeflateData(buffer.get(), hdr.CompressSize(), hdr.CompressType());
std::istringstream deflated(std::string(buffer.get(), deflated_size));
for (size_t i = 0; i < hdr.NumRecords(); ++i) {
uint32_t rs;
deflated >> rs;
std::string record(rs, '\0');
deflated.read(&record[0], rs);
records_.emplace_back(record);
num_bytes_ += record.size();
}
}
size_t CompressData(const char* in,
size_t in_length,
Compressor ct,
char* out) {
size_t compressd_size = 0;
switch (ct) {
case Compressor::kNoCompress:
// do nothing
memcpy(out, in, in_length);
compressd_size = in_length;
break;
case Compressor::kSnappy:
snappy::RawCompress(in, in_length, out, &compressd_size);
break;
}
return compressd_size;
}
void DeflateData(const char* in, size_t in_length, Compressor ct, char* out) {
switch (c) {
case Compressor::kNoCompress:
memcpy(out, in, in_length);
break;
case Compressor::kSnappy:
snappy::RawUncompress(in, in_length, out);
break;
}
}
} // namespace recordio
} // namespace paddle
...@@ -13,109 +13,36 @@ ...@@ -13,109 +13,36 @@
// limitations under the License. // limitations under the License.
#pragma once #pragma once
#include <forward_list>
#include <fstream>
#include <memory>
#include <sstream>
#include <string> #include <string>
#include <utility>
#include <vector>
// Chunk #include "paddle/fluid/recordio/header.h"
// a chunk contains the Header and optionally compressed records. #include "paddle/fluid/recordio/io.h"
namespace paddle {
namespace recordio {
// A Chunk contains the Header and optionally compressed records.
class Chunk { class Chunk {
public: public:
Chunk() = default; Chunk() {}
void Add(const char* record, size_t length); void Add(const char* record, size_t size);
void Add(const std::string&); // dump the chunk into w, and clears the chunk and makes it ready for
// the next add invocation.
bool Dump(std::ostream& os, Compressor ct); bool Dump(Stream* fo, Compressor ct);
void Parse(std::istream& iss, int64_t offset); void Parse(Stream* fi, size_t offset);
const std::string Record(int i) { return records_[i]; }
size_t NumBytes() { return num_bytes_; } size_t NumBytes() { return num_bytes_; }
private: private:
std::vector<std::string> records_; std::forward_list<std::string> records_;
// sum of record lengths in bytes. // sum of record lengths in bytes.
size_t num_bytes_; size_t num_bytes_;
DISABLE_COPY_AND_ASSIGN(Chunk);
}; };
size_t CompressData(const std::stringstream& ss, Compressor ct, char* buffer); size_t CompressData(const char* in, size_t in_length, Compressor ct, char* out);
uint32_t DeflateData(char* buffer, uint32_t size, Compressor c);
// implementation
void Chunk::Add(const std::string& s) {
num_bytes_ += s.size() * sizeof(char);
records_.emplace_back(std::move(s));
// records_.resize(records_.size()+1);
// records_[records_.size()-1] = s;
}
void Chunk::Add(const char* record, size_t length) {
Add(std::string(record, length));
}
bool Chunk::Dump(std::ostream& os, Compressor ct) {
if (records_.size() == 0) return false;
// TODO(dzhwinter):
// we pack the string with same size buffer,
// then compress with another buffer.
// Here can be optimized if it is the bottle-neck.
std::ostringstream oss;
for (auto& record : records_) {
unsigned len = record.size();
oss << len;
oss << record;
// os.write(std::to_string(len).c_str(), sizeof(unsigned));
// os.write(record.c_str(), record.size());
}
std::unique_ptr<char[]> buffer(new char[kDefaultMaxChunkSize]);
size_t compressed = CompressData(oss.str(), ct, buffer.get());
// TODO(dzhwinter): crc32 checksum
size_t checksum = compressed;
Header hdr(records_.size(), checksum, ct, compressed);
return true;
}
void Chunk::Parse(std::istream& iss, int64_t offset) {
iss.seekg(offset, iss.beg);
Header hdr;
hdr.Parse(iss);
std::unique_ptr<char[]> buffer(new char[kDefaultMaxChunkSize]); void DeflateData(const char* in, size_t in_length, Compressor ct, char* out);
iss.read(buffer.get(), static_cast<size_t>(hdr.CompressSize()));
// TODO(dzhwinter): checksum
uint32_t deflated_size =
DeflateData(buffer.get(), hdr.CompressSize(), hdr.CompressType());
std::istringstream deflated(std::string(buffer.get(), deflated_size));
for (size_t i = 0; i < hdr.NumRecords(); ++i) {
uint32_t rs;
deflated >> rs;
std::string record(rs, '\0');
deflated.read(&record[0], rs);
records_.emplace_back(record);
num_bytes_ += record.size();
}
}
uint32_t DeflateData(char* buffer, uint32_t size, Compressor c) { } // namespace recordio
uint32_t deflated_size = 0; } // namespace paddle
std::string uncompressed;
switch (c) {
case Compressor::kNoCompress:
deflated_size = size;
break;
case Compressor::kSnappy:
// snappy::Uncompress(buffer, size, &uncompressed);
// deflated_size = uncompressed.size();
// memcpy(buffer, uncompressed.data(), uncompressed.size() *
// sizeof(char));
break;
}
return deflated_size;
}
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/recordio/chunk.h"
#include <sstream>
#include "gtest/gtest.h"
using namespace paddle::recordio;
TEST(Chunk, SaveLoad) {}
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// A wrapper on crc library https://github.com/d-bahr/CRCpp
#include <cstdint>
#include "paddle/fluid/recordio/detail/crc.h"
namespace paddle {
namespace recordio {
// usage
// char data[] = "hello,world";
// crc = Crc32(data, 12);
// Assert_EQ(crc, 68a85159);
uint32_t Crc32(const char* data, size_t size) {
return CRC::Calculate(data, size, CRC::CRC_32())
}
} // namespace recordio
} // namespace paddle
此差异已折叠。
...@@ -26,18 +26,18 @@ Header::Header() ...@@ -26,18 +26,18 @@ Header::Header()
Header::Header(uint32_t num, uint32_t sum, Compressor c, uint32_t cs) Header::Header(uint32_t num, uint32_t sum, Compressor c, uint32_t cs)
: num_records_(num), checksum_(sum), compressor_(c), compress_size_(cs) {} : num_records_(num), checksum_(sum), compressor_(c), compress_size_(cs) {}
void Header::Parse(std::istream& iss) { void Header::Parse(Stream* iss) {
iss.read(reinterpret_cast<char*>(&num_records_), sizeof(uint32_t)); iss.Read(reinterpret_cast<char*>(&num_records_), sizeof(uint32_t));
iss.read(reinterpret_cast<char*>(&checksum_), sizeof(uint32_t)); iss.Read(reinterpret_cast<char*>(&checksum_), sizeof(uint32_t));
iss.read(reinterpret_cast<char*>(&compressor_), sizeof(uint32_t)); iss.Read(reinterpret_cast<char*>(&compressor_), sizeof(uint32_t));
iss.read(reinterpret_cast<char*>(&compress_size_), sizeof(uint32_t)); iss.Read(reinterpret_cast<char*>(&compress_size_), sizeof(uint32_t));
} }
void Header::Write(std::ostream& os) { void Header::Write(Stream* os) {
os.write(reinterpret_cast<char*>(&num_records_), sizeof(uint32_t)); os.Write(reinterpret_cast<char*>(&num_records_), sizeof(uint32_t));
os.write(reinterpret_cast<char*>(&checksum_), sizeof(uint32_t)); os.Write(reinterpret_cast<char*>(&checksum_), sizeof(uint32_t));
os.write(reinterpret_cast<char*>(&compressor_), sizeof(uint32_t)); os.Write(reinterpret_cast<char*>(&compressor_), sizeof(uint32_t));
os.write(reinterpret_cast<char*>(&compress_size_), sizeof(uint32_t)); os.Write(reinterpret_cast<char*>(&compress_size_), sizeof(uint32_t));
} }
// std::ostream& operator << (std::ostream& os, Header h) { // std::ostream& operator << (std::ostream& os, Header h) {
...@@ -54,28 +54,8 @@ std::ostream& operator<<(std::ostream& os, Header h) { ...@@ -54,28 +54,8 @@ std::ostream& operator<<(std::ostream& os, Header h) {
return os; return os;
} }
// bool operator==(Header l, Header r) {
// return num_records_ == rhs.NumRecords() &&
// checksum_ == rhs.Checksum() &&
// compressor_ == rhs.CompressType() &&
// compress_size_ == rhs.CompressSize();
// }
bool operator==(Header l, Header r) { bool operator==(Header l, Header r) {
return l.NumRecords() == r.NumRecords() && l.Checksum() == r.Checksum() && return l.NumRecords() == r.NumRecords() && l.Checksum() == r.Checksum() &&
l.CompressType() == r.CompressType() && l.CompressType() == r.CompressType() &&
l.CompressSize() == r.CompressSize(); l.CompressSize() == r.CompressSize();
} }
// size_t CompressData(const std::string& os, Compressor ct, char* buffer) {
// size_t compress_size = 0;
// // std::unique_ptr<char[]> buffer(new char[kDefaultMaxChunkSize]);
// // std::string compressed;
// compress_size =os.size();
// memcpy(buffer, os.c_str(), compress_size);
// return compress_size;
// }
} // namespace recordio
} // namespace paddle
...@@ -16,6 +16,8 @@ ...@@ -16,6 +16,8 @@
#include <sstream> #include <sstream>
#include "paddle/fluid/recordio/io.h"
namespace paddle { namespace paddle {
namespace recordio { namespace recordio {
...@@ -43,8 +45,8 @@ public: ...@@ -43,8 +45,8 @@ public:
Header(); Header();
Header(uint32_t num, uint32_t sum, Compressor ct, uint32_t cs); Header(uint32_t num, uint32_t sum, Compressor ct, uint32_t cs);
void Write(std::ostream& os); void Write(Stream* os);
void Parse(std::istream& iss); void Parse(Stream* iss);
uint32_t NumRecords() const { return num_records_; } uint32_t NumRecords() const { return num_records_; }
uint32_t Checksum() const { return checksum_; } uint32_t Checksum() const { return checksum_; }
......
...@@ -22,15 +22,12 @@ using namespace paddle::recordio; ...@@ -22,15 +22,12 @@ using namespace paddle::recordio;
TEST(Recordio, ChunkHead) { TEST(Recordio, ChunkHead) {
Header hdr(0, 1, Compressor::kGzip, 3); Header hdr(0, 1, Compressor::kGzip, 3);
std::ostringstream oss; Stream* oss = Stream::Open("/tmp/record_1", "w");
hdr.Write(oss); hdr.Write(oss);
std::istringstream iss(oss.str()); Stream* iss = Stream::Open("/tmp/record_1", "r");
Header hdr2; Header hdr2;
hdr2.Parse(iss); hdr2.Parse(iss);
std::ostringstream oss2;
hdr2.Write(oss2);
EXPECT_STREQ(oss2.str().c_str(), oss.str().c_str());
EXPECT_TRUE(hdr == hdr2); EXPECT_TRUE(hdr == hdr2);
} }
...@@ -15,6 +15,8 @@ ...@@ -15,6 +15,8 @@
#include "paddle/fluid/recordio/io.h" #include "paddle/fluid/recordio/io.h"
#include "paddle/fluid/string/piece.h" #include "paddle/fluid/string/piece.h"
#include <iostream>
namespace paddle { namespace paddle {
namespace recordio { namespace recordio {
Stream* Stream::Open(const char* filename, const char* mode) { Stream* Stream::Open(const char* filename, const char* mode) {
...@@ -38,7 +40,7 @@ void FileStream::Write(const void* ptr, size_t size) { ...@@ -38,7 +40,7 @@ void FileStream::Write(const void* ptr, size_t size) {
} }
size_t FileStream::Tell() { return ftell(fp_); } size_t FileStream::Tell() { return ftell(fp_); }
void FileStream::Seek(size_t p) { fseek(fp_, static_cast<long>(p), SEEK_SET); } void FileStream::Seek(size_t p) { fseek(fp_, p, SEEK_SET); }
bool FileStream::Eof() { return feof(fp_); } bool FileStream::Eof() { return feof(fp_); }
......
...@@ -16,19 +16,21 @@ ...@@ -16,19 +16,21 @@
#include <stdio.h> #include <stdio.h>
#include <string> #include <string>
#include "paddle/fluid/platform/enforce.h" #include "paddle/fluid/platform/enforce.h"
#include "paddle/fluid/platform/macros.h" // DISABLE_COPY_ASSIGN
namespace paddle { namespace paddle {
namespace recordio { namespace recordio {
// Stream abstract object for read and write // Seekable Stream Interface for read and write
class Stream { class Stream {
public: public:
virtual ~Stream() {} virtual ~Stream() {}
virtual size_t Read(void* ptr, size_t size); virtual size_t Read(void* ptr, size_t size) = 0;
virtual void Write(const void* ptr, size_t size); virtual void Write(const void* ptr, size_t size) = 0;
virtual size_t Tell(); virtual size_t Tell() = 0;
virtual void Seek(); virtual void Seek(size_t p) = 0;
// Create Stream Instance // Create Stream Instance
static Stream* Open(const char* filename, const char* mode); static Stream* Open(const char* filename, const char* mode);
}; };
...@@ -47,6 +49,7 @@ public: ...@@ -47,6 +49,7 @@ public:
private: private:
FILE* fp_; FILE* fp_;
DISABLE_COPY_AND_ASSIGN(FileStream);
}; };
} // namespace recordio } // namespace recordio
......
...@@ -21,7 +21,7 @@ using namespace paddle::recordio; ...@@ -21,7 +21,7 @@ using namespace paddle::recordio;
TEST(FileStream, IO) { TEST(FileStream, IO) {
{ {
// Write // Write
Stream* fs = Stream::Open("/tmp/record_0", "rw"); Stream* fs = Stream::Open("/tmp/record_0", "w");
fs->Write("hello", 6); fs->Write("hello", 6);
delete fs; delete fs;
} }
......
...@@ -26,16 +26,16 @@ Writer::Writer(Stream* fo, int maxChunkSize, int compressor) ...@@ -26,16 +26,16 @@ Writer::Writer(Stream* fo, int maxChunkSize, int compressor)
chunk_.reset(new Chunk); chunk_.reset(new Chunk);
} }
size_t Writer::Write(const std::string& record) { size_t Writer::Write(const char* buf, size_t length) {
if (stream_ == nullptr) { if (stream_ == nullptr) {
LOG(WARNING) << "Cannot write since writer had been closed."; LOG(WARNING) << "Cannot write since writer had been closed.";
return 0; return 0;
} }
if ((record.size() + chunk_->NumBytes()) > max_chunk_size_) { if ((length + chunk_->NumBytes()) > max_chunk_size_) {
chunk_->Dump(stream_, compressor_); chunk_->Dump(stream_, compressor_);
} }
chunk_->Add(record); chunk_->Add(buf, length);
return record.size(); return length;
} }
// size_t Writer::Write(const char* buf, size_t length) { // size_t Writer::Write(const char* buf, size_t length) {
......
...@@ -16,7 +16,6 @@ ...@@ -16,7 +16,6 @@
#include <memory> #include <memory>
#include <string> #include <string>
#include "paddle/fluid/platform/macros.h" // DISABLE_COPY_ASSIGN
#include "paddle/fluid/recordio/header.h" #include "paddle/fluid/recordio/header.h"
#include "paddle/fluid/recordio/io.h" #include "paddle/fluid/recordio/io.h"
...@@ -44,7 +43,6 @@ private: ...@@ -44,7 +43,6 @@ private:
int max_chunk_size_; int max_chunk_size_;
// Compressor used for chuck // Compressor used for chuck
Compressor compressor_; Compressor compressor_;
DISABLE_COPY_AND_ASSIGN(Writer); DISABLE_COPY_AND_ASSIGN(Writer);
}; };
......
...@@ -18,4 +18,4 @@ ...@@ -18,4 +18,4 @@
using namespace paddle::recordio; using namespace paddle::recordio;
TEST(Writer, Normal) { Stream } TEST(Writer, Normal) {}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册