From b4aaa00a8a71c106172da84ce3f81ba16df76227 Mon Sep 17 00:00:00 2001
From: Yu Yang
Date: Tue, 17 Apr 2018 13:57:07 +0800
Subject: [PATCH] Polish logic of ParallelExecutor

---
 paddle/fluid/framework/parallel_executor.cc   | 34 +++++----
 paddle/fluid/framework/parallel_executor.h    | 16 +++--
 paddle/fluid/pybind/pybind.cc                 |  7 +-
 python/paddle/fluid/parallel_executor.py      | 69 ++++++++++++++-----
 .../tests/unittests/test_parallel_executor.py | 12 ++--
 5 files changed, 96 insertions(+), 42 deletions(-)

diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc
index c1486b527d2..0962f40c4a6 100644
--- a/paddle/fluid/framework/parallel_executor.cc
+++ b/paddle/fluid/framework/parallel_executor.cc
@@ -155,13 +155,9 @@ void ParallelExecutor::BCastParamsToGPUs(
 #endif
 }
 
-void ParallelExecutor::Run(
-    const std::vector<std::string> &fetch_tensors,
-    const std::string &fetched_var_name,
-    const std::unordered_map<std::string, LoDTensor> &feed_tensors) {
+void ParallelExecutor::Run(const std::vector<std::string> &fetch_tensors,
+                           const std::string &fetched_var_name) {
   platform::RecordBlock b(0);
-  SplitTensorToPlaces(feed_tensors);
-
   // Create local scopes.
   for (auto &scope : member_->local_scopes_) {
     Scope &local_scope = scope->NewScope();
@@ -195,14 +191,28 @@ void ParallelExecutor::Run(
     auto &local_scope =
         *scope->Var(details::kLocalExecScopeName)->GetMutable<Scope *>();
     scope->DeleteScope(local_scope);
-    local_scope = nullptr;
   }
 }
 
-void ParallelExecutor::SplitTensorToPlaces(
-    const std::unordered_map<std::string, LoDTensor> &feed_tensors) {
-  for (auto it : feed_tensors) {
-    auto lod_tensors = it.second.SplitLoDTensor(member_->places_);
+void ParallelExecutor::FeedTensorsIntoLocalScopes(
+    const std::vector<std::unordered_map<std::string, LoDTensor>> &tensors) {
+  PADDLE_ENFORCE_EQ(member_->local_scopes_.size(), tensors.size());
+
+  for (size_t i = 0; i < tensors.size(); ++i) {
+    auto &map = tensors[i];
+    auto *scope = member_->local_scopes_[i];
+    for (auto &pair : map) {
+      auto *trg = scope->Var(pair.first)->GetMutable<LoDTensor>();
+      trg->ShareDataWith(pair.second);
+      trg->set_lod(pair.second.lod());
+    }
+  }
+}
+
+void ParallelExecutor::FeedAndSplitTensorIntoLocalScopes(
+    const std::unordered_map<std::string, LoDTensor> &tensors) {
+  for (auto pair : tensors) {
+    auto lod_tensors = pair.second.SplitLoDTensor(member_->places_);
     PADDLE_ENFORCE_EQ(
         member_->places_.size(), lod_tensors.size(),
         "The number of samples of current batch is less than the count of "
@@ -211,7 +221,7 @@ void ParallelExecutor::SplitTensorToPlaces(
     for (size_t j = 0; j < member_->places_.size(); ++j) {
       // TODO(panxy0718): Do I need to delete this var?
       auto t =
-          member_->local_scopes_[j]->Var(it.first)->GetMutable<LoDTensor>();
+          member_->local_scopes_[j]->Var(pair.first)->GetMutable<LoDTensor>();
       t->ShareDataWith(lod_tensors[j]);
       t->set_lod(lod_tensors[j].lod());
     }
diff --git a/paddle/fluid/framework/parallel_executor.h b/paddle/fluid/framework/parallel_executor.h
index b4f16dba858..303ac3bc55c 100644
--- a/paddle/fluid/framework/parallel_executor.h
+++ b/paddle/fluid/framework/parallel_executor.h
@@ -44,16 +44,22 @@ class ParallelExecutor {
 
   std::vector<Scope *>& GetLocalScopes();
 
+  /**
+   * Feed tensors to local scopes. The size of tensors should be equal to the
+   * number of local scopes.
+   */
+  void FeedTensorsIntoLocalScopes(
+      const std::vector<std::unordered_map<std::string, LoDTensor>>& tensors);
+
+  void FeedAndSplitTensorIntoLocalScopes(
+      const std::unordered_map<std::string, LoDTensor>& tensors);
+
   void Run(const std::vector<std::string>& fetch_tensors,
-           const std::string& fetched_var_name,
-           const std::unordered_map<std::string, LoDTensor>& feed_tensors);
+           const std::string& fetched_var_name);
 
   void BCastParamsToGPUs(const std::unordered_set<std::string>& vars) const;
 
  private:
-  void SplitTensorToPlaces(
-      const std::unordered_map<std::string, LoDTensor>& feed_tensors);
-
   ParallelExecutorPrivate* member_;
 };
diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc
index 5121987f922..19bd30d9665 100644
--- a/paddle/fluid/pybind/pybind.cc
+++ b/paddle/fluid/pybind/pybind.cc
@@ -514,9 +514,10 @@ All parameter, weight, gradient are variables in Paddle.
              return &self.GetLocalScopes();
            },
            py::return_value_policy::reference)
-      .def("local_scope", [](ParallelExecutor &self,
-                             size_t i) { return self.GetLocalScopes()[i]; },
-           py::return_value_policy::reference)
+      .def("feed_tensors_into_local_scopes",
+           &ParallelExecutor::FeedTensorsIntoLocalScopes)
+      .def("feed_and_split_tensor_into_local_scopes",
+           &ParallelExecutor::FeedAndSplitTensorIntoLocalScopes)
       .def("run", &ParallelExecutor::Run);
 
   BindRecordIOWriter(&m);
diff --git a/python/paddle/fluid/parallel_executor.py b/python/paddle/fluid/parallel_executor.py
index 5ce2aa1fc4d..f9741fbed6e 100644
--- a/python/paddle/fluid/parallel_executor.py
+++ b/python/paddle/fluid/parallel_executor.py
@@ -123,28 +123,65 @@ class ParallelExecutor(object):
             allow_op_delay)
         self.scope = scope
 
-    def run(self, fetch_list, feed_dict={}):
+    def run(self, fetch_list, feed=None, feed_dict=None):
         """
-        :param fetch_list: A list of variable names that will be fetched.
-        :param feed_dict: A dict mapping for feed variable name to LoDTensor
-          or numpy array.
-        :return: fetched value list.
-        """
-        if not isinstance(feed_dict, dict):
-            raise TypeError("feed_dict should be a dict")
-        feed_tensor_dict = {}
-        for i, feed_name in enumerate(feed_dict):
-            feed_tensor = feed_dict[feed_name]
-            if not isinstance(feed_tensor, core.LoDTensor):
-                feed_tensor = core.LoDTensor()
-                feed_tensor.set(feed_dict[feed_name], self._act_places[0])
-            feed_tensor_dict[feed_name] = feed_tensor
+        Args:
+            fetch_list(list): The names of the variables to be fetched.
+            feed(list|dict|None): The feed variables. If the feed is a dict, the tensors in that dict will be split
+                across the devices. If the feed is a list, each element of the list will be copied to each device.
+            feed_dict: Alias for the feed parameter, kept for backward compatibility.
+
+        Returns: fetched result list.
+
+        """
+        if feed is None:
+            feed = feed_dict
+
+        if isinstance(feed, dict):
+            feed_tensor_dict = dict()
+            for feed_name in feed:
+                feed_tensor = feed[feed_name]
+                if not isinstance(feed_tensor, core.LoDTensor):
+                    feed_tensor = core.LoDTensor()
+                    # always set to CPU place, since the tensor needs to be
+                    # split, and splitting is fast on CPU
+                    feed_tensor.set(feed[feed_name], core.CPUPlace())
+                feed_tensor_dict[feed_name] = feed_tensor
+
+            self.executor.feed_and_split_tensor_into_local_scopes(
+                feed_tensor_dict)
+        elif isinstance(feed, list) or isinstance(feed, tuple):
+            if len(feed) != len(self._act_places):
+                raise ValueError(
+                    "Feed a list of tensor, the list should be the same size as places"
+                )
+
+            res = list()
+
+            for i, each in enumerate(feed):
+                if not isinstance(each, dict):
+                    raise TypeError(
+                        "Each element of feed list should be a dict")
+                res_dict = dict()
+                for feed_name in each:
+                    tensor = each[feed_name]
+                    if not isinstance(tensor, core.LoDTensor):
+                        tmp = core.LoDTensor()
+                        tmp.set(tensor, self._act_places[i])
+                        tensor = tmp
+                    res_dict[feed_name] = tensor
+                res.append(res_dict)
+            self.executor.feed_tensors_into_local_scopes(res)
 
         fetch_var_name = '@FETCHED_VAR_NAME@'
-        self.executor.run(fetch_list, fetch_var_name, feed_tensor_dict)
+        self.executor.run(fetch_list, fetch_var_name)
         arr = self.scope.find_var(fetch_var_name).get_lod_tensor_array()
         return [arr[i] for i in range(len(arr))]
 
     def bcast_params(self):
         self.executor.bcast_params(set(self.persistable_vars))
+
+    @property
+    def device_count(self):
+        return len(self._act_places)
diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor.py b/python/paddle/fluid/tests/unittests/test_parallel_executor.py
index 83d22fd799e..2f8c10fc1cb 100644
--- a/python/paddle/fluid/tests/unittests/test_parallel_executor.py
+++ b/python/paddle/fluid/tests/unittests/test_parallel_executor.py
@@ -203,11 +203,11 @@ class TestParallelExecutorBase(unittest.TestCase):
                                   iter=10,
                                   batch_size=None,
                                   allow_op_delay=False,
-                                  feed_dict={}):
+                                  feed_dict=None):
         main = fluid.Program()
         startup = fluid.Program()
         with fluid.program_guard(main, startup):
-            loss = method(use_feed=len(feed_dict) > 0)
+            loss = method(use_feed=feed_dict is not None)
             adam = fluid.optimizer.Adam()
             adam.minimize(loss)
             if memory_opt:
@@ -221,13 +221,13 @@ class TestParallelExecutorBase(unittest.TestCase):
         if batch_size is not None:
             batch_size *= fluid.core.get_cuda_device_count()
         begin = time.time()
-        first_loss, = exe.run([loss.name], feed_dict=feed_dict)
+        first_loss, = exe.run([loss.name], feed=feed_dict)
         first_loss = numpy.array(first_loss)
 
         for i in xrange(iter):
-            exe.run([], feed_dict=feed_dict)
+            exe.run([], feed=feed_dict)
 
-        last_loss, = exe.run([loss.name], feed_dict=feed_dict)
+        last_loss, = exe.run([loss.name], feed=feed_dict)
         end = time.time()
 
         if batch_size is not None:
@@ -648,5 +648,5 @@ class TestCRFModel(unittest.TestCase):
             for i in xrange(10):
                 cur_batch = next(data)
                 print map(numpy.array,
-                          pe.run(feed_dict=feeder.feed(cur_batch),
+                          pe.run(feed=feeder.feed(cur_batch),
                                  fetch_list=[avg_cost.name]))[0]
--
GitLab
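
Usage sketch of the reworked feed interface, for reviewers trying the change out. It is illustrative only and not taken from the patch: it assumes a ParallelExecutor `pe` and a loss variable `loss` built the same way as in test_parallel_executor.py, and the feed variable names ('image', 'label') and shapes are hypothetical; only run(), feed, and device_count come from this change.

import numpy

# Assumed to exist already (hypothetical setup, as in test_parallel_executor.py):
#   pe   = fluid.ParallelExecutor(use_cuda=True, loss_name=loss.name)
#   loss = the loss variable of the program, which feeds 'image' and 'label'

# 1) Dict feed: one global batch. ParallelExecutor splits it across all devices
#    via feed_and_split_tensor_into_local_scopes, so the batch must contain at
#    least one sample per device.
batch = {
    'image': numpy.random.random(size=(64, 784)).astype('float32'),
    'label': numpy.random.randint(10, size=(64, 1)).astype('int64'),
}
loss_value, = pe.run(fetch_list=[loss.name], feed=batch)

# 2) List feed: one dict per device. Each dict is placed as-is into the
#    corresponding device's local scope via feed_tensors_into_local_scopes.
per_device = [{
    'image': numpy.random.random(size=(32, 784)).astype('float32'),
    'label': numpy.random.randint(10, size=(32, 1)).astype('int64'),
} for _ in range(pe.device_count)]
loss_value, = pe.run(fetch_list=[loss.name], feed=per_device)

In the dict form the fed tensor is deliberately set on CPUPlace: SplitLoDTensor works on the CPU copy and only the per-device slices end up on the devices, which is what the comment in run() refers to.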