提交 b4aaa00a 编写于 作者: Y Yu Yang

Polish logic of ParallelExecutor

上级 2ab12ca2
......@@ -155,13 +155,9 @@ void ParallelExecutor::BCastParamsToGPUs(
#endif
}
void ParallelExecutor::Run(
const std::vector<std::string> &fetch_tensors,
const std::string &fetched_var_name,
const std::unordered_map<std::string, LoDTensor> &feed_tensors) {
void ParallelExecutor::Run(const std::vector<std::string> &fetch_tensors,
const std::string &fetched_var_name) {
platform::RecordBlock b(0);
SplitTensorToPlaces(feed_tensors);
// Create local scopes.
for (auto &scope : member_->local_scopes_) {
Scope &local_scope = scope->NewScope();
......@@ -195,14 +191,28 @@ void ParallelExecutor::Run(
auto &local_scope =
*scope->Var(details::kLocalExecScopeName)->GetMutable<Scope *>();
scope->DeleteScope(local_scope);
local_scope = nullptr;
}
}
void ParallelExecutor::SplitTensorToPlaces(
const std::unordered_map<std::string, LoDTensor> &feed_tensors) {
for (auto it : feed_tensors) {
auto lod_tensors = it.second.SplitLoDTensor(member_->places_);
void ParallelExecutor::FeedTensorsIntoLocalScopes(
const std::vector<std::unordered_map<std::string, LoDTensor>> &tensors) {
PADDLE_ENFORCE_EQ(member_->local_scopes_.size(), tensors.size());
for (size_t i = 0; i < tensors.size(); ++i) {
auto &map = tensors[i];
auto *scope = member_->local_scopes_[i];
for (auto &pair : map) {
auto *trg = scope->Var(pair.first)->GetMutable<LoDTensor>();
trg->ShareDataWith(pair.second);
trg->set_lod(pair.second.lod());
}
}
}
void ParallelExecutor::FeedAndSplitTensorIntoLocalScopes(
const std::unordered_map<std::string, LoDTensor> &tensors) {
for (auto pair : tensors) {
auto lod_tensors = pair.second.SplitLoDTensor(member_->places_);
PADDLE_ENFORCE_EQ(
member_->places_.size(), lod_tensors.size(),
"The number of samples of current batch is less than the count of "
......@@ -211,7 +221,7 @@ void ParallelExecutor::SplitTensorToPlaces(
for (size_t j = 0; j < member_->places_.size(); ++j) {
// TODO(panxy0718): Do I need to delete this var?
auto t =
member_->local_scopes_[j]->Var(it.first)->GetMutable<LoDTensor>();
member_->local_scopes_[j]->Var(pair.first)->GetMutable<LoDTensor>();
t->ShareDataWith(lod_tensors[j]);
t->set_lod(lod_tensors[j].lod());
}
......
......@@ -44,16 +44,22 @@ class ParallelExecutor {
std::vector<Scope*>& GetLocalScopes();
/**
* Feed tensors to local scopes. The size of tensors should be equal to the
* size of local scopes.
*/
void FeedTensorsIntoLocalScopes(
const std::vector<std::unordered_map<std::string, LoDTensor>>& tensors);
void FeedAndSplitTensorIntoLocalScopes(
const std::unordered_map<std::string, LoDTensor>& tensors);
void Run(const std::vector<std::string>& fetch_tensors,
const std::string& fetched_var_name,
const std::unordered_map<std::string, LoDTensor>& feed_tensors);
const std::string& fetched_var_name);
void BCastParamsToGPUs(const std::unordered_set<std::string>& vars) const;
private:
void SplitTensorToPlaces(
const std::unordered_map<std::string, LoDTensor>& feed_tensors);
ParallelExecutorPrivate* member_;
};
......
......@@ -514,9 +514,10 @@ All parameter, weight, gradient are variables in Paddle.
return &self.GetLocalScopes();
},
py::return_value_policy::reference)
.def("local_scope", [](ParallelExecutor &self,
size_t i) { return self.GetLocalScopes()[i]; },
py::return_value_policy::reference)
.def("feed_tensors_into_local_scopes",
&ParallelExecutor::FeedTensorsIntoLocalScopes)
.def("feed_and_split_tensor_into_local_scopes",
&ParallelExecutor::FeedAndSplitTensorIntoLocalScopes)
.def("run", &ParallelExecutor::Run);
BindRecordIOWriter(&m);
......
......@@ -123,28 +123,65 @@ class ParallelExecutor(object):
allow_op_delay)
self.scope = scope
def run(self, fetch_list, feed_dict={}):
def run(self, fetch_list, feed=None, feed_dict=None):
"""
:param fetch_list: A list of variable names that will be fetched.
:param feed_dict: A dict mapping for feed variable name to LoDTensor
or numpy array.
:return: fetched value list.
Args:
fetch_list(list): The fetched variable names
feed(list|dict|None): The feed variables. If the feed is a dict, tensors in that dict will be splitted
into each devices. If the feed is a list, each element of the list will be copied to each device.
feed_dict: Alias for feed parameter, for backward compatibility.
Returns: fetched result list.
"""
if not isinstance(feed_dict, dict):
raise TypeError("feed_dict should be a dict")
if feed is None:
feed = feed_dict
feed_tensor_dict = {}
for i, feed_name in enumerate(feed_dict):
feed_tensor = feed_dict[feed_name]
if isinstance(feed, dict):
feed_tensor_dict = dict()
for feed_name in feed:
feed_tensor = feed[feed_name]
if not isinstance(feed_tensor, core.LoDTensor):
feed_tensor = core.LoDTensor()
feed_tensor.set(feed_dict[feed_name], self._act_places[0])
# always set to CPU place, since the tensor need to be splitted
# it is fast in CPU
feed_tensor.set(feed[feed_name], core.CPUPlace())
feed_tensor_dict[feed_name] = feed_tensor
self.executor.feed_and_split_tensor_into_local_scopes(
feed_tensor_dict)
elif isinstance(feed, list) or isinstance(feed, tuple):
if len(feed) != len(self._act_places):
raise ValueError(
"Feed a list of tensor, the list should be the same size as places"
)
res = list()
for i, each in enumerate(feed):
if not isinstance(each, dict):
raise TypeError(
"Each element of feed list should be a dict")
res_dict = dict()
for feed_name in each:
tensor = each[feed_name]
if not isinstance(tensor, core.LoDTensor):
tmp = core.LoDTensor()
tmp.set(tensor, self._act_places[i])
tensor = tmp
res_dict[feed_name] = tensor
res.append(res_dict)
self.executor.feed_tensors_into_local_scopes(res)
fetch_var_name = '@FETCHED_VAR_NAME@'
self.executor.run(fetch_list, fetch_var_name, feed_tensor_dict)
self.executor.run(fetch_list, fetch_var_name)
arr = self.scope.find_var(fetch_var_name).get_lod_tensor_array()
return [arr[i] for i in range(len(arr))]
def bcast_params(self):
self.executor.bcast_params(set(self.persistable_vars))
@property
def device_count(self):
return len(self._act_places)
......@@ -203,11 +203,11 @@ class TestParallelExecutorBase(unittest.TestCase):
iter=10,
batch_size=None,
allow_op_delay=False,
feed_dict={}):
feed_dict=None):
main = fluid.Program()
startup = fluid.Program()
with fluid.program_guard(main, startup):
loss = method(use_feed=len(feed_dict) > 0)
loss = method(use_feed=feed_dict is not None)
adam = fluid.optimizer.Adam()
adam.minimize(loss)
if memory_opt:
......@@ -221,13 +221,13 @@ class TestParallelExecutorBase(unittest.TestCase):
if batch_size is not None:
batch_size *= fluid.core.get_cuda_device_count()
begin = time.time()
first_loss, = exe.run([loss.name], feed_dict=feed_dict)
first_loss, = exe.run([loss.name], feed=feed_dict)
first_loss = numpy.array(first_loss)
for i in xrange(iter):
exe.run([], feed_dict=feed_dict)
exe.run([], feed=feed_dict)
last_loss, = exe.run([loss.name], feed_dict=feed_dict)
last_loss, = exe.run([loss.name], feed=feed_dict)
end = time.time()
if batch_size is not None:
......@@ -648,5 +648,5 @@ class TestCRFModel(unittest.TestCase):
for i in xrange(10):
cur_batch = next(data)
print map(numpy.array,
pe.run(feed_dict=feeder.feed(cur_batch),
pe.run(feed=feeder.feed(cur_batch),
fetch_list=[avg_cost.name]))[0]
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册