From dd46d95fe4c3bcb21fed8264cc325361322ebd6c Mon Sep 17 00:00:00 2001
From: typhoonzero
Date: Wed, 6 Dec 2017 21:08:38 +0800
Subject: [PATCH] wip

---
 python/paddle/v2/fluid/distribute_planner.py | 43 ++++-------
 python/paddle/v2/fluid/executor.py           | 75 ++++++++++++++++++++
 python/paddle/v2/fluid/framework.py          |  3 +
 3 files changed, 92 insertions(+), 29 deletions(-)

diff --git a/python/paddle/v2/fluid/distribute_planner.py b/python/paddle/v2/fluid/distribute_planner.py
index 86b11ac558..2eb32b5227 100644
--- a/python/paddle/v2/fluid/distribute_planner.py
+++ b/python/paddle/v2/fluid/distribute_planner.py
@@ -7,55 +7,40 @@ from layer_helper import LayerHelper
 
 __all__ = ['SGD', 'Momentum', 'Adagrad', 'Adam', 'Adamax', 'DecayedAdagrad']
 
-def hash_name_to_server(parameters_and_grads, pserver_endpoints):
+def hash_name_to_server(parameters, pserver_endpoints):
     def _hash_param(param_name, total):
         return hash(param_name) % total
 
     param_map = dict()
-    grad_map = dict()
-    for param_and_grad in parameters_and_grads:
-        if param_and_grad[0].trainable is True and param_and_grad[
-                1] is not None:
-            server_id = _hash_param(param_and_grad[0].name,
-                                    len(pserver_endpoints))
+    for param in parameters:
+        if param.trainable is True:
+            server_id = _hash_param(param.name, len(pserver_endpoints))
             server_for_param = pserver_endpoints[server_id]
             if param_map.has_key(server_for_param):
-                param_map[server_for_param].append(param_and_grad[0])
+                param_map[server_for_param].append(param)
             else:
-                param_map[server_for_param] = [param_and_grad[0]]
+                param_map[server_for_param] = [param]
 
-            if grad_map.has_key(server_for_param):
-                grad_map[server_for_param].append(param_and_grad[1])
-            else:
-                grad_map[server_for_param] = [param_and_grad[1]]
-    return param_map, grad_map
+    return param_map
 
 
-def round_robin(parameters_and_grads, pserver_endpoints):
-    if len(parameters_and_grads) < len(pserver_endpoints):
-        raise Exception("parameters is less than pservers")
+def round_robin(parameters, pserver_endpoints):
+    assert len(parameters) >= len(pserver_endpoints)
 
     param_map = dict()
-    grad_map = dict()
     pserver_idx = 0
-    for param_and_grad in parameters_and_grads:
-        if param_and_grad[0].trainable is True and param_and_grad[
-                1] is not None:
-
+    for param in parameters:
+        if param.trainable is True:
             server_for_param = pserver_endpoints[pserver_idx]
             if param_map.has_key(server_for_param):
-                param_map[server_for_param].append(param_and_grad[0])
+                param_map[server_for_param].append(param)
             else:
-                param_map[server_for_param] = [param_and_grad[0]]
+                param_map[server_for_param] = [param]
 
-            if grad_map.has_key(server_for_param):
-                grad_map[server_for_param].append(param_and_grad[1])
-            else:
-                grad_map[server_for_param] = [param_and_grad[1]]
             pserver_idx += 1
             if pserver_idx > len(pserver_endpoints):
                 pserver_idx = 0
-    return param_map, grad_map
+    return param_map
 
 
 def _append_sendop_for_trainer(loss,
diff --git a/python/paddle/v2/fluid/executor.py b/python/paddle/v2/fluid/executor.py
index bdc82eede9..4a03e55ee0 100644
--- a/python/paddle/v2/fluid/executor.py
+++ b/python/paddle/v2/fluid/executor.py
@@ -1,6 +1,7 @@
 import numpy as np
 from . import core
 from framework import Program, default_main_program
+import distribute_planner
 
 __all__ = ['Executor', 'g_scope']
 
@@ -49,6 +50,80 @@ class Executor(object):
         self.executor = core.Executor(act_places)
         self.places = places
 
+    def optimize(self, optimize_ops, program=None, **kwargs):
+        """
+        Optimize the program for different runtime environments.
+
+        :param optimize_ops: op list of optimization, should be the
+                             return value of Optimizer.minimize
+        :type optimize_ops: list
+        :param program: program to optimize, defaults to default_main_program
+        :param pservers: parameter server endpoints like "m1:6174,m2:6174"
+        :type pservers: string
+
+        :return: a list of programs
+        """
+        if program is None:
+            program = default_main_program()
+
+        if kwargs.has_key("pservers"):
+            return self._optimize_distributed(optimize_ops, program, **kwargs)
+
+    def _optimize_distributed(self, optimize_ops, program, **kwargs):
+        # remove optimize ops and add a send op to main_program
+        # FIXME(typhoonzero): delete_op only removes the first occurrence;
+        # need to consider multiple identical optimize ops.
+        for op in optimize_ops:
+            program.global_block().delete_op(op)
+        if kwargs.has_key("split_method"):
+            split_method = kwargs["split_method"]
+        else:
+            split_method = distribute_planner.round_robin
+
+        assert (callable(split_method))
+        pserver_endpoints = kwargs["pservers"].split(",")
+        params = program.global_block().all_parameters()
+        param_map = split_method(params, pserver_endpoints)
+
+        for ep in pserver_endpoints:
+            # FIXME(typhoonzero): sends to different servers can run in parallel.
+            send_op = program.global_block().append_op(
+                type="send",
+                inputs={"X": param_map[ep]
+                        },  # inputs is a list of tensors to be sent
+                outputs={"Out": param_map[ep]},
+                attrs={"endpoint": ep})
+        # -------------- generate pserver program --------------
+        self.parameter_server_program_map = dict()
+
+        optimize_sub_program = Program()
+        optimize_ops = self.create_optimization_pass(
+            params_grads, optimize_sub_program, startup_program)
+        param_list = []
+        for param in params:
+            if param.trainable is True:
+                param_list.append(param)
+
+        param_map = split_method(params, pserver_endpoints)
+
+        for ep in pserver_endpoints:
+            pserver_program = Program()
+            self.parameter_server_program_map[ep] = pserver_program
+            pserver_program.global_block().append_op(
+                type="recv",
+                inputs={"RX": param_map[ep]},  # grads to recv
+                outputs={},
+                attrs={
+                    "OptimizeBlock": optimize_sub_program.global_block(),
+                    "endpoint": ep
+                })
+
+    def get_pserver_program(self, endpoint):
+        pass
+
+    def get_trainer_program(self):
+        return default_main_program()
+
     def aslodtensor(self, data):
         def accumulate(data):
             if not isinstance(data, list):
diff --git a/python/paddle/v2/fluid/framework.py b/python/paddle/v2/fluid/framework.py
index 49c6d89834..99fe94942b 100644
--- a/python/paddle/v2/fluid/framework.py
+++ b/python/paddle/v2/fluid/framework.py
@@ -425,6 +425,9 @@ class Block(object):
         self.ops.append(op)
         return op
 
+    def delete_op(self, op):
+        self.ops.remove(op)
+
     def prepend_op(self, *args, **kwargs):
         op_desc = self.desc.prepend_op()
         op = Operator(self, op_desc, *args, **kwargs)
-- 
GitLab
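
For context, a minimal, illustrative sketch of how a trainer script might drive the Executor.optimize API added in this patch. The endpoint string, the toy fit-a-line style network, and all variable names below are placeholders and not part of the patch; the layer and optimizer calls follow the contemporaneous fluid examples and may differ slightly from the exact API of this revision. The call pattern itself (optimize_ops from Optimizer.minimize, a comma-separated pservers string, and a split_method from distribute_planner) follows the docstring above.

    import paddle.v2.fluid as fluid
    import paddle.v2.fluid.distribute_planner as distribute_planner

    # Placeholder endpoints; in a real job these come from the cluster config.
    PSERVER_ENDPOINTS = "192.168.1.1:6174,192.168.1.2:6174"

    # A toy network, used only to produce trainable parameters and a loss.
    x = fluid.layers.data(name="x", shape=[13], dtype="float32")
    y = fluid.layers.data(name="y", shape=[1], dtype="float32")
    y_predict = fluid.layers.fc(input=x, size=1)
    cost = fluid.layers.square_error_cost(input=y_predict, label=y)
    avg_cost = fluid.layers.mean(x=cost)

    # optimize_ops is the return value of Optimizer.minimize, as required
    # by Executor.optimize.
    sgd_optimizer = fluid.optimizer.SGDOptimizer(learning_rate=0.001)
    optimize_ops = sgd_optimizer.minimize(avg_cost)

    exe = fluid.Executor(fluid.CPUPlace())
    # Rewrite the main program for distributed training: the optimize ops are
    # deleted from the trainer program and send ops to the pservers are added.
    exe.optimize(
        optimize_ops,
        pservers=PSERVER_ENDPOINTS,
        split_method=distribute_planner.round_robin)

    trainer_prog = exe.get_trainer_program()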