diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc
index 9ba0985dcb00d58ca57ec191b8a125c82cdb0f83..df7b7b49ed9f8cd0b8f5d9f2d94114660e7c4bd1 100644
--- a/paddle/fluid/framework/parallel_executor.cc
+++ b/paddle/fluid/framework/parallel_executor.cc
@@ -200,12 +200,17 @@ class ParallelExecutorPrivate {
     InitNCCLCtxs(scope, bst);
   }
 #endif
+  inline bool IsPersistable(const std::string &name) const {
+    auto iter = is_persistable_.find(name);
+    return iter != is_persistable_.end() && iter->second;
+  }
 
   BuildStrategy build_strategy_;
   std::vector<platform::Place> places_;
   std::vector<Scope *> local_scopes_;
   Scope *global_scope_;  // not owned
   std::unique_ptr<details::SSAGraphExecutor> executor_;
+  std::unordered_map<std::string, bool> is_persistable_;
 
 #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
   platform::NCCLCommunicator *nccl_ctxs_{nullptr};
@@ -473,6 +478,8 @@ ParallelExecutor::ParallelExecutor(const std::vector<platform::Place> &places,
       var_infos.back().name_ = node->Var()->Name();
       var_infos.back().type_ = node->Var()->GetType();
       var_infos.back().persistable_ = node->Var()->Persistable();
+      member_->is_persistable_.emplace(node->Var()->Name(),
+                                       node->Var()->Persistable());
     }
   }
 
@@ -642,15 +649,19 @@ void ParallelExecutor::FeedTensorsIntoLocalScopes(
 
 void ParallelExecutor::FeedAndSplitTensorIntoLocalScopes(
     const std::unordered_map<std::string, LoDTensor> &tensors) {
-  for (auto pair : tensors) {
+  size_t num_places = member_->places_.size();
+  for (auto &pair : tensors) {
+    bool is_persistable = member_->IsPersistable(pair.first);
+    VLOG(3) << "Split " << (is_persistable ? "persistable" : "non-persistable")
+            << " data (" << pair.first << "), dim:" << pair.second.dims()
+            << ", place: " << pair.second.place();
     auto lod_tensors = pair.second.SplitLoDTensor(member_->places_);
-    if (member_->places_.size() != lod_tensors.size()) {
-      bool is_cpu_place = platform::is_cpu_place(member_->places_.front());
+    bool is_cpu_place = platform::is_cpu_place(member_->places_.front());
+    if (!is_persistable && num_places != lod_tensors.size()) {
       auto error_info = string::Sprintf(
-          "The number(%d) of samples of "
-          "current batch is less than the count(%d) of "
-          "devices(%s), currently, it is not allowed. ",
-          lod_tensors.size(), member_->places_.size(),
+          "The number(%d) of samples[%s] of the current batch is less than "
+          "the count(%d) of devices(%s), which is currently not allowed. ",
+          lod_tensors.size(), pair.first, num_places,
           (is_cpu_place ? "CPU" : "GPU"));
       if (is_cpu_place) {
         error_info +=
@@ -658,7 +669,38 @@ void ParallelExecutor::FeedAndSplitTensorIntoLocalScopes(
             "to determine the number of devices you need.";
       }
       PADDLE_THROW(error_info);
+    } else if (is_persistable) {
+      if (lod_tensors.size() == 1) {
+        lod_tensors.reserve(num_places);
+        auto &tensor = lod_tensors.front();
+        PADDLE_ENFORCE_EQ(tensor.dims(), pair.second.dims(),
+                          "The dim doesn't match.");
+        PADDLE_ENFORCE_EQ(tensor.place(), member_->places_.at(0),
+                          "The place doesn't match.");
+        for (size_t i = 1; i < num_places; ++i) {
+          lod_tensors.emplace_back();
+          auto &tmp = lod_tensors.back();
+          framework::TensorCopy(pair.second, member_->places_.at(i), &tmp);
+        }
+      }
+      if (lod_tensors.size() != num_places) {
+        auto error_info = string::Sprintf(
+            "The number(%d) of samples[%s] of the current batch does not "
+            "match the count(%d) of devices(%s). Because %s is a persistable "
+            "variable, you can feed just one sample; in that case, the input "
+            "sample will be copied into %d copies and sent to the different "
+            "places separately. If you need different places to hold "
+            "different values, you should feed %d samples.",
+            lod_tensors.size(), pair.first, num_places,
+            (is_cpu_place ? "CPU" : "GPU"), pair.first, num_places, num_places);
+        PADDLE_THROW(error_info);
+      }
     }
+    PADDLE_ENFORCE_EQ(
+        lod_tensors.size(), num_places,
+        "The number(%d) of samples of the current batch does not match the "
+        "count(%d) of devices.",
+        lod_tensors.size(), num_places);
     for (size_t j = 0; j < member_->places_.size(); ++j) {
       // TODO(panxy0718): Do I need to delete this var?
       auto t =
diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py
index 85282f88743500c6cfe4a285aa6862a7a54a5ba4..b74555ee4d2f035968faeb825cfa20d8f5eb4451 100644
--- a/python/paddle/fluid/executor.py
+++ b/python/paddle/fluid/executor.py
@@ -496,8 +496,11 @@ class Executor(object):
                 feed_tensor = feed[feed_name]
                 if not isinstance(feed_tensor, core.LoDTensor):
                     feed_tensor = core.LoDTensor()
-                    # always set to CPU place, since the tensor need to be splitted
+                    # always set to CPU place, since the tensor needs to be split
                     # it is fast in CPU
+                    assert isinstance(feed[feed_name], np.ndarray), \
+                        "The input({}) should be numpy.ndarray, but received {}.".format(
+                        feed_name, type(feed[feed_name]))
                     feed_tensor.set(feed[feed_name], core.CPUPlace())
                 feed_tensor_dict[feed_name] = feed_tensor
 
@@ -518,6 +521,9 @@ class Executor(object):
                     tensor = each[feed_name]
                     if not isinstance(tensor, core.LoDTensor):
                         tmp = core.LoDTensor()
+                        assert isinstance(each[feed_name], np.ndarray), \
+                            "The input({}) should be numpy.ndarray, but received {}.".format(
+                            feed_name, type(each[feed_name]))
                         tmp.set(tensor, program._places[i])
                         tensor = tmp
                     res_dict[feed_name] = tensor
diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt
index 9b2ccf7049ec09b0bdaacd97354248278786ac4a..08e186c2d19bdf2c2e34d66c717eff1ebf517ce8 100644
--- a/python/paddle/fluid/tests/unittests/CMakeLists.txt
+++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt
@@ -230,5 +230,6 @@ if(WITH_DISTRIBUTE)
 endif()
 
 set_tests_properties(test_recordio_reader test_parallel_executor_test_while_train test_parallel_executor_mnist
+        test_parallel_executor_feed_persistable_var
         test_parallel_executor_seresnext test_parallel_executor_crf test_sync_batch_norm_op
         PROPERTIES LABELS "RUN_TYPE=DIST")
diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_feed_persistable_var.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_feed_persistable_var.py
new file mode 100644
index 0000000000000000000000000000000000000000..c0f8028d48905913e3811a730db67214f8e4b4a7
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/test_parallel_executor_feed_persistable_var.py
@@ -0,0 +1,78 @@
+# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+from functools import partial
+import numpy
+import unittest
+import paddle.fluid.core as core
+import paddle.fluid as fluid
+from simple_nets import init_data, simple_fc_net
+import os
+
+
+class TestFeedPersistableVar(unittest.TestCase):
+    @classmethod
+    def setUpClass(cls):
+        os.environ['CPU_NUM'] = str(4)
+        batch_size = 4
+        cls.img, cls.label = init_data(
+            batch_size, img_shape=[784], label_range=9)
+        cls.feed_dict = {
+            'image': cls.img,
+            'label': cls.label,
+            'learning_rate': numpy.array([1.0]).astype("float32")
+        }
+
+    def optimizer(self):
+        learning_rate = fluid.layers.create_global_var(
+            name="learning_rate",
+            shape=[1],
+            value=1.0,
+            dtype='float32',
+            persistable=True)
+        optimizer = fluid.optimizer.SGD(learning_rate=learning_rate)
+        return optimizer
+
+    def check_feed_persistable_var(self, feed_dict, use_cuda=False):
+        if use_cuda and not core.is_compiled_with_cuda():
+            return
+        place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
+        exe = fluid.Executor(place)
+
+        main = fluid.Program()
+        startup = fluid.Program()
+        with fluid.program_guard(main, startup):
+            loss = simple_fc_net()
+
+            optimizer = self.optimizer()
+            optimizer.minimize(loss)
+
+            exe.run(program=startup)
+            compiled_prog = fluid.compiler.CompiledProgram(
+                main).with_data_parallel(loss_name=loss.name)
+
+            exe.run(program=compiled_prog, feed=feed_dict)
+
+    def test_feed_persistable_var(self):
+        self.check_feed_persistable_var(self.feed_dict)
+        self.check_feed_persistable_var(self.feed_dict, use_cuda=True)
+
+        self.feed_dict['learning_rate'] = numpy.array(
+            [1.0, 1.0]).astype("float32")
+        self.check_feed_persistable_var(self.feed_dict, use_cuda=True)
+
+
+if __name__ == '__main__':
+    unittest.main()
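Usage note (not part of the patch): the sketch below, adapted from the new unit test, illustrates what this change enables from the Python API. A persistable variable such as a learning rate can now be fed as a single sample to a program compiled with_data_parallel; ParallelExecutor copies that one tensor to every place, while non-persistable inputs such as image/label are still split across places. The simple_nets helpers (simple_fc_net, init_data) are the existing test utilities; treat this as an illustrative sketch, not additional patched code.

import os
import numpy
import paddle.fluid as fluid
from simple_nets import init_data, simple_fc_net

os.environ['CPU_NUM'] = '4'  # run the compiled program on 4 CPU places

main, startup = fluid.Program(), fluid.Program()
with fluid.program_guard(main, startup):
    loss = simple_fc_net()
    # learning_rate is a persistable global variable, so a single fed
    # sample is copied to every place by FeedAndSplitTensorIntoLocalScopes.
    lr = fluid.layers.create_global_var(
        name="learning_rate", shape=[1], value=1.0,
        dtype='float32', persistable=True)
    fluid.optimizer.SGD(learning_rate=lr).minimize(loss)

exe = fluid.Executor(fluid.CPUPlace())
exe.run(program=startup)
compiled_prog = fluid.compiler.CompiledProgram(main).with_data_parallel(
    loss_name=loss.name)

img, label = init_data(batch_size=4, img_shape=[784], label_range=9)
# 'image' and 'label' supply one sample per place; 'learning_rate' is fed
# once and broadcast to all places because it is persistable.
exe.run(program=compiled_prog,
        feed={
            'image': img,
            'label': label,
            'learning_rate': numpy.array([1.0]).astype("float32")
        })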