fleet_deep_ctr.py 6.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import logging
import time

19
import paddle
20 21
import paddle.fluid as fluid
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
22 23 24 25 26 27
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import (
    fleet,
)
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import (
    StrategyFactory,
)
28

29
from paddle.fluid.log_helper import get_logger
30 31 32

import ctr_dataset_reader

33 34 35
logger = get_logger(
    "fluid", logging.INFO, fmt='%(asctime)s - %(levelname)s - %(message)s'
)
36 37 38 39 40 41 42 43 44 45


def parse_args():
    parser = argparse.ArgumentParser(description="PaddlePaddle Fleet ctr")

    # the following arguments is used for distributed train, if is_local == false, then you should set them
    parser.add_argument(
        '--role',
        type=str,
        default='pserver',  # trainer or pserver
46 47
        help='The path for model to store (default: models)',
    )
48 49 50 51
    parser.add_argument(
        '--endpoints',
        type=str,
        default='127.0.0.1:6000',
52 53
        help='The pserver endpoints, like: 127.0.0.1:6000,127.0.0.1:6001',
    )
54 55 56 57
    parser.add_argument(
        '--current_endpoint',
        type=str,
        default='127.0.0.1:6000',
58 59 60 61 62 63 64 65 66 67 68 69 70 71
        help='The path for model to store (default: 127.0.0.1:6000)',
    )
    parser.add_argument(
        '--trainer_id',
        type=int,
        default=0,
        help='The path for model to store (default: models)',
    )
    parser.add_argument(
        '--trainers',
        type=int,
        default=1,
        help='The num of trainers, (default: 1)',
    )
72 73 74 75 76

    return parser.parse_args()


def model():
77 78 79 80 81
    (
        dnn_input_dim,
        lr_input_dim,
        train_file_path,
    ) = ctr_dataset_reader.prepare_data()
82
    """ network definition """
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103
    dnn_data = fluid.layers.data(
        name="dnn_data",
        shape=[-1, 1],
        dtype="int64",
        lod_level=1,
        append_batch_size=False,
    )
    lr_data = fluid.layers.data(
        name="lr_data",
        shape=[-1, 1],
        dtype="int64",
        lod_level=1,
        append_batch_size=False,
    )
    label = fluid.layers.data(
        name="click",
        shape=[-1, 1],
        dtype="int64",
        lod_level=0,
        append_batch_size=False,
    )
104 105 106 107 108 109 110 111 112 113 114

    datas = [dnn_data, lr_data, label]

    # build dnn model
    dnn_layer_dims = [128, 64, 32, 1]
    dnn_embedding = fluid.layers.embedding(
        is_distributed=False,
        input=dnn_data,
        size=[dnn_input_dim, dnn_layer_dims[0]],
        param_attr=fluid.ParamAttr(
            name="deep_embedding",
115 116 117 118
            initializer=fluid.initializer.Constant(value=0.01),
        ),
        is_sparse=True,
    )
119 120 121
    dnn_pool = fluid.layers.sequence_pool(input=dnn_embedding, pool_type="sum")
    dnn_out = dnn_pool
    for i, dim in enumerate(dnn_layer_dims[1:]):
C
Charles-hit 已提交
122 123
        fc = paddle.static.nn.fc(
            x=dnn_out,
124
            size=dim,
C
Charles-hit 已提交
125 126
            activation="relu",
            weight_attr=fluid.ParamAttr(
127 128 129 130
                initializer=fluid.initializer.Constant(value=0.01)
            ),
            name='dnn-fc-%d' % i,
        )
131 132 133 134 135 136 137 138 139
        dnn_out = fc

    # build lr model
    lr_embbding = fluid.layers.embedding(
        is_distributed=False,
        input=lr_data,
        size=[lr_input_dim, 1],
        param_attr=fluid.ParamAttr(
            name="wide_embedding",
140 141 142 143
            initializer=fluid.initializer.Constant(value=0.01),
        ),
        is_sparse=True,
    )
144 145 146 147
    lr_pool = fluid.layers.sequence_pool(input=lr_embbding, pool_type="sum")

    merge_layer = fluid.layers.concat(input=[dnn_out, lr_pool], axis=1)

C
Charles-hit 已提交
148
    predict = paddle.static.nn.fc(x=merge_layer, size=2, activation='softmax')
149 150
    acc = paddle.static.accuracy(input=predict, label=label)
    auc_var, batch_auc_var, auc_states = paddle.static.auc(
151 152
        input=predict, label=label
    )
153 154 155
    cost = paddle.nn.functional.cross_entropy(
        input=predict, label=label, reduction='none', use_softmax=False
    )
156
    avg_cost = paddle.mean(x=cost)
157 158 159 160 161 162 163 164 165 166 167 168 169 170 171

    return datas, avg_cost, predict, train_file_path


def train(args):
    datas, avg_cost, predict, train_file_path = model()

    endpoints = args.endpoints.split(",")
    if args.role.upper() == "PSERVER":
        current_id = endpoints.index(args.current_endpoint)
    else:
        current_id = 0
    role = role_maker.UserDefinedRoleMaker(
        current_id=current_id,
        role=role_maker.Role.WORKER
172 173
        if args.role.upper() == "TRAINER"
        else role_maker.Role.SERVER,
174
        worker_num=args.trainers,
175 176
        server_endpoints=endpoints,
    )
177 178 179 180

    exe = fluid.Executor(fluid.CPUPlace())
    fleet.init(role)

181
    strategy = StrategyFactory.create_half_async_strategy()
182 183 184 185 186 187 188 189 190 191 192 193 194 195

    optimizer = fluid.optimizer.SGD(learning_rate=0.0001)
    optimizer = fleet.distributed_optimizer(optimizer, strategy)
    optimizer.minimize(avg_cost)

    if fleet.is_server():
        logger.info("run pserver")

        fleet.init_server()
        fleet.run_server()
    elif fleet.is_worker():
        logger.info("run trainer")

        exe.run(fleet.startup_program)
T
tangwei12 已提交
196
        fleet.init_worker()
197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216

        thread_num = 2
        filelist = []
        for _ in range(thread_num):
            filelist.append(train_file_path)

        # config dataset
        dataset = fluid.DatasetFactory().create_dataset()
        dataset.set_batch_size(128)
        dataset.set_use_var(datas)
        pipe_command = 'python ctr_dataset_reader.py'
        dataset.set_pipe_command(pipe_command)

        dataset.set_filelist(filelist)
        dataset.set_thread(thread_num)

        for epoch_id in range(10):
            logger.info("epoch {} start".format(epoch_id))
            pass_start = time.time()
            dataset.set_filelist(filelist)
217 218 219 220 221 222 223 224
            exe.train_from_dataset(
                program=fleet.main_program,
                dataset=dataset,
                fetch_list=[avg_cost],
                fetch_info=["cost"],
                print_period=100,
                debug=False,
            )
225
            pass_time = time.time() - pass_start
226 227 228
            logger.info(
                "epoch {} finished, pass_time {}".format(epoch_id, pass_time)
            )
229 230 231 232 233 234
        fleet.stop_worker()


if __name__ == "__main__":
    args = parse_args()
    train(args)