diff --git a/CMakeLists.txt b/CMakeLists.txt index 656c890bcd08dd23776ee590e05be3007e8cabf6..be6a4d7c09658cb3c07d9846d9cd1ff8edb31144 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -149,8 +149,6 @@ include(external/cub) include(external/rocprim) include(external/xxhash) # download xxhash include(external/dlpack) -include(external/snappy) # download snappy -include(external/snappystream) # download snappystream include(external/warpctc) # download, build, install warpctc if (NOT WIN32) diff --git a/cmake/external/brpc.cmake b/cmake/external/brpc.cmake index 0dd35c090ee1d6903529d8218ae25411bf106deb..a5a86afa4a5352f586714041d9f041b610d97b8e 100644 --- a/cmake/external/brpc.cmake +++ b/cmake/external/brpc.cmake @@ -33,7 +33,7 @@ SET(BRPC_LIBRARIES "${BRPC_INSTALL_DIR}/lib/libbrpc.a" CACHE FILEPATH "brpc libr INCLUDE_DIRECTORIES(${BRPC_INCLUDE_DIR}) # Reference https://stackoverflow.com/questions/45414507/pass-a-list-of-prefix-paths-to-externalproject-add-in-cmake-args -set(prefix_path "${THIRD_PARTY_PATH}/install/gflags|${THIRD_PARTY_PATH}/install/leveldb|${THIRD_PARTY_PATH}/install/snappy|${THIRD_PARTY_PATH}/install/gtest|${THIRD_PARTY_PATH}/install/protobuf|${THIRD_PARTY_PATH}/install/zlib|${THIRD_PARTY_PATH}/install/glog") +set(prefix_path "${THIRD_PARTY_PATH}/install/gflags|${THIRD_PARTY_PATH}/install/gtest|${THIRD_PARTY_PATH}/install/protobuf|${THIRD_PARTY_PATH}/install/zlib|${THIRD_PARTY_PATH}/install/glog") # If minimal .a is need, you can set WITH_DEBUG_SYMBOLS=OFF ExternalProject_Add( @@ -62,7 +62,7 @@ ExternalProject_Add( -DCMAKE_POSITION_INDEPENDENT_CODE:BOOL=ON -DCMAKE_BUILD_TYPE:STRING=${THIRD_PARTY_BUILD_TYPE} ) -ADD_DEPENDENCIES(extern_brpc protobuf ssl crypto leveldb gflags glog gtest snappy) +ADD_DEPENDENCIES(extern_brpc protobuf ssl crypto leveldb gflags glog gtest) ADD_LIBRARY(brpc STATIC IMPORTED GLOBAL) SET_PROPERTY(TARGET brpc PROPERTY IMPORTED_LOCATION ${BRPC_LIBRARIES}) ADD_DEPENDENCIES(brpc extern_brpc) diff --git a/cmake/external/leveldb.cmake b/cmake/external/leveldb.cmake index ac0febd076e659927a6a882ff487c61ac130437a..3ba8a466c647f1aeef0ad20d4a540b6926e94054 100644 --- a/cmake/external/leveldb.cmake +++ b/cmake/external/leveldb.cmake @@ -34,8 +34,6 @@ ExternalProject_Add( BUILD_IN_SOURCE 1 ) -ADD_DEPENDENCIES(extern_leveldb snappy) - ADD_LIBRARY(leveldb STATIC IMPORTED GLOBAL) SET_PROPERTY(TARGET leveldb PROPERTY IMPORTED_LOCATION ${LEVELDB_LIBRARIES}) ADD_DEPENDENCIES(leveldb extern_leveldb) diff --git a/cmake/external/snappy.cmake b/cmake/external/snappy.cmake deleted file mode 100644 index 3fb6b49f472df48b77ca689f4ef22e6abc2902a9..0000000000000000000000000000000000000000 --- a/cmake/external/snappy.cmake +++ /dev/null @@ -1,65 +0,0 @@ -# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -include (ExternalProject) - -# NOTE: snappy is needed when linking with recordio - -set(SNAPPY_SOURCES_DIR ${THIRD_PARTY_PATH}/snappy) -set(SNAPPY_INSTALL_DIR ${THIRD_PARTY_PATH}/install/snappy) -set(SNAPPY_INCLUDE_DIR "${SNAPPY_INSTALL_DIR}/include" CACHE PATH "snappy include directory." FORCE) - -if(WIN32) - SET(SNAPPY_CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /wd4244 /wd4267") -else() - SET(SNAPPY_CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS}) -endif() - -ExternalProject_Add( - extern_snappy - GIT_REPOSITORY "https://github.com/google/snappy" - GIT_TAG "1.1.7" - PREFIX ${SNAPPY_SOURCES_DIR} - UPDATE_COMMAND "" - CMAKE_ARGS -DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER} - -DCMAKE_C_COMPILER=${CMAKE_C_COMPILER} - -DCMAKE_C_FLAGS=${CMAKE_C_FLAGS} - -DCMAKE_C_FLAGS_DEBUG=${CMAKE_C_FLAGS_DEBUG} - -DCMAKE_C_FLAGS_RELEASE=${CMAKE_C_FLAGS_RELEASE} - -DCMAKE_CXX_FLAGS=${SNAPPY_CMAKE_CXX_FLAGS} - -DCMAKE_CXX_FLAGS_RELEASE=${CMAKE_CXX_FLAGS_RELEASE} - -DCMAKE_CXX_FLAGS_DEBUG=${CMAKE_CXX_FLAGS_DEBUG} - -DCMAKE_INSTALL_PREFIX=${SNAPPY_INSTALL_DIR} - -DCMAKE_INSTALL_LIBDIR=${SNAPPY_INSTALL_DIR}/lib - -DCMAKE_POSITION_INDEPENDENT_CODE=ON - -DBUILD_TESTING=OFF - -DSNAPPY_BUILD_TESTS:BOOL=OFF - -DCMAKE_BUILD_TYPE=${THIRD_PARTY_BUILD_TYPE} - ${EXTERNAL_OPTIONAL_ARGS} - CMAKE_CACHE_ARGS -DCMAKE_INSTALL_PREFIX:PATH=${SNAPPY_INSTALL_DIR} - -DCMAKE_INSTALL_LIBDIR:PATH=${SNAPPY_INSTALL_DIR}/lib - -DCMAKE_POSITION_INDEPENDENT_CODE:BOOL=ON - -DCMAKE_BUILD_TYPE:STRING=${THIRD_PARTY_BUILD_TYPE} -) -IF(WIN32) - set(SNAPPY_LIBRARIES "${SNAPPY_INSTALL_DIR}/lib/snappy.lib") -else(WIN32) - set(SNAPPY_LIBRARIES "${SNAPPY_INSTALL_DIR}/lib/libsnappy.a") -endif (WIN32) - -add_library(snappy STATIC IMPORTED GLOBAL) -set_property(TARGET snappy PROPERTY IMPORTED_LOCATION ${SNAPPY_LIBRARIES}) - -include_directories(${SNAPPY_INCLUDE_DIR}) -add_dependencies(snappy extern_snappy) diff --git a/cmake/external/snappystream.cmake b/cmake/external/snappystream.cmake deleted file mode 100644 index 392f186b7ce3821f313ed6fc3dd5a97c2a7adebd..0000000000000000000000000000000000000000 --- a/cmake/external/snappystream.cmake +++ /dev/null @@ -1,63 +0,0 @@ -# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -include (ExternalProject) - -set(SNAPPYSTREAM_SOURCES_DIR ${THIRD_PARTY_PATH}/snappy_stream) -set(SNAPPYSTREAM_INSTALL_DIR ${THIRD_PARTY_PATH}/install/snappy_stream) -set(SNAPPYSTREAM_INCLUDE_DIR "${SNAPPYSTREAM_INSTALL_DIR}/include" CACHE PATH "snappy stream include directory." FORCE) - -if(WIN32) - # Fix me, VS2015 come without VLA support - set(SNAPPYSTREAM_LIBRARIES "${SNAPPYSTREAM_INSTALL_DIR}/lib/snappystream.lib") - MESSAGE(WARNING, "In windows, snappystream has no compile support for windows, - please build it manually and put it at " ${SNAPPYSTREAM_INSTALL_DIR}) -else(WIN32) - set(SNAPPYSTREAM_LIBRARIES "${SNAPPYSTREAM_INSTALL_DIR}/lib/libsnappystream.a") - - ExternalProject_Add( - extern_snappystream - GIT_REPOSITORY "https://github.com/hoxnox/snappystream.git" - GIT_TAG "0.2.8" - PREFIX ${SNAPPYSTREAM_SOURCES_DIR} - UPDATE_COMMAND "" - CMAKE_ARGS -DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER} - -DCMAKE_C_COMPILER=${CMAKE_C_COMPILER} - -DCMAKE_C_FLAGS=${CMAKE_C_FLAGS} - -DCMAKE_C_FLAGS_DEBUG=${CMAKE_C_FLAGS_DEBUG} - -DCMAKE_C_FLAGS_RELEASE=${CMAKE_C_FLAGS_RELEASE} - -DCMAKE_CXX_FLAGS=${CMAKE_CXX_FLAGS} - -DCMAKE_CXX_FLAGS_RELEASE=${CMAKE_CXX_FLAGS_RELEASE} - -DCMAKE_CXX_FLAGS_DEBUG=${CMAKE_CXX_FLAGS_DEBUG} - -DCMAKE_INSTALL_PREFIX=${SNAPPY_INSTALL_DIR} - -DCMAKE_INSTALL_LIBDIR=${SNAPPY_INSTALL_DIR}/lib - -DCMAKE_POSITION_INDEPENDENT_CODE=ON - -DCMAKE_BUILD_TYPE=${THIRD_PARTY_BUILD_TYPE} - -DSNAPPY_ROOT=${SNAPPY_INSTALL_DIR} - ${EXTERNAL_OPTIONAL_ARGS} - CMAKE_CACHE_ARGS - -DCMAKE_INSTALL_PREFIX:PATH=${SNAPPYSTREAM_INSTALL_DIR} - -DCMAKE_INSTALL_LIBDIR:PATH=${SNAPPYSTREAM_INSTALL_DIR}/lib - -DCMAKE_BUILD_TYPE:STRING=${THIRD_PARTY_BUILD_TYPE} - DEPENDS snappy - ) -endif(WIN32) - -add_library(snappystream STATIC IMPORTED GLOBAL) -set_property(TARGET snappystream PROPERTY IMPORTED_LOCATION ${SNAPPYSTREAM_LIBRARIES}) - -include_directories(${SNAPPYSTREAM_INCLUDE_DIR}) # For snappysteam to include its own headers. -include_directories(${THIRD_PARTY_PATH}/install) # For Paddle to include snappy stream headers. - -add_dependencies(snappystream extern_snappystream) diff --git a/cmake/inference_lib.cmake b/cmake/inference_lib.cmake index 87a7a1a86f190a8fb07b8af18987986f4ab7fd7e..10d399209e80fe47a8dc77276533818e50a98d93 100644 --- a/cmake/inference_lib.cmake +++ b/cmake/inference_lib.cmake @@ -149,18 +149,6 @@ if (WITH_NGRAPH) ) endif () -set(dst_dir "${FLUID_INSTALL_DIR}/third_party/install/snappy") -copy(snappy_lib - SRCS ${SNAPPY_INCLUDE_DIR} ${SNAPPY_LIBRARIES} - DSTS ${dst_dir} ${dst_dir}/lib - DEPS snappy) - -set(dst_dir "${FLUID_INSTALL_DIR}/third_party/install/snappystream") -copy(snappystream_lib - SRCS ${SNAPPYSTREAM_INCLUDE_DIR} ${SNAPPYSTREAM_LIBRARIES} - DSTS ${dst_dir} ${dst_dir}/lib - DEPS snappystream) - set(dst_dir "${FLUID_INSTALL_DIR}/third_party/install/zlib") copy(zlib_lib SRCS ${ZLIB_INCLUDE_DIR} ${ZLIB_LIBRARIES} diff --git a/paddle/fluid/API.spec b/paddle/fluid/API.spec index 97e9ca6bde84be21aac9e5537961ea31d03ae54c..c823cb54e3900333f9af31ba0b67eb9f91f8ad74 100644 --- a/paddle/fluid/API.spec +++ b/paddle/fluid/API.spec @@ -288,19 +288,10 @@ paddle.fluid.layers.var_conv_2d (ArgSpec(args=['input', 'row', 'col', 'input_cha paddle.fluid.layers.shard_index (ArgSpec(args=['input', 'index_num', 'nshards', 'shard_id', 'ignore_value'], varargs=None, keywords=None, defaults=(-1,)), ('document', '5786fdbba6753ecd6cbce5e6b0889924')) paddle.fluid.layers.hard_swish (ArgSpec(args=['x', 'threshold', 'scale', 'offset', 'name'], varargs=None, keywords=None, defaults=(6.0, 6.0, 3.0, None)), ('document', '6a5152a7015c62cb8278fc24cb456459')) paddle.fluid.layers.data (ArgSpec(args=['name', 'shape', 'append_batch_size', 'dtype', 'lod_level', 'type', 'stop_gradient'], varargs=None, keywords=None, defaults=(True, 'float32', 0, VarType.LOD_TENSOR, True)), ('document', '9d7806e31bdf727c1a23b8782a09b545')) -paddle.fluid.layers.open_files (ArgSpec(args=['filenames', 'shapes', 'lod_levels', 'dtypes', 'thread_num', 'buffer_size', 'pass_num', 'is_test'], varargs=None, keywords=None, defaults=(None, None, 1, None)), ('document', 'cccb6eb5410c822e5307c947aca2c899')) -paddle.fluid.layers.read_file (ArgSpec(args=['reader'], varargs=None, keywords=None, defaults=None), ('document', '32181f6037e387fb6e68a5beaafe33b6')) -paddle.fluid.layers.shuffle (ArgSpec(args=['reader', 'buffer_size'], varargs=None, keywords=None, defaults=None), ('document', 'aa5803d1eccdaef03cdfb0b7ca088071')) -paddle.fluid.layers.batch (ArgSpec(args=['reader', 'batch_size'], varargs=None, keywords=None, defaults=None), ('document', '3007211c84c5c77eda8dc83619a6eaf8')) -paddle.fluid.layers.double_buffer (ArgSpec(args=['reader', 'place', 'name'], varargs=None, keywords=None, defaults=(None, None)), ('document', '7241dd1c142f4c65c8d7f66948140aa7')) -paddle.fluid.layers.random_data_generator (ArgSpec(args=['low', 'high', 'shapes', 'lod_levels', 'for_parallel'], varargs=None, keywords=None, defaults=(True,)), ('document', '290f5b97f24f0022e195f7228dd56fd9')) +paddle.fluid.layers.read_file (ArgSpec(args=['reader'], varargs=None, keywords=None, defaults=None), ('document', '88367daf9a30c9ab83adc5d7221e23ef')) +paddle.fluid.layers.double_buffer (ArgSpec(args=['reader', 'place', 'name'], varargs=None, keywords=None, defaults=(None, None)), ('document', '44724c493f41a124abc7531c2740e2e3')) paddle.fluid.layers.py_reader (ArgSpec(args=['capacity', 'shapes', 'dtypes', 'lod_levels', 'name', 'use_double_buffer'], varargs=None, keywords=None, defaults=(None, None, True)), ('document', 'd78a1c7344955c5caed8dc13adb7beb6')) paddle.fluid.layers.create_py_reader_by_data (ArgSpec(args=['capacity', 'feed_list', 'name', 'use_double_buffer'], varargs=None, keywords=None, defaults=(None, True)), ('document', '2edf37d57862b24a7a26aa19a3573f73')) -paddle.fluid.layers.Preprocessor ('paddle.fluid.layers.io.Preprocessor', ('document', '1c2efbbc1197b44941a95b9ec4e737ae')) -paddle.fluid.layers.Preprocessor.__init__ (ArgSpec(args=['self', 'reader', 'name'], varargs=None, keywords=None, defaults=(None,)), ('document', '6adf97f83acf6453d4a6a4b1070f3754')) -paddle.fluid.layers.Preprocessor.block (ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None), ('document', '6adf97f83acf6453d4a6a4b1070f3754')) -paddle.fluid.layers.Preprocessor.inputs (ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None), ('document', '6adf97f83acf6453d4a6a4b1070f3754')) -paddle.fluid.layers.Preprocessor.outputs (ArgSpec(args=['self'], varargs='outs', keywords=None, defaults=None), ('document', '6adf97f83acf6453d4a6a4b1070f3754')) paddle.fluid.layers.load (ArgSpec(args=['out', 'file_path', 'load_as_fp16'], varargs=None, keywords=None, defaults=(None,)), ('document', '9d1a4bc97bbce9fa1d4f7a4200a771ff')) paddle.fluid.layers.create_tensor (ArgSpec(args=['dtype', 'name', 'persistable'], varargs=None, keywords=None, defaults=(None, False)), ('document', 'aaf0176c743c43e9bc684dd7dfac25c5')) paddle.fluid.layers.create_parameter (ArgSpec(args=['shape', 'dtype', 'name', 'attr', 'is_bias', 'default_initializer'], varargs=None, keywords=None, defaults=(None, None, False, None)), ('document', '021272f30e0cdf7503586815378abfb8')) @@ -1044,7 +1035,5 @@ paddle.fluid.profiler.stop_profiler (ArgSpec(args=['sorted_key', 'profile_path'] paddle.fluid.unique_name.generate (ArgSpec(args=['key'], varargs=None, keywords=None, defaults=None), ('document', '4d68cde4c4df8f1b8018620b4dc19b42')) paddle.fluid.unique_name.switch (ArgSpec(args=['new_generator'], varargs=None, keywords=None, defaults=(None,)), ('document', '695a6e91afbcdbafac69a069038811be')) paddle.fluid.unique_name.guard (ArgSpec(args=['new_generator'], varargs=None, keywords=None, defaults=(None,)), ('document', 'ead717d6d440a1eb11971695cd1727f4')) -paddle.fluid.recordio_writer.convert_reader_to_recordio_file (ArgSpec(args=['filename', 'reader_creator', 'feeder', 'compressor', 'max_num_records', 'feed_order'], varargs=None, keywords=None, defaults=(Compressor.Snappy, 1000, None)), ('document', '65c7523e86f0c50bb729b01667f36310')) -paddle.fluid.recordio_writer.convert_reader_to_recordio_files (ArgSpec(args=['filename', 'batch_per_file', 'reader_creator', 'feeder', 'compressor', 'max_num_records', 'feed_order'], varargs=None, keywords=None, defaults=(Compressor.Snappy, 1000, None)), ('document', 'bc643f0f5f1b9db57ff0d8a57d379bd7')) paddle.fluid.Scope Scope() -> paddle.fluid.core_avx._Scope paddle.fluid.install_check.run_check (ArgSpec(args=[], varargs=None, keywords=None, defaults=None), ('document', '66b7c84a17ed32fec2df9628367be2b9')) diff --git a/paddle/fluid/CMakeLists.txt b/paddle/fluid/CMakeLists.txt index 595454e90b9cd713fd2baed24538cf5fbc93934a..16457b564ffc82a4246776dc283261bed0351ec6 100644 --- a/paddle/fluid/CMakeLists.txt +++ b/paddle/fluid/CMakeLists.txt @@ -4,7 +4,6 @@ add_subdirectory(framework) add_subdirectory(imperative) add_subdirectory(operators) add_subdirectory(string) -add_subdirectory(recordio) add_subdirectory(pybind) # NOTE: please add subdirectory inference at last. diff --git a/paddle/fluid/framework/CMakeLists.txt b/paddle/fluid/framework/CMakeLists.txt index 7dc3a6c60e794a32f22a4b39bbf5ffd0099aa81d..da1e977a9d44974ec16599bd0da3d63e0892fa7a 100644 --- a/paddle/fluid/framework/CMakeLists.txt +++ b/paddle/fluid/framework/CMakeLists.txt @@ -63,7 +63,7 @@ if(WITH_GPU) else() cc_test(mixed_vector_test SRCS mixed_vector_test.cc DEPS place memory device_context tensor) endif() -cc_library(lod_tensor SRCS lod_tensor.cc DEPS ddim place tensor framework_proto recordio version) +cc_library(lod_tensor SRCS lod_tensor.cc DEPS ddim place tensor framework_proto version) cc_test(lod_tensor_test SRCS lod_tensor_test.cc DEPS lod_tensor memory) nv_test(lod_tensor_gpu_test SRCS lod_tensor_test.cu DEPS lod_tensor) diff --git a/paddle/fluid/framework/lod_tensor.cc b/paddle/fluid/framework/lod_tensor.cc index 9883a1940567fb5f5e6ce1eed7774c7d4a90dc9e..89122851c7a7c2eb7853ab457eee48630418d18b 100644 --- a/paddle/fluid/framework/lod_tensor.cc +++ b/paddle/fluid/framework/lod_tensor.cc @@ -26,9 +26,6 @@ limitations under the License. */ #include "paddle/fluid/memory/memcpy.h" #include "paddle/fluid/memory/memory.h" -#include "paddle/fluid/recordio/scanner.h" -#include "paddle/fluid/recordio/writer.h" - namespace paddle { namespace framework { @@ -275,36 +272,6 @@ void DeserializeFromStream(std::istream &is, LoDTensor *tensor, TensorFromStream(is, static_cast(tensor), dev_ctx); } -void WriteToRecordIO(recordio::Writer *writer, - const std::vector &tensor, - const platform::DeviceContext &dev_ctx) { - std::stringstream buffer; - size_t sz = tensor.size(); - buffer.write(reinterpret_cast(&sz), sizeof(uint32_t)); - for (auto &each : tensor) { - SerializeToStream(buffer, each, dev_ctx); - } - writer->Write(buffer.str()); -} - -bool ReadFromRecordIO(recordio::Scanner *scanner, - const platform::DeviceContext &dev_ctx, - std::vector *result_ptr) { - if (!scanner->HasNext()) { - return false; - } - std::istringstream sin(scanner->Next()); - uint32_t sz; - sin.read(reinterpret_cast(&sz), sizeof(uint32_t)); - auto &result = *result_ptr; - result.resize(sz); - for (uint32_t i = 0; i < sz; ++i) { - DeserializeFromStream(sin, &result[i], dev_ctx); - } - - return true; -} - std::vector LoDTensor::SplitLoDTensor( const std::vector places) const { check_memory_size(); diff --git a/paddle/fluid/framework/lod_tensor.h b/paddle/fluid/framework/lod_tensor.h index 5e20ba7c1cf1fd7089ab1540d1b3b4062a4b6e26..ef48753349ec7b07d2c1c0ee68d133145e4e4047 100644 --- a/paddle/fluid/framework/lod_tensor.h +++ b/paddle/fluid/framework/lod_tensor.h @@ -32,12 +32,6 @@ limitations under the License. */ #include "paddle/fluid/platform/place.h" namespace paddle { - -namespace recordio { -class Writer; -class Scanner; -} - namespace framework { /* @@ -216,14 +210,6 @@ void SerializeToStream(std::ostream& os, const LoDTensor& tensor, void DeserializeFromStream(std::istream& is, LoDTensor* tensor, const platform::DeviceContext& dev_ctx); -extern void WriteToRecordIO(recordio::Writer* writer, - const std::vector& tensor, - const platform::DeviceContext& dev_ctx); - -extern bool ReadFromRecordIO(recordio::Scanner* scanner, - const platform::DeviceContext& dev_ctx, - std::vector* result_ptr); - /* * Convert between length-based LoD and offset-based LoD. * The implementation of LoDTensor class use offset-based LoD. diff --git a/paddle/fluid/framework/lod_tensor_test.cc b/paddle/fluid/framework/lod_tensor_test.cc index d1554113bc366f38d1cfd7603e2848f618794d9f..1024076e596b5a87128fd48fa6e4c6570817ed2d 100644 --- a/paddle/fluid/framework/lod_tensor_test.cc +++ b/paddle/fluid/framework/lod_tensor_test.cc @@ -20,9 +20,6 @@ #include "paddle/fluid/framework/lod_tensor.h" -#include "paddle/fluid/recordio/scanner.h" -#include "paddle/fluid/recordio/writer.h" - namespace paddle { namespace framework { @@ -281,52 +278,5 @@ TEST(LoD, ConvertToOffsetBasedLoD) { EXPECT_EQ(offset_lod, expected); } -template -static void TestRecordIO() { - LoDTensor tensor; - T* tmp = tensor.mutable_data(make_ddim({4, 5}), platform::CPUPlace()); - for (int i = 0; i < 20; ++i) { - tmp[i] = static_cast(i); - } - - std::stringstream* stream = new std::stringstream(); - auto& ctx = - *platform::DeviceContextPool::Instance().Get(platform::CPUPlace()); - { - recordio::Writer writer(stream, recordio::Compressor::kSnappy); - WriteToRecordIO(&writer, {tensor, tensor}, ctx); - WriteToRecordIO(&writer, {tensor, tensor}, ctx); - writer.Flush(); - } - - auto assert_tensor_ok = [](const LoDTensor& tensor) { - for (int i = 0; i < 20; ++i) { - ASSERT_EQ(tensor.data()[i], static_cast(i)); - } - }; - - { - std::unique_ptr stream_ptr(stream); - recordio::Scanner scanner(std::move(stream_ptr)); - std::vector tensors; - ASSERT_TRUE(ReadFromRecordIO(&scanner, ctx, &tensors)); - ASSERT_EQ(tensors.size(), static_cast(2)); - assert_tensor_ok(tensors[0]); - assert_tensor_ok(tensors[1]); - ASSERT_TRUE(ReadFromRecordIO(&scanner, ctx, &tensors)); - ASSERT_EQ(tensors.size(), static_cast(2)); - assert_tensor_ok(tensors[0]); - assert_tensor_ok(tensors[1]); - } -} - -TEST(LoDTensor, RecordIO) { - TestRecordIO(); - TestRecordIO(); - TestRecordIO(); - TestRecordIO(); - TestRecordIO(); -} - } // namespace framework } // namespace paddle diff --git a/paddle/fluid/inference/api/demo_ci/CMakeLists.txt b/paddle/fluid/inference/api/demo_ci/CMakeLists.txt index 127441ffef58318180ebe1a45ecb31a0c77a201c..aa31823eeaf8ef0fc55f1bb1f8814c2606d9b437 100644 --- a/paddle/fluid/inference/api/demo_ci/CMakeLists.txt +++ b/paddle/fluid/inference/api/demo_ci/CMakeLists.txt @@ -31,14 +31,10 @@ include_directories("${PADDLE_LIB_THIRD_PARTY_PATH}protobuf/include") include_directories("${PADDLE_LIB_THIRD_PARTY_PATH}glog/include") include_directories("${PADDLE_LIB_THIRD_PARTY_PATH}gflags/include") include_directories("${PADDLE_LIB_THIRD_PARTY_PATH}xxhash/include") -include_directories("${PADDLE_LIB_THIRD_PARTY_PATH}snappy/include") -include_directories("${PADDLE_LIB_THIRD_PARTY_PATH}snappystream/include") include_directories("${PADDLE_LIB_THIRD_PARTY_PATH}zlib/include") include_directories("${PADDLE_LIB}/third_party/boost") include_directories("${PADDLE_LIB}/third_party/eigen3") -link_directories("${PADDLE_LIB_THIRD_PARTY_PATH}snappy/lib") -link_directories("${PADDLE_LIB_THIRD_PARTY_PATH}snappystream/lib") link_directories("${PADDLE_LIB_THIRD_PARTY_PATH}zlib/lib") link_directories("${PADDLE_LIB_THIRD_PARTY_PATH}protobuf/lib") link_directories("${PADDLE_LIB_THIRD_PARTY_PATH}glog/lib") @@ -127,12 +123,12 @@ if (NOT WIN32) set(EXTERNAL_LIB "-lrt -ldl -lpthread") set(DEPS ${DEPS} ${MATH_LIB} ${MKLDNN_LIB} ${NGRAPH_LIB} - glog gflags protobuf snappystream snappy z xxhash + glog gflags protobuf z xxhash ${EXTERNAL_LIB}) else() set(DEPS ${DEPS} ${MATH_LIB} ${MKLDNN_LIB} - glog gflags_static libprotobuf snappy zlibstatic xxhash snappystream ${EXTERNAL_LIB}) + glog gflags_static libprotobuf zlibstatic xxhash ${EXTERNAL_LIB}) set(DEPS ${DEPS} libcmt shlwapi.lib) endif(NOT WIN32) diff --git a/paddle/fluid/operators/reader/CMakeLists.txt b/paddle/fluid/operators/reader/CMakeLists.txt index 616901399f8c5787563dad4f2bbd720a244c96e2..f61af3332911b6115853ed8d382b3ca35161d5b8 100644 --- a/paddle/fluid/operators/reader/CMakeLists.txt +++ b/paddle/fluid/operators/reader/CMakeLists.txt @@ -20,14 +20,7 @@ endfunction() cc_library(py_reader SRCS py_reader.cc DEPS reader) cc_library(buffered_reader SRCS buffered_reader.cc DEPS reader simple_threadpool) -reader_library(open_files_op SRCS open_files_op.cc DEPS buffered_reader) -reader_library(create_random_data_generator_op SRCS create_random_data_generator_op.cc) -reader_library(create_shuffle_reader_op SRCS create_shuffle_reader_op.cc) -reader_library(create_batch_reader_op SRCS create_batch_reader_op.cc) -reader_library(create_recordio_file_reader_op SRCS create_recordio_file_reader_op.cc) reader_library(create_double_buffer_reader_op SRCS create_double_buffer_reader_op.cc DEPS buffered_reader) -reader_library(create_multi_pass_reader_op SRCS create_multi_pass_reader_op.cc) -reader_library(create_custom_reader_op SRCS create_custom_reader_op.cc) reader_library(create_py_reader_op SRCS create_py_reader_op.cc DEPS py_reader) cc_test(reader_blocking_queue_test SRCS reader_blocking_queue_test.cc) diff --git a/paddle/fluid/operators/reader/create_batch_reader_op.cc b/paddle/fluid/operators/reader/create_batch_reader_op.cc deleted file mode 100644 index f771cebd0ccee38a044e9f87a258fe3565398ecb..0000000000000000000000000000000000000000 --- a/paddle/fluid/operators/reader/create_batch_reader_op.cc +++ /dev/null @@ -1,151 +0,0 @@ -// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "paddle/fluid/operators/reader/reader_op_registry.h" - -namespace paddle { -namespace operators { -namespace reader { - -class BatchReader : public framework::DecoratedReader { - public: - BatchReader(const std::shared_ptr& reader, int batch_size, - bool discard_leftover) - : DecoratedReader(reader), - batch_size_(static_cast(batch_size)), - discard_leftover_(discard_leftover) { - buffer_.reserve(batch_size_); - } - - void ReadNextImpl(std::vector* out) override; - - private: - size_t batch_size_; - bool discard_leftover_; - std::vector> buffer_; -}; - -class CreateBatchReaderOp : public framework::OperatorBase { - public: - using framework::OperatorBase::OperatorBase; - - private: - void RunImpl(const framework::Scope& scope, - const platform::Place& dev_place) const override { - auto* out = scope.FindVar(Output("Out")) - ->template GetMutable(); - if (out->Get() != nullptr) { - return; - } - const auto& underlying_reader = scope.FindVar(Input("UnderlyingReader")) - ->Get(); - out->Reset(framework::MakeDecoratedReader( - underlying_reader, Attr("batch_size"), - Attr("discard_leftover"))); - } -}; - -class CreateBatchReaderOpMaker : public DecoratedReaderMakerBase { - protected: - void Apply() override { - AddAttr("batch_size", - "How many instances the batch reader yields each time.") - .GreaterThan(0); - AddAttr("discard_leftover", - "If true, the leftover instances that are not enough for a " - "new batch will be discarded.") - .SetDefault(true); - AddComment(R"DOC( - CreateBatchReader Operator - - A batch reader takes another reader as its 'underlying reader', - gathers the underlying reader's outputs and then yields them in batches. - )DOC"); - } -}; - -void BatchReader::ReadNextImpl(std::vector* out) { - buffer_.clear(); - buffer_.reserve(batch_size_); - for (size_t i = 0; i < batch_size_; ++i) { - buffer_.push_back(std::vector()); - reader_->ReadNext(&buffer_.back()); - if (buffer_.back().empty()) { - buffer_.pop_back(); - break; - } - } - if (discard_leftover_ && buffer_.size() < batch_size_) { - buffer_.clear(); - } - // Concat instances - out->clear(); - if (buffer_.empty()) { - // if buffer_ is empty, the 'out' will return as an empty vector. - return; - } - size_t out_num = buffer_[0].size(); - out->reserve(out_num); - for (size_t j = 0; j < out_num; ++j) { - // Merge shape and check date type - auto batch_type = buffer_[0][j].type(); - framework::DDim batch_shape = buffer_[0][j].dims(); - for (size_t i = 1; i < buffer_.size(); ++i) { - auto ins_type = buffer_[i][j].type(); - framework::DDim ins_shape = buffer_[i][j].dims(); - PADDLE_ENFORCE_EQ(batch_type, ins_type); - PADDLE_ENFORCE_EQ(slice_ddim(batch_shape, 1, batch_shape.size()), - slice_ddim(ins_shape, 1, ins_shape.size())); - PADDLE_ENFORCE_GT(ins_shape[0], 0); - batch_shape[0] += ins_shape[0]; - } - - framework::LoDTensor out_tensor; - out_tensor.Resize(batch_shape); - out_tensor.mutable_data(platform::CPUPlace(), batch_type); - int64_t dst_offset = 0; - - // Merge lod and data - framework::LoD batch_lod; - for (size_t i = 0; i < buffer_.size(); ++i) { - framework::DDim ins_shape = buffer_[i][j].dims(); - framework::LoD ins_lod = buffer_[i][j].lod(); - if (i == 0) { - batch_lod = ins_lod; - } else { - PADDLE_ENFORCE_EQ(batch_lod.size(), ins_lod.size()); - for (size_t level_idx = 0; level_idx < batch_lod.size(); ++level_idx) { - auto& lod_level = batch_lod[level_idx]; - for (size_t k = 1; k < ins_lod[level_idx].size(); ++k) { - lod_level.push_back(ins_lod[level_idx][k] + lod_level.back()); - } - } - } - auto dst = out_tensor.Slice(dst_offset, dst_offset + ins_shape[0]); - TensorCopy(buffer_[i][j], platform::CPUPlace(), &dst); - dst_offset += ins_shape[0]; - } - out_tensor.set_lod(batch_lod); - out->push_back(out_tensor); - } -} - -} // namespace reader -} // namespace operators -} // namespace paddle - -namespace ops = paddle::operators::reader; -REGISTER_DECORATED_READER_OPERATOR(create_batch_reader, - ops::CreateBatchReaderOp, - ops::CreateBatchReaderOpMaker); diff --git a/paddle/fluid/operators/reader/create_multi_pass_reader_op.cc b/paddle/fluid/operators/reader/create_multi_pass_reader_op.cc deleted file mode 100644 index 0a225597d34f43c7fb82aeae2552cdf16c8ba566..0000000000000000000000000000000000000000 --- a/paddle/fluid/operators/reader/create_multi_pass_reader_op.cc +++ /dev/null @@ -1,93 +0,0 @@ -// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "paddle/fluid/operators/detail/safe_ref.h" -#include "paddle/fluid/operators/reader/reader_op_registry.h" - -namespace paddle { -namespace operators { -namespace reader { - -class MultiPassReader : public framework::DecoratedReader { - public: - MultiPassReader(const std::shared_ptr& reader, int pass_num) - : DecoratedReader(reader), pass_num_(pass_num), pass_count_(0) {} - - void ReadNextImpl(std::vector* out) override { - reader_->ReadNext(out); - if (out->empty() && pass_count_ < pass_num_ - 1) { - reader_->Shutdown(); - reader_->Start(); - reader_->ReadNext(out); - ++pass_count_; - } - } - - private: - void StartImpl() override { - pass_count_ = 0; - reader_->Start(); - } - - int pass_num_; - mutable int pass_count_; -}; - -class CreateMultiPassReaderOp : public framework::OperatorBase { - public: - using framework::OperatorBase::OperatorBase; - - private: - void RunImpl(const framework::Scope& scope, - const platform::Place& dev_place) const override { - auto* out = detail::Ref(scope.FindVar(Output("Out"))) - .GetMutable(); - if (out->Get() != nullptr) { - return; - } - const auto& underlying_reader = scope.FindVar(Input("UnderlyingReader")) - ->Get(); - int pass_num = Attr("pass_num"); - out->Reset(framework::MakeDecoratedReader( - underlying_reader, pass_num)); - } -}; - -class CreateMultiPassReaderOpMaker : public DecoratedReaderMakerBase { - protected: - void Apply() override { - AddAttr("pass_num", "The number of pass to run.").GreaterThan(0); - AddComment(R"DOC( - CreateMultiPassReader Operator - - This operator creates a multi-pass reader. A multi-pass reader - is used to yield data for several pass training continuously. - It takes the number of passes to run as one of its attributes - ('pass_num'), and maintains a pass counter to record how many - passes it has completed. When the underlying reader reaches the - EOF, the multi-pass reader checks whether it has completed training - of the given number of pass. If not, the underlying reader will - be re-initialized and starts a new pass automatically. - )DOC"); - } -}; - -} // namespace reader -} // namespace operators -} // namespace paddle - -namespace ops = paddle::operators::reader; -REGISTER_DECORATED_READER_OPERATOR(create_multi_pass_reader, - ops::CreateMultiPassReaderOp, - ops::CreateMultiPassReaderOpMaker); diff --git a/paddle/fluid/operators/reader/create_random_data_generator_op.cc b/paddle/fluid/operators/reader/create_random_data_generator_op.cc deleted file mode 100644 index e5c116dfcd71ef40597ca19d1da0b51038baaad1..0000000000000000000000000000000000000000 --- a/paddle/fluid/operators/reader/create_random_data_generator_op.cc +++ /dev/null @@ -1,107 +0,0 @@ -// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "paddle/fluid/operators/reader/reader_op_registry.h" - -namespace paddle { -namespace operators { -namespace reader { - -template -class RandomDataGenerator : public framework::FileReader { - public: - RandomDataGenerator(const std::vector& shapes, float low, - float high) - : framework::FileReader(), low_(low), high_(high), shapes_(shapes) { - PADDLE_ENFORCE_LE(low, high, - "'low' shouldn't be greater than 'high'.(%f vs %f)", low, - high); - unsigned int seed = std::random_device()(); - engine_.seed(seed); - dist_ = std::uniform_real_distribution(low_, high_); - } - - void ReadNextImpl(std::vector* out) override { - out->clear(); - out->reserve(shapes_.size()); - for (const framework::DDim& shape : shapes_) { - PADDLE_ENFORCE_GE( - shape.size(), 2, - "The rank of reader's output data should be 2 at least.(Now it's %d)", - shape.size()); - framework::LoDTensor out_tensor; - out_tensor.Resize(shape); - T* data = out_tensor.mutable_data(platform::CPUPlace()); - int64_t numel = framework::product(shape); - for (int64_t i = 0; i < numel; ++i) { - data[i] = dist_(engine_); - } - out->push_back(out_tensor); - } - } - - private: - float low_; - float high_; - std::minstd_rand engine_; - std::uniform_real_distribution dist_; - std::vector shapes_; -}; - -template -class CreateRandomDataGeneratorOp : public framework::OperatorBase { - public: - using framework::OperatorBase::OperatorBase; - - private: - void RunImpl(const framework::Scope& scope, - const platform::Place& dev_place) const override { - const auto& shape_concat = Attr>("shape_concat"); - const auto& ranks = Attr>("ranks"); - PADDLE_ENFORCE(!shape_concat.empty() && !ranks.empty()); - PADDLE_ENFORCE_EQ(std::accumulate(ranks.begin(), ranks.end(), 0), - static_cast(shape_concat.size()), - "The accumulate of all ranks should be equal to the " - "shape concat's length."); - std::vector shapes = RestoreShapes(shape_concat, ranks); - auto* out = scope.FindVar(Output("Out")) - ->template GetMutable(); - out->Reset(std::make_shared>( - shapes, Attr("low"), Attr("high"))); - } -}; - -class CreateRandomDataGeneratorOpMaker : public FileReaderMakerBase { - protected: - void Apply() override { - AddAttr("low", "The lower bound of reader's uniform distribution."); - AddAttr("high", "The upper bound of reader's uniform distribution."); - AddComment(R"DOC( - CreateRandomDataGenerator Operator - - This Op creates a random reader. - The reader generates random data instead of really reading from files. - Generated data follow an uniform distribution between 'low' and 'high'. - )DOC"); - } -}; - -} // namespace reader -} // namespace operators -} // namespace paddle - -namespace ops = paddle::operators::reader; -REGISTER_FILE_READER_OPERATOR(create_random_data_generator, - ops::CreateRandomDataGeneratorOp, - ops::CreateRandomDataGeneratorOpMaker); diff --git a/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc b/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc deleted file mode 100644 index d7a048257f92c1c58c34decf1a93ff95f5f736c7..0000000000000000000000000000000000000000 --- a/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc +++ /dev/null @@ -1,93 +0,0 @@ -// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "paddle/fluid/operators/reader/reader_op_registry.h" -#include "paddle/fluid/platform/lock_guard_ptr.h" -#include "paddle/fluid/recordio/scanner.h" - -namespace paddle { -namespace operators { -namespace reader { -template -class RecordIOFileReader : public framework::FileReader { - public: - explicit RecordIOFileReader(const std::string& filename) - : scanner_(filename), - dev_ctx_(*platform::DeviceContextPool::Instance().Get( - platform::CPUPlace())) { - if (ThreadSafe) { - mutex_.reset(new std::mutex()); - } - LOG(INFO) << "Creating file reader" << filename; - } - - protected: - void ReadNextImpl(std::vector* out) override { - platform::LockGuardPtr guard(mutex_); - bool ok = framework::ReadFromRecordIO(&scanner_, dev_ctx_, out); - if (!ok) { - out->clear(); - } - } - - void StartImpl() override { scanner_.Reset(); } - - private: - std::unique_ptr mutex_; - recordio::Scanner scanner_; - const platform::DeviceContext& dev_ctx_; -}; - -class CreateRecordIOReaderOp : public framework::OperatorBase { - public: - using framework::OperatorBase::OperatorBase; - - private: - void RunImpl(const framework::Scope& scope, - const platform::Place& dev_place) const override { - std::string filename = Attr("filename"); - auto* out = scope.FindVar(Output("Out")) - ->template GetMutable(); - - out->Reset(std::make_shared>(filename)); - } -}; - -class CreateRecordIOReaderOpMaker : public FileReaderMakerBase { - protected: - void Apply() override { - AddAttr( - "filename", - "The filename of record file. This file will given to reader."); - AddComment(R"DOC( -Open a recordio file and return the reader object. The returned reader object -is thread-safe. - -NOTE: This is a very low-level API. It is used for debugging data file or -training. Please use `open_files` instead of this API for production usage. - )DOC"); - } -}; - -} // namespace reader -} // namespace operators -} // namespace paddle - -namespace reader = paddle::operators::reader; - -REGISTER_FILE_READER_OPERATOR(create_recordio_file_reader, - reader::CreateRecordIOReaderOp, - reader::CreateRecordIOReaderOpMaker); - -REGISTER_FILE_READER(recordio, reader::RecordIOFileReader); diff --git a/paddle/fluid/operators/reader/create_shuffle_reader_op.cc b/paddle/fluid/operators/reader/create_shuffle_reader_op.cc deleted file mode 100644 index 3f72890a7cee1453585d50afa04fa62a9b059dc3..0000000000000000000000000000000000000000 --- a/paddle/fluid/operators/reader/create_shuffle_reader_op.cc +++ /dev/null @@ -1,124 +0,0 @@ -// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include -#include "glog/logging.h" -#include "paddle/fluid/operators/detail/safe_ref.h" -#include "paddle/fluid/operators/reader/reader_op_registry.h" - -namespace paddle { -namespace operators { -namespace reader { - -class ShuffleReader : public framework::DecoratedReader { - public: - ShuffleReader(const std::shared_ptr& reader, size_t buffer_size, - size_t seed = 0) - : DecoratedReader(reader), buffer_size_(buffer_size), seed_(seed) { - VLOG(10) << "Create shuffle reader of " << reader_; - if (seed_ == 0) { - std::random_device device; - seed_ = device(); - } - ReloadBuffer(); - } - - void ReadNextImpl(std::vector* out) override { - out->clear(); - if (iteration_pos_ >= buffer_.size()) { - VLOG(10) << "Resetting shuffle buffer"; - ReloadBuffer(); - if (buffer_.empty()) { - return; - } - } - *out = buffer_[iteration_pos_++]; - } - - private: - void ShutdownImpl() override { - reader_->Shutdown(); - buffer_.clear(); - iteration_pos_ = 0; - } - - void StartImpl() override { - reader_->Start(); - ReloadBuffer(); - } - - void ReloadBuffer() { - buffer_.clear(); - buffer_.reserve(buffer_size_); - iteration_pos_ = 0; - for (size_t i = 0; i < buffer_size_; ++i) { - std::vector ins; - reader_->ReadNext(&ins); - if (ins.empty()) { - break; - } - buffer_.emplace_back(ins); - } - std::mt19937 g(seed_); - std::shuffle(buffer_.begin(), buffer_.end(), g); - seed_ = g(); // update seed_; - VLOG(10) << "random buffer size = " << buffer_.size(); - } - - size_t buffer_size_; - std::vector> buffer_; - - size_t iteration_pos_; - size_t seed_; -}; - -class CreateShuffleReaderOp : public framework::OperatorBase { - public: - using framework::OperatorBase::OperatorBase; - - private: - void RunImpl(const framework::Scope& scope, - const platform::Place& dev_place) const override { - auto* out = detail::Ref(scope.FindVar(Output("Out"))) - .GetMutable(); - if (out->Get() != nullptr) { - return; - } - const auto& underlying_reader = scope.FindVar(Input("UnderlyingReader")) - ->Get(); - out->Reset(framework::MakeDecoratedReader( - underlying_reader, static_cast(Attr("buffer_size")))); - } -}; - -class CreateShuffleReaderOpMaker : public DecoratedReaderMakerBase { - protected: - void Apply() override { - AddAttr("buffer_size", "The shuffle buffer size.").GreaterThan(0); - AddComment(R"DOC( - CreateShuffleReader Operator - - A shuffle reader takes another reader as its 'underlying reader' - and yields the underlying reader's outputs in a shuffled order. - )DOC"); - } -}; -} // namespace reader -} // namespace operators -} // namespace paddle - -namespace ops = paddle::operators::reader; -REGISTER_DECORATED_READER_OPERATOR(create_shuffle_reader, - ops::CreateShuffleReaderOp, - ops::CreateShuffleReaderOpMaker); diff --git a/paddle/fluid/operators/reader/open_files_op.cc b/paddle/fluid/operators/reader/open_files_op.cc deleted file mode 100644 index 38223e069975a08791d58d6ae10e2112b79a61fe..0000000000000000000000000000000000000000 --- a/paddle/fluid/operators/reader/open_files_op.cc +++ /dev/null @@ -1,277 +0,0 @@ -// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include -#include -#include // NOLINT -#include "ThreadPool.h" -#include "paddle/fluid/framework/blocking_queue.h" -#include "paddle/fluid/operators/reader/blocking_queue.h" -#include "paddle/fluid/operators/reader/buffered_reader.h" -#include "paddle/fluid/operators/reader/reader_op_registry.h" - -namespace paddle { -namespace operators { -namespace reader { - -class IReaderContainer { - public: - virtual ~IReaderContainer() {} - virtual void AppendReader( - std::unique_ptr&& readers) = 0; - virtual void Stop() = 0; - virtual void Start() = 0; - virtual void ReadNext(std::vector* out) = 0; -}; - -class OrderedReaderContainer : public IReaderContainer { - public: - void AppendReader(std::unique_ptr&& reader) override { - pending_.emplace(std::move(reader)); - } - - void Stop() override { - while (!pending_.empty()) { - MoveFrontPendingToDone(); - } - } - - void Start() override { std::swap(done_, pending_); } - - void ReadNext(std::vector* out) override { - if (!pending_.empty()) { - pending_.front()->ReadNext(out); - if (out->empty()) { - MoveFrontPendingToDone(); - ReadNext(out); - } - } else { - out->clear(); - } - } - - private: - void MoveFrontPendingToDone() { - pending_.front()->Shutdown(); - pending_.front()->Start(); - done_.emplace(move(pending_.front())); - pending_.pop(); - } - - std::queue> pending_; - std::queue> done_; -}; - -class PreemptiveReaderContainer : public IReaderContainer { - using ReaderList = std::list>; - - struct FutureItem { - std::vector data_; - ReaderList::iterator reader_it_; - std::exception_ptr exception_; - }; - - using FutureList = std::list>; - - public: - explicit PreemptiveReaderContainer(size_t thread_num) : pool_(thread_num) {} - - void Stop() override { - if (!pending_.empty()) { - for (auto& reader : pending_) { - reader->Shutdown(); - } - for (auto& fu : futures_) { - fu.wait(); - } - futures_.clear(); - for (auto& reader : pending_) { - reader->Start(); - done_.emplace_back(std::move(reader)); - } - pending_.clear(); - bool timeout; - complete_queue_.PopAll(1000, &timeout); - PADDLE_ENFORCE(!timeout); - } - } - - void Start() override { - for (auto& reader : done_) { - AppendReader(std::move(reader)); - } - done_.clear(); - } - - void ReadNext(std::vector* out) override { - if (!pending_.empty()) { - auto future_it = complete_queue_.Pop(); - FutureItem item = future_it->get(); - if (item.exception_) { - for (auto it = futures_.begin(); it != futures_.end(); ++it) { - if (it != future_it) { - it->wait(); // Wait all other threads complete. - } - } - std::rethrow_exception(item.exception_); - - } else if (item.data_.empty()) { // reader done. - done_.emplace_back(std::move(*item.reader_it_)); - pending_.erase(item.reader_it_); - futures_.erase(future_it); - ReadNext(out); - } else { - *out = item.data_; - // continue read async - ReadAsync(item.reader_it_, &future_it); - } - } else { - out->clear(); - } - } - - private: - void AppendReader(std::unique_ptr&& reader) override { - pending_.emplace_back(std::move(reader)); - auto reader_it = pending_.end(); - --reader_it; - - futures_.emplace_back(); - auto future_it = futures_.end(); - --future_it; - - ReadAsync(reader_it, &future_it); - } - - void ReadAsync(const ReaderList::iterator& reader_it, - FutureList::iterator* future_it_ptr) { - auto& future_it = *future_it_ptr; - *future_it = pool_.enqueue([reader_it, future_it, this] { - try { - FutureItem item; - item.reader_it_ = reader_it; - (*reader_it)->ReadNext(&item.data_); - if (item.data_.empty()) { - (*reader_it)->Shutdown(); - (*reader_it)->Start(); - } - complete_queue_.Push(future_it); - return item; - } catch (...) { - FutureItem item; - item.exception_ = std::current_exception(); - complete_queue_.Push(future_it); - return item; - } - }); - } - - FutureList futures_; - ThreadPool pool_; - framework::BlockingQueue complete_queue_; - std::list> pending_; - std::list> done_; -}; - -class MultiFileReader : public framework::ReaderBase { - public: - MultiFileReader(const std::vector& file_names, - std::unique_ptr&& container) - : container_(std::move(container)) { - for (auto& fn : file_names) { - container_->AppendReader(CreateReaderByFileName(fn)); - } - } - - ~MultiFileReader() { container_->Stop(); } - - protected: - void ReadNextImpl(std::vector* out) override { - container_->ReadNext(out); - } - void ShutdownImpl() override { container_->Stop(); } - void StartImpl() override { container_->Start(); } - - private: - std::unique_ptr container_; -}; - -class OpenFilesOp : public framework::OperatorBase { - public: - using framework::OperatorBase::OperatorBase; - - private: - void RunImpl(const framework::Scope& scope, - const platform::Place& dev_place) const override { - const auto& shape_concat = Attr>("shape_concat"); - const auto& ranks = Attr>("ranks"); - PADDLE_ENFORCE(!shape_concat.empty() && !ranks.empty()); - PADDLE_ENFORCE_EQ(std::accumulate(ranks.begin(), ranks.end(), 0), - static_cast(shape_concat.size()), - "The accumulate of all ranks should be equal to the " - "shape concat's length."); - const auto& file_names = Attr>("file_names"); - PADDLE_ENFORCE(!file_names.empty(), "No file to be read!"); - bool is_test = Attr("is_test"); - - auto* out = scope.FindVar(Output("Out")) - ->template GetMutable(); - std::unique_ptr container; - - if (is_test) { - container.reset(new OrderedReaderContainer()); - } else { - container.reset(new PreemptiveReaderContainer( - static_cast(Attr("thread_num")))); - } - - std::shared_ptr reader( - new MultiFileReader(file_names, std::move(container))); - auto buffer_size = Attr("buffer_size"); - if (buffer_size > 1) { - reader = framework::MakeDecoratedReader( - reader, platform::CPUPlace(), buffer_size); - } - out->Reset(reader); - } -}; - -class OpenFilesOpMaker : public FileReaderMakerBase { - protected: - void Apply() override { - AddAttr>("file_names", "Files to be read."); - AddAttr("is_test", "Used for testing data.").SetDefault(false); - - AddComment(R"DOC( - OpenFiles Operator - - An OpenFilesOp creates a MultiFileReader, which is able to - read data multi-threaded from multiple files. - )DOC"); - AddAttr("thread_num", - "The maximal concurrent prefetch thread number. Used only " - "when is_test = False"); - AddAttr("buffer_size", "The reading buffer of these files.") - .GreaterThan(0); - } -}; - -} // namespace reader -} // namespace operators -} // namespace paddle - -namespace reader = paddle::operators::reader; - -REGISTER_FILE_READER_OPERATOR(open_files, reader::OpenFilesOp, - reader::OpenFilesOpMaker); diff --git a/paddle/fluid/pybind/CMakeLists.txt b/paddle/fluid/pybind/CMakeLists.txt index 8ee03c79829d706d74b57271e5eb0ba546aa9231..ff35ca6ca4ee2c958fe5f3250763c68ec1fe925d 100644 --- a/paddle/fluid/pybind/CMakeLists.txt +++ b/paddle/fluid/pybind/CMakeLists.txt @@ -15,7 +15,6 @@ set(PYBIND_SRCS exception.cc protobuf.cc const_value.cc - recordio.cc reader_py.cc fleet_wrapper_py.cc nccl_wrapper_py.cc diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc index c0aeb82b7b87f6e2f52c3dd778a397d15a700b44..2303c2e6656331cd5b8189c086bcaa4cd97f23ab 100644 --- a/paddle/fluid/pybind/pybind.cc +++ b/paddle/fluid/pybind/pybind.cc @@ -66,7 +66,6 @@ limitations under the License. */ #include "paddle/fluid/pybind/protobuf.h" #include "paddle/fluid/pybind/pybind.h" // NOLINT #include "paddle/fluid/pybind/reader_py.h" -#include "paddle/fluid/pybind/recordio.h" #include "paddle/fluid/pybind/tensor_py.h" #include "paddle/fluid/string/to_string.h" #ifdef PADDLE_WITH_CUDA @@ -1662,7 +1661,6 @@ All parameter, weight, gradient are variables in Paddle. return self.Run(fetch_tensors); }); - BindRecordIOWriter(&m); BindFleetWrapper(&m); #ifndef _WIN32 BindNCCLWrapper(&m); diff --git a/paddle/fluid/pybind/recordio.cc b/paddle/fluid/pybind/recordio.cc deleted file mode 100644 index 32caf4bed9a37340c267038a8d173f0ccceca75a..0000000000000000000000000000000000000000 --- a/paddle/fluid/pybind/recordio.cc +++ /dev/null @@ -1,88 +0,0 @@ -// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "paddle/fluid/pybind/recordio.h" - -#include -#include -#include - -#include "paddle/fluid/framework/lod_tensor.h" -#include "paddle/fluid/recordio/writer.h" - -namespace paddle { -namespace pybind { - -namespace { - -class RecordIOWriter { - public: - RecordIOWriter(const std::string& filename, recordio::Compressor compressor, - size_t max_num_record) - : closed_(false), - stream_(filename, std::ios::binary), - writer_(&stream_, compressor, max_num_record) {} - - void AppendTensor(const framework::LoDTensor& tensor) { - tensors_.push_back(tensor); - } - - void CompleteAppendTensor() { - auto& ctx = - *platform::DeviceContextPool::Instance().Get(platform::CPUPlace()); - framework::WriteToRecordIO(&writer_, tensors_, ctx); - tensors_.clear(); - } - - void Close() { - PADDLE_ENFORCE(tensors_.empty()); - writer_.Flush(); - stream_.close(); - closed_ = true; - } - - ~RecordIOWriter() { - if (!closed_) { - Close(); - } - } - - private: - bool closed_; - std::vector tensors_; - std::ofstream stream_; - recordio::Writer writer_; -}; - -} // namespace - -void BindRecordIOWriter(py::module* m) { - py::class_ writer(*m, "RecordIOWriter", ""); - py::enum_(writer, "Compressor", "") - .value("Snappy", recordio::Compressor::kSnappy) - .value("NoCompress", recordio::Compressor::kNoCompress); - - writer - .def("__init__", - [](RecordIOWriter& self, const std::string& filename, - recordio::Compressor compressor, size_t max_num_record) { - new (&self) RecordIOWriter(filename, compressor, max_num_record); - }) - .def("append_tensor", &RecordIOWriter::AppendTensor) - .def("complete_append_tensor", &RecordIOWriter::CompleteAppendTensor) - .def("close", &RecordIOWriter::Close); -} - -} // namespace pybind -} // namespace paddle diff --git a/paddle/fluid/pybind/recordio.h b/paddle/fluid/pybind/recordio.h deleted file mode 100644 index 2555f9b719af8f73fbac10d92b890afd99fac290..0000000000000000000000000000000000000000 --- a/paddle/fluid/pybind/recordio.h +++ /dev/null @@ -1,27 +0,0 @@ -// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once -#include "pybind11/pybind11.h" -#include "pybind11/stl.h" - -namespace py = pybind11; - -namespace paddle { -namespace pybind { - -void BindRecordIOWriter(py::module* m); - -} // namespace pybind -} // namespace paddle diff --git a/paddle/fluid/recordio/CMakeLists.txt b/paddle/fluid/recordio/CMakeLists.txt deleted file mode 100644 index 92e97a6c85d7c8f01c8473feb9772f2285d49673..0000000000000000000000000000000000000000 --- a/paddle/fluid/recordio/CMakeLists.txt +++ /dev/null @@ -1,9 +0,0 @@ -# internal library. -cc_library(header SRCS header.cc) -cc_test(header_test SRCS header_test.cc DEPS header) -cc_library(chunk SRCS chunk.cc DEPS snappystream snappy header zlib) -cc_test(chunk_test SRCS chunk_test.cc DEPS chunk) -cc_library(writer SRCS writer.cc DEPS chunk) -cc_library(scanner SRCS scanner.cc DEPS chunk) -cc_test(writer_scanner_test SRCS writer_scanner_test.cc DEPS writer scanner) -cc_library(recordio DEPS chunk header writer scanner) diff --git a/paddle/fluid/recordio/README.md b/paddle/fluid/recordio/README.md deleted file mode 100644 index ef99c0cf0fa71d807a95898454d8fabb287324e9..0000000000000000000000000000000000000000 --- a/paddle/fluid/recordio/README.md +++ /dev/null @@ -1,13 +0,0 @@ -## Background - -The RecordIO file format is a container for records. This package is a C++ implementation of https://github.com/paddlepaddle/recordio, which originates from https://github.com/wangkuiyi/recordio. - -## Fault-tolerant Writing - -For the initial design purpose of RecordIO within Google, which was logging, RecordIO groups record into *chunks*, whose header contains an MD5 hash of the chunk. A process that writes logs is supposed to call the Writer interface to add records. Once the writer accumulates a handful of them, it groups a chunk, put the MD5 into the chunk header, and appends the chunk to the file. In the event the process crashes unexpected, the last chunk in the RecordIO file could be incomplete/corrupt. The RecordIO reader is able to recover from these errors when the process restarts by identifying incomplete chucks and skipping over them. - -## Reading Ranges - -A side-effect of chunks is to make it easy to indexing records while reading, thus allows us to read a range of successive records. This is good for distributed log process, where each MapReduce task handles only part of records in a big RecordIO file. - -The procedure that creates the index starts from reading the header of the first chunk. It indexes the offset (0) and the size of the chunk, and skips to the header of the next chunk by calling the `fseek` API. Please be aware that most distributed filesystems and all POSIX-compatible local filesystem provides `fseek`, and makes sure that `fseek` runs much faster than `fread`. This procedure generates a map from chunks to their offsets, which allows the readers is to locate and read a range of records. diff --git a/paddle/fluid/recordio/chunk.cc b/paddle/fluid/recordio/chunk.cc deleted file mode 100644 index 6c65d9160c059ac143ee258b2bdaed5915a1dca1..0000000000000000000000000000000000000000 --- a/paddle/fluid/recordio/chunk.cc +++ /dev/null @@ -1,174 +0,0 @@ -// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "paddle/fluid/recordio/chunk.h" - -#include -#include -#include -#include - -#include "paddle/fluid/platform/enforce.h" -#include "snappystream.hpp" - -namespace paddle { -namespace recordio { -constexpr size_t kMaxBufSize = 1024; - -/** - * Read Stream by a fixed sized buffer. - * @param in input stream - * @param limit read at most `limit` bytes from input stream. 0 means no limit - * @param callback A function object with (const char* buf, size_t size) -> void - * as its type. - */ -template -static void ReadStreamByBuf(std::istream& in, size_t limit, Callback callback) { - char buf[kMaxBufSize]; - std::streamsize actual_size; - size_t counter = 0; - size_t actual_max; - while (!in.eof() || - (limit != 0 && counter >= limit)) { // End of file or reach limit - actual_max = - limit != 0 ? std::min(limit - counter, kMaxBufSize) : kMaxBufSize; - in.read(buf, actual_max); - actual_size = in.gcount(); - if (actual_size == 0) { - break; - } - callback(buf, actual_size); - if (limit != 0) { - counter += actual_size; - } - } - in.clear(); // unset eof state -} - -/** - * Copy stream in to another stream - */ -static void PipeStream(std::istream& in, std::ostream& os) { - ReadStreamByBuf(in, 0, - [&os](const char* buf, size_t len) { os.write(buf, len); }); -} - -/** - * Calculate CRC32 from an input stream. - */ -static uint32_t Crc32Stream(std::istream& in, size_t limit = 0) { - uint32_t crc = static_cast(crc32(0, nullptr, 0)); - ReadStreamByBuf(in, limit, [&crc](const char* buf, size_t len) { - crc = static_cast(crc32(crc, reinterpret_cast(buf), - static_cast(len))); - }); - return crc; -} - -bool Chunk::Write(std::ostream& os, Compressor ct) const { - // NOTE(dzhwinter): don't check records.numBytes instead, because - // empty records are allowed. - if (records_.empty()) { - return false; - } - std::stringstream sout; - std::unique_ptr compressed_stream; - switch (ct) { - case Compressor::kNoCompress: - break; - case Compressor::kSnappy: - compressed_stream.reset(new snappy::oSnappyStream(sout)); - break; - default: - PADDLE_THROW("Not implemented"); - } - - std::ostream& buf_stream = compressed_stream ? *compressed_stream : sout; - - for (auto& record : records_) { - size_t sz = record.size(); - buf_stream.write(reinterpret_cast(&sz), sizeof(uint32_t)) - .write(record.data(), record.size()); - } - - if (compressed_stream) { - compressed_stream.reset(); - } - - sout.seekg(0, std::ios::end); - uint32_t len = static_cast(sout.tellg()); - sout.seekg(0, std::ios::beg); - uint32_t crc = Crc32Stream(sout); - Header hdr(static_cast(records_.size()), crc, ct, len); - hdr.Write(os); - sout.seekg(0, std::ios::beg); - sout.clear(); - PipeStream(sout, os); - return true; -} - -bool Chunk::Parse(std::istream& sin) { - ChunkParser parser(sin); - if (!parser.Init()) { - return false; - } - Clear(); - while (parser.HasNext()) { - Add(parser.Next()); - } - return true; -} - -ChunkParser::ChunkParser(std::istream& sin) : in_(sin) {} -bool ChunkParser::Init() { - pos_ = 0; - bool ok = header_.Parse(in_); - if (!ok) { - return ok; - } - auto beg_pos = in_.tellg(); - uint32_t crc = Crc32Stream(in_, header_.CompressSize()); - PADDLE_ENFORCE_EQ(header_.Checksum(), crc); - in_.seekg(beg_pos, in_.beg); - - switch (header_.CompressType()) { - case Compressor::kNoCompress: - break; - case Compressor::kSnappy: - compressed_stream_.reset(new snappy::iSnappyStream(in_)); - break; - default: - PADDLE_THROW("Not implemented"); - } - return true; -} - -bool ChunkParser::HasNext() const { return pos_ < header_.NumRecords(); } - -std::string ChunkParser::Next() { - if (!HasNext()) { - return ""; - } - ++pos_; - std::istream& stream = compressed_stream_ ? *compressed_stream_ : in_; - uint32_t rec_len; - stream.read(reinterpret_cast(&rec_len), sizeof(uint32_t)); - std::string buf; - buf.resize(rec_len); - stream.read(&buf[0], rec_len); - PADDLE_ENFORCE_EQ(rec_len, stream.gcount()); - return buf; -} -} // namespace recordio -} // namespace paddle diff --git a/paddle/fluid/recordio/chunk.h b/paddle/fluid/recordio/chunk.h deleted file mode 100644 index cfb954a591679c2d2c4f42ecd99ca0c8bd1084cf..0000000000000000000000000000000000000000 --- a/paddle/fluid/recordio/chunk.h +++ /dev/null @@ -1,73 +0,0 @@ -// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once -#include -#include -#include - -#include "paddle/fluid/platform/macros.h" -#include "paddle/fluid/recordio/header.h" - -namespace paddle { -namespace recordio { - -// A Chunk contains the Header and optionally compressed records. -class Chunk { - public: - Chunk() : num_bytes_(0) {} - void Add(const std::string& buf) { - num_bytes_ += buf.size(); - records_.emplace_back(buf); - } - // dump the chunk into w, and clears the chunk and makes it ready for - // the next add invocation. - bool Write(std::ostream& fo, Compressor ct) const; - void Clear() { - records_.clear(); - num_bytes_ = 0; - } - - // returns true if ok, false if eof - bool Parse(std::istream& sin); - size_t NumBytes() const { return num_bytes_; } - size_t NumRecords() const { return records_.size(); } - const std::string& Record(int i) const { return records_[i]; } - - bool Empty() const { return records_.empty(); } - - private: - std::vector records_; - // sum of record lengths in bytes. - size_t num_bytes_; - DISABLE_COPY_AND_ASSIGN(Chunk); -}; - -class ChunkParser { - public: - explicit ChunkParser(std::istream& sin); - - bool Init(); - std::string Next(); - bool HasNext() const; - - private: - Header header_; - uint32_t pos_{0}; - std::istream& in_; - std::unique_ptr compressed_stream_; -}; - -} // namespace recordio -} // namespace paddle diff --git a/paddle/fluid/recordio/chunk_test.cc b/paddle/fluid/recordio/chunk_test.cc deleted file mode 100644 index 5177475c016097d9a118aa79f855672354b3ef53..0000000000000000000000000000000000000000 --- a/paddle/fluid/recordio/chunk_test.cc +++ /dev/null @@ -1,47 +0,0 @@ -// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "paddle/fluid/recordio/chunk.h" - -#include - -#include "gtest/gtest.h" - -TEST(Chunk, SaveLoad) { - paddle::recordio::Chunk ch; - ch.Add(std::string("12345", 6)); - ch.Add(std::string("123", 4)); - std::stringstream ss; - ch.Write(ss, paddle::recordio::Compressor::kNoCompress); - ss.seekg(0); - ch.Parse(ss); - ASSERT_EQ(ch.NumBytes(), 10U); -} - -TEST(Chunk, Compressor) { - paddle::recordio::Chunk ch; - ch.Add(std::string("12345", 6)); - ch.Add(std::string("123", 4)); - ch.Add(std::string("123", 4)); - ch.Add(std::string("123", 4)); - std::stringstream ss; - ch.Write(ss, paddle::recordio::Compressor::kSnappy); - std::stringstream ss2; - ch.Write(ss2, paddle::recordio::Compressor::kNoCompress); - ASSERT_LE(ss.tellp(), ss2.tellp()); // Compress should contain less data; - - ch.Clear(); - ch.Parse(ss); - ASSERT_EQ(ch.NumBytes(), 18ul); -} diff --git a/paddle/fluid/recordio/header.cc b/paddle/fluid/recordio/header.cc deleted file mode 100644 index c4822329a43a79adc81f0b0cf145b22661ac6f50..0000000000000000000000000000000000000000 --- a/paddle/fluid/recordio/header.cc +++ /dev/null @@ -1,70 +0,0 @@ -// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "paddle/fluid/recordio/header.h" - -#include - -#include "paddle/fluid/platform/enforce.h" - -namespace paddle { -namespace recordio { - -Header::Header() - : num_records_(0), - checksum_(0), - compressor_(Compressor::kNoCompress), - compress_size_(0) {} - -Header::Header(uint32_t num, uint32_t sum, Compressor c, uint32_t cs) - : num_records_(num), checksum_(sum), compressor_(c), compress_size_(cs) {} - -bool Header::Parse(std::istream& is) { - uint32_t magic; - is.read(reinterpret_cast(&magic), sizeof(uint32_t)); - size_t read_size = is.gcount(); - if (read_size < sizeof(uint32_t)) { - return false; - } - PADDLE_ENFORCE_EQ(magic, kMagicNumber); - - is.read(reinterpret_cast(&num_records_), sizeof(uint32_t)) - .read(reinterpret_cast(&checksum_), sizeof(uint32_t)) - .read(reinterpret_cast(&compressor_), sizeof(uint32_t)) - .read(reinterpret_cast(&compress_size_), sizeof(uint32_t)); - return true; -} - -void Header::Write(std::ostream& os) const { - os.write(reinterpret_cast(&kMagicNumber), sizeof(uint32_t)) - .write(reinterpret_cast(&num_records_), sizeof(uint32_t)) - .write(reinterpret_cast(&checksum_), sizeof(uint32_t)) - .write(reinterpret_cast(&compressor_), sizeof(uint32_t)) - .write(reinterpret_cast(&compress_size_), sizeof(uint32_t)); -} - -std::ostream& operator<<(std::ostream& os, Header h) { - os << "Header: " << h.NumRecords() << ", " << h.Checksum() << ", " - << static_cast(h.CompressType()) << ", " << h.CompressSize(); - return os; -} - -bool operator==(Header l, Header r) { - return l.NumRecords() == r.NumRecords() && l.Checksum() == r.Checksum() && - l.CompressType() == r.CompressType() && - l.CompressSize() == r.CompressSize(); -} - -} // namespace recordio -} // namespace paddle diff --git a/paddle/fluid/recordio/header.h b/paddle/fluid/recordio/header.h deleted file mode 100644 index 245425990b93a90d7ac6b233cff54feb48308d48..0000000000000000000000000000000000000000 --- a/paddle/fluid/recordio/header.h +++ /dev/null @@ -1,66 +0,0 @@ -// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once - -#include - -namespace paddle { -namespace recordio { - -// MagicNumber for memory checking -constexpr uint32_t kMagicNumber = 0x01020304; - -enum class Compressor : uint32_t { - // NoCompression means writing raw chunk data into files. - // With other choices, chunks are compressed before written. - kNoCompress = 0, - // Snappy had been the default compressing algorithm widely - // used in Google. It compromises between speech and - // compression ratio. - kSnappy = 1, - // Gzip is a well-known compression algorithm. It is - // recommmended only you are looking for compression ratio. - kGzip = 2, -}; - -// Header is the metadata of Chunk -class Header { - public: - Header(); - Header(uint32_t num, uint32_t sum, Compressor ct, uint32_t cs); - - void Write(std::ostream& os) const; - - // returns true if OK, false if eof - bool Parse(std::istream& is); - - uint32_t NumRecords() const { return num_records_; } - uint32_t Checksum() const { return checksum_; } - Compressor CompressType() const { return compressor_; } - uint32_t CompressSize() const { return compress_size_; } - - private: - uint32_t num_records_; - uint32_t checksum_; - Compressor compressor_; - uint32_t compress_size_; -}; - -// Allow Header Loggable -std::ostream& operator<<(std::ostream& os, Header h); -bool operator==(Header l, Header r); - -} // namespace recordio -} // namespace paddle diff --git a/paddle/fluid/recordio/header_test.cc b/paddle/fluid/recordio/header_test.cc deleted file mode 100644 index 00f1887dc5e1188829ef4cd42754d161f041656d..0000000000000000000000000000000000000000 --- a/paddle/fluid/recordio/header_test.cc +++ /dev/null @@ -1,29 +0,0 @@ -// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "paddle/fluid/recordio/header.h" - -#include - -#include "gtest/gtest.h" - -TEST(Recordio, ChunkHead) { - paddle::recordio::Header hdr(0, 1, paddle::recordio::Compressor::kGzip, 3); - std::stringstream ss; - hdr.Write(ss); - ss.seekg(0, std::ios::beg); - paddle::recordio::Header hdr2; - hdr2.Parse(ss); - EXPECT_TRUE(hdr == hdr2); -} diff --git a/paddle/fluid/recordio/scanner.cc b/paddle/fluid/recordio/scanner.cc deleted file mode 100644 index b06c274adad9bb4e25b360980898a6e52f08b213..0000000000000000000000000000000000000000 --- a/paddle/fluid/recordio/scanner.cc +++ /dev/null @@ -1,57 +0,0 @@ -// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "paddle/fluid/recordio/scanner.h" - -#include -#include - -#include "paddle/fluid/platform/enforce.h" - -namespace paddle { -namespace recordio { - -Scanner::Scanner(std::unique_ptr &&stream) - : stream_(std::move(stream)), parser_(*stream_) { - Reset(); -} - -Scanner::Scanner(const std::string &filename) - : stream_(new std::ifstream(filename, std::ios::in | std::ios::binary)), - parser_(*stream_) { - PADDLE_ENFORCE(static_cast(*stream_), "Cannot open file %s", filename); - Reset(); -} - -void Scanner::Reset() { - stream_->clear(); - stream_->seekg(0, std::ios::beg); - parser_.Init(); -} - -std::string Scanner::Next() { - if (stream_->eof()) { - return ""; - } - - auto res = parser_.Next(); - if (!parser_.HasNext() && HasNext()) { - parser_.Init(); - } - return res; -} - -bool Scanner::HasNext() const { return !stream_->eof(); } -} // namespace recordio -} // namespace paddle diff --git a/paddle/fluid/recordio/scanner.h b/paddle/fluid/recordio/scanner.h deleted file mode 100644 index 0d885dd87a2f819ba1d9f76259196f6cfff0b2a0..0000000000000000000000000000000000000000 --- a/paddle/fluid/recordio/scanner.h +++ /dev/null @@ -1,43 +0,0 @@ -// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once - -#include -#include -#include - -#include "paddle/fluid/recordio/chunk.h" - -namespace paddle { -namespace recordio { - -class Scanner { - public: - explicit Scanner(std::unique_ptr&& stream); - - explicit Scanner(const std::string& filename); - - void Reset(); - - std::string Next(); - - bool HasNext() const; - - private: - std::unique_ptr stream_; - ChunkParser parser_; -}; -} // namespace recordio -} // namespace paddle diff --git a/paddle/fluid/recordio/writer.cc b/paddle/fluid/recordio/writer.cc deleted file mode 100644 index 8046f4ff7896c897ebe1de2e2bb231cad5a0e410..0000000000000000000000000000000000000000 --- a/paddle/fluid/recordio/writer.cc +++ /dev/null @@ -1,40 +0,0 @@ -// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -#include "paddle/fluid/recordio/writer.h" - -#include - -#include "paddle/fluid/platform/enforce.h" - -namespace paddle { -namespace recordio { - -void Writer::Write(const std::string& record) { - cur_chunk_.Add(record); - if (cur_chunk_.NumRecords() >= max_num_records_in_chunk_) { - Flush(); - } -} - -void Writer::Flush() { - cur_chunk_.Write(stream_, compressor_); - cur_chunk_.Clear(); -} - -Writer::~Writer() { - PADDLE_ENFORCE(cur_chunk_.Empty(), "Writer must be flushed when destroy."); -} - -} // namespace recordio -} // namespace paddle diff --git a/paddle/fluid/recordio/writer.h b/paddle/fluid/recordio/writer.h deleted file mode 100644 index ac7e50ee90e6e8671d68e0d8065e0cf06c819ad0..0000000000000000000000000000000000000000 --- a/paddle/fluid/recordio/writer.h +++ /dev/null @@ -1,44 +0,0 @@ -// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -#pragma once - -#include - -#include "paddle/fluid/recordio/chunk.h" -namespace paddle { -namespace recordio { - -class Writer { - public: - Writer(std::ostream* sout, Compressor compressor, - size_t max_num_records_in_chunk = 1000) - : stream_(*sout), - max_num_records_in_chunk_(max_num_records_in_chunk), - compressor_(compressor) {} - - void Write(const std::string& record); - - void Flush(); - - ~Writer(); - - private: - std::ostream& stream_; - size_t max_num_records_in_chunk_; - Chunk cur_chunk_; - Compressor compressor_; -}; - -} // namespace recordio -} // namespace paddle diff --git a/paddle/fluid/recordio/writer_scanner_test.cc b/paddle/fluid/recordio/writer_scanner_test.cc deleted file mode 100644 index 6583df21a20e9e034adc14b1d3eeb136899d659e..0000000000000000000000000000000000000000 --- a/paddle/fluid/recordio/writer_scanner_test.cc +++ /dev/null @@ -1,70 +0,0 @@ -// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include -#include - -#include "gtest/gtest.h" -#include "paddle/fluid/recordio/scanner.h" -#include "paddle/fluid/recordio/writer.h" - -TEST(WriterScanner, Normal) { - std::stringstream* stream = new std::stringstream(); - - { - paddle::recordio::Writer writer(stream, - paddle::recordio::Compressor::kSnappy); - writer.Write("ABC"); - writer.Write("BCD"); - writer.Write("CDE"); - writer.Flush(); - } - - { - stream->seekg(0, std::ios::beg); - std::unique_ptr stream_ptr(stream); - paddle::recordio::Scanner scanner(std::move(stream_ptr)); - ASSERT_TRUE(scanner.HasNext()); - ASSERT_EQ(scanner.Next(), "ABC"); - ASSERT_EQ("BCD", scanner.Next()); - ASSERT_TRUE(scanner.HasNext()); - ASSERT_EQ("CDE", scanner.Next()); - ASSERT_FALSE(scanner.HasNext()); - } -} - -TEST(WriterScanner, TinyChunk) { - std::stringstream* stream = new std::stringstream(); - { - paddle::recordio::Writer writer( - stream, paddle::recordio::Compressor::kNoCompress, 2 /*max chunk num*/); - writer.Write("ABC"); - writer.Write("BCD"); - writer.Write("CDE"); - writer.Write("DEFG"); - writer.Flush(); - } - - { - stream->seekg(0, std::ios::beg); - std::unique_ptr stream_ptr(stream); - paddle::recordio::Scanner scanner(std::move(stream_ptr)); - ASSERT_TRUE(scanner.HasNext()); - ASSERT_EQ(scanner.Next(), "ABC"); - ASSERT_EQ(scanner.Next(), "BCD"); - ASSERT_EQ(scanner.Next(), "CDE"); - ASSERT_EQ(scanner.Next(), "DEFG"); - ASSERT_FALSE(scanner.HasNext()); - } -} diff --git a/python/paddle/dataset/common.py b/python/paddle/dataset/common.py index 58a4c66c206c3f783437126c855c2890644f1bc0..3567ecfa0098328f69404335b450e35d443376cf 100644 --- a/python/paddle/dataset/common.py +++ b/python/paddle/dataset/common.py @@ -32,7 +32,6 @@ __all__ = [ 'md5file', 'split', 'cluster_files_reader', - 'convert', ] DATA_HOME = os.path.expanduser('~/.cache/paddle/dataset') @@ -205,40 +204,3 @@ def cluster_files_reader(files_pattern, yield line return reader - - -def convert(output_path, reader, line_count, name_prefix): - import recordio - """ - Convert data from reader to recordio format files. - - :param output_path: directory in which output files will be saved. - :param reader: a data reader, from which the convert program will read - data instances. - :param name_prefix: the name prefix of generated files. - :param max_lines_to_shuffle: the max lines numbers to shuffle before - writing. - """ - - assert line_count >= 1 - indx_f = 0 - - def write_data(indx_f, lines): - filename = "%s/%s-%05d" % (output_path, name_prefix, indx_f) - writer = recordio.writer(filename) - for l in lines: - # FIXME(Yancey1989): - # dumps with protocol: pickle.HIGHEST_PROTOCOL - writer.write(pickle.dumps(l)) - writer.close() - - lines = [] - for i, d in enumerate(reader()): - lines.append(d) - if i % line_count == 0 and i >= line_count: - write_data(indx_f, lines) - lines = [] - indx_f += 1 - continue - - write_data(indx_f, lines) diff --git a/python/paddle/fluid/__init__.py b/python/paddle/fluid/__init__.py index dfe58c7e4d92edd9fdbfa3689305b1ed29211947..9f7560a6fe7699d41a37654afe2dcb0848597511 100644 --- a/python/paddle/fluid/__init__.py +++ b/python/paddle/fluid/__init__.py @@ -75,7 +75,6 @@ from . import clip from . import dygraph_grad_clip from . import profiler from . import unique_name -from . import recordio_writer from . import parallel_executor from .parallel_executor import * from . import compiler diff --git a/python/paddle/fluid/layers/io.py b/python/paddle/fluid/layers/io.py index 88408d62361c15e76d316a819441392ffec505e5..cfb2c42fcd8071463b71c514987d16b17c31d385 100644 --- a/python/paddle/fluid/layers/io.py +++ b/python/paddle/fluid/layers/io.py @@ -32,9 +32,8 @@ from ..unique_name import generate as unique_name import logging __all__ = [ - 'data', 'open_files', 'read_file', 'shuffle', 'batch', 'double_buffer', - 'random_data_generator', 'py_reader', 'create_py_reader_by_data', - 'Preprocessor', 'load' + 'data', 'read_file', 'double_buffer', 'py_reader', + 'create_py_reader_by_data', 'load' ] @@ -362,137 +361,6 @@ def _copy_reader_create_op_(block, op): return new_op -@templatedoc(op_type='create_recordio_file_reader') -def open_recordio_file(filename, - shapes, - lod_levels, - dtypes, - pass_num=1, - for_parallel=True): - """ - ${comment} - - Args: - filename(${filename_type}): ${filename_comment}. - shapes(list): List of tuples which declaring data shapes. - lod_levels(${lod_levels_type}): ${lod_levels_comment}. - dtypes(list): List of strs which declaring data type. - pass_num(int): Number of passes to run. - for_parallel(Bool): Set it as True if you are going to run - subsequent operators in parallel. - - Returns: - ${out_comment}. - - Examples: - - >>> import paddle.fluid as fluid - >>> reader = fluid.layers.io.open_recordio_file( - >>> filename='./data.recordio', - >>> shapes=[(3,224,224), (1,)], - >>> lod_levels=[0, 0], - >>> dtypes=['float32', 'int64']) - >>> # Via the reader, we can use 'read_file' layer to get data: - >>> image, label = fluid.layers.io.read_file(reader) - """ - dtypes = [convert_np_dtype_to_dtype_(dt) for dt in dtypes] - shape_concat = [] - ranks = [] - - for shape in shapes: - shape_concat.extend(shape) - ranks.append(len(shape)) - - var_name = unique_name('open_recordio_file') - - startup_blk = default_startup_program().current_block() - startup_var = startup_blk.create_var(name=var_name) - startup_blk.append_op( - type='create_recordio_file_reader', - outputs={'Out': [startup_var]}, - attrs={ - 'shape_concat': shape_concat, - 'lod_levels': lod_levels, - 'filename': filename, - 'ranks': ranks - }) - - startup_var.desc.set_dtypes(dtypes) - startup_var.persistable = True - main_prog_var = _copy_reader_var_(default_main_program().current_block(), - startup_var) - - if pass_num > 1: - main_prog_var = multi_pass(reader=main_prog_var, pass_num=pass_num) - - return monkey_patch_reader_methods(main_prog_var) - - -def random_data_generator(low, high, shapes, lod_levels, for_parallel=True): - """ - Create a uniform random data generator - - This layer returns a Reader Variable. - Instead of opening a file and reading data from it, this - Reader Variable generates float uniform random data by itself. - It can be used as a dummy reader to test a network without - opening a real file. - - Args: - low(float): The lower bound of data's uniform distribution. - high(float): The upper bound of data's uniform distribution. - shapes(list): List of tuples which declaring data shapes. - lod_levels(list): List of ints which declaring data lod_level. - for_parallel(Bool): Set it as True if you are going to run - subsequent operators in parallel. - - Returns: - Variable: A Reader Variable from which we can get random data. - - Examples: - - .. code-block:: python - - import paddle.fluid as fluid - reader = fluid.layers.random_data_generator( - low=0.0, - high=1.0, - shapes=[[3,224,224], [1]], - lod_levels=[0, 0]) - # Via the reader, we can use 'read_file' layer to get data: - image, label = fluid.layers.read_file(reader) - """ - dtypes = [core.VarDesc.VarType.FP32] * len(shapes) - shape_concat = [] - ranks = [] - - for shape in shapes: - shape_concat.extend(shape) - ranks.append(len(shape)) - - var_name = unique_name('random_data_generator') - - startup_blk = default_startup_program().current_block() - startup_var = startup_blk.create_var(name=var_name) - startup_blk.append_op( - type='create_random_data_generator', - outputs={'Out': [startup_var]}, - attrs={ - 'low': low, - 'high': high, - 'shape_concat': shape_concat, - 'lod_levels': lod_levels, - 'ranks': ranks - }) - - startup_var.desc.set_dtypes(dtypes) - startup_var.persistable = True - main_prog_var = _copy_reader_var_(default_main_program().current_block(), - startup_var) - - return monkey_patch_reader_methods(main_prog_var) - - def _py_reader(capacity, shapes, dtypes, @@ -1006,79 +874,6 @@ def __create_unshared_decorated_reader__(op_type, reader, attrs, name=None): return monkey_patch_reader_methods(new_reader) -def shuffle(reader, buffer_size): - """ - Creates a data reader whose data output is shuffled. - Output from the iterator that created by original reader will be - buffered into shuffle buffer, and then shuffled. The size of shuffle buffer - is determined by argument buf_size. - - Args: - reader(callable): the original reader whose output will be shuffled. - buf_size(int): shuffle buffer size. - - Returns: - callable: the new reader whose output is shuffled. - - Examples: - .. code-block:: python - - import paddle.fluid as fluid - raw_reader = fluid.layers.io.open_files(filenames=['./data1.recordio', - './data2.recordio'], - shapes=[(3,224,224), (1,)], - lod_levels=[0, 0], - dtypes=['float32', 'int64'], - thread_num=2, - buffer_size=2) - batch_reader = fluid.layers.batch(reader=raw_reader, batch_size=5) - shuffle_reader = fluid.layers.shuffle(reader=batch_reader, buffer_size=5000) - """ - return __create_unshared_decorated_reader__( - 'create_shuffle_reader', reader, {'buffer_size': int(buffer_size)}) - - -def batch(reader, batch_size): - """ - This layer is a reader decorator. It takes a reader and adds - 'batching' decoration on it. When reading with the result - decorated reader, output data will be automatically organized - to the form of batches. - - Args: - reader(Variable): The reader to be decorated with 'batching'. - batch_size(int): The batch size. - - Returns: - Variable: The reader which has been decorated with 'batching'. - - Examples: - .. code-block:: python - - import paddle.fluid as fluid - raw_reader = fluid.layers.io.open_files(filenames=['./data1.recordio', - './data2.recordio'], - shapes=[(3,224,224), (1,)], - lod_levels=[0, 0], - dtypes=['float32', 'int64'], - thread_num=2, - buffer_size=2) - batch_reader = fluid.layers.batch(reader=raw_reader, batch_size=5) - - # If we read data with the raw_reader: - # data = fluid.layers.read_file(raw_reader) - # We can only get data instance by instance. - # - # However, if we read data with the batch_reader: - # data = fluid.layers.read_file(batch_reader) - # Each 5 adjacent instances will be automatically combined together - # to become a batch. So what we get('data') is a batch data instead - # of an instance. - """ - return __create_unshared_decorated_reader__( - 'create_batch_reader', reader, {'batch_size': int(batch_size)}) - - def double_buffer(reader, place=None, name=None): """ Wrap a double buffer reader. The data will copy to target place with a @@ -1096,14 +891,15 @@ def double_buffer(reader, place=None, name=None): wrapped reader with double buffer. Examples: - - >>> import paddle.fluid as fluid - >>> reader = fluid.layers.open_files(filenames=['mnist.recordio'], - >>> shapes=[[-1, 784], [-1, 1]], - >>> lod_levels=[0, 0], - >>> dtypes=['float32', 'int64']) - >>> reader = fluid.layers.double_buffer(reader) - >>> img, label = fluid.layers.read_file(reader) + .. code-block:: python + + import paddle.fluid as fluid + reader = fluid.layers.py_reader(capacity=64, + shapes=[(-1, 1, 28, 28), (-1, 1)], + dtypes=['float32', 'int64'], + use_double_buffer=False) + reader = fluid.layers.double_buffer(reader) + image, label = fluid.layers.read_file(reader) """ attrs = dict() if place is not None: @@ -1112,11 +908,6 @@ def double_buffer(reader, place=None, name=None): 'create_double_buffer_reader', reader, attrs, name=name) -def multi_pass(reader, pass_num): - return __create_shared_decorated_reader__( - 'create_multi_pass_reader', reader, {'pass_num': int(pass_num)}) - - def read_file(reader): """ Execute the given reader and get data via it. @@ -1136,14 +927,10 @@ def read_file(reader): .. code-block:: python import paddle.fluid as fluid - data_file = fluid.layers.open_files( - filenames=['mnist.recordio'], - shapes=[(-1, 748), (-1, 1)], - lod_levels=[0, 0], - dtypes=["float32", "int64"]) - data_file = fluid.layers.double_buffer( - fluid.layers.batch(data_file, batch_size=64)) - input, label = fluid.layers.read_file(data_file) + reader = fluid.layers.py_reader(capacity=64, + shapes=[(-1, 1, 28, 28), (-1, 1)], + dtypes=['float32', 'int64']) + image, label = fluid.layers.read_file(reader) """ helper = LayerHelper('read_file') out = [ @@ -1159,113 +946,6 @@ def read_file(reader): return out -class Preprocessor(object): - """ - A block for data pre-processing in reader. - - Args: - reader (Variable): A reader variable. - name (str, default None): The name of the reader. - - Examples: - .. code-block:: python - - import paddle.fluid as fluid - - reader = fluid.layers.io.open_files( - filenames=['./data1.recordio', './data2.recordio'], - shapes=[(3, 224, 224), (1, )], - lod_levels=[0, 0], - dtypes=['float32', 'int64']) - - preprocessor = fluid.layers.io.Preprocessor(reader=reader) - with preprocessor.block(): - img, lbl = preprocessor.inputs() - img_out = img / 2 - lbl_out = lbl + 1 - preprocessor.outputs(img_out, lbl_out) - - data_file = fluid.layers.io.double_buffer(preprocessor()) - - """ - BEFORE_SUB_BLOCK = 0 - IN_SUB_BLOCK = 1 - AFTER_SUB_BLOCK = 2 - - def __init__(self, reader, name=None): - self.underlying_reader = reader - new_reader_name = name if name is not None else unique_name( - "create_custom_reader") - self.main_prog = default_main_program() - self.reader = self.main_prog.current_block().create_var( - name=new_reader_name) - self.sub_block = None - self.source_var_names = None - self.sink_var_names = None - self.status = Preprocessor.BEFORE_SUB_BLOCK - - def _is_completed(self): - return self.sub_block and self.source_var_names and self.sink_var_names - - @signature_safe_contextmanager - def block(self): - self.status = Preprocessor.IN_SUB_BLOCK - self.sub_block = self.main_prog._create_block() - yield - self.main_prog._rollback() - self.status = Preprocessor.AFTER_SUB_BLOCK - if not self._is_completed(): - raise RuntimeError( - "The definition of preprocessor is incompleted! " - "Please make sure that you have set input and output " - "variables by invoking 'inputs' and 'outputs' in " - "Preprocessor's sub-block.") - - def inputs(self): - if self.status != Preprocessor.IN_SUB_BLOCK: - raise RuntimeError( - "Preprocessor.inputs() can only be invoked inside the sub-block." - ) - - source_shapes = self.underlying_reader.desc.shapes() - source_dtypes = self.underlying_reader.desc.dtypes() - source_lod_levels = self.underlying_reader.desc.lod_levels() - self.source_var_names = [ - unique_name("preprocessor_source") - for _ in six.moves.range(len(source_shapes)) - ] - source_vars = [] - for var_name, shape, dtype, lod_level in zip( - self.source_var_names, source_shapes, source_dtypes, - source_lod_levels): - source_vars.append(self.main_prog.current_block().create_var( - name=var_name, shape=shape, dtype=dtype, lod_level=lod_level)) - return source_vars - - def outputs(self, *outs): - if self.status != Preprocessor.IN_SUB_BLOCK: - raise RuntimeError( - "Preprocessor.outputs() can only be invoked inside the sub-block." - ) - self.sink_var_names = [var.name for var in outs] - - def __call__(self, *args, **kwargs): - if self.status != Preprocessor.AFTER_SUB_BLOCK: - raise RuntimeError( - "Preprocessor output can only be retrieved after rnn block.") - - self.main_prog.current_block().append_op( - type="create_custom_reader", - inputs={'UnderlyingReader': self.underlying_reader}, - outputs={'Out': [self.reader]}, - attrs={ - "sub_block": self.sub_block, - "source_var_names": self.source_var_names, - "sink_var_names": self.sink_var_names - }) - return monkey_patch_reader_methods(self.reader) - - @templatedoc() def load(out, file_path, load_as_fp16=None): """ diff --git a/python/paddle/fluid/recordio_writer.py b/python/paddle/fluid/recordio_writer.py deleted file mode 100644 index aa581f23a191639fdc026e7781897d5d996823a9..0000000000000000000000000000000000000000 --- a/python/paddle/fluid/recordio_writer.py +++ /dev/null @@ -1,132 +0,0 @@ -# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from __future__ import print_function - -import os -from .wrapped_decorator import signature_safe_contextmanager -from . import core -__all__ = [ - 'convert_reader_to_recordio_file', 'convert_reader_to_recordio_files' -] - - -@signature_safe_contextmanager -def create_recordio_writer(filename, - compressor=core.RecordIOWriter.Compressor.Snappy, - max_num_records=1000): - writer = core.RecordIOWriter(filename, compressor, max_num_records) - yield writer - writer.close() - - -def convert_reader_to_recordio_file( - filename, - reader_creator, - feeder, - compressor=core.RecordIOWriter.Compressor.Snappy, - max_num_records=1000, - feed_order=None): - """ - Convert a Python Reader to a recordio file. - - Examples: - - >>> import paddle.fluid as fluid - >>> import paddle.dataset.mnist as mnist - >>> import paddle - >>> - >>> tmp_program = fluid.Program() - >>> with fluid.program_guard(tmp_program): - >>> img = fluid.layers.data(name='img', shape=[784]) - >>> label = fluid.layers.data(name='label', shape=[1], dtype='int64') - >>> feeder = fluid.DataFeeder(feed_list=[img, label], place=fluid.CPUPlace()) - >>> # mnist.recordio will be generated in current directory - >>> fluid.recordio_writer.convert_reader_to_recordio_file( - >>> filename="mnist.recordio", - >>> reader_creator=paddle.batch(mnist.train(), batch_size=32), - >>> feeder=feeder) - - Args: - filename(str): The recordio filename. - reader_creator(callable): The Python Reader Creator. See - :ref:`api_guide_python_reader`. - feeder(DataFeeder): The DataFeeder instance. Used to convert - :code:`reader_creator` to :code: `lod_tensor` - compressor: Must in fluid.core.RecordIOWriter.Compressor.Snappy or - fluid.core.RecordIOWriter.Compressor.NoCompress. Use :code:`Snappy` - by default. - max_num_records(int): Maximum number of records in one chuck. Each record - is each return value from reader function - feed_order(list): The order of variable names that the reader returns - - Returns: - int: the number of record that saved. - """ - if feed_order is None: - feed_order = feeder.feed_names - counter = 0 - with create_recordio_writer(filename, compressor, - max_num_records) as writer: - for batch in reader_creator(): - res = feeder.feed(batch) - for each in feed_order: - writer.append_tensor(res[each]) - writer.complete_append_tensor() - counter += 1 - return counter - - -def convert_reader_to_recordio_files( - filename, - batch_per_file, - reader_creator, - feeder, - compressor=core.RecordIOWriter.Compressor.Snappy, - max_num_records=1000, - feed_order=None): - """ - convert a python reader to many recordio files. - - This API is basically same as :code:`convert_reader_to_recordio_file`, - instead of it will create many recordio files. Each file contains at - most :code:`batch_per_file` records. - - Please reference - :ref:`api_fluid_recordio_writer_convert_reader_to_recordio_file` for more - details. - """ - if feed_order is None: - feed_order = feeder.feed_names - f_name, f_ext = os.path.splitext(filename) - assert (f_ext == ".recordio") - - lines = [] - f_idx = 0 - counter = 0 - for idx, batch in enumerate(reader_creator()): - lines.append(batch) - if idx >= batch_per_file and idx % batch_per_file == 0: - filename = "%s-%05d%s" % (f_name, f_idx, f_ext) - with create_recordio_writer(filename, compressor, - max_num_records) as writer: - for l in lines: - res = feeder.feed(l) - for each in feed_order: - writer.append_tensor(res[each]) - writer.complete_append_tensor() - counter += 1 - lines = [] - f_idx += 1 - return counter diff --git a/python/paddle/fluid/tests/demo/file_reader/convert_data_to_recordio.py b/python/paddle/fluid/tests/demo/file_reader/convert_data_to_recordio.py deleted file mode 100644 index b00af91a9dce637e312c9dc5d7d3824106b5a051..0000000000000000000000000000000000000000 --- a/python/paddle/fluid/tests/demo/file_reader/convert_data_to_recordio.py +++ /dev/null @@ -1,63 +0,0 @@ -# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from __future__ import print_function - -import sys -import paddle.fluid as fluid - - -def load_vocab(filename): - """ - load vocabulary - """ - vocab = {} - with open(filename) as f: - wid = 0 - for line in f: - vocab[line.strip()] = wid - wid += 1 - return vocab - - -# load word dict with paddle inner function -if len(sys.argv) == 1: - word_dict = paddle.dataset.imdb.word_dict() -else: - word_dict = load_vocab(sys.argv[1]) - word_dict[""] = len(word_dict) -print("Dict dim = ", len(word_dict)) - -# input text data -data = fluid.layers.data(name="words", shape=[1], dtype="int64", lod_level=1) - -# label data -label = fluid.layers.data(name="label", shape=[1], dtype="int64") -# like placeholder -feeder = fluid.DataFeeder(feed_list=[data, label], place=fluid.CPUPlace()) - -# train data set -BATCH_SIZE = 128 -train_reader = paddle.batch( - paddle.reader.shuffle( - paddle.dataset.imdb.train(word_dict), buf_size=25000), - batch_size=BATCH_SIZE) - -test_reader = paddle.batch( - paddle.dataset.imdb.test(word_dict), batch_size=BATCH_SIZE) - -fluid.recordio_writer.convert_reader_to_recordio_file( - "train.recordio", feeder=feeder, reader_creator=train_reader) -fluid.recordio_writer.convert_reader_to_recordio_file( - "test.recordio", feeder=feeder, reader_creator=test_reader) diff --git a/python/paddle/fluid/tests/test_cpp_reader.py b/python/paddle/fluid/tests/test_cpp_reader.py deleted file mode 100644 index b2a5253b9500bb504c651b2ab684206133199ada..0000000000000000000000000000000000000000 --- a/python/paddle/fluid/tests/test_cpp_reader.py +++ /dev/null @@ -1,94 +0,0 @@ -# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from __future__ import print_function - -import paddle -import paddle.fluid as fluid -import numpy as np -import sys - -startup_prog = fluid.framework.Program() -startup_block = startup_prog.current_block() - -random_reader = startup_block.create_var( - type=fluid.core.VarDesc.VarType.READER, name="RandomDataGenerator") -random_reader.desc.set_dtypes( - [fluid.core.VarDesc.VarType.FP32, fluid.core.VarDesc.VarType.FP32]) -random_reader.persistable = True -shuffle_reader = startup_block.create_var( - type=fluid.core.VarDesc.VarType.READER, name="ShuffleReader") -shuffle_reader.persistable = True -batch_reader = startup_block.create_var( - type=fluid.core.VarDesc.VarType.READER, name="BatchReader") -batch_reader.persistable = True -double_buffer = startup_block.create_var( - type=fluid.core.VarDesc.VarType.READER, name="DoubleBuffer") -double_buffer.persistable = True - -main_prog = startup_prog.clone() -main_block = main_prog.current_block() - -create_random_data_generator_op = startup_block.append_op( - type="create_random_data_generator", - outputs={"Out": random_reader}, - attrs={ - "shape_concat": [1, 2, 1, 1], - "ranks": [2, 2], - "low": 0.0, - "high": 1.0, - 'lod_levels': [0, 0] - }) - -create_shuffle_reader_op = startup_block.append_op( - type="create_shuffle_reader", - inputs={"UnderlyingReader": random_reader}, - outputs={"Out": shuffle_reader}, - attrs={"buffer_size": 7}) - -create_batch_reader_op = startup_block.append_op( - type="create_batch_reader", - inputs={"UnderlyingReader": shuffle_reader}, - outputs={"Out": batch_reader}, - attrs={"batch_size": 10}) - -create_double_buffer_reader_op = startup_block.append_op( - type="create_double_buffer_reader", - inputs={"UnderlyingReader": batch_reader}, - outputs={"Out": double_buffer}) - -out1 = main_block.create_var( - type=fluid.core.VarDesc.VarType.LOD_TENSOR, name="Out1") -out2 = main_block.create_var( - type=fluid.core.VarDesc.VarType.LOD_TENSOR, name="Out2") - -main_block.var("DoubleBuffer").desc.set_shapes(double_buffer.desc.shapes()) -main_block.var("DoubleBuffer").desc.set_dtypes(double_buffer.desc.dtypes()) -main_block.var("DoubleBuffer").desc.set_lod_levels( - double_buffer.desc.lod_levels()) - -read_op = main_block.append_op( - type="read", - inputs={"Reader": double_buffer}, - outputs={"Out": [out1, out2]}) - -place = fluid.CPUPlace() -exe = fluid.Executor(place) - -exe.run(startup_prog) - -for i in range(1, 100): - [res1, res2] = exe.run(main_prog, fetch_list=[out1, out2]) - if not (res1.shape == (10, 2) and res2.shape == (10, 1)): - exit(1) diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index 933121ee3f5d08802fdb178c42e065e7fd920cbe..93ce55a1d44e9d68ab01a414238bf199bd5ffc44 100644 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -305,7 +305,7 @@ if (WITH_MKLDNN) add_subdirectory(mkldnn) endif() -set_tests_properties(test_recordio_reader test_parallel_executor_test_while_train test_parallel_executor_mnist +set_tests_properties(test_parallel_executor_test_while_train test_parallel_executor_mnist test_parallel_executor_seresnext test_parallel_executor_crf test_sync_batch_norm_op test_parallel_executor_crf_auto_growth test_buffer_shared_memory_reuse_pass_and_fuse_optimization_op_pass test_buffer_shared_memory_reuse_pass PROPERTIES LABELS "RUN_TYPE=DIST") diff --git a/python/paddle/fluid/tests/unittests/feed_data_reader.py b/python/paddle/fluid/tests/unittests/feed_data_reader.py new file mode 100644 index 0000000000000000000000000000000000000000..1e6016d57bd776ecc1f3ee0db63808e5bcb97eea --- /dev/null +++ b/python/paddle/fluid/tests/unittests/feed_data_reader.py @@ -0,0 +1,78 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import six +import paddle.fluid as fluid +from paddle.fluid.framework import Variable + + +def cyclic_reader(reader): + def __reader__(): + while True: + for data in reader(): + yield data + + return __reader__ + + +class FeedDataReader(object): + def __init__(self, feed_list, reader): + self._feed_list = [] + for var in feed_list: + if isinstance(var, Variable): + self._feed_list.append(var.name) + else: + self._feed_list.append(var) + + self._reader = cyclic_reader(reader) + self._iter = self._reader() + + def _feed_executor(self): + next_data = next(self._iter) + feed_data = dict() + assert len(self._feed_list) == len(next_data) + for key, value in six.moves.zip(self._feed_list, next_data): + feed_data[key] = value + return feed_data + + def _feed_parallel_executor(self, device_num): + feed_data = [] + for _ in six.moves.range(device_num): + feed_data.append(self._feed_executor()) + + return feed_data + + def get_next(self, exe, program): + result = [] + assert isinstance(exe, fluid.Executor), "exe must be Executor" + use_cuda = isinstance(exe.place, fluid.CUDAPlace) + if isinstance(program, fluid.CompiledProgram): + if program._is_data_parallel: + use_executor = False + if program._places is None: + device_num = len(fluid.cuda_places()) if use_cuda else len( + fluid.cpu_places()) + else: + device_num = len(program._places) + else: + use_executor = True + device_num = 1 + else: + use_executor = True + device_num = 1 + + if use_executor: + return self._feed_executor() + else: + return self._feed_parallel_executor(device_num) diff --git a/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py b/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py index 94e451ff26670321f961ac9b0aa4254bab9436a1..e56f41d71346ae4b72cde0869174336aea0e23dc 100644 --- a/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py +++ b/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py @@ -24,6 +24,7 @@ import time import numpy as np import math import sys +from feed_data_reader import FeedDataReader __all__ = ['TestParallelExecutorBase'] @@ -36,6 +37,7 @@ class TestParallelExecutorBase(unittest.TestCase): iter=50, batch_size=None, feed_dict=None, + feed_data_reader=None, get_data_from_feeder=None, use_parallel_executor=True, use_reduce=False, @@ -49,9 +51,19 @@ class TestParallelExecutorBase(unittest.TestCase): use_fast_executor=False, enable_sequential_execution=False): def run_executor(exe, binary, feed, fetch_list): - res = exe.run(binary, feed=feed, fetch_list=fetch_list) + if feed_data_reader is None: + res = exe.run(binary, feed=feed, fetch_list=fetch_list) + else: + res = exe.run(binary, + feed=feed_data_reader.get_next(exe, binary), + fetch_list=fetch_list) return res + if feed_data_reader is not None: + assert isinstance( + feed_data_reader, FeedDataReader + ), "feed_data_reader must be type of FeedDataReader" + main = fluid.Program() startup = fluid.Program() startup.random_seed = 1 diff --git a/python/paddle/fluid/tests/unittests/test_fuse_relu_depthwise_conv_pass.py b/python/paddle/fluid/tests/unittests/test_fuse_relu_depthwise_conv_pass.py index d4be4f84af4289f8aee060a93d64493bf22ab854..7c9b56d403092ebbd4effe5b15ade9520a4f5d8c 100644 --- a/python/paddle/fluid/tests/unittests/test_fuse_relu_depthwise_conv_pass.py +++ b/python/paddle/fluid/tests/unittests/test_fuse_relu_depthwise_conv_pass.py @@ -21,8 +21,6 @@ import paddle.dataset.mnist as mnist import unittest import os -MNIST_RECORDIO_FILE = "./mnist_test_pe.recordio" - def norm(*args, **kargs): return fluid.layers.batch_norm(*args, **kargs) @@ -51,17 +49,9 @@ def sep_conv(input, channel, stride, filter, dilation=1, act=None): def simple_depthwise_net(use_feed): - if use_feed: - img = fluid.layers.data(name='image', shape=[784], dtype='float32') - label = fluid.layers.data(name='label', shape=[1], dtype='int64') - else: - reader = fluid.layers.open_files( - filenames=[MNIST_RECORDIO_FILE], - shapes=[[-1, 784], [-1, 1]], - lod_levels=[0, 0], - dtypes=['float32', 'int64']) - reader = fluid.layers.io.double_buffer(reader) - img, label = fluid.layers.read_file(reader) + assert use_feed + img = fluid.layers.data(name='image', shape=[784], dtype='float32') + label = fluid.layers.data(name='label', shape=[1], dtype='int64') hidden = fluid.layers.reshape(img, (-1, 1, 28, 28)) for _ in range(4): hidden = sep_conv(hidden, channel=200, stride=2, filter=5) @@ -73,23 +63,6 @@ def simple_depthwise_net(use_feed): class TestMNIST(TestParallelExecutorBase): - @classmethod - def setUpClass(cls): - os.environ['CPU_NUM'] = str(4) - # Convert mnist to recordio file - with fluid.program_guard(fluid.Program(), fluid.Program()): - reader = paddle.batch(mnist.train(), batch_size=4) - feeder = fluid.DataFeeder( - feed_list=[ # order is image and label - fluid.layers.data( - name='image', shape=[784]), - fluid.layers.data( - name='label', shape=[1], dtype='int64'), - ], - place=fluid.CPUPlace()) - fluid.recordio_writer.convert_reader_to_recordio_file( - MNIST_RECORDIO_FILE, reader, feeder) - def _init_data(self, random=True): np.random.seed(5) if random: diff --git a/python/paddle/fluid/tests/unittests/test_ir_memory_optimize_pass.py b/python/paddle/fluid/tests/unittests/test_ir_memory_optimize_pass.py index e224caee6e5bd4f83df823a3ce9b48a348926abc..d9f68c2d15ee7c728379140f2601e69dc0c245fc 100644 --- a/python/paddle/fluid/tests/unittests/test_ir_memory_optimize_pass.py +++ b/python/paddle/fluid/tests/unittests/test_ir_memory_optimize_pass.py @@ -21,26 +21,16 @@ import paddle.dataset.mnist as mnist import unittest import os -MNIST_RECORDIO_FILE = "./mnist_test_pe.recordio" - -def _feed_data_helper(use_feed): - if use_feed: - img = fluid.layers.data(name='image', shape=[784], dtype='float32') - label = fluid.layers.data(name='label', shape=[1], dtype='int64') - else: - reader = fluid.layers.open_files( - filenames=[MNIST_RECORDIO_FILE], - shapes=[[-1, 784], [-1, 1]], - lod_levels=[0, 0], - dtypes=['float32', 'int64']) - reader = fluid.layers.io.double_buffer(reader) - img, label = fluid.layers.read_file(reader) +def _feed_data_helper(): + img = fluid.layers.data(name='image', shape=[784], dtype='float32') + label = fluid.layers.data(name='label', shape=[1], dtype='int64') return img, label def simple_fc_net(use_feed): - x, y = _feed_data_helper(use_feed) + assert use_feed + x, y = _feed_data_helper() hidden_layer = 4 for _ in range(hidden_layer): x = fluid.layers.fc(input=x, size=20, act='relu') @@ -51,7 +41,8 @@ def simple_fc_net(use_feed): def fc_with_inplace_net(use_feed): - x, y = _feed_data_helper(use_feed) + assert use_feed + x, y = _feed_data_helper() fc = fluid.layers.fc(input=x, size=20, act='relu') fc = fluid.layers.fc(input=fc, size=10, act='relu') reshape = fluid.layers.reshape(x=fc, shape=[-1, 2, 5]) @@ -63,23 +54,6 @@ def fc_with_inplace_net(use_feed): class TestMNIST(TestParallelExecutorBase): - @classmethod - def setUpClass(cls): - os.environ['CPU_NUM'] = str(4) - # Convert mnist to recordio file - with fluid.program_guard(fluid.Program(), fluid.Program()): - reader = paddle.batch(mnist.train(), batch_size=4) - feeder = fluid.DataFeeder( - feed_list=[ # order is image and label - fluid.layers.data( - name='image', shape=[784]), - fluid.layers.data( - name='label', shape=[1], dtype='int64'), - ], - place=fluid.CPUPlace()) - fluid.recordio_writer.convert_reader_to_recordio_file( - MNIST_RECORDIO_FILE, reader, feeder) - def _dummy_data(self): np.random.seed(5) img = np.random.random(size=[32, 784]).astype(np.float32) diff --git a/python/paddle/fluid/tests/unittests/test_ir_memory_optimize_transformer.py b/python/paddle/fluid/tests/unittests/test_ir_memory_optimize_transformer.py index 642be33b6edff49158ffb3e8ea6bf1a2af72cec5..1af696f873315c2a6494266fc931185525e023ac 100644 --- a/python/paddle/fluid/tests/unittests/test_ir_memory_optimize_transformer.py +++ b/python/paddle/fluid/tests/unittests/test_ir_memory_optimize_transformer.py @@ -22,46 +22,30 @@ import paddle.fluid.core as core import paddle.dataset.wmt16 as wmt16 os.environ['FLAGS_eager_delete_tensor_gb'] = "0.0" -os.environ[ - 'RECORDIO_FILENAME'] = '/tmp/ir_memory_optimize_transformer.wmt16.recordio' -from test_parallel_executor_transformer import transformer, ModelHyperParams, transformer_model, transformer, prepare_batch_input from parallel_executor_test_base import TestParallelExecutorBase +from test_parallel_executor_transformer import get_feed_data_reader, transformer # NOTE(dzhwinter): test diferent strategy colisions. # open the eager delete tensor strategy by default. class TestTransformerWithIR(TestParallelExecutorBase): - @classmethod - def setUpClass(cls): - os.environ['CPU_NUM'] = str(4) - reader = paddle.batch( - wmt16.train(ModelHyperParams.src_vocab_size, - ModelHyperParams.trg_vocab_size), - batch_size=transformer_model.batch_size) - - with fluid.recordio_writer.create_recordio_writer( - os.environ.get("RECORDIO_FILENAME")) as writer: - for batch in reader(): - for tensor in prepare_batch_input( - batch, ModelHyperParams.src_pad_idx, - ModelHyperParams.trg_pad_idx, ModelHyperParams.n_head): - t = fluid.LoDTensor() - t.set(tensor, fluid.CPUPlace()) - writer.append_tensor(t) - writer.complete_append_tensor() - def test_main(self): if core.is_compiled_with_cuda(): # check python transpiler self.check_network_convergence( transformer, use_cuda=True, + feed_data_reader=get_feed_data_reader(), use_ir_memory_optimize=False, iter=2) # check IR memory optimize self.check_network_convergence( - transformer, use_cuda=True, use_ir_memory_optimize=True, iter=2) + transformer, + use_cuda=True, + feed_data_reader=get_feed_data_reader(), + use_ir_memory_optimize=True, + iter=2) if __name__ == '__main__': diff --git a/python/paddle/fluid/tests/unittests/test_multi_file_reader.py b/python/paddle/fluid/tests/unittests/test_multi_file_reader.py deleted file mode 100644 index 09788868ccb926f56c2f622b5caf695670fd17f8..0000000000000000000000000000000000000000 --- a/python/paddle/fluid/tests/unittests/test_multi_file_reader.py +++ /dev/null @@ -1,81 +0,0 @@ -# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from __future__ import print_function - -import unittest - -import paddle.fluid as fluid -import paddle -import paddle.dataset.mnist as mnist -from shutil import copyfile - - -class TestMultipleReader(unittest.TestCase): - def setUp(self): - self.batch_size = 64 - # Convert mnist to recordio file - with fluid.program_guard(fluid.Program(), fluid.Program()): - reader = paddle.batch(mnist.train(), batch_size=self.batch_size) - feeder = fluid.DataFeeder( - feed_list=[ # order is image and label - fluid.layers.data( - name='image', shape=[784]), - fluid.layers.data( - name='label', shape=[1], dtype='int64'), - ], - place=fluid.CPUPlace()) - self.num_batch = fluid.recordio_writer.convert_reader_to_recordio_file( - './mnist_0.recordio', reader, feeder) - copyfile('./mnist_0.recordio', './mnist_1.recordio') - copyfile('./mnist_0.recordio', './mnist_2.recordio') - - def main(self, is_test=False): - file_list = [ - './mnist_0.recordio', './mnist_1.recordio', './mnist_2.recordio' - ] - with fluid.program_guard(fluid.Program(), fluid.Program()): - data_files = fluid.layers.open_files( - filenames=file_list, - shapes=[(-1, 784), (-1, 1)], - lod_levels=[0, 0], - dtypes=['float32', 'int64'], - is_test=is_test) - img, label = fluid.layers.read_file(data_files) - - if fluid.core.is_compiled_with_cuda(): - place = fluid.CUDAPlace(0) - else: - place = fluid.CPUPlace() - - exe = fluid.Executor(place) - exe.run(fluid.default_startup_program()) - - batch_count = 0 - while True: - try: - img_val, = exe.run(fetch_list=[img]) - except fluid.core.EOFException: - break - batch_count += 1 - self.assertLessEqual(img_val.shape[0], self.batch_size) - self.assertEqual(batch_count, self.num_batch * 3) - - def test_main(self): - self.main(is_test=False) - self.main(is_test=True) - - -if __name__ == '__main__': - unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_multi_pass_reader.py b/python/paddle/fluid/tests/unittests/test_multi_pass_reader.py deleted file mode 100644 index 8835b6995e00756bcfd3385f362c292924d98128..0000000000000000000000000000000000000000 --- a/python/paddle/fluid/tests/unittests/test_multi_pass_reader.py +++ /dev/null @@ -1,69 +0,0 @@ -# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from __future__ import print_function - -import unittest - -import paddle.fluid as fluid -import paddle -import paddle.dataset.mnist as mnist -from paddle.fluid.layers.io import open_recordio_file - - -class TestMultipleReader(unittest.TestCase): - def setUp(self): - self.batch_size = 64 - self.pass_num = 3 - # Convert mnist to recordio file - with fluid.program_guard(fluid.Program(), fluid.Program()): - data_file = paddle.batch(mnist.train(), batch_size=self.batch_size) - feeder = fluid.DataFeeder( - feed_list=[ - fluid.layers.data( - name='image', shape=[784]), - fluid.layers.data( - name='label', shape=[1], dtype='int64'), - ], - place=fluid.CPUPlace()) - self.num_batch = fluid.recordio_writer.convert_reader_to_recordio_file( - './mnist.recordio', data_file, feeder) - - def test_main(self): - with fluid.program_guard(fluid.Program(), fluid.Program()): - data_file = open_recordio_file( - filename='./mnist.recordio', - shapes=[(-1, 784), (-1, 1)], - lod_levels=[0, 0], - dtypes=['float32', 'int64'], - pass_num=self.pass_num) - img, label = fluid.layers.read_file(data_file) - - if fluid.core.is_compiled_with_cuda(): - place = fluid.CUDAPlace(0) - else: - place = fluid.CPUPlace() - - exe = fluid.Executor(place) - exe.run(fluid.default_startup_program()) - - batch_count = 0 - while True: - try: - img_val, = exe.run(fetch_list=[img]) - except fluid.core.EOFException: - break - batch_count += 1 - self.assertLessEqual(img_val.shape[0], self.batch_size) - self.assertEqual(batch_count, self.num_batch * self.pass_num) diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_transformer.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_transformer.py index b1851f4c78ddf984b06cf67f628099d5b60c771e..1f47d87811cf4ca63bda63da860e2ac3b9de1e7e 100644 --- a/python/paddle/fluid/tests/unittests/test_parallel_executor_transformer.py +++ b/python/paddle/fluid/tests/unittests/test_parallel_executor_transformer.py @@ -23,8 +23,7 @@ import paddle import paddle.fluid.core as core import paddle.dataset.wmt16 as wmt16 import os - -WMT16_RECORDIO_FILE = os.environ.get('RECORDIO_FILENAME', '/tmp/wmt16.recordio') +from feed_data_reader import FeedDataReader class ModelHyperParams(object): @@ -140,6 +139,9 @@ def prepare_batch_input(insts, src_pad_idx, trg_pad_idx, n_head): ] +feed_data_reader = None + + def transformer(use_feed): assert not use_feed, "transfomer doesn't support feed yet" return transformer_model.transformer( @@ -152,32 +154,57 @@ def transformer(use_feed): ModelHyperParams.trg_pad_idx, ModelHyperParams.pos_pad_idx) +def get_feed_data_reader(): + global feed_data_reader + if feed_data_reader is not None: + return feed_data_reader + + reader = paddle.batch( + wmt16.train(ModelHyperParams.src_vocab_size, + ModelHyperParams.trg_vocab_size), + batch_size=transformer_model.batch_size) + all_batch_tensors = [] + for batch in reader(): + tensors = [] + for tensor in prepare_batch_input(batch, ModelHyperParams.src_pad_idx, + ModelHyperParams.trg_pad_idx, + ModelHyperParams.n_head): + tensors.append(np.array(tensor)) + all_batch_tensors.append(tensors) + + def __reader__(): + for t in all_batch_tensors: + yield t + + feed_data_reader = FeedDataReader( + feed_list=transformer_model.build_inputs( + ModelHyperParams.max_length + 1, ModelHyperParams.n_head), + reader=__reader__) + + return feed_data_reader + + class TestTransformer(TestParallelExecutorBase): @classmethod def setUpClass(cls): os.environ['CPU_NUM'] = str(4) - reader = paddle.batch( - wmt16.train(ModelHyperParams.src_vocab_size, - ModelHyperParams.trg_vocab_size), - batch_size=transformer_model.batch_size) - - with fluid.recordio_writer.create_recordio_writer( - WMT16_RECORDIO_FILE) as writer: - for batch in reader(): - for tensor in prepare_batch_input( - batch, ModelHyperParams.src_pad_idx, - ModelHyperParams.trg_pad_idx, ModelHyperParams.n_head): - t = fluid.LoDTensor() - t.set(tensor, fluid.CPUPlace()) - writer.append_tensor(t) - writer.complete_append_tensor() def test_main(self): if core.is_compiled_with_cuda(): - self.check_network_convergence(transformer, use_cuda=True) self.check_network_convergence( - transformer, use_cuda=True, enable_sequential_execution=True) - self.check_network_convergence(transformer, use_cuda=False, iter=2) + transformer, + use_cuda=True, + feed_data_reader=get_feed_data_reader()) + self.check_network_convergence( + transformer, + use_cuda=True, + enable_sequential_execution=True, + feed_data_reader=get_feed_data_reader()) + self.check_network_convergence( + transformer, + use_cuda=False, + iter=2, + feed_data_reader=get_feed_data_reader()) if __name__ == '__main__': diff --git a/python/paddle/fluid/tests/unittests/test_preprocessor.py b/python/paddle/fluid/tests/unittests/test_preprocessor.py deleted file mode 100644 index 0f0bdfc44a7bec7cdf1af22e2dd291de23293fc8..0000000000000000000000000000000000000000 --- a/python/paddle/fluid/tests/unittests/test_preprocessor.py +++ /dev/null @@ -1,96 +0,0 @@ -# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from __future__ import print_function - -import unittest -import numpy as np - -import paddle -import paddle.fluid as fluid -import paddle.dataset.mnist as mnist -from paddle.fluid.layers.io import open_recordio_file - - -class TestPreprocessor(unittest.TestCase): - def setUp(self): - with fluid.program_guard(fluid.Program(), fluid.Program()): - reader = paddle.batch(mnist.train(), batch_size=32) - feeder = fluid.DataFeeder( - feed_list=[ # order is image and label - fluid.layers.data( - name='image', shape=[784]), - fluid.layers.data( - name='label', shape=[1], dtype='int64'), - ], - place=fluid.CPUPlace()) - self.num_batches = fluid.recordio_writer.convert_reader_to_recordio_file( - './mnist_for_preprocessor_test.recordio', reader, feeder) - - def test_main(self): - N = 10 - - img_expected_res = [] - lbl_expected_res = [] - with fluid.program_guard(fluid.Program(), fluid.Program()): - data_file = open_recordio_file( - './mnist_for_preprocessor_test.recordio', - shapes=[[-1, 784], [-1, 1]], - lod_levels=[0, 0], - dtypes=['float32', 'int64']) - img, lbl = fluid.layers.io.read_file(data_file) - - if fluid.core.is_compiled_with_cuda(): - place = fluid.CUDAPlace(0) - else: - place = fluid.CPUPlace() - exe = fluid.Executor(place) - exe.run(fluid.default_startup_program()) - for _ in range(N): - img_v, lbl_v = exe.run(fetch_list=[img, lbl]) - img_expected_res.append(img_v / 2) - lbl_expected_res.append(lbl_v + 1) - - img_actual_res = [] - lbl_actual_res = [] - with fluid.program_guard(fluid.Program(), fluid.Program()): - data_file = open_recordio_file( - './mnist_for_preprocessor_test.recordio', - shapes=[[-1, 784], [-1, 1]], - lod_levels=[0, 0], - dtypes=['float32', 'int64']) - preprocessor = fluid.layers.io.Preprocessor(reader=data_file) - with preprocessor.block(): - img, lbl = preprocessor.inputs() - img_out = img / 2 - lbl_out = lbl + 1 - preprocessor.outputs(img_out, lbl_out) - - data_file = fluid.layers.io.double_buffer(preprocessor()) - img, lbl = fluid.layers.io.read_file(data_file) - - if fluid.core.is_compiled_with_cuda(): - place = fluid.CUDAPlace(0) - else: - place = fluid.CPUPlace() - exe = fluid.Executor(place) - exe.run(fluid.default_startup_program()) - for _ in range(N): - img_v, lbl_v = exe.run(fetch_list=[img, lbl]) - img_actual_res.append(img_v) - lbl_actual_res.append(lbl_v) - - for idx in range(N): - np.allclose(img_expected_res[idx], img_actual_res[idx]) - np.allclose(lbl_expected_res[idx], lbl_actual_res[idx]) diff --git a/python/paddle/fluid/tests/unittests/test_py_reader_using_executor.py b/python/paddle/fluid/tests/unittests/test_py_reader_using_executor.py index e4fb9b1970a8da4bfec5d48f1182e9552aa77ca8..abdeff9cb05b58643510a9190205db932e59a6d4 100644 --- a/python/paddle/fluid/tests/unittests/test_py_reader_using_executor.py +++ b/python/paddle/fluid/tests/unittests/test_py_reader_using_executor.py @@ -15,8 +15,10 @@ from __future__ import print_function import unittest +import paddle import paddle.fluid as fluid from paddle.fluid import compiler +import paddle.fluid.unique_name as unique_name import paddle.fluid.core as core import numpy as np import threading @@ -42,13 +44,42 @@ def as_numpy(tensor_or_numpy): tensor_or_numpy, np.ndarray) else np.array(tensor_or_numpy) -def feed_data(feed_queue, reader): - data_generator = reader() +def sample_list_to_tensor_array(sample_list): + slot_num = None + slots = None + for sample in sample_list: + if slot_num is None: + slot_num = len(sample) + slots = [None] * len(sample) + else: + assert slot_num == len(sample) + + for slot_id, slot_item in enumerate(sample): + if slots[slot_id] is None: + slots[slot_id] = [] + slots[slot_id].append(slot_item) + + tensor_array = fluid.LoDTensorArray() + for slot in slots: + t = fluid.LoDTensor() + t.set(np.array(slot), fluid.CPUPlace()) + tensor_array.append(t) + + return tensor_array + + +def feed_data(feed_queue, batch_reader): + data_generator = batch_reader() while True: data = next(data_generator, None) - if data is None or not feed_queue.push(data): + if data is None or (len(data) == 1 and data[0] is None): break + if not feed_queue.push(sample_list_to_tensor_array(data)): + break + + feed_queue.close() + def simple_fc_net(in_size, class_num, @@ -57,26 +88,25 @@ def simple_fc_net(in_size, queue_capacity, use_double_buffer=False, use_feed_list=True): + in_data = fluid.layers.data(name="data", dtype='float32', shape=[in_size]) + label = fluid.layers.data(name='label', dtype='int64', shape=[1]) if use_feed_list: - data = fluid.layers.data(name="data", dtype='float32', shape=[in_size]) - label = fluid.layers.data(name='label', dtype='int64', shape=[1]) py_reader = fluid.layers.create_py_reader_by_data( capacity=queue_capacity, - use_double_buffer=False, - feed_list=[data, label]) + use_double_buffer=use_double_buffer, + feed_list=[in_data, label], + name=unique_name.generate('py_reader_name')) else: py_reader = fluid.layers.py_reader( capacity=queue_capacity, - shapes=[[-1, in_size], [-1, 1]], - lod_levels=[0, 0], + shapes=[in_data.shape, label.shape], dtypes=['float32', 'int64'], - use_double_buffer=False) - feed_queue = py_reader.queue - reader = fluid.layers.batch(py_reader, batch_size=batch_size) - if use_double_buffer: - reader = fluid.layers.double_buffer(reader) + name=unique_name.generate('py_reader_name'), + use_double_buffer=use_double_buffer) + + in_data, label = fluid.layers.read_file(py_reader) - in_data, label = fluid.layers.read_file(reader) + feed_queue = py_reader.queue hidden = in_data for hidden_size in hidden_sizes: @@ -128,33 +158,24 @@ class TestPyReaderUsingExecutor(unittest.TestCase): def tensor_reader(self, use_decorate_paddle_reader): def reader(): - self.inputs = [] - cnt = 0 - while True: - tensors = fluid.LoDTensorArray() + for sample_id in range(self.batch_size * self.iterations * + self.batch_size_times): in_data = np.random.uniform( - low=0, high=1, size=(1, self.in_size)).astype('float32') - tensors.append(as_tensor(in_data)) + low=0, high=1, size=(self.in_size, )).astype('float32') label = np.random.random_integers( - low=0, high=self.class_num - 1, size=(1, 1)).astype('int64') - tensors.append(as_tensor(label)) - - if cnt < self.iterations * self.batch_size * self.batch_size_times: - if cnt % (self.batch_size * self.batch_size_times) == 0: - self.inputs.append([in_data, label]) - else: - self.inputs[-1][0] = np.concatenate( - (self.inputs[-1][0], in_data), axis=0) - self.inputs[-1][1] = np.concatenate( - (self.inputs[-1][1], label), axis=0) - elif not self.use_double_buffer: - break + low=0, high=self.class_num - 1, size=(1, )).astype('int64') - if use_decorate_paddle_reader: - yield [(in_data, label)] + reshaped_in_data = np.reshape(in_data, [1, -1]) + reshaped_label = np.reshape(label, [1, -1]) + if sample_id % (self.batch_size * self.batch_size_times) == 0: + self.inputs.append([reshaped_in_data, reshaped_label]) else: - yield tensors - cnt += 1 + self.inputs[-1][0] = np.concatenate( + (self.inputs[-1][0], reshaped_in_data), axis=0) + self.inputs[-1][1] = np.concatenate( + (self.inputs[-1][1], reshaped_label), axis=0) + + yield in_data, label if not use_decorate_paddle_reader: yield None @@ -205,21 +226,31 @@ class TestPyReaderUsingExecutor(unittest.TestCase): self.batch_size_times = 1 reader = self.tensor_reader(use_decorate_paddle_reader) + batch_reader = paddle.batch(reader, batch_size=self.batch_size) + + self.inputs = [] + self.outputs = [] + if use_decorate_paddle_reader: - py_reader.decorate_paddle_reader(reader) + if use_feed_list: + py_reader.decorate_paddle_reader(batch_reader) + else: + py_reader.decorate_sample_list_generator(batch_reader) py_reader.start() else: thread = threading.Thread( - target=feed_data, args=(feed_queue, reader)) + target=feed_data, args=(feed_queue, batch_reader)) thread.daemon = True thread.start() - self.outputs = [] - for _ in range(self.iterations): - fetches = exe.run(train_cp, - fetch_list=[in_data.name, label.name]) - fetches = [as_numpy(fetch) for fetch in fetches] - self.outputs.append(fetches) + try: + while True: + fetches = exe.run(train_cp, + fetch_list=[in_data.name, label.name]) + fetches = [as_numpy(fetch) for fetch in fetches] + self.outputs.append(fetches) + except fluid.core.EOFException: + pass feed_queue.close() self.validate() @@ -230,8 +261,13 @@ class TestPyReaderUsingExecutor(unittest.TestCase): thread.join() def validate(self): - self.assertEqual(len(self.inputs), len(self.outputs)) - for batch_in, batch_out in zip(self.inputs, self.outputs): + if not self.use_double_buffer: + self.assertEqual(len(self.inputs), len(self.outputs)) + else: + self.assertTrue(len(self.inputs) >= len(self.outputs)) + for idx in range(len(self.outputs)): + batch_in = self.inputs[idx] + batch_out = self.outputs[idx] self.assertEqual(len(batch_in), len(batch_out)) if self.use_parallel_executor and not self.use_double_buffer: self.validate_unordered_batch(batch_in, batch_out) diff --git a/python/paddle/fluid/tests/unittests/test_reader_reset.py b/python/paddle/fluid/tests/unittests/test_reader_reset.py index da89ccb961c0af99aea117218eb429a5599c2bd2..cb1be32935b4a1b6450e347378e6548797158dab 100644 --- a/python/paddle/fluid/tests/unittests/test_reader_reset.py +++ b/python/paddle/fluid/tests/unittests/test_reader_reset.py @@ -14,6 +14,7 @@ from __future__ import print_function import os +os.environ['CPU_NUM'] = str(1) import paddle.fluid as fluid from paddle.fluid import compiler import paddle @@ -27,28 +28,14 @@ class TestReaderReset(unittest.TestCase): for n in range(self.total_ins_num): yield np.ones(self.ins_shape) * n, n - # Prepare data - with fluid.program_guard(fluid.Program(), fluid.Program()): - reader = paddle.batch(fake_data_generator, batch_size=1) - feeder = fluid.DataFeeder( - feed_list=[ - fluid.layers.data( - name='data', shape=[3], dtype='float32'), - fluid.layers.data( - name='label', shape=[1], dtype='int64'), - ], - place=fluid.CPUPlace()) - fluid.recordio_writer.convert_reader_to_recordio_file( - self.data_file_name, reader, feeder) + return fake_data_generator def setUp(self): - # set parallel threads to fit 20 batches in line 49 - os.environ['CPU_NUM'] = str(20) self.use_cuda = fluid.core.is_compiled_with_cuda() - self.data_file_name = './reader_reset_test.recordio' self.ins_shape = [3] self.batch_size = 5 - self.total_ins_num = self.batch_size * 20 + self.batch_num = 20 + self.total_ins_num = self.batch_size * self.batch_num self.test_pass_num = 100 self.prepare_data() @@ -57,42 +44,46 @@ class TestReaderReset(unittest.TestCase): startup_prog = fluid.Program() with fluid.program_guard(main_prog, startup_prog): - data_reader_handle = fluid.layers.io.open_files( - filenames=[self.data_file_name], - shapes=[[-1] + self.ins_shape, [-1, 1]], - lod_levels=[0, 0], - dtypes=['float32', 'int64'], - thread_num=1, - pass_num=1) - data_reader = fluid.layers.io.batch(data_reader_handle, - self.batch_size) - if with_double_buffer: - data_reader = fluid.layers.double_buffer(data_reader) - image, label = fluid.layers.read_file(data_reader) + image = fluid.layers.data( + name='image', shape=self.ins_shape, dtype='float32') + label = fluid.layers.data(name='label', shape=[1], dtype='int64') + data_reader_handle = fluid.io.PyReader( + feed_list=[image, label], + capacity=16, + iterable=False, + use_double_buffer=with_double_buffer) fetch_list = [image.name, label.name] place = fluid.CUDAPlace(0) if self.use_cuda else fluid.CPUPlace() exe = fluid.Executor(place) exe.run(startup_prog) + data_reader_handle.decorate_sample_list_generator( + paddle.batch( + self.prepare_data(), batch_size=self.batch_size)) + train_cp = compiler.CompiledProgram(main_prog).with_data_parallel() + + batch_id = 0 pass_count = 0 - while (True): + while pass_count < self.test_pass_num: + data_reader_handle.start() try: - data_val, label_val = exe.run(train_cp, - fetch_list=fetch_list, - return_numpy=True) - ins_num = data_val.shape[0] - broadcasted_label = np.ones((ins_num, ) + tuple( - self.ins_shape)) * label_val.reshape((ins_num, 1)) - self.assertEqual(data_val.all(), broadcasted_label.all()) - + while True: + data_val, label_val = exe.run(train_cp, + fetch_list=fetch_list, + return_numpy=True) + ins_num = data_val.shape[0] + broadcasted_label = np.ones((ins_num, ) + tuple( + self.ins_shape)) * label_val.reshape((ins_num, 1)) + self.assertEqual(data_val.all(), broadcasted_label.all()) + batch_id += 1 except fluid.core.EOFException: + data_reader_handle.reset() pass_count += 1 - if pass_count < self.test_pass_num: - data_reader_handle.reset() - else: - break + self.assertEqual(pass_count * self.batch_num, batch_id) + + self.assertEqual(pass_count, self.test_pass_num) def test_all(self): self.main(with_double_buffer=False) diff --git a/python/paddle/fluid/tests/unittests/test_recordio_reader.py b/python/paddle/fluid/tests/unittests/test_recordio_reader.py deleted file mode 100644 index 0417da7228e96ed8daffa7bbfcb7c12358cd78ec..0000000000000000000000000000000000000000 --- a/python/paddle/fluid/tests/unittests/test_recordio_reader.py +++ /dev/null @@ -1,92 +0,0 @@ -# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from __future__ import print_function - -import unittest - -import paddle.fluid as fluid -import paddle -import paddle.dataset.mnist as mnist -from paddle.fluid.layers.io import open_recordio_file - - -class TestRecordIO(unittest.TestCase): - def setUp(self): - # Convert mnist to recordio file - with fluid.program_guard(fluid.Program(), fluid.Program()): - reader = paddle.batch(mnist.train(), batch_size=32) - feeder = fluid.DataFeeder( - feed_list=[ # order is image and label - fluid.layers.data( - name='image', shape=[784]), - fluid.layers.data( - name='label', shape=[1], dtype='int64'), - ], - place=fluid.CPUPlace()) - self.num_batches = fluid.recordio_writer.convert_reader_to_recordio_file( - './mnist.recordio', reader, feeder) - - def test_main(self, decorator_callback=None): - # use new program - with fluid.program_guard(fluid.Program(), fluid.Program()): - data_file = open_recordio_file( - './mnist.recordio', - shapes=[[-1, 784], [-1, 1]], - lod_levels=[0, 0], - dtypes=['float32', 'int64']) - if decorator_callback is not None: - data_file = decorator_callback(data_file) - img, label = fluid.layers.read_file(data_file) - - hidden = fluid.layers.fc(input=img, size=100, act='tanh') - prediction = fluid.layers.fc(input=hidden, size=10, act='softmax') - loss = fluid.layers.cross_entropy(input=prediction, label=label) - avg_loss = fluid.layers.mean(loss) - - fluid.optimizer.Adam(learning_rate=1e-3).minimize(avg_loss) - - if fluid.core.is_compiled_with_cuda(): - place = fluid.CUDAPlace(0) - else: - place = fluid.CPUPlace() - - exe = fluid.Executor(place) - exe.run(fluid.default_startup_program()) - avg_loss_np = [] - - # train a pass - batch_id = 0 - while True: - try: - tmp, = exe.run(fetch_list=[avg_loss]) - except fluid.core.EOFException: - break - - avg_loss_np.append(tmp) - batch_id += 1 - self.assertEqual(batch_id, self.num_batches) - self.assertLess(avg_loss_np[-1], avg_loss_np[0]) - - def test_shuffle_reader(self): - self.test_main(decorator_callback=lambda reader: fluid.layers.io.shuffle( - reader, buffer_size=200)) - - def test_double_buffer_reader(self): - self.test_main(decorator_callback=lambda reader: fluid.layers.io.double_buffer(reader, - place='cuda:0' if fluid.core.is_compiled_with_cuda() else 'cpu')) - - -if __name__ == '__main__': - unittest.main() diff --git a/python/paddle/fluid/tests/unittests/transformer_model.py b/python/paddle/fluid/tests/unittests/transformer_model.py index d59f9da4a94a81e9403ffe153f19c7aee2762bc8..1782d432490c796362590805ab20cad1f6a61359 100644 --- a/python/paddle/fluid/tests/unittests/transformer_model.py +++ b/python/paddle/fluid/tests/unittests/transformer_model.py @@ -20,7 +20,6 @@ import numpy as np import os import paddle.fluid as fluid import paddle.fluid.layers as layers -from paddle.fluid.layers.io import open_recordio_file pos_enc_param_names = ( "src_pos_enc_table", @@ -394,6 +393,51 @@ def decoder(dec_input, return dec_output +def build_inputs(max_length, n_head): + names = [ + 'src_word', + 'src_pos', + 'trg_word', + 'trg_pos', + 'src_slf_attn_bias', + 'trg_slf_attn_bias', + 'trg_src_attn_bias', + 'gold', + 'weights', + ] + + shapes = [ + [batch_size * max_length, 1], + [batch_size * max_length, 1], + [batch_size * max_length, 1], + [batch_size * max_length, 1], + [batch_size, n_head, max_length, max_length], + [batch_size, n_head, max_length, max_length], + [batch_size, n_head, max_length, max_length], + [batch_size * max_length, 1], + [batch_size * max_length, 1], + ] + + dtypes = [ + 'int64', + 'int64', + 'int64', + 'int64', + 'float32', + 'float32', + 'float32', + 'int64', + 'float32', + ] + + all_inputs = [] + for name, shape, dtype in zip(names, shapes, dtypes): + all_inputs.append( + fluid.layers.data( + name=name, shape=shape, dtype=dtype, append_batch_size=False)) + return all_inputs + + def transformer( src_vocab_size, trg_vocab_size, @@ -408,34 +452,9 @@ def transformer( src_pad_idx, trg_pad_idx, pos_pad_idx, ): - file_obj = open_recordio_file( - filename=os.environ.get('RECORDIO_FILENAME', '/tmp/wmt16.recordio'), - shapes=[ - [batch_size * max_length, 1], - [batch_size * max_length, 1], - [batch_size * max_length, 1], - [batch_size * max_length, 1], - [batch_size, n_head, max_length, max_length], - [batch_size, n_head, max_length, max_length], - [batch_size, n_head, max_length, max_length], - [batch_size * max_length, 1], - [batch_size * max_length, 1], - ], - dtypes=[ - 'int64', - 'int64', - 'int64', - 'int64', - 'float32', - 'float32', - 'float32', - 'int64', - 'float32', - ], - lod_levels=[0] * 9) - - src_word, src_pos, trg_word, trg_pos, src_slf_attn_bias, trg_slf_attn_bias, trg_src_attn_bias, gold, weights = fluid.layers.read_file( - file_obj) + + src_word, src_pos, trg_word, trg_pos, src_slf_attn_bias, trg_slf_attn_bias, trg_src_attn_bias, gold, weights = build_inputs( + max_length, n_head) enc_input = prepare_encoder( src_word, diff --git a/python/paddle/reader/tests/test_data_creator.txt b/python/paddle/reader/tests/test_data_creator.txt deleted file mode 100644 index a2a8d47d43868d369083808497697da79e620e31..0000000000000000000000000000000000000000 --- a/python/paddle/reader/tests/test_data_creator.txt +++ /dev/null @@ -1,3 +0,0 @@ -0 1 -2 3 -4 5 diff --git a/python/paddle/reader/tests/test_reader_recordio.dat b/python/paddle/reader/tests/test_reader_recordio.dat deleted file mode 100644 index a99a35bb829e066c4845d0b85b96cd1eb3a12491..0000000000000000000000000000000000000000 Binary files a/python/paddle/reader/tests/test_reader_recordio.dat and /dev/null differ diff --git a/python/paddle/reader/tests/test_recordio_creator.dat b/python/paddle/reader/tests/test_recordio_creator.dat deleted file mode 100644 index 17aa89b6796184407e83246d3f342a55a66b4a69..0000000000000000000000000000000000000000 Binary files a/python/paddle/reader/tests/test_recordio_creator.dat and /dev/null differ diff --git a/python/requirements.txt b/python/requirements.txt index 26dc552140fbe4e2f9e3073a8b08ff9923b34e96..c4ced49be3332edd43adccd748274fbbaaf06777 100644 --- a/python/requirements.txt +++ b/python/requirements.txt @@ -2,7 +2,6 @@ requests>=2.20.0 numpy>=1.12, <=1.16.4 ; python_version<"3.5" numpy>=1.12 ; python_version>="3.5" protobuf>=3.1.0 -recordio>=0.1.0 matplotlib<=2.2.4 ; python_version<"3.6" scipy>=0.19.0, <=1.2.1 ; python_version<"3.5" nltk>=3.2.2, <=3.4 ; python_version<"3.5"