# train.py
from __future__ import print_function

import argparse
import logging
import os
import time

import numpy as np

# Disable GPU training for this example. The variable is set ahead of the
# paddle imports below, so keep this statement above them.
os.environ["CUDA_VISIBLE_DEVICES"] = ""

import paddle
import paddle.fluid as fluid

import reader
from network_conf import skip_gram_word2vec

# Module-wide logger: timestamped messages at INFO level and above.
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)


def parse_args(argv=None):
    """Build the command-line parser for this example and parse arguments.

    Args:
        argv: Optional list of argument strings to parse. When None (the
            default), argparse reads the arguments from sys.argv[1:].

    Returns:
        argparse.Namespace with the attributes train_data_path, dict_path,
        test_data_path, batch_size, num_passes, model_output_dir,
        embedding_size, with_hs, with_nce, max_code_length and is_sparse.
    """
    parser = argparse.ArgumentParser(
        description="PaddlePaddle Word2vec example")

    # Data locations.
    parser.add_argument(
        '--train_data_path',
        type=str,
        default='./data/enwik8',
        help="The path of training dataset")
    parser.add_argument(
        '--dict_path',
        type=str,
        default='./data/enwik8_dict',
        help="The path of data dict")
    parser.add_argument(
        '--test_data_path',
        type=str,
        default='./data/text8',
        help="The path of testing dataset")

    # Training schedule and output.
    parser.add_argument(
        '--batch_size',
        type=int,
        default=100,
        help="The size of mini-batch (default: 100)")
    parser.add_argument(
        '--num_passes',
        type=int,
        default=10,
        help="The number of passes to train (default: 10)")
    parser.add_argument(
        '--model_output_dir',
        type=str,
        default='models',
        help='The path for model to store (default: models)')

    # Network options.
    parser.add_argument(
        '--embedding_size',
        type=int,
        default=64,
        help='The size of word embedding (default: 64)')
    parser.add_argument(
        '--with_hs',
        action='store_true',
        default=False,
        help='using hierarchical sigmoid, (default: False)')
    parser.add_argument(
        '--with_nce',
        action='store_true',
        default=False,
        help='using negative sampling, (default: False)')
    parser.add_argument(
        '--max_code_length',
        type=int,
        default=40,
        help='max code length used by hierarchical sigmoid, (default: 40)')
    parser.add_argument(
        '--is_sparse',
        action='store_true',
        default=False,
        help='embedding and nce will use sparse or not, (default: False)')

    return parser.parse_args(argv)


