train.py 11.1 KB
Newer Older
Q
Qiao Longfei 已提交
1 2 3 4 5
from __future__ import print_function

import argparse
import logging
import os
6
import time
Q
Qiao Longfei 已提交
7

8 9
import numpy as np

T
tangwei12 已提交
10
# disable gpu training for this example
Q
Qiao Longfei 已提交
11 12 13 14
os.environ["CUDA_VISIBLE_DEVICES"] = ""

import paddle
import paddle.fluid as fluid
J
JiabinYang 已提交
15
from paddle.fluid.executor import global_scope
Q
Qiao Longfei 已提交
16 17 18

import reader
from network_conf import skip_gram_word2vec
J
JiabinYang 已提交
19
from infer import inference_test
Q
Qiao Longfei 已提交
20 21 22 23 24 25 26

logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)


def parse_args():
J
JiabinYang 已提交
27 28
    parser = argparse.ArgumentParser(
        description="PaddlePaddle Word2vec example")
Q
Qiao Longfei 已提交
29 30 31
    parser.add_argument(
        '--train_data_path',
        type=str,
32
        default='./data/1-billion-word-language-modeling-benchmark-r13output/training-monolingual.tokenized.shuffled',
Q
Qiao Longfei 已提交
33
        help="The path of training dataset")
Q
Qiao Longfei 已提交
34 35 36
    parser.add_argument(
        '--dict_path',
        type=str,
37
        default='./data/1-billion_dict',
Q
Qiao Longfei 已提交
38
        help="The path of data dict")
Q
Qiao Longfei 已提交
39 40 41
    parser.add_argument(
        '--test_data_path',
        type=str,
Q
Qiao Longfei 已提交
42
        default='./data/text8',
Q
Qiao Longfei 已提交
43 44 45 46 47
        help="The path of testing dataset")
    parser.add_argument(
        '--batch_size',
        type=int,
        default=100,
48
        help="The size of mini-batch (default:100)")
Q
Qiao Longfei 已提交
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
    parser.add_argument(
        '--num_passes',
        type=int,
        default=10,
        help="The number of passes to train (default: 10)")
    parser.add_argument(
        '--model_output_dir',
        type=str,
        default='models',
        help='The path for model to store (default: models)')
    parser.add_argument(
        '--embedding_size',
        type=int,
        default=64,
        help='sparse feature hashing space for index processing')

T
tangwei12 已提交
65 66 67 68 69 70 71 72 73 74 75
    parser.add_argument(
        '--with_hs',
        action='store_true',
        required=False,
        default=False,
        help='using hierarchical sigmoid, (default: False)')

    parser.add_argument(
        '--with_nce',
        action='store_true',
        required=False,
J
JiabinYang 已提交
76
        default=False,
T
tangwei12 已提交
77 78 79 80 81 82 83
        help='using negtive sampling, (default: True)')

    parser.add_argument(
        '--max_code_length',
        type=int,
        default=40,
        help='max code length used by hierarchical sigmoid, (default: 40)')
84

T
tangwei12 已提交
85 86
    parser.add_argument(
        '--is_sparse',
T
tangwei12 已提交
87 88
        action='store_true',
        required=False,
T
tangwei12 已提交
89 90 91
        default=False,
        help='embedding and nce will use sparse or not, (default: False)')

92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
    parser.add_argument(
        '--with_Adam',
        action='store_true',
        required=False,
        default=False,
        help='Using Adam as optimizer or not, (default: False)')

    parser.add_argument(
        '--is_local',
        action='store_true',
        required=False,
        default=False,
        help='Local train or not, (default: False)')

    parser.add_argument(
        '--with_speed',
        action='store_true',
        required=False,
        default=False,
        help='print speed or not , (default: False)')

    parser.add_argument(
        '--with_infer_test',
        action='store_true',
        required=False,
        default=False,
        help='Do inference every 100 batches , (default: False)')

    parser.add_argument(
        '--rank_num',
        type=int,
        default=4,
        help="find rank_num-nearest result for test (default: 4)")

Q
Qiao Longfei 已提交
126 127 128
    return parser.parse_args()


129
def train_loop(args, train_program, reader, py_reader, loss, trainer_id):
Q
Qiao Longfei 已提交
130 131
    train_reader = paddle.batch(
        paddle.reader.shuffle(
132 133
            reader.train((args.with_hs or (not args.with_nce))),
            buf_size=args.batch_size * 100),
Q
Qiao Longfei 已提交
134 135
        batch_size=args.batch_size)

136 137 138
    py_reader.decorate_paddle_reader(train_reader)

    place = fluid.CPUPlace()
139

Q
Qiao Longfei 已提交
140 141
    exe = fluid.Executor(place)
    exe.run(fluid.default_startup_program())
142 143

    exec_strategy = fluid.ExecutionStrategy()
