Commit 1b20096a authored by typhoonzero

done

Parent 40d0fff2
@@ -62,17 +62,29 @@ class RecvOp : public framework::OperatorBase {
    server_thread_->join();
  }
  std::string GetGradVarNameForTrainer(const std::string &varname) const {
    // Initialize the per-gradient counter on first arrival; each call
    // appends a ".trainer_<k>" suffix so gradients from different trainers
    // do not overwrite each other before they are merged.
    if (grads_counter_.find(varname) == grads_counter_.end()) {
      grads_counter_[varname] = 0;
    }
    char ret[256];
    snprintf(ret, sizeof(ret), "%s.trainer_%d", varname.c_str(),
             grads_counter_[varname]++);
    return std::string(ret);
  }
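The renaming scheme is easiest to see in isolation. Below is a minimal Python sketch of the same counting logic (the gradient name `fc_0.w_0@GRAD` is made up for illustration): each incoming gradient keeps a per-name counter, so the k-th arrival of a given gradient lands under a distinct `.trainer_<k>` name and can be merged later.

```python
from collections import defaultdict

grads_counter = defaultdict(int)  # mirrors the mutable grads_counter_ member

def grad_var_name_for_trainer(varname):
    # mirrors snprintf("%s.trainer_%d", ...) with a post-incremented counter
    name = "%s.trainer_%d" % (varname, grads_counter[varname])
    grads_counter[varname] += 1
    return name

assert grad_var_name_for_trainer("fc_0.w_0@GRAD") == "fc_0.w_0@GRAD.trainer_0"
assert grad_var_name_for_trainer("fc_0.w_0@GRAD") == "fc_0.w_0@GRAD.trainer_1"
```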
  void Run(const framework::Scope &scope,
           const platform::DeviceContext &dev_ctx) const override {
    // FIXME(typhoonzero): no new scopes for every run.
    framework::Scope &recv_scope = scope.NewScope();
    auto param_list = Attr<std::vector<std::string>>("ParamList");
    auto grad_list = Attr<std::vector<std::string>>("GradList");
    auto trainer_count = Attr<int>("Trainers");
    size_t param_count = param_list.size();
    // TODO(typhoonzero): change this to a while_op for every cluster-batch.
    while (true) {
      // Get gradients from multiple trainers. We don't care about the order
      // in which they arrive; just add suffixes 0~n and average them later.
      for (size_t i = 0; i < param_count * trainer_count; ++i) {
        // blocking get one var from client.
        const detail::TensorWithName &v = rpc_service_->Get();
        auto grad_var_name = v.first;
@@ -83,6 +95,14 @@ class RecvOp : public framework::OperatorBase {
        }
        VLOG(10) << "recved grad: " << grad_var_name
                 << " updating param: " << param_var_name;
        if (trainer_count > 1) {
          auto *var = recv_scope.FindVar(grad_var_name);
          if (var != nullptr) {
            // Must rename the variable so gradients from different trainers
            // can be merged instead of overwriting each other.
            grad_var_name = this->GetGradVarNameForTrainer(grad_var_name);
          }
        }
        auto *var = recv_scope.Var(grad_var_name);
        auto *tensor = var->GetMutable<framework::LoDTensor>();
        // FIXME(typhoonzero): do not copy
@@ -119,6 +139,7 @@ class RecvOp : public framework::OperatorBase {
  // grpc send/recv service implementation to register.
  std::shared_ptr<detail::SendRecvServerImpl> rpc_service_;
  std::shared_ptr<std::thread> server_thread_;
  mutable std::unordered_map<std::string, int> grads_counter_;
};
class RecvOpMaker : public framework::OpProtoAndCheckerMaker {
@@ -144,6 +165,9 @@ This operator will recv tensor from send_op
    AddAttr<std::vector<std::string>>(
        "GradList", "type list of string",
        "grad->param name mapping to find which param to optimize.");
AddAttr<int>("Trainers", "type int",
"Number of trainers in the current cluster job")
.SetDefault(1);
} }
}; };
...
@@ -47,14 +47,12 @@ class SendOp : public framework::OperatorBase {
    // TODO(typhoonzero): currently it's non-blocking,
    // should block until server responds.
    for (auto in : ins) {
      LOG(ERROR) << "sending grad: " << in;
      bool ret = client_->SendVariable(scope, in);
      if (!ret) {
        LOG(ERROR) << "send variable error";
      }
    }
    for (auto in : ins) {
      LOG(ERROR) << "updating from server...";
      bool ret = client_->GetVariable(scope);
      if (!ret) {
        LOG(ERROR) << "GetVariable error";
...
@@ -16,12 +16,13 @@ import regularizer
from param_attr import ParamAttr
from data_feeder import DataFeeder
from core import LoDTensor, CPUPlace, GPUPlace
from distribute_transpiler import DistributeTranspiler

Tensor = LoDTensor

__all__ = framework.__all__ + executor.__all__ + [
    'io', 'initializer', 'layers', 'nets', 'optimizer', 'backward',
    'regularizer', 'LoDTensor', 'CPUPlace', 'GPUPlace', 'Tensor', 'ParamAttr',
    'DataFeeder', 'DistributeTranspiler'
]
...
import framework
from framework import Program, default_main_program, Parameter, Variable
import optimizer
from layer_helper import LayerHelper
def hash_name_to_server(params_grads, pserver_endpoints):
"""
:param param_grads:
:return: a map of pserver endpoint ->
params -> [param list]
grads -> [grad list]
"""
def _hash_param(param_name, total):
return hash(param_name) % total
param_grad_map = dict()
for param, grad in params_grads:
if param.trainable is True and grad is not None:
server_id = _hash_param(param.name, len(pserver_endpoints))
server_for_param = pserver_endpoints[server_id]
            if server_for_param not in param_grad_map:
param_grad_map[server_for_param] = {"params": [], "grads": []}
param_grad_map[server_for_param]["params"].append(param)
param_grad_map[server_for_param]["grads"].append(grad)
return param_grad_map
def round_robin(params_grads, pserver_endpoints):
assert (len(params_grads) > len(pserver_endpoints))
param_grad_map = dict()
pserver_idx = 0
for param, grad in params_grads:
if param.trainable is True:
server_for_param = pserver_endpoints[pserver_idx]
            if server_for_param not in param_grad_map:
param_grad_map[server_for_param] = {"params": [], "grads": []}
param_grad_map[server_for_param]["params"].append(param)
param_grad_map[server_for_param]["grads"].append(grad)
pserver_idx += 1
if pserver_idx >= len(pserver_endpoints):
pserver_idx = 0
return param_grad_map
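A hedged example of how the two placement policies behave; `FakeParam`, the parameter names, and the endpoint list are stand-ins invented for illustration:

```python
from collections import namedtuple

FakeParam = namedtuple("FakeParam", ["name", "trainable"])
params_grads = [(FakeParam("w_%d" % i, True), "w_%d@GRAD" % i)
                for i in range(4)]
endpoints = ["127.0.0.1:6174", "127.0.0.1:6175"]

# round_robin alternates endpoints: w_0, w_2 -> :6174 and w_1, w_3 -> :6175.
rr = round_robin(params_grads, endpoints)
assert [p.name for p in rr["127.0.0.1:6174"]["params"]] == ["w_0", "w_2"]

# hash_name_to_server assigns hash(name) % len(endpoints). Python 3
# randomizes str hashes per process (see PYTHONHASHSEED), so the exact
# placement is only checked loosely here.
hm = hash_name_to_server(params_grads, endpoints)
assert sum(len(v["params"]) for v in hm.values()) == 4
```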
class DistributeTranspiler:
def transpile(self,
optimize_ops,
params_grads,
program=None,
pservers="127.0.0.1:6174",
trainers=1,
split_method=round_robin):
"""
Transpile the program to a distributed data-parallelism programs.
The main_program will be transform to use a remote parameter server
to do parameter optimization. And the optimization graph will be put
in to a parameter server program.
Use different methods to split trainable varialbles to different
parameter servers.
:param optimize_ops: op list of optimization, should be the
return value of Optimizer.minimize
:type optimize_ops: list
:param program: program to optimize, default default_main_program
:param pservers: parameter server endpoints like "m1:6174,m2:6174"
:type pservers: string
:return: return a list of programs
"""
if program is None:
program = default_main_program()
self.trainers = trainers
self._optimize_distributed(
optimize_ops,
program,
params_grads,
pservers=pservers,
trainers=trainers,
split_method=split_method)
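For orientation, a sketch of the intended call sequence, assuming `optimize_ops` and `params_grads` are the return values of `Optimizer.minimize` and the endpoint is illustrative (the updated mnist test near the end of this commit does the same thing):

```python
t = DistributeTranspiler()
# Rewrites the main program in place: optimize ops are removed and a send
# op per pserver endpoint is appended.
t.transpile(optimize_ops, params_grads,
            pservers="127.0.0.1:6174", trainers=2)
# Each pserver process then builds and runs only its own sub-program,
# while trainer processes keep running the rewritten main program.
pserver_prog = t.get_pserver_program("127.0.0.1:6174", optimize_ops)
```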
def _clone_param(self, block, v):
assert isinstance(v, Parameter)
new_p = Parameter(
block=block,
shape=v.shape,
dtype=v.dtype,
type=v.type,
lod_level=v.lod_level,
stop_gradient=v.stop_gradient,
trainable=v.trainable,
optimize_attr=v.optimize_attr,
regularizer=v.regularizer,
name=v.name)
block.vars[new_p.name] = new_p
def _clone_var(self, block, var):
assert isinstance(var, Variable)
return block.create_var(
name=var.name,
shape=var.shape,
dtype=var.dtype,
type=var.type,
lod_level=var.lod_level,
persistable=var.persistable)
def _optimize_distributed(self, optimize_ops, program, params_and_grads,
**kwargs):
# remove optimize ops and add a send op to main_program
        # FIXME(typhoonzero): delete_op only removes the first occurrence;
        # need to consider multiple identical optimize ops.
for op in optimize_ops:
program.global_block().delete_op(op)
if kwargs.has_key("split_method"):
split_method = kwargs["split_method"]
else:
split_method = round_robin
assert (callable(split_method))
pserver_endpoints = kwargs["pservers"].split(",")
self.param_grad_map = split_method(params_and_grads, pserver_endpoints)
for ep in pserver_endpoints:
            # FIXME(typhoonzero): sends to different servers can run in parallel.
send_op = program.global_block().append_op(
type="send",
inputs={"X": self.param_grad_map[ep]["grads"]
}, # inputs is a list of tensors to be send
outputs={},
attrs={"endpoint": ep})
def _create_var_for_trainers(self, block, var, trainers):
var_list = []
for i in xrange(trainers):
            var_each = block.create_var(
                name="%s.trainer_%d" % (var.name, i),
                persistable=var.persistable,
                dtype=var.dtype,
                shape=var.shape)
var_list.append(var_each)
return var_list
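Note the naming contract with the C++ side: the variables created here must match the `<grad>.trainer_<i>` suffixes that the recv op's `GetGradVarNameForTrainer` produces. A quick sanity check with a made-up gradient name:

```python
grad_name = "fc_0.w_0@GRAD"  # hypothetical gradient variable name
trainers = 3
python_side = ["%s.trainer_%d" % (grad_name, i) for i in xrange(trainers)]
assert python_side == [
    "fc_0.w_0@GRAD.trainer_0",
    "fc_0.w_0@GRAD.trainer_1",
    "fc_0.w_0@GRAD.trainer_2",
]
```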
def get_pserver_program(self, endpoint, optimize_ops):
pserver_program = Program()
for v in self.param_grad_map[endpoint]["params"]:
self._clone_param(pserver_program.global_block(), v)
optimize_sub_program = Program()
grad_var_names = [
var.name for var in self.param_grad_map[endpoint]["grads"]
]
for opt_op in optimize_ops:
for _, var in opt_op.inputs.iteritems():
# NOTE: append operators to merge gradients from multiple
# trainers. If trainers == 1, this is not needed.
if self.trainers > 1 and var.name in grad_var_names:
vars2merge = self._create_var_for_trainers(
optimize_sub_program.global_block(), var, self.trainers)
merged_var = optimize_sub_program.global_block().create_var(
name=var.name,
persistable=var.persistable,
dtype=var.dtype,
shape=var.shape)
optimize_sub_program.global_block().append_op(
type="sum",
inputs={"X": vars2merge},
outputs={"Out": merged_var})
optimize_sub_program.global_block().append_op(
type="scale",
inputs={"X": merged_var},
outputs={"Out": merged_var},
attrs={"scale": 1.0 / float(self.trainers)})
else:
optimize_sub_program.global_block().create_var(
name=var.name,
persistable=var.persistable,
dtype=var.dtype,
shape=var.shape)
optimize_sub_program.global_block().append_op(
type=opt_op.type,
inputs=opt_op.inputs,
outputs=opt_op.outputs,
attrs=opt_op.attrs)
pserver_program.global_block().append_op(
type="recv",
inputs={"RX":
self.param_grad_map[endpoint]["grads"]}, # grads to recv
outputs={},
attrs={
"OptimizeProgram": optimize_sub_program.desc,
"endpoint": endpoint,
"ParamList":
[p.name for p in self.param_grad_map[endpoint]["params"]],
"GradList":
[p.name for p in self.param_grad_map[endpoint]["grads"]],
"Trainers": self.trainers
})
pserver_program.sync_with_cpp()
return pserver_program
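The merge appended above relies on `sum` followed by `scale(1.0 / trainers)` being the same as averaging the per-trainer gradients; a tiny numeric check, with numpy used only for illustration:

```python
import numpy as np

trainers = 3
grads = [np.random.rand(2, 2) for _ in range(trainers)]  # stand-in gradients
merged = sum(grads) * (1.0 / float(trainers))
assert np.allclose(merged, np.mean(grads, axis=0))
```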
@@ -50,111 +50,6 @@ class Executor(object):
        self.executor = core.Executor(act_places)
        self.places = places
def optimize(self, optimize_ops, params_grads, program=None, **kwargs):
"""
optimize the program for different runtime environment
:param optimize_ops: op list of optimization, should be the
return value of Optimizer.minimize
:type optimize_ops: list
:param program: program to optimize, default default_main_program
:param pservers: parameter server endpoints like "m1:6174,m2:6174"
:type pservers: string
:return: return a list of programs
"""
if program is None:
program = default_main_program()
if kwargs.has_key("pservers"):
return self._optimize_distributed(optimize_ops, program,
params_grads, **kwargs)
def _clone_param(self, block, v):
assert isinstance(v, Parameter)
new_p = Parameter(
block=block,
shape=v.shape,
dtype=v.dtype,
type=v.type,
lod_level=v.lod_level,
stop_gradient=v.stop_gradient,
trainable=v.trainable,
optimize_attr=v.optimize_attr,
regularizer=v.regularizer,
name=v.name)
block.vars[new_p.name] = new_p
def _clone_var(self, block, var):
assert isinstance(var, Variable)
return block.create_var(
name=var.name,
shape=var.shape,
dtype=var.dtype,
type=var.type,
lod_level=var.lod_level,
persistable=var.persistable)
def _optimize_distributed(self, optimize_ops, program, params_and_grads,
**kwargs):
# remove optimize ops and add a send op to main_program
        # FIXME(typhoonzero): delete_op only removes the first occurrence;
        # need to consider multiple identical optimize ops.
for op in optimize_ops:
program.global_block().delete_op(op)
if kwargs.has_key("split_method"):
split_method = kwargs["split_method"]
else:
split_method = distribute_planner.round_robin
assert (callable(split_method))
pserver_endpoints = kwargs["pservers"].split(",")
self.param_grad_map = split_method(params_and_grads, pserver_endpoints)
for ep in pserver_endpoints:
            # FIXME(typhoonzero): sends to different servers can run in parallel.
send_op = program.global_block().append_op(
type="send",
inputs={"X": self.param_grad_map[ep]["grads"]
}, # inputs is a list of tensors to be send
outputs={},
attrs={"endpoint": ep})
def get_pserver_program(self, endpoint, optimize_ops):
pserver_program = Program()
for v in self.param_grad_map[endpoint]["params"]:
self._clone_param(pserver_program.global_block(), v)
optimize_sub_program = Program()
for opt_op in optimize_ops:
for varname, var in opt_op.inputs.iteritems():
optimize_sub_program.global_block().create_var(
name=var.name,
persistable=var.persistable,
dtype=var.dtype,
shape=var.shape)
optimize_sub_program.global_block().append_op(
type=opt_op.type,
inputs=opt_op.inputs,
outputs=opt_op.outputs,
attrs=opt_op.attrs)
pserver_program.global_block().append_op(
type="recv",
inputs={"RX":
self.param_grad_map[endpoint]["grads"]}, # grads to recv
outputs={},
attrs={
"OptimizeProgram": optimize_sub_program.desc,
"endpoint": endpoint,
"ParamList":
[p.name for p in self.param_grad_map[endpoint]["params"]],
"GradList":
[p.name for p in self.param_grad_map[endpoint]["grads"]]
})
pserver_program.sync_with_cpp()
return pserver_program
    def aslodtensor(self, data):
        def accumulate(data):
            if not isinstance(data, list):
...
@@ -38,17 +38,14 @@ train_reader = paddle.batch(
place = fluid.CPUPlace()
exe = fluid.Executor(place)
t = fluid.DistributeTranspiler()
t.transpile(optimize_ops, params_grads, pservers="127.0.0.1:6174", trainers=1)
pserver_endpoint = os.getenv("PSERVER") pserver_endpoint = os.getenv("PSERVER")
if pserver_endpoint: if pserver_endpoint:
pserver_prog = exe.get_pserver_program(pserver_endpoint, optimize_ops) pserver_prog = t.get_pserver_program(pserver_endpoint, optimize_ops)
print("pserver startup: ", fluid.default_startup_program())
exe.run(fluid.default_startup_program()) exe.run(fluid.default_startup_program())
while True: exe.run(pserver_prog)
exe.run(pserver_prog)
print("Run pserver once end...")
else:
    feeder = fluid.DataFeeder(feed_list=[images, label], place=place)
    exe.run(fluid.default_startup_program())
@@ -60,8 +57,6 @@ else:
            feed=feeder.feed(data),
            fetch_list=[avg_cost] + accuracy.metrics)
        pass_acc = accuracy.eval(exe)
print("pass_id=" + str(pass_id) + " acc=" + str(acc) + " pass_acc="
+ str(pass_acc))
        # print loss, acc
        if loss < 10.0 and pass_acc > 0.9:
            # if avg cost less than 10.0 and accuracy is larger than 0.9, we think our code is good.
...