dist_fleet_ctr.py 12.9 KB
Newer Older
T
tangwei12 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14 15 16
"""
Distributed CTR model used to test the fleet API.
"""
T
tangwei12 已提交
17 18 19 20 21

import shutil
import tempfile
import time

1
123malin 已提交
22
import paddle
T
tangwei12 已提交
23 24
import paddle.fluid as fluid
import os
1
123malin 已提交
25
import numpy as np
T
tangwei12 已提交
26 27 28 29

import ctr_dataset_reader
from test_dist_fleet_base import runtime_main, FleetDistRunnerBase

P
pangyoki 已提交
30 31
# Enable Paddle's static graph mode; the fluid program APIs are used below.
paddle.enable_static()

# Pin the random seed of both default programs so test runs are repeatable.
_RANDOM_SEED = 1
fluid.default_startup_program().random_seed = _RANDOM_SEED
fluid.default_main_program().random_seed = _RANDOM_SEED


37 38 39 40 41 42 43 44 45 46 47
def fake_ctr_reader():
    """Create a reader that yields random fake CTR samples.

    ``np.random.random_integers`` has been deprecated since NumPy 1.11, so
    ``np.random.randint`` is used instead. ``randint`` excludes its upper
    bound, hence the bounds below are one larger than the largest value
    that can be drawn; the set of possible values is unchanged.

    Returns:
        callable: a zero-argument generator function. Each call yields 1000
        samples of the form ``[deep, wide, label]``, where ``deep`` is a list
        of 16 ids and ``wide`` a list of 8 ids, all in ``[0, 100000)``, and
        ``label`` is a one-element list holding 0 or 1.
    """
    sample_count = 1000
    id_upper_bound = int(1e5)  # exclusive upper bound for feature ids

    def reader():
        for _ in range(sample_count):
            deep = np.random.randint(0, id_upper_bound, size=16).tolist()
            wide = np.random.randint(0, id_upper_bound, size=8).tolist()
            label = np.random.randint(0, 2, size=1).tolist()
            yield [deep, wide, label]

    return reader


T
tangwei12 已提交
48
class TestDistCTR2x2(FleetDistRunnerBase):
49 50 51 52
    """
    Runner for testing the CTR model through the Fleet API.
    """

