dist_fleet_ctr.py 13.3 KB
Newer Older
T
tangwei12 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14 15 16
"""
Distributed CTR model for testing the Fleet API
"""
T
tangwei12 已提交
17 18 19 20 21

import shutil
import tempfile
import time

1
123malin 已提交
22
import paddle
T
tangwei12 已提交
23 24
import paddle.fluid as fluid
import os
1
123malin 已提交
25
import numpy as np
T
tangwei12 已提交
26 27 28 29

import ctr_dataset_reader
from test_dist_fleet_base import runtime_main, FleetDistRunnerBase

P
pangyoki 已提交
30 31
# Switch to static-graph mode before any fluid layers are defined below.
paddle.enable_static()

# Fix seed for test: pin the random seed of both default programs so that
# repeated runs of this test build and initialise identical programs.
fluid.default_main_program().random_seed = 1
fluid.default_startup_program().random_seed = 1


37
def fake_ctr_reader():
    """
    Build a sample generator of random CTR data for the PyReader code paths.

    Returns:
        A zero-argument callable that yields 1000 samples, each of the form
        ``[deep, wide, label]``:

        * ``deep``  -- list of 16 ints drawn uniformly from ``[0, 1e5)``
        * ``wide``  -- list of 8 ints drawn uniformly from ``[0, 1e5)``
        * ``label`` -- one-element list holding 0 or 1
    """
    id_upper = int(1e5)  # exclusive upper bound for feature ids

    def reader():
        for _ in range(1000):
            # np.random.randint excludes `high`, so these bounds give the same
            # ranges as the deprecated inclusive np.random.random_integers
            # calls this replaces.
            deep = np.random.randint(0, id_upper, size=16).tolist()
            wide = np.random.randint(0, id_upper, size=8).tolist()
            label = np.random.randint(0, 2, size=1).tolist()
            yield [deep, wide, label]

    return reader


