diff --git a/fluid/language_model/.run_ce.sh b/fluid/language_model/.run_ce.sh
new file mode 100644
index 0000000000000000000000000000000000000000..c0cb8f5473a100ebf0e5c0697f9ab5e65e97560d
--- /dev/null
+++ b/fluid/language_model/.run_ce.sh
@@ -0,0 +1,14 @@
+#!/bin/bash
+
+export MKL_NUM_THREADS=1
+export OMP_NUM_THREADS=1
+
+cudaid=${language_model:=0} # use 0-th card as default
+export CUDA_VISIBLE_DEVICES=$cudaid
+
+FLAGS_benchmark=true python train.py --enable_ce | python _ce.py
+
+cudaid=${language_model_m:=0,1,2,3} # use 0,1,2,3 card as default
+export CUDA_VISIBLE_DEVICES=$cudaid
+
+FLAGS_benchmark=true python train.py --enable_ce | python _ce.py
diff --git a/fluid/language_model/_ce.py b/fluid/language_model/_ce.py
new file mode 100644
index 0000000000000000000000000000000000000000..d4999d7a1e14e333f1c7056b3dc2c5b506682ec6
--- /dev/null
+++ b/fluid/language_model/_ce.py
@@ -0,0 +1,62 @@
+# this file is only used for continuous evaluation test!
+
+import os
+import sys
+sys.path.append(os.environ['ceroot'])
+from kpi import CostKpi
+from kpi import DurationKpi
+
+imikolov_20_avg_ppl_kpi = CostKpi('imikolov_20_avg_ppl', 0.2, 0)
+imikolov_20_pass_duration_kpi = DurationKpi(
+    'imikolov_20_pass_duration', 0.02, 0, actived=True)
+imikolov_20_avg_ppl_kpi_card4 = CostKpi('imikolov_20_avg_ppl_card4', 0.2, 0)
+imikolov_20_pass_duration_kpi_card4 = DurationKpi(
+    'imikolov_20_pass_duration_card4', 0.03, 0, actived=True)
+
+tracking_kpis = [
+    imikolov_20_avg_ppl_kpi,
+    imikolov_20_pass_duration_kpi,
+    imikolov_20_avg_ppl_kpi_card4,
+    imikolov_20_pass_duration_kpi_card4,
+]
+
+
+def parse_log(log):
+    '''
+    Extract KPI records from the text of a training log.
+
+    Only lines made of exactly three tab-separated fields whose first
+    field is the literal "kpis" are used, for example:
+
+    "
+    kpis\timikolov_20_avg_ppl\t32.46
+    kpis\timikolov_20_pass_duration\t154.2
+    "
+
+    Yields (kpi_name, kpi_value) pairs, the value converted to float.
+    Every split line is also printed, so the whole log is echoed
+    field-by-field to stdout.
+    '''
+    for line in log.split('\n'):
+        fs = line.strip().split('\t')
+        print(fs)
+        if len(fs) == 3 and fs[0] == 'kpis':
+            kpi_name = fs[1]
+            kpi_value = float(fs[2])
+            yield kpi_name, kpi_value
+
+
+def log_to_ce(log):
+    kpi_tracker = {}  # KPI name -> KPI object, built from tracking_kpis
+    for kpi in tracking_kpis:
+        kpi_tracker[kpi.name] = kpi
+
+    for (kpi_name, kpi_value) in parse_log(log):
+        print(kpi_name, kpi_value)
+        kpi_tracker[kpi_name].add_record(kpi_value)
+        kpi_tracker[kpi_name].persist()
+
+
+if __name__ == '__main__':
+    log = sys.stdin.read()  # the training log is piped in on stdin
+    log_to_ce(log)
diff --git a/fluid/language_model/train.py b/fluid/language_model/train.py
index 59fc3a987746af7aec9b61b5c817400b6b6546d0..f3e7a7398bf13e14c74ce1d10d90b7bf34031698 100644
--- a/fluid/language_model/train.py
+++ b/fluid/language_model/train.py
@@ -1,14 +1,28 @@
+import os
 import sys
 import time
 
 import numpy as np
 import math
