diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc
index 815042c7419395178b45133b00211646acc82b06..9e2e54ade6e891e79744fb6909b052ec521f7eb0 100644
--- a/paddle/fluid/framework/parallel_executor.cc
+++ b/paddle/fluid/framework/parallel_executor.cc
@@ -683,8 +683,8 @@ void ParallelExecutor::BCastParamsToDevices(
   }
 }
 
-void ParallelExecutor::Run(const std::vector<std::string> &fetch_tensors,
-                           const std::string &fetched_var_name) {
+FeedFetchList ParallelExecutor::Run(
+    const std::vector<std::string> &fetch_tensors) {
   VLOG(3) << "enter ParallelExecutor Run";
 #ifdef WITH_GPERFTOOLS
   if (gProfileStarted) {
@@ -699,8 +699,7 @@ void ParallelExecutor::Run(const std::vector<std::string> &fetch_tensors,
 
   VLOG(3) << "ParallelExecutor begin to run member_->executor_->Run";
   auto fetch_data = member_->executor_->Run(fetch_tensors);
-  *member_->global_scope_->Var(fetched_var_name)->GetMutable<FeedFetchList>() =
-      fetch_data;
+  return fetch_data;
 }
 
 void ParallelExecutor::FeedTensorsIntoLocalScopes(
diff --git a/paddle/fluid/framework/parallel_executor.h b/paddle/fluid/framework/parallel_executor.h
index 1ac800c9596b174d5d1187802265a766fdd32e74..00ac5e134db91836d499cac765d606a19fe0f954 100644
--- a/paddle/fluid/framework/parallel_executor.h
+++ b/paddle/fluid/framework/parallel_executor.h
@@ -25,6 +25,7 @@ limitations under the License. */
 #include "paddle/fluid/framework/details/execution_strategy.h"
 #include "paddle/fluid/framework/details/op_handle_base.h"
 #include "paddle/fluid/framework/executor.h"
+#include "paddle/fluid/framework/feed_fetch_type.h"
 #include "paddle/fluid/framework/op_info.h"
 #include "paddle/fluid/framework/program_desc.h"
 #include "paddle/fluid/framework/scope.h"
@@ -74,8 +75,7 @@ class ParallelExecutor {
   void FeedAndSplitTensorIntoLocalScopes(
       const std::unordered_map<std::string, LoDTensor> &tensors);
 
-  void Run(const std::vector<std::string> &fetch_tensors,
-           const std::string &fetched_var_name);
+  FeedFetchList Run(const std::vector<std::string> &fetch_tensors);
 
  private:
   // broadcast the parameters from the 0th device.
diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc
index 57eefc0b5f0e0fc89abc6a2b02e573c48d3d0132..5db8b4df99adac35048997f24c86092838ca9b20 100644
--- a/paddle/fluid/pybind/pybind.cc
+++ b/paddle/fluid/pybind/pybind.cc
@@ -1068,7 +1068,17 @@ All parameter, weight, gradient are variables in Paddle.
               t = fluid.LoDTensor()
               t.set(np.ndarray([5, 30]), fluid.CPUPlace())
               arr.append(t)
-           )DOC");
+           )DOC")
+      .def("_move_to_list",
+           [](LoDTensorArray &self) -> py::list {
+             py::list res(self.size());
+             for (size_t i = 0; i < self.size(); ++i) {
+               res[i] = py::cast(std::move(self[i]));
+             }
+             self.clear();
+             return res;
+           },
+           py::return_value_policy::take_ownership);
 
   m.def("IsInplace",
         [](std::string op) -> bool { return operators::IsInplace(op); });
@@ -1650,10 +1660,9 @@ All parameter, weight, gradient are variables in Paddle.
.def("feed_and_split_tensor_into_local_scopes", &ParallelExecutor::FeedAndSplitTensorIntoLocalScopes) .def("run", [](ParallelExecutor &self, - const std::vector &fetch_tensors, - const std::string &fetched_var_name) { + const std::vector &fetch_tensors) { pybind11::gil_scoped_release release; - self.Run(fetch_tensors, fetched_var_name); + return self.Run(fetch_tensors); }); BindRecordIOWriter(&m); diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py index 90b58c38057f394856ba258d490c93665a499181..6c1d8f4d3b0a307e82e0a18620c0ecfeb65eceff 100644 --- a/python/paddle/fluid/executor.py +++ b/python/paddle/fluid/executor.py @@ -527,12 +527,12 @@ class Executor(object): exe.feed_tensors_into_local_scopes(res) fetch_var_names = list(map(_to_name_str, fetch_list)) - exe.run(fetch_var_names, fetch_var_name) - arr = scope.find_var(fetch_var_name).get_lod_tensor_array() + tensors = exe.run(fetch_var_names)._move_to_list() if return_numpy: - return as_numpy(arr) - return [arr[i] for i in range(len(arr))] + return as_numpy(tensors) + else: + return tensors def run(self, program=None, @@ -745,10 +745,12 @@ class Executor(object): exe.run(program.desc, scope, 0, True, True, fetch_var_name) else: exe.run_cached_prepared_ctx(ctx, scope, False, False, False) - outs = self._fetch_data(fetch_list, fetch_var_name, scope) + arr = scope.find_var(fetch_var_name).get_lod_tensor_array() + tensors = arr._move_to_list() if return_numpy: - outs = as_numpy(outs) - return outs + return as_numpy(tensors) + else: + return tensors def _run_inference(self, exe, feed): return exe.run(feed) diff --git a/python/paddle/fluid/tests/unittests/op_test.py b/python/paddle/fluid/tests/unittests/op_test.py index 6b8622b6f26f6102e5ee02716f30a847ed9a2fed..b43e1922d5815c8e11da57d28efd8a0cde176eea 100644 --- a/python/paddle/fluid/tests/unittests/op_test.py +++ b/python/paddle/fluid/tests/unittests/op_test.py @@ -327,19 +327,11 @@ class OpTest(unittest.TestCase): if parallel: use_cuda = False - if isinstance(place, fluid.CUDAPlace(0)): + if isinstance(place, fluid.CUDAPlace): use_cuda = True - if loss: - executor = fluid.ParallelExecutor( - use_cuda=use_cuda, - loss_name=loss.name, - main_program=program) - else: - executor = fluid.ParallelExecutor( - use_cuda=use_cuda, main_program=program) - else: - executor = Executor(place) - + compiled_prog = fluid.CompiledProgram(program).with_data_parallel( + loss_name=loss.name if loss else None, places=place) + program = compiled_prog fetch_list = getattr(self, "fetch_list", []) # if the fetch_list is customized by user, we use it directly. # if not, fill the fetch_list by the user configured outputs in test. 
@@ -359,6 +351,7 @@ class OpTest(unittest.TestCase):
         # fetch_list = map(block.var, fetch_list)
         if not isinstance(fetch_list[0], fluid.framework.Variable):
             fetch_list = list(map(block.var, fetch_list))
+        executor = Executor(place)
         outs = executor.run(program,
                             feed=feed_map,
                             fetch_list=fetch_list,
@@ -657,12 +650,12 @@ class OpTest(unittest.TestCase):
         fetch_list = [g for p, g in param_grad_list]
         if parallel:
             use_cuda = False
-            if isinstance(place, fluid.CUDAPlace(0)):
+            if isinstance(place, fluid.CUDAPlace):
                 use_cuda = True
-            executor = fluid.ParallelExecutor(
-                use_cuda=use_cuda, loss_name=loss.name, main_program=prog)
-        else:
-            executor = Executor(place)
+            compiled_prog = fluid.CompiledProgram(prog).with_data_parallel(
+                loss_name=loss.name, places=place)
+            prog = compiled_prog
+        executor = fluid.Executor(place)
         return list(
             map(np.array, executor.run(prog, feed_dict, fetch_list,
                                        return_numpy=False)))
diff --git a/python/paddle/fluid/tests/unittests/test_executor_return_tensor_not_overwriting.py b/python/paddle/fluid/tests/unittests/test_executor_return_tensor_not_overwriting.py
new file mode 100644
index 0000000000000000000000000000000000000000..64a8f20dae1e8f14ca44979dfade519958d4d4c7
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/test_executor_return_tensor_not_overwriting.py
@@ -0,0 +1,112 @@
+# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+import unittest
+import numpy as np
+import paddle.fluid.core as core
+import paddle.fluid as fluid
+from op_test import OpTest
+
+
+class TestExecutorReturnTensorNotOverwritingWithOptest(OpTest):
+    def setUp(self):
+        pass
+
+    def calc_add_out(self, place=None, parallel=None):
+        self.x = np.random.random((2, 5)).astype(np.float32)
+        self.y = np.random.random((2, 5)).astype(np.float32)
+        self.out = np.add(self.x, self.y)
+        self.inputs = {
+            'X': OpTest.np_dtype_to_fluid_dtype(self.x),
+            'Y': OpTest.np_dtype_to_fluid_dtype(self.y)
+        }
+        self.outputs = {'Out': self.out}
+        self.op_type = "elementwise_add"
+        self.dtype = np.float32
+        outs, fetch_list = self._calc_output(place, parallel=parallel)
+        return outs
+
+    def calc_mul_out(self, place=None, parallel=None):
+        self.x = np.random.random((2, 5)).astype(np.float32)
+        self.y = np.random.random((5, 2)).astype(np.float32)
+        self.out = np.dot(self.x, self.y)
+        self.inputs = {
+            'X': OpTest.np_dtype_to_fluid_dtype(self.x),
+            'Y': OpTest.np_dtype_to_fluid_dtype(self.y)
+        }
+        self.outputs = {'Out': self.out}
+        self.op_type = "mul"
+        self.dtype = np.float32
+        outs, fetch_list = self._calc_output(place, parallel=parallel)
+        return outs
+
+    def test_executor_run_twice(self):
+        places = [fluid.CPUPlace()]
+        if fluid.is_compiled_with_cuda():
+            places.append(fluid.CUDAPlace(0))
+
+        for place in places:
+            for parallel in [True, False]:
+                add_out = self.calc_add_out(place, parallel)
+                add_out1 = np.array(add_out[0])
+                mul_out = self.calc_mul_out(place, parallel)
+                add_out2 = np.array(add_out[0])
+                self.assertTrue(np.array_equal(add_out1, add_out2))
+
+
+class TestExecutorReturnTensorNotOverwritingWithLayers(unittest.TestCase):
+    def setUp(self):
+        pass
+
+    def calc_add_out(self, place=None, parallel=None):
+        x = fluid.layers.ones(shape=[3, 3], dtype='float32')
+        y = fluid.layers.ones(shape=[3, 3], dtype='float32')
+        out = fluid.layers.elementwise_add(x=x, y=y)
+        program = fluid.default_main_program()
+        if parallel:
+            program = fluid.CompiledProgram(program).with_data_parallel(
+                places=place)
+        exe = fluid.Executor(place)
+        out = exe.run(program, fetch_list=[out], return_numpy=False)
+        return out
+
+    def calc_sub_out(self, place=None, parallel=None):
+        x = fluid.layers.ones(shape=[2, 2], dtype='float32')
+        y = fluid.layers.ones(shape=[2, 2], dtype='float32')
+        out = fluid.layers.elementwise_sub(x=x, y=y)
+        program = fluid.default_main_program()
+        if parallel:
+            program = fluid.CompiledProgram(program).with_data_parallel(
+                places=place)
+        exe = fluid.Executor(place)
+        out = exe.run(program, fetch_list=[out], return_numpy=False)
+        return out
+
+    def test_executor_run_twice(self):
+        places = [fluid.CPUPlace()]
+        if fluid.is_compiled_with_cuda():
+            places.append(fluid.CUDAPlace(0))
+
+        for place in places:
+            for parallel in [True, False]:
+                add_out = self.calc_add_out(place, parallel)
+                add_out1 = np.array(add_out[0])
+                sub_out = self.calc_sub_out(place, parallel)
+                add_out2 = np.array(add_out[0])
+                self.assertTrue(np.array_equal(add_out1, add_out2))
+
+
+if __name__ == '__main__':
+    unittest.main()
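
As a quick illustration of the behaviour this patch is meant to guarantee: `ParallelExecutor::Run` now returns a `FeedFetchList`, and the Python side drains it through `LoDTensorArray._move_to_list()`, so tensors fetched with `return_numpy=False` are no longer references into a shared fetch variable in the global scope that a later `run()` would overwrite. A minimal sketch (not part of the patch, assuming the same fluid 1.x API used by the new unit test):

```python
# Sketch only: mirrors the new unit test above, using the fluid 1.x API.
import numpy as np
import paddle.fluid as fluid

place = fluid.CPUPlace()
exe = fluid.Executor(place)

x = fluid.layers.ones(shape=[2, 2], dtype='float32')
add_out = fluid.layers.elementwise_add(x=x, y=x)  # all 2.0
sub_out = fluid.layers.elementwise_sub(x=x, y=x)  # all 0.0
prog = fluid.default_main_program()

first = exe.run(prog, fetch_list=[add_out], return_numpy=False)
snapshot = np.array(first[0])  # copy of the first fetch result

# Before this patch the second run reused the fetch variable in the scope,
# so first[0] could silently change; now each run moves its results out
# into a fresh Python list.
exe.run(prog, fetch_list=[sub_out], return_numpy=False)

assert np.array_equal(snapshot, np.array(first[0]))
```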