Commit 9c0b1cf1 authored by typhoonzero

update wip pserver transpile

Parent 56e758fc
@@ -98,8 +98,7 @@ class DistributeTranspiler:
         # 3. append send_op to trainer.
         # 4. append concat_op to trainer to update local weights.
         # 5. create new program as parameter server.
-        # 5. create parameter server program by split_method generated endpoint->VarBlock
-        # 6. run compile time infershape for parameter server program
+        # 6. create parameter server program by split_method generated endpoint->VarBlock
 
         pserver_endpoints = pservers.split(",")
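Step 6 above hinges on `split_method` handing out an endpoint for every variable block it is given. A minimal round-robin sketch of that idea, with illustrative names only (`round_robin_split` and the sample block names are stand-ins, not the transpiler's actual implementation):

```python
def round_robin_split(varlist, pserver_endpoints):
    """Assign each variable block to a pserver endpoint in round-robin order.

    Returns a list ("eplist") with one endpoint per entry of varlist,
    mirroring how the transpiler pairs send_inputs with endpoints.
    """
    eplist = []
    for i, _ in enumerate(varlist):
        eplist.append(pserver_endpoints[i % len(pserver_endpoints)])
    return eplist


# Example: four gradient blocks spread across two pservers.
grads = ["w@GRAD.block0", "w@GRAD.block1", "b@GRAD.block0", "b@GRAD.block1"]
print(round_robin_split(grads, ["127.0.0.1:6174", "127.0.0.1:6175"]))
# -> ['127.0.0.1:6174', '127.0.0.1:6175', '127.0.0.1:6174', '127.0.0.1:6175']
```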
@@ -124,6 +123,15 @@ class DistributeTranspiler:
         # let send_op know which endpoint to send which var, eplist is of the same
         # order of send_inputs.
         eplist = split_method(send_inputs, pserver_endpoints)
+        # create mapping of endpoint -> var to create pserver side program
+        self.param_grad_ep_mapping = dict()
+        for i, ep in enumerate(eplist):
+            param = send_outputs[i]
+            grad = send_inputs[i]
+            if not self.param_grad_ep_mapping.has_key(ep):
+                self.param_grad_ep_mapping[ep] = {"params": [], "grads": []}
+            self.param_grad_ep_mapping[ep]["params"].append(param)
+            self.param_grad_ep_mapping[ep]["grads"].append(grad)
 
         send_op = program.global_block().append_op(
             type="send",
@@ -235,27 +243,29 @@ class DistributeTranspiler:
             var_list.append(var_each)
         return var_list
 
-    def get_pserver_program(self, endpoint, optimize_ops):
-        pserver_program = Program()
-        for v in self.param_grad_map[endpoint]["params"]:
-            self._clone_param(pserver_program.global_block(), v)
-
-        optimize_sub_program = Program()
-        grad_var_names = [
-            var.name for var in self.param_grad_map[endpoint]["grads"]
-        ]
-        for opt_op in optimize_ops:
-            for _, var in opt_op.inputs.iteritems():
-                # NOTE: append operators to merge gradients from multiple
-                # trainers. If trainers == 1, this is not needed.
-                if self.trainers > 1 and var.name in grad_var_names:
+    def _append_pserver_ops(self, opt_op, endpoint):
+        new_inputs = dict()
+        for key, var in opt_op.inputs.iteritems():
+            if key == "Grad":
+                grad_block = None
+                for g in self.param_grad_ep_mapping[endpoint]["grads"]:
+                    if g.name.startswith(var.name):
+                        grad_block = g
+                        break
+                if not grad_block:
+                    # do not append this op if current endpoint
+                    # is not dealing with this grad block
+                    return
+                merged_var = optimize_sub_program.global_block().create_var(
+                    name=grad_block.name,
+                    persistable=grad_block.persistable,
+                    dtype=grad_block.dtype,
+                    shape=grad_block.shape)
+                # append merging ops if trainers > 1
+                if self.trainers > 1:
                     vars2merge = self._create_var_for_trainers(
-                        optimize_sub_program.global_block(), var, self.trainers)
-                    merged_var = optimize_sub_program.global_block().create_var(
-                        name=var.name,
-                        persistable=var.persistable,
-                        dtype=var.dtype,
-                        shape=var.shape)
+                        optimize_sub_program.global_block(), grad_block,
+                        self.trainers)
                     optimize_sub_program.global_block().append_op(
                         type="sum",
                         inputs={"X": vars2merge},
@@ -265,38 +275,88 @@ class DistributeTranspiler:
                         inputs={"X": merged_var},
                         outputs={"Out": merged_var},
                         attrs={"scale": 1.0 / float(self.trainers)})
-                else:
-                    optimize_sub_program.global_block().create_var(
-                        name=var.name,
-                        persistable=var.persistable,
-                        dtype=var.dtype,
-                        shape=var.shape)
-
-            if opt_op.inputs.has_key("Grad"):
-                if opt_op.inputs["Grad"].name in grad_var_names:
-                    optimize_sub_program.global_block().append_op(
-                        type=opt_op.type,
-                        inputs=opt_op.inputs,
-                        outputs=opt_op.outputs,
-                        attrs=opt_op.attrs)
-            else:
-                optimize_sub_program.global_block().append_op(
-                    type=opt_op.type,
-                    inputs=opt_op.inputs,
-                    outputs=opt_op.outputs,
-                    attrs=opt_op.attrs)
+                new_inputs[key] = merged_var
+            elif key == "Param":
+                # param is already created on global program
+                param_block = None
+                for p in self.param_grad_ep_mapping[endpoint]["params"]:
+                    if p.name.startswith(var.name):
+                        param_block = p
+                        break
+                if not param_block:
+                    return
+                tmpvar = optimize_sub_program.global_block().create_var(
+                    name=param_block.name,
+                    persistable=param_block.persistable,
+                    dtype=param_block.dtype,
+                    shape=param_block.shape)
+                new_inputs[key] = tmpvar
+            else:
+                tmpvar = optimize_sub_program.global_block().create_var(
+                    name=var.name,
+                    persistable=var.persistable,
+                    dtype=var.dtype,
+                    shape=var.shape)
+                new_inputs[key] = tmpvar
+
+        # FIXME: change outputs ParamOut
+        optimize_sub_program.global_block().append_op(
+            type=opt_op.type,
+            inputs=new_inputs,
+            outputs=opt_op.outputs,
+            attrs=opt_op.attrs)
+
+    def _append_pserver_non_opt_ops(self, opt_op):
+        for _, var in opt_op.inputs.iteritems():
+            optimize_sub_program.global_block().create_var(
+                name=var.name,
+                persistable=var.persistable,
+                dtype=var.dtype,
+                shape=var.shape)
+        optimize_sub_program.global_block().append_op(
+            type=opt_op.type,
+            inputs=opt_op.inputs,
+            outputs=opt_op.outputs,
+            attrs=opt_op.attrs)
+
+    def get_pserver_program(self, endpoint, optimize_ops):
+        """
+        get pserver side program by endpoint
+        NOTE: assume blocks of the same variable are not distributed
+        on the same pserver, only change param/grad varnames for
+        trainers to fetch. For each pserver endpoint, the server side
+        program must be a subset of the original optimization program.
+        """
+        # step5
+        pserver_program = Program()
+        for v in self.param_grad_ep_mapping[endpoint]["params"]:
+            self._clone_param(pserver_program.global_block(), v)
+        # step6
+        optimize_sub_program = Program()
+        for opt_op in optimize_ops:
+            if opt_op.inputs.has_key("Grad"):
+                # append optimize_op
+                self._append_pserver_ops(opt_op, endpoint)
+            else:
+                self._append_pserver_non_opt_ops(opt_op)
 
         pserver_program.global_block().append_op(
             type="recv",
-            inputs={"RX":
-                    self.param_grad_map[endpoint]["grads"]},  # grads to recv
+            inputs={"RX": self.param_grad_ep_mapping[endpoint]["grads"]
+                    },  # grads to recv
            outputs={},
            attrs={
                "OptimizeProgram": optimize_sub_program.desc,
                "endpoint": endpoint,
-                "ParamList":
-                [p.name for p in self.param_grad_map[endpoint]["params"]],
-                "GradList":
-                [p.name for p in self.param_grad_map[endpoint]["grads"]],
+                "ParamList": [
+                    p.name
+                    for p in self.param_grad_ep_mapping[endpoint]["params"]
+                ],
+                "GradList": [
+                    p.name
+                    for p in self.param_grad_ep_mapping[endpoint]["grads"]
+                ],
                "Trainers": self.trainers
            })
        pserver_program.sync_with_cpp()
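The docstring's point that each pserver program is a subset of the original optimization program comes down to the early `return` in `_append_pserver_ops`: an optimize op is only appended on the endpoint that owns its Grad block, while ops without a Grad input go through `_append_pserver_non_opt_ops` on every pserver. A simplified, self-contained sketch of that selection rule, using plain dicts instead of operator objects (all names here are illustrative):

```python
def select_ops_for_endpoint(optimize_ops, ep_mapping, endpoint):
    """Keep only the optimize ops whose Grad input has a block on endpoint.

    A simplified stand-in for _append_pserver_ops' early return: op inputs
    are plain dicts of names, and blocks are matched by name prefix, just
    as the diff matches g.name.startswith(var.name).
    """
    grads_here = ep_mapping[endpoint]["grads"]
    selected = []
    for op in optimize_ops:
        grad = op["inputs"].get("Grad")
        if grad is None:
            # ops without a Grad input are appended on every pserver,
            # mirroring _append_pserver_non_opt_ops
            selected.append(op)
        elif any(g.startswith(grad) for g in grads_here):
            selected.append(op)
    return selected


ep_mapping = {
    "127.0.0.1:6174": {"params": ["w.block0"], "grads": ["w@GRAD.block0"]},
    "127.0.0.1:6175": {"params": ["b.block0"], "grads": ["b@GRAD.block0"]},
}
ops = [
    {"type": "sgd", "inputs": {"Param": "w", "Grad": "w@GRAD"}},
    {"type": "sgd", "inputs": {"Param": "b", "Grad": "b@GRAD"}},
]
print(select_ops_for_endpoint(ops, ep_mapping, "127.0.0.1:6175"))
# -> [{'type': 'sgd', 'inputs': {'Param': 'b', 'Grad': 'b@GRAD'}}]
```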