-
+import argparse
 import paddle.fluid as fluid
-import paddle.v2 as paddle
+import paddle
 
 import utils
 
+SEED = 102
+
+
+def parse_args():
+    parser = argparse.ArgumentParser("language_model benchmark.")
+    parser.add_argument(
+        '--enable_ce',
+        action='store_true',
+        help='If set, run '
+        'the task with continuous evaluation logs.')
+    args = parser.parse_args()
+    return args
+
 
 def network(src, dst, vocab_size, hid_size, init_low_bound, init_high_bound):
     """ network definition """
@@ -63,31 +77,26 @@ def train(train_reader,
           init_low_bound=-0.04,
           init_high_bound=0.04):
     """ train network """
+
+    args = parse_args()
+    if args.enable_ce:
+        # The random seed must be set before configuring the network.
+        fluid.default_startup_program().random_seed = SEED
     vocab_size = len(vocab)
 
+    # Input data
     src_wordseq = fluid.layers.data(
         name="src_wordseq", shape=[1], dtype="int64", lod_level=1)
     dst_wordseq = fluid.layers.data(
         name="dst_wordseq", shape=[1], dtype="int64", lod_level=1)
 
+    # Train program
     avg_cost = None
-    if not parallel:
-        cost = network(src_wordseq, dst_wordseq, vocab_size, hid_size,
-                       init_low_bound, init_high_bound)
-        avg_cost = fluid.layers.mean(x=cost)
-    else:
-        places = fluid.layers.get_places()
-        pd = fluid.layers.ParallelDo(places)
-        with pd.do():
-            cost = network(
-                pd.read_input(src_wordseq),
-                pd.read_input(dst_wordseq), vocab_size, hid_size,
-                init_low_bound, init_high_bound)
-            pd.write_output(cost)
-
-        cost = pd()
-        avg_cost = fluid.layers.mean(x=cost)
+    cost = network(src_wordseq, dst_wordseq, vocab_size, hid_size,
+                   init_low_bound, init_high_bound)
+    avg_cost = fluid.layers.mean(x=cost)
 
