提交 75e238df 编写于 作者: X xiexionghang

commit kagle for paddle

上级 dcf92119
"""
"""
import copy import copy
import yaml import yaml
import time import time
...@@ -13,7 +15,8 @@ class Dataset(object): ...@@ -13,7 +15,8 @@ class Dataset(object):
""" """
__metaclass__ = abc.ABCMeta __metaclass__ = abc.ABCMeta
def __init__(self, config): def __init__(self, config):
""" """ """
"""
self._datasets = {} self._datasets = {}
self._config = config self._config = config
...@@ -28,19 +31,23 @@ class Dataset(object): ...@@ -28,19 +31,23 @@ class Dataset(object):
@abc.abstractmethod @abc.abstractmethod
def load_dataset(self, params): def load_dataset(self, params):
""" """ """
"""
pass pass
@abc.abstractmethod @abc.abstractmethod
def preload_dataset(self, params): def preload_dataset(self, params):
""" """ """
"""
pass pass
@abc.abstractmethod @abc.abstractmethod
def release_dataset(self, params): def release_dataset(self, params):
""" """ """
"""
pass pass
class TimeSplitDataset(Dataset): class TimeSplitDataset(Dataset):
""" """
Dataset with time split dir. root_path/$DAY/$HOUR Dataset with time split dir. root_path/$DAY/$HOUR
...@@ -52,7 +59,7 @@ class TimeSplitDataset(Dataset): ...@@ -52,7 +59,7 @@ class TimeSplitDataset(Dataset):
Dataset.__init__(self, config) Dataset.__init__(self, config)
if 'data_donefile' not in config or config['data_donefile'] is None: if 'data_donefile' not in config or config['data_donefile'] is None:
config['data_donefile'] = config['data_path'] + "/to.hadoop.done" config['data_donefile'] = config['data_path'] + "/to.hadoop.done"
self._path_generator = kagle_util.PathGenerator({'templates' : [ self._path_generator = kagle_util.PathGenerator({'templates': [
{'name': 'data_path', 'template': config['data_path']}, {'name': 'data_path', 'template': config['data_path']},
{'name': 'donefile_path', 'template': config['data_donefile']} {'name': 'donefile_path', 'template': config['data_donefile']}
]}) ]})
...@@ -72,7 +79,7 @@ class TimeSplitDataset(Dataset): ...@@ -72,7 +79,7 @@ class TimeSplitDataset(Dataset):
skip_mins = self._split_interval - (mins_of_day % self._split_interval) skip_mins = self._split_interval - (mins_of_day % self._split_interval)
data_time = data_time + datetime.timedelta(minutes=skip_mins) data_time = data_time + datetime.timedelta(minutes=skip_mins)
time_window_mins = time_window_mins - skip_mins time_window_mins = time_window_mins - skip_mins
return data_time,time_window_mins return data_time, time_window_mins
def check_ready(self, daytime_str, time_window_mins): def check_ready(self, daytime_str, time_window_mins):
""" """
...@@ -84,7 +91,7 @@ class TimeSplitDataset(Dataset): ...@@ -84,7 +91,7 @@ class TimeSplitDataset(Dataset):
True/False True/False
""" """
is_ready = True is_ready = True
data_time,windows_mins = self._format_data_time(daytime_str, time_window_mins) data_time, windows_mins = self._format_data_time(daytime_str, time_window_mins)
while time_window_mins > 0: while time_window_mins > 0:
file_path = self._path_generator.generate_path('donefile_path', {'time_format': data_time}) file_path = self._path_generator.generate_path('donefile_path', {'time_format': data_time})
if not self._data_file_handler.is_exist(file_path): if not self._data_file_handler.is_exist(file_path):
...@@ -106,7 +113,7 @@ class TimeSplitDataset(Dataset): ...@@ -106,7 +113,7 @@ class TimeSplitDataset(Dataset):
list, data_shard[node_idx] list, data_shard[node_idx]
""" """
data_file_list = [] data_file_list = []
data_time,windows_mins = self._format_data_time(daytime_str, time_window_mins) data_time, windows_mins = self._format_data_time(daytime_str, time_window_mins)
while time_window_mins > 0: while time_window_mins > 0:
file_path = self._path_generator.generate_path('data_path', {'time_format': data_time}) file_path = self._path_generator.generate_path('data_path', {'time_format': data_time})
sub_file_list = self._data_file_handler.ls(file_path) sub_file_list = self._data_file_handler.ls(file_path)
...@@ -120,6 +127,7 @@ class TimeSplitDataset(Dataset): ...@@ -120,6 +127,7 @@ class TimeSplitDataset(Dataset):
data_time = data_time + datetime.timedelta(minutes=self._split_interval) data_time = data_time + datetime.timedelta(minutes=self._split_interval)
return data_file_list return data_file_list
class FluidTimeSplitDataset(TimeSplitDataset): class FluidTimeSplitDataset(TimeSplitDataset):
""" """
A Dataset with time split for PaddleFluid A Dataset with time split for PaddleFluid
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册