dist_fleet_simnet_bow.py 8.1 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

15
import os
16 17
import time

18 19 20
import numpy as np
from test_dist_fleet_base import FleetDistRunnerBase, runtime_main

21 22 23
import paddle
import paddle.fluid as fluid

P
pangyoki 已提交
24 25
# Switch Paddle to static-graph mode; the code below builds programs and
# runs them through an Executor.
paddle.enable_static()

26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
# NOTE: DTYPE, DATA_URL and DATA_MD5 are not referenced in the visible part
# of this file.
DTYPE = "int64"
DATA_URL = 'http://paddle-dist-ce-data.bj.bcebos.com/simnet.train.1000'
DATA_MD5 = '24e49366eb0611c552667989de2f57d5'

# For Net
base_lr = 0.2  # learning rate set on the query fc weight ("__q_fc__")
emb_lr = base_lr * 3  # learning rate set on the positive-title embedding attr
dict_dim = 1500  # number of rows in the embedding table
emb_dim = 128  # width of each embedding vector
hid_dim = 128  # output size of every fc layer in train_network
margin = 0.1  # margin constant used by get_loss
sample_rate = 1  # not referenced in the visible part of this file

# Fix seed for test
fluid.default_startup_program().random_seed = 1
fluid.default_main_program().random_seed = 1


44 45 46 47 48 49 50 51 52 53 54 55
def fake_simnet_reader():
    """Create a reader producing 1000 randomly generated SimNet samples.

    Returns:
        A zero-argument callable. Calling it returns a generator that yields
        ``[q, label, pt, nt]``, where every field is a one-element list of
        Python ints: ``q``, ``pt`` and ``nt`` lie in ``[0, 1499]`` and
        ``label`` is ``0`` or ``1``.
    """
    vocab_size = 1500  # same value as the module-level dict_dim
    num_samples = 1000

    def reader():
        for _ in range(num_samples):
            # np.random.random_integers is deprecated; randint excludes its
            # upper bound, so (0, vocab_size) and (0, 2) cover the same
            # inclusive ranges [0, vocab_size - 1] and [0, 1] as before.
            q = np.random.randint(0, vocab_size, size=1).tolist()
            label = np.random.randint(0, 2, size=1).tolist()
            pt = np.random.randint(0, vocab_size, size=1).tolist()
            nt = np.random.randint(0, vocab_size, size=1).tolist()
            yield [q, label, pt, nt]

    return reader


56
def get_acc(cos_q_nt, cos_q_pt, batch_size):
    """Build the accuracy op: share of samples ranking positive above negative.

    Args:
        cos_q_nt: similarity between query and negative title.
        cos_q_pt: similarity between query and positive title.
        batch_size: value used as the denominator.

    Returns:
        The result of ``paddle.divide`` named ``"simnet_acc"``: the count of
        entries where ``cos_q_nt < cos_q_pt`` divided by ``batch_size``.
    """
    # 1.0 wherever the negative similarity is strictly below the positive one.
    ranked_right = paddle.cast(
        paddle.less_than(cos_q_nt, cos_q_pt), dtype='float64'
    )
    hit_count = paddle.sum(ranked_right)
    # batch_size * 1.0 turns the denominator into a float constant.
    denominator = fluid.layers.fill_constant(
        shape=[1], value=batch_size * 1.0, dtype='float64'
    )
    return paddle.divide(hit_count, denominator, name="simnet_acc")


