From 28ed5927f6917be75cca854c0b933596de1dec58 Mon Sep 17 00:00:00 2001 From: xiexionghang Date: Thu, 5 Mar 2020 18:55:00 +0800 Subject: [PATCH] commit kagle for paddle --- kagle/__init__.py | 0 kagle/kagle_dataset.py | 132 +++++++++++ kagle/kagle_fs.py | 116 ++++++++++ kagle/kagle_layer.py | 200 +++++++++++++++++ kagle/kagle_metric.py | 176 +++++++++++++++ kagle/kagle_model.py | 175 +++++++++++++++ kagle/kagle_table.py | 19 ++ kagle/kagle_trainer.py | 373 ++++++++++++++++++++++++++++++++ kagle/kagle_util.py | 220 +++++++++++++++++++ kagle/trainer/__init__.py | 0 kagle/trainer/abacus_trainer.py | 345 +++++++++++++++++++++++++++++ kagle/trainer/kagle_trainer.py | 32 +++ 12 files changed, 1788 insertions(+) create mode 100755 kagle/__init__.py create mode 100755 kagle/kagle_dataset.py create mode 100755 kagle/kagle_fs.py create mode 100755 kagle/kagle_layer.py create mode 100755 kagle/kagle_metric.py create mode 100755 kagle/kagle_model.py create mode 100755 kagle/kagle_table.py create mode 100755 kagle/kagle_trainer.py create mode 100755 kagle/kagle_util.py create mode 100755 kagle/trainer/__init__.py create mode 100755 kagle/trainer/abacus_trainer.py create mode 100755 kagle/trainer/kagle_trainer.py diff --git a/kagle/__init__.py b/kagle/__init__.py new file mode 100755 index 00000000..e69de29b diff --git a/kagle/kagle_dataset.py b/kagle/kagle_dataset.py new file mode 100755 index 00000000..fa14aa21 --- /dev/null +++ b/kagle/kagle_dataset.py @@ -0,0 +1,132 @@ +import copy +import yaml +import time +import datetime +import kagle_fs +import kagle_util +import kagle_layer +import paddle.fluid as fluid +from abc import ABCMeta, abstractmethod + +class Dataset(object): + __metaclass__=ABCMeta + def __init__(self, config): + self._datasets = {} + self._config = config + + @abstractmethod + def check_ready(self, params): + pass + + @abstractmethod + def load_dataset(self, params): + pass + + @abstractmethod + def preload_dataset(self, params): + pass + + @abstractmethod + def release_dataset(self, params): + pass + +class TimeSplitDataset(Dataset): + def __init__(self, config): + Dataset.__init__(self, config) + if 'data_donefile' not in config or config['data_donefile'] is None: + config['data_donefile'] = config['data_path'] + "/to.hadoop.done" + self._path_generator = kagle_util.PathGenerator({'templates' : [ + {'name': 'data_path', 'template': config['data_path']}, + {'name': 'donefile_path', 'template': config['data_donefile']} + ]}) + self._split_interval = config['split_interval'] # data split N mins per dir + self._data_file_handler = kagle_fs.FileHandler(config) + + def _format_data_time(self, daytime_str, time_window_mins): + data_time = kagle_util.make_datetime(daytime_str) + mins_of_day = data_time.hour * 60 + data_time.minute + begin_stage = mins_of_day / self._split_interval + end_stage = (mins_of_day + time_window_mins) / self._split_interval + if begin_stage == end_stage and mins_of_day % self._split_interval != 0: + return None, 0 + + if mins_of_day % self._split_interval != 0: + skip_mins = self._split_interval - (mins_of_day % self._split_interval) + data_time = data_time + datetime.timedelta(minutes=skip_mins) + time_window_mins = time_window_mins - skip_mins + return data_time,time_window_mins + + def check_ready(self, daytime_str, time_window_mins): + is_ready = True + data_time,windows_mins = self._format_data_time(daytime_str, time_window_mins) + while time_window_mins > 0: + file_path = self._path_generator.generate_path('donefile_path', {'time_format': data_time}) + if not self._data_file_handler.is_exist(file_path): + is_ready = False + break + time_window_mins = time_window_mins - self._split_interval + data_time = data_time + datetime.timedelta(minutes=self._split_interval) + return is_ready + + def get_file_list(self, daytime_str, time_window_mins, node_num=1, node_idx=0): + data_file_list = [] + data_time,windows_mins = self._format_data_time(daytime_str, time_window_mins) + while time_window_mins > 0: + file_path = self._path_generator.generate_path('data_path', {'time_format': data_time}) + sub_file_list = self._data_file_handler.ls(file_path) + for sub_file in sub_file_list: + sub_file_name = self._data_file_handler.get_file_name(sub_file) + if not sub_file_name.startswith(self._config['filename_prefix']): + continue + if hash(sub_file_name) % node_num == node_idx: + data_file_list.append(sub_file) + time_window_mins = time_window_mins - self._split_interval + data_time = data_time + datetime.timedelta(minutes=self._split_interval) + return data_file_list + +class FluidTimeSplitDataset(TimeSplitDataset): + def __init__(self, config): + TimeSplitDataset.__init__(self, config) + + def _alloc_dataset(self, file_list): + dataset = fluid.DatasetFactory().create_dataset(self._config['dataset_type']) + dataset.set_batch_size(self._config['batch_size']) + dataset.set_thread(self._config['load_thread']) + dataset.set_hdfs_config(self._config['fs_name'], self._config['fs_ugi']) + dataset.set_pipe_command(self._config['data_converter']) + dataset.set_filelist(file_list) + dataset.set_use_var(self._config['data_vars']) + #dataset.set_fleet_send_sleep_seconds(2) + #dataset.set_fleet_send_batch_size(80000) + return dataset + + def load_dataset(self, params): + begin_time = params['begin_time'] + windown_min = params['time_window_min'] + if begin_time not in self._datasets: + while self.check_ready(begin_time, windown_min) == False: + print("dataset not ready, time:" + begin_time) + time.sleep(30) + file_list = self.get_file_list(begin_time, windown_min, params['node_num'], params['node_idx']) + self._datasets[begin_time] = self._alloc_dataset(file_list) + self._datasets[begin_time].load_into_memory() + else: + self._datasets[begin_time].wait_preload_done() + return self._datasets[begin_time] + + def preload_dataset(self, params): + begin_time = params['begin_time'] + windown_min = params['time_window_min'] + if begin_time not in self._datasets: + if self.check_ready(begin_time, windown_min): + file_list = self.get_file_list(begin_time, windown_min, params['node_num'], params['node_idx']) + self._datasets[begin_time] = self._alloc_dataset(file_list) + self._datasets[begin_time].preload_into_memory(self._config['preload_thread']) + return True + return False + + def release_dataset(self, params): + begin_time = params['begin_time'] + windown_min = params['time_window_min'] + if begin_time in self._datasets: + self._datasets[begin_time].release_memory() diff --git a/kagle/kagle_fs.py b/kagle/kagle_fs.py new file mode 100755 index 00000000..aeb025ed --- /dev/null +++ b/kagle/kagle_fs.py @@ -0,0 +1,116 @@ +import os +import time +from paddle.fluid.incubate.fleet.utils.hdfs import HDFSClient + +def is_afs_path(path): + if path.startswith("afs") or path.startswith("hdfs"): + return True + return False + +class LocalFSClient: + def __init__(self): + pass + def write(self, content, path, mode): + temp_dir = os.path.dirname(path) + if not os.path.exists(temp_dir): + os.makedirs(temp_dir) + f = open(path, mode) + f.write(content) + f.flush() + f.close() + + def cp(self, org_path, dest_path): + temp_dir = os.path.dirname(dest_path) + if not os.path.exists(temp_dir): + os.makedirs(temp_dir) + return os.system("cp -r " + org_path + " " + dest_path) + + def cat(self, file_path): + f = open(file_path) + content = f.read() + f.close() + return content + + def mkdir(self, dir_name): + os.system("mkdir -p " + path) + + def remove(self, path): + os.system("rm -rf " + path) + + def is_exist(self, path): + if os.system("ls " + path) == 0: + return True + return False + + def ls(self, path): + files = os.listdir(path) + files = [ path + '/' + fi for fi in files ] + return files + +class FileHandler: + def __init__(self, config): + if 'fs_name' in config: + hadoop_home="$HADOOP_HOME" + hdfs_configs = { + "hadoop.job.ugi": config['fs_ugi'], + "fs.default.name": config['fs_name'] + } + self._hdfs_client = HDFSClient(hadoop_home, hdfs_configs) + self._local_fs_client = LocalFSClient() + + def is_exist(self, path): + if is_afs_path(path): + return self._hdfs_client.is_exist(path) + else: + return self._local_fs_client.is_exist(path) + + def get_file_name(self, path): + sub_paths = path.split('/') + return sub_paths[-1] + + def write(self, content, dest_path, mode='w'): + if is_afs_path(dest_path): + file_name = self.get_file_name(dest_path) + temp_local_file = "./tmp/" + file_name + self._local_fs_client.remove(temp_local_file) + org_content = "" + if mode.find('a') >= 0: + org_content = self._hdfs_client.cat(dest_path) + content = content + org_content + self._local_fs_client.write(content, temp_local_file, mode) #fleet hdfs_client only support upload, so write tmp file + self._hdfs_client.delete(dest_path + ".tmp") + self._hdfs_client.upload(dest_path + ".tmp", temp_local_file) + self._hdfs_client.delete(dest_path + ".bak") + self._hdfs_client.rename(dest_path, dest_path + '.bak') + self._hdfs_client.rename(dest_path + ".tmp", dest_path) + else: + self._local_fs_client.write(content, dest_path, mode) + + + def cat(self, path): + if is_afs_path(path): + print("xxh go cat " + path) + hdfs_cat = self._hdfs_client.cat(path) + print(hdfs_cat) + return hdfs_cat + else: + return self._local_fs_client.cat(path) + + def ls(self, path): + if is_afs_path(path): + return self._hdfs_client.ls(path) + else: + return self._local_fs_client.ls(path) + + + def cp(self, org_path, dest_path): + org_is_afs = is_afs_path(org_path) + dest_is_afs = is_afs_path(dest_path) + if not org_is_afs and not dest_is_afs: + return self._local_fs_client.cp(org_path, dest_path) + if not org_is_afs and dest_is_afs: + return self._hdfs_client.upload(dest_path, org_path) + if org_is_afs and not dest_is_afs: + return self._hdfs_client.download(org_path, dest_path) + print("Not Suppor hdfs cp currently") + diff --git a/kagle/kagle_layer.py b/kagle/kagle_layer.py new file mode 100755 index 00000000..c78adb83 --- /dev/null +++ b/kagle/kagle_layer.py @@ -0,0 +1,200 @@ +import paddle.fluid as fluid +from abc import ABCMeta, abstractmethod + +class Layer(object): + __metaclass__=ABCMeta + + def __init__(self, config): + pass + + def generate(self, mode, param): + if mode == 'fluid': + return self.generate_fluid(param) + elif mode == 'tensorflow': + return self.generate_tensorflow(param) + print ('unsupport this mode: ' + mode) + return None,None + + @abstractmethod + def generate_fluid(self, param): + pass + + # maybe + #@abstractmethod + def generate_tensorflow(self, param): + pass + +class EmbeddingInputLayer(Layer): + def __init__(self, config): + self._cvm = config['cvm'] + self._name = config['name'] + self._slots = [ str(slot) for slot in config['slots'] ] + self._mf_dim = config['mf_dim'] + self._backward = config['backward'] + self._emb_dim = self._mf_dim + 3 #append show ctr lr + self._emb_layers = [] + + def generate_fluid(self, param): + show_clk = fluid.layers.concat( + [param['layer']['show'], param['layer']['click']], axis=1) + show_clk.stop_gradient = True + data_var = [] + for slot in self._slots: + l = fluid.layers.data(name=slot, shape=[1], dtype="int64", lod_level=1) + data_var.append(l) + emb = fluid.layers.embedding(input=l, size=[10, self._emb_dim], is_sparse = True, is_distributed=True, param_attr=fluid.ParamAttr(name="embedding")) + emb = fluid.layers.sequence_pool(input=emb, pool_type='sum') + emb = fluid.layers.continuous_value_model(emb, show_clk, self._cvm) + self._emb_layers.append(emb) + output = fluid.layers.concat(input=self._emb_layers, axis=1, name=self._name) + return output, {'data_var' : data_var} + +class LabelInputLayer(Layer): + def __init__(self, config): + self._name = config['name'] + self._dim = config.get('dim', 1) + self._data_type = config.get('data_type', "int64") + self._label_idx = config['label_idx'] + + def generate_fluid(self, param): + label = fluid.layers.data(name=self._name, shape=[-1, self._dim], dtype=self._data_type, lod_level=0, append_batch_size=False) + cast_label = fluid.layers.cast(label, dtype='float32') + cast_label.stop_gradient = True + return cast_label, {'data_var' : [label]} + +class TagInputLayer(Layer): + def __init__(self, config): + self._name = config['name'] + self._tag = config['tag'] + self._dim = config.get('dim', 1) + self._data_type = config['data_type'] + + def generate_fluid(self, param): + output = fluid.layers.data(name=self._name, shape=[-1, self._dim], dtype=self._data_type, lod_level=0, append_batch_size=False, stop_gradient=True) + return output, {'data_var' : [output]} + +class ParamLayer(Layer): + def __init__(self, config): + self._name = config['name'] + self._coln = config['coln'] + self._table_id = config.get('table_id', -1) + self._init_range = config.get('init_range', 1) + self._data_type = config.get('data_type', 'float32') + self._config = config + + def generate_fluid(self, param): + return self._config, {'inference_param': {'name':'param', 'params': [], 'table_id': self._table_id}} + +class SummaryLayer(Layer): + def __init__(self, config): + self._name = config['name'] + self._table_id = config.get('table_id', -1) + self._data_type = config.get('data_type', 'float32') + self._config = config + + def generate_fluid(self, param): + return self._config, {'inference_param': {'name':'summary', 'params': [], 'table_id': self._table_id}} + +class NormalizetionLayer(Layer): + def __init__(self, config): + self._name = config['name'] + self._input = config['input'] + self._summary = config['summary'] + self._table_id = config.get('table_id', -1) + + def generate_fluid(self, param): + input_layer = param['layer'][self._input[0]] + summary_layer = param['layer'][self._summary] + if len(self._input) > 0: + input_list=[ param['layer'][i] for i in self._input ] + input_layer = fluid.layers.concat(input=input_list, axis=1) + bn = fluid.layers.data_norm(input=input_layer, name=self._name, epsilon=1e-4, param_attr={ + "batch_size":1e4, + "batch_sum_default":0.0, + "batch_square":1e4}) + inference_param = [ self._name + '.batch_size', self._name + '.batch_sum', self._name + '.batch_square_sum' ] + return bn, {'inference_param' : { 'name':'summary', 'params': inference_param, 'table_id': summary_layer.get('table_id', -1)}} + +class NeuralLayer(Layer): + def __init__(self, config): + self._name = config['name'] + self._param = config['param'] + self._input = config['input'] + self._bias = config.get('bias', True) + self._act_func = config.get('act_func', None) + + def generate_fluid(self, param): + param_layer = param['layer'][self._param] + input_layer = param['layer'][self._input[0]] + if len(self._input) > 0: + input_list=[ param['layer'][i] for i in self._input ] + input_layer = fluid.layers.concat(input=input_list, axis=1) + input_coln = input_layer.shape[1] + scale = param_layer['init_range'] / (input_coln ** 0.5) + bias = None + if self._bias: + bias = fluid.ParamAttr(learning_rate=1.0, initializer=fluid.initializer.NormalInitializer(loc=0.0, scale=scale)) + fc = fluid.layers.fc( + name = self._name, + input = input_layer, + size = param_layer['coln'], + act = self._act_func, + param_attr = \ + fluid.ParamAttr(learning_rate=1.0, \ + initializer=fluid.initializer.NormalInitializer(loc=0.0, scale=scale)), + bias_attr = bias) + inference_param = [self._name + '.w_0', self._name + '.b_0'] + return fc, {'inference_param' : {'name':'param', 'params': inference_param, 'table_id': param_layer.get('table_id', -1)}} + +class SigmoidLossLayer(Layer): + def __init__(self, config): + self._name = config['name'] + self._label = config['label'] + self._input = config['input'] + self._weight = config.get('weight', None) + self._metric_label = config.get('metric_label', None) + self._bound = config.get('bound', [-15.0, 15.0]) + self._extend_output = { + 'metric_label' : self._metric_label, + 'metric_dict' : { + 'auc' : { 'var' : None}, + 'batch_auc' : {'var' : None}, + 'stat_pos' : {'var' : None, 'data_type' : 'int64'}, + 'stat_neg' : {'var' : None, 'data_type' : 'int64'}, + 'batch_stat_pos' : {'var' : None, 'data_type' : 'int64'}, + 'batch_stat_neg' : {'var' : None, 'data_type' : 'int64'}, + 'pos_ins_num' : {'var' : None}, + 'abserr': {'var' : None}, + 'sqrerr': {'var' : None}, + 'prob': {'var' : None}, + 'total_ins_num': {'var' : None}, + 'q': {'var' : None} + } + } + + + def generate_fluid(self, param): + input_layer = param['layer'][self._input[0]] + label_layer = param['layer'][self._label] + output = fluid.layers.clip(input_layer, self._bound[0], self._bound[1], name = self._name) + norm = fluid.layers.sigmoid(output, name=self._name) + output = fluid.layers.log_loss(norm, fluid.layers.cast(x=label_layer, dtype='float32')) + if self._weight: + weight_layer = param['layer'][self._weight] + output = fluid.layers.elementwise_mul(output, weight_layer) + output = fluid.layers.mean(x=output) + self._extend_output['loss'] = output + + #For AUC Metric + metric = self._extend_output['metric_dict'] + binary_predict = fluid.layers.concat( + input=[fluid.layers.elementwise_sub(fluid.layers.ceil(norm), norm), norm], axis=1) + metric['auc']['var'], metric['batch_auc']['var'], [metric['batch_stat_pos']['var'], \ + metric['batch_stat_neg']['var'], metric['stat_pos']['var'], metric['stat_neg']['var']] = \ + fluid.layers.auc(input=binary_predict, label=fluid.layers.cast(x=label_layer, dtype='int64'), curve='ROC', num_thresholds=32) + + metric['sqrerr']['var'], metric['abserr']['var'], metric['prob']['var'], metric['q']['var'], \ + metric['pos_ins_num']['var'], metric['total_ins_num']['var'] = \ + fluid.contrib.layers.ctr_metric_bundle(norm, fluid.layers.cast(x=label_layer, dtype='float32')) + + return norm, self._extend_output diff --git a/kagle/kagle_metric.py b/kagle/kagle_metric.py new file mode 100755 index 00000000..a92e59af --- /dev/null +++ b/kagle/kagle_metric.py @@ -0,0 +1,176 @@ +import math +import time +import numpy as np +import kagle_util +import paddle.fluid as fluid +from abc import ABCMeta, abstractmethod +from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet + +class Metric(object): + __metaclass__=ABCMeta + + def __init__(self, config): + pass + + @abstractmethod + def clear(self, scope, params): + pass + + @abstractmethod + def calculate(self, scope, params): + pass + + @abstractmethod + def get_result(self): + pass + + @abstractmethod + def get_result_to_string(self): + pass + +class PaddleAUCMetric(Metric): + def __init__(self, config): + pass + + def clear(self, scope, params): + self._label = params['label'] + self._metric_dict = params['metric_dict'] + self._result = {} + place=fluid.CPUPlace() + for metric_name in self._metric_dict: + metric_config = self._metric_dict[metric_name] + if scope.find_var(metric_config['var'].name) is None: + continue + metric_var = scope.var(metric_config['var'].name).get_tensor() + data_type = 'float32' + if 'data_type' in metric_config: + data_type = metric_config['data_type'] + data_array = np.zeros(metric_var._get_dims()).astype(data_type) + metric_var.set(data_array, place) + pass + + + def get_metric(self, scope, metric_name): + metric = np.array(scope.find_var(metric_name).get_tensor()) + old_metric_shape = np.array(metric.shape) + metric = metric.reshape(-1) + global_metric = np.copy(metric) * 0 + fleet._role_maker._node_type_comm.Allreduce(metric, global_metric) + global_metric = global_metric.reshape(old_metric_shape) + return global_metric[0] + + def get_global_metrics(self, scope, metric_dict): + fleet._role_maker._barrier_worker() + result = {} + for metric_name in metric_dict: + metric_item = metric_dict[metric_name] + if scope.find_var(metric_item['var'].name) is None: + result[metric_name] = None + continue + result[metric_name] = self.get_metric(scope, metric_item['var'].name) + return result + + def calculate_auc(self, global_pos, global_neg): + num_bucket = len(global_pos) + area = 0.0 + pos = 0.0 + neg = 0.0 + new_pos = 0.0 + new_neg = 0.0 + total_ins_num = 0 + for i in xrange(num_bucket): + index = num_bucket - 1 - i + new_pos = pos + global_pos[index] + total_ins_num += global_pos[index] + new_neg = neg + global_neg[index] + total_ins_num += global_neg[index] + area += (new_neg - neg) * (pos + new_pos) / 2 + pos = new_pos + neg = new_neg + auc_value = None + if pos * neg == 0 or total_ins_num == 0: + auc_value = 0.5 + else: + auc_value = area / (pos * neg) + return auc_value + + def calculate_bucket_error(self, global_pos, global_neg): + num_bucket = len(global_pos) + last_ctr = -1.0 + impression_sum = 0.0 + ctr_sum = 0.0 + click_sum = 0.0 + error_sum = 0.0 + error_count = 0.0 + click = 0.0 + show = 0.0 + ctr = 0.0 + adjust_ctr = 0.0 + relative_error = 0.0 + actual_ctr = 0.0 + relative_ctr_error = 0.0 + k_max_span = 0.01 + k_relative_error_bound = 0.05 + for i in xrange(num_bucket): + click = global_pos[i] + show = global_pos[i] + global_neg[i] + ctr = float(i) / num_bucket + if abs(ctr - last_ctr) > k_max_span: + last_ctr = ctr + impression_sum = 0.0 + ctr_sum = 0.0 + click_sum = 0.0 + impression_sum += show + ctr_sum += ctr * show + click_sum += click + if impression_sum == 0: + continue + adjust_ctr = ctr_sum / impression_sum + if adjust_ctr == 0: + continue + relative_error = \ + math.sqrt((1 - adjust_ctr) / (adjust_ctr * impression_sum)) + if relative_error < k_relative_error_bound: + actual_ctr = click_sum / impression_sum + relative_ctr_error = abs(actual_ctr / adjust_ctr - 1) + error_sum += relative_ctr_error * impression_sum + error_count += impression_sum + last_ctr = -1 + + bucket_error = error_sum / error_count if error_count > 0 else 0.0 + return bucket_error + + def calculate(self, scope, params): + self._label = params['label'] + self._metric_dict = params['metric_dict'] + fleet._role_maker._barrier_worker() + result = self.get_global_metrics(scope, self._metric_dict) + if 'stat_pos' in result and 'stat_neg' in result: + result['auc'] = self.calculate_auc(result['stat_pos'], result['stat_neg']) + result['bucket_error'] = self.calculate_auc(result['stat_pos'], result['stat_neg']) + if 'pos_ins_num' in result: + result['actual_ctr'] = result['pos_ins_num'] / result['total_ins_num'] + if 'abserr' in result: + result['mae'] = result['abserr'] / result['total_ins_num'] + if 'sqrerr' in result: + result['rmse'] = math.sqrt(result['sqrerr'] / result['total_ins_num']) + if 'prob' in result: + result['predict_ctr'] = result['prob'] / result['total_ins_num'] + if abs(result['predict_ctr']) > 1e-6: + result['copc'] = result['actual_ctr'] / result['predict_ctr'] + + if 'q' in result: + result['mean_q'] = result['q'] / result['total_ins_num'] + self._result = result + return result + + def get_result(self): + return self._result + + def get_result_to_string(self): + result = self.get_result() + result_str = "%s AUC=%.6f BUCKET_ERROR=%.6f MAE=%.6f RMSE=%.6f "\ + "Actural_CTR=%.6f Predicted_CTR=%.6f COPC=%.6f MEAN Q_VALUE=%.6f Ins number=%s" % \ + (self._label, result['auc'], result['bucket_error'], result['mae'], result['rmse'], result['actual_ctr'], + result['predict_ctr'], result['copc'], result['mean_q'], result['total_ins_num']) + return result_str diff --git a/kagle/kagle_model.py b/kagle/kagle_model.py new file mode 100755 index 00000000..ad6b234e --- /dev/null +++ b/kagle/kagle_model.py @@ -0,0 +1,175 @@ +import copy +import yaml +import kagle_layer +import kagle_table +import paddle.fluid as fluid +from abc import ABCMeta, abstractmethod +from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet + +def create(config): + model = None + if config['mode'] == 'fluid': + model = FluidModel(config) + model.build_model() + return model + +class Model(object): + __metaclass__=ABCMeta + + def __init__(self, config): + self._config = config + self._name = config['name'] + f = open(config['layer_file'], 'r') + self._build_nodes = yaml.safe_load(f.read()) + self._build_phase = ['input', 'param', 'summary', 'layer'] + self._build_param = {'layer': {}, 'inner_layer':{}, 'layer_extend': {}, 'model': {}} + self._inference_meta = {'dependency':{}, 'params': {}} + self._cost = None + self._metrics = {} + self._data_var = [] + pass + + def get_cost_op(self): + return self._cost + + def get_metrics(self): + return self._metrics + + @abstractmethod + def shrink(self, params): + pass + + @abstractmethod + def build_model(self): + pass + + @abstractmethod + def dump_model_program(self, path): + pass + + @abstractmethod + def dump_inference_param(self, params): + pass + @abstractmethod + def dump_inference_program(self, inference_layer, path): + pass + + def inference_params(self, inference_layer): + layer = inference_layer + if layer in self._inference_meta['params']: + return self._inference_meta['params'][layer] + + self._inference_meta['params'][layer] = [] + self._inference_meta['dependency'][layer] = self.get_dependency(self._build_param['inner_layer'], layer) + for node in self._build_nodes['layer']: + if node['name'] not in self._inference_meta['dependency'][layer]: + continue + if 'inference_param' in self._build_param['layer_extend'][node['name']]: + self._inference_meta['params'][layer] += self._build_param['layer_extend'][node['name']]['inference_param']['params'] + return self._inference_meta['params'][layer] + + def get_dependency(self, layer_graph, dest_layer): + dependency_list = [] + if dest_layer in layer_graph: + dependencys = copy.deepcopy(layer_graph[dest_layer]['input']) + dependency_list = copy.deepcopy(dependencys) + for dependency in dependencys: + dependency_list = dependency_list + self.get_dependency(layer_graph, dependency) + return list(set(dependency_list)) + + +class FluidModel(Model): + def __init__(self, config): + Model.__init__(self, config) + pass + + def build_model(self): + for layer in self._build_nodes['layer']: + self._build_param['inner_layer'][layer['name']] = layer + + + self._build_param['table'] = {} + self._build_param['model']['train_program'] = fluid.Program() + self._build_param['model']['startup_program'] = fluid.Program() + with fluid.program_guard(self._build_param['model']['train_program'], self._build_param['model']['startup_program']): + with fluid.unique_name.guard(): + for phase in self._build_phase: + if self._build_nodes[phase] is None: + continue + for node in self._build_nodes[phase]: + exec("""layer=kagle_layer.{}(node)""".format(node['class'])) + layer_output, extend_output = layer.generate(self._config['mode'], self._build_param) + self._build_param['layer'][node['name']] = layer_output + self._build_param['layer_extend'][node['name']] = extend_output + if extend_output is None: + continue + if 'loss' in extend_output: + if self._cost is None: + self._cost = extend_output['loss'] + else: + self._cost += extend_output['loss'] + if 'data_var' in extend_output: + self._data_var += extend_output['data_var'] + if 'metric_label' in extend_output and extend_output['metric_label'] is not None: + self._metrics[extend_output['metric_label']] = extend_output['metric_dict'] + + if 'inference_param' in extend_output: + param_name = extend_output['inference_param']['name'] + if param_name not in self._build_param['table']: + self._build_param['table'][param_name] = {'params':[]} + table_meta = kagle_table.TableMeta.alloc_new_table(extend_output['inference_param']['table_id']) + self._build_param['table'][param_name]['_meta'] = table_meta + self._build_param['table'][param_name]['params'] += extend_output['inference_param']['params'] + pass + + @classmethod + def build_optimizer(self, params): + optimizer_conf = params['optimizer_conf'] + strategy = None + if 'strategy' in optimizer_conf: + strategy = optimizer_conf['strategy'] + stat_var_names = [] + metrics = params['metrics'] + for name in metrics: + model_metrics = metrics[name] + stat_var_names += [ model_metrics[metric]['var'].name for metric in model_metrics] + strategy['stat_var_names'] = list(set(stat_var_names)) + optimizer_generator = 'optimizer = fluid.optimizer.' + optimizer_conf['class'] + '(learning_rate=' + str(optimizer_conf['learning_rate']) + ')' + exec(optimizer_generator) + optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy) + return optimizer + + def dump_model_program(self, path): + with open(path + '/' + self._name + '_main_program.pbtxt', "w") as fout: + print >> fout, self._build_param['model']['train_program'] + with open(path + '/' + self._name + '_startup_program.pbtxt', "w") as fout: + print >> fout, self._build_param['model']['startup_program'] + pass + + def shrink(self, params): + scope = params['scope'] + decay = params['decay'] + for param_table in self._build_param['table']: + table_id = self._build_param['table'][param_table]['_meta']._table_id + fleet.shrink_dense_table(decay, scope=scope, table_id=table_id) + + def dump_inference_program(self, inference_layer, path): + pass + + def dump_inference_param(self, params): + scope = params['scope'] + executor = params['executor'] + program = self._build_param['model']['train_program'] + for table_name,table in self._build_param['table'].items(): + fleet._fleet_ptr.pull_dense(scope, table['_meta']._table_id, table['params']) + for infernce_item in params['inference_list']: + params_name_list = self.inference_params(infernce_item['layer_name']) + params_var_list = [ program.global_block().var(i) for i in params_name_list ] + params_file_name = infernce_item['save_file_name'] + with fluid.scope_guard(scope): + if params['save_combine']: + fluid.io.save_vars( + executor, "./", program, vars=params_var_list, filename=params_file_name) + else: + fluid.io.save_vars(executor, params_file_name, program, vars=params_var_list) + pass diff --git a/kagle/kagle_table.py b/kagle/kagle_table.py new file mode 100755 index 00000000..91aefa30 --- /dev/null +++ b/kagle/kagle_table.py @@ -0,0 +1,19 @@ +import copy +import yaml +from abc import ABCMeta, abstractmethod + +class TableMeta: + TableId = 1 + + @staticmethod + def alloc_new_table(table_id): + if table_id < 0: + table_id = TableMeta.TableId + if table_id >= TableMeta.TableId: + TableMeta.TableId += 1 + table = TableMeta(table_id) + return table + + def __init__(self, table_id): + self._table_id = table_id + pass diff --git a/kagle/kagle_trainer.py b/kagle/kagle_trainer.py new file mode 100755 index 00000000..20bd0f19 --- /dev/null +++ b/kagle/kagle_trainer.py @@ -0,0 +1,373 @@ +import sys +import copy +import yaml +import time +import json +import datetime +import kagle_fs +import kagle_util +import kagle_model +import kagle_dataset +import kagle_metric +import paddle.fluid as fluid +from abc import ABCMeta, abstractmethod +from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet + +class Trainer(object): + __metaclass__=ABCMeta + def __init__(self, config): + self._status_processor = {} + self._context = {'status': 'uninit', 'is_exit': False} + + def regist_context_processor(self, status_name, processor): + self._status_processor[status_name] = processor + + def context_process(self, context): + if context['status'] in self._status_processor: + self._status_processor[context['status']](context) + else: + self.other_status_processor(context) + + def other_status_processor(self, context): + print('unknow context_status:%s, do nothing' % context['status']) + time.sleep(60) + + def reload_train_context(self): + pass + + def run(self): + while True: + self.reload_train_context() + self.context_process(self._context) + if self._context['is_exit']: + break + +class AbacusPaddleTrainer(Trainer): + def __init__(self, config): + Trainer.__init__(self, config) + config['output_path'] = kagle_util.get_absolute_path( + config['output_path'], config['io']['afs']) + self.global_config = config + self._place = fluid.CPUPlace() + self._exe = fluid.Executor(self._place) + self._exector_context = {} + self._metrics = {} + self._path_generator = kagle_util.PathGenerator({ + 'templates' : [ + {'name': 'xbox_base_done', 'template': config['output_path'] + '/xbox_base_done.txt'}, + {'name': 'xbox_delta_done', 'template': config['output_path'] + '/xbox_patch_done.txt'}, + {'name': 'xbox_base', 'template': config['output_path'] + '/xbox/{day}/base/'}, + {'name': 'xbox_delta', 'template': config['output_path'] + '/xbox/{day}/delta-{pass_id}/'}, + {'name': 'batch_model', 'template': config['output_path'] + '/batch_model/{day}/{pass_id}/'} + ] + }) + if 'path_generator' in config: + self._path_generator.add_path_template(config['path_generator']) + + self.regist_context_processor('uninit', self.init) + self.regist_context_processor('startup', self.startup) + self.regist_context_processor('begin_day', self.begin_day) + self.regist_context_processor('train_pass', self.train_pass) + self.regist_context_processor('end_day', self.end_day) + + def init(self, context): + fleet.init(self._exe) + data_var_list = [] + data_var_name_dict = {} + runnnable_scope = [] + runnnable_cost_op = [] + context['status'] = 'startup' + + for executor in self.global_config['executor']: + scope = fluid.Scope() + self._exector_context[executor['name']] = {} + self._exector_context[executor['name']]['scope'] = scope + self._exector_context[executor['name']]['model'] = kagle_model.create(executor) + model = self._exector_context[executor['name']]['model'] + self._metrics.update(model.get_metrics()) + runnnable_scope.append(scope) + runnnable_cost_op.append(model.get_cost_op()) + for var in model._data_var: + if var.name in data_var_name_dict: + continue + data_var_list.append(var) + data_var_name_dict[var.name] = var + + optimizer = kagle_model.FluidModel.build_optimizer({ + 'metrics' : self._metrics, + 'optimizer_conf' : self.global_config['optimizer'] + }) + optimizer.minimize(runnnable_cost_op, runnnable_scope) + for executor in self.global_config['executor']: + scope = self._exector_context[executor['name']]['scope'] + model = self._exector_context[executor['name']]['model'] + program = model._build_param['model']['train_program'] + if not executor['is_update_sparse']: + program._fleet_opt["program_configs"][str(id(model.get_cost_op().block.program))]["push_sparse"] = [] + if 'train_thread_num' not in executor: + executor['train_thread_num'] = global_config['train_thread_num'] + with fluid.scope_guard(scope): + self._exe.run(model._build_param['model']['startup_program']) + model.dump_model_program('./') + + #server init done + if fleet.is_server(): + return 0 + + self._dataset = {} + for dataset_item in self.global_config['dataset']['data_list']: + dataset_item['data_vars'] = data_var_list + dataset_item.update(self.global_config['io']['afs']) + dataset_item["batch_size"] = self.global_config['batch_size'] + self._dataset[dataset_item['name']] = kagle_dataset.FluidTimeSplitDataset(dataset_item) + #if config.need_reqi_changeslot and config.reqi_dnn_plugin_day >= last_day and config.reqi_dnn_plugin_pass >= last_pass: + # util.reqi_changeslot(config.hdfs_dnn_plugin_path, join_save_params, common_save_params, update_save_params, scope2, scope3) + fleet.init_worker() + pass + + def print_log(self, log_str, params): + params['index'] = fleet.worker_index() + return kagle_util.print_log(log_str, params) + + def print_global_metrics(self, scope, model, monitor_data, stdout_str): + metrics = model.get_metrics() + metric_calculator = kagle_metric.PaddleAUCMetric(None) + for metric in metrics: + metric_param = {'label' : metric, 'metric_dict' : metrics[metric]} + metric_calculator.calculate(scope, metric_param) + metric_result = metric_calculator.get_result_to_string() + self.print_log(metric_result, {'master': True, 'stdout' : stdout_str}) + monitor_data += metric_result + metric_calculator.clear(scope, metric_param) + + def save_model(self, day, pass_index, base_key): + cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, + {'master': True, 'log_format' : 'save model cost %s sec'}) + model_path = self._path_generator.generate_path('batch_model', {'day': day, 'pass_id': pass_index}) + save_mode = 0 # just save all + if pass_index < 1: #batch_model + save_mode = 3 # unseen_day++, save all + kagle_util.rank0_print("going to save_model %s" % model_path) + fleet.save_persistables(None, model_path, mode=save_mode) + self._train_pass.save_train_progress(day, pass_index, base_key, model_path, is_checkpoint=True) + cost_printer.done() + return model_path + + def save_xbox_model(self, day, pass_index, xbox_base_key, monitor_data): + stdout_str = "" + xbox_patch_id = str(int(time.time())) + kagle_util.rank0_print("begin save delta model") + + model_path = "" + xbox_model_donefile = "" + cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, {'master': True, 'log_format' : 'save xbox model cost %s sec', 'stdout' : stdout_str}) + if pass_index < 1: + save_mode = 2 + xbox_patch_id = xbox_base_key + model_path = self._path_generator.generate_path('xbox_base', {'day' : day}) + xbox_model_donefile = self._path_generator.generate_path('xbox_base_done', {'day' : day}) + else: + save_mode = 1 + model_path = self._path_generator.generate_path('xbox_delta', {'day' : day, 'pass_id':pass_index}) + xbox_model_donefile = self._path_generator.generate_path('xbox_delta_done', {'day' : day}) + total_save_num = fleet.save_persistables(None, model_path, mode=save_mode) + cost_printer.done() + + cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, {'master': True, + 'log_format' : 'save cache model cost %s sec', 'stdout' : stdout_str}) + model_file_handler = kagle_fs.FileHandler(self.global_config['io']['afs']) + if self.global_config['save_cache_model']: + cache_save_num = fleet.save_cache_model(None, model_path, mode=save_mode) + model_file_handler.write( + "file_prefix:part\npart_num:16\nkey_num:%d\n" % cache_save_num, + model_path + '/000_cache/sparse_cache.meta', 'w') + cost_printer.done() + kagle_util.rank0_print("save xbox cache model done, key_num=%s" % cache_save_num) + + save_env_param = { + 'executor': self._exe, + 'save_combine': True + } + cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, {'master': True, + 'log_format' : 'save dense model cost %s sec', 'stdout' : stdout_str}) + for executor in self.global_config['executor']: + if 'layer_for_inference' not in executor: + continue + executor_name = executor['name'] + model = self._exector_context[executor_name]['model'] + save_env_param['inference_list'] = executor['layer_for_inference'] + save_env_param['scope'] = self._exector_context[executor_name]['scope'] + model.dump_inference_param(save_env_param) + for dnn_layer in executor['layer_for_inference']: + model_file_handler.cp(dnn_layer['save_file_name'], + model_path + '/dnn_plugin/' + dnn_layer['save_file_name']) + cost_printer.done() + + xbox_done_info = { + "id" : xbox_patch_id, + "key" : xbox_base_key, + "ins_path" : "", + "ins_tag" : "feasign", + "partition_type" : "2", + "record_count" : "111111", + "monitor_data" : monitor_data, + "mpi_size" : str(fleet.worker_num()), + "input" : model_path.rstrip("/") + "/000", + "job_id" : kagle_util.get_env_value("JOB_ID"), + "job_name" : kagle_util.get_env_value("JOB_NAME") + } + model_file_handler.write(json.dumps(xbox_done_info) + "\n", xbox_model_donefile, 'a') + if pass_index > 0: + self._train_pass.save_train_progress(day, pass_index, xbox_base_key, model_path, is_checkpoint=False) + return stdout_str + + def run_executor(self, executor_config, dataset, stdout_str): + day = self._train_pass.date() + pass_id = self._train_pass._pass_id + xbox_base_key = self._train_pass._base_key + executor_name = executor_config['name'] + scope = self._exector_context[executor_name]['scope'] + model = self._exector_context[executor_name]['model'] + with fluid.scope_guard(scope): + kagle_util.rank0_print("Begin " + executor_name + " pass") + begin = time.time() + program = model._build_param['model']['train_program'] + self._exe.train_from_dataset(program, dataset, scope, + thread=executor_config['train_thread_num'], debug=self.global_config['debug']) + end = time.time() + local_cost = (end-begin) / 60.0 + avg_cost = kagle_util.worker_numric_avg(local_cost) + min_cost = kagle_util.worker_numric_min(local_cost) + max_cost = kagle_util.worker_numric_max(local_cost) + kagle_util.rank0_print("avg train time %s mins, min %s mins, max %s mins" % (avg_cost, min_cost, max_cost)) + self._exector_context[executor_name]['cost'] = max_cost + + monitor_data = "" + self.print_global_metrics(scope, model, monitor_data, stdout_str) + kagle_util.rank0_print("End " + executor_name + " pass") + if self._train_pass.need_dump_inference(pass_id) and executor_config['dump_inference_model']: + stdout_str += self.save_xbox_model(day, pass_id, xbox_base_key, monitor_data) + + def startup(self, context): + if fleet.is_server(): + fleet.run_server() + context['status'] = 'wait' + return + stdout_str = "" + self._train_pass = kagle_util.TimeTrainPass(self.global_config) + if not self.global_config['cold_start']: + cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, + {'master': True, 'log_format' : 'load model cost %s sec', 'stdout' : stdout_str}) + self.print_log("going to load model %s" % self._train_pass._checkpoint_model_path, {'master': True}) + #if config.need_reqi_changeslot and config.reqi_dnn_plugin_day >= self._train_pass.date() + # and config.reqi_dnn_plugin_pass >= self._pass_id: + # fleet.load_one_table(0, self._train_pass._checkpoint_model_path) + #else: + fleet.init_server(self._train_pass._checkpoint_model_path, mode=0) + cost_printer.done() + if self.global_config['save_first_base']: + self.print_log("save_first_base=True", {'master': True}) + self.print_log("going to save xbox base model", {'master': True, 'stdout' : stdout_str}) + self._train_pass._base_key = int(time.time()) + stdout_str += self.save_xbox_model(day, 0, self._train_pass._base_key, "") + context['status'] = 'begin_day' + + def begin_day(self, context): + stdout_str = "" + if not self._train_pass.next(): + context['is_exit'] = True + day = self._train_pass.date() + pass_id = self._train_pass._pass_id + self.print_log("======== BEGIN DAY:%s ========" % day, {'master': True, 'stdout' : stdout_str}) + if pass_id == self._train_pass.max_pass_num_day(): + context['status'] = 'end_day' + else: + context['status'] = 'train_pass' + + def end_day(self, context): + day = self._train_pass.date() + pass_id = self._train_pass._pass_id + xbox_base_key = int(time.time()) + context['status'] = 'begin_day' + + kagle_util.rank0_print("shrink table") + cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, + {'master': True, 'log_format' : 'shrink table done, cost %s sec'}) + fleet.shrink_sparse_table() + for executor in self._exector_context: + self._exector_context[executor]['model'].shrink({ + 'scope': self._exector_context[executor]['scope'], + 'decay': self.global_config['optimizer']['dense_decay_rate'] + }) + cost_printer.done() + + next_date = self._train_pass.date(delta_day=1) + kagle_util.rank0_print("going to save xbox base model") + self.save_xbox_model(next_date, 0, xbox_base_key, "") + kagle_util.rank0_print("going to save batch model") + self.save_model(next_date, 0, xbox_base_key) + self._train_pass._base_key = xbox_base_key + + def train_pass(self, context): + stdout_str = "" + day = self._train_pass.date() + pass_id = self._train_pass._pass_id + base_key = self._train_pass._base_key + pass_time = self._train_pass._current_train_time.strftime("%Y%m%d%H%M") + self.print_log(" ==== begin delta:%s ========" % pass_id, {'master': True, 'stdout' : stdout_str}) + train_begin_time = time.time() + + cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, {'master': True, 'log_format' : 'load into memory done, cost %s sec', 'stdout' : stdout_str}) + current_dataset = {} + for name in self._dataset: + current_dataset[name] = self._dataset[name].load_dataset({ + 'node_num': fleet.worker_num(), 'node_idx': fleet.worker_index(), + 'begin_time': pass_time, 'time_window_min': self._train_pass._interval_per_pass + }) + cost_printer.done() + + kagle_util.rank0_print("going to global shuffle") + cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, { + 'master': True, 'stdout' : stdout_str, + 'log_format' : 'global shuffle done, cost %s sec'}) + for name in current_dataset: + current_dataset[name].global_shuffle(fleet, self.global_config['dataset']['shuffle_thread']) + cost_printer.done() + # str(dataset.get_shuffle_data_size(fleet)) + + if self.global_config['prefetch_data']: + next_pass_time = (self._train_pass._current_train_time + + datetime.timedelta(minutes=self._train_pass._interval_per_pass)).strftime("%Y%m%d%H%M") + for name in self._dataset: + self._dataset[name].preload_dataset({ + 'node_num': fleet.worker_num(), 'node_idx': fleet.worker_index(), + 'begin_time': next_pass_time, 'time_window_min': self._train_pass._interval_per_pass + }) + + pure_train_begin = time.time() + for executor in self.global_config['executor']: + self.run_executor(executor, current_dataset[executor['dataset_name']], stdout_str) + cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, {'master': True, 'log_format' : 'release_memory cost %s sec'}) + for name in current_dataset: + current_dataset[name].release_memory() + pure_train_cost = time.time() - pure_train_begin + + if self._train_pass.is_checkpoint_pass(pass_id): + self.save_model(day, pass_id, base_key) + + train_end_time = time.time() + train_cost = train_end_time - train_begin_time + other_cost = train_cost - pure_train_cost + log_str = "finished train day %s pass %s time cost:%s sec job time cost:" % (day, pass_id, train_cost) + for executor in self._exector_context: + log_str += '[' + executor + ':' + str(self._exector_context[executor]['cost']) + ']' + log_str += '[other_cost:' + str(other_cost) + ']' + kagle_util.rank0_print(log_str) + stdout_str += kagle_util.now_time_str() + log_str + sys.stdout.write(stdout_str) + stdout_str = "" + if pass_id == self._train_pass.max_pass_num_day(): + context['status'] = 'end_day' + return + elif not self._train_pass.next(): + context['is_exit'] = True diff --git a/kagle/kagle_util.py b/kagle/kagle_util.py new file mode 100755 index 00000000..341a0e16 --- /dev/null +++ b/kagle/kagle_util.py @@ -0,0 +1,220 @@ +import os +import sys +import time +import datetime +import kagle_fs +import numpy as np +from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet + +def get_env_value(env_name): + return os.popen("echo -n ${" + env_name + "}").read().strip() + +def now_time_str(): + return "\n" + time.strftime("%Y-%m-%d %H:%M:%S",time.localtime()) + "[0]:" + +def get_absolute_path(path, params): + if path.startswith('afs:') or path.startswith('hdfs:'): + sub_path = path.split('fs:')[1] + if ':' in sub_path: #such as afs://xxx:prot/xxxx + return path + elif 'fs_name' in params: + return params['fs_name'] + sub_path + else: + return path + +def make_datetime(date_str, fmt = None): + if fmt is None: + if len(date_str) == 8: #%Y%m%d + return datetime.datetime.strptime(date_str, '%Y%m%d') + if len(date_str) == 12: #%Y%m%d%H%M + return datetime.datetime.strptime(date_str, '%Y%m%d%H%M') + return datetime.datetime.strptime(date_str, fmt) + + +def wroker_numric_opt(value, opt): + local_value = np.array([value]) + global_value = np.copy(local_value) * 0 + fleet._role_maker._node_type_comm.Allreduce(local_value, global_value, op=opt) + return global_value[0] + +def worker_numric_sum(value): + from mpi4py import MPI + return wroker_numric_opt(value, MPI.SUM) +def worker_numric_avg(value): + return worker_numric_sum(value) / fleet.worker_num() +def worker_numric_min(value): + from mpi4py import MPI + return wroker_numric_opt(value, MPI.MIN) +def worker_numric_max(value): + from mpi4py import MPI + return wroker_numric_opt(value, MPI.MAX) + + +def rank0_print(log_str): + print_log(log_str, {'master': True}) + +def print_log(log_str, params): + if params['master']: + if fleet.worker_index() == 0: + print(log_str) + sys.stdout.flush() + else: + print(log_str) + if 'stdout' in params: + params['stdout'] += str(datetime.datetime.now()) + log_str + +def print_cost(cost, params): + log_str = params['log_format'] % cost + print_log(log_str, params) + return log_str + + +class CostPrinter: + def __init__(self, callback, callback_params): + self.reset(callback, callback_params) + pass + + def __del__(self): + if not self._done: + self.done() + pass + + def reset(self, callback, callback_params): + self._done = False + self._callback = callback + self._callback_params = callback_params + self._begin_time = time.time() + pass + + def done(self): + cost = time.time() - self._begin_time + log_str = self._callback(cost, self._callback_params) #cost(s) + self._done = True + return cost, log_str + +class PathGenerator: + def __init__(self, config): + self._templates = {} + self.add_path_template(config) + pass + + def add_path_template(self, config): + if 'templates' in config: + for template in config['templates']: + self._templates[template['name']] = template['template'] + pass + + def generate_path(self, template_name, param): + if template_name in self._templates: + if 'time_format' in param: + str = param['time_format'].strftime(self._templates[template_name]) + return str.format(**param) + return self._templates[template_name].format(**param) + else: + return "" + +class TimeTrainPass: + def __init__(self, global_config): + self._config = global_config['epoch'] + if '+' in self._config['days']: + day_str = self._config['days'].replace(' ', '') + day_fields = day_str.split('+') + self._begin_day = make_datetime(day_fields[0].strip()) + if len(day_fields) == 1 or len(day_fields[1]) == 0: + #100 years, meaning to continuous running + self._end_day = self._begin_day + datetime.timedelta(days=36500) + else: + # example: 2020212+10 + run_day = int(day_fields[1].strip()) + self._end_day =self._begin_day + datetime.timedelta(days=run_day) + else: + # example: {20191001..20191031} + days = os.popen("echo -n " + self._config['days']).read().split(" ") + self._begin_day = make_datetime(days[0]) + self._end_day = make_datetime(days[len(days) - 1]) + self._checkpoint_interval = self._config['checkpoint_interval'] + self._dump_inference_interval = self._config['dump_inference_interval'] + self._interval_per_pass = self._config['train_time_interval'] #train N min data per pass + + self._pass_id = 0 + self._inference_pass_id = 0 + self._pass_donefile_handler = None + if 'pass_donefile_name' in self._config: + self._train_pass_donefile = global_config['output_path'] + '/' + self._config['pass_donefile_name'] + if kagle_fs.is_afs_path(self._train_pass_donefile): + self._pass_donefile_handler = kagle_fs.FileHandler(global_config['io']['afs']) + else: + self._pass_donefile_handler = kagle_fs.FileHandler(global_config['io']['local_fs']) + + last_done = self._pass_donefile_handler.cat(self._train_pass_donefile).strip().split('\n')[-1] + done_fileds = last_done.split('\t') + if len(done_fileds) > 4: + self._base_key = done_fileds[1] + self._checkpoint_model_path = done_fileds[2] + self._checkpoint_pass_id = int(done_fileds[3]) + self._inference_pass_id = int(done_fileds[4]) + self.init_pass_by_id(done_fileds[0], self._checkpoint_pass_id) + + def max_pass_num_day(self): + return 24 * 60 / self._interval_per_pass + + def save_train_progress(self, day, pass_id, base_key, model_path, is_checkpoint): + if is_checkpoint: + self._checkpoint_pass_id = pass_id + self._checkpoint_model_path = model_path + done_content = "%s\t%s\t%s\t%s\t%d\n" % (day, base_key, + self._checkpoint_model_path, self._checkpoint_pass_id, pass_id) + self._pass_donefile_handler.write(done_content, self._train_pass_donefile, 'a') + pass + + def init_pass_by_id(self, date_str, pass_id): + date_time = make_datetime(date_str) + if pass_id < 1: + pass_id = 0 + if (date_time - self._begin_day).total_seconds() > 0: + self._begin_day = date_time + self._pass_id = pass_id + mins = self._interval_per_pass * (pass_id - 1) + self._current_train_time = date_time + datetime.timedelta(minutes=mins) + print(self._current_train_time) + + def init_pass_by_time(self, datetime_str): + self._current_train_time = make_datetime(datetime_str) + minus = self._current_train_time.hour * 60 + self._current_train_time.minute; + self._pass_id = minus / self._interval_per_pass + 1 + + def current_pass(): + return self._pass_id + + def next(self): + has_next = True + old_pass_id = self._pass_id + if self._pass_id < 1: + self.init_pass_by_time(self._begin_day.strftime("%Y%m%d%H%M")) + else: + next_time = self._current_train_time + datetime.timedelta(minutes=self._interval_per_pass) + if (next_time - self._end_day).total_seconds() > 0: + has_next = False + else: + self.init_pass_by_time(next_time.strftime("%Y%m%d%H%M")) + if has_next and (self._inference_pass_id < self._pass_id or self._pass_id < old_pass_id): + self._inference_pass_id = self._pass_id - 1 + return has_next + + def is_checkpoint_pass(self, pass_id): + if pass_id < 1: + return True + if pass_id == self.max_pass_num_day(): + return False + if pass_id % self._checkpoint_interval == 0: + return True + return False + + def need_dump_inference(self, pass_id): + return self._inference_pass_id < pass_id and pass_id % self._dump_inference_interval == 0 + + def date(self, delta_day=0): + return (self._current_train_time + datetime.timedelta(days=delta_day)).strftime("%Y%m%d") + + def timestamp(self, delta_day=0): + return (self._current_train_time + datetime.timedelta(days=delta_day)).timestamp() diff --git a/kagle/trainer/__init__.py b/kagle/trainer/__init__.py new file mode 100755 index 00000000..e69de29b diff --git a/kagle/trainer/abacus_trainer.py b/kagle/trainer/abacus_trainer.py new file mode 100755 index 00000000..a052d419 --- /dev/null +++ b/kagle/trainer/abacus_trainer.py @@ -0,0 +1,345 @@ +import sys +import copy +import yaml +import time +import json +import datetime +import kagle_trainer +from .. import kagle_fs +from .. import kagle_util +from .. import kagle_model +from .. import kagle_metric +from .. import kagle_dataset +import paddle.fluid as fluid +from abc import ABCMeta, abstractmethod +from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet + +class AbacusPaddleTrainer(kagle_trainer.Trainer): + def __init__(self, config): + kagle_trainer.Trainer.__init__(self, config) + config['output_path'] = kagle_util.get_absolute_path( + config['output_path'], config['io']['afs']) + self.global_config = config + self._place = fluid.CPUPlace() + self._exe = fluid.Executor(self._place) + self._exector_context = {} + self._metrics = {} + self._path_generator = kagle_util.PathGenerator({ + 'templates' : [ + {'name': 'xbox_base_done', 'template': config['output_path'] + '/xbox_base_done.txt'}, + {'name': 'xbox_delta_done', 'template': config['output_path'] + '/xbox_patch_done.txt'}, + {'name': 'xbox_base', 'template': config['output_path'] + '/xbox/{day}/base/'}, + {'name': 'xbox_delta', 'template': config['output_path'] + '/xbox/{day}/delta-{pass_id}/'}, + {'name': 'batch_model', 'template': config['output_path'] + '/batch_model/{day}/{pass_id}/'} + ] + }) + if 'path_generator' in config: + self._path_generator.add_path_template(config['path_generator']) + + self.regist_context_processor('uninit', self.init) + self.regist_context_processor('startup', self.startup) + self.regist_context_processor('begin_day', self.begin_day) + self.regist_context_processor('train_pass', self.train_pass) + self.regist_context_processor('end_day', self.end_day) + + def init(self, context): + fleet.init(self._exe) + data_var_list = [] + data_var_name_dict = {} + runnnable_scope = [] + runnnable_cost_op = [] + context['status'] = 'startup' + + for executor in self.global_config['executor']: + scope = fluid.Scope() + self._exector_context[executor['name']] = {} + self._exector_context[executor['name']]['scope'] = scope + self._exector_context[executor['name']]['model'] = kagle_model.create(executor) + model = self._exector_context[executor['name']]['model'] + self._metrics.update(model.get_metrics()) + runnnable_scope.append(scope) + runnnable_cost_op.append(model.get_cost_op()) + for var in model._data_var: + if var.name in data_var_name_dict: + continue + data_var_list.append(var) + data_var_name_dict[var.name] = var + + optimizer = kagle_model.FluidModel.build_optimizer({ + 'metrics' : self._metrics, + 'optimizer_conf' : self.global_config['optimizer'] + }) + optimizer.minimize(runnnable_cost_op, runnnable_scope) + for executor in self.global_config['executor']: + scope = self._exector_context[executor['name']]['scope'] + model = self._exector_context[executor['name']]['model'] + program = model._build_param['model']['train_program'] + if not executor['is_update_sparse']: + program._fleet_opt["program_configs"][str(id(model.get_cost_op().block.program))]["push_sparse"] = [] + if 'train_thread_num' not in executor: + executor['train_thread_num'] = global_config['train_thread_num'] + with fluid.scope_guard(scope): + self._exe.run(model._build_param['model']['startup_program']) + model.dump_model_program('./') + + #server init done + if fleet.is_server(): + return 0 + + self._dataset = {} + for dataset_item in self.global_config['dataset']['data_list']: + dataset_item['data_vars'] = data_var_list + dataset_item.update(self.global_config['io']['afs']) + dataset_item["batch_size"] = self.global_config['batch_size'] + self._dataset[dataset_item['name']] = kagle_dataset.FluidTimeSplitDataset(dataset_item) + #if config.need_reqi_changeslot and config.reqi_dnn_plugin_day >= last_day and config.reqi_dnn_plugin_pass >= last_pass: + # util.reqi_changeslot(config.hdfs_dnn_plugin_path, join_save_params, common_save_params, update_save_params, scope2, scope3) + fleet.init_worker() + pass + + def print_log(self, log_str, params): + params['index'] = fleet.worker_index() + return kagle_util.print_log(log_str, params) + + def print_global_metrics(self, scope, model, monitor_data, stdout_str): + metrics = model.get_metrics() + metric_calculator = kagle_metric.PaddleAUCMetric(None) + for metric in metrics: + metric_param = {'label' : metric, 'metric_dict' : metrics[metric]} + metric_calculator.calculate(scope, metric_param) + metric_result = metric_calculator.get_result_to_string() + self.print_log(metric_result, {'master': True, 'stdout' : stdout_str}) + monitor_data += metric_result + metric_calculator.clear(scope, metric_param) + + def save_model(self, day, pass_index, base_key): + cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, + {'master': True, 'log_format' : 'save model cost %s sec'}) + model_path = self._path_generator.generate_path('batch_model', {'day': day, 'pass_id': pass_index}) + save_mode = 0 # just save all + if pass_index < 1: #batch_model + save_mode = 3 # unseen_day++, save all + kagle_util.rank0_print("going to save_model %s" % model_path) + fleet.save_persistables(None, model_path, mode=save_mode) + self._train_pass.save_train_progress(day, pass_index, base_key, model_path, is_checkpoint=True) + cost_printer.done() + return model_path + + def save_xbox_model(self, day, pass_index, xbox_base_key, monitor_data): + stdout_str = "" + xbox_patch_id = str(int(time.time())) + kagle_util.rank0_print("begin save delta model") + + model_path = "" + xbox_model_donefile = "" + cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, {'master': True, 'log_format' : 'save xbox model cost %s sec', 'stdout' : stdout_str}) + if pass_index < 1: + save_mode = 2 + xbox_patch_id = xbox_base_key + model_path = self._path_generator.generate_path('xbox_base', {'day' : day}) + xbox_model_donefile = self._path_generator.generate_path('xbox_base_done', {'day' : day}) + else: + save_mode = 1 + model_path = self._path_generator.generate_path('xbox_delta', {'day' : day, 'pass_id':pass_index}) + xbox_model_donefile = self._path_generator.generate_path('xbox_delta_done', {'day' : day}) + total_save_num = fleet.save_persistables(None, model_path, mode=save_mode) + cost_printer.done() + + cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, {'master': True, + 'log_format' : 'save cache model cost %s sec', 'stdout' : stdout_str}) + model_file_handler = kagle_fs.FileHandler(self.global_config['io']['afs']) + if self.global_config['save_cache_model']: + cache_save_num = fleet.save_cache_model(None, model_path, mode=save_mode) + model_file_handler.write( + "file_prefix:part\npart_num:16\nkey_num:%d\n" % cache_save_num, + model_path + '/000_cache/sparse_cache.meta', 'w') + cost_printer.done() + kagle_util.rank0_print("save xbox cache model done, key_num=%s" % cache_save_num) + + save_env_param = { + 'executor': self._exe, + 'save_combine': True + } + cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, {'master': True, + 'log_format' : 'save dense model cost %s sec', 'stdout' : stdout_str}) + for executor in self.global_config['executor']: + if 'layer_for_inference' not in executor: + continue + executor_name = executor['name'] + model = self._exector_context[executor_name]['model'] + save_env_param['inference_list'] = executor['layer_for_inference'] + save_env_param['scope'] = self._exector_context[executor_name]['scope'] + model.dump_inference_param(save_env_param) + for dnn_layer in executor['layer_for_inference']: + model_file_handler.cp(dnn_layer['save_file_name'], + model_path + '/dnn_plugin/' + dnn_layer['save_file_name']) + cost_printer.done() + + xbox_done_info = { + "id" : xbox_patch_id, + "key" : xbox_base_key, + "ins_path" : "", + "ins_tag" : "feasign", + "partition_type" : "2", + "record_count" : "111111", + "monitor_data" : monitor_data, + "mpi_size" : str(fleet.worker_num()), + "input" : model_path.rstrip("/") + "/000", + "job_id" : kagle_util.get_env_value("JOB_ID"), + "job_name" : kagle_util.get_env_value("JOB_NAME") + } + model_file_handler.write(json.dumps(xbox_done_info) + "\n", xbox_model_donefile, 'a') + if pass_index > 0: + self._train_pass.save_train_progress(day, pass_index, xbox_base_key, model_path, is_checkpoint=False) + return stdout_str + + def run_executor(self, executor_config, dataset, stdout_str): + day = self._train_pass.date() + pass_id = self._train_pass._pass_id + xbox_base_key = self._train_pass._base_key + executor_name = executor_config['name'] + scope = self._exector_context[executor_name]['scope'] + model = self._exector_context[executor_name]['model'] + with fluid.scope_guard(scope): + kagle_util.rank0_print("Begin " + executor_name + " pass") + begin = time.time() + program = model._build_param['model']['train_program'] + self._exe.train_from_dataset(program, dataset, scope, + thread=executor_config['train_thread_num'], debug=self.global_config['debug']) + end = time.time() + local_cost = (end-begin) / 60.0 + avg_cost = kagle_util.worker_numric_avg(local_cost) + min_cost = kagle_util.worker_numric_min(local_cost) + max_cost = kagle_util.worker_numric_max(local_cost) + kagle_util.rank0_print("avg train time %s mins, min %s mins, max %s mins" % (avg_cost, min_cost, max_cost)) + self._exector_context[executor_name]['cost'] = max_cost + + monitor_data = "" + self.print_global_metrics(scope, model, monitor_data, stdout_str) + kagle_util.rank0_print("End " + executor_name + " pass") + if self._train_pass.need_dump_inference(pass_id) and executor_config['dump_inference_model']: + stdout_str += self.save_xbox_model(day, pass_id, xbox_base_key, monitor_data) + + def startup(self, context): + if fleet.is_server(): + fleet.run_server() + context['status'] = 'wait' + return + stdout_str = "" + self._train_pass = kagle_util.TimeTrainPass(self.global_config) + if not self.global_config['cold_start']: + cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, + {'master': True, 'log_format' : 'load model cost %s sec', 'stdout' : stdout_str}) + self.print_log("going to load model %s" % self._train_pass._checkpoint_model_path, {'master': True}) + #if config.need_reqi_changeslot and config.reqi_dnn_plugin_day >= self._train_pass.date() + # and config.reqi_dnn_plugin_pass >= self._pass_id: + # fleet.load_one_table(0, self._train_pass._checkpoint_model_path) + #else: + fleet.init_server(self._train_pass._checkpoint_model_path, mode=0) + cost_printer.done() + if self.global_config['save_first_base']: + self.print_log("save_first_base=True", {'master': True}) + self.print_log("going to save xbox base model", {'master': True, 'stdout' : stdout_str}) + self._train_pass._base_key = int(time.time()) + stdout_str += self.save_xbox_model(day, 0, self._train_pass._base_key, "") + context['status'] = 'begin_day' + + def begin_day(self, context): + stdout_str = "" + if not self._train_pass.next(): + context['is_exit'] = True + day = self._train_pass.date() + pass_id = self._train_pass._pass_id + self.print_log("======== BEGIN DAY:%s ========" % day, {'master': True, 'stdout' : stdout_str}) + if pass_id == self._train_pass.max_pass_num_day(): + context['status'] = 'end_day' + else: + context['status'] = 'train_pass' + + def end_day(self, context): + day = self._train_pass.date() + pass_id = self._train_pass._pass_id + xbox_base_key = int(time.time()) + context['status'] = 'begin_day' + + kagle_util.rank0_print("shrink table") + cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, + {'master': True, 'log_format' : 'shrink table done, cost %s sec'}) + fleet.shrink_sparse_table() + for executor in self._exector_context: + self._exector_context[executor]['model'].shrink({ + 'scope': self._exector_context[executor]['scope'], + 'decay': self.global_config['optimizer']['dense_decay_rate'] + }) + cost_printer.done() + + next_date = self._train_pass.date(delta_day=1) + kagle_util.rank0_print("going to save xbox base model") + self.save_xbox_model(next_date, 0, xbox_base_key, "") + kagle_util.rank0_print("going to save batch model") + self.save_model(next_date, 0, xbox_base_key) + self._train_pass._base_key = xbox_base_key + + def train_pass(self, context): + stdout_str = "" + day = self._train_pass.date() + pass_id = self._train_pass._pass_id + base_key = self._train_pass._base_key + pass_time = self._train_pass._current_train_time.strftime("%Y%m%d%H%M") + self.print_log(" ==== begin delta:%s ========" % pass_id, {'master': True, 'stdout' : stdout_str}) + train_begin_time = time.time() + + cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, {'master': True, 'log_format' : 'load into memory done, cost %s sec', 'stdout' : stdout_str}) + current_dataset = {} + for name in self._dataset: + current_dataset[name] = self._dataset[name].load_dataset({ + 'node_num': fleet.worker_num(), 'node_idx': fleet.worker_index(), + 'begin_time': pass_time, 'time_window_min': self._train_pass._interval_per_pass + }) + cost_printer.done() + + kagle_util.rank0_print("going to global shuffle") + cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, { + 'master': True, 'stdout' : stdout_str, + 'log_format' : 'global shuffle done, cost %s sec'}) + for name in current_dataset: + current_dataset[name].global_shuffle(fleet, self.global_config['dataset']['shuffle_thread']) + cost_printer.done() + # str(dataset.get_shuffle_data_size(fleet)) + + if self.global_config['prefetch_data']: + next_pass_time = (self._train_pass._current_train_time + + datetime.timedelta(minutes=self._train_pass._interval_per_pass)).strftime("%Y%m%d%H%M") + for name in self._dataset: + self._dataset[name].preload_dataset({ + 'node_num': fleet.worker_num(), 'node_idx': fleet.worker_index(), + 'begin_time': next_pass_time, 'time_window_min': self._train_pass._interval_per_pass + }) + + pure_train_begin = time.time() + for executor in self.global_config['executor']: + self.run_executor(executor, current_dataset[executor['dataset_name']], stdout_str) + cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, {'master': True, 'log_format' : 'release_memory cost %s sec'}) + for name in current_dataset: + current_dataset[name].release_memory() + pure_train_cost = time.time() - pure_train_begin + + if self._train_pass.is_checkpoint_pass(pass_id): + self.save_model(day, pass_id, base_key) + + train_end_time = time.time() + train_cost = train_end_time - train_begin_time + other_cost = train_cost - pure_train_cost + log_str = "finished train day %s pass %s time cost:%s sec job time cost:" % (day, pass_id, train_cost) + for executor in self._exector_context: + log_str += '[' + executor + ':' + str(self._exector_context[executor]['cost']) + ']' + log_str += '[other_cost:' + str(other_cost) + ']' + kagle_util.rank0_print(log_str) + stdout_str += kagle_util.now_time_str() + log_str + sys.stdout.write(stdout_str) + stdout_str = "" + if pass_id == self._train_pass.max_pass_num_day(): + context['status'] = 'end_day' + return + elif not self._train_pass.next(): + context['is_exit'] = True diff --git a/kagle/trainer/kagle_trainer.py b/kagle/trainer/kagle_trainer.py new file mode 100755 index 00000000..a67267a1 --- /dev/null +++ b/kagle/trainer/kagle_trainer.py @@ -0,0 +1,32 @@ +import sys +import time +from abc import ABCMeta, abstractmethod + +class Trainer(object): + __metaclass__=ABCMeta + def __init__(self, config): + self._status_processor = {} + self._context = {'status': 'uninit', 'is_exit': False} + + def regist_context_processor(self, status_name, processor): + self._status_processor[status_name] = processor + + def context_process(self, context): + if context['status'] in self._status_processor: + self._status_processor[context['status']](context) + else: + self.other_status_processor(context) + + def other_status_processor(self, context): + print('unknow context_status:%s, do nothing' % context['status']) + time.sleep(60) + + def reload_train_context(self): + pass + + def run(self): + while True: + self.reload_train_context() + self.context_process(self._context) + if self._context['is_exit']: + break -- GitLab