T
tangwei12 已提交
53
    def net(self, args, is_train=True, batch_size=4, lr=0.01):
        """
        Build the CTR network (a DNN branch plus an LR branch) and record its
        feed variables, cost and prediction on ``self``.

        Args:
            args: run configuration; only ``args.reader`` is read here. When it
                equals "pyreader", a non-iterable PyReader is created over the
                feed variables.
            is_train(bool): selects the attribute that receives the PyReader:
                ``self.reader`` when True, ``self.test_reader`` otherwise.
            batch_size(int): the size of mini-batch for training (not read
                while building the network)
            lr(float): learning rate of training (not read while building the
                network)
        Returns:
            avg_cost: mean of the cross-entropy cost over the batch.
        """
        dnn_input_dim, lr_input_dim = int(1e5), int(1e5)

        def _int64_input(var_name, lod_level):
            # The three inputs share shape and dtype; only the name and the
            # LoD level differ.
            return fluid.layers.data(
                name=var_name,
                shape=[-1, 1],
                dtype="int64",
                lod_level=lod_level,
                append_batch_size=False,
            )

        def _sparse_embedding(ids, table_size, param_name):
            # Both branches use the same embedding configuration apart from
            # the input, the table size and the parameter name.
            return fluid.layers.embedding(
                is_distributed=False,
                input=ids,
                size=table_size,
                param_attr=fluid.ParamAttr(
                    name=param_name,
                    initializer=fluid.initializer.Constant(value=0.01),
                ),
                is_sparse=True,
                padding_idx=0,
            )

        dnn_data = _int64_input("dnn_data", 1)
        lr_data = _int64_input("lr_data", 1)
        label = _int64_input("click", 0)
        datas = [dnn_data, lr_data, label]

        if args.reader == "pyreader":
            py_reader = fluid.io.PyReader(
                feed_list=datas,
                capacity=64,
                iterable=False,
                use_double_buffer=False,
            )
            if is_train:
                self.reader = py_reader
            else:
                self.test_reader = py_reader

        # DNN branch: embedding -> sum pooling -> stack of relu fc layers.
        dnn_layer_dims = [128, 128, 64, 32, 1]
        deep_embedding = _sparse_embedding(
            dnn_data, [dnn_input_dim, dnn_layer_dims[0]], "deep_embedding"
        )
        dnn_out = fluid.layers.sequence_pool(
            input=deep_embedding, pool_type="sum"
        )
        for idx, width in enumerate(dnn_layer_dims[1:]):
            dnn_out = fluid.layers.fc(
                input=dnn_out,
                size=width,
                act="relu",
                param_attr=fluid.ParamAttr(
                    initializer=fluid.initializer.Constant(value=0.01)
                ),
                name='dnn-fc-%d' % idx,
            )

        # LR branch: one-dimensional embedding -> sum pooling.
        wide_embedding = _sparse_embedding(
            lr_data, [lr_input_dim, 1], "wide_embedding"
        )
        lr_pool = fluid.layers.sequence_pool(
            input=wide_embedding, pool_type="sum"
        )

        merged = fluid.layers.concat(input=[dnn_out, lr_pool], axis=1)
        predict = fluid.layers.fc(input=merged, size=2, act='softmax')

        # The accuracy and AUC outputs are not read in this method;
        # presumably the calls still register metric ops in the program.
        _acc = fluid.layers.accuracy(input=predict, label=label)
        _auc, _batch_auc, _auc_states = fluid.layers.auc(
            input=predict, label=label
        )

        avg_cost = paddle.mean(
            x=fluid.layers.cross_entropy(input=predict, label=label)
        )

        self.feeds = datas
        self.train_file_path = ["fake1", "fake2"]
        self.avg_cost = avg_cost
        self.predict = predict

        return avg_cost

    def check_model_right(self, dirname):
        """
        Parse the saved program under ``<dirname>/dnn_plugin/`` and dump it as
        text.

        The binary ``__model__`` file is read and parsed with
        ``fluid.Program.parse_from_string``; the string form of the resulting
        program is written to ``__model__.proto`` in the same directory.

        Args:
            dirname(str): directory the inference model was saved to.
        """
        plugin_dir = dirname + '/dnn_plugin/'

        with open(os.path.join(plugin_dir, "__model__"), "rb") as model_file:
            serialized = model_file.read()

        parsed = fluid.Program.parse_from_string(serialized)

        proto_path = os.path.join(plugin_dir, "__model__.proto")
        with open(proto_path, "w") as proto_file:
            proto_file.write(str(parsed))

178
    def do_distributed_testing(self, fleet):
        """
        Run one pass over fake CTR data through ``self.test_reader`` and print
        the per-batch loss and the total time on rank 0.

        Args:
            fleet(Fleet api): fleet object; only ``fleet.util.print_on_rank``
                is used here.
        """
        executor = self.get_executor()

        samples_per_batch = 4
        batched_reader = paddle.batch(
            fake_ctr_reader(), batch_size=samples_per_batch
        )
        self.test_reader.decorate_sample_list_generator(batched_reader)

        started_at = time.time()
        step = 0

        self.test_reader.start()
        try:
            # The loop has no exit condition of its own: it ends when running
            # the program raises EOFException.
            while True:
                step += 1
                fetched = executor.run(
                    program=paddle.static.default_main_program(),
                    fetch_list=[self.avg_cost.name],
                )
                fleet.util.print_on_rank(
                    "TEST ---> batch_idx: {} loss: {}\n".format(
                        step, np.mean(fetched)
                    ),
                    0,
                )
        except fluid.core.EOFException:
            self.test_reader.reset()

        elapsed = time.time() - started_at
        fleet.util.print_on_rank(
            "Distributed Test Succeed, Using Time {}\n".format(elapsed), 0
        )

