from __future__ import print_function
import argparse
import logging
import os
import time

import numpy as np

# disable gpu training for this example
# NOTE: this assignment is deliberately kept ahead of the paddle imports below
# (presumably so the setting is already in place when paddle initializes);
# do not regroup the imports above it.
os.environ["CUDA_VISIBLE_DEVICES"] = ""

import paddle
import paddle.fluid as fluid
from paddle.fluid.executor import global_scope
import six
import reader
from network_conf import skip_gram_word2vec
from infer import inference_test

# Module-wide logger; INFO level so the periodic training messages are shown.
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)


def parse_args():
    """Parse the command-line options of the word2vec training script.

    Returns:
        argparse.Namespace: The parsed options. Every option has a default,
        so calling the script without arguments is valid.
    """
    parser = argparse.ArgumentParser(
        description="PaddlePaddle Word2vec example")
    parser.add_argument(
        '--train_data_path',
        type=str,
        default='./data/1-billion-word-language-modeling-benchmark-r13output/training-monolingual.tokenized.shuffled',
        help="The path of training dataset")
    parser.add_argument(
        '--dict_path',
        type=str,
        default='./data/1-billion_dict',
        help="The path of data dict")
    parser.add_argument(
        '--test_data_path',
        type=str,
        default='./data/text8',
        help="The path of testing dataset")
    parser.add_argument(
        '--batch_size',
        type=int,
        default=1000,
        help="The size of mini-batch (default: 1000)")
    parser.add_argument(
        '--num_passes',
        type=int,
        default=10,
        help="The number of passes to train (default: 10)")
    parser.add_argument(
        '--model_output_dir',
        type=str,
        default='models',
        help='The path for model to store (default: models)')
    parser.add_argument(
        '--embedding_size',
        type=int,
        default=64,
        help='The size of the word embedding vectors (default: 64)')

    parser.add_argument(
        '--with_hs',
        action='store_true',
        default=False,
        help='using hierarchical sigmoid, (default: False)')

    parser.add_argument(
        '--with_nce',
        action='store_true',
        default=False,
        help='using negative sampling, (default: False)')

    parser.add_argument(
        '--max_code_length',
        type=int,
        default=40,
        help='max code length used by hierarchical sigmoid, (default: 40)')

    parser.add_argument(
        '--is_sparse',
        action='store_true',
        default=False,
        help='embedding and nce will use sparse or not, (default: False)')

    parser.add_argument(
        '--with_Adam',
        action='store_true',
        default=False,
        help='Using Adam as optimizer or not, (default: False)')

    parser.add_argument(
        '--is_local',
        action='store_true',
        default=False,
        help='Local train or not, (default: False)')

    parser.add_argument(
        '--with_speed',
        action='store_true',
        default=False,
        help='print speed or not , (default: False)')

    parser.add_argument(
        '--with_infer_test',
        action='store_true',
        default=False,
        help='Do inference every 1000 batches , (default: False)')

    parser.add_argument(
        '--rank_num',
        type=int,
        default=4,
        help="find rank_num-nearest result for test (default: 4)")

    return parser.parse_args()


def convert_python_to_tensor(batch_size, sample_reader, is_hs):
    """Wrap ``sample_reader`` so that it yields full batches as tensors.

    Args:
        batch_size: Number of samples collected before a batch is yielded.
        sample_reader: Zero-argument callable returning an iterable of
            samples; each sample is a sequence of features.
        is_hs: When true four feature slots are collected per sample,
            otherwise two.

    Returns:
        A zero-argument generator function. Each item it yields is a list
        holding one int64 ``fluid.Tensor`` (placed on CPU) per feature slot.
        Samples left over when the reader is exhausted (fewer than
        ``batch_size``) are not yielded.
    """
    slot_count = 4 if is_hs else 2

    def __reader__():
        slots = [[] for _ in range(slot_count)]
        for sample in sample_reader():
            for i, fea in enumerate(sample):
                slots[i].append(fea)
            if len(slots[0]) != batch_size:
                continue

            tensor_result = []
            for slot in slots:
                dat = np.array(slot, dtype='int64')
                if len(dat.shape) > 2:
                    # Collapse the middle axis so the tensor is 2-D.
                    dat = dat.reshape((dat.shape[0], dat.shape[2]))
                elif len(dat.shape) == 1:
                    # Turn a flat vector into a single-column matrix.
                    dat = dat.reshape((-1, 1))
                t = fluid.Tensor()
                t.set(dat, fluid.CPUPlace())
                tensor_result.append(t)
            yield tensor_result
            # Start collecting the next batch from scratch.
            slots = [[] for _ in range(slot_count)]

    return __reader__


