# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import abc import datetime import time import paddle.fluid as fluid from paddlerec.core.utils import fs as fs from paddlerec.core.utils import util as util class DatasetHolder(object): """ Dataset Base """ __metaclass__ = abc.ABCMeta def __init__(self, config): """ """ self._datasets = {} self._config = config @abc.abstractmethod def check_ready(self, params): """ check data ready or not Return: True/False """ pass @abc.abstractmethod def load_dataset(self, params): """R """ pass @abc.abstractmethod def preload_dataset(self, params): """R """ pass @abc.abstractmethod def release_dataset(self, params): """R """ pass class TimeSplitDatasetHolder(DatasetHolder): """ Dataset with time split dir. root_path/$DAY/$HOUR """ def __init__(self, config): """ init data root_path, time_split_interval, data_path_format """ Dataset.__init__(self, config) if 'data_donefile' not in config or config['data_donefile'] is None: config['data_donefile'] = config['data_path'] + "/to.hadoop.done" self._path_generator = util.PathGenerator({'templates': [ {'name': 'data_path', 'template': config['data_path']}, {'name': 'donefile_path', 'template': config['data_donefile']} ]}) self._split_interval = config['split_interval'] # data split N mins per dir self._data_file_handler = fs.FileHandler(config) def _format_data_time(self, daytime_str, time_window_mins): """ """ data_time = util.make_datetime(daytime_str) mins_of_day = data_time.hour * 60 + data_time.minute begin_stage = mins_of_day / self._split_interval end_stage = (mins_of_day + time_window_mins) / self._split_interval if begin_stage == end_stage and mins_of_day % self._split_interval != 0: return None, 0 if mins_of_day % self._split_interval != 0: skip_mins = self._split_interval - (mins_of_day % self._split_interval) data_time = data_time + datetime.timedelta(minutes=skip_mins) time_window_mins = time_window_mins - skip_mins return data_time, time_window_mins def check_ready(self, daytime_str, time_window_mins): """ data in [daytime_str, daytime_str + time_window_mins] is ready or not Args: daytime_str: datetime with str format, such as "202001122200" meanings "2020-01-12 22:00" time_window_mins(int): from daytime_str to daytime_str + time_window_mins Return: True/False """ is_ready = True data_time, windows_mins = self._format_data_time(daytime_str, time_window_mins) while time_window_mins > 0: file_path = self._path_generator.generate_path('donefile_path', {'time_format': data_time}) if not self._data_file_handler.is_exist(file_path): is_ready = False break time_window_mins = time_window_mins - self._split_interval data_time = data_time + datetime.timedelta(minutes=self._split_interval) return is_ready def get_file_list(self, daytime_str, time_window_mins, node_num=1, node_idx=0): """ data in [daytime_str, daytime_str + time_window_mins], random shard to node_num, return shard[node_idx] Args: daytime_str: datetime with str format, such as "202001122200" meanings "2020-01-12 22:00" time_window_mins(int): from daytime_str to daytime_str + time_window_mins node_num(int): data split shard num node_idx(int): shard_idx Return: list, data_shard[node_idx] """ data_file_list = [] data_time, windows_mins = self._format_data_time(daytime_str, time_window_mins) while time_window_mins > 0: file_path = self._path_generator.generate_path('data_path', {'time_format': data_time}) sub_file_list = self._data_file_handler.ls(file_path) for sub_file in sub_file_list: sub_file_name = self._data_file_handler.get_file_name(sub_file) if not sub_file_name.startswith(self._config['filename_prefix']): continue if hash(sub_file_name) % node_num == node_idx: data_file_list.append(sub_file) time_window_mins = time_window_mins - self._split_interval data_time = data_time + datetime.timedelta(minutes=self._split_interval) return data_file_list def _alloc_dataset(self, file_list): """ """ dataset = fluid.DatasetFactory().create_dataset(self._config['dataset_type']) dataset.set_batch_size(self._config['batch_size']) dataset.set_thread(self._config['load_thread']) dataset.set_hdfs_config(self._config['fs_name'], self._config['fs_ugi']) dataset.set_pipe_command(self._config['data_converter']) dataset.set_filelist(file_list) dataset.set_use_var(self._config['data_vars']) # dataset.set_fleet_send_sleep_seconds(2) # dataset.set_fleet_send_batch_size(80000) return dataset def load_dataset(self, params): """ """ begin_time = params['begin_time'] windown_min = params['time_window_min'] if begin_time not in self._datasets: while self.check_ready(begin_time, windown_min) == False: print("dataset not ready, time:" + begin_time) time.sleep(30) file_list = self.get_file_list(begin_time, windown_min, params['node_num'], params['node_idx']) self._datasets[begin_time] = self._alloc_dataset(file_list) self._datasets[begin_time].load_into_memory() else: self._datasets[begin_time].wait_preload_done() return self._datasets[begin_time] def preload_dataset(self, params): """ """ begin_time = params['begin_time'] windown_min = params['time_window_min'] if begin_time not in self._datasets: if self.check_ready(begin_time, windown_min): file_list = self.get_file_list(begin_time, windown_min, params['node_num'], params['node_idx']) self._datasets[begin_time] = self._alloc_dataset(file_list) self._datasets[begin_time].preload_into_memory(self._config['preload_thread']) return True return False def release_dataset(self, params): """ """ begin_time = params['begin_time'] windown_min = params['time_window_min'] if begin_time in self._datasets: self._datasets[begin_time].release_memory()