dist_fleet_ctr.py 13.1 KB
Newer Older
T
tangwei12 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14 15 16
"""
Distribute CTR model for test fleet api
"""
T
tangwei12 已提交
17

18
import os
T
tangwei12 已提交
19 20 21 22
import shutil
import tempfile
import time

23
import ctr_dataset_reader
1
123malin 已提交
24
import numpy as np
25
from test_dist_fleet_base import FleetDistRunnerBase, runtime_main
T
tangwei12 已提交
26

27
import paddle
28
from paddle import fluid
T
tangwei12 已提交
29

P
pangyoki 已提交
30 31
paddle.enable_static()

T
tangwei12 已提交
32 33 34 35 36
# Fix seed for test
fluid.default_startup_program().random_seed = 1
fluid.default_main_program().random_seed = 1


37 38 39 40 41 42 43 44 45 46 47
def fake_ctr_reader():
    def reader():
        for _ in range(1000):
            deep = np.random.random_integers(0, 1e5 - 1, size=16).tolist()
            wide = np.random.random_integers(0, 1e5 - 1, size=8).tolist()
            label = np.random.random_integers(0, 1, size=1).tolist()
            yield [deep, wide, label]

    return reader


T
tangwei12 已提交
48
class TestDistCTR2x2(FleetDistRunnerBase):
49 50 51 52
    """
    For test CTR model, using Fleet api
    """

T
tangwei12 已提交
53
    def net(self, args, is_train=True, batch_size=4, lr=0.01):
54 55 56 57 58 59 60 61 62
        """
        network definition

        Args:
            batch_size(int): the size of mini-batch for training
            lr(float): learning rate of training
        Returns:
            avg_cost: LoDTensor of cost.
        """
63 64
        dnn_input_dim, lr_input_dim = int(1e5), int(1e5)

G
GGBond8488 已提交
65
        dnn_data = paddle.static.data(
66 67 68 69 70
            name="dnn_data",
            shape=[-1, 1],
            dtype="int64",
            lod_level=1,
        )
G
GGBond8488 已提交
71
        lr_data = paddle.static.data(
72 73 74 75 76
            name="lr_data",
            shape=[-1, 1],
            dtype="int64",
            lod_level=1,
        )
G
GGBond8488 已提交
77
        label = paddle.static.data(
78 79 80 81 82
            name="click",
            shape=[-1, 1],
            dtype="int64",
            lod_level=0,
        )
T
tangwei12 已提交
83 84 85

        datas = [dnn_data, lr_data, label]

86
        if args.reader == "pyreader":
T
tangwei12 已提交
87
            if is_train:
88 89 90 91 92 93
                self.reader = fluid.io.PyReader(
                    feed_list=datas,
                    capacity=64,
                    iterable=False,
                    use_double_buffer=False,
                )
T
tangwei12 已提交
94
            else:
95 96 97 98 99 100 101 102
                self.test_reader = fluid.io.PyReader(
                    feed_list=datas,
                    capacity=64,
                    iterable=False,
                    use_double_buffer=False,
                )

        # build dnn model
C
Chengmo 已提交
103
        dnn_layer_dims = [128, 128, 64, 32, 1]
T
tangwei12 已提交
104 105 106 107 108 109
        dnn_embedding = fluid.layers.embedding(
            is_distributed=False,
            input=dnn_data,
            size=[dnn_input_dim, dnn_layer_dims[0]],
            param_attr=fluid.ParamAttr(
                name="deep_embedding",
110
                initializer=paddle.nn.initializer.Constant(value=0.01),
111
            ),
1
123malin 已提交
112
            is_sparse=True,
113 114
            padding_idx=0,
        )
115
        dnn_pool = paddle.static.nn.sequence_lod.sequence_pool(
116 117
            input=dnn_embedding, pool_type="sum"
        )
T
tangwei12 已提交
118 119
        dnn_out = dnn_pool
        for i, dim in enumerate(dnn_layer_dims[1:]):
