未验证 提交 fc6f0be2 编写于 作者: Y Yu Yang 提交者: GitHub

Merge pull request #9942 from reyoung/feature/tuning_pe_trans

Feature/tuning pe trans
...@@ -155,13 +155,9 @@ void ParallelExecutor::BCastParamsToGPUs( ...@@ -155,13 +155,9 @@ void ParallelExecutor::BCastParamsToGPUs(
#endif #endif
} }
void ParallelExecutor::Run( void ParallelExecutor::Run(const std::vector<std::string> &fetch_tensors,
const std::vector<std::string> &fetch_tensors, const std::string &fetched_var_name) {
const std::string &fetched_var_name,
const std::unordered_map<std::string, LoDTensor> &feed_tensors) {
platform::RecordBlock b(0); platform::RecordBlock b(0);
SplitTensorToPlaces(feed_tensors);
// Create local scopes. // Create local scopes.
for (auto &scope : member_->local_scopes_) { for (auto &scope : member_->local_scopes_) {
Scope &local_scope = scope->NewScope(); Scope &local_scope = scope->NewScope();
...@@ -195,14 +191,28 @@ void ParallelExecutor::Run( ...@@ -195,14 +191,28 @@ void ParallelExecutor::Run(
auto &local_scope = auto &local_scope =
*scope->Var(details::kLocalExecScopeName)->GetMutable<Scope *>(); *scope->Var(details::kLocalExecScopeName)->GetMutable<Scope *>();
scope->DeleteScope(local_scope); scope->DeleteScope(local_scope);
local_scope = nullptr;
} }
} }
void ParallelExecutor::SplitTensorToPlaces( void ParallelExecutor::FeedTensorsIntoLocalScopes(
const std::unordered_map<std::string, LoDTensor> &feed_tensors) { const std::vector<std::unordered_map<std::string, LoDTensor>> &tensors) {
for (auto it : feed_tensors) { PADDLE_ENFORCE_EQ(member_->local_scopes_.size(), tensors.size());
auto lod_tensors = it.second.SplitLoDTensor(member_->places_);
for (size_t i = 0; i < tensors.size(); ++i) {
auto &map = tensors[i];
auto *scope = member_->local_scopes_[i];
for (auto &pair : map) {
auto *trg = scope->Var(pair.first)->GetMutable<LoDTensor>();
trg->ShareDataWith(pair.second);
trg->set_lod(pair.second.lod());
}
}
}
void ParallelExecutor::FeedAndSplitTensorIntoLocalScopes(
const std::unordered_map<std::string, LoDTensor> &tensors) {
for (auto pair : tensors) {
auto lod_tensors = pair.second.SplitLoDTensor(member_->places_);
PADDLE_ENFORCE_EQ( PADDLE_ENFORCE_EQ(
member_->places_.size(), lod_tensors.size(), member_->places_.size(), lod_tensors.size(),
"The number of samples of current batch is less than the count of " "The number of samples of current batch is less than the count of "
...@@ -211,7 +221,7 @@ void ParallelExecutor::SplitTensorToPlaces( ...@@ -211,7 +221,7 @@ void ParallelExecutor::SplitTensorToPlaces(
for (size_t j = 0; j < member_->places_.size(); ++j) { for (size_t j = 0; j < member_->places_.size(); ++j) {
// TODO(panxy0718): Do I need to delete this var? // TODO(panxy0718): Do I need to delete this var?
auto t = auto t =
member_->local_scopes_[j]->Var(it.first)->GetMutable<LoDTensor>(); member_->local_scopes_[j]->Var(pair.first)->GetMutable<LoDTensor>();
t->ShareDataWith(lod_tensors[j]); t->ShareDataWith(lod_tensors[j]);
t->set_lod(lod_tensors[j].lod()); t->set_lod(lod_tensors[j].lod());
} }
......
...@@ -44,16 +44,22 @@ class ParallelExecutor { ...@@ -44,16 +44,22 @@ class ParallelExecutor {
std::vector<Scope*>& GetLocalScopes(); std::vector<Scope*>& GetLocalScopes();
/**
* Feed tensors to local scopes. The size of tensors should be equal to the
* size of local scopes.
*/
void FeedTensorsIntoLocalScopes(
const std::vector<std::unordered_map<std::string, LoDTensor>>& tensors);
void FeedAndSplitTensorIntoLocalScopes(
const std::unordered_map<std::string, LoDTensor>& tensors);
void Run(const std::vector<std::string>& fetch_tensors, void Run(const std::vector<std::string>& fetch_tensors,
const std::string& fetched_var_name, const std::string& fetched_var_name);
const std::unordered_map<std::string, LoDTensor>& feed_tensors);
void BCastParamsToGPUs(const std::unordered_set<std::string>& vars) const; void BCastParamsToGPUs(const std::unordered_set<std::string>& vars) const;
private: private:
void SplitTensorToPlaces(
const std::unordered_map<std::string, LoDTensor>& feed_tensors);
ParallelExecutorPrivate* member_; ParallelExecutorPrivate* member_;
}; };
......
...@@ -505,11 +505,19 @@ All parameter, weight, gradient are variables in Paddle. ...@@ -505,11 +505,19 @@ All parameter, weight, gradient are variables in Paddle.
scope, local_scopes, allow_op_delay); scope, local_scopes, allow_op_delay);
}) })
.def("bcast_params", &ParallelExecutor::BCastParamsToGPUs) .def("bcast_params", &ParallelExecutor::BCastParamsToGPUs)
// NOTE: Even if we return a vec<Scope*>* to Python using the reference
// policy, we still cannot get a local_scope from this vector, since the
// elements of vec<Scope*> will be freed by Python GC. We can only return
// Scope* one by one and mark each of them as a reference.
.def("local_scopes", .def("local_scopes",
[](ParallelExecutor &self) -> std::vector<Scope *> * { [](ParallelExecutor &self) -> std::vector<Scope *> * {
return &self.GetLocalScopes(); return &self.GetLocalScopes();
}, },
py::return_value_policy::reference) py::return_value_policy::reference)
.def("feed_tensors_into_local_scopes",
&ParallelExecutor::FeedTensorsIntoLocalScopes)
.def("feed_and_split_tensor_into_local_scopes",
&ParallelExecutor::FeedAndSplitTensorIntoLocalScopes)
.def("run", &ParallelExecutor::Run); .def("run", &ParallelExecutor::Run);
BindRecordIOWriter(&m); BindRecordIOWriter(&m);
......
...@@ -190,6 +190,11 @@ void PyCUDATensorSetFromArray( ...@@ -190,6 +190,11 @@ void PyCUDATensorSetFromArray(
static_cast<const platform::CUDADeviceContext *>(pool.Get(place)); static_cast<const platform::CUDADeviceContext *>(pool.Get(place));
paddle::platform::GpuMemcpyAsync(dst, array.data(), sizeof(T) * array.size(), paddle::platform::GpuMemcpyAsync(dst, array.data(), sizeof(T) * array.size(),
cudaMemcpyHostToDevice, dev_ctx->stream()); cudaMemcpyHostToDevice, dev_ctx->stream());
// NOTE: For safety, wait here until the copy completes.
// This is because the CPU buffer behind array.data() could be destroyed
// after this method returns. If this method were asynchronous, the copy
// could read from a memory buffer that has already been freed.
dev_ctx->Wait();
} }
template <> template <>
...@@ -216,6 +221,11 @@ void PyCUDATensorSetFromArray( ...@@ -216,6 +221,11 @@ void PyCUDATensorSetFromArray(
paddle::platform::GpuMemcpyAsync(dst, array.data(), paddle::platform::GpuMemcpyAsync(dst, array.data(),
sizeof(uint16_t) * array.size(), sizeof(uint16_t) * array.size(),
cudaMemcpyHostToDevice, dev_ctx->stream()); cudaMemcpyHostToDevice, dev_ctx->stream());
// NOTE: For safety, wait here until the copy completes.
// This is because the CPU buffer behind array.data() could be destroyed
// after this method returns. If this method were asynchronous, the copy
// could read from a memory buffer that has already been freed.
dev_ctx->Wait();
} }
template <typename T> template <typename T>
......
...@@ -16,6 +16,7 @@ import core ...@@ -16,6 +16,7 @@ import core
import multiprocessing import multiprocessing
import framework import framework
import executor import executor
import sys
__all__ = ['ParallelExecutor'] __all__ = ['ParallelExecutor']
...@@ -123,28 +124,93 @@ class ParallelExecutor(object): ...@@ -123,28 +124,93 @@ class ParallelExecutor(object):
allow_op_delay) allow_op_delay)
self.scope = scope self.scope = scope
def run(self, fetch_list, feed_dict={}): def run(self, fetch_list, feed=None, feed_dict=None):
""" """
:param fetch_list: A list of variable names that will be fetched. Run a parallel executor with fetch_list.
:param feed_dict: A dict mapping for feed variable name to LoDTensor
or numpy array. The feed parameter can be a dict or a list. If feed is a dict, the
:return: fetched value list. feed data will be split into multiple devices. If feed is a list, we
""" assume the data has been splitted into multiple devices, the each
if not isinstance(feed_dict, dict): element in the list will be copied to each device directly.
raise TypeError("feed_dict should be a dict")
For example, if the feed is a dict:
>>> exe = ParallelExecutor()
>>> # the image will be split across devices. If there are two devices,
>>> # each device will process an image with shape (24, 1, 28, 28)
>>> exe.run(feed={'image': numpy.random.random(size=(48, 1, 28, 28))})
For example, if the feed is a list:
>>> exe = ParallelExecutor()
>>> # each device will process each element in the list.
>>> # the 1st device will process an image with shape (48, 1, 28, 28)
>>> # the 2nd device will process an image with shape (32, 1, 28, 28)
>>> #
>>> # you can use exe.device_count to get the device number.
>>> exe.run(feed=[{"image": numpy.random.random(size=(48, 1, 28, 28))},
>>> {"image": numpy.random.random(size=(32, 1, 28, 28))},
>>> ])
Args:
fetch_list(list): The fetched variable names
feed(list|dict|None): The feed variables. If the feed is a dict,
tensors in that dict will be split across the devices. If
the feed is a list, each element of the list will be copied
to its corresponding device.
feed_dict: Alias for feed parameter, for backward compatibility.
This parameter is deprecated.
feed_tensor_dict = {} Returns: fetched result list.
for i, feed_name in enumerate(feed_dict):
feed_tensor = feed_dict[feed_name] """
if not isinstance(feed_tensor, core.LoDTensor): if feed is None:
feed_tensor = core.LoDTensor() feed = feed_dict
feed_tensor.set(feed_dict[feed_name], self._act_places[0]) print >> sys.stderr, "`feed_dict` is deprecated. Please use `feed=`"
feed_tensor_dict[feed_name] = feed_tensor
if isinstance(feed, dict):
feed_tensor_dict = dict()
for feed_name in feed:
feed_tensor = feed[feed_name]
if not isinstance(feed_tensor, core.LoDTensor):
feed_tensor = core.LoDTensor()
# always set to CPU place, since the tensor needs to be split and
# splitting is fast on the CPU
feed_tensor.set(feed[feed_name], core.CPUPlace())
feed_tensor_dict[feed_name] = feed_tensor
self.executor.feed_and_split_tensor_into_local_scopes(
feed_tensor_dict)
elif isinstance(feed, list) or isinstance(feed, tuple):
if len(feed) != len(self._act_places):
raise ValueError(
"Feed a list of tensor, the list should be the same size as places"
)
res = list()
for i, each in enumerate(feed):
if not isinstance(each, dict):
raise TypeError(
"Each element of feed list should be a dict")
res_dict = dict()
for feed_name in each:
tensor = each[feed_name]
if not isinstance(tensor, core.LoDTensor):
tmp = core.LoDTensor()
tmp.set(tensor, self._act_places[i])
tensor = tmp
res_dict[feed_name] = tensor
res.append(res_dict)
self.executor.feed_tensors_into_local_scopes(res)
fetch_var_name = '@FETCHED_VAR_NAME@' fetch_var_name = '@FETCHED_VAR_NAME@'
self.executor.run(fetch_list, fetch_var_name, feed_tensor_dict) self.executor.run(fetch_list, fetch_var_name)
arr = self.scope.find_var(fetch_var_name).get_lod_tensor_array() arr = self.scope.find_var(fetch_var_name).get_lod_tensor_array()
return [arr[i] for i in range(len(arr))] return [arr[i] for i in range(len(arr))]
def bcast_params(self): def bcast_params(self):
self.executor.bcast_params(set(self.persistable_vars)) self.executor.bcast_params(set(self.persistable_vars))
@property
def device_count(self):
return len(self._act_places)
...@@ -203,12 +203,12 @@ class TestParallelExecutorBase(unittest.TestCase): ...@@ -203,12 +203,12 @@ class TestParallelExecutorBase(unittest.TestCase):
iter=10, iter=10,
batch_size=None, batch_size=None,
allow_op_delay=False, allow_op_delay=False,
feed_dict={}): feed_dict=None):
main = fluid.Program() main = fluid.Program()
startup = fluid.Program() startup = fluid.Program()
startup.random_seed = 1 # Fix random seed startup.random_seed = 1 # Fix random seed
with fluid.program_guard(main, startup): with fluid.program_guard(main, startup):
loss = method(use_feed=len(feed_dict) > 0) loss = method(use_feed=feed_dict is not None)
adam = fluid.optimizer.Adam() adam = fluid.optimizer.Adam()
adam.minimize(loss) adam.minimize(loss)
if memory_opt: if memory_opt:
...@@ -222,13 +222,13 @@ class TestParallelExecutorBase(unittest.TestCase): ...@@ -222,13 +222,13 @@ class TestParallelExecutorBase(unittest.TestCase):
if batch_size is not None: if batch_size is not None:
batch_size *= fluid.core.get_cuda_device_count() batch_size *= fluid.core.get_cuda_device_count()
begin = time.time() begin = time.time()
first_loss, = exe.run([loss.name], feed_dict=feed_dict) first_loss, = exe.run([loss.name], feed=feed_dict)
first_loss = numpy.array(first_loss) first_loss = numpy.array(first_loss)
for i in xrange(iter): for i in xrange(iter):
exe.run([], feed_dict=feed_dict) exe.run([], feed=feed_dict)
last_loss, = exe.run([loss.name], feed_dict=feed_dict) last_loss, = exe.run([loss.name], feed=feed_dict)
end = time.time() end = time.time()
if batch_size is not None: if batch_size is not None:
...@@ -649,5 +649,5 @@ class TestCRFModel(unittest.TestCase): ...@@ -649,5 +649,5 @@ class TestCRFModel(unittest.TestCase):
for i in xrange(10): for i in xrange(10):
cur_batch = next(data) cur_batch = next(data)
print map(numpy.array, print map(numpy.array,
pe.run(feed_dict=feeder.feed(cur_batch), pe.run(feed=feeder.feed(cur_batch),
fetch_list=[avg_cost.name]))[0] fetch_list=[avg_cost.name]))[0]
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册