提交 5db9e590 编写于 作者: H hupeng03

speedup preprocess in quick start

ISSUE=4575209


git-svn-id: https://svn.baidu.com/idl/trunk/paddle@1430 1ad973e4-5ce8-4261-8a94-b56d1f490c56
上级 2afe6609
# -*- coding: UTF-8 -*-
# Copyright (c) 2016 Baidu, Inc. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
......@@ -12,45 +14,71 @@
# See the License for the specific language governing permissions and
# limitations under the License.
'''
1. remove HTML before tokensizing
"""
1. (remove HTML before or not)tokensizing
2. pos sample : rating score 5; neg sample: rating score 1-2.
3. size of pos : neg = 1:1.
4. size of testing set = min(25k, len(all_data) * 0.1), others is traning set.
5. distinct train set and test set.
Usage:
python preprocess.py -i data_file [random seed]
'''
"""
import sys,os
import re
import sys
import os
import operator
import gzip,math
import random
import numpy as np
from bs4 import BeautifulSoup
import gzip
from subprocess import Popen, PIPE
from optparse import OptionParser
import json
from bs4 import BeautifulSoup
from multiprocessing import Queue
from multiprocessing import Pool
import multiprocessing
batch_size = 5000
word_count = {}
num_tokenize = max(1, multiprocessing.cpu_count() - 2) # parse + tokenize + save
max_queue_size = 8
parse_queue = Queue(maxsize=max_queue_size + num_tokenize)
tokenize_queue = Queue(maxsize=max_queue_size + num_tokenize)
def create_dict(data):
"""
Create dictionary based on data, and saved in data_dir/dict.txt.
The first line is unk \t -1.
data: list, input data by batch.
"""
for seq in data:
try:
for w in seq.lower().split():
if w not in word_count:
word_count[w] = 1
else:
word_count[w] += 1
except:
sys.stderr.write(seq + "\tERROR\n")
def parse(path):
"""
Open .gz file.
"""
sys.stderr.write(path)
g = gzip.open(path, 'r')
for l in g:
yield eval(l)
yield json.loads(l)
g.close()
'''
def clean(review):
"""
Clean input review: remove HTML, convert words to lower cases.
"""
# Remove HTML
review_text = BeautifulSoup(review, "html.parser").get_text()
# Convert words to lower case
review_text = review_text.lower()
return review_text
'''
def tokenize(sentences):
"""
......@@ -68,119 +96,137 @@ def tokenize(sentences):
toks = tok_text.split('\n')[:-1]
return toks
def create_dict(data, data_dir):
def save_data(instance, data_dir, pre_fix, batch_num):
"""
Create dictionary based on data, and saved in data_dir/dict.txt.
The first line is unk \t -1.
data: list, input data.
data_dir: path to save dict.
save data by batch
"""
word_count = {}
for seq in data:
try:
for w in seq.lower().split():
if w not in word_count:
word_count[w] = 1
else:
word_count[w] += 1
except:
sys.stderr.write(seq+"\tERROR\n")
f = open(os.path.join(data_dir, 'dict.txt'), 'w')
label = ['1' if pre_fix == 'pos' else '0' for i in range(len(instance))]
lines = ['%s\t%s' % (label[i], instance[i]) for i in range(len(label))]
file_name = os.path.join(data_dir, "%s_%s.txt" % (pre_fix, batch_num))
file(file_name, 'w').write('\n'.join(lines) + '\n')
def tokenize_batch(id):
"""
tokenize data by batch
"""
while True:
num_batch, instance, pre_fix = parse_queue.get()
if num_batch == -1: ### parse_queue finished
tokenize_queue.put((-1, None, None))
sys.stderr.write("tokenize theread %s finish\n" % (id))
break
tokenize_instance = tokenize(instance)
tokenize_queue.put((num_batch, tokenize_instance, pre_fix))
sys.stderr.write('.')
def save_batch(data_dir, num_tokenize, data_dir_dict):
"""
save data by batch
build dict.txt
"""
token_count = 0
while True:
num_batch, instance, pre_fix = tokenize_queue.get()
if num_batch == -1:
token_count += 1
if token_count == num_tokenize: #### tokenize finished.
break
else:
continue
save_data(instance, data_dir, pre_fix, num_batch)
create_dict(instance) ## update dict
sys.stderr.write("save file finish\n")
f = open(data_dir_dict, 'w')
f.write('%s\t%s\n' % ('unk', '-1'))
for k, v in sorted(word_count.items(), key=operator.itemgetter(1),\
reverse=True):
for k, v in sorted(word_count.items(), key=operator.itemgetter(1), \
reverse=True):
f.write('%s\t%s\n' % (k, v))
f.close()
sys.stderr.write("build dict finish\n")
def save_data(data, data_dir, prefix = ""):
file_name = os.path.join(data_dir, "%s.txt" % (prefix))
file(file_name,'w').write('\n'.join(data)+'\n')
file(os.path.join(data_dir, prefix+'.list'),'w').write('%s\n' % file_name)
def split_data(raw_txt):
def parse_batch(data, num_tokenize):
"""
Extract positive and negative sample.
parse data by batch
parse -> clean ->tokenize ->save
"""
pos = []
neg = []
raw_txt = parse(data)
neg, pos = [], []
count = 0
dup_cnt = 0
sys.stderr.write("extract raw data")
sys.stderr.write("extract raw data\n")
for l in raw_txt:
rating = l["overall"]
text = clean(l["reviewText"])
#text = clean(l["reviewText"].lower()) # remove HTML
text = l["reviewText"].lower() # # convert words to lower case
if rating == 5.0 and text:
pos.append(text)
if rating < 3.0 and text:
neg.append(text)
if len(pos) == batch_size or len(neg) == batch_size:
if len(pos) == batch_size:
batch = pos
pre_fix = 'pos'
else:
batch = neg
pre_fix = 'neg'
parse_queue.put((count, batch, pre_fix))
count += 1
if pre_fix == 'pos':
pos = []
else:
neg = []
if len(pos) > 0:
parse_queue.put((count, pos, 'pos'))
count += 1
if len(neg) > 0:
parse_queue.put((count, neg, 'neg'))
count += 1
if count % 20000==0:
sys.stderr.write(".")
sys.stderr.write("\n")
return pos, neg
def preprocess(pos_in, neg_in, data_dir, rand_seed):
# tokenize
sys.stderr.write("tokenize...\n")
tmppos = tokenize(pos_in)
tmpneg = tokenize(neg_in)
cnt = len(tmppos) + len(tmpneg)
# unique smaples
tmppos = list(set(tmppos))
tmpneg = list(set(tmpneg))
dup_cnt = cnt - len(tmppos) - len(tmpneg)
sys.stderr.write("\ntotal size of data set: %d, duplicate data: %d\n" % (cnt, dup_cnt))
# keep same size of positive and negative sample
min_len = min(len(tmppos), len(tmpneg))
tmppos = tmppos[0:min_len]
tmpneg = tmpneg[0:min_len]
# creat dictionary
sys.stderr.write("create dict with train and test data...\n")
all_data = tmppos + tmpneg
create_dict(all_data, data_dir)
# split into train set and test set
sys.stderr.write("split data...\n")
pos = ["1\t"+i for i in tmppos]
neg = ["0\t"+i for i in tmpneg]
random.seed(rand_seed)
random.shuffle(pos)
random.shuffle(neg)
# split into test set and train set
test_len = min(12500, int(min_len * 0.1))
test = pos[0:test_len] + neg[0:test_len]
train = pos[test_len:] + neg[test_len:]
# save data
sys.stderr.write("save data...\n")
save_data(train, data_dir, prefix = 'train')
save_data(test, data_dir, prefix = 'test')
file(os.path.join(data_dir,'labels.list'),'w').write('neg\t0\npos\t1\n')
for i in range(num_tokenize):
parse_queue.put((-1, None, None)) #### for tokenize's input finished
sys.stderr.write("parsing finish\n")
def option_parser():
parser = OptionParser(usage="usage: python preprcoess.py "\
"-i data_path [options]")
parser.add_option("-i", "--data", action="store",
dest="input", help="Input data path.")
parser.add_option("-s", "--seed", action="store",
dest="seed", default=1024,
help="Set random seed.")
parser.add_option(
"-i", "--data", action="store", dest="input", help="Input data path.")
parser.add_option(
"-s",
"--seed",
action="store",
dest="seed",
default=1024,
help="Set random seed.")
return parser.parse_args()
def main():
reload(sys)
sys.setdefaultencoding('utf-8')
options, args = option_parser()
data=options.input
seed=options.seed
data_dir = os.path.dirname(data)
pos, neg = split_data(parse(data))
preprocess(pos, neg, data_dir, seed)
sys.stderr.write("Done.\n")
data = options.input
seed = options.seed
data_dir_dict = os.path.join(os.path.dirname(data), 'dict.txt')
data_dir = os.path.join(os.path.dirname(data), 'tmp')
pool = Pool(processes=num_tokenize + 2)
pool.apply_async(parse_batch, args=(data, num_tokenize))
for i in range(num_tokenize):
pool.apply_async(tokenize_batch, args=(str(i), ))
pool.apply_async(save_batch, args=(data_dir, num_tokenize, data_dir_dict))
pool.close()
pool.join()
sys.stderr.write("clean data done.\n")
file(os.path.join(os.path.dirname(data), 'labels.list'),
'w').write('neg\t0\npos\t1\n')
if __name__ == '__main__':
main()
#!/bin/bash
#!/bin/sh
# Copyright (c) 2016 Baidu, Inc. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
......@@ -12,10 +12,41 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
set -e
# 1. size of pos : neg = 1:1.
# 2. size of testing set = min(25k, len(all_data) * 0.1), others is traning set.
# 3. distinct train set and test set.
# 4. build dict
mkdir data/tmp
python preprocess.py -i data/reviews_Electronics_5.json.gz
# uniq and shuffle
cd data/tmp
cat pos_*|sort|uniq|shuf> pos.shuffed
cat neg_*|sort|uniq|shuf> neg.shuffed
min_len=`sed -n '$=' neg.shuffed`
((test_num=$min_len/10))
if [ $test_num -gt 12500 ];then
test_num=12500
fi
((train_num=$min_len-$test_num))
head -n$train_num pos.shuffed >train.pos
head -n$train_num neg.shuffed >train.neg
tail -n$test_num pos.shuffed >test.pos
tail -n$test_num neg.shuffed >test.neg
cat train.pos train.neg|shuf>../train.txt
cat test.pos test.neg|shuf>../test.txt
cd -
echo 'data/train.txt' > data/train.list
echo 'data/test.txt' > data/test.list
# use 30k dict
rm -rf data/tmp
mv data/dict.txt data/dict_all.txt
cat data/dict_all.txt | head -n 30001 > data/dict.txt
echo 'preprocess finished'
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册