node.py 4.3 KB
Newer Older
D
dongdaxiang 已提交
1
import ps_pb2 as pslib
D
dongdaxiang 已提交
2 3 4 5 6 7 8 9 10 11 12 13 14

class Server(object):
    def __init__(self):
        pass


class Worker(object):
    def __init__(self):
        pass


class DownpourServer(Server):
    def __init__(self):
D
dongdaxiang 已提交
15
        self.server_ = pslib.ServerParameter()
H
heqiaozhi 已提交
16 17 18 19 20 21
        self.server_.downpour_server_param.service_param.start_server_port = 0
        self.server_.downpour_server_param.service_param.server_class = "DownpourBrpcPsServer"
        self.server_.downpour_server_param.service_param.client_class = "DownpourBrpcPsClient"
        self.server_.downpour_server_param.service_param.service_class = "DownpourPsService"
        self.server_.downpour_server_param.service_param.start_server_port = 0
        self.server_.downpour_server_param.service_param.server_thread_num = 12
D
dongdaxiang 已提交
22 23

    def add_sparse_table(self, table_id, learning_rate,
24
                         slot_key_vars, slot_value_var):
D
dongdaxiang 已提交
25
        table = self.server_.downpour_server_param.downpour_table_param.add()
D
dongdaxiang 已提交
26
        table.table_id = table_id
H
heqiaozhi 已提交
27
        table.table_class = "DownpourSparseTable"
28
        table.type = pslib.PS_SPARSE_TABLE
D
dongdaxiang 已提交
29
        table.accessor.accessor_class = "DownpourFeatureValueAccessor"
H
heqiaozhi 已提交
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
        table.accessor.sparse_sgd_param.learning_rate = learning_rate
        table.accessor.sparse_sgd_param.initial_g2sum = 3
        table.accessor.sparse_sgd_param.initial_range = 1e-4
        table.accessor.sparse_sgd_param.weight_bounds.extend([-10, 10])
        
        table.accessor.embedx_dim = 8
        table.accessor.embedx_threshold = 5
        table.accessor.fea_dim = 11 
        #table.accessor.fea_dim = abs(reduce(lambda x, y: x * y, 
        #                                    slot_value_var[0].shape, 1))
        table.accessor.downpour_accessor_param.nonclk_coeff = 0.1
        table.accessor.downpour_accessor_param.click_coeff = 2
        table.accessor.downpour_accessor_param.base_threshold = 0.2
        table.accessor.downpour_accessor_param.delta_threshold = 0.15
        table.accessor.downpour_accessor_param.delta_keep_days = 31
        table.accessor.downpour_accessor_param.show_click_decay_rate = 0.999
        table.accessor.downpour_accessor_param.delete_threshold = 0.8
D
dongdaxiang 已提交
47 48 49

    def add_dense_table(self, table_id, learning_rate, 
                        param_var, grad_var):
D
dongdaxiang 已提交
50
        table = self.server_.downpour_server_param.downpour_table_param.add()
D
dongdaxiang 已提交
51
        table.table_id = table_id
H
heqiaozhi 已提交
52
        table.table_class = "DownpourDenseTable"
53
        table.type = pslib.PS_DENSE_TABLE
D
dongdaxiang 已提交
54
        table.accessor.accessor_class = "DownpourDenseValueAccessor"
H
heqiaozhi 已提交
55 56 57 58 59 60 61
        table.accessor.dense_sgd_param.name = "adam" 
        table.accessor.dense_sgd_param.adam.learning_rate = learning_rate
        table.accessor.dense_sgd_param.adam.avg_decay_rate = 0.999993 
        table.accessor.dense_sgd_param.adam.ada_decay_rate = 0.9999 
        table.accessor.dense_sgd_param.adam.ada_epsilon = 1e-8
        table.accessor.dense_sgd_param.adam.mom_decay_rate = 0.99
        table.accessor.dense_sgd_param.naive.learning_rate = 0.0002
62 63 64 65
        fea_dim = 0
        for param in param_var:
            fea_dim += reduce(lambda x, y: x * y, param.shape, 1)
        table.accessor.fea_dim = fea_dim
D
dongdaxiang 已提交
66 67 68 69 70 71 72 73

    def get_desc(self):
        return self.server_


class DownpourWorker(Worker):
    def __init__(self, window):
        self.window = window
D
dongdaxiang 已提交
74
        self.worker_ = pslib.DownpourTrainerParameter()
H
heqiaozhi 已提交
75 76
        #self.worker_.pull_dense_per_batch = window
        #self.worker_.push_dense_per_batch = window
D
dongdaxiang 已提交
77

78 79 80
    def add_sparse_table(self, table_id, learning_rate,
                         slot_key_vars, slot_value_vars):
        table = self.worker_.sparse_table.add()
D
dongdaxiang 已提交
81
        table.table_id = table_id
82 83 84 85 86 87
        table.slot_key.extend(
            [var.name for var in slot_key_vars])
        table.slot_value.extend(
            [var.name for var in slot_value_vars])
        table.slot_gradient.extend(
            [var.name + "@GRAD" for var in slot_value_vars])
D
dongdaxiang 已提交
88

89 90 91
    def add_dense_table(self, table_id, learning_rate, 
                        param_vars, grad_vars):
        table = self.worker_.dense_table.add()
D
dongdaxiang 已提交
92
        table.table_id = table_id
H
heqiaozhi 已提交
93 94
        table.dense_variable_name.extend(filter(lambda x: x.find("embedding") == -1, [p.name for p in param_vars]))
        table.dense_gradient_variable_name.extend(filter(lambda x: x.find("embedding") == -1, [g.name for g in grad_vars]))
D
dongdaxiang 已提交
95 96 97

    def get_desc(self):
        return self.worker_