# kagle_dataset.py
# Time-sliced dataset loading utilities for Paddle Fluid training.
import copy
import yaml
import time
import datetime
import kagle_fs
import kagle_util
import kagle_layer
import paddle.fluid as fluid
from abc import ABCMeta, abstractmethod

class Dataset(object):
    """Abstract interface of a trainable dataset.

    Concrete subclasses decide how data readiness is detected and how the
    underlying dataset objects are created, loaded and released. Loaded
    datasets are cached in ``self._datasets``.
    """
    __metaclass__ = ABCMeta

    def __init__(self, config):
        """Store the configuration and start with an empty dataset cache.

        Args:
            config(dict): subclass-specific dataset configuration.
        """
        self._datasets = {}  # cache of loaded/loading datasets, keyed by time
        self._config = config

    @abstractmethod
    def check_ready(self, params):
        """Return whether the data described by ``params`` is ready to load."""
        pass

    @abstractmethod
    def load_dataset(self, params):
        """Load (blocking) and return the dataset described by ``params``."""
        pass

    @abstractmethod
    def preload_dataset(self, params):
        """Start an asynchronous load of the dataset described by ``params``."""
        pass

    @abstractmethod
    def release_dataset(self, params):
        """Release the resources held for the dataset described by ``params``."""
        pass

class TimeSplitDataset(Dataset):
    """Dataset partitioned into fixed-length time intervals.

    Data lives in one directory per ``split_interval`` minutes; a donefile
    marks each interval directory as complete.
    """
    def __init__(self, config):
        """
        Args:
            config(dict): requires ``data_path`` and ``split_interval``;
                ``data_donefile`` is optional and defaults to
                ``<data_path>/to.hadoop.done``.
        """
        Dataset.__init__(self, config)
        if 'data_donefile' not in config or config['data_donefile'] is None:
            config['data_donefile'] = config['data_path'] + "/to.hadoop.done"
        self._path_generator = kagle_util.PathGenerator({'templates': [
            {'name': 'data_path', 'template': config['data_path']},
            {'name': 'donefile_path', 'template': config['data_donefile']}
        ]})
        self._split_interval = config['split_interval']  # data split N mins per dir
        self._data_file_handler = kagle_fs.FileHandler(config)

    def _format_data_time(self, daytime_str, time_window_mins):
        """Align the window start up to the next split-interval boundary.

        Args:
            daytime_str(str): window start, parsed by kagle_util.make_datetime.
            time_window_mins(int): window length in minutes.
        Returns:
            (datetime, int): aligned start time and the remaining window
            length, or ``(None, 0)`` when the window does not reach any
            interval boundary (no complete interval inside it).
        """
        data_time = kagle_util.make_datetime(daytime_str)
        mins_of_day = data_time.hour * 60 + data_time.minute
        # BUGFIX: use floor division so stage arithmetic stays integral under
        # Python 3 as well (identical result for ints under Python 2).
        begin_stage = mins_of_day // self._split_interval
        end_stage = (mins_of_day + time_window_mins) // self._split_interval
        if begin_stage == end_stage and mins_of_day % self._split_interval != 0:
            return None, 0

        if mins_of_day % self._split_interval != 0:
            # Skip forward to the next boundary and shrink the window by the
            # amount skipped.
            skip_mins = self._split_interval - (mins_of_day % self._split_interval)
            data_time = data_time + datetime.timedelta(minutes=skip_mins)
            time_window_mins = time_window_mins - skip_mins
        return data_time, time_window_mins

    def check_ready(self, daytime_str, time_window_mins):
        """Return True when the donefile of every interval in the window exists.

        Args:
            daytime_str(str): window start time.
            time_window_mins(int): window length in minutes.
        """
        data_time, window_mins = self._format_data_time(daytime_str, time_window_mins)
        if data_time is None:
            # Window contains no complete interval -> nothing can be ready.
            return False
        # BUGFIX: iterate over the boundary-aligned window returned above;
        # the original looped over the raw time_window_mins and could probe
        # one interval past the window for a misaligned start time.
        while window_mins > 0:
            file_path = self._path_generator.generate_path('donefile_path', {'time_format': data_time})
            if not self._data_file_handler.is_exist(file_path):
                return False
            window_mins = window_mins - self._split_interval
            data_time = data_time + datetime.timedelta(minutes=self._split_interval)
        return True

    def get_file_list(self, daytime_str, time_window_mins, node_num=1, node_idx=0):
        """Collect the data files of the window assigned to this node.

        Files are sharded across nodes by filename hash; only files whose
        name starts with ``config['filename_prefix']`` are considered.

        Args:
            daytime_str(str): window start time.
            time_window_mins(int): window length in minutes.
            node_num(int): total number of nodes sharding the files.
            node_idx(int): index of this node in [0, node_num).
        Returns:
            list: paths of the data files belonging to this node.
        """
        data_file_list = []
        data_time, window_mins = self._format_data_time(daytime_str, time_window_mins)
        if data_time is None:
            return data_file_list
        # BUGFIX: loop over the aligned window (see check_ready).
        while window_mins > 0:
            file_path = self._path_generator.generate_path('data_path', {'time_format': data_time})
            sub_file_list = self._data_file_handler.ls(file_path)
            for sub_file in sub_file_list:
                sub_file_name = self._data_file_handler.get_file_name(sub_file)
                if not sub_file_name.startswith(self._config['filename_prefix']):
                    continue
                if hash(sub_file_name) % node_num == node_idx:
                    data_file_list.append(sub_file)
            window_mins = window_mins - self._split_interval
            data_time = data_time + datetime.timedelta(minutes=self._split_interval)
        return data_file_list
        
