未验证 提交 a7a4b72b 编写于 作者: C chengduo 提交者: GitHub

[Cherry pick] Support feed single persistable variable to PE (#19435)

* update executor feed
上级 5860cc47
...@@ -200,12 +200,17 @@ class ParallelExecutorPrivate { ...@@ -200,12 +200,17 @@ class ParallelExecutorPrivate {
InitNCCLCtxs(scope, bst); InitNCCLCtxs(scope, bst);
} }
#endif #endif
// Reports whether `name` is recorded in is_persistable_ with a true flag.
// A name that has no entry in the map is reported as not persistable.
inline bool IsPersistable(const std::string &name) const {
  const auto entry = is_persistable_.find(name);
  if (entry == is_persistable_.end()) {
    return false;
  }
  return entry->second;
}
BuildStrategy build_strategy_; BuildStrategy build_strategy_;
std::vector<platform::Place> places_; std::vector<platform::Place> places_;
std::vector<Scope *> local_scopes_; std::vector<Scope *> local_scopes_;
Scope *global_scope_; // not owned Scope *global_scope_; // not owned
std::unique_ptr<details::SSAGraphExecutor> executor_; std::unique_ptr<details::SSAGraphExecutor> executor_;
std::unordered_map<std::string, bool> is_persistable_;
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
platform::NCCLCommunicator *nccl_ctxs_{nullptr}; platform::NCCLCommunicator *nccl_ctxs_{nullptr};
...@@ -473,6 +478,8 @@ ParallelExecutor::ParallelExecutor(const std::vector<platform::Place> &places, ...@@ -473,6 +478,8 @@ ParallelExecutor::ParallelExecutor(const std::vector<platform::Place> &places,
var_infos.back().name_ = node->Var()->Name(); var_infos.back().name_ = node->Var()->Name();
var_infos.back().type_ = node->Var()->GetType(); var_infos.back().type_ = node->Var()->GetType();
var_infos.back().persistable_ = node->Var()->Persistable(); var_infos.back().persistable_ = node->Var()->Persistable();
member_->is_persistable_.emplace(node->Var()->Name(),
node->Var()->Persistable());
} }
} }
...@@ -642,15 +649,19 @@ void ParallelExecutor::FeedTensorsIntoLocalScopes( ...@@ -642,15 +649,19 @@ void ParallelExecutor::FeedTensorsIntoLocalScopes(
void ParallelExecutor::FeedAndSplitTensorIntoLocalScopes( void ParallelExecutor::FeedAndSplitTensorIntoLocalScopes(
const std::unordered_map<std::string, LoDTensor> &tensors) { const std::unordered_map<std::string, LoDTensor> &tensors) {
for (auto pair : tensors) { size_t num_places = member_->places_.size();
for (auto &pair : tensors) {
bool is_persistable = member_->IsPersistable(pair.first);
VLOG(3) << "Split " << (is_persistable ? "persistable" : "no persistable")
<< " data (" << pair.first << "), dim:" << pair.second.dims()
<< ", place: " << pair.second.place();
auto lod_tensors = pair.second.SplitLoDTensor(member_->places_); auto lod_tensors = pair.second.SplitLoDTensor(member_->places_);
if (member_->places_.size() != lod_tensors.size()) { bool is_cpu_place = platform::is_cpu_place(member_->places_.front());
bool is_cpu_place = platform::is_cpu_place(member_->places_.front()); if (!is_persistable && num_places != lod_tensors.size()) {
auto error_info = string::Sprintf( auto error_info = string::Sprintf(
"The number(%d) of samples of " "The number(%d) of samples[%s] of current batch is less than the "
"current batch is less than the count(%d) of " "count(%d) of devices(%s), currently, it is not allowed. ",
"devices(%s), currently, it is not allowed. ", lod_tensors.size(), pair.first, num_places,
lod_tensors.size(), member_->places_.size(),
(is_cpu_place ? "CPU" : "GPU")); (is_cpu_place ? "CPU" : "GPU"));
if (is_cpu_place) { if (is_cpu_place) {
error_info += error_info +=
...@@ -658,7 +669,38 @@ void ParallelExecutor::FeedAndSplitTensorIntoLocalScopes( ...@@ -658,7 +669,38 @@ void ParallelExecutor::FeedAndSplitTensorIntoLocalScopes(
"to determine the number of devices you need."; "to determine the number of devices you need.";
} }
PADDLE_THROW(error_info); PADDLE_THROW(error_info);
} else if (is_persistable) {
if (lod_tensors.size() == 1) {
lod_tensors.reserve(num_places);
auto &tensor = lod_tensors.front();
PADDLE_ENFORCE_EQ(tensor.dims(), pair.second.dims(),
"The dim doesn't match.");
PADDLE_ENFORCE_EQ(tensor.place(), member_->places_.at(0),
"The place doesn't match.");
for (size_t i = 1; i < num_places; ++i) {
lod_tensors.emplace_back();
auto &tmp = lod_tensors.back();
framework::TensorCopy(pair.second, member_->places_.at(i), &tmp);
}
}
if (lod_tensors.size() != num_places) {
auto error_info = string::Sprintf(
"The number(%d) of samples[%s] of the current batch does not match "
"the count(%d) of devices(%s). Because that %s is a persistable "
"variable, you can feed just one sample, in that case, the input "
"sample will be copied in %d copies and be sent to different "
"places separately. If you need that different place has different "
"value, you should feed %d samples.",
lod_tensors.size(), pair.first, num_places,
(is_cpu_place ? "CPU" : "GPU"), pair.first, num_places, num_places);
PADDLE_THROW(error_info);
}
} }
PADDLE_ENFORCE_EQ(
lod_tensors.size(), num_places,
"The number(%d) of samples of the current batch does not match the "
"count(%d) of devices.",
lod_tensors.size(), num_places);
for (size_t j = 0; j < member_->places_.size(); ++j) { for (size_t j = 0; j < member_->places_.size(); ++j) {
// TODO(panxy0718): Do I need to delete this var? // TODO(panxy0718): Do I need to delete this var?
auto t = auto t =
......
...@@ -496,8 +496,11 @@ class Executor(object): ...@@ -496,8 +496,11 @@ class Executor(object):
feed_tensor = feed[feed_name] feed_tensor = feed[feed_name]
if not isinstance(feed_tensor, core.LoDTensor): if not isinstance(feed_tensor, core.LoDTensor):
feed_tensor = core.LoDTensor() feed_tensor = core.LoDTensor()
# always set to CPU place, since the tensor need to be splitted # always set to CPU place, since the tensor need to be split
# it is fast in CPU # it is fast in CPU
assert isinstance( feed[feed_name], np.ndarray ), \
"The input({}) should be numpy.array, but not {}.".format(
feed_name, type(feed[feed_name]))
feed_tensor.set(feed[feed_name], core.CPUPlace()) feed_tensor.set(feed[feed_name], core.CPUPlace())
feed_tensor_dict[feed_name] = feed_tensor feed_tensor_dict[feed_name] = feed_tensor
...@@ -518,6 +521,9 @@ class Executor(object): ...@@ -518,6 +521,9 @@ class Executor(object):
tensor = each[feed_name] tensor = each[feed_name]
if not isinstance(tensor, core.LoDTensor): if not isinstance(tensor, core.LoDTensor):
tmp = core.LoDTensor() tmp = core.LoDTensor()
assert isinstance(each[feed_name], np.ndarray), \
"The input({}) should be numpy.array, but not {}.".format(
feed_name, type(each[feed_name]))
tmp.set(tensor, program._places[i]) tmp.set(tensor, program._places[i])
tensor = tmp tensor = tmp
res_dict[feed_name] = tensor res_dict[feed_name] = tensor
......
...@@ -230,5 +230,6 @@ if(WITH_DISTRIBUTE) ...@@ -230,5 +230,6 @@ if(WITH_DISTRIBUTE)
endif() endif()
set_tests_properties(test_recordio_reader test_parallel_executor_test_while_train test_parallel_executor_mnist set_tests_properties(test_recordio_reader test_parallel_executor_test_while_train test_parallel_executor_mnist
test_parallel_executor_feed_persistable_var
test_parallel_executor_seresnext test_parallel_executor_crf test_sync_batch_norm_op test_parallel_executor_seresnext test_parallel_executor_crf test_sync_batch_norm_op
PROPERTIES LABELS "RUN_TYPE=DIST") PROPERTIES LABELS "RUN_TYPE=DIST")
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
from functools import partial
import numpy
import unittest
import paddle.fluid.core as core
import paddle.fluid as fluid
from simple_nets import init_data, simple_fc_net
import os
class TestFeedPersistableVar(unittest.TestCase):
    """Feeds the persistable `learning_rate` variable to a data-parallel program.

    The network under test is `simple_fc_net` trained with SGD whose learning
    rate is a persistable global variable, so the feed dict can override it.
    """

    @classmethod
    def setUpClass(cls):
        """Create the image/label batch and the base feed dict used by the test."""
        # NOTE(review): CPU_NUM presumably sets how many CPU places the
        # data-parallel run uses -- confirm against the executor.
        os.environ['CPU_NUM'] = str(4)
        batch_size = 4
        cls.img, cls.label = init_data(
            batch_size, img_shape=[784], label_range=9)
        cls.feed_dict = {
            'image': cls.img,
            'label': cls.label,
            'learning_rate': numpy.array([1.0]).astype("float32")
        }

    def optimizer(self):
        """Return an SGD optimizer whose learning rate is a persistable variable.

        The variable is named "learning_rate" so that the matching feed dict
        entry targets it.
        """
        learning_rate = fluid.layers.create_global_var(
            name="learning_rate",
            shape=[1],
            value=1.0,
            dtype='float32',
            persistable=True)
        optimizer = fluid.optimizer.SGD(learning_rate=learning_rate)
        return optimizer

    def check_feed_persistable_var(self, feed_dict, use_cuda=False):
        """Build the network, compile it with data parallelism and run one step.

        Args:
            feed_dict: mapping from variable name to the value fed for it.
            use_cuda: run on CUDAPlace(0) instead of CPUPlace. The check is
                skipped silently when Paddle was built without CUDA.
        """
        if use_cuda and not core.is_compiled_with_cuda():
            return
        place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
        exe = fluid.Executor(place)

        main = fluid.Program()
        startup = fluid.Program()
        with fluid.program_guard(main, startup):
            loss = simple_fc_net()
            optimizer = self.optimizer()
            optimizer.minimize(loss)

        exe.run(program=startup)
        compiled_prog = fluid.compiler.CompiledProgram(
            main).with_data_parallel(loss_name=loss.name)
        exe.run(program=compiled_prog, feed=feed_dict)

    def test_feed_persistable_var(self):
        """Feed one learning-rate value on CPU and GPU, then two values on GPU."""
        self.check_feed_persistable_var(self.feed_dict)
        self.check_feed_persistable_var(self.feed_dict, use_cuda=True)

        # Work on a copy so the class-level fixture created in setUpClass is
        # not altered for any other test that reads it.
        two_value_feed = dict(self.feed_dict)
        two_value_feed['learning_rate'] = numpy.array(
            [1.0, 1.0]).astype("float32")
        self.check_feed_persistable_var(two_value_feed, use_cuda=True)
if __name__ == '__main__':
unittest.main()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册