def train_loop(args, train_program, reader, py_reader, loss, trainer_id):
    """Run all training passes on CPU and save the model along the way.

    Args:
        args: Parsed command-line options (``batch_size``, ``num_passes``,
            ``with_hs``, ``with_nce``, ``with_speed``, ``with_infer_test``
            and ``model_output_dir`` are read here).
        train_program: Program handed to ``fluid.ParallelExecutor``.
        reader: Object whose ``train(flag)`` method returns the sample reader.
        py_reader: Reader fed through ``decorate_tensor_provider``; it is
            started at the beginning of every pass and reset at its end.
        loss: Loss variable; its ``name`` is fetched on every step.
        trainer_id: Index of this trainer. Only trainer 0 writes the
            per-pass snapshot.
    """
    # The same flag is passed to the sample reader and used to pick the number
    # of feature slots packed into each batch.
    use_hs = args.with_hs or (not args.with_nce)
    py_reader.decorate_tensor_provider(
        convert_python_to_tensor(args.batch_size, reader.train(use_hs), use_hs))

    place = fluid.CPUPlace()

    exe = fluid.Executor(place)
    exe.run(fluid.default_startup_program())

    # CPU_NUM is read once. When it is missing we fall back to 1 and export
    # the value instead of failing with an opaque TypeError from int(None).
    # NOTE(review): exported because paddle presumably reads CPU_NUM as well
    # when building the parallel executor - confirm.
    cpu_num = int(os.environ.setdefault("CPU_NUM", "1"))
    print("CPU_NUM:" + str(cpu_num))

    exec_strategy = fluid.ExecutionStrategy()
    exec_strategy.use_experimental_executor = True
    exec_strategy.num_threads = cpu_num

    build_strategy = fluid.BuildStrategy()
    if cpu_num > 1:
        build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce

    train_exe = fluid.ParallelExecutor(
        use_cuda=False,
        loss_name=loss.name,
        main_program=train_program,
        build_strategy=build_strategy,
        exec_strategy=exec_strategy)

    for pass_id in range(args.num_passes):
        py_reader.start()
        # NOTE(review): fixed pause right after starting the reader,
        # presumably to let its queue fill before training - confirm.
        time.sleep(10)
        epoch_start = time.time()
        batch_id = 0
        # time.clock() was removed in Python 3.8; wall-clock time is also the
        # meaningful basis for a samples-per-second figure.
        speed_start = time.time()
        last_speed_batch = -1

        try:
            # The loop ends when the reader raises EOFException.
            while True:

                loss_val = train_exe.run(fetch_list=[loss.name])
                loss_val = np.mean(loss_val)

                if batch_id % 50 == 0:
                    logger.info(
                        "TRAIN --> pass: {} batch: {} loss: {} reader queue:{}".
                        format(pass_id, batch_id,
                               loss_val.mean(), py_reader.queue.size()))
                if args.with_speed:
                    if batch_id % 1000 == 0 and batch_id != 0:
                        now = time.time()
                        elapsed = now - speed_start
                        speed_start = now
                        # Count the batches actually run since the last
                        # report: 1001 for the first interval, 1000 after.
                        samples = (batch_id - last_speed_batch
                                   ) * args.batch_size * cpu_num
                        last_speed_batch = batch_id
                        logger.info("Time used: {}, Samples/Sec: {}".format(
                            elapsed, samples / elapsed))
                # calculate infer result every 1000 batches when using --with_infer_test
                if args.with_infer_test:
                    if batch_id % 1000 == 0 and batch_id != 0:
                        model_dir = args.model_output_dir + '/batch-' + str(
                            batch_id)
                        inference_test(global_scope(), model_dir, args)

                # Periodic snapshot inside a pass.
                if batch_id % 500000 == 0 and batch_id != 0:
                    model_dir = args.model_output_dir + '/batch-' + str(
                        batch_id)
                    fluid.io.save_persistables(executor=exe, dirname=model_dir)
                    with open(model_dir + "/_success", 'w+') as f:
                        f.write(str(batch_id))
                batch_id += 1

        except fluid.core.EOFException:
            py_reader.reset()
            epoch_end = time.time()
            logger.info("Epoch: {0}, Train total expend: {1} ".format(
                pass_id, epoch_end - epoch_start))

            model_dir = args.model_output_dir + '/pass-' + str(pass_id)
            if trainer_id == 0:
                fluid.io.save_persistables(executor=exe, dirname=model_dir)
                # Marker file written after the snapshot has been saved.
                with open(model_dir + "/_success", 'w+') as f:
                    f.write(str(pass_id))


