Unverified · Commit 8d253e49 authored by 武毅, committed by GitHub

Merge pull request #7249 from typhoonzero/transpiler_split_tensor

Feature/transpiler split tensor to multiple pservers
...@@ -18,6 +18,7 @@ from param_attr import ParamAttr
from data_feeder import DataFeeder
from core import LoDTensor, CPUPlace, CUDAPlace
from distribute_transpiler import DistributeTranspiler
from distribute_transpiler_simple import SimpleDistributeTranspiler
import clip
from memory_optimization_transpiler import memory_optimize
...@@ -37,6 +38,7 @@ __all__ = framework.__all__ + executor.__all__ + [
    'ParamAttr',
    'DataFeeder',
    'clip',
    'SimpleDistributeTranspiler',
    'DistributeTranspiler',
    'memory_optimize',
]
...
from __future__ import print_function
import framework
from framework import Program, default_main_program, Parameter, Variable
import optimizer
from layer_helper import LayerHelper
from distributed_spliter import *
import math


class VarBlock:
    def __init__(self, varname, offset, size):
        self.varname = varname
        # NOTE: real offset is offset * size
        self.offset = offset
        self.size = size

    def __str__(self):
        return "%s:%d:%d" % (self.varname, self.offset, self.size)


def split_dense_variable(var_list,
                         pserver_count,
                         min_block_size=1024,
                         max_block_size=1048576):
    """
    We may need to split a dense tensor into one or several blocks and put
    them equally onto the parameter servers. One block is a sub-tensor
    aligned by dim[0] of the tensor.

    We need a minimum block size so that the calculations on the
    parameter server side can gain better performance. By default the
    minimum block size is 1024. The max block size is used to prevent
    blocks that are too large, which may cause send errors.
    """
    blocks = []
    for var in var_list:
        split_count = pserver_count
        var_numel = reduce(lambda x, y: x * y, var.shape)
        max_pserver_count = int(math.floor(var_numel / float(min_block_size)))
        if max_pserver_count == 0:
            max_pserver_count = 1
        if max_pserver_count < pserver_count:
            split_count = max_pserver_count
        block_size = int(math.ceil(var_numel / float(split_count)))
        if len(var.shape) >= 2:
            # align by dim1 (width)
            dim1 = reduce(lambda x, y: x * y, var.shape[1:])
            remains = block_size % dim1
            if remains != 0:
                block_size += dim1 - remains
        # update split_count after alignment
        split_count = int(math.ceil(var_numel / float(block_size)))
        for block_id in xrange(split_count):
            curr_block_size = min(block_size,
                                  var_numel - block_id * block_size)
            block = VarBlock(var.name, block_id, curr_block_size)
            blocks.append(str(block))
    return blocks
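
To make the splitting arithmetic above concrete, here is a small stand-alone sketch. FakeVar, illustrate_split, and the variable name fc_w are made-up names; the numbers mirror the [8, 1020] case checked in the new test_split_var.py at the end of this change.

import math
from collections import namedtuple

# Minimal stand-in for a fluid Variable: only .name and .shape matter here.
FakeVar = namedtuple("FakeVar", ["name", "shape"])

def illustrate_split(var, pserver_count, min_block_size=1024):
    # same arithmetic as split_dense_variable above, for a single variable
    var_numel = reduce(lambda x, y: x * y, var.shape)
    split_count = min(pserver_count,
                      max(1, int(math.floor(var_numel / float(min_block_size)))))
    block_size = int(math.ceil(var_numel / float(split_count)))
    if len(var.shape) >= 2:
        dim1 = reduce(lambda x, y: x * y, var.shape[1:])
        if block_size % dim1 != 0:
            block_size += dim1 - block_size % dim1  # round up to whole rows
    split_count = int(math.ceil(var_numel / float(block_size)))
    return ["%s:%d:%d" % (var.name, i,
                          min(block_size, var_numel - i * block_size))
            for i in xrange(split_count)]

# An 8 x 1020 parameter spread over 10 pservers ends up as 4 blocks of 2040
# elements (2 rows each), matching the expectation in test_split_var.py:
print(illustrate_split(FakeVar("fc_w", [8, 1020]), 10))
# ['fc_w:0:2040', 'fc_w:1:2040', 'fc_w:2:2040', 'fc_w:3:2040']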
class DistributeTranspiler:
...@@ -58,7 +69,6 @@ class DistributeTranspiler:
                  split_method=round_robin):
        """
        Transpile the program to distributed data-parallelism programs.
        The main_program will be transformed to use a remote parameter server
        to do parameter optimization. And the optimization graph will be put
        into a parameter server program.
...@@ -66,60 +76,113 @@ class DistributeTranspiler:
        Use different methods to split trainable variables to different
        parameter servers.
Example to run:
exe = fluid.Executor(place)
t = fluid.DistributeTranspiler()
t.transpile(optimize_ops, params_grads, pservers="127.0.0.1:6174", trainers=1)
pserver_endpoint = os.getenv("PSERVER")
if pserver_endpoint:
pserver_prog = t.get_pserver_program(pserver_endpoint, optimize_ops)
exe.run(fluid.default_startup_program())
exe.run(pserver_prog)
else:
feeder = fluid.DataFeeder(feed_list=[images, label], place=place)
exe.run(fluid.default_startup_program())
for pass_id in range(PASS_NUM):
...
        :param optimize_ops: op list of optimization, should be the
                             return value of Optimizer.minimize
        :type optimize_ops: list
        :param program: program to optimize, default default_main_program
        :param pservers: parameter server endpoints like "m1:6174,m2:6174"
        :type pservers: string
        :return: return a list of programs
        """
        assert (callable(split_method))
        if program is None:
            program = default_main_program()
        self.program = program
        self.trainers = trainers
        self.optimize_ops = optimize_ops
        # steps to transpile:
        # 1. split variables into multiple blocks, aligned by product(dim[1:]) (width).
        # 2. modify the trainer program: add a split_op to each Grad.
        # 3. append a send_op to the trainer.
        # 4. append concat_ops to the trainer to update local weights.
        # 5. create a new program as the parameter server program.
        # 6. create the parameter server program from the split_method generated
        #    endpoint->VarBlock mapping.
        pserver_endpoints = pservers.split(",")

        # step1
        param_list = [pg[0] for pg in params_grads]
        grad_list = [pg[1] for pg in params_grads]
        # TODO: add split selected rows support
        grad_blocks = split_dense_variable(grad_list, len(pserver_endpoints))
        param_blocks = split_dense_variable(param_list, len(pserver_endpoints))
        # step2
        grad_var_mapping = self._append_split_op(program, grad_blocks)

        # step3
        send_inputs = []
        send_outputs = []
for b in grad_blocks: # append by order
varname, block_id, _ = b.split(":")
send_inputs.append(grad_var_mapping[varname][int(block_id)])
param_var_mapping = self._create_vars_from_blocklist(program,
param_blocks)
for b in param_blocks:
varname, block_id, _ = b.split(":")
send_outputs.append(param_var_mapping[varname][int(block_id)])
# let send_op know which endpoint to send which var, eplist is of the same
# order of send_inputs.
eplist = split_method(send_inputs, pserver_endpoints)
# create mapping of endpoint -> splited var to create pserver side program
self.param_grad_ep_mapping = dict()
for i, ep in enumerate(eplist):
param = send_outputs[i]
grad = send_inputs[i]
if not self.param_grad_ep_mapping.has_key(ep):
self.param_grad_ep_mapping[ep] = {"params": [], "grads": []}
self.param_grad_ep_mapping[ep]["params"].append(param)
self.param_grad_ep_mapping[ep]["grads"].append(grad)
send_op = program.global_block().append_op(
type="send",
inputs={"X": send_inputs},
outputs={"Out": send_outputs},
attrs={"endpoints": pserver_endpoints,
"epmap": eplist})
# step4
for varname, splited_var in param_var_mapping.iteritems():
if len(splited_var) <= 1:
continue
orig_param = program.global_block().vars[varname]
concat = program.global_block().append_op(
type="concat",
inputs={"X": splited_var},
outputs={"Out": [orig_param]},
attrs={"axis": 0})
def _create_vars_from_blocklist(self, program, block_list):
block_map = dict()
var_mapping = dict()
for block_str in block_list:
varname, offset, size = block_str.split(":")
if not block_map.has_key(varname):
block_map[varname] = []
block_map[varname].append((long(offset), long(size)))
for varname, splited in block_map.iteritems():
orig_var = program.global_block().vars[varname]
var_mapping[varname] = []
if len(splited) == 1:
var_mapping[varname] = [orig_var]
continue
orig_shape = orig_var.shape
orig_dim1_flatten = 1
if len(orig_shape) >= 2:
orig_dim1_flatten = reduce(lambda x, y: x * y, orig_shape[1:])
for i, block in enumerate(splited):
size = block[1]
rows = size / orig_dim1_flatten
splited_shape = [rows]
if len(orig_shape) >= 2:
splited_shape.extend(orig_shape[1:])
var = program.global_block().create_var(
name="%s.block%d" % (varname, i),
                    persistable=False,
dtype=orig_var.dtype,
                    shape=splited_shape)  # flattened split var
var_mapping[varname].append(var)
return var_mapping
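
A quick illustration of the resulting block variables, using the 28x784 case from test_split_var.py (the variable name w is made up): each block keeps the original width and only dim[0] is split.

# Nine blocks of 2352 elements plus one of 784, as expected by the test below.
block_sizes = [2352] * 9 + [784]
width = 784  # product of dims[1:], i.e. orig_dim1_flatten above

for i, size in enumerate(block_sizes):
    print("w.block%d shape = [%d, %d]" % (i, size / width, width))
# w.block0 .. w.block8 get shape [3, 784]; w.block9 gets shape [1, 784].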
    def _clone_var(self, block, var):
        assert isinstance(var, Variable)
...@@ -129,34 +192,27 @@ class DistributeTranspiler:
            dtype=var.dtype,
            type=var.type,
            lod_level=var.lod_level,
            # HACK: let all params on the pserver be persistable so the child
            # program in recv can get them
            persistable=True)

    def _append_split_op(self, program, gradblocks):
        var_mapping = self._create_vars_from_blocklist(program, gradblocks)
        for varname, splited_vars in var_mapping.iteritems():
            # variables that don't need to be split have a single entry in splited_vars
            if len(splited_vars) <= 1:
                continue
            orig_var = program.global_block().vars[varname]
            sections = []
            for v in splited_vars:
                sections.append(v.shape[0])
            program.global_block().append_op(
                type="split",
                inputs={"X": orig_var},
                outputs={"Out": splited_vars},
                attrs={"sections": sections}  # assume split evenly
            )
        return var_mapping
    def get_trainer_program(self):
        # remove optimize ops and add a send op to main_program
...@@ -174,69 +230,267 @@ class DistributeTranspiler:
            var_list.append(var_each)
        return var_list
    def _get_optimizer_input_shape(self, op_type, varkey, orig_shape,
                                   param_shape):
        """
        Returns the shape for optimizer inputs that need to be reshaped when
        Param and Grad are split across multiple servers.
        """
# HACK(typhoonzero): Should use functions of corresponding optimizer in
# optimizer.py to get the shape, do not bind this in the transpiler.
if op_type == "adam":
if varkey in ["Moment1", "Moment2"]:
return param_shape
elif op_type == "adagrad":
if varkey == "Moment":
return param_shape
elif op_type == "adamax":
if varkey in ["Moment", "InfNorm"]:
return param_shape
elif op_type == "momentum":
if varkey == "Velocity":
return param_shape
elif op_type == "":
if varkey == "Moment":
return param_shape
elif op_type == "sgd":
pass
return orig_shape
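
In other words, accumulators that are elementwise with the parameter follow the split block's shape, while everything else keeps its original shape. A tiny restatement of that table, purely for illustration:

RESHAPED_KEYS = {
    "adam": ["Moment1", "Moment2"],
    "adagrad": ["Moment"],
    "adamax": ["Moment", "InfNorm"],
    "momentum": ["Velocity"],
}

def input_shape(op_type, varkey, orig_shape, param_shape):
    # mirror of _get_optimizer_input_shape above
    return param_shape if varkey in RESHAPED_KEYS.get(op_type, []) else orig_shape

# For an adam op whose Param block on this pserver has shape [3, 784]:
print(input_shape("adam", "Moment1", [28, 784], [3, 784]))   # [3, 784]
print(input_shape("adam", "LearningRate", [1], [3, 784]))    # [1]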
    def _is_op_on_pserver(self, endpoint, all_ops, idx):
        """
        Recursively check whether the op needs to run on the current pserver.
        Assume that ops are in the execution order.
        """
        param_names = [
            p.name for p in self.param_grad_ep_mapping[endpoint]["params"]
        ]
        op = all_ops[idx]
        if op.inputs.has_key("Param"):
            if op.inputs["Param"].name in param_names:
                return True
            else:
for n in param_names:
if n.startswith(op.inputs["Param"].name+".block") and \
n != op.inputs["Param"].name:
return True
return False
else:
j = idx - 1
while j >= 0:
prev_op = all_ops[j]
prev_output_names = [o.name for o in prev_op.outputs.values()]
prev_input_names = [o.name for o in prev_op.inputs.values()]
found1 = False
found2 = False
for _, v in op.inputs.iteritems():
if v.name in prev_output_names:
found1 = self._is_op_on_pserver(endpoint, all_ops, j)
# later ops may produce output for prev op's next batch use.
for _, v in op.outputs.iteritems():
if v.name in prev_input_names:
found2 = self._is_op_on_pserver(endpoint, all_ops, j)
if found1 or found2:
return True
j -= 1
return False
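
The name matching above is what ties a whole-parameter optimize op to the blocks a pserver actually holds; in isolation it amounts to the following (all names are invented):

# Blocks assigned to this endpoint, and the Param input of an optimize op.
param_names = ["fc_w.block0", "fc_b"]
op_param = "fc_w"

on_this_pserver = any(
    n == op_param or (n.startswith(op_param + ".block") and n != op_param)
    for n in param_names)
print(on_this_pserver)  # True: this endpoint holds fc_w.block0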
def _append_pserver_ops(self, program, pserver_program, opt_op, endpoint):
new_inputs = dict()
# update param/grad shape first, then other inputs like
# moment can use the updated shape
for key, var in opt_op.inputs.iteritems():
if key == "Grad":
grad_block = None
for g in self.param_grad_ep_mapping[endpoint]["grads"]:
if g.name.startswith(var.name):
grad_block = g
break
if not grad_block:
# do not append this op if current endpoint
# is not dealing with this grad block
return
merged_var = program.global_block().create_var(
name=grad_block.name,
persistable=grad_block.persistable,
dtype=grad_block.dtype,
shape=grad_block.shape)
# append merging ops if trainers > 1
if self.trainers > 1:
                    vars2merge = self._create_var_for_trainers(
                        program.global_block(), grad_block, self.trainers)
                    program.global_block().append_op(
                        type="sum",
                        inputs={"X": vars2merge},
                        outputs={"Out": merged_var})
                    program.global_block().append_op(
                        type="scale",
                        inputs={"X": merged_var},
                        outputs={"Out": merged_var},
                        attrs={"scale": 1.0 / float(self.trainers)})
                new_inputs[key] = merged_var
            elif key == "Param":
# param is already created on global program
param_block = None
for p in self.param_grad_ep_mapping[endpoint]["params"]:
if p.name.startswith(var.name):
param_block = p
break
if not param_block:
return
tmpvar = program.global_block().create_var(
name=param_block.name,
persistable=True,
dtype=param_block.dtype,
shape=param_block.shape)
new_inputs[key] = tmpvar
for key, var in opt_op.inputs.iteritems():
if key in ["Param", "Grad"]:
continue
# update accumulator variable shape
param_shape = new_inputs["Param"].shape
new_shape = self._get_optimizer_input_shape(opt_op.type, key,
var.shape, param_shape)
tmpvar = program.global_block().create_var(
                name=var.name,
                persistable=var.persistable,
                dtype=var.dtype,
                shape=new_shape)
new_inputs[key] = tmpvar
# create var in pserver program global block.
# TODO(typhoonzero): put blocks in one program to avoid create two
# variables.
pserver_program.global_block().create_var(
name=var.name,
persistable=var.persistable,
dtype=var.dtype,
shape=new_shape)
if opt_op.inputs.has_key("Grad"): # change outputs ParamOut variable
if opt_op.inputs["Grad"].name in grad_var_names: opt_op.outputs["ParamOut"] = new_inputs["Param"]
optimize_sub_program.global_block().append_op( program.global_block().append_op(
type=opt_op.type, type=opt_op.type,
inputs=opt_op.inputs, inputs=new_inputs,
outputs=opt_op.outputs, outputs=opt_op.outputs,
attrs=opt_op.attrs) attrs=opt_op.attrs)
else:
optimize_sub_program.global_block().append_op( def _append_pserver_non_opt_ops(self, program, pserver_program, opt_op):
for _, var in opt_op.inputs.iteritems():
program.global_block().create_var(
name=var.name,
persistable=var.persistable,
dtype=var.dtype,
shape=var.shape)
pserver_program.global_block().create_var(
name=var.name,
persistable=var.persistable,
dtype=var.dtype,
shape=var.shape)
program.global_block().append_op(
            type=opt_op.type,
            inputs=opt_op.inputs,
            outputs=opt_op.outputs,
            attrs=opt_op.attrs)
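
The sum followed by scale appended for the multi-trainer case is plain gradient averaging; numerically it boils down to this (ordinary Python lists instead of fluid ops, values invented):

trainers = 3
# Copies of the same gradient block received from each trainer,
# e.g. vars named fc_w.block0.trainer_0 .. trainer_2.
grads_from_trainers = [[0.3, 0.6], [0.9, 0.0], [0.3, 0.3]]

# "sum" op: elementwise sum of the per-trainer copies.
merged = [sum(vals) for vals in zip(*grads_from_trainers)]
# "scale" op with scale = 1.0 / trainers: turn the sum into a mean.
merged = [v * (1.0 / trainers) for v in merged]
print(merged)  # [0.5, 0.3]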
    def get_pserver_program(self, endpoint, optimize_ops):
        """
        Get the pserver side program by endpoint.
        NOTE: assume blocks of the same variable are not distributed
        on the same pserver; only change param/grad varnames for
        trainers to fetch. For each pserver endpoint, the server side
        program must be a sub-set of the original optimization program.
        """
# step5
pserver_program = Program()
for v in self.param_grad_ep_mapping[endpoint]["params"]:
self._clone_var(pserver_program.global_block(), v)
# step6
optimize_sub_program = Program()
for idx, opt_op in enumerate(optimize_ops):
is_op_on_pserver = self._is_op_on_pserver(endpoint, optimize_ops,
idx)
if not is_op_on_pserver:
continue
if opt_op.inputs.has_key("Grad"):
self._append_pserver_ops(optimize_sub_program, pserver_program,
opt_op, endpoint)
else:
self._append_pserver_non_opt_ops(optimize_sub_program,
pserver_program, opt_op)
        pserver_program.global_block().append_op(
            type="recv",
            inputs={"RX": self.param_grad_ep_mapping[endpoint]["grads"]
                    },  # grads to recv
            outputs={},
            attrs={
                "OptimizeProgram": optimize_sub_program.desc,
                "endpoint": endpoint,
                "ParamList": [
                    p.name
                    for p in self.param_grad_ep_mapping[endpoint]["params"]
                ],
                "GradList": [
                    p.name
                    for p in self.param_grad_ep_mapping[endpoint]["grads"]
                ],
                "Trainers": self.trainers
            })
        pserver_program.sync_with_cpp()
        return pserver_program
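
Putting the pserver-side pieces together, an endpoint process would roughly look like the sketch below. It follows the docstring example earlier in this file; PSERVER is an assumed environment variable, and optimize_ops / params_grads are assumed to come from an earlier Optimizer.minimize call.

import os
import paddle.v2.fluid as fluid

t = fluid.DistributeTranspiler()
t.transpile(optimize_ops, params_grads,
            pservers="127.0.0.1:6174,127.0.0.1:6175", trainers=2)

exe = fluid.Executor(fluid.CPUPlace())
pserver_endpoint = os.getenv("PSERVER")  # e.g. "127.0.0.1:6174"
if pserver_endpoint:
    # build this endpoint's optimization sub-program plus a matching
    # startup program, then serve trainers through the recv op
    pserver_prog = t.get_pserver_program(pserver_endpoint, optimize_ops)
    pserver_startup = t.get_startup_program(pserver_endpoint, pserver_prog)
    exe.run(pserver_startup)
    exe.run(pserver_prog)
else:
    exe.run(fluid.default_startup_program())
    # ... then run t.get_trainer_program() in the usual training loop,
    # feeding data as in the docstring example above.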
    def get_startup_program(self, endpoint, pserver_program):
        """
        Get the startup program for the current parameter server.
        Modify operator input variables if there are variables that
        were split into several blocks.
        """
s_prog = Program()
orig_s_prog = framework.default_startup_program()
params = self.param_grad_ep_mapping[endpoint]["params"]
def _get_splited_name_and_shape(varname):
for idx, splited_param in enumerate(params):
pname = splited_param.name
if pname.startswith(varname) and varname != pname:
return pname, splited_param.shape
return "", []
# 1. create vars in pserver program to startup program
pserver_vars = pserver_program.global_block().vars
created_var_map = dict()
for _, var in pserver_vars.iteritems():
tmpvar = s_prog.global_block().create_var(
name=var.name,
persistable=var.persistable,
dtype=var.dtype,
shape=var.shape)
created_var_map[var.name] = tmpvar
# 2. rename op outputs
for op in orig_s_prog.global_block().ops:
new_outputs = dict()
# do not append startup op if var is not on this pserver
op_on_pserver = False
for key, var in op.outputs.iteritems():
newname, _ = _get_splited_name_and_shape(var.name)
if newname:
op_on_pserver = True
new_outputs[key] = created_var_map[newname]
elif var.name in pserver_vars:
op_on_pserver = True
new_outputs[key] = pserver_vars[var.name]
if op_on_pserver:
if op.type in [
"gaussian_random", "fill_constant", "uniform_random"
]:
op.attrs["shape"] = new_outputs["Out"].shape
s_prog.global_block().append_op(
type=op.type,
inputs=op.inputs,
outputs=new_outputs,
attrs=op.attrs)
return s_prog
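
As a concrete case of the shape rewriting in step 2: an initializer that originally filled a whole parameter must only fill this endpoint's block (names and shapes below are invented):

# Original startup op on the trainer side, described as plain data:
orig_op = {"type": "uniform_random", "output": "fc_w", "shape": [28, 784]}
# Block of fc_w that lives on this pserver:
block = {"name": "fc_w.block3", "shape": [3, 784]}

# _get_splited_name_and_shape("fc_w") finds fc_w.block3, so the op is kept,
# its output is renamed to the block variable and its shape attr replaced.
pserver_op = dict(orig_op, output=block["name"], shape=block["shape"])
print(pserver_op["output"], pserver_op["shape"])  # ('fc_w.block3', [3, 784])
# Startup ops whose outputs are not on this endpoint are dropped entirely.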
import framework
from framework import Program, default_main_program, Parameter, Variable
import optimizer
from layer_helper import LayerHelper
def hash_name_to_server(params_grads, pserver_endpoints):
"""
        :param params_grads:
:return: a map of pserver endpoint ->
params -> [param list]
grads -> [grad list]
"""
def _hash_param(param_name, total):
return hash(param_name) % total
param_grad_map = dict()
for param, grad in params_grads:
if param.trainable is True and grad is not None:
server_id = _hash_param(param.name, len(pserver_endpoints))
server_for_param = pserver_endpoints[server_id]
if not param_grad_map.has_key(server_for_param):
param_grad_map[server_for_param] = {"params": [], "grads": []}
param_grad_map[server_for_param]["params"].append(param)
param_grad_map[server_for_param]["grads"].append(grad)
return param_grad_map
def round_robin(params_grads, pserver_endpoints):
assert (len(params_grads) > len(pserver_endpoints))
param_grad_map = dict()
pserver_idx = 0
for param, grad in params_grads:
if param.trainable is True:
server_for_param = pserver_endpoints[pserver_idx]
if not param_grad_map.has_key(server_for_param):
param_grad_map[server_for_param] = {"params": [], "grads": []}
param_grad_map[server_for_param]["params"].append(param)
param_grad_map[server_for_param]["grads"].append(grad)
pserver_idx += 1
if pserver_idx >= len(pserver_endpoints):
pserver_idx = 0
return param_grad_map
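
Both splitters above return the same endpoint -> {params, grads} structure; with string stand-ins for the (param, grad) pairs (names invented, every parameter assumed trainable), round_robin behaves like this:

params_grads = [("fc_w", "fc_w@GRAD"), ("fc_b", "fc_b@GRAD"),
                ("fc2_w", "fc2_w@GRAD")]
endpoints = ["127.0.0.1:6174", "127.0.0.1:6175"]

mapping = {}
for i, (p, g) in enumerate(params_grads):
    ep = endpoints[i % len(endpoints)]
    mapping.setdefault(ep, {"params": [], "grads": []})
    mapping[ep]["params"].append(p)
    mapping[ep]["grads"].append(g)

# 127.0.0.1:6174 gets fc_w and fc2_w (with their grads); 127.0.0.1:6175 gets
# fc_b. Unlike the block-level transpiler above, whole tensors are assigned
# to a single endpoint here.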
class SimpleDistributeTranspiler:
def transpile(self,
optimize_ops,
params_grads,
program=None,
pservers="127.0.0.1:6174",
trainers=1,
split_method=round_robin):
"""
Transpile the program to a distributed data-parallelism programs.
The main_program will be transform to use a remote parameter server
to do parameter optimization. And the optimization graph will be put
in to a parameter server program.
Use different methods to split trainable varialbles to different
parameter servers.
Example to run:
exe = fluid.Executor(place)
t = fluid.DistributeTranspiler()
t.transpile(optimize_ops, params_grads, pservers="127.0.0.1:6174", trainers=1)
pserver_endpoint = os.getenv("PSERVER")
if pserver_endpoint:
pserver_prog = t.get_pserver_program(pserver_endpoint, optimize_ops)
exe.run(fluid.default_startup_program())
exe.run(pserver_prog)
else:
feeder = fluid.DataFeeder(feed_list=[images, label], place=place)
exe.run(fluid.default_startup_program())
for pass_id in range(PASS_NUM):
...
:param optimize_ops: op list of optimization, should be the
return value of Optimizer.minimize
:type optimize_ops: list
:param program: program to optimize, default default_main_program
:param pservers: parameter server endpoints like "m1:6174,m2:6174"
:type pservers: string
:return: return a list of programs
"""
if program is None:
program = default_main_program()
self.program = program
self.trainers = trainers
self.optimize_ops = optimize_ops
self._optimize_distributed(
optimize_ops,
program,
params_grads,
pservers=pservers,
trainers=trainers,
split_method=split_method)
def _clone_param(self, block, v):
assert isinstance(v, Parameter)
new_p = Parameter(
block=block,
shape=v.shape,
dtype=v.dtype,
type=v.type,
lod_level=v.lod_level,
stop_gradient=v.stop_gradient,
trainable=v.trainable,
optimize_attr=v.optimize_attr,
regularizer=v.regularizer,
name=v.name)
block.vars[new_p.name] = new_p
def _clone_var(self, block, var):
assert isinstance(var, Variable)
return block.create_var(
name=var.name,
shape=var.shape,
dtype=var.dtype,
type=var.type,
lod_level=var.lod_level,
persistable=var.persistable)
def _optimize_distributed(self, optimize_ops, program, params_and_grads,
**kwargs):
if kwargs.has_key("split_method"):
split_method = kwargs["split_method"]
else:
split_method = round_robin
assert (callable(split_method))
pserver_endpoints = kwargs["pservers"].split(",")
self.param_grad_map = split_method(params_and_grads, pserver_endpoints)
send_op_ordered_inputs = []
send_op_ordered_outputs = []
epmap = []
for ep, v in self.param_grad_map.iteritems():
send_op_ordered_inputs.extend(v["grads"])
send_op_ordered_outputs.extend(v["params"])
for i in v["grads"]:
epmap.append(ep)
send_op = program.global_block().append_op(
type="send",
inputs={"X": send_op_ordered_inputs
}, # inputs is a list of tensors to be send
outputs={"Out": send_op_ordered_outputs},
attrs={"endpoints": pserver_endpoints,
"epmap": epmap})
def get_trainer_program(self):
# remove optimize ops and add a send op to main_program
self.program.global_block().delete_ops(self.optimize_ops)
return self.program
def _create_var_for_trainers(self, block, var, trainers):
var_list = []
for i in xrange(trainers):
var_each = block.create_var(
name="%s.trainer_%d" % (var.name, i),
                persistable=var.persistable,
dtype=var.dtype,
shape=var.shape)
var_list.append(var_each)
return var_list
def get_pserver_program(self, endpoint, optimize_ops):
pserver_program = Program()
for v in self.param_grad_map[endpoint]["params"]:
self._clone_param(pserver_program.global_block(), v)
optimize_sub_program = Program()
grad_var_names = [
var.name for var in self.param_grad_map[endpoint]["grads"]
]
for opt_op in optimize_ops:
for _, var in opt_op.inputs.iteritems():
# NOTE: append operators to merge gradients from multiple
# trainers. If trainers == 1, this is not needed.
if self.trainers > 1 and var.name in grad_var_names:
vars2merge = self._create_var_for_trainers(
optimize_sub_program.global_block(), var, self.trainers)
merged_var = optimize_sub_program.global_block().create_var(
name=var.name,
persistable=var.persistable,
dtype=var.dtype,
shape=var.shape)
optimize_sub_program.global_block().append_op(
type="sum",
inputs={"X": vars2merge},
outputs={"Out": merged_var})
optimize_sub_program.global_block().append_op(
type="scale",
inputs={"X": merged_var},
outputs={"Out": merged_var},
attrs={"scale": 1.0 / float(self.trainers)})
else:
optimize_sub_program.global_block().create_var(
name=var.name,
persistable=var.persistable,
dtype=var.dtype,
shape=var.shape)
if opt_op.inputs.has_key("Grad"):
if opt_op.inputs["Grad"].name in grad_var_names:
optimize_sub_program.global_block().append_op(
type=opt_op.type,
inputs=opt_op.inputs,
outputs=opt_op.outputs,
attrs=opt_op.attrs)
else:
optimize_sub_program.global_block().append_op(
type=opt_op.type,
inputs=opt_op.inputs,
outputs=opt_op.outputs,
attrs=opt_op.attrs)
pserver_program.global_block().append_op(
type="recv",
inputs={"RX":
self.param_grad_map[endpoint]["grads"]}, # grads to recv
outputs={},
attrs={
"OptimizeProgram": optimize_sub_program.desc,
"endpoint": endpoint,
"ParamList":
[p.name for p in self.param_grad_map[endpoint]["params"]],
"GradList":
[p.name for p in self.param_grad_map[endpoint]["grads"]],
"Trainers": self.trainers
})
pserver_program.sync_with_cpp()
return pserver_program
def hash_name(varlist, pserver_endpoints):
"""
hash variable names to several endpoints.
:param varlist: a list of Variables
:return: a map of pserver endpoint -> varname
"""
def _hash_block(block_str, total):
return hash(block_str) % total
eplist = []
for var in varlist:
server_id = _hash_block(var.name(), len(pserver_endpoints))
server_for_param = pserver_endpoints[server_id]
eplist.append(server_for_param)
return eplist
def round_robin(varlist, pserver_endpoints):
"""
distribute variables to several endpoints.
"""
assert (len(varlist) > len(pserver_endpoints))
eplist = []
pserver_idx = 0
for var in varlist:
server_for_param = pserver_endpoints[pserver_idx]
eplist.append(server_for_param)
pserver_idx += 1
if pserver_idx >= len(pserver_endpoints):
pserver_idx = 0
return eplist
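
The two policies only differ in how block names are mapped to endpoints; a side-by-side illustration with invented names (plain strings stand in for the Variables that transpile() actually passes in):

blocknames = ["fc_w.block0", "fc_w.block1", "fc_b.block0", "fc2_w.block0"]
endpoints = ["127.0.0.1:6174", "127.0.0.1:6175"]

round_robin_eplist = [endpoints[i % len(endpoints)]
                      for i in xrange(len(blocknames))]
hash_eplist = [endpoints[hash(name) % len(endpoints)] for name in blocknames]

# round_robin alternates 6174, 6175, 6174, 6175 and so keeps the load even;
# hash_name is deterministic per name but may be uneven. round_robin is the
# default split_method of DistributeTranspiler.transpile().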
...@@ -5,3 +5,4 @@ foreach(src ${TEST_OPS})
endforeach()
add_subdirectory(book)
add_subdirectory(book_distribute)
file(GLOB TEST_OPS RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "test_*.py")
string(REPLACE ".py" "" TEST_OPS "${TEST_OPS}")
foreach(src ${TEST_OPS})
py_test(${src} SRCS ${src}.py)
endforeach()
import math
import unittest
from paddle.v2.fluid.distribute_transpiler import split_dense_variable
import paddle.v2.fluid as fluid
import paddle.v2.fluid.core as core
import random
class TestSplitVar(unittest.TestCase):
def test_check_output(self):
# split below shapes to 10 servers
shapes = [[3, 5], [1024], [28, 784], [8, 1020], [800, 10]]
expected_sizes = [
[15], [1024],
[2352, 2352, 2352, 2352, 2352, 2352, 2352, 2352, 2352, 784],
[2040, 2040, 2040, 2040],
[1150, 1150, 1150, 1150, 1150, 1150, 1100]
]
var_list = []
program = fluid.Program()
for shape in shapes:
var = program.global_block().create_var(
name=str(random.randint(10000, 99999)),
persistable=True,
# dtype=core.VarDesc.VarType.LOD_TENSOR,
shape=shape)
var_list.append(var)
blocks = split_dense_variable(var_list, 10)
all_sizes = []
for s in expected_sizes:
for s2 in s:
all_sizes.append(s2)
for i, block_str in enumerate(blocks):
varname, block_id, size = block_str.split(":")
self.assertEqual(int(size), all_sizes[i])
if __name__ == '__main__':
unittest.main()
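
For reference, the expected sizes are exactly what split_dense_variable computes; the [28, 784] row, for example, works out as follows:

#   numel          = 28 * 784                      = 21952
#   max pservers   = floor(21952 / 1024)           = 21   -> capped at 10
#   raw block size = ceil(21952 / 10)              = 2196
#   row alignment  = 2196 + (784 - 2196 % 784)     = 2352 (3 whole rows)
#   split count    = ceil(21952 / 2352)            = 10
#   block sizes    = 9 * [2352] + [21952 - 9*2352] = 9 * [2352] + [784]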