提交 71655334 编写于 作者: T typhoonzero

update

上级 dd46d95f
...@@ -72,8 +72,10 @@ class RecvOp : public framework::OperatorBase { ...@@ -72,8 +72,10 @@ class RecvOp : public framework::OperatorBase {
// FIXME(typhoonzero): do not copy // FIXME(typhoonzero): do not copy
framework::CopyFrom(t, dev_ctx.GetPlace(), dev_ctx, tensor); framework::CopyFrom(t, dev_ctx.GetPlace(), dev_ctx, tensor);
auto *block = Attr<framework::BlockDescBind *>("OptimizeBlock"); std::string program_str = Attr<std::string>("OptimizeProgram");
auto *program = block->Program(); framework::Program program_desc;
program_desc.ParseFromString(program_str);
framework::ProgramDescBind program(program_desc);
framework::Executor executor(dev_ctx); framework::Executor executor(dev_ctx);
// Run sub graph to get optimized tensor // Run sub graph to get optimized tensor
executor.Run(*program, &recv_scope, block->ID(), executor.Run(*program, &recv_scope, block->ID(),
...@@ -108,8 +110,9 @@ This operator will recv tensor from send_op ...@@ -108,8 +110,9 @@ This operator will recv tensor from send_op
"IP address to listen on.") "IP address to listen on.")
.SetDefault("127.0.0.1:6164") .SetDefault("127.0.0.1:6164")
.AddCustomChecker([](const std::string &ip) { return !ip.empty(); }); .AddCustomChecker([](const std::string &ip) { return !ip.empty(); });
AddAttr<framework::BlockDescBind *>("OptimizeBlock", "type BlockDescBind*", AddAttr<framework::BlockDescBind *>(
"optimize network run in server"); "OptimizeProgram", "type string",
"Serialized ProgramDesc string for recv to run.");
} }
}; };
......
...@@ -85,7 +85,7 @@ void StartServerNet() { ...@@ -85,7 +85,7 @@ void StartServerNet() {
paddle::framework::AttributeMap attrs; paddle::framework::AttributeMap attrs;
attrs.insert({"endpoint", std::string("127.0.0.1:6174")}); attrs.insert({"endpoint", std::string("127.0.0.1:6174")});
attrs.insert({"OptimizeBlock", block}); attrs.insert({"OptimizeProgram", program.Proto()->SerializeToString()});
recv_op = paddle::framework::OpRegistry::CreateOp("recv", {{"RX", {"RX"}}}, recv_op = paddle::framework::OpRegistry::CreateOp("recv", {{"RX", {"RX"}}},
{{"Out", {"Out"}}}, attrs); {{"Out", {"Out"}}}, attrs);
paddle::platform::CPUDeviceContext ctx(place); paddle::platform::CPUDeviceContext ctx(place);
......
...@@ -4,172 +4,46 @@ from regularizer import append_regularization_ops ...@@ -4,172 +4,46 @@ from regularizer import append_regularization_ops
import optimizer import optimizer
from layer_helper import LayerHelper from layer_helper import LayerHelper
__all__ = ['SGD', 'Momentum', 'Adagrad', 'Adam', 'Adamax', 'DecayedAdagrad']
def hash_name_to_server(params_grads, pserver_endpoints):
"""
:param param_grads:
:return: a map of pserver endpoint ->
params -> [param list]
grads -> [grad list]
"""
def hash_name_to_server(parameters, pserver_endpoints):
def _hash_param(param_name, total): def _hash_param(param_name, total):
return hash(param_name) % total return hash(param_name) % total
param_map = dict() param_grad_map = dict()
for param in parameters: for param, grad in params_grads:
if param.trainable is True: if param.trainable is True and grad is not None:
server_id = _hash_param(param.name, len(pserver_endpoints)) server_id = _hash_param(param.name, len(pserver_endpoints))
server_for_param = pserver_endpoints[server_id] server_for_param = pserver_endpoints[server_id]
if param_map.has_key(server_for_param): if not param_grad_map.has_key(server_for_param):
param_map[server_for_param].append(param) param_grad_map[server_for_param] = {"params": [], "grads": []}
else: param_grad_map[server_for_param]["params"].append(param)
param_map[server_for_param] = [param] param_grad_map[server_for_param]["grads"].append(grad)
return param_map return param_grad_map
def round_robin(parameters, pserver_endpoints): def round_robin(parameters, pserver_endpoints):
assert (len(parameters) < len(pserver_endpoints)) assert (len(parameters) < len(pserver_endpoints))
param_map = dict() param_grad_map = dict()
pserver_idx = 0 pserver_idx = 0
for param in parameters: for param in parameters:
if param.trainable is True: if param.trainable is True:
server_for_param = pserver_endpoints[pserver_idx] server_for_param = pserver_endpoints[pserver_idx]
if param_map.has_key(server_for_param): if not param_grad_map.has_key(server_for_param):
param_map[server_for_param].append(param) param_grad_map[server_for_param] = {"params": [], "grads": []}
else:
param_map[server_for_param] = [param] param_grad_map[server_for_param]["params"].append(param)
param_grad_map[server_for_param]["grads"].append(param)
pserver_idx += 1 pserver_idx += 1
if pserver_idx > len(pserver_endpoints): if pserver_idx > len(pserver_endpoints):
pserver_idx = 0 pserver_idx = 0
return param_map return param_grad_map
def _append_sendop_for_trainer(loss,
parameters_and_grads,
pserver_endpoints,
split_method=round_robin):
assert (callable(split_method))
param_map, grad_map = \
split_method(parameters_and_grads, pserver_endpoints)
for ep in pserver_endpoints:
# FIXME(typhoonzero): send to different servers can run in parrallel.
send_op = loss.block.append_op(
type="send",
inputs={"X": param_map[ep]},
outputs={"Out": param_map[ep]},
attrs={"endpoint": ep})
return send_op
class DistributedPlanner(optimizer.Optimizer):
def __init__(self, global_step=None, parallelism_type='dp'):
"""
parallelism_type:
dp: data parallelism
mp: model parallelism
"""
super(DistributedPlanner).__init__(self, global_step)
if parallelism_type == "mp":
raise NotImplementedError("model parallelism not implemented")
elif parallelism_type == "dp":
self.parameter_server_program_map = dict()
self.worker_program = None
else:
raise NameError("parallelism_type %s not supported" %
parallelism_type)
def create_optimization_pass(self,
parameters_and_grads,
program,
startup_program=None):
# Create any accumulators
self.helper = LayerHelper(
self.__class__.__name__,
main_program=program,
startup_program=startup_program)
self._create_accumulators(program.global_block(),
[p[0] for p in parameters_and_grads])
optimize_ops = []
for param_and_grad in parameters_and_grads:
if param_and_grad[0].trainable is True and param_and_grad[
1] is not None:
optimize_op = self._append_optimize_op(program.global_block(),
param_and_grad)
optimize_ops.append(optimize_op)
# Returned list of ops can include more ops in addition
# to optimization ops
return_ops = optimize_ops
# Get custom finish ops for subclasses
# FIXME: Need to fix this once we figure out how to handle dependencies
finish_ops = self._finish_update(program.global_block())
if finish_ops is not None:
return_ops += finish_ops
if self._global_step is not None:
return_ops.append(
self._increment_global_step(program.global_block()))
return return_ops
def minimize(self,
loss,
startup_program=None,
parameter_list=None,
no_grad_set=None,
split_method=round_robin):
"""
For distributed case, this call append backward ops and then
append sevaral send_ops at the end for each parameter server.
Then call get_pserver_program(idx/endpoint) will return the program of
coresponding pserver program to run.
"""
params_grads = append_backward_ops(loss, parameter_list, no_grad_set)
# Add regularization if any
params_grads = append_regularization_ops(params_grads)
_append_sendop_for_trainer(loss, params_grads, self.pserver_endpoints,
split_method)
self.worker_program = loss.block.program
optimize_sub_program = framework.Program()
optimize_ops = self.create_optimization_pass(
params_grads, optimize_sub_program, startup_program)
param_list = []
for param_and_grad in params_grads:
if param_and_grad[0].trainable is True and param_and_grad[
1] is not None:
param_list.append(param_and_grad[0])
param_map, grad_map = \
split_method(params_grads, self.pserver_endpoints)
for ep in self.pserver_endpoints:
pserver_program = framework.Program()
self.parameter_server_program_map[ep] = pserver_program
pserver_program.global_block().append_op(
type="recv",
inputs={"RX": param_map[ep]},
outputs={},
attrs={
"OptimizeBlock": optimize_sub_program.global_block(),
"endpoint": ep
})
# FIXME(typhoonzero): when to use this return value?
return None
def get_pserver_program(self, endpoint):
return self.parameter_server_program_map.get(endpoint)
SGD = optimizer.SGDOptimizer
Momentum = optimizer.MomentumOptimizer
Adagrad = optimizer.AdagradOptimizer
Adam = optimizer.AdamOptimizer
Adamax = optimizer.AdamaxOptimizer
DecayedAdagrad = optimizer.DecayedAdagradOptimizer
for optcls in __all__:
eval(optcls).__base__ = DistributedPlanner
...@@ -69,7 +69,8 @@ class Executor(object): ...@@ -69,7 +69,8 @@ class Executor(object):
if kwargs.has_key("pservers"): if kwargs.has_key("pservers"):
return self._optimize_distributed(optimize_ops, program, **kwargs) return self._optimize_distributed(optimize_ops, program, **kwargs)
def _optimize_distributed(self, optimize_ops, program, **kwargs): def _optimize_distributed(self, optimize_ops, program, params_and_grads,
**kwargs):
# remove optimize ops and add a send op to main_program # remove optimize ops and add a send op to main_program
# FIXME(typhoonzero): delete_op only remove the first accurence, # FIXME(typhoonzero): delete_op only remove the first accurence,
# need to consider about multiple same optimize op? # need to consider about multiple same optimize op?
...@@ -83,43 +84,36 @@ class Executor(object): ...@@ -83,43 +84,36 @@ class Executor(object):
assert (callable(split_method)) assert (callable(split_method))
pserver_endpoints = kwargs["pservers"].split(",") pserver_endpoints = kwargs["pservers"].split(",")
params = program.global_block().all_parameters() params = program.global_block().all_parameters()
param_map = split_method(params, pserver_endpoints) self.param_grad_map = split_method(params, pserver_endpoints)
for ep in pserver_endpoints: for ep in pserver_endpoints:
# FIXME(typhoonzero): send to different servers can run in parrallel. # FIXME(typhoonzero): send to different servers can run in parrallel.
send_op = program.global_block().append_op( send_op = program.global_block().append_op(
type="send", type="send",
inputs={"X": param_map[ep] inputs={"X": self.param_grad_map[ep]["params"]
}, # inputs is a list of tensors to be send }, # inputs is a list of tensors to be send
outputs={"Out": param_map[ep]}, outputs={"Out": self.param_grad_map[ep]["params"]},
attrs={"endpoint": ep}) attrs={"endpoint": ep})
# -------------- generate pserver program -------------- # -------------- generate optimize sub program --------------
self.parameter_server_program_map = dict() self.optimize_sub_program = Program()
for opt_op in optimize_ops:
optimize_sub_program = Program() self.optimize_sub_program.global_block().ops.append(opt_op)
optimize_ops = self.create_optimization_pass(
params_grads, optimize_sub_program, startup_program)
param_list = []
for param in params:
if param.trainable is True:
param_list.append(param)
param_map = split_method(params, pserver_endpoints)
for ep in pserver_endpoints:
pserver_program = Program()
self.parameter_server_program_map[ep] = pserver_program
pserver_program.global_block().append_op(
type="recv",
inputs={"RX": param_map[ep]}, # grads to recv
outputs={},
attrs={
"OptimizeBlock": optimize_sub_program.global_block(),
"endpoint": ep
})
def get_pserver_program(self, endpoint): def get_pserver_program(self, endpoint):
pass pserver_program = Program()
for param in self.param_grad_map[endpoint]["params"]:
pserver_program.global_block().create_parameter(**param.__dict__)
pserver_program.global_block().append_op(
type="recv",
inputs={"RX":
self.param_grad_map[endpoint]["grads"]}, # grads to recv
outputs={},
attrs={
"OptimizeProgram": self.optimize_sub_program.to_string(),
"endpoint": endpoint
})
def get_trainer_program(self): def get_trainer_program(self):
return default_main_program() return default_main_program()
......
...@@ -37,24 +37,33 @@ train_reader = paddle.batch( ...@@ -37,24 +37,33 @@ train_reader = paddle.batch(
place = fluid.CPUPlace() place = fluid.CPUPlace()
exe = fluid.Executor(place) exe = fluid.Executor(place)
feeder = fluid.DataFeeder(feed_list=[images, label], place=place)
exe.run(fluid.default_startup_program()) exe.optimize(pservers="127.0.0.1:6174", trainers=1)
for pass_id in range(PASS_NUM): pserver_endpoint = os.getenv("PSERVER")
accuracy.reset(exe) if is_pserver:
for data in train_reader(): pserver_prog = exe.get_pserver_program(pserver_endpoint)
loss, acc = exe.run(fluid.default_main_program(), exe.run(fluid.default_startup_program())
feed=feeder.feed(data), exe.run(pserver_prog)
fetch_list=[avg_cost] + accuracy.metrics) else:
feeder = fluid.DataFeeder(feed_list=[images, label], place=place)
exe.run(fluid.default_startup_program())
for pass_id in range(PASS_NUM):
accuracy.reset(exe)
for data in train_reader():
loss, acc = exe.run(fluid.default_main_program(),
feed=feeder.feed(data),
fetch_list=[avg_cost] + accuracy.metrics)
pass_acc = accuracy.eval(exe)
print("pass_id=" + str(pass_id) + " acc=" + str(acc) + " pass_acc="
+ str(pass_acc))
# print loss, acc
if loss < 10.0 and pass_acc > 0.9:
# if avg cost less than 10.0 and accuracy is larger than 0.9, we think our code is good.
exit(0)
pass_acc = accuracy.eval(exe) pass_acc = accuracy.eval(exe)
print("pass_id=" + str(pass_id) + " acc=" + str(acc) + " pass_acc=" + print("pass_id=" + str(pass_id) + " pass_acc=" + str(pass_acc))
str(pass_acc))
# print loss, acc
if loss < 10.0 and pass_acc > 0.9:
# if avg cost less than 10.0 and accuracy is larger than 0.9, we think our code is good.
exit(0)
pass_acc = accuracy.eval(exe)
print("pass_id=" + str(pass_id) + " pass_acc=" + str(pass_acc))
exit(1) exit(1)
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册