node.py 6.9 KB
Newer Older
D
dongdaxiang 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

D
dongdaxiang 已提交
14
import ps_pb2 as pslib
15

16 17
# NOTE: reduce is no longer a builtin in Python 3; it lives in functools
from functools import reduce
D
dongdaxiang 已提交
18

H
heqiaozhi 已提交
19

20
class Server:
    """
    A Server basic class.

    Base class for server description builders. It defines no attributes
    and no behaviour of its own; concrete subclasses supply both.
    """

    def __init__(self):
        # Nothing to initialise in the base class.
        pass


29
class Worker:
    """
    A Worker basic class.

    Base class for worker description builders. It defines no attributes
    and no behaviour of its own; concrete subclasses supply both.
    """

    def __init__(self):
        # Nothing to initialise in the base class.
        pass


class DownpourServer(Server):
    """
    DownpourServer class is used to generate server program_desc.

    The description is accumulated in ``self.server_``, a
    ``pslib.ServerParameter`` message created by the constructor. The
    constructor takes no arguments.

    Examples:
        server = DownpourServer()
    """

    def __init__(self):
        self.server_ = pslib.ServerParameter()
        # Alias the nested service message so each field is set once, without
        # repeating the full attribute chain.
        service = self.server_.downpour_server_param.service_param
        service.start_server_port = 0
        service.server_class = "DownpourBrpcPsServer"
        service.client_class = "DownpourBrpcPsClient"
        service.service_class = "DownpourPsService"
        service.server_thread_num = 12

    def add_sparse_table(
        self, table_id, learning_rate, slot_key_vars, slot_value_var
    ):
        """
        Append a sparse table entry to the server description.

        Apart from ``table_id`` and ``learning_rate``, every accessor setting
        is a fixed value hard-coded in this method.

        Args:
            table_id(int): id of sparse params table
            learning_rate(float): the learning rate used to update
                parameters. Can be a float value
            slot_key_vars: slot key id. Not read by this method.
            slot_value_var: slot key value after embedding. Not read by
                this method.
        Returns:
            return None
        """
        table = self.server_.downpour_server_param.downpour_table_param.add()
        table.table_id = table_id
        table.table_class = "DownpourSparseTable"
        table.type = pslib.PS_SPARSE_TABLE

        accessor = table.accessor
        accessor.accessor_class = "DownpourFeatureValueAccessor"

        sgd = accessor.sparse_sgd_param
        sgd.learning_rate = learning_rate
        sgd.initial_g2sum = 3
        sgd.initial_range = 1e-4
        sgd.weight_bounds.extend([-10, 10])

        accessor.embedx_dim = 8
        accessor.embedx_threshold = 5
        accessor.fea_dim = 11

        downpour = accessor.downpour_accessor_param
        downpour.nonclk_coeff = 0.1
        downpour.click_coeff = 2
        downpour.base_threshold = 0.2
        downpour.delta_threshold = 0.15
        downpour.delta_keep_days = 31
        downpour.show_click_decay_rate = 0.999
        downpour.delete_threshold = 0.8

    def add_dense_table(self, table_id, learning_rate, param_var, grad_var):
        """
        Append a dense table entry to the server description.

        The table's ``fea_dim`` is the total element count (product of each
        ``shape``) of the entries in ``param_var`` whose name does not
        contain "embedding".

        Args:
            table_id(int): id of dense params table
            learning_rate(float): the learning rate used to update
                parameters. Can be a float value. Stored as the adam
                learning rate.
            param_var(list): all dense param. it is a list. Each item must
                expose ``name`` and ``shape``.
            grad_var(list): all dense grad param. it is a list. Not read by
                this method.
        Returns:
            return None
        """
        table = self.server_.downpour_server_param.downpour_table_param.add()
        table.table_id = table_id
        table.table_class = "DownpourDenseTable"
        table.type = pslib.PS_DENSE_TABLE

        accessor = table.accessor
        accessor.accessor_class = "DownpourDenseValueAccessor"

        dense_sgd = accessor.dense_sgd_param
        dense_sgd.name = "adam"
        dense_sgd.adam.learning_rate = learning_rate
        dense_sgd.adam.avg_decay_rate = 0.999993
        dense_sgd.adam.ada_decay_rate = 0.9999
        dense_sgd.adam.ada_epsilon = 1e-8
        dense_sgd.adam.mom_decay_rate = 0.99
        # The naive optimizer's rate is filled in too, although "adam" is the
        # one selected by name above.
        dense_sgd.naive.learning_rate = 0.0002

        fea_dim = 0
        for param in param_var:
            # Parameters named like embeddings are left out of the dense size.
            if "embedding" in param.name:
                continue
            # Initial value 1 makes an empty shape count as one element.
            fea_dim += reduce(lambda x, y: x * y, param.shape, 1)
        accessor.fea_dim = fea_dim

    def get_desc(self):
        """
        Return downpour server program_desc
        """
        return self.server_


class DownpourWorker(Worker):
    """
    DownpourWorker class is used to generate worker program_desc.

    The description is accumulated in ``self.worker_``, a
    ``pslib.DownpourTrainerParameter`` message created by the constructor.

    Args:
        window (int): push params frequency
    Examples:
        worker = DownpourWorker(1)
    """

    def __init__(self, window):
        self.window = window
        self.worker_ = pslib.DownpourTrainerParameter()

    def add_sparse_table(
        self, table_id, learning_rate, slot_key_vars, slot_value_vars
    ):
        """
        Append a sparse table entry to the worker description.

        Records the names of the key and value variables, plus one gradient
        name per value variable formed by appending "@GRAD" to its name.

        Args:
            table_id(int): id of sparse params table
            learning_rate(float): the learning rate used to update
                parameters. Not read by this method.
            slot_key_vars(list): slot key variables; each must expose
                ``name``.
            slot_value_vars(list): slot value variables after embedding;
                each must expose ``name``.
        Returns:
            return None
        """
        table = self.worker_.sparse_table.add()
        table.table_id = table_id
        table.slot_key.extend([var.name for var in slot_key_vars])

        # Collect the value names once and derive the gradient names from
        # them, so both lists always line up.
        value_names = [var.name for var in slot_value_vars]
        table.slot_value.extend(value_names)
        table.slot_gradient.extend([name + "@GRAD" for name in value_names])

    def add_dense_table(self, table_id, learning_rate, param_vars, grad_vars):
        """
        Append a dense table entry to the worker description.

        Variables whose name contains "embedding" are skipped, for
        parameters and gradients alike.

        Args:
            table_id(int): id of dense params table
            learning_rate(float): the learning rate used to update
                parameters. Not read by this method.
            param_vars(list): all dense param. it is a list. Each item must
                expose ``name``.
            grad_vars(list): all dense grad param. it is a list. Each item
                must expose ``name``.
        Returns:
            return None
        """
        table = self.worker_.dense_table.add()
        table.table_id = table_id
        table.dense_variable_name.extend(
            [p.name for p in param_vars if "embedding" not in p.name]
        )
        table.dense_gradient_variable_name.extend(
            [g.name for g in grad_vars if "embedding" not in g.name]
        )

    def get_desc(self):
        """
        Return downpour worker program_desc
        """
        return self.worker_