def train_loop(args, train_program, reader, py_reader, loss, trainer_id):
    """Run all training passes on CPU through a fluid.ParallelExecutor.

    Args:
        args: Parsed command-line arguments; batch_size, num_passes, with_hs
            and with_nce are read here.
        train_program: Program handed to fluid.ParallelExecutor as
            main_program.
        reader: Object whose train(...) method supplies the sample reader
            (train() passes a reader.Word2VecReader). Inside this function
            the parameter shadows the imported reader module.
        py_reader: Reader that gets decorated with the batched sample reader
            and is started and reset once per pass.
        loss: Loss variable; its name is used as loss_name and fetch target.
        trainer_id: Id of this trainer. Only referenced by the disabled
            model-saving code.

    Environment:
        CPU_NUM: used for the executor thread count, the reduce strategy
            switch and the throughput estimate. Defaults to 1 when unset.
    """
    train_reader = paddle.batch(
        paddle.reader.shuffle(
            reader.train((args.with_hs or (not args.with_nce))),
            buf_size=args.batch_size * 100),
        batch_size=args.batch_size)

    py_reader.decorate_paddle_reader(train_reader)

    place = fluid.CPUPlace()

    # Only referenced by the disabled model-saving code further down.
    data_name_list = None

    exe = fluid.Executor(place)
    exe.run(fluid.default_startup_program())

    # Read CPU_NUM once. An unset variable used to crash with
    # int(None) -> TypeError; fall back to a single CPU instead.
    # NOTE(review): confirm 1 matches the default of the Paddle version in use.
    cpu_num = int(os.getenv("CPU_NUM", "1"))
    print("CPU_NUM:" + str(cpu_num))

    exec_strategy = fluid.ExecutionStrategy()
    exec_strategy.num_threads = cpu_num

    build_strategy = fluid.BuildStrategy()
    if cpu_num > 1:
        build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce

    train_exe = fluid.ParallelExecutor(
        use_cuda=False,
        loss_name=loss.name,
        main_program=train_program,
        build_strategy=build_strategy,
        exec_strategy=exec_strategy)

    # The profiler is started at global step 20 and stopped at global step 30;
    # profiler_step counts steps across passes.
    profile_state = "CPU"
    profiler_step = 0
    profiler_step_start = 20
    profiler_step_end = 30

    for pass_id in range(args.num_passes):
        epoch_start = time.time()
        py_reader.start()
        batch_id = 0
        # Wall-clock window for the throughput report. time.clock() was
        # removed in Python 3.8, so time.time() is used instead.
        window_start = time.time()
        window_batches = 0

        try:
            # The loop ends when the reader raises EOFException.
            while True:
                if profiler_step == profiler_step_start:
                    fluid.profiler.start_profiler(profile_state)

                loss_val = train_exe.run(fetch_list=[loss.name])
                loss_val = np.mean(loss_val)
                window_batches += 1

                if profiler_step == profiler_step_end:
                    fluid.profiler.stop_profiler('total', 'trainer_profile.log')
                profiler_step += 1

                if batch_id % 10 == 0:
                    logger.info(
                        "TRAIN --> pass: {} batch: {} loss: {} reader queue:{}".
                        format(pass_id, batch_id,
                               loss_val.mean() / args.batch_size,
                               py_reader.queue.size()))
                if batch_id % 100 == 0 and batch_id != 0:
                    now = time.time()
                    elapsed = now - window_start
                    window_start = now
                    # Count the batches actually run in this window: 101 for
                    # the first report of a pass, 100 for the following ones.
                    samples = window_batches * args.batch_size * cpu_num
                    window_batches = 0
                    logger.info("Time used: {}, Samples/Sec: {}".format(
                        elapsed, samples / elapsed))

                # Per-batch model saving is disabled:
                #if batch_id % 1000 == 0 and batch_id != 0:
                #    model_dir = args.model_output_dir + '/batch-' + str(batch_id)
                #    if trainer_id == 0:
                #        fluid.io.save_inference_model(model_dir, data_name_list, [loss], exe)
                batch_id += 1

        except fluid.core.EOFException:
            py_reader.reset()
            epoch_end = time.time()
            print("Epoch: {0}, Train total expend: {1} ".format(
                pass_id, epoch_end - epoch_start))

            # Per-pass model saving is disabled:
            #model_dir = args.model_output_dir + '/pass-' + str(pass_id)
            #if trainer_id == 0:
            #    fluid.io.save_inference_model(model_dir, data_name_list, [loss], exe)


def train():
    """Parse arguments, build the skip-gram network and start training.

    Runs locally when PADDLE_IS_LOCAL is "1" (the default). Otherwise the
    program is transpiled for distributed training and this process acts as
    a parameter server or a trainer depending on PADDLE_TRAINING_ROLE.

    Environment (distributed mode):
        PADDLE_TRAINER_ID, PADDLE_TRAINERS, PADDLE_TRAINING_ROLE: required.
        PADDLE_PSERVER_PORT: port of the parameter servers (default "6174").
        PADDLE_PSERVER_IPS: comma-separated parameter server IPs (default "").
        PADDLE_CURRENT_IP: IP of this process (default "").
        CUR_PORT: used as suffix of the dumped pserver program file.

    Side effects:
        Creates args.model_output_dir if it is missing and writes the program
        text to local.main.proto, pserver.main.proto.<CUR_PORT> or
        trainer.main.proto.<trainer_id>.

    Raises:
        KeyError: in distributed mode when a required variable is missing.
    """
    args = parse_args()

    # makedirs, unlike mkdir, also creates missing parent directories, so a
    # nested --model_output_dir works.
    if not os.path.isdir(args.model_output_dir):
        os.makedirs(args.model_output_dir)

    word2vec_reader = reader.Word2VecReader(args.dict_path,
                                            args.train_data_path)

    logger.info("dict_size: {}".format(word2vec_reader.dict_size))

    loss, py_reader = skip_gram_word2vec(
        word2vec_reader.dict_size,
        word2vec_reader.word_frequencys,
        args.embedding_size,
        args.max_code_length,
        args.with_hs,
        args.with_nce,
        is_sparse=args.is_sparse)

    #optimizer = fluid.optimizer.SGD(learning_rate=1e-3)
    optimizer = fluid.optimizer.Adam(learning_rate=1e-3)
    optimizer.minimize(loss)

    if os.getenv("PADDLE_IS_LOCAL", "1") == "1":
        logger.info("run local training")
        main_program = fluid.default_main_program()

        # Dump the program text to a file.
        with open("local.main.proto", "w") as f:
            f.write(str(main_program))

        train_loop(args, main_program, word2vec_reader, py_reader, loss, 0)
    else:
        logger.info("run dist training")

        trainer_id = int(os.environ["PADDLE_TRAINER_ID"])
        trainers = int(os.environ["PADDLE_TRAINERS"])
        training_role = os.environ["PADDLE_TRAINING_ROLE"]

        port = os.getenv("PADDLE_PSERVER_PORT", "6174")
        pserver_ips = os.getenv("PADDLE_PSERVER_IPS", "")
        # One "ip:port" endpoint per configured parameter server.
        pserver_endpoints = ",".join(
            ':'.join([ip, port]) for ip in pserver_ips.split(","))
        current_endpoint = os.getenv("PADDLE_CURRENT_IP", "") + ":" + port

        config = fluid.DistributeTranspilerConfig()
        config.slice_var_up = False
        t = fluid.DistributeTranspiler(config=config)
        t.transpile(
            trainer_id,
            pservers=pserver_endpoints,
            trainers=trainers,
            sync_mode=True)

        if training_role == "PSERVER":
            logger.info("run pserver")
            prog = t.get_pserver_program(current_endpoint)
            startup = t.get_startup_program(
                current_endpoint, pserver_program=prog)

            # NOTE(review): the suffix comes from CUR_PORT, not from
            # PADDLE_PSERVER_PORT; when CUR_PORT is unset the file is named
            # "pserver.main.proto.None" - confirm this is intended.
            with open("pserver.main.proto.{}".format(os.getenv("CUR_PORT")),
                      "w") as f:
                f.write(str(prog))

            exe = fluid.Executor(fluid.CPUPlace())
            exe.run(startup)
            exe.run(prog)
        elif training_role == "TRAINER":
            logger.info("run trainer")
            train_prog = t.get_trainer_program()

            with open("trainer.main.proto.{}".format(trainer_id), "w") as f:
                f.write(str(train_prog))

            train_loop(args, train_prog, word2vec_reader, py_reader, loss,
                       trainer_id)
        else:
            # Previously an unknown role returned silently without doing any
            # work; make the misconfiguration visible.
            logger.warning(
                "unknown PADDLE_TRAINING_ROLE: {}, nothing to run".format(
                    training_role))


