test_dist_fleet_ps11.py 8.1 KB
Newer Older
T
Thunderbrook 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import unittest

import paddle
import paddle.distributed.fleet as fleet
20 21
import paddle.distributed.fleet.base.role_maker as role_maker
import paddle.fluid as fluid
T
Thunderbrook 已提交
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38

paddle.enable_static()

# For Net
base_lr = 0.2
emb_lr = base_lr * 3
dict_dim = 1500
emb_dim = 128
hid_dim = 128
margin = 0.1
sample_rate = 1
batch_size = 4


class TestPSPassWithBow(unittest.TestCase):
    def net(self):
        def get_acc(cos_q_nt, cos_q_pt, batch_size):
L
LiYuRio 已提交
39
            cond = paddle.less_than(cos_q_nt, cos_q_pt)
T
Thunderbrook 已提交
40
            cond = fluid.layers.cast(cond, dtype='float64')
41
            cond_3 = paddle.sum(cond)
42
            acc = paddle.divide(
43 44 45 46 47 48
                cond_3,
                fluid.layers.fill_constant(
                    shape=[1], value=batch_size * 1.0, dtype='float64'
                ),
                name="simnet_acc",
            )
T
Thunderbrook 已提交
49 50 51
            return acc

        def get_loss(cos_q_pt, cos_q_nt):
52
            loss_op1 = paddle.subtract(
53 54 55 56 57
                fluid.layers.fill_constant_batch_size_like(
                    input=cos_q_pt, shape=[-1, 1], value=margin, dtype='float32'
                ),
                cos_q_pt,
            )
58
            loss_op2 = paddle.add(loss_op1, cos_q_nt)
H
HongyuJia 已提交
59
            loss_op3 = paddle.maximum(
60 61 62 63 64
                fluid.layers.fill_constant_batch_size_like(
                    input=loss_op2, shape=[-1, 1], value=0.0, dtype='float32'
                ),
                loss_op2,
            )
65
            avg_cost = paddle.mean(loss_op3)
T
Thunderbrook 已提交
66 67 68 69 70 71
            return avg_cost

        is_distributed = False
        is_sparse = True

        # query
G
GGBond8488 已提交
72 73 74
        q = paddle.static.data(
            name="1", shape=[-1, 1], dtype="int64", lod_level=1
        )
T
Thunderbrook 已提交
75 76 77 78 79
        # embedding
        q_emb = fluid.contrib.layers.sparse_embedding(
            input=q,
            size=[dict_dim, emb_dim],
            param_attr=fluid.ParamAttr(
80
                initializer=paddle.nn.initializer.Constant(value=0.01),
T
Thunderbrook 已提交
81
                name="__emb__",
82 83 84
                learning_rate=emb_lr,
            ),
        )
85
        q_emb = paddle.reshape(q_emb, [-1, emb_dim])
T
Thunderbrook 已提交
86
        # vsum
87 88 89
        q_sum = paddle.static.nn.sequence_lod.sequence_pool(
            input=q_emb, pool_type='sum'
        )
90
        q_ss = paddle.nn.functional.softsign(q_sum)
T
Thunderbrook 已提交
91
        # fc layer after conv
C
Charles-hit 已提交
92 93
        q_fc = paddle.static.nn.fc(
            x=q_ss,
T
Thunderbrook 已提交
94
            size=hid_dim,
C
Charles-hit 已提交
95
            weight_attr=fluid.ParamAttr(
96
                initializer=paddle.nn.initializer.Constant(value=0.01),
T
Thunderbrook 已提交
97
                name="__q_fc__",
98 99 100
                learning_rate=base_lr,
            ),
        )
T
Thunderbrook 已提交
101
        # label data
G
GGBond8488 已提交
102
        label = paddle.static.data(name="label", shape=[-1, 1], dtype="int64")
T
Thunderbrook 已提交
103
        # pt
G
GGBond8488 已提交
104 105 106
        pt = paddle.static.data(
            name="2", shape=[-1, 1], dtype="int64", lod_level=1
        )
T
Thunderbrook 已提交
107 108 109 110 111
        # embedding
        pt_emb = fluid.contrib.layers.sparse_embedding(
            input=pt,
            size=[dict_dim, emb_dim],
            param_attr=fluid.ParamAttr(
112
                initializer=paddle.nn.initializer.Constant(value=0.01),
T
Thunderbrook 已提交
113
                name="__emb__",
114 115 116
                learning_rate=emb_lr,
            ),
        )
117
        pt_emb = paddle.reshape(pt_emb, [-1, emb_dim])
T
Thunderbrook 已提交
118
        # vsum
119 120 121
        pt_sum = paddle.static.nn.sequence_lod.sequence_pool(
            input=pt_emb, pool_type='sum'
        )
122
        pt_ss = paddle.nn.functional.softsign(pt_sum)
T
Thunderbrook 已提交
123
        # fc layer
C
Charles-hit 已提交
124 125
        pt_fc = paddle.static.nn.fc(
            x=pt_ss,
T
Thunderbrook 已提交
126
            size=hid_dim,
C
Charles-hit 已提交
127
            weight_attr=fluid.ParamAttr(
128
                initializer=paddle.nn.initializer.Constant(value=0.01),
T
Thunderbrook 已提交
129
                name="__fc__",
130 131 132 133
                learning_rate=base_lr,
            ),
            bias_attr=fluid.ParamAttr(name="__fc_b__"),
        )
T
Thunderbrook 已提交
134
        # nt
