未验证 提交 8d6db251 编写于 作者: 武毅 提交者: GitHub

Merge pull request #6297 from typhoonzero/simple_dist_train_api

[Done] API for dist train
...@@ -90,6 +90,21 @@ OpDesc *BlockDesc::PrependOp() { ...@@ -90,6 +90,21 @@ OpDesc *BlockDesc::PrependOp() {
return ops_.front().get(); return ops_.front().get();
} }
void BlockDesc::RemoveOp(size_t s, size_t e) {
if (ops_.begin() + s == ops_.end() || ops_.begin() + e == ops_.end()) {
return;
}
need_update_ = true;
for (auto it = ops_.begin() + s; it != ops_.begin() + e; it++) {
auto names = (*it)->InputArgumentNames();
for (auto n : names) {
// TODO(typhoonzero): delete vars if no other op use it.
VLOG(3) << "deleting var " << n;
}
}
ops_.erase(ops_.begin() + s, ops_.begin() + e);
}
std::vector<OpDesc *> BlockDesc::AllOps() const { std::vector<OpDesc *> BlockDesc::AllOps() const {
std::vector<OpDesc *> res; std::vector<OpDesc *> res;
for (const auto &op : ops_) { for (const auto &op : ops_) {
......
...@@ -79,6 +79,8 @@ class BlockDesc { ...@@ -79,6 +79,8 @@ class BlockDesc {
OpDesc *PrependOp(); OpDesc *PrependOp();
void RemoveOp(size_t s, size_t e);
std::vector<OpDesc *> AllOps() const; std::vector<OpDesc *> AllOps() const;
size_t OpSize() const { return ops_.size(); } size_t OpSize() const { return ops_.size(); }
......
...@@ -65,7 +65,7 @@ static void CreateTensor(Variable* var, proto::VarDesc::VarType var_type) { ...@@ -65,7 +65,7 @@ static void CreateTensor(Variable* var, proto::VarDesc::VarType var_type) {
} }
void Executor::Run(const ProgramDesc& pdesc, Scope* scope, int block_id, void Executor::Run(const ProgramDesc& pdesc, Scope* scope, int block_id,
bool create_local_scope) { bool create_local_scope, bool create_vars) {
// TODO(tonyyang-svail): // TODO(tonyyang-svail):
// - only runs on the first device (i.e. no interdevice communication) // - only runs on the first device (i.e. no interdevice communication)
// - will change to use multiple blocks for RNN op and Cond Op // - will change to use multiple blocks for RNN op and Cond Op
...@@ -74,6 +74,7 @@ void Executor::Run(const ProgramDesc& pdesc, Scope* scope, int block_id, ...@@ -74,6 +74,7 @@ void Executor::Run(const ProgramDesc& pdesc, Scope* scope, int block_id,
auto& device = device_contexts_[0]; auto& device = device_contexts_[0];
Scope* local_scope = scope; Scope* local_scope = scope;
if (create_vars) {
if (create_local_scope) { if (create_local_scope) {
local_scope = &scope->NewScope(); local_scope = &scope->NewScope();
for (auto& var : block.AllVars()) { for (auto& var : block.AllVars()) {
...@@ -100,7 +101,8 @@ void Executor::Run(const ProgramDesc& pdesc, Scope* scope, int block_id, ...@@ -100,7 +101,8 @@ void Executor::Run(const ProgramDesc& pdesc, Scope* scope, int block_id,
VLOG(3) << "Create variable " << var->Name() << ", which pointer is " VLOG(3) << "Create variable " << var->Name() << ", which pointer is "
<< ptr; << ptr;
} }
} } // if (create_local_scope)
} // if (create_vars)
for (auto& op_desc : block.AllOps()) { for (auto& op_desc : block.AllOps()) {
auto op = paddle::framework::OpRegistry::CreateOp(*op_desc); auto op = paddle::framework::OpRegistry::CreateOp(*op_desc);
......
...@@ -124,7 +124,8 @@ class Executor { ...@@ -124,7 +124,8 @@ class Executor {
* ProgramDesc * ProgramDesc
* Scope * Scope
*/ */
void Run(const ProgramDesc&, Scope*, int, bool create_local_scope = true); void Run(const ProgramDesc&, Scope*, int, bool create_local_scope = true,
bool create_vars = true);
private: private:
std::vector<const platform::DeviceContext*> device_contexts_; std::vector<const platform::DeviceContext*> device_contexts_;
......
...@@ -20,25 +20,57 @@ namespace detail { ...@@ -20,25 +20,57 @@ namespace detail {
Status SendRecvServerImpl::SendVariable(ServerContext *context, Status SendRecvServerImpl::SendVariable(ServerContext *context,
const VariableMessage *in_var, const VariableMessage *in_var,
VariableMessage *out_var) { VoidMessage *out_var) {
framework::LoDTensor t; // TODO(typhoonzero): support different variable types.
// TODO(typhoonzero): desirealize in_tensor and run pserver network.
std::istringstream iss(in_var->serialized()); std::istringstream iss(in_var->serialized());
framework::LoDTensor t;
framework::DeserializeFromStream(iss, &t); framework::DeserializeFromStream(iss, &t);
lodtensor_queue_.Push(std::move(t)); TensorWithName tensor_with_name =
// Block util the sub graph is done. std::make_pair(in_var->varname(), std::move(t));
t = lodtensor_return_queue_.Pop();
var_recv_queue_.Push(std::move(tensor_with_name));
return Status::OK;
}
Status SendRecvServerImpl::GetVariable(ServerContext *context,
const VariableMessage *in_var,
VariableMessage *out_var) {
std::string get_var_name = in_var->varname();
auto *var = scope_->FindVar(get_var_name);
auto tensor = var->Get<framework::LoDTensor>();
std::ostringstream oss; std::ostringstream oss;
// FIXME(typhoonzero): get context from op. framework::SerializeToStream(oss, tensor, platform::CPUDeviceContext());
framework::SerializeToStream(oss, t, platform::CPUDeviceContext());
std::string *varname = out_var->mutable_varname(); std::string *varname = out_var->mutable_varname();
*varname = in_var->varname(); *varname = get_var_name;
std::string *serialized = out_var->mutable_serialized(); std::string *serialized = out_var->mutable_serialized();
*serialized = oss.str(); *serialized = oss.str();
return Status::OK;
}
Status SendRecvServerImpl::Wait(ServerContext *context,
const VoidMessage *in_var,
VoidMessage *out_var) {
{
std::unique_lock<std::mutex> lock(this->mutex_);
condition_.wait(lock, [=] { return this->done_ == true; });
}
return Status::OK; return Status::OK;
} }
void SendRecvServerImpl::Reset() {
std::lock_guard<std::mutex> lock(this->mutex_);
done_ = false;
}
void SendRecvServerImpl::Done() {
{
std::lock_guard<std::mutex> lock(this->mutex_);
done_ = true;
}
condition_.notify_all();
}
} // namespace detail } // namespace detail
} // namespace operators } // namespace operators
} // namespace paddle } // namespace paddle
...@@ -19,10 +19,10 @@ namespace operators { ...@@ -19,10 +19,10 @@ namespace operators {
namespace detail { namespace detail {
bool RPCClient::SendVariable(const framework::Scope& scope, bool RPCClient::SendVariable(const framework::Scope& scope,
const std::string& inname, const std::string& inname) {
const std::string& outname) {
ClientContext context; ClientContext context;
VariableMessage msg, out_msg; VariableMessage msg;
VoidMessage out_msg;
// FIXME(typhoonzero): pass device context to here. // FIXME(typhoonzero): pass device context to here.
auto ctx = platform::CPUDeviceContext(); auto ctx = platform::CPUDeviceContext();
auto* var = scope.FindVar(inname); auto* var = scope.FindVar(inname);
...@@ -37,9 +37,26 @@ bool RPCClient::SendVariable(const framework::Scope& scope, ...@@ -37,9 +37,26 @@ bool RPCClient::SendVariable(const framework::Scope& scope,
msg.set_serialized(oss.str()); msg.set_serialized(oss.str());
Status status = stub_->SendVariable(&context, msg, &out_msg); Status status = stub_->SendVariable(&context, msg, &out_msg);
if (!status.ok()) { if (!status.ok()) {
LOG(ERROR) << "gRPC error: " << status.error_message();
return false; return false;
} }
std::istringstream iss(out_msg.serialized()); return true;
}
bool RPCClient::GetVariable(const framework::Scope& scope,
const std::string& outname) {
ClientContext context;
VariableMessage call_msg, ret_msg;
call_msg.set_varname(outname);
auto ctx = platform::CPUDeviceContext();
Status status = stub_->GetVariable(&context, call_msg, &ret_msg);
if (!status.ok()) {
LOG(ERROR) << "gRPC error: " << status.error_message();
return false;
}
std::istringstream iss(ret_msg.serialized());
framework::LoDTensor ret_tensor; framework::LoDTensor ret_tensor;
framework::DeserializeFromStream(iss, &ret_tensor); framework::DeserializeFromStream(iss, &ret_tensor);
auto* outvar = scope.FindVar(outname); auto* outvar = scope.FindVar(outname);
...@@ -49,6 +66,12 @@ bool RPCClient::SendVariable(const framework::Scope& scope, ...@@ -49,6 +66,12 @@ bool RPCClient::SendVariable(const framework::Scope& scope,
return true; return true;
} }
void RPCClient::Wait() {
ClientContext context;
VoidMessage call_msg, ret_msg;
stub_->Wait(&context, call_msg, &ret_msg);
}
} // namespace detail } // namespace detail
} // namespace operators } // namespace operators
} // namespace paddle } // namespace paddle
...@@ -19,7 +19,12 @@ package sendrecv; ...@@ -19,7 +19,12 @@ package sendrecv;
service SendRecvService { service SendRecvService {
// For parameter server round-robin like hashing, do not split tensors. // For parameter server round-robin like hashing, do not split tensors.
// Send and recv only one tensor // Send and recv only one tensor
rpc SendVariable(VariableMessage) returns (VariableMessage) {} // TODO(typhoonzero): add streaming API
rpc SendVariable(VariableMessage) returns (VoidMessage) {}
// Argument VariableMessage for GetVariable should only contain varname.
rpc GetVariable(VariableMessage) returns (VariableMessage) {}
// wait for one execution of the program
rpc Wait(VoidMessage) returns (VoidMessage) {}
} }
// VariableMessage is serialized paddle variable message. // VariableMessage is serialized paddle variable message.
......
...@@ -20,10 +20,6 @@ ...@@ -20,10 +20,6 @@
#include "paddle/framework/selected_rows.h" #include "paddle/framework/selected_rows.h"
#include "paddle/operators/detail/simple_block_queue.h" #include "paddle/operators/detail/simple_block_queue.h"
// #include <grpc++/channel.h>
// #include <grpc++/client_context.h>
// #include <grpc++/create_channel.h>
// #include <grpc++/security/credentials.h>
#include "paddle/operators/detail/send_recv.grpc.pb.h" #include "paddle/operators/detail/send_recv.grpc.pb.h"
#include "paddle/operators/detail/send_recv.pb.h" #include "paddle/operators/detail/send_recv.pb.h"
...@@ -48,24 +44,32 @@ namespace paddle { ...@@ -48,24 +44,32 @@ namespace paddle {
namespace operators { namespace operators {
namespace detail { namespace detail {
typedef std::pair<std::string, framework::LoDTensor> TensorWithName;
class SendRecvServerImpl final : public SendRecvService::Service { class SendRecvServerImpl final : public SendRecvService::Service {
public: public:
explicit SendRecvServerImpl() {} explicit SendRecvServerImpl() {}
Status SendVariable(ServerContext *context, const VariableMessage *in_var, Status SendVariable(ServerContext *context, const VariableMessage *in_var,
VoidMessage *out_var) override;
Status GetVariable(ServerContext *context, const VariableMessage *in_var,
VariableMessage *out_var) override; VariableMessage *out_var) override;
Status Wait(ServerContext *context, const VoidMessage *in_var,
VoidMessage *out_var) override;
void Reset();
void Done();
void SetScope(framework::Scope *scope) { scope_ = scope; };
const framework::LoDTensor Get() { return this->lodtensor_queue_.Pop(); } const TensorWithName Get() { return this->var_recv_queue_.Pop(); }
void Push(const framework::LoDTensor &tensor) {
this->lodtensor_return_queue_.Push(tensor);
}
private: private:
SimpleBlockQueue<framework::LoDTensor> lodtensor_queue_; // received variable from RPC, operators fetch variable from this queue.
SimpleBlockQueue<framework::LoDTensor> lodtensor_return_queue_; SimpleBlockQueue<TensorWithName> var_recv_queue_;
SimpleBlockQueue<framework::SelectedRows> selected_rows_queue_; framework::Scope *scope_;
SimpleBlockQueue<framework::SelectedRows> selected_rows_return_queue_; // condition of the sub program
std::mutex mutex_;
bool done_;
std::condition_variable condition_;
}; };
// RPCClient is a class to send tensors to pserver sub-network // RPCClient is a class to send tensors to pserver sub-network
...@@ -75,8 +79,9 @@ class RPCClient { ...@@ -75,8 +79,9 @@ class RPCClient {
RPCClient(std::shared_ptr<Channel> channel) RPCClient(std::shared_ptr<Channel> channel)
: stub_(SendRecvService::NewStub(channel)) {} : stub_(SendRecvService::NewStub(channel)) {}
bool SendVariable(const framework::Scope &scope, const std::string &inname, bool SendVariable(const framework::Scope &scope, const std::string &inname);
const std::string &outname); bool GetVariable(const framework::Scope &scope, const std::string &outname);
void Wait();
private: private:
std::unique_ptr<SendRecvService::Stub> stub_; std::unique_ptr<SendRecvService::Stub> stub_;
......
...@@ -24,6 +24,7 @@ ...@@ -24,6 +24,7 @@
#include "paddle/framework/framework.pb.h" #include "paddle/framework/framework.pb.h"
#include "paddle/framework/lod_tensor.h" #include "paddle/framework/lod_tensor.h"
#include "paddle/framework/op_registry.h" #include "paddle/framework/op_registry.h"
#include "paddle/framework/proto_desc.h"
#include "paddle/operators/detail/send_recv_impl.h" #include "paddle/operators/detail/send_recv_impl.h"
#include "paddle/operators/detail/simple_block_queue.h" #include "paddle/operators/detail/simple_block_queue.h"
...@@ -61,29 +62,76 @@ class RecvOp : public framework::OperatorBase { ...@@ -61,29 +62,76 @@ class RecvOp : public framework::OperatorBase {
server_thread_->join(); server_thread_->join();
} }
std::string GetGradVarNameForTrainer(const std::string &varname) const {
if (grads_counter_.find(varname) == grads_counter_.end()) {
grads_counter_[varname] = 0;
}
char ret[256];
snprintf(ret, sizeof(ret), "%s.trainer_%d", varname.c_str(),
grads_counter_[varname]++);
return std::string(ret);
}
void Run(const framework::Scope &scope, void Run(const framework::Scope &scope,
const platform::DeviceContext &dev_ctx) const override { const platform::DeviceContext &dev_ctx) const override {
// blocking get one var from client. // FIXME(typhoonzero): no new scopes for every run.
const framework::LoDTensor &t = rpc_service_->Get();
framework::Scope &recv_scope = scope.NewScope(); framework::Scope &recv_scope = scope.NewScope();
// set graph input var rpc_service_->SetScope(&recv_scope);
auto *var = recv_scope.Var(Input("RX")); auto param_list = Attr<std::vector<std::string>>("ParamList");
auto grad_list = Attr<std::vector<std::string>>("GradList");
auto trainer_count = Attr<int>("Trainers");
size_t param_count = param_list.size();
rpc_service_->Reset();
// TODO(typhoonzero): change this to a while_op for every cluster-batch.
while (true) {
// Get from multiple trainers, we don't care about order in which
// the gradient arrives, just add suffix 0~n then average the gradient.
for (size_t i = 0; i < param_count * trainer_count; ++i) {
// blocking get one var from client.
const detail::TensorWithName &v = rpc_service_->Get();
auto grad_var_name = v.first;
auto it = std::find(grad_list.begin(), grad_list.end(), grad_var_name);
std::string param_var_name;
if (it != grad_list.end()) {
param_var_name = param_list[it - grad_list.begin()];
} else {
LOG(ERROR) << "grad have no paired param found!";
}
VLOG(3) << "recved grad: " << grad_var_name
<< " updating param: " << param_var_name;
auto *merged_grad = recv_scope.FindVar(grad_var_name);
if (merged_grad == nullptr) {
// create output of merged var.
auto merged_var = recv_scope.Var(grad_var_name);
merged_var->GetMutable<framework::LoDTensor>();
}
if (trainer_count > 1) {
grad_var_name = this->GetGradVarNameForTrainer(grad_var_name);
}
auto *var = recv_scope.Var(grad_var_name);
auto *tensor = var->GetMutable<framework::LoDTensor>(); auto *tensor = var->GetMutable<framework::LoDTensor>();
// FIXME(typhoonzero): do not copy // FIXME(typhoonzero): do not copy
framework::CopyFrom(t, dev_ctx.GetPlace(), dev_ctx, tensor); framework::CopyFrom(v.second, dev_ctx.GetPlace(), dev_ctx, tensor);
}
rpc_service_->Reset();
std::string program_str = Attr<std::string>("OptimizeProgram"); std::string program_str = Attr<std::string>("OptimizeProgram");
framework::ProgramDesc program_desc; framework::proto::ProgramDesc program_desc;
program_desc.ParseFromString(program_str); program_desc.ParseFromString(program_str);
framework::ProgramDescBind program(program_desc); framework::ProgramDesc program(program_desc);
framework::Executor executor(dev_ctx); framework::Executor executor(dev_ctx);
// Run sub graph to get optimized tensor // Run sub graph to get optimized tensor
try {
executor.Run(program, &recv_scope, 0, /*global_block*/ executor.Run(program, &recv_scope, 0, /*global_block*/
false /*create_local_scope*/); false /*create_local_scope*/, false /*create_vars*/);
} catch (std::exception &e) {
auto *out_var = recv_scope.FindVar("Out"); LOG(ERROR) << "run sub program error " << e.what();
// push back }
rpc_service_->Push(out_var->Get<framework::LoDTensor>()); rpc_service_->Done();
grads_counter_.clear();
} // while(true)
} }
protected: protected:
...@@ -93,13 +141,14 @@ class RecvOp : public framework::OperatorBase { ...@@ -93,13 +141,14 @@ class RecvOp : public framework::OperatorBase {
// grpc send/recv service implement to register. // grpc send/recv service implement to register.
std::shared_ptr<detail::SendRecvServerImpl> rpc_service_; std::shared_ptr<detail::SendRecvServerImpl> rpc_service_;
std::shared_ptr<std::thread> server_thread_; std::shared_ptr<std::thread> server_thread_;
mutable std::unordered_map<std::string, int> grads_counter_;
}; };
class RecvOpMaker : public framework::OpProtoAndCheckerMaker { class RecvOpMaker : public framework::OpProtoAndCheckerMaker {
public: public:
RecvOpMaker(OpProto *proto, OpAttrChecker *op_checker) RecvOpMaker(OpProto *proto, OpAttrChecker *op_checker)
: OpProtoAndCheckerMaker(proto, op_checker) { : OpProtoAndCheckerMaker(proto, op_checker) {
AddInput("RX", "(Tensor) Input tensor to be saved"); AddInput("RX", "(Tensor) Input tensor to be optimized").AsDuplicable();
AddComment(R"DOC( AddComment(R"DOC(
Recv operator Recv operator
...@@ -112,6 +161,17 @@ This operator will recv tensor from send_op ...@@ -112,6 +161,17 @@ This operator will recv tensor from send_op
.AddCustomChecker([](const std::string &ip) { return !ip.empty(); }); .AddCustomChecker([](const std::string &ip) { return !ip.empty(); });
AddAttr<std::string>("OptimizeProgram", "type string", AddAttr<std::string>("OptimizeProgram", "type string",
"Serialized ProgramDesc string for recv to run."); "Serialized ProgramDesc string for recv to run.");
AddAttr<std::vector<std::string>>(
"ParamList", "type list of string",
"grad->param name mapping to find which param to optimize.")
.SetDefault({});
AddAttr<std::vector<std::string>>(
"GradList", "type list of string",
"grad->param name mapping to find which param to optimize.")
.SetDefault({});
AddAttr<int>("Trainers", "type int",
"Number of trainers in the current cluster job")
.SetDefault(1);
} }
}; };
......
...@@ -34,45 +34,56 @@ class SendOp : public framework::OperatorBase { ...@@ -34,45 +34,56 @@ class SendOp : public framework::OperatorBase {
const framework::AttributeMap &attrs) const framework::AttributeMap &attrs)
: OperatorBase(type, inputs, outputs, attrs) { : OperatorBase(type, inputs, outputs, attrs) {
// init client when the operator is created at runtime. // init client when the operator is created at runtime.
if (!client_) { std::vector<std::string> endpoints =
std::string endpoint = Attr<std::string>("endpoint"); Attr<std::vector<std::string>>("endpoints");
client_.reset(new detail::RPCClient( for (auto ep : endpoints) {
grpc::CreateChannel(endpoint, grpc::InsecureChannelCredentials()))); client_map_[ep].reset(new detail::RPCClient(
// TODO(typhoonzero): how to call InitVariables grpc::CreateChannel(ep, grpc::InsecureChannelCredentials())));
} }
} }
void Run(const framework::Scope &scope, void Run(const framework::Scope &scope,
const platform::DeviceContext &dev_ctx) const override { const platform::DeviceContext &dev_ctx) const override {
auto iname = Input("X"); auto ins = Inputs("X");
auto oname = Output("Out"); std::vector<std::string> epmap = Attr<std::vector<std::string>>("epmap");
// TODO(typhoonzero): currently it's non-blocking, // TODO(typhoonzero): use async calls to send multiple variable asyncly.
// should block until server responds. for (size_t i = 0; i < ins.size(); ++i) {
bool ret = client_->SendVariable(scope, iname, oname); bool ret = client_map_[epmap[i]]->SendVariable(scope, ins[i]);
if (!ret) { if (!ret) {
LOG(ERROR) << "send variable error"; LOG(ERROR) << "send variable error: " << ins[i];
}
}
// TODO(typhoonzero): support async optimization
client_map_[epmap[0]]->Wait();
for (size_t i = 0; i < ins.size(); ++i) {
bool ret = client_map_[epmap[i]]->GetVariable(scope, ins[i]);
if (!ret) {
LOG(ERROR) << "GetVariable error: " << ins[i];
}
} }
} }
protected: protected:
std::shared_ptr<detail::RPCClient> client_{nullptr}; mutable std::unordered_map<std::string, std::shared_ptr<detail::RPCClient>>
client_map_;
}; };
class SendOpMaker : public framework::OpProtoAndCheckerMaker { class SendOpMaker : public framework::OpProtoAndCheckerMaker {
public: public:
SendOpMaker(OpProto *proto, OpAttrChecker *op_checker) SendOpMaker(OpProto *proto, OpAttrChecker *op_checker)
: OpProtoAndCheckerMaker(proto, op_checker) { : OpProtoAndCheckerMaker(proto, op_checker) {
AddInput("X", "(Tensor) Input tensor to be saved"); AddInput("X", "(Tensor) Input tensor to be send").AsDuplicable();
AddOutput("Out", "(Tensor) Output fetched from server");
AddComment(R"DOC( AddComment(R"DOC(
Recv operator Recv operator
This operator will recv tensor from send_op This operator will recv tensor from send_op
)DOC"); )DOC");
AddAttr<std::string>("endpoint", AddAttr<std::vector<std::string>>("endpoints",
"(string, default 127.0.0.1:6164)" "(string vector, default 127.0.0.1:6164)"
"IP address to listen on.") "Server endpoints to send variables to.");
.SetDefault("127.0.0.1:6164") AddAttr<std::vector<std::string>>("epmap",
.AddCustomChecker([](const std::string &ip) { return !ip.empty(); }); "(string vector, default 127.0.0.1:6164)"
"Server endpoints in the order of input "
"variables for mapping");
} }
}; };
......
...@@ -16,12 +16,14 @@ ...@@ -16,12 +16,14 @@
// a RemoteOptimizer. // a RemoteOptimizer.
#include <unistd.h> #include <unistd.h>
#include <string>
#include <thread> #include <thread>
#include "gtest/gtest.h" #include "gtest/gtest.h"
#include "paddle/framework/op_registry.h" #include "paddle/framework/op_registry.h"
#include "paddle/framework/operator.h" #include "paddle/framework/operator.h"
#include "paddle/framework/program_desc.h" #include "paddle/framework/program_desc.h"
#include "paddle/string/printf.h"
USE_NO_KERNEL_OP(send); USE_NO_KERNEL_OP(send);
USE_NO_KERNEL_OP(recv); USE_NO_KERNEL_OP(recv);
...@@ -33,30 +35,33 @@ std::unique_ptr<paddle::framework::OperatorBase> recv_op; ...@@ -33,30 +35,33 @@ std::unique_ptr<paddle::framework::OperatorBase> recv_op;
void InitTensorsInScope(paddle::framework::Scope &scope, void InitTensorsInScope(paddle::framework::Scope &scope,
paddle::platform::CPUPlace &place) { paddle::platform::CPUPlace &place) {
paddle::platform::CPUDeviceContext ctx(place); paddle::platform::CPUDeviceContext ctx(place);
auto var = scope.Var("X"); for (int i = 0; i < 2; ++i) {
auto var_name = paddle::string::Sprintf("x%d", i);
auto var = scope.Var(var_name);
auto tensor = var->GetMutable<paddle::framework::LoDTensor>(); auto tensor = var->GetMutable<paddle::framework::LoDTensor>();
tensor->Resize({10, 10}); tensor->Resize({10, 10});
float *expect = tensor->mutable_data<float>(place); float *expect = tensor->mutable_data<float>(place);
for (int64_t i = 0; i < tensor->numel(); ++i) { for (int64_t i = 0; i < tensor->numel(); ++i) {
expect[i] = static_cast<float>(i); expect[i] = static_cast<float>(i);
} }
}
auto out_var = scope.Var("Out"); auto out_var = scope.Var("Out");
auto out_tensor = out_var->GetMutable<paddle::framework::LoDTensor>(); auto out_tensor = out_var->GetMutable<paddle::framework::LoDTensor>();
out_tensor->Resize({10, 10}); out_tensor->Resize({10, 10});
tensor->mutable_data<float>(place); // allocate out_tensor->mutable_data<float>(place); // allocate
} }
void AddOp(const std::string &type, void AddOp(const std::string &type,
const paddle::framework::VariableNameMap &inputs, const paddle::framework::VariableNameMap &inputs,
const paddle::framework::VariableNameMap &outputs, const paddle::framework::VariableNameMap &outputs,
paddle::framework::AttributeMap attrs, paddle::framework::AttributeMap attrs,
paddle::framework::BlockDescBind *block) { paddle::framework::BlockDesc *block) {
// insert output // insert output
for (auto kv : outputs) { for (auto kv : outputs) {
for (auto v : kv.second) { for (auto v : kv.second) {
auto var = block->Var(v); auto var = block->Var(v);
var->SetDataType(paddle::framework::DataType::FP32); var->SetDataType(paddle::framework::proto::DataType::FP32);
} }
} }
...@@ -78,10 +83,10 @@ void StartServerNet() { ...@@ -78,10 +83,10 @@ void StartServerNet() {
InitTensorsInScope(scope, place); InitTensorsInScope(scope, place);
// sub program run in recv_op, for simple test we use sum // sub program run in recv_op, for simple test we use sum
paddle::framework::ProgramDescBind program; paddle::framework::ProgramDesc program;
paddle::framework::BlockDescBind *block = program.MutableBlock(0); paddle::framework::BlockDesc *block = program.MutableBlock(0);
// X for server side tensors, RX for received tensers, must be of same shape. // X for server side tensors, RX for received tensers, must be of same shape.
AddOp("sum", {{"X", {"X", "RX"}}}, {{"Out", {"Out"}}}, {}, block); AddOp("sum", {{"X", {"x0", "x1"}}}, {{"Out", {"Out"}}}, {}, block);
paddle::framework::AttributeMap attrs; paddle::framework::AttributeMap attrs;
attrs.insert({"endpoint", std::string("127.0.0.1:6174")}); attrs.insert({"endpoint", std::string("127.0.0.1:6174")});
...@@ -89,8 +94,8 @@ void StartServerNet() { ...@@ -89,8 +94,8 @@ void StartServerNet() {
PADDLE_ENFORCE(program.Proto()->SerializeToString(&program_proto)); PADDLE_ENFORCE(program.Proto()->SerializeToString(&program_proto));
attrs.insert({"OptimizeProgram", program_proto}); attrs.insert({"OptimizeProgram", program_proto});
recv_op = paddle::framework::OpRegistry::CreateOp("recv", {{"RX", {"RX"}}}, recv_op = paddle::framework::OpRegistry::CreateOp(
{{"Out", {"Out"}}}, attrs); "recv", {{"RX", {"x0", "x1"}}}, {{"Out", {"Out"}}}, attrs);
paddle::platform::CPUDeviceContext ctx(place); paddle::platform::CPUDeviceContext ctx(place);
recv_op->Run(scope, ctx); recv_op->Run(scope, ctx);
} }
...@@ -107,11 +112,11 @@ TEST(SendRecvOp, CPU) { ...@@ -107,11 +112,11 @@ TEST(SendRecvOp, CPU) {
attrs.insert({"endpoint", std::string("127.0.0.1:6174")}); attrs.insert({"endpoint", std::string("127.0.0.1:6174")});
auto send_op = paddle::framework::OpRegistry::CreateOp( auto send_op = paddle::framework::OpRegistry::CreateOp(
"send", {{"X", {"X"}}}, {{"Out", {"Out"}}}, attrs); "send", {{"X", {"x0", "x1"}}}, {{"Out", {"Out"}}}, attrs);
paddle::platform::CPUDeviceContext ctx(place); paddle::platform::CPUDeviceContext ctx(place);
send_op->Run(scope, ctx); send_op->Run(scope, ctx);
auto in_var = scope.Var("X"); auto in_var = scope.Var("x0");
auto tensor = in_var->GetMutable<paddle::framework::LoDTensor>(); auto tensor = in_var->GetMutable<paddle::framework::LoDTensor>();
float *expected = tensor->data<float>(); float *expected = tensor->data<float>();
......
...@@ -159,6 +159,7 @@ void BindBlockDesc(py::module &m) { ...@@ -159,6 +159,7 @@ void BindBlockDesc(py::module &m) {
py::return_value_policy::reference) py::return_value_policy::reference)
.def("prepend_op", &BlockDesc::PrependOp, .def("prepend_op", &BlockDesc::PrependOp,
py::return_value_policy::reference) py::return_value_policy::reference)
.def("remove_op", &BlockDesc::RemoveOp)
.def("var", .def("var",
[](BlockDesc &self, py::bytes byte_name) { [](BlockDesc &self, py::bytes byte_name) {
std::string name = byte_name; std::string name = byte_name;
...@@ -249,6 +250,12 @@ void BindOpDesc(py::module &m) { ...@@ -249,6 +250,12 @@ void BindOpDesc(py::module &m) {
.def("set_attr", &OpDesc::SetAttr) .def("set_attr", &OpDesc::SetAttr)
.def("attr", &OpDesc::GetAttr) .def("attr", &OpDesc::GetAttr)
.def("set_block_attr", &OpDesc::SetBlockAttr) .def("set_block_attr", &OpDesc::SetBlockAttr)
.def("set_serialized_attr",
[](OpDesc &self, const std::string &name,
const py::bytes &seriralized) {
std::string ser(seriralized);
self.SetAttr(name, ser);
})
.def("block_attr", &OpDesc::GetBlockAttr) .def("block_attr", &OpDesc::GetBlockAttr)
.def("check_attrs", &OpDesc::CheckAttrs) .def("check_attrs", &OpDesc::CheckAttrs)
.def("infer_shape", &OpDesc::InferShape) .def("infer_shape", &OpDesc::InferShape)
......
...@@ -16,13 +16,14 @@ import regularizer ...@@ -16,13 +16,14 @@ import regularizer
from param_attr import ParamAttr from param_attr import ParamAttr
from data_feeder import DataFeeder from data_feeder import DataFeeder
from core import LoDTensor, CPUPlace, GPUPlace from core import LoDTensor, CPUPlace, GPUPlace
from distribute_transpiler import DistributeTranspiler
import clip import clip
Tensor = LoDTensor Tensor = LoDTensor
__all__ = framework.__all__ + executor.__all__ + [ __all__ = framework.__all__ + executor.__all__ + [
'io', 'initializer', 'layers', 'nets', 'optimizer', 'backward', 'io', 'initializer', 'layers', 'nets', 'optimizer', 'backward',
'regularizer', 'LoDTensor', 'CPUPlace', 'GPUPlace', 'Tensor', 'ParamAttr' 'regularizer', 'LoDTensor', 'CPUPlace', 'GPUPlace', 'Tensor', 'ParamAttr'
'DataFeeder', 'clip' 'DataFeeder', 'clip', 'DistributeTranspiler'
] ]
......
import framework
from framework import Program, default_main_program, Parameter, Variable
import optimizer
from layer_helper import LayerHelper
def hash_name_to_server(params_grads, pserver_endpoints):
"""
:param param_grads:
:return: a map of pserver endpoint ->
params -> [param list]
grads -> [grad list]
"""
def _hash_param(param_name, total):
return hash(param_name) % total
param_grad_map = dict()
for param, grad in params_grads:
if param.trainable is True and grad is not None:
server_id = _hash_param(param.name, len(pserver_endpoints))
server_for_param = pserver_endpoints[server_id]
if not param_grad_map.has_key(server_for_param):
param_grad_map[server_for_param] = {"params": [], "grads": []}
param_grad_map[server_for_param]["params"].append(param)
param_grad_map[server_for_param]["grads"].append(grad)
return param_grad_map
def round_robin(params_grads, pserver_endpoints):
assert (len(params_grads) > len(pserver_endpoints))
param_grad_map = dict()
pserver_idx = 0
for param, grad in params_grads:
if param.trainable is True:
server_for_param = pserver_endpoints[pserver_idx]
if not param_grad_map.has_key(server_for_param):
param_grad_map[server_for_param] = {"params": [], "grads": []}
param_grad_map[server_for_param]["params"].append(param)
param_grad_map[server_for_param]["grads"].append(grad)
pserver_idx += 1
if pserver_idx >= len(pserver_endpoints):
pserver_idx = 0
return param_grad_map
class DistributeTranspiler:
def transpile(self,
optimize_ops,
params_grads,
program=None,
pservers="127.0.0.1:6174",
trainers=1,
split_method=round_robin):
"""
Transpile the program to a distributed data-parallelism programs.
The main_program will be transform to use a remote parameter server
to do parameter optimization. And the optimization graph will be put
in to a parameter server program.
Use different methods to split trainable varialbles to different
parameter servers.
Example to run:
exe = fluid.Executor(place)
t = fluid.DistributeTranspiler()
t.transpile(optimize_ops, params_grads, pservers="127.0.0.1:6174", trainers=1)
pserver_endpoint = os.getenv("PSERVER")
if pserver_endpoint:
pserver_prog = t.get_pserver_program(pserver_endpoint, optimize_ops)
exe.run(fluid.default_startup_program())
exe.run(pserver_prog)
else:
feeder = fluid.DataFeeder(feed_list=[images, label], place=place)
exe.run(fluid.default_startup_program())
for pass_id in range(PASS_NUM):
...
:param optimize_ops: op list of optimization, should be the
return value of Optimizer.minimize
:type optimize_ops: list
:param program: program to optimize, default default_main_program
:param pservers: parameter server endpoints like "m1:6174,m2:6174"
:type pservers: string
:return: return a list of programs
"""
if program is None:
program = default_main_program()
self.trainers = trainers
self._optimize_distributed(
optimize_ops,
program,
params_grads,
pservers=pservers,
trainers=trainers,
split_method=split_method)
def _clone_param(self, block, v):
assert isinstance(v, Parameter)
new_p = Parameter(
block=block,
shape=v.shape,
dtype=v.dtype,
type=v.type,
lod_level=v.lod_level,
stop_gradient=v.stop_gradient,
trainable=v.trainable,
optimize_attr=v.optimize_attr,
regularizer=v.regularizer,
name=v.name)
block.vars[new_p.name] = new_p
def _clone_var(self, block, var):
assert isinstance(var, Variable)
return block.create_var(
name=var.name,
shape=var.shape,
dtype=var.dtype,
type=var.type,
lod_level=var.lod_level,
persistable=var.persistable)
def _optimize_distributed(self, optimize_ops, program, params_and_grads,
**kwargs):
if kwargs.has_key("split_method"):
split_method = kwargs["split_method"]
else:
split_method = round_robin
assert (callable(split_method))
pserver_endpoints = kwargs["pservers"].split(",")
self.param_grad_map = split_method(params_and_grads, pserver_endpoints)
send_op_ordered_inputs = []
epmap = []
for ep, v in self.param_grad_map.iteritems():
send_op_ordered_inputs.extend(v["grads"])
for i in v["grads"]:
epmap.append(ep)
send_op = program.global_block().append_op(
type="send",
inputs={"X": send_op_ordered_inputs
}, # inputs is a list of tensors to be send
outputs={},
attrs={"endpoints": pserver_endpoints,
"epmap": epmap})
def get_trainer_program(optimize_ops, program):
# remove optimize ops and add a send op to main_program
program.global_block().delete_ops(optimize_ops)
def _create_var_for_trainers(self, block, var, trainers):
var_list = []
for i in xrange(trainers):
var_each = block.create_var(
name="%s.trainer_%d" % (var.name, i),
psersistable=var.persistable,
dtype=var.dtype,
shape=var.shape)
var_list.append(var_each)
return var_list
def get_pserver_program(self, endpoint, optimize_ops):
pserver_program = Program()
for v in self.param_grad_map[endpoint]["params"]:
self._clone_param(pserver_program.global_block(), v)
optimize_sub_program = Program()
grad_var_names = [
var.name for var in self.param_grad_map[endpoint]["grads"]
]
for opt_op in optimize_ops:
for _, var in opt_op.inputs.iteritems():
# NOTE: append operators to merge gradients from multiple
# trainers. If trainers == 1, this is not needed.
if self.trainers > 1 and var.name in grad_var_names:
vars2merge = self._create_var_for_trainers(
optimize_sub_program.global_block(), var, self.trainers)
merged_var = optimize_sub_program.global_block().create_var(
name=var.name,
persistable=var.persistable,
dtype=var.dtype,
shape=var.shape)
optimize_sub_program.global_block().append_op(
type="sum",
inputs={"X": vars2merge},
outputs={"Out": merged_var})
optimize_sub_program.global_block().append_op(
type="scale",
inputs={"X": merged_var},
outputs={"Out": merged_var},
attrs={"scale": 1.0 / float(self.trainers)})
else:
optimize_sub_program.global_block().create_var(
name=var.name,
persistable=var.persistable,
dtype=var.dtype,
shape=var.shape)
if opt_op.inputs.has_key("Grad"):
if opt_op.inputs["Grad"].name in grad_var_names:
print "appending ", opt_op.type, opt_op.inputs
optimize_sub_program.global_block().append_op(
type=opt_op.type,
inputs=opt_op.inputs,
outputs=opt_op.outputs,
attrs=opt_op.attrs)
else:
optimize_sub_program.global_block().append_op(
type=opt_op.type,
inputs=opt_op.inputs,
outputs=opt_op.outputs,
attrs=opt_op.attrs)
pserver_program.global_block().append_op(
type="recv",
inputs={"RX":
self.param_grad_map[endpoint]["grads"]}, # grads to recv
outputs={},
attrs={
"OptimizeProgram": optimize_sub_program.desc,
"endpoint": endpoint,
"ParamList":
[p.name for p in self.param_grad_map[endpoint]["params"]],
"GradList":
[p.name for p in self.param_grad_map[endpoint]["grads"]],
"Trainers": self.trainers
})
pserver_program.sync_with_cpp()
return pserver_program
import numpy as np import numpy as np
from . import core from . import core
from framework import Program, default_main_program from framework import Program, default_main_program, Parameter, Variable
__all__ = ['Executor', 'g_scope'] __all__ = ['Executor', 'g_scope']
...@@ -148,7 +148,7 @@ class Executor(object): ...@@ -148,7 +148,7 @@ class Executor(object):
outputs={'Out': [fetch_var]}, outputs={'Out': [fetch_var]},
attrs={'col': i}) attrs={'col': i})
self.executor.run(program.desc, scope, 0, True) self.executor.run(program.desc, scope, 0, True, True)
outs = [ outs = [
core.get_fetch_variable(scope, fetch_var_name, i) core.get_fetch_variable(scope, fetch_var_name, i)
for i in xrange(len(fetch_list)) for i in xrange(len(fetch_list))
......
...@@ -359,6 +359,10 @@ class Operator(object): ...@@ -359,6 +359,10 @@ class Operator(object):
""" """
self.block = block self.block = block
self.desc = desc self.desc = desc
# for clone a new operator
self.inputs = inputs
self.outputs = outputs
self.attrs = attrs
if len(self.desc.type()) != 0: if len(self.desc.type()) != 0:
return return
if type is None: if type is None:
...@@ -430,13 +434,18 @@ class Operator(object): ...@@ -430,13 +434,18 @@ class Operator(object):
continue continue
if isinstance(attrs[attr_name], Block): if isinstance(attrs[attr_name], Block):
self.desc.set_block_attr(attr_name, attrs[attr_name].desc) self.desc.set_block_attr(attr_name, attrs[attr_name].desc)
elif isinstance(attrs[attr_name], core.BlockDesc) or \
isinstance(attrs[attr_name], core.ProgramDesc):
self.desc.set_serialized_attr(
attr_name, attrs[attr_name].serialize_to_string())
else: else:
self.desc.set_attr(attr_name, attrs[attr_name]) self.desc.set_attr(attr_name, attrs[attr_name])
self.desc.check_attrs() self.desc.check_attrs()
no_kernel_op_set = { no_kernel_op_set = {
'feed', 'fetch', 'save', 'load', 'recurrent', 'feed', 'fetch', 'save', 'load', 'recurrent',
'rnn_memory_helper_grad', 'conditional_block', 'while' 'rnn_memory_helper_grad', 'conditional_block', 'while', 'send',
'recv'
} }
if type not in no_kernel_op_set: if type not in no_kernel_op_set:
self.desc.infer_var_type(self.block.desc) self.desc.infer_var_type(self.block.desc)
...@@ -582,6 +591,7 @@ class Block(object): ...@@ -582,6 +591,7 @@ class Block(object):
self.vars = dict() # var_name --> var self.vars = dict() # var_name --> var
self.ops = collections.deque() # operator list self.ops = collections.deque() # operator list
self.program = program self.program = program
self.removed_vars = dict()
def __str__(self): def __str__(self):
return self.to_string(True) return self.to_string(True)
...@@ -638,6 +648,16 @@ class Block(object): ...@@ -638,6 +648,16 @@ class Block(object):
self.ops.append(op) self.ops.append(op)
return op return op
def delete_ops(self, ops):
# remove from cpp
# FIXME(typhoonzero): remove only the first occuracy.
try:
start = list(self.ops).index(ops[0])
end = list(self.ops).index(ops[-1])
except Exception, e:
raise e
self.desc.remove_op(start, end)
def prepend_op(self, *args, **kwargs): def prepend_op(self, *args, **kwargs):
op_desc = self.desc.prepend_op() op_desc = self.desc.prepend_op()
op = Operator(self, op_desc, *args, **kwargs) op = Operator(self, op_desc, *args, **kwargs)
......
...@@ -207,7 +207,7 @@ class Optimizer(object): ...@@ -207,7 +207,7 @@ class Optimizer(object):
optimize_ops = self.create_optimization_pass(params_grads, loss, optimize_ops = self.create_optimization_pass(params_grads, loss,
startup_program) startup_program)
return optimize_ops return optimize_ops, params_grads
class SGDOptimizer(Optimizer): class SGDOptimizer(Optimizer):
......
from __future__ import print_function
import numpy as np
import paddle.v2 as paddle
import paddle.v2.fluid as fluid
import os
images = fluid.layers.data(name='pixel', shape=[1, 28, 28], dtype='float32')
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
conv_pool_1 = fluid.nets.simple_img_conv_pool(
input=images,
filter_size=5,
num_filters=20,
pool_size=2,
pool_stride=2,
act="relu")
conv_pool_2 = fluid.nets.simple_img_conv_pool(
input=conv_pool_1,
filter_size=5,
num_filters=50,
pool_size=2,
pool_stride=2,
act="relu")
predict = fluid.layers.fc(input=conv_pool_2, size=10, act="softmax")
cost = fluid.layers.cross_entropy(input=predict, label=label)
avg_cost = fluid.layers.mean(x=cost)
optimizer = fluid.optimizer.Adam(learning_rate=0.01)
optimize_ops, params_grads = optimizer.minimize(avg_cost)
accuracy = fluid.evaluator.Accuracy(input=predict, label=label)
BATCH_SIZE = 50
PASS_NUM = 3
train_reader = paddle.batch(
paddle.reader.shuffle(
paddle.dataset.mnist.train(), buf_size=500),
batch_size=BATCH_SIZE)
place = fluid.CPUPlace()
exe = fluid.Executor(place)
t = fluid.DistributeTranspiler()
pserver_endpoints = os.getenv("PSERVERS")
training_role = os.getenv("TRAINING_ROLE",
"TRAINER") # get the training role: trainer/pserver
t.transpile(optimize_ops, params_grads, pservers=pserver_endpoints, trainers=1)
if training_role == "PSERVER":
pserver_prog = t.get_pserver_program(pserver_endpoints, optimize_ops)
exe.run(fluid.default_startup_program())
exe.run(pserver_prog)
elif training_role == "TRAINER":
feeder = fluid.DataFeeder(feed_list=[images, label], place=place)
exe.run(fluid.default_startup_program())
for pass_id in range(PASS_NUM):
accuracy.reset(exe)
for data in train_reader():
loss, acc = exe.run(fluid.default_main_program(),
feed=feeder.feed(data),
fetch_list=[avg_cost] + accuracy.metrics)
pass_acc = accuracy.eval(exe)
# print loss, acc
if loss < 10.0 and pass_acc > 0.9:
# if avg cost less than 10.0 and accuracy is larger than 0.9, we think our code is good.
exit(0)
pass_acc = accuracy.eval(exe)
print("pass_id=" + str(pass_id) + " pass_acc=" + str(pass_acc))
else:
print("environment var TRAINER_ROLE should be TRAINER os PSERVER")
exit(1)
...@@ -27,7 +27,7 @@ class TestOptimizer(unittest.TestCase): ...@@ -27,7 +27,7 @@ class TestOptimizer(unittest.TestCase):
block.append_op( block.append_op(
type="mean", inputs={"X": mul_out}, outputs={"Out": mean_out}) type="mean", inputs={"X": mul_out}, outputs={"Out": mean_out})
sgd_optimizer = optimizer.SGDOptimizer(learning_rate=0.01) sgd_optimizer = optimizer.SGDOptimizer(learning_rate=0.01)
opts = sgd_optimizer.minimize(mean_out, init_program) opts, _ = sgd_optimizer.minimize(mean_out, init_program)
self.assertEqual(len(opts), 1) self.assertEqual(len(opts), 1)
sgd_op = opts[0] sgd_op = opts[0]
self.assertEqual(sgd_op.type, "sgd") self.assertEqual(sgd_op.type, "sgd")
...@@ -57,7 +57,7 @@ class TestOptimizer(unittest.TestCase): ...@@ -57,7 +57,7 @@ class TestOptimizer(unittest.TestCase):
learning_rate = 0.01 learning_rate = 0.01
sgd_optimizer = optimizer.SGDOptimizer( sgd_optimizer = optimizer.SGDOptimizer(
learning_rate=learning_rate, global_step=global_step) learning_rate=learning_rate, global_step=global_step)
opts = sgd_optimizer.minimize(mean_out, init_program) opts, _ = sgd_optimizer.minimize(mean_out, init_program)
self.assertEqual(len(opts), 2) self.assertEqual(len(opts), 2)
sgd_op = opts[0] sgd_op = opts[0]
self.assertEqual(sgd_op.type, "sgd") self.assertEqual(sgd_op.type, "sgd")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册