def _print_environ():
    """Print every environment variable as a "name value" line."""
    # NOTE(review): this dumps the complete environment to stdout, which may
    # include secrets - confirm that is acceptable where this runs.
    print("Content-Type: text/plain\n\n")
    for key, value in os.environ.items():
        print("%30s %s \n" % (key, value))


def env_declar():
    """Copy cluster-provided environment variables to PaddleFluid names.

    When TRAINING_ROLE is "PSERVER" or PADDLE_IS_LOCAL is "0", the values of
    TRAINING_ROLE, PADDLE_PORT, PADDLE_PSERVERS, PADDLE_TRAINERS_NUM and
    POD_IP are copied to PADDLE_TRAINING_ROLE, PADDLE_PSERVER_PORT,
    PADDLE_PSERVER_IPS, PADDLE_TRAINERS and PADDLE_CURRENT_IP, and CPU_NUM
    and NUM_THREADS are set to "12". Otherwise nothing is changed. The whole
    environment is printed before and after.

    Raises:
        KeyError: if the renaming applies and one of the source variables is
            not set.
    """
    print("********  Rename Cluster Env to PaddleFluid Env ********")

    _print_environ()

    # Missing variables mean "not a cluster run" instead of a KeyError.
    # PADDLE_IS_LOCAL defaults to "1", the same default train() uses.
    is_pserver = os.environ.get("TRAINING_ROLE") == "PSERVER"
    is_distributed = os.environ.get("PADDLE_IS_LOCAL", "1") == "0"

    if is_pserver or is_distributed:
        os.environ["PADDLE_TRAINING_ROLE"] = os.environ["TRAINING_ROLE"]
        os.environ["PADDLE_PSERVER_PORT"] = os.environ["PADDLE_PORT"]
        os.environ["PADDLE_PSERVER_IPS"] = os.environ["PADDLE_PSERVERS"]
        os.environ["PADDLE_TRAINERS"] = os.environ["PADDLE_TRAINERS_NUM"]
        os.environ["PADDLE_CURRENT_IP"] = os.environ["POD_IP"]
        # PADDLE_TRAINER_ID already has its target name, so it is not copied.
        os.environ["CPU_NUM"] = "12"
        os.environ["NUM_THREADS"] = "12"

    _print_environ()

    print("******  Rename Cluster Env to PaddleFluid Env END ******")


if __name__ == '__main__':
    # NOTE: the env_declar() call is currently disabled.
    # env_declar()
    train()