def get_loss(cos_q_pt, cos_q_nt):
    """Build the mean margin loss ``mean(max(0, margin - cos_q_pt + cos_q_nt))``.

    Args:
        cos_q_pt: similarity between query and positive title.
        cos_q_nt: similarity between query and negative title.

    Returns:
        The averaged loss produced by ``paddle.mean``.
    """
    # Constant filled with the module-level `margin`, batch-sized like cos_q_pt.
    margin_like = fluid.layers.fill_constant_batch_size_like(
        input=cos_q_pt, shape=[-1, 1], value=margin, dtype='float32'
    )
    margin_minus_pos = paddle.subtract(margin_like, cos_q_pt)
    raw_loss = paddle.add(margin_minus_pos, cos_q_nt)
    # Clamp at zero by taking the element-wise maximum with a zero constant.
    zeros_like = fluid.layers.fill_constant_batch_size_like(
        input=raw_loss, shape=[-1, 1], value=0.0, dtype='float32'
    )
    clamped = paddle.maximum(zeros_like, raw_loss)
    return paddle.mean(clamped)


88 89 90 91 92 93 94
def train_network(
    batch_size,
    is_distributed=False,
    is_sparse=False,
    is_self_contained_lr=False,
    is_pyreader=False,
):
    """Assemble the SimNet bag-of-words matching network.

    Query, positive-title and negative-title ids each pass through an
    embedding lookup on the parameter named ``__emb__``, a reshape to
    ``[-1, emb_dim]``, a sum sequence-pool, softsign and an fc layer. The
    query output is compared with both title outputs by cosine similarity,
    and those similarities feed ``get_loss`` and ``get_acc``.

    Args:
        batch_size: passed to ``get_acc`` as the accuracy denominator.
        is_distributed: forwarded to every ``fluid.embedding`` call.
        is_sparse: forwarded to every ``fluid.embedding`` call.
        is_self_contained_lr: accepted but not read anywhere in this function.
        is_pyreader: when true, a non-iterable ``fluid.io.PyReader`` fed by
            the four inputs is created.

    Returns:
        Tuple ``(avg_cost, acc, cos_q_pt, reader)``; ``reader`` is ``None``
        unless ``is_pyreader`` is true.
    """

    def const_init():
        # A fresh Constant(0.01) initializer for each parameter attribute.
        return paddle.nn.initializer.Constant(value=0.01)

    def sequence_ids(name):
        # int64 id input of shape [-1, 1] declared with lod_level=1.
        return paddle.static.data(
            name=name, shape=[-1, 1], dtype="int64", lod_level=1
        )

    def encode(ids, emb_attr):
        # embedding lookup -> reshape -> sum sequence-pool -> softsign
        looked_up = fluid.embedding(
            input=ids,
            is_distributed=is_distributed,
            size=[dict_dim, emb_dim],
            param_attr=emb_attr,
            is_sparse=is_sparse,
        )
        flat = paddle.reshape(looked_up, [-1, emb_dim])
        pooled = paddle.static.nn.sequence_lod.sequence_pool(
            input=flat, pool_type='sum'
        )
        return paddle.nn.functional.softsign(pooled)

    def title_fc(features):
        # Both title towers name the same fc weight and bias parameters.
        return paddle.static.nn.fc(
            x=features,
            size=hid_dim,
            weight_attr=fluid.ParamAttr(
                initializer=const_init(),
                name="__fc__",
            ),
            bias_attr=fluid.ParamAttr(name="__fc_b__"),
        )

    # Inputs, in feed order: query, label, positive title, negative title.
    query = sequence_ids("query_ids")
    label = paddle.static.data(name="label", shape=[-1, 1], dtype="int64")
    pos_title = sequence_ids("pos_title_ids")
    neg_title = sequence_ids("neg_title_ids")
    feed_vars = [query, label, pos_title, neg_title]

    # The reader is created after the inputs and before any network layer.
    reader = (
        fluid.io.PyReader(
            feed_list=feed_vars,
            capacity=64,
            iterable=False,
            use_double_buffer=False,
        )
        if is_pyreader
        else None
    )

    # Query tower: its fc weight has its own name and uses base_lr; no
    # bias_attr is passed.
    q_ss = encode(
        query,
        fluid.ParamAttr(initializer=const_init(), name="__emb__"),
    )
    q_fc = paddle.static.nn.fc(
        x=q_ss,
        size=hid_dim,
        weight_attr=fluid.ParamAttr(
            initializer=const_init(),
            name="__q_fc__",
            learning_rate=base_lr,
        ),
    )

    # Positive-title tower: only this lookup's attr sets learning_rate=emb_lr;
    # the other two lookups naming "__emb__" leave it unset.
    pt_ss = encode(
        pos_title,
        fluid.ParamAttr(
            initializer=const_init(),
            name="__emb__",
            learning_rate=emb_lr,
        ),
    )
    pt_fc = title_fc(pt_ss)

    # Negative-title tower.
    nt_ss = encode(
        neg_title,
        fluid.ParamAttr(initializer=const_init(), name="__emb__"),
    )
    nt_fc = title_fc(nt_ss)

    cos_q_pt = paddle.nn.functional.cosine_similarity(q_fc, pt_fc)
    cos_q_nt = paddle.nn.functional.cosine_similarity(q_fc, nt_fc)

    avg_cost = get_loss(cos_q_pt, cos_q_nt)
    acc = get_acc(cos_q_nt, cos_q_pt, batch_size)
    return avg_cost, acc, cos_q_pt, reader