class FluidTimeSplitDataset(TimeSplitDataset):
    """TimeSplitDataset backed by paddle.fluid in-memory dataset objects."""
    def __init__(self, config):
        """
        Args:
            config(dict): TimeSplitDataset config plus dataset_type,
                batch_size, load_thread, fs_name, fs_ugi, data_converter,
                data_vars and (for preload) preload_thread.
        """
        TimeSplitDataset.__init__(self, config)

    def _alloc_dataset(self, file_list):
        """Create and configure a fluid dataset reading ``file_list``."""
        dataset = fluid.DatasetFactory().create_dataset(self._config['dataset_type'])
        dataset.set_batch_size(self._config['batch_size'])
        dataset.set_thread(self._config['load_thread'])
        dataset.set_hdfs_config(self._config['fs_name'], self._config['fs_ugi'])
        dataset.set_pipe_command(self._config['data_converter'])
        dataset.set_filelist(file_list)
        dataset.set_use_var(self._config['data_vars'])
        #dataset.set_fleet_send_sleep_seconds(2)
        #dataset.set_fleet_send_batch_size(80000)
        return dataset

    def load_dataset(self, params):
        """Block until the window is ready, then return it loaded in memory.

        If preload_dataset() already started loading this window, wait for
        that preload to finish instead of loading again.

        Args:
            params(dict): begin_time, time_window_min, node_num, node_idx.
        """
        begin_time = params['begin_time']
        window_min = params['time_window_min']
        if begin_time not in self._datasets:
            # Poll until every donefile in the window exists.
            while not self.check_ready(begin_time, window_min):
                print("dataset not ready, time:" + begin_time)
                time.sleep(30)
            file_list = self.get_file_list(begin_time, window_min, params['node_num'], params['node_idx'])
            self._datasets[begin_time] = self._alloc_dataset(file_list)
            self._datasets[begin_time].load_into_memory()
        else:
            # A preload is in flight for this window; join it.
            self._datasets[begin_time].wait_preload_done()
        return self._datasets[begin_time]

    def preload_dataset(self, params):
        """Start an asynchronous load of the window if its data is ready.

        Args:
            params(dict): begin_time, time_window_min, node_num, node_idx.
        Returns:
            bool: True when a preload was started, False otherwise.
        """
        begin_time = params['begin_time']
        window_min = params['time_window_min']
        if begin_time not in self._datasets:
            if self.check_ready(begin_time, window_min):
                file_list = self.get_file_list(begin_time, window_min, params['node_num'], params['node_idx'])
                self._datasets[begin_time] = self._alloc_dataset(file_list)
                self._datasets[begin_time].preload_into_memory(self._config['preload_thread'])
                return True
        return False

    def release_dataset(self, params):
        """Release the memory of the window's dataset and drop it from the cache.

        BUGFIX: the original released the memory but kept the entry cached,
        so a later load_dataset() for the same begin_time returned the
        already-released dataset instead of reloading it.

        Args:
            params(dict): begin_time identifies the window to release.
        """
        dataset = self._datasets.pop(params['begin_time'], None)
        if dataset is not None:
            dataset.release_memory()