提交 ed55f1b9 编写于 作者: T typhoonzero

transpiler_split_tensor

上级 59116442
from __future__ import print_function
import framework
from framework import Program, default_main_program, Parameter, Variable
import optimizer
from layer_helper import LayerHelper
from distributed_spliter import *
def hash_name_to_server(params_grads, pserver_endpoints):
"""
:param param_grads:
:return: a map of pserver endpoint ->
params -> [param list]
grads -> [grad list]
"""
class VarBlock:
def __init__(self, varname, offset, size):
self.varname = varname
# NOTE: real offset is offset * size
self.offset = offset
self.size = size
def _hash_param(param_name, total):
return hash(param_name) % total
param_grad_map = dict()
for param, grad in params_grads:
if param.trainable is True and grad is not None:
server_id = _hash_param(param.name, len(pserver_endpoints))
server_for_param = pserver_endpoints[server_id]
if not param_grad_map.has_key(server_for_param):
param_grad_map[server_for_param] = {"params": [], "grads": []}
param_grad_map[server_for_param]["params"].append(param)
param_grad_map[server_for_param]["grads"].append(grad)
return param_grad_map
def round_robin(params_grads, pserver_endpoints):
assert (len(params_grads) > len(pserver_endpoints))
param_grad_map = dict()
pserver_idx = 0
for param, grad in params_grads:
if param.trainable is True:
server_for_param = pserver_endpoints[pserver_idx]
if not param_grad_map.has_key(server_for_param):
param_grad_map[server_for_param] = {"params": [], "grads": []}
param_grad_map[server_for_param]["params"].append(param)
param_grad_map[server_for_param]["grads"].append(grad)
pserver_idx += 1
if pserver_idx >= len(pserver_endpoints):
pserver_idx = 0
return param_grad_map
def __str__(self):
return "%s:%d:%d" % (self.varname, self.offset, self.size)
class DistributeTranspiler:
......@@ -58,7 +27,6 @@ class DistributeTranspiler:
split_method=round_robin):
"""
Transpile the program to a distributed data-parallelism programs.
The main_program will be transform to use a remote parameter server
to do parameter optimization. And the optimization graph will be put
in to a parameter server program.
......@@ -66,45 +34,84 @@ class DistributeTranspiler:
Use different methods to split trainable varialbles to different
parameter servers.
Example to run:
exe = fluid.Executor(place)
t = fluid.DistributeTranspiler()
t.transpile(optimize_ops, params_grads, pservers="127.0.0.1:6174", trainers=1)
pserver_endpoint = os.getenv("PSERVER")
if pserver_endpoint:
pserver_prog = t.get_pserver_program(pserver_endpoint, optimize_ops)
exe.run(fluid.default_startup_program())
exe.run(pserver_prog)
else:
feeder = fluid.DataFeeder(feed_list=[images, label], place=place)
exe.run(fluid.default_startup_program())
for pass_id in range(PASS_NUM):
...
:param optimize_ops: op list of optimization, should be the
return value of Optimizer.minimize
:type optimize_ops: list
:param program: program to optimize, default default_main_program
:param pservers: parameter server endpoints like "m1:6174,m2:6174"
:type pservers: string
:return: return a list of programs
"""
assert (callable(split_method))
if program is None:
program = default_main_program()
self.program = program
self.trainers = trainers
self.optimize_ops = optimize_ops
self._optimize_distributed(
optimize_ops,
program,
params_grads,
pservers=pservers,
trainers=trainers,
split_method=split_method)
# steps to transpile:
# 1. split variable to multiple blocks, align by product(dim[1:]) (width).
# 2. modify trainer program add split_op to each Grad.
# 3. append send_op to trainer.
# 4. append concat_op to trainer to update local weights.
# 5. create new program as parameter server.
# 5. create parameter server program by split_method generated endpoint->VarBlock
# 6. run compile time infershape for parameter server program
if kwargs.has_key("split_method"):
split_method = kwargs["split_method"]
else:
split_method = round_robin
pserver_endpoints = kwargs["pservers"].split(",")
grad2param = dict()
for param, grad in params_and_grads:
grad2param[grad.name()] = param.name()
# step1
param_list = [pg[0] for pg in params_and_grads]
grad_list = [pg[1] for pg in params_and_grads]
# TODO: add split selected rows support
grad_blocks = _split_dense_variable(grad_list, len(pserver_endpoints))
param_blocks = _split_dense_variable(param_list, len(pserver_endpoints))
ep2gradblock = split_method(grad_blocks, pserver_endpoints)
# self.param_grad_map
# step2
var2splited = self._split_trainer_vars(program, grad_blocks)
# step3
send_inputs = []
send_outputs = []
for _, splited in var2splited.iteritems():
send_inputs.extend(splited)
send_outputs = self._create_vars_from_blocklist(program, param_blocks)
send_op = program.global_block().append_op(
type="send",
inputs={"X": send_inputs},
outputs={"Out": send_outputs},
attrs={"endpoints": pserver_endpoints,
"epmap": epmap})
def _create_vars_from_blocklist(self, program, block_list):
block_map = dict()
ret_vars = []
for block_str in block_list:
varname, offset, size = block_str.split(":")
if not block_map.has_key(varname):
block_map[varname] = []
block_map[varname].append((long(offset), long(size)))
for varname, splited in block_map.iteritems():
orig_var = program.global_block().vars[varname]
for block in splited:
size = block[1]
var = program.global_block().create_var(
name="%s.block%d" % (varname, i),
psersistable=False,
dtype=orig_var.dtype,
shape=[1, size]) # flattend splited var
ret_vars.append(var)
return ret_vars
def _clone_param(self, block, v):
assert isinstance(v, Parameter)
......@@ -131,32 +138,80 @@ class DistributeTranspiler:
lod_level=var.lod_level,
persistable=var.persistable)
def _optimize_distributed(self, optimize_ops, program, params_and_grads,
**kwargs):
if kwargs.has_key("split_method"):
split_method = kwargs["split_method"]
else:
split_method = round_robin
def _split_dense_variable(self,
var_list,
pserver_count,
min_block_size=1024,
max_block_size=1048576):
"""
We may need to split dense tensor to one or several blocks and put
them equally onto parameter server. One block is a sub-tensor
aligned by dim[0] of the tensor.
We need to have a minimal block size so that the calculations in
the parameter server side can gain better performance. By default
mininum block size is 1024. The max block size is used to prevent
too large block that may causing send error.
"""
block_sizes = []
blocks = []
for grad in var_list:
dim1 = reduce(lambda x, y: x * y, grad.shape[1:])
grad_numel = reduce(lambda x, y: x * y, grad.shape)
if grad_numel < min_block_size:
block_sizes.append(grad_numel)
block_size = grad_numel / min_block_size
if block_size < min_block_size:
block_size = min_block_size
# align by dim1(width)
remains = block_size % dim1
if remains != 0:
block_size += dim1 - remains
block_sizes.append(block_size)
num_blocks = grad_numel / block_size
print("grad numel :%d, blocksize: %d" % grad_numel, block_size)
for block_id in xrange(num_blocks):
block = VarBlock(grad.name(), block_id, block_size)
blocks.append(str(block))
return blocks
assert (callable(split_method))
pserver_endpoints = kwargs["pservers"].split(",")
self.param_grad_map = split_method(params_and_grads, pserver_endpoints)
send_op_ordered_inputs = []
send_op_ordered_outputs = []
epmap = []
for ep, v in self.param_grad_map.iteritems():
send_op_ordered_inputs.extend(v["grads"])
send_op_ordered_outputs.extend(v["params"])
for i in v["grads"]:
epmap.append(ep)
send_op = program.global_block().append_op(
type="send",
inputs={"X": send_op_ordered_inputs
}, # inputs is a list of tensors to be send
outputs={"Out": send_op_ordered_outputs},
attrs={"endpoints": pserver_endpoints,
"epmap": epmap})
def _split_trainer_vars(self, program, gradblocks, params_and_grads):
var2blocks = dict()
splited = dict()
for block_str in gradblocks:
varname, offset, size = block_str.split(":")
if not var2blocks.has_key(varname):
var2blocks[varname] = []
var2blocks[varname].append((long(offset), long(size)))
for varname, blocks in var2blocks.iteritems():
orig_var = program.global_block().vars[varname]
split_outs = []
for i in xrange(len(blocks)):
size = blocks[i][1]
var = program.global_block().create_var(
name="%s.block%d" % (varname, i),
psersistable=False,
dtype=orig_var.dtype,
shape=[1, size]) # flattend splited var
split_outs.append(var)
splited[varname] = split_outs
program.global_block().append_op(
type="split",
inputs={"X": orig_var},
outputs={"Out": split_outs},
attrs={"num": len(blocks)} # assume split evenly
)
return splited
def _concat_trainer_vars(self, program, splited):
for varname, to_merge_list in splited.iteritems():
orig_var = program.global_block().vars[varname]
program.global_block().append_op(
type="concat",
inputs={"X": to_merge_list},
outputs={"Out": orig_var},
attrs={})
def get_trainer_program(self):
# remove optimize ops and add a send op to main_program
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册