diff --git a/kagle/kagle_dataset.py b/kagle/kagle_dataset.py index fa14aa210cf49d0d186cce5d8bcdf4eab61236b7..553c138d3ac4bbdfa27dc0defa683378e1a08713 100755 --- a/kagle/kagle_dataset.py +++ b/kagle/kagle_dataset.py @@ -6,32 +6,49 @@ import kagle_fs import kagle_util import kagle_layer import paddle.fluid as fluid -from abc import ABCMeta, abstractmethod +import abc class Dataset(object): - __metaclass__=ABCMeta + """ + """ + __metaclass__ = abc.ABCMeta def __init__(self, config): + """ """ self._datasets = {} self._config = config - @abstractmethod + @abc.abstractmethod def check_ready(self, params): + """ + check data ready or not + Return: + True/False + """ pass - @abstractmethod + @abc.abstractmethod def load_dataset(self, params): + """ """ pass - @abstractmethod + @abc.abstractmethod def preload_dataset(self, params): + """ """ pass - @abstractmethod + @abc.abstractmethod def release_dataset(self, params): + """ """ pass class TimeSplitDataset(Dataset): + """ + Dataset with time split dir. root_path/$DAY/$HOUR + """ def __init__(self, config): + """ + init data root_path, time_split_interval, data_path_format + """ Dataset.__init__(self, config) if 'data_donefile' not in config or config['data_donefile'] is None: config['data_donefile'] = config['data_path'] + "/to.hadoop.done" @@ -43,6 +60,7 @@ class TimeSplitDataset(Dataset): self._data_file_handler = kagle_fs.FileHandler(config) def _format_data_time(self, daytime_str, time_window_mins): + """ """ data_time = kagle_util.make_datetime(daytime_str) mins_of_day = data_time.hour * 60 + data_time.minute begin_stage = mins_of_day / self._split_interval @@ -57,6 +75,14 @@ class TimeSplitDataset(Dataset): return data_time,time_window_mins def check_ready(self, daytime_str, time_window_mins): + """ + data in [daytime_str, daytime_str + time_window_mins] is ready or not + Args: + daytime_str: datetime with str format, such as "202001122200" meanings "2020-01-12 22:00" + time_window_mins(int): from daytime_str to daytime_str + time_window_mins + Return: + True/False + """ is_ready = True data_time,windows_mins = self._format_data_time(daytime_str, time_window_mins) while time_window_mins > 0: @@ -69,6 +95,16 @@ class TimeSplitDataset(Dataset): return is_ready def get_file_list(self, daytime_str, time_window_mins, node_num=1, node_idx=0): + """ + data in [daytime_str, daytime_str + time_window_mins], random shard to node_num, return shard[node_idx] + Args: + daytime_str: datetime with str format, such as "202001122200" meanings "2020-01-12 22:00" + time_window_mins(int): from daytime_str to daytime_str + time_window_mins + node_num(int): data split shard num + node_idx(int): shard_idx + Return: + list, data_shard[node_idx] + """ data_file_list = [] data_time,windows_mins = self._format_data_time(daytime_str, time_window_mins) while time_window_mins > 0: @@ -85,10 +121,15 @@ class TimeSplitDataset(Dataset): return data_file_list class FluidTimeSplitDataset(TimeSplitDataset): + """ + A Dataset with time split for PaddleFluid + """ def __init__(self, config): + """ """ TimeSplitDataset.__init__(self, config) def _alloc_dataset(self, file_list): + """ """ dataset = fluid.DatasetFactory().create_dataset(self._config['dataset_type']) dataset.set_batch_size(self._config['batch_size']) dataset.set_thread(self._config['load_thread']) @@ -100,7 +141,8 @@ class FluidTimeSplitDataset(TimeSplitDataset): #dataset.set_fleet_send_batch_size(80000) return dataset - def load_dataset(self, params): + def load_dataset(self, params): + """ """ begin_time = params['begin_time'] windown_min = params['time_window_min'] if begin_time not in self._datasets: @@ -115,6 +157,7 @@ class FluidTimeSplitDataset(TimeSplitDataset): return self._datasets[begin_time] def preload_dataset(self, params): + """ """ begin_time = params['begin_time'] windown_min = params['time_window_min'] if begin_time not in self._datasets: @@ -126,6 +169,7 @@ class FluidTimeSplitDataset(TimeSplitDataset): return False def release_dataset(self, params): + """ """ begin_time = params['begin_time'] windown_min = params['time_window_min'] if begin_time in self._datasets: diff --git a/kagle/kagle_metric.py b/kagle/kagle_metric.py index 7087391f02d2a98266ca9e0080936dd555d6e53a..19869051467d03e4791586d78abafd848ae2f233 100755 --- a/kagle/kagle_metric.py +++ b/kagle/kagle_metric.py @@ -13,64 +13,64 @@ class Metric(object): __metaclass__=abc.ABCMeta def __init__(self, config): - """ """ + """ """ pass @abc.abstractmethod def clear(self, scope, params): - """ - clear current value - Args: - scope: value container - params: extend varilable for clear - """ + """ + clear current value + Args: + scope: value container + params: extend varilable for clear + """ pass @abc.abstractmethod def calculate(self, scope, params): - """ - calculate result - Args: - scope: value container - params: extend varilable for clear - """ + """ + calculate result + Args: + scope: value container + params: extend varilable for clear + """ pass @abc.abstractmethod def get_result(self): - """ - Return: - result(dict) : calculate result - """ + """ + Return: + result(dict) : calculate result + """ pass @abc.abstractmethod def get_result_to_string(self): - """ - Return: - result(string) : calculate result with string format, for output - """ + """ + Return: + result(string) : calculate result with string format, for output + """ pass class PaddleAUCMetric(Metric): - """ - Metric For Paddle Model - """ + """ + Metric For Paddle Model + """ def __init__(self, config): - """ """ + """ """ pass def clear(self, scope, params): - """ - Clear current metric value, usually set to zero - Args: - scope : paddle runtime var container - params(dict) : - label : a group name for metric - metric_dict : current metric_items in group - Return: - None - """ + """ + Clear current metric value, usually set to zero + Args: + scope : paddle runtime var container + params(dict) : + label : a group name for metric + metric_dict : current metric_items in group + Return: + None + """ self._label = params['label'] self._metric_dict = params['metric_dict'] self._result = {} @@ -87,11 +87,11 @@ class PaddleAUCMetric(Metric): metric_var.set(data_array, place) def get_metric(self, scope, metric_name): - """ - reduce metric named metric_name from all worker - Return: - metric reduce result - """ + """ + reduce metric named metric_name from all worker + Return: + metric reduce result + """ metric = np.array(scope.find_var(metric_name).get_tensor()) old_metric_shape = np.array(metric.shape) metric = metric.reshape(-1) @@ -101,11 +101,11 @@ class PaddleAUCMetric(Metric): return global_metric[0] def get_global_metrics(self, scope, metric_dict): - """ - reduce all metric in metric_dict from all worker - Return: - dict : {matric_name : metric_result} - """ + """ + reduce all metric in metric_dict from all worker + Return: + dict : {matric_name : metric_result} + """ fleet._role_maker._barrier_worker() result = {} for metric_name in metric_dict: @@ -117,7 +117,7 @@ class PaddleAUCMetric(Metric): return result def calculate_auc(self, global_pos, global_neg): - """ """ + """ """ num_bucket = len(global_pos) area = 0.0 pos = 0.0 @@ -142,7 +142,7 @@ class PaddleAUCMetric(Metric): return auc_value def calculate_bucket_error(self, global_pos, global_neg): - """ """ + """ """ num_bucket = len(global_pos) last_ctr = -1.0 impression_sum = 0.0 @@ -189,7 +189,7 @@ class PaddleAUCMetric(Metric): return bucket_error def calculate(self, scope, params): - """ """ + """ """ self._label = params['label'] self._metric_dict = params['metric_dict'] fleet._role_maker._barrier_worker() @@ -214,11 +214,11 @@ class PaddleAUCMetric(Metric): return result def get_result(self): - """ """ + """ """ return self._result def get_result_to_string(self): - """ """ + """ """ result = self.get_result() result_str = "%s AUC=%.6f BUCKET_ERROR=%.6f MAE=%.6f RMSE=%.6f "\ "Actural_CTR=%.6f Predicted_CTR=%.6f COPC=%.6f MEAN Q_VALUE=%.6f Ins number=%s" % \