node.py 9.5 KB
Newer Older
D
dongdaxiang 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and

import ps_pb2 as pslib


class Server(object):
    """
        A Server basic class.
    """

    def __init__(self):
        pass


class Worker(object):
    """
        A Worker basic class.
    """

    def __init__(self):
        pass


class DownpourServer(Server):
    """
        DownpourServer class is used to generate server program_desc
        Args:
            server: it is pslib.ServerParameter() 
        Examples:
            server = DownpourServer()
    """

    def __init__(self):
D
dongdaxiang 已提交
45 46 47 48 49 50 51
        self._server = pslib.ServerParameter()
        self._server.downpour_server_param.service_param.start_server_port = 0
        self._server.downpour_server_param.service_param.server_class = "DownpourBrpcPsServer"
        self._server.downpour_server_param.service_param.client_class = "DownpourBrpcPsClient"
        self._server.downpour_server_param.service_param.service_class = "DownpourPsService"
        self._server.downpour_server_param.service_param.start_server_port = 0
        self._server.downpour_server_param.service_param.server_thread_num = 12
D
dongdaxiang 已提交
52 53 54 55 56 57 58 59 60 61 62 63 64

    def add_sparse_table(self, table_id, learning_rate, slot_key_vars,
                         slot_value_var):
        """
        Args:
            table_id(int): id of sparse params table
            learning_rate(float): the learning rate used to update parameters. \
                Can be a float value
            slot_key_vars(string): slot key id 
            slot_value_var(string): slot key value after embedding
        Returns:
            return None 
        """
65 66 67 68 69 70 71
        for table in self._server.downpour_server_param.downpour_table_param:
            if table.table_id == table_id:
                if table.type == pslib.PS_SPARSE_TABLE:
                    return
                else:
                    raise ValueError("expect table %s type=%s, but actual type=%s" \
                        %(table_id, pslib.PS_SPARSE_TABLE, table.type))
D
dongdaxiang 已提交
72
        table = self._server.downpour_server_param.downpour_table_param.add()
D
dongdaxiang 已提交
73 74 75
        table.table_id = table_id
        table.table_class = "DownpourSparseTable"
        table.type = pslib.PS_SPARSE_TABLE
76
        table.compress_in_save = True
77
        table.shard_num = 1000
T
Thunderbrook 已提交
78
        table.accessor.accessor_class = "DownpourCtrAccessor"
D
dongdaxiang 已提交
79 80 81 82 83 84 85 86 87 88 89 90
        table.accessor.sparse_sgd_param.learning_rate = learning_rate
        table.accessor.sparse_sgd_param.initial_g2sum = 3
        table.accessor.sparse_sgd_param.initial_range = 1e-4
        table.accessor.sparse_sgd_param.weight_bounds.extend([-10, 10])

        table.accessor.embedx_dim = 8
        table.accessor.embedx_threshold = 5
        table.accessor.fea_dim = 11
        table.accessor.downpour_accessor_param.nonclk_coeff = 0.1
        table.accessor.downpour_accessor_param.click_coeff = 2
        table.accessor.downpour_accessor_param.base_threshold = 0.2
        table.accessor.downpour_accessor_param.delta_threshold = 0.15
T
Thunderbrook 已提交
91 92
        table.accessor.downpour_accessor_param.delta_keep_days = 16
        table.accessor.downpour_accessor_param.delete_after_unseen_days = 30
D
dongdaxiang 已提交
93 94 95 96 97 98 99 100 101 102 103 104 105 106
        table.accessor.downpour_accessor_param.show_click_decay_rate = 0.999
        table.accessor.downpour_accessor_param.delete_threshold = 0.8

    def add_dense_table(self, table_id, learning_rate, param_var, grad_var):
        """
        Args:
            table_id(int): id of sparse params table
            learning_rate(float): the learning rate used to update parameters. \
                Can be a float value
            param_var(list): all dense param. it is a list.
            grad_var(list): all dense grad parm it is a list.
        Returns:
            return None 
        """
107 108 109 110 111 112 113 114 115 116 117 118 119
        fea_dim = 0
        for param in filter(lambda x: x.name.find("embedding") == -1,
                            param_var):
            fea_dim += reduce(lambda x, y: x * y, param.shape, 1)

        for table in self._server.downpour_server_param.downpour_table_param:
            if table.table_id == table_id:
                if table.type == pslib.PS_DENSE_TABLE:
                    table.accessor.fea_dim = fea_dim
                    return
                else:
                    raise ValueError("expect table %s type=%s, but actual type=%s" \
                        %(table_id, pslib.PS_DENSE_TABLE, table.type))
T
tangwei12 已提交
120
        table = self._server.downpour_server_param.downpour_table_param.add()
D
dongdaxiang 已提交
121 122 123
        table.table_id = table_id
        table.table_class = "DownpourDenseTable"
        table.type = pslib.PS_DENSE_TABLE
124
        table.compress_in_save = True
