diff --git a/cmake/external/snappy.cmake b/cmake/external/snappy.cmake new file mode 100644 index 0000000000000000000000000000000000000000..2c109727cfa641b16ba01d6392613fa497b95902 --- /dev/null +++ b/cmake/external/snappy.cmake @@ -0,0 +1,57 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +IF(MOBILE_INFERENCE) + return() +ENDIF() + +include (ExternalProject) + +# NOTE: snappy is needed when linking with recordio + +SET(SNAPPY_SOURCES_DIR ${THIRD_PARTY_PATH}/snappy) +SET(SNAPPY_INSTALL_DIR ${THIRD_PARTY_PATH}/install/snappy) +SET(SNAPPY_INCLUDE_DIR "${SNAPPY_INSTALL_DIR}/include/" CACHE PATH "snappy include directory." FORCE) + +ExternalProject_Add( + extern_snappy + GIT_REPOSITORY "https://github.com/google/snappy" + GIT_TAG "1.1.7" + PREFIX ${SNAPPY_SOURCES_DIR} + UPDATE_COMMAND "" + CMAKE_ARGS -DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER} + -DCMAKE_C_COMPILER=${CMAKE_C_COMPILER} + -DCMAKE_CXX_FLAGS=${CMAKE_CXX_FLAGS} + -DCMAKE_C_FLAGS=${CMAKE_C_FLAGS} + -DCMAKE_INSTALL_PREFIX=${SNAPPY_INSTALL_DIR} + -DCMAKE_INSTALL_LIBDIR=${SNAPPY_INSTALL_DIR}/lib + -DCMAKE_POSITION_INDEPENDENT_CODE=ON + -DBUILD_TESTING=OFF + -DCMAKE_BUILD_TYPE=${THIRD_PARTY_BUILD_TYPE} + ${EXTERNAL_OPTIONAL_ARGS} + CMAKE_CACHE_ARGS -DCMAKE_INSTALL_PREFIX:PATH=${SNAPPY_INSTALL_DIR} + -DCMAKE_INSTALL_LIBDIR:PATH=${SNAPPY_INSTALL_DIR}/lib + -DCMAKE_POSITION_INDEPENDENT_CODE:BOOL=ON + -DCMAKE_BUILD_TYPE:STRING=${THIRD_PARTY_BUILD_TYPE} + BUILD_COMMAND make -j8 + INSTALL_COMMAND make install +) + +add_library(snappy STATIC IMPORTED GLOBAL) +set_property(TARGET snappy PROPERTY IMPORTED_LOCATION + "${SNAPPY_INSTALL_DIR}/lib/libsnappy.a") + +include_directories(${SNAPPY_INCLUDE_DIR}) +add_dependencies(snappy extern_snappy) diff --git a/paddle/fluid/recordio/CMakeLists.txt b/paddle/fluid/recordio/CMakeLists.txt index 37c3214ff8673fee7a417de9b19af6df6ab57569..86b4583c7b8be2949499ae998e2c3c3b5532934c 100644 --- a/paddle/fluid/recordio/CMakeLists.txt +++ b/paddle/fluid/recordio/CMakeLists.txt @@ -1,2 +1,4 @@ cc_library(header SRCS header.cc) cc_test(header_test SRCS header_test.cc DEPS header) +cc_library(io SRCS io.cc DEPS stringpiece) +cc_test(io_test SRCS io_test.cc DEPS io) diff --git a/paddle/fluid/recordio/chunk.h b/paddle/fluid/recordio/chunk.h index 77c0ae81b74207549e3a1b9319fc918bba4ee12e..48626b92fed93532e16a61ee0f22d0d6cfe7d257 100644 --- a/paddle/fluid/recordio/chunk.h +++ b/paddle/fluid/recordio/chunk.h @@ -32,9 +32,11 @@ public: bool Dump(std::ostream& os, Compressor ct); void Parse(std::istream& iss, int64_t offset); const std::string Record(int i) { return records_[i]; } + size_t NumBytes() { return num_bytes_; } private: std::vector records_; + // sum of record lengths in bytes. size_t num_bytes_; }; diff --git a/paddle/fluid/recordio/filesys.h b/paddle/fluid/recordio/filesys.h new file mode 100644 index 0000000000000000000000000000000000000000..b21702bf3a0dcff282fb06edead46c3bf295602f --- /dev/null +++ b/paddle/fluid/recordio/filesys.h @@ -0,0 +1,24 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include + +class DefaultFileSys { +public: +private: +}; diff --git a/paddle/fluid/recordio/header_test.cc b/paddle/fluid/recordio/header_test.cc index 991ea05ec14e664b351661f7a2f968fdadafafb9..322f63190a594382af54adb8c0ee8c8f2fde59ac 100644 --- a/paddle/fluid/recordio/header_test.cc +++ b/paddle/fluid/recordio/header_test.cc @@ -18,7 +18,7 @@ #include "gtest/gtest.h" -using namespace recordio; +using namespace paddle::recordio; TEST(Recordio, ChunkHead) { Header hdr(0, 1, Compressor::kGzip, 3); @@ -32,5 +32,5 @@ TEST(Recordio, ChunkHead) { std::ostringstream oss2; hdr2.Write(oss2); EXPECT_STREQ(oss2.str().c_str(), oss.str().c_str()); - EXPECT_EQ(hdr == hdr2); + EXPECT_TRUE(hdr == hdr2); } diff --git a/paddle/fluid/recordio/io.cc b/paddle/fluid/recordio/io.cc new file mode 100644 index 0000000000000000000000000000000000000000..2c82d1d42d9b2d891866fb6e7fdd9c9d456fca90 --- /dev/null +++ b/paddle/fluid/recordio/io.cc @@ -0,0 +1,53 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "paddle/fluid/recordio/io.h" +#include "paddle/fluid/string/piece.h" + +namespace paddle { +namespace recordio { +Stream* Stream::Open(const char* filename, const char* mode) { + // Create IOStream for different filesystems + // HDFS: hdfs://tmp/file.txt + // Default: /tmp/file.txt + FILE* fp = nullptr; + if (string::HasPrefix(string::Piece(filename), string::Piece("/"))) { + fp = fopen(filename, mode); + } + return new FileStream(fp); +} + +size_t FileStream::Read(void* ptr, size_t size) { + return fread(ptr, 1, size, fp_); +} + +void FileStream::Write(const void* ptr, size_t size) { + size_t real = fwrite(ptr, 1, size, fp_); + PADDLE_ENFORCE(real == size, "FileStream write incomplete."); +} + +size_t FileStream::Tell() { return ftell(fp_); } +void FileStream::Seek(size_t p) { fseek(fp_, static_cast(p), SEEK_SET); } + +bool FileStream::Eof() { return feof(fp_); } + +void FileStream::Close() { + if (fp_ != nullptr) { + fclose(fp_); + fp_ = nullptr; + } +} + +} // namespace recordio +} // namespace paddle diff --git a/paddle/fluid/recordio/io.h b/paddle/fluid/recordio/io.h new file mode 100644 index 0000000000000000000000000000000000000000..ff647b95d8ebfd9ff4a680dc94275afb74d32916 --- /dev/null +++ b/paddle/fluid/recordio/io.h @@ -0,0 +1,53 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include "paddle/fluid/platform/enforce.h" + +namespace paddle { +namespace recordio { + +// Stream abstract object for read and write +class Stream { +public: + virtual ~Stream() {} + virtual size_t Read(void* ptr, size_t size); + virtual void Write(const void* ptr, size_t size); + virtual size_t Tell(); + virtual void Seek(); + // Create Stream Instance + static Stream* Open(const char* filename, const char* mode); +}; + +// FileStream +class FileStream : public Stream { +public: + explicit FileStream(FILE* fp) : fp_(fp) {} + ~FileStream() { this->Close(); } + size_t Read(void* ptr, size_t size); + void Write(const void* ptr, size_t size); + size_t Tell(); + void Seek(size_t p); + bool Eof(); + void Close(); + +private: + FILE* fp_; +}; + +} // namespace recordio +} // namespace paddle diff --git a/paddle/fluid/recordio/io_test.cc b/paddle/fluid/recordio/io_test.cc new file mode 100644 index 0000000000000000000000000000000000000000..b2e5733ffed5216b9932c1e4c113f1bc82dc53a1 --- /dev/null +++ b/paddle/fluid/recordio/io_test.cc @@ -0,0 +1,36 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "paddle/fluid/recordio/io.h" + +#include "gtest/gtest.h" + +using namespace paddle::recordio; + +TEST(FileStream, IO) { + { + // Write + Stream* fs = Stream::Open("/tmp/record_0", "rw"); + fs->Write("hello", 6); + delete fs; + } + { + // Read + Stream* fs = Stream::Open("/tmp/record_0", "r+"); + char buf[10]; + fs->Read(&buf, 6); + EXPECT_STREQ(buf, "hello"); + delete fs; + } +} diff --git a/paddle/fluid/recordio/scanner.h b/paddle/fluid/recordio/scanner.h index 8bcdea3c6fe590062e6f9ab6b9f778ec42556897..dc09bd5fdda3492b38d0956ba99d5858edd3c85d 100644 --- a/paddle/fluid/recordio/scanner.h +++ b/paddle/fluid/recordio/scanner.h @@ -21,8 +21,9 @@ #include #include -// Scanner +class RangeScanner; +// Scanner is a scanner for multiple recordio files. class Scanner { public: Scanner(const char* paths); diff --git a/paddle/fluid/recordio/writer.cc b/paddle/fluid/recordio/writer.cc index 08d3d2c5778b4b67a3b2671742b6abbc5a1bc205..acb84fb8e8d3da628bc47edcbd2b6cd291537362 100644 --- a/paddle/fluid/recordio/writer.cc +++ b/paddle/fluid/recordio/writer.cc @@ -17,29 +17,36 @@ namespace paddle { namespace recordio { -Writer::Writer(std::ostream& os) - : stream_(os.rdbuf()), max_chunk_size_(0), compressor_(0) {} +Writer::Writer(Stream* fo) : stream_(fo), max_chunk_size_(0), compressor_(0) {} -Writer::Writer(std::ostream& os, int maxChunkSize, int compressor) - : stream_(os.rdbuf()), +Writer::Writer(Stream* fo, int maxChunkSize, int compressor) + : stream_(fo), max_chunk_size_(maxChunkSize), - compressor_(compressor) { - // clear rdstate - stream_.clear(); + compressor_(static_cast(compressor)) { chunk_.reset(new Chunk); } -size_t Writer::Write(const std::string& buf) { return Write(std::string(buf)); } - -size_t Writer::Write(const char* buf, size_t length) { - return Write(std::string(buf, length)); +size_t Writer::Write(const std::string& record) { + if (stream_ == nullptr) { + LOG(WARNING) << "Cannot write since writer had been closed."; + return 0; + } + if ((record.size() + chunk_->NumBytes()) > max_chunk_size_) { + chunk_->Dump(stream_, compressor_); + } + chunk_->Add(record); + return record.size(); } -size_t Writer::Write(std::string&& buf) {} +// size_t Writer::Write(const char* buf, size_t length) { +// return Write(std::string(buf, length)); +// } + +// size_t Writer::Write(std::string&& buf) {} void Writer::Close() { - stream_.flush(); - stream_.setstate(std::ios::eofbit); + chunk_->Dump(stream_, compressor_); + stream_ = nullptr; } } // namespace recordio diff --git a/paddle/fluid/recordio/writer.h b/paddle/fluid/recordio/writer.h index 49b86a6a28a819c238c7d3132c6d58610e71d431..250d59813cbaf993bc1f8b47fd469406c367d78d 100644 --- a/paddle/fluid/recordio/writer.h +++ b/paddle/fluid/recordio/writer.h @@ -16,8 +16,9 @@ #include #include -#include "paddle/fluid/platform/macros.h" // for DISABLE COPY ASSIGN +#include "paddle/fluid/platform/macros.h" // DISABLE_COPY_ASSIGN #include "paddle/fluid/recordio/header.h" +#include "paddle/fluid/recordio/io.h" namespace paddle { namespace recordio { @@ -25,32 +26,27 @@ namespace recordio { // Writer creates a RecordIO file. class Writer { public: - Writer(std::ostream& os); - Writer(std::ostream& os, int maxChunkSize, int c); + Writer(Stream* fo); + Writer(Stream* fo, int maxChunkSize, int c); // Writes a record. It returns an error if Close has been called. size_t Write(const char* buf, size_t length); - size_t Write(const std::string& buf); - size_t Write(std::string&& buf); // Close flushes the current chunk and makes the writer invalid. void Close(); private: - // Set rdstate to mark a closed writer - std::ostream stream_; + // Set nullptr to mark a closed writer + Stream* stream_; + // Chunk for store object std::unique_ptr chunk_; // total records size, excluding metadata, before compression. int max_chunk_size_; - int compressor_; + // Compressor used for chuck + Compressor compressor_; + DISABLE_COPY_AND_ASSIGN(Writer); }; -template -Writer& operator<<(const T& val) { - stream_ << val; - return *this; -} - } // namespace recordio } // namespace paddle diff --git a/paddle/fluid/recordio/writer_test.cc b/paddle/fluid/recordio/writer_test.cc new file mode 100644 index 0000000000000000000000000000000000000000..1ba32bf2df523a2236ed6fa7c9e2259817f7d0f0 --- /dev/null +++ b/paddle/fluid/recordio/writer_test.cc @@ -0,0 +1,21 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "paddle/fluid/recordio/writer.h" + +#include "gtest/gtest.h" + +using namespace paddle::recordio; + +TEST(Writer, Normal) { Stream }