diff --git a/CMakeLists.txt b/CMakeLists.txt index 469af0f7859b9ea79d1fc4c53e19cc29bfe28ce8..c86889c05c8cf0d521dce9adbf3e918ba91729a1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -144,6 +144,8 @@ include(external/eigen) # download eigen3 include(external/pybind11) # download pybind11 include(external/cares) include(external/grpc) +include(external/snappy) # download snappy +include(external/snappystream) include(cudnn) # set cudnn libraries, must before configure include(cupti) diff --git a/cmake/external/snappy.cmake b/cmake/external/snappy.cmake new file mode 100644 index 0000000000000000000000000000000000000000..2c109727cfa641b16ba01d6392613fa497b95902 --- /dev/null +++ b/cmake/external/snappy.cmake @@ -0,0 +1,57 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +IF(MOBILE_INFERENCE) + return() +ENDIF() + +include (ExternalProject) + +# NOTE: snappy is needed when linking with recordio + +SET(SNAPPY_SOURCES_DIR ${THIRD_PARTY_PATH}/snappy) +SET(SNAPPY_INSTALL_DIR ${THIRD_PARTY_PATH}/install/snappy) +SET(SNAPPY_INCLUDE_DIR "${SNAPPY_INSTALL_DIR}/include/" CACHE PATH "snappy include directory." FORCE) + +ExternalProject_Add( + extern_snappy + GIT_REPOSITORY "https://github.com/google/snappy" + GIT_TAG "1.1.7" + PREFIX ${SNAPPY_SOURCES_DIR} + UPDATE_COMMAND "" + CMAKE_ARGS -DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER} + -DCMAKE_C_COMPILER=${CMAKE_C_COMPILER} + -DCMAKE_CXX_FLAGS=${CMAKE_CXX_FLAGS} + -DCMAKE_C_FLAGS=${CMAKE_C_FLAGS} + -DCMAKE_INSTALL_PREFIX=${SNAPPY_INSTALL_DIR} + -DCMAKE_INSTALL_LIBDIR=${SNAPPY_INSTALL_DIR}/lib + -DCMAKE_POSITION_INDEPENDENT_CODE=ON + -DBUILD_TESTING=OFF + -DCMAKE_BUILD_TYPE=${THIRD_PARTY_BUILD_TYPE} + ${EXTERNAL_OPTIONAL_ARGS} + CMAKE_CACHE_ARGS -DCMAKE_INSTALL_PREFIX:PATH=${SNAPPY_INSTALL_DIR} + -DCMAKE_INSTALL_LIBDIR:PATH=${SNAPPY_INSTALL_DIR}/lib + -DCMAKE_POSITION_INDEPENDENT_CODE:BOOL=ON + -DCMAKE_BUILD_TYPE:STRING=${THIRD_PARTY_BUILD_TYPE} + BUILD_COMMAND make -j8 + INSTALL_COMMAND make install +) + +add_library(snappy STATIC IMPORTED GLOBAL) +set_property(TARGET snappy PROPERTY IMPORTED_LOCATION + "${SNAPPY_INSTALL_DIR}/lib/libsnappy.a") + +include_directories(${SNAPPY_INCLUDE_DIR}) +add_dependencies(snappy extern_snappy) diff --git a/cmake/external/snappystream.cmake b/cmake/external/snappystream.cmake new file mode 100644 index 0000000000000000000000000000000000000000..5377a0b046a796cd6f0bb1fb466e1cd0b4b678bf --- /dev/null +++ b/cmake/external/snappystream.cmake @@ -0,0 +1,58 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +IF(MOBILE_INFERENCE) + return() +ENDIF() + +include (ExternalProject) + +# NOTE: snappy is needed when linking with recordio + +SET(SNAPPYSTREAM_SOURCES_DIR ${THIRD_PARTY_PATH}/snappy_stream) +SET(SNAPPYSTREAM_INSTALL_DIR ${THIRD_PARTY_PATH}/install/snappy_stream) +SET(SNAPPYSTREAM_INCLUDE_DIR "${SNAPPYSTREAM_INSTALL_DIR}/include/" CACHE PATH "snappy stream include directory." FORCE) + +ExternalProject_Add( + extern_snappystream + GIT_REPOSITORY "https://github.com/hoxnox/snappystream.git" + GIT_TAG "0.2.8" + PREFIX ${SNAPPYSTREAM_SOURCES_DIR} + UPDATE_COMMAND "" + CMAKE_ARGS -DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER} + -DCMAKE_C_COMPILER=${CMAKE_C_COMPILER} + -DCMAKE_CXX_FLAGS=${CMAKE_CXX_FLAGS} + -DCMAKE_C_FLAGS=${CMAKE_C_FLAGS} + -DCMAKE_INSTALL_PREFIX=${SNAPPY_INSTALL_DIR} + -DCMAKE_INSTALL_LIBDIR=${SNAPPY_INSTALL_DIR}/lib + -DCMAKE_POSITION_INDEPENDENT_CODE=ON + -DCMAKE_BUILD_TYPE=${THIRD_PARTY_BUILD_TYPE} + -DSNAPPY_ROOT=${SNAPPY_INSTALL_DIR} + ${EXTERNAL_OPTIONAL_ARGS} + CMAKE_CACHE_ARGS + -DCMAKE_INSTALL_PREFIX:PATH=${SNAPPYSTREAM_INSTALL_DIR} + -DCMAKE_INSTALL_LIBDIR:PATH=${SNAPPYSTREAM_INSTALL_DIR}/lib + -DCMAKE_BUILD_TYPE:STRING=${THIRD_PARTY_BUILD_TYPE} + BUILD_COMMAND make -j8 + INSTALL_COMMAND make install + DEPENDS snappy +) + +add_library(snappystream STATIC IMPORTED GLOBAL) +set_property(TARGET snappystream PROPERTY IMPORTED_LOCATION + "${SNAPPYSTREAM_INSTALL_DIR}/lib/libsnappystream.a") + +include_directories(${SNAPPYSTREAM_INCLUDE_DIR}) +add_dependencies(snappystream extern_snappystream) diff --git a/paddle/fluid/CMakeLists.txt b/paddle/fluid/CMakeLists.txt index 7405ef17d3e01e4837412c96385e13a9e605a75b..d725763b01d5953985f8e090605f68a8419b5498 100644 --- a/paddle/fluid/CMakeLists.txt +++ b/paddle/fluid/CMakeLists.txt @@ -5,3 +5,4 @@ add_subdirectory(operators) add_subdirectory(pybind) add_subdirectory(inference) add_subdirectory(string) +add_subdirectory(recordio) diff --git a/paddle/fluid/recordio/CMakeLists.txt b/paddle/fluid/recordio/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..e1e7c2cdb3d0c960d5cd408420b5aaead73e70d7 --- /dev/null +++ b/paddle/fluid/recordio/CMakeLists.txt @@ -0,0 +1,6 @@ +# internal library. +cc_library(header SRCS header.cc) +cc_test(header_test SRCS header_test.cc DEPS header) +cc_library(chunk SRCS chunk.cc DEPS snappystream snappy header zlib) +cc_test(chunk_test SRCS chunk_test.cc DEPS chunk) +cc_library(recordio DEPS chunk header) diff --git a/paddle/fluid/recordio/chunk.cc b/paddle/fluid/recordio/chunk.cc new file mode 100644 index 0000000000000000000000000000000000000000..587fd375c38ca83e1c65cb3ccc20b3509b6348c7 --- /dev/null +++ b/paddle/fluid/recordio/chunk.cc @@ -0,0 +1,134 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "paddle/fluid/recordio/chunk.h" + +#include +#include +#include "paddle/fluid/platform/enforce.h" +#include "snappystream.hpp" +#include "zlib.h" + +namespace paddle { +namespace recordio { +constexpr size_t kMaxBufSize = 1024; + +template +static void ReadStreamByBuf(std::istream& in, int limit, Callback callback) { + char buf[kMaxBufSize]; + std::streamsize actual_size; + size_t counter = 0; + do { + auto actual_max = + limit > 0 ? std::min(limit - counter, kMaxBufSize) : kMaxBufSize; + actual_size = in.readsome(buf, actual_max); + if (actual_size == 0) { + break; + } + callback(buf, actual_size); + if (limit > 0) { + counter += actual_size; + } + } while (actual_size == kMaxBufSize); +} + +static void PipeStream(std::istream& in, std::ostream& os) { + ReadStreamByBuf( + in, -1, [&os](const char* buf, size_t len) { os.write(buf, len); }); +} +static uint32_t Crc32Stream(std::istream& in, int limit = -1) { + auto crc = crc32(0, nullptr, 0); + ReadStreamByBuf(in, limit, [&crc](const char* buf, size_t len) { + crc = crc32(crc, reinterpret_cast(buf), len); + }); + return crc; +} + +bool Chunk::Write(std::ostream& os, Compressor ct) const { + // NOTE(dzhwinter): don't check records.numBytes instead, because + // empty records are allowed. + if (records_.empty()) { + return false; + } + std::stringstream sout; + std::unique_ptr compressed_stream; + switch (ct) { + case Compressor::kNoCompress: + break; + case Compressor::kSnappy: + compressed_stream.reset(new snappy::oSnappyStream(sout)); + break; + default: + PADDLE_THROW("Not implemented"); + } + + std::ostream& buf_stream = compressed_stream ? *compressed_stream : sout; + + for (auto& record : records_) { + size_t sz = record.size(); + buf_stream.write(reinterpret_cast(&sz), sizeof(uint32_t)) + .write(record.data(), record.size()); + } + + if (compressed_stream) { + compressed_stream.reset(); + } + + auto end_pos = sout.tellg(); + sout.seekg(0, std::ios::beg); + uint32_t len = static_cast(end_pos - sout.tellg()); + uint32_t crc = Crc32Stream(sout); + sout.seekg(0, std::ios::beg); + + Header hdr(static_cast(records_.size()), crc, ct, len); + hdr.Write(os); + PipeStream(sout, os); + return true; +} + +void Chunk::Parse(std::istream& sin) { + Header hdr; + hdr.Parse(sin); + auto beg_pos = sin.tellg(); + auto crc = Crc32Stream(sin, hdr.CompressSize()); + PADDLE_ENFORCE_EQ(hdr.Checksum(), crc); + + Clear(); + + sin.seekg(beg_pos, std::ios::beg); + std::unique_ptr compressed_stream; + switch (hdr.CompressType()) { + case Compressor::kNoCompress: + break; + case Compressor::kSnappy: + compressed_stream.reset(new snappy::iSnappyStream(sin)); + break; + default: + PADDLE_THROW("Not implemented"); + } + + std::istream& stream = compressed_stream ? *compressed_stream : sin; + + for (uint32_t i = 0; i < hdr.NumRecords(); ++i) { + uint32_t rec_len; + stream.read(reinterpret_cast(&rec_len), sizeof(uint32_t)); + std::string buf; + buf.resize(rec_len); + stream.read(&buf[0], rec_len); + Add(buf); + } +} + +} // namespace recordio +} // namespace paddle diff --git a/paddle/fluid/recordio/chunk.h b/paddle/fluid/recordio/chunk.h new file mode 100644 index 0000000000000000000000000000000000000000..0ba9c63abbe72e7a51ddb1af5f0d206aa9f6cc5b --- /dev/null +++ b/paddle/fluid/recordio/chunk.h @@ -0,0 +1,56 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once +#include +#include + +#include "paddle/fluid/platform/macros.h" +#include "paddle/fluid/recordio/header.h" + +namespace paddle { +namespace recordio { + +// A Chunk contains the Header and optionally compressed records. +class Chunk { +public: + Chunk() : num_bytes_(0) {} + void Add(std::string buf) { + records_.push_back(buf); + num_bytes_ += buf.size(); + } + // dump the chunk into w, and clears the chunk and makes it ready for + // the next add invocation. + bool Write(std::ostream& fo, Compressor ct) const; + void Clear() { + records_.clear(); + num_bytes_ = 0; + } + void Parse(std::istream& sin); + size_t NumBytes() { return num_bytes_; } + const std::string& Record(int i) const { return records_[i]; } + +private: + std::vector records_; + // sum of record lengths in bytes. + size_t num_bytes_; + DISABLE_COPY_AND_ASSIGN(Chunk); +}; + +size_t CompressData(const char* in, size_t in_length, Compressor ct, char* out); + +void DeflateData(const char* in, size_t in_length, Compressor ct, char* out); + +} // namespace recordio +} // namespace paddle diff --git a/paddle/fluid/recordio/chunk_test.cc b/paddle/fluid/recordio/chunk_test.cc new file mode 100644 index 0000000000000000000000000000000000000000..a67ba32ed6ab8bda230d1414975c96a0be6d682b --- /dev/null +++ b/paddle/fluid/recordio/chunk_test.cc @@ -0,0 +1,49 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "paddle/fluid/recordio/chunk.h" + +#include + +#include "gtest/gtest.h" + +using namespace paddle::recordio; + +TEST(Chunk, SaveLoad) { + Chunk ch; + ch.Add(std::string("12345", 6)); + ch.Add(std::string("123", 4)); + std::stringstream ss; + ch.Write(ss, Compressor::kNoCompress); + ch.Clear(); + ch.Parse(ss); + ASSERT_EQ(ch.NumBytes(), 10U); +} + +TEST(Chunk, Compressor) { + Chunk ch; + ch.Add(std::string("12345", 6)); + ch.Add(std::string("123", 4)); + ch.Add(std::string("123", 4)); + ch.Add(std::string("123", 4)); + std::stringstream ss; + ch.Write(ss, Compressor::kSnappy); + std::stringstream ss2; + ch.Write(ss2, Compressor::kNoCompress); + ASSERT_LE(ss.tellp(), ss2.tellp()); // Compress should contain less data; + + ch.Clear(); + ch.Parse(ss); + ASSERT_EQ(ch.NumBytes(), 18); +} diff --git a/paddle/fluid/recordio/header.cc b/paddle/fluid/recordio/header.cc new file mode 100644 index 0000000000000000000000000000000000000000..3641caaa8981020519cbc31e5362348c02d3bbce --- /dev/null +++ b/paddle/fluid/recordio/header.cc @@ -0,0 +1,56 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "paddle/fluid/recordio/header.h" + +namespace paddle { +namespace recordio { + +Header::Header() + : num_records_(0), + checksum_(0), + compressor_(Compressor::kNoCompress), + compress_size_(0) {} + +Header::Header(uint32_t num, uint32_t sum, Compressor c, uint32_t cs) + : num_records_(num), checksum_(sum), compressor_(c), compress_size_(cs) {} + +void Header::Parse(std::istream& is) { + is.read(reinterpret_cast(&num_records_), sizeof(uint32_t)) + .read(reinterpret_cast(&checksum_), sizeof(uint32_t)) + .read(reinterpret_cast(&compressor_), sizeof(uint32_t)) + .read(reinterpret_cast(&compress_size_), sizeof(uint32_t)); +} + +void Header::Write(std::ostream& os) const { + os.write(reinterpret_cast(&num_records_), sizeof(uint32_t)) + .write(reinterpret_cast(&checksum_), sizeof(uint32_t)) + .write(reinterpret_cast(&compressor_), sizeof(uint32_t)) + .write(reinterpret_cast(&compress_size_), sizeof(uint32_t)); +} + +std::ostream& operator<<(std::ostream& os, Header h) { + os << h.NumRecords() << h.Checksum() + << static_cast(h.CompressType()) << h.CompressSize(); + return os; +} + +bool operator==(Header l, Header r) { + return l.NumRecords() == r.NumRecords() && l.Checksum() == r.Checksum() && + l.CompressType() == r.CompressType() && + l.CompressSize() == r.CompressSize(); +} + +} // namespace recordio +} // namespace paddle diff --git a/paddle/fluid/recordio/header.h b/paddle/fluid/recordio/header.h new file mode 100644 index 0000000000000000000000000000000000000000..cbd52642a668d1eaeeafb672e50af1a476975080 --- /dev/null +++ b/paddle/fluid/recordio/header.h @@ -0,0 +1,66 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +namespace paddle { +namespace recordio { + +// Default ChunkSize +constexpr size_t kDefaultMaxChunkSize = 32 * 1024 * 1024; +// MagicNumber for memory checking +constexpr uint32_t kMagicNumber = 0x01020304; + +enum class Compressor : uint32_t { + // NoCompression means writing raw chunk data into files. + // With other choices, chunks are compressed before written. + kNoCompress = 0, + // Snappy had been the default compressing algorithm widely + // used in Google. It compromises between speech and + // compression ratio. + kSnappy = 1, + // Gzip is a well-known compression algorithm. It is + // recommmended only you are looking for compression ratio. + kGzip = 2, +}; + +// Header is the metadata of Chunk +class Header { +public: + Header(); + Header(uint32_t num, uint32_t sum, Compressor ct, uint32_t cs); + + void Write(std::ostream& os) const; + void Parse(std::istream& is); + + uint32_t NumRecords() const { return num_records_; } + uint32_t Checksum() const { return checksum_; } + Compressor CompressType() const { return compressor_; } + uint32_t CompressSize() const { return compress_size_; } + +private: + uint32_t num_records_; + uint32_t checksum_; + Compressor compressor_; + uint32_t compress_size_; +}; + +// Allow Header Loggable +std::ostream& operator<<(std::ostream& os, Header h); +bool operator==(Header l, Header r); + +} // namespace recordio +} // namespace paddle diff --git a/paddle/fluid/recordio/header_test.cc b/paddle/fluid/recordio/header_test.cc new file mode 100644 index 0000000000000000000000000000000000000000..a7d627c3eb4a7af1954795f77e5f24739edadae8 --- /dev/null +++ b/paddle/fluid/recordio/header_test.cc @@ -0,0 +1,31 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "paddle/fluid/recordio/header.h" + +#include + +#include "gtest/gtest.h" + +using namespace paddle::recordio; + +TEST(Recordio, ChunkHead) { + Header hdr(0, 1, Compressor::kGzip, 3); + std::stringstream ss; + hdr.Write(ss); + ss.seekg(0, std::ios::beg); + Header hdr2; + hdr2.Parse(ss); + EXPECT_TRUE(hdr == hdr2); +}