提交 69c79911 编写于 作者: D dongzhihong

"add snappy library"

上级 6540cda1
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Builds Google Snappy (tag 1.1.7) as an external project and exposes it to the
# rest of the build as the imported static library target `snappy`.
#
# Reads:   MOBILE_INFERENCE, THIRD_PARTY_PATH, THIRD_PARTY_BUILD_TYPE,
#          EXTERNAL_OPTIONAL_ARGS and the active C/C++ compilers and flags.
# Defines: SNAPPY_SOURCES_DIR, SNAPPY_INSTALL_DIR, SNAPPY_INCLUDE_DIR (cache),
#          the build target `extern_snappy` and the imported target `snappy`.

# Skip the whole file when MOBILE_INFERENCE is set.
IF(MOBILE_INFERENCE)
    return()
ENDIF()

include (ExternalProject)

# NOTE: snappy is needed when linking with recordio
SET(SNAPPY_SOURCES_DIR ${THIRD_PARTY_PATH}/snappy)
SET(SNAPPY_INSTALL_DIR ${THIRD_PARTY_PATH}/install/snappy)
SET(SNAPPY_INCLUDE_DIR "${SNAPPY_INSTALL_DIR}/include/" CACHE PATH "snappy include directory." FORCE)

# BUILD_COMMAND / INSTALL_COMMAND are intentionally left at their defaults:
# ExternalProject then drives the generator's native build tool, so the step
# works with non-Makefile generators and follows the parent build's job
# settings instead of a hard-coded `make -j8`.
ExternalProject_Add(
    extern_snappy
    GIT_REPOSITORY  "https://github.com/google/snappy"
    GIT_TAG         "1.1.7"
    PREFIX          ${SNAPPY_SOURCES_DIR}
    UPDATE_COMMAND  ""
    CMAKE_ARGS      -DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER}
                    -DCMAKE_C_COMPILER=${CMAKE_C_COMPILER}
                    -DCMAKE_CXX_FLAGS=${CMAKE_CXX_FLAGS}
                    -DCMAKE_C_FLAGS=${CMAKE_C_FLAGS}
                    -DCMAKE_INSTALL_PREFIX=${SNAPPY_INSTALL_DIR}
                    -DCMAKE_INSTALL_LIBDIR=${SNAPPY_INSTALL_DIR}/lib
                    -DCMAKE_POSITION_INDEPENDENT_CODE=ON
                    -DBUILD_TESTING=OFF
                    -DCMAKE_BUILD_TYPE=${THIRD_PARTY_BUILD_TYPE}
                    ${EXTERNAL_OPTIONAL_ARGS}
    CMAKE_CACHE_ARGS -DCMAKE_INSTALL_PREFIX:PATH=${SNAPPY_INSTALL_DIR}
                     -DCMAKE_INSTALL_LIBDIR:PATH=${SNAPPY_INSTALL_DIR}/lib
                     -DCMAKE_POSITION_INDEPENDENT_CODE:BOOL=ON
                     -DCMAKE_BUILD_TYPE:STRING=${THIRD_PARTY_BUILD_TYPE}
)

# The library file only exists after extern_snappy has been built and
# installed. The file name is derived from the platform's static-library
# prefix/suffix (libsnappy.a on Unix-like toolchains).
add_library(snappy STATIC IMPORTED GLOBAL)
set_property(TARGET snappy PROPERTY IMPORTED_LOCATION
             "${SNAPPY_INSTALL_DIR}/lib/${CMAKE_STATIC_LIBRARY_PREFIX}snappy${CMAKE_STATIC_LIBRARY_SUFFIX}")

