"""
Define Dataset
"""
import abc
import copy
import yaml
import time
import datetime
import paddle.fluid as fluid
import kagle.utils.kagle_fs as kagle_fs
import kagle.utils.kagle_util as kagle_util


class Dataset(object):
    """
    Abstract base class for datasets.

    Holds the dataset configuration and a cache of loaded datasets;
    subclasses implement readiness checking plus load/preload/release
    of the underlying data.
    """
    __metaclass__ = abc.ABCMeta  # Python-2 style abstract-base-class declaration

    def __init__(self, config):
        """
        Store the config and start with an empty dataset cache.

        Args:
            config(dict): dataset configuration
        """
        self._datasets = {}
        self._config = config

    @abc.abstractmethod
    def check_ready(self, params):
        """
        Whether the data described by params is ready to be consumed.

        Return:
            True/False
        """
        pass

    @abc.abstractmethod
    def load_dataset(self, params):
        """Load the dataset described by params and return it."""
        pass

    @abc.abstractmethod
    def preload_dataset(self, params):
        """Begin loading the dataset described by params asynchronously."""
        pass

    @abc.abstractmethod
    def release_dataset(self, params):
        """Release resources held by the dataset described by params."""
        pass


class TimeSplitDataset(Dataset):
    """
    Dataset with time split dir.  root_path/$DAY/$HOUR
    """

    def __init__(self, config):
        """
        init data root_path, time_split_interval, data_path_format

        Args:
            config(dict): requires 'data_path' and 'split_interval';
                'data_donefile' defaults to data_path + "/to.hadoop.done"
        """
        Dataset.__init__(self, config)
        if 'data_donefile' not in config or config['data_donefile'] is None:
            config['data_donefile'] = config['data_path'] + "/to.hadoop.done"
        self._path_generator = kagle_util.PathGenerator({'templates': [
            {'name': 'data_path', 'template': config['data_path']},
            {'name': 'donefile_path', 'template': config['data_donefile']}
        ]})
        self._split_interval = config['split_interval']  # data split N mins per dir
        self._data_file_handler = kagle_fs.FileHandler(config)

    def _format_data_time(self, daytime_str, time_window_mins):
        """
        Align daytime_str up to the next split-interval boundary and shrink
        the window accordingly.

        Return:
            (datetime, int): aligned start time and remaining window minutes,
            or (None, 0) when the window lies entirely inside one unaligned
            interval (no complete split directory is covered).
        """
        data_time = kagle_util.make_datetime(daytime_str)
        mins_of_day = data_time.hour * 60 + data_time.minute
        # BUGFIX: use floor division so the stage computation keeps its
        # integer-division semantics under Python 3 as well as Python 2
        begin_stage = mins_of_day // self._split_interval
        end_stage = (mins_of_day + time_window_mins) // self._split_interval
        if begin_stage == end_stage and mins_of_day % self._split_interval != 0:
            return None, 0

        if mins_of_day % self._split_interval != 0:
            # skip forward to the next interval boundary
            skip_mins = self._split_interval - (mins_of_day % self._split_interval)
            data_time = data_time + datetime.timedelta(minutes=skip_mins)
            time_window_mins = time_window_mins - skip_mins
        return data_time, time_window_mins

    def check_ready(self, daytime_str, time_window_mins):
        """
        data in [daytime_str, daytime_str + time_window_mins] is ready or not
        Args:
            daytime_str: datetime with str format, such as "202001122200" meanings "2020-01-12 22:00"
            time_window_mins(int): from daytime_str to daytime_str + time_window_mins
        Return:
            True/False
        """
        data_time, window_mins = self._format_data_time(daytime_str, time_window_mins)
        if data_time is None:
            # window covers no complete split interval; treat as not ready
            return False
        # BUGFIX: iterate over the aligned window (window_mins) instead of the
        # caller-supplied time_window_mins, which the original left unadjusted
        # and therefore probed one interval past the window when unaligned
        while window_mins > 0:
            file_path = self._path_generator.generate_path('donefile_path', {'time_format': data_time})
            if not self._data_file_handler.is_exist(file_path):
                return False
            window_mins = window_mins - self._split_interval
            data_time = data_time + datetime.timedelta(minutes=self._split_interval)
        return True

    def get_file_list(self, daytime_str, time_window_mins, node_num=1, node_idx=0):
        """
        data in  [daytime_str, daytime_str + time_window_mins], random shard to node_num, return shard[node_idx]
        Args:
            daytime_str: datetime with str format, such as "202001122200" meanings "2020-01-12 22:00"
            time_window_mins(int): from daytime_str to daytime_str + time_window_mins
            node_num(int): data split shard num
            node_idx(int): shard_idx
        Return:
            list, data_shard[node_idx]
        """
        data_file_list = []
        data_time, window_mins = self._format_data_time(daytime_str, time_window_mins)
        if data_time is None:
            # window covers no complete split interval; nothing to list
            return data_file_list
        # BUGFIX: loop on the aligned window_mins (see check_ready)
        while window_mins > 0:
            file_path = self._path_generator.generate_path('data_path', {'time_format': data_time})
            sub_file_list = self._data_file_handler.ls(file_path)
            for sub_file in sub_file_list:
                sub_file_name = self._data_file_handler.get_file_name(sub_file)
                if not sub_file_name.startswith(self._config['filename_prefix']):
                    continue
                # shard files across nodes by filename hash; keep shard node_idx
                if hash(sub_file_name) % node_num == node_idx:
                    data_file_list.append(sub_file)
            window_mins = window_mins - self._split_interval
            data_time = data_time + datetime.timedelta(minutes=self._split_interval)
        return data_file_list
        

