Unverified · Commit e249ad12 · authored by Yancey, committed by GitHub

Test dist word2vec (#7334)

* test dist word2vec

* multiple trainers work
Parent commit: b5fda272
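
The new test below configures itself from three environment variables: PSERVERS (the comma-separated list of all parameter-server endpoints), SERVER_ENDPOINT (the endpoint served by the current pserver process), and TRAINING_ROLE (TRAINER or PSERVER). A minimal local launch sketch, assuming the test is saved as test_dist_word2vec.py and one pserver on an arbitrary port (the file name and endpoint are assumptions, not part of this commit):

    # Hypothetical launcher: one pserver plus two trainers on localhost.
    import os
    import subprocess

    PSERVER = "127.0.0.1:6174"  # assumed endpoint, not part of this commit

    def launch(role, extra_env=None):
        env = dict(os.environ, PSERVERS=PSERVER, TRAINING_ROLE=role)
        env.update(extra_env or {})
        return subprocess.Popen(["python", "test_dist_word2vec.py"], env=env)

    pserver = launch("PSERVER", {"SERVER_ENDPOINT": PSERVER})
    trainers = [launch("TRAINER") for _ in range(2)]  # matches TRAINERS = 2
    for t in trainers:
        t.wait()
    pserver.kill()  # the pserver blocks in exe.run(pserver_prog)
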
@@ -35,7 +35,7 @@ const std::string kFetchOpType = "fetch";
 Executor::Executor(const platform::Place& place) : place_(place) {}
 
-void CreateTensor(Variable* var, proto::VarDesc::VarType var_type) {
+static void CreateTensor(Variable* var, proto::VarDesc::VarType var_type) {
   if (var_type == proto::VarDesc::LOD_TENSOR) {
     var->GetMutable<LoDTensor>();
   } else if (var_type == proto::VarDesc::SELECTED_ROWS) {
......
@@ -45,7 +45,5 @@ class Executor {
   const platform::Place place_;
 };
 
-void CreateTensor(Variable* var, proto::VarDesc::VarType var_type);
-
 }  // namespace framework
 }  // namespace paddle
@@ -32,6 +32,20 @@ limitations under the License. */
 namespace paddle {
 namespace operators {
 
+static void CreateTensorFromMessageType(framework::Variable *var,
+                                        sendrecv::VarType var_type) {
+  if (var_type == sendrecv::VarType::LOD_TENSOR) {
+    var->GetMutable<framework::LoDTensor>();
+  } else if (var_type == sendrecv::VarType::SELECTED_ROWS) {
+    var->GetMutable<framework::SelectedRows>();
+  } else {
+    PADDLE_THROW(
+        "VariableMessage type %d is not in "
+        "[LoDTensor, SelectedRows]",
+        var_type);
+  }
+}
+
 void RunServer(Server **rpc_server,
                std::shared_ptr<detail::SendRecvServerImpl> service,
                const std::string &server_address) {
@@ -111,10 +125,10 @@ class RecvOp : public framework::OperatorBase {
       auto *merged_grad = recv_scope.FindVar(grad_var_name);
       if (merged_grad == nullptr) {
         auto *ptr = recv_scope.Var(grad_var_name);
-        framework::CreateTensor(ptr,
-                                framework::ToVarType(merged_grad->Type()));
+        CreateTensorFromMessageType(ptr, v.second.type());
         VLOG(3) << "Create Variable " << grad_var_name
-                << " on recv scope, which pointer is " << ptr;
+                << " on recv scope, which pointer is " << ptr << " type is "
+                << v.second.type();
       }
       if (trainer_count > 1) {
......
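
The replaced call pair above was also a latent null dereference: framework::CreateTensor(ptr, framework::ToVarType(merged_grad->Type())) read merged_grad->Type() inside the branch that had just established merged_grad == nullptr. The new helper never touches merged_grad and instead creates the variable with the type tag carried in the received message, v.second.type().
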
@@ -70,6 +70,7 @@ class SumKernel : public framework::OpKernel<T> {
     } else if (out_var->IsType<framework::SelectedRows>()) {
       PADDLE_ENFORCE(!in_place, "SelectedRows not support inplace sum now");
       auto *out = context.Output<SelectedRows>("Out");
+      out->mutable_rows()->clear();
       auto *out_value = out->mutable_value();
       // Runtime InferShape
......
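
The one-line sum_op.h change above supports the multi-trainer path: when gradients are sparse (IS_SPARSE in the test below), the pserver sums SelectedRows inputs from several trainers into an output variable that is reused across runs, so the summed row indices would otherwise accumulate. A toy sketch of the failure mode in plain Python containers (not Paddle's API):

    # Toy model of SelectedRows as {"rows": [...], "values": [...]}.
    def selected_rows_sum(inputs, out):
        del out["rows"][:]    # the fix: drop row indices left from the last run
        del out["values"][:]
        for rows, values in inputs:
            out["rows"].extend(rows)
            out["values"].extend(values)

    out = {"rows": [], "values": []}
    selected_rows_sum([([0, 2], [1.0, 2.0]), ([5], [3.0])], out)
    assert out["rows"] == [0, 2, 5]
    selected_rows_sum([([1], [4.0])], out)  # reuse the same output variable
    assert out["rows"] == [1]  # without the clear, stale rows [0, 2, 5] remain
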
from __future__ import print_function
import numpy as np
import paddle.v2 as paddle
import paddle.v2.fluid as fluid
import os

PASS_NUM = 100
EMBED_SIZE = 32
HIDDEN_SIZE = 256
N = 5
BATCH_SIZE = 32
IS_SPARSE = True
TRAINERS = 2

word_dict = paddle.dataset.imikolov.build_dict()
dict_size = len(word_dict)

# N-gram inputs: four context words predict the next word.
first_word = fluid.layers.data(name='firstw', shape=[1], dtype='int64')
second_word = fluid.layers.data(name='secondw', shape=[1], dtype='int64')
third_word = fluid.layers.data(name='thirdw', shape=[1], dtype='int64')
forth_word = fluid.layers.data(name='forthw', shape=[1], dtype='int64')
next_word = fluid.layers.data(name='nextw', shape=[1], dtype='int64')

# All four embedding lookups share one parameter ('shared_w').
embed_first = fluid.layers.embedding(
    input=first_word,
    size=[dict_size, EMBED_SIZE],
    dtype='float32',
    is_sparse=IS_SPARSE,
    param_attr='shared_w')
embed_second = fluid.layers.embedding(
    input=second_word,
    size=[dict_size, EMBED_SIZE],
    dtype='float32',
    is_sparse=IS_SPARSE,
    param_attr='shared_w')
embed_third = fluid.layers.embedding(
    input=third_word,
    size=[dict_size, EMBED_SIZE],
    dtype='float32',
    is_sparse=IS_SPARSE,
    param_attr='shared_w')
embed_forth = fluid.layers.embedding(
    input=forth_word,
    size=[dict_size, EMBED_SIZE],
    dtype='float32',
    is_sparse=IS_SPARSE,
    param_attr='shared_w')

concat_embed = fluid.layers.concat(
    input=[embed_first, embed_second, embed_third, embed_forth], axis=1)
hidden1 = fluid.layers.fc(input=concat_embed, size=HIDDEN_SIZE, act='sigmoid')
predict_word = fluid.layers.fc(input=hidden1, size=dict_size, act='softmax')
cost = fluid.layers.cross_entropy(input=predict_word, label=next_word)
avg_cost = fluid.layers.mean(x=cost)
sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.001)
optimize_ops, params_grads = sgd_optimizer.minimize(avg_cost)

train_reader = paddle.batch(
    paddle.dataset.imikolov.train(word_dict, N), BATCH_SIZE)

place = fluid.CPUPlace()
exe = fluid.Executor(place)

t = fluid.DistributeTranspiler()
# list of all parameter server endpoints, used for splitting parameters
pserver_endpoints = os.getenv("PSERVERS")
# server endpoint for the current node
current_endpoint = os.getenv("SERVER_ENDPOINT")
# run as trainer or parameter server
training_role = os.getenv("TRAINING_ROLE",
                          "TRAINER")  # get the training role: trainer/pserver
t.transpile(
    optimize_ops, params_grads, pservers=pserver_endpoints, trainers=TRAINERS)

if training_role == "PSERVER":
    if not current_endpoint:
        print("need env SERVER_ENDPOINT")
        exit(1)
    pserver_prog = t.get_pserver_program(current_endpoint, optimize_ops)
    exe.run(fluid.default_startup_program())
    exe.run(pserver_prog)
elif training_role == "TRAINER":
    feeder = fluid.DataFeeder(
        feed_list=[first_word, second_word, third_word, forth_word, next_word],
        place=place)
    exe.run(fluid.default_startup_program())
    for pass_id in range(PASS_NUM):
        for data in train_reader():
            avg_cost_np = exe.run(fluid.default_main_program(),
                                  feed=feeder.feed(data),
                                  fetch_list=[avg_cost])
            print("avg_cost_np", avg_cost_np)
            if avg_cost_np[0] < 5.0:
                exit(0)  # if avg cost is less than 5.0, the model converged
else:
    print("environment variable TRAINING_ROLE should be TRAINER or PSERVER")
    exit(1)