1
123malin 已提交
211
    def do_pyreader_training(self, fleet):
        """
        Train for one epoch by feeding fake CTR batches through
        ``self.reader``, then save the model.

        After training, persistables are saved to ``$SAVE_DIRNAME`` when that
        environment variable is set. The inference model is always saved to a
        temporary directory, checked on the first worker, and the directory is
        removed afterwards even if saving or checking raises.

        Args:
            fleet(Fleet api): the fleet object of Parameter Server, define distribute training role
        """
        exe = self.get_executor()
        exe.run(fluid.default_startup_program())
        fleet.init_worker()

        batch_size = 4
        train_reader = paddle.batch(fake_ctr_reader(), batch_size=batch_size)
        self.reader.decorate_sample_list_generator(train_reader)

        for epoch_id in range(1):
            self.reader.start()
            try:
                # The loop has no exit condition of its own: it ends when
                # running the program raises EOFException.
                while True:
                    loss_val = exe.run(
                        program=fluid.default_main_program(),
                        fetch_list=[self.avg_cost.name],
                    )
                    loss_val = np.mean(loss_val)
                    # TODO(randomly fail): averaging the loss across trainers
                    # is disabled because it fails intermittently:
                    #   reduce_output = fleet.util.all_reduce(
                    #       np.array(loss_val), mode="sum")
                    #   loss_all_trainer = fleet.util.all_gather(float(loss_val))
                    #   loss_val = float(reduce_output) / len(loss_all_trainer)
                    message = "TRAIN ---> pass: {} loss: {}\n".format(
                        epoch_id, loss_val
                    )
                    fleet.util.print_on_rank(message, 0)
            except fluid.core.EOFException:
                self.reader.reset()

        dirname = os.getenv("SAVE_DIRNAME", None)
        if dirname:
            fleet.save_persistables(exe, dirname=dirname)

        model_dir = tempfile.mkdtemp()
        try:
            fleet.save_inference_model(
                exe, model_dir, [feed.name for feed in self.feeds], self.avg_cost
            )
            if fleet.is_first_worker():
                self.check_model_right(model_dir)
        finally:
            # mkdtemp leaves cleanup to the caller; do it on failure as well.
            shutil.rmtree(model_dir)

261
    def do_dataset_training_queuedataset(self, fleet):
        """
        Train for one epoch from a ``paddle.distributed.QueueDataset`` built
        over fake data files, then optionally save the model.

        Environment variables read:
            Debug: passed as an int to ``train_from_dataset`` (default "0").
            SAVE_MODEL: when "1", the inference model is saved to a temporary
                directory, checked on the first worker, and the directory is
                removed afterwards even if saving or checking raises.
            SAVE_DIRNAME: when set, persistables are saved to that directory.

        Args:
            fleet(Fleet api): the fleet object used to init the worker and to
                save the model.
        """
        train_file_list = ctr_dataset_reader.prepare_fake_data()

        exe = self.get_executor()
        exe.run(fluid.default_startup_program())
        fleet.init_worker()

        thread_num = 2
        batch_size = 128
        filelist = train_file_list

        # config dataset
        dataset = paddle.distributed.QueueDataset()
        pipe_command = 'python ctr_dataset_reader.py'

        dataset.init(
            batch_size=batch_size,
            use_var=self.feeds,
            pipe_command=pipe_command,
            thread_num=thread_num,
        )

        dataset.set_filelist(filelist)

        for epoch_id in range(1):
            # The file list is set again at the start of every epoch.
            dataset.set_filelist(filelist)
            exe.train_from_dataset(
                program=fluid.default_main_program(),
                dataset=dataset,
                fetch_list=[self.avg_cost],
                fetch_info=["cost"],
                print_period=2,
                debug=int(os.getenv("Debug", "0")),
            )

        if os.getenv("SAVE_MODEL") == "1":
            model_dir = tempfile.mkdtemp()
            try:
                fleet.save_inference_model(
                    exe,
                    model_dir,
                    [feed.name for feed in self.feeds],
                    self.avg_cost,
                )
                if fleet.is_first_worker():
                    self.check_model_right(model_dir)
            finally:
                # mkdtemp leaves cleanup to the caller; do it on failure too.
                shutil.rmtree(model_dir)

        dirname = os.getenv("SAVE_DIRNAME", None)
        if dirname:
            fleet.save_persistables(exe, dirname=dirname)

