dist_fleet_ctr.py 8.6 KB
Newer Older
T
tangwei12 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14 15 16
"""
Distribute CTR model for test fleet api
"""
T
tangwei12 已提交
17 18 19 20 21 22 23

from __future__ import print_function

import shutil
import tempfile
import time

1
123malin 已提交
24
import paddle
T
tangwei12 已提交
25 26
import paddle.fluid as fluid
import os
1
123malin 已提交
27
import numpy as np
T
tangwei12 已提交
28 29 30 31

import ctr_dataset_reader
from test_dist_fleet_base import runtime_main, FleetDistRunnerBase

P
pangyoki 已提交
32 33
paddle.enable_static()

T
tangwei12 已提交
34 35 36 37 38
# Fix seed for test
fluid.default_startup_program().random_seed = 1
fluid.default_main_program().random_seed = 1


39 40 41 42 43 44 45 46 47 48 49
def fake_ctr_reader():
    def reader():
        for _ in range(1000):
            deep = np.random.random_integers(0, 1e5 - 1, size=16).tolist()
            wide = np.random.random_integers(0, 1e5 - 1, size=8).tolist()
            label = np.random.random_integers(0, 1, size=1).tolist()
            yield [deep, wide, label]

    return reader


T
tangwei12 已提交
50
class TestDistCTR2x2(FleetDistRunnerBase):
51 52 53 54
    """
    For test CTR model, using Fleet api
    """

55
    def net(self, args, batch_size=4, lr=0.01):
56 57 58 59 60 61 62 63 64
        """
        network definition

        Args:
            batch_size(int): the size of mini-batch for training
            lr(float): learning rate of training
        Returns:
            avg_cost: LoDTensor of cost.
        """
65 66
        dnn_input_dim, lr_input_dim = int(1e5), int(1e5)

T
tangwei12 已提交
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
        dnn_data = fluid.layers.data(
            name="dnn_data",
            shape=[-1, 1],
            dtype="int64",
            lod_level=1,
            append_batch_size=False)
        lr_data = fluid.layers.data(
            name="lr_data",
            shape=[-1, 1],
            dtype="int64",
            lod_level=1,
            append_batch_size=False)
        label = fluid.layers.data(
            name="click",
            shape=[-1, 1],
            dtype="int64",
            lod_level=0,
            append_batch_size=False)

        datas = [dnn_data, lr_data, label]

88 89 90 91 92 93 94
        if args.reader == "pyreader":
            self.reader = fluid.io.PyReader(
                feed_list=datas,
                capacity=64,
                iterable=False,
                use_double_buffer=False)

T
tangwei12 已提交
95
        # build dnn model
C
Chengmo 已提交
96
        dnn_layer_dims = [128, 128, 64, 32, 1]
T
tangwei12 已提交
97 98 99 100 101 102 103
        dnn_embedding = fluid.layers.embedding(
            is_distributed=False,
            input=dnn_data,
            size=[dnn_input_dim, dnn_layer_dims[0]],
            param_attr=fluid.ParamAttr(
                name="deep_embedding",
                initializer=fluid.initializer.Constant(value=0.01)),
1
123malin 已提交
104 105
            is_sparse=True,
            padding_idx=0)
T
tangwei12 已提交
106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126
        dnn_pool = fluid.layers.sequence_pool(
            input=dnn_embedding, pool_type="sum")
        dnn_out = dnn_pool
        for i, dim in enumerate(dnn_layer_dims[1:]):
            fc = fluid.layers.fc(
                input=dnn_out,
                size=dim,
                act="relu",
                param_attr=fluid.ParamAttr(
                    initializer=fluid.initializer.Constant(value=0.01)),
                name='dnn-fc-%d' % i)
            dnn_out = fc

        # build lr model
        lr_embbding = fluid.layers.embedding(
            is_distributed=False,
            input=lr_data,
            size=[lr_input_dim, 1],
            param_attr=fluid.ParamAttr(
                name="wide_embedding",
                initializer=fluid.initializer.Constant(value=0.01)),
1
123malin 已提交
127 128
            is_sparse=True,
            padding_idx=0)
T
tangwei12 已提交
129 130 131 132 133 134
        lr_pool = fluid.layers.sequence_pool(input=lr_embbding, pool_type="sum")

        merge_layer = fluid.layers.concat(input=[dnn_out, lr_pool], axis=1)

        predict = fluid.layers.fc(input=merge_layer, size=2, act='softmax')
        acc = fluid.layers.accuracy(input=predict, label=label)
135

T
tangwei12 已提交
136 137
        auc_var, batch_auc_var, auc_states = fluid.layers.auc(input=predict,
                                                              label=label)
138

T
tangwei12 已提交
139 140 141 142
        cost = fluid.layers.cross_entropy(input=predict, label=label)
        avg_cost = fluid.layers.mean(x=cost)

        self.feeds = datas
143
        self.train_file_path = ["fake1", "fake2"]