T
tangwei12 已提交
49
class TestDistCTR2x2(FleetDistRunnerBase):
    """
    For test CTR model, using Fleet api.

    Builds a wide & deep CTR network ("deep_embedding" + fc tower, and
    "wide_embedding" logistic part) and trains it through the Fleet
    parameter-server API with three input pipelines: a PyReader fed by
    ``fake_ctr_reader``, a ``QueueDataset`` and an ``InMemoryDataset``.
    """

    def net(self, args, is_train=True, batch_size=4, lr=0.01):
        """
        network definition

        Args:
            args: parsed test arguments; when ``args.reader == "pyreader"`` a
                non-iterable ``fluid.io.PyReader`` over the feed vars is
                created and stored on ``self``.
            is_train(bool): store that PyReader as ``self.reader`` when True,
                otherwise as ``self.test_reader``.
            batch_size(int): accepted for interface compatibility; not used
                by this network definition.
            lr(float): accepted for interface compatibility; not used by this
                network definition (NOTE(review): the optimizer is presumably
                configured by FleetDistRunnerBase -- confirm).
        Returns:
            avg_cost: LoDTensor of cost.
        """
        dnn_input_dim, lr_input_dim = int(1e5), int(1e5)

        dnn_data = fluid.layers.data(name="dnn_data",
                                     shape=[-1, 1],
                                     dtype="int64",
                                     lod_level=1,
                                     append_batch_size=False)
        lr_data = fluid.layers.data(name="lr_data",
                                    shape=[-1, 1],
                                    dtype="int64",
                                    lod_level=1,
                                    append_batch_size=False)
        label = fluid.layers.data(name="click",
                                  shape=[-1, 1],
                                  dtype="int64",
                                  lod_level=0,
                                  append_batch_size=False)

        datas = [dnn_data, lr_data, label]

        if args.reader == "pyreader":
            # Non-iterable reader: the training/testing loops drive it
            # explicitly with start() and reset() on EOFException.
            py_reader = fluid.io.PyReader(feed_list=datas,
                                          capacity=64,
                                          iterable=False,
                                          use_double_buffer=False)
            if is_train:
                self.reader = py_reader
            else:
                self.test_reader = py_reader

        # build dnn model
        dnn_layer_dims = [128, 128, 64, 32, 1]
        dnn_embedding = fluid.layers.embedding(
            is_distributed=False,
            input=dnn_data,
            size=[dnn_input_dim, dnn_layer_dims[0]],
            param_attr=fluid.ParamAttr(
                name="deep_embedding",
                initializer=fluid.initializer.Constant(value=0.01)),
            is_sparse=True,
            padding_idx=0)
        dnn_pool = fluid.layers.sequence_pool(input=dnn_embedding,
                                              pool_type="sum")
        dnn_out = dnn_pool
        for i, dim in enumerate(dnn_layer_dims[1:]):
            dnn_out = fluid.layers.fc(
                input=dnn_out,
                size=dim,
                act="relu",
                param_attr=fluid.ParamAttr(
                    initializer=fluid.initializer.Constant(value=0.01)),
                name='dnn-fc-%d' % i)

        # build lr model
        lr_embedding = fluid.layers.embedding(
            is_distributed=False,
            input=lr_data,
            size=[lr_input_dim, 1],
            param_attr=fluid.ParamAttr(
                name="wide_embedding",
                initializer=fluid.initializer.Constant(value=0.01)),
            is_sparse=True,
            padding_idx=0)
        lr_pool = fluid.layers.sequence_pool(input=lr_embedding,
                                             pool_type="sum")

        merge_layer = fluid.layers.concat(input=[dnn_out, lr_pool], axis=1)

        predict = fluid.layers.fc(input=merge_layer, size=2, act='softmax')
        # The accuracy/auc outputs are not fetched here; the calls are kept so
        # the program still contains these ops.
        acc = fluid.layers.accuracy(input=predict, label=label)
        auc_var, batch_auc_var, auc_states = fluid.layers.auc(input=predict,
                                                              label=label)

        cost = fluid.layers.cross_entropy(input=predict, label=label)
        avg_cost = paddle.mean(x=cost)

        self.feeds = datas
        self.train_file_path = ["fake1", "fake2"]
        self.avg_cost = avg_cost
        self.predict = predict

        return avg_cost

    def check_model_right(self, dirname):
        """
        Parse the saved inference program and dump it as text.

        Reads ``<dirname>/dnn_plugin/__model__``, parses it with
        ``fluid.Program.parse_from_string`` (so an unparsable model fails the
        test) and writes the program's text form to
        ``<dirname>/dnn_plugin/__model__.proto``.

        Args:
            dirname(str): directory the inference model was saved to.
        """
        dirname = os.path.join(dirname, "dnn_plugin")
        model_filename = os.path.join(dirname, "__model__")

        with open(model_filename, "rb") as f:
            program_desc_str = f.read()

        program = fluid.Program.parse_from_string(program_desc_str)
        with open(os.path.join(dirname, "__model__.proto"),
                  "w",
                  encoding="utf-8") as wn:
            wn.write(str(program))

    def _save_and_check_inference_model(self, fleet, exe, reload=False):
        """
        Save the inference model to a fresh temp dir and check it.

        The model is checked with ``check_model_right`` on the first worker
        only. The temp dir is removed afterwards even if saving, reloading or
        checking raises.

        Args:
            fleet(Fleet api): the fleet object of Parameter Server
            exe(Executor): executor passed to ``fleet.save_inference_model``
            reload(bool): when True, load the saved model back with
                ``fleet.load_inference_model(model_dir, mode=0)`` before the
                check.
        """
        model_dir = tempfile.mkdtemp()
        try:
            fleet.save_inference_model(exe, model_dir,
                                       [feed.name for feed in self.feeds],
                                       self.avg_cost)
            if reload:
                fleet.load_inference_model(model_dir, mode=0)
            if fleet.is_first_worker():
                self.check_model_right(model_dir)
        finally:
            # ignore_errors: a cleanup failure must not mask an error raised
            # by the save/check above.
            shutil.rmtree(model_dir, ignore_errors=True)

    def do_distributed_testing(self, fleet):
        """
        do distributed testing

        Feeds ``fake_ctr_reader`` batches through ``self.test_reader``, runs
        the main program on each batch and prints the mean loss per batch
        (and the total elapsed time) on rank 0.

        Args:
            fleet(Fleet api): the fleet object of Parameter Server
        """
        exe = self.get_executor()

        batch_size = 4
        test_reader = paddle.batch(fake_ctr_reader(), batch_size=batch_size)
        self.test_reader.decorate_sample_list_generator(test_reader)

        pass_start = time.time()
        batch_idx = 0

        self.test_reader.start()
        try:
            while True:
                batch_idx += 1
                loss_val = exe.run(program=paddle.static.default_main_program(),
                                   fetch_list=[self.avg_cost.name])
                loss_val = np.mean(loss_val)
                message = "TEST ---> batch_idx: {} loss: {}\n".format(
                    batch_idx, loss_val)
                fleet.util.print_on_rank(message, 0)
        except fluid.core.EOFException:
            # The non-iterable reader signals end of data with EOFException.
            self.test_reader.reset()

        pass_time = time.time() - pass_start
        message = "Distributed Test Succeed, Using Time {}\n".format(pass_time)
        fleet.util.print_on_rank(message, 0)

    def do_pyreader_training(self, fleet):
        """
        do training using a PyReader fed by ``fake_ctr_reader``

        Prints the mean loss of every batch and the pass time on rank 0, then
        optionally saves persistables (``SAVE_DIRNAME``) and always saves and
        checks the inference model.

        Args:
            fleet(Fleet api): the fleet object of Parameter Server, define distribute training role
        """
        exe = self.get_executor()
        exe.run(fluid.default_startup_program())
        fleet.init_worker()

        batch_size = 4
        train_reader = paddle.batch(fake_ctr_reader(), batch_size=batch_size)
        self.reader.decorate_sample_list_generator(train_reader)

        for epoch_id in range(1):
            self.reader.start()
            pass_start = time.time()
            try:
                while True:
                    loss_val = exe.run(program=fluid.default_main_program(),
                                       fetch_list=[self.avg_cost.name])
                    loss_val = np.mean(loss_val)
                    # TODO(randomly fail): average the loss over all trainers:
                    #   reduce_output = fleet.util.all_reduce(
                    #       np.array(loss_val), mode="sum")
                    #   loss_all_trainer = fleet.util.all_gather(float(loss_val))
                    #   loss_val = float(reduce_output) / len(loss_all_trainer)
                    message = "TRAIN ---> pass: {} loss: {}\n".format(
                        epoch_id, loss_val)
                    fleet.util.print_on_rank(message, 0)
            except fluid.core.EOFException:
                # `while True` only ends through EOFException, so the pass
                # must be timed after this handler, not inside the try.
                self.reader.reset()
            pass_time = time.time() - pass_start
            fleet.util.print_on_rank(
                "TRAIN ---> pass: {} time: {}\n".format(epoch_id, pass_time),
                0)

        dirname = os.getenv("SAVE_DIRNAME", None)
        if dirname:
            fleet.save_persistables(exe, dirname=dirname)

        self._save_and_check_inference_model(fleet, exe)

    def do_dataset_training_queuedataset(self, fleet):
        """
        do training with a ``paddle.distributed.QueueDataset``

        Streams the fake CTR files from ``ctr_dataset_reader.prepare_fake_data``
        through ``python ctr_dataset_reader.py``. Saves and checks the
        inference model when ``SAVE_MODEL == "1"`` and saves persistables
        when ``SAVE_DIRNAME`` is set.

        Args:
            fleet(Fleet api): the fleet object of Parameter Server
        """
        train_file_list = ctr_dataset_reader.prepare_fake_data()

        exe = self.get_executor()
        exe.run(fluid.default_startup_program())
        fleet.init_worker()

        thread_num = 2
        batch_size = 128
        filelist = train_file_list

        # config dataset
        dataset = paddle.distributed.QueueDataset()
        pipe_command = 'python ctr_dataset_reader.py'

        dataset.init(batch_size=batch_size,
                     use_var=self.feeds,
                     pipe_command=pipe_command,
                     thread_num=thread_num)

        for epoch_id in range(1):
            pass_start = time.time()
            # Set the file list right before each pass consumes the dataset.
            dataset.set_filelist(filelist)
            exe.train_from_dataset(program=fluid.default_main_program(),
                                   dataset=dataset,
                                   fetch_list=[self.avg_cost],
                                   fetch_info=["cost"],
                                   print_period=2,
                                   debug=int(os.getenv("Debug", "0")))
            pass_time = time.time() - pass_start
            fleet.util.print_on_rank(
                "TRAIN ---> pass: {} time: {}\n".format(epoch_id, pass_time),
                0)

        if os.getenv("SAVE_MODEL") == "1":
            self._save_and_check_inference_model(fleet, exe)

        dirname = os.getenv("SAVE_DIRNAME", None)
        if dirname:
            fleet.save_persistables(exe, dirname=dirname)

    def do_dataset_training(self, fleet):
        """
        do training with an ``InMemoryDataset``

        Loads the fake CTR files into memory, globally shuffles them across
        trainers, trains one pass, then exercises the optional save/load
        paths selected by environment variables (``SAVE_MODEL``,
        ``SAVE_DIRNAME``, ``SAVE_CACHE_DIRNAME``, ``SAVE_DENSE_PARAM_DIRNAME``,
        ``SAVE_ONE_TABLE_DIRNAME``, ``SAVE_PATCH_DIRNAME``).

        Args:
            fleet(Fleet api): the fleet object of Parameter Server
        """
        train_file_list = ctr_dataset_reader.prepare_fake_data()

        exe = self.get_executor()
        exe.run(fluid.default_startup_program())
        fleet.init_worker()

        thread_num = 2
        batch_size = 128
        filelist = train_file_list

        # config dataset
        dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
        dataset.set_use_var(self.feeds)
        dataset.set_batch_size(batch_size)
        dataset.set_thread(thread_num)
        dataset.set_filelist(filelist)
        dataset.set_pipe_command('python ctr_dataset_reader.py')
        dataset.load_into_memory()

        dataset.global_shuffle(fleet, 12)  # TODO: make thread count configurable
        shuffle_data_size = dataset.get_shuffle_data_size(fleet)
        local_data_size = dataset.get_shuffle_data_size()
        data_size_list = fleet.util.all_gather(local_data_size)
        print('after global_shuffle data_size_list: ', data_size_list)
        print('after global_shuffle data_size: ', shuffle_data_size)

        for epoch_id in range(1):
            pass_start = time.time()
            exe.train_from_dataset(program=fluid.default_main_program(),
                                   dataset=dataset,
                                   fetch_list=[self.avg_cost],
                                   fetch_info=["cost"],
                                   print_period=2,
                                   debug=int(os.getenv("Debug", "0")))
            pass_time = time.time() - pass_start
            fleet.util.print_on_rank(
                "TRAIN ---> pass: {} time: {}\n".format(epoch_id, pass_time),
                0)
        dataset.release_memory()

        if os.getenv("SAVE_MODEL") == "1":
            self._save_and_check_inference_model(fleet, exe, reload=True)

        dirname = os.getenv("SAVE_DIRNAME", None)
        if dirname:
            fleet.save_persistables(exe, dirname=dirname)
            fleet.load_model(dirname, mode=0)

        cache_dirname = os.getenv("SAVE_CACHE_DIRNAME", None)
        if cache_dirname:
            fleet.save_cache_model(cache_dirname)

        dense_param_dirname = os.getenv("SAVE_DENSE_PARAM_DIRNAME", None)
        if dense_param_dirname:
            fleet.save_dense_params(exe, dense_param_dirname,
                                    fluid.global_scope(),
                                    fluid.default_main_program())

        save_one_table_dirname = os.getenv("SAVE_ONE_TABLE_DIRNAME", None)
        if save_one_table_dirname:
            fleet.save_one_table(0, save_one_table_dirname, 0)
            fleet.load_one_table(0, save_one_table_dirname, 0)

        patch_dirname = os.getenv("SAVE_PATCH_DIRNAME", None)
        if patch_dirname:
            # NOTE(review): positional args after the dir are presumably
            # (main_program=None, mode=5) -- confirm against the fleet API.
            fleet.save_persistables(exe, patch_dirname, None, 5)
            fleet.check_save_pre_patch_done()

T
tangwei12 已提交
358 359
if __name__ == "__main__":
    # Run this CTR test case through the shared fleet test entry point.
    runtime_main(TestDistCTR2x2)