提交 8f537354 编写于 作者: L Leo Chen 提交者: Zeng Jinle

Fix memory overwriting of tensors returned by executor (#19030)

* fix memory overlapping of fetch var (return of executor.run), test=develop

* fix wrong usage of ParallelExecutor in op_test, test=develop

* remove useless parameter and simplify code

* avoid tensor destruct untimely, test=develop

* add testcase independent of OpTest, test=develop
上级 95ff4fba
...@@ -683,8 +683,8 @@ void ParallelExecutor::BCastParamsToDevices( ...@@ -683,8 +683,8 @@ void ParallelExecutor::BCastParamsToDevices(
} }
} }
void ParallelExecutor::Run(const std::vector<std::string> &fetch_tensors, FeedFetchList ParallelExecutor::Run(
const std::string &fetched_var_name) { const std::vector<std::string> &fetch_tensors) {
VLOG(3) << "enter ParallelExecutor Run"; VLOG(3) << "enter ParallelExecutor Run";
#ifdef WITH_GPERFTOOLS #ifdef WITH_GPERFTOOLS
if (gProfileStarted) { if (gProfileStarted) {
...@@ -699,8 +699,7 @@ void ParallelExecutor::Run(const std::vector<std::string> &fetch_tensors, ...@@ -699,8 +699,7 @@ void ParallelExecutor::Run(const std::vector<std::string> &fetch_tensors,
VLOG(3) << "ParallelExecutor begin to run member_->executor_->Run"; VLOG(3) << "ParallelExecutor begin to run member_->executor_->Run";
auto fetch_data = member_->executor_->Run(fetch_tensors); auto fetch_data = member_->executor_->Run(fetch_tensors);
*member_->global_scope_->Var(fetched_var_name)->GetMutable<FeedFetchList>() = return fetch_data;
fetch_data;
} }
void ParallelExecutor::FeedTensorsIntoLocalScopes( void ParallelExecutor::FeedTensorsIntoLocalScopes(
......
...@@ -25,6 +25,7 @@ limitations under the License. */ ...@@ -25,6 +25,7 @@ limitations under the License. */
#include "paddle/fluid/framework/details/execution_strategy.h" #include "paddle/fluid/framework/details/execution_strategy.h"
#include "paddle/fluid/framework/details/op_handle_base.h" #include "paddle/fluid/framework/details/op_handle_base.h"
#include "paddle/fluid/framework/executor.h" #include "paddle/fluid/framework/executor.h"
#include "paddle/fluid/framework/feed_fetch_type.h"
#include "paddle/fluid/framework/op_info.h" #include "paddle/fluid/framework/op_info.h"
#include "paddle/fluid/framework/program_desc.h" #include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/framework/scope.h" #include "paddle/fluid/framework/scope.h"
...@@ -74,8 +75,7 @@ class ParallelExecutor { ...@@ -74,8 +75,7 @@ class ParallelExecutor {
void FeedAndSplitTensorIntoLocalScopes( void FeedAndSplitTensorIntoLocalScopes(
const std::unordered_map<std::string, LoDTensor> &tensors); const std::unordered_map<std::string, LoDTensor> &tensors);
void Run(const std::vector<std::string> &fetch_tensors, FeedFetchList Run(const std::vector<std::string> &fetch_tensors);
const std::string &fetched_var_name);
private: private:
// broadcast the parameters from the 0th device. // broadcast the parameters from the 0th device.
......
...@@ -1068,7 +1068,17 @@ All parameter, weight, gradient are variables in Paddle. ...@@ -1068,7 +1068,17 @@ All parameter, weight, gradient are variables in Paddle.
t = fluid.LoDTensor() t = fluid.LoDTensor()
t.set(np.ndarray([5, 30]), fluid.CPUPlace()) t.set(np.ndarray([5, 30]), fluid.CPUPlace())
arr.append(t) arr.append(t)
)DOC"); )DOC")
.def("_move_to_list",
[](LoDTensorArray &self) -> py::list {
py::list res(self.size());
for (size_t i = 0; i < self.size(); ++i) {
res[i] = py::cast(std::move(self[i]));
}
self.clear();
return res;
},
py::return_value_policy::take_ownership);
m.def("IsInplace", m.def("IsInplace",
[](std::string op) -> bool { return operators::IsInplace(op); }); [](std::string op) -> bool { return operators::IsInplace(op); });
...@@ -1650,10 +1660,9 @@ All parameter, weight, gradient are variables in Paddle. ...@@ -1650,10 +1660,9 @@ All parameter, weight, gradient are variables in Paddle.
.def("feed_and_split_tensor_into_local_scopes", .def("feed_and_split_tensor_into_local_scopes",
&ParallelExecutor::FeedAndSplitTensorIntoLocalScopes) &ParallelExecutor::FeedAndSplitTensorIntoLocalScopes)
.def("run", [](ParallelExecutor &self, .def("run", [](ParallelExecutor &self,
const std::vector<std::string> &fetch_tensors, const std::vector<std::string> &fetch_tensors) {
const std::string &fetched_var_name) {
pybind11::gil_scoped_release release; pybind11::gil_scoped_release release;
self.Run(fetch_tensors, fetched_var_name); return self.Run(fetch_tensors);
}); });
BindRecordIOWriter(&m); BindRecordIOWriter(&m);
......
...@@ -527,12 +527,12 @@ class Executor(object): ...@@ -527,12 +527,12 @@ class Executor(object):
exe.feed_tensors_into_local_scopes(res) exe.feed_tensors_into_local_scopes(res)
fetch_var_names = list(map(_to_name_str, fetch_list)) fetch_var_names = list(map(_to_name_str, fetch_list))
exe.run(fetch_var_names, fetch_var_name) tensors = exe.run(fetch_var_names)._move_to_list()
arr = scope.find_var(fetch_var_name).get_lod_tensor_array()
if return_numpy: if return_numpy:
return as_numpy(arr) return as_numpy(tensors)
return [arr[i] for i in range(len(arr))] else:
return tensors
def run(self, def run(self,
program=None, program=None,
...@@ -745,10 +745,12 @@ class Executor(object): ...@@ -745,10 +745,12 @@ class Executor(object):
exe.run(program.desc, scope, 0, True, True, fetch_var_name) exe.run(program.desc, scope, 0, True, True, fetch_var_name)
else: else:
exe.run_cached_prepared_ctx(ctx, scope, False, False, False) exe.run_cached_prepared_ctx(ctx, scope, False, False, False)
outs = self._fetch_data(fetch_list, fetch_var_name, scope) arr = scope.find_var(fetch_var_name).get_lod_tensor_array()
tensors = arr._move_to_list()
if return_numpy: if return_numpy:
outs = as_numpy(outs) return as_numpy(tensors)
return outs else:
return tensors
def _run_inference(self, exe, feed): def _run_inference(self, exe, feed):
return exe.run(feed) return exe.run(feed)
......
...@@ -327,19 +327,11 @@ class OpTest(unittest.TestCase): ...@@ -327,19 +327,11 @@ class OpTest(unittest.TestCase):
if parallel: if parallel:
use_cuda = False use_cuda = False
if isinstance(place, fluid.CUDAPlace(0)): if isinstance(place, fluid.CUDAPlace):
use_cuda = True use_cuda = True
if loss: compiled_prog = fluid.CompiledProgram(program).with_data_parallel(
executor = fluid.ParallelExecutor( loss_name=loss.name if loss else None, places=place)
use_cuda=use_cuda, program = compiled_prog
loss_name=loss.name,
main_program=program)
else:
executor = fluid.ParallelExecutor(
use_cuda=use_cuda, main_program=program)
else:
executor = Executor(place)
fetch_list = getattr(self, "fetch_list", []) fetch_list = getattr(self, "fetch_list", [])
# if the fetch_list is customized by user, we use it directly. # if the fetch_list is customized by user, we use it directly.
# if not, fill the fetch_list by the user configured outputs in test. # if not, fill the fetch_list by the user configured outputs in test.
...@@ -359,6 +351,7 @@ class OpTest(unittest.TestCase): ...@@ -359,6 +351,7 @@ class OpTest(unittest.TestCase):
# fetch_list = map(block.var, fetch_list) # fetch_list = map(block.var, fetch_list)
if not isinstance(fetch_list[0], fluid.framework.Variable): if not isinstance(fetch_list[0], fluid.framework.Variable):
fetch_list = list(map(block.var, fetch_list)) fetch_list = list(map(block.var, fetch_list))
executor = Executor(place)
outs = executor.run(program, outs = executor.run(program,
feed=feed_map, feed=feed_map,
fetch_list=fetch_list, fetch_list=fetch_list,
...@@ -657,12 +650,12 @@ class OpTest(unittest.TestCase): ...@@ -657,12 +650,12 @@ class OpTest(unittest.TestCase):
fetch_list = [g for p, g in param_grad_list] fetch_list = [g for p, g in param_grad_list]
if parallel: if parallel:
use_cuda = False use_cuda = False
if isinstance(place, fluid.CUDAPlace(0)): if isinstance(place, fluid.CUDAPlace):
use_cuda = True use_cuda = True
executor = fluid.ParallelExecutor( compiled_prog = fluid.CompiledProgram(prog).with_data_parallel(
use_cuda=use_cuda, loss_name=loss.name, main_program=prog) loss_name=loss.name, places=place)
else: prog = compiled_prog
executor = Executor(place) executor = fluid.Executor(place)
return list( return list(
map(np.array, map(np.array,
executor.run(prog, feed_dict, fetch_list, return_numpy=False))) executor.run(prog, feed_dict, fetch_list, return_numpy=False)))
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import unittest
import numpy as np
import paddle.fluid.core as core
import paddle.fluid as fluid
from op_test import OpTest
class TestExecutorReturnTensorNotOverwritingWithOptest(OpTest):
    """Regression test: a tensor fetched from one ``executor.run`` call must
    not be overwritten in place by a later, unrelated run (see the fetch-var
    memory-overwriting fix this test accompanies).
    """

    def setUp(self):
        # All state (inputs/outputs/op_type) is prepared per calc_* call,
        # so there is nothing to initialize here.
        pass

    def _calc_out(self, op_type, y_shape, ref_fn, place, parallel):
        """Build random inputs, record the NumPy reference output, and run
        the op through ``OpTest._calc_output``.

        Args:
            op_type: Paddle operator type string to execute.
            y_shape: shape of the second input ``Y`` (``X`` is always (2, 5)).
            ref_fn: NumPy function producing the expected output from (x, y).
            place: device place, forwarded to ``_calc_output``.
            parallel: whether to run via the parallel (compiled) path.

        Returns:
            The fetched output tensors from ``_calc_output``.
        """
        self.x = np.random.random((2, 5)).astype(np.float32)
        self.y = np.random.random(y_shape).astype(np.float32)
        self.out = ref_fn(self.x, self.y)
        self.inputs = {
            'X': OpTest.np_dtype_to_fluid_dtype(self.x),
            'Y': OpTest.np_dtype_to_fluid_dtype(self.y)
        }
        self.outputs = {'Out': self.out}
        self.op_type = op_type
        self.dtype = np.float32
        outs, fetch_list = self._calc_output(place, parallel=parallel)
        return outs

    def calc_add_out(self, place=None, parallel=None):
        # Elementwise add of two (2, 5) tensors; reference is np.add.
        return self._calc_out("elementwise_add", (2, 5), np.add, place,
                              parallel)

    def calc_mul_out(self, place=None, parallel=None):
        # np.dot on (2, 5) x (5, 2) is a matrix multiplication, so the
        # matching Paddle op is "mul" (matmul). "elementwise_mul" would be
        # wrong here: its inputs must be broadcast-compatible, which
        # (2, 5) and (5, 2) are not.
        return self._calc_out("mul", (5, 2), np.dot, place, parallel)

    def test_executor_run_twice(self):
        """The tensor fetched by the first run must be unchanged after a
        second, different run on the same place."""
        places = [fluid.CPUPlace()]
        if fluid.is_compiled_with_cuda():
            places.append(fluid.CUDAPlace(0))
        for place in places:
            for parallel in [True, False]:
                add_out = self.calc_add_out(place, parallel)
                add_out1 = np.array(add_out[0])
                # A second run would previously clobber add_out's memory.
                mul_out = self.calc_mul_out(place, parallel)
                add_out2 = np.array(add_out[0])
                self.assertTrue(np.array_equal(add_out1, add_out2))
class TestExecutorReturnTensorNotOverOverwritingWithLayers(unittest.TestCase):
    """Same regression check as the OpTest variant above, but built with
    plain ``fluid.layers`` and ``fluid.Executor`` so it does not depend on
    the OpTest machinery.
    """

    def setUp(self):
        # Each calc_* call builds its own ops; nothing to prepare here.
        pass

    def _calc_out(self, layer_fn, shape, place, parallel):
        """Run ``layer_fn`` on two all-ones tensors of ``shape`` and fetch
        the result as LoDTensors (``return_numpy=False``).

        NOTE(review): ops accumulate in ``fluid.default_main_program()``
        across calls, so each later run re-executes the earlier ops too —
        presumably intentional, as that is what exercises the fetched-tensor
        overwriting bug; confirm against the original commit.

        Args:
            layer_fn: a ``fluid.layers`` binary op taking keyword args x, y.
            shape: shape of the two all-ones inputs.
            place: device place for the Executor.
            parallel: if true, compile with ``with_data_parallel``.

        Returns:
            The list of fetched LoDTensors from ``exe.run``.
        """
        x = fluid.layers.ones(shape=shape, dtype='float32')
        y = fluid.layers.ones(shape=shape, dtype='float32')
        out = layer_fn(x=x, y=y)
        program = fluid.default_main_program()
        if parallel:
            program = fluid.CompiledProgram(program).with_data_parallel(
                places=place)
        exe = fluid.Executor(place)
        return exe.run(program, fetch_list=[out], return_numpy=False)

    def calc_add_out(self, place=None, parallel=None):
        # 3x3 ones + 3x3 ones.
        return self._calc_out(fluid.layers.elementwise_add, [3, 3], place,
                              parallel)

    def calc_sub_out(self, place=None, parallel=None):
        # 2x2 ones - 2x2 ones.
        return self._calc_out(fluid.layers.elementwise_sub, [2, 2], place,
                              parallel)

    def test_executor_run_twice(self):
        """The tensor fetched by the first run must be unchanged after a
        second, different run on the same place."""
        places = [fluid.CPUPlace()]
        if fluid.is_compiled_with_cuda():
            places.append(fluid.CUDAPlace(0))
        for place in places:
            for parallel in [True, False]:
                add_out = self.calc_add_out(place, parallel)
                add_out1 = np.array(add_out[0])
                # A second run would previously clobber add_out's memory.
                sub_out = self.calc_sub_out(place, parallel)
                add_out2 = np.array(add_out[0])
                self.assertTrue(np.array_equal(add_out1, add_out2))
# Allow running this test file directly with `python <file>`.
if __name__ == '__main__':
    unittest.main()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册