提交 4bbfa9ec 编写于 作者: X Xin Pan

Add feed to ParallelExecutor

上级 a98a3fdc
文件模式从 100755 更改为 100644
...@@ -142,6 +142,7 @@ class LoDTensor : public Tensor { ...@@ -142,6 +142,7 @@ class LoDTensor : public Tensor {
return (lod_)[level].size() - 1; return (lod_)[level].size() - 1;
} }
// Split LoDTensor and copy to each place specified in places.
std::vector<LoDTensor> SplitLoDTensor( std::vector<LoDTensor> SplitLoDTensor(
const std::vector<platform::Place> places) const; const std::vector<platform::Place> places) const;
......
...@@ -150,13 +150,30 @@ void ParallelExecutor::BCastParamsToGPUs( ...@@ -150,13 +150,30 @@ void ParallelExecutor::BCastParamsToGPUs(
#endif #endif
} }
void ParallelExecutor::Run(const std::vector<std::string> &fetch_tensors, void ParallelExecutor::Run(
const std::string &fetched_var_name) { const std::vector<std::string> &fetch_tensors,
const std::string &fetched_var_name,
const std::unordered_map<std::string, LoDTensor> &feed_tensors) {
platform::RecordBlock b(0); platform::RecordBlock b(0);
SplitTensorToPlaces(feed_tensors);
auto fetch_data = member_->executor_->Run(fetch_tensors); auto fetch_data = member_->executor_->Run(fetch_tensors);
*member_->global_scope_->Var(fetched_var_name)->GetMutable<FeedFetchList>() = *member_->global_scope_->Var(fetched_var_name)->GetMutable<FeedFetchList>() =
fetch_data; fetch_data;
} }
void ParallelExecutor::SplitTensorToPlaces(
const std::unordered_map<std::string, LoDTensor> &feed_tensors) {
for (auto it : feed_tensors) {
auto lod_tensors = it.second.SplitLoDTensor(member_->places_);
for (size_t j = 0; j < member_->places_.size(); ++j) {
// TODO(panxy0718): Do I need to delete this var?
member_->local_scopes_[j]
->Var(it.first)
->GetMutable<LoDTensor>()
->ShareDataWith(lod_tensors[j]);
}
}
}
} // namespace framework } // namespace framework
} // namespace paddle } // namespace paddle
...@@ -42,9 +42,13 @@ class ParallelExecutor { ...@@ -42,9 +42,13 @@ class ParallelExecutor {
bool allow_op_delay); bool allow_op_delay);
void Run(const std::vector<std::string>& fetch_tensors, void Run(const std::vector<std::string>& fetch_tensors,
const std::string& fetched_var_name = "fetched_var"); const std::string& fetched_var_name,
const std::unordered_map<std::string, LoDTensor>& feed_tensors);
private: private:
void SplitTensorToPlaces(
const std::unordered_map<std::string, LoDTensor>& feed_tensors);
ParallelExecutorPrivate* member_; ParallelExecutorPrivate* member_;
void BCastParamsToGPUs(const ProgramDesc& startup_program) const; void BCastParamsToGPUs(const ProgramDesc& startup_program) const;
......
...@@ -26,25 +26,29 @@ class ParallelExecutor(object): ...@@ -26,25 +26,29 @@ class ParallelExecutor(object):
use_cuda, use_cuda,
num_threads=None, num_threads=None,
allow_op_delay=False): allow_op_delay=False):
places = [] self._places = []
self._act_places = []
if use_cuda: if use_cuda:
for i in xrange(core.get_cuda_device_count()): for i in xrange(core.get_cuda_device_count()):
p = core.Place() p = core.Place()
p.set_place(core.CUDAPlace(i)) self._act_places.append(core.CUDAPlace(i))
places.append(p) p.set_place(self._act_places[-1])
self._places.append(p)
else: else:
for i in xrange(multiprocessing.cpu_count()): for i in xrange(multiprocessing.cpu_count()):
p = core.Place() p = core.Place()
p.set_place(core.CPUPlace()) self._act_places.append(core.CPUPlace(i))
places.append(p) p.set_place(self._act_places[-1])
self._places.append(p)
assert self._places, "no place for execution"
if num_threads is None: if num_threads is None:
if use_cuda: if use_cuda:
# Experiments on se-resnext shows that too many threads hurt # Experiments on se-resnext shows that too many threads hurt
# performance. Worth tunning for other models in the future. # performance. Worth tunning for other models in the future.
num_threads = len(places) num_threads = len(self._places)
else: else:
min(len(places) * 2, multiprocessing.cpu_count()) min(len(self._places) * 2, multiprocessing.cpu_count())
startup = framework.default_startup_program() startup = framework.default_startup_program()
main = framework.default_main_program() main = framework.default_main_program()
...@@ -53,7 +57,7 @@ class ParallelExecutor(object): ...@@ -53,7 +57,7 @@ class ParallelExecutor(object):
self.executor = core.ParallelExecutor( self.executor = core.ParallelExecutor(
num_threads, num_threads,
True if use_cuda else False, # use_event True if use_cuda else False, # use_event
places, self._places,
set([ set([
p.name for p in main.global_block().iter_parameters() p.name for p in main.global_block().iter_parameters()
if not p.stop_gradient if not p.stop_gradient
...@@ -65,8 +69,22 @@ class ParallelExecutor(object): ...@@ -65,8 +69,22 @@ class ParallelExecutor(object):
allow_op_delay) allow_op_delay)
self.scope = scope self.scope = scope
def run(self, fetch_list): def run(self, fetch_list, feed_dict={}):
"""
:param fetch_list: A list of variable names that will be fetched.
:param feed_dict: A dict mapping for feed variable name to LoDTensor
or numpy array.
:return: fetched value list.
"""
feed_tensor_dict = {}
for i, feed_name in enumerate(feed_dict):
feed_tensor = feed_dict[feed_name]
if not isinstance(feed_tensor, core.LoDTensor):
feed_tensor = core.LoDTensor()
feed_tensor.set(feed_dict[feed_name], self._act_places[0])
feed_tensor_dict[feed_name] = feed_tensor
fetch_var_name = '@FETCHED_VAR_NAME@' fetch_var_name = '@FETCHED_VAR_NAME@'
self.executor.run(fetch_list, fetch_var_name) self.executor.run(fetch_list, fetch_var_name, feed_tensor_dict)
arr = self.scope.find_var(fetch_var_name).get_lod_tensor_array() arr = self.scope.find_var(fetch_var_name).get_lod_tensor_array()
return [arr[i] for i in range(len(arr))] return [arr[i] for i in range(len(arr))]
...@@ -21,13 +21,17 @@ import paddle.dataset.mnist as mnist ...@@ -21,13 +21,17 @@ import paddle.dataset.mnist as mnist
import paddle.dataset.wmt16 as wmt16 import paddle.dataset.wmt16 as wmt16
def simple_fc_net(): def simple_fc_net(use_feed):
reader = fluid.layers.open_recordio_file( if use_feed:
filename='./mnist.recordio', img = fluid.layers.data(name='image', shape=[784], dtype='float32')
shapes=[[-1, 784], [-1, 1]], label = fluid.layers.data(name='label', shape=[1], dtype='int64')
lod_levels=[0, 0], else:
dtypes=['float32', 'int64']) reader = fluid.layers.open_recordio_file(
img, label = fluid.layers.read_file(reader) filename='./mnist.recordio',
shapes=[[-1, 784], [-1, 1]],
lod_levels=[0, 0],
dtypes=['float32', 'int64'])
img, label = fluid.layers.read_file(reader)
hidden = img hidden = img
for _ in xrange(4): for _ in xrange(4):
hidden = fluid.layers.fc( hidden = fluid.layers.fc(
...@@ -42,13 +46,18 @@ def simple_fc_net(): ...@@ -42,13 +46,18 @@ def simple_fc_net():
return loss return loss
def fc_with_batchnorm(): def fc_with_batchnorm(use_feed):
reader = fluid.layers.open_recordio_file( if use_feed:
filename='./mnist.recordio', img = fluid.layers.data(name='image', shape=[784], dtype='float32')
shapes=[[-1, 784], [-1, 1]], label = fluid.layers.data(name='label', shape=[1], dtype='int64')
lod_levels=[0, 0], else:
dtypes=['float32', 'int64']) reader = fluid.layers.open_recordio_file(
img, label = fluid.layers.read_file(reader) filename='./mnist.recordio',
shapes=[[-1, 784], [-1, 1]],
lod_levels=[0, 0],
dtypes=['float32', 'int64'])
img, label = fluid.layers.read_file(reader)
hidden = img hidden = img
for _ in xrange(1): for _ in xrange(1):
hidden = fluid.layers.fc( hidden = fluid.layers.fc(
...@@ -135,7 +144,9 @@ def bottleneck_block(input, num_filters, stride, cardinality, reduction_ratio): ...@@ -135,7 +144,9 @@ def bottleneck_block(input, num_filters, stride, cardinality, reduction_ratio):
return fluid.layers.elementwise_add(x=short, y=scale, act='relu') return fluid.layers.elementwise_add(x=short, y=scale, act='relu')
def SE_ResNeXt152Small(batch_size=2): def SE_ResNeXt152Small(batch_size=2, use_feed=False):
assert not use_feed, "SE_ResNeXt doesn't support feed yet"
img = fluid.layers.fill_constant( img = fluid.layers.fill_constant(
shape=[batch_size, 3, 224, 224], dtype='float32', value=0.0) shape=[batch_size, 3, 224, 224], dtype='float32', value=0.0)
label = fluid.layers.fill_constant( label = fluid.layers.fill_constant(
...@@ -185,30 +196,28 @@ class TestParallelExecutorBase(unittest.TestCase): ...@@ -185,30 +196,28 @@ class TestParallelExecutorBase(unittest.TestCase):
memory_opt=True, memory_opt=True,
iter=10, iter=10,
batch_size=None, batch_size=None,
allow_op_delay=False): allow_op_delay=False,
feed_dict={}):
main = fluid.Program() main = fluid.Program()
startup = fluid.Program() startup = fluid.Program()
with fluid.program_guard(main, startup): with fluid.program_guard(main, startup):
loss = method() loss = method(use_feed=len(feed_dict) > 0)
adam = fluid.optimizer.Adam() adam = fluid.optimizer.Adam()
adam.minimize(loss) adam.minimize(loss)
if memory_opt: if memory_opt:
fluid.memory_optimize(main) fluid.memory_optimize(main)
exe = fluid.ParallelExecutor( exe = fluid.ParallelExecutor(loss_name=loss.name, use_cuda=True)
loss_name=loss.name,
use_cuda=True,
allow_op_delay=allow_op_delay)
if batch_size is not None: if batch_size is not None:
batch_size *= fluid.core.get_cuda_device_count() batch_size *= fluid.core.get_cuda_device_count()
begin = time.time() begin = time.time()
first_loss, = exe.run([loss.name]) first_loss, = exe.run([loss.name], feed_dict=feed_dict)
first_loss = numpy.array(first_loss) first_loss = numpy.array(first_loss)
for i in xrange(iter): for i in xrange(iter):
exe.run([]) exe.run([], feed_dict=feed_dict)
last_loss, = exe.run([loss.name]) last_loss, = exe.run([loss.name], feed_dict=feed_dict)
end = time.time() end = time.time()
if batch_size is not None: if batch_size is not None:
...@@ -242,9 +251,19 @@ class TestMNIST(TestParallelExecutorBase): ...@@ -242,9 +251,19 @@ class TestMNIST(TestParallelExecutorBase):
self.check_network_convergence(simple_fc_net) self.check_network_convergence(simple_fc_net)
self.check_network_convergence(simple_fc_net, allow_op_delay=True) self.check_network_convergence(simple_fc_net, allow_op_delay=True)
img = numpy.zeros(shape=[32, 784], dtype='float32')
label = numpy.ones(shape=[32, 1], dtype='int64')
self.check_network_convergence(
simple_fc_net, feed_dict={"image": img,
"label": label})
def test_batchnorm_fc(self): def test_batchnorm_fc(self):
self.check_network_convergence(fc_with_batchnorm) self.check_network_convergence(fc_with_batchnorm)
self.check_network_convergence(fc_with_batchnorm, allow_op_delay=True) img = numpy.zeros(shape=[32, 784], dtype='float32')
label = numpy.ones(shape=[32, 1], dtype='int64')
self.check_network_convergence(
fc_with_batchnorm, feed_dict={"image": img,
"label": label})
class TestResnet(TestParallelExecutorBase): class TestResnet(TestParallelExecutorBase):
...@@ -400,7 +419,8 @@ def prepare_batch_input(insts, src_pad_idx, trg_pad_idx, n_head): ...@@ -400,7 +419,8 @@ def prepare_batch_input(insts, src_pad_idx, trg_pad_idx, n_head):
import transformer_model import transformer_model
def transformer(): def transformer(use_feed):
assert not use_feed, "transfomer doesn't support feed yet"
return transformer_model.transformer( return transformer_model.transformer(
ModelHyperParams.src_vocab_size + 1, ModelHyperParams.src_vocab_size + 1,
ModelHyperParams.trg_vocab_size + 1, ModelHyperParams.max_length + 1, ModelHyperParams.trg_vocab_size + 1, ModelHyperParams.max_length + 1,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册