#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from .node import DownpourServer
from .node import DownpourWorker
from ..backward import append_backward
import ps_pb2 as pslib
from paddle.fluid.distribute_lookup_table import find_distributed_lookup_table
from paddle.fluid.distribute_lookup_table import (
    find_distributed_lookup_table_inputs,
)
from paddle.fluid.distribute_lookup_table import (
    find_distributed_lookup_table_outputs,
)
from google.protobuf import text_format


class DownpourSGD(object):
    r"""
    Distributed optimizer implementing Downpour stochastic gradient descent,
    the standard algorithm from Google's
    "Large Scale Distributed Deep Networks".

    Args:
        learning_rate (float): the learning rate used to update parameters.
        window (int): window size passed to the DownpourWorker.
    Examples:
        .. code-block:: python

             opt = fluid.DistributedOptimizer(sgd_opt)
             opt.minimize()

             downpour_sgd = fluid.distributed.DownpourSGD(learning_rate=0.2)
             downpour_sgd.minimize([cost])
    """

    def __init__(self, learning_rate=0.001, window=1):
        # todo(guru4elephant): add more optimizers here as argument
        # todo(guru4elephant): make learning_rate as a variable
        self.learning_rate_ = learning_rate
        self.window_ = window
        self.type = "downpour"
        self.data_norm_name = [
            ".batch_size",
            ".batch_square_sum",
            ".batch_sum",
            ".batch_size@GRAD",
            ".batch_square_sum@GRAD",
            ".batch_sum@GRAD",
        ]

    def minimize(
        self,
        losses,
        startup_program=None,
        parameter_list=None,
        no_grad_set=None,
    ):
        """
        DownpourSGD is a distributed optimizer, so users can call minimize
        to generate both the backward operators and the optimization
        operators within the minimize function.
        Args:
            losses (list of Variable): loss variables defined by the user
            startup_program (Program): startup program defined by the user
            parameter_list (list of str): names of the parameters to be
                updated
            no_grad_set (set): variables defined by the user that do not
                need gradient computation
        Returns:
            (None, param_grads_list): param_grads_list holds the
            (parameter, gradient) pairs appended for each loss; the
            parameter server protobuf desc and the worker skipped ops are
            attached to each program through its _fleet_opt attribute
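
        Examples:
            A minimal sketch; ``cost`` below stands for a user-defined loss
            Variable from a program that uses a distributed lookup_table:

            .. code-block:: python

                downpour_sgd = fluid.distributed.DownpourSGD(learning_rate=0.2)
                _, param_grads_list = downpour_sgd.minimize([cost])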
        """
        if not isinstance(losses, list):
            raise ValueError('losses should be a list, like [model.cost]')
        table_name = find_distributed_lookup_table(losses[0].block.program)
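        # The inputs and outputs of the distributed lookup table are the
        # sparse slots and their embeddings; they will be served by the
        # sparse table on the parameter server.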
        prefetch_slots = find_distributed_lookup_table_inputs(
            losses[0].block.program, table_name
        )
        prefetch_slots_emb = find_distributed_lookup_table_outputs(
            losses[0].block.program, table_name
        )

        ps_param = pslib.PSParameter()
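        # server describes the parameter-server-side tables; worker keeps
        # the trainer-side view of the same tables.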
        server = DownpourServer()
        worker = DownpourWorker(self.window_)
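        # Table 0 is the sparse table serving the distributed lookup_table
        # (embedding) parameters on both server and worker.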
        sparse_table_index = 0
        server.add_sparse_table(
            sparse_table_index,
            self.learning_rate_,
            prefetch_slots,
            prefetch_slots_emb,
        )
        worker.add_sparse_table(
            sparse_table_index,
            self.learning_rate_,
            prefetch_slots,
            prefetch_slots_emb,
        )
        dense_table_index = 1
        program_configs = []
        param_grads_list = []
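        # Build one program_config per loss; it records which sparse and
        # dense tables the corresponding program pulls from and pushes to.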
        for loss_index in range(len(losses)):
            program_config = ps_param.trainer_param.program_config.add()
            program_config.program_id = str(
                id(losses[loss_index].block.program)
            )
            program_config.pull_sparse_table_id.extend([sparse_table_index])
            program_config.push_sparse_table_id.extend([sparse_table_index])
            params_grads = sorted(
                append_backward(
                    losses[loss_index], parameter_list, no_grad_set
                ),
                key=lambda x: x[0].name,
            )
            param_grads_list.append(params_grads)
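            # Split the parameters/gradients into regular ones and data-norm
            # statistics; the data-norm statistics are registered in a
            # separate dense table further below.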
            params = []
            grads = []
            data_norm_params = []
            data_norm_grads = []
            for i in params_grads:
                is_data_norm_data = False
                for data_norm_name in self.data_norm_name:
                    if i[0].name.endswith(data_norm_name):
                        is_data_norm_data = True
                        data_norm_params.append(i[0])
                if not is_data_norm_data:
                    params.append(i[0])
            for i in params_grads:
                is_data_norm_data = False
                for data_norm_grad in self.data_norm_name:
                    if i[0].name.endswith(data_norm_grad):
                        is_data_norm_data = True
                        data_norm_grads.append(i[1])
                if not is_data_norm_data:
                    grads.append(i[1])
            server.add_dense_table(
                dense_table_index, self.learning_rate_, params, grads
            )
            worker.add_dense_table(
                dense_table_index, self.learning_rate_, params, grads
            )
            program_config.pull_dense_table_id.extend([dense_table_index])
            program_config.push_dense_table_id.extend([dense_table_index])
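            # Data-norm parameters, if present, get an extra dense table
            # of their own.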
            if len(data_norm_params) != 0 and len(data_norm_grads) != 0:
                dense_table_index += 1
                server.add_data_norm_table(
                    dense_table_index,
                    self.learning_rate_,
                    data_norm_params,
                    data_norm_grads,
                )
                worker.add_dense_table(
                    dense_table_index,
                    self.learning_rate_,
                    data_norm_params,
                    data_norm_grads,
                )
                program_config.pull_dense_table_id.extend([dense_table_index])
                program_config.push_dense_table_id.extend([dense_table_index])
            dense_table_index += 1
            program_configs.append(program_config)
        ps_param.server_param.CopyFrom(server.get_desc())
        ps_param.trainer_param.CopyFrom(worker.get_desc())
        for program_config in program_configs:
            ps_param.trainer_param.program_config.extend([program_config])
        # Todo(guru4elephant): figure out how to support more sparse parameters
        # currently only support lookup_table
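        # The embedding lookup and its gradient are handled through the
        # sparse table pull/push, so these ops are skipped at run time.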
        worker_skipped_ops = ["lookup_table", "lookup_table_grad"]
        ps_param.trainer_param.skip_op.extend(worker_skipped_ops)

        # all fleet operations should be defined in operators in the future
        # we want to return an object here containing:
        # 1) worker execution strategy
        # 2) pserver execution strategy
        # 3) fleet configurations
        # 4) skipped operators in runtime
        # 5) distributed optimization
        opt_info = {}
        opt_info["trainer"] = "DistMultiTrainer"
        opt_info["device_worker"] = "DownpourSGD"
        opt_info["optimizer"] = "DownpourSGD"
        opt_info["fleet_desc"] = ps_param
        opt_info["worker_skipped_ops"] = worker_skipped_ops

        for loss in losses:
            loss.block.program._fleet_opt = opt_info

        return None, param_grads_list