From 69701c7b14c5cef2ab25c08a5d5461e6f47d13b5 Mon Sep 17 00:00:00 2001 From: xiexionghang Date: Fri, 6 Mar 2020 03:16:53 +0800 Subject: [PATCH] depend on paddle with bcloud --- kagle/kagle_model.py | 2 +- kagle/kagle_util.py | 18 ++++++++++++++---- kagle/trainer/abacus_trainer.py | 13 +++++++------ 3 files changed, 22 insertions(+), 11 deletions(-) diff --git a/kagle/kagle_model.py b/kagle/kagle_model.py index e94d8f78..7a695fc7 100755 --- a/kagle/kagle_model.py +++ b/kagle/kagle_model.py @@ -231,7 +231,7 @@ class FluidModel(Model): scope = params['scope'] executor = params['executor'] program = self._build_param['model']['train_program'] - for table_name,table in self._build_param['table'].items(): + for table_name, table in self._build_param['table'].items(): fleet._fleet_ptr.pull_dense(scope, table['_meta']._table_id, table['params']) for infernce_item in params['inference_list']: params_name_list = self.inference_params(infernce_item['layer_name']) diff --git a/kagle/kagle_util.py b/kagle/kagle_util.py index 11b1156e..5b8803e9 100755 --- a/kagle/kagle_util.py +++ b/kagle/kagle_util.py @@ -15,12 +15,14 @@ def get_env_value(env_name): """ return os.popen("echo -n ${" + env_name + "}").read().strip() + def now_time_str(): """ get current format str_time """ return "\n" + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + "[0]:" + def get_absolute_path(path, params): """R """ @@ -33,6 +35,7 @@ def get_absolute_path(path, params): else: return path + def make_datetime(date_str, fmt = None): """ create a datetime instance by date_string @@ -64,23 +67,27 @@ def wroker_numric_opt(value, opt): fleet._role_maker._node_type_comm.Allreduce(local_value, global_value, op=opt) return global_value[0] + def worker_numric_sum(value): """R """ from mpi4py import MPI return wroker_numric_opt(value, MPI.SUM) + def worker_numric_avg(value): """R """ return worker_numric_sum(value) / fleet.worker_num() + def worker_numric_min(value): """R """ from mpi4py import MPI return wroker_numric_opt(value, MPI.MIN) + def worker_numric_max(value): """R """ @@ -93,6 +100,7 @@ def rank0_print(log_str): """ print_log(log_str, {'master': True}) + def print_log(log_str, params): """R """ @@ -105,6 +113,7 @@ def print_log(log_str, params): if 'stdout' in params: params['stdout'] += str(datetime.datetime.now()) + log_str + def print_cost(cost, params): """R """ @@ -113,7 +122,7 @@ def print_cost(cost, params): return log_str -class CostPrinter: +class CostPrinter(object): """ For count cost time && print cost log """ @@ -147,7 +156,8 @@ class CostPrinter: self._done = True return cost, log_str -class PathGenerator: + +class PathGenerator(object): """ generate path with template & runtime variables """ @@ -178,7 +188,7 @@ class PathGenerator: return "" -class TimeTrainPass: +class TimeTrainPass(object): """ timely pass define pass time_interval && start_time && end_time @@ -266,7 +276,7 @@ class TimeTrainPass: date_str: example "20200110000" -> "%Y%m%d%H%M" """ self._current_train_time = make_datetime(datetime_str) - minus = self._current_train_time.hour * 60 + self._current_train_time.minute; + minus = self._current_train_time.hour * 60 + self._current_train_time.minute self._pass_id = minus / self._interval_per_pass + 1 def current_pass(self): diff --git a/kagle/trainer/abacus_trainer.py b/kagle/trainer/abacus_trainer.py index c8a38fae..be1594ef 100755 --- a/kagle/trainer/abacus_trainer.py +++ b/kagle/trainer/abacus_trainer.py @@ -32,7 +32,7 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer): self._exector_context = {} self._metrics = {} self._path_generator = kagle_util.PathGenerator({ - 'templates' : [ + 'templates': [ {'name': 'xbox_base_done', 'template': config['output_path'] + '/xbox_base_done.txt'}, {'name': 'xbox_delta_done', 'template': config['output_path'] + '/xbox_patch_done.txt'}, {'name': 'xbox_base', 'template': config['output_path'] + '/xbox/{day}/base/'}, @@ -75,8 +75,8 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer): data_var_name_dict[var.name] = var optimizer = kagle_model.FluidModel.build_optimizer({ - 'metrics' : self._metrics, - 'optimizer_conf' : self.global_config['optimizer'] + 'metrics': self._metrics, + 'optimizer_conf': self.global_config['optimizer'] }) optimizer.minimize(runnnable_cost_op, runnnable_scope) for executor in self.global_config['executor']: @@ -227,7 +227,7 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer): self._exe.train_from_dataset(program, dataset, scope, thread=executor_config['train_thread_num'], debug=self.global_config['debug']) end = time.time() - local_cost = (end-begin) / 60.0 + local_cost = (end - begin) / 60.0 avg_cost = kagle_util.worker_numric_avg(local_cost) min_cost = kagle_util.worker_numric_min(local_cost) max_cost = kagle_util.worker_numric_max(local_cost) @@ -251,7 +251,7 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer): self._train_pass = kagle_util.TimeTrainPass(self.global_config) if not self.global_config['cold_start']: cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, - {'master': True, 'log_format' : 'load model cost %s sec', 'stdout' : stdout_str}) + {'master': True, 'log_format': 'load model cost %s sec', 'stdout': stdout_str}) self.print_log("going to load model %s" % self._train_pass._checkpoint_model_path, {'master': True}) #if config.need_reqi_changeslot and config.reqi_dnn_plugin_day >= self._train_pass.date() # and config.reqi_dnn_plugin_pass >= self._pass_id: @@ -317,7 +317,8 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer): self.print_log(" ==== begin delta:%s ========" % pass_id, {'master': True, 'stdout': stdout_str}) train_begin_time = time.time() - cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, {'master': True, 'log_format': 'load into memory done, cost %s sec', 'stdout': stdout_str}) + cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, \ + {'master': True, 'log_format': 'load into memory done, cost %s sec', 'stdout': stdout_str}) current_dataset = {} for name in self._dataset: current_dataset[name] = self._dataset[name].load_dataset({ -- GitLab