......@@ -27,12 +27,7 @@ class Metric(object):
def clear(self, scope=None):
clear current value
scope: value container
params: extend variable for clear
if scope is None:
scope = fluid.global_scope()
......@@ -46,11 +41,7 @@ class Metric(object):
var.set(data_array, place)
def get_global_metric(self, fleet, scope, metric_name, mode="sum"):
reduce metric named metric_name from all worker
metric reduce result
input = np.array(scope.find_var(metric_name).get_tensor())
if fleet is None:
return input
......@@ -63,12 +54,7 @@ class Metric(object):
return output
def cal_global_metrics(self, fleet, scope=None):
calculate result
scope: value container
params: extend variable for clear
if scope is None:
scope = fluid.global_scope()
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
import math
import numpy as np
import paddle.fluid as fluid
from paddlerec.core.metric import Metric
class AUCMetric(Metric):
    """
    Metric For Fluid Model

    Reduces per-worker metric tensors through ``fleet`` and derives AUC,
    bucket error, MAE, RMSE, actual/predicted CTR, COPC and mean Q from them.
    """

    def __init__(self, config, fleet):
        """
        Args:
            config : metric configuration, stored unchanged on the instance
            fleet : object whose ``_role_maker.all_reduce_worker`` is used to
                reduce metric tensors across workers
        """
        self.config = config
        self.fleet = fleet

    def clear(self, scope, params):
        """
        Clear current metric value, usually set to zero
        Args:
            scope : paddle runtime var container
            params(dict) :
                label : a group name for metric
                metric_dict : current metric_items in group
        Return:
            None
        """
        self._label = params['label']
        self._metric_dict = params['metric_dict']
        self._result = {}
        place = fluid.CPUPlace()
        for metric_name in self._metric_dict:
            metric_config = self._metric_dict[metric_name]
            var_name = metric_config['var'].name
            if scope.find_var(var_name) is None:
                # The scope holds no such variable, so there is nothing to
                # reset; skip it instead of touching it through scope.var().
                continue
            metric_var = scope.var(var_name).get_tensor()
            data_type = 'float32'
            if 'data_type' in metric_config:
                data_type = metric_config['data_type']
            # Overwrite the tensor with zeros of its current dimensions.
            data_array = np.zeros(metric_var._get_dims(), dtype=data_type)
            metric_var.set(data_array, place)

    def get_metric(self, scope, metric_name):
        """
        reduce metric named metric_name from all worker
        Args:
            scope : paddle runtime var container
            metric_name : name of the variable to look up in scope
        Return:
            metric reduce result: entry 0 (along the first axis) of the
            reduced tensor, restored to the tensor's original shape first
        """
        metric = np.array(scope.find_var(metric_name).get_tensor())
        old_metric_shape = np.array(metric.shape)
        # all_reduce_worker is handed flat input/output buffers.
        metric = metric.reshape(-1)
        # Zeroed output buffer; zeros_like avoids the NaN/inf * 0 == NaN
        # artefact of multiplying a copy of the input by zero.
        global_metric = np.zeros_like(metric)
        self.fleet._role_maker.all_reduce_worker(metric, global_metric)
        global_metric = global_metric.reshape(old_metric_shape)
        return global_metric[0]

    def get_global_metrics(self, scope, metric_dict):
        """
        reduce all metric in metric_dict from all worker
        Args:
            scope : paddle runtime var container
            metric_dict : {metric_name : metric_item}; metric_item['var'].name
                names the variable to reduce
        Return:
            dict : {metric_name : metric_result}; metric_result is None when
            the variable is not present in scope
        """
        result = {}
        for metric_name in metric_dict:
            metric_item = metric_dict[metric_name]
            var_name = metric_item['var'].name
            if scope.find_var(var_name) is None:
                # Record the miss and move on; get_metric() would fail on it.
                result[metric_name] = None
                continue
            result[metric_name] = self.get_metric(scope, var_name)
        return result

    def calculate_auc(self, global_pos, global_neg):
        """
        Compute AUC from bucketed positive / negative counts.
        Args:
            global_pos : per-bucket positive counts
            global_neg : per-bucket negative counts, same length as global_pos
        Return:
            area under the curve normalised by pos * neg, or 0.5 when either
            class has no instance (the ratio is undefined in that case)
        """
        num_bucket = len(global_pos)
        area = 0.0
        pos = 0.0
        neg = 0.0
        # Walk the buckets from the highest index down, accumulating the
        # trapezoid between consecutive (neg, pos) cumulative points.
        for index in reversed(range(num_bucket)):
            new_pos = pos + global_pos[index]
            new_neg = neg + global_neg[index]
            area += (new_neg - neg) * (pos + new_pos) / 2
            pos = new_pos
            neg = new_neg
        # pos * neg == 0 also covers the "no instances at all" case, so the
        # division below can never hit a zero denominator.
        if pos * neg == 0:
            return 0.5
        return area / (pos * neg)

    def calculate_bucket_error(self, global_pos, global_neg):
        """
        Compute the impression-weighted relative CTR error over bucket groups.
        Args:
            global_pos : per-bucket click counts; bucket i stands for a
                predicted ctr of i / num_bucket
            global_neg : per-bucket non-click counts, same length
        Return:
            error_sum / error_count over the groups that passed the relative
            error bound, or 0.0 when no group passed
        """
        num_bucket = len(global_pos)
        last_ctr = -1.0
        impression_sum = 0.0
        ctr_sum = 0.0
        click_sum = 0.0
        error_sum = 0.0
        error_count = 0.0
        k_max_span = 0.01
        k_relative_error_bound = 0.05
        for i in range(num_bucket):
            click = global_pos[i]
            show = global_pos[i] + global_neg[i]
            ctr = float(i) / num_bucket
            if abs(ctr - last_ctr) > k_max_span:
                # Too far from the start of the current group: open a new one.
                last_ctr = ctr
                impression_sum = 0.0
                ctr_sum = 0.0
                click_sum = 0.0
            impression_sum += show
            ctr_sum += ctr * show
            click_sum += click
            if impression_sum == 0:
                # No impressions in the group yet; the averages are undefined.
                continue
            adjust_ctr = ctr_sum / impression_sum
            if adjust_ctr == 0:
                # A zero expected ctr would divide by zero below.
                continue
            relative_error = math.sqrt(
                (1 - adjust_ctr) / (adjust_ctr * impression_sum))
            if relative_error < k_relative_error_bound:
                # The group is large enough to trust: account for it and
                # force the next bucket to start a fresh group.
                actual_ctr = click_sum / impression_sum
                relative_ctr_error = abs(actual_ctr / adjust_ctr - 1)
                error_sum += relative_ctr_error * impression_sum
                error_count += impression_sum
                last_ctr = -1

        return error_sum / error_count if error_count > 0 else 0.0

    def calculate(self, scope, params):
        """
        Reduce the metrics of a group and derive the summary values.
        Args:
            scope : paddle runtime var container
            params(dict) :
                label : a group name for metric
                metric_dict : current metric_items in group
        Return:
            dict : the reduced metrics plus whichever of auc, bucket_error,
            actual_ctr, mae, rmse, predict_ctr, copc and mean_q could be
            derived; all eight are set to 0 when total_ins_num is 0
        """
        self._label = params['label']
        self._metric_dict = params['metric_dict']
        result = self.get_global_metrics(scope, self._metric_dict)
        total_ins_num = result['total_ins_num']
        if total_ins_num == 0:
            # Nothing was seen: every ratio below would divide by zero.
            for key in ('auc', 'bucket_error', 'actual_ctr', 'predict_ctr',
                        'mae', 'rmse', 'copc', 'mean_q'):
                result[key] = 0
            self._result = result
            return self._result

        if 'stat_pos' in result and 'stat_neg' in result:
            result['auc'] = self.calculate_auc(result['stat_pos'],
                                               result['stat_neg'])
            # bucket_error has its own estimator; it must not reuse the AUC.
            result['bucket_error'] = self.calculate_bucket_error(
                result['stat_pos'], result['stat_neg'])
        if 'pos_ins_num' in result:
            result['actual_ctr'] = result['pos_ins_num'] / total_ins_num
        if 'abserr' in result:
            result['mae'] = result['abserr'] / total_ins_num
        if 'sqrerr' in result:
            result['rmse'] = math.sqrt(result['sqrerr'] / total_ins_num)
        if 'prob' in result:
            result['predict_ctr'] = result['prob'] / total_ins_num
            # copc needs both CTRs and a predicted CTR that is not ~0.
            if 'actual_ctr' in result and abs(result['predict_ctr']) > 1e-6:
                result['copc'] = result['actual_ctr'] / result['predict_ctr']
        if 'q' in result:
            result['mean_q'] = result['q'] / total_ins_num
        self._result = result
        return result

    def get_result(self):
        """
        Return:
            dict : the result stored by the last clear() or calculate() call
        """
        return self._result

    def __str__(self):
        """
        Format the stored result as a single summary line.

        Values that were not computed are printed as 0, matching the value
        calculate() assigns to them when there are no instances.
        """
        result = self.get_result()
        # One key per %.6f specifier, in the order the format string names them.
        float_keys = ('auc', 'bucket_error', 'mae', 'rmse', 'actual_ctr',
                      'predict_ctr', 'copc', 'mean_q')
        values = tuple(result.get(key, 0) for key in float_keys)
        result_str = "%s AUC=%.6f BUCKET_ERROR=%.6f MAE=%.6f RMSE=%.6f " \
                     "Actural_CTR=%.6f Predicted_CTR=%.6f COPC=%.6f MEAN Q_VALUE=%.6f Ins number=%s" % \
                     ((self._label,) + values + (result.get('total_ins_num', 0),))
        return result_str
