Commit f35c5606 authored by typhoonzero

split tensor to pservers

Parent c70ea1cc
@@ -4,6 +4,7 @@ from framework import Program, default_main_program, Parameter, Variable
import optimizer
from layer_helper import LayerHelper
from distributed_spliter import *
import math


class VarBlock:
@@ -17,6 +18,47 @@ class VarBlock:
        return "%s:%d:%d" % (self.varname, self.offset, self.size)

def split_dense_variable(var_list,
pserver_count,
min_block_size=1024,
max_block_size=1048576):
"""
We may need to split dense tensor to one or several blocks and put
them equally onto parameter server. One block is a sub-tensor
aligned by dim[0] of the tensor.
We need to have a minimal block size so that the calculations in
the parameter server side can gain better performance. By default
mininum block size is 1024. The max block size is used to prevent
too large block that may causing send error.
"""
blocks = []
for var in var_list:
split_count = pserver_count
var_numel = reduce(lambda x, y: x * y, var.shape)
max_pserver_count = int(math.floor(var_numel / float(min_block_size)))
if max_pserver_count == 0:
max_pserver_count = 1
if max_pserver_count < pserver_count:
split_count = max_pserver_count
block_size = int(math.ceil(var_numel / float(split_count)))
if len(var.shape) >= 2:
# align by dim1(width)
dim1 = reduce(lambda x, y: x * y, var.shape[1:])
remains = block_size % dim1
if remains != 0:
block_size += dim1 - remains
# update split_count after align
split_count = int(math.ceil(var_numel / float(block_size)))
for block_id in xrange(split_count):
curr_block_size = min(block_size, var_numel - (
(block_id) * block_size))
block = VarBlock(var.name, block_id, curr_block_size)
blocks.append(str(block))
return blocks
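
# Illustrative sketch (not part of this module): mirrors the sizing arithmetic
# of split_dense_variable above on plain shape lists, so the row alignment can
# be checked without building a Program. The helper name and sample shapes are
# assumptions; the expected outputs match the unit test added in this commit.
def _sketch_block_sizes(shape, pserver_count, min_block_size=1024):
    numel = reduce(lambda x, y: x * y, shape)  # reduce is a builtin on Python 2
    # bound the number of splits by the minimum block size, as above
    split_count = min(pserver_count,
                      max(1, int(math.floor(numel / float(min_block_size)))))
    block_size = int(math.ceil(numel / float(split_count)))
    if len(shape) >= 2:
        dim1 = reduce(lambda x, y: x * y, shape[1:])  # flattened row width
        remains = block_size % dim1
        if remains != 0:
            block_size += dim1 - remains  # align each block to whole rows
    split_count = int(math.ceil(numel / float(block_size)))
    return [min(block_size, numel - block_id * block_size)
            for block_id in range(split_count)]

# _sketch_block_sizes([8, 1020], 10)  -> [2040, 2040, 2040, 2040]
# _sketch_block_sizes([800, 10], 10)  -> [1150, 1150, 1150, 1150, 1150, 1150, 1100]
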
class DistributeTranspiler:
    def transpile(self,
                  optimize_ops,
@@ -57,43 +99,49 @@ class DistributeTranspiler:
        # 5. create parameter server program by split_method generated endpoint->VarBlock
        # 6. run compile time infershape for parameter server program

-        if kwargs.has_key("split_method"):
-            split_method = kwargs["split_method"]
-        else:
-            split_method = round_robin
-        pserver_endpoints = kwargs["pservers"].split(",")
-        grad2param = dict()
-        for param, grad in params_and_grads:
-            grad2param[grad.name()] = param.name()
        pserver_endpoints = pservers.split(",")

        # step1
-        param_list = [pg[0] for pg in params_and_grads]
-        grad_list = [pg[1] for pg in params_and_grads]
        param_list = [pg[0] for pg in params_grads]
        grad_list = [pg[1] for pg in params_grads]
        # TODO: add split selected rows support
-        grad_blocks = _split_dense_variable(grad_list, len(pserver_endpoints))
-        param_blocks = _split_dense_variable(param_list, len(pserver_endpoints))
        grad_blocks = split_dense_variable(grad_list, len(pserver_endpoints))
        param_blocks = split_dense_variable(param_list, len(pserver_endpoints))
-        ep2gradblock = split_method(grad_blocks, pserver_endpoints)
-        # self.param_grad_map
        # step2
-        var2splited = self._split_trainer_vars(program, grad_blocks)
        grad_var_mapping = self._append_split_op(program, grad_blocks)
        # step3
        send_inputs = []
-        for _, splited in var2splited.iteritems():
        send_outputs = []
        for _, splited in grad_var_mapping.iteritems():
            send_inputs.extend(splited)
-        send_outputs = self._create_vars_from_blocklist(program, param_blocks)
        param_var_mapping = self._create_vars_from_blocklist(program,
                                                             param_blocks)
        for _, splited in param_var_mapping.iteritems():
            send_outputs.extend(splited)
        # let send_op know which endpoint to send which var; eplist has the
        # same order as send_inputs.
        eplist = split_method(send_inputs, pserver_endpoints)
        send_op = program.global_block().append_op(
            type="send",
            inputs={"X": send_inputs},
            outputs={"Out": send_outputs},
            attrs={"endpoints": pserver_endpoints,
-                   "epmap": epmap})
                   "epmap": eplist})
        # step4
        for varname, splited_var in param_var_mapping.iteritems():
            orig_param = program.global_block().vars[varname]
            concat = program.global_block().append_op(
                type="concat",
                inputs={"X": send_outputs},
                outputs={"Out": orig_param},
                attrs={"axis": 0})
    def _create_vars_from_blocklist(self, program, block_list):
        block_map = dict()
-        ret_vars = []
        var_mapping = dict()
        for block_str in block_list:
            varname, offset, size = block_str.split(":")
            if not block_map.has_key(varname):
@@ -102,15 +150,26 @@ class DistributeTranspiler:
        for varname, splited in block_map.iteritems():
            orig_var = program.global_block().vars[varname]
-            for block in splited:
            orig_shape = orig_var.shape
            orig_dim1_flatten = 1
            if len(orig_shape) >= 2:
                orig_dim1_flatten = reduce(lambda x, y: x * y, orig_shape[1:])

            var_list = []
            for i, block in enumerate(splited):
                size = block[1]
                rows = size / orig_dim1_flatten
                splited_shape = [rows]
                if len(orig_shape) >= 2:
                    splited_shape.extend(orig_shape[1:])
                print("block, splited shape:", block, splited_shape)
                var = program.global_block().create_var(
                    name="%s.block%d" % (varname, i),
                    persistable=False,
                    dtype=orig_var.dtype,
-                    shape=[1, size])  # flattend splited var
                    shape=splited_shape)  # flattened split var
-                ret_vars.append(var)
                var_list.append(var)
-        return ret_vars
            var_mapping[varname] = var_list
        return var_mapping
    def _clone_param(self, block, v):
        assert isinstance(v, Parameter)
@@ -137,80 +196,22 @@ class DistributeTranspiler:
            lod_level=var.lod_level,
            persistable=var.persistable)

-    def _split_dense_variable(self,
-                              var_list,
-                              pserver_count,
-                              min_block_size=1024,
-                              max_block_size=1048576):
-        """
-        We may need to split dense tensor to one or several blocks and put
-        them equally onto parameter server. One block is a sub-tensor
-        aligned by dim[0] of the tensor.
-        We need to have a minimal block size so that the calculations in
-        the parameter server side can gain better performance. By default
-        mininum block size is 1024. The max block size is used to prevent
-        too large block that may causing send error.
-        """
-        block_sizes = []
-        blocks = []
-        for grad in var_list:
-            dim1 = reduce(lambda x, y: x * y, grad.shape[1:])
-            grad_numel = reduce(lambda x, y: x * y, grad.shape)
-            if grad_numel < min_block_size:
-                block_sizes.append(grad_numel)
-            block_size = grad_numel / min_block_size
-            if block_size < min_block_size:
-                block_size = min_block_size
-            # align by dim1(width)
-            remains = block_size % dim1
-            if remains != 0:
-                block_size += dim1 - remains
-            block_sizes.append(block_size)
-            num_blocks = grad_numel / block_size
-            print("grad numel :%d, blocksize: %d" % grad_numel, block_size)
-            for block_id in xrange(num_blocks):
-                block = VarBlock(grad.name(), block_id, block_size)
-                blocks.append(str(block))
-        return blocks
-
-    def _split_trainer_vars(self, program, gradblocks, params_and_grads):
-        var2blocks = dict()
-        splited = dict()
-        for block_str in gradblocks:
-            varname, offset, size = block_str.split(":")
-            if not var2blocks.has_key(varname):
-                var2blocks[varname] = []
-            var2blocks[varname].append((long(offset), long(size)))
-        for varname, blocks in var2blocks.iteritems():
-            orig_var = program.global_block().vars[varname]
-            split_outs = []
-            for i in xrange(len(blocks)):
-                size = blocks[i][1]
-                var = program.global_block().create_var(
-                    name="%s.block%d" % (varname, i),
-                    psersistable=False,
-                    dtype=orig_var.dtype,
-                    shape=[1, size])  # flattend splited var
-                split_outs.append(var)
-            splited[varname] = split_outs
-            program.global_block().append_op(
-                type="split",
-                inputs={"X": orig_var},
-                outputs={"Out": split_outs},
-                attrs={"num": len(blocks)}  # assume split evenly
-            )
-        return splited
-
-    def _concat_trainer_vars(self, program, splited):
-        for varname, to_merge_list in splited.iteritems():
-            orig_var = program.global_block().vars[varname]
-            program.global_block().append_op(
-                type="concat",
-                inputs={"X": to_merge_list},
-                outputs={"Out": orig_var},
-                attrs={})
    def _append_split_op(self, program, gradblocks):
        var_mapping = self._create_vars_from_blocklist(program, gradblocks)
        for varname, splited_vars in var_mapping.iteritems():
            if len(splited_vars) == 1:
                continue
            orig_var = program.global_block().vars[varname]
            sections = []
            for v in splited_vars:
                sections.append(v.shape[0])
            program.global_block().append_op(
                type="split",
                inputs={"X": orig_var},
                outputs={"Out": splited_vars},
                attrs={"sections": sections}  # assume split evenly
            )
        return var_mapping

    def get_trainer_program(self):
        # remove optimize ops and add a send op to main_program
......
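To make steps 2-4 of transpile concrete: the appended split op cuts each variable into row blocks along dim 0 according to the per-block sections, and step 4's concat along axis 0 reassembles the parameter from the blocks fetched back from the pservers. A minimal sketch of that round trip with plain nested lists (the toy 4x3 parameter is an assumption for illustration, not part of the patch):

# a 4x3 parameter cut into two 2x3 row blocks (sections=[2, 2]) and restored
# by concatenating along axis 0, mirroring the split/concat ops appended above
param = [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]]
blocks = [param[0:2], param[2:4]]      # what the trainer-side "split" produces
restored = blocks[0] + blocks[1]       # what "concat" with axis=0 produces
assert restored == param
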
import framework
from framework import Program, default_main_program, Parameter, Variable
import optimizer
from layer_helper import LayerHelper
def hash_name_to_server(params_grads, pserver_endpoints):
"""
    :param params_grads:
:return: a map of pserver endpoint ->
params -> [param list]
grads -> [grad list]
"""
def _hash_param(param_name, total):
return hash(param_name) % total
param_grad_map = dict()
for param, grad in params_grads:
if param.trainable is True and grad is not None:
server_id = _hash_param(param.name, len(pserver_endpoints))
server_for_param = pserver_endpoints[server_id]
if not param_grad_map.has_key(server_for_param):
param_grad_map[server_for_param] = {"params": [], "grads": []}
param_grad_map[server_for_param]["params"].append(param)
param_grad_map[server_for_param]["grads"].append(grad)
return param_grad_map
def round_robin(params_grads, pserver_endpoints):
assert (len(params_grads) > len(pserver_endpoints))
param_grad_map = dict()
pserver_idx = 0
for param, grad in params_grads:
if param.trainable is True:
server_for_param = pserver_endpoints[pserver_idx]
if not param_grad_map.has_key(server_for_param):
param_grad_map[server_for_param] = {"params": [], "grads": []}
param_grad_map[server_for_param]["params"].append(param)
param_grad_map[server_for_param]["grads"].append(grad)
pserver_idx += 1
if pserver_idx >= len(pserver_endpoints):
pserver_idx = 0
return param_grad_map
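
# Illustration (not part of the patch): hash_name_to_server and round_robin
# above both return a map of this shape; real entries hold Variable objects
# rather than the name strings used here, and the endpoints are made up.
_example_param_grad_map = {
    "192.168.0.1:6174": {"params": ["fc_0.w_0"], "grads": ["fc_0.w_0@GRAD"]},
    "192.168.0.2:6174": {"params": ["fc_0.b_0"], "grads": ["fc_0.b_0@GRAD"]},
}
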
class DistributeTranspiler:
def transpile(self,
optimize_ops,
params_grads,
program=None,
pservers="127.0.0.1:6174",
trainers=1,
split_method=round_robin):
"""
        Transpile the program into distributed data-parallelism programs.
        The main_program will be transformed to use a remote parameter
        server to do parameter optimization, and the optimization graph
        will be put into a parameter server program.
        Use different methods to split trainable variables across the
        parameter servers.
Example to run:
exe = fluid.Executor(place)
t = fluid.DistributeTranspiler()
t.transpile(optimize_ops, params_grads, pservers="127.0.0.1:6174", trainers=1)
pserver_endpoint = os.getenv("PSERVER")
if pserver_endpoint:
pserver_prog = t.get_pserver_program(pserver_endpoint, optimize_ops)
exe.run(fluid.default_startup_program())
exe.run(pserver_prog)
else:
feeder = fluid.DataFeeder(feed_list=[images, label], place=place)
exe.run(fluid.default_startup_program())
for pass_id in range(PASS_NUM):
...
:param optimize_ops: op list of optimization, should be the
return value of Optimizer.minimize
:type optimize_ops: list
:param program: program to optimize, default default_main_program
:param pservers: parameter server endpoints like "m1:6174,m2:6174"
:type pservers: string
:return: return a list of programs
"""
if program is None:
program = default_main_program()
self.program = program
self.trainers = trainers
self.optimize_ops = optimize_ops
self._optimize_distributed(
optimize_ops,
program,
params_grads,
pservers=pservers,
trainers=trainers,
split_method=split_method)
def _clone_param(self, block, v):
assert isinstance(v, Parameter)
new_p = Parameter(
block=block,
shape=v.shape,
dtype=v.dtype,
type=v.type,
lod_level=v.lod_level,
stop_gradient=v.stop_gradient,
trainable=v.trainable,
optimize_attr=v.optimize_attr,
regularizer=v.regularizer,
name=v.name)
block.vars[new_p.name] = new_p
def _clone_var(self, block, var):
assert isinstance(var, Variable)
return block.create_var(
name=var.name,
shape=var.shape,
dtype=var.dtype,
type=var.type,
lod_level=var.lod_level,
persistable=var.persistable)
def _optimize_distributed(self, optimize_ops, program, params_and_grads,
**kwargs):
if kwargs.has_key("split_method"):
split_method = kwargs["split_method"]
else:
split_method = round_robin
assert (callable(split_method))
pserver_endpoints = kwargs["pservers"].split(",")
self.param_grad_map = split_method(params_and_grads, pserver_endpoints)
send_op_ordered_inputs = []
send_op_ordered_outputs = []
epmap = []
for ep, v in self.param_grad_map.iteritems():
send_op_ordered_inputs.extend(v["grads"])
send_op_ordered_outputs.extend(v["params"])
for i in v["grads"]:
epmap.append(ep)
send_op = program.global_block().append_op(
type="send",
inputs={"X": send_op_ordered_inputs
}, # inputs is a list of tensors to be send
outputs={"Out": send_op_ordered_outputs},
attrs={"endpoints": pserver_endpoints,
"epmap": epmap})
def get_trainer_program(self):
# remove optimize ops and add a send op to main_program
self.program.global_block().delete_ops(self.optimize_ops)
return self.program
def _create_var_for_trainers(self, block, var, trainers):
var_list = []
for i in xrange(trainers):
var_each = block.create_var(
name="%s.trainer_%d" % (var.name, i),
                persistable=var.persistable,
dtype=var.dtype,
shape=var.shape)
var_list.append(var_each)
return var_list
def get_pserver_program(self, endpoint, optimize_ops):
pserver_program = Program()
for v in self.param_grad_map[endpoint]["params"]:
self._clone_param(pserver_program.global_block(), v)
optimize_sub_program = Program()
grad_var_names = [
var.name for var in self.param_grad_map[endpoint]["grads"]
]
for opt_op in optimize_ops:
for _, var in opt_op.inputs.iteritems():
# NOTE: append operators to merge gradients from multiple
# trainers. If trainers == 1, this is not needed.
if self.trainers > 1 and var.name in grad_var_names:
vars2merge = self._create_var_for_trainers(
optimize_sub_program.global_block(), var, self.trainers)
merged_var = optimize_sub_program.global_block().create_var(
name=var.name,
persistable=var.persistable,
dtype=var.dtype,
shape=var.shape)
optimize_sub_program.global_block().append_op(
type="sum",
inputs={"X": vars2merge},
outputs={"Out": merged_var})
optimize_sub_program.global_block().append_op(
type="scale",
inputs={"X": merged_var},
outputs={"Out": merged_var},
attrs={"scale": 1.0 / float(self.trainers)})
else:
optimize_sub_program.global_block().create_var(
name=var.name,
persistable=var.persistable,
dtype=var.dtype,
shape=var.shape)
if opt_op.inputs.has_key("Grad"):
if opt_op.inputs["Grad"].name in grad_var_names:
optimize_sub_program.global_block().append_op(
type=opt_op.type,
inputs=opt_op.inputs,
outputs=opt_op.outputs,
attrs=opt_op.attrs)
else:
optimize_sub_program.global_block().append_op(
type=opt_op.type,
inputs=opt_op.inputs,
outputs=opt_op.outputs,
attrs=opt_op.attrs)
pserver_program.global_block().append_op(
type="recv",
inputs={"RX":
self.param_grad_map[endpoint]["grads"]}, # grads to recv
outputs={},
attrs={
"OptimizeProgram": optimize_sub_program.desc,
"endpoint": endpoint,
"ParamList":
[p.name for p in self.param_grad_map[endpoint]["params"]],
"GradList":
[p.name for p in self.param_grad_map[endpoint]["grads"]],
"Trainers": self.trainers
})
pserver_program.sync_with_cpp()
return pserver_program
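
The gradient-merging branch in get_pserver_program above appends a sum op over the per-trainer copies of a gradient followed by a scale op with factor 1.0 / trainers, i.e. the copies are averaged before the optimize ops run. A plain-Python sketch of that arithmetic (the sample values and names are illustrative only):

trainers = 4
# one copy of the same gradient block from each trainer (trainer_0 ... trainer_3)
per_trainer_grads = [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0], [7.0, 8.0]]

summed = [sum(vals) for vals in zip(*per_trainer_grads)]  # the "sum" op
merged = [v * (1.0 / trainers) for v in summed]           # the "scale" op
print(merged)  # [4.0, 5.0] -- the element-wise average fed to the optimize ops
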
-def hash_name(varblocks, pserver_endpoints):
-    """
-    :param varblocks: a list of VarBlock string indicating
-                      sub blocks of variables
-    :return: a map of pserver endpoint -> varblock_str
-    """
-
-    ep2block = dict()
-    for varblock_str in varblocks:
-        if param.trainable is True and grad is not None:
-            server_id = _hash_block(varblock_str, len(pserver_endpoints))
-            server_for_param = pserver_endpoints[server_id]
-            if not ep2block.has_key(server_for_param):
-                ep2block[server_for_param] = []
-            ep2block[server_for_param].append(varblock_str)
-    return ep2block
def hash_name(varlist, pserver_endpoints):
    """
    hash variable names to several endpoints.

    :param varlist: a list of Variables
    :return: a map of pserver endpoint -> varname
    """

    def _hash_block(block_str, total):
        return hash(block_str) % total

    eplist = []
    for var in varlist:
        server_id = _hash_block(var.name(), len(pserver_endpoints))
        server_for_param = pserver_endpoints[server_id]
        eplist.append(server_for_param)
    return eplist


-def round_robin(varblocks, pserver_endpoints):
-    assert (len(varblocks) > len(pserver_endpoints))
-
-    ep2block = dict()
-    for varblock_str in varblocks:
-        if param.trainable is True:
-            server_for_param = pserver_endpoints[pserver_idx]
-            if not ep2block.has_key(server_for_param):
-                ep2block[server_for_param] = []
-            ep2block[server_for_param].append(varblock_str)
-    return ep2block
def round_robin(varlist, pserver_endpoints):
    """
    distribute variables to several endpoints.
    """
    assert (len(varlist) > len(pserver_endpoints))

    eplist = []
    pserver_idx = 0
    for var in varlist:
        server_for_param = pserver_endpoints[pserver_idx]
        eplist.append(server_for_param)

        pserver_idx += 1
        if pserver_idx >= len(pserver_endpoints):
            pserver_idx = 0
    return eplist
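
Both placement policies now return an eplist aligned one to one with the input variable list, which is the ordering the send op's epmap attribute expects. A standalone sketch of the round-robin assignment over plain block names (the names and endpoints here are made up for illustration):

def round_robin_sketch(varnames, pserver_endpoints):
    # same assignment rule as round_robin above, applied to plain strings
    eplist = []
    idx = 0
    for _ in varnames:
        eplist.append(pserver_endpoints[idx])
        idx = (idx + 1) % len(pserver_endpoints)
    return eplist

send_inputs = ["w.block0", "w.block1", "w.block2", "b.block0"]
endpoints = ["127.0.0.1:6174", "127.0.0.1:6175"]
print(list(zip(send_inputs, round_robin_sketch(send_inputs, endpoints))))
# [('w.block0', '127.0.0.1:6174'), ('w.block1', '127.0.0.1:6175'),
#  ('w.block2', '127.0.0.1:6174'), ('b.block0', '127.0.0.1:6175')]
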
import math
import unittest
from paddle.v2.fluid.distribute_transpiler import split_dense_variable
import paddle.v2.fluid as fluid
import paddle.v2.fluid.core as core
import random
class TestSplitVar(unittest.TestCase):
def test_check_output(self):
# split below shapes to 10 servers
shapes = [[3, 5], [1024], [28, 784], [8, 1020], [800, 10]]
expected_sizes = [
[15], [1024],
[2352, 2352, 2352, 2352, 2352, 2352, 2352, 2352, 2352, 784],
[2040, 2040, 2040, 2040],
[1150, 1150, 1150, 1150, 1150, 1150, 1100]
]
var_list = []
program = fluid.Program()
for shape in shapes:
var = program.global_block().create_var(
                name=str(random.randint(10000, 99999)),
persistable=True,
dtype=core.VarDesc.VarType.LOD_TENSOR,
shape=shape)
var_list.append(var)
blocks = split_dense_variable(var_list, 10)
all_sizes = []
for s in expected_sizes:
for s2 in s:
all_sizes.append(s2)
for i, block_str in enumerate(blocks):
varname, block_id, size = block_str.split(":")
self.assertEqual(int(size), all_sizes[i])
if __name__ == '__main__':
unittest.main()