From 154e5da21425346546b3ef024f03cfdb369d5b95 Mon Sep 17 00:00:00 2001 From: tangwei Date: Tue, 31 Mar 2020 11:07:38 +0800 Subject: [PATCH] rename to eleps --- models/base.py | 6 +- models/ctr_dnn/model.py | 32 ++++++++++ models/ctr_dnn/reader.py | 7 +++ reader/dataset.py | 10 +-- trainer/ctr_trainer.py | 80 ++++++++++++------------ trainer/{kagle_trainer.py => trainer.py} | 0 utils/{kagle_fs.py => fs.py} | 0 utils/{kagle_table.py => table.py} | 0 utils/{kagle_util.py => util.py} | 8 +-- 9 files changed, 92 insertions(+), 51 deletions(-) rename trainer/{kagle_trainer.py => trainer.py} (100%) rename utils/{kagle_fs.py => fs.py} (100%) rename utils/{kagle_table.py => table.py} (100%) rename utils/{kagle_util.py => util.py} (96%) diff --git a/models/base.py b/models/base.py index 2311f5e5..d651567d 100644 --- a/models/base.py +++ b/models/base.py @@ -5,7 +5,7 @@ import abc import copy import yaml import paddle.fluid as fluid -import kagle.utils.kagle_table as kagle_table +from ..utils import table as table from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet @@ -187,7 +187,7 @@ class YamlModel(Model): if self._build_nodes[phase] is None: continue for node in self._build_nodes[phase]: - exec("""layer=kagle_layer.{}(node)""".format(node['class'])) + exec("""layer=layer.{}(node)""".format(node['class'])) layer_output, extend_output = layer.generate(self._config['mode'], self._build_param) self._build_param['layer'][node['name']] = layer_output self._build_param['layer_extend'][node['name']] = extend_output @@ -208,7 +208,7 @@ class YamlModel(Model): param_name = inference_param['name'] if param_name not in self._build_param['table']: self._build_param['table'][param_name] = {'params' :[]} - table_meta = kagle_table.TableMeta.alloc_new_table(inference_param['table_id']) + table_meta = table.TableMeta.alloc_new_table(inference_param['table_id']) self._build_param['table'][param_name]['_meta'] = table_meta self._build_param['table'][param_name]['params'] += inference_param['params'] pass diff --git a/models/ctr_dnn/model.py b/models/ctr_dnn/model.py index e69de29b..017dc078 100644 --- a/models/ctr_dnn/model.py +++ b/models/ctr_dnn/model.py @@ -0,0 +1,32 @@ +class TrainModel(object): + def input(self): + pass + + def net(self): + pass + + def net(self): + pass + + def loss(self): + pass + + def optimizer(self): + pass + + +class InferModel(object): + def input(self): + pass + + def net(self): + pass + + def net(self): + pass + + def loss(self): + pass + + def optimizer(self): + pass diff --git a/models/ctr_dnn/reader.py b/models/ctr_dnn/reader.py index e69de29b..ed594e51 100644 --- a/models/ctr_dnn/reader.py +++ b/models/ctr_dnn/reader.py @@ -0,0 +1,7 @@ + +def TrainReader(): + pass + + +def InferReader(): + pass diff --git a/reader/dataset.py b/reader/dataset.py index 96c6fd50..c4171488 100755 --- a/reader/dataset.py +++ b/reader/dataset.py @@ -7,8 +7,8 @@ import yaml import time import datetime import paddle.fluid as fluid -import kagle.utils.kagle_fs as kagle_fs -import kagle.utils.kagle_util as kagle_util +from .. utils import fs as fs +from .. utils import util as util class Dataset(object): @@ -61,16 +61,16 @@ class TimeSplitDataset(Dataset): Dataset.__init__(self, config) if 'data_donefile' not in config or config['data_donefile'] is None: config['data_donefile'] = config['data_path'] + "/to.hadoop.done" - self._path_generator = kagle_util.PathGenerator({'templates': [ + self._path_generator = util.PathGenerator({'templates': [ {'name': 'data_path', 'template': config['data_path']}, {'name': 'donefile_path', 'template': config['data_donefile']} ]}) self._split_interval = config['split_interval'] # data split N mins per dir - self._data_file_handler = kagle_fs.FileHandler(config) + self._data_file_handler = fs.FileHandler(config) def _format_data_time(self, daytime_str, time_window_mins): """ """ - data_time = kagle_util.make_datetime(daytime_str) + data_time = util.make_datetime(daytime_str) mins_of_day = data_time.hour * 60 + data_time.minute begin_stage = mins_of_day / self._split_interval end_stage = (mins_of_day + time_window_mins) / self._split_interval diff --git a/trainer/ctr_trainer.py b/trainer/ctr_trainer.py index 93e282e4..62444dbc 100755 --- a/trainer/ctr_trainer.py +++ b/trainer/ctr_trainer.py @@ -12,12 +12,14 @@ import datetime import numpy as np import paddle.fluid as fluid -import kagle.utils.kagle_fs as kagle_fs -import kagle.utils.kagle_util as kagle_util -import kagle.kagle_model as kagle_model -import kagle.kagle_metric as kagle_metric -import kagle.reader.dataset as kagle_dataset -import kagle.trainer.kagle_trainer as kagle_trainer + +from .. utils import fs as fs +from .. utils import util as util +from .. metrics.auc_metrics import AUCMetric +from .. models import base as model_basic +from .. reader import dataset +from . import trainer + from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet from paddle.fluid.incubate.fleet.base.role_maker import GeneralRoleMaker @@ -62,22 +64,22 @@ def worker_numric_max(value, env="mpi"): return wroker_numric_opt(value, env, "max") -class CtrPaddleTrainer(kagle_trainer.Trainer): +class CtrPaddleTrainer(trainer.Trainer): """R """ def __init__(self, config): """R """ - kagle_trainer.Trainer.__init__(self, config) - config['output_path'] = kagle_util.get_absolute_path( + trainer.Trainer.__init__(self, config) + config['output_path'] = util.get_absolute_path( config['output_path'], config['io']['afs']) self.global_config = config self._place = fluid.CPUPlace() self._exe = fluid.Executor(self._place) self._exector_context = {} self._metrics = {} - self._path_generator = kagle_util.PathGenerator({ + self._path_generator = util.PathGenerator({ 'templates': [ {'name': 'xbox_base_done', 'template': config['output_path'] + '/xbox_base_done.txt'}, {'name': 'xbox_delta_done', 'template': config['output_path'] + '/xbox_patch_done.txt'}, @@ -116,7 +118,7 @@ class CtrPaddleTrainer(kagle_trainer.Trainer): scope = fluid.Scope() self._exector_context[executor['name']] = {} self._exector_context[executor['name']]['scope'] = scope - self._exector_context[executor['name']]['model'] = kagle_model.create(executor) + self._exector_context[executor['name']]['model'] = model_basic.create(executor) model = self._exector_context[executor['name']]['model'] self._metrics.update(model.get_metrics()) runnnable_scope.append(scope) @@ -127,7 +129,7 @@ class CtrPaddleTrainer(kagle_trainer.Trainer): data_var_list.append(var) data_var_name_dict[var.name] = var - optimizer = kagle_model.FluidModel.build_optimizer({ + optimizer = model_basic.YamlModel.build_optimizer({ 'metrics': self._metrics, 'optimizer_conf': self.global_config['optimizer'] }) @@ -153,7 +155,7 @@ class CtrPaddleTrainer(kagle_trainer.Trainer): dataset_item['data_vars'] = data_var_list dataset_item.update(self.global_config['io']['afs']) dataset_item["batch_size"] = self.global_config['batch_size'] - self._dataset[dataset_item['name']] = kagle_dataset.FluidTimeSplitDataset(dataset_item) + self._dataset[dataset_item['name']] = dataset.FluidTimeSplitDataset(dataset_item) # if config.need_reqi_changeslot and config.reqi_dnn_plugin_day >= last_day and config.reqi_dnn_plugin_pass >= last_pass: # util.reqi_changeslot(config.hdfs_dnn_plugin_path, join_save_params, common_save_params, update_save_params, scope2, scope3) fleet.init_worker() @@ -176,7 +178,7 @@ class CtrPaddleTrainer(kagle_trainer.Trainer): """R """ metrics = model.get_metrics() - metric_calculator = kagle_metric.AUCMetric(None) + metric_calculator = AUCMetric(None) for metric in metrics: metric_param = {'label': metric, 'metric_dict': metrics[metric]} metric_calculator.calculate(scope, metric_param) @@ -188,13 +190,13 @@ class CtrPaddleTrainer(kagle_trainer.Trainer): def save_model(self, day, pass_index, base_key): """R """ - cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, + cost_printer = util.CostPrinter(util.print_cost, {'master': True, 'log_format': 'save model cost %s sec'}) model_path = self._path_generator.generate_path('batch_model', {'day': day, 'pass_id': pass_index}) save_mode = 0 # just save all if pass_index < 1: # batch_model save_mode = 3 # unseen_day++, save all - kagle_util.rank0_print("going to save_model %s" % model_path) + util.rank0_print("going to save_model %s" % model_path) fleet.save_persistables(None, model_path, mode=save_mode) if fleet._role_maker.is_first_worker(): self._train_pass.save_train_progress(day, pass_index, base_key, model_path, is_checkpoint=True) @@ -206,11 +208,11 @@ class CtrPaddleTrainer(kagle_trainer.Trainer): """ stdout_str = "" xbox_patch_id = str(int(time.time())) - kagle_util.rank0_print("begin save delta model") + util.rank0_print("begin save delta model") model_path = "" xbox_model_donefile = "" - cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, {'master': True, \ + cost_printer = util.CostPrinter(util.print_cost, {'master': True, \ 'log_format': 'save xbox model cost %s sec', 'stdout': stdout_str}) if pass_index < 1: @@ -225,23 +227,23 @@ class CtrPaddleTrainer(kagle_trainer.Trainer): total_save_num = fleet.save_persistables(None, model_path, mode=save_mode) cost_printer.done() - cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, {'master': True, + cost_printer = util.CostPrinter(util.print_cost, {'master': True, 'log_format': 'save cache model cost %s sec', 'stdout': stdout_str}) - model_file_handler = kagle_fs.FileHandler(self.global_config['io']['afs']) + model_file_handler = fs.FileHandler(self.global_config['io']['afs']) if self.global_config['save_cache_model']: cache_save_num = fleet.save_cache_model(None, model_path, mode=save_mode) model_file_handler.write( "file_prefix:part\npart_num:16\nkey_num:%d\n" % cache_save_num, model_path + '/000_cache/sparse_cache.meta', 'w') cost_printer.done() - kagle_util.rank0_print("save xbox cache model done, key_num=%s" % cache_save_num) + util.rank0_print("save xbox cache model done, key_num=%s" % cache_save_num) save_env_param = { 'executor': self._exe, 'save_combine': True } - cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, {'master': True, + cost_printer = util.CostPrinter(util.print_cost, {'master': True, 'log_format': 'save dense model cost %s sec', 'stdout': stdout_str}) if fleet._role_maker.is_first_worker(): @@ -269,8 +271,8 @@ class CtrPaddleTrainer(kagle_trainer.Trainer): "monitor_data": monitor_data, "mpi_size": str(fleet.worker_num()), "input": model_path.rstrip("/") + "/000", - "job_id": kagle_util.get_env_value("JOB_ID"), - "job_name": kagle_util.get_env_value("JOB_NAME") + "job_id": util.get_env_value("JOB_ID"), + "job_name": util.get_env_value("JOB_NAME") } if fleet._role_maker.is_first_worker(): model_file_handler.write(json.dumps(xbox_done_info) + "\n", xbox_model_donefile, 'a') @@ -289,7 +291,7 @@ class CtrPaddleTrainer(kagle_trainer.Trainer): scope = self._exector_context[executor_name]['scope'] model = self._exector_context[executor_name]['model'] with fluid.scope_guard(scope): - kagle_util.rank0_print("Begin " + executor_name + " pass") + util.rank0_print("Begin " + executor_name + " pass") begin = time.time() program = model._build_param['model']['train_program'] self._exe.train_from_dataset(program, dataset, scope, @@ -299,12 +301,12 @@ class CtrPaddleTrainer(kagle_trainer.Trainer): avg_cost = worker_numric_avg(local_cost) min_cost = worker_numric_min(local_cost) max_cost = worker_numric_max(local_cost) - kagle_util.rank0_print("avg train time %s mins, min %s mins, max %s mins" % (avg_cost, min_cost, max_cost)) + util.rank0_print("avg train time %s mins, min %s mins, max %s mins" % (avg_cost, min_cost, max_cost)) self._exector_context[executor_name]['cost'] = max_cost monitor_data = "" self.print_global_metrics(scope, model, monitor_data, stdout_str) - kagle_util.rank0_print("End " + executor_name + " pass") + util.rank0_print("End " + executor_name + " pass") if self._train_pass.need_dump_inference(pass_id) and executor_config['dump_inference_model']: stdout_str += self.save_xbox_model(day, pass_id, xbox_base_key, monitor_data) fleet._role_maker._barrier_worker() @@ -317,9 +319,9 @@ class CtrPaddleTrainer(kagle_trainer.Trainer): context['status'] = 'wait' return stdout_str = "" - self._train_pass = kagle_util.TimeTrainPass(self.global_config) + self._train_pass = util.TimeTrainPass(self.global_config) if not self.global_config['cold_start']: - cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, + cost_printer = util.CostPrinter(util.print_cost, {'master': True, 'log_format': 'load model cost %s sec', 'stdout': stdout_str}) self.print_log("going to load model %s" % self._train_pass._checkpoint_model_path, {'master': True}) @@ -358,8 +360,8 @@ class CtrPaddleTrainer(kagle_trainer.Trainer): xbox_base_key = int(time.time()) context['status'] = 'begin_day' - kagle_util.rank0_print("shrink table") - cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, + util.rank0_print("shrink table") + cost_printer = util.CostPrinter(util.print_cost, {'master': True, 'log_format': 'shrink table done, cost %s sec'}) fleet.shrink_sparse_table() for executor in self._exector_context: @@ -370,9 +372,9 @@ class CtrPaddleTrainer(kagle_trainer.Trainer): cost_printer.done() next_date = self._train_pass.date(delta_day=1) - kagle_util.rank0_print("going to save xbox base model") + util.rank0_print("going to save xbox base model") self.save_xbox_model(next_date, 0, xbox_base_key, "") - kagle_util.rank0_print("going to save batch model") + util.rank0_print("going to save batch model") self.save_model(next_date, 0, xbox_base_key) self._train_pass._base_key = xbox_base_key fleet._role_maker._barrier_worker() @@ -388,7 +390,7 @@ class CtrPaddleTrainer(kagle_trainer.Trainer): self.print_log(" ==== begin delta:%s ========" % pass_id, {'master': True, 'stdout': stdout_str}) train_begin_time = time.time() - cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, \ + cost_printer = util.CostPrinter(util.print_cost, \ {'master': True, 'log_format': 'load into memory done, cost %s sec', 'stdout': stdout_str}) current_dataset = {} @@ -400,8 +402,8 @@ class CtrPaddleTrainer(kagle_trainer.Trainer): fleet._role_maker._barrier_worker() cost_printer.done() - kagle_util.rank0_print("going to global shuffle") - cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, { + util.rank0_print("going to global shuffle") + cost_printer = util.CostPrinter(util.print_cost, { 'master': True, 'stdout': stdout_str, 'log_format': 'global shuffle done, cost %s sec'}) for name in current_dataset: @@ -423,7 +425,7 @@ class CtrPaddleTrainer(kagle_trainer.Trainer): pure_train_begin = time.time() for executor in self.global_config['executor']: self.run_executor(executor, current_dataset[executor['dataset_name']], stdout_str) - cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, \ + cost_printer = util.CostPrinter(util.print_cost, \ {'master': True, 'log_format': 'release_memory cost %s sec'}) for name in current_dataset: current_dataset[name].release_memory() @@ -439,8 +441,8 @@ class CtrPaddleTrainer(kagle_trainer.Trainer): for executor in self._exector_context: log_str += '[' + executor + ':' + str(self._exector_context[executor]['cost']) + ']' log_str += '[other_cost:' + str(other_cost) + ']' - kagle_util.rank0_print(log_str) - stdout_str += kagle_util.now_time_str() + log_str + util.rank0_print(log_str) + stdout_str += util.now_time_str() + log_str sys.stdout.write(stdout_str) fleet._role_maker._barrier_worker() stdout_str = "" diff --git a/trainer/kagle_trainer.py b/trainer/trainer.py similarity index 100% rename from trainer/kagle_trainer.py rename to trainer/trainer.py diff --git a/utils/kagle_fs.py b/utils/fs.py similarity index 100% rename from utils/kagle_fs.py rename to utils/fs.py diff --git a/utils/kagle_table.py b/utils/table.py similarity index 100% rename from utils/kagle_table.py rename to utils/table.py diff --git a/utils/kagle_util.py b/utils/util.py similarity index 96% rename from utils/kagle_util.py rename to utils/util.py index 5173a793..7f0e3ca8 100755 --- a/utils/kagle_util.py +++ b/utils/util.py @@ -4,7 +4,7 @@ Util lib import os import time import datetime -import kagle.utils.kagle_fs as kagle_fs +from .. utils import fs as fs def get_env_value(env_name): @@ -168,10 +168,10 @@ class TimeTrainPass(object): self._pass_donefile_handler = None if 'pass_donefile_name' in self._config: self._train_pass_donefile = global_config['output_path'] + '/' + self._config['pass_donefile_name'] - if kagle_fs.is_afs_path(self._train_pass_donefile): - self._pass_donefile_handler = kagle_fs.FileHandler(global_config['io']['afs']) + if fs.is_afs_path(self._train_pass_donefile): + self._pass_donefile_handler = fs.FileHandler(global_config['io']['afs']) else: - self._pass_donefile_handler = kagle_fs.FileHandler(global_config['io']['local_fs']) + self._pass_donefile_handler = fs.FileHandler(global_config['io']['local_fs']) last_done = self._pass_donefile_handler.cat(self._train_pass_donefile).strip().split('\n')[-1] done_fileds = last_done.split('\t') -- GitLab