......@@ -18,9 +18,6 @@ import numpy as np
import paddle.fluid as fluid
from paddlerec.core.metric import Metric
from paddle.fluid.layers import nn, accuracy
from paddle.fluid.initializer import Constant
from paddle.fluid.layer_helper import LayerHelper
from paddle.fluid.layers.tensor import Variable
......@@ -18,7 +18,6 @@ import numpy as np
import paddle.fluid as fluid
from paddlerec.core.metric import Metric
from paddle.fluid.layers import nn, accuracy
from paddle.fluid.initializer import Constant
from paddle.fluid.layer_helper import LayerHelper
from paddle.fluid.layers.tensor import Variable
......@@ -18,7 +18,6 @@ import numpy as np
import paddle.fluid as fluid
from paddlerec.core.metric import Metric
from paddle.fluid.layers import nn, accuracy
from paddle.fluid.initializer import Constant
from paddle.fluid.layer_helper import LayerHelper
from paddle.fluid.layers.tensor import Variable
......@@ -18,7 +18,7 @@ import numpy as np
import paddle.fluid as fluid
from paddlerec.core.metric import Metric
from paddle.fluid.layers import nn, accuracy
from paddle.fluid.layers import accuracy
from paddle.fluid.initializer import Constant
from paddle.fluid.layer_helper import LayerHelper
from paddle.fluid.layers.tensor import Variable
## 快速开始
python -m paddlerec.run -m paddlerec.models.recall.gnn
## 数据处理
- Step1: 原始数据集下载,本示例提供了两个开源数据集:DIGINETICA和Yoochoose,可选其中任意一个训练本模型。
cd data && python download.py diginetica # or yoochoose
> [Yoochoose](https://2015.recsyschallenge.com/challenge.html)数据集来源于RecSys Challenge 2015,原始数据包含如下字段:
1. Session ID – the id of the session. In one session there are one or many clicks.
2. Timestamp – the time when the click occurred.
3. Item ID – the unique identifier of the item.
4. Category – the category of the item.
> [DIGINETICA](https://competitions.codalab.org/competitions/11161#learn_the_details-data2)数据集来源于CIKM Cup 2016 _Personalized E-Commerce Search Challenge_项目。原始数据包含如下字段:
1. sessionId - the id of the session. In one session there are one or many clicks.
2. userId - the id of the user, with anonymized user ids.
3. itemId - the unique identifier of the item.
4. timeframe - time since the first query in a session, in milliseconds.
5. eventdate - calendar date.
- Step2: 数据预处理
cd data && python preprocess.py --dataset diginetica # or yoochoose
1. 以session_id为key合并原始数据集,得到每个session的日期,及顺序点击列表。
2. 过滤掉长度为1的session;过滤掉点击次数小于5的items。
3. 训练集、测试集划分。原始数据集里最新日期七天内的作为测试集,更早之前的数据作为训练集。
- Step3: 数据整理。 将训练文件统一放在data/train目录下,测试文件统一放在data/test目录下。
cat data/diginetica/train.txt | wc -l >> data/config.txt # or yoochoose1_4 or yoochoose1_64
rm -rf data/train/*
rm -rf data/test/*
mv data/diginetica/train.txt data/train
mv data/diginetica/test.txt data/test
方便起见, 我们提供了一键式数据处理脚本:
sh data_prepare.sh diginetica # or yoochoose1_4 or yoochoose1_64
## 实验配置
1. 真实数据配置。config.yaml中数据集相关配置见`dataset`字段,数据路径通过`data_path`进行配置。用户可以直接将workspace修改为当前项目目录的绝对路径完成设置。
2. 超参配置。
- batch_size: 修改config.yaml中dataset_train数据集的batch_size为100。
- epochs: 修改config.yaml中runner的epochs为5。
- sparse_feature_number: 不同训练数据集(diginetica or yoochoose)配置不一致,diginetica数据集配置为43098,yoochoose数据集配置为37484。具体见数据处理后得到的data/config.txt文件中第一行。
- corpus_size: 不同训练数据集配置不一致,diginetica数据集配置为719470,yoochoose数据集配置为5917745。具体见数据处理后得到的data/config.txt文件中第二行。
## 训练
python -m paddlerec.run -m ./config.yaml
## 测试
1. 修改config.yaml中的mode,为infer_runner。
2. 修改config.yaml中的phase,为phase_infer,需按提示注释掉phase_trainer。
3. 修改config.yaml中dataset_infer数据集的batch_size为100。
python -m paddlerec.run -m ./config.yaml
......@@ -75,10 +75,10 @@ class TestPosNegRatio(unittest.TestCase):
outs = dict(zip(metric_keys, outs))
self.assertTrue(np.allclose(outs['right_cnt'], self.right_cnt))
self.assertTrue(np.allclose(outs['wrong_cnt'], self.wrong_cnt))
self.assertTrue(np.allclose(outs['RightCnt'], self.right_cnt))
self.assertTrue(np.allclose(outs['WrongCnt'], self.wrong_cnt))
np.array((self.right_cnt + 1.0) / (self.wrong_cnt + 1.0
......@@ -145,7 +145,7 @@ class TestPrecisionRecall(unittest.TestCase):
outs = dict(zip(metric_keys, outs))
self.assertTrue(np.allclose(outs['accum_states'], self.states))
self.assertTrue(np.allclose(outs['[TP FP TN FN]'], self.states))
self.assertTrue(np.allclose(outs['precision_recall_f1'], self.metrics))
def test_exception(self):
......@@ -78,10 +78,10 @@ class TestRecallK(unittest.TestCase):
outs = dict(zip(metric_keys, outs))
np.allclose(outs['ins_cnt'], self.ins_num * self.batch_nums))
self.assertTrue(np.allclose(outs['pos_cnt'], self.match_num))
np.allclose(outs['InsCnt'], self.ins_num * self.batch_nums))
self.assertTrue(np.allclose(outs['RecallCnt'], self.match_num))
np.allclose(outs['Recall@%d_ACC' % (self.topk)],
np.allclose(outs['Acc(Recall@%d)' % (self.topk)],
np.array(self.match_num / (self.ins_num *
