diff --git a/python/paddle/fluid/__init__.py b/python/paddle/fluid/__init__.py index 859605d005328c030980a49a349742772de1cb6d..d53a96a7a79456d1f3ba640b1cbab6cc314e4d24 100644 --- a/python/paddle/fluid/__init__.py +++ b/python/paddle/fluid/__init__.py @@ -44,8 +44,8 @@ import transpiler from param_attr import ParamAttr, WeightNormParamAttr from data_feeder import DataFeeder from core import LoDTensor, CPUPlace, CUDAPlace, CUDAPinnedPlace -from transpiler import DistributeTranspiler, SimpleDistributeTranspiler, \ - InferenceTranspiler, memory_optimize, release_memory +from transpiler import DistributeTranspiler, InferenceTranspiler, \ + memory_optimize, release_memory from concurrency import (Go, make_channel, channel_send, channel_recv, channel_close, Select) from lod_tensor import create_lod_tensor, create_random_int_lodtensor diff --git a/python/paddle/fluid/tests/unittests/test_dist_transpiler.py b/python/paddle/fluid/tests/unittests/test_dist_transpiler.py index fa49bd41a5876847d046682dce5c3d3868a18500..32647f9aa81431a3ecc798df6f1360a14fd978af 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_transpiler.py +++ b/python/paddle/fluid/tests/unittests/test_dist_transpiler.py @@ -12,40 +12,16 @@ # See the License for the specific language governing permissions and # limitations under the License. -import unittest - import paddle.fluid as fluid -import paddle.fluid.core as core -import paddle.fluid.layers as layers from paddle.fluid.transpiler.distribute_transpiler import delete_ops -import numpy + +from transpiler_test import TranspilerTest -class TestDistTranspiler(unittest.TestCase): +class TestDistTranspiler(TranspilerTest): def setUp(self): - self.trainer_id = 0 - self.trainers = 2 - self.pservers = 2 - self.pserver_eps = "127.0.0.1:6174,127.0.0.1:6175" self.current_pserver_ep = "127.0.0.1:6174" - def net_conf(self): - x = fluid.layers.data(name='x', shape=[1000], dtype='float32') - - y_predict = fluid.layers.fc(input=x, - size=1000, - act=None, - param_attr=fluid.ParamAttr(name='fc_w')) - - y = fluid.layers.data(name='y', shape=[1], dtype='float32') - - cost = fluid.layers.square_error_cost(input=y_predict, label=y) - avg_cost = fluid.layers.mean(cost) - sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.1) - - optimize_ops, params_grads = sgd_optimizer.minimize(avg_cost) - return optimize_ops, params_grads - def test_transpiler(self): trainer = self.get_trainer() pserver, startup = self.get_pserver(self.current_pserver_ep) @@ -70,14 +46,6 @@ class TestDistTranspiler(unittest.TestCase): fc_w_var = startup.global_block().var("fc_w.block1") self.assertEqual(fc_w_var.shape, (500, 1000)) - def get_main_program(self): - main = fluid.Program() - - with fluid.program_guard(main): - self.net_conf() - - return main - def get_expect_trainer_ops(self): trainer = fluid.Program() @@ -92,25 +60,6 @@ class TestDistTranspiler(unittest.TestCase): ops.insert(ops.index("elementwise_add_grad") + 1, "send_vars") return ops - def get_trainer(self): - return self._transpiler_instance().get_trainer_program() - - def get_pserver(self, ep): - t = self._transpiler_instance() - pserver = t.get_pserver_program(ep) - startup = t.get_startup_program(ep, pserver) - return pserver, startup - - def _transpiler_instance(self): - main = self.get_main_program() - t = fluid.DistributeTranspiler() - t.transpile( - self.trainer_id, - program=main, - pservers=self.pserver_eps, - trainers=self.trainers) - return t - if __name__ == "__main__": unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_simple_dist_transpiler.py b/python/paddle/fluid/tests/unittests/test_simple_dist_transpiler.py new file mode 100644 index 0000000000000000000000000000000000000000..5ae2844e295194f95701e1cdccd43bf919bf964f --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_simple_dist_transpiler.py @@ -0,0 +1,80 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import numpy as np + +import paddle.fluid as fluid +from paddle.fluid.transpiler.distribute_transpiler import delete_ops + +from transpiler_test import TranspilerTest + + +class TestSimpleDistTranspiler(TranspilerTest): + def setUp(self): + self.current_pserver_ep = "127.0.0.1:6175" + + def test_simple_transpiler(self): + np.random.seed(1) + + trainer = self.get_trainer() + pserver, startup = self.get_pserver(self.current_pserver_ep) + self.assertEqual([op.type for op in trainer.global_block().ops], + self.get_expect_trainer_ops()) + + self.assertEqual(len(pserver.blocks), 2) + # block0: listen_and_serv + self.assertEqual([op.type for op in pserver.blocks[0].ops], + ["listen_and_serv"]) + # block1: optimize pass + self.assertEqual([op.type for op in pserver.blocks[1].ops], + ["sum", "scale", "sgd"]) + + # confirm startup program + self.assertEqual([op.type for op in startup.global_block().ops], + ["fill_constant", "uniform_random", "uniform_random"]) + + # the variable #fc_w will NOT be splited + fc_w_var = startup.global_block().var("fc_w@GRAD") + self.assertEqual(fc_w_var.shape, (1000, 1000)) + + fc_w_var = startup.global_block().var("fc_w@GRAD.trainer_0") + self.assertEqual(fc_w_var.shape, (1000, 1000)) + + def get_expect_trainer_ops(self): + trainer = fluid.Program() + + with fluid.program_guard(trainer): + optimize_ops, params_grads = self.net_conf() + + delete_ops(trainer.global_block(), optimize_ops) + ops = [op.type for op in trainer.global_block().ops] + [ + "send_vars", "send_barrier", "recv", "recv", "fetch_barrier" + ] + ops.insert(ops.index("elementwise_add_grad") + 1, "send_vars") + return ops + + def _transpiler_instance(self): + main = self.get_main_program() + t = fluid.DistributeTranspiler() + t.transpile( + self.trainer_id, + program=main, + pservers=self.pserver_eps, + trainers=self.trainers, + slice_var_up=False) + return t + + +if __name__ == "__main__": + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_split_var.py b/python/paddle/fluid/tests/unittests/test_slice_var.py similarity index 85% rename from python/paddle/fluid/tests/unittests/test_split_var.py rename to python/paddle/fluid/tests/unittests/test_slice_var.py index 157def9b56e44092a86023035d1ab444c938aa07..82305b23a1a1e2cee8cef6b291d848581fe5b509 100644 --- a/python/paddle/fluid/tests/unittests/test_split_var.py +++ b/python/paddle/fluid/tests/unittests/test_slice_var.py @@ -14,14 +14,14 @@ import math import unittest -from paddle.fluid.transpiler.distribute_transpiler import split_variable +from paddle.fluid.transpiler.distribute_transpiler import slice_variable import paddle.fluid as fluid import paddle.fluid.core as core import random -class TestSplitVar(unittest.TestCase): - def check_split_output(self, shapes, expected_sizes, min_size): +class TestSliceVar(unittest.TestCase): + def check_slice_output(self, shapes, expected_sizes, min_size): var_list = [] program = fluid.Program() for shape in shapes: @@ -31,7 +31,7 @@ class TestSplitVar(unittest.TestCase): # dtype=core.VarDesc.VarType.LOD_TENSOR, shape=shape) var_list.append(var) - blocks = split_variable(var_list, 10, min_size) + blocks = slice_variable(var_list, 10, min_size) all_sizes = [] for s in expected_sizes: for s2 in s: @@ -49,7 +49,7 @@ class TestSplitVar(unittest.TestCase): [1150, 1150, 1150, 1150, 1150, 1150, 1100] ] - self.check_split_output(shapes, expected_sizes, 1024) + self.check_slice_output(shapes, expected_sizes, 1024) def test_check_output_8k(self): shapes = [[3, 5], [1024], [28, 784], [8, 1020], [800, 10], @@ -57,7 +57,7 @@ class TestSplitVar(unittest.TestCase): expected_sizes = [[15], [1024], [10976, 10976], [8160], [8000], [35937, 35937, 35937, 35937, 35937, 35937]] - self.check_split_output(shapes, expected_sizes, 8192) + self.check_slice_output(shapes, expected_sizes, 8192) if __name__ == '__main__': diff --git a/python/paddle/fluid/tests/unittests/transpiler_test.py b/python/paddle/fluid/tests/unittests/transpiler_test.py new file mode 100644 index 0000000000000000000000000000000000000000..d84c5d9c41c705cf6d14cc0b5a8c692b0d646337 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/transpiler_test.py @@ -0,0 +1,73 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +import numpy as np + +import paddle.fluid as fluid +import paddle.fluid.core as core +import paddle.fluid.layers as layers + + +class TranspilerTest(unittest.TestCase): + @classmethod + def setUpClass(self): + self.trainer_id = 0 + self.trainers = 2 + self.pservers = 2 + self.pserver_eps = "127.0.0.1:6174,127.0.0.1:6175" + + def net_conf(self): + x = fluid.layers.data(name='x', shape=[1000], dtype='float32') + + y_predict = fluid.layers.fc(input=x, + size=1000, + act=None, + param_attr=fluid.ParamAttr(name='fc_w')) + + y = fluid.layers.data(name='y', shape=[1], dtype='float32') + + cost = fluid.layers.square_error_cost(input=y_predict, label=y) + avg_cost = fluid.layers.mean(cost) + sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.1) + + optimize_ops, params_grads = sgd_optimizer.minimize(avg_cost) + return optimize_ops, params_grads + + def get_main_program(self): + main = fluid.Program() + + with fluid.program_guard(main): + self.net_conf() + + return main + + def get_trainer(self): + return self._transpiler_instance().get_trainer_program() + + def get_pserver(self, ep): + t = self._transpiler_instance() + pserver = t.get_pserver_program(ep) + startup = t.get_startup_program(ep, pserver) + return pserver, startup + + def _transpiler_instance(self): + main = self.get_main_program() + t = fluid.DistributeTranspiler() + t.transpile( + self.trainer_id, + program=main, + pservers=self.pserver_eps, + trainers=self.trainers) + return t diff --git a/python/paddle/fluid/transpiler/__init__.py b/python/paddle/fluid/transpiler/__init__.py index 045ca537b2e84c02298d6375a7ef5bdbb5517380..cf18090f71f34be5105498f5846dbcdf15ab2e3f 100644 --- a/python/paddle/fluid/transpiler/__init__.py +++ b/python/paddle/fluid/transpiler/__init__.py @@ -15,10 +15,9 @@ from distribute_transpiler import DistributeTranspiler from inference_transpiler import InferenceTranspiler from memory_optimization_transpiler import memory_optimize, release_memory -from distribute_transpiler_simple import SimpleDistributeTranspiler from ps_dispatcher import HashName, RoundRobin __all__ = [ - "DistributeTranspiler", "InferenceTranspiler", "SimpleDistributeTranspiler", - "memory_optimize", "release_memory", "HashName", "RoundRobin" + "DistributeTranspiler", "InferenceTranspiler", "memory_optimize", + "release_memory", "HashName", "RoundRobin" ] diff --git a/python/paddle/fluid/transpiler/distribute_transpiler.py b/python/paddle/fluid/transpiler/distribute_transpiler.py index 06b0a1375ce6568cca864cd8a2dd69ee46b223a7..da001add8e12fab8c8a09ede8ef27cbbf653865e 100644 --- a/python/paddle/fluid/transpiler/distribute_transpiler.py +++ b/python/paddle/fluid/transpiler/distribute_transpiler.py @@ -39,6 +39,7 @@ Steps to transpile pserver: from __future__ import print_function import math +import numpy as np from ps_dispatcher import RoundRobin, HashName, PSDispatcher from .. import core, framework @@ -70,7 +71,7 @@ def same_or_split_var(p_name, var_name): return p_name == var_name or p_name.startswith(var_name + ".block") -def split_variable(var_list, service_count, min_block_size=8192): +def slice_variable(var_list, slice_count, min_block_size=8192): """ We may need to split dense tensor to one or more blocks and put them equally onto parameter server. One block is a sub-tensor @@ -78,25 +79,25 @@ def split_variable(var_list, service_count, min_block_size=8192): We need to have a minimal block size so that the calculations in the parameter server side can gain better performance. By default - minimum block size 8K elements (maybe 16bit or 32bit or 64bit). + minimum block size 8K elements (maybe 16bit or 32bit or 64bit). Args: var_list (list): List of variables. - service_count (int): Numel of pserver services. A pserver may have two - or more listening ports. + slice_count (int): Numel of count that variables will be sliced, which + could be the pserver services' count. min_block_size (int): Minimum splitted block size. Returns: - blocks (list[(varname, block_id, current_block_size)]): A list + blocks (list[(varname, block_id, current_block_size)]): A list of VarBlocks. Each VarBlock specifies a shard of the var. """ blocks = [] for var in var_list: - split_count = service_count + split_count = slice_count var_numel = reduce(lambda x, y: x * y, var.shape) max_pserver_count = int(math.floor(var_numel / float(min_block_size))) if max_pserver_count == 0: max_pserver_count = 1 - if max_pserver_count < service_count: + if max_pserver_count < slice_count: split_count = max_pserver_count block_size = int(math.ceil(var_numel / float(split_count))) @@ -177,7 +178,7 @@ class DistributeTranspiler: for index in range(len(self.pserver_endpoints)) ] - def _init_splited_vars(self, split_method): + def _init_splited_vars(self, slice_var_up): # update these mappings for further transpile: # 1. param_var_mapping: param var name -> [splited params vars] # 2. grad_var_mapping: grad var name -> [splited grads vars] @@ -196,9 +197,19 @@ class DistributeTranspiler: self._update_dist_lookup_table_vars(param_list, grad_list, self.params_grads) - grad_blocks = split_variable(grad_list, len(self.pserver_endpoints)) - param_blocks = split_variable(param_list, len(self.pserver_endpoints)) + if slice_var_up: + # when we slice var up into blocks, we will slice the var according to + # pserver services' count. A pserver may have two or more listening ports. + grad_blocks = slice_variable(grad_list, len(self.pserver_endpoints)) + param_blocks = slice_variable(param_list, + len(self.pserver_endpoints)) + else: + # when we do NOT slice var up into blocks, we will always slice params + # grads into one block. + grad_blocks = slice_variable(grad_list, 1) + param_blocks = slice_variable(param_list, 1) assert (len(grad_blocks) == len(param_blocks)) + # origin_varname -> [splited_var] self.param_var_mapping = self._create_vars_from_blocklist( self.origin_program, param_blocks) @@ -229,6 +240,7 @@ class DistributeTranspiler: program=None, pservers="127.0.0.1:6174", trainers=1, + slice_var_up=True, split_method=RoundRobin, sync_mode=True): """ @@ -262,13 +274,27 @@ class DistributeTranspiler: self.has_distributed_lookup_table = self._has_distributed_lookup_table() # split and create vars, then put splited vars in dicts for later use. - self._init_splited_vars(split_method) + self._init_splited_vars(slice_var_up) # step 3.1: insert send op to send gradient vars to parameter servers ps_dispatcher.reset() send_vars = [] - for orig_varname, splited_vars in self.grad_var_mapping.items(): + + # in general cases, the number of pservers is times of 2, and this + # will lead to uneven distribution among weights and bias: + # fc_w@GRAD_trainer_0, fc_w@GRAD_trainer_1 --> pserver1 + # fc_b@GRAD_trainer_0, fc_b@GRAD_trainer_1 --> pserver2 + # shuffle the map will avoid the uneven distribution above + grad_var_mapping_items = self.grad_var_mapping.items() + if not slice_var_up: + np.random.shuffle(grad_var_mapping_items) + + for orig_varname, splited_vars in grad_var_mapping_items: eplist = ps_dispatcher.dispatch(splited_vars) + + if not slice_var_up: + assert (len(splited_vars) == 1) + if len(splited_vars) == 1: orig_varname = splited_vars[0].name index = find_op_by_output_arg(program.global_block(), @@ -316,6 +342,7 @@ class DistributeTranspiler: for i, ep in enumerate(eplist): self.param_grad_ep_mapping[ep]["params"].append(recv_vars[i]) self.param_grad_ep_mapping[ep]["grads"].append(send_vars[i]) + # step4: Concat the parameters splits together after recv. for varname, splited_var in self.param_var_mapping.iteritems(): eps = [] @@ -788,8 +815,8 @@ class DistributeTranspiler: program (ProgramDesc): ProgramDesc which gradients blong. block_list (list[(varname, block_id, block_size)]): List of gradient blocks. add_trainer_suffix (Bool): Add trainer suffix to new variable's name if set True. - Returns: - var_mapping (dict(varname->[new_varname_variable])):A dict mapping + Returns: + var_mapping (dict(varname->[new_varname_variable])):A dict mapping from original var name to each var split. """ diff --git a/python/paddle/fluid/transpiler/distribute_transpiler_simple.py b/python/paddle/fluid/transpiler/distribute_transpiler_simple.py deleted file mode 100644 index ea8c27cdca885dbbf90349b35df9691951264061..0000000000000000000000000000000000000000 --- a/python/paddle/fluid/transpiler/distribute_transpiler_simple.py +++ /dev/null @@ -1,254 +0,0 @@ -# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from ..framework import Program, default_main_program, Parameter, Variable -from ..layer_helper import LayerHelper - - -def hash_name_to_server(params_grads, pserver_endpoints): - """ - :param param_grads: - :return: a map of pserver endpoint -> - params -> [param list] - grads -> [grad list] - """ - - def _hash_param(param_name, total): - return hash(param_name) % total - - param_grad_map = dict() - for param, grad in params_grads: - if param.trainable is True and grad is not None: - server_id = _hash_param(param.name, len(pserver_endpoints)) - server_for_param = pserver_endpoints[server_id] - if not param_grad_map.has_key(server_for_param): - param_grad_map[server_for_param] = {"params": [], "grads": []} - param_grad_map[server_for_param]["params"].append(param) - param_grad_map[server_for_param]["grads"].append(grad) - - return param_grad_map - - -def round_robin(params_grads, pserver_endpoints): - assert (len(params_grads) > len(pserver_endpoints)) - - param_grad_map = dict() - pserver_idx = 0 - for param, grad in params_grads: - if param.trainable is True: - server_for_param = pserver_endpoints[pserver_idx] - if not param_grad_map.has_key(server_for_param): - param_grad_map[server_for_param] = {"params": [], "grads": []} - - param_grad_map[server_for_param]["params"].append(param) - param_grad_map[server_for_param]["grads"].append(grad) - - pserver_idx += 1 - if pserver_idx >= len(pserver_endpoints): - pserver_idx = 0 - return param_grad_map - - -class SimpleDistributeTranspiler: - def transpile(self, - optimize_ops, - params_grads, - program=None, - pservers="127.0.0.1:6174", - trainers=1, - split_method=round_robin): - """ - Transpile the program to a distributed data-parallelism programs. - - The main_program will be transform to use a remote parameter server - to do parameter optimization. And the optimization graph will be put - in to a parameter server program. - - Use different methods to split trainable varialbles to different - parameter servers. - - Example to run: - - exe = fluid.Executor(place) - t = fluid.DistributeTranspiler() - t.transpile(optimize_ops, params_grads, pservers="127.0.0.1:6174", trainers=1) - - pserver_endpoint = os.getenv("PSERVER") - if pserver_endpoint: - pserver_prog = t.get_pserver_program(pserver_endpoint, optimize_ops) - exe.run(fluid.default_startup_program()) - exe.run(pserver_prog) - else: - feeder = fluid.DataFeeder(feed_list=[images, label], place=place) - exe.run(fluid.default_startup_program()) - - for pass_id in range(PASS_NUM): - ... - - :param optimize_ops: op list of optimization, should be the - return value of Optimizer.minimize - :type optimize_ops: list - :param program: program to optimize, default default_main_program - :param pservers: parameter server endpoints like "m1:6174,m2:6174" - :type pservers: string - - :return: return a list of programs - """ - if program is None: - program = default_main_program() - self.program = program - self.trainers = trainers - self.optimize_ops = optimize_ops - self._optimize_distributed( - optimize_ops, - program, - params_grads, - pservers=pservers, - trainers=trainers, - split_method=split_method) - - def _clone_param(self, block, v): - assert isinstance(v, Parameter) - new_p = Parameter( - block=block, - shape=v.shape, - dtype=v.dtype, - type=v.type, - lod_level=v.lod_level, - stop_gradient=v.stop_gradient, - trainable=v.trainable, - optimize_attr=v.optimize_attr, - regularizer=v.regularizer, - name=v.name) - block.vars[new_p.name] = new_p - - def _clone_var(self, block, var): - assert isinstance(var, Variable) - return block.create_var( - name=var.name, - shape=var.shape, - dtype=var.dtype, - type=var.type, - lod_level=var.lod_level, - persistable=var.persistable) - - def _optimize_distributed(self, optimize_ops, program, params_and_grads, - **kwargs): - if kwargs.has_key("split_method"): - split_method = kwargs["split_method"] - else: - split_method = round_robin - - assert (callable(split_method)) - pserver_endpoints = kwargs["pservers"].split(",") - self.param_grad_map = split_method(params_and_grads, pserver_endpoints) - - send_op_ordered_inputs = [] - send_op_ordered_outputs = [] - epmap = [] - for ep, v in self.param_grad_map.iteritems(): - send_op_ordered_inputs.extend(v["grads"]) - send_op_ordered_outputs.extend(v["params"]) - for i in v["grads"]: - epmap.append(ep) - send_op = program.global_block().append_op( - type="send", - inputs={"X": send_op_ordered_inputs - }, # inputs is a list of tensors to be send - outputs={"Out": send_op_ordered_outputs}, - attrs={"endpoints": pserver_endpoints, - "epmap": epmap}) - - def get_trainer_program(self): - # remove optimize ops and add a send op to main_program - self.program.global_block().delete_ops(self.optimize_ops) - return self.program - - def _create_var_for_trainers(self, block, var, trainers): - var_list = [] - for i in xrange(trainers): - var_each = block.create_var( - name="%s.trainer_%d" % (var.name, i), - psersistable=var.persistable, - dtype=var.dtype, - shape=var.shape) - var_list.append(var_each) - return var_list - - def get_pserver_program(self, endpoint, optimize_ops): - pserver_program = Program() - for v in self.param_grad_map[endpoint]["params"]: - self._clone_param(pserver_program.global_block(), v) - - optimize_sub_program = Program() - grad_var_names = [ - var.name for var in self.param_grad_map[endpoint]["grads"] - ] - for opt_op in optimize_ops: - for _, var in opt_op.inputs.iteritems(): - # NOTE: append operators to merge gradients from multiple - # trainers. If trainers == 1, this is not needed. - if self.trainers > 1 and var.name in grad_var_names: - vars2merge = self._create_var_for_trainers( - optimize_sub_program.global_block(), var, self.trainers) - merged_var = optimize_sub_program.global_block().create_var( - name=var.name, - persistable=var.persistable, - dtype=var.dtype, - shape=var.shape) - optimize_sub_program.global_block().append_op( - type="sum", - inputs={"X": vars2merge}, - outputs={"Out": merged_var}) - optimize_sub_program.global_block().append_op( - type="scale", - inputs={"X": merged_var}, - outputs={"Out": merged_var}, - attrs={"scale": 1.0 / float(self.trainers)}) - else: - optimize_sub_program.global_block().create_var( - name=var.name, - persistable=var.persistable, - dtype=var.dtype, - shape=var.shape) - - if opt_op.inputs.has_key("Grad"): - if opt_op.inputs["Grad"].name in grad_var_names: - optimize_sub_program.global_block().append_op( - type=opt_op.type, - inputs=opt_op.inputs, - outputs=opt_op.outputs, - attrs=opt_op.attrs) - else: - optimize_sub_program.global_block().append_op( - type=opt_op.type, - inputs=opt_op.inputs, - outputs=opt_op.outputs, - attrs=opt_op.attrs) - pserver_program.global_block().append_op( - type="recv", - inputs={"RX": - self.param_grad_map[endpoint]["grads"]}, # grads to recv - outputs={}, - attrs={ - "OptimizeBlock": optimize_sub_program.global_block(), - "endpoint": endpoint, - "ParamList": - [p.name for p in self.param_grad_map[endpoint]["params"]], - "GradList": - [p.name for p in self.param_grad_map[endpoint]["grads"]], - "Trainers": self.trainers - }) - pserver_program.sync_with_cpp() - return pserver_program