from __future__ import print_function
import argparse
import logging
import os
import time

import numpy as np

# disable GPU training for this example
os.environ["CUDA_VISIBLE_DEVICES"] = ""

import paddle
import paddle.fluid as fluid
from paddle.fluid.executor import global_scope
import six
import reader
from network_conf import skip_gram_word2vec
from infer import inference_test

logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)


def parse_args():
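    """Parse command-line arguments for the word2vec training script."""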
    parser = argparse.ArgumentParser(
        description="PaddlePaddle Word2vec example")
    parser.add_argument(
        '--train_data_path',
        type=str,
        default='./data/1-billion-word-language-modeling-benchmark-r13output/training-monolingual.tokenized.shuffled',
        help="The path of taining dataset")
    parser.add_argument(
        '--dict_path',
        type=str,
        default='./data/1-billion_dict',
        help="The path of data dict")
    parser.add_argument(
        '--test_data_path',
        type=str,
        default='./data/text8',
        help="The path of testing dataset")
    parser.add_argument(
        '--batch_size',
        type=int,
        default=1000,
        help="The size of mini-batch (default: 1000)")
    parser.add_argument(
        '--num_passes',
        type=int,
        default=10,
        help="The number of passes to train (default: 10)")
    parser.add_argument(
        '--model_output_dir',
        type=str,
        default='models',
        help='The path for model to store (default: models)')
    parser.add_argument(
        '--embedding_size',
        type=int,
        default=64,
        help='The dimension of the embedding vectors (default: 64)')

    parser.add_argument(
        '--with_hs',
        action='store_true',
        required=False,
        default=False,
        help='use hierarchical sigmoid (default: False)')

    parser.add_argument(
        '--with_nce',
        action='store_true',
        required=False,
        default=False,
        help='use negative sampling (default: False)')

    parser.add_argument(
        '--max_code_length',
        type=int,
        default=40,
        help='max code length used by hierarchical sigmoid, (default: 40)')

    parser.add_argument(
        '--is_sparse',
        action='store_true',
        required=False,
        default=False,
        help='use sparse updates for embedding and NCE (default: False)')

    parser.add_argument(
        '--with_Adam',
        action='store_true',
        required=False,
        default=False,
        help='use Adam as the optimizer instead of SGD (default: False)')

    parser.add_argument(
        '--is_local',
        action='store_true',
        required=False,
        default=False,
        help='Local train or not, (default: False)')

    parser.add_argument(
        '--with_speed',
        action='store_true',
        required=False,
        default=False,
        help='print training speed (default: False)')

    parser.add_argument(
        '--with_infer_test',
        action='store_true',
        required=False,
        default=False,
        help='run inference every 1000 batches (default: False)')

    parser.add_argument(
        '--with_other_dict',
        action='store_true',
        required=False,
        default=False,
        help='use an external dict (default: False)')

    parser.add_argument(
        '--rank_num',
        type=int,
        default=4,
        help="find rank_num-nearest result for test (default: 4)")

    return parser.parse_args()


def convert_python_to_tensor(batch_size, sample_reader, is_hs):
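    """Batch samples from sample_reader into fluid Tensors.

    Each feature slot is stacked into an int64 numpy array and wrapped in a
    fluid.Tensor; hierarchical-sigmoid samples use four slots, NCE samples two.
    """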
    def __reader__():
        result = None
        if is_hs:
            result = [[], [], [], []]
        else:
            result = [[], []]
        for sample in sample_reader():
            for i, fea in enumerate(sample):
                result[i].append(fea)
            if len(result[0]) == batch_size:
                tensor_result = []
                for tensor in result:
                    t = fluid.Tensor()
                    dat = np.array(tensor, dtype='int64')
                    if len(dat.shape) > 2:
                        dat = dat.reshape((dat.shape[0], dat.shape[2]))
                    elif len(dat.shape) == 1:
                        dat = dat.reshape((-1, 1))
                    t.set(dat, fluid.CPUPlace())

                    tensor_result.append(t)
                yield tensor_result
                if is_hs:
                    result = [[], [], [], []]
                else:
                    result = [[], []]

    return __reader__


def train_loop(args, train_program, reader, py_reader, loss, trainer_id):
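    """Run the training loop for one trainer.

    Feeds batches through py_reader, runs the program with a CPU
    ParallelExecutor, logs loss (and optionally speed and inference results),
    and periodically saves persistable variables.
    """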

    py_reader.decorate_tensor_provider(
        convert_python_to_tensor(args.batch_size,
                                 reader.train((args.with_hs or (
                                     not args.with_nce)), args.with_other_dict),
                                 (args.with_hs or (not args.with_nce))))

    place = fluid.CPUPlace()

    exe = fluid.Executor(place)
    exe.run(fluid.default_startup_program())

    exec_strategy = fluid.ExecutionStrategy()
    exec_strategy.use_experimental_executor = True

    print("CPU_NUM:" + str(os.getenv("CPU_NUM")))
    exec_strategy.num_threads = int(os.getenv("CPU_NUM"))

    build_strategy = fluid.BuildStrategy()
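    # with more than one CPU thread, use the Reduce strategy for gradient aggregation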
    if int(os.getenv("CPU_NUM")) > 1:
        build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce

    train_exe = fluid.ParallelExecutor(
        use_cuda=False,
        loss_name=loss.name,
        main_program=train_program,
        build_strategy=build_strategy,
        exec_strategy=exec_strategy)

    profile_state = "CPU"
    profiler_step = 0
    profiler_step_start = 20
    profiler_step_end = 30

    for pass_id in range(args.num_passes):
        py_reader.start()
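        # give the background reader thread time to fill the queue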
        time.sleep(10)
        epoch_start = time.time()
        batch_id = 0
        start = time.time()

        try:
            while True:

                loss_val = train_exe.run(fetch_list=[loss.name])
                loss_val = np.mean(loss_val)

                if batch_id % 50 == 0:
                    logger.info(
                        "TRAIN --> pass: {} batch: {} loss: {} reader queue:{}".
                        format(pass_id, batch_id,
                               loss_val.mean(), py_reader.queue.size()))
                if args.with_speed:
                    if batch_id % 1000 == 0 and batch_id != 0:
                        elapsed = (time.time() - start)
                        start = time.time()
                        samples = 1001 * args.batch_size * int(
                            os.getenv("CPU_NUM"))
                        logger.info("Time used: {}, Samples/Sec: {}".format(
                            elapsed, samples / elapsed))
                # run inference every 1000 batches when --with_infer_test is set
                if args.with_infer_test:
                    if batch_id % 1000 == 0 and batch_id != 0:
                        model_dir = args.model_output_dir + '/batch-' + str(
                            batch_id)
                        inference_test(global_scope(), model_dir, args)

                if batch_id % 500000 == 0 and batch_id != 0:
                    model_dir = args.model_output_dir + '/batch-' + str(
                        batch_id)
                    fluid.io.save_persistables(executor=exe, dirname=model_dir)
                    with open(model_dir + "/_success", 'w+') as f:
                        f.write(str(batch_id))
                batch_id += 1

        except fluid.core.EOFException:
            py_reader.reset()
            epoch_end = time.time()
            logger.info("Epoch: {0}, Train total expend: {1} ".format(
J
JiabinYang 已提交
246
                pass_id, epoch_end - epoch_start))

            model_dir = args.model_output_dir + '/pass-' + str(pass_id)
            if trainer_id == 0:
                fluid.io.save_persistables(executor=exe, dirname=model_dir)
                with open(model_dir + "/_success", 'w+') as f:
                    f.write(str(pass_id))


def GetFileList(data_path):
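    """Return the list of training file names under data_path."""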
    return os.listdir(data_path)


def train(args):
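    """Build the data reader, the skip-gram network and the optimizer, then
    run either local or distributed training depending on the environment."""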

    if not os.path.isdir(args.model_output_dir):
        os.mkdir(args.model_output_dir)

    filelist = GetFileList(args.train_data_path)
    word2vec_reader = None
    if args.is_local or os.getenv("PADDLE_IS_LOCAL", "1") == "1":
        word2vec_reader = reader.Word2VecReader(
            args.dict_path, args.train_data_path, filelist, 0, 1)
    else:
        trainer_id = int(os.environ["PADDLE_TRAINER_ID"])
        trainer_num = int(os.environ["PADDLE_TRAINERS"])
        word2vec_reader = reader.Word2VecReader(args.dict_path,
                                                args.train_data_path, filelist,
                                                trainer_id, trainer_num)

    logger.info("dict_size: {}".format(word2vec_reader.dict_size))
    loss, py_reader = skip_gram_word2vec(
        word2vec_reader.dict_size,
        word2vec_reader.word_frequencys,
        args.embedding_size,
        args.max_code_length,
        args.with_hs,
        args.with_nce,
        is_sparse=args.is_sparse)

    optimizer = None
    if args.with_Adam:
        optimizer = fluid.optimizer.Adam(learning_rate=1e-4, lazy_mode=True)
    else:
        optimizer = fluid.optimizer.SGD(learning_rate=1e-4)

    optimizer.minimize(loss)

    # do local training 
    if args.is_local or os.getenv("PADDLE_IS_LOCAL", "1") == "1":
        logger.info("run local training")
        main_program = fluid.default_main_program()

        with open("local.main.proto", "w") as f:
            f.write(str(main_program))

        train_loop(args, main_program, word2vec_reader, py_reader, loss, 0)
    # do distributed training
    else:
        logger.info("run dist training")

        trainer_id = int(os.environ["PADDLE_TRAINER_ID"])
        trainers = int(os.environ["PADDLE_TRAINERS"])
        training_role = os.environ["PADDLE_TRAINING_ROLE"]

        port = os.getenv("PADDLE_PSERVER_PORT", "6174")
        pserver_ips = os.getenv("PADDLE_PSERVER_IPS", "")
        eplist = []
        for ip in pserver_ips.split(","):
            eplist.append(':'.join([ip, port]))
        pserver_endpoints = ",".join(eplist)
        current_endpoint = os.getenv("PADDLE_CURRENT_IP", "") + ":" + port

        # transpile the single-node program into parameter server and trainer programs
        config = fluid.DistributeTranspilerConfig()
        config.slice_var_up = False
        t = fluid.DistributeTranspiler(config=config)
        t.transpile(
            trainer_id,
            pservers=pserver_endpoints,
            trainers=trainers,
            sync_mode=True)

        if training_role == "PSERVER":
            logger.info("run pserver")
            prog = t.get_pserver_program(current_endpoint)
            startup = t.get_startup_program(
                current_endpoint, pserver_program=prog)

            with open("pserver.main.proto.{}".format(os.getenv("CUR_PORT")),
                      "w") as f:
                f.write(str(prog))

            exe = fluid.Executor(fluid.CPUPlace())
            exe.run(startup)
            exe.run(prog)
        elif training_role == "TRAINER":
            logger.info("run trainer")
            train_prog = t.get_trainer_program()

            with open("trainer.main.proto.{}".format(trainer_id), "w") as f:
                f.write(str(train_prog))

            train_loop(args, train_prog, word2vec_reader, py_reader, loss,
                       trainer_id)


def env_declar():
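    """Print the environment and map cluster environment variables to the
    names expected by PaddleFluid distributed training."""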
    print("********  Rename Cluster Env to PaddleFluid Env ********")

    print("Content-Type: text/plain\n\n")
    for key in os.environ.keys():
        print("%30s %s \n" % (key, os.environ[key]))

    if os.environ["TRAINING_ROLE"] == "PSERVER" or os.environ[
            "PADDLE_IS_LOCAL"] == "0":
        os.environ["PADDLE_TRAINING_ROLE"] = os.environ["TRAINING_ROLE"]
        os.environ["PADDLE_PSERVER_PORT"] = os.environ["PADDLE_PORT"]
        os.environ["PADDLE_PSERVER_IPS"] = os.environ["PADDLE_PSERVERS"]
        os.environ["PADDLE_TRAINERS"] = os.environ["PADDLE_TRAINERS_NUM"]
        os.environ["PADDLE_CURRENT_IP"] = os.environ["POD_IP"]
        os.environ["PADDLE_TRAINER_ID"] = os.environ["PADDLE_TRAINER_ID"]
        # set the executor thread number (hard-coded to 12 here)
        os.environ["CPU_NUM"] = "12"

    print("Content-Type: text/plain\n\n")
    for key in os.environ.keys():
        print("%30s %s \n" % (key, os.environ[key]))

    print("******  Rename Cluster Env to PaddleFluid Env END ******")


if __name__ == '__main__':
    args = parse_args()
    if args.is_local:
        pass
    else:
        env_declar()
    train(args)