diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc
index 9e2e54ade6e891e79744fb6909b052ec521f7eb0..13f8e15b92a0ef4643d2e72e4c14fb7dadc527b9 100644
--- a/paddle/fluid/framework/parallel_executor.cc
+++ b/paddle/fluid/framework/parallel_executor.cc
@@ -723,15 +723,19 @@ void ParallelExecutor::FeedTensorsIntoLocalScopes(
 void ParallelExecutor::FeedAndSplitTensorIntoLocalScopes(
     const std::unordered_map<std::string, LoDTensor> &tensors) {
+  size_t num_places = member_->places_.size();
   for (auto &pair : tensors) {
+    bool is_persistable = member_->IsPersistable(pair.first);
+    VLOG(3) << "Split " << (is_persistable ? "persistable" : "non-persistable")
+            << " data (" << pair.first << "), dim:" << pair.second.dims()
+            << ", place: " << pair.second.place();
     auto lod_tensors = pair.second.SplitLoDTensor(member_->places_);
-    if (member_->places_.size() != lod_tensors.size()) {
-      bool is_cpu_place = platform::is_cpu_place(member_->places_.front());
+    bool is_cpu_place = platform::is_cpu_place(member_->places_.front());
+    if (!is_persistable && num_places != lod_tensors.size()) {
       auto error_info = string::Sprintf(
-          "The number(%d) of samples of "
-          "current batch is less than the count(%d) of "
-          "devices(%s), currently, it is not allowed. ",
-          lod_tensors.size(), member_->places_.size(),
+          "The number(%d) of samples[%s] in the current batch is less than "
+          "the count(%d) of devices(%s); currently, this is not allowed. ",
+          lod_tensors.size(), pair.first, num_places,
           (is_cpu_place ? "CPU" : "GPU"));
       if (is_cpu_place) {
         error_info +=
@@ -739,10 +743,35 @@ void ParallelExecutor::FeedAndSplitTensorIntoLocalScopes(
             "to determine the number of devices you need.";
       }
       PADDLE_THROW(error_info);
+    } else if (is_persistable) {
+      if (lod_tensors.size() == 1) {
+        lod_tensors.reserve(num_places);
+        auto &tensor = lod_tensors.front();
+        PADDLE_ENFORCE_EQ(tensor.dims(), pair.second.dims(),
+                          "The dims of the fed tensor don't match.");
+        PADDLE_ENFORCE_EQ(tensor.place(), member_->places_.at(0),
+                          "The place of the fed tensor doesn't match.");
+        for (size_t i = 1; i < num_places; ++i) {
+          lod_tensors.emplace_back();
+          auto &tmp = lod_tensors.back();
+          framework::TensorCopy(pair.second, member_->places_.at(i), &tmp);
+        }
+      }
+      if (lod_tensors.size() != num_places) {
+        auto error_info = string::Sprintf(
+            "The number(%d) of samples[%s] of the current batch does not "
+            "match the count(%d) of devices(%s). Because %s is a "
+            "persistable variable, you can feed just one sample; in that "
+            "case, it will be copied to %d copies and sent to the "
+            "different places separately. If each place needs a different "
+            "value, you should feed %d samples.",
+            lod_tensors.size(), pair.first, num_places,
+            (is_cpu_place ? "CPU" : "GPU"), pair.first, num_places,
+            num_places);
+        PADDLE_THROW(error_info);
+      }
     }
-    bool is_persistable = member_->IsPersistable(pair.first);
-    for (size_t j = 0; j < member_->places_.size(); ++j) {
+    for (size_t j = 0; j < num_places; ++j) {
       auto *feed_scope = is_persistable ? member_->local_scopes_[j]
                                         : member_->local_exec_scopes_[j];
       auto *feed_var = feed_scope->Var(pair.first);
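Review note, not part of the patch: the hunks above change FeedAndSplitTensorIntoLocalScopes so that a persistable variable fed as a single sample is broadcast, via framework::TensorCopy, to every place, while non-persistable inputs must still split exactly across the devices. A minimal, runnable sketch of the user-visible behavior under the fluid 1.x API (the network and variable names here are illustrative, assuming a 4-place CPU run):

```python
import os
import numpy
import paddle.fluid as fluid

os.environ['CPU_NUM'] = '4'  # four CPU places for data parallelism

main, startup = fluid.Program(), fluid.Program()
with fluid.program_guard(main, startup):
    image = fluid.layers.data(name='image', shape=[784], dtype='float32')
    label = fluid.layers.data(name='label', shape=[1], dtype='int64')
    pred = fluid.layers.fc(input=image, size=10, act='softmax')
    loss = fluid.layers.mean(
        fluid.layers.cross_entropy(input=pred, label=label))
    # A persistable variable that the optimizer reads at run time.
    lr = fluid.layers.create_global_var(
        name='learning_rate', shape=[1], value=1.0,
        dtype='float32', persistable=True)
    fluid.optimizer.SGD(learning_rate=lr).minimize(loss)

exe = fluid.Executor(fluid.CPUPlace())
exe.run(startup)
compiled = fluid.compiler.CompiledProgram(main).with_data_parallel(
    loss_name=loss.name)

# 'image'/'label' are split sample-wise across the four places;
# 'learning_rate' is persistable, so its single value is now copied
# to every place instead of raising "samples less than devices".
exe.run(compiled,
        feed={
            'image': numpy.random.random([4, 784]).astype('float32'),
            'label': numpy.random.randint(10, size=[4, 1]).astype('int64'),
            'learning_rate': numpy.array([1.0], dtype='float32'),
        },
        fetch_list=[loss.name])
```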
diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py
index 6dbfc7e35352d1068fcf2d170514da7c984dc0b2..f038a15fd0fde6e2b171c56ff4dff645d0bcf9aa 100644
--- a/python/paddle/fluid/executor.py
+++ b/python/paddle/fluid/executor.py
@@ -498,8 +498,11 @@ class Executor(object):
                 feed_tensor = feed[feed_name]
                 if not isinstance(feed_tensor, core.LoDTensor):
                     feed_tensor = core.LoDTensor()
-                    # always set to CPU place, since the tensor need to be splitted
+                    # always set to CPU place, since the tensor needs to be split
                     # it is fast in CPU
+                    assert isinstance(feed[feed_name], np.ndarray), \
+                        "The input({}) should be a numpy.ndarray, not {}.".format(
+                            feed_name, type(feed[feed_name]))
                     feed_tensor.set(feed[feed_name], core.CPUPlace())
                 feed_tensor_dict[feed_name] = feed_tensor
@@ -520,6 +523,9 @@
                     tensor = each[feed_name]
                     if not isinstance(tensor, core.LoDTensor):
                         tmp = core.LoDTensor()
+                        assert isinstance(each[feed_name], np.ndarray), \
+                            "The input({}) should be a numpy.ndarray, not {}.".format(
+                                feed_name, type(each[feed_name]))
                         tmp.set(tensor, program._places[i])
                         tensor = tmp
                     res_dict[feed_name] = tensor
@@ -528,11 +534,7 @@
         fetch_var_names = list(map(_to_name_str, fetch_list))
         tensors = exe.run(fetch_var_names)._move_to_list()
-
-        if return_numpy:
-            return as_numpy(tensors)
-        else:
-            return tensors
+        return as_numpy(tensors) if return_numpy else tensors
 
     def run(self,
             program=None,
@@ -611,7 +613,7 @@
                 use_program_cache=use_program_cache)
         except Exception as e:
             if not isinstance(e, core.EOFException):
-                print("An exception was thrown!\n {}".format(str(e)))
+                print("A non-EOF exception was thrown.")
             six.reraise(*sys.exc_info())
 
     def _run_impl(self, program, feed, fetch_list, feed_var_name,
diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt
index c35873dc1475e56593c43287ea4c15d13e6bb4b2..a357b6b864fea39bbea502a5f83ab1cf3d7148bc 100644
--- a/python/paddle/fluid/tests/unittests/CMakeLists.txt
+++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt
@@ -310,5 +310,6 @@ set_tests_properties(test_parallel_executor_test_while_train test_parallel_execu
                      test_parallel_executor_seresnext_base_gpu test_parallel_executor_seresnext_with_reduce_gpu
                      test_parallel_executor_seresnext_with_fuse_all_reduce_gpu
                      test_parallel_executor_crf test_sync_batch_norm_op
+                     test_parallel_executor_feed_persistable_var
                      test_parallel_executor_crf_auto_growth test_buffer_shared_memory_reuse_pass_and_fuse_optimization_op_pass
                      test_buffer_shared_memory_reuse_pass PROPERTIES LABELS "RUN_TYPE=DIST")
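A second illustration, reusing exe, compiled, and loss from the sketch above: with the assertions added to executor.py, a feed of the wrong type now fails fast with a readable message instead of erroring inside core.LoDTensor.set():

```python
# Feeding a plain Python list instead of a numpy.ndarray trips the
# new assertion before any tensor is constructed.
try:
    exe.run(compiled,
            feed={'image': [[0.0] * 784]},
            fetch_list=[loss.name])
except AssertionError as e:
    print(e)  # The input(image) should be a numpy.ndarray, not <class 'list'>.
```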
diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_feed_persistable_var.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_feed_persistable_var.py
new file mode 100644
index 0000000000000000000000000000000000000000..831e2e761088bb173168b946fb6bca945d6c90f5
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/test_parallel_executor_feed_persistable_var.py
@@ -0,0 +1,88 @@
+# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+from functools import partial
+import numpy
+import unittest
+import paddle.fluid.core as core
+import paddle.fluid as fluid
+from simple_nets import init_data, simple_fc_net
+import os
+
+
+class TestFeedPersistableVar(unittest.TestCase):
+    @classmethod
+    def setUpClass(cls):
+        os.environ['CPU_NUM'] = str(4)
+        batch_size = 4
+        cls.img, cls.label = init_data(
+            batch_size, img_shape=[784], label_range=9)
+        cls.feed_dict = {
+            'image': cls.img,
+            'label': cls.label,
+            'learning_rate': numpy.array([1.0]).astype("float32")
+        }
+
+    def optimizer(self):
+        learning_rate = fluid.layers.create_global_var(
+            name="learning_rate",
+            shape=[1],
+            value=1.0,
+            dtype='float32',
+            persistable=True)
+        optimizer = fluid.optimizer.SGD(learning_rate=learning_rate)
+        return optimizer
+
+    def check_feed_persistable_var(self, feed_dict, use_cuda=False):
+        if use_cuda and not core.is_compiled_with_cuda():
+            return
+        place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
+        exe = fluid.Executor(place)
+
+        main = fluid.Program()
+        startup = fluid.Program()
+        with fluid.program_guard(main, startup):
+            loss = simple_fc_net()
+
+            optimizer = self.optimizer()
+            optimizer.minimize(loss)
+
+            exe.run(program=startup)
+            compiled_prog = fluid.compiler.CompiledProgram(
+                main).with_data_parallel(loss_name=loss.name)
+
+            exe.run(program=compiled_prog, feed=feed_dict)
+
+    def test_feed_persistable_var(self):
+        self.check_feed_persistable_var(self.feed_dict)
+        self.check_feed_persistable_var(self.feed_dict, use_cuda=True)
+
+        self.feed_dict['learning_rate'] = numpy.array(
+            [1.0, 1.0]).astype("float32")
+        self.check_feed_persistable_var(self.feed_dict, use_cuda=True)
+
+        self.feed_dict['learning_rate'] = numpy.array(
+            [1.0, 1.0]).astype("float32")
+        run = partial(self.check_feed_persistable_var, self.feed_dict)
+        self.assertRaises(core.EnforceNotMet, run)
+
+        self.feed_dict['image'] = self.img[0, :]
+        self.feed_dict['label'] = self.label[0, :]
+        run = partial(self.check_feed_persistable_var, self.feed_dict)
+        self.assertRaises(core.EnforceNotMet, run)
+
+
+if __name__ == '__main__':
+    unittest.main()
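Reviewer's summary of why the test expects these outcomes (my reading of the C++ check, not text from the patch): SplitLoDTensor over N places yields min(batch, N) pieces, and a persistable variable now passes only if it splits into exactly 1 piece (which is then broadcast) or exactly N pieces. With CPU_NUM=4, a 2-element learning_rate yields 2 pieces and raises EnforceNotMet, while the same feed can pass on a 1- or 2-GPU CUDA run. A toy Python model of the decision:

```python
def feed_outcome(batch_size, num_places, is_persistable):
    """Illustrative model of the check in FeedAndSplitTensorIntoLocalScopes."""
    pieces = min(batch_size, num_places)  # what SplitLoDTensor produces
    if is_persistable and pieces == 1:
        return 'ok (broadcast to all places)'
    return 'ok' if pieces == num_places else 'raise EnforceNotMet'

assert feed_outcome(1, 4, True) == 'ok (broadcast to all places)'
assert feed_outcome(2, 4, True) == 'raise EnforceNotMet'   # the CPU_NUM=4 case
assert feed_outcome(2, 2, True) == 'ok'                    # why a 2-GPU run passes
assert feed_outcome(1, 4, False) == 'raise EnforceNotMet'  # non-persistable feeds
```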