T
tangwei12 已提交
144 145 146 147 148 149 150 151 152 153 154 155 156 157 158
        self.avg_cost = avg_cost
        self.predict = predict

        return avg_cost

    def check_model_right(self, dirname):
        model_filename = os.path.join(dirname, "__model__")

        with open(model_filename, "rb") as f:
            program_desc_str = f.read()

        program = fluid.Program.parse_from_string(program_desc_str)
        with open(os.path.join(dirname, "__model__.proto"), "w") as wn:
            wn.write(str(program))

1
123malin 已提交
159
    def do_pyreader_training(self, fleet):
160 161 162 163 164
        """
        do training using dataset, using fetch handler to catch variable
        Args:
            fleet(Fleet api): the fleet object of Parameter Server, define distribute training role
        """
1
123malin 已提交
165 166 167 168 169 170
        device_env = os.getenv("DEVICE", 'cpu')
        if device_env == 'cpu':
            device = fluid.CPUPlace()
        elif device_env == 'gpu':
            device = fluid.CUDAPlace(0)
        exe = fluid.Executor(device)
T
tangwei12 已提交
171

172
        exe.run(fluid.default_startup_program())
T
tangwei12 已提交
173 174
        fleet.init_worker()

175 176
        batch_size = 4
        train_reader = paddle.batch(fake_ctr_reader(), batch_size=batch_size)
1
123malin 已提交
177 178 179 180 181 182 183
        self.reader.decorate_sample_list_generator(train_reader)

        for epoch_id in range(1):
            self.reader.start()
            try:
                pass_start = time.time()
                while True:
184
                    loss_val = exe.run(program=fluid.default_main_program(),
1
123malin 已提交
185 186
                                       fetch_list=[self.avg_cost.name])
                    loss_val = np.mean(loss_val)
187
                    # TODO(randomly fail)
188
                    #   reduce_output = fleet.util.all_reduce(
189
                    #       np.array(loss_val), mode="sum")
190
                    #   loss_all_trainer = fleet.util.all_gather(float(loss_val))
191
                    #   loss_val = float(reduce_output) / len(loss_all_trainer)
192 193
                    message = "TRAIN ---> pass: {} loss: {}\n".format(epoch_id,
                                                                      loss_val)
194
                    fleet.util.print_on_rank(message, 0)
195

1
123malin 已提交
196 197 198 199 200 201 202 203 204 205 206 207
                pass_time = time.time() - pass_start
            except fluid.core.EOFException:
                self.reader.reset()

        model_dir = tempfile.mkdtemp()
        fleet.save_inference_model(
            exe, model_dir, [feed.name for feed in self.feeds], self.avg_cost)
        self.check_model_right(model_dir)
        shutil.rmtree(model_dir)
        fleet.stop_worker()

    def do_dataset_training(self, fleet):
208
        train_file_list = ctr_dataset_reader.prepare_fake_data()
1
123malin 已提交
209

1
123malin 已提交
210 211 212 213 214 215
        device_env = os.getenv("DEVICE", 'cpu')
        if device_env == 'cpu':
            device = fluid.CPUPlace()
        elif device_env == 'gpu':
            device = fluid.CUDAPlace(0)
        exe = fluid.Executor(device)
1
123malin 已提交
216

217
        exe.run(fluid.default_startup_program())
T
tangwei12 已提交
218
        fleet.init_worker()
1
123malin 已提交
219 220 221

        thread_num = 2
        batch_size = 128
222
        filelist = train_file_list
T
tangwei12 已提交
223 224

        # config dataset
225
        dataset = paddle.distributed.QueueDataset()
T
tangwei12 已提交
226
        pipe_command = 'python ctr_dataset_reader.py'
227 228 229 230 231 232

        dataset.init(
            batch_size=batch_size,
            use_var=self.feeds,
            pipe_command=pipe_command,
            thread_num=thread_num)
T
tangwei12 已提交
233 234 235

        dataset.set_filelist(filelist)

236
        for epoch_id in range(1):
T
tangwei12 已提交
237 238 239
            pass_start = time.time()
            dataset.set_filelist(filelist)
            exe.train_from_dataset(
240
                program=fluid.default_main_program(),
T
tangwei12 已提交
241 242 243
                dataset=dataset,
                fetch_list=[self.avg_cost],
                fetch_info=["cost"],
244
                print_period=2,
245
                debug=int(os.getenv("Debug", "0")))
246 247
            pass_time = time.time() - pass_start

248 249 250 251 252 253 254
        if os.getenv("SAVE_MODEL") == "1":
            model_dir = tempfile.mkdtemp()
            fleet.save_inference_model(exe, model_dir,
                                       [feed.name for feed in self.feeds],
                                       self.avg_cost)
            self.check_model_right(model_dir)
            shutil.rmtree(model_dir)
255

T
tangwei12 已提交
256 257 258 259 260
        fleet.stop_worker()


if __name__ == "__main__":
    runtime_main(TestDistCTR2x2)