D
dongdaxiang 已提交
125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
        table.accessor.accessor_class = "DownpourDenseValueAccessor"
        table.accessor.dense_sgd_param.name = "adam"
        table.accessor.dense_sgd_param.adam.learning_rate = learning_rate
        table.accessor.dense_sgd_param.adam.avg_decay_rate = 0.999993
        table.accessor.dense_sgd_param.adam.ada_decay_rate = 0.9999
        table.accessor.dense_sgd_param.adam.ada_epsilon = 1e-8
        table.accessor.dense_sgd_param.adam.mom_decay_rate = 0.99
        table.accessor.dense_sgd_param.naive.learning_rate = 0.0002
        table.accessor.fea_dim = fea_dim

    def add_data_norm_table(self, table_id, learning_rate, param_var, grad_var):
        """
        Args:
            table_id(int): id of sparse params table
            learning_rate(float): the learning rate used to update parameters. \
                Can be a float value
            param_var(list): all dense param. it is a list.
            grad_var(list): all dense grad parm it is a list.
        Returns:
            return None 
        """
146 147 148 149 150 151 152 153 154 155 156 157 158
        fea_dim = 0
        for param in filter(lambda x: x.name.find("embedding") == -1,
                            param_var):
            fea_dim += reduce(lambda x, y: x * y, param.shape, 1)

        for table in self._server.downpour_server_param.downpour_table_param:
            if table.table_id == table_id:
                if table.type == pslib.PS_DENSE_TABLE:
                    table.accessor.fea_dim = fea_dim
                    return
                else:
                    raise ValueError("expect table %s type=%s, but actual type=%s" \
                        %(table_id, pslib.PS_DENSE_TABLE, table.type))
D
dongdaxiang 已提交
159
        table = self._server.downpour_server_param.downpour_table_param.add()
D
dongdaxiang 已提交
160 161 162
        table.table_id = table_id
        table.table_class = "DownpourDenseTable"
        table.type = pslib.PS_DENSE_TABLE
163
        table.compress_in_save = True
D
dongdaxiang 已提交
164 165 166 167 168 169 170 171 172
        table.accessor.accessor_class = "DownpourDenseValueAccessor"
        table.accessor.dense_sgd_param.name = "summary"
        table.accessor.dense_sgd_param.summary.summary_decay_rate = 0.999999
        table.accessor.fea_dim = fea_dim

    def get_desc(self):
        """
        Return downpour server program_desc
        """
D
dongdaxiang 已提交
173
        return self._server
D
dongdaxiang 已提交
174 175 176 177 178 179 180 181 182 183 184 185 186 187


class DownpourWorker(Worker):
    """
        DownpourWorker class is used to generate worker program_desc
        Args:
            window (int): push params frequency
            worker: it is pslib.DownpourTrainerParameter 
        Examples:
            worker = DownpourWorker(1)
    """

    def __init__(self, window):
        self.window = window
D
dongdaxiang 已提交
188
        self._worker = pslib.DownpourTrainerParameter()
D
dongdaxiang 已提交
189 190 191 192 193 194 195 196 197 198 199 200 201

    def add_sparse_table(self, table_id, learning_rate, slot_key_vars,
                         slot_value_vars):
        """
        Args:
            table_id(int): id of sparse params table
            learning_rate(float): the learning rate used to update parameters. \
                Can be a float value
            slot_key_vars(string): slot key id 
            slot_value_var(string): slot key value after embedding
        Returns:
            return None 
        """
202 203 204
        for table in self._worker.sparse_table:
            if table.table_id == table_id:
                return
T
tangwei12 已提交
205
        table = self._worker.sparse_table.add()
D
dongdaxiang 已提交
206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222
        table.table_id = table_id
        table.slot_key.extend([var.name for var in slot_key_vars])
        table.slot_value.extend([var.name for var in slot_value_vars])
        table.slot_gradient.extend(
            [var.name + "@GRAD" for var in slot_value_vars])

    def add_dense_table(self, table_id, learning_rate, param_vars, grad_vars):
        """
        Args:
            table_id(int): id of sparse params table
            learning_rate(float): the learning rate used to update parameters. \
                Can be a float value
            param_var(list): all dense param. it is a list.
            grad_var(list): all dense grad parm it is a list.
        Returns:
            return None 
        """
223 224 225
        for table in self._worker.dense_table:
            if table.table_id == table_id:
                return
D
dongdaxiang 已提交
226
        table = self._worker.dense_table.add()
D
dongdaxiang 已提交
227 228 229 230 231 232 233 234 235 236 237 238
        table.table_id = table_id
        table.dense_variable_name.extend(
            filter(lambda x: x.find("embedding") == -1,
                   [p.name for p in param_vars]))
        table.dense_gradient_variable_name.extend(
            filter(lambda x: x.find("embedding") == -1,
                   [g.name for g in grad_vars]))

    def get_desc(self):
        """
        Return downpour worker program_desc
        """
D
dongdaxiang 已提交
239
        return self._worker