提交 dd46d95f 编写于 作者: T typhoonzero

wip

上级 b18ca5f8
...@@ -7,55 +7,40 @@ from layer_helper import LayerHelper ...@@ -7,55 +7,40 @@ from layer_helper import LayerHelper
__all__ = ['SGD', 'Momentum', 'Adagrad', 'Adam', 'Adamax', 'DecayedAdagrad'] __all__ = ['SGD', 'Momentum', 'Adagrad', 'Adam', 'Adamax', 'DecayedAdagrad']
def hash_name_to_server(parameters_and_grads, pserver_endpoints): def hash_name_to_server(parameters, pserver_endpoints):
def _hash_param(param_name, total): def _hash_param(param_name, total):
return hash(param_name) % total return hash(param_name) % total
param_map = dict() param_map = dict()
grad_map = dict() for param in parameters:
for param_and_grad in parameters_and_grads: if param.trainable is True:
if param_and_grad[0].trainable is True and param_and_grad[ server_id = _hash_param(param.name, len(pserver_endpoints))
1] is not None:
server_id = _hash_param(param_and_grad[0].name,
len(pserver_endpoints))
server_for_param = pserver_endpoints[server_id] server_for_param = pserver_endpoints[server_id]
if param_map.has_key(server_for_param): if param_map.has_key(server_for_param):
param_map[server_for_param].append(param_and_grad[0]) param_map[server_for_param].append(param)
else: else:
param_map[server_for_param] = [param_and_grad[0]] param_map[server_for_param] = [param]
if grad_map.has_key(server_for_param): return param_map
grad_map[server_for_param].append(param_and_grad[1])
else:
grad_map[server_for_param] = [param_and_grad[1]]
return param_map, grad_map
def round_robin(parameters_and_grads, pserver_endpoints): def round_robin(parameters, pserver_endpoints):
if len(parameters_and_grads) < len(pserver_endpoints): assert (len(parameters) < len(pserver_endpoints))
raise Exception("parameters is less than pservers")
param_map = dict() param_map = dict()
grad_map = dict()
pserver_idx = 0 pserver_idx = 0
for param_and_grad in parameters_and_grads: for param in parameters:
if param_and_grad[0].trainable is True and param_and_grad[ if param.trainable is True:
1] is not None:
server_for_param = pserver_endpoints[pserver_idx] server_for_param = pserver_endpoints[pserver_idx]
if param_map.has_key(server_for_param): if param_map.has_key(server_for_param):
param_map[server_for_param].append(param_and_grad[0]) param_map[server_for_param].append(param)
else: else:
param_map[server_for_param] = [param_and_grad[0]] param_map[server_for_param] = [param]
if grad_map.has_key(server_for_param):
grad_map[server_for_param].append(param_and_grad[1])
else:
grad_map[server_for_param] = [param_and_grad[1]]
pserver_idx += 1 pserver_idx += 1
if pserver_idx > len(pserver_endpoints): if pserver_idx > len(pserver_endpoints):
pserver_idx = 0 pserver_idx = 0
return param_map, grad_map return param_map
def _append_sendop_for_trainer(loss, def _append_sendop_for_trainer(loss,
......
import numpy as np import numpy as np
from . import core from . import core
from framework import Program, default_main_program from framework import Program, default_main_program
import distribute_planner
__all__ = ['Executor', 'g_scope'] __all__ = ['Executor', 'g_scope']
...@@ -49,6 +50,80 @@ class Executor(object): ...@@ -49,6 +50,80 @@ class Executor(object):
self.executor = core.Executor(act_places) self.executor = core.Executor(act_places)
self.places = places self.places = places
def optimize(self, optimize_ops, program=None, **kwargs):
"""
optimize the program for different runtime environment
:param optimize_ops: op list of optimization, should be the
return value of Optimizer.minimize
:type optimize_ops: list
:param program: program to optimize, default default_main_program
:param pservers: parameter server endpoints like "m1:6174,m2:6174"
:type pservers: string
:return: return a list of programs
"""
if program is None:
program = default_main_program()
if kwargs.has_key("pservers"):
return self._optimize_distributed(optimize_ops, program, **kwargs)
def _optimize_distributed(self, optimize_ops, program, **kwargs):
# remove optimize ops and add a send op to main_program
# FIXME(typhoonzero): delete_op only remove the first accurence,
# need to consider about multiple same optimize op?
for op in optimize_ops:
program.global_block().delete_op(op)
if kwargs.has_key("split_method"):
split_method = kwargs["split_method"]
else:
split_method = distribute_planner.round_robin
assert (callable(split_method))
pserver_endpoints = kwargs["pservers"].split(",")
params = program.global_block().all_parameters()
param_map = split_method(params, pserver_endpoints)
for ep in pserver_endpoints:
# FIXME(typhoonzero): send to different servers can run in parrallel.
send_op = program.global_block().append_op(
type="send",
inputs={"X": param_map[ep]
}, # inputs is a list of tensors to be send
outputs={"Out": param_map[ep]},
attrs={"endpoint": ep})
# -------------- generate pserver program --------------
self.parameter_server_program_map = dict()
optimize_sub_program = Program()
optimize_ops = self.create_optimization_pass(
params_grads, optimize_sub_program, startup_program)
param_list = []
for param in params:
if param.trainable is True:
param_list.append(param)
param_map = split_method(params, pserver_endpoints)
for ep in pserver_endpoints:
pserver_program = Program()
self.parameter_server_program_map[ep] = pserver_program
pserver_program.global_block().append_op(
type="recv",
inputs={"RX": param_map[ep]}, # grads to recv
outputs={},
attrs={
"OptimizeBlock": optimize_sub_program.global_block(),
"endpoint": ep
})
def get_pserver_program(self, endpoint):
pass
def get_trainer_program(self):
return default_main_program()
def aslodtensor(self, data): def aslodtensor(self, data):
def accumulate(data): def accumulate(data):
if not isinstance(data, list): if not isinstance(data, list):
......
...@@ -425,6 +425,9 @@ class Block(object): ...@@ -425,6 +425,9 @@ class Block(object):
self.ops.append(op) self.ops.append(op)
return op return op
def delete_op(self, op):
self.ops.remove(op)
def prepend_op(self, *args, **kwargs): def prepend_op(self, *args, **kwargs):
op_desc = self.desc.prepend_op() op_desc = self.desc.prepend_op()
op = Operator(self, op_desc, *args, **kwargs) op = Operator(self, op_desc, *args, **kwargs)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册