diff --git a/paddle/framework/executor.cc b/paddle/framework/executor.cc index d8ef9a0fbaa7ba18d78060bd5b9605458cd9b1a2..c0418c9266e257bd7567861543e557f354451b17 100644 --- a/paddle/framework/executor.cc +++ b/paddle/framework/executor.cc @@ -35,7 +35,7 @@ const std::string kFetchOpType = "fetch"; Executor::Executor(const platform::Place& place) : place_(place) {} -void CreateTensor(Variable* var, proto::VarDesc::VarType var_type) { +static void CreateTensor(Variable* var, proto::VarDesc::VarType var_type) { if (var_type == proto::VarDesc::LOD_TENSOR) { var->GetMutable(); } else if (var_type == proto::VarDesc::SELECTED_ROWS) { diff --git a/paddle/framework/executor.h b/paddle/framework/executor.h index 0b2b5780fed1ef48ba78f44112fb0a88b477b796..d869e18901b82959a40cc296aa0844c20ea63ac1 100644 --- a/paddle/framework/executor.h +++ b/paddle/framework/executor.h @@ -45,7 +45,5 @@ class Executor { const platform::Place place_; }; -void CreateTensor(Variable* var, proto::VarDesc::VarType var_type); - } // namespace framework } // namespace paddle diff --git a/paddle/operators/recv_op.cc b/paddle/operators/recv_op.cc index 6f65b87d3b06c1d8d453f42194a277accdbc1164..9331c7b563491902b2824898766cacb9bfdee2d9 100644 --- a/paddle/operators/recv_op.cc +++ b/paddle/operators/recv_op.cc @@ -32,6 +32,20 @@ limitations under the License. */ namespace paddle { namespace operators { +static void CreateTensorFromMessageType(framework::Variable *var, + sendrecv::VarType var_type) { + if (var_type == sendrecv::VarType::LOD_TENSOR) { + var->GetMutable(); + } else if (var_type == sendrecv::VarType::SELECTED_ROWS) { + var->GetMutable(); + } else { + PADDLE_THROW( + "VraibleMessage type %d is not in " + "[LoDTensor, SelectedRows]", + var_type); + } +} + void RunServer(Server **rpc_server, std::shared_ptr service, const std::string &server_address) { @@ -111,10 +125,10 @@ class RecvOp : public framework::OperatorBase { auto *merged_grad = recv_scope.FindVar(grad_var_name); if (merged_grad == nullptr) { auto *ptr = recv_scope.Var(grad_var_name); - framework::CreateTensor(ptr, - framework::ToVarType(merged_grad->Type())); + CreateTensorFromMessageType(ptr, v.second.type()); VLOG(3) << "Create Variable " << grad_var_name - << " on recv scope, which pointer is " << ptr; + << " on recv scope, which pointer is " << ptr << " type is " + << v.second.type(); } if (trainer_count > 1) { diff --git a/paddle/operators/sum_op.h b/paddle/operators/sum_op.h index 2c43097d71751f3b5ac3b6366de095a22bac00ee..48201b344de0d3bd2b121a12389876dad095f10d 100644 --- a/paddle/operators/sum_op.h +++ b/paddle/operators/sum_op.h @@ -70,6 +70,7 @@ class SumKernel : public framework::OpKernel { } else if (out_var->IsType()) { PADDLE_ENFORCE(!in_place, "SelectedRows not support inplace sum now"); auto *out = context.Output("Out"); + out->mutable_rows()->clear(); auto *out_value = out->mutable_value(); // Runtime InferShape diff --git a/python/paddle/v2/fluid/tests/book_distribute/test_dist_word2vec.py b/python/paddle/v2/fluid/tests/book_distribute/test_dist_word2vec.py new file mode 100644 index 0000000000000000000000000000000000000000..b41853784d607c566fc596ab93f2282520778a4b --- /dev/null +++ b/python/paddle/v2/fluid/tests/book_distribute/test_dist_word2vec.py @@ -0,0 +1,96 @@ +from __future__ import print_function +import numpy as np +import paddle.v2 as paddle +import paddle.v2.fluid as fluid +import os + +PASS_NUM = 100 +EMBED_SIZE = 32 +HIDDEN_SIZE = 256 +N = 5 +BATCH_SIZE = 32 +IS_SPARSE = True +TRAINERS = 2 + +word_dict = paddle.dataset.imikolov.build_dict() +dict_size = len(word_dict) + +first_word = fluid.layers.data(name='firstw', shape=[1], dtype='int64') +second_word = fluid.layers.data(name='secondw', shape=[1], dtype='int64') +third_word = fluid.layers.data(name='thirdw', shape=[1], dtype='int64') +forth_word = fluid.layers.data(name='forthw', shape=[1], dtype='int64') +next_word = fluid.layers.data(name='nextw', shape=[1], dtype='int64') + +embed_first = fluid.layers.embedding( + input=first_word, + size=[dict_size, EMBED_SIZE], + dtype='float32', + is_sparse=IS_SPARSE, + param_attr='shared_w') +embed_second = fluid.layers.embedding( + input=second_word, + size=[dict_size, EMBED_SIZE], + dtype='float32', + is_sparse=IS_SPARSE, + param_attr='shared_w') +embed_third = fluid.layers.embedding( + input=third_word, + size=[dict_size, EMBED_SIZE], + dtype='float32', + is_sparse=IS_SPARSE, + param_attr='shared_w') +embed_forth = fluid.layers.embedding( + input=forth_word, + size=[dict_size, EMBED_SIZE], + dtype='float32', + is_sparse=IS_SPARSE, + param_attr='shared_w') + +concat_embed = fluid.layers.concat( + input=[embed_first, embed_second, embed_third, embed_forth], axis=1) +hidden1 = fluid.layers.fc(input=concat_embed, size=HIDDEN_SIZE, act='sigmoid') +predict_word = fluid.layers.fc(input=hidden1, size=dict_size, act='softmax') +cost = fluid.layers.cross_entropy(input=predict_word, label=next_word) +avg_cost = fluid.layers.mean(x=cost) +sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.001) +optimize_ops, params_grads = sgd_optimizer.minimize(avg_cost) +train_reader = paddle.batch( + paddle.dataset.imikolov.train(word_dict, N), BATCH_SIZE) + +place = fluid.CPUPlace() +exe = fluid.Executor(place) + +t = fluid.DistributeTranspiler() +# all parameter server endpoints list for spliting parameters +pserver_endpoints = os.getenv("PSERVERS") +# server endpoint for current node +current_endpoint = os.getenv("SERVER_ENDPOINT") +# run as trainer or parameter server +training_role = os.getenv("TRAINING_ROLE", + "TRAINER") # get the training role: trainer/pserver +t.transpile( + optimize_ops, params_grads, pservers=pserver_endpoints, trainers=TRAINERS) +if training_role == "PSERVER": + if not current_endpoint: + print("need env SERVER_ENDPOINT") + exit(1) + pserver_prog = t.get_pserver_program(current_endpoint, optimize_ops) + exe.run(fluid.default_startup_program()) + exe.run(pserver_prog) +elif training_role == "TRAINER": + feeder = fluid.DataFeeder( + feed_list=[first_word, second_word, third_word, forth_word, next_word], + place=place) + exe.run(fluid.default_startup_program()) + for pass_id in range(PASS_NUM): + for data in train_reader(): + avg_cost_np = exe.run(fluid.default_main_program(), + feed=feeder.feed(data), + fetch_list=[avg_cost]) + print("avg_cost_np", avg_cost_np) + if avg_cost_np[0] < 5.0: + exit( + 0) # if avg cost less than 10.0, we think our code is good. +else: + print("environment var TRAINER_ROLE should be TRAINER os PSERVER") +exit(1)