From f4d9adc7d08030f806fafb398b9fef8cc21c2d8f Mon Sep 17 00:00:00 2001
From: WeiXin
Date: Thu, 22 Apr 2021 11:14:30 +0800
Subject: [PATCH] support save/load binary format tensor. (#32211)

* support save/load binary format tensor
* Fix error when creating CUDAPlace
* Fix error when creating CUDAPlace
* Fix error when creating CUDAPlace
* get device context from pool.
* move definition of 'SerializeToStream' and 'DeserializeFromStream' to 'lod_tensor.cc' and 'selected_rows.cc'.
* improve coverage.
* improve coverage.
* polish API
* deal with conflict
* disable save/load large file in unittest
* split unittest.
---
 paddle/fluid/framework/lod_tensor.cc          |  15 ++
 paddle/fluid/framework/lod_tensor.h           |   4 +
 paddle/fluid/framework/selected_rows.cc       |  15 ++
 paddle/fluid/framework/selected_rows.h        |   4 +
 paddle/fluid/pybind/pybind.cc                 |  51 +++-
 python/paddle/fluid/core.py                   |   8 +
 .../fluid/tests/unittests/CMakeLists.txt      |   1 +
 .../unittests/test_paddle_save_load_binary.py | 217 ++++++++++++++++++
 python/paddle/framework/io.py                 | 119 ++++++++--
 9 files changed, 410 insertions(+), 24 deletions(-)
 create mode 100644 python/paddle/fluid/tests/unittests/test_paddle_save_load_binary.py

diff --git a/paddle/fluid/framework/lod_tensor.cc b/paddle/fluid/framework/lod_tensor.cc
index 3a79452e230..0a6b5e44452 100644
--- a/paddle/fluid/framework/lod_tensor.cc
+++ b/paddle/fluid/framework/lod_tensor.cc
@@ -268,6 +268,21 @@ void SerializeToStream(std::ostream &os, const LoDTensor &tensor,
   TensorToStream(os, static_cast<Tensor>(tensor), dev_ctx);
 }
 
+void SerializeToStream(std::ostream &os, const LoDTensor &tensor) {
+  platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
+  const platform::DeviceContext *dev_ctx;
+  auto place = tensor.place();
+  dev_ctx = pool.Get(place);
+  SerializeToStream(os, tensor, *dev_ctx);
+}
+
+void DeserializeFromStream(std::ifstream &os, LoDTensor *tensor) {
+  platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
+  const platform::DeviceContext *dev_ctx;
+  dev_ctx = pool.Get(platform::CPUPlace());
+  DeserializeFromStream(os, tensor, *dev_ctx);
+}
+
 void DeserializeFromStream(std::istream &is, LoDTensor *tensor,
                            const platform::DeviceContext &dev_ctx,
                            const size_t &seek,
diff --git a/paddle/fluid/framework/lod_tensor.h b/paddle/fluid/framework/lod_tensor.h
index 6b6112f1f3e..6b357aba1c5 100644
--- a/paddle/fluid/framework/lod_tensor.h
+++ b/paddle/fluid/framework/lod_tensor.h
@@ -255,5 +255,9 @@ LoD ConvertToLengthBasedLoD(const LoD& offset_lod);
 
 LoD ConvertToOffsetBasedLoD(const LoD& length_lod);
 
+void SerializeToStream(std::ostream& os, const LoDTensor& tensor);
+
+void DeserializeFromStream(std::ifstream& os, LoDTensor* tensor);
+
 }  // namespace framework
 }  // namespace paddle
diff --git a/paddle/fluid/framework/selected_rows.cc b/paddle/fluid/framework/selected_rows.cc
index 4c30c40ad58..7e48d0dc5f9 100644
--- a/paddle/fluid/framework/selected_rows.cc
+++ b/paddle/fluid/framework/selected_rows.cc
@@ -113,6 +113,21 @@ void SerializeToStream(std::ostream& os, const SelectedRows& selected_rows,
   TensorToStream(os, selected_rows.value(), dev_ctx);
 }
 
+void SerializeToStream(std::ostream& os, const SelectedRows& selected_rows) {
+  platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance();
+  const platform::DeviceContext* dev_ctx;
+  auto place = selected_rows.place();
+  dev_ctx = pool.Get(place);
+  SerializeToStream(os, selected_rows, *dev_ctx);
+}
+
+void DeserializeFromStream(std::ifstream& os, SelectedRows* selected_rows) {
+  platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance();
+  const platform::DeviceContext* dev_ctx;
+  dev_ctx = pool.Get(platform::CPUPlace());
+  DeserializeFromStream(os, selected_rows, *dev_ctx);
+}
+
 void DeserializeFromStream(std::istream& is, SelectedRows* selected_rows,
                            const platform::DeviceContext& dev_ctx) {
   {
diff --git a/paddle/fluid/framework/selected_rows.h b/paddle/fluid/framework/selected_rows.h
index 48353b43f56..e53e3d973c5 100644
--- a/paddle/fluid/framework/selected_rows.h
+++ b/paddle/fluid/framework/selected_rows.h
@@ -173,5 +173,9 @@ void SerializeToStream(std::ostream& os, const SelectedRows& selected_rows,
 void DeserializeFromStream(std::istream& is, SelectedRows* selected_rows,
                            const platform::DeviceContext& dev_ctx);
 
+void SerializeToStream(std::ostream& os, const SelectedRows& selected_rows);
+
+void DeserializeFromStream(std::ifstream& os, SelectedRows* selected_rows);
+
 }  // namespace framework
 }  // namespace paddle
diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc
index faaeceefa88..560d8c892b0 100644
--- a/paddle/fluid/pybind/pybind.cc
+++ b/paddle/fluid/pybind/pybind.cc
@@ -496,7 +496,56 @@ PYBIND11_MODULE(core_noavx, m) {
 #endif
     return tensor;
   });
-
+  m.def("_save_lod_tensor", [](const LoDTensor &tensor,
+                               const std::string &str_file_name) {
+    std::ofstream fout(str_file_name, std::ios::binary);
+    PADDLE_ENFORCE_EQ(static_cast<bool>(fout), true,
+                      platform::errors::Unavailable(
+                          "Cannot open %s to save variables.", str_file_name));
+    SerializeToStream(fout, tensor);
+
+    int64_t tellp = fout.tellp();
+    fout.close();
+    return tellp;
+  });
+  m.def("_load_lod_tensor", [](LoDTensor &tensor,
+                               const std::string &str_file_name) {
+    std::ifstream fin(str_file_name, std::ios::binary);
+    PADDLE_ENFORCE_EQ(static_cast<bool>(fin), true,
+                      platform::errors::Unavailable(
+                          "Cannot open %s to load variables.", str_file_name));
+
+    DeserializeFromStream(fin, &tensor);
+    int64_t tellg = fin.tellg();
+    fin.close();
+    return tellg;
+  });
+  m.def("_save_selected_rows", [](const SelectedRows &selected_rows,
+                                  const std::string &str_file_name) {
+    std::ofstream fout(str_file_name, std::ios::binary);
+    PADDLE_ENFORCE_EQ(
+        static_cast<bool>(fout), true,
+        platform::errors::Unavailable("Cannot open %s to save SelectedRows.",
+                                      str_file_name));
+
+    SerializeToStream(fout, selected_rows);
+    int64_t tellp = fout.tellp();
+    fout.close();
+    return tellp;
+  });
+  m.def("_load_selected_rows",
+        [](SelectedRows &selected_rows, const std::string &str_file_name) {
+          std::ifstream fin(str_file_name, std::ios::binary);
+          PADDLE_ENFORCE_EQ(
+              static_cast<bool>(fin), true,
+              platform::errors::Unavailable(
+                  "Cannot open %s to load SelectedRows.", str_file_name));
+
+          DeserializeFromStream(fin, &selected_rows);
+          int64_t tellg = fin.tellg();
+          fin.close();
+          return tellg;
+        });
   m.def("_save_static_dict",
         [](const std::string &str_file_name, const py::handle &vec_var_list,
            const Scope &scope) {
diff --git a/python/paddle/fluid/core.py b/python/paddle/fluid/core.py
index d3dc26c946d..49bcaf6dd60 100644
--- a/python/paddle/fluid/core.py
+++ b/python/paddle/fluid/core.py
@@ -270,6 +270,10 @@ if avx_supported():
         from .core_avx import _load_static_dict
         from .core_avx import _save_dygraph_dict
         from .core_avx import _load_dygraph_dict
+        from .core_avx import _save_lod_tensor
+        from .core_avx import _load_lod_tensor
+        from .core_avx import _save_selected_rows
+        from .core_avx import _load_selected_rows
         from .core_avx import _create_loaded_parameter
         from .core_avx import _cuda_synchronize
         from .core_avx import _promote_types_if_complex_exists
@@ -325,6 +329,10 @@ if load_noavx:
         from .core_noavx import _load_static_dict
         from .core_noavx import _save_dygraph_dict
         from .core_noavx import _load_dygraph_dict
+        from .core_noavx import _save_lod_tensor
+        from .core_noavx import _load_lod_tensor
+        from .core_noavx import _save_selected_rows
+        from .core_noavx import _load_selected_rows
         from .core_noavx import _create_loaded_parameter
         from .core_noavx import _cuda_synchronize
         from .core_noavx import _promote_types_if_complex_exists
diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt
index d0906052c99..c3ec312331b 100644
--- a/python/paddle/fluid/tests/unittests/CMakeLists.txt
+++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt
@@ -741,6 +741,7 @@ set_tests_properties(test_bicubic_interp_v2_op PROPERTIES TIMEOUT 120)
 set_tests_properties(test_gather_op PROPERTIES TIMEOUT 120)
 set_tests_properties(test_static_save_load PROPERTIES TIMEOUT 250)
 set_tests_properties(test_pylayer_op PROPERTIES TIMEOUT 120)
+set_tests_properties(test_paddle_save_load_binary PROPERTIES TIMEOUT 120)
 if (WIN32)
     set_tests_properties(test_static_save_load_large PROPERTIES TIMEOUT 900)
     set_tests_properties(test_paddle_save_load PROPERTIES TIMEOUT 250)
diff --git a/python/paddle/fluid/tests/unittests/test_paddle_save_load_binary.py b/python/paddle/fluid/tests/unittests/test_paddle_save_load_binary.py
new file mode 100644
index 00000000000..8b508d5c9ae
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/test_paddle_save_load_binary.py
@@ -0,0 +1,217 @@
+# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+
+import unittest
+import numpy as np
+import os
+import sys
+import six
+
+import paddle
+import paddle.nn as nn
+import paddle.optimizer as opt
+import paddle.fluid as fluid
+from paddle.fluid.optimizer import Adam
+import paddle.fluid.framework as framework
+from test_imperative_base import new_program_scope
+
+IMAGE_SIZE = 784
+
+
+class TestSaveLoadBinaryFormat(unittest.TestCase):
+    def setUp(self):
+        # enable static graph mode
+        paddle.enable_static()
+
+    def set_zero(self, prog, place, scope=None):
+        if scope is None:
+            scope = fluid.global_scope()
+        for var in prog.list_vars():
+            if isinstance(var, framework.Parameter) or var.persistable:
+                ten = scope.find_var(var.name).get_tensor()
+                if ten is not None:
+                    ten.set(np.zeros_like(np.array(ten)), place)
+                    new_t = np.array(scope.find_var(var.name).get_tensor())
+                    self.assertTrue(np.sum(np.abs(new_t)) == 0)
+
+    def replace_save_vars(self, program, dirname):
+        def predicate(var):
+            return var.persistable
+
+        vars = filter(predicate, program.list_vars())
+        for var in vars:
+            paddle.save(
+                var.get_value(),
+                os.path.join(dirname, var.name),
+                use_binary_format=True)
+
+    def replace_load_vars(self, program, dirname):
+        def predicate(var):
+            return var.persistable
+
+        var_list = list(filter(predicate, program.list_vars()))
+        for var in var_list:
+            var_load = paddle.load(os.path.join(dirname, var.name))
+            # set var_load to scope
+            var.set_value(var_load)
+
+    def test_replace_save_load_vars(self):
+        paddle.enable_static()
+        with new_program_scope():
+            # create network
+            x = paddle.static.data(
+                name="x", shape=[None, IMAGE_SIZE], dtype='float32')
+            z = paddle.static.nn.fc(x, 10, bias_attr=False)
+            z = paddle.static.nn.fc(z, 128, bias_attr=False)
+            loss = fluid.layers.reduce_mean(z)
+            place = fluid.CPUPlace(
+            ) if not paddle.fluid.core.is_compiled_with_cuda(
+            ) else fluid.CUDAPlace(0)
+            exe = paddle.static.Executor(place)
+            exe.run(paddle.static.default_startup_program())
+            prog = paddle.static.default_main_program()
+            base_map = {}
+            for var in prog.list_vars():
+                if isinstance(var, framework.Parameter) or var.persistable:
+                    t = np.array(fluid.global_scope().find_var(var.name)
+                                 .get_tensor())
+                    # make sure all the parameter or optimizer vars have been updated
+                    self.assertTrue(np.sum(np.abs(t)) != 0)
+                    base_map[var.name] = t
+            # test for replace_save_vars/io.load_vars
+            path_vars1 = 'test_replace_save_load_vars_binary1/model'
+            self.replace_save_vars(prog, path_vars1)
+            # set var to zero
+            self.set_zero(prog, place)
+            var_list = list(
+                filter(lambda var: var.persistable, prog.list_vars()))
+            fluid.io.load_vars(
+                exe, path_vars1, main_program=prog, vars=var_list)
+
+            for var in prog.list_vars():
+                if var.persistable:
+                    new_t = np.array(fluid.global_scope().find_var(var.name)
+                                     .get_tensor())
+                    base_t = base_map[var.name]
+
+                    self.assertTrue(np.array_equal(new_t, base_t))
+            # test for io.save_vars/replace_load_vars
+            path_vars2 = 'test_replace_save_load_vars_binary2/model/'
+            fluid.io.save_vars(
+                exe, path_vars2, main_program=prog, vars=var_list)
+            self.set_zero(prog, place)
+            self.replace_load_vars(prog, path_vars2)
+            for var in prog.list_vars():
+                if var.persistable:
+                    new_t = np.array(fluid.global_scope().find_var(var.name)
+                                     .get_tensor())
+                    base_t = base_map[var.name]
+
+                    self.assertTrue(np.array_equal(new_t, base_t))
+
+    def test_save_load_lod_tensor(self):
+        paddle.enable_static()
+        OUTPUT_NUM = 32
+        with new_program_scope():
+            x = fluid.data(name="x", shape=[None, IMAGE_SIZE],
+                           dtype='float32')
+            y = fluid.layers.fc(
+                x,
+                OUTPUT_NUM,
+                name='fc_vars', )
+            prog = fluid.default_main_program()
+            place = fluid.CPUPlace(
+            ) if not paddle.fluid.core.is_compiled_with_cuda(
+            ) else fluid.CUDAPlace(0)
+            exe = fluid.Executor(place)
+            prog = paddle.static.default_main_program()
+            exe.run(fluid.default_startup_program())
+
+            dirname = 'test_save_load_lod_tensor1/tensor_'
+            for var in prog.list_vars():
+                if var.persistable and list(
+                        var.shape) == [IMAGE_SIZE, OUTPUT_NUM]:
+                    tensor = var.get_value()
+                    paddle.save(
+                        tensor, dirname + 'fc_vars.w_0', use_binary_format=True)
+                    break
+
+            origin = np.array(var.get_value())
+            var.set_value(np.zeros_like(origin))
+            is_zeros = np.array(var.get_value())
+
+            loaded_tensor = paddle.load(dirname + 'fc_vars.w_0')
+            self.assertTrue(isinstance(loaded_tensor, fluid.core.LoDTensor))
+            self.assertTrue(
+                list(loaded_tensor.shape()) == [IMAGE_SIZE, OUTPUT_NUM])
+            to_array = np.array(loaded_tensor)
+            self.assertTrue(np.array_equal(origin, to_array))
+
+        with self.assertRaises(NotImplementedError):
+            path = 'test_save_load_error/temp'
+            paddle.save({}, path, use_binary_format=True)
+
+        with self.assertRaises(ValueError):
+            path = 'test_save_load_error/temp'
+            with open(path, "w") as f:
+                f.write('\0')
+            paddle.load(path)
+
+        with self.assertRaises(ValueError):
+            temp_lod = fluid.core.LoDTensor()
+            paddle.save(temp_lod, path, use_binary_format=True)
+
+        with self.assertRaises(RuntimeError):
+            fluid.core._save_lod_tensor(
+                temp_lod, 'test_save_load_error_not_exist_file/not_exist_file')
+
+        with self.assertRaises(RuntimeError):
+            fluid.core._load_lod_tensor(
+                temp_lod, 'test_save_load_error_not_exist_file/not_exist_file')
+
+    def test_save_load_selected_rows(self):
+        paddle.enable_static()
+        place = fluid.CPUPlace() if not paddle.fluid.core.is_compiled_with_cuda(
+        ) else fluid.CUDAPlace(0)
+        height = 10
+        rows = [0, 4, 7]
+        row_numel = 12
+        selected_rows = fluid.core.SelectedRows(rows, height)
+        path = 'test_paddle_save_load_selected_rows/sr.pdsr'
+
+        with self.assertRaises(ValueError):
+            paddle.save(selected_rows, path, use_binary_format=True)
+
+        np_array = np.random.randn(len(rows), row_numel).astype("float32")
+        tensor = selected_rows.get_tensor()
+        tensor.set(np_array, place)
+
+        paddle.save(selected_rows, path, use_binary_format=True)
+        load_sr = paddle.load(path)
+
+        self.assertTrue(isinstance(load_sr, fluid.core.SelectedRows))
+        self.assertTrue(list(load_sr.rows()) == rows)
+        self.assertTrue(load_sr.height() == height)
+        self.assertTrue(
+            np.array_equal(np.array(load_sr.get_tensor()), np_array))
+
+        with self.assertRaises(RuntimeError):
+            fluid.core._save_selected_rows(
+                selected_rows,
+                'test_paddle_save_load_selected_rows_not_exist_file/temp')
+        with self.assertRaises(RuntimeError):
+            fluid.core._load_selected_rows(
+                selected_rows,
+                'test_paddle_save_load_selected_rows_not_exist_file/temp')
diff --git a/python/paddle/framework/io.py b/python/paddle/framework/io.py
index 4df84c12ad9..8f2f56a6628 100644
--- a/python/paddle/framework/io.py
+++ b/python/paddle/framework/io.py
@@ -348,6 +348,48 @@ def _ndarray_to_tensor(obj, return_numpy):
         return _to_LodTensor(obj)
 
 
+def _save_lod_tensor(tensor, file_name):
+    if not tensor._is_initialized():
+        raise ValueError("The saved tensor is not initialized.")
+    _seek = core._save_lod_tensor(tensor, file_name)
+    # '_seek' is the end position of this tensor in the file.
+    return _seek
+
+
+def _load_lod_tensor(file_name):
+    temp_t = paddle.fluid.core.LoDTensor()
+    # '_seek' is the end position of this tensor in the file.
+    _seek = paddle.fluid.core._load_lod_tensor(temp_t, file_name)
+    return temp_t, _seek
+
+
+def _save_selected_rows(selected_rows, file_name):
+    # '_seek' is the end position of this SelectedRows in the file.
+    if not selected_rows.get_tensor()._is_initialized():
+        raise ValueError("The saved tensor is not initialized.")
+    _seek = core._save_selected_rows(selected_rows, file_name)
+    return _seek
+
+
+def _load_selected_rows(file_name):
+    temp_sr = core.SelectedRows()
+    # '_seek' is the end position of this SelectedRows in the file.
+    _seek = core._load_selected_rows(temp_sr, file_name)
+    return temp_sr, _seek
+
+
+def _save_binary_var(obj, path):
+    if isinstance(obj, core.LoDTensor):
+        _save_lod_tensor(obj, path)
+    elif isinstance(obj, core.SelectedRows):
+        _save_selected_rows(obj, path)
+    else:
+        # Since the concept of 'Tensor' is only exposed to users, the error message can only contain tensor instead of 'LoDTensor' or 'SelectedRows'
+        raise NotImplementedError(
+            "When use_binary_format = True, `paddle.save` expected Tensor, but received {}.".
+            format(type(obj)))
+
+
 def save(obj, path, protocol=2, **configs):
     '''
     Save an object to the specified path.
@@ -447,25 +489,41 @@ def save(obj, path, protocol=2, **configs):
                 "Type of `use_binary_format` should be bool, but received {}.".
                 format(type(config.use_binary_format)))
 
-    # `protocol` need to be used, `pickle_protocol` is a deprecated arg.
-    if config.pickle_protocol is not None:
-        protocol = config.pickle_protocol
-        warnings.warn(
-            "'pickle_protocol' is a deprecated argument. Please use 'protocol' instead."
-        )
-    if isinstance(obj, Program):
-        obj.desc.flush()
-        with open(path, "wb") as f:
-            f.write(obj.desc.serialize_to_string())
-    elif _use_legacy(obj):
-        if in_dygraph_mode():
-            _legacy_save(obj, path, protocol)
-        else:
-            _legacy_static_save(obj, path, protocol)
+    if config.use_binary_format:
+        _save_binary_var(obj, path)
     else:
-        # save single variable
-        with open(path, 'wb') as f:
-            _pickle_save(obj, f, protocol)
+        # `protocol` need to be used, `pickle_protocol` is a deprecated arg.
+        if config.pickle_protocol is not None:
+            protocol = config.pickle_protocol
+            warnings.warn(
+                "'pickle_protocol' is a deprecated argument. Please use 'protocol' instead."
+            )
+        if isinstance(obj, Program):
+            obj.desc.flush()
+            with open(path, "wb") as f:
+                f.write(obj.desc.serialize_to_string())
+        elif _use_legacy(obj):
+            if in_dygraph_mode():
+                _legacy_save(obj, path, protocol)
+            else:
+                _legacy_static_save(obj, path, protocol)
+        else:
+            # `protocol` need to be used, `pickle_protocol` is a deprecated arg.
+            if config.pickle_protocol is not None:
+                protocol = config.pickle_protocol
+                warnings.warn(
+                    "'pickle_protocol' is a deprecated argument. Please use 'protocol' instead."
+                )
+
+            if _use_legacy(obj):
+                if in_dygraph_mode():
+                    _legacy_save(obj, path, protocol)
+                else:
+                    _legacy_static_save(obj, path, protocol)
+            else:
+                # save single variable
+                with open(path, 'wb') as f:
+                    _pickle_save(obj, f, protocol)
 
 
 def _legacy_save(obj, path, protocol=2):
@@ -675,11 +733,26 @@ def load(path, **configs):
                 raise NotImplementedError(
                     'Only support tensor and state_dict, but received {}.'.
                     format(type(load_result)))
-        except exception_type:
-            with open(path, "rb") as f:
-                program_desc_str = f.read()
-            program = Program.parse_from_string(program_desc_str)
-            return program
+
+        except exception_type as msg_pickle:
+            try:
+                tensor, _ = _load_selected_rows(path)
+                return tensor
+            except:
+                try:
+                    tensor, _ = _load_lod_tensor(path)
+                    return tensor
+                except:
+                    try:
+                        with open(path, "rb") as f:
+                            program_desc_str = f.read()
+                            program = Program.parse_from_string(
+                                program_desc_str)
+                            return program
+                    except:
+                        raise ValueError(
+                            "`paddle.load` can not parse the file:{}.".format(
+                                path))
     else:
         load_result = _legacy_load(path, **configs)

-- 
GitLab
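
Usage note: a minimal sketch of the binary save/load path introduced by this patch, following the new unit test above. The file name 'example_fc.w_0' and the layer sizes are illustrative only, not part of the patch.

    import paddle
    import paddle.fluid as fluid

    paddle.enable_static()
    # Build a small static program so that a persistable parameter exists.
    x = paddle.static.data(name="x", shape=[None, 784], dtype='float32')
    z = paddle.static.nn.fc(x, 10, bias_attr=False)
    exe = paddle.static.Executor(fluid.CPUPlace())
    exe.run(paddle.static.default_startup_program())
    prog = paddle.static.default_main_program()

    # Save one parameter's LoDTensor in the new binary format.
    param = [v for v in prog.list_vars() if v.persistable][0]
    paddle.save(param.get_value(), 'example_fc.w_0', use_binary_format=True)

    # For binary files, paddle.load returns a fluid.core.LoDTensor,
    # which can be written back into the variable.
    loaded = paddle.load('example_fc.w_0')
    assert isinstance(loaded, fluid.core.LoDTensor)
    param.set_value(loaded)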