def GetFileList(data_path):
    """Return the entry names found directly under ``data_path``, sorted.

    ``os.listdir`` returns entries in arbitrary order; sorting makes the
    result deterministic, so every process that lists the same directory
    sees the same sequence.

    Args:
        data_path: Directory to list.

    Returns:
        list: Entry names (not full paths) in ascending order.
    """
    return sorted(os.listdir(data_path))


def train(args):
    """Build the word2vec network and run local or distributed training.

    Training is local when ``--is_local`` is given or when the environment
    variable ``PADDLE_IS_LOCAL`` is unset or equal to ``"1"``. Otherwise the
    program is transpiled and this process acts as parameter server or
    trainer according to ``PADDLE_TRAINING_ROLE``.

    Args:
        args: Parsed command-line options (see ``parse_args``).

    Raises:
        KeyError: In distributed mode, when ``PADDLE_TRAINER_ID``,
            ``PADDLE_TRAINERS`` or ``PADDLE_TRAINING_ROLE`` is not set.
    """
    if not os.path.isdir(args.model_output_dir):
        # makedirs also handles a nested output path.
        os.makedirs(args.model_output_dir)

    # Evaluated once so the reader setup and the training branch agree.
    is_local = args.is_local or os.getenv("PADDLE_IS_LOCAL", "1") == "1"

    trainer_id = 0
    trainers = 1
    if not is_local:
        trainer_id = int(os.environ["PADDLE_TRAINER_ID"])
        trainers = int(os.environ["PADDLE_TRAINERS"])

    filelist = GetFileList(args.train_data_path)
    # Local training passes trainer id 0 and trainer count 1. The distributed
    # branch previously passed the undefined name ``trainer_num``.
    word2vec_reader = reader.Word2VecReader(
        args.dict_path, args.train_data_path, filelist, trainer_id, trainers)

    logger.info("dict_size: {}".format(word2vec_reader.dict_size))
    loss, py_reader = skip_gram_word2vec(
        word2vec_reader.dict_size,
        word2vec_reader.word_frequencys,
        args.embedding_size,
        args.max_code_length,
        args.with_hs,
        args.with_nce,
        is_sparse=args.is_sparse)

    if args.with_Adam:
        optimizer = fluid.optimizer.Adam(learning_rate=1e-4, lazy_mode=True)
    else:
        optimizer = fluid.optimizer.SGD(learning_rate=1e-4)

    optimizer.minimize(loss)

    # do local training
    if is_local:
        logger.info("run local training")
        main_program = fluid.default_main_program()

        # Dump the program text for inspection.
        with open("local.main.proto", "w") as f:
            f.write(str(main_program))

        train_loop(args, main_program, word2vec_reader, py_reader, loss, 0)
        return

    # do distribute training
    logger.info("run dist training")

    training_role = os.environ["PADDLE_TRAINING_ROLE"]

    port = os.getenv("PADDLE_PSERVER_PORT", "6174")
    pserver_ips = os.getenv("PADDLE_PSERVER_IPS", "")
    # One "ip:port" endpoint per parameter server, joined by commas.
    pserver_endpoints = ",".join(
        ':'.join([ip, port]) for ip in pserver_ips.split(","))
    current_endpoint = os.getenv("PADDLE_CURRENT_IP", "") + ":" + port

    config = fluid.DistributeTranspilerConfig()
    config.slice_var_up = False
    t = fluid.DistributeTranspiler(config=config)
    t.transpile(
        trainer_id,
        pservers=pserver_endpoints,
        trainers=trainers,
        sync_mode=True)

    if training_role == "PSERVER":
        logger.info("run pserver")
        prog = t.get_pserver_program(current_endpoint)
        startup = t.get_startup_program(
            current_endpoint, pserver_program=prog)

        # NOTE(review): the file suffix comes from CUR_PORT, which nothing in
        # this file sets; it becomes "None" when the variable is missing.
        with open("pserver.main.proto.{}".format(os.getenv("CUR_PORT")),
                  "w") as f:
            f.write(str(prog))

        exe = fluid.Executor(fluid.CPUPlace())
        exe.run(startup)
        exe.run(prog)
    elif training_role == "TRAINER":
        logger.info("run trainer")
        train_prog = t.get_trainer_program()

        with open("trainer.main.proto.{}".format(trainer_id), "w") as f:
            f.write(str(train_prog))

        train_loop(args, train_prog, word2vec_reader, py_reader, loss,
                   trainer_id)
    else:
        # Previously an unknown role did nothing without any message.
        logger.error(
            "unknown PADDLE_TRAINING_ROLE: {}, expected PSERVER or TRAINER".
            format(training_role))


