未验证 提交 865a714e 编写于 作者: Y Yu Yang 提交者: GitHub

Merge pull request #7970 from reyoung/feature/test_w2v_parallel.do

Make word2vec uses parallel.do when CI
...@@ -68,7 +68,32 @@ class SumKernel : public framework::OpKernel<T> { ...@@ -68,7 +68,32 @@ class SumKernel : public framework::OpKernel<T> {
} }
} }
} else if (out_var->IsType<framework::SelectedRows>()) { } else if (out_var->IsType<framework::SelectedRows>()) {
PADDLE_ENFORCE(!in_place, "SelectedRows not support inplace sum now"); std::unique_ptr<framework::SelectedRows> in0;
if (in_place) {
// If is in_place, we store the input[0] to in0
auto &in_sel0 = in_vars[0]->Get<SelectedRows>();
auto &rows = in_sel0.rows();
#ifdef PADDLE_WITH_CUDA
std::vector<int64_t> rows_in_cpu;
rows_in_cpu.reserve(rows.size());
for (auto item : rows) {
rows_in_cpu.push_back(item);
}
in0.reset(new framework::SelectedRows(rows_in_cpu, in_sel0.height()));
#else
in0.reset(new framework::SelectedRows(rows, in_sel0.height()));
#endif
in0->mutable_value()->ShareDataWith(in_sel0.value());
}
auto get_selected_row = [&](size_t i) -> const SelectedRows & {
if (i == 0 && in0) {
return *in0.get();
} else {
return in_vars[i]->Get<SelectedRows>();
}
};
auto *out = context.Output<SelectedRows>("Out"); auto *out = context.Output<SelectedRows>("Out");
out->mutable_rows()->clear(); out->mutable_rows()->clear();
auto *out_value = out->mutable_value(); auto *out_value = out->mutable_value();
...@@ -76,24 +101,26 @@ class SumKernel : public framework::OpKernel<T> { ...@@ -76,24 +101,26 @@ class SumKernel : public framework::OpKernel<T> {
// Runtime InferShape // Runtime InferShape
size_t first_dim = 0; size_t first_dim = 0;
for (int i = 0; i < N; i++) { for (int i = 0; i < N; i++) {
first_dim += in_vars[i]->Get<SelectedRows>().rows().size(); auto &sel_row = get_selected_row(i);
first_dim += sel_row.rows().size();
} }
auto in_dim = in_vars[0]->Get<SelectedRows>().value().dims(); auto in_dim =
auto in_dim_vec = framework::vectorize(in_dim); framework::vectorize(get_selected_row(N - 1).value().dims());
in_dim_vec[0] = static_cast<int64_t>(first_dim); in_dim[0] = static_cast<int64_t>(first_dim);
out_value->Resize(framework::make_ddim(in_dim_vec)); out_value->Resize(framework::make_ddim(in_dim));
out_value->mutable_data<T>(context.GetPlace()); out_value->mutable_data<T>(context.GetPlace());
math::SelectedRowsAddTo<DeviceContext, T> functor; math::SelectedRowsAddTo<DeviceContext, T> functor;
int64_t offset = 0; int64_t offset = 0;
for (int i = 0; i < N; i++) { for (int i = 0; i < N; i++) {
PADDLE_ENFORCE_EQ(out->height(), auto &sel_row = get_selected_row(i);
in_vars[i]->Get<SelectedRows>().height());
functor(context.template device_context<DeviceContext>(), PADDLE_ENFORCE_EQ(out->height(), sel_row.height());
in_vars[i]->Get<SelectedRows>(), offset, out); functor(context.template device_context<DeviceContext>(), sel_row,
offset += in_vars[i]->Get<SelectedRows>().value().numel(); offset, out);
offset += sel_row.value().numel();
} }
} else if (out_var->IsType<framework::LoDTensorArray>()) { } else if (out_var->IsType<framework::LoDTensorArray>()) {
auto &out_array = *out_var->GetMutable<framework::LoDTensorArray>(); auto &out_array = *out_var->GetMutable<framework::LoDTensorArray>();
......
...@@ -12,76 +12,145 @@ ...@@ -12,76 +12,145 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import numpy as np
import paddle.v2 as paddle import paddle.v2 as paddle
import paddle.v2.fluid as fluid import paddle.v2.fluid as fluid
import unittest
import os
PASS_NUM = 100
EMBED_SIZE = 32
HIDDEN_SIZE = 256
N = 5
BATCH_SIZE = 32
IS_SPARSE = True
word_dict = paddle.dataset.imikolov.build_dict() def main(use_cuda, is_sparse, parallel):
dict_size = len(word_dict) if use_cuda and not fluid.core.is_compiled_with_cuda():
return
first_word = fluid.layers.data(name='firstw', shape=[1], dtype='int64') PASS_NUM = 100
second_word = fluid.layers.data(name='secondw', shape=[1], dtype='int64') EMBED_SIZE = 32
third_word = fluid.layers.data(name='thirdw', shape=[1], dtype='int64') HIDDEN_SIZE = 256
forth_word = fluid.layers.data(name='forthw', shape=[1], dtype='int64') N = 5
next_word = fluid.layers.data(name='nextw', shape=[1], dtype='int64') BATCH_SIZE = 32
IS_SPARSE = is_sparse
embed_first = fluid.layers.embedding( def __network__(words):
input=first_word, embed_first = fluid.layers.embedding(
input=words[0],
size=[dict_size, EMBED_SIZE], size=[dict_size, EMBED_SIZE],
dtype='float32', dtype='float32',
is_sparse=IS_SPARSE, is_sparse=IS_SPARSE,
param_attr='shared_w') param_attr='shared_w')
embed_second = fluid.layers.embedding( embed_second = fluid.layers.embedding(
input=second_word, input=words[1],
size=[dict_size, EMBED_SIZE], size=[dict_size, EMBED_SIZE],
dtype='float32', dtype='float32',
is_sparse=IS_SPARSE, is_sparse=IS_SPARSE,
param_attr='shared_w') param_attr='shared_w')
embed_third = fluid.layers.embedding( embed_third = fluid.layers.embedding(
input=third_word, input=words[2],
size=[dict_size, EMBED_SIZE], size=[dict_size, EMBED_SIZE],
dtype='float32', dtype='float32',
is_sparse=IS_SPARSE, is_sparse=IS_SPARSE,
param_attr='shared_w') param_attr='shared_w')
embed_forth = fluid.layers.embedding( embed_forth = fluid.layers.embedding(
input=forth_word, input=words[3],
size=[dict_size, EMBED_SIZE], size=[dict_size, EMBED_SIZE],
dtype='float32', dtype='float32',
is_sparse=IS_SPARSE, is_sparse=IS_SPARSE,
param_attr='shared_w') param_attr='shared_w')
concat_embed = fluid.layers.concat( concat_embed = fluid.layers.concat(
input=[embed_first, embed_second, embed_third, embed_forth], axis=1) input=[embed_first, embed_second, embed_third, embed_forth], axis=1)
hidden1 = fluid.layers.fc(input=concat_embed, size=HIDDEN_SIZE, act='sigmoid') hidden1 = fluid.layers.fc(input=concat_embed,
predict_word = fluid.layers.fc(input=hidden1, size=dict_size, act='softmax') size=HIDDEN_SIZE,
cost = fluid.layers.cross_entropy(input=predict_word, label=next_word) act='sigmoid')
avg_cost = fluid.layers.mean(x=cost) predict_word = fluid.layers.fc(input=hidden1,
sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.001) size=dict_size,
sgd_optimizer.minimize(avg_cost) act='softmax')
cost = fluid.layers.cross_entropy(input=predict_word, label=words[4])
train_reader = paddle.batch( avg_cost = fluid.layers.mean(x=cost)
return avg_cost
word_dict = paddle.dataset.imikolov.build_dict()
dict_size = len(word_dict)
first_word = fluid.layers.data(name='firstw', shape=[1], dtype='int64')
second_word = fluid.layers.data(name='secondw', shape=[1], dtype='int64')
third_word = fluid.layers.data(name='thirdw', shape=[1], dtype='int64')
forth_word = fluid.layers.data(name='forthw', shape=[1], dtype='int64')
next_word = fluid.layers.data(name='nextw', shape=[1], dtype='int64')
if not parallel:
avg_cost = __network__(
[first_word, second_word, third_word, forth_word, next_word])
else:
places = fluid.layers.get_places()
pd = fluid.layers.ParallelDo(places)
with pd.do():
avg_cost = __network__(
map(pd.read_input, [
first_word, second_word, third_word, forth_word, next_word
]))
pd.write_output(avg_cost)
avg_cost = fluid.layers.mean(x=pd())
sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.001)
sgd_optimizer.minimize(avg_cost)
train_reader = paddle.batch(
paddle.dataset.imikolov.train(word_dict, N), BATCH_SIZE) paddle.dataset.imikolov.train(word_dict, N), BATCH_SIZE)
place = fluid.CPUPlace() place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
exe = fluid.Executor(place) exe = fluid.Executor(place)
feeder = fluid.DataFeeder( feeder = fluid.DataFeeder(
feed_list=[first_word, second_word, third_word, forth_word, next_word], feed_list=[first_word, second_word, third_word, forth_word, next_word],
place=place) place=place)
exe.run(fluid.default_startup_program()) exe.run(fluid.default_startup_program())
for pass_id in range(PASS_NUM): for pass_id in range(PASS_NUM):
for data in train_reader(): for data in train_reader():
avg_cost_np = exe.run(fluid.default_main_program(), avg_cost_np = exe.run(fluid.default_main_program(),
feed=feeder.feed(data), feed=feeder.feed(data),
fetch_list=[avg_cost]) fetch_list=[avg_cost])
if avg_cost_np[0] < 5.0: if avg_cost_np[0] < 5.0:
exit(0) # if avg cost less than 10.0, we think our code is good. return
exit(1) raise AssertionError("Cost is too large {0:2.2}".format(avg_cost_np[0]))
FULL_TEST = os.getenv('FULL_TEST',
'0').lower() in ['true', '1', 't', 'y', 'yes', 'on']
SKIP_REASON = "Only run minimum number of tests in CI server, to make CI faster"
class W2VTest(unittest.TestCase):
pass
def inject_test_method(use_cuda, is_sparse, parallel):
fn_name = "test_{0}_{1}_{2}".format("cuda" if use_cuda else "cpu", "sparse"
if is_sparse else "dense", "parallel"
if parallel else "normal")
def __impl__(*args, **kwargs):
prog = fluid.Program()
startup_prog = fluid.Program()
scope = fluid.core.Scope()
with fluid.scope_guard(scope):
with fluid.program_guard(prog, startup_prog):
main(use_cuda=use_cuda, is_sparse=is_sparse, parallel=parallel)
if use_cuda and is_sparse and parallel:
fn = __impl__
else:
# skip the other test when on CI server
fn = unittest.skipUnless(
condition=FULL_TEST, reason=SKIP_REASON)(__impl__)
setattr(W2VTest, fn_name, fn)
for use_cuda in (False, True):
for is_sparse in (False, True):
for parallel in (False, True):
inject_test_method(use_cuda, is_sparse, parallel)
if __name__ == '__main__':
unittest.main()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册