include_directories(${SNAPPY_INCLUDE_DIR})
# Anything depending on `snappy` must wait for the external build to finish.
add_dependencies(snappy extern_snappy)
cc_library(header SRCS header.cc) cc_library(header SRCS header.cc)
cc_test(header_test SRCS header_test.cc DEPS header) cc_test(header_test SRCS header_test.cc DEPS header)
cc_library(io SRCS io.cc DEPS stringpiece)
cc_test(io_test SRCS io_test.cc DEPS io)
...@@ -32,9 +32,11 @@ public: ...@@ -32,9 +32,11 @@ public:
bool Dump(std::ostream& os, Compressor ct); bool Dump(std::ostream& os, Compressor ct);
void Parse(std::istream& iss, int64_t offset); void Parse(std::istream& iss, int64_t offset);
const std::string Record(int i) { return records_[i]; } const std::string Record(int i) { return records_[i]; }
size_t NumBytes() { return num_bytes_; }
private: private:
std::vector<std::string> records_; std::vector<std::string> records_;
// sum of record lengths in bytes.
size_t num_bytes_; size_t num_bytes_;
}; };
......
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>
// Empty placeholder type: no members are declared yet.
class DefaultFileSys {};
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
#include "gtest/gtest.h" #include "gtest/gtest.h"
using namespace recordio; using namespace paddle::recordio;
TEST(Recordio, ChunkHead) { TEST(Recordio, ChunkHead) {
Header hdr(0, 1, Compressor::kGzip, 3); Header hdr(0, 1, Compressor::kGzip, 3);
...@@ -32,5 +32,5 @@ TEST(Recordio, ChunkHead) { ...@@ -32,5 +32,5 @@ TEST(Recordio, ChunkHead) {
std::ostringstream oss2; std::ostringstream oss2;
hdr2.Write(oss2); hdr2.Write(oss2);
EXPECT_STREQ(oss2.str().c_str(), oss.str().c_str()); EXPECT_STREQ(oss2.str().c_str(), oss.str().c_str());
EXPECT_EQ(hdr == hdr2); EXPECT_TRUE(hdr == hdr2);
} }
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/recordio/io.h"
#include "paddle/fluid/string/piece.h"
namespace paddle {
namespace recordio {
// Opens `filename` with the fopen-style `mode` and wraps the handle in a
// heap-allocated FileStream. The caller owns the returned pointer and must
// `delete` it.
//
// Only paths starting with "/" are opened; other schemes (e.g. hdfs://) are
// not implemented here. The PADDLE_ENFORCE checks fail when an argument is
// null or when no file handle could be obtained, so the returned stream never
// wraps a null FILE*.
Stream* Stream::Open(const char* filename, const char* mode) {
  // Create IOStream for different filesystems
  // HDFS: hdfs://tmp/file.txt
  // Default: /tmp/file.txt
  PADDLE_ENFORCE(filename != nullptr && mode != nullptr,
                 "Stream::Open requires a file name and a mode.");
  FILE* fp = nullptr;
  if (string::HasPrefix(string::Piece(filename), string::Piece("/"))) {
    fp = fopen(filename, mode);
  }
  // Covers both an unsupported (non-absolute) path and an fopen failure.
  PADDLE_ENFORCE(fp != nullptr, "Cannot open file %s with mode %s.", filename,
                 mode);
  return new FileStream(fp);
}
// Reads up to `size` bytes from the file into `ptr` and returns the number of
// bytes actually read (the element size passed to fread is one byte).
size_t FileStream::Read(void* ptr, size_t size) {
  const size_t bytes_read = fread(ptr, 1, size, fp_);
  return bytes_read;
}
// Writes `size` bytes from `ptr` to the file. The PADDLE_ENFORCE check fails
// when fwrite reports fewer bytes written than requested.
void FileStream::Write(const void* ptr, size_t size) {
  const size_t written = fwrite(ptr, 1, size, fp_);
  PADDLE_ENFORCE(size == written, "FileStream write incomplete.");
}
size_t FileStream::Tell() { return ftell(fp_); }
void FileStream::Seek(size_t p) { fseek(fp_, static_cast<long>(p), SEEK_SET); }
// Returns true once the end-of-file indicator is set on the underlying file.
bool FileStream::Eof() {
  const int eof_flag = feof(fp_);
  return eof_flag != 0;
}
// Closes the underlying file if it is still open. The handle is reset to
// nullptr afterwards, so calling Close again is a no-op.
void FileStream::Close() {
  if (fp_ == nullptr) {
    return;
  }
  fclose(fp_);
  fp_ = nullptr;
}
} // namespace recordio
} // namespace paddle
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <stdio.h>
#include <string>
#include "paddle/fluid/platform/enforce.h"
namespace paddle {
namespace recordio {
// Stream is the abstract interface for reading and writing bytes.
//
// All operations are pure virtual: the base class provides no implementation,
// so concrete streams (such as FileStream) must override every one of them.
class Stream {
 public:
  virtual ~Stream() {}
  // Reads up to `size` bytes into `ptr`; returns the number of bytes read.
  virtual size_t Read(void* ptr, size_t size) = 0;
  // Writes `size` bytes from `ptr`.
  virtual void Write(const void* ptr, size_t size) = 0;
  // Returns the current position in the stream.
  virtual size_t Tell() = 0;
  // Moves the current position to offset `p`.
  virtual void Seek(size_t p) = 0;
  // Create Stream Instance. The caller owns the returned object.
  static Stream* Open(const char* filename, const char* mode);
};

// FileStream is a Stream backed by a C stdio FILE*.
//
// It takes ownership of the handle passed to the constructor and closes it on
// destruction. Copying is disabled because two copies would close the same
// handle twice.
class FileStream : public Stream {
 public:
  explicit FileStream(FILE* fp) : fp_(fp) {}
  ~FileStream() override { this->Close(); }

  FileStream(const FileStream&) = delete;
  FileStream& operator=(const FileStream&) = delete;

  size_t Read(void* ptr, size_t size) override;
  void Write(const void* ptr, size_t size) override;
  size_t Tell() override;
  void Seek(size_t p) override;
  // Reports whether the end of the file has been reached.
  bool Eof();
  // Closes the file handle; safe to call more than once.
  void Close();

 private:
  // Owned file handle; nullptr once the stream has been closed.
  FILE* fp_;
};
} // namespace recordio
} // namespace paddle
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/recordio/io.h"
#include "gtest/gtest.h"
using namespace paddle::recordio;
// Round-trips a NUL-terminated string through a file-backed Stream: writes
// six bytes ("hello" plus the terminator) to /tmp/record_0, reopens the file
// and checks that the same bytes are read back.
TEST(FileStream, IO) {
  {
    // Write. "w" creates or truncates the file; the previous "rw" is not a
    // valid fopen mode string.
    Stream* fs = Stream::Open("/tmp/record_0", "w");
    fs->Write("hello", 6);
    delete fs;  // closes the file so the data is flushed before reading
  }
  {
    // Read
    Stream* fs = Stream::Open("/tmp/record_0", "r+");
    // Zero-initialized so a short read cannot leave the buffer unterminated.
    char buf[10] = {0};
    EXPECT_EQ(fs->Read(buf, 6), static_cast<size_t>(6));
    EXPECT_STREQ(buf, "hello");
    delete fs;
  }
}
...@@ -21,8 +21,9 @@ ...@@ -21,8 +21,9 @@
#include <utility> #include <utility>
#include <vector> #include <vector>
// Scanner class RangeScanner;
// Scanner is a scanner for multiple recordio files.
class Scanner { class Scanner {
public: public:
Scanner(const char* paths); Scanner(const char* paths);
......
...@@ -17,29 +17,36 @@ ...@@ -17,29 +17,36 @@
namespace paddle { namespace paddle {
namespace recordio { namespace recordio {
Writer::Writer(std::ostream& os) Writer::Writer(Stream* fo) : stream_(fo), max_chunk_size_(0), compressor_(0) {}
: stream_(os.rdbuf()), max_chunk_size_(0), compressor_(0) {}
Writer::Writer(std::ostream& os, int maxChunkSize, int compressor) Writer::Writer(Stream* fo, int maxChunkSize, int compressor)
: stream_(os.rdbuf()), : stream_(fo),
max_chunk_size_(maxChunkSize), max_chunk_size_(maxChunkSize),
compressor_(compressor) { compressor_(static_cast<Compressor>(compressor)) {
// clear rdstate
stream_.clear();
chunk_.reset(new Chunk); chunk_.reset(new Chunk);
} }
size_t Writer::Write(const std::string& buf) { return Write(std::string(buf)); } size_t Writer::Write(const std::string& record) {
if (stream_ == nullptr) {
size_t Writer::Write(const char* buf, size_t length) { LOG(WARNING) << "Cannot write since writer had been closed.";
return Write(std::string(buf, length)); return 0;
}
if ((record.size() + chunk_->NumBytes()) > max_chunk_size_) {
chunk_->Dump(stream_, compressor_);
}
chunk_->Add(record);
return record.size();
} }
size_t Writer::Write(std::string&& buf) {} // size_t Writer::Write(const char* buf, size_t length) {
// return Write(std::string(buf, length));
// }
// size_t Writer::Write(std::string&& buf) {}
void Writer::Close() { void Writer::Close() {
stream_.flush(); chunk_->Dump(stream_, compressor_);
stream_.setstate(std::ios::eofbit); stream_ = nullptr;
} }
} // namespace recordio } // namespace recordio
......
...@@ -16,8 +16,9 @@ ...@@ -16,8 +16,9 @@
#include <memory> #include <memory>
#include <string> #include <string>
#include "paddle/fluid/platform/macros.h" // for DISABLE COPY ASSIGN #include "paddle/fluid/platform/macros.h" // DISABLE_COPY_ASSIGN
#include "paddle/fluid/recordio/header.h" #include "paddle/fluid/recordio/header.h"
#include "paddle/fluid/recordio/io.h"
namespace paddle { namespace paddle {
namespace recordio { namespace recordio {
...@@ -25,32 +26,27 @@ namespace recordio { ...@@ -25,32 +26,27 @@ namespace recordio {
// Writer creates a RecordIO file. // Writer creates a RecordIO file.
class Writer { class Writer {
public: public:
Writer(std::ostream& os); Writer(Stream* fo);
Writer(std::ostream& os, int maxChunkSize, int c); Writer(Stream* fo, int maxChunkSize, int c);
// Writes a record. It returns an error if Close has been called. // Writes a record. It returns an error if Close has been called.
size_t Write(const char* buf, size_t length); size_t Write(const char* buf, size_t length);
size_t Write(const std::string& buf);
size_t Write(std::string&& buf);
// Close flushes the current chunk and makes the writer invalid. // Close flushes the current chunk and makes the writer invalid.
void Close(); void Close();
private: private:
// Set rdstate to mark a closed writer // Set nullptr to mark a closed writer
std::ostream stream_; Stream* stream_;
// Chunk for store object
std::unique_ptr<Chunk> chunk_; std::unique_ptr<Chunk> chunk_;
// total records size, excluding metadata, before compression. // total records size, excluding metadata, before compression.
int max_chunk_size_; int max_chunk_size_;
int compressor_; // Compressor used for chuck
Compressor compressor_;
DISABLE_COPY_AND_ASSIGN(Writer); DISABLE_COPY_AND_ASSIGN(Writer);
}; };
template <typename T>
Writer& operator<<(const T& val) {
stream_ << val;
return *this;
}
} // namespace recordio } // namespace recordio
} // namespace paddle } // namespace paddle
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/recordio/writer.h"
#include "gtest/gtest.h"
using namespace paddle::recordio;
TEST(Writer, Normal) { Stream }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册