+    # Optimization to minimize loss
     sgd_optimizer = fluid.optimizer.SGD(
         learning_rate=fluid.layers.exponential_decay(
             learning_rate=base_lr,
@@ -96,39 +105,56 @@ def train(train_reader,
             staircase=True))
     sgd_optimizer.minimize(avg_cost)
 
+    # Initialize executor
     place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
     exe = fluid.Executor(place)
-
     exe.run(fluid.default_startup_program())
+
+    train_exe = fluid.ParallelExecutor(use_cuda=True, loss_name=avg_cost.name)
+
     total_time = 0.0
+    fetch_list = [avg_cost.name]
     for pass_idx in xrange(pass_num):
         epoch_idx = pass_idx + 1
         print "epoch_%d start" % epoch_idx
 
         t0 = time.time()
         i = 0
+        newest_ppl = 0
         for data in train_reader():
             i += 1
             lod_src_wordseq = utils.to_lodtensor(
                 map(lambda x: x[0], data), place)
             lod_dst_wordseq = utils.to_lodtensor(
                 map(lambda x: x[1], data), place)
-            ret_avg_cost = exe.run(fluid.default_main_program(),
-                                   feed={
-                                       "src_wordseq": lod_src_wordseq,
-                                       "dst_wordseq": lod_dst_wordseq
-                                   },
-                                   fetch_list=[avg_cost],
-                                   use_program_cache=True)
-            avg_ppl = math.exp(ret_avg_cost[0])
+            ret_avg_cost = train_exe.run(feed={
+                "src_wordseq": lod_src_wordseq,
+                "dst_wordseq": lod_dst_wordseq
+            },
+                                         fetch_list=fetch_list)
+            avg_ppl = np.exp(ret_avg_cost[0])
+            newest_ppl = np.mean(avg_ppl)
             if i % 100 == 0:
-                print "step:%d ppl:%.3f" % (i, avg_ppl)
+                print "step:%d ppl:%.3f" % (i, newest_ppl)
 
         t1 = time.time()
         total_time += t1 - t0
         print "epoch:%d num_steps:%d time_cost(s):%f" % (epoch_idx, i,
                                                          total_time / epoch_idx)
 
+        if pass_idx == pass_num - 1 and args.enable_ce:
+            # Note: the following logs are only for CE monitoring; _ce.py
+            # parses them as tab-separated "kpis", name, value fields.
+            gpu_num = get_cards(args.enable_ce)
+            if gpu_num == 1:
+                print("kpis\timikolov_20_pass_duration\t%s" %
+                      (total_time / epoch_idx))
+                print("kpis\timikolov_20_avg_ppl\t%s" % newest_ppl)
+            else:
+                print("kpis\timikolov_20_pass_duration_card%s\t%s" %
+                      (gpu_num, total_time / epoch_idx))
+                print("kpis\timikolov_20_avg_ppl_card%s\t%s" %
+                      (gpu_num, newest_ppl))
         save_dir = "%s/epoch_%d" % (model_dir, epoch_idx)
         feed_var_names = ["src_wordseq", "dst_wordseq"]
         fetch_vars = [avg_cost]
@@ -138,11 +164,22 @@ def train(train_reader,
     print("finish training")
 
 
+def get_cards(enable_ce):
+    """Return the GPU count; under CE it is read from CUDA_VISIBLE_DEVICES."""
+    cards = os.environ.get('CUDA_VISIBLE_DEVICES') if enable_ce else None
+    if cards:
+        # One entry per comma-separated device id.
+        return len(cards.split(","))
+    return fluid.core.get_cuda_device_count()  # also used if the var is unset
+
+
 def train_net():
     """ do training """
     batch_size = 20
+    args = parse_args()
     vocab, train_reader, test_reader = utils.prepare_data(
-        batch_size=batch_size, buffer_size=1000, word_freq_threshold=0)
+        batch_size=batch_size * get_cards(args.enable_ce), buffer_size=1000,
+        word_freq_threshold=0, enable_ce=args.enable_ce)
     train(
         train_reader=train_reader,
         vocab=vocab,
@@ -152,7 +189,7 @@ def train_net():
         batch_size=batch_size,
         pass_num=12,
         use_cuda=True,
-        parallel=False,
+        parallel=True,
         model_dir="model",
         init_low_bound=-0.1,
         init_high_bound=0.1)
diff --git a/fluid/language_model/utils.py b/fluid/language_model/utils.py
index c5909046176586556a2aedba5dd5d12810b3ea8d..dd03a89835e620dc8432a6ca16392fc5173a12d4 100644
--- a/fluid/language_model/utils.py
+++ b/fluid/language_model/utils.py
@@ -3,7 +3,7 @@
 import time
 import numpy as np
 import paddle.fluid as fluid
-import paddle.v2 as paddle
+import paddle
 
 
 def to_lodtensor(data, place):
@@ -22,17 +22,28 @@ def to_lodtensor(data, place):
     return res
 
 
-def prepare_data(batch_size, buffer_size=1000, word_freq_threshold=0):
+def prepare_data(batch_size,
+                 buffer_size=1000,
+                 word_freq_threshold=0,
+                 enable_ce=False):
     """ prepare the English Pann Treebank (PTB) data """
     vocab = paddle.dataset.imikolov.build_dict(word_freq_threshold)
-    train_reader = paddle.batch(
-        paddle.reader.shuffle(
+    if enable_ce:
+        train_reader = paddle.batch(
             paddle.dataset.imikolov.train(
                 vocab,
                 buffer_size,
                 data_type=paddle.dataset.imikolov.DataType.SEQ),
-            buf_size=buffer_size),
-        batch_size)
+            batch_size)
+    else:
+        train_reader = paddle.batch(
+            paddle.reader.shuffle(
+                paddle.dataset.imikolov.train(
+                    vocab,
+                    buffer_size,
+                    data_type=paddle.dataset.imikolov.DataType.SEQ),
+                buf_size=buffer_size),
+            batch_size)
     test_reader = paddle.batch(
         paddle.dataset.imikolov.test(
             vocab, buffer_size, data_type=paddle.dataset.imikolov.DataType.SEQ),