314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342
    def do_dataset_training(self, fleet):
        """
        Train for one epoch from an "InMemoryDataset" built over fake data
        files, then exercise the fleet save/load APIs selected by environment
        variables.

        Environment variables read:
            Debug: passed as an int to ``train_from_dataset`` (default "0").
            SAVE_MODEL: when "1", the inference model is saved to a temporary
                directory, loaded back, checked on the first worker, and the
                directory is removed afterwards even if a step raises.
            SAVE_DIRNAME: when set, persistables are saved there and the model
                is loaded back from it.
            SAVE_CACHE_DIRNAME: when set, the cache model is saved there.
            SAVE_DENSE_PARAM_DIRNAME: when set, dense params are saved there.
            SAVE_ONE_TABLE_DIRNAME: when set, table 0 is saved there and
                loaded back.
            SAVE_PATCH_DIRNAME: when set, persistables are saved there with
                mode 5 and ``check_save_pre_patch_done`` is called.

        Args:
            fleet(Fleet api): the fleet object used to init the worker,
                shuffle the dataset and save/load the model.
        """
        train_file_list = ctr_dataset_reader.prepare_fake_data()

        exe = self.get_executor()
        exe.run(fluid.default_startup_program())
        fleet.init_worker()

        thread_num = 2
        batch_size = 128
        filelist = train_file_list

        # config dataset
        dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
        dataset.set_use_var(self.feeds)
        # Use the named settings above instead of repeating the literals.
        dataset.set_batch_size(batch_size)
        dataset.set_thread(thread_num)
        dataset.set_filelist(filelist)
        dataset.set_pipe_command('python ctr_dataset_reader.py')
        dataset.load_into_memory()

        dataset.global_shuffle(fleet, 12)  # TODO: make the thread count configurable
        shuffle_data_size = dataset.get_shuffle_data_size(fleet)
        local_data_size = dataset.get_shuffle_data_size()
        data_size_list = fleet.util.all_gather(local_data_size)
        print('after global_shuffle data_size_list: ', data_size_list)
        print('after global_shuffle data_size: ', shuffle_data_size)

        for epoch_id in range(1):
            exe.train_from_dataset(
                program=fluid.default_main_program(),
                dataset=dataset,
                fetch_list=[self.avg_cost],
                fetch_info=["cost"],
                print_period=2,
                debug=int(os.getenv("Debug", "0")),
            )
        dataset.release_memory()

        if os.getenv("SAVE_MODEL") == "1":
            model_dir = tempfile.mkdtemp()
            try:
                fleet.save_inference_model(
                    exe,
                    model_dir,
                    [feed.name for feed in self.feeds],
                    self.avg_cost,
                )
                fleet.load_inference_model(model_dir, mode=0)
                if fleet.is_first_worker():
                    self.check_model_right(model_dir)
            finally:
                # mkdtemp leaves cleanup to the caller; do it on failure too.
                shutil.rmtree(model_dir)

        dirname = os.getenv("SAVE_DIRNAME", None)
        if dirname:
            fleet.save_persistables(exe, dirname=dirname)
            fleet.load_model(dirname, mode=0)

        cache_dirname = os.getenv("SAVE_CACHE_DIRNAME", None)
        if cache_dirname:
            fleet.save_cache_model(cache_dirname)

        dense_param_dirname = os.getenv("SAVE_DENSE_PARAM_DIRNAME", None)
        if dense_param_dirname:
            fleet.save_dense_params(
                exe,
                dense_param_dirname,
                fluid.global_scope(),
                fluid.default_main_program(),
            )

        save_one_table_dirname = os.getenv("SAVE_ONE_TABLE_DIRNAME", None)
        if save_one_table_dirname:
            fleet.save_one_table(0, save_one_table_dirname, 0)
            fleet.load_one_table(0, save_one_table_dirname, 0)

        patch_dirname = os.getenv("SAVE_PATCH_DIRNAME", None)
        if patch_dirname:
            fleet.save_persistables(exe, patch_dirname, None, 5)
            fleet.check_save_pre_patch_done()

395

T
tangwei12 已提交
396 397
# When run as a script, pass the runner class to runtime_main, which is
# imported from test_dist_fleet_base.
if __name__ == "__main__":
    runtime_main(TestDistCTR2x2)