import framework
from backward import append_backward_ops
from regularizer import append_regularization_ops
import optimizer
from layer_helper import LayerHelper

__all__ = ['SGD', 'Momentum', 'Adagrad', 'Adam', 'Adamax', 'DecayedAdagrad']


def hash_name_to_server(parameters_and_grads, pserver_endpoints):
    def _hash_param(param_name, total):
        return hash(param_name) % total

    param_map = dict()
    grad_map = dict()
    for param_and_grad in parameters_and_grads:
        if param_and_grad[0].trainable is True and param_and_grad[
                1] is not None:
            server_id = _hash_param(param_and_grad[0].name,
                                    len(pserver_endpoints))
            server_for_param = pserver_endpoints[server_id]
            if server_for_param in param_map:
                param_map[server_for_param].append(param_and_grad[0])
            else:
                param_map[server_for_param] = [param_and_grad[0]]

            if server_for_param in grad_map:
                grad_map[server_for_param].append(param_and_grad[1])
            else:
                grad_map[server_for_param] = [param_and_grad[1]]
    return param_map, grad_map


def round_robin(parameters_and_grads, pserver_endpoints):
    if len(parameters_and_grads) < len(pserver_endpoints):
        raise Exception("number of parameters is less than the number of pservers")

    param_map = dict()
    grad_map = dict()
    pserver_idx = 0
    for param_and_grad in parameters_and_grads:
        if param_and_grad[0].trainable is True and param_and_grad[
                1] is not None:

            server_for_param = pserver_endpoints[pserver_idx]
            if server_for_param in param_map:
                param_map[server_for_param].append(param_and_grad[0])
            else:
                param_map[server_for_param] = [param_and_grad[0]]

            if server_for_param in grad_map:
                grad_map[server_for_param].append(param_and_grad[1])
            else:
                grad_map[server_for_param] = [param_and_grad[1]]
            pserver_idx += 1
            if pserver_idx >= len(pserver_endpoints):
                pserver_idx = 0
    return param_map, grad_map
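

# A minimal usage sketch for the two split helpers above. The endpoint
# strings, the params_grads list and the variable names in the comments are
# hypothetical and only illustrate the shape of the returned maps:
#
#     pservers = ["127.0.0.1:6174", "127.0.0.1:6175"]
#     param_map, grad_map = round_robin(params_grads, pservers)
#     # param_map: endpoint -> list of parameter Variables placed on it,
#     #     e.g. {"127.0.0.1:6174": [w0, b1], "127.0.0.1:6175": [b0, w1]}
#     # grad_map: the same endpoints mapped to the matching gradient Variables.
#
# hash_name_to_server returns maps of the same shape, but places each
# parameter by hash(param.name) % len(pserver_endpoints) instead of in
# round-robin order.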


def _append_sendop_for_trainer(loss,
                               parameters_and_grads,
                               pserver_endpoints,
                               split_method=round_robin):
    assert callable(split_method)
    param_map, grad_map = \
        split_method(parameters_and_grads, pserver_endpoints)

    for ep in pserver_endpoints:
        # FIXME(typhoonzero): sends to different servers could run in parallel.
        send_op = loss.block.append_op(
            type="send",
            inputs={"X": param_map[ep]},
            outputs={"Out": param_map[ep]},
            attrs={"endpoint": ep})

    return send_op


class DistributedPlanner(optimizer.Optimizer):
    def __init__(self, global_step=None, parallelism_type='dp'):
        """
            parallelism_type:
                dp: data parallelism
                mp: model parallelism
        """
        super(DistributedPlanner, self).__init__(global_step)
        if parallelism_type == "mp":
            raise NotImplementedError("model parallelism not implemented")
        elif parallelism_type == "dp":
            self.parameter_server_program_map = dict()
            self.worker_program = None
        else:
            raise NameError("parallelism_type %s not supported" %
                            parallelism_type)

    def create_optimization_pass(self,
                                 parameters_and_grads,
                                 program,
                                 startup_program=None):
        # Create any accumulators
        self.helper = LayerHelper(
            self.__class__.__name__,
            main_program=program,
            startup_program=startup_program)
        self._create_accumulators(program.global_block(),
                                  [p[0] for p in parameters_and_grads])

        optimize_ops = []
        for param_and_grad in parameters_and_grads:
            if param_and_grad[0].trainable is True and param_and_grad[
                    1] is not None:
                optimize_op = self._append_optimize_op(program.global_block(),
                                                       param_and_grad)
                optimize_ops.append(optimize_op)

        # Returned list of ops can include more ops in addition
        # to optimization ops
        return_ops = optimize_ops

        # Get custom finish ops for subclasses
        # FIXME: Need to fix this once we figure out how to handle dependencies
        finish_ops = self._finish_update(program.global_block())
        if finish_ops is not None:
            return_ops += finish_ops

        if self._global_step is not None:
            return_ops.append(
                self._increment_global_step(program.global_block()))
        return return_ops

    def minimize(self,
                 loss,
                 startup_program=None,
                 parameter_list=None,
                 no_grad_set=None,
                 split_method=round_robin):
        """
            For distributed case, this call append backward ops and then
            append sevaral send_ops at the end for each parameter server.

            Then call get_pserver_program(idx/endpoint) will return the program of
            coresponding pserver program to run.
        """
        params_grads = append_backward_ops(loss, parameter_list, no_grad_set)
        # Add regularization if any
        params_grads = append_regularization_ops(params_grads)
        _append_sendop_for_trainer(loss, params_grads, self.pserver_endpoints,
                                   split_method)
        self.worker_program = loss.block.program

        optimize_sub_program = framework.Program()
        optimize_ops = self.create_optimization_pass(
            params_grads, optimize_sub_program, startup_program)
        param_list = []
        for param_and_grad in params_grads:
            if param_and_grad[0].trainable is True and param_and_grad[
                    1] is not None:
                param_list.append(param_and_grad[0])

        param_map, grad_map = \
            split_method(params_grads, self.pserver_endpoints)

        for ep in self.pserver_endpoints:
            pserver_program = framework.Program()
            self.parameter_server_program_map[ep] = pserver_program
            pserver_program.global_block().append_op(
                type="recv",
                inputs={"RX": param_map[ep]},
                outputs={},
                attrs={
                    "OptimizeBlock": optimize_sub_program.global_block(),
                    "endpoint": ep
                })
        # FIXME(typhoonzero): when to use this return value?
        return None

    def get_pserver_program(self, endpoint):
        return self.parameter_server_program_map.get(endpoint)


SGD = optimizer.SGDOptimizer
Momentum = optimizer.MomentumOptimizer
Adagrad = optimizer.AdagradOptimizer
Adam = optimizer.AdamOptimizer
Adamax = optimizer.AdamaxOptimizer
DecayedAdagrad = optimizer.DecayedAdagradOptimizer

for optcls in __all__:
    # Rebase each exported optimizer class onto DistributedPlanner so that its
    # minimize() builds the distributed trainer and pserver programs.
    globals()[optcls].__bases__ = (DistributedPlanner, )
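

# A rough end-to-end sketch of the intended workflow. It is hedged: `avg_cost`,
# the endpoint strings, and assigning `pserver_endpoints` directly on the
# planner are assumptions for illustration, not APIs defined in this file:
#
#     planner = SGD(learning_rate=0.01)   # any of the rebased optimizer classes
#     planner.pserver_endpoints = ["127.0.0.1:6174", "127.0.0.1:6175"]
#     planner.minimize(avg_cost)          # appends backward, send and recv ops
#     trainer_prog = planner.worker_program        # program each trainer runs
#     pserver_prog = planner.get_pserver_program("127.0.0.1:6174")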