T
tangwei12 已提交
144

J
JiabinYang 已提交
145 146
    print("CPU_NUM:" + str(os.getenv("CPU_NUM")))
    exec_strategy.num_threads = int(os.getenv("CPU_NUM"))
T
tangwei12 已提交
147

148
    build_strategy = fluid.BuildStrategy()
J
JiabinYang 已提交
149 150
    if int(os.getenv("CPU_NUM")) > 1:
        build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce
151 152 153 154 155 156 157 158 159 160 161 162 163

    train_exe = fluid.ParallelExecutor(
        use_cuda=False,
        loss_name=loss.name,
        main_program=train_program,
        build_strategy=build_strategy,
        exec_strategy=exec_strategy)

    profile_state = "CPU"
    profiler_step = 0
    profiler_step_start = 20
    profiler_step_end = 30

Q
Qiao Longfei 已提交
164
    for pass_id in range(args.num_passes):
165 166 167
        epoch_start = time.time()
        py_reader.start()
        batch_id = 0
J
JiabinYang 已提交
168
        start = time.clock()
169 170 171

        try:
            while True:
172

173 174 175 176 177 178 179 180 181 182 183 184
                if profiler_step == profiler_step_start:
                    fluid.profiler.start_profiler(profile_state)

                loss_val = train_exe.run(fetch_list=[loss.name])
                loss_val = np.mean(loss_val)

                if profiler_step == profiler_step_end:
                    fluid.profiler.stop_profiler('total', 'trainer_profile.log')
                    profiler_step += 1
                else:
                    profiler_step += 1

185
                if batch_id % 50 == 0:
J
JiabinYang 已提交
186 187 188 189 190
                    logger.info(
                        "TRAIN --> pass: {} batch: {} loss: {} reader queue:{}".
                        format(pass_id, batch_id,
                               loss_val.mean() / args.batch_size,
                               py_reader.queue.size()))
191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209
                if args.with_speed:
                    if batch_id % 1000 == 0 and batch_id != 0:
                        elapsed = (time.clock() - start)
                        start = time.clock()
                        samples = 1001 * args.batch_size * int(
                            os.getenv("CPU_NUM"))
                        logger.info("Time used: {}, Samples/Sec: {}".format(
                            elapsed, samples / elapsed))
                if batch_id == 200 or batch_id == 100:
                    model_dir = args.model_output_dir + '/batch-' + str(
                        batch_id)
                    fluid.io.save_persistables(executor=exe, dirname=model_dir)
                    with open(model_dir + "/_success", 'w+') as f:
                        f.write(str(batch_id))
                # calculate infer result each 100 batches
                if args.with_infer_test:
                    if batch_id % 1000 == 0 and batch_id != 0:
                        model_dir = args.model_output_dir + '/batch-' + str(
                            batch_id)
J
JiabinYang 已提交
210
                        inference_test(global_scope(), model_dir, args)
211 212 213 214 215
                batch_id += 1

        except fluid.core.EOFException:
            py_reader.reset()
            epoch_end = time.time()
216
            logger.info("Epoch: {0}, Train total expend: {1} ".format(
J
JiabinYang 已提交
217
                pass_id, epoch_end - epoch_start))
218

219 220 221 222 223
            model_dir = args.model_output_dir + '/pass-' + str(pass_id)
            if trainer_id == 0:
                fluid.io.save_persistables(executor=exe, dirname=model_dir)
                with open(model_dir + "/_success", 'w+') as f:
                    f.write(str(pass_id))
Q
Qiao Longfei 已提交
224 225


226 227 228 229 230
def GetFileList(data_path):
    return os.listdir(data_path)


def train(args):
Q
Qiao Longfei 已提交
231 232 233 234

    if not os.path.isdir(args.model_output_dir):
        os.mkdir(args.model_output_dir)

235
    filelist = GetFileList(args.train_data_path)
Q
Qiao Longfei 已提交
236
    word2vec_reader = reader.Word2VecReader(args.dict_path,
237
                                            args.train_data_path, filelist)
238 239

    logger.info("dict_size: {}".format(word2vec_reader.dict_size))
240
    loss, py_reader = skip_gram_word2vec(
J
JiabinYang 已提交
241 242 243 244 245 246 247 248
        word2vec_reader.dict_size,
        word2vec_reader.word_frequencys,
        args.embedding_size,
        args.max_code_length,
        args.with_hs,
        args.with_nce,
        is_sparse=args.is_sparse)

249 250 251 252 253 254
    optimizer = None
    if args.with_Adam:
        optimizer = fluid.optimizer.Adam(learning_rate=1e-3)
    else:
        optimizer = fluid.optimizer.SGD(learning_rate=1e-3)

Q
Qiao Longfei 已提交
255 256
    optimizer.minimize(loss)

