diff --git a/kagle/kagle_model.py b/kagle/kagle_model.py index 41d90907bafab242509947e87a446b507da5fb62..e94d8f78ae26c53674f2e6e2c53e176738ea85cc 100755 --- a/kagle/kagle_model.py +++ b/kagle/kagle_model.py @@ -77,6 +77,7 @@ class Model(object): """R """ pass + @abc.abstractmethod def dump_inference_program(self, inference_layer, path): """R @@ -101,7 +102,8 @@ class Model(object): if node['name'] not in self._inference_meta['dependency'][layer]: continue if 'inference_param' in self._build_param['layer_extend'][node['name']]: - self._inference_meta['params'][layer] += self._build_param['layer_extend'][node['name']]['inference_param']['params'] + self._inference_meta['params'][layer] += \ + self._build_param['layer_extend'][node['name']]['inference_param']['params'] return self._inference_meta['params'][layer] def get_dependency(self, layer_graph, dest_layer): @@ -192,10 +194,10 @@ class FluidModel(Model): metrics = params['metrics'] for name in metrics: model_metrics = metrics[name] - stat_var_names += [ model_metrics[metric]['var'].name for metric in model_metrics] + stat_var_names += [model_metrics[metric]['var'].name for metric in model_metrics] strategy['stat_var_names'] = list(set(stat_var_names)) optimizer_generator = 'optimizer = fluid.optimizer.' + optimizer_conf['class'] + \ - '(learning_rate=' + str(optimizer_conf['learning_rate']) + ')' + '(learning_rate=' + str(optimizer_conf['learning_rate']) + ')' exec(optimizer_generator) optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy) return optimizer @@ -233,12 +235,12 @@ class FluidModel(Model): fleet._fleet_ptr.pull_dense(scope, table['_meta']._table_id, table['params']) for infernce_item in params['inference_list']: params_name_list = self.inference_params(infernce_item['layer_name']) - params_var_list = [ program.global_block().var(i) for i in params_name_list ] + params_var_list = [program.global_block().var(i) for i in params_name_list] params_file_name = infernce_item['save_file_name'] with fluid.scope_guard(scope): if params['save_combine']: - fluid.io.save_vars( - executor, "./", program, vars=params_var_list, filename=params_file_name) + fluid.io.save_vars(executor, "./", \ + program, vars=params_var_list, filename=params_file_name) else: fluid.io.save_vars(executor, params_file_name, program, vars=params_var_list) pass diff --git a/kagle/kagle_util.py b/kagle/kagle_util.py index 341a0e162762a10d8f29581eff0d196298fbcd13..11b1156ebd3bf155b5f78dcce65702c45d9580dc 100755 --- a/kagle/kagle_util.py +++ b/kagle/kagle_util.py @@ -1,18 +1,29 @@ +""" +Util lib +""" import os import sys import time import datetime -import kagle_fs +import kagle.kagle_fs import numpy as np from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet def get_env_value(env_name): + """ + get os environment value + """ return os.popen("echo -n ${" + env_name + "}").read().strip() def now_time_str(): - return "\n" + time.strftime("%Y-%m-%d %H:%M:%S",time.localtime()) + "[0]:" + """ + get current format str_time + """ + return "\n" + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + "[0]:" def get_absolute_path(path, params): + """R + """ if path.startswith('afs:') or path.startswith('hdfs:'): sub_path = path.split('fs:')[1] if ':' in sub_path: #such as afs://xxx:prot/xxxx @@ -23,6 +34,14 @@ def get_absolute_path(path, params): return path def make_datetime(date_str, fmt = None): + """ + create a datetime instance by date_string + Args: + date_str: such as 2020-01-14 + date_str_format: "%Y-%m-%d" + Return: + datetime + """ if fmt is None: if len(date_str) == 8: #%Y%m%d return datetime.datetime.strptime(date_str, '%Y%m%d') @@ -32,28 +51,51 @@ def make_datetime(date_str, fmt = None): def wroker_numric_opt(value, opt): + """ + numric count opt for workers + Args: + value: value for count + opt: count operator, SUM/MAX/MIN/AVG + Return: + count result + """ local_value = np.array([value]) global_value = np.copy(local_value) * 0 fleet._role_maker._node_type_comm.Allreduce(local_value, global_value, op=opt) return global_value[0] def worker_numric_sum(value): + """R + """ from mpi4py import MPI return wroker_numric_opt(value, MPI.SUM) + def worker_numric_avg(value): + """R + """ return worker_numric_sum(value) / fleet.worker_num() + def worker_numric_min(value): + """R + """ from mpi4py import MPI return wroker_numric_opt(value, MPI.MIN) + def worker_numric_max(value): + """R + """ from mpi4py import MPI return wroker_numric_opt(value, MPI.MAX) def rank0_print(log_str): + """R + """ print_log(log_str, {'master': True}) def print_log(log_str, params): + """R + """ if params['master']: if fleet.worker_index() == 0: print(log_str) @@ -64,22 +106,33 @@ def print_log(log_str, params): params['stdout'] += str(datetime.datetime.now()) + log_str def print_cost(cost, params): + """R + """ log_str = params['log_format'] % cost print_log(log_str, params) return log_str class CostPrinter: + """ + For count cost time && print cost log + """ def __init__(self, callback, callback_params): + """R + """ self.reset(callback, callback_params) pass def __del__(self): + """R + """ if not self._done: self.done() pass def reset(self, callback, callback_params): + """R + """ self._done = False self._callback = callback self._callback_params = callback_params @@ -87,24 +140,35 @@ class CostPrinter: pass def done(self): + """R + """ cost = time.time() - self._begin_time log_str = self._callback(cost, self._callback_params) #cost(s) self._done = True return cost, log_str class PathGenerator: + """ + generate path with template & runtime variables + """ def __init__(self, config): + """R + """ self._templates = {} self.add_path_template(config) pass def add_path_template(self, config): + """R + """ if 'templates' in config: for template in config['templates']: self._templates[template['name']] = template['template'] pass def generate_path(self, template_name, param): + """R + """ if template_name in self._templates: if 'time_format' in param: str = param['time_format'].strftime(self._templates[template_name]) @@ -113,8 +177,15 @@ class PathGenerator: else: return "" + class TimeTrainPass: + """ + timely pass + define pass time_interval && start_time && end_time + """ def __init__(self, global_config): + """R + """ self._config = global_config['epoch'] if '+' in self._config['days']: day_str = self._config['days'].replace(' ', '') @@ -156,9 +227,13 @@ class TimeTrainPass: self.init_pass_by_id(done_fileds[0], self._checkpoint_pass_id) def max_pass_num_day(self): + """R + """ return 24 * 60 / self._interval_per_pass def save_train_progress(self, day, pass_id, base_key, model_path, is_checkpoint): + """R + """ if is_checkpoint: self._checkpoint_pass_id = pass_id self._checkpoint_model_path = model_path @@ -168,6 +243,12 @@ class TimeTrainPass: pass def init_pass_by_id(self, date_str, pass_id): + """ + init pass context with pass_id + Args: + date_str: example "20200110" + pass_id(int): pass_id of date + """ date_time = make_datetime(date_str) if pass_id < 1: pass_id = 0 @@ -179,14 +260,23 @@ class TimeTrainPass: print(self._current_train_time) def init_pass_by_time(self, datetime_str): + """ + init pass context with datetime + Args: + date_str: example "20200110000" -> "%Y%m%d%H%M" + """ self._current_train_time = make_datetime(datetime_str) minus = self._current_train_time.hour * 60 + self._current_train_time.minute; self._pass_id = minus / self._interval_per_pass + 1 - def current_pass(): + def current_pass(self): + """R + """ return self._pass_id def next(self): + """R + """ has_next = True old_pass_id = self._pass_id if self._pass_id < 1: @@ -202,6 +292,8 @@ class TimeTrainPass: return has_next def is_checkpoint_pass(self, pass_id): + """R + """ if pass_id < 1: return True if pass_id == self.max_pass_num_day(): @@ -211,10 +303,21 @@ class TimeTrainPass: return False def need_dump_inference(self, pass_id): + """R + """ return self._inference_pass_id < pass_id and pass_id % self._dump_inference_interval == 0 def date(self, delta_day=0): + """ + get train date + Args: + delta_day(int): n day afer current_train_date + Return: + date(current_train_time + delta_day) + """ return (self._current_train_time + datetime.timedelta(days=delta_day)).strftime("%Y%m%d") def timestamp(self, delta_day=0): + """R + """ return (self._current_train_time + datetime.timedelta(days=delta_day)).timestamp() diff --git a/kagle/trainer/abacus_trainer.py b/kagle/trainer/abacus_trainer.py index a052d419df334fc22e42688fc9eaa276abb49c38..c8a38fae8ecacf6d4892b7d2d30264938ec3f530 100755 --- a/kagle/trainer/abacus_trainer.py +++ b/kagle/trainer/abacus_trainer.py @@ -1,21 +1,28 @@ +""" +A paddle trainer Adapt to Abacus +""" +import abc import sys import copy import yaml import time import json import datetime -import kagle_trainer -from .. import kagle_fs -from .. import kagle_util -from .. import kagle_model -from .. import kagle_metric -from .. import kagle_dataset +import kagle.kagle_fs +import kagle.kagle_util +import kagle.kagle_model +import kagle.kagle_metric +import kagle.kagle_dataset +import kagle.trainer.kagle_trainer import paddle.fluid as fluid -from abc import ABCMeta, abstractmethod from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet class AbacusPaddleTrainer(kagle_trainer.Trainer): + """R + """ def __init__(self, config): + """R + """ kagle_trainer.Trainer.__init__(self, config) config['output_path'] = kagle_util.get_absolute_path( config['output_path'], config['io']['afs']) @@ -43,6 +50,8 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer): self.regist_context_processor('end_day', self.end_day) def init(self, context): + """R + """ fleet.init(self._exe) data_var_list = [] data_var_name_dict = {} @@ -77,7 +86,7 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer): if not executor['is_update_sparse']: program._fleet_opt["program_configs"][str(id(model.get_cost_op().block.program))]["push_sparse"] = [] if 'train_thread_num' not in executor: - executor['train_thread_num'] = global_config['train_thread_num'] + executor['train_thread_num'] = self.global_config['train_thread_num'] with fluid.scope_guard(scope): self._exe.run(model._build_param['model']['startup_program']) model.dump_model_program('./') @@ -98,23 +107,29 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer): pass def print_log(self, log_str, params): + """R + """ params['index'] = fleet.worker_index() return kagle_util.print_log(log_str, params) def print_global_metrics(self, scope, model, monitor_data, stdout_str): + """R + """ metrics = model.get_metrics() metric_calculator = kagle_metric.PaddleAUCMetric(None) for metric in metrics: - metric_param = {'label' : metric, 'metric_dict' : metrics[metric]} + metric_param = {'label': metric, 'metric_dict': metrics[metric]} metric_calculator.calculate(scope, metric_param) metric_result = metric_calculator.get_result_to_string() - self.print_log(metric_result, {'master': True, 'stdout' : stdout_str}) + self.print_log(metric_result, {'master': True, 'stdout': stdout_str}) monitor_data += metric_result metric_calculator.clear(scope, metric_param) def save_model(self, day, pass_index, base_key): + """R + """ cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, - {'master': True, 'log_format' : 'save model cost %s sec'}) + {'master': True, 'log_format': 'save model cost %s sec'}) model_path = self._path_generator.generate_path('batch_model', {'day': day, 'pass_id': pass_index}) save_mode = 0 # just save all if pass_index < 1: #batch_model @@ -126,27 +141,30 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer): return model_path def save_xbox_model(self, day, pass_index, xbox_base_key, monitor_data): + """R + """ stdout_str = "" xbox_patch_id = str(int(time.time())) kagle_util.rank0_print("begin save delta model") model_path = "" xbox_model_donefile = "" - cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, {'master': True, 'log_format' : 'save xbox model cost %s sec', 'stdout' : stdout_str}) + cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, {'master': True, \ + 'log_format': 'save xbox model cost %s sec', 'stdout': stdout_str}) if pass_index < 1: save_mode = 2 xbox_patch_id = xbox_base_key - model_path = self._path_generator.generate_path('xbox_base', {'day' : day}) - xbox_model_donefile = self._path_generator.generate_path('xbox_base_done', {'day' : day}) + model_path = self._path_generator.generate_path('xbox_base', {'day': day}) + xbox_model_donefile = self._path_generator.generate_path('xbox_base_done', {'day': day}) else: save_mode = 1 - model_path = self._path_generator.generate_path('xbox_delta', {'day' : day, 'pass_id':pass_index}) - xbox_model_donefile = self._path_generator.generate_path('xbox_delta_done', {'day' : day}) + model_path = self._path_generator.generate_path('xbox_delta', {'day': day, 'pass_id': pass_index}) + xbox_model_donefile = self._path_generator.generate_path('xbox_delta_done', {'day': day}) total_save_num = fleet.save_persistables(None, model_path, mode=save_mode) cost_printer.done() cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, {'master': True, - 'log_format' : 'save cache model cost %s sec', 'stdout' : stdout_str}) + 'log_format': 'save cache model cost %s sec', 'stdout': stdout_str}) model_file_handler = kagle_fs.FileHandler(self.global_config['io']['afs']) if self.global_config['save_cache_model']: cache_save_num = fleet.save_cache_model(None, model_path, mode=save_mode) @@ -161,7 +179,7 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer): 'save_combine': True } cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, {'master': True, - 'log_format' : 'save dense model cost %s sec', 'stdout' : stdout_str}) + 'log_format': 'save dense model cost %s sec', 'stdout': stdout_str}) for executor in self.global_config['executor']: if 'layer_for_inference' not in executor: continue @@ -176,17 +194,17 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer): cost_printer.done() xbox_done_info = { - "id" : xbox_patch_id, - "key" : xbox_base_key, - "ins_path" : "", - "ins_tag" : "feasign", - "partition_type" : "2", - "record_count" : "111111", - "monitor_data" : monitor_data, - "mpi_size" : str(fleet.worker_num()), - "input" : model_path.rstrip("/") + "/000", - "job_id" : kagle_util.get_env_value("JOB_ID"), - "job_name" : kagle_util.get_env_value("JOB_NAME") + "id": xbox_patch_id, + "key": xbox_base_key, + "ins_path": "", + "ins_tag": "feasign", + "partition_type": "2", + "record_count": "111111", + "monitor_data": monitor_data, + "mpi_size": str(fleet.worker_num()), + "input": model_path.rstrip("/") + "/000", + "job_id": kagle_util.get_env_value("JOB_ID"), + "job_name": kagle_util.get_env_value("JOB_NAME") } model_file_handler.write(json.dumps(xbox_done_info) + "\n", xbox_model_donefile, 'a') if pass_index > 0: @@ -194,6 +212,8 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer): return stdout_str def run_executor(self, executor_config, dataset, stdout_str): + """R + """ day = self._train_pass.date() pass_id = self._train_pass._pass_id xbox_base_key = self._train_pass._base_key @@ -221,6 +241,8 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer): stdout_str += self.save_xbox_model(day, pass_id, xbox_base_key, monitor_data) def startup(self, context): + """R + """ if fleet.is_server(): fleet.run_server() context['status'] = 'wait' @@ -239,24 +261,28 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer): cost_printer.done() if self.global_config['save_first_base']: self.print_log("save_first_base=True", {'master': True}) - self.print_log("going to save xbox base model", {'master': True, 'stdout' : stdout_str}) + self.print_log("going to save xbox base model", {'master': True, 'stdout': stdout_str}) self._train_pass._base_key = int(time.time()) - stdout_str += self.save_xbox_model(day, 0, self._train_pass._base_key, "") + stdout_str += self.save_xbox_model(self._train_pass.date(), 0, self._train_pass._base_key, "") context['status'] = 'begin_day' def begin_day(self, context): + """R + """ stdout_str = "" if not self._train_pass.next(): context['is_exit'] = True day = self._train_pass.date() pass_id = self._train_pass._pass_id - self.print_log("======== BEGIN DAY:%s ========" % day, {'master': True, 'stdout' : stdout_str}) + self.print_log("======== BEGIN DAY:%s ========" % day, {'master': True, 'stdout': stdout_str}) if pass_id == self._train_pass.max_pass_num_day(): context['status'] = 'end_day' else: context['status'] = 'train_pass' def end_day(self, context): + """R + """ day = self._train_pass.date() pass_id = self._train_pass._pass_id xbox_base_key = int(time.time()) @@ -264,7 +290,7 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer): kagle_util.rank0_print("shrink table") cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, - {'master': True, 'log_format' : 'shrink table done, cost %s sec'}) + {'master': True, 'log_format': 'shrink table done, cost %s sec'}) fleet.shrink_sparse_table() for executor in self._exector_context: self._exector_context[executor]['model'].shrink({ @@ -281,27 +307,29 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer): self._train_pass._base_key = xbox_base_key def train_pass(self, context): + """R + """ stdout_str = "" day = self._train_pass.date() pass_id = self._train_pass._pass_id base_key = self._train_pass._base_key pass_time = self._train_pass._current_train_time.strftime("%Y%m%d%H%M") - self.print_log(" ==== begin delta:%s ========" % pass_id, {'master': True, 'stdout' : stdout_str}) + self.print_log(" ==== begin delta:%s ========" % pass_id, {'master': True, 'stdout': stdout_str}) train_begin_time = time.time() - cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, {'master': True, 'log_format' : 'load into memory done, cost %s sec', 'stdout' : stdout_str}) + cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, {'master': True, 'log_format': 'load into memory done, cost %s sec', 'stdout': stdout_str}) current_dataset = {} for name in self._dataset: current_dataset[name] = self._dataset[name].load_dataset({ 'node_num': fleet.worker_num(), 'node_idx': fleet.worker_index(), - 'begin_time': pass_time, 'time_window_min': self._train_pass._interval_per_pass + 'begin_time': pass_time, 'time_window_min': self._train_pass._interval_per_pass }) cost_printer.done() kagle_util.rank0_print("going to global shuffle") cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, { - 'master': True, 'stdout' : stdout_str, - 'log_format' : 'global shuffle done, cost %s sec'}) + 'master': True, 'stdout': stdout_str, + 'log_format': 'global shuffle done, cost %s sec'}) for name in current_dataset: current_dataset[name].global_shuffle(fleet, self.global_config['dataset']['shuffle_thread']) cost_printer.done() @@ -313,13 +341,14 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer): for name in self._dataset: self._dataset[name].preload_dataset({ 'node_num': fleet.worker_num(), 'node_idx': fleet.worker_index(), - 'begin_time': next_pass_time, 'time_window_min': self._train_pass._interval_per_pass + 'begin_time': next_pass_time, 'time_window_min': self._train_pass._interval_per_pass }) pure_train_begin = time.time() for executor in self.global_config['executor']: self.run_executor(executor, current_dataset[executor['dataset_name']], stdout_str) - cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, {'master': True, 'log_format' : 'release_memory cost %s sec'}) + cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, \ + {'master': True, 'log_format': 'release_memory cost %s sec'}) for name in current_dataset: current_dataset[name].release_memory() pure_train_cost = time.time() - pure_train_begin diff --git a/kagle/trainer/kagle_trainer.py b/kagle/trainer/kagle_trainer.py index 442293b7aab21aad3e72d8bb008a0d210cb4fc20..d4fd239b858a088496451d416fc07e51c57a4399 100755 --- a/kagle/trainer/kagle_trainer.py +++ b/kagle/trainer/kagle_trainer.py @@ -8,7 +8,7 @@ import time class Trainer(object): """R """ - __metaclass__ = self.ABCMeta + __metaclass__ = abc.ABCMeta def __init__(self, config): """R """