C
Charles-hit 已提交
120 121
            fc = paddle.static.nn.fc(
                x=dnn_out,
T
tangwei12 已提交
122
                size=dim,
C
Charles-hit 已提交
123 124
                activation="relu",
                weight_attr=fluid.ParamAttr(
125
                    initializer=paddle.nn.initializer.Constant(value=0.01)
126 127 128
                ),
                name='dnn-fc-%d' % i,
            )
T
tangwei12 已提交
129 130 131
            dnn_out = fc

        # build lr model
C
co63oc 已提交
132
        lr_embedding = fluid.layers.embedding(
T
tangwei12 已提交
133 134 135 136 137
            is_distributed=False,
            input=lr_data,
            size=[lr_input_dim, 1],
            param_attr=fluid.ParamAttr(
                name="wide_embedding",
138
                initializer=paddle.nn.initializer.Constant(value=0.01),
139
            ),
1
123malin 已提交
140
            is_sparse=True,
141 142
            padding_idx=0,
        )
143
        lr_pool = paddle.static.nn.sequence_lod.sequence_pool(
C
co63oc 已提交
144
            input=lr_embedding, pool_type="sum"
145
        )
T
tangwei12 已提交
146

147
        merge_layer = paddle.concat([dnn_out, lr_pool], axis=1)
T
tangwei12 已提交
148

C
Charles-hit 已提交
149 150 151
        predict = paddle.static.nn.fc(
            x=merge_layer, size=2, activation='softmax'
        )
152
        acc = paddle.static.accuracy(input=predict, label=label)
153

154
        auc_var, batch_auc_var, auc_states = paddle.static.auc(
155 156
            input=predict, label=label
        )
157

158 159 160
        cost = paddle.nn.functional.cross_entropy(
            input=predict, label=label, reduction='none', use_softmax=False
        )
161
        avg_cost = paddle.mean(x=cost)
T
tangwei12 已提交
162 163

        self.feeds = datas
164
        self.train_file_path = ["fake1", "fake2"]
T
tangwei12 已提交
165 166 167 168 169 170
        self.avg_cost = avg_cost
        self.predict = predict

        return avg_cost

    def check_model_right(self, dirname):
171
        dirname = dirname + '/dnn_plugin/'
T
tangwei12 已提交
172 173 174 175 176 177 178 179 180
        model_filename = os.path.join(dirname, "__model__")

        with open(model_filename, "rb") as f:
            program_desc_str = f.read()

        program = fluid.Program.parse_from_string(program_desc_str)
        with open(os.path.join(dirname, "__model__.proto"), "w") as wn:
            wn.write(str(program))

181
    def do_distributed_testing(self, fleet):
T
tangwei12 已提交
182 183 184
        """
        do distributed
        """
185
        exe = self.get_executor()
T
tangwei12 已提交
186 187 188 189 190 191 192 193 194 195 196 197

        batch_size = 4
        test_reader = paddle.batch(fake_ctr_reader(), batch_size=batch_size)
        self.test_reader.decorate_sample_list_generator(test_reader)

        pass_start = time.time()
        batch_idx = 0

        self.test_reader.start()
        try:
            while True:
                batch_idx += 1
198 199 200 201
                loss_val = exe.run(
                    program=paddle.static.default_main_program(),
                    fetch_list=[self.avg_cost.name],
                )
T
tangwei12 已提交
202
                loss_val = np.mean(loss_val)
203
                message = "TEST ---> batch_idx: {} loss: {}\n".format(
204 205
                    batch_idx, loss_val
                )
T
tangwei12 已提交
206 207 208 209 210
                fleet.util.print_on_rank(message, 0)
        except fluid.core.EOFException:
            self.test_reader.reset()

        pass_time = time.time() - pass_start
211
        message = f"Distributed Test Succeed, Using Time {pass_time}\n"
T
tangwei12 已提交
212 213
        fleet.util.print_on_rank(message, 0)

