未验证 提交 23f5c182 编写于 作者: K kavyasrinet 提交者: GitHub

Fixed few comments in transpiler (#7748)

* Updating the cluster trainign doc

* Fixed few comments of transpiler

* Adding few explanations
上级 b7eeef24
...@@ -38,14 +38,14 @@ def split_dense_variable(var_list, ...@@ -38,14 +38,14 @@ def split_dense_variable(var_list,
min_block_size=1024, min_block_size=1024,
max_block_size=1048576): max_block_size=1048576):
""" """
We may need to split dense tensor to one or several blocks and put We may need to split dense tensor to one or more blocks and put
them equally onto parameter server. One block is a sub-tensor them equally onto parameter server. One block is a sub-tensor
aligned by dim[0] of the tensor. aligned by dim[0] of the tensor.
We need to have a minimal block size so that the calculations in We need to have a minimal block size so that the calculations in
the parameter server side can gain better performance. By default the parameter server side can gain better performance. By default
mininum block size is 1024. The max block size is used to prevent minimum block size is 1024. The max block size is used to prevent
too large block that may causing send error. very large blocks that may cause send error.
""" """
blocks = [] blocks = []
for var in var_list: for var in var_list:
...@@ -64,7 +64,7 @@ def split_dense_variable(var_list, ...@@ -64,7 +64,7 @@ def split_dense_variable(var_list,
remains = block_size % dim1 remains = block_size % dim1
if remains != 0: if remains != 0:
block_size += dim1 - remains block_size += dim1 - remains
# update split_count after align # update split_count after aligning
split_count = int(math.ceil(var_numel / float(block_size))) split_count = int(math.ceil(var_numel / float(block_size)))
for block_id in xrange(split_count): for block_id in xrange(split_count):
curr_block_size = min(block_size, var_numel - ( curr_block_size = min(block_size, var_numel - (
...@@ -83,18 +83,18 @@ class DistributeTranspiler: ...@@ -83,18 +83,18 @@ class DistributeTranspiler:
trainers=1, trainers=1,
split_method=round_robin): split_method=round_robin):
""" """
Transpile the program to a distributed data-parallelism programs. Transpile the program to distributed data-parallelism programs.
The main_program will be transform to use a remote parameter server The main_program will be transformed to use a remote parameter server
to do parameter optimization. And the optimization graph will be put to do parameter optimization. And the optimization graph will be put
in to a parameter server program. into a parameter server program.
Use different methods to split trainable varialbles to different Use different methods to split trainable variables to different
parameter servers. parameter servers.
:param optimize_ops: op list of optimization, should be the :param optimize_ops: op list of optimization, should be the
return value of Optimizer.minimize return value of Optimizer.minimize
:type optimize_ops: list :type optimize_ops: list
:param program: program to optimize, default default_main_program :param program: program to optimize, default is default_main_program
:param pservers: parameter server endpoints like "m1:6174,m2:6174" :param pservers: parameter server endpoints like "m1:6174,m2:6174"
:type pservers: string :type pservers: string
:return: return a list of programs :return: return a list of programs
...@@ -106,11 +106,11 @@ class DistributeTranspiler: ...@@ -106,11 +106,11 @@ class DistributeTranspiler:
self.trainers = trainers self.trainers = trainers
self.optimize_ops = optimize_ops self.optimize_ops = optimize_ops
# steps to transpile: # steps to transpile:
# 1. split variable to multiple blocks, align by product(dim[1:]) (width). # 1. split variable to multiple blocks, aligned by product(dim[1:]) (width).
# 2. modify trainer program add split_op to each Grad. # 2. modify trainer program add split_op to each Grad.
# 3. append send_op to trainer. # 3. append send_op to trainer.
# 4. append concat_op to trainer to update local weights. # 4. append concat_op to trainer to update local weights.
# 5. create new program as parameter server. # 5. create new program for parameter server.
# 6. create parameter server program by split_method generated endpoint->VarBlock # 6. create parameter server program by split_method generated endpoint->VarBlock
pserver_endpoints = pservers.split(",") pserver_endpoints = pservers.split(",")
...@@ -136,10 +136,10 @@ class DistributeTranspiler: ...@@ -136,10 +136,10 @@ class DistributeTranspiler:
for b in param_blocks: for b in param_blocks:
varname, block_id, _ = b.split(":") varname, block_id, _ = b.split(":")
send_outputs.append(param_var_mapping[varname][int(block_id)]) send_outputs.append(param_var_mapping[varname][int(block_id)])
# let send_op know which endpoint to send which var, eplist is of the same # let send_op know which endpoint to send which var to, eplist has the same
# order of send_inputs. # order as send_inputs.
eplist = split_method(send_inputs, pserver_endpoints) eplist = split_method(send_inputs, pserver_endpoints)
# create mapping of endpoint -> splited var to create pserver side program # create mapping of endpoint -> split var to create pserver side program
self.param_grad_ep_mapping = dict() self.param_grad_ep_mapping = dict()
for i, ep in enumerate(eplist): for i, ep in enumerate(eplist):
param = send_outputs[i] param = send_outputs[i]
...@@ -149,6 +149,7 @@ class DistributeTranspiler: ...@@ -149,6 +149,7 @@ class DistributeTranspiler:
self.param_grad_ep_mapping[ep]["params"].append(param) self.param_grad_ep_mapping[ep]["params"].append(param)
self.param_grad_ep_mapping[ep]["grads"].append(grad) self.param_grad_ep_mapping[ep]["grads"].append(grad)
# create send_op
send_op = program.global_block().append_op( send_op = program.global_block().append_op(
type="send", type="send",
inputs={"X": send_inputs}, inputs={"X": send_inputs},
...@@ -167,6 +168,7 @@ class DistributeTranspiler: ...@@ -167,6 +168,7 @@ class DistributeTranspiler:
attrs={"axis": 0}) attrs={"axis": 0})
def _create_vars_from_blocklist(self, program, block_list): def _create_vars_from_blocklist(self, program, block_list):
# Create respective variables using the block_list
block_map = dict() block_map = dict()
var_mapping = dict() var_mapping = dict()
for block_str in block_list: for block_str in block_list:
...@@ -207,11 +209,12 @@ class DistributeTranspiler: ...@@ -207,11 +209,12 @@ class DistributeTranspiler:
dtype=var.dtype, dtype=var.dtype,
type=var.type, type=var.type,
lod_level=var.lod_level, lod_level=var.lod_level,
# HACK: let all param in pserver persistable so child # HACK: let all param in pserver be persistable so the child
# program in recv can get them # program in recv can get them
persistable=True) persistable=True)
def _append_split_op(self, program, gradblocks): def _append_split_op(self, program, gradblocks):
# Split variables that need to be split and append respective ops
var_mapping = self._create_vars_from_blocklist(program, gradblocks) var_mapping = self._create_vars_from_blocklist(program, gradblocks)
for varname, splited_vars in var_mapping.iteritems(): for varname, splited_vars in var_mapping.iteritems():
# variable that don't need to split have empty splited_vars # variable that don't need to split have empty splited_vars
...@@ -248,6 +251,7 @@ class DistributeTranspiler: ...@@ -248,6 +251,7 @@ class DistributeTranspiler:
return self.program return self.program
def _create_var_for_trainers(self, block, var, trainers): def _create_var_for_trainers(self, block, var, trainers):
# For each trainer, create the necessary variables
var_list = [] var_list = []
for i in xrange(trainers): for i in xrange(trainers):
var_each = block.create_var( var_each = block.create_var(
...@@ -262,7 +266,7 @@ class DistributeTranspiler: ...@@ -262,7 +266,7 @@ class DistributeTranspiler:
param_shape): param_shape):
""" """
Returns the shape for optimizer inputs that need to be reshaped when Returns the shape for optimizer inputs that need to be reshaped when
Param and Grad is splited to multiple servers. Param and Grad is split to multiple servers.
""" """
# HACK(typhoonzero): Should use functions of corresponding optimizer in # HACK(typhoonzero): Should use functions of corresponding optimizer in
# optimizer.py to get the shape, do not bind this in the transpiler. # optimizer.py to get the shape, do not bind this in the transpiler.
...@@ -396,7 +400,7 @@ class DistributeTranspiler: ...@@ -396,7 +400,7 @@ class DistributeTranspiler:
dtype=var.dtype, dtype=var.dtype,
shape=new_shape) shape=new_shape)
# change outputs ParamOut variable # change output's ParamOut variable
opt_op.outputs["ParamOut"] = new_inputs["Param"] opt_op.outputs["ParamOut"] = new_inputs["Param"]
program.global_block().append_op( program.global_block().append_op(
type=opt_op.type, type=opt_op.type,
...@@ -405,6 +409,7 @@ class DistributeTranspiler: ...@@ -405,6 +409,7 @@ class DistributeTranspiler:
attrs=opt_op.attrs) attrs=opt_op.attrs)
def _append_pserver_non_opt_ops(self, program, pserver_program, opt_op): def _append_pserver_non_opt_ops(self, program, pserver_program, opt_op):
# Append the ops for parameters that do not need to be optimized/updated
for _, var in opt_op.inputs.iteritems(): for _, var in opt_op.inputs.iteritems():
program.global_block().create_var( program.global_block().create_var(
name=var.name, name=var.name,
...@@ -424,7 +429,7 @@ class DistributeTranspiler: ...@@ -424,7 +429,7 @@ class DistributeTranspiler:
def get_pserver_program(self, endpoint): def get_pserver_program(self, endpoint):
""" """
get pserver side program by endpoint Get pserver side program using the endpoint
NOTE: assume blocks of the same variable is not distributed NOTE: assume blocks of the same variable is not distributed
on the same pserver, only change param/grad varnames for on the same pserver, only change param/grad varnames for
...@@ -450,6 +455,7 @@ class DistributeTranspiler: ...@@ -450,6 +455,7 @@ class DistributeTranspiler:
shape=v.shape) shape=v.shape)
# step6 # step6
optimize_sub_program = Program() optimize_sub_program = Program()
# Iterate through the ops and append ops as needed
for idx, opt_op in enumerate(self.optimize_ops): for idx, opt_op in enumerate(self.optimize_ops):
is_op_on_pserver = self._is_op_on_pserver(endpoint, is_op_on_pserver = self._is_op_on_pserver(endpoint,
self.optimize_ops, idx) self.optimize_ops, idx)
...@@ -461,6 +467,7 @@ class DistributeTranspiler: ...@@ -461,6 +467,7 @@ class DistributeTranspiler:
else: else:
self._append_pserver_non_opt_ops(optimize_sub_program, self._append_pserver_non_opt_ops(optimize_sub_program,
pserver_program, opt_op) pserver_program, opt_op)
# Append the recv op
pserver_program.global_block().append_op( pserver_program.global_block().append_op(
type="recv", type="recv",
inputs={"RX": self.param_grad_ep_mapping[endpoint]["grads"] inputs={"RX": self.param_grad_ep_mapping[endpoint]["grads"]
...@@ -486,7 +493,7 @@ class DistributeTranspiler: ...@@ -486,7 +493,7 @@ class DistributeTranspiler:
""" """
Get startup program for current parameter server. Get startup program for current parameter server.
Modify operator input variables if there are variables that Modify operator input variables if there are variables that
was splited to several blocks. were split to several blocks.
""" """
s_prog = Program() s_prog = Program()
orig_s_prog = framework.default_startup_program() orig_s_prog = framework.default_startup_program()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册