257 258
    # do local training 
    if args.is_local or os.getenv("PADDLE_IS_LOCAL", "1") == "1":
Q
Qiao Longfei 已提交
259 260
        logger.info("run local training")
        main_program = fluid.default_main_program()
J
JiabinYang 已提交
261 262 263 264

        with open("local.main.proto", "w") as f:
            f.write(str(main_program))

265
        train_loop(args, main_program, word2vec_reader, py_reader, loss, 0)
266
    # do distribute training
Q
Qiao Longfei 已提交
267 268
    else:
        logger.info("run dist training")
T
tangwei12 已提交
269 270 271 272 273

        trainer_id = int(os.environ["PADDLE_TRAINER_ID"])
        trainers = int(os.environ["PADDLE_TRAINERS"])
        training_role = os.environ["PADDLE_TRAINING_ROLE"]

J
JiabinYang 已提交
274 275
        port = os.getenv("PADDLE_PSERVER_PORT", "6174")
        pserver_ips = os.getenv("PADDLE_PSERVER_IPS", "")
T
tangwei12 已提交
276
        eplist = []
J
JiabinYang 已提交
277 278
        for ip in pserver_ips.split(","):
            eplist.append(':'.join([ip, port]))
T
tangwei12 已提交
279
        pserver_endpoints = ",".join(eplist)
J
JiabinYang 已提交
280
        current_endpoint = os.getenv("PADDLE_CURRENT_IP", "") + ":" + port
T
tangwei12 已提交
281 282 283 284

        config = fluid.DistributeTranspilerConfig()
        config.slice_var_up = False
        t = fluid.DistributeTranspiler(config=config)
J
JiabinYang 已提交
285 286 287 288 289
        t.transpile(
            trainer_id,
            pservers=pserver_endpoints,
            trainers=trainers,
            sync_mode=True)
T
tangwei12 已提交
290 291

        if training_role == "PSERVER":
Q
Qiao Longfei 已提交
292
            logger.info("run pserver")
T
tangwei12 已提交
293
            prog = t.get_pserver_program(current_endpoint)
J
JiabinYang 已提交
294 295
            startup = t.get_startup_program(
                current_endpoint, pserver_program=prog)
T
tangwei12 已提交
296

J
JiabinYang 已提交
297 298
            with open("pserver.main.proto.{}".format(os.getenv("CUR_PORT")),
                      "w") as f:
T
tangwei12 已提交
299 300
                f.write(str(prog))

Q
Qiao Longfei 已提交
301 302 303
            exe = fluid.Executor(fluid.CPUPlace())
            exe.run(startup)
            exe.run(prog)
T
tangwei12 已提交
304
        elif training_role == "TRAINER":
Q
Qiao Longfei 已提交
305 306
            logger.info("run trainer")
            train_prog = t.get_trainer_program()
T
tangwei12 已提交
307 308 309 310

            with open("trainer.main.proto.{}".format(trainer_id), "w") as f:
                f.write(str(train_prog))

J
JiabinYang 已提交
311 312
            train_loop(args, train_prog, word2vec_reader, py_reader, loss,
                       trainer_id)
Q
Qiao Longfei 已提交
313 314


J
JiabinYang 已提交
315 316 317 318 319 320 321 322 323 324 325 326 327 328 329
def env_declar():
    print("********  Rename Cluster Env to PaddleFluid Env ********")

    print("Content-Type: text/plain\n\n")
    for key in os.environ.keys():
        print("%30s %s \n" % (key, os.environ[key]))

    if os.environ["TRAINING_ROLE"] == "PSERVER" or os.environ[
            "PADDLE_IS_LOCAL"] == "0":
        os.environ["PADDLE_TRAINING_ROLE"] = os.environ["TRAINING_ROLE"]
        os.environ["PADDLE_PSERVER_PORT"] = os.environ["PADDLE_PORT"]
        os.environ["PADDLE_PSERVER_IPS"] = os.environ["PADDLE_PSERVERS"]
        os.environ["PADDLE_TRAINERS"] = os.environ["PADDLE_TRAINERS_NUM"]
        os.environ["PADDLE_CURRENT_IP"] = os.environ["POD_IP"]
        os.environ["PADDLE_TRAINER_ID"] = os.environ["PADDLE_TRAINER_ID"]
330
        # we set the thread number same as CPU number
J
JiabinYang 已提交
331 332 333 334 335 336 337 338 339
        os.environ["CPU_NUM"] = "12"

    print("Content-Type: text/plain\n\n")
    for key in os.environ.keys():
        print("%30s %s \n" % (key, os.environ[key]))

    print("******  Rename Cluster Env to PaddleFluid Env END ******")


Q
Qiao Longfei 已提交
340
if __name__ == '__main__':
341 342 343 344 345 346
    args = parse_args()
    if args.is_local:
        pass
    else:
        env_declar()
    train(args)