From 5b6673c44df19ee89c191cc7efeb669c1943050f Mon Sep 17 00:00:00 2001 From: Zeng Jinle <32832641+sneaxiy@users.noreply.github.com> Date: Mon, 19 Aug 2019 16:26:47 +0800 Subject: [PATCH] merge develop to solve conflict, also fix API doc, test=develop (#18823) --- CMakeLists.txt | 2 - cmake/external/brpc.cmake | 4 +- cmake/external/leveldb.cmake | 2 - cmake/external/snappy.cmake | 65 ---- cmake/external/snappystream.cmake | 63 ---- cmake/inference_lib.cmake | 12 - paddle/fluid/API.spec | 15 +- paddle/fluid/CMakeLists.txt | 1 - paddle/fluid/framework/CMakeLists.txt | 2 +- paddle/fluid/framework/lod_tensor.cc | 33 -- paddle/fluid/framework/lod_tensor.h | 14 - paddle/fluid/framework/lod_tensor_test.cc | 50 --- .../inference/api/demo_ci/CMakeLists.txt | 8 +- paddle/fluid/operators/reader/CMakeLists.txt | 7 - .../reader/create_batch_reader_op.cc | 151 -------- .../reader/create_multi_pass_reader_op.cc | 93 ----- .../reader/create_random_data_generator_op.cc | 107 ------ .../reader/create_recordio_file_reader_op.cc | 93 ----- .../reader/create_shuffle_reader_op.cc | 124 ------- .../fluid/operators/reader/open_files_op.cc | 277 -------------- paddle/fluid/pybind/CMakeLists.txt | 1 - paddle/fluid/pybind/pybind.cc | 2 - paddle/fluid/pybind/recordio.cc | 88 ----- paddle/fluid/pybind/recordio.h | 27 -- paddle/fluid/recordio/CMakeLists.txt | 9 - paddle/fluid/recordio/README.md | 13 - paddle/fluid/recordio/chunk.cc | 174 --------- paddle/fluid/recordio/chunk.h | 73 ---- paddle/fluid/recordio/chunk_test.cc | 47 --- paddle/fluid/recordio/header.cc | 70 ---- paddle/fluid/recordio/header.h | 66 ---- paddle/fluid/recordio/header_test.cc | 29 -- paddle/fluid/recordio/scanner.cc | 57 --- paddle/fluid/recordio/scanner.h | 43 --- paddle/fluid/recordio/writer.cc | 40 -- paddle/fluid/recordio/writer.h | 44 --- paddle/fluid/recordio/writer_scanner_test.cc | 70 ---- python/paddle/dataset/common.py | 38 -- python/paddle/fluid/__init__.py | 1 - python/paddle/fluid/layers/io.py | 350 +----------------- python/paddle/fluid/recordio_writer.py | 132 ------- .../file_reader/convert_data_to_recordio.py | 63 ---- python/paddle/fluid/tests/test_cpp_reader.py | 94 ----- .../fluid/tests/unittests/CMakeLists.txt | 2 +- .../fluid/tests/unittests/feed_data_reader.py | 78 ++++ .../unittests/parallel_executor_test_base.py | 14 +- .../test_fuse_relu_depthwise_conv_pass.py | 33 +- .../unittests/test_ir_memory_optimize_pass.py | 40 +- .../test_ir_memory_optimize_transformer.py | 30 +- .../tests/unittests/test_multi_file_reader.py | 81 ---- .../tests/unittests/test_multi_pass_reader.py | 69 ---- .../test_parallel_executor_transformer.py | 67 +++- .../tests/unittests/test_preprocessor.py | 96 ----- .../test_py_reader_using_executor.py | 132 ++++--- .../tests/unittests/test_reader_reset.py | 75 ++-- .../tests/unittests/test_recordio_reader.py | 92 ----- .../tests/unittests/transformer_model.py | 77 ++-- .../paddle/reader/tests/test_data_creator.txt | 3 - .../reader/tests/test_reader_recordio.dat | Bin 76 -> 0 bytes .../reader/tests/test_recordio_creator.dat | Bin 88 -> 0 bytes python/requirements.txt | 1 - 61 files changed, 343 insertions(+), 3201 deletions(-) delete mode 100644 cmake/external/snappy.cmake delete mode 100644 cmake/external/snappystream.cmake delete mode 100644 paddle/fluid/operators/reader/create_batch_reader_op.cc delete mode 100644 paddle/fluid/operators/reader/create_multi_pass_reader_op.cc delete mode 100644 paddle/fluid/operators/reader/create_random_data_generator_op.cc delete mode 100644 paddle/fluid/operators/reader/create_recordio_file_reader_op.cc delete mode 100644 paddle/fluid/operators/reader/create_shuffle_reader_op.cc delete mode 100644 paddle/fluid/operators/reader/open_files_op.cc delete mode 100644 paddle/fluid/pybind/recordio.cc delete mode 100644 paddle/fluid/pybind/recordio.h delete mode 100644 paddle/fluid/recordio/CMakeLists.txt delete mode 100644 paddle/fluid/recordio/README.md delete mode 100644 paddle/fluid/recordio/chunk.cc delete mode 100644 paddle/fluid/recordio/chunk.h delete mode 100644 paddle/fluid/recordio/chunk_test.cc delete mode 100644 paddle/fluid/recordio/header.cc delete mode 100644 paddle/fluid/recordio/header.h delete mode 100644 paddle/fluid/recordio/header_test.cc delete mode 100644 paddle/fluid/recordio/scanner.cc delete mode 100644 paddle/fluid/recordio/scanner.h delete mode 100644 paddle/fluid/recordio/writer.cc delete mode 100644 paddle/fluid/recordio/writer.h delete mode 100644 paddle/fluid/recordio/writer_scanner_test.cc delete mode 100644 python/paddle/fluid/recordio_writer.py delete mode 100644 python/paddle/fluid/tests/demo/file_reader/convert_data_to_recordio.py delete mode 100644 python/paddle/fluid/tests/test_cpp_reader.py create mode 100644 python/paddle/fluid/tests/unittests/feed_data_reader.py delete mode 100644 python/paddle/fluid/tests/unittests/test_multi_file_reader.py delete mode 100644 python/paddle/fluid/tests/unittests/test_multi_pass_reader.py delete mode 100644 python/paddle/fluid/tests/unittests/test_preprocessor.py delete mode 100644 python/paddle/fluid/tests/unittests/test_recordio_reader.py delete mode 100644 python/paddle/reader/tests/test_data_creator.txt delete mode 100644 python/paddle/reader/tests/test_reader_recordio.dat delete mode 100644 python/paddle/reader/tests/test_recordio_creator.dat diff --git a/CMakeLists.txt b/CMakeLists.txt index 656c890bcd..be6a4d7c09 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -149,8 +149,6 @@ include(external/cub) include(external/rocprim) include(external/xxhash) # download xxhash include(external/dlpack) -include(external/snappy) # download snappy -include(external/snappystream) # download snappystream include(external/warpctc) # download, build, install warpctc if (NOT WIN32) diff --git a/cmake/external/brpc.cmake b/cmake/external/brpc.cmake index 0dd35c090e..a5a86afa4a 100644 --- a/cmake/external/brpc.cmake +++ b/cmake/external/brpc.cmake @@ -33,7 +33,7 @@ SET(BRPC_LIBRARIES "${BRPC_INSTALL_DIR}/lib/libbrpc.a" CACHE FILEPATH "brpc libr INCLUDE_DIRECTORIES(${BRPC_INCLUDE_DIR}) # Reference https://stackoverflow.com/questions/45414507/pass-a-list-of-prefix-paths-to-externalproject-add-in-cmake-args -set(prefix_path "${THIRD_PARTY_PATH}/install/gflags|${THIRD_PARTY_PATH}/install/leveldb|${THIRD_PARTY_PATH}/install/snappy|${THIRD_PARTY_PATH}/install/gtest|${THIRD_PARTY_PATH}/install/protobuf|${THIRD_PARTY_PATH}/install/zlib|${THIRD_PARTY_PATH}/install/glog") +set(prefix_path "${THIRD_PARTY_PATH}/install/gflags|${THIRD_PARTY_PATH}/install/gtest|${THIRD_PARTY_PATH}/install/protobuf|${THIRD_PARTY_PATH}/install/zlib|${THIRD_PARTY_PATH}/install/glog") # If minimal .a is need, you can set WITH_DEBUG_SYMBOLS=OFF ExternalProject_Add( @@ -62,7 +62,7 @@ ExternalProject_Add( -DCMAKE_POSITION_INDEPENDENT_CODE:BOOL=ON -DCMAKE_BUILD_TYPE:STRING=${THIRD_PARTY_BUILD_TYPE} ) -ADD_DEPENDENCIES(extern_brpc protobuf ssl crypto leveldb gflags glog gtest snappy) +ADD_DEPENDENCIES(extern_brpc protobuf ssl crypto leveldb gflags glog gtest) ADD_LIBRARY(brpc STATIC IMPORTED GLOBAL) SET_PROPERTY(TARGET brpc PROPERTY IMPORTED_LOCATION ${BRPC_LIBRARIES}) ADD_DEPENDENCIES(brpc extern_brpc) diff --git a/cmake/external/leveldb.cmake b/cmake/external/leveldb.cmake index ac0febd076..3ba8a466c6 100644 --- a/cmake/external/leveldb.cmake +++ b/cmake/external/leveldb.cmake @@ -34,8 +34,6 @@ ExternalProject_Add( BUILD_IN_SOURCE 1 ) -ADD_DEPENDENCIES(extern_leveldb snappy) - ADD_LIBRARY(leveldb STATIC IMPORTED GLOBAL) SET_PROPERTY(TARGET leveldb PROPERTY IMPORTED_LOCATION ${LEVELDB_LIBRARIES}) ADD_DEPENDENCIES(leveldb extern_leveldb) diff --git a/cmake/external/snappy.cmake b/cmake/external/snappy.cmake deleted file mode 100644 index 3fb6b49f47..0000000000 --- a/cmake/external/snappy.cmake +++ /dev/null @@ -1,65 +0,0 @@ -# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -include (ExternalProject) - -# NOTE: snappy is needed when linking with recordio - -set(SNAPPY_SOURCES_DIR ${THIRD_PARTY_PATH}/snappy) -set(SNAPPY_INSTALL_DIR ${THIRD_PARTY_PATH}/install/snappy) -set(SNAPPY_INCLUDE_DIR "${SNAPPY_INSTALL_DIR}/include" CACHE PATH "snappy include directory." FORCE) - -if(WIN32) - SET(SNAPPY_CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /wd4244 /wd4267") -else() - SET(SNAPPY_CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS}) -endif() - -ExternalProject_Add( - extern_snappy - GIT_REPOSITORY "https://github.com/google/snappy" - GIT_TAG "1.1.7" - PREFIX ${SNAPPY_SOURCES_DIR} - UPDATE_COMMAND "" - CMAKE_ARGS -DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER} - -DCMAKE_C_COMPILER=${CMAKE_C_COMPILER} - -DCMAKE_C_FLAGS=${CMAKE_C_FLAGS} - -DCMAKE_C_FLAGS_DEBUG=${CMAKE_C_FLAGS_DEBUG} - -DCMAKE_C_FLAGS_RELEASE=${CMAKE_C_FLAGS_RELEASE} - -DCMAKE_CXX_FLAGS=${SNAPPY_CMAKE_CXX_FLAGS} - -DCMAKE_CXX_FLAGS_RELEASE=${CMAKE_CXX_FLAGS_RELEASE} - -DCMAKE_CXX_FLAGS_DEBUG=${CMAKE_CXX_FLAGS_DEBUG} - -DCMAKE_INSTALL_PREFIX=${SNAPPY_INSTALL_DIR} - -DCMAKE_INSTALL_LIBDIR=${SNAPPY_INSTALL_DIR}/lib - -DCMAKE_POSITION_INDEPENDENT_CODE=ON - -DBUILD_TESTING=OFF - -DSNAPPY_BUILD_TESTS:BOOL=OFF - -DCMAKE_BUILD_TYPE=${THIRD_PARTY_BUILD_TYPE} - ${EXTERNAL_OPTIONAL_ARGS} - CMAKE_CACHE_ARGS -DCMAKE_INSTALL_PREFIX:PATH=${SNAPPY_INSTALL_DIR} - -DCMAKE_INSTALL_LIBDIR:PATH=${SNAPPY_INSTALL_DIR}/lib - -DCMAKE_POSITION_INDEPENDENT_CODE:BOOL=ON - -DCMAKE_BUILD_TYPE:STRING=${THIRD_PARTY_BUILD_TYPE} -) -IF(WIN32) - set(SNAPPY_LIBRARIES "${SNAPPY_INSTALL_DIR}/lib/snappy.lib") -else(WIN32) - set(SNAPPY_LIBRARIES "${SNAPPY_INSTALL_DIR}/lib/libsnappy.a") -endif (WIN32) - -add_library(snappy STATIC IMPORTED GLOBAL) -set_property(TARGET snappy PROPERTY IMPORTED_LOCATION ${SNAPPY_LIBRARIES}) - -include_directories(${SNAPPY_INCLUDE_DIR}) -add_dependencies(snappy extern_snappy) diff --git a/cmake/external/snappystream.cmake b/cmake/external/snappystream.cmake deleted file mode 100644 index 392f186b7c..0000000000 --- a/cmake/external/snappystream.cmake +++ /dev/null @@ -1,63 +0,0 @@ -# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -include (ExternalProject) - -set(SNAPPYSTREAM_SOURCES_DIR ${THIRD_PARTY_PATH}/snappy_stream) -set(SNAPPYSTREAM_INSTALL_DIR ${THIRD_PARTY_PATH}/install/snappy_stream) -set(SNAPPYSTREAM_INCLUDE_DIR "${SNAPPYSTREAM_INSTALL_DIR}/include" CACHE PATH "snappy stream include directory." FORCE) - -if(WIN32) - # Fix me, VS2015 come without VLA support - set(SNAPPYSTREAM_LIBRARIES "${SNAPPYSTREAM_INSTALL_DIR}/lib/snappystream.lib") - MESSAGE(WARNING, "In windows, snappystream has no compile support for windows, - please build it manually and put it at " ${SNAPPYSTREAM_INSTALL_DIR}) -else(WIN32) - set(SNAPPYSTREAM_LIBRARIES "${SNAPPYSTREAM_INSTALL_DIR}/lib/libsnappystream.a") - - ExternalProject_Add( - extern_snappystream - GIT_REPOSITORY "https://github.com/hoxnox/snappystream.git" - GIT_TAG "0.2.8" - PREFIX ${SNAPPYSTREAM_SOURCES_DIR} - UPDATE_COMMAND "" - CMAKE_ARGS -DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER} - -DCMAKE_C_COMPILER=${CMAKE_C_COMPILER} - -DCMAKE_C_FLAGS=${CMAKE_C_FLAGS} - -DCMAKE_C_FLAGS_DEBUG=${CMAKE_C_FLAGS_DEBUG} - -DCMAKE_C_FLAGS_RELEASE=${CMAKE_C_FLAGS_RELEASE} - -DCMAKE_CXX_FLAGS=${CMAKE_CXX_FLAGS} - -DCMAKE_CXX_FLAGS_RELEASE=${CMAKE_CXX_FLAGS_RELEASE} - -DCMAKE_CXX_FLAGS_DEBUG=${CMAKE_CXX_FLAGS_DEBUG} - -DCMAKE_INSTALL_PREFIX=${SNAPPY_INSTALL_DIR} - -DCMAKE_INSTALL_LIBDIR=${SNAPPY_INSTALL_DIR}/lib - -DCMAKE_POSITION_INDEPENDENT_CODE=ON - -DCMAKE_BUILD_TYPE=${THIRD_PARTY_BUILD_TYPE} - -DSNAPPY_ROOT=${SNAPPY_INSTALL_DIR} - ${EXTERNAL_OPTIONAL_ARGS} - CMAKE_CACHE_ARGS - -DCMAKE_INSTALL_PREFIX:PATH=${SNAPPYSTREAM_INSTALL_DIR} - -DCMAKE_INSTALL_LIBDIR:PATH=${SNAPPYSTREAM_INSTALL_DIR}/lib - -DCMAKE_BUILD_TYPE:STRING=${THIRD_PARTY_BUILD_TYPE} - DEPENDS snappy - ) -endif(WIN32) - -add_library(snappystream STATIC IMPORTED GLOBAL) -set_property(TARGET snappystream PROPERTY IMPORTED_LOCATION ${SNAPPYSTREAM_LIBRARIES}) - -include_directories(${SNAPPYSTREAM_INCLUDE_DIR}) # For snappysteam to include its own headers. -include_directories(${THIRD_PARTY_PATH}/install) # For Paddle to include snappy stream headers. - -add_dependencies(snappystream extern_snappystream) diff --git a/cmake/inference_lib.cmake b/cmake/inference_lib.cmake index 87a7a1a86f..10d399209e 100644 --- a/cmake/inference_lib.cmake +++ b/cmake/inference_lib.cmake @@ -149,18 +149,6 @@ if (WITH_NGRAPH) ) endif () -set(dst_dir "${FLUID_INSTALL_DIR}/third_party/install/snappy") -copy(snappy_lib - SRCS ${SNAPPY_INCLUDE_DIR} ${SNAPPY_LIBRARIES} - DSTS ${dst_dir} ${dst_dir}/lib - DEPS snappy) - -set(dst_dir "${FLUID_INSTALL_DIR}/third_party/install/snappystream") -copy(snappystream_lib - SRCS ${SNAPPYSTREAM_INCLUDE_DIR} ${SNAPPYSTREAM_LIBRARIES} - DSTS ${dst_dir} ${dst_dir}/lib - DEPS snappystream) - set(dst_dir "${FLUID_INSTALL_DIR}/third_party/install/zlib") copy(zlib_lib SRCS ${ZLIB_INCLUDE_DIR} ${ZLIB_LIBRARIES} diff --git a/paddle/fluid/API.spec b/paddle/fluid/API.spec index 97e9ca6bde..c823cb54e3 100644 --- a/paddle/fluid/API.spec +++ b/paddle/fluid/API.spec @@ -288,19 +288,10 @@ paddle.fluid.layers.var_conv_2d (ArgSpec(args=['input', 'row', 'col', 'input_cha paddle.fluid.layers.shard_index (ArgSpec(args=['input', 'index_num', 'nshards', 'shard_id', 'ignore_value'], varargs=None, keywords=None, defaults=(-1,)), ('document', '5786fdbba6753ecd6cbce5e6b0889924')) paddle.fluid.layers.hard_swish (ArgSpec(args=['x', 'threshold', 'scale', 'offset', 'name'], varargs=None, keywords=None, defaults=(6.0, 6.0, 3.0, None)), ('document', '6a5152a7015c62cb8278fc24cb456459')) paddle.fluid.layers.data (ArgSpec(args=['name', 'shape', 'append_batch_size', 'dtype', 'lod_level', 'type', 'stop_gradient'], varargs=None, keywords=None, defaults=(True, 'float32', 0, VarType.LOD_TENSOR, True)), ('document', '9d7806e31bdf727c1a23b8782a09b545')) -paddle.fluid.layers.open_files (ArgSpec(args=['filenames', 'shapes', 'lod_levels', 'dtypes', 'thread_num', 'buffer_size', 'pass_num', 'is_test'], varargs=None, keywords=None, defaults=(None, None, 1, None)), ('document', 'cccb6eb5410c822e5307c947aca2c899')) -paddle.fluid.layers.read_file (ArgSpec(args=['reader'], varargs=None, keywords=None, defaults=None), ('document', '32181f6037e387fb6e68a5beaafe33b6')) -paddle.fluid.layers.shuffle (ArgSpec(args=['reader', 'buffer_size'], varargs=None, keywords=None, defaults=None), ('document', 'aa5803d1eccdaef03cdfb0b7ca088071')) -paddle.fluid.layers.batch (ArgSpec(args=['reader', 'batch_size'], varargs=None, keywords=None, defaults=None), ('document', '3007211c84c5c77eda8dc83619a6eaf8')) -paddle.fluid.layers.double_buffer (ArgSpec(args=['reader', 'place', 'name'], varargs=None, keywords=None, defaults=(None, None)), ('document', '7241dd1c142f4c65c8d7f66948140aa7')) -paddle.fluid.layers.random_data_generator (ArgSpec(args=['low', 'high', 'shapes', 'lod_levels', 'for_parallel'], varargs=None, keywords=None, defaults=(True,)), ('document', '290f5b97f24f0022e195f7228dd56fd9')) +paddle.fluid.layers.read_file (ArgSpec(args=['reader'], varargs=None, keywords=None, defaults=None), ('document', '88367daf9a30c9ab83adc5d7221e23ef')) +paddle.fluid.layers.double_buffer (ArgSpec(args=['reader', 'place', 'name'], varargs=None, keywords=None, defaults=(None, None)), ('document', '44724c493f41a124abc7531c2740e2e3')) paddle.fluid.layers.py_reader (ArgSpec(args=['capacity', 'shapes', 'dtypes', 'lod_levels', 'name', 'use_double_buffer'], varargs=None, keywords=None, defaults=(None, None, True)), ('document', 'd78a1c7344955c5caed8dc13adb7beb6')) paddle.fluid.layers.create_py_reader_by_data (ArgSpec(args=['capacity', 'feed_list', 'name', 'use_double_buffer'], varargs=None, keywords=None, defaults=(None, True)), ('document', '2edf37d57862b24a7a26aa19a3573f73')) -paddle.fluid.layers.Preprocessor ('paddle.fluid.layers.io.Preprocessor', ('document', '1c2efbbc1197b44941a95b9ec4e737ae')) -paddle.fluid.layers.Preprocessor.__init__ (ArgSpec(args=['self', 'reader', 'name'], varargs=None, keywords=None, defaults=(None,)), ('document', '6adf97f83acf6453d4a6a4b1070f3754')) -paddle.fluid.layers.Preprocessor.block (ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None), ('document', '6adf97f83acf6453d4a6a4b1070f3754')) -paddle.fluid.layers.Preprocessor.inputs (ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None), ('document', '6adf97f83acf6453d4a6a4b1070f3754')) -paddle.fluid.layers.Preprocessor.outputs (ArgSpec(args=['self'], varargs='outs', keywords=None, defaults=None), ('document', '6adf97f83acf6453d4a6a4b1070f3754')) paddle.fluid.layers.load (ArgSpec(args=['out', 'file_path', 'load_as_fp16'], varargs=None, keywords=None, defaults=(None,)), ('document', '9d1a4bc97bbce9fa1d4f7a4200a771ff')) paddle.fluid.layers.create_tensor (ArgSpec(args=['dtype', 'name', 'persistable'], varargs=None, keywords=None, defaults=(None, False)), ('document', 'aaf0176c743c43e9bc684dd7dfac25c5')) paddle.fluid.layers.create_parameter (ArgSpec(args=['shape', 'dtype', 'name', 'attr', 'is_bias', 'default_initializer'], varargs=None, keywords=None, defaults=(None, None, False, None)), ('document', '021272f30e0cdf7503586815378abfb8')) @@ -1044,7 +1035,5 @@ paddle.fluid.profiler.stop_profiler (ArgSpec(args=['sorted_key', 'profile_path'] paddle.fluid.unique_name.generate (ArgSpec(args=['key'], varargs=None, keywords=None, defaults=None), ('document', '4d68cde4c4df8f1b8018620b4dc19b42')) paddle.fluid.unique_name.switch (ArgSpec(args=['new_generator'], varargs=None, keywords=None, defaults=(None,)), ('document', '695a6e91afbcdbafac69a069038811be')) paddle.fluid.unique_name.guard (ArgSpec(args=['new_generator'], varargs=None, keywords=None, defaults=(None,)), ('document', 'ead717d6d440a1eb11971695cd1727f4')) -paddle.fluid.recordio_writer.convert_reader_to_recordio_file (ArgSpec(args=['filename', 'reader_creator', 'feeder', 'compressor', 'max_num_records', 'feed_order'], varargs=None, keywords=None, defaults=(Compressor.Snappy, 1000, None)), ('document', '65c7523e86f0c50bb729b01667f36310')) -paddle.fluid.recordio_writer.convert_reader_to_recordio_files (ArgSpec(args=['filename', 'batch_per_file', 'reader_creator', 'feeder', 'compressor', 'max_num_records', 'feed_order'], varargs=None, keywords=None, defaults=(Compressor.Snappy, 1000, None)), ('document', 'bc643f0f5f1b9db57ff0d8a57d379bd7')) paddle.fluid.Scope Scope() -> paddle.fluid.core_avx._Scope paddle.fluid.install_check.run_check (ArgSpec(args=[], varargs=None, keywords=None, defaults=None), ('document', '66b7c84a17ed32fec2df9628367be2b9')) diff --git a/paddle/fluid/CMakeLists.txt b/paddle/fluid/CMakeLists.txt index 595454e90b..16457b564f 100644 --- a/paddle/fluid/CMakeLists.txt +++ b/paddle/fluid/CMakeLists.txt @@ -4,7 +4,6 @@ add_subdirectory(framework) add_subdirectory(imperative) add_subdirectory(operators) add_subdirectory(string) -add_subdirectory(recordio) add_subdirectory(pybind) # NOTE: please add subdirectory inference at last. diff --git a/paddle/fluid/framework/CMakeLists.txt b/paddle/fluid/framework/CMakeLists.txt index 7dc3a6c60e..da1e977a9d 100644 --- a/paddle/fluid/framework/CMakeLists.txt +++ b/paddle/fluid/framework/CMakeLists.txt @@ -63,7 +63,7 @@ if(WITH_GPU) else() cc_test(mixed_vector_test SRCS mixed_vector_test.cc DEPS place memory device_context tensor) endif() -cc_library(lod_tensor SRCS lod_tensor.cc DEPS ddim place tensor framework_proto recordio version) +cc_library(lod_tensor SRCS lod_tensor.cc DEPS ddim place tensor framework_proto version) cc_test(lod_tensor_test SRCS lod_tensor_test.cc DEPS lod_tensor memory) nv_test(lod_tensor_gpu_test SRCS lod_tensor_test.cu DEPS lod_tensor) diff --git a/paddle/fluid/framework/lod_tensor.cc b/paddle/fluid/framework/lod_tensor.cc index 9883a19405..89122851c7 100644 --- a/paddle/fluid/framework/lod_tensor.cc +++ b/paddle/fluid/framework/lod_tensor.cc @@ -26,9 +26,6 @@ limitations under the License. */ #include "paddle/fluid/memory/memcpy.h" #include "paddle/fluid/memory/memory.h" -#include "paddle/fluid/recordio/scanner.h" -#include "paddle/fluid/recordio/writer.h" - namespace paddle { namespace framework { @@ -275,36 +272,6 @@ void DeserializeFromStream(std::istream &is, LoDTensor *tensor, TensorFromStream(is, static_cast(tensor), dev_ctx); } -void WriteToRecordIO(recordio::Writer *writer, - const std::vector &tensor, - const platform::DeviceContext &dev_ctx) { - std::stringstream buffer; - size_t sz = tensor.size(); - buffer.write(reinterpret_cast(&sz), sizeof(uint32_t)); - for (auto &each : tensor) { - SerializeToStream(buffer, each, dev_ctx); - } - writer->Write(buffer.str()); -} - -bool ReadFromRecordIO(recordio::Scanner *scanner, - const platform::DeviceContext &dev_ctx, - std::vector *result_ptr) { - if (!scanner->HasNext()) { - return false; - } - std::istringstream sin(scanner->Next()); - uint32_t sz; - sin.read(reinterpret_cast(&sz), sizeof(uint32_t)); - auto &result = *result_ptr; - result.resize(sz); - for (uint32_t i = 0; i < sz; ++i) { - DeserializeFromStream(sin, &result[i], dev_ctx); - } - - return true; -} - std::vector LoDTensor::SplitLoDTensor( const std::vector places) const { check_memory_size(); diff --git a/paddle/fluid/framework/lod_tensor.h b/paddle/fluid/framework/lod_tensor.h index 5e20ba7c1c..ef48753349 100644 --- a/paddle/fluid/framework/lod_tensor.h +++ b/paddle/fluid/framework/lod_tensor.h @@ -32,12 +32,6 @@ limitations under the License. */ #include "paddle/fluid/platform/place.h" namespace paddle { - -namespace recordio { -class Writer; -class Scanner; -} - namespace framework { /* @@ -216,14 +210,6 @@ void SerializeToStream(std::ostream& os, const LoDTensor& tensor, void DeserializeFromStream(std::istream& is, LoDTensor* tensor, const platform::DeviceContext& dev_ctx); -extern void WriteToRecordIO(recordio::Writer* writer, - const std::vector& tensor, - const platform::DeviceContext& dev_ctx); - -extern bool ReadFromRecordIO(recordio::Scanner* scanner, - const platform::DeviceContext& dev_ctx, - std::vector* result_ptr); - /* * Convert between length-based LoD and offset-based LoD. * The implementation of LoDTensor class use offset-based LoD. diff --git a/paddle/fluid/framework/lod_tensor_test.cc b/paddle/fluid/framework/lod_tensor_test.cc index d1554113bc..1024076e59 100644 --- a/paddle/fluid/framework/lod_tensor_test.cc +++ b/paddle/fluid/framework/lod_tensor_test.cc @@ -20,9 +20,6 @@ #include "paddle/fluid/framework/lod_tensor.h" -#include "paddle/fluid/recordio/scanner.h" -#include "paddle/fluid/recordio/writer.h" - namespace paddle { namespace framework { @@ -281,52 +278,5 @@ TEST(LoD, ConvertToOffsetBasedLoD) { EXPECT_EQ(offset_lod, expected); } -template -static void TestRecordIO() { - LoDTensor tensor; - T* tmp = tensor.mutable_data(make_ddim({4, 5}), platform::CPUPlace()); - for (int i = 0; i < 20; ++i) { - tmp[i] = static_cast(i); - } - - std::stringstream* stream = new std::stringstream(); - auto& ctx = - *platform::DeviceContextPool::Instance().Get(platform::CPUPlace()); - { - recordio::Writer writer(stream, recordio::Compressor::kSnappy); - WriteToRecordIO(&writer, {tensor, tensor}, ctx); - WriteToRecordIO(&writer, {tensor, tensor}, ctx); - writer.Flush(); - } - - auto assert_tensor_ok = [](const LoDTensor& tensor) { - for (int i = 0; i < 20; ++i) { - ASSERT_EQ(tensor.data()[i], static_cast(i)); - } - }; - - { - std::unique_ptr stream_ptr(stream); - recordio::Scanner scanner(std::move(stream_ptr)); - std::vector tensors; - ASSERT_TRUE(ReadFromRecordIO(&scanner, ctx, &tensors)); - ASSERT_EQ(tensors.size(), static_cast(2)); - assert_tensor_ok(tensors[0]); - assert_tensor_ok(tensors[1]); - ASSERT_TRUE(ReadFromRecordIO(&scanner, ctx, &tensors)); - ASSERT_EQ(tensors.size(), static_cast(2)); - assert_tensor_ok(tensors[0]); - assert_tensor_ok(tensors[1]); - } -} - -TEST(LoDTensor, RecordIO) { - TestRecordIO(); - TestRecordIO(); - TestRecordIO(); - TestRecordIO(); - TestRecordIO(); -} - } // namespace framework } // namespace paddle diff --git a/paddle/fluid/inference/api/demo_ci/CMakeLists.txt b/paddle/fluid/inference/api/demo_ci/CMakeLists.txt index 127441ffef..aa31823eea 100644 --- a/paddle/fluid/inference/api/demo_ci/CMakeLists.txt +++ b/paddle/fluid/inference/api/demo_ci/CMakeLists.txt @@ -31,14 +31,10 @@ include_directories("${PADDLE_LIB_THIRD_PARTY_PATH}protobuf/include") include_directories("${PADDLE_LIB_THIRD_PARTY_PATH}glog/include") include_directories("${PADDLE_LIB_THIRD_PARTY_PATH}gflags/include") include_directories("${PADDLE_LIB_THIRD_PARTY_PATH}xxhash/include") -include_directories("${PADDLE_LIB_THIRD_PARTY_PATH}snappy/include") -include_directories("${PADDLE_LIB_THIRD_PARTY_PATH}snappystream/include") include_directories("${PADDLE_LIB_THIRD_PARTY_PATH}zlib/include") include_directories("${PADDLE_LIB}/third_party/boost") include_directories("${PADDLE_LIB}/third_party/eigen3") -link_directories("${PADDLE_LIB_THIRD_PARTY_PATH}snappy/lib") -link_directories("${PADDLE_LIB_THIRD_PARTY_PATH}snappystream/lib") link_directories("${PADDLE_LIB_THIRD_PARTY_PATH}zlib/lib") link_directories("${PADDLE_LIB_THIRD_PARTY_PATH}protobuf/lib") link_directories("${PADDLE_LIB_THIRD_PARTY_PATH}glog/lib") @@ -127,12 +123,12 @@ if (NOT WIN32) set(EXTERNAL_LIB "-lrt -ldl -lpthread") set(DEPS ${DEPS} ${MATH_LIB} ${MKLDNN_LIB} ${NGRAPH_LIB} - glog gflags protobuf snappystream snappy z xxhash + glog gflags protobuf z xxhash ${EXTERNAL_LIB}) else() set(DEPS ${DEPS} ${MATH_LIB} ${MKLDNN_LIB} - glog gflags_static libprotobuf snappy zlibstatic xxhash snappystream ${EXTERNAL_LIB}) + glog gflags_static libprotobuf zlibstatic xxhash ${EXTERNAL_LIB}) set(DEPS ${DEPS} libcmt shlwapi.lib) endif(NOT WIN32) diff --git a/paddle/fluid/operators/reader/CMakeLists.txt b/paddle/fluid/operators/reader/CMakeLists.txt index 616901399f..f61af33329 100644 --- a/paddle/fluid/operators/reader/CMakeLists.txt +++ b/paddle/fluid/operators/reader/CMakeLists.txt @@ -20,14 +20,7 @@ endfunction() cc_library(py_reader SRCS py_reader.cc DEPS reader) cc_library(buffered_reader SRCS buffered_reader.cc DEPS reader simple_threadpool) -reader_library(open_files_op SRCS open_files_op.cc DEPS buffered_reader) -reader_library(create_random_data_generator_op SRCS create_random_data_generator_op.cc) -reader_library(create_shuffle_reader_op SRCS create_shuffle_reader_op.cc) -reader_library(create_batch_reader_op SRCS create_batch_reader_op.cc) -reader_library(create_recordio_file_reader_op SRCS create_recordio_file_reader_op.cc) reader_library(create_double_buffer_reader_op SRCS create_double_buffer_reader_op.cc DEPS buffered_reader) -reader_library(create_multi_pass_reader_op SRCS create_multi_pass_reader_op.cc) -reader_library(create_custom_reader_op SRCS create_custom_reader_op.cc) reader_library(create_py_reader_op SRCS create_py_reader_op.cc DEPS py_reader) cc_test(reader_blocking_queue_test SRCS reader_blocking_queue_test.cc) diff --git a/paddle/fluid/operators/reader/create_batch_reader_op.cc b/paddle/fluid/operators/reader/create_batch_reader_op.cc deleted file mode 100644 index f771cebd0c..0000000000 --- a/paddle/fluid/operators/reader/create_batch_reader_op.cc +++ /dev/null @@ -1,151 +0,0 @@ -// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "paddle/fluid/operators/reader/reader_op_registry.h" - -namespace paddle { -namespace operators { -namespace reader { - -class BatchReader : public framework::DecoratedReader { - public: - BatchReader(const std::shared_ptr& reader, int batch_size, - bool discard_leftover) - : DecoratedReader(reader), - batch_size_(static_cast(batch_size)), - discard_leftover_(discard_leftover) { - buffer_.reserve(batch_size_); - } - - void ReadNextImpl(std::vector* out) override; - - private: - size_t batch_size_; - bool discard_leftover_; - std::vector> buffer_; -}; - -class CreateBatchReaderOp : public framework::OperatorBase { - public: - using framework::OperatorBase::OperatorBase; - - private: - void RunImpl(const framework::Scope& scope, - const platform::Place& dev_place) const override { - auto* out = scope.FindVar(Output("Out")) - ->template GetMutable(); - if (out->Get() != nullptr) { - return; - } - const auto& underlying_reader = scope.FindVar(Input("UnderlyingReader")) - ->Get(); - out->Reset(framework::MakeDecoratedReader( - underlying_reader, Attr("batch_size"), - Attr("discard_leftover"))); - } -}; - -class CreateBatchReaderOpMaker : public DecoratedReaderMakerBase { - protected: - void Apply() override { - AddAttr("batch_size", - "How many instances the batch reader yields each time.") - .GreaterThan(0); - AddAttr("discard_leftover", - "If true, the leftover instances that are not enough for a " - "new batch will be discarded.") - .SetDefault(true); - AddComment(R"DOC( - CreateBatchReader Operator - - A batch reader takes another reader as its 'underlying reader', - gathers the underlying reader's outputs and then yields them in batches. - )DOC"); - } -}; - -void BatchReader::ReadNextImpl(std::vector* out) { - buffer_.clear(); - buffer_.reserve(batch_size_); - for (size_t i = 0; i < batch_size_; ++i) { - buffer_.push_back(std::vector()); - reader_->ReadNext(&buffer_.back()); - if (buffer_.back().empty()) { - buffer_.pop_back(); - break; - } - } - if (discard_leftover_ && buffer_.size() < batch_size_) { - buffer_.clear(); - } - // Concat instances - out->clear(); - if (buffer_.empty()) { - // if buffer_ is empty, the 'out' will return as an empty vector. - return; - } - size_t out_num = buffer_[0].size(); - out->reserve(out_num); - for (size_t j = 0; j < out_num; ++j) { - // Merge shape and check date type - auto batch_type = buffer_[0][j].type(); - framework::DDim batch_shape = buffer_[0][j].dims(); - for (size_t i = 1; i < buffer_.size(); ++i) { - auto ins_type = buffer_[i][j].type(); - framework::DDim ins_shape = buffer_[i][j].dims(); - PADDLE_ENFORCE_EQ(batch_type, ins_type); - PADDLE_ENFORCE_EQ(slice_ddim(batch_shape, 1, batch_shape.size()), - slice_ddim(ins_shape, 1, ins_shape.size())); - PADDLE_ENFORCE_GT(ins_shape[0], 0); - batch_shape[0] += ins_shape[0]; - } - - framework::LoDTensor out_tensor; - out_tensor.Resize(batch_shape); - out_tensor.mutable_data(platform::CPUPlace(), batch_type); - int64_t dst_offset = 0; - - // Merge lod and data - framework::LoD batch_lod; - for (size_t i = 0; i < buffer_.size(); ++i) { - framework::DDim ins_shape = buffer_[i][j].dims(); - framework::LoD ins_lod = buffer_[i][j].lod(); - if (i == 0) { - batch_lod = ins_lod; - } else { - PADDLE_ENFORCE_EQ(batch_lod.size(), ins_lod.size()); - for (size_t level_idx = 0; level_idx < batch_lod.size(); ++level_idx) { - auto& lod_level = batch_lod[level_idx]; - for (size_t k = 1; k < ins_lod[level_idx].size(); ++k) { - lod_level.push_back(ins_lod[level_idx][k] + lod_level.back()); - } - } - } - auto dst = out_tensor.Slice(dst_offset, dst_offset + ins_shape[0]); - TensorCopy(buffer_[i][j], platform::CPUPlace(), &dst); - dst_offset += ins_shape[0]; - } - out_tensor.set_lod(batch_lod); - out->push_back(out_tensor); - } -} - -} // namespace reader -} // namespace operators -} // namespace paddle - -namespace ops = paddle::operators::reader; -REGISTER_DECORATED_READER_OPERATOR(create_batch_reader, - ops::CreateBatchReaderOp, - ops::CreateBatchReaderOpMaker); diff --git a/paddle/fluid/operators/reader/create_multi_pass_reader_op.cc b/paddle/fluid/operators/reader/create_multi_pass_reader_op.cc deleted file mode 100644 index 0a225597d3..0000000000 --- a/paddle/fluid/operators/reader/create_multi_pass_reader_op.cc +++ /dev/null @@ -1,93 +0,0 @@ -// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "paddle/fluid/operators/detail/safe_ref.h" -#include "paddle/fluid/operators/reader/reader_op_registry.h" - -namespace paddle { -namespace operators { -namespace reader { - -class MultiPassReader : public framework::DecoratedReader { - public: - MultiPassReader(const std::shared_ptr& reader, int pass_num) - : DecoratedReader(reader), pass_num_(pass_num), pass_count_(0) {} - - void ReadNextImpl(std::vector* out) override { - reader_->ReadNext(out); - if (out->empty() && pass_count_ < pass_num_ - 1) { - reader_->Shutdown(); - reader_->Start(); - reader_->ReadNext(out); - ++pass_count_; - } - } - - private: - void StartImpl() override { - pass_count_ = 0; - reader_->Start(); - } - - int pass_num_; - mutable int pass_count_; -}; - -class CreateMultiPassReaderOp : public framework::OperatorBase { - public: - using framework::OperatorBase::OperatorBase; - - private: - void RunImpl(const framework::Scope& scope, - const platform::Place& dev_place) const override { - auto* out = detail::Ref(scope.FindVar(Output("Out"))) - .GetMutable(); - if (out->Get() != nullptr) { - return; - } - const auto& underlying_reader = scope.FindVar(Input("UnderlyingReader")) - ->Get(); - int pass_num = Attr("pass_num"); - out->Reset(framework::MakeDecoratedReader( - underlying_reader, pass_num)); - } -}; - -class CreateMultiPassReaderOpMaker : public DecoratedReaderMakerBase { - protected: - void Apply() override { - AddAttr("pass_num", "The number of pass to run.").GreaterThan(0); - AddComment(R"DOC( - CreateMultiPassReader Operator - - This operator creates a multi-pass reader. A multi-pass reader - is used to yield data for several pass training continuously. - It takes the number of passes to run as one of its attributes - ('pass_num'), and maintains a pass counter to record how many - passes it has completed. When the underlying reader reaches the - EOF, the multi-pass reader checks whether it has completed training - of the given number of pass. If not, the underlying reader will - be re-initialized and starts a new pass automatically. - )DOC"); - } -}; - -} // namespace reader -} // namespace operators -} // namespace paddle - -namespace ops = paddle::operators::reader; -REGISTER_DECORATED_READER_OPERATOR(create_multi_pass_reader, - ops::CreateMultiPassReaderOp, - ops::CreateMultiPassReaderOpMaker); diff --git a/paddle/fluid/operators/reader/create_random_data_generator_op.cc b/paddle/fluid/operators/reader/create_random_data_generator_op.cc deleted file mode 100644 index e5c116dfcd..0000000000 --- a/paddle/fluid/operators/reader/create_random_data_generator_op.cc +++ /dev/null @@ -1,107 +0,0 @@ -// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "paddle/fluid/operators/reader/reader_op_registry.h" - -namespace paddle { -namespace operators { -namespace reader { - -template -class RandomDataGenerator : public framework::FileReader { - public: - RandomDataGenerator(const std::vector& shapes, float low, - float high) - : framework::FileReader(), low_(low), high_(high), shapes_(shapes) { - PADDLE_ENFORCE_LE(low, high, - "'low' shouldn't be greater than 'high'.(%f vs %f)", low, - high); - unsigned int seed = std::random_device()(); - engine_.seed(seed); - dist_ = std::uniform_real_distribution(low_, high_); - } - - void ReadNextImpl(std::vector* out) override { - out->clear(); - out->reserve(shapes_.size()); - for (const framework::DDim& shape : shapes_) { - PADDLE_ENFORCE_GE( - shape.size(), 2, - "The rank of reader's output data should be 2 at least.(Now it's %d)", - shape.size()); - framework::LoDTensor out_tensor; - out_tensor.Resize(shape); - T* data = out_tensor.mutable_data(platform::CPUPlace()); - int64_t numel = framework::product(shape); - for (int64_t i = 0; i < numel; ++i) { - data[i] = dist_(engine_); - } - out->push_back(out_tensor); - } - } - - private: - float low_; - float high_; - std::minstd_rand engine_; - std::uniform_real_distribution dist_; - std::vector shapes_; -}; - -template -class CreateRandomDataGeneratorOp : public framework::OperatorBase { - public: - using framework::OperatorBase::OperatorBase; - - private: - void RunImpl(const framework::Scope& scope, - const platform::Place& dev_place) const override { - const auto& shape_concat = Attr>("shape_concat"); - const auto& ranks = Attr>("ranks"); - PADDLE_ENFORCE(!shape_concat.empty() && !ranks.empty()); - PADDLE_ENFORCE_EQ(std::accumulate(ranks.begin(), ranks.end(), 0), - static_cast(shape_concat.size()), - "The accumulate of all ranks should be equal to the " - "shape concat's length."); - std::vector shapes = RestoreShapes(shape_concat, ranks); - auto* out = scope.FindVar(Output("Out")) - ->template GetMutable(); - out->Reset(std::make_shared>( - shapes, Attr("low"), Attr("high"))); - } -}; - -class CreateRandomDataGeneratorOpMaker : public FileReaderMakerBase { - protected: - void Apply() override { - AddAttr("low", "The lower bound of reader's uniform distribution."); - AddAttr("high", "The upper bound of reader's uniform distribution."); - AddComment(R"DOC( - CreateRandomDataGenerator Operator - - This Op creates a random reader. - The reader generates random data instead of really reading from files. - Generated data follow an uniform distribution between 'low' and 'high'. - )DOC"); - } -}; - -} // namespace reader -} // namespace operators -} // namespace paddle - -namespace ops = paddle::operators::reader; -REGISTER_FILE_READER_OPERATOR(create_random_data_generator, - ops::CreateRandomDataGeneratorOp, - ops::CreateRandomDataGeneratorOpMaker); diff --git a/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc b/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc deleted file mode 100644 index d7a048257f..0000000000 --- a/paddle/fluid/operators/reader/create_recordio_file_reader_op.cc +++ /dev/null @@ -1,93 +0,0 @@ -// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "paddle/fluid/operators/reader/reader_op_registry.h" -#include "paddle/fluid/platform/lock_guard_ptr.h" -#include "paddle/fluid/recordio/scanner.h" - -namespace paddle { -namespace operators { -namespace reader { -template -class RecordIOFileReader : public framework::FileReader { - public: - explicit RecordIOFileReader(const std::string& filename) - : scanner_(filename), - dev_ctx_(*platform::DeviceContextPool::Instance().Get( - platform::CPUPlace())) { - if (ThreadSafe) { - mutex_.reset(new std::mutex()); - } - LOG(INFO) << "Creating file reader" << filename; - } - - protected: - void ReadNextImpl(std::vector* out) override { - platform::LockGuardPtr guard(mutex_); - bool ok = framework::ReadFromRecordIO(&scanner_, dev_ctx_, out); - if (!ok) { - out->clear(); - } - } - - void StartImpl() override { scanner_.Reset(); } - - private: - std::unique_ptr mutex_; - recordio::Scanner scanner_; - const platform::DeviceContext& dev_ctx_; -}; - -class CreateRecordIOReaderOp : public framework::OperatorBase { - public: - using framework::OperatorBase::OperatorBase; - - private: - void RunImpl(const framework::Scope& scope, - const platform::Place& dev_place) const override { - std::string filename = Attr("filename"); - auto* out = scope.FindVar(Output("Out")) - ->template GetMutable(); - - out->Reset(std::make_shared>(filename)); - } -}; - -class CreateRecordIOReaderOpMaker : public FileReaderMakerBase { - protected: - void Apply() override { - AddAttr( - "filename", - "The filename of record file. This file will given to reader."); - AddComment(R"DOC( -Open a recordio file and return the reader object. The returned reader object -is thread-safe. - -NOTE: This is a very low-level API. It is used for debugging data file or -training. Please use `open_files` instead of this API for production usage. - )DOC"); - } -}; - -} // namespace reader -} // namespace operators -} // namespace paddle - -namespace reader = paddle::operators::reader; - -REGISTER_FILE_READER_OPERATOR(create_recordio_file_reader, - reader::CreateRecordIOReaderOp, - reader::CreateRecordIOReaderOpMaker); - -REGISTER_FILE_READER(recordio, reader::RecordIOFileReader); diff --git a/paddle/fluid/operators/reader/create_shuffle_reader_op.cc b/paddle/fluid/operators/reader/create_shuffle_reader_op.cc deleted file mode 100644 index 3f72890a7c..0000000000 --- a/paddle/fluid/operators/reader/create_shuffle_reader_op.cc +++ /dev/null @@ -1,124 +0,0 @@ -// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include -#include "glog/logging.h" -#include "paddle/fluid/operators/detail/safe_ref.h" -#include "paddle/fluid/operators/reader/reader_op_registry.h" - -namespace paddle { -namespace operators { -namespace reader { - -class ShuffleReader : public framework::DecoratedReader { - public: - ShuffleReader(const std::shared_ptr& reader, size_t buffer_size, - size_t seed = 0) - : DecoratedReader(reader), buffer_size_(buffer_size), seed_(seed) { - VLOG(10) << "Create shuffle reader of " << reader_; - if (seed_ == 0) { - std::random_device device; - seed_ = device(); - } - ReloadBuffer(); - } - - void ReadNextImpl(std::vector* out) override { - out->clear(); - if (iteration_pos_ >= buffer_.size()) { - VLOG(10) << "Resetting shuffle buffer"; - ReloadBuffer(); - if (buffer_.empty()) { - return; - } - } - *out = buffer_[iteration_pos_++]; - } - - private: - void ShutdownImpl() override { - reader_->Shutdown(); - buffer_.clear(); - iteration_pos_ = 0; - } - - void StartImpl() override { - reader_->Start(); - ReloadBuffer(); - } - - void ReloadBuffer() { - buffer_.clear(); - buffer_.reserve(buffer_size_); - iteration_pos_ = 0; - for (size_t i = 0; i < buffer_size_; ++i) { - std::vector ins; - reader_->ReadNext(&ins); - if (ins.empty()) { - break; - } - buffer_.emplace_back(ins); - } - std::mt19937 g(seed_); - std::shuffle(buffer_.begin(), buffer_.end(), g); - seed_ = g(); // update seed_; - VLOG(10) << "random buffer size = " << buffer_.size(); - } - - size_t buffer_size_; - std::vector> buffer_; - - size_t iteration_pos_; - size_t seed_; -}; - -class CreateShuffleReaderOp : public framework::OperatorBase { - public: - using framework::OperatorBase::OperatorBase; - - private: - void RunImpl(const framework::Scope& scope, - const platform::Place& dev_place) const override { - auto* out = detail::Ref(scope.FindVar(Output("Out"))) - .GetMutable(); - if (out->Get() != nullptr) { - return; - } - const auto& underlying_reader = scope.FindVar(Input("UnderlyingReader")) - ->Get(); - out->Reset(framework::MakeDecoratedReader( - underlying_reader, static_cast(Attr("buffer_size")))); - } -}; - -class CreateShuffleReaderOpMaker : public DecoratedReaderMakerBase { - protected: - void Apply() override { - AddAttr("buffer_size", "The shuffle buffer size.").GreaterThan(0); - AddComment(R"DOC( - CreateShuffleReader Operator - - A shuffle reader takes another reader as its 'underlying reader' - and yields the underlying reader's outputs in a shuffled order. - )DOC"); - } -}; -} // namespace reader -} // namespace operators -} // namespace paddle - -namespace ops = paddle::operators::reader; -REGISTER_DECORATED_READER_OPERATOR(create_shuffle_reader, - ops::CreateShuffleReaderOp, - ops::CreateShuffleReaderOpMaker); diff --git a/paddle/fluid/operators/reader/open_files_op.cc b/paddle/fluid/operators/reader/open_files_op.cc deleted file mode 100644 index 38223e0699..0000000000 --- a/paddle/fluid/operators/reader/open_files_op.cc +++ /dev/null @@ -1,277 +0,0 @@ -// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include -#include -#include // NOLINT -#include "ThreadPool.h" -#include "paddle/fluid/framework/blocking_queue.h" -#include "paddle/fluid/operators/reader/blocking_queue.h" -#include "paddle/fluid/operators/reader/buffered_reader.h" -#include "paddle/fluid/operators/reader/reader_op_registry.h" - -namespace paddle { -namespace operators { -namespace reader { - -class IReaderContainer { - public: - virtual ~IReaderContainer() {} - virtual void AppendReader( - std::unique_ptr&& readers) = 0; - virtual void Stop() = 0; - virtual void Start() = 0; - virtual void ReadNext(std::vector* out) = 0; -}; - -class OrderedReaderContainer : public IReaderContainer { - public: - void AppendReader(std::unique_ptr&& reader) override { - pending_.emplace(std::move(reader)); - } - - void Stop() override { - while (!pending_.empty()) { - MoveFrontPendingToDone(); - } - } - - void Start() override { std::swap(done_, pending_); } - - void ReadNext(std::vector* out) override { - if (!pending_.empty()) { - pending_.front()->ReadNext(out); - if (out->empty()) { - MoveFrontPendingToDone(); - ReadNext(out); - } - } else { - out->clear(); - } - } - - private: - void MoveFrontPendingToDone() { - pending_.front()->Shutdown(); - pending_.front()->Start(); - done_.emplace(move(pending_.front())); - pending_.pop(); - } - - std::queue> pending_; - std::queue> done_; -}; - -class PreemptiveReaderContainer : public IReaderContainer { - using ReaderList = std::list>; - - struct FutureItem { - std::vector data_; - ReaderList::iterator reader_it_; - std::exception_ptr exception_; - }; - - using FutureList = std::list>; - - public: - explicit PreemptiveReaderContainer(size_t thread_num) : pool_(thread_num) {} - - void Stop() override { - if (!pending_.empty()) { - for (auto& reader : pending_) { - reader->Shutdown(); - } - for (auto& fu : futures_) { - fu.wait(); - } - futures_.clear(); - for (auto& reader : pending_) { - reader->Start(); - done_.emplace_back(std::move(reader)); - } - pending_.clear(); - bool timeout; - complete_queue_.PopAll(1000, &timeout); - PADDLE_ENFORCE(!timeout); - } - } - - void Start() override { - for (auto& reader : done_) { - AppendReader(std::move(reader)); - } - done_.clear(); - } - - void ReadNext(std::vector* out) override { - if (!pending_.empty()) { - auto future_it = complete_queue_.Pop(); - FutureItem item = future_it->get(); - if (item.exception_) { - for (auto it = futures_.begin(); it != futures_.end(); ++it) { - if (it != future_it) { - it->wait(); // Wait all other threads complete. - } - } - std::rethrow_exception(item.exception_); - - } else if (item.data_.empty()) { // reader done. - done_.emplace_back(std::move(*item.reader_it_)); - pending_.erase(item.reader_it_); - futures_.erase(future_it); - ReadNext(out); - } else { - *out = item.data_; - // continue read async - ReadAsync(item.reader_it_, &future_it); - } - } else { - out->clear(); - } - } - - private: - void AppendReader(std::unique_ptr&& reader) override { - pending_.emplace_back(std::move(reader)); - auto reader_it = pending_.end(); - --reader_it; - - futures_.emplace_back(); - auto future_it = futures_.end(); - --future_it; - - ReadAsync(reader_it, &future_it); - } - - void ReadAsync(const ReaderList::iterator& reader_it, - FutureList::iterator* future_it_ptr) { - auto& future_it = *future_it_ptr; - *future_it = pool_.enqueue([reader_it, future_it, this] { - try { - FutureItem item; - item.reader_it_ = reader_it; - (*reader_it)->ReadNext(&item.data_); - if (item.data_.empty()) { - (*reader_it)->Shutdown(); - (*reader_it)->Start(); - } - complete_queue_.Push(future_it); - return item; - } catch (...) { - FutureItem item; - item.exception_ = std::current_exception(); - complete_queue_.Push(future_it); - return item; - } - }); - } - - FutureList futures_; - ThreadPool pool_; - framework::BlockingQueue complete_queue_; - std::list> pending_; - std::list> done_; -}; - -class MultiFileReader : public framework::ReaderBase { - public: - MultiFileReader(const std::vector& file_names, - std::unique_ptr&& container) - : container_(std::move(container)) { - for (auto& fn : file_names) { - container_->AppendReader(CreateReaderByFileName(fn)); - } - } - - ~MultiFileReader() { container_->Stop(); } - - protected: - void ReadNextImpl(std::vector* out) override { - container_->ReadNext(out); - } - void ShutdownImpl() override { container_->Stop(); } - void StartImpl() override { container_->Start(); } - - private: - std::unique_ptr container_; -}; - -class OpenFilesOp : public framework::OperatorBase { - public: - using framework::OperatorBase::OperatorBase; - - private: - void RunImpl(const framework::Scope& scope, - const platform::Place& dev_place) const override { - const auto& shape_concat = Attr>("shape_concat"); - const auto& ranks = Attr>("ranks"); - PADDLE_ENFORCE(!shape_concat.empty() && !ranks.empty()); - PADDLE_ENFORCE_EQ(std::accumulate(ranks.begin(), ranks.end(), 0), - static_cast(shape_concat.size()), - "The accumulate of all ranks should be equal to the " - "shape concat's length."); - const auto& file_names = Attr>("file_names"); - PADDLE_ENFORCE(!file_names.empty(), "No file to be read!"); - bool is_test = Attr("is_test"); - - auto* out = scope.FindVar(Output("Out")) - ->template GetMutable(); - std::unique_ptr container; - - if (is_test) { - container.reset(new OrderedReaderContainer()); - } else { - container.reset(new PreemptiveReaderContainer( - static_cast(Attr("thread_num")))); - } - - std::shared_ptr reader( - new MultiFileReader(file_names, std::move(container))); - auto buffer_size = Attr("buffer_size"); - if (buffer_size > 1) { - reader = framework::MakeDecoratedReader( - reader, platform::CPUPlace(), buffer_size); - } - out->Reset(reader); - } -}; - -class OpenFilesOpMaker : public FileReaderMakerBase { - protected: - void Apply() override { - AddAttr>("file_names", "Files to be read."); - AddAttr("is_test", "Used for testing data.").SetDefault(false); - - AddComment(R"DOC( - OpenFiles Operator - - An OpenFilesOp creates a MultiFileReader, which is able to - read data multi-threaded from multiple files. - )DOC"); - AddAttr("thread_num", - "The maximal concurrent prefetch thread number. Used only " - "when is_test = False"); - AddAttr("buffer_size", "The reading buffer of these files.") - .GreaterThan(0); - } -}; - -} // namespace reader -} // namespace operators -} // namespace paddle - -namespace reader = paddle::operators::reader; - -REGISTER_FILE_READER_OPERATOR(open_files, reader::OpenFilesOp, - reader::OpenFilesOpMaker); diff --git a/paddle/fluid/pybind/CMakeLists.txt b/paddle/fluid/pybind/CMakeLists.txt index 8ee03c7982..ff35ca6ca4 100644 --- a/paddle/fluid/pybind/CMakeLists.txt +++ b/paddle/fluid/pybind/CMakeLists.txt @@ -15,7 +15,6 @@ set(PYBIND_SRCS exception.cc protobuf.cc const_value.cc - recordio.cc reader_py.cc fleet_wrapper_py.cc nccl_wrapper_py.cc diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc index c0aeb82b7b..2303c2e665 100644 --- a/paddle/fluid/pybind/pybind.cc +++ b/paddle/fluid/pybind/pybind.cc @@ -66,7 +66,6 @@ limitations under the License. */ #include "paddle/fluid/pybind/protobuf.h" #include "paddle/fluid/pybind/pybind.h" // NOLINT #include "paddle/fluid/pybind/reader_py.h" -#include "paddle/fluid/pybind/recordio.h" #include "paddle/fluid/pybind/tensor_py.h" #include "paddle/fluid/string/to_string.h" #ifdef PADDLE_WITH_CUDA @@ -1662,7 +1661,6 @@ All parameter, weight, gradient are variables in Paddle. return self.Run(fetch_tensors); }); - BindRecordIOWriter(&m); BindFleetWrapper(&m); #ifndef _WIN32 BindNCCLWrapper(&m); diff --git a/paddle/fluid/pybind/recordio.cc b/paddle/fluid/pybind/recordio.cc deleted file mode 100644 index 32caf4bed9..0000000000 --- a/paddle/fluid/pybind/recordio.cc +++ /dev/null @@ -1,88 +0,0 @@ -// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "paddle/fluid/pybind/recordio.h" - -#include -#include -#include - -#include "paddle/fluid/framework/lod_tensor.h" -#include "paddle/fluid/recordio/writer.h" - -namespace paddle { -namespace pybind { - -namespace { - -class RecordIOWriter { - public: - RecordIOWriter(const std::string& filename, recordio::Compressor compressor, - size_t max_num_record) - : closed_(false), - stream_(filename, std::ios::binary), - writer_(&stream_, compressor, max_num_record) {} - - void AppendTensor(const framework::LoDTensor& tensor) { - tensors_.push_back(tensor); - } - - void CompleteAppendTensor() { - auto& ctx = - *platform::DeviceContextPool::Instance().Get(platform::CPUPlace()); - framework::WriteToRecordIO(&writer_, tensors_, ctx); - tensors_.clear(); - } - - void Close() { - PADDLE_ENFORCE(tensors_.empty()); - writer_.Flush(); - stream_.close(); - closed_ = true; - } - - ~RecordIOWriter() { - if (!closed_) { - Close(); - } - } - - private: - bool closed_; - std::vector tensors_; - std::ofstream stream_; - recordio::Writer writer_; -}; - -} // namespace - -void BindRecordIOWriter(py::module* m) { - py::class_ writer(*m, "RecordIOWriter", ""); - py::enum_(writer, "Compressor", "") - .value("Snappy", recordio::Compressor::kSnappy) - .value("NoCompress", recordio::Compressor::kNoCompress); - - writer - .def("__init__", - [](RecordIOWriter& self, const std::string& filename, - recordio::Compressor compressor, size_t max_num_record) { - new (&self) RecordIOWriter(filename, compressor, max_num_record); - }) - .def("append_tensor", &RecordIOWriter::AppendTensor) - .def("complete_append_tensor", &RecordIOWriter::CompleteAppendTensor) - .def("close", &RecordIOWriter::Close); -} - -} // namespace pybind -} // namespace paddle diff --git a/paddle/fluid/pybind/recordio.h b/paddle/fluid/pybind/recordio.h deleted file mode 100644 index 2555f9b719..0000000000 --- a/paddle/fluid/pybind/recordio.h +++ /dev/null @@ -1,27 +0,0 @@ -// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once -#include "pybind11/pybind11.h" -#include "pybind11/stl.h" - -namespace py = pybind11; - -namespace paddle { -namespace pybind { - -void BindRecordIOWriter(py::module* m); - -} // namespace pybind -} // namespace paddle diff --git a/paddle/fluid/recordio/CMakeLists.txt b/paddle/fluid/recordio/CMakeLists.txt deleted file mode 100644 index 92e97a6c85..0000000000 --- a/paddle/fluid/recordio/CMakeLists.txt +++ /dev/null @@ -1,9 +0,0 @@ -# internal library. -cc_library(header SRCS header.cc) -cc_test(header_test SRCS header_test.cc DEPS header) -cc_library(chunk SRCS chunk.cc DEPS snappystream snappy header zlib) -cc_test(chunk_test SRCS chunk_test.cc DEPS chunk) -cc_library(writer SRCS writer.cc DEPS chunk) -cc_library(scanner SRCS scanner.cc DEPS chunk) -cc_test(writer_scanner_test SRCS writer_scanner_test.cc DEPS writer scanner) -cc_library(recordio DEPS chunk header writer scanner) diff --git a/paddle/fluid/recordio/README.md b/paddle/fluid/recordio/README.md deleted file mode 100644 index ef99c0cf0f..0000000000 --- a/paddle/fluid/recordio/README.md +++ /dev/null @@ -1,13 +0,0 @@ -## Background - -The RecordIO file format is a container for records. This package is a C++ implementation of https://github.com/paddlepaddle/recordio, which originates from https://github.com/wangkuiyi/recordio. - -## Fault-tolerant Writing - -For the initial design purpose of RecordIO within Google, which was logging, RecordIO groups record into *chunks*, whose header contains an MD5 hash of the chunk. A process that writes logs is supposed to call the Writer interface to add records. Once the writer accumulates a handful of them, it groups a chunk, put the MD5 into the chunk header, and appends the chunk to the file. In the event the process crashes unexpected, the last chunk in the RecordIO file could be incomplete/corrupt. The RecordIO reader is able to recover from these errors when the process restarts by identifying incomplete chucks and skipping over them. - -## Reading Ranges - -A side-effect of chunks is to make it easy to indexing records while reading, thus allows us to read a range of successive records. This is good for distributed log process, where each MapReduce task handles only part of records in a big RecordIO file. - -The procedure that creates the index starts from reading the header of the first chunk. It indexes the offset (0) and the size of the chunk, and skips to the header of the next chunk by calling the `fseek` API. Please be aware that most distributed filesystems and all POSIX-compatible local filesystem provides `fseek`, and makes sure that `fseek` runs much faster than `fread`. This procedure generates a map from chunks to their offsets, which allows the readers is to locate and read a range of records. diff --git a/paddle/fluid/recordio/chunk.cc b/paddle/fluid/recordio/chunk.cc deleted file mode 100644 index 6c65d9160c..0000000000 --- a/paddle/fluid/recordio/chunk.cc +++ /dev/null @@ -1,174 +0,0 @@ -// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "paddle/fluid/recordio/chunk.h" - -#include -#include -#include -#include - -#include "paddle/fluid/platform/enforce.h" -#include "snappystream.hpp" - -namespace paddle { -namespace recordio { -constexpr size_t kMaxBufSize = 1024; - -/** - * Read Stream by a fixed sized buffer. - * @param in input stream - * @param limit read at most `limit` bytes from input stream. 0 means no limit - * @param callback A function object with (const char* buf, size_t size) -> void - * as its type. - */ -template -static void ReadStreamByBuf(std::istream& in, size_t limit, Callback callback) { - char buf[kMaxBufSize]; - std::streamsize actual_size; - size_t counter = 0; - size_t actual_max; - while (!in.eof() || - (limit != 0 && counter >= limit)) { // End of file or reach limit - actual_max = - limit != 0 ? std::min(limit - counter, kMaxBufSize) : kMaxBufSize; - in.read(buf, actual_max); - actual_size = in.gcount(); - if (actual_size == 0) { - break; - } - callback(buf, actual_size); - if (limit != 0) { - counter += actual_size; - } - } - in.clear(); // unset eof state -} - -/** - * Copy stream in to another stream - */ -static void PipeStream(std::istream& in, std::ostream& os) { - ReadStreamByBuf(in, 0, - [&os](const char* buf, size_t len) { os.write(buf, len); }); -} - -/** - * Calculate CRC32 from an input stream. - */ -static uint32_t Crc32Stream(std::istream& in, size_t limit = 0) { - uint32_t crc = static_cast(crc32(0, nullptr, 0)); - ReadStreamByBuf(in, limit, [&crc](const char* buf, size_t len) { - crc = static_cast(crc32(crc, reinterpret_cast(buf), - static_cast(len))); - }); - return crc; -} - -bool Chunk::Write(std::ostream& os, Compressor ct) const { - // NOTE(dzhwinter): don't check records.numBytes instead, because - // empty records are allowed. - if (records_.empty()) { - return false; - } - std::stringstream sout; - std::unique_ptr compressed_stream; - switch (ct) { - case Compressor::kNoCompress: - break; - case Compressor::kSnappy: - compressed_stream.reset(new snappy::oSnappyStream(sout)); - break; - default: - PADDLE_THROW("Not implemented"); - } - - std::ostream& buf_stream = compressed_stream ? *compressed_stream : sout; - - for (auto& record : records_) { - size_t sz = record.size(); - buf_stream.write(reinterpret_cast(&sz), sizeof(uint32_t)) - .write(record.data(), record.size()); - } - - if (compressed_stream) { - compressed_stream.reset(); - } - - sout.seekg(0, std::ios::end); - uint32_t len = static_cast(sout.tellg()); - sout.seekg(0, std::ios::beg); - uint32_t crc = Crc32Stream(sout); - Header hdr(static_cast(records_.size()), crc, ct, len); - hdr.Write(os); - sout.seekg(0, std::ios::beg); - sout.clear(); - PipeStream(sout, os); - return true; -} - -bool Chunk::Parse(std::istream& sin) { - ChunkParser parser(sin); - if (!parser.Init()) { - return false; - } - Clear(); - while (parser.HasNext()) { - Add(parser.Next()); - } - return true; -} - -ChunkParser::ChunkParser(std::istream& sin) : in_(sin) {} -bool ChunkParser::Init() { - pos_ = 0; - bool ok = header_.Parse(in_); - if (!ok) { - return ok; - } - auto beg_pos = in_.tellg(); - uint32_t crc = Crc32Stream(in_, header_.CompressSize()); - PADDLE_ENFORCE_EQ(header_.Checksum(), crc); - in_.seekg(beg_pos, in_.beg); - - switch (header_.CompressType()) { - case Compressor::kNoCompress: - break; - case Compressor::kSnappy: - compressed_stream_.reset(new snappy::iSnappyStream(in_)); - break; - default: - PADDLE_THROW("Not implemented"); - } - return true; -} - -bool ChunkParser::HasNext() const { return pos_ < header_.NumRecords(); } - -std::string ChunkParser::Next() { - if (!HasNext()) { - return ""; - } - ++pos_; - std::istream& stream = compressed_stream_ ? *compressed_stream_ : in_; - uint32_t rec_len; - stream.read(reinterpret_cast(&rec_len), sizeof(uint32_t)); - std::string buf; - buf.resize(rec_len); - stream.read(&buf[0], rec_len); - PADDLE_ENFORCE_EQ(rec_len, stream.gcount()); - return buf; -} -} // namespace recordio -} // namespace paddle diff --git a/paddle/fluid/recordio/chunk.h b/paddle/fluid/recordio/chunk.h deleted file mode 100644 index cfb954a591..0000000000 --- a/paddle/fluid/recordio/chunk.h +++ /dev/null @@ -1,73 +0,0 @@ -// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once -#include -#include -#include - -#include "paddle/fluid/platform/macros.h" -#include "paddle/fluid/recordio/header.h" - -namespace paddle { -namespace recordio { - -// A Chunk contains the Header and optionally compressed records. -class Chunk { - public: - Chunk() : num_bytes_(0) {} - void Add(const std::string& buf) { - num_bytes_ += buf.size(); - records_.emplace_back(buf); - } - // dump the chunk into w, and clears the chunk and makes it ready for - // the next add invocation. - bool Write(std::ostream& fo, Compressor ct) const; - void Clear() { - records_.clear(); - num_bytes_ = 0; - } - - // returns true if ok, false if eof - bool Parse(std::istream& sin); - size_t NumBytes() const { return num_bytes_; } - size_t NumRecords() const { return records_.size(); } - const std::string& Record(int i) const { return records_[i]; } - - bool Empty() const { return records_.empty(); } - - private: - std::vector records_; - // sum of record lengths in bytes. - size_t num_bytes_; - DISABLE_COPY_AND_ASSIGN(Chunk); -}; - -class ChunkParser { - public: - explicit ChunkParser(std::istream& sin); - - bool Init(); - std::string Next(); - bool HasNext() const; - - private: - Header header_; - uint32_t pos_{0}; - std::istream& in_; - std::unique_ptr compressed_stream_; -}; - -} // namespace recordio -} // namespace paddle diff --git a/paddle/fluid/recordio/chunk_test.cc b/paddle/fluid/recordio/chunk_test.cc deleted file mode 100644 index 5177475c01..0000000000 --- a/paddle/fluid/recordio/chunk_test.cc +++ /dev/null @@ -1,47 +0,0 @@ -// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "paddle/fluid/recordio/chunk.h" - -#include - -#include "gtest/gtest.h" - -TEST(Chunk, SaveLoad) { - paddle::recordio::Chunk ch; - ch.Add(std::string("12345", 6)); - ch.Add(std::string("123", 4)); - std::stringstream ss; - ch.Write(ss, paddle::recordio::Compressor::kNoCompress); - ss.seekg(0); - ch.Parse(ss); - ASSERT_EQ(ch.NumBytes(), 10U); -} - -TEST(Chunk, Compressor) { - paddle::recordio::Chunk ch; - ch.Add(std::string("12345", 6)); - ch.Add(std::string("123", 4)); - ch.Add(std::string("123", 4)); - ch.Add(std::string("123", 4)); - std::stringstream ss; - ch.Write(ss, paddle::recordio::Compressor::kSnappy); - std::stringstream ss2; - ch.Write(ss2, paddle::recordio::Compressor::kNoCompress); - ASSERT_LE(ss.tellp(), ss2.tellp()); // Compress should contain less data; - - ch.Clear(); - ch.Parse(ss); - ASSERT_EQ(ch.NumBytes(), 18ul); -} diff --git a/paddle/fluid/recordio/header.cc b/paddle/fluid/recordio/header.cc deleted file mode 100644 index c4822329a4..0000000000 --- a/paddle/fluid/recordio/header.cc +++ /dev/null @@ -1,70 +0,0 @@ -// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "paddle/fluid/recordio/header.h" - -#include - -#include "paddle/fluid/platform/enforce.h" - -namespace paddle { -namespace recordio { - -Header::Header() - : num_records_(0), - checksum_(0), - compressor_(Compressor::kNoCompress), - compress_size_(0) {} - -Header::Header(uint32_t num, uint32_t sum, Compressor c, uint32_t cs) - : num_records_(num), checksum_(sum), compressor_(c), compress_size_(cs) {} - -bool Header::Parse(std::istream& is) { - uint32_t magic; - is.read(reinterpret_cast(&magic), sizeof(uint32_t)); - size_t read_size = is.gcount(); - if (read_size < sizeof(uint32_t)) { - return false; - } - PADDLE_ENFORCE_EQ(magic, kMagicNumber); - - is.read(reinterpret_cast(&num_records_), sizeof(uint32_t)) - .read(reinterpret_cast(&checksum_), sizeof(uint32_t)) - .read(reinterpret_cast(&compressor_), sizeof(uint32_t)) - .read(reinterpret_cast(&compress_size_), sizeof(uint32_t)); - return true; -} - -void Header::Write(std::ostream& os) const { - os.write(reinterpret_cast(&kMagicNumber), sizeof(uint32_t)) - .write(reinterpret_cast(&num_records_), sizeof(uint32_t)) - .write(reinterpret_cast(&checksum_), sizeof(uint32_t)) - .write(reinterpret_cast(&compressor_), sizeof(uint32_t)) - .write(reinterpret_cast(&compress_size_), sizeof(uint32_t)); -} - -std::ostream& operator<<(std::ostream& os, Header h) { - os << "Header: " << h.NumRecords() << ", " << h.Checksum() << ", " - << static_cast(h.CompressType()) << ", " << h.CompressSize(); - return os; -} - -bool operator==(Header l, Header r) { - return l.NumRecords() == r.NumRecords() && l.Checksum() == r.Checksum() && - l.CompressType() == r.CompressType() && - l.CompressSize() == r.CompressSize(); -} - -} // namespace recordio -} // namespace paddle diff --git a/paddle/fluid/recordio/header.h b/paddle/fluid/recordio/header.h deleted file mode 100644 index 245425990b..0000000000 --- a/paddle/fluid/recordio/header.h +++ /dev/null @@ -1,66 +0,0 @@ -// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once - -#include - -namespace paddle { -namespace recordio { - -// MagicNumber for memory checking -constexpr uint32_t kMagicNumber = 0x01020304; - -enum class Compressor : uint32_t { - // NoCompression means writing raw chunk data into files. - // With other choices, chunks are compressed before written. - kNoCompress = 0, - // Snappy had been the default compressing algorithm widely - // used in Google. It compromises between speech and - // compression ratio. - kSnappy = 1, - // Gzip is a well-known compression algorithm. It is - // recommmended only you are looking for compression ratio. - kGzip = 2, -}; - -// Header is the metadata of Chunk -class Header { - public: - Header(); - Header(uint32_t num, uint32_t sum, Compressor ct, uint32_t cs); - - void Write(std::ostream& os) const; - - // returns true if OK, false if eof - bool Parse(std::istream& is); - - uint32_t NumRecords() const { return num_records_; } - uint32_t Checksum() const { return checksum_; } - Compressor CompressType() const { return compressor_; } - uint32_t CompressSize() const { return compress_size_; } - - private: - uint32_t num_records_; - uint32_t checksum_; - Compressor compressor_; - uint32_t compress_size_; -}; - -// Allow Header Loggable -std::ostream& operator<<(std::ostream& os, Header h); -bool operator==(Header l, Header r); - -} // namespace recordio -} // namespace paddle diff --git a/paddle/fluid/recordio/header_test.cc b/paddle/fluid/recordio/header_test.cc deleted file mode 100644 index 00f1887dc5..0000000000 --- a/paddle/fluid/recordio/header_test.cc +++ /dev/null @@ -1,29 +0,0 @@ -// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "paddle/fluid/recordio/header.h" - -#include - -#include "gtest/gtest.h" - -TEST(Recordio, ChunkHead) { - paddle::recordio::Header hdr(0, 1, paddle::recordio::Compressor::kGzip, 3); - std::stringstream ss; - hdr.Write(ss); - ss.seekg(0, std::ios::beg); - paddle::recordio::Header hdr2; - hdr2.Parse(ss); - EXPECT_TRUE(hdr == hdr2); -} diff --git a/paddle/fluid/recordio/scanner.cc b/paddle/fluid/recordio/scanner.cc deleted file mode 100644 index b06c274ada..0000000000 --- a/paddle/fluid/recordio/scanner.cc +++ /dev/null @@ -1,57 +0,0 @@ -// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "paddle/fluid/recordio/scanner.h" - -#include -#include - -#include "paddle/fluid/platform/enforce.h" - -namespace paddle { -namespace recordio { - -Scanner::Scanner(std::unique_ptr &&stream) - : stream_(std::move(stream)), parser_(*stream_) { - Reset(); -} - -Scanner::Scanner(const std::string &filename) - : stream_(new std::ifstream(filename, std::ios::in | std::ios::binary)), - parser_(*stream_) { - PADDLE_ENFORCE(static_cast(*stream_), "Cannot open file %s", filename); - Reset(); -} - -void Scanner::Reset() { - stream_->clear(); - stream_->seekg(0, std::ios::beg); - parser_.Init(); -} - -std::string Scanner::Next() { - if (stream_->eof()) { - return ""; - } - - auto res = parser_.Next(); - if (!parser_.HasNext() && HasNext()) { - parser_.Init(); - } - return res; -} - -bool Scanner::HasNext() const { return !stream_->eof(); } -} // namespace recordio -} // namespace paddle diff --git a/paddle/fluid/recordio/scanner.h b/paddle/fluid/recordio/scanner.h deleted file mode 100644 index 0d885dd87a..0000000000 --- a/paddle/fluid/recordio/scanner.h +++ /dev/null @@ -1,43 +0,0 @@ -// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once - -#include -#include -#include - -#include "paddle/fluid/recordio/chunk.h" - -namespace paddle { -namespace recordio { - -class Scanner { - public: - explicit Scanner(std::unique_ptr&& stream); - - explicit Scanner(const std::string& filename); - - void Reset(); - - std::string Next(); - - bool HasNext() const; - - private: - std::unique_ptr stream_; - ChunkParser parser_; -}; -} // namespace recordio -} // namespace paddle diff --git a/paddle/fluid/recordio/writer.cc b/paddle/fluid/recordio/writer.cc deleted file mode 100644 index 8046f4ff78..0000000000 --- a/paddle/fluid/recordio/writer.cc +++ /dev/null @@ -1,40 +0,0 @@ -// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -#include "paddle/fluid/recordio/writer.h" - -#include - -#include "paddle/fluid/platform/enforce.h" - -namespace paddle { -namespace recordio { - -void Writer::Write(const std::string& record) { - cur_chunk_.Add(record); - if (cur_chunk_.NumRecords() >= max_num_records_in_chunk_) { - Flush(); - } -} - -void Writer::Flush() { - cur_chunk_.Write(stream_, compressor_); - cur_chunk_.Clear(); -} - -Writer::~Writer() { - PADDLE_ENFORCE(cur_chunk_.Empty(), "Writer must be flushed when destroy."); -} - -} // namespace recordio -} // namespace paddle diff --git a/paddle/fluid/recordio/writer.h b/paddle/fluid/recordio/writer.h deleted file mode 100644 index ac7e50ee90..0000000000 --- a/paddle/fluid/recordio/writer.h +++ /dev/null @@ -1,44 +0,0 @@ -// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -#pragma once - -#include - -#include "paddle/fluid/recordio/chunk.h" -namespace paddle { -namespace recordio { - -class Writer { - public: - Writer(std::ostream* sout, Compressor compressor, - size_t max_num_records_in_chunk = 1000) - : stream_(*sout), - max_num_records_in_chunk_(max_num_records_in_chunk), - compressor_(compressor) {} - - void Write(const std::string& record); - - void Flush(); - - ~Writer(); - - private: - std::ostream& stream_; - size_t max_num_records_in_chunk_; - Chunk cur_chunk_; - Compressor compressor_; -}; - -} // namespace recordio -} // namespace paddle diff --git a/paddle/fluid/recordio/writer_scanner_test.cc b/paddle/fluid/recordio/writer_scanner_test.cc deleted file mode 100644 index 6583df21a2..0000000000 --- a/paddle/fluid/recordio/writer_scanner_test.cc +++ /dev/null @@ -1,70 +0,0 @@ -// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include -#include - -#include "gtest/gtest.h" -#include "paddle/fluid/recordio/scanner.h" -#include "paddle/fluid/recordio/writer.h" - -TEST(WriterScanner, Normal) { - std::stringstream* stream = new std::stringstream(); - - { - paddle::recordio::Writer writer(stream, - paddle::recordio::Compressor::kSnappy); - writer.Write("ABC"); - writer.Write("BCD"); - writer.Write("CDE"); - writer.Flush(); - } - - { - stream->seekg(0, std::ios::beg); - std::unique_ptr stream_ptr(stream); - paddle::recordio::Scanner scanner(std::move(stream_ptr)); - ASSERT_TRUE(scanner.HasNext()); - ASSERT_EQ(scanner.Next(), "ABC"); - ASSERT_EQ("BCD", scanner.Next()); - ASSERT_TRUE(scanner.HasNext()); - ASSERT_EQ("CDE", scanner.Next()); - ASSERT_FALSE(scanner.HasNext()); - } -} - -TEST(WriterScanner, TinyChunk) { - std::stringstream* stream = new std::stringstream(); - { - paddle::recordio::Writer writer( - stream, paddle::recordio::Compressor::kNoCompress, 2 /*max chunk num*/); - writer.Write("ABC"); - writer.Write("BCD"); - writer.Write("CDE"); - writer.Write("DEFG"); - writer.Flush(); - } - - { - stream->seekg(0, std::ios::beg); - std::unique_ptr stream_ptr(stream); - paddle::recordio::Scanner scanner(std::move(stream_ptr)); - ASSERT_TRUE(scanner.HasNext()); - ASSERT_EQ(scanner.Next(), "ABC"); - ASSERT_EQ(scanner.Next(), "BCD"); - ASSERT_EQ(scanner.Next(), "CDE"); - ASSERT_EQ(scanner.Next(), "DEFG"); - ASSERT_FALSE(scanner.HasNext()); - } -} diff --git a/python/paddle/dataset/common.py b/python/paddle/dataset/common.py index 58a4c66c20..3567ecfa00 100644 --- a/python/paddle/dataset/common.py +++ b/python/paddle/dataset/common.py @@ -32,7 +32,6 @@ __all__ = [ 'md5file', 'split', 'cluster_files_reader', - 'convert', ] DATA_HOME = os.path.expanduser('~/.cache/paddle/dataset') @@ -205,40 +204,3 @@ def cluster_files_reader(files_pattern, yield line return reader - - -def convert(output_path, reader, line_count, name_prefix): - import recordio - """ - Convert data from reader to recordio format files. - - :param output_path: directory in which output files will be saved. - :param reader: a data reader, from which the convert program will read - data instances. - :param name_prefix: the name prefix of generated files. - :param max_lines_to_shuffle: the max lines numbers to shuffle before - writing. - """ - - assert line_count >= 1 - indx_f = 0 - - def write_data(indx_f, lines): - filename = "%s/%s-%05d" % (output_path, name_prefix, indx_f) - writer = recordio.writer(filename) - for l in lines: - # FIXME(Yancey1989): - # dumps with protocol: pickle.HIGHEST_PROTOCOL - writer.write(pickle.dumps(l)) - writer.close() - - lines = [] - for i, d in enumerate(reader()): - lines.append(d) - if i % line_count == 0 and i >= line_count: - write_data(indx_f, lines) - lines = [] - indx_f += 1 - continue - - write_data(indx_f, lines) diff --git a/python/paddle/fluid/__init__.py b/python/paddle/fluid/__init__.py index dfe58c7e4d..9f7560a6fe 100644 --- a/python/paddle/fluid/__init__.py +++ b/python/paddle/fluid/__init__.py @@ -75,7 +75,6 @@ from . import clip from . import dygraph_grad_clip from . import profiler from . import unique_name -from . import recordio_writer from . import parallel_executor from .parallel_executor import * from . import compiler diff --git a/python/paddle/fluid/layers/io.py b/python/paddle/fluid/layers/io.py index 88408d6236..cfb2c42fcd 100644 --- a/python/paddle/fluid/layers/io.py +++ b/python/paddle/fluid/layers/io.py @@ -32,9 +32,8 @@ from ..unique_name import generate as unique_name import logging __all__ = [ - 'data', 'open_files', 'read_file', 'shuffle', 'batch', 'double_buffer', - 'random_data_generator', 'py_reader', 'create_py_reader_by_data', - 'Preprocessor', 'load' + 'data', 'read_file', 'double_buffer', 'py_reader', + 'create_py_reader_by_data', 'load' ] @@ -362,137 +361,6 @@ def _copy_reader_create_op_(block, op): return new_op -@templatedoc(op_type='create_recordio_file_reader') -def open_recordio_file(filename, - shapes, - lod_levels, - dtypes, - pass_num=1, - for_parallel=True): - """ - ${comment} - - Args: - filename(${filename_type}): ${filename_comment}. - shapes(list): List of tuples which declaring data shapes. - lod_levels(${lod_levels_type}): ${lod_levels_comment}. - dtypes(list): List of strs which declaring data type. - pass_num(int): Number of passes to run. - for_parallel(Bool): Set it as True if you are going to run - subsequent operators in parallel. - - Returns: - ${out_comment}. - - Examples: - - >>> import paddle.fluid as fluid - >>> reader = fluid.layers.io.open_recordio_file( - >>> filename='./data.recordio', - >>> shapes=[(3,224,224), (1,)], - >>> lod_levels=[0, 0], - >>> dtypes=['float32', 'int64']) - >>> # Via the reader, we can use 'read_file' layer to get data: - >>> image, label = fluid.layers.io.read_file(reader) - """ - dtypes = [convert_np_dtype_to_dtype_(dt) for dt in dtypes] - shape_concat = [] - ranks = [] - - for shape in shapes: - shape_concat.extend(shape) - ranks.append(len(shape)) - - var_name = unique_name('open_recordio_file') - - startup_blk = default_startup_program().current_block() - startup_var = startup_blk.create_var(name=var_name) - startup_blk.append_op( - type='create_recordio_file_reader', - outputs={'Out': [startup_var]}, - attrs={ - 'shape_concat': shape_concat, - 'lod_levels': lod_levels, - 'filename': filename, - 'ranks': ranks - }) - - startup_var.desc.set_dtypes(dtypes) - startup_var.persistable = True - main_prog_var = _copy_reader_var_(default_main_program().current_block(), - startup_var) - - if pass_num > 1: - main_prog_var = multi_pass(reader=main_prog_var, pass_num=pass_num) - - return monkey_patch_reader_methods(main_prog_var) - - -def random_data_generator(low, high, shapes, lod_levels, for_parallel=True): - """ - Create a uniform random data generator - - This layer returns a Reader Variable. - Instead of opening a file and reading data from it, this - Reader Variable generates float uniform random data by itself. - It can be used as a dummy reader to test a network without - opening a real file. - - Args: - low(float): The lower bound of data's uniform distribution. - high(float): The upper bound of data's uniform distribution. - shapes(list): List of tuples which declaring data shapes. - lod_levels(list): List of ints which declaring data lod_level. - for_parallel(Bool): Set it as True if you are going to run - subsequent operators in parallel. - - Returns: - Variable: A Reader Variable from which we can get random data. - - Examples: - - .. code-block:: python - - import paddle.fluid as fluid - reader = fluid.layers.random_data_generator( - low=0.0, - high=1.0, - shapes=[[3,224,224], [1]], - lod_levels=[0, 0]) - # Via the reader, we can use 'read_file' layer to get data: - image, label = fluid.layers.read_file(reader) - """ - dtypes = [core.VarDesc.VarType.FP32] * len(shapes) - shape_concat = [] - ranks = [] - - for shape in shapes: - shape_concat.extend(shape) - ranks.append(len(shape)) - - var_name = unique_name('random_data_generator') - - startup_blk = default_startup_program().current_block() - startup_var = startup_blk.create_var(name=var_name) - startup_blk.append_op( - type='create_random_data_generator', - outputs={'Out': [startup_var]}, - attrs={ - 'low': low, - 'high': high, - 'shape_concat': shape_concat, - 'lod_levels': lod_levels, - 'ranks': ranks - }) - - startup_var.desc.set_dtypes(dtypes) - startup_var.persistable = True - main_prog_var = _copy_reader_var_(default_main_program().current_block(), - startup_var) - - return monkey_patch_reader_methods(main_prog_var) - - def _py_reader(capacity, shapes, dtypes, @@ -1006,79 +874,6 @@ def __create_unshared_decorated_reader__(op_type, reader, attrs, name=None): return monkey_patch_reader_methods(new_reader) -def shuffle(reader, buffer_size): - """ - Creates a data reader whose data output is shuffled. - Output from the iterator that created by original reader will be - buffered into shuffle buffer, and then shuffled. The size of shuffle buffer - is determined by argument buf_size. - - Args: - reader(callable): the original reader whose output will be shuffled. - buf_size(int): shuffle buffer size. - - Returns: - callable: the new reader whose output is shuffled. - - Examples: - .. code-block:: python - - import paddle.fluid as fluid - raw_reader = fluid.layers.io.open_files(filenames=['./data1.recordio', - './data2.recordio'], - shapes=[(3,224,224), (1,)], - lod_levels=[0, 0], - dtypes=['float32', 'int64'], - thread_num=2, - buffer_size=2) - batch_reader = fluid.layers.batch(reader=raw_reader, batch_size=5) - shuffle_reader = fluid.layers.shuffle(reader=batch_reader, buffer_size=5000) - """ - return __create_unshared_decorated_reader__( - 'create_shuffle_reader', reader, {'buffer_size': int(buffer_size)}) - - -def batch(reader, batch_size): - """ - This layer is a reader decorator. It takes a reader and adds - 'batching' decoration on it. When reading with the result - decorated reader, output data will be automatically organized - to the form of batches. - - Args: - reader(Variable): The reader to be decorated with 'batching'. - batch_size(int): The batch size. - - Returns: - Variable: The reader which has been decorated with 'batching'. - - Examples: - .. code-block:: python - - import paddle.fluid as fluid - raw_reader = fluid.layers.io.open_files(filenames=['./data1.recordio', - './data2.recordio'], - shapes=[(3,224,224), (1,)], - lod_levels=[0, 0], - dtypes=['float32', 'int64'], - thread_num=2, - buffer_size=2) - batch_reader = fluid.layers.batch(reader=raw_reader, batch_size=5) - - # If we read data with the raw_reader: - # data = fluid.layers.read_file(raw_reader) - # We can only get data instance by instance. - # - # However, if we read data with the batch_reader: - # data = fluid.layers.read_file(batch_reader) - # Each 5 adjacent instances will be automatically combined together - # to become a batch. So what we get('data') is a batch data instead - # of an instance. - """ - return __create_unshared_decorated_reader__( - 'create_batch_reader', reader, {'batch_size': int(batch_size)}) - - def double_buffer(reader, place=None, name=None): """ Wrap a double buffer reader. The data will copy to target place with a @@ -1096,14 +891,15 @@ def double_buffer(reader, place=None, name=None): wrapped reader with double buffer. Examples: - - >>> import paddle.fluid as fluid - >>> reader = fluid.layers.open_files(filenames=['mnist.recordio'], - >>> shapes=[[-1, 784], [-1, 1]], - >>> lod_levels=[0, 0], - >>> dtypes=['float32', 'int64']) - >>> reader = fluid.layers.double_buffer(reader) - >>> img, label = fluid.layers.read_file(reader) + .. code-block:: python + + import paddle.fluid as fluid + reader = fluid.layers.py_reader(capacity=64, + shapes=[(-1, 1, 28, 28), (-1, 1)], + dtypes=['float32', 'int64'], + use_double_buffer=False) + reader = fluid.layers.double_buffer(reader) + image, label = fluid.layers.read_file(reader) """ attrs = dict() if place is not None: @@ -1112,11 +908,6 @@ def double_buffer(reader, place=None, name=None): 'create_double_buffer_reader', reader, attrs, name=name) -def multi_pass(reader, pass_num): - return __create_shared_decorated_reader__( - 'create_multi_pass_reader', reader, {'pass_num': int(pass_num)}) - - def read_file(reader): """ Execute the given reader and get data via it. @@ -1136,14 +927,10 @@ def read_file(reader): .. code-block:: python import paddle.fluid as fluid - data_file = fluid.layers.open_files( - filenames=['mnist.recordio'], - shapes=[(-1, 748), (-1, 1)], - lod_levels=[0, 0], - dtypes=["float32", "int64"]) - data_file = fluid.layers.double_buffer( - fluid.layers.batch(data_file, batch_size=64)) - input, label = fluid.layers.read_file(data_file) + reader = fluid.layers.py_reader(capacity=64, + shapes=[(-1, 1, 28, 28), (-1, 1)], + dtypes=['float32', 'int64']) + image, label = fluid.layers.read_file(reader) """ helper = LayerHelper('read_file') out = [ @@ -1159,113 +946,6 @@ def read_file(reader): return out -class Preprocessor(object): - """ - A block for data pre-processing in reader. - - Args: - reader (Variable): A reader variable. - name (str, default None): The name of the reader. - - Examples: - .. code-block:: python - - import paddle.fluid as fluid - - reader = fluid.layers.io.open_files( - filenames=['./data1.recordio', './data2.recordio'], - shapes=[(3, 224, 224), (1, )], - lod_levels=[0, 0], - dtypes=['float32', 'int64']) - - preprocessor = fluid.layers.io.Preprocessor(reader=reader) - with preprocessor.block(): - img, lbl = preprocessor.inputs() - img_out = img / 2 - lbl_out = lbl + 1 - preprocessor.outputs(img_out, lbl_out) - - data_file = fluid.layers.io.double_buffer(preprocessor()) - - """ - BEFORE_SUB_BLOCK = 0 - IN_SUB_BLOCK = 1 - AFTER_SUB_BLOCK = 2 - - def __init__(self, reader, name=None): - self.underlying_reader = reader - new_reader_name = name if name is not None else unique_name( - "create_custom_reader") - self.main_prog = default_main_program() - self.reader = self.main_prog.current_block().create_var( - name=new_reader_name) - self.sub_block = None - self.source_var_names = None - self.sink_var_names = None - self.status = Preprocessor.BEFORE_SUB_BLOCK - - def _is_completed(self): - return self.sub_block and self.source_var_names and self.sink_var_names - - @signature_safe_contextmanager - def block(self): - self.status = Preprocessor.IN_SUB_BLOCK - self.sub_block = self.main_prog._create_block() - yield - self.main_prog._rollback() - self.status = Preprocessor.AFTER_SUB_BLOCK - if not self._is_completed(): - raise RuntimeError( - "The definition of preprocessor is incompleted! " - "Please make sure that you have set input and output " - "variables by invoking 'inputs' and 'outputs' in " - "Preprocessor's sub-block.") - - def inputs(self): - if self.status != Preprocessor.IN_SUB_BLOCK: - raise RuntimeError( - "Preprocessor.inputs() can only be invoked inside the sub-block." - ) - - source_shapes = self.underlying_reader.desc.shapes() - source_dtypes = self.underlying_reader.desc.dtypes() - source_lod_levels = self.underlying_reader.desc.lod_levels() - self.source_var_names = [ - unique_name("preprocessor_source") - for _ in six.moves.range(len(source_shapes)) - ] - source_vars = [] - for var_name, shape, dtype, lod_level in zip( - self.source_var_names, source_shapes, source_dtypes, - source_lod_levels): - source_vars.append(self.main_prog.current_block().create_var( - name=var_name, shape=shape, dtype=dtype, lod_level=lod_level)) - return source_vars - - def outputs(self, *outs): - if self.status != Preprocessor.IN_SUB_BLOCK: - raise RuntimeError( - "Preprocessor.outputs() can only be invoked inside the sub-block." - ) - self.sink_var_names = [var.name for var in outs] - - def __call__(self, *args, **kwargs): - if self.status != Preprocessor.AFTER_SUB_BLOCK: - raise RuntimeError( - "Preprocessor output can only be retrieved after rnn block.") - - self.main_prog.current_block().append_op( - type="create_custom_reader", - inputs={'UnderlyingReader': self.underlying_reader}, - outputs={'Out': [self.reader]}, - attrs={ - "sub_block": self.sub_block, - "source_var_names": self.source_var_names, - "sink_var_names": self.sink_var_names - }) - return monkey_patch_reader_methods(self.reader) - - @templatedoc() def load(out, file_path, load_as_fp16=None): """ diff --git a/python/paddle/fluid/recordio_writer.py b/python/paddle/fluid/recordio_writer.py deleted file mode 100644 index aa581f23a1..0000000000 --- a/python/paddle/fluid/recordio_writer.py +++ /dev/null @@ -1,132 +0,0 @@ -# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from __future__ import print_function - -import os -from .wrapped_decorator import signature_safe_contextmanager -from . import core -__all__ = [ - 'convert_reader_to_recordio_file', 'convert_reader_to_recordio_files' -] - - -@signature_safe_contextmanager -def create_recordio_writer(filename, - compressor=core.RecordIOWriter.Compressor.Snappy, - max_num_records=1000): - writer = core.RecordIOWriter(filename, compressor, max_num_records) - yield writer - writer.close() - - -def convert_reader_to_recordio_file( - filename, - reader_creator, - feeder, - compressor=core.RecordIOWriter.Compressor.Snappy, - max_num_records=1000, - feed_order=None): - """ - Convert a Python Reader to a recordio file. - - Examples: - - >>> import paddle.fluid as fluid - >>> import paddle.dataset.mnist as mnist - >>> import paddle - >>> - >>> tmp_program = fluid.Program() - >>> with fluid.program_guard(tmp_program): - >>> img = fluid.layers.data(name='img', shape=[784]) - >>> label = fluid.layers.data(name='label', shape=[1], dtype='int64') - >>> feeder = fluid.DataFeeder(feed_list=[img, label], place=fluid.CPUPlace()) - >>> # mnist.recordio will be generated in current directory - >>> fluid.recordio_writer.convert_reader_to_recordio_file( - >>> filename="mnist.recordio", - >>> reader_creator=paddle.batch(mnist.train(), batch_size=32), - >>> feeder=feeder) - - Args: - filename(str): The recordio filename. - reader_creator(callable): The Python Reader Creator. See - :ref:`api_guide_python_reader`. - feeder(DataFeeder): The DataFeeder instance. Used to convert - :code:`reader_creator` to :code: `lod_tensor` - compressor: Must in fluid.core.RecordIOWriter.Compressor.Snappy or - fluid.core.RecordIOWriter.Compressor.NoCompress. Use :code:`Snappy` - by default. - max_num_records(int): Maximum number of records in one chuck. Each record - is each return value from reader function - feed_order(list): The order of variable names that the reader returns - - Returns: - int: the number of record that saved. - """ - if feed_order is None: - feed_order = feeder.feed_names - counter = 0 - with create_recordio_writer(filename, compressor, - max_num_records) as writer: - for batch in reader_creator(): - res = feeder.feed(batch) - for each in feed_order: - writer.append_tensor(res[each]) - writer.complete_append_tensor() - counter += 1 - return counter - - -def convert_reader_to_recordio_files( - filename, - batch_per_file, - reader_creator, - feeder, - compressor=core.RecordIOWriter.Compressor.Snappy, - max_num_records=1000, - feed_order=None): - """ - convert a python reader to many recordio files. - - This API is basically same as :code:`convert_reader_to_recordio_file`, - instead of it will create many recordio files. Each file contains at - most :code:`batch_per_file` records. - - Please reference - :ref:`api_fluid_recordio_writer_convert_reader_to_recordio_file` for more - details. - """ - if feed_order is None: - feed_order = feeder.feed_names - f_name, f_ext = os.path.splitext(filename) - assert (f_ext == ".recordio") - - lines = [] - f_idx = 0 - counter = 0 - for idx, batch in enumerate(reader_creator()): - lines.append(batch) - if idx >= batch_per_file and idx % batch_per_file == 0: - filename = "%s-%05d%s" % (f_name, f_idx, f_ext) - with create_recordio_writer(filename, compressor, - max_num_records) as writer: - for l in lines: - res = feeder.feed(l) - for each in feed_order: - writer.append_tensor(res[each]) - writer.complete_append_tensor() - counter += 1 - lines = [] - f_idx += 1 - return counter diff --git a/python/paddle/fluid/tests/demo/file_reader/convert_data_to_recordio.py b/python/paddle/fluid/tests/demo/file_reader/convert_data_to_recordio.py deleted file mode 100644 index b00af91a9d..0000000000 --- a/python/paddle/fluid/tests/demo/file_reader/convert_data_to_recordio.py +++ /dev/null @@ -1,63 +0,0 @@ -# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from __future__ import print_function - -import sys -import paddle.fluid as fluid - - -def load_vocab(filename): - """ - load vocabulary - """ - vocab = {} - with open(filename) as f: - wid = 0 - for line in f: - vocab[line.strip()] = wid - wid += 1 - return vocab - - -# load word dict with paddle inner function -if len(sys.argv) == 1: - word_dict = paddle.dataset.imdb.word_dict() -else: - word_dict = load_vocab(sys.argv[1]) - word_dict[""] = len(word_dict) -print("Dict dim = ", len(word_dict)) - -# input text data -data = fluid.layers.data(name="words", shape=[1], dtype="int64", lod_level=1) - -# label data -label = fluid.layers.data(name="label", shape=[1], dtype="int64") -# like placeholder -feeder = fluid.DataFeeder(feed_list=[data, label], place=fluid.CPUPlace()) - -# train data set -BATCH_SIZE = 128 -train_reader = paddle.batch( - paddle.reader.shuffle( - paddle.dataset.imdb.train(word_dict), buf_size=25000), - batch_size=BATCH_SIZE) - -test_reader = paddle.batch( - paddle.dataset.imdb.test(word_dict), batch_size=BATCH_SIZE) - -fluid.recordio_writer.convert_reader_to_recordio_file( - "train.recordio", feeder=feeder, reader_creator=train_reader) -fluid.recordio_writer.convert_reader_to_recordio_file( - "test.recordio", feeder=feeder, reader_creator=test_reader) diff --git a/python/paddle/fluid/tests/test_cpp_reader.py b/python/paddle/fluid/tests/test_cpp_reader.py deleted file mode 100644 index b2a5253b95..0000000000 --- a/python/paddle/fluid/tests/test_cpp_reader.py +++ /dev/null @@ -1,94 +0,0 @@ -# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from __future__ import print_function - -import paddle -import paddle.fluid as fluid -import numpy as np -import sys - -startup_prog = fluid.framework.Program() -startup_block = startup_prog.current_block() - -random_reader = startup_block.create_var( - type=fluid.core.VarDesc.VarType.READER, name="RandomDataGenerator") -random_reader.desc.set_dtypes( - [fluid.core.VarDesc.VarType.FP32, fluid.core.VarDesc.VarType.FP32]) -random_reader.persistable = True -shuffle_reader = startup_block.create_var( - type=fluid.core.VarDesc.VarType.READER, name="ShuffleReader") -shuffle_reader.persistable = True -batch_reader = startup_block.create_var( - type=fluid.core.VarDesc.VarType.READER, name="BatchReader") -batch_reader.persistable = True -double_buffer = startup_block.create_var( - type=fluid.core.VarDesc.VarType.READER, name="DoubleBuffer") -double_buffer.persistable = True - -main_prog = startup_prog.clone() -main_block = main_prog.current_block() - -create_random_data_generator_op = startup_block.append_op( - type="create_random_data_generator", - outputs={"Out": random_reader}, - attrs={ - "shape_concat": [1, 2, 1, 1], - "ranks": [2, 2], - "low": 0.0, - "high": 1.0, - 'lod_levels': [0, 0] - }) - -create_shuffle_reader_op = startup_block.append_op( - type="create_shuffle_reader", - inputs={"UnderlyingReader": random_reader}, - outputs={"Out": shuffle_reader}, - attrs={"buffer_size": 7}) - -create_batch_reader_op = startup_block.append_op( - type="create_batch_reader", - inputs={"UnderlyingReader": shuffle_reader}, - outputs={"Out": batch_reader}, - attrs={"batch_size": 10}) - -create_double_buffer_reader_op = startup_block.append_op( - type="create_double_buffer_reader", - inputs={"UnderlyingReader": batch_reader}, - outputs={"Out": double_buffer}) - -out1 = main_block.create_var( - type=fluid.core.VarDesc.VarType.LOD_TENSOR, name="Out1") -out2 = main_block.create_var( - type=fluid.core.VarDesc.VarType.LOD_TENSOR, name="Out2") - -main_block.var("DoubleBuffer").desc.set_shapes(double_buffer.desc.shapes()) -main_block.var("DoubleBuffer").desc.set_dtypes(double_buffer.desc.dtypes()) -main_block.var("DoubleBuffer").desc.set_lod_levels( - double_buffer.desc.lod_levels()) - -read_op = main_block.append_op( - type="read", - inputs={"Reader": double_buffer}, - outputs={"Out": [out1, out2]}) - -place = fluid.CPUPlace() -exe = fluid.Executor(place) - -exe.run(startup_prog) - -for i in range(1, 100): - [res1, res2] = exe.run(main_prog, fetch_list=[out1, out2]) - if not (res1.shape == (10, 2) and res2.shape == (10, 1)): - exit(1) diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index 933121ee3f..93ce55a1d4 100644 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -305,7 +305,7 @@ if (WITH_MKLDNN) add_subdirectory(mkldnn) endif() -set_tests_properties(test_recordio_reader test_parallel_executor_test_while_train test_parallel_executor_mnist +set_tests_properties(test_parallel_executor_test_while_train test_parallel_executor_mnist test_parallel_executor_seresnext test_parallel_executor_crf test_sync_batch_norm_op test_parallel_executor_crf_auto_growth test_buffer_shared_memory_reuse_pass_and_fuse_optimization_op_pass test_buffer_shared_memory_reuse_pass PROPERTIES LABELS "RUN_TYPE=DIST") diff --git a/python/paddle/fluid/tests/unittests/feed_data_reader.py b/python/paddle/fluid/tests/unittests/feed_data_reader.py new file mode 100644 index 0000000000..1e6016d57b --- /dev/null +++ b/python/paddle/fluid/tests/unittests/feed_data_reader.py @@ -0,0 +1,78 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import six +import paddle.fluid as fluid +from paddle.fluid.framework import Variable + + +def cyclic_reader(reader): + def __reader__(): + while True: + for data in reader(): + yield data + + return __reader__ + + +class FeedDataReader(object): + def __init__(self, feed_list, reader): + self._feed_list = [] + for var in feed_list: + if isinstance(var, Variable): + self._feed_list.append(var.name) + else: + self._feed_list.append(var) + + self._reader = cyclic_reader(reader) + self._iter = self._reader() + + def _feed_executor(self): + next_data = next(self._iter) + feed_data = dict() + assert len(self._feed_list) == len(next_data) + for key, value in six.moves.zip(self._feed_list, next_data): + feed_data[key] = value + return feed_data + + def _feed_parallel_executor(self, device_num): + feed_data = [] + for _ in six.moves.range(device_num): + feed_data.append(self._feed_executor()) + + return feed_data + + def get_next(self, exe, program): + result = [] + assert isinstance(exe, fluid.Executor), "exe must be Executor" + use_cuda = isinstance(exe.place, fluid.CUDAPlace) + if isinstance(program, fluid.CompiledProgram): + if program._is_data_parallel: + use_executor = False + if program._places is None: + device_num = len(fluid.cuda_places()) if use_cuda else len( + fluid.cpu_places()) + else: + device_num = len(program._places) + else: + use_executor = True + device_num = 1 + else: + use_executor = True + device_num = 1 + + if use_executor: + return self._feed_executor() + else: + return self._feed_parallel_executor(device_num) diff --git a/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py b/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py index 94e451ff26..e56f41d713 100644 --- a/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py +++ b/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py @@ -24,6 +24,7 @@ import time import numpy as np import math import sys +from feed_data_reader import FeedDataReader __all__ = ['TestParallelExecutorBase'] @@ -36,6 +37,7 @@ class TestParallelExecutorBase(unittest.TestCase): iter=50, batch_size=None, feed_dict=None, + feed_data_reader=None, get_data_from_feeder=None, use_parallel_executor=True, use_reduce=False, @@ -49,9 +51,19 @@ class TestParallelExecutorBase(unittest.TestCase): use_fast_executor=False, enable_sequential_execution=False): def run_executor(exe, binary, feed, fetch_list): - res = exe.run(binary, feed=feed, fetch_list=fetch_list) + if feed_data_reader is None: + res = exe.run(binary, feed=feed, fetch_list=fetch_list) + else: + res = exe.run(binary, + feed=feed_data_reader.get_next(exe, binary), + fetch_list=fetch_list) return res + if feed_data_reader is not None: + assert isinstance( + feed_data_reader, FeedDataReader + ), "feed_data_reader must be type of FeedDataReader" + main = fluid.Program() startup = fluid.Program() startup.random_seed = 1 diff --git a/python/paddle/fluid/tests/unittests/test_fuse_relu_depthwise_conv_pass.py b/python/paddle/fluid/tests/unittests/test_fuse_relu_depthwise_conv_pass.py index d4be4f84af..7c9b56d403 100644 --- a/python/paddle/fluid/tests/unittests/test_fuse_relu_depthwise_conv_pass.py +++ b/python/paddle/fluid/tests/unittests/test_fuse_relu_depthwise_conv_pass.py @@ -21,8 +21,6 @@ import paddle.dataset.mnist as mnist import unittest import os -MNIST_RECORDIO_FILE = "./mnist_test_pe.recordio" - def norm(*args, **kargs): return fluid.layers.batch_norm(*args, **kargs) @@ -51,17 +49,9 @@ def sep_conv(input, channel, stride, filter, dilation=1, act=None): def simple_depthwise_net(use_feed): - if use_feed: - img = fluid.layers.data(name='image', shape=[784], dtype='float32') - label = fluid.layers.data(name='label', shape=[1], dtype='int64') - else: - reader = fluid.layers.open_files( - filenames=[MNIST_RECORDIO_FILE], - shapes=[[-1, 784], [-1, 1]], - lod_levels=[0, 0], - dtypes=['float32', 'int64']) - reader = fluid.layers.io.double_buffer(reader) - img, label = fluid.layers.read_file(reader) + assert use_feed + img = fluid.layers.data(name='image', shape=[784], dtype='float32') + label = fluid.layers.data(name='label', shape=[1], dtype='int64') hidden = fluid.layers.reshape(img, (-1, 1, 28, 28)) for _ in range(4): hidden = sep_conv(hidden, channel=200, stride=2, filter=5) @@ -73,23 +63,6 @@ def simple_depthwise_net(use_feed): class TestMNIST(TestParallelExecutorBase): - @classmethod - def setUpClass(cls): - os.environ['CPU_NUM'] = str(4) - # Convert mnist to recordio file - with fluid.program_guard(fluid.Program(), fluid.Program()): - reader = paddle.batch(mnist.train(), batch_size=4) - feeder = fluid.DataFeeder( - feed_list=[ # order is image and label - fluid.layers.data( - name='image', shape=[784]), - fluid.layers.data( - name='label', shape=[1], dtype='int64'), - ], - place=fluid.CPUPlace()) - fluid.recordio_writer.convert_reader_to_recordio_file( - MNIST_RECORDIO_FILE, reader, feeder) - def _init_data(self, random=True): np.random.seed(5) if random: diff --git a/python/paddle/fluid/tests/unittests/test_ir_memory_optimize_pass.py b/python/paddle/fluid/tests/unittests/test_ir_memory_optimize_pass.py index e224caee6e..d9f68c2d15 100644 --- a/python/paddle/fluid/tests/unittests/test_ir_memory_optimize_pass.py +++ b/python/paddle/fluid/tests/unittests/test_ir_memory_optimize_pass.py @@ -21,26 +21,16 @@ import paddle.dataset.mnist as mnist import unittest import os -MNIST_RECORDIO_FILE = "./mnist_test_pe.recordio" - -def _feed_data_helper(use_feed): - if use_feed: - img = fluid.layers.data(name='image', shape=[784], dtype='float32') - label = fluid.layers.data(name='label', shape=[1], dtype='int64') - else: - reader = fluid.layers.open_files( - filenames=[MNIST_RECORDIO_FILE], - shapes=[[-1, 784], [-1, 1]], - lod_levels=[0, 0], - dtypes=['float32', 'int64']) - reader = fluid.layers.io.double_buffer(reader) - img, label = fluid.layers.read_file(reader) +def _feed_data_helper(): + img = fluid.layers.data(name='image', shape=[784], dtype='float32') + label = fluid.layers.data(name='label', shape=[1], dtype='int64') return img, label def simple_fc_net(use_feed): - x, y = _feed_data_helper(use_feed) + assert use_feed + x, y = _feed_data_helper() hidden_layer = 4 for _ in range(hidden_layer): x = fluid.layers.fc(input=x, size=20, act='relu') @@ -51,7 +41,8 @@ def simple_fc_net(use_feed): def fc_with_inplace_net(use_feed): - x, y = _feed_data_helper(use_feed) + assert use_feed + x, y = _feed_data_helper() fc = fluid.layers.fc(input=x, size=20, act='relu') fc = fluid.layers.fc(input=fc, size=10, act='relu') reshape = fluid.layers.reshape(x=fc, shape=[-1, 2, 5]) @@ -63,23 +54,6 @@ def fc_with_inplace_net(use_feed): class TestMNIST(TestParallelExecutorBase): - @classmethod - def setUpClass(cls): - os.environ['CPU_NUM'] = str(4) - # Convert mnist to recordio file - with fluid.program_guard(fluid.Program(), fluid.Program()): - reader = paddle.batch(mnist.train(), batch_size=4) - feeder = fluid.DataFeeder( - feed_list=[ # order is image and label - fluid.layers.data( - name='image', shape=[784]), - fluid.layers.data( - name='label', shape=[1], dtype='int64'), - ], - place=fluid.CPUPlace()) - fluid.recordio_writer.convert_reader_to_recordio_file( - MNIST_RECORDIO_FILE, reader, feeder) - def _dummy_data(self): np.random.seed(5) img = np.random.random(size=[32, 784]).astype(np.float32) diff --git a/python/paddle/fluid/tests/unittests/test_ir_memory_optimize_transformer.py b/python/paddle/fluid/tests/unittests/test_ir_memory_optimize_transformer.py index 642be33b6e..1af696f873 100644 --- a/python/paddle/fluid/tests/unittests/test_ir_memory_optimize_transformer.py +++ b/python/paddle/fluid/tests/unittests/test_ir_memory_optimize_transformer.py @@ -22,46 +22,30 @@ import paddle.fluid.core as core import paddle.dataset.wmt16 as wmt16 os.environ['FLAGS_eager_delete_tensor_gb'] = "0.0" -os.environ[ - 'RECORDIO_FILENAME'] = '/tmp/ir_memory_optimize_transformer.wmt16.recordio' -from test_parallel_executor_transformer import transformer, ModelHyperParams, transformer_model, transformer, prepare_batch_input from parallel_executor_test_base import TestParallelExecutorBase +from test_parallel_executor_transformer import get_feed_data_reader, transformer # NOTE(dzhwinter): test diferent strategy colisions. # open the eager delete tensor strategy by default. class TestTransformerWithIR(TestParallelExecutorBase): - @classmethod - def setUpClass(cls): - os.environ['CPU_NUM'] = str(4) - reader = paddle.batch( - wmt16.train(ModelHyperParams.src_vocab_size, - ModelHyperParams.trg_vocab_size), - batch_size=transformer_model.batch_size) - - with fluid.recordio_writer.create_recordio_writer( - os.environ.get("RECORDIO_FILENAME")) as writer: - for batch in reader(): - for tensor in prepare_batch_input( - batch, ModelHyperParams.src_pad_idx, - ModelHyperParams.trg_pad_idx, ModelHyperParams.n_head): - t = fluid.LoDTensor() - t.set(tensor, fluid.CPUPlace()) - writer.append_tensor(t) - writer.complete_append_tensor() - def test_main(self): if core.is_compiled_with_cuda(): # check python transpiler self.check_network_convergence( transformer, use_cuda=True, + feed_data_reader=get_feed_data_reader(), use_ir_memory_optimize=False, iter=2) # check IR memory optimize self.check_network_convergence( - transformer, use_cuda=True, use_ir_memory_optimize=True, iter=2) + transformer, + use_cuda=True, + feed_data_reader=get_feed_data_reader(), + use_ir_memory_optimize=True, + iter=2) if __name__ == '__main__': diff --git a/python/paddle/fluid/tests/unittests/test_multi_file_reader.py b/python/paddle/fluid/tests/unittests/test_multi_file_reader.py deleted file mode 100644 index 09788868cc..0000000000 --- a/python/paddle/fluid/tests/unittests/test_multi_file_reader.py +++ /dev/null @@ -1,81 +0,0 @@ -# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from __future__ import print_function - -import unittest - -import paddle.fluid as fluid -import paddle -import paddle.dataset.mnist as mnist -from shutil import copyfile - - -class TestMultipleReader(unittest.TestCase): - def setUp(self): - self.batch_size = 64 - # Convert mnist to recordio file - with fluid.program_guard(fluid.Program(), fluid.Program()): - reader = paddle.batch(mnist.train(), batch_size=self.batch_size) - feeder = fluid.DataFeeder( - feed_list=[ # order is image and label - fluid.layers.data( - name='image', shape=[784]), - fluid.layers.data( - name='label', shape=[1], dtype='int64'), - ], - place=fluid.CPUPlace()) - self.num_batch = fluid.recordio_writer.convert_reader_to_recordio_file( - './mnist_0.recordio', reader, feeder) - copyfile('./mnist_0.recordio', './mnist_1.recordio') - copyfile('./mnist_0.recordio', './mnist_2.recordio') - - def main(self, is_test=False): - file_list = [ - './mnist_0.recordio', './mnist_1.recordio', './mnist_2.recordio' - ] - with fluid.program_guard(fluid.Program(), fluid.Program()): - data_files = fluid.layers.open_files( - filenames=file_list, - shapes=[(-1, 784), (-1, 1)], - lod_levels=[0, 0], - dtypes=['float32', 'int64'], - is_test=is_test) - img, label = fluid.layers.read_file(data_files) - - if fluid.core.is_compiled_with_cuda(): - place = fluid.CUDAPlace(0) - else: - place = fluid.CPUPlace() - - exe = fluid.Executor(place) - exe.run(fluid.default_startup_program()) - - batch_count = 0 - while True: - try: - img_val, = exe.run(fetch_list=[img]) - except fluid.core.EOFException: - break - batch_count += 1 - self.assertLessEqual(img_val.shape[0], self.batch_size) - self.assertEqual(batch_count, self.num_batch * 3) - - def test_main(self): - self.main(is_test=False) - self.main(is_test=True) - - -if __name__ == '__main__': - unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_multi_pass_reader.py b/python/paddle/fluid/tests/unittests/test_multi_pass_reader.py deleted file mode 100644 index 8835b6995e..0000000000 --- a/python/paddle/fluid/tests/unittests/test_multi_pass_reader.py +++ /dev/null @@ -1,69 +0,0 @@ -# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from __future__ import print_function - -import unittest - -import paddle.fluid as fluid -import paddle -import paddle.dataset.mnist as mnist -from paddle.fluid.layers.io import open_recordio_file - - -class TestMultipleReader(unittest.TestCase): - def setUp(self): - self.batch_size = 64 - self.pass_num = 3 - # Convert mnist to recordio file - with fluid.program_guard(fluid.Program(), fluid.Program()): - data_file = paddle.batch(mnist.train(), batch_size=self.batch_size) - feeder = fluid.DataFeeder( - feed_list=[ - fluid.layers.data( - name='image', shape=[784]), - fluid.layers.data( - name='label', shape=[1], dtype='int64'), - ], - place=fluid.CPUPlace()) - self.num_batch = fluid.recordio_writer.convert_reader_to_recordio_file( - './mnist.recordio', data_file, feeder) - - def test_main(self): - with fluid.program_guard(fluid.Program(), fluid.Program()): - data_file = open_recordio_file( - filename='./mnist.recordio', - shapes=[(-1, 784), (-1, 1)], - lod_levels=[0, 0], - dtypes=['float32', 'int64'], - pass_num=self.pass_num) - img, label = fluid.layers.read_file(data_file) - - if fluid.core.is_compiled_with_cuda(): - place = fluid.CUDAPlace(0) - else: - place = fluid.CPUPlace() - - exe = fluid.Executor(place) - exe.run(fluid.default_startup_program()) - - batch_count = 0 - while True: - try: - img_val, = exe.run(fetch_list=[img]) - except fluid.core.EOFException: - break - batch_count += 1 - self.assertLessEqual(img_val.shape[0], self.batch_size) - self.assertEqual(batch_count, self.num_batch * self.pass_num) diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_transformer.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_transformer.py index b1851f4c78..1f47d87811 100644 --- a/python/paddle/fluid/tests/unittests/test_parallel_executor_transformer.py +++ b/python/paddle/fluid/tests/unittests/test_parallel_executor_transformer.py @@ -23,8 +23,7 @@ import paddle import paddle.fluid.core as core import paddle.dataset.wmt16 as wmt16 import os - -WMT16_RECORDIO_FILE = os.environ.get('RECORDIO_FILENAME', '/tmp/wmt16.recordio') +from feed_data_reader import FeedDataReader class ModelHyperParams(object): @@ -140,6 +139,9 @@ def prepare_batch_input(insts, src_pad_idx, trg_pad_idx, n_head): ] +feed_data_reader = None + + def transformer(use_feed): assert not use_feed, "transfomer doesn't support feed yet" return transformer_model.transformer( @@ -152,32 +154,57 @@ def transformer(use_feed): ModelHyperParams.trg_pad_idx, ModelHyperParams.pos_pad_idx) +def get_feed_data_reader(): + global feed_data_reader + if feed_data_reader is not None: + return feed_data_reader + + reader = paddle.batch( + wmt16.train(ModelHyperParams.src_vocab_size, + ModelHyperParams.trg_vocab_size), + batch_size=transformer_model.batch_size) + all_batch_tensors = [] + for batch in reader(): + tensors = [] + for tensor in prepare_batch_input(batch, ModelHyperParams.src_pad_idx, + ModelHyperParams.trg_pad_idx, + ModelHyperParams.n_head): + tensors.append(np.array(tensor)) + all_batch_tensors.append(tensors) + + def __reader__(): + for t in all_batch_tensors: + yield t + + feed_data_reader = FeedDataReader( + feed_list=transformer_model.build_inputs( + ModelHyperParams.max_length + 1, ModelHyperParams.n_head), + reader=__reader__) + + return feed_data_reader + + class TestTransformer(TestParallelExecutorBase): @classmethod def setUpClass(cls): os.environ['CPU_NUM'] = str(4) - reader = paddle.batch( - wmt16.train(ModelHyperParams.src_vocab_size, - ModelHyperParams.trg_vocab_size), - batch_size=transformer_model.batch_size) - - with fluid.recordio_writer.create_recordio_writer( - WMT16_RECORDIO_FILE) as writer: - for batch in reader(): - for tensor in prepare_batch_input( - batch, ModelHyperParams.src_pad_idx, - ModelHyperParams.trg_pad_idx, ModelHyperParams.n_head): - t = fluid.LoDTensor() - t.set(tensor, fluid.CPUPlace()) - writer.append_tensor(t) - writer.complete_append_tensor() def test_main(self): if core.is_compiled_with_cuda(): - self.check_network_convergence(transformer, use_cuda=True) self.check_network_convergence( - transformer, use_cuda=True, enable_sequential_execution=True) - self.check_network_convergence(transformer, use_cuda=False, iter=2) + transformer, + use_cuda=True, + feed_data_reader=get_feed_data_reader()) + self.check_network_convergence( + transformer, + use_cuda=True, + enable_sequential_execution=True, + feed_data_reader=get_feed_data_reader()) + self.check_network_convergence( + transformer, + use_cuda=False, + iter=2, + feed_data_reader=get_feed_data_reader()) if __name__ == '__main__': diff --git a/python/paddle/fluid/tests/unittests/test_preprocessor.py b/python/paddle/fluid/tests/unittests/test_preprocessor.py deleted file mode 100644 index 0f0bdfc44a..0000000000 --- a/python/paddle/fluid/tests/unittests/test_preprocessor.py +++ /dev/null @@ -1,96 +0,0 @@ -# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from __future__ import print_function - -import unittest -import numpy as np - -import paddle -import paddle.fluid as fluid -import paddle.dataset.mnist as mnist -from paddle.fluid.layers.io import open_recordio_file - - -class TestPreprocessor(unittest.TestCase): - def setUp(self): - with fluid.program_guard(fluid.Program(), fluid.Program()): - reader = paddle.batch(mnist.train(), batch_size=32) - feeder = fluid.DataFeeder( - feed_list=[ # order is image and label - fluid.layers.data( - name='image', shape=[784]), - fluid.layers.data( - name='label', shape=[1], dtype='int64'), - ], - place=fluid.CPUPlace()) - self.num_batches = fluid.recordio_writer.convert_reader_to_recordio_file( - './mnist_for_preprocessor_test.recordio', reader, feeder) - - def test_main(self): - N = 10 - - img_expected_res = [] - lbl_expected_res = [] - with fluid.program_guard(fluid.Program(), fluid.Program()): - data_file = open_recordio_file( - './mnist_for_preprocessor_test.recordio', - shapes=[[-1, 784], [-1, 1]], - lod_levels=[0, 0], - dtypes=['float32', 'int64']) - img, lbl = fluid.layers.io.read_file(data_file) - - if fluid.core.is_compiled_with_cuda(): - place = fluid.CUDAPlace(0) - else: - place = fluid.CPUPlace() - exe = fluid.Executor(place) - exe.run(fluid.default_startup_program()) - for _ in range(N): - img_v, lbl_v = exe.run(fetch_list=[img, lbl]) - img_expected_res.append(img_v / 2) - lbl_expected_res.append(lbl_v + 1) - - img_actual_res = [] - lbl_actual_res = [] - with fluid.program_guard(fluid.Program(), fluid.Program()): - data_file = open_recordio_file( - './mnist_for_preprocessor_test.recordio', - shapes=[[-1, 784], [-1, 1]], - lod_levels=[0, 0], - dtypes=['float32', 'int64']) - preprocessor = fluid.layers.io.Preprocessor(reader=data_file) - with preprocessor.block(): - img, lbl = preprocessor.inputs() - img_out = img / 2 - lbl_out = lbl + 1 - preprocessor.outputs(img_out, lbl_out) - - data_file = fluid.layers.io.double_buffer(preprocessor()) - img, lbl = fluid.layers.io.read_file(data_file) - - if fluid.core.is_compiled_with_cuda(): - place = fluid.CUDAPlace(0) - else: - place = fluid.CPUPlace() - exe = fluid.Executor(place) - exe.run(fluid.default_startup_program()) - for _ in range(N): - img_v, lbl_v = exe.run(fetch_list=[img, lbl]) - img_actual_res.append(img_v) - lbl_actual_res.append(lbl_v) - - for idx in range(N): - np.allclose(img_expected_res[idx], img_actual_res[idx]) - np.allclose(lbl_expected_res[idx], lbl_actual_res[idx]) diff --git a/python/paddle/fluid/tests/unittests/test_py_reader_using_executor.py b/python/paddle/fluid/tests/unittests/test_py_reader_using_executor.py index e4fb9b1970..abdeff9cb0 100644 --- a/python/paddle/fluid/tests/unittests/test_py_reader_using_executor.py +++ b/python/paddle/fluid/tests/unittests/test_py_reader_using_executor.py @@ -15,8 +15,10 @@ from __future__ import print_function import unittest +import paddle import paddle.fluid as fluid from paddle.fluid import compiler +import paddle.fluid.unique_name as unique_name import paddle.fluid.core as core import numpy as np import threading @@ -42,13 +44,42 @@ def as_numpy(tensor_or_numpy): tensor_or_numpy, np.ndarray) else np.array(tensor_or_numpy) -def feed_data(feed_queue, reader): - data_generator = reader() +def sample_list_to_tensor_array(sample_list): + slot_num = None + slots = None + for sample in sample_list: + if slot_num is None: + slot_num = len(sample) + slots = [None] * len(sample) + else: + assert slot_num == len(sample) + + for slot_id, slot_item in enumerate(sample): + if slots[slot_id] is None: + slots[slot_id] = [] + slots[slot_id].append(slot_item) + + tensor_array = fluid.LoDTensorArray() + for slot in slots: + t = fluid.LoDTensor() + t.set(np.array(slot), fluid.CPUPlace()) + tensor_array.append(t) + + return tensor_array + + +def feed_data(feed_queue, batch_reader): + data_generator = batch_reader() while True: data = next(data_generator, None) - if data is None or not feed_queue.push(data): + if data is None or (len(data) == 1 and data[0] is None): break + if not feed_queue.push(sample_list_to_tensor_array(data)): + break + + feed_queue.close() + def simple_fc_net(in_size, class_num, @@ -57,26 +88,25 @@ def simple_fc_net(in_size, queue_capacity, use_double_buffer=False, use_feed_list=True): + in_data = fluid.layers.data(name="data", dtype='float32', shape=[in_size]) + label = fluid.layers.data(name='label', dtype='int64', shape=[1]) if use_feed_list: - data = fluid.layers.data(name="data", dtype='float32', shape=[in_size]) - label = fluid.layers.data(name='label', dtype='int64', shape=[1]) py_reader = fluid.layers.create_py_reader_by_data( capacity=queue_capacity, - use_double_buffer=False, - feed_list=[data, label]) + use_double_buffer=use_double_buffer, + feed_list=[in_data, label], + name=unique_name.generate('py_reader_name')) else: py_reader = fluid.layers.py_reader( capacity=queue_capacity, - shapes=[[-1, in_size], [-1, 1]], - lod_levels=[0, 0], + shapes=[in_data.shape, label.shape], dtypes=['float32', 'int64'], - use_double_buffer=False) - feed_queue = py_reader.queue - reader = fluid.layers.batch(py_reader, batch_size=batch_size) - if use_double_buffer: - reader = fluid.layers.double_buffer(reader) + name=unique_name.generate('py_reader_name'), + use_double_buffer=use_double_buffer) + + in_data, label = fluid.layers.read_file(py_reader) - in_data, label = fluid.layers.read_file(reader) + feed_queue = py_reader.queue hidden = in_data for hidden_size in hidden_sizes: @@ -128,33 +158,24 @@ class TestPyReaderUsingExecutor(unittest.TestCase): def tensor_reader(self, use_decorate_paddle_reader): def reader(): - self.inputs = [] - cnt = 0 - while True: - tensors = fluid.LoDTensorArray() + for sample_id in range(self.batch_size * self.iterations * + self.batch_size_times): in_data = np.random.uniform( - low=0, high=1, size=(1, self.in_size)).astype('float32') - tensors.append(as_tensor(in_data)) + low=0, high=1, size=(self.in_size, )).astype('float32') label = np.random.random_integers( - low=0, high=self.class_num - 1, size=(1, 1)).astype('int64') - tensors.append(as_tensor(label)) - - if cnt < self.iterations * self.batch_size * self.batch_size_times: - if cnt % (self.batch_size * self.batch_size_times) == 0: - self.inputs.append([in_data, label]) - else: - self.inputs[-1][0] = np.concatenate( - (self.inputs[-1][0], in_data), axis=0) - self.inputs[-1][1] = np.concatenate( - (self.inputs[-1][1], label), axis=0) - elif not self.use_double_buffer: - break + low=0, high=self.class_num - 1, size=(1, )).astype('int64') - if use_decorate_paddle_reader: - yield [(in_data, label)] + reshaped_in_data = np.reshape(in_data, [1, -1]) + reshaped_label = np.reshape(label, [1, -1]) + if sample_id % (self.batch_size * self.batch_size_times) == 0: + self.inputs.append([reshaped_in_data, reshaped_label]) else: - yield tensors - cnt += 1 + self.inputs[-1][0] = np.concatenate( + (self.inputs[-1][0], reshaped_in_data), axis=0) + self.inputs[-1][1] = np.concatenate( + (self.inputs[-1][1], reshaped_label), axis=0) + + yield in_data, label if not use_decorate_paddle_reader: yield None @@ -205,21 +226,31 @@ class TestPyReaderUsingExecutor(unittest.TestCase): self.batch_size_times = 1 reader = self.tensor_reader(use_decorate_paddle_reader) + batch_reader = paddle.batch(reader, batch_size=self.batch_size) + + self.inputs = [] + self.outputs = [] + if use_decorate_paddle_reader: - py_reader.decorate_paddle_reader(reader) + if use_feed_list: + py_reader.decorate_paddle_reader(batch_reader) + else: + py_reader.decorate_sample_list_generator(batch_reader) py_reader.start() else: thread = threading.Thread( - target=feed_data, args=(feed_queue, reader)) + target=feed_data, args=(feed_queue, batch_reader)) thread.daemon = True thread.start() - self.outputs = [] - for _ in range(self.iterations): - fetches = exe.run(train_cp, - fetch_list=[in_data.name, label.name]) - fetches = [as_numpy(fetch) for fetch in fetches] - self.outputs.append(fetches) + try: + while True: + fetches = exe.run(train_cp, + fetch_list=[in_data.name, label.name]) + fetches = [as_numpy(fetch) for fetch in fetches] + self.outputs.append(fetches) + except fluid.core.EOFException: + pass feed_queue.close() self.validate() @@ -230,8 +261,13 @@ class TestPyReaderUsingExecutor(unittest.TestCase): thread.join() def validate(self): - self.assertEqual(len(self.inputs), len(self.outputs)) - for batch_in, batch_out in zip(self.inputs, self.outputs): + if not self.use_double_buffer: + self.assertEqual(len(self.inputs), len(self.outputs)) + else: + self.assertTrue(len(self.inputs) >= len(self.outputs)) + for idx in range(len(self.outputs)): + batch_in = self.inputs[idx] + batch_out = self.outputs[idx] self.assertEqual(len(batch_in), len(batch_out)) if self.use_parallel_executor and not self.use_double_buffer: self.validate_unordered_batch(batch_in, batch_out) diff --git a/python/paddle/fluid/tests/unittests/test_reader_reset.py b/python/paddle/fluid/tests/unittests/test_reader_reset.py index da89ccb961..cb1be32935 100644 --- a/python/paddle/fluid/tests/unittests/test_reader_reset.py +++ b/python/paddle/fluid/tests/unittests/test_reader_reset.py @@ -14,6 +14,7 @@ from __future__ import print_function import os +os.environ['CPU_NUM'] = str(1) import paddle.fluid as fluid from paddle.fluid import compiler import paddle @@ -27,28 +28,14 @@ class TestReaderReset(unittest.TestCase): for n in range(self.total_ins_num): yield np.ones(self.ins_shape) * n, n - # Prepare data - with fluid.program_guard(fluid.Program(), fluid.Program()): - reader = paddle.batch(fake_data_generator, batch_size=1) - feeder = fluid.DataFeeder( - feed_list=[ - fluid.layers.data( - name='data', shape=[3], dtype='float32'), - fluid.layers.data( - name='label', shape=[1], dtype='int64'), - ], - place=fluid.CPUPlace()) - fluid.recordio_writer.convert_reader_to_recordio_file( - self.data_file_name, reader, feeder) + return fake_data_generator def setUp(self): - # set parallel threads to fit 20 batches in line 49 - os.environ['CPU_NUM'] = str(20) self.use_cuda = fluid.core.is_compiled_with_cuda() - self.data_file_name = './reader_reset_test.recordio' self.ins_shape = [3] self.batch_size = 5 - self.total_ins_num = self.batch_size * 20 + self.batch_num = 20 + self.total_ins_num = self.batch_size * self.batch_num self.test_pass_num = 100 self.prepare_data() @@ -57,42 +44,46 @@ class TestReaderReset(unittest.TestCase): startup_prog = fluid.Program() with fluid.program_guard(main_prog, startup_prog): - data_reader_handle = fluid.layers.io.open_files( - filenames=[self.data_file_name], - shapes=[[-1] + self.ins_shape, [-1, 1]], - lod_levels=[0, 0], - dtypes=['float32', 'int64'], - thread_num=1, - pass_num=1) - data_reader = fluid.layers.io.batch(data_reader_handle, - self.batch_size) - if with_double_buffer: - data_reader = fluid.layers.double_buffer(data_reader) - image, label = fluid.layers.read_file(data_reader) + image = fluid.layers.data( + name='image', shape=self.ins_shape, dtype='float32') + label = fluid.layers.data(name='label', shape=[1], dtype='int64') + data_reader_handle = fluid.io.PyReader( + feed_list=[image, label], + capacity=16, + iterable=False, + use_double_buffer=with_double_buffer) fetch_list = [image.name, label.name] place = fluid.CUDAPlace(0) if self.use_cuda else fluid.CPUPlace() exe = fluid.Executor(place) exe.run(startup_prog) + data_reader_handle.decorate_sample_list_generator( + paddle.batch( + self.prepare_data(), batch_size=self.batch_size)) + train_cp = compiler.CompiledProgram(main_prog).with_data_parallel() + + batch_id = 0 pass_count = 0 - while (True): + while pass_count < self.test_pass_num: + data_reader_handle.start() try: - data_val, label_val = exe.run(train_cp, - fetch_list=fetch_list, - return_numpy=True) - ins_num = data_val.shape[0] - broadcasted_label = np.ones((ins_num, ) + tuple( - self.ins_shape)) * label_val.reshape((ins_num, 1)) - self.assertEqual(data_val.all(), broadcasted_label.all()) - + while True: + data_val, label_val = exe.run(train_cp, + fetch_list=fetch_list, + return_numpy=True) + ins_num = data_val.shape[0] + broadcasted_label = np.ones((ins_num, ) + tuple( + self.ins_shape)) * label_val.reshape((ins_num, 1)) + self.assertEqual(data_val.all(), broadcasted_label.all()) + batch_id += 1 except fluid.core.EOFException: + data_reader_handle.reset() pass_count += 1 - if pass_count < self.test_pass_num: - data_reader_handle.reset() - else: - break + self.assertEqual(pass_count * self.batch_num, batch_id) + + self.assertEqual(pass_count, self.test_pass_num) def test_all(self): self.main(with_double_buffer=False) diff --git a/python/paddle/fluid/tests/unittests/test_recordio_reader.py b/python/paddle/fluid/tests/unittests/test_recordio_reader.py deleted file mode 100644 index 0417da7228..0000000000 --- a/python/paddle/fluid/tests/unittests/test_recordio_reader.py +++ /dev/null @@ -1,92 +0,0 @@ -# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from __future__ import print_function - -import unittest - -import paddle.fluid as fluid -import paddle -import paddle.dataset.mnist as mnist -from paddle.fluid.layers.io import open_recordio_file - - -class TestRecordIO(unittest.TestCase): - def setUp(self): - # Convert mnist to recordio file - with fluid.program_guard(fluid.Program(), fluid.Program()): - reader = paddle.batch(mnist.train(), batch_size=32) - feeder = fluid.DataFeeder( - feed_list=[ # order is image and label - fluid.layers.data( - name='image', shape=[784]), - fluid.layers.data( - name='label', shape=[1], dtype='int64'), - ], - place=fluid.CPUPlace()) - self.num_batches = fluid.recordio_writer.convert_reader_to_recordio_file( - './mnist.recordio', reader, feeder) - - def test_main(self, decorator_callback=None): - # use new program - with fluid.program_guard(fluid.Program(), fluid.Program()): - data_file = open_recordio_file( - './mnist.recordio', - shapes=[[-1, 784], [-1, 1]], - lod_levels=[0, 0], - dtypes=['float32', 'int64']) - if decorator_callback is not None: - data_file = decorator_callback(data_file) - img, label = fluid.layers.read_file(data_file) - - hidden = fluid.layers.fc(input=img, size=100, act='tanh') - prediction = fluid.layers.fc(input=hidden, size=10, act='softmax') - loss = fluid.layers.cross_entropy(input=prediction, label=label) - avg_loss = fluid.layers.mean(loss) - - fluid.optimizer.Adam(learning_rate=1e-3).minimize(avg_loss) - - if fluid.core.is_compiled_with_cuda(): - place = fluid.CUDAPlace(0) - else: - place = fluid.CPUPlace() - - exe = fluid.Executor(place) - exe.run(fluid.default_startup_program()) - avg_loss_np = [] - - # train a pass - batch_id = 0 - while True: - try: - tmp, = exe.run(fetch_list=[avg_loss]) - except fluid.core.EOFException: - break - - avg_loss_np.append(tmp) - batch_id += 1 - self.assertEqual(batch_id, self.num_batches) - self.assertLess(avg_loss_np[-1], avg_loss_np[0]) - - def test_shuffle_reader(self): - self.test_main(decorator_callback=lambda reader: fluid.layers.io.shuffle( - reader, buffer_size=200)) - - def test_double_buffer_reader(self): - self.test_main(decorator_callback=lambda reader: fluid.layers.io.double_buffer(reader, - place='cuda:0' if fluid.core.is_compiled_with_cuda() else 'cpu')) - - -if __name__ == '__main__': - unittest.main() diff --git a/python/paddle/fluid/tests/unittests/transformer_model.py b/python/paddle/fluid/tests/unittests/transformer_model.py index d59f9da4a9..1782d43249 100644 --- a/python/paddle/fluid/tests/unittests/transformer_model.py +++ b/python/paddle/fluid/tests/unittests/transformer_model.py @@ -20,7 +20,6 @@ import numpy as np import os import paddle.fluid as fluid import paddle.fluid.layers as layers -from paddle.fluid.layers.io import open_recordio_file pos_enc_param_names = ( "src_pos_enc_table", @@ -394,6 +393,51 @@ def decoder(dec_input, return dec_output +def build_inputs(max_length, n_head): + names = [ + 'src_word', + 'src_pos', + 'trg_word', + 'trg_pos', + 'src_slf_attn_bias', + 'trg_slf_attn_bias', + 'trg_src_attn_bias', + 'gold', + 'weights', + ] + + shapes = [ + [batch_size * max_length, 1], + [batch_size * max_length, 1], + [batch_size * max_length, 1], + [batch_size * max_length, 1], + [batch_size, n_head, max_length, max_length], + [batch_size, n_head, max_length, max_length], + [batch_size, n_head, max_length, max_length], + [batch_size * max_length, 1], + [batch_size * max_length, 1], + ] + + dtypes = [ + 'int64', + 'int64', + 'int64', + 'int64', + 'float32', + 'float32', + 'float32', + 'int64', + 'float32', + ] + + all_inputs = [] + for name, shape, dtype in zip(names, shapes, dtypes): + all_inputs.append( + fluid.layers.data( + name=name, shape=shape, dtype=dtype, append_batch_size=False)) + return all_inputs + + def transformer( src_vocab_size, trg_vocab_size, @@ -408,34 +452,9 @@ def transformer( src_pad_idx, trg_pad_idx, pos_pad_idx, ): - file_obj = open_recordio_file( - filename=os.environ.get('RECORDIO_FILENAME', '/tmp/wmt16.recordio'), - shapes=[ - [batch_size * max_length, 1], - [batch_size * max_length, 1], - [batch_size * max_length, 1], - [batch_size * max_length, 1], - [batch_size, n_head, max_length, max_length], - [batch_size, n_head, max_length, max_length], - [batch_size, n_head, max_length, max_length], - [batch_size * max_length, 1], - [batch_size * max_length, 1], - ], - dtypes=[ - 'int64', - 'int64', - 'int64', - 'int64', - 'float32', - 'float32', - 'float32', - 'int64', - 'float32', - ], - lod_levels=[0] * 9) - - src_word, src_pos, trg_word, trg_pos, src_slf_attn_bias, trg_slf_attn_bias, trg_src_attn_bias, gold, weights = fluid.layers.read_file( - file_obj) + + src_word, src_pos, trg_word, trg_pos, src_slf_attn_bias, trg_slf_attn_bias, trg_src_attn_bias, gold, weights = build_inputs( + max_length, n_head) enc_input = prepare_encoder( src_word, diff --git a/python/paddle/reader/tests/test_data_creator.txt b/python/paddle/reader/tests/test_data_creator.txt deleted file mode 100644 index a2a8d47d43..0000000000 --- a/python/paddle/reader/tests/test_data_creator.txt +++ /dev/null @@ -1,3 +0,0 @@ -0 1 -2 3 -4 5 diff --git a/python/paddle/reader/tests/test_reader_recordio.dat b/python/paddle/reader/tests/test_reader_recordio.dat deleted file mode 100644 index a99a35bb829e066c4845d0b85b96cd1eb3a12491..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 76 zcmZQ!W@4P2Bs!asfq}sSh?#)+KN|x>v0q|9K_sIV14Bftj}1RiRKwGd%hQO<)0nHI Tz>rH1B4onlY0Bkk1`z@P(}N7c diff --git a/python/paddle/reader/tests/test_recordio_creator.dat b/python/paddle/reader/tests/test_recordio_creator.dat deleted file mode 100644 index 17aa89b6796184407e83246d3f342a55a66b4a69..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 88 zcmZQ!W@2QOHw=2.20.0 numpy>=1.12, <=1.16.4 ; python_version<"3.5" numpy>=1.12 ; python_version>="3.5" protobuf>=3.1.0 -recordio>=0.1.0 matplotlib<=2.2.4 ; python_version<"3.6" scipy>=0.19.0, <=1.2.1 ; python_version<"3.5" nltk>=3.2.2, <=3.4 ; python_version<"3.5" -- GitLab