1
123malin 已提交
214
    def do_pyreader_training(self, fleet):
215 216 217 218 219
        """
        do training using dataset, using fetch handler to catch variable
        Args:
            fleet(Fleet api): the fleet object of Parameter Server, define distribute training role
        """
220
        exe = self.get_executor()
221
        exe.run(fluid.default_startup_program())
T
tangwei12 已提交
222 223
        fleet.init_worker()

224 225
        batch_size = 4
        train_reader = paddle.batch(fake_ctr_reader(), batch_size=batch_size)
1
123malin 已提交
226 227 228 229 230 231 232
        self.reader.decorate_sample_list_generator(train_reader)

        for epoch_id in range(1):
            self.reader.start()
            try:
                pass_start = time.time()
                while True:
233 234 235 236
                    loss_val = exe.run(
                        program=fluid.default_main_program(),
                        fetch_list=[self.avg_cost.name],
                    )
1
123malin 已提交
237
                    loss_val = np.mean(loss_val)
238
                    # TODO(randomly fail)
239
                    #   reduce_output = fleet.util.all_reduce(
240
                    #       np.array(loss_val), mode="sum")
241
                    #   loss_all_trainer = fleet.util.all_gather(float(loss_val))
242
                    #   loss_val = float(reduce_output) / len(loss_all_trainer)
243
                    message = "TRAIN ---> pass: {} loss: {}\n".format(
244 245
                        epoch_id, loss_val
                    )
246
                    fleet.util.print_on_rank(message, 0)
247

1
123malin 已提交
248 249 250 251
                pass_time = time.time() - pass_start
            except fluid.core.EOFException:
                self.reader.reset()

T
tangwei12 已提交
252 253 254 255
        dirname = os.getenv("SAVE_DIRNAME", None)
        if dirname:
            fleet.save_persistables(exe, dirname=dirname)

1
123malin 已提交
256
        model_dir = tempfile.mkdtemp()
257 258 259
        fleet.save_inference_model(
            exe, model_dir, [feed.name for feed in self.feeds], self.avg_cost
        )
260 261
        if fleet.is_first_worker():
            self.check_model_right(model_dir)
1
123malin 已提交
262 263
        shutil.rmtree(model_dir)

264
    def do_dataset_training_queuedataset(self, fleet):
265
        train_file_list = ctr_dataset_reader.prepare_fake_data()
1
123malin 已提交
266

267
        exe = self.get_executor()
268
        exe.run(fluid.default_startup_program())
T
tangwei12 已提交
269
        fleet.init_worker()
1
123malin 已提交
270 271 272

        thread_num = 2
        batch_size = 128
273
        filelist = train_file_list
T
tangwei12 已提交
274 275

        # config dataset
276
        dataset = paddle.distributed.QueueDataset()
T
tangwei12 已提交
277
        pipe_command = 'python ctr_dataset_reader.py'
278

279 280 281 282 283 284
        dataset.init(
            batch_size=batch_size,
            use_var=self.feeds,
            pipe_command=pipe_command,
            thread_num=thread_num,
        )
T
tangwei12 已提交
285 286 287

        dataset.set_filelist(filelist)

288
        for epoch_id in range(1):
T
tangwei12 已提交
289 290
            pass_start = time.time()
            dataset.set_filelist(filelist)
291 292 293 294 295 296 297 298
            exe.train_from_dataset(
                program=fluid.default_main_program(),
                dataset=dataset,
                fetch_list=[self.avg_cost],
                fetch_info=["cost"],
                print_period=2,
                debug=int(os.getenv("Debug", "0")),
            )
299 300
            pass_time = time.time() - pass_start

301 302
        if os.getenv("SAVE_MODEL") == "1":
            model_dir = tempfile.mkdtemp()
303 304 305 306 307 308
            fleet.save_inference_model(
                exe,
                model_dir,
                [feed.name for feed in self.feeds],
                self.avg_cost,
            )
