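# cluster_train.py: DeepFM CTR model training with PaddlePaddle Fluid, run
# either locally or as a parameter-server (pserver / trainer) cluster job.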
import argparse
import os
import sys
import time

import paddle.fluid as fluid

import utils
from network_conf import ctr_deepfm_model


def parse_args():
    """Parse the command-line options for DeepFM cluster training.

    The options cover data and model paths, model hyper-parameters, and the
    settings used for parameter-server distributed training.

    Returns:
        argparse.Namespace: the parsed options. ``--model_output_dir`` is
        required; every other option has a default.
    """
    # The text is a description, not the program name: passing it
    # positionally would make it ``prog`` and show it in the usage line.
    parser = argparse.ArgumentParser(description="deepfm cluster train.")

    # Help strings use %(default)s so the documented default can never
    # drift from the real one.
    parser.add_argument(
        '--train_data_dir',
        type=str,
        default='dist_data/dist_train_data',
        help='The path of train data (default: %(default)s)')
    parser.add_argument(
        '--test_data_dir',
        type=str,
        default='dist_data/dist_test_data',
        help='The path of test data (default: %(default)s)')
    parser.add_argument(
        '--feat_dict',
        type=str,
        default='dist_data/aid_data/feat_dict_10.pkl2',
        help='The path of feat_dict (default: %(default)s)')
    parser.add_argument(
        '--batch_size',
        type=int,
        default=100,
        help="The size of mini-batch (default: %(default)s)")
    parser.add_argument(
        '--embedding_size',
        type=int,
        default=10,
        help="The size for embedding layer (default: %(default)s)")
    parser.add_argument(
        '--num_epoch',
        type=int,
        default=10,
        help="The number of epochs to train (default: %(default)s)")
    parser.add_argument(
        '--model_output_dir',
        type=str,
        required=True,
        help='The path for model to store (required)')
    parser.add_argument(
        '--num_thread',
        type=int,
        default=1,
        help='The number of threads (default: %(default)s)')
    parser.add_argument('--test_epoch', type=str, default='1')
    parser.add_argument(
        '--layer_sizes',
        nargs='+',
        type=int,
        default=[400, 400, 400],
        help='The size of each layer (default: %(default)s)')
    parser.add_argument(
        '--act',
        type=str,
        default='relu',
        help='The activation of each layer (default: %(default)s)')
    parser.add_argument(
        '--is_sparse',
        action='store_true',
        required=False,
        default=False,
        help='embedding will use sparse or not (default: %(default)s)')
    parser.add_argument(
        '--lr',
        type=float,
        default=1e-4,
        help='Learning rate (default: %(default)s)')
    parser.add_argument(
        '--reg',
        type=float,
        default=1e-4,
        help='L2 regularization coefficient (default: %(default)s)')
    parser.add_argument('--num_field', type=int, default=39)
    parser.add_argument('--num_feat', type=int, default=141443)
    parser.add_argument(
        '--use_gpu',
        type=int,
        default=1,
        help='1 to train on GPU 0, otherwise CPU (default: %(default)s)')

    # dist params
    parser.add_argument(
        '--is_local',
        type=int,
        default=1,
        help='Non-zero for local training, 0 for distributed training '
        '(default: %(default)s)')
    parser.add_argument(
        '--num_devices', type=int, default=1, help='Number of GPU devices')
    parser.add_argument(
        '--role',
        type=str,
        default='pserver',
        choices=['trainer', 'pserver'],
        help='trainer or pserver (default: %(default)s)')
    parser.add_argument(
        '--endpoints',
        type=str,
        default='127.0.0.1:6000',
        help='Comma-separated pserver endpoints without spaces, like: '
        '127.0.0.1:6000,127.0.0.1:6001 (default: %(default)s)')
    parser.add_argument(
        '--current_endpoint',
        type=str,
        default='127.0.0.1:6000',
        help='The endpoint of this pserver (default: %(default)s)')
    parser.add_argument(
        '--trainer_id',
        type=int,
        default=0,
        help='Trainer id; only trainer_id=0 saves the model '
        '(default: %(default)s)')
    parser.add_argument(
        '--trainers',
        type=int,
        default=1,
        help='The number of trainers (default: %(default)s)')
    return parser.parse_args()


def train():
    """Build the DeepFM network and run local or distributed training.

    The configuration comes from the command line via ``parse_args()``.
    When ``--is_local`` is non-zero, the default main program is trained
    directly in this process. Otherwise the program is transpiled with
    ``fluid.DistributeTranspiler``, and this process runs either as a
    parameter server (``--role pserver``) or as a trainer
    (``--role trainer``).

    Raises:
        ValueError: if distributed mode is requested with a role other than
            'trainer' or 'pserver', or if the training directory is empty.
    """
    args = parse_args()
    print(args)

    # Only trainer 0 saves checkpoints, so only it needs the output dir.
    # makedirs (unlike mkdir) also creates any missing parent directories.
    if args.trainer_id == 0 and not os.path.isdir(args.model_output_dir):
        os.makedirs(args.model_output_dir)

    loss, auc, data_list, auc_states = ctr_deepfm_model(
        args.embedding_size, args.num_field, args.num_feat, args.layer_sizes,
        args.act, args.reg, args.is_sparse)
    optimizer = fluid.optimizer.SGD(
        learning_rate=args.lr,
        regularization=fluid.regularizer.L2DecayRegularizer(args.reg))
    optimizer.minimize(loss)

    def train_loop(main_program):
        """Train ``main_program`` for ``args.num_epoch`` epochs.

        Each epoch reads every file in ``args.train_data_dir``. After each
        epoch, trainer 0 saves the program to ``<model_output_dir>/epoch_<n>``.
        """
        start_time = time.time()

        dataset = fluid.DatasetFactory().create_dataset()
        dataset.set_use_var(data_list)
        # Raw input is processed by criteo_reader.py, which receives the
        # feature-dictionary path as its argument.
        pipe_command = 'python criteo_reader.py {}'.format(args.feat_dict)
        dataset.set_pipe_command(pipe_command)
        dataset.set_batch_size(args.batch_size)

        # os.listdir order is arbitrary; sort it so runs are reproducible.
        train_filelist = [
            os.path.join(args.train_data_dir, x)
            for x in sorted(os.listdir(args.train_data_dir))
        ]
        if not train_filelist:
            raise ValueError(
                'no training files found in %r' % args.train_data_dir)

        # The reader thread count depends on the device: one thread for
        # GPU, --num_thread threads for CPU.
        if args.use_gpu == 1:
            exe = fluid.Executor(fluid.CUDAPlace(0))
            dataset.set_thread(1)
        else:
            exe = fluid.Executor(fluid.CPUPlace())
            dataset.set_thread(args.num_thread)
        exe.run(fluid.default_startup_program())

        for epoch_id in range(args.num_epoch):
            epoch_num = epoch_id + 1
            start = time.time()
            sys.stderr.write('\nepoch%d start ...\n' % epoch_num)
            dataset.set_filelist(train_filelist)
            exe.train_from_dataset(
                program=main_program,
                dataset=dataset,
                fetch_list=[loss, auc],
                fetch_info=['epoch %d batch loss' % epoch_num, "auc"],
                print_period=5,
                debug=False)
            sys.stderr.write('epoch%d is finished and takes %f s\n' % (
                epoch_num, time.time() - start))
            if args.trainer_id == 0:  # only trainer 0 save model
                model_dir = os.path.join(args.model_output_dir,
                                         'epoch_' + str(epoch_num))
                print("save model in {}".format(model_dir))
                fluid.save(main_program, model_dir)

        print("train time cost {:.4f}".format(time.time() - start_time))
        print("finish training")

    if args.is_local:
        print("run local training")
        train_loop(fluid.default_main_program())
        return

    # Fail loudly instead of transpiling and then silently doing nothing.
    if args.role not in ("pserver", "trainer"):
        raise ValueError(
            "--role must be 'trainer' or 'pserver', got %r" % args.role)

    print("run distribute training")
    t = fluid.DistributeTranspiler()
    t.transpile(
        args.trainer_id, pservers=args.endpoints, trainers=args.trainers)
    if args.role == "pserver":
        print("run psever")
        pserver_prog, pserver_startup = t.get_pserver_programs(
            args.current_endpoint)

        exe = fluid.Executor(fluid.CPUPlace())
        exe.run(pserver_startup)
        # NOTE(review): presumably blocks while serving trainers; confirm.
        exe.run(pserver_prog)
    else:
        print("run trainer")
        train_loop(t.get_trainer_program())


if __name__ == "__main__":
    # Verify the installed Paddle version (utils.check_version) before
    # building the program and training.
    utils.check_version()
    train()