G
GGBond8488 已提交
135 136 137
        nt = paddle.static.data(
            name="3", shape=[-1, 1], dtype="int64", lod_level=1
        )
T
Thunderbrook 已提交
138 139 140 141 142
        # embedding
        nt_emb = fluid.contrib.layers.sparse_embedding(
            input=nt,
            size=[dict_dim, emb_dim],
            param_attr=fluid.ParamAttr(
143
                initializer=paddle.nn.initializer.Constant(value=0.01),
T
Thunderbrook 已提交
144
                name="__emb__",
145 146 147
                learning_rate=emb_lr,
            ),
        )
148
        nt_emb = paddle.reshape(nt_emb, [-1, emb_dim])
T
Thunderbrook 已提交
149
        # vsum
150 151 152
        nt_sum = paddle.static.nn.sequence_lod.sequence_pool(
            input=nt_emb, pool_type='sum'
        )
153
        nt_ss = paddle.nn.functional.softsign(nt_sum)
T
Thunderbrook 已提交
154
        # fc layer
C
Charles-hit 已提交
155 156
        nt_fc = paddle.static.nn.fc(
            x=nt_ss,
T
Thunderbrook 已提交
157
            size=hid_dim,
C
Charles-hit 已提交
158
            weight_attr=fluid.ParamAttr(
159
                initializer=paddle.nn.initializer.Constant(value=0.01),
T
Thunderbrook 已提交
160
                name="__fc__",
161 162 163 164
                learning_rate=base_lr,
            ),
            bias_attr=fluid.ParamAttr(name="__fc_b__"),
        )
C
ccrrong 已提交
165 166
        cos_q_pt = paddle.nn.functional.cosine_similarity(q_fc, pt_fc)
        cos_q_nt = paddle.nn.functional.cosine_similarity(q_fc, nt_fc)
T
Thunderbrook 已提交
167 168 169 170 171 172 173 174 175 176 177 178 179 180
        # loss
        avg_cost = get_loss(cos_q_pt, cos_q_nt)
        # acc
        acc = get_acc(cos_q_nt, cos_q_pt, batch_size)
        return [avg_cost, acc, cos_q_pt]

    def test(self):
        os.environ["PADDLE_PSERVER_NUMS"] = "2"
        os.environ["PADDLE_TRAINERS_NUM"] = "2"
        os.environ["POD_IP"] = "127.0.0.1"
        os.environ["PADDLE_PORT"] = "36001"
        os.environ["PADDLE_TRAINER_ID"] = "0"
        os.environ["PADDLE_TRAINERS_NUM"] = "2"
        os.environ[
181 182
            "PADDLE_TRAINER_ENDPOINTS"
        ] = "127.0.0.1:36001,127.0.0.2:36001"
T
Thunderbrook 已提交
183
        os.environ[
184 185
            "PADDLE_PSERVERS_IP_PORT_LIST"
        ] = "127.0.0.1:36002,127.0.0.2:36002"
T
Thunderbrook 已提交
186 187 188 189 190 191 192 193 194 195 196 197 198 199
        os.environ["TRAINING_ROLE"] = "TRAINER"
        os.environ["FLAGS_selected_gpus"] = "0"
        role = role_maker.PaddleCloudRoleMaker()
        fleet.init(role)
        loss, acc, _ = self.net()

        strategy = paddle.distributed.fleet.DistributedStrategy()
        configs = {"use_ps_gpu": 1, "launch_barrier": False}
        strategy.a_sync_configs = configs
        strategy.a_sync = True
        optimizer = paddle.fluid.optimizer.Adam(learning_rate=0.01)
        optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
        optimizer.minimize(loss)

200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218
    def test_gpups_dataset(self):
        """
        Testcase for GPUPS InMemoryDataset .
        """
        with open("test_in_memory_dataset_run_a.txt", "w") as f:
            data = "1 1 2 3 3 4 5 5 5 5 1 1\n"
            data += "1 2 2 3 4 4 6 6 6 6 1 2\n"
            data += "1 3 2 3 5 4 7 7 7 7 1 3\n"
            f.write(data)
        with open("test_in_memory_dataset_run_b.txt", "w") as f:
            data = "1 4 2 3 3 4 5 5 5 5 1 4\n"
            data += "1 5 2 3 4 4 6 6 6 6 1 5\n"
            data += "1 6 2 3 5 4 7 7 7 7 1 6\n"
            data += "1 7 2 3 6 4 8 8 8 8 1 7\n"
            f.write(data)

        slots = ["slot1", "slot2", "slot3", "slot4"]
        slots_vars = []
        for slot in slots:
G
GGBond8488 已提交
219 220
            var = paddle.static.data(
                name=slot, shape=[-1, 1], dtype="int64", lod_level=1
221
            )
222 223 224 225
            slots_vars.append(var)

        dataset = paddle.distributed.InMemoryDataset()
        dataset._set_use_ps_gpu(True)
226 227 228 229 230 231 232 233 234
        dataset.init(
            batch_size=32, thread_num=3, pipe_command="cat", use_var=slots_vars
        )
        dataset.set_filelist(
            [
                "test_in_memory_dataset_run_a.txt",
                "test_in_memory_dataset_run_b.txt",
            ]
        )
235 236 237 238

        os.remove("./test_in_memory_dataset_run_a.txt")
        os.remove("./test_in_memory_dataset_run_b.txt")

T
Thunderbrook 已提交
239 240 241

if __name__ == '__main__':
    unittest.main()