From 5db9e5900f2e5282455e2daeef6facbeb6504541 Mon Sep 17 00:00:00 2001 From: hupeng03 Date: Tue, 30 Aug 2016 14:27:04 +0000 Subject: [PATCH] speedup preprocess in quick start ISSUE=4575209 git-svn-id: https://svn.baidu.com/idl/trunk/paddle@1430 1ad973e4-5ce8-4261-8a94-b56d1f490c56 --- demo/quick_start/preprocess.py | 250 +++++++++++++++++++-------------- demo/quick_start/preprocess.sh | 35 ++++- 2 files changed, 181 insertions(+), 104 deletions(-) diff --git a/demo/quick_start/preprocess.py b/demo/quick_start/preprocess.py index 0ef7e65c74..1507ac48e8 100755 --- a/demo/quick_start/preprocess.py +++ b/demo/quick_start/preprocess.py @@ -1,3 +1,5 @@ +# -*- coding: UTF-8 -*- + # Copyright (c) 2016 Baidu, Inc. All Rights Reserved # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -12,45 +14,71 @@ # See the License for the specific language governing permissions and # limitations under the License. -''' -1. remove HTML before tokensizing +""" +1. (remove HTML before or not)tokensizing 2. pos sample : rating score 5; neg sample: rating score 1-2. -3. size of pos : neg = 1:1. -4. size of testing set = min(25k, len(all_data) * 0.1), others is traning set. -5. distinct train set and test set. Usage: python preprocess.py -i data_file [random seed] -''' +""" -import sys,os -import re +import sys +import os import operator -import gzip,math -import random -import numpy as np -from bs4 import BeautifulSoup +import gzip from subprocess import Popen, PIPE from optparse import OptionParser +import json +from bs4 import BeautifulSoup +from multiprocessing import Queue +from multiprocessing import Pool +import multiprocessing + +batch_size = 5000 +word_count = {} +num_tokenize = max(1, multiprocessing.cpu_count() - 2) # parse + tokenize + save +max_queue_size = 8 +parse_queue = Queue(maxsize=max_queue_size + num_tokenize) +tokenize_queue = Queue(maxsize=max_queue_size + num_tokenize) + + +def create_dict(data): + """ + Create dictionary based on data, and saved in data_dir/dict.txt. + The first line is unk \t -1. + data: list, input data by batch. + """ + for seq in data: + try: + for w in seq.lower().split(): + if w not in word_count: + word_count[w] = 1 + else: + word_count[w] += 1 + except: + sys.stderr.write(seq + "\tERROR\n") + def parse(path): """ Open .gz file. """ + sys.stderr.write(path) g = gzip.open(path, 'r') for l in g: - yield eval(l) + yield json.loads(l) + g.close() +''' def clean(review): """ Clean input review: remove HTML, convert words to lower cases. """ # Remove HTML review_text = BeautifulSoup(review, "html.parser").get_text() - - # Convert words to lower case - review_text = review_text.lower() return review_text +''' + def tokenize(sentences): """ @@ -68,119 +96,137 @@ def tokenize(sentences): toks = tok_text.split('\n')[:-1] return toks -def create_dict(data, data_dir): + +def save_data(instance, data_dir, pre_fix, batch_num): """ - Create dictionary based on data, and saved in data_dir/dict.txt. - The first line is unk \t -1. - data: list, input data. - data_dir: path to save dict. + save data by batch """ - word_count = {} - for seq in data: - try: - for w in seq.lower().split(): - if w not in word_count: - word_count[w] = 1 - else: - word_count[w] += 1 - except: - sys.stderr.write(seq+"\tERROR\n") - f = open(os.path.join(data_dir, 'dict.txt'), 'w') + label = ['1' if pre_fix == 'pos' else '0' for i in range(len(instance))] + lines = ['%s\t%s' % (label[i], instance[i]) for i in range(len(label))] + file_name = os.path.join(data_dir, "%s_%s.txt" % (pre_fix, batch_num)) + file(file_name, 'w').write('\n'.join(lines) + '\n') + + +def tokenize_batch(id): + """ + tokenize data by batch + """ + while True: + num_batch, instance, pre_fix = parse_queue.get() + if num_batch == -1: ### parse_queue finished + tokenize_queue.put((-1, None, None)) + sys.stderr.write("tokenize theread %s finish\n" % (id)) + break + tokenize_instance = tokenize(instance) + tokenize_queue.put((num_batch, tokenize_instance, pre_fix)) + sys.stderr.write('.') + + +def save_batch(data_dir, num_tokenize, data_dir_dict): + """ + save data by batch + build dict.txt + """ + token_count = 0 + while True: + num_batch, instance, pre_fix = tokenize_queue.get() + if num_batch == -1: + token_count += 1 + if token_count == num_tokenize: #### tokenize finished. + break + else: + continue + save_data(instance, data_dir, pre_fix, num_batch) + create_dict(instance) ## update dict + + sys.stderr.write("save file finish\n") + f = open(data_dir_dict, 'w') f.write('%s\t%s\n' % ('unk', '-1')) - for k, v in sorted(word_count.items(), key=operator.itemgetter(1),\ - reverse=True): + for k, v in sorted(word_count.items(), key=operator.itemgetter(1), \ + reverse=True): f.write('%s\t%s\n' % (k, v)) f.close() + sys.stderr.write("build dict finish\n") -def save_data(data, data_dir, prefix = ""): - file_name = os.path.join(data_dir, "%s.txt" % (prefix)) - file(file_name,'w').write('\n'.join(data)+'\n') - file(os.path.join(data_dir, prefix+'.list'),'w').write('%s\n' % file_name) -def split_data(raw_txt): +def parse_batch(data, num_tokenize): """ - Extract positive and negative sample. + parse data by batch + parse -> clean ->tokenize ->save """ - pos = [] - neg = [] + raw_txt = parse(data) + neg, pos = [], [] count = 0 - dup_cnt = 0 - sys.stderr.write("extract raw data") + sys.stderr.write("extract raw data\n") for l in raw_txt: rating = l["overall"] - text = clean(l["reviewText"]) + #text = clean(l["reviewText"].lower()) # remove HTML + text = l["reviewText"].lower() # # convert words to lower case if rating == 5.0 and text: pos.append(text) if rating < 3.0 and text: neg.append(text) + if len(pos) == batch_size or len(neg) == batch_size: + if len(pos) == batch_size: + batch = pos + pre_fix = 'pos' + else: + batch = neg + pre_fix = 'neg' + + parse_queue.put((count, batch, pre_fix)) + count += 1 + if pre_fix == 'pos': + pos = [] + else: + neg = [] + + if len(pos) > 0: + parse_queue.put((count, pos, 'pos')) + count += 1 + if len(neg) > 0: + parse_queue.put((count, neg, 'neg')) count += 1 - if count % 20000==0: - sys.stderr.write(".") - sys.stderr.write("\n") - return pos, neg - -def preprocess(pos_in, neg_in, data_dir, rand_seed): - # tokenize - sys.stderr.write("tokenize...\n") - tmppos = tokenize(pos_in) - tmpneg = tokenize(neg_in) - cnt = len(tmppos) + len(tmpneg) - - # unique smaples - tmppos = list(set(tmppos)) - tmpneg = list(set(tmpneg)) - dup_cnt = cnt - len(tmppos) - len(tmpneg) - sys.stderr.write("\ntotal size of data set: %d, duplicate data: %d\n" % (cnt, dup_cnt)) - - # keep same size of positive and negative sample - min_len = min(len(tmppos), len(tmpneg)) - tmppos = tmppos[0:min_len] - tmpneg = tmpneg[0:min_len] - - # creat dictionary - sys.stderr.write("create dict with train and test data...\n") - all_data = tmppos + tmpneg - create_dict(all_data, data_dir) - - # split into train set and test set - sys.stderr.write("split data...\n") - pos = ["1\t"+i for i in tmppos] - neg = ["0\t"+i for i in tmpneg] - random.seed(rand_seed) - random.shuffle(pos) - random.shuffle(neg) - - # split into test set and train set - test_len = min(12500, int(min_len * 0.1)) - test = pos[0:test_len] + neg[0:test_len] - train = pos[test_len:] + neg[test_len:] - - # save data - sys.stderr.write("save data...\n") - save_data(train, data_dir, prefix = 'train') - save_data(test, data_dir, prefix = 'test') - file(os.path.join(data_dir,'labels.list'),'w').write('neg\t0\npos\t1\n') + for i in range(num_tokenize): + parse_queue.put((-1, None, None)) #### for tokenize's input finished + sys.stderr.write("parsing finish\n") + def option_parser(): parser = OptionParser(usage="usage: python preprcoess.py "\ "-i data_path [options]") - parser.add_option("-i", "--data", action="store", - dest="input", help="Input data path.") - parser.add_option("-s", "--seed", action="store", - dest="seed", default=1024, - help="Set random seed.") + parser.add_option( + "-i", "--data", action="store", dest="input", help="Input data path.") + parser.add_option( + "-s", + "--seed", + action="store", + dest="seed", + default=1024, + help="Set random seed.") return parser.parse_args() + def main(): reload(sys) sys.setdefaultencoding('utf-8') options, args = option_parser() - data=options.input - seed=options.seed - data_dir = os.path.dirname(data) - pos, neg = split_data(parse(data)) - preprocess(pos, neg, data_dir, seed) - sys.stderr.write("Done.\n") + data = options.input + seed = options.seed + data_dir_dict = os.path.join(os.path.dirname(data), 'dict.txt') + data_dir = os.path.join(os.path.dirname(data), 'tmp') + pool = Pool(processes=num_tokenize + 2) + pool.apply_async(parse_batch, args=(data, num_tokenize)) + for i in range(num_tokenize): + pool.apply_async(tokenize_batch, args=(str(i), )) + pool.apply_async(save_batch, args=(data_dir, num_tokenize, data_dir_dict)) + pool.close() + pool.join() + + sys.stderr.write("clean data done.\n") + file(os.path.join(os.path.dirname(data), 'labels.list'), + 'w').write('neg\t0\npos\t1\n') + if __name__ == '__main__': main() diff --git a/demo/quick_start/preprocess.sh b/demo/quick_start/preprocess.sh index f4d8e647a2..bdc03f81b6 100755 --- a/demo/quick_start/preprocess.sh +++ b/demo/quick_start/preprocess.sh @@ -1,4 +1,4 @@ -#!/bin/bash +#!/bin/sh # Copyright (c) 2016 Baidu, Inc. All Rights Reserved # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -12,10 +12,41 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -set -e +# 1. size of pos : neg = 1:1. +# 2. size of testing set = min(25k, len(all_data) * 0.1), others is traning set. +# 3. distinct train set and test set. +# 4. build dict + + +mkdir data/tmp python preprocess.py -i data/reviews_Electronics_5.json.gz +# uniq and shuffle +cd data/tmp +cat pos_*|sort|uniq|shuf> pos.shuffed +cat neg_*|sort|uniq|shuf> neg.shuffed + +min_len=`sed -n '$=' neg.shuffed` +((test_num=$min_len/10)) +if [ $test_num -gt 12500 ];then + test_num=12500 +fi +((train_num=$min_len-$test_num)) + +head -n$train_num pos.shuffed >train.pos +head -n$train_num neg.shuffed >train.neg +tail -n$test_num pos.shuffed >test.pos +tail -n$test_num neg.shuffed >test.neg + +cat train.pos train.neg|shuf>../train.txt +cat test.pos test.neg|shuf>../test.txt + +cd - +echo 'data/train.txt' > data/train.list +echo 'data/test.txt' > data/test.list # use 30k dict +rm -rf data/tmp mv data/dict.txt data/dict_all.txt cat data/dict_all.txt | head -n 30001 > data/dict.txt +echo 'preprocess finished' -- GitLab