提交 7c0cc113 编写于 作者: Y Yang Yu

Test word2vec for parallel.do

* Polish sum_op support SelectedRows in_place
上级 32585ece
......@@ -68,7 +68,32 @@ class SumKernel : public framework::OpKernel<T> {
}
}
} else if (out_var->IsType<framework::SelectedRows>()) {
PADDLE_ENFORCE(!in_place, "SelectedRows not support inplace sum now");
std::unique_ptr<framework::SelectedRows> in0;
if (in_place) {
// If is in_place, we store the input[0] to in0
auto &in_sel0 = in_vars[0]->Get<SelectedRows>();
auto &rows = in_sel0.rows();
#ifdef PADDLE_WITH_CUDA
std::vector<int64_t> rows_in_cpu;
rows_in_cpu.reserve(rows.size());
for (auto item : rows) {
rows_in_cpu.push_back(item);
}
in0.reset(new framework::SelectedRows(rows_in_cpu, in_sel0.height()));
#else
in0.reset(new framework::SelectedRows(rows, in_sel0.height()));
#endif
in0->mutable_value()->ShareDataWith(in_sel0.value());
}
auto get_selected_row = [&](size_t i) -> const SelectedRows & {
if (i == 0 && in0) {
return *in0.get();
} else {
return in_vars[i]->Get<SelectedRows>();
}
};
auto *out = context.Output<SelectedRows>("Out");
out->mutable_rows()->clear();
auto *out_value = out->mutable_value();
......@@ -76,24 +101,26 @@ class SumKernel : public framework::OpKernel<T> {
// Runtime InferShape
size_t first_dim = 0;
for (int i = 0; i < N; i++) {
first_dim += in_vars[i]->Get<SelectedRows>().rows().size();
auto &sel_row = get_selected_row(i);
first_dim += sel_row.rows().size();
}
auto in_dim = in_vars[0]->Get<SelectedRows>().value().dims();
auto in_dim_vec = framework::vectorize(in_dim);
in_dim_vec[0] = static_cast<int64_t>(first_dim);
auto in_dim =
framework::vectorize(get_selected_row(N - 1).value().dims());
in_dim[0] = static_cast<int64_t>(first_dim);
out_value->Resize(framework::make_ddim(in_dim_vec));
out_value->Resize(framework::make_ddim(in_dim));
out_value->mutable_data<T>(context.GetPlace());
math::SelectedRowsAddTo<DeviceContext, T> functor;
int64_t offset = 0;
for (int i = 0; i < N; i++) {
PADDLE_ENFORCE_EQ(out->height(),
in_vars[i]->Get<SelectedRows>().height());
functor(context.template device_context<DeviceContext>(),
in_vars[i]->Get<SelectedRows>(), offset, out);
offset += in_vars[i]->Get<SelectedRows>().value().numel();
auto &sel_row = get_selected_row(i);
PADDLE_ENFORCE_EQ(out->height(), sel_row.height());
functor(context.template device_context<DeviceContext>(), sel_row,
offset, out);
offset += sel_row.value().numel();
}
} else if (out_var->IsType<framework::LoDTensorArray>()) {
auto &out_array = *out_var->GetMutable<framework::LoDTensorArray>();
......
......@@ -15,9 +15,10 @@
import paddle.v2 as paddle
import paddle.v2.fluid as fluid
import unittest
import os
def main_impl(use_cuda):
def main(use_cuda, is_sparse, parallel):
if use_cuda and not fluid.core.is_compiled_with_cuda():
return
......@@ -26,7 +27,45 @@ def main_impl(use_cuda):
HIDDEN_SIZE = 256
N = 5
BATCH_SIZE = 32
IS_SPARSE = True
IS_SPARSE = is_sparse
def __network__(words):
embed_first = fluid.layers.embedding(
input=words[0],
size=[dict_size, EMBED_SIZE],
dtype='float32',
is_sparse=IS_SPARSE,
param_attr='shared_w')
embed_second = fluid.layers.embedding(
input=words[1],
size=[dict_size, EMBED_SIZE],
dtype='float32',
is_sparse=IS_SPARSE,
param_attr='shared_w')
embed_third = fluid.layers.embedding(
input=words[2],
size=[dict_size, EMBED_SIZE],
dtype='float32',
is_sparse=IS_SPARSE,
param_attr='shared_w')
embed_forth = fluid.layers.embedding(
input=words[3],
size=[dict_size, EMBED_SIZE],
dtype='float32',
is_sparse=IS_SPARSE,
param_attr='shared_w')
concat_embed = fluid.layers.concat(
input=[embed_first, embed_second, embed_third, embed_forth], axis=1)
hidden1 = fluid.layers.fc(input=concat_embed,
size=HIDDEN_SIZE,
act='sigmoid')
predict_word = fluid.layers.fc(input=hidden1,
size=dict_size,
act='softmax')
cost = fluid.layers.cross_entropy(input=predict_word, label=words[4])
avg_cost = fluid.layers.mean(x=cost)
return avg_cost
word_dict = paddle.dataset.imikolov.build_dict()
dict_size = len(word_dict)
......@@ -37,39 +76,21 @@ def main_impl(use_cuda):
forth_word = fluid.layers.data(name='forthw', shape=[1], dtype='int64')
next_word = fluid.layers.data(name='nextw', shape=[1], dtype='int64')
embed_first = fluid.layers.embedding(
input=first_word,
size=[dict_size, EMBED_SIZE],
dtype='float32',
is_sparse=IS_SPARSE,
param_attr='shared_w')
embed_second = fluid.layers.embedding(
input=second_word,
size=[dict_size, EMBED_SIZE],
dtype='float32',
is_sparse=IS_SPARSE,
param_attr='shared_w')
embed_third = fluid.layers.embedding(
input=third_word,
size=[dict_size, EMBED_SIZE],
dtype='float32',
is_sparse=IS_SPARSE,
param_attr='shared_w')
embed_forth = fluid.layers.embedding(
input=forth_word,
size=[dict_size, EMBED_SIZE],
dtype='float32',
is_sparse=IS_SPARSE,
param_attr='shared_w')
concat_embed = fluid.layers.concat(
input=[embed_first, embed_second, embed_third, embed_forth], axis=1)
hidden1 = fluid.layers.fc(input=concat_embed,
size=HIDDEN_SIZE,
act='sigmoid')
predict_word = fluid.layers.fc(input=hidden1, size=dict_size, act='softmax')
cost = fluid.layers.cross_entropy(input=predict_word, label=next_word)
avg_cost = fluid.layers.mean(x=cost)
if not parallel:
avg_cost = __network__(
[first_word, second_word, third_word, forth_word, next_word])
else:
places = fluid.layers.get_places()
pd = fluid.layers.ParallelDo(places)
with pd.do():
avg_cost = __network__(
map(pd.read_input, [
first_word, second_word, third_word, forth_word, next_word
]))
pd.write_output(avg_cost)
avg_cost = fluid.layers.mean(x=pd())
sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.001)
sgd_optimizer.minimize(avg_cost)
......@@ -94,22 +115,42 @@ def main_impl(use_cuda):
raise AssertionError("Cost is too large {0:2.2}".format(avg_cost_np[0]))
def main(*args, **kwargs):
prog = fluid.Program()
startup_prog = fluid.Program()
scope = fluid.core.Scope()
with fluid.scope_guard(scope):
with fluid.program_guard(prog, startup_prog):
main_impl(*args, **kwargs)
FULL_TEST = os.getenv('FULL_TEST',
'1').lower() in ['true', '1', 't', 'y', 'yes', 'on']
SKIP_REASON = "Only run minimum number of tests in CI server, to make CI faster"
class W2VTest(unittest.TestCase):
def test_cpu_normal(self):
main(use_cuda=False)
pass
def inject_test_method(use_cuda, is_sparse, parallel):
fn_name = "test_{0}_{1}_{2}".format("cuda" if use_cuda else "cpu", "sparse"
if is_sparse else "dense", "parallel"
if parallel else "normal")
def __impl__(*args, **kwargs):
prog = fluid.Program()
startup_prog = fluid.Program()
scope = fluid.core.Scope()
with fluid.scope_guard(scope):
with fluid.program_guard(prog, startup_prog):
main(use_cuda=use_cuda, is_sparse=is_sparse, parallel=parallel)
if use_cuda and is_sparse and parallel:
fn = __impl__
else:
# skip the other test when on CI server
fn = unittest.skipUnless(
condition=FULL_TEST, reason=SKIP_REASON)(__impl__)
setattr(W2VTest, fn_name, fn)
def test_gpu_normal(self):
main(use_cuda=True)
for use_cuda in (False, True):
for is_sparse in (False, True):
for parallel in (False, True):
inject_test_method(use_cuda, is_sparse, parallel)
if __name__ == '__main__':
unittest.main()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册