未验证 提交 f4d9adc7 编写于 作者: W WeiXin 提交者: GitHub

support save/load binary format tensor. (#32211)

* support save/load binary format tensor

* Fix error when create cudaplace

* Fix error when create cudaplace

* Fix error when create cudaplace

* get device context from pool.

* move define of 'SerializeToStream' and 'DeserializeFromStream' to 'lod_tensor.cc' and 'selected_rows.cc'.

* improve coverage.

* improve coverage.

* polish API

* deal with conflict

* disable save/load large file in unittest

* split unittest.
上级 e58c705b
...@@ -268,6 +268,21 @@ void SerializeToStream(std::ostream &os, const LoDTensor &tensor, ...@@ -268,6 +268,21 @@ void SerializeToStream(std::ostream &os, const LoDTensor &tensor,
TensorToStream(os, static_cast<Tensor>(tensor), dev_ctx); TensorToStream(os, static_cast<Tensor>(tensor), dev_ctx);
} }
void SerializeToStream(std::ostream &os, const LoDTensor &tensor) {
platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
const platform::DeviceContext *dev_ctx;
auto place = tensor.place();
dev_ctx = pool.Get(place);
SerializeToStream(os, tensor, *dev_ctx);
}
// Deserialize a LoDTensor from `is` into `tensor`.
// The CPU device context is always used here (see pool.Get(CPUPlace())), so
// the tensor data is read into host memory; delegation goes to the
// three-argument overload.
// Fix: the input-stream parameter was misleadingly named `os` (the
// conventional name for an *output* stream); renamed to `is`. Parameter
// renames do not affect callers in C++.
void DeserializeFromStream(std::ifstream &is, LoDTensor *tensor) {
  platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
  const platform::DeviceContext *dev_ctx = pool.Get(platform::CPUPlace());
  DeserializeFromStream(is, tensor, *dev_ctx);
}
void DeserializeFromStream(std::istream &is, LoDTensor *tensor, void DeserializeFromStream(std::istream &is, LoDTensor *tensor,
const platform::DeviceContext &dev_ctx, const platform::DeviceContext &dev_ctx,
const size_t &seek, const size_t &seek,
......
...@@ -255,5 +255,9 @@ LoD ConvertToLengthBasedLoD(const LoD& offset_lod); ...@@ -255,5 +255,9 @@ LoD ConvertToLengthBasedLoD(const LoD& offset_lod);
LoD ConvertToOffsetBasedLoD(const LoD& length_lod); LoD ConvertToOffsetBasedLoD(const LoD& length_lod);
// Serialize `tensor` (data + LoD metadata) into `os`; the device context is
// resolved internally from the tensor's place.
void SerializeToStream(std::ostream& os, const LoDTensor& tensor);

// Deserialize a LoDTensor from `is` into `tensor` (reads into CPU memory).
// Fix: the parameter was named `os`, the conventional name for an output
// stream; renamed to `is` to match its role as an input stream.
void DeserializeFromStream(std::ifstream& is, LoDTensor* tensor);
} // namespace framework } // namespace framework
} // namespace paddle } // namespace paddle
...@@ -113,6 +113,21 @@ void SerializeToStream(std::ostream& os, const SelectedRows& selected_rows, ...@@ -113,6 +113,21 @@ void SerializeToStream(std::ostream& os, const SelectedRows& selected_rows,
TensorToStream(os, selected_rows.value(), dev_ctx); TensorToStream(os, selected_rows.value(), dev_ctx);
} }
void SerializeToStream(std::ostream& os, const SelectedRows& selected_rows) {
platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance();
const platform::DeviceContext* dev_ctx;
auto place = selected_rows.place();
dev_ctx = pool.Get(place);
SerializeToStream(os, selected_rows, *dev_ctx);
}
// Deserialize a SelectedRows from `is` into `selected_rows`.
// The CPU device context is always used here (see pool.Get(CPUPlace())), so
// data is read into host memory; delegation goes to the three-argument
// overload.
// Fix: the input-stream parameter was misleadingly named `os`; renamed to
// `is`. Parameter renames do not affect callers in C++.
void DeserializeFromStream(std::ifstream& is, SelectedRows* selected_rows) {
  platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance();
  const platform::DeviceContext* dev_ctx = pool.Get(platform::CPUPlace());
  DeserializeFromStream(is, selected_rows, *dev_ctx);
}
void DeserializeFromStream(std::istream& is, SelectedRows* selected_rows, void DeserializeFromStream(std::istream& is, SelectedRows* selected_rows,
const platform::DeviceContext& dev_ctx) { const platform::DeviceContext& dev_ctx) {
{ {
......
...@@ -173,5 +173,9 @@ void SerializeToStream(std::ostream& os, const SelectedRows& selected_rows, ...@@ -173,5 +173,9 @@ void SerializeToStream(std::ostream& os, const SelectedRows& selected_rows,
void DeserializeFromStream(std::istream& is, SelectedRows* selected_rows, void DeserializeFromStream(std::istream& is, SelectedRows* selected_rows,
const platform::DeviceContext& dev_ctx); const platform::DeviceContext& dev_ctx);
// Serialize `selected_rows` into `os`; the device context is resolved
// internally from the SelectedRows' place.
void SerializeToStream(std::ostream& os, const SelectedRows& selected_rows);

// Deserialize a SelectedRows from `is` (reads into CPU memory).
// Fix: the parameter was named `os`, the conventional name for an output
// stream; renamed to `is` to match its role as an input stream.
void DeserializeFromStream(std::ifstream& is, SelectedRows* selected_rows);
} // namespace framework } // namespace framework
} // namespace paddle } // namespace paddle
...@@ -496,7 +496,56 @@ PYBIND11_MODULE(core_noavx, m) { ...@@ -496,7 +496,56 @@ PYBIND11_MODULE(core_noavx, m) {
#endif #endif
return tensor; return tensor;
}); });
// Save `tensor` to `str_file_name` in binary format.
// Returns the stream position after the write, i.e. the end offset of this
// tensor within the file (used by the Python side as a seek marker).
m.def("_save_lod_tensor", [](const LoDTensor &tensor,
                             const std::string &str_file_name) {
  std::ofstream fout(str_file_name, std::ios::binary);
  // Raise if the file cannot be created/opened for writing.
  PADDLE_ENFORCE_EQ(static_cast<bool>(fout), true,
                    platform::errors::Unavailable(
                        "Cannot open %s to save variables.", str_file_name));
  SerializeToStream(fout, tensor);
  int64_t tellp = fout.tellp();
  fout.close();
  return tellp;
});
// Load a binary-format tensor from `str_file_name` into `tensor` (modified
// in place). Returns the read position after deserialization (end offset of
// the tensor in the file).
m.def("_load_lod_tensor", [](LoDTensor &tensor,
                             const std::string &str_file_name) {
  std::ifstream fin(str_file_name, std::ios::binary);
  // Raise if the file does not exist or cannot be opened.
  PADDLE_ENFORCE_EQ(static_cast<bool>(fin), true,
                    platform::errors::Unavailable(
                        "Cannot open %s to load variables.", str_file_name));
  DeserializeFromStream(fin, &tensor);
  int64_t tellg = fin.tellg();
  fin.close();
  return tellg;
});
// Save `selected_rows` to `str_file_name` in binary format.
// Returns the write position after serialization (end offset in the file).
m.def("_save_selected_rows", [](const SelectedRows &selected_rows,
                                const std::string &str_file_name) {
  std::ofstream fout(str_file_name, std::ios::binary);
  // Raise if the file cannot be created/opened for writing.
  PADDLE_ENFORCE_EQ(
      static_cast<bool>(fout), true,
      platform::errors::Unavailable("Cannot open %s to save SelectedRows.",
                                    str_file_name));
  SerializeToStream(fout, selected_rows);
  int64_t tellp = fout.tellp();
  fout.close();
  return tellp;
});
// Load a binary-format SelectedRows from `str_file_name` into
// `selected_rows` (modified in place). Returns the read position after
// deserialization (end offset in the file).
m.def("_load_selected_rows",
      [](SelectedRows &selected_rows, const std::string &str_file_name) {
        std::ifstream fin(str_file_name, std::ios::binary);
        // Raise if the file does not exist or cannot be opened.
        PADDLE_ENFORCE_EQ(
            static_cast<bool>(fin), true,
            platform::errors::Unavailable(
                "Cannot open %s to load SelectedRows.", str_file_name));
        DeserializeFromStream(fin, &selected_rows);
        int64_t tellg = fin.tellg();
        fin.close();
        return tellg;
      });
m.def("_save_static_dict", m.def("_save_static_dict",
[](const std::string &str_file_name, const py::handle &vec_var_list, [](const std::string &str_file_name, const py::handle &vec_var_list,
const Scope &scope) { const Scope &scope) {
......
...@@ -270,6 +270,10 @@ if avx_supported(): ...@@ -270,6 +270,10 @@ if avx_supported():
from .core_avx import _load_static_dict from .core_avx import _load_static_dict
from .core_avx import _save_dygraph_dict from .core_avx import _save_dygraph_dict
from .core_avx import _load_dygraph_dict from .core_avx import _load_dygraph_dict
from .core_avx import _save_lod_tensor
from .core_avx import _load_lod_tensor
from .core_avx import _save_selected_rows
from .core_avx import _load_selected_rows
from .core_avx import _create_loaded_parameter from .core_avx import _create_loaded_parameter
from .core_avx import _cuda_synchronize from .core_avx import _cuda_synchronize
from .core_avx import _promote_types_if_complex_exists from .core_avx import _promote_types_if_complex_exists
...@@ -325,6 +329,10 @@ if load_noavx: ...@@ -325,6 +329,10 @@ if load_noavx:
from .core_noavx import _load_static_dict from .core_noavx import _load_static_dict
from .core_noavx import _save_dygraph_dict from .core_noavx import _save_dygraph_dict
from .core_noavx import _load_dygraph_dict from .core_noavx import _load_dygraph_dict
from .core_noavx import _save_lod_tensor
from .core_noavx import _load_lod_tensor
from .core_noavx import _save_selected_rows
from .core_noavx import _load_selected_rows
from .core_noavx import _create_loaded_parameter from .core_noavx import _create_loaded_parameter
from .core_noavx import _cuda_synchronize from .core_noavx import _cuda_synchronize
from .core_noavx import _promote_types_if_complex_exists from .core_noavx import _promote_types_if_complex_exists
......
...@@ -741,6 +741,7 @@ set_tests_properties(test_bicubic_interp_v2_op PROPERTIES TIMEOUT 120) ...@@ -741,6 +741,7 @@ set_tests_properties(test_bicubic_interp_v2_op PROPERTIES TIMEOUT 120)
set_tests_properties(test_gather_op PROPERTIES TIMEOUT 120) set_tests_properties(test_gather_op PROPERTIES TIMEOUT 120)
set_tests_properties(test_static_save_load PROPERTIES TIMEOUT 250) set_tests_properties(test_static_save_load PROPERTIES TIMEOUT 250)
set_tests_properties(test_pylayer_op PROPERTIES TIMEOUT 120) set_tests_properties(test_pylayer_op PROPERTIES TIMEOUT 120)
set_tests_properties(test_paddle_save_load_binary PROPERTIES TIMEOUT 120)
if (WIN32) if (WIN32)
set_tests_properties(test_static_save_load_large PROPERTIES TIMEOUT 900) set_tests_properties(test_static_save_load_large PROPERTIES TIMEOUT 900)
set_tests_properties(test_paddle_save_load PROPERTIES TIMEOUT 250) set_tests_properties(test_paddle_save_load PROPERTIES TIMEOUT 250)
......
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import unittest
import numpy as np
import os
import sys
import six
import paddle
import paddle.nn as nn
import paddle.optimizer as opt
import paddle.fluid as fluid
from paddle.fluid.optimizer import Adam
import paddle.fluid.framework as framework
from test_imperative_base import new_program_scope
IMAGE_SIZE = 784
class TestSaveLoadBinaryFormat(unittest.TestCase):
    """Tests for `paddle.save(..., use_binary_format=True)` / `paddle.load`
    round-trips of LoDTensor and SelectedRows, plus the error paths of the
    underlying `fluid.core._save/_load_*` bindings."""

    def setUp(self):
        # All tests here exercise the static-graph save/load path.
        paddle.enable_static()

    def set_zero(self, prog, place, scope=None):
        # Zero out every persistable variable in `prog` and assert the
        # zeroing actually took effect.
        if scope is None:
            scope = fluid.global_scope()
        for var in prog.list_vars():
            if isinstance(var, framework.Parameter) or var.persistable:
                ten = scope.find_var(var.name).get_tensor()
                if ten is not None:
                    ten.set(np.zeros_like(np.array(ten)), place)
                    new_t = np.array(scope.find_var(var.name).get_tensor())
                    self.assertTrue(np.sum(np.abs(new_t)) == 0)

    def replace_save_vars(self, program, dirname):
        # Save every persistable variable of `program` under `dirname` using
        # the binary format (one file per variable, named after the var).
        def predicate(var):
            return var.persistable

        vars = filter(predicate, program.list_vars())
        for var in vars:
            paddle.save(
                var.get_value(),
                os.path.join(dirname, var.name),
                use_binary_format=True)

    def replace_load_vars(self, program, dirname):
        # Load each persistable variable of `program` back from `dirname`
        # and write it into the current scope via set_value.
        def predicate(var):
            return var.persistable

        var_list = list(filter(predicate, program.list_vars()))
        for var in var_list:
            var_load = paddle.load(os.path.join(dirname, var.name))
            # set var_load to scope
            var.set_value(var_load)

    def test_replace_save_load_vars(self):
        paddle.enable_static()
        with new_program_scope():
            # Build a tiny two-layer FC network.
            x = paddle.static.data(
                name="x", shape=[None, IMAGE_SIZE], dtype='float32')
            z = paddle.static.nn.fc(x, 10, bias_attr=False)
            z = paddle.static.nn.fc(z, 128, bias_attr=False)
            loss = fluid.layers.reduce_mean(z)
            if paddle.fluid.core.is_compiled_with_cuda():
                place = fluid.CUDAPlace(0)
            else:
                place = fluid.CPUPlace()
            exe = paddle.static.Executor(place)
            exe.run(paddle.static.default_startup_program())
            prog = paddle.static.default_main_program()

            # Snapshot all persistable values; they must be non-zero so that
            # zeroing + reloading is a meaningful round-trip check.
            base_map = {}
            for var in prog.list_vars():
                if isinstance(var, framework.Parameter) or var.persistable:
                    t = np.array(fluid.global_scope().find_var(var.name)
                                 .get_tensor())
                    # make sure all the parameter/optimizer vars were updated
                    self.assertTrue(np.sum(np.abs(t)) != 0)
                    base_map[var.name] = t

            # Round-trip 1: binary replace_save_vars -> fluid.io.load_vars.
            path_vars1 = 'test_replace_save_load_vars_binary1/model'
            self.replace_save_vars(prog, path_vars1)
            # set var to zero
            self.set_zero(prog, place)
            var_list = list(
                filter(lambda var: var.persistable, prog.list_vars()))
            fluid.io.load_vars(
                exe, path_vars1, main_program=prog, vars=var_list)
            for var in prog.list_vars():
                if var.persistable:
                    new_t = np.array(fluid.global_scope().find_var(var.name)
                                     .get_tensor())
                    base_t = base_map[var.name]
                    self.assertTrue(np.array_equal(new_t, base_t))

            # Round-trip 2: fluid.io.save_vars -> binary replace_load_vars.
            path_vars2 = 'test_replace_save_load_vars_binary2/model/'
            fluid.io.save_vars(
                exe, path_vars2, main_program=prog, vars=var_list)
            self.set_zero(prog, place)
            self.replace_load_vars(prog, path_vars2)
            for var in prog.list_vars():
                if var.persistable:
                    new_t = np.array(fluid.global_scope().find_var(var.name)
                                     .get_tensor())
                    base_t = base_map[var.name]
                    self.assertTrue(np.array_equal(new_t, base_t))

    def test_save_load_lod_tensor(self):
        paddle.enable_static()
        OUTPUT_NUM = 32
        with new_program_scope():
            x = fluid.data(name="x", shape=[None, IMAGE_SIZE], dtype='float32')
            y = fluid.layers.fc(
                x,
                OUTPUT_NUM,
                name='fc_vars', )
            prog = fluid.default_main_program()
            if paddle.fluid.core.is_compiled_with_cuda():
                place = fluid.CUDAPlace(0)
            else:
                place = fluid.CPUPlace()
            exe = fluid.Executor(place)
            prog = paddle.static.default_main_program()
            exe.run(fluid.default_startup_program())

            # Save the FC weight tensor in binary format.
            dirname = 'test_save_load_lod_tensor1/tensor_'
            for var in prog.list_vars():
                if var.persistable and list(
                        var.shape) == [IMAGE_SIZE, OUTPUT_NUM]:
                    tensor = var.get_value()
                    paddle.save(
                        tensor, dirname + 'fc_vars.w_0',
                        use_binary_format=True)
                    break

            # Zero the variable in place, then load the saved copy back and
            # verify the original values are recovered.
            origin = np.array(var.get_value())
            var.set_value(np.zeros_like(origin))
            is_zeros = np.array(var.get_value())

            loaded_tensor = paddle.load(dirname + 'fc_vars.w_0')
            self.assertTrue(isinstance(loaded_tensor, fluid.core.LoDTensor))
            self.assertTrue(
                list(loaded_tensor.shape()) == [IMAGE_SIZE, OUTPUT_NUM])
            to_array = np.array(loaded_tensor)
            self.assertTrue(np.array_equal(origin, to_array))

        # Error path: binary format only supports Tensor-like objects.
        with self.assertRaises(NotImplementedError):
            path = 'test_save_load_error/temp'
            paddle.save({}, path, use_binary_format=True)

        # Error path: a file that is neither pickle nor a binary tensor.
        with self.assertRaises(ValueError):
            path = 'test_save_load_error/temp'
            with open(path, "w") as f:
                f.write('\0')
            paddle.load(path)

        # Error path: an uninitialized LoDTensor cannot be saved.
        with self.assertRaises(ValueError):
            temp_lod = fluid.core.LoDTensor()
            paddle.save(temp_lod, path, use_binary_format=True)

        # Error paths: the raw core bindings raise when the target path
        # cannot be opened.
        with self.assertRaises(RuntimeError):
            fluid.core._save_lod_tensor(
                temp_lod, 'test_save_load_error_not_exist_file/not_exist_file')
        with self.assertRaises(RuntimeError):
            fluid.core._load_lod_tensor(
                temp_lod, 'test_save_load_error_not_exist_file/not_exist_file')

    def test_save_load_selected_rows(self):
        paddle.enable_static()
        if paddle.fluid.core.is_compiled_with_cuda():
            place = fluid.CUDAPlace(0)
        else:
            place = fluid.CPUPlace()
        height = 10
        rows = [0, 4, 7]
        row_numel = 12
        selected_rows = fluid.core.SelectedRows(rows, height)
        path = 'test_paddle_save_load_selected_rows/sr.pdsr'

        # Error path: saving before the value tensor is initialized.
        with self.assertRaises(ValueError):
            paddle.save(selected_rows, path, use_binary_format=True)

        np_array = np.random.randn(len(rows), row_numel).astype("float32")
        tensor = selected_rows.get_tensor()
        tensor.set(np_array, place)

        # Round-trip: rows, height and values must all survive.
        paddle.save(selected_rows, path, use_binary_format=True)
        load_sr = paddle.load(path)
        self.assertTrue(isinstance(load_sr, fluid.core.SelectedRows))
        self.assertTrue(list(load_sr.rows()) == rows)
        self.assertTrue(load_sr.height() == height)
        self.assertTrue(
            np.array_equal(np.array(load_sr.get_tensor()), np_array))

        # Error paths: raw core bindings on an unopenable path.
        with self.assertRaises(RuntimeError):
            fluid.core._save_selected_rows(
                selected_rows,
                'test_paddle_save_load_selected_rows_not_exist_file/temp')
        with self.assertRaises(RuntimeError):
            fluid.core._load_selected_rows(
                selected_rows,
                'test_paddle_save_load_selected_rows_not_exist_file/temp')
...@@ -348,6 +348,48 @@ def _ndarray_to_tensor(obj, return_numpy): ...@@ -348,6 +348,48 @@ def _ndarray_to_tensor(obj, return_numpy):
return _to_LodTensor(obj) return _to_LodTensor(obj)
def _save_lod_tensor(tensor, file_name):
    """Write an initialized LoDTensor to ``file_name`` in binary format.

    Raises ValueError when the tensor has no data yet. Returns the end
    position (byte offset) of this tensor in the file.
    """
    if not tensor._is_initialized():
        raise ValueError("The saved tensor is not initialized.")
    # The core binding returns the stream position after the write.
    return core._save_lod_tensor(tensor, file_name)
def _load_lod_tensor(file_name):
    """Read a binary-format LoDTensor from ``file_name``.

    Returns a ``(tensor, seek)`` pair where ``seek`` is the end position of
    the tensor in the file.
    """
    temp_t = paddle.fluid.core.LoDTensor()
    _seek = paddle.fluid.core._load_lod_tensor(temp_t, file_name)
    return temp_t, _seek
def _save_selected_rows(selected_rows, file_name):
    """Write an initialized SelectedRows to ``file_name`` in binary format.

    Raises ValueError when the underlying value tensor has no data yet.
    Returns the end position (byte offset) of this SelectedRows in the file.
    """
    if not selected_rows.get_tensor()._is_initialized():
        raise ValueError("The saved tensor is not initialized.")
    # The core binding returns the stream position after the write.
    return core._save_selected_rows(selected_rows, file_name)
def _load_selected_rows(file_name):
    """Read a binary-format SelectedRows from ``file_name``.

    Returns a ``(selected_rows, seek)`` pair where ``seek`` is the end
    position of the SelectedRows in the file.
    """
    temp_sr = core.SelectedRows()
    _seek = core._load_selected_rows(temp_sr, file_name)
    return temp_sr, _seek
def _save_binary_var(obj, path):
    """Dispatch a binary-format save on the runtime type of ``obj``.

    Only LoDTensor and SelectedRows support the binary format; any other
    type raises NotImplementedError.
    """
    if isinstance(obj, core.LoDTensor):
        _save_lod_tensor(obj, path)
    elif isinstance(obj, core.SelectedRows):
        _save_selected_rows(obj, path)
    else:
        # Only the concept of 'Tensor' is exposed to users, so the error
        # message says Tensor rather than 'LoDTensor' or 'SelectedRows'.
        raise NotImplementedError(
            "When use_binary_format = True, `paddle.save` expected Tensor, but received {}.".
            format(type(obj)))
def save(obj, path, protocol=2, **configs): def save(obj, path, protocol=2, **configs):
''' '''
Save an object to the specified path. Save an object to the specified path.
...@@ -447,6 +489,9 @@ def save(obj, path, protocol=2, **configs): ...@@ -447,6 +489,9 @@ def save(obj, path, protocol=2, **configs):
"Type of `use_binary_format` should be bool, but received {}.". "Type of `use_binary_format` should be bool, but received {}.".
format(type(config.use_binary_format))) format(type(config.use_binary_format)))
if config.use_binary_format:
_save_binary_var(obj, path)
else:
# `protocol` need to be used, `pickle_protocol` is a deprecated arg. # `protocol` need to be used, `pickle_protocol` is a deprecated arg.
if config.pickle_protocol is not None: if config.pickle_protocol is not None:
protocol = config.pickle_protocol protocol = config.pickle_protocol
...@@ -462,6 +507,19 @@ def save(obj, path, protocol=2, **configs): ...@@ -462,6 +507,19 @@ def save(obj, path, protocol=2, **configs):
_legacy_save(obj, path, protocol) _legacy_save(obj, path, protocol)
else: else:
_legacy_static_save(obj, path, protocol) _legacy_static_save(obj, path, protocol)
else:
# `protocol` need to be used, `pickle_protocol` is a deprecated arg.
if config.pickle_protocol is not None:
protocol = config.pickle_protocol
warnings.warn(
"'pickle_protocol' is a deprecated argument. Please use 'protocol' instead."
)
if _use_legacy(obj):
if in_dygraph_mode():
_legacy_save(obj, path, protocol)
else:
_legacy_static_save(obj, path, protocol)
else: else:
# save single variable # save single variable
with open(path, 'wb') as f: with open(path, 'wb') as f:
...@@ -675,11 +733,26 @@ def load(path, **configs): ...@@ -675,11 +733,26 @@ def load(path, **configs):
raise NotImplementedError( raise NotImplementedError(
'Only support tensor and state_dict, but received {}.'. 'Only support tensor and state_dict, but received {}.'.
format(type(load_result))) format(type(load_result)))
except exception_type:
except exception_type as msg_pickle:
try:
tensor, _ = _load_selected_rows(path)
return tensor
except:
try:
tensor, _ = _load_lod_tensor(path)
return tensor
except:
try:
with open(path, "rb") as f: with open(path, "rb") as f:
program_desc_str = f.read() program_desc_str = f.read()
program = Program.parse_from_string(program_desc_str) program = Program.parse_from_string(
program_desc_str)
return program return program
except:
raise ValueError(
"`paddle.load` can not parse the file:{}.".format(
path))
else: else:
load_result = _legacy_load(path, **configs) load_result = _legacy_load(path, **configs)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册