def env_declar():
    """Copy cluster environment variables to the names this script reads.

    The complete environment is printed before and after the renaming. The
    renaming happens only when ``TRAINING_ROLE`` is ``"PSERVER"`` or
    ``PADDLE_IS_LOCAL`` is ``"0"``; a missing variable counts as "no match"
    rather than raising, so a plain local launch passes through unchanged.

    Raises:
        KeyError: When the renaming applies and one of the source variables
            (``TRAINING_ROLE``, ``PADDLE_PORT``, ``PADDLE_PSERVERS``,
            ``PADDLE_TRAINERS_NUM``, ``POD_IP``) is not set.
    """
    print("********  Rename Cluster Env to PaddleFluid Env ********")

    print("Content-Type: text/plain\n\n")
    for key in os.environ.keys():
        print("%30s %s \n" % (key, os.environ[key]))

    is_pserver = os.environ.get("TRAINING_ROLE") == "PSERVER"
    is_distributed = os.environ.get("PADDLE_IS_LOCAL") == "0"
    if is_pserver or is_distributed:
        os.environ["PADDLE_TRAINING_ROLE"] = os.environ["TRAINING_ROLE"]
        os.environ["PADDLE_PSERVER_PORT"] = os.environ["PADDLE_PORT"]
        os.environ["PADDLE_PSERVER_IPS"] = os.environ["PADDLE_PSERVERS"]
        os.environ["PADDLE_TRAINERS"] = os.environ["PADDLE_TRAINERS_NUM"]
        os.environ["PADDLE_CURRENT_IP"] = os.environ["POD_IP"]
        # PADDLE_TRAINER_ID keeps its name, so there is nothing to copy.
        # NOTE(review): the thread count is hard-coded to 12, presumably the
        # core count of the cluster nodes - confirm.
        os.environ["CPU_NUM"] = "12"

    print("Content-Type: text/plain\n\n")
    for key in os.environ.keys():
        print("%30s %s \n" % (key, os.environ[key]))

    print("******  Rename Cluster Env to PaddleFluid Env END ******")


if __name__ == '__main__':
    args = parse_args()
    # Rename the cluster environment variables unless local training was
    # requested explicitly on the command line.
    if not args.is_local:
        env_declar()
    train(args)