# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import abc
import datetime
import time
import logging

import paddle.fluid as fluid

from paddlerec.core.utils import fs as fs
from paddlerec.core.utils import util as util

logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)


class DatasetHolder(object):
    """
    Abstract dataset holder: defines the load / preload / release interface.
    """
    __metaclass__ = abc.ABCMeta

    def __init__(self, config):
        """Keep the config and a cache of loaded datasets."""
        self._datasets = {}
        self._config = config

    @abc.abstractmethod
    def check_ready(self, params):
        """
        Check whether the data described by params is ready.
        Return:
            True/False
        """
        pass

    @abc.abstractmethod
    def load_dataset(self, params):
        """
        Load the dataset described by params and return it.
        """
        pass

    @abc.abstractmethod
    def preload_dataset(self, params):
        """
        Asynchronously preload the dataset described by params.
        """
        pass

    @abc.abstractmethod
    def release_dataset(self, params):
        """
        Release the dataset described by params.
        """
        pass


class TimeSplitDatasetHolder(DatasetHolder):
    """
    Dataset holder for data stored in time-split directories: root_path/$DAY/$HOUR
    """

    def __init__(self, config):
        """
        Init path templates, split interval and the file handler from config.
        """
        DatasetHolder.__init__(self, config)
        if 'data_donefile' not in config or config['data_donefile'] is None:
            config['data_donefile'] = config['data_path'] + "/to.hadoop.done"
        self._path_generator = util.PathGenerator({
            'templates': [{
                'name': 'data_path',
                'template': config['data_path']
            }, {
                'name': 'donefile_path',
                'template': config['data_donefile']
            }]
        })
        self._split_interval = config[
            'split_interval']  # data split N mins per dir
        self._data_file_handler = fs.FileHandler(config)
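    # Config keys consumed by this holder (gathered from the code in this
    # class; the example values are illustrative only):
    #   data_path       - template of the time-split data directory
    #   data_donefile   - donefile template; defaults to data_path + "/to.hadoop.done"
    #   split_interval  - minutes covered by one directory, e.g. 5
    #   filename_prefix - prefix of data files to pick up, e.g. "part-"
    #   dataset_type    - dataset type passed to fluid.DatasetFactory()
    #   batch_size, load_thread, preload_thread
    #   fs_name, fs_ugi - HDFS/AFS endpoint and credentials
    #   data_converter  - pipe command that parses raw file lines
    #   data_vars       - fluid variables fed by the dataset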

    def _format_data_time(self, daytime_str, time_window_mins):
        """
        Align daytime_str to the next split boundary.
        Return:
            (aligned datetime, remaining window minutes), or (None, 0) if no
            complete split interval fits in the window.
        """
        data_time = util.make_datetime(daytime_str)
        mins_of_day = data_time.hour * 60 + data_time.minute
        # use integer division so the bucket index stays an int under Python 3
        # as well (int / int is a float there)
        begin_stage = mins_of_day // self._split_interval
        end_stage = (mins_of_day + time_window_mins) // self._split_interval
        if begin_stage == end_stage and mins_of_day % self._split_interval != 0:
            return None, 0

        if mins_of_day % self._split_interval != 0:
            skip_mins = self._split_interval - (mins_of_day %
                                                self._split_interval)
            data_time = data_time + datetime.timedelta(minutes=skip_mins)
            time_window_mins = time_window_mins - skip_mins
        return data_time, time_window_mins
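    # Worked example (illustrative numbers): with split_interval = 5 and
    # daytime_str = "202001122203" (22:03), mins_of_day = 1323 and the start is
    # not aligned, so skip_mins = 5 - (1323 % 5) = 2, data_time moves to 22:05
    # and the window shrinks by 2 minutes. If the window were only 1 minute,
    # begin_stage == end_stage (1323 // 5 == 1324 // 5 == 264) and the method
    # returns (None, 0): no complete split interval fits in the window.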

    def check_ready(self, daytime_str, time_window_mins):
        """
        Check whether the data in [daytime_str, daytime_str + time_window_mins] is ready.
        Args:
            daytime_str: datetime as a string, such as "202001122200" meaning "2020-01-12 22:00"
            time_window_mins(int): from daytime_str to daytime_str + time_window_mins
        Return:
            True/False
        """
        is_ready = True
        data_time, windows_mins = self._format_data_time(daytime_str,
                                                         time_window_mins)
        if data_time is None:
            # no complete split interval falls inside the requested window
            return False
        while windows_mins > 0:
            file_path = self._path_generator.generate_path(
                'donefile_path', {'time_format': data_time})
            if not self._data_file_handler.is_exist(file_path):
                is_ready = False
                break
            windows_mins = windows_mins - self._split_interval
            data_time = data_time + datetime.timedelta(
                minutes=self._split_interval)
        return is_ready
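    # Example (illustrative): with split_interval = 30, daytime_str = "202001122200"
    # and time_window_mins = 60, the loop probes the donefiles generated for 22:00
    # and 22:30 and reports ready only if both exist.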

    def get_file_list(self,
                      daytime_str,
                      time_window_mins,
                      node_num=1,
                      node_idx=0):
        """
        Shard the data in [daytime_str, daytime_str + time_window_mins] over
        node_num nodes and return shard[node_idx].
        Args:
            daytime_str: datetime as a string, such as "202001122200" meaning "2020-01-12 22:00"
            time_window_mins(int): from daytime_str to daytime_str + time_window_mins
            node_num(int): data split shard num
            node_idx(int): shard_idx
        Return:
            list, data_shard[node_idx]
        """
        data_file_list = []
        data_time, windows_mins = self._format_data_time(daytime_str,
                                                         time_window_mins)
        if data_time is None:
            # no complete split interval falls inside the requested window
            return data_file_list
        while windows_mins > 0:
            file_path = self._path_generator.generate_path(
                'data_path', {'time_format': data_time})
            sub_file_list = self._data_file_handler.ls(file_path)
            for sub_file in sub_file_list:
                sub_file_name = self._data_file_handler.get_file_name(sub_file)
                if not sub_file_name.startswith(self._config[
                        'filename_prefix']):
                    continue
                postfix = sub_file_name.split(self._config['filename_prefix'])[
                    1]
                if postfix.isdigit():
                    if int(postfix) % node_num == node_idx:
                        data_file_list.append(sub_file)
                else:
                    if hash(sub_file_name) % node_num == node_idx:
                        data_file_list.append(sub_file)
            windows_mins = windows_mins - self._split_interval
            data_time = data_time + datetime.timedelta(
                minutes=self._split_interval)
        return data_file_list
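    # Sharding example (illustrative names): with filename_prefix = "part-",
    # node_num = 2 and node_idx = 0, "part-6" lands in shard 6 % 2 == 0 and is
    # returned while "part-7" is skipped; files with a non-numeric postfix fall
    # back to hash(sub_file_name) % node_num, which in Python 3 is only stable
    # across processes if PYTHONHASHSEED is fixed.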

    def _alloc_dataset(self, file_list):
        """Create and configure a fluid dataset for the given file list."""
        dataset = fluid.DatasetFactory().create_dataset(self._config[
            'dataset_type'])
        dataset.set_batch_size(self._config['batch_size'])
        dataset.set_thread(self._config['load_thread'])
        dataset.set_hdfs_config(self._config['fs_name'],
                                self._config['fs_ugi'])
        dataset.set_pipe_command(self._config['data_converter'])
        dataset.set_filelist(file_list)
        dataset.set_use_var(self._config['data_vars'])
        # dataset.set_fleet_send_sleep_seconds(2)
        # dataset.set_fleet_send_batch_size(80000)
        return dataset
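    # Note: load_into_memory / preload_into_memory / release_memory used below
    # are methods of fluid's in-memory dataset, so 'dataset_type' is expected
    # to name an in-memory variant such as "InMemoryDataset".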

    def load_dataset(self, params):
        """
        Wait until the data is ready, then load it into memory; if a preload
        was started earlier, just wait for it to finish.
        """
        begin_time = params['begin_time']
        window_min = params['time_window_min']
        if begin_time not in self._datasets:
            while not self.check_ready(begin_time, window_min):
                logger.info("dataset not ready, time:" + begin_time)
                time.sleep(30)
            file_list = self.get_file_list(begin_time, window_min,
                                           params['node_num'],
                                           params['node_idx'])
            self._datasets[begin_time] = self._alloc_dataset(file_list)
            self._datasets[begin_time].load_into_memory()
        else:
            self._datasets[begin_time].wait_preload_done()
        return self._datasets[begin_time]
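    # A minimal sketch of the params dict this method expects (keys taken from
    # the code above, values hypothetical):
    #   params = {
    #       'begin_time': '202001122200',  # window start, "YYYYMMDDHHMM"
    #       'time_window_min': 30,         # window length in minutes
    #       'node_num': 4,                 # total trainer nodes
    #       'node_idx': 0,                 # index of this trainer
    #   }
    #   dataset = holder.load_dataset(params)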

    def preload_dataset(self, params):
        """
        Start an asynchronous preload if the data is ready.
        Return:
            True if a preload was started, otherwise False.
        """
        begin_time = params['begin_time']
        window_min = params['time_window_min']
        if begin_time not in self._datasets:
            if self.check_ready(begin_time, window_min):
                file_list = self.get_file_list(begin_time, window_min,
                                               params['node_num'],
                                               params['node_idx'])
                self._datasets[begin_time] = self._alloc_dataset(file_list)
                self._datasets[begin_time].preload_into_memory(self._config[
                    'preload_thread'])
                return True
        return False
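    # preload_dataset() is meant to be paired with load_dataset(): a successful
    # preload caches the dataset under params['begin_time'], and a later
    # load_dataset() call with the same begin_time only waits for the preload
    # to finish (wait_preload_done) instead of loading the files again.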

    def release_dataset(self, params):
        """Release the dataset cached for params['begin_time'], if any."""
        begin_time = params['begin_time']
        window_min = params['time_window_min']  # unused here; kept for a uniform params schema
        if begin_time in self._datasets:
            self._datasets[begin_time].release_memory()
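
# A minimal usage sketch (hypothetical values; the exact 'data_path' template
# syntax depends on util.PathGenerator, and data_vars/fs settings come from the
# training job):
#
#   config = {
#       'data_path': ...,             # time-split directory template
#       'data_donefile': None,        # falls back to data_path + "/to.hadoop.done"
#       'split_interval': 5,
#       'filename_prefix': 'part-',
#       'dataset_type': 'InMemoryDataset',
#       'batch_size': 32,
#       'load_thread': 4,
#       'preload_thread': 4,
#       'fs_name': ...,               # HDFS/AFS name node
#       'fs_ugi': ...,                # HDFS/AFS user,passwd
#       'data_converter': ...,        # pipe command that parses one raw line
#       'data_vars': ...,             # fluid variables of the model feed list
#   }
#   holder = TimeSplitDatasetHolder(config)
#   params = {'begin_time': '202001122200', 'time_window_min': 30,
#             'node_num': 1, 'node_idx': 0}
#   holder.preload_dataset(params)          # optional asynchronous preload
#   dataset = holder.load_dataset(params)   # blocks until the data is ready
#   ...                                     # run training with dataset
#   holder.release_dataset(params)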