Commit db405c78 authored by J JiabinYang

change reader to fit 1-billion dataset, add infer

Parent 72ed55fb
......@@ -8,7 +8,7 @@
PaddlePaddle Fluid needs to be installed first.
## Dataset
The dataset used is the enwik8 Wikipedia article dataset from Matt Mahoney (http://mattmahoney.net/dc/textdata.html).
The dataset used is from the 1 Billion Word Language Model Benchmark (http://www.statmt.org/lm-benchmark).
Download the dataset:
```bash
......@@ -23,7 +23,7 @@ cd data && ./download.sh && cd ..
Preprocess the data to generate a dictionary.
```bash
python preprocess.py --data_path data/enwik8 --dict_path data/enwik8_dict
python preprocess.py --data_path ./data/1-billion-word-language-modeling-benchmark-r13output/training-monolingual.tokenized.shuffled --dict_path data/1-billion_dict
```
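For reference, the sketch below (illustrative only, not part of the repository) reconstructs the shard file names that `preprocess.py` iterates over under `--data_path` when `--is_local` is set:
```python
# Illustrative sketch: the 1-billion-word training shards read by preprocess.py,
# i.e. news.en-00001-of-00100 ... news.en-00099-of-00100 under --data_path.
data_path = "./data/1-billion-word-language-modeling-benchmark-r13output/training-monolingual.tokenized.shuffled"
shards = ["news.en-000{:0>2d}-of-00100".format(i) for i in range(1, 100)]
paths = [data_path + "/" + shard for shard in shards]
print(len(paths), paths[0], paths[-1])
```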
## Train
......
......@@ -8,8 +8,7 @@
You should install PaddlePaddle Fluid first.
## Dataset
The training data for the Large Text Compression Benchmark is the first 10^9 bytes
of the English Wikipedia dump on Mar. 3, 2006 from Matt Mahoney (http://mattmahoney.net/dc/textdata.html).
The training data is from the 1 Billion Word Language Model Benchmark (http://www.statmt.org/lm-benchmark).
Download dataset:
```bash
......@@ -25,7 +24,7 @@ This model implements the skip-gram model of word2vec.
Preprocess the training data to generate a word dict.
```bash
python preprocess.py --data_path data/enwik8 --dict_path data/enwik8_dict
python preprocess.py --data_path ./data/1-billion-word-language-modeling-benchmark-r13output/training-monolingual.tokenized.shuffled --dict_path data/1-billion_dict
```
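For reference, the sketch below (illustrative only; it assumes the one `word id` pair per line format that `Word2VecReader` writes to `<dict_path>_word_to_id_` and that `infer.py` reads back) shows how the generated vocabulary mapping can be loaded:
```python
# Illustrative sketch: load the word-to-id map written next to the dict file.
word_to_id = {}
with open("data/1-billion_dict" + "_word_to_id_") as f:
    for line in f:
        word, idx = line.strip().split(' ')
        word_to_id[word] = int(idx)
print("vocabulary size: %d" % len(word_to_id))
```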
## Train
......
#!/bin/bash
wget http://mattmahoney.net/dc/enwik8.zip
unzip enwik8.zip
wget http://www.statmt.org/lm-benchmark/1-billion-word-language-modeling-benchmark-r13output.tar.gz
tar -zxvf 1-billion-word-language-modeling-benchmark-r13output.tar.gz
import paddle
import time
import os
import paddle.fluid as fluid
import numpy as np
from Queue import PriorityQueue
import logging
import argparse
from sklearn.metrics.pairwise import cosine_similarity
from paddle.fluid.executor import global_scope
word_to_id = dict()
id_to_word = dict()
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)
def parse_args():
parser = argparse.ArgumentParser(
description="PaddlePaddle Word2vec infer example")
parser.add_argument(
'--dict_path',
type=str,
default='./data/1-billion_dict',
help="The path of training dataset")
parser.add_argument(
'--model_output_dir',
type=str,
default='models',
help="The path for model to store (with infer_once please set specify dir to models) (default: models)"
)
parser.add_argument(
'--rank_num',
type=int,
default=4,
help="find rank_num-nearest result for test (default: 4)")
parser.add_argument(
'--infer_once',
action='store_true',
required=False,
default=False,
help='run inference only once on the specified model dir, (default: False)')
return parser.parse_args()
def BuildWord_IdMap(dict_path):
with open(dict_path + "_word_to_id_", 'r') as f:
for line in f:
word_to_id[line.split(' ')[0]] = int(line.split(' ')[1])
id_to_word[int(line.split(' ')[1])] = line.split(' ')[0]
def inference_prog():
fluid.layers.create_parameter(
shape=[1, 1], dtype='float32', name="embeding")
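# Note: inference_prog() registers a placeholder parameter named "embeding" so the
# persisted embedding table can be loaded back into the scope by name, and
# build_test_case() below forms word-analogy queries (e.g. boy - girl + aunt)
# by vector arithmetic on rows of that embedding table.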
def build_test_case(emb):
emb1 = emb[word_to_id['boy']] - emb[word_to_id['girl']] + emb[word_to_id[
'aunt']]
desc1 = "boy - girl + aunt = uncle"
emb2 = emb[word_to_id['brother']] - emb[word_to_id['sister']] + emb[
word_to_id['sisters']]
desc2 = "brother - sister + sisters = brothers"
emb3 = emb[word_to_id['king']] - emb[word_to_id['queen']] + emb[word_to_id[
'woman']]
desc3 = "king - queen + woman = man"
emb4 = emb[word_to_id['reluctant']] - emb[word_to_id['reluctantly']] + emb[
word_to_id['slowly']]
desc4 = "reluctant - reluctantly + slowly = slow"
emb5 = emb[word_to_id['old']] - emb[word_to_id['older']] + emb[word_to_id[
'deeper']]
desc5 = "old - older + deeper = deep"
return [[emb1, desc1], [emb2, desc2], [emb3, desc3], [emb4, desc4],
[emb5, desc5]]
def inference_test(model_dir, args):
BuildWord_IdMap(args.dict_path)
exe = fluid.Executor(fluid.CPUPlace())
Scope = fluid.Scope()
logger.info("model_dir is: {}".format(model_dir + "/"))
with fluid.scope_guard(Scope):
inference_prog()
fluid.io.load_persistables(executor=exe, dirname=model_dir + "/")
emb = np.array(Scope.find_var("embeding").get_tensor())
test_cases = build_test_case(emb)
logger.info("inference result: ====================")
for case in test_cases:
pq = topK(args.rank_num, emb, case[0])
logger.info("Test result for {}".format(case[1]))
pq_tmps = list()
for i in range(args.rank_num):
pq_tmps.append(pq.get())
for i in range(len(pq_tmps)):
logger.info("{} nearest is {}, rate is {}".format(i, id_to_word[
pq_tmps[len(pq_tmps) - 1 - i].id], pq_tmps[len(pq_tmps) - 1
- i].priority))
del pq_tmps[:]
def infer_with_in_train(model_dir, rank_num, dict_path):
BuildWord_IdMap(dict_path)
emb = np.array(global_scope().find_var("embeding").get_tensor())
test_cases = build_test_case(emb)
logger.info("inference result: ====================")
for case in test_cases:
pq = topK(rank_num, emb, case[0])
logger.info("Test result for {}".format(case[1]))
pq_tmps = list()
for i in range(rank_num):
pq_tmps.append(pq.get())
for i in range(len(pq_tmps)):
logger.info("{} nearest is {}, rate is {}".format(i, id_to_word[
pq_tmps[len(pq_tmps) - 1 - i].id], pq_tmps[len(pq_tmps) - 1 - i]
.priority))
del pq_tmps[:]
class PQ_Entry(object):
def __init__(self, cos_similarity, id):
self.priority = cos_similarity
self.id = id
def __cmp__(self, other):
return cmp(self.priority, other.priority)
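# topK keeps the k embedding rows most similar to test_emb in a PriorityQueue of
# capacity k + 1: once the queue is full, the entry with the smallest cosine
# similarity is evicted before each insertion, and one final get() after the loop
# leaves exactly the top-k entries.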
def topK(k, emb, test_emb):
pq = PriorityQueue(k + 1)
if len(emb) <= k:
for i in range(len(emb)):
x = cosine_similarity([emb[i]], [test_emb])
pq.put(PQ_Entry(x, i))
return pq
for i in range(len(emb)):
x = cosine_similarity([emb[i]], [test_emb])
pq_e = PQ_Entry(x, i)
if pq.full():
pq.get()
pq.put(pq_e)
pq.get()
return pq
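# infer_during_train polls model_output_dir once per second and, for every newly
# appeared checkpoint directory that contains a "_success" marker file, runs
# inference_test on it.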
def infer_during_train(args):
model_file_list = list()
while True:
time.sleep(1)
current_list = os.listdir(args.model_output_dir)
logger.info("current_list is : {}".format(current_list))
logger.info("model_file_list is : {}".format(model_file_list))
if set(model_file_list) == set(current_list):
logger.info("they are the same")
pass
else:
increment_models = list()
for f in current_list:
if f not in model_file_list:
increment_models.append(f)
logger.info("increment_models is : {}".format(increment_models))
for model in increment_models:
model_dir = args.model_output_dir + "/" + model
if os.path.exists(model_dir + "/_success"):
logger.info("using models from " + model_dir)
inference_test(model_dir, args)
model_file_list = current_list
def infer_once(args):
if os.path.exists(args.model_output_dir + "/_success"
): # check that the model files have been completely written
logger.info("using models from " + args.model_output_dir)
inference_test(args.model_output_dir, args)
if __name__ == '__main__':
args = parse_args()
# while setting infer_once please specify the dir to models file with --model_output_dir
if args.infer_once:
infer_once(args)
else:
infer_during_train(args)
......@@ -55,7 +55,8 @@ def skip_gram_word2vec(dict_size,
return cost
def hsigmoid_layer(input, label, ptable, pcode, non_leaf_num, is_sparse):
def hsigmoid_layer(input, label, path_table, path_code, non_leaf_num,
is_sparse):
if non_leaf_num is None:
non_leaf_num = dict_size
......@@ -63,8 +64,8 @@ def skip_gram_word2vec(dict_size,
input=input,
label=label,
num_classes=non_leaf_num,
path_table=ptable,
path_code=pcode,
path_table=path_table,
path_code=path_code,
is_custom=True,
is_sparse=is_sparse)
......@@ -79,16 +80,16 @@ def skip_gram_word2vec(dict_size,
datas.append(predict_word)
if with_hsigmoid:
ptable = fluid.layers.data(
name='ptable',
path_table = fluid.layers.data(
name='path_table',
shape=[max_code_length if max_code_length else 40],
dtype='int64')
pcode = fluid.layers.data(
name='pcode',
path_code = fluid.layers.data(
name='path_code',
shape=[max_code_length if max_code_length else 40],
dtype='int64')
datas.append(ptable)
datas.append(pcode)
datas.append(path_table)
datas.append(path_code)
py_reader = fluid.layers.create_py_reader_by_data(
capacity=64, feed_list=datas, name='py_reader', use_double_buffer=True)
......@@ -99,8 +100,10 @@ def skip_gram_word2vec(dict_size,
input=words[0],
is_sparse=is_sparse,
size=[dict_size, embedding_size],
param_attr=fluid.ParamAttr(initializer=fluid.initializer.Normal(
scale=1 / math.sqrt(dict_size))))
param_attr=fluid.ParamAttr(
name='embeding',
initializer=fluid.initializer.Normal(scale=1 /
math.sqrt(dict_size))))
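# Note: the embedding parameter is explicitly named 'embeding' so that infer.py
# can look it up in the scope by that name after loading the persistables.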
cost, cost_nce, cost_hs = None, None, None
......
......@@ -22,6 +22,12 @@ def parse_args():
type=int,
default=5,
help="If the word count is less then freq, it will be removed from dict")
parser.add_argument(
'--is_local',
action='store_true',
required=False,
default=False,
help='Local train or not, (default: False)')
return parser.parse_args()
......@@ -114,7 +120,7 @@ def build_Huffman(word_count, max_code_length):
return word_point, word_code, word_code_len
def preprocess(data_path, dict_path, freq):
def preprocess(data_path, dict_path, freq, is_local):
"""
Preprocess the data, generate the dictionary and save it into dict_path.
:param data_path: the input data path.
......@@ -125,16 +131,19 @@ def preprocess(data_path, dict_path, freq):
# word to count
word_count = dict()
with open(data_path) as f:
for line in f:
line = line.lower()
line = text_strip(line)
words = line.split()
for item in words:
if item in word_count:
word_count[item] = word_count[item] + 1
else:
word_count[item] = 1
if is_local:
for i in range(1, 100):
with open(data_path + "/news.en-000{:0>2d}-of-00100".format(
i)) as f:
for line in f:
line = line.lower()
line = text_strip(line)
words = line.split()
for item in words:
if item in word_count:
word_count[item] = word_count[item] + 1
else:
word_count[item] = 1
item_to_remove = []
for item in word_count:
if word_count[item] <= freq:
......@@ -159,4 +168,4 @@ def preprocess(data_path, dict_path, freq):
if __name__ == "__main__":
args = parse_args()
preprocess(args.data_path, args.dict_path, args.freq)
preprocess(args.data_path, args.dict_path, args.freq, args.is_local)
......@@ -5,9 +5,10 @@ import preprocess
class Word2VecReader(object):
def __init__(self, dict_path, data_path, window_size=5):
def __init__(self, dict_path, data_path, filelist, window_size=5):
self.window_size_ = window_size
self.data_path_ = data_path
self.filelist = filelist
self.num_non_leaf = 0
self.word_to_id_ = dict()
self.id_to_word = dict()
......@@ -27,6 +28,10 @@ class Word2VecReader(object):
word_counts.append(count)
word_all_count += count
with open(dict_path + "_word_to_id_", 'w+') as f6:
for k, v in self.word_to_id_.items():
f6.write(str(k) + " " + str(v) + '\n')
self.dict_size = len(self.word_to_id_)
self.word_frequencys = [
float(count) / word_all_count for count in word_counts
......@@ -66,36 +71,40 @@ class Word2VecReader(object):
def train(self, with_hs):
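# _reader yields skip-gram pairs ([target_id], [context_id]) for every context
# word inside the window around each target word; _reader_hs additionally yields
# the Huffman code and path of the context word for the hierarchical sigmoid loss.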
def _reader():
with open(self.data_path_, 'r') as f:
for line in f:
line = preprocess.text_strip(line)
word_ids = [
self.word_to_id_[word] for word in line.split()
if word in self.word_to_id_
]
for idx, target_id in enumerate(word_ids):
context_word_ids = self.get_context_words(
word_ids, idx, self.window_size_)
for context_id in context_word_ids:
yield [target_id], [context_id]
for file in self.filelist:
with open(self.data_path_ + "/" + file, 'r') as f:
for line in f:
line = preprocess.text_strip(line)
word_ids = [
self.word_to_id_[word] for word in line.split()
if word in self.word_to_id_
]
for idx, target_id in enumerate(word_ids):
context_word_ids = self.get_context_words(
word_ids, idx, self.window_size_)
for context_id in context_word_ids:
yield [target_id], [context_id]
def _reader_hs():
with open(self.data_path_, 'r') as f:
for line in f:
line = preprocess.text_strip(line)
word_ids = [
self.word_to_id_[word] for word in line.split()
if word in self.word_to_id_
]
for idx, target_id in enumerate(word_ids):
context_word_ids = self.get_context_words(
word_ids, idx, self.window_size_)
for context_id in context_word_ids:
yield [target_id], [context_id], [
self.word_to_code[self.id_to_word[context_id]]
], [
self.word_to_path[self.id_to_word[context_id]]
]
for file in self.filelist:
with open(self.data_path_ + "/" + file, 'r') as f:
for line in f:
line = preprocess.text_strip(line)
word_ids = [
self.word_to_id_[word] for word in line.split()
if word in self.word_to_id_
]
for idx, target_id in enumerate(word_ids):
context_word_ids = self.get_context_words(
word_ids, idx, self.window_size_)
for context_id in context_word_ids:
yield [target_id], [context_id], [
self.word_to_code[self.id_to_word[
context_id]]
], [
self.word_to_path[self.id_to_word[
context_id]]
]
if not with_hs:
return _reader
......@@ -104,8 +113,6 @@ class Word2VecReader(object):
if __name__ == "__main__":
epochs = 10
batch_size = 1000
window_size = 10
reader = Word2VecReader("data/enwik9_dict", "data/enwik9", window_size)
......
......@@ -15,6 +15,7 @@ import paddle.fluid as fluid
import reader
from network_conf import skip_gram_word2vec
from infer import infer_with_in_train
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("fluid")
......@@ -27,12 +28,12 @@ def parse_args():
parser.add_argument(
'--train_data_path',
type=str,
default='./data/enwik8',
default='./data/1-billion-word-language-modeling-benchmark-r13output/training-monolingual.tokenized.shuffled',
help="The path of training dataset")
parser.add_argument(
'--dict_path',
type=str,
default='./data/enwik8_dict',
default='./data/1-billion_dict',
help="The path of data dict")
parser.add_argument(
'--test_data_path',
......@@ -43,7 +44,7 @@ def parse_args():
'--batch_size',
type=int,
default=100,
help="The size of mini-batch (default:1000)")
help="The size of mini-batch (default:100)")
parser.add_argument(
'--num_passes',
type=int,
......@@ -79,6 +80,7 @@ def parse_args():
type=int,
default=40,
help='max code length used by hierarchical sigmoid, (default: 40)')
parser.add_argument(
'--is_sparse',
action='store_true',
......@@ -86,11 +88,44 @@ def parse_args():
default=False,
help='embedding and nce will use sparse or not, (default: False)')
parser.add_argument(
'--with_Adam',
action='store_true',
required=False,
default=False,
help='Using Adam as optimizer or not, (default: False)')
parser.add_argument(
'--is_local',
action='store_true',
required=False,
default=False,
help='Local train or not, (default: False)')
parser.add_argument(
'--with_speed',
action='store_true',
required=False,
default=False,
help='print speed or not, (default: False)')
parser.add_argument(
'--with_infer_test',
action='store_true',
required=False,
default=False,
help='Do inference every 1000 batches, (default: False)')
parser.add_argument(
'--rank_num',
type=int,
default=4,
help="find rank_num-nearest result for test (default: 4)")
return parser.parse_args()
def train_loop(args, train_program, reader, py_reader, loss, trainer_id):
train_reader = paddle.batch(
paddle.reader.shuffle(
reader.train((args.with_hs or (not args.with_nce))),
......@@ -101,15 +136,11 @@ def train_loop(args, train_program, reader, py_reader, loss, trainer_id):
place = fluid.CPUPlace()
data_name_list = None
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
exec_strategy = fluid.ExecutionStrategy()
#if os.getenv("NUM_THREADS", ""):
# exec_strategy.num_threads = int(os.getenv("NUM_THREADS"))
print("CPU_NUM:" + str(os.getenv("CPU_NUM")))
exec_strategy.num_threads = int(os.getenv("CPU_NUM"))
......@@ -137,6 +168,7 @@ def train_loop(args, train_program, reader, py_reader, loss, trainer_id):
try:
while True:
if profiler_step == profiler_step_start:
fluid.profiler.start_profiler(profile_state)
......@@ -149,51 +181,62 @@ def train_loop(args, train_program, reader, py_reader, loss, trainer_id):
else:
profiler_step += 1
if batch_id % 10 == 0:
if batch_id % 50 == 0:
logger.info(
"TRAIN --> pass: {} batch: {} loss: {} reader queue:{}".
format(pass_id, batch_id,
loss_val.mean() / args.batch_size,
py_reader.queue.size()))
if batch_id % 100 == 0 and batch_id != 0:
elapsed = (time.clock() - start)
start = time.clock()
samples = 101 * args.batch_size * int(os.getenv("CPU_NUM"))
logger.info("Time used: {}, Samples/Sec: {}".format(
elapsed, samples / elapsed))
# elapsed = (time.clock() - start)
# start = time.clock()
# samples = 101 * args.batch_size * int(os.getenv("CPU_NUM"))
# logger.info("Time used: {}, Samples/Sec: {}".format(elapsed, samples/elapsed))
#if batch_id % 1000 == 0 and batch_id != 0:
# model_dir = args.model_output_dir + '/batch-' + str(batch_id)
# if trainer_id == 0:
# fluid.io.save_inference_model(model_dir, data_name_list, [loss], exe)
if args.with_speed:
if batch_id % 1000 == 0 and batch_id != 0:
elapsed = (time.clock() - start)
start = time.clock()
samples = 1001 * args.batch_size * int(
os.getenv("CPU_NUM"))
logger.info("Time used: {}, Samples/Sec: {}".format(
elapsed, samples / elapsed))
if batch_id == 200 or batch_id == 100:
model_dir = args.model_output_dir + '/batch-' + str(
batch_id)
fluid.io.save_persistables(executor=exe, dirname=model_dir)
with open(model_dir + "/_success", 'w+') as f:
f.write(str(batch_id))
# calculate infer result every 1000 batches
if args.with_infer_test:
if batch_id % 1000 == 0 and batch_id != 0:
model_dir = args.model_output_dir + '/batch-' + str(
batch_id)
infer_with_in_train(model_dir, args.rank_num,
args.dict_path)
batch_id += 1
except fluid.core.EOFException:
py_reader.reset()
epoch_end = time.time()
print("Epoch: {0}, Train total expend: {1} ".format(
logger.info("Epoch: {0}, Train total expend: {1} ".format(
pass_id, epoch_end - epoch_start))
#model_dir = args.model_output_dir + '/pass-' + str(pass_id)
#if trainer_id == 0:
# fluid.io.save_inference_model(model_dir, data_name_list, [loss], exe)
model_dir = args.model_output_dir + '/pass-' + str(pass_id)
if trainer_id == 0:
fluid.io.save_persistables(executor=exe, dirname=model_dir)
with open(model_dir + "/_success", 'w+') as f:
f.write(str(pass_id))
def train():
args = parse_args()
def GetFileList(data_path):
return os.listdir(data_path)
def train(args):
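# train(): list the shard files, build the Word2VecReader and the skip-gram
# network, choose Adam or SGD depending on --with_Adam, then run local training
# (train_loop) or distributed training depending on --is_local / PADDLE_IS_LOCAL.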
if not os.path.isdir(args.model_output_dir):
os.mkdir(args.model_output_dir)
filelist = GetFileList(args.train_data_path)
word2vec_reader = reader.Word2VecReader(args.dict_path,
args.train_data_path)
args.train_data_path, filelist)
logger.info("dict_size: {}".format(word2vec_reader.dict_size))
loss, py_reader = skip_gram_word2vec(
word2vec_reader.dict_size,
word2vec_reader.word_frequencys,
......@@ -203,11 +246,16 @@ def train():
args.with_nce,
is_sparse=args.is_sparse)
#optimizer = fluid.optimizer.SGD(learning_rate=1e-3)
optimizer = fluid.optimizer.Adam(learning_rate=1e-3)
optimizer = None
if args.with_Adam:
optimizer = fluid.optimizer.Adam(learning_rate=1e-3)
else:
optimizer = fluid.optimizer.SGD(learning_rate=1e-3)
optimizer.minimize(loss)
if os.getenv("PADDLE_IS_LOCAL", "1") == "1":
# do local training
if args.is_local or os.getenv("PADDLE_IS_LOCAL", "1") == "1":
logger.info("run local training")
main_program = fluid.default_main_program()
......@@ -215,6 +263,7 @@ def train():
f.write(str(main_program))
train_loop(args, main_program, word2vec_reader, py_reader, loss, 0)
# do distribute training
else:
logger.info("run dist training")
......@@ -278,8 +327,8 @@ def env_declar():
os.environ["PADDLE_TRAINERS"] = os.environ["PADDLE_TRAINERS_NUM"]
os.environ["PADDLE_CURRENT_IP"] = os.environ["POD_IP"]
os.environ["PADDLE_TRAINER_ID"] = os.environ["PADDLE_TRAINER_ID"]
# we set the thread number to be the same as the CPU number
os.environ["CPU_NUM"] = "12"
os.environ["NUM_THREADS"] = "12"
print("Content-Type: text/plain\n\n")
for key in os.environ.keys():
......@@ -289,5 +338,9 @@ def env_declar():
if __name__ == '__main__':
# env_declar()
train()
args = parse_args()
if args.is_local:
pass
else:
env_declar()
train(args)