class TestDistSimnetBow2x2(FleetDistRunnerBase):
    """
    For test SimnetBow model, use Fleet api
    """

    def net(self, args, batch_size=4, lr=0.01):
        """Build the SimNet BOW network and keep the handles used later.

        Args:
            args: runner arguments; only ``args.reader`` is read here, and a
                PyReader is requested when it equals ``"pyreader"``.
            batch_size: forwarded to ``train_network``.
            lr: accepted but not used in this method.

        Returns:
            The average-cost variable returned by ``train_network``.
        """
        avg_cost, _, predict, self.reader = train_network(
            batch_size=batch_size,
            is_distributed=False,
            is_sparse=True,
            is_self_contained_lr=False,
            is_pyreader=(args.reader == "pyreader"),
        )
        self.avg_cost = avg_cost
        self.predict = predict

        return avg_cost

    def check_model_right(self, dirname):
        """Write a readable dump of the saved program found in ``dirname``.

        Reads the serialized program from ``<dirname>/__model__``, parses it
        with ``fluid.Program.parse_from_string`` and writes its string form
        to ``<dirname>/__model__.proto``.
        """
        model_filename = os.path.join(dirname, "__model__")

        with open(model_filename, "rb") as f:
            program_desc_str = f.read()

        program = fluid.Program.parse_from_string(program_desc_str)
        with open(os.path.join(dirname, "__model__.proto"), "w") as wn:
            wn.write(str(program))

    def do_pyreader_training(self, fleet):
        """
        do training with the PyReader created in ``net``, printing the mean
        loss of every step on rank 0
        Args:
            fleet(Fleet api): the fleet object of Parameter Server, define distribute training role
        """

        exe = fluid.Executor(fluid.CPUPlace())
        exe.run(fluid.default_startup_program())
        fleet.init_worker()
        batch_size = 4
        # reader
        train_reader = paddle.batch(fake_simnet_reader(), batch_size=batch_size)
        self.reader.decorate_sample_list_generator(train_reader)
        for epoch_id in range(1):
            self.reader.start()
            try:
                # The loop has no break: it ends only when the executor
                # raises EOFException, which is handled below.
                while True:
                    loss_val = exe.run(
                        program=fluid.default_main_program(),
                        fetch_list=[self.avg_cost.name],
                    )
                    loss_val = np.mean(loss_val)
                    message = "TRAIN ---> pass: {} loss: {}\n".format(
                        epoch_id, loss_val
                    )
                    fleet.util.print_on_rank(message, 0)
            except fluid.core.EOFException:
                self.reader.reset()

    def do_dataset_training(self, fleet):
        """No-op: dataset-based training is not implemented by this test."""
        pass
277 278 279 280


# Script entry point: hand the test class to the shared runner imported from
# test_dist_fleet_base.
if __name__ == "__main__":
    runtime_main(TestDistSimnetBow2x2)