train.py 7.5 KB
Newer Older
Q
Qiao Longfei 已提交
1 2 3 4 5
from __future__ import print_function

import argparse
import logging
import os
6
import time
Q
Qiao Longfei 已提交
7

8 9
import numpy as np

T
tangwei12 已提交
10
# disable gpu training for this example
Q
Qiao Longfei 已提交
11 12 13 14 15 16 17 18 19 20 21 22 23 24
# An empty CUDA_VISIBLE_DEVICES hides every GPU from this process. It is
# assigned ahead of the paddle imports that follow in this file.
os.environ["CUDA_VISIBLE_DEVICES"] = ""

import paddle
import paddle.fluid as fluid

import reader
from network_conf import skip_gram_word2vec

# Install a root handler that prints "<time> - <level> - <message>", then
# create the module-level logger named "fluid" with its level set to INFO.
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)


def parse_args(argv=None):
    """Parse the command-line options of the word2vec training example.

    Args:
        argv: Optional list of argument strings to parse. When ``None``
            (the default) argparse falls back to ``sys.argv[1:]``, which is
            what the script entry point relies on.

    Returns:
        argparse.Namespace with the attributes ``train_data_path``,
        ``dict_path``, ``test_data_path``, ``batch_size``, ``num_passes``,
        ``model_output_dir``, ``embedding_size``, ``with_hs``, ``with_nce``,
        ``max_code_length`` and ``is_sparse``.
    """
    parser = argparse.ArgumentParser(description="PaddlePaddle Word2vec example")

    # Dataset locations.
    parser.add_argument(
        '--train_data_path',
        type=str,
        default='./data/enwik8',
        help="The path of training dataset")
    parser.add_argument(
        '--dict_path',
        type=str,
        default='./data/enwik8_dict',
        help="The path of data dict")
    parser.add_argument(
        '--test_data_path',
        type=str,
        default='./data/text8',
        help="The path of testing dataset")

    # Training schedule and output.
    parser.add_argument(
        '--batch_size',
        type=int,
        default=100,
        help="The size of mini-batch (default: 100)")
    parser.add_argument(
        '--num_passes',
        type=int,
        default=10,
        help="The number of passes to train (default: 10)")
    parser.add_argument(
        '--model_output_dir',
        type=str,
        default='models',
        help='The path for model to store (default: models)')

    # Model options.
    parser.add_argument(
        '--embedding_size',
        type=int,
        default=64,
        help='The size of the word embedding (default: 64)')
    parser.add_argument(
        '--with_hs',
        action='store_true',
        required=False,
        default=False,
        help='using hierarchical sigmoid, (default: False)')
    # NOTE: 'store_true' combined with default=True means this option is True
    # whether or not the flag is given; it cannot be switched off from the
    # command line. Kept as-is so existing invocations behave the same.
    parser.add_argument(
        '--with_nce',
        action='store_true',
        required=False,
        default=True,
        help='using negative sampling, (default: True)')
    parser.add_argument(
        '--max_code_length',
        type=int,
        default=40,
        help='max code length used by hierarchical sigmoid, (default: 40)')
    parser.add_argument(
        '--is_sparse',
        action='store_true',
        required=False,
        default=False,
        help='embedding and nce will use sparse or not, (default: False)')

    return parser.parse_args(argv)


91 92
def train_loop(args, train_program, reader, py_reader, loss, trainer_id):
    """Train on CPU for ``args.num_passes`` passes, logging and checkpointing.

    Args:
        args: Parsed command-line namespace. Reads ``with_hs``, ``with_nce``,
            ``batch_size``, ``num_passes`` and ``model_output_dir``.
        train_program: Program handed to the ParallelExecutor as its
            ``main_program``.
        reader: Object providing ``train(flag)``, which returns the sample
            reader. Note that this parameter shadows the imported ``reader``
            module inside the function.
        py_reader: Reader object that is decorated with the batched reader
            and started/reset once per pass.
        loss: Loss variable; its name is fetched on every step and it is the
            target saved by ``save_inference_model``.
        trainer_id: Integer id of this trainer. Only trainer 0 writes models.
    """
    # The flag given to reader.train() is true when hierarchical sigmoid is
    # enabled or NCE is disabled.
    reader_flag = args.with_hs or (not args.with_nce)
    train_reader = paddle.batch(
        paddle.reader.shuffle(
            reader.train(reader_flag),
            buf_size=args.batch_size * 100),
        batch_size=args.batch_size)

    py_reader.decorate_paddle_reader(train_reader)

    place = fluid.CPUPlace()

    # NOTE(review): feeded_var_names is passed to save_inference_model as
    # None below -- confirm that the saved models are usable for inference.
    data_name_list = None

    exe = fluid.Executor(place)
    exe.run(fluid.default_startup_program())

    # time.clock() was removed in Python 3.8; use wall-clock time, matching
    # the per-epoch timing further down.
    start = time.time()

    exec_strategy = fluid.ExecutionStrategy()

    # An unset or empty NUM_THREADS leaves the executor's own default.
    num_threads = os.getenv("NUM_THREADS", "")
    if num_threads:
        exec_strategy.num_threads = int(num_threads)

    build_strategy = fluid.BuildStrategy()
    build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce

    train_exe = fluid.ParallelExecutor(
        use_cuda=False,
        loss_name=loss.name,
        main_program=train_program,
        build_strategy=build_strategy,
        exec_strategy=exec_strategy)

    # The profiler runs once, over global steps [20, 30], counted across
    # passes (profiler_step is never reset).
    profile_state = "CPU"
    profiler_step = 0
    profiler_step_start = 20
    profiler_step_end = 30

    for pass_id in range(args.num_passes):
        epoch_start = time.time()
        py_reader.start()
        batch_id = 0

        try:
            # Runs until the py_reader signals end of data via EOFException.
            while True:
                if profiler_step == profiler_step_start:
                    fluid.profiler.start_profiler(profile_state)

                loss_val = train_exe.run(fetch_list=[loss.name])
                loss_val = np.mean(loss_val)

                if profiler_step == profiler_step_end:
                    fluid.profiler.stop_profiler('total', 'trainer_profile.log')
                profiler_step += 1

                if batch_id % 10 == 0:
                    logger.info("TRAIN --> pass: {} batch: {} loss: {}".format(
                        pass_id, batch_id, loss_val / args.batch_size))

                # Every 1000 batches (except the very first): report elapsed
                # time and let trainer 0 write an intermediate checkpoint.
                if batch_id % 1000 == 0 and batch_id != 0:
                    elapsed = time.time() - start
                    logger.info("Time used: {}".format(elapsed))

                    if trainer_id == 0:
                        model_dir = args.model_output_dir + '/batch-' + str(batch_id)
                        fluid.io.save_inference_model(model_dir, data_name_list, [loss], exe)
                batch_id += 1

        except fluid.core.EOFException:
            # End of the pass: rewind the reader and save a per-pass model.
            py_reader.reset()
            epoch_end = time.time()
            print("Epoch: {0}, Train total expend: {1} ".format(pass_id, epoch_end - epoch_start))

            if trainer_id == 0:
                model_dir = args.model_output_dir + '/pass-' + str(pass_id)
                fluid.io.save_inference_model(model_dir, data_name_list, [loss], exe)
