From fd66a06def9a561cdf1fa474b8bbb59b66f92d0e Mon Sep 17 00:00:00 2001
From: malin10
Date: Mon, 21 Sep 2020 12:31:19 +0800
Subject: [PATCH] test=develop, update gru4rec

---
 models/recall/gru4rec/README.md           | 172 ++++++++++++++++++++++
 models/recall/gru4rec/config.yaml         |  24 +--
 models/recall/gru4rec/data/preprocess.py  |  50 +++----
 models/recall/gru4rec/data/text2paddle.py | 115 +++++++++++++++
 models/recall/gru4rec/data_prepare.sh     |  29 +---
 models/recall/gru4rec/infer.py            | 109 ++++++++++++++
 6 files changed, 441 insertions(+), 58 deletions(-)
 create mode 100644 models/recall/gru4rec/README.md
 create mode 100644 models/recall/gru4rec/data/text2paddle.py
 create mode 100644 models/recall/gru4rec/infer.py

diff --git a/models/recall/gru4rec/README.md b/models/recall/gru4rec/README.md
new file mode 100644
index 00000000..0a3d6f5f
--- /dev/null
+++ b/models/recall/gru4rec/README.md
@@ -0,0 +1,172 @@
+# GRU4REC
+
+The brief directory structure of this example is as follows:
+
+```
+├── data                     # sample data and data-processing scripts
+    ├── train
+        ├── small_train.txt  # sample training data
+    ├── test
+        ├── small_test.txt   # sample test data
+    ├── convert_data.py      # data conversion script
+    ├── download.py          # data download script
+    ├── preprocess.py        # data preprocessing script
+    ├── text2paddle.py       # script that maps items to integer ids
+├── __init__.py
+├── README.md                # this document
+├── model.py                 # model definition
+├── config.yaml              # configuration file
+├── data_prepare.sh          # one-click data preparation script
+├── rsc15_reader.py          # reader
+```
+
+Note: before reading this example, it is recommended to first go through:
+
+[PaddleRec quick-start tutorial](https://github.com/PaddlePaddle/PaddleRec/blob/master/README.md)
+
+
+---
+## Contents
+
+- [Model introduction](#model-introduction)
+- [Data preparation](#data-preparation)
+- [Environment](#environment)
+- [Quick start](#quick-start)
+- [Reproducing the paper](#reproducing-the-paper)
+- [Advanced usage](#advanced-usage)
+- [FAQ](#faq)
+
+## Model introduction
+GRU4REC is described in the paper [Session-based Recommendations with Recurrent Neural Networks](https://arxiv.org/abs/1511.06939).
+
+The paper's contribution is to apply an RNN (GRU) to session-based recommendation for the first time, with a clear improvement over traditional KNN and matrix-factorization baselines.
+
+Its core idea is to treat the items a user clicks within one session as a sequence and train an RNN on these sequences. At prediction time, given the clicks observed so far as input, the model predicts the item most likely to be clicked next.
+
+Session-based recommendation covers a wide range of scenarios, such as product browsing, news clicks, and location check-in sequences.
+
+The default configuration of this model uses the demo dataset; for accuracy verification, please refer to the [Reproducing the paper](#reproducing-the-paper) section.
+
+Supported features:
+
+Training: single-machine CPU, single-machine single-GPU, locally simulated parameter-server training, and incremental training. For configuration, see [launching training](https://github.com/PaddlePaddle/PaddleRec/blob/master/doc/train.md).
+
+Prediction: single-machine CPU and single-machine single-GPU. For configuration, see [PaddleRec offline prediction](https://github.com/PaddlePaddle/PaddleRec/blob/master/doc/predict.md).
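+
+To make the idea concrete, the following is a minimal sketch of the kind of network GRU4REC builds: embedded click sequence -> GRU -> softmax over all items. The sizes are assumptions and the feed names follow `infer.py`; the actual network used for training is defined in `model.py`.
+
+```python
+import paddle.fluid as fluid
+
+vocab_size, hid_size = 10000, 100  # illustrative sizes only
+
+# one session per sample: a variable-length sequence of item ids
+src = fluid.data(name="src_wordseq", shape=[None, 1], dtype="int64", lod_level=1)
+dst = fluid.data(name="dst_wordseq", shape=[None, 1], dtype="int64", lod_level=1)
+
+emb = fluid.embedding(input=src, size=[vocab_size, hid_size], is_sparse=True)
+fc0 = fluid.layers.fc(input=emb, size=hid_size * 3)        # GRU gates expect 3 * hidden size
+gru = fluid.layers.dynamic_gru(input=fc0, size=hid_size)   # one hidden state per click
+prob = fluid.layers.fc(input=gru, size=vocab_size, act="softmax")
+
+acc = fluid.layers.accuracy(input=prob, label=dst, k=20)   # recall@20
+loss = fluid.layers.mean(fluid.layers.cross_entropy(input=prob, label=dst))
+```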
+
+## Data preparation
+Data preparation in this example consists of three steps:
+- Step 1: download the raw dataset.
+```
+cd data/
+python download.py
+```
+- Step 2: preprocess the data and convert its format.
+  1. Group the raw dataset by session_id to obtain, for each session, its date and the ordered list of clicked items.
+  2. Filter out sessions of length 1 and items that were clicked fewer than 5 times.
+  3. Split the data into training and test sets: sessions from the most recent day of the dataset form the test set, and all earlier sessions form the training set.
+```
+python preprocess.py
+python convert_data.py
+```
+After this step, two files are produced under data/: rsc15_train_tr_paddle.txt is the training file and rsc15_test_paddle.txt is the test file. Each line is one session's click sequence, formatted as follows (see the sketch at the end of this section for how such sequences are consumed during training):
+```
+214536502 214536500 214536506 214577561
+214662742 214662742 214825110 214757390 214757407 214551617
+214716935 214774687 214832672
+214836765 214706482
+214701242 214826623
+214826835 214826715
+214838855 214838855
+214576500 214576500 214576500
+214821275 214821275 214821371 214821371 214821371 214717089 214563337 214706462 214717436 214743335 214826837 214819762
+214717867 21471786
+```
+- Step 3: organize the data. Training files are collected under data/all_train and test files under data/all_test; text2paddle.py maps every item to an integer id and records the vocabulary size in vocab.txt.
+```
+mkdir raw_train_data && mkdir raw_test_data
+mv rsc15_train_tr_paddle.txt raw_train_data/ && mv rsc15_test_paddle.txt raw_test_data/
+mkdir all_train && mkdir all_test
+
+python text2paddle.py raw_train_data/ raw_test_data/ all_train all_test vocab.txt
+```
+
+For convenience, a one-click data preparation script is provided:
+```
+sh data_prepare.sh
+```
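+
+After these steps, each line under data/all_train and data/all_test is one session whose items have been mapped to integer ids. The snippet below is only an illustration of how such a line yields a next-item training pair; the function name is made up for this sketch, and the real logic lives in rsc15_reader.py. The input is the sequence without its last click, and the label is the same sequence shifted left by one.
+
+```python
+# Illustrative only: turn one session line into an (input, label) pair.
+def session_to_pair(line):
+    ids = [int(tok) for tok in line.strip().split()]
+    if len(ids) < 2:      # a single-click session carries no next-item signal
+        return None
+    src = ids[:-1]        # items the model sees
+    dst = ids[1:]         # the next item to predict at every position
+    return src, dst
+
+print(session_to_pair("3 7 11 5"))
+# ([3, 7, 11], [7, 11, 5])
+```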
+
+## Environment
+
+PaddlePaddle >= 1.7.2
+
+python 2.7/3.5/3.6/3.7
+
+PaddleRec >= 0.1
+
+os: windows/linux/macos
+
+## Quick start
+
+### Single-machine training
+
+CPU training and inference are driven by the runner section of config.yaml, for example:
+
+```
+mode: [cpu_train_runner, cpu_infer_runner]
+
+runner:
+- name: cpu_train_runner
+  class: train
+  device: cpu
+  epochs: 10
+  save_checkpoint_interval: 2
+  save_inference_interval: 4
+  save_checkpoint_path: "increment_gru4rec"
+  save_inference_path: "inference_gru4rec"
+  print_interval: 10
+  phase: train
+- name: cpu_infer_runner
+  class: infer
+  init_model_path: "increment_gru4rec"
+  device: cpu
+  phase: infer
+```
+
+### Single-machine prediction
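+
+Inference can be run through the `cpu_infer_runner` shown above (make sure it is listed in `mode` in config.yaml). This patch also adds a standalone `infer.py` that reports recall@20 for a range of saved epochs; note that it relies on a local `utils` module (`to_lodtensor`, `prepare_data`, `check_version`) that is not part of this patch. A typical invocation looks like the following, where the data, vocabulary, and model paths are assumptions that depend on where your preparation and training steps actually wrote their outputs:
+
+```
+python infer.py --test_dir data/all_test --vocab_path data/vocab.txt \
+    --model_dir inference_gru4rec --start_index 1 --last_index 10 --use_cuda 0
+```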
+
+### Run
+```
+python -m paddlerec.run -m models/recall/gru4rec/config.yaml
+```
+
+### Results
+
+Sample training output on the demo data:
+
+```
+Running SingleStartup.
+Running SingleRunner.
+batch: 1, acc: [0.03125]
+batch: 2, acc: [0.0625]
+batch: 3, acc: [0.]
+...
+epoch 0 done, use time: 0.0605320930481, global metrics: acc=[0.]
+...
+epoch 19 done, use time: 0.33447098732, global metrics: acc=[0.]
+```
+
+Sample prediction output on the demo data:
+```
+user:0, top K videos:[40, 31, 4, 33, 93]
+user:1, top K videos:[35, 57, 58, 40, 17]
+user:2, top K videos:[35, 17, 88, 40, 9]
+user:3, top K videos:[73, 35, 39, 58, 38]
+user:4, top K videos:[40, 31, 57, 4, 73]
+user:5, top K videos:[38, 9, 7, 88, 22]
+user:6, top K videos:[35, 73, 14, 58, 28]
+user:7, top K videos:[35, 73, 58, 38, 56]
+user:8, top K videos:[38, 40, 9, 35, 99]
+user:9, top K videos:[88, 73, 9, 35, 28]
+user:10, top K videos:[35, 52, 28, 54, 73]
+```
+
+## Advanced usage
+
+## FAQ
diff --git a/models/recall/gru4rec/config.yaml b/models/recall/gru4rec/config.yaml
index e3c20c9f..c07e9517 100644
--- a/models/recall/gru4rec/config.yaml
+++ b/models/recall/gru4rec/config.yaml
@@ -16,12 +16,12 @@ workspace: "models/recall/gru4rec"
 
 dataset:
 - name: dataset_train
-  batch_size: 5
+  batch_size: 500
   type: DataLoader # QueueDataset
   data_path: "{workspace}/data/train"
   data_converter: "{workspace}/rsc15_reader.py"
 - name: dataset_infer
-  batch_size: 5
+  batch_size: 500
   type: DataLoader #QueueDataset
   data_path: "{workspace}/data/test"
   data_converter: "{workspace}/rsc15_reader.py"
@@ -41,30 +41,32 @@ hyper_parameters:
     strategy: async
 
 #use infer_runner mode and modify 'phase' below if infer
-mode: train_runner
+mode: [cpu_train_runner]
 #mode: infer_runner
 
 runner:
-- name: train_runner
+- name: cpu_train_runner
   class: train
   device: cpu
   epochs: 10
-  save_checkpoint_interval: 2
-  save_inference_interval: 4
+  save_checkpoint_interval: 1
+  save_inference_interval: 1
   save_checkpoint_path: "increment_gru4rec"
   save_inference_path: "inference_gru4rec"
   print_interval: 10
-- name: infer_runner
+  phase: train
+- name: cpu_infer_runner
   class: infer
   init_model_path: "increment_gru4rec"
   device: cpu
+  phase: infer
 
 phase:
 - name: train
   model: "{workspace}/model.py"
   dataset_name: dataset_train
   thread_num: 1
-#- name: infer
-#  model: "{workspace}/model.py"
-#  dataset_name: dataset_infer
-#  thread_num: 1
+- name: infer
+  model: "{workspace}/model.py"
+  dataset_name: dataset_infer
+  thread_num: 1
diff --git a/models/recall/gru4rec/data/preprocess.py b/models/recall/gru4rec/data/preprocess.py
index 66ed72b6..23323726 100644
--- a/models/recall/gru4rec/data/preprocess.py
+++ b/models/recall/gru4rec/data/preprocess.py
@@ -21,50 +21,50 @@ data = pd.read_csv(
     dtype={0: np.int32, 1: str, 2: np.int64})
-data.columns = ['SessionId', 'TimeStr', 'ItemId']
-data['Time'] = data.TimeStr.apply(lambda x: time.mktime(dt.datetime.strptime(x, '%Y-%m-%dT%H:%M:%S.%fZ').timetuple())) #This is not UTC. It does not really matter.
-del (data['TimeStr'])
+data.columns = ['session_id', 'timestamp', 'item_id']
+data['Time'] = data.timestamp.apply(lambda x: time.mktime(dt.datetime.strptime(x, '%Y-%m-%dT%H:%M:%S.%fZ').timetuple())) #This is not UTC. It does not really matter.
+del (data['timestamp'])
 
-session_lengths = data.groupby('SessionId').size()
-data = data[np.in1d(data.SessionId, session_lengths[session_lengths > 1]
+session_lengths = data.groupby('session_id').size()
+data = data[np.in1d(data.session_id, session_lengths[session_lengths > 1]
                     .index)]
-item_supports = data.groupby('ItemId').size()
-data = data[np.in1d(data.ItemId, item_supports[item_supports >= 5].index)]
-session_lengths = data.groupby('SessionId').size()
-data = data[np.in1d(data.SessionId, session_lengths[session_lengths >= 2]
+item_supports = data.groupby('item_id').size()
+data = data[np.in1d(data.item_id, item_supports[item_supports >= 5].index)]
+session_lengths = data.groupby('session_id').size()
+data = data[np.in1d(data.session_id, session_lengths[session_lengths >= 2]
                     .index)]
 
 tmax = data.Time.max()
-session_max_times = data.groupby('SessionId').Time.max()
+session_max_times = data.groupby('session_id').Time.max()
 session_train = session_max_times[session_max_times < tmax - 86400].index
 session_test = session_max_times[session_max_times >= tmax - 86400].index
-train = data[np.in1d(data.SessionId, session_train)]
-test = data[np.in1d(data.SessionId, session_test)]
-test = test[np.in1d(test.ItemId, train.ItemId)]
-tslength = test.groupby('SessionId').size()
-test = test[np.in1d(test.SessionId, tslength[tslength >= 2].index)]
+train = data[np.in1d(data.session_id, session_train)]
+test = data[np.in1d(data.session_id, session_test)]
+test = test[np.in1d(test.item_id, train.item_id)]
+tslength = test.groupby('session_id').size()
+test = test[np.in1d(test.session_id, tslength[tslength >= 2].index)]
 print('Full train set\n\tEvents: {}\n\tSessions: {}\n\tItems: {}'.format(
-    len(train), train.SessionId.nunique(), train.ItemId.nunique()))
+    len(train), train.session_id.nunique(), train.item_id.nunique()))
 train.to_csv(
     PATH_TO_PROCESSED_DATA + 'rsc15_train_full.txt', sep='\t', index=False)
 print('Test set\n\tEvents: {}\n\tSessions: {}\n\tItems: {}'.format(
-    len(test), test.SessionId.nunique(), test.ItemId.nunique()))
+    len(test), test.session_id.nunique(), test.item_id.nunique()))
 test.to_csv(PATH_TO_PROCESSED_DATA + 'rsc15_test.txt', sep='\t', index=False)
 
 tmax = train.Time.max()
-session_max_times = train.groupby('SessionId').Time.max()
+session_max_times = train.groupby('session_id').Time.max()
 session_train = session_max_times[session_max_times < tmax - 86400].index
 session_valid = session_max_times[session_max_times >= tmax - 86400].index
-train_tr = train[np.in1d(train.SessionId, session_train)]
-valid = train[np.in1d(train.SessionId, session_valid)]
-valid = valid[np.in1d(valid.ItemId, train_tr.ItemId)]
-tslength = valid.groupby('SessionId').size()
-valid = valid[np.in1d(valid.SessionId, tslength[tslength >= 2].index)]
+train_tr = train[np.in1d(train.session_id, session_train)]
+valid = train[np.in1d(train.session_id, session_valid)]
+valid = valid[np.in1d(valid.item_id, train_tr.item_id)]
+tslength = valid.groupby('session_id').size()
+valid = valid[np.in1d(valid.session_id, tslength[tslength >= 2].index)]
 print('Train set\n\tEvents: {}\n\tSessions: {}\n\tItems: {}'.format(
-    len(train_tr), train_tr.SessionId.nunique(), train_tr.ItemId.nunique()))
+    len(train_tr), train_tr.session_id.nunique(), train_tr.item_id.nunique()))
 train_tr.to_csv(
     PATH_TO_PROCESSED_DATA + 'rsc15_train_tr.txt', sep='\t', index=False)
 print('Validation set\n\tEvents: {}\n\tSessions: {}\n\tItems: {}'.format(
-    len(valid), valid.SessionId.nunique(), valid.ItemId.nunique()))
+    len(valid), valid.session_id.nunique(), valid.item_id.nunique()))
 valid.to_csv(
     PATH_TO_PROCESSED_DATA + 'rsc15_train_valid.txt', sep='\t', index=False)
diff --git a/models/recall/gru4rec/data/text2paddle.py b/models/recall/gru4rec/data/text2paddle.py
new file mode 100644
index 00000000..ff952825
--- /dev/null
+++ b/models/recall/gru4rec/data/text2paddle.py
@@ -0,0 +1,115 @@
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sys
+import six
+import collections
+import os
+import sys
+import io
+if six.PY2:
+    reload(sys)
+    sys.setdefaultencoding('utf-8')
+
+
+def word_count(input_file, word_freq=None):
+    """
+    compute word count from corpus
+    """
+    if word_freq is None:
+        word_freq = collections.defaultdict(int)
+
+    for l in input_file:
+        for w in l.strip().split():
+            word_freq[w] += 1
+
+    return word_freq
+
+
+def build_dict(min_word_freq=0, train_dir="", test_dir=""):
+    """
+    Build a word dictionary from the corpus. Keys of the dictionary are words,
+    and values are zero-based IDs of these words.
+    """
+    word_freq = collections.defaultdict(int)
+    files = os.listdir(train_dir)
+    for fi in files:
+        with io.open(os.path.join(train_dir, fi), "r") as f:
+            word_freq = word_count(f, word_freq)
+    files = os.listdir(test_dir)
+    for fi in files:
+        with io.open(os.path.join(test_dir, fi), "r") as f:
+            word_freq = word_count(f, word_freq)
+
+    word_freq = [x for x in six.iteritems(word_freq) if x[1] > min_word_freq]
+    word_freq_sorted = sorted(word_freq, key=lambda x: (-x[1], x[0]))
+    words, _ = list(zip(*word_freq_sorted))
+    word_idx = dict(list(zip(words, six.moves.range(len(words)))))
+    return word_idx
+
+
+def write_paddle(word_idx, train_dir, test_dir, output_train_dir,
+                 output_test_dir):
+    files = os.listdir(train_dir)
+    if not os.path.exists(output_train_dir):
+        os.mkdir(output_train_dir)
+    for fi in files:
+        with io.open(os.path.join(train_dir, fi), "r") as f:
+            with io.open(os.path.join(output_train_dir, fi), "w") as wf:
+                for l in f:
+                    l = l.strip().split()
+                    l = [word_idx.get(w) for w in l]
+                    for w in l:
+                        wf.write(str2file(str(w) + " "))
+                    wf.write(str2file("\n"))
+
+    files = os.listdir(test_dir)
+    if not os.path.exists(output_test_dir):
+        os.mkdir(output_test_dir)
+    for fi in files:
+        with io.open(os.path.join(test_dir, fi), "r", encoding='utf-8') as f:
+            with io.open(
+                    os.path.join(output_test_dir, fi), "w",
+                    encoding='utf-8') as wf:
+                for l in f:
+                    l = l.strip().split()
+                    l = [word_idx.get(w) for w in l]
+                    for w in l:
+                        wf.write(str2file(str(w) + " "))
+                    wf.write(str2file("\n"))
+
+
+def str2file(str):
+    if six.PY2:
+        return str.decode("utf-8")
+    else:
+        return str
+
+
+def text2paddle(train_dir, test_dir, output_train_dir, output_test_dir,
+                output_vocab):
+    vocab = build_dict(0, train_dir, test_dir)
+    print("vocab size:", str(len(vocab)))
+    with io.open(output_vocab, "w", encoding='utf-8') as wf:
+        wf.write(str2file(str(len(vocab)) + "\n"))
+    write_paddle(vocab, train_dir, test_dir, output_train_dir, output_test_dir)
+
+
+train_dir = sys.argv[1]
+test_dir = sys.argv[2]
+output_train_dir = sys.argv[3]
+output_test_dir = sys.argv[4]
+output_vocab = sys.argv[5]
+text2paddle(train_dir, test_dir, output_train_dir, output_test_dir,
+            output_vocab)
diff --git a/models/recall/gru4rec/data_prepare.sh b/models/recall/gru4rec/data_prepare.sh
index a97e57ab..6dea52f3 100644
--- a/models/recall/gru4rec/data_prepare.sh
+++ b/models/recall/gru4rec/data_prepare.sh
@@ -16,30 +16,15 @@
 
 set -e
 
-dataset=$1
-src=$1
-
-if [[ $src == "yoochoose1_4" || $src == "yoochoose1_64" ]];then
-    src="yoochoose"
-elif [[ $src == "diginetica" ]];then
-    src="diginetica"
-else
-    echo "Usage: sh data_prepare.sh [diginetica|yoochoose1_4|yoochoose1_64]"
-    exit 1
-fi
-
 echo "begin to download data"
-cd data && python download.py $src
-mkdir $dataset
-python preprocess.py --dataset $src
+cd data && python download.py
+python preprocess.py
 
 echo "begin to convert data (binary -> txt)"
-python convert_data.py --data_dir $dataset
-
-cat ${dataset}/train.txt | wc -l >> config.txt
+python convert_data.py
 
-rm -rf train && mkdir train
-mv ${dataset}/train.txt train
+mkdir raw_train_data && mkdir raw_test_data
+mv rsc15_train_tr_paddle.txt raw_train_data/ && mv rsc15_test_paddle.txt raw_test_data/
 
-rm -rf test && mkdir test
-mv ${dataset}/test.txt test
+mkdir all_train && mkdir all_test
+python text2paddle.py raw_train_data/ raw_test_data/ all_train all_test vocab.txt
diff --git a/models/recall/gru4rec/infer.py b/models/recall/gru4rec/infer.py
new file mode 100644
index 00000000..7a9bef18
--- /dev/null
+++ b/models/recall/gru4rec/infer.py
@@ -0,0 +1,109 @@
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import argparse
+import sys
+import time
+import math
+import unittest
+import contextlib
+import numpy as np
+import six
+import paddle.fluid as fluid
+import paddle
+
+import utils
+
+
+def parse_args():
+    parser = argparse.ArgumentParser("gru4rec benchmark.")
+    parser.add_argument(
+        '--test_dir', type=str, default='test_data', help='test file address')
+    parser.add_argument(
+        '--start_index', type=int, default='1', help='start index')
+    parser.add_argument(
+        '--last_index', type=int, default='10', help='end index')
+    parser.add_argument(
+        '--model_dir', type=str, default='model_recall20', help='model dir')
+    parser.add_argument(
+        '--use_cuda', type=int, default='0', help='whether use cuda')
+    parser.add_argument(
+        '--batch_size', type=int, default='5', help='batch_size')
+    parser.add_argument(
+        '--vocab_path', type=str, default='vocab.txt', help='vocab file')
+    args = parser.parse_args()
+    return args
+
+
+def infer(test_reader, use_cuda, model_path):
+    """ inference function """
+    place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
+    exe = fluid.Executor(place)
+
+    with fluid.scope_guard(fluid.Scope()):
+        infer_program, feed_target_names, fetch_vars = fluid.io.load_inference_model(
+            model_path, exe)
+        accum_num_recall = 0.0
+        accum_num_sum = 0.0
+        t0 = time.time()
+        step_id = 0
+        for data in test_reader():
+            step_id += 1
+            src_wordseq = utils.to_lodtensor([dat[0] for dat in data], place)
+            label_data = [dat[1] for dat in data]
+            dst_wordseq = utils.to_lodtensor(label_data, place)
+            para = exe.run(
+                infer_program,
+                feed={"src_wordseq": src_wordseq,
+                      "dst_wordseq": dst_wordseq},
+                fetch_list=fetch_vars,
+                return_numpy=False)
+
+            acc_ = para[1]._get_float_element(0)
+            data_length = len(
+                np.concatenate(
+                    label_data, axis=0).astype("int64"))
+            accum_num_sum += (data_length)
+            accum_num_recall += (data_length * acc_)
+            if step_id % 1 == 0:
+                print("step:%d recall@20:%.4f" %
+                      (step_id, accum_num_recall / accum_num_sum))
+        t1 = time.time()
+        print("model:%s recall@20:%.3f time_cost(s):%.2f" %
+              (model_path, accum_num_recall / accum_num_sum, t1 - t0))
+
+
+if __name__ == "__main__":
+    utils.check_version()
+    args = parse_args()
+    start_index = args.start_index
+    last_index = args.last_index
+    test_dir = args.test_dir
+    model_dir = args.model_dir
+    batch_size = args.batch_size
+    vocab_path = args.vocab_path
+    use_cuda = True if args.use_cuda else False
+    print("start index: ", start_index, " last_index:", last_index)
+    vocab_size, test_reader = utils.prepare_data(
+        test_dir,
+        vocab_path,
+        batch_size=batch_size,
+        buffer_size=1000,
+        word_freq_threshold=0,
+        is_train=False)
+
+    for epoch in range(start_index, last_index + 1):
+        epoch_path = model_dir + "/epoch_" + str(epoch)
+        infer(
+            test_reader=test_reader, use_cuda=use_cuda, model_path=epoch_path)
-- 
GitLab