Unverified commit e340df01, authored by chengduo, committed by GitHub

Support feed single persistable variable to PE (#19417)

* update executor feed
Parent: fcec365d
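What this enables, as a minimal sketch (modeled on the new unit test added below; the toy network and variable names here are illustrative, not part of the commit):

    import os
    import numpy
    import paddle.fluid as fluid

    os.environ['CPU_NUM'] = '4'  # four CPU places for data parallelism

    main, startup = fluid.Program(), fluid.Program()
    with fluid.program_guard(main, startup):
        x = fluid.layers.data(name='x', shape=[4], dtype='float32')
        loss = fluid.layers.mean(fluid.layers.fc(input=x, size=1))
        # A persistable global variable used as the SGD learning rate.
        lr = fluid.layers.create_global_var(
            name="learning_rate", shape=[1], value=1.0,
            dtype='float32', persistable=True)
        fluid.optimizer.SGD(learning_rate=lr).minimize(loss)

    exe = fluid.Executor(fluid.CPUPlace())
    exe.run(startup)
    compiled = fluid.compiler.CompiledProgram(main).with_data_parallel(
        loss_name=loss.name)

    # Previously every fed variable had to supply one slice per place; now a
    # persistable variable may be fed as a single sample, which is copied to
    # all four places.
    exe.run(compiled,
            feed={'x': numpy.random.random((8, 4)).astype('float32'),
                  'learning_rate': numpy.array([1.0]).astype('float32')})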
@@ -723,15 +723,19 @@ void ParallelExecutor::FeedTensorsIntoLocalScopes(
 void ParallelExecutor::FeedAndSplitTensorIntoLocalScopes(
     const std::unordered_map<std::string, LoDTensor> &tensors) {
+  size_t num_places = member_->places_.size();
   for (auto &pair : tensors) {
+    bool is_persistable = member_->IsPersistable(pair.first);
+    VLOG(3) << "Split " << (is_persistable ? "persistable" : "non-persistable")
+            << " data (" << pair.first << "), dim:" << pair.second.dims()
+            << ", place: " << pair.second.place();
     auto lod_tensors = pair.second.SplitLoDTensor(member_->places_);
-    if (member_->places_.size() != lod_tensors.size()) {
-      bool is_cpu_place = platform::is_cpu_place(member_->places_.front());
+    bool is_cpu_place = platform::is_cpu_place(member_->places_.front());
+    if (!is_persistable && num_places != lod_tensors.size()) {
       auto error_info = string::Sprintf(
-          "The number(%d) of samples of "
-          "current batch is less than the count(%d) of "
-          "devices(%s), currently, it is not allowed. ",
-          lod_tensors.size(), member_->places_.size(),
+          "The number(%d) of samples[%s] of the current batch is less than "
+          "the count(%d) of devices(%s); currently, this is not allowed. ",
+          lod_tensors.size(), pair.first, num_places,
           (is_cpu_place ? "CPU" : "GPU"));
       if (is_cpu_place) {
         error_info +=
@@ -739,10 +743,35 @@ void ParallelExecutor::FeedAndSplitTensorIntoLocalScopes(
           "to determine the number of devices you need.";
       }
       PADDLE_THROW(error_info);
+    } else if (is_persistable) {
+      if (lod_tensors.size() == 1) {
+        lod_tensors.reserve(num_places);
+        auto &tensor = lod_tensors.front();
+        PADDLE_ENFORCE_EQ(tensor.dims(), pair.second.dims(),
+                          "The dims don't match.");
+        PADDLE_ENFORCE_EQ(tensor.place(), member_->places_.at(0),
+                          "The place doesn't match.");
+        for (size_t i = 1; i < num_places; ++i) {
+          lod_tensors.emplace_back();
+          auto &tmp = lod_tensors.back();
+          framework::TensorCopy(pair.second, member_->places_.at(i), &tmp);
+        }
+      }
+      if (lod_tensors.size() != num_places) {
+        auto error_info = string::Sprintf(
+            "The number(%d) of samples[%s] of the current batch does not "
+            "match the count(%d) of devices(%s). Because %s is a persistable "
+            "variable, you may feed just one sample; in that case the input "
+            "sample is copied into %d copies and sent to the different "
+            "places separately. If you need different places to hold "
+            "different values, you should feed %d samples.",
+            lod_tensors.size(), pair.first, num_places,
+            (is_cpu_place ? "CPU" : "GPU"), pair.first, num_places,
+            num_places);
+        PADDLE_THROW(error_info);
+      }
+    }
-    bool is_persistable = member_->IsPersistable(pair.first);
-    for (size_t j = 0; j < member_->places_.size(); ++j) {
+    for (size_t j = 0; j < num_places; ++j) {
       auto *feed_scope = is_persistable ? member_->local_scopes_[j]
                                         : member_->local_exec_scopes_[j];
       auto *feed_var = feed_scope->Var(pair.first);
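Net effect of the C++ change: a persistable variable fed with a single sample is validated against the dims and place of device 0, then copied via framework::TensorCopy to every remaining place, while a non-persistable feed must still split into exactly one piece per place; any other count throws the errors above. Persistable feeds also land in the persistent local scopes rather than the per-run execution scopes.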
@@ -498,8 +498,11 @@ class Executor(object):
             feed_tensor = feed[feed_name]
             if not isinstance(feed_tensor, core.LoDTensor):
                 feed_tensor = core.LoDTensor()
-                # always set to CPU place, since the tensor need to be splitted
+                # always set to CPU place, since the tensor needs to be split
                 # and splitting is fast on CPU
+                assert isinstance(feed[feed_name], np.ndarray), \
+                    "The input({}) should be a numpy.ndarray, not {}.".format(
+                        feed_name, type(feed[feed_name]))
                 feed_tensor.set(feed[feed_name], core.CPUPlace())
             feed_tensor_dict[feed_name] = feed_tensor
@@ -520,6 +523,9 @@ class Executor(object):
                 tensor = each[feed_name]
                 if not isinstance(tensor, core.LoDTensor):
                     tmp = core.LoDTensor()
+                    assert isinstance(each[feed_name], np.ndarray), \
+                        "The input({}) should be a numpy.ndarray, not {}.".format(
+                            feed_name, type(each[feed_name]))
                     tmp.set(tensor, program._places[i])
                     tensor = tmp
                 res_dict[feed_name] = tensor
@@ -528,11 +534,7 @@ class Executor(object):
         fetch_var_names = list(map(_to_name_str, fetch_list))
         tensors = exe.run(fetch_var_names)._move_to_list()
-        if return_numpy:
-            return as_numpy(tensors)
-        else:
-            return tensors
+        return as_numpy(tensors) if return_numpy else tensors

     def run(self,
             program=None,
@@ -611,7 +613,7 @@ class Executor(object):
                     use_program_cache=use_program_cache)
         except Exception as e:
             if not isinstance(e, core.EOFException):
-                print("An exception was thrown!\n {}".format(str(e)))
+                print("!!!A non-EOF exception is thrown.")
             six.reraise(*sys.exc_info())

     def _run_impl(self, program, feed, fetch_list, feed_var_name,
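In executor.py, the two new asserts make the feed path fail fast with a readable message when a feed entry is not a numpy.ndarray, instead of erroring deep inside LoDTensor.set; the exception handler's print is shortened because the exception is re-raised immediately afterwards, so its details are not lost.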
@@ -310,5 +310,6 @@ set_tests_properties(test_parallel_executor_test_while_train test_parallel_execu
                      test_parallel_executor_seresnext_base_gpu test_parallel_executor_seresnext_with_reduce_gpu
                      test_parallel_executor_seresnext_with_fuse_all_reduce_gpu
                      test_parallel_executor_crf test_sync_batch_norm_op
+                     test_parallel_executor_feed_persistable_var
                      test_parallel_executor_crf_auto_growth test_buffer_shared_memory_reuse_pass_and_fuse_optimization_op_pass
                      test_buffer_shared_memory_reuse_pass PROPERTIES LABELS "RUN_TYPE=DIST")
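The commit also adds the new test file below, registered above under the RUN_TYPE=DIST label. Assuming a configured build tree, it can be run on its own with, e.g., ctest -R test_parallel_executor_feed_persistable_var from the build directory.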
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function
from functools import partial
import numpy
import unittest
import paddle.fluid.core as core
import paddle.fluid as fluid
from simple_nets import init_data, simple_fc_net
import os


class TestFeedPersistableVar(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        os.environ['CPU_NUM'] = str(4)
        batch_size = 4
        cls.img, cls.label = init_data(
            batch_size, img_shape=[784], label_range=9)
        cls.feed_dict = {
            'image': cls.img,
            'label': cls.label,
            'learning_rate': numpy.array([1.0]).astype("float32")
        }

    def optimizer(self):
        learning_rate = fluid.layers.create_global_var(
            name="learning_rate",
            shape=[1],
            value=1.0,
            dtype='float32',
            persistable=True)
        optimizer = fluid.optimizer.SGD(learning_rate=learning_rate)
        return optimizer

    def check_feed_persistable_var(self, feed_dict, use_cuda=False):
        if use_cuda and not core.is_compiled_with_cuda():
            return
        place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
        exe = fluid.Executor(place)

        main = fluid.Program()
        startup = fluid.Program()
        with fluid.program_guard(main, startup):
            loss = simple_fc_net()
            optimizer = self.optimizer()
            optimizer.minimize(loss)

            exe.run(program=startup)
            compiled_prog = fluid.compiler.CompiledProgram(
                main).with_data_parallel(loss_name=loss.name)
            exe.run(program=compiled_prog, feed=feed_dict)

    def test_feed_persistable_var(self):
        # A single-sample persistable learning_rate is accepted on both
        # CPU and GPU places.
        self.check_feed_persistable_var(self.feed_dict)
        self.check_feed_persistable_var(self.feed_dict, use_cuda=True)

        self.feed_dict['learning_rate'] = numpy.array(
            [1.0, 1.0]).astype("float32")
        self.check_feed_persistable_var(self.feed_dict, use_cuda=True)

        # With four CPU places, two samples match neither one place nor
        # all places, so ParallelExecutor rejects the feed.
        self.feed_dict['learning_rate'] = numpy.array(
            [1.0, 1.0]).astype("float32")
        run = partial(self.check_feed_persistable_var, self.feed_dict)
        self.assertRaises(core.EnforceNotMet, run)

        # Mis-shaped non-persistable inputs are rejected as well.
        self.feed_dict['image'] = self.img[0, :]
        self.feed_dict['label'] = self.label[0, :]
        run = partial(self.check_feed_persistable_var, self.feed_dict)
        self.assertRaises(core.EnforceNotMet, run)


if __name__ == '__main__':
    unittest.main()