未验证 提交 13836b04 编写于 作者: G gongweibao 提交者: GitHub

Change default split minimize value to speed up transport data to pserver. (#10635)

Change default split minimize value to speed up transport data to pserver
上级 d66d8446
...@@ -21,15 +21,7 @@ import random ...@@ -21,15 +21,7 @@ import random
class TestSplitVar(unittest.TestCase): class TestSplitVar(unittest.TestCase):
def test_check_output(self): def check_split_output(self, shapes, expected_sizes, min_size):
# split below shapes to 10 servers
shapes = [[3, 5], [1024], [28, 784], [8, 1020], [800, 10]]
expected_sizes = [
[15], [1024],
[2352, 2352, 2352, 2352, 2352, 2352, 2352, 2352, 2352, 784],
[2040, 2040, 2040, 2040],
[1150, 1150, 1150, 1150, 1150, 1150, 1100]
]
var_list = [] var_list = []
program = fluid.Program() program = fluid.Program()
for shape in shapes: for shape in shapes:
...@@ -39,7 +31,7 @@ class TestSplitVar(unittest.TestCase): ...@@ -39,7 +31,7 @@ class TestSplitVar(unittest.TestCase):
# dtype=core.VarDesc.VarType.LOD_TENSOR, # dtype=core.VarDesc.VarType.LOD_TENSOR,
shape=shape) shape=shape)
var_list.append(var) var_list.append(var)
blocks = split_dense_variable(var_list, 10) blocks = split_dense_variable(var_list, 10, min_size)
all_sizes = [] all_sizes = []
for s in expected_sizes: for s in expected_sizes:
for s2 in s: for s2 in s:
...@@ -48,6 +40,25 @@ class TestSplitVar(unittest.TestCase): ...@@ -48,6 +40,25 @@ class TestSplitVar(unittest.TestCase):
varname, block_id, size = block_str.split(":") varname, block_id, size = block_str.split(":")
self.assertEqual(int(size), all_sizes[i]) self.assertEqual(int(size), all_sizes[i])
def test_1k(self):
shapes = [[3, 5], [1024], [28, 784], [8, 1020], [800, 10]]
expected_sizes = [
[15], [1024],
[2352, 2352, 2352, 2352, 2352, 2352, 2352, 2352, 2352, 784],
[2040, 2040, 2040, 2040],
[1150, 1150, 1150, 1150, 1150, 1150, 1100]
]
self.check_split_output(shapes, expected_sizes, 1024)
def test_check_output_8k(self):
shapes = [[3, 5], [1024], [28, 784], [8, 1020], [800, 10],
[6, 33, 33, 33]]
expected_sizes = [[15], [1024], [10976, 10976], [8160], [8000],
[35937, 35937, 35937, 35937, 35937, 35937]]
self.check_split_output(shapes, expected_sizes, 8192)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()
...@@ -93,30 +93,33 @@ def same_or_split_var(p_name, var_name): ...@@ -93,30 +93,33 @@ def same_or_split_var(p_name, var_name):
return p_name == var_name or p_name.startswith(var_name + ".block") return p_name == var_name or p_name.startswith(var_name + ".block")
def split_dense_variable(var_list, def split_dense_variable(var_list, service_count, min_block_size=8192):
pserver_count,
min_block_size=1024,
max_block_size=1048576):
""" """
We may need to split dense tensor to one or more blocks and put We may need to split dense tensor to one or more blocks and put
them equally onto parameter server. One block is a sub-tensor them equally onto parameter server. One block is a sub-tensor
aligned by dim[0] of the tensor. aligned by dim[0] of the tensor.
We need to have a minimal block size so that the calculations in We need to have a minimal block size so that the calculations in
the parameter server side can gain better performance. By default the parameter server side can gain better performance. By default
minimum block size is 1024. The max block size is used to prevent minimum block size 8K elements (maybe 16bit or 32bit or 64bit).
very large blocks that may cause send error.
:return: A list of VarBlocks. Each VarBlock specifies a shard of Args:
the var. var_list (list): List of variables.
service_count (int): Numel of pserver services. A pserver may have two
or more listening ports.
min_block_size (int): Minimum splitted block size.
Returns:
blocks (list[(varname, block_id, current_block_size)]): A list
of VarBlocks. Each VarBlock specifies a shard of the var.
""" """
blocks = [] blocks = []
for var in var_list: for var in var_list:
split_count = pserver_count split_count = service_count
var_numel = reduce(lambda x, y: x * y, var.shape) var_numel = reduce(lambda x, y: x * y, var.shape)
max_pserver_count = int(math.floor(var_numel / float(min_block_size))) max_pserver_count = int(math.floor(var_numel / float(min_block_size)))
if max_pserver_count == 0: if max_pserver_count == 0:
max_pserver_count = 1 max_pserver_count = 1
if max_pserver_count < pserver_count: if max_pserver_count < service_count:
split_count = max_pserver_count split_count = max_pserver_count
block_size = int(math.ceil(var_numel / float(split_count))) block_size = int(math.ceil(var_numel / float(split_count)))
...@@ -270,6 +273,7 @@ class DistributeTranspiler: ...@@ -270,6 +273,7 @@ class DistributeTranspiler:
grad_var_mapping = self._append_split_op(program, grad_blocks) grad_var_mapping = self._append_split_op(program, grad_blocks)
param_var_mapping = self._create_vars_from_blocklist(program, param_var_mapping = self._create_vars_from_blocklist(program,
param_blocks) param_blocks)
# step3: Add gradients as send op inputs and parameters as send # step3: Add gradients as send op inputs and parameters as send
# op outputs. # op outputs.
send_inputs = [] send_inputs = []
...@@ -277,9 +281,11 @@ class DistributeTranspiler: ...@@ -277,9 +281,11 @@ class DistributeTranspiler:
for b in grad_blocks: # append by order for b in grad_blocks: # append by order
varname, block_id, _ = b.split(":") varname, block_id, _ = b.split(":")
send_inputs.append(grad_var_mapping[varname][int(block_id)]) send_inputs.append(grad_var_mapping[varname][int(block_id)])
for b in param_blocks: for b in param_blocks:
varname, block_id, _ = b.split(":") varname, block_id, _ = b.split(":")
send_outputs.append(param_var_mapping[varname][int(block_id)]) send_outputs.append(param_var_mapping[varname][int(block_id)])
# let send_op know which endpoint to send which var to, eplist has the same # let send_op know which endpoint to send which var to, eplist has the same
# order as send_inputs. # order as send_inputs.
eplist = split_method(send_inputs, pserver_endpoints) eplist = split_method(send_inputs, pserver_endpoints)
...@@ -751,9 +757,18 @@ class DistributeTranspiler: ...@@ -751,9 +757,18 @@ class DistributeTranspiler:
Create vars for each split. Create vars for each split.
NOTE: only grads need to be named for different trainers, use NOTE: only grads need to be named for different trainers, use
add_trainer_suffix to rename the grad vars. add_trainer_suffix to rename the grad vars.
:return: A dict mapping from original var name to each var split. Args:
program (ProgramDesc): ProgramDesc which gradients blong.
block_list (list[(varname, block_id, block_size)]): List of gradient blocks.
add_trainer_suffix (Bool): Add trainer suffix to new variable's name if set True.
Returns:
var_mapping (dict(varname->[new_varname_variable])):A dict mapping
from original var name to each var split.
""" """
# varname->[(block_id, current_block_size)]
block_map = dict() block_map = dict()
var_mapping = dict() var_mapping = dict()
for block_str in block_list: for block_str in block_list:
varname, offset, size = block_str.split(":") varname, offset, size = block_str.split(":")
...@@ -824,7 +839,16 @@ class DistributeTranspiler: ...@@ -824,7 +839,16 @@ class DistributeTranspiler:
persistable=persistable) persistable=persistable)
def _append_split_op(self, program, gradblocks): def _append_split_op(self, program, gradblocks):
# Split variables that need to be split and append respective ops """
Split variables that need to be split and append respective ops
Args:
program (ProgramDesc): ProgramDesc that gradients blong.
gradblocks (list[(varname, block_id, block_size)]): List of gradient blocks.
Returns:
var_mapping (dict(varname->[new_splitted_variable])):A dict mapping
from original var name to each var split.
"""
add_suffix = False add_suffix = False
if self.trainer_num > 1: if self.trainer_num > 1:
add_suffix = True add_suffix = True
...@@ -1148,6 +1172,12 @@ class DistributeTranspiler: ...@@ -1148,6 +1172,12 @@ class DistributeTranspiler:
return lr_ops return lr_ops
def _get_optimize_pass(self): def _get_optimize_pass(self):
"""
Get optimizer operators, paramters and gradients from origin_program
Returns:
opt_ops (list): optimize operators.
params_grads (dict): paramter->gradient.
"""
block = self.origin_program.global_block() block = self.origin_program.global_block()
opt_ops = [] opt_ops = []
params_grads = [] params_grads = []
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册