class FluidTimeSplitDataset(TimeSplitDataset):
    """
    A Dataset with time split for PaddleFluid
    """

    def __init__(self, config):
        """Init with the shared time-split dataset configuration."""
        TimeSplitDataset.__init__(self, config)

    def _alloc_dataset(self, file_list):
        """
        Create and configure a fluid dataset over file_list.

        Args:
            file_list(list): data file paths to feed the dataset
        Return:
            a configured fluid dataset instance
        """
        dataset = fluid.DatasetFactory().create_dataset(self._config['dataset_type'])
        dataset.set_batch_size(self._config['batch_size'])
        dataset.set_thread(self._config['load_thread'])
        dataset.set_hdfs_config(self._config['fs_name'], self._config['fs_ugi'])
        dataset.set_pipe_command(self._config['data_converter'])
        dataset.set_filelist(file_list)
        dataset.set_use_var(self._config['data_vars'])
        #dataset.set_fleet_send_sleep_seconds(2)
        #dataset.set_fleet_send_batch_size(80000)
        return dataset

    def load_dataset(self, params):
        """
        Block until data for params['begin_time'] is ready, then load it into
        memory (or finish a pending preload) and return the dataset.

        Args:
            params(dict): 'begin_time', 'time_window_min', 'node_num', 'node_idx'
        Return:
            the in-memory fluid dataset for begin_time
        """
        begin_time = params['begin_time']
        window_min = params['time_window_min']
        if begin_time not in self._datasets:
            # poll until the data window is complete on the filesystem
            while not self.check_ready(begin_time, window_min):
                print("dataset not ready, time:" + begin_time)
                time.sleep(30)
            file_list = self.get_file_list(begin_time, window_min, params['node_num'], params['node_idx'])
            self._datasets[begin_time] = self._alloc_dataset(file_list)
            self._datasets[begin_time].load_into_memory()
        else:
            # a preload_dataset() call already started loading this window
            self._datasets[begin_time].wait_preload_done()
        return self._datasets[begin_time]

    def preload_dataset(self, params):
        """
        Start an async load of the window at params['begin_time'] if the data
        is ready and not already cached.

        Args:
            params(dict): 'begin_time', 'time_window_min', 'node_num', 'node_idx'
        Return:
            True if a preload was started, else False
        """
        begin_time = params['begin_time']
        window_min = params['time_window_min']
        if begin_time not in self._datasets:
            if self.check_ready(begin_time, window_min):
                file_list = self.get_file_list(begin_time, window_min, params['node_num'], params['node_idx'])
                self._datasets[begin_time] = self._alloc_dataset(file_list)
                self._datasets[begin_time].preload_into_memory(self._config['preload_thread'])
                return True
        return False

    def release_dataset(self, params):
        """
        Release the in-memory dataset for params['begin_time'], if loaded.

        Args:
            params(dict): 'begin_time'
        """
        begin_time = params['begin_time']
        # BUGFIX: drop the cache entry as well; the original kept the released
        # dataset in self._datasets, so a later load_dataset() for the same
        # begin_time would wait_preload_done() on an already-released dataset
        dataset = self._datasets.pop(begin_time, None)
        if dataset is not None:
            dataset.release_memory()