提交 dcf92119 编写于 作者: X xiexionghang

commit kagle for paddle

上级 71fd9646
...@@ -6,32 +6,49 @@ import kagle_fs ...@@ -6,32 +6,49 @@ import kagle_fs
import kagle_util import kagle_util
import kagle_layer import kagle_layer
import paddle.fluid as fluid import paddle.fluid as fluid
from abc import ABCMeta, abstractmethod import abc
class Dataset(object): class Dataset(object):
__metaclass__=ABCMeta """
"""
__metaclass__ = abc.ABCMeta
def __init__(self, config): def __init__(self, config):
""" """
self._datasets = {} self._datasets = {}
self._config = config self._config = config
@abstractmethod @abc.abstractmethod
def check_ready(self, params): def check_ready(self, params):
"""
check data ready or not
Return:
True/False
"""
pass pass
@abstractmethod @abc.abstractmethod
def load_dataset(self, params): def load_dataset(self, params):
""" """
pass pass
@abstractmethod @abc.abstractmethod
def preload_dataset(self, params): def preload_dataset(self, params):
""" """
pass pass
@abstractmethod @abc.abstractmethod
def release_dataset(self, params): def release_dataset(self, params):
""" """
pass pass
class TimeSplitDataset(Dataset): class TimeSplitDataset(Dataset):
"""
Dataset with time split dir. root_path/$DAY/$HOUR
"""
def __init__(self, config): def __init__(self, config):
"""
init data root_path, time_split_interval, data_path_format
"""
Dataset.__init__(self, config) Dataset.__init__(self, config)
if 'data_donefile' not in config or config['data_donefile'] is None: if 'data_donefile' not in config or config['data_donefile'] is None:
config['data_donefile'] = config['data_path'] + "/to.hadoop.done" config['data_donefile'] = config['data_path'] + "/to.hadoop.done"
...@@ -43,6 +60,7 @@ class TimeSplitDataset(Dataset): ...@@ -43,6 +60,7 @@ class TimeSplitDataset(Dataset):
self._data_file_handler = kagle_fs.FileHandler(config) self._data_file_handler = kagle_fs.FileHandler(config)
def _format_data_time(self, daytime_str, time_window_mins): def _format_data_time(self, daytime_str, time_window_mins):
""" """
data_time = kagle_util.make_datetime(daytime_str) data_time = kagle_util.make_datetime(daytime_str)
mins_of_day = data_time.hour * 60 + data_time.minute mins_of_day = data_time.hour * 60 + data_time.minute
begin_stage = mins_of_day / self._split_interval begin_stage = mins_of_day / self._split_interval
...@@ -57,6 +75,14 @@ class TimeSplitDataset(Dataset): ...@@ -57,6 +75,14 @@ class TimeSplitDataset(Dataset):
return data_time,time_window_mins return data_time,time_window_mins
def check_ready(self, daytime_str, time_window_mins): def check_ready(self, daytime_str, time_window_mins):
"""
data in [daytime_str, daytime_str + time_window_mins] is ready or not
Args:
daytime_str: datetime with str format, such as "202001122200" meaning "2020-01-12 22:00"
time_window_mins(int): from daytime_str to daytime_str + time_window_mins
Return:
True/False
"""
is_ready = True is_ready = True
data_time,windows_mins = self._format_data_time(daytime_str, time_window_mins) data_time,windows_mins = self._format_data_time(daytime_str, time_window_mins)
while time_window_mins > 0: while time_window_mins > 0:
...@@ -69,6 +95,16 @@ class TimeSplitDataset(Dataset): ...@@ -69,6 +95,16 @@ class TimeSplitDataset(Dataset):
return is_ready return is_ready
def get_file_list(self, daytime_str, time_window_mins, node_num=1, node_idx=0): def get_file_list(self, daytime_str, time_window_mins, node_num=1, node_idx=0):
"""
data in [daytime_str, daytime_str + time_window_mins] is randomly sharded into node_num shards; returns shard[node_idx]
Args:
daytime_str: datetime with str format, such as "202001122200" meaning "2020-01-12 22:00"
time_window_mins(int): from daytime_str to daytime_str + time_window_mins
node_num(int): data split shard num
node_idx(int): shard_idx
Return:
list, data_shard[node_idx]
"""
data_file_list = [] data_file_list = []
data_time,windows_mins = self._format_data_time(daytime_str, time_window_mins) data_time,windows_mins = self._format_data_time(daytime_str, time_window_mins)
while time_window_mins > 0: while time_window_mins > 0:
...@@ -85,10 +121,15 @@ class TimeSplitDataset(Dataset): ...@@ -85,10 +121,15 @@ class TimeSplitDataset(Dataset):
return data_file_list return data_file_list
class FluidTimeSplitDataset(TimeSplitDataset): class FluidTimeSplitDataset(TimeSplitDataset):
"""
A Dataset with time split for PaddleFluid
"""
def __init__(self, config): def __init__(self, config):
""" """
TimeSplitDataset.__init__(self, config) TimeSplitDataset.__init__(self, config)
def _alloc_dataset(self, file_list): def _alloc_dataset(self, file_list):
""" """
dataset = fluid.DatasetFactory().create_dataset(self._config['dataset_type']) dataset = fluid.DatasetFactory().create_dataset(self._config['dataset_type'])
dataset.set_batch_size(self._config['batch_size']) dataset.set_batch_size(self._config['batch_size'])
dataset.set_thread(self._config['load_thread']) dataset.set_thread(self._config['load_thread'])
...@@ -100,7 +141,8 @@ class FluidTimeSplitDataset(TimeSplitDataset): ...@@ -100,7 +141,8 @@ class FluidTimeSplitDataset(TimeSplitDataset):
#dataset.set_fleet_send_batch_size(80000) #dataset.set_fleet_send_batch_size(80000)
return dataset return dataset
def load_dataset(self, params): def load_dataset(self, params):
""" """
begin_time = params['begin_time'] begin_time = params['begin_time']
windown_min = params['time_window_min'] windown_min = params['time_window_min']
if begin_time not in self._datasets: if begin_time not in self._datasets:
...@@ -115,6 +157,7 @@ class FluidTimeSplitDataset(TimeSplitDataset): ...@@ -115,6 +157,7 @@ class FluidTimeSplitDataset(TimeSplitDataset):
return self._datasets[begin_time] return self._datasets[begin_time]
def preload_dataset(self, params): def preload_dataset(self, params):
""" """
begin_time = params['begin_time'] begin_time = params['begin_time']
windown_min = params['time_window_min'] windown_min = params['time_window_min']
if begin_time not in self._datasets: if begin_time not in self._datasets:
...@@ -126,6 +169,7 @@ class FluidTimeSplitDataset(TimeSplitDataset): ...@@ -126,6 +169,7 @@ class FluidTimeSplitDataset(TimeSplitDataset):
return False return False
def release_dataset(self, params): def release_dataset(self, params):
""" """
begin_time = params['begin_time'] begin_time = params['begin_time']
windown_min = params['time_window_min'] windown_min = params['time_window_min']
if begin_time in self._datasets: if begin_time in self._datasets:
......
...@@ -13,64 +13,64 @@ class Metric(object): ...@@ -13,64 +13,64 @@ class Metric(object):
__metaclass__=abc.ABCMeta __metaclass__=abc.ABCMeta
def __init__(self, config): def __init__(self, config):
""" """ """ """
pass pass
@abc.abstractmethod @abc.abstractmethod
def clear(self, scope, params): def clear(self, scope, params):
""" """
clear current value clear current value
Args: Args:
scope: value container scope: value container
params: extend variable for clear params: extend variable for clear
""" """
pass pass
@abc.abstractmethod @abc.abstractmethod
def calculate(self, scope, params): def calculate(self, scope, params):
""" """
calculate result calculate result
Args: Args:
scope: value container scope: value container
params: extend variable for calculate params: extend variable for calculate
""" """
pass pass
@abc.abstractmethod @abc.abstractmethod
def get_result(self): def get_result(self):
""" """
Return: Return:
result(dict) : calculate result result(dict) : calculate result
""" """
pass pass
@abc.abstractmethod @abc.abstractmethod
def get_result_to_string(self): def get_result_to_string(self):
""" """
Return: Return:
result(string) : calculate result with string format, for output result(string) : calculate result with string format, for output
""" """
pass pass
class PaddleAUCMetric(Metric): class PaddleAUCMetric(Metric):
""" """
Metric For Paddle Model Metric For Paddle Model
""" """
def __init__(self, config): def __init__(self, config):
""" """ """ """
pass pass
def clear(self, scope, params): def clear(self, scope, params):
""" """
Clear current metric value, usually set to zero Clear current metric value, usually set to zero
Args: Args:
scope : paddle runtime var container scope : paddle runtime var container
params(dict) : params(dict) :
label : a group name for metric label : a group name for metric
metric_dict : current metric_items in group metric_dict : current metric_items in group
Return: Return:
None None
""" """
self._label = params['label'] self._label = params['label']
self._metric_dict = params['metric_dict'] self._metric_dict = params['metric_dict']
self._result = {} self._result = {}
...@@ -87,11 +87,11 @@ class PaddleAUCMetric(Metric): ...@@ -87,11 +87,11 @@ class PaddleAUCMetric(Metric):
metric_var.set(data_array, place) metric_var.set(data_array, place)
def get_metric(self, scope, metric_name): def get_metric(self, scope, metric_name):
""" """
reduce metric named metric_name from all worker reduce metric named metric_name from all worker
Return: Return:
metric reduce result metric reduce result
""" """
metric = np.array(scope.find_var(metric_name).get_tensor()) metric = np.array(scope.find_var(metric_name).get_tensor())
old_metric_shape = np.array(metric.shape) old_metric_shape = np.array(metric.shape)
metric = metric.reshape(-1) metric = metric.reshape(-1)
...@@ -101,11 +101,11 @@ class PaddleAUCMetric(Metric): ...@@ -101,11 +101,11 @@ class PaddleAUCMetric(Metric):
return global_metric[0] return global_metric[0]
def get_global_metrics(self, scope, metric_dict): def get_global_metrics(self, scope, metric_dict):
""" """
reduce all metric in metric_dict from all worker reduce all metric in metric_dict from all worker
Return: Return:
dict : {metric_name : metric_result} dict : {metric_name : metric_result}
""" """
fleet._role_maker._barrier_worker() fleet._role_maker._barrier_worker()
result = {} result = {}
for metric_name in metric_dict: for metric_name in metric_dict:
...@@ -117,7 +117,7 @@ class PaddleAUCMetric(Metric): ...@@ -117,7 +117,7 @@ class PaddleAUCMetric(Metric):
return result return result
def calculate_auc(self, global_pos, global_neg): def calculate_auc(self, global_pos, global_neg):
""" """ """ """
num_bucket = len(global_pos) num_bucket = len(global_pos)
area = 0.0 area = 0.0
pos = 0.0 pos = 0.0
...@@ -142,7 +142,7 @@ class PaddleAUCMetric(Metric): ...@@ -142,7 +142,7 @@ class PaddleAUCMetric(Metric):
return auc_value return auc_value
def calculate_bucket_error(self, global_pos, global_neg): def calculate_bucket_error(self, global_pos, global_neg):
""" """ """ """
num_bucket = len(global_pos) num_bucket = len(global_pos)
last_ctr = -1.0 last_ctr = -1.0
impression_sum = 0.0 impression_sum = 0.0
...@@ -189,7 +189,7 @@ class PaddleAUCMetric(Metric): ...@@ -189,7 +189,7 @@ class PaddleAUCMetric(Metric):
return bucket_error return bucket_error
def calculate(self, scope, params): def calculate(self, scope, params):
""" """ """ """
self._label = params['label'] self._label = params['label']
self._metric_dict = params['metric_dict'] self._metric_dict = params['metric_dict']
fleet._role_maker._barrier_worker() fleet._role_maker._barrier_worker()
...@@ -214,11 +214,11 @@ class PaddleAUCMetric(Metric): ...@@ -214,11 +214,11 @@ class PaddleAUCMetric(Metric):
return result return result
def get_result(self): def get_result(self):
""" """ """ """
return self._result return self._result
def get_result_to_string(self): def get_result_to_string(self):
""" """ """ """
result = self.get_result() result = self.get_result()
result_str = "%s AUC=%.6f BUCKET_ERROR=%.6f MAE=%.6f RMSE=%.6f "\ result_str = "%s AUC=%.6f BUCKET_ERROR=%.6f MAE=%.6f RMSE=%.6f "\
"Actural_CTR=%.6f Predicted_CTR=%.6f COPC=%.6f MEAN Q_VALUE=%.6f Ins number=%s" % \ "Actural_CTR=%.6f Predicted_CTR=%.6f COPC=%.6f MEAN Q_VALUE=%.6f Ins number=%s" % \
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册