309 310
            if fleet.is_first_worker():
                self.check_model_right(model_dir)
311
            shutil.rmtree(model_dir)
312

T
tangwei12 已提交
313 314 315 316
        dirname = os.getenv("SAVE_DIRNAME", None)
        if dirname:
            fleet.save_persistables(exe, dirname=dirname)

317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336
    def do_dataset_training(self, fleet):
        train_file_list = ctr_dataset_reader.prepare_fake_data()

        exe = self.get_executor()
        exe.run(fluid.default_startup_program())
        fleet.init_worker()

        thread_num = 2
        batch_size = 128
        filelist = train_file_list

        # config dataset
        dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
        dataset.set_use_var(self.feeds)
        dataset.set_batch_size(128)
        dataset.set_thread(2)
        dataset.set_filelist(filelist)
        dataset.set_pipe_command('python ctr_dataset_reader.py')
        dataset.load_into_memory()

337
        dataset.global_shuffle(fleet, 12)  # TODO: thread configure
338 339 340 341 342 343 344 345
        shuffle_data_size = dataset.get_shuffle_data_size(fleet)
        local_data_size = dataset.get_shuffle_data_size()
        data_size_list = fleet.util.all_gather(local_data_size)
        print('after global_shuffle data_size_list: ', data_size_list)
        print('after global_shuffle data_size: ', shuffle_data_size)

        for epoch_id in range(1):
            pass_start = time.time()
346 347 348 349 350 351 352 353
            exe.train_from_dataset(
                program=fluid.default_main_program(),
                dataset=dataset,
                fetch_list=[self.avg_cost],
                fetch_info=["cost"],
                print_period=2,
                debug=int(os.getenv("Debug", "0")),
            )
354 355 356 357 358
            pass_time = time.time() - pass_start
        dataset.release_memory()

        if os.getenv("SAVE_MODEL") == "1":
            model_dir = tempfile.mkdtemp()
359 360 361 362 363 364
            fleet.save_inference_model(
                exe,
                model_dir,
                [feed.name for feed in self.feeds],
                self.avg_cost,
            )
365 366 367
            fleet.load_inference_model(model_dir, mode=0)
            if fleet.is_first_worker():
                self.check_model_right(model_dir)
368 369 370 371 372
            shutil.rmtree(model_dir)

        dirname = os.getenv("SAVE_DIRNAME", None)
        if dirname:
            fleet.save_persistables(exe, dirname=dirname)
373
            fleet.load_model(dirname, mode=0)
374

Z
zhaocaibei123 已提交
375 376 377 378
        cache_dirname = os.getenv("SAVE_CACHE_DIRNAME", None)
        if cache_dirname:
            fleet.save_cache_model(cache_dirname)

379 380
        dense_param_dirname = os.getenv("SAVE_DENSE_PARAM_DIRNAME", None)
        if dense_param_dirname:
381 382 383 384 385 386
            fleet.save_dense_params(
                exe,
                dense_param_dirname,
                fluid.global_scope(),
                fluid.default_main_program(),
            )
387 388 389 390 391 392 393 394 395 396 397

        save_one_table_dirname = os.getenv("SAVE_ONE_TABLE_DIRNAME", None)
        if save_one_table_dirname:
            fleet.save_one_table(0, save_one_table_dirname, 0)
            fleet.load_one_table(0, save_one_table_dirname, 0)

        patch_dirname = os.getenv("SAVE_PATCH_DIRNAME", None)
        if patch_dirname:
            fleet.save_persistables(exe, patch_dirname, None, 5)
            fleet.check_save_pre_patch_done()

C
co63oc 已提交
398
        # add for gpu graph
L
lxsbupt 已提交
399 400 401
        fleet.save_cache_table(0, 0)
        fleet.shrink()

402

T
tangwei12 已提交
403 404
if __name__ == "__main__":
    runtime_main(TestDistCTR2x2)