Q
Qiao Longfei 已提交
169 170 171 172 173 174 175 176


def train():
    """Entry point: build the word2vec network and run local or distributed training.

    Environment variables read:
        PADDLE_IS_LOCAL: "1" selects local training. When unset, local
            training is assumed instead of failing with a KeyError.
        PADDLE_TRAINER_ID, PADDLE_TRAINERS, PADDLE_TRAINING_ROLE: required
            for distributed training.
        PADDLE_PSERVER_PORTS: comma-separated pserver ports (default "6174").
        PADDLE_IP: pserver IP used to build the endpoints (default "").
        CUR_PORT: port of the current endpoint (default "2333").

    Raises:
        KeyError: if a required distributed-mode variable is missing.
        ValueError: if PADDLE_TRAINING_ROLE is neither "PSERVER" nor "TRAINER".
    """
    args = parse_args()

    # makedirs also creates missing parent directories of a nested output path.
    if not os.path.isdir(args.model_output_dir):
        os.makedirs(args.model_output_dir)

    word2vec_reader = reader.Word2VecReader(args.dict_path,
                                            args.train_data_path)

    logger.info("dict_size: {}".format(word2vec_reader.dict_size))

    loss, py_reader = skip_gram_word2vec(
        word2vec_reader.dict_size, word2vec_reader.word_frequencys,
        args.embedding_size, args.max_code_length,
        args.with_hs, args.with_nce, is_sparse=args.is_sparse)

    optimizer = fluid.optimizer.Adam(learning_rate=1e-3)
    optimizer.minimize(loss)

    if os.getenv("PADDLE_IS_LOCAL", "1") == "1":
        logger.info("run local training")
        main_program = fluid.default_main_program()
        train_loop(args, main_program, word2vec_reader, py_reader, loss, 0)
        return

    logger.info("run dist training")

    trainer_id = int(os.environ["PADDLE_TRAINER_ID"])
    trainers = int(os.environ["PADDLE_TRAINERS"])
    training_role = os.environ["PADDLE_TRAINING_ROLE"]

    ports = os.getenv("PADDLE_PSERVER_PORTS", "6174")
    pserver_ip = os.getenv("PADDLE_IP", "")
    # Read once so the endpoint and the dump file name always agree.
    cur_port = os.getenv("CUR_PORT", "2333")

    pserver_endpoints = ",".join(
        ':'.join([pserver_ip, port]) for port in ports.split(","))
    current_endpoint = pserver_ip + ":" + cur_port

    config = fluid.DistributeTranspilerConfig()
    config.slice_var_up = False
    t = fluid.DistributeTranspiler(config=config)
    t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers, sync_mode=True)

    if training_role == "PSERVER":
        logger.info("run pserver")
        prog = t.get_pserver_program(current_endpoint)
        startup = t.get_startup_program(current_endpoint, pserver_program=prog)

        # Dump the pserver program text for inspection.
        with open("pserver.main.proto.{}".format(cur_port), "w") as f:
            f.write(str(prog))

        exe = fluid.Executor(fluid.CPUPlace())
        exe.run(startup)
        exe.run(prog)
    elif training_role == "TRAINER":
        logger.info("run trainer")
        train_prog = t.get_trainer_program()

        # Dump the trainer program text for inspection.
        with open("trainer.main.proto.{}".format(trainer_id), "w") as f:
            f.write(str(train_prog))

        train_loop(args, train_prog, word2vec_reader, py_reader, loss, trainer_id)
    else:
        # Previously an unknown role fell through and the process exited
        # without doing any work or reporting a problem.
        raise ValueError(
            "PADDLE_TRAINING_ROLE must be 'PSERVER' or 'TRAINER', got {!r}".format(
                training_role))
Q
Qiao Longfei 已提交
235 236 237 238


# Start training only when this file is executed as a script, not on import.
if __name__ == '__main__':
    train()