downpour.py 7.3 KB
Newer Older
D
dongdaxiang 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and

D
dongdaxiang 已提交
14 15
from .node import DownpourServer
from .node import DownpourWorker
D
dongdaxiang 已提交
16 17
from ..backward import append_backward
import ps_pb2 as pslib
D
dongdaxiang 已提交
18
from paddle.fluid.distribute_lookup_table import find_distributed_lookup_table
19 20
from paddle.fluid.distribute_lookup_table import find_distributed_lookup_table_inputs
from paddle.fluid.distribute_lookup_table import find_distributed_lookup_table_outputs
D
dongdaxiang 已提交
21
from google.protobuf import text_format
D
dongdaxiang 已提交
22

23

D
dongdaxiang 已提交
24
class DownpourSGD(object):
25 26 27 28 29 30 31 32 33 34 35 36 37 38
    """
    Distributed optimizer of downpour stochastic gradient descent
    Standard implementation of Google's Downpour SGD
    in Large Scale Distributed Deep Networks

    Args:
        learning_rate (float): the learning rate used to update parameters. \
        Can be a float value
    Examples:
        .. code-block:: python
    
             downpour_sgd = fluid.distributed.DownpourSGD(learning_rate=0.2)
             downpour_sgd.minimize(cost)
    """
39

D
dongdaxiang 已提交
40
    def __init__(self, learning_rate=0.001, window=1):
41 42
        # todo(guru4elephant): add more optimizers here as argument
        # todo(guru4elephant): make learning_rate as a variable
D
dongdaxiang 已提交
43
        self.learning_rate_ = learning_rate
D
dongdaxiang 已提交
44
        self.window_ = window
45
        self.type = "downpour"
H
heqiaozhi 已提交
46 47 48 49
        self.data_norm_name = [
            ".batch_size", ".batch_square_sum", ".batch_sum",
            ".batch_size@GRAD", ".batch_square_sum@GRAD", ".batch_sum@GRAD"
        ]
50 51

    def minimize(self,
H
heqiaozhi 已提交
52
                 losses,
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
                 startup_program=None,
                 parameter_list=None,
                 no_grad_set=None):
        """
        DownpounSGD is a distributed optimizer so
        that user can call minimize to generate backward
        operators and optimization operators within minmize function
        Args:
            loss(Variable): loss variable defined by user
            startup_program(Program): startup program that defined by user
            parameter_list(str list): parameter names defined by users
            no_grad_set(set): a set of variables that is defined by users
            so that these variables do not need gradient computation
        Returns:
            [ps_param, worker_skipped_ops]
            ps_param: parameter server protobuf desc
            worker_skipped_ops: operator names that need
            to be skipped during execution
        """
H
heqiaozhi 已提交
72 73 74
        if not isinstance(losses, list):
            raise ValueError('losses is a list, just lick [model.cost]')
        table_name = find_distributed_lookup_table(losses[0].block.program)
75
        prefetch_slots = find_distributed_lookup_table_inputs(
H
heqiaozhi 已提交
76
            losses[0].block.program, table_name)
77
        prefetch_slots_emb = find_distributed_lookup_table_outputs(
H
heqiaozhi 已提交
78 79 80
            losses[0].block.program, table_name)

        ps_param = pslib.PSParameter()
D
dongdaxiang 已提交
81
        server = DownpourServer()
D
dongdaxiang 已提交
82
        worker = DownpourWorker(self.window_)
83 84
        sparse_table_index = 0
        server.add_sparse_table(sparse_table_index, self.learning_rate_,
D
dongdaxiang 已提交
85
                                prefetch_slots, prefetch_slots_emb)
86
        worker.add_sparse_table(sparse_table_index, self.learning_rate_,
D
dongdaxiang 已提交
87
                                prefetch_slots, prefetch_slots_emb)
H
heqiaozhi 已提交
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
        dense_table_index = 1
        program_configs = []
        for loss_index in range(len(losses)):
            program_config = ps_param.trainer_param.program_config.add()
            program_config.program_id = str(
                id(losses[loss_index].block.program))
            program_config.pull_sparse_table_id.extend([sparse_table_index])
            program_config.push_sparse_table_id.extend([sparse_table_index])
            params_grads = sorted(
                append_backward(losses[loss_index], parameter_list,
                                no_grad_set),
                key=lambda x: x[0].name)
            params = []
            grads = []
            data_norm_params = []
            data_norm_grads = []
            for i in params_grads:
                is_data_norm_data = False
                for data_norm_name in self.data_norm_name:
                    if i[0].name.endswith(data_norm_name):
                        is_data_norm_data = True
                        data_norm_params.append(i[0])
                if not is_data_norm_data:
                    params.append(i[0])
            for i in params_grads:
                is_data_norm_data = False
                for data_norm_grad in self.data_norm_name:
                    if i[0].name.endswith(data_norm_grad):
                        is_data_norm_data = True
                        data_norm_grads.append(i[1])
                if not is_data_norm_data:
                    grads.append(i[1])
            server.add_dense_table(dense_table_index, self.learning_rate_,
                                   params, grads)
            worker.add_dense_table(dense_table_index, self.learning_rate_,
                                   params, grads)
            program_config.pull_dense_table_id.extend([dense_table_index])
            program_config.push_dense_table_id.extend([dense_table_index])
            if len(data_norm_params) != 0 and len(data_norm_grads) != 0:
                dense_table_index += 1
                server.add_data_norm_table(dense_table_index,
                                           self.learning_rate_,
                                           data_norm_params, data_norm_grads)
                worker.add_dense_table(dense_table_index, self.learning_rate_,
                                       data_norm_params, data_norm_grads)
                program_config.pull_dense_table_id.extend([dense_table_index])
                program_config.push_dense_table_id.extend([dense_table_index])
            dense_table_index += 1
            program_configs.append(program_config)
D
dongdaxiang 已提交
137
        ps_param.server_param.CopyFrom(server.get_desc())
H
heqiaozhi 已提交
138
        ps_param.trainer_param.CopyFrom(worker.get_desc())
H
heqiaozhi 已提交
139 140
        for program_config in program_configs:
            ps_param.trainer_param.program_config.extend([program_config])
141 142
        # Todo(guru4elephant): figure out how to support more sparse parameters
        # currently only support lookup_table
D
dongdaxiang 已提交
143
        worker_skipped_ops = ["lookup_table", "lookup_table_grad"]
H
heqiaozhi 已提交
144
        ps_param.trainer_param.skip_op.extend(worker_skipped_ops)
D
dongdaxiang 已提交
145 146 147 148 149 150 151 152 153 154 155 156 157 158 159

        # all fleet operations should be defined in operators in the future
        # we want to return an object here containing:
        # 1) worker execution strategy
        # 2) pserver execution strategy
        # 3) fleet configurations
        # 4) skipped operators in runtime
        # 5) distributed optimization
        opt_info = {}
        opt_info["trainer"] = "DistMultiTrainer"
        opt_info["device_worker"] = "DownpourSGD"
        opt_info["optimizer"] = "DownpourSGD"
        opt_info["fleet_desc"] = ps_param
        opt_info["worker_skipped_ops"] = worker_skipped_ops
        return opt_info