# test_dist_fleet_ps2.py
#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os

# Exported before the paddle imports below so the flag is already present in
# the environment while paddle is being loaded.
# NOTE(review): presumably read by paddle at import time -- confirm before
# moving this line.
os.environ["WITH_DISTRIBUTE"] = "ON"

import unittest

import paddle
import paddle.distributed.fleet as fleet
import paddle.distributed.fleet.base.role_maker as role_maker
import paddle.fluid as fluid

# The network in this file is assembled with the static-graph APIs
# (paddle.static.data, paddle.static.nn.fc, ...), so switch to static mode
# once at import time.
paddle.enable_static()

# Hyper-parameters of the network built by TestPSPassWithBow.net.
base_lr = 0.2  # learning rate attached to the fc weight parameters
emb_lr = base_lr * 3  # learning rate attached to the "__emb__" table
dict_dim = 1500  # first dimension of the embedding table size
emb_dim = 128  # second dimension of the embedding table size
hid_dim = 128  # output size of the fc layers
margin = 0.1  # margin constant used in the pairwise loss
sample_rate = 1  # not referenced anywhere else in this file
batch_size = 4  # denominator used when computing the accuracy metric


class TestPSPassWithBow(unittest.TestCase):
    """Smoke test: build a bag-of-words ranking net and init a fleet pserver.

    ``net`` assembles a static-graph network with three id-sequence inputs
    (query, positive title, negative title) that share one sparse embedding
    table. ``test`` configures the process as a parameter server through
    environment variables, minimizes the loss with a fleet distributed
    optimizer and calls ``fleet.init_server()``. The test contains no
    assertions; it passes when none of these steps raises.
    """

    def net(self):
        """Build the network in the current default static program.

        Returns:
            list: ``[avg_cost, acc, cos_q_pt]`` -- the mean pairwise loss,
            the accuracy metric and the query/positive-title cosine
            similarity.
        """

        def get_acc(cos_q_nt, cos_q_pt, batch_size):
            # Count the rows where the negative title scores strictly lower
            # than the positive title, then divide by ``batch_size``.
            cond = paddle.less_than(cos_q_nt, cos_q_pt)
            cond = fluid.layers.cast(cond, dtype='float64')
            cond_3 = paddle.sum(cond)
            acc = paddle.divide(
                cond_3,
                fluid.layers.fill_constant(
                    shape=[1], value=batch_size * 1.0, dtype='float64'
                ),
                name="simnet_acc",
            )
            return acc

        def get_loss(cos_q_pt, cos_q_nt):
            # mean(max(0, margin - cos_q_pt + cos_q_nt))
            loss_op1 = paddle.subtract(
                fluid.layers.fill_constant_batch_size_like(
                    input=cos_q_pt, shape=[-1, 1], value=margin, dtype='float32'
                ),
                cos_q_pt,
            )
            loss_op2 = paddle.add(loss_op1, cos_q_nt)
            loss_op3 = paddle.maximum(
                fluid.layers.fill_constant_batch_size_like(
                    input=loss_op2, shape=[-1, 1], value=0.0, dtype='float32'
                ),
                loss_op2,
            )
            avg_cost = paddle.mean(loss_op3)
            return avg_cost

        def id_sequence(name):
            # int64 id input declared with lod_level=1.
            return paddle.static.data(
                name=name, shape=[-1, 1], dtype="int64", lod_level=1
            )

        def pooled_embedding(ids):
            # Look the ids up in the "__emb__" table (the same parameter name
            # is used for all three inputs), sum-pool over the sequence and
            # apply softsign.
            emb = fluid.contrib.layers.sparse_embedding(
                input=ids,
                size=[dict_dim, emb_dim],
                param_attr=fluid.ParamAttr(
                    initializer=paddle.nn.initializer.Constant(value=0.01),
                    name="__emb__",
                    learning_rate=emb_lr,
                ),
            )
            emb = paddle.reshape(emb, [-1, emb_dim])
            pooled = paddle.static.nn.sequence_lod.sequence_pool(
                input=emb, pool_type='sum'
            )
            return paddle.nn.functional.softsign(pooled)

        def title_fc(x):
            # fc layer used by both title inputs; both calls name their
            # parameters "__fc__" / "__fc_b__".
            return paddle.static.nn.fc(
                x=x,
                size=hid_dim,
                weight_attr=fluid.ParamAttr(
                    initializer=paddle.nn.initializer.Constant(value=0.01),
                    name="__fc__",
                    learning_rate=base_lr,
                ),
                bias_attr=fluid.ParamAttr(name="__fc_b__"),
            )

        # The layers below are created in the same order as before the
        # helpers were extracted: query, label, positive title, negative
        # title.

        # query: additionally passes through data_norm and has its own fc
        # weights ("__q_fc__", default bias attribute).
        q = id_sequence("query_ids")
        q_ss = pooled_embedding(q)
        q_ss = paddle.static.nn.data_norm(input=q_ss)
        q_fc = paddle.static.nn.fc(
            x=q_ss,
            size=hid_dim,
            weight_attr=fluid.ParamAttr(
                initializer=paddle.nn.initializer.Constant(value=0.01),
                name="__q_fc__",
                learning_rate=base_lr,
            ),
        )

        # label data: declared as a program input but not consumed by any
        # layer in this method; kept so the program's inputs are unchanged.
        label = paddle.static.data(name="label", shape=[-1, 1], dtype="int64")

        # positive title
        pt = id_sequence("pos_title_ids")
        pt_fc = title_fc(pooled_embedding(pt))

        # negative title
        nt = id_sequence("neg_title_ids")
        nt_fc = title_fc(pooled_embedding(nt))

        cos_q_pt = paddle.nn.functional.cosine_similarity(q_fc, pt_fc)
        cos_q_nt = paddle.nn.functional.cosine_similarity(q_fc, nt_fc)
        # loss
        avg_cost = get_loss(cos_q_pt, cos_q_nt)
        # acc
        acc = get_acc(cos_q_nt, cos_q_pt, batch_size)
        return [avg_cost, acc, cos_q_pt]

    def test(self):
        """Configure a PSERVER role, minimize the loss and init the server."""
        # Environment consumed by PaddleCloudRoleMaker. PADDLE_TRAINERS_NUM
        # used to be assigned twice with the same value; once is enough.
        os.environ.update(
            {
                "PADDLE_PSERVER_NUMS": "2",
                "PADDLE_TRAINERS_NUM": "2",
                "POD_IP": "127.0.0.1",
                "PADDLE_PORT": "36001",
                "PADDLE_TRAINER_ID": "0",
                "PADDLE_PSERVERS_IP_PORT_LIST": "127.0.0.1:36001,127.0.0.2:36001",
                "TRAINING_ROLE": "PSERVER",
            }
        )

        role = role_maker.PaddleCloudRoleMaker()
        fleet.init(role)
        # Only the loss is needed here; the other two outputs are ignored.
        loss, _, _ = self.net()

        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.a_sync = True

        # Per-table settings for the sparse table named "__emb__".
        configs = {
            '__emb__': {
                "table_parameters.__emb__.enable_sparse_table_cache": True,
                "table_parameters.__emb__.shard_merge_rate": 1,
                "table_parameters.__emb__.accessor.embed_sgd_param.name": "SparseNaiveSGDRule",
                "table_parameters.__emb__.accessor.embedx_sgd_param.name": "SparseAdamSGDRule",
            }
        }
        strategy.sparse_table_configs = configs

        optimizer = paddle.fluid.optimizer.SGD(learning_rate=0.01)
        optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
        optimizer.minimize(loss)

        fleet.init_server()


# Run the test case through unittest's command-line runner when this file is
# executed directly.
if __name__ == '__main__':
    unittest.main()