diff --git a/kagle/kagle_model.py b/kagle/kagle_model.py index e94d8f78ae26c53674f2e6e2c53e176738ea85cc..7a695fc7454230db019cff57ec9b54f2ea20459e 100755 --- a/kagle/kagle_model.py +++ b/kagle/kagle_model.py @@ -231,7 +231,7 @@ class FluidModel(Model): scope = params['scope'] executor = params['executor'] program = self._build_param['model']['train_program'] - for table_name,table in self._build_param['table'].items(): + for table_name, table in self._build_param['table'].items(): fleet._fleet_ptr.pull_dense(scope, table['_meta']._table_id, table['params']) for infernce_item in params['inference_list']: params_name_list = self.inference_params(infernce_item['layer_name']) diff --git a/kagle/kagle_util.py b/kagle/kagle_util.py index 11b1156ebd3bf155b5f78dcce65702c45d9580dc..5b8803e9f111861bbf838819aed82fdbf15b7175 100755 --- a/kagle/kagle_util.py +++ b/kagle/kagle_util.py @@ -15,12 +15,14 @@ def get_env_value(env_name): """ return os.popen("echo -n ${" + env_name + "}").read().strip() + def now_time_str(): """ get current format str_time """ return "\n" + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + "[0]:" + def get_absolute_path(path, params): """R """ @@ -33,6 +35,7 @@ def get_absolute_path(path, params): else: return path + def make_datetime(date_str, fmt = None): """ create a datetime instance by date_string @@ -64,23 +67,27 @@ def wroker_numric_opt(value, opt): fleet._role_maker._node_type_comm.Allreduce(local_value, global_value, op=opt) return global_value[0] + def worker_numric_sum(value): """R """ from mpi4py import MPI return wroker_numric_opt(value, MPI.SUM) + def worker_numric_avg(value): """R """ return worker_numric_sum(value) / fleet.worker_num() + def worker_numric_min(value): """R """ from mpi4py import MPI return wroker_numric_opt(value, MPI.MIN) + def worker_numric_max(value): """R """ @@ -93,6 +100,7 @@ def rank0_print(log_str): """ print_log(log_str, {'master': True}) + def print_log(log_str, params): """R """ @@ -105,6 +113,7 @@ def print_log(log_str, params): if 'stdout' in params: params['stdout'] += str(datetime.datetime.now()) + log_str + def print_cost(cost, params): """R """ @@ -113,7 +122,7 @@ def print_cost(cost, params): return log_str -class CostPrinter: +class CostPrinter(object): """ For count cost time && print cost log """ @@ -147,7 +156,8 @@ class CostPrinter: self._done = True return cost, log_str -class PathGenerator: + +class PathGenerator(object): """ generate path with template & runtime variables """ @@ -178,7 +188,7 @@ class PathGenerator: return "" -class TimeTrainPass: +class TimeTrainPass(object): """ timely pass define pass time_interval && start_time && end_time @@ -266,7 +276,7 @@ class TimeTrainPass: date_str: example "20200110000" -> "%Y%m%d%H%M" """ self._current_train_time = make_datetime(datetime_str) - minus = self._current_train_time.hour * 60 + self._current_train_time.minute; + minus = self._current_train_time.hour * 60 + self._current_train_time.minute self._pass_id = minus / self._interval_per_pass + 1 def current_pass(self): diff --git a/kagle/trainer/abacus_trainer.py b/kagle/trainer/abacus_trainer.py index c8a38fae8ecacf6d4892b7d2d30264938ec3f530..be1594ef96545170ce49654039d9c664ef4a397d 100755 --- a/kagle/trainer/abacus_trainer.py +++ b/kagle/trainer/abacus_trainer.py @@ -32,7 +32,7 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer): self._exector_context = {} self._metrics = {} self._path_generator = kagle_util.PathGenerator({ - 'templates' : [ + 'templates': [ {'name': 'xbox_base_done', 'template': config['output_path'] + '/xbox_base_done.txt'}, {'name': 'xbox_delta_done', 'template': config['output_path'] + '/xbox_patch_done.txt'}, {'name': 'xbox_base', 'template': config['output_path'] + '/xbox/{day}/base/'}, @@ -75,8 +75,8 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer): data_var_name_dict[var.name] = var optimizer = kagle_model.FluidModel.build_optimizer({ - 'metrics' : self._metrics, - 'optimizer_conf' : self.global_config['optimizer'] + 'metrics': self._metrics, + 'optimizer_conf': self.global_config['optimizer'] }) optimizer.minimize(runnnable_cost_op, runnnable_scope) for executor in self.global_config['executor']: @@ -227,7 +227,7 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer): self._exe.train_from_dataset(program, dataset, scope, thread=executor_config['train_thread_num'], debug=self.global_config['debug']) end = time.time() - local_cost = (end-begin) / 60.0 + local_cost = (end - begin) / 60.0 avg_cost = kagle_util.worker_numric_avg(local_cost) min_cost = kagle_util.worker_numric_min(local_cost) max_cost = kagle_util.worker_numric_max(local_cost) @@ -251,7 +251,7 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer): self._train_pass = kagle_util.TimeTrainPass(self.global_config) if not self.global_config['cold_start']: cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, - {'master': True, 'log_format' : 'load model cost %s sec', 'stdout' : stdout_str}) + {'master': True, 'log_format': 'load model cost %s sec', 'stdout': stdout_str}) self.print_log("going to load model %s" % self._train_pass._checkpoint_model_path, {'master': True}) #if config.need_reqi_changeslot and config.reqi_dnn_plugin_day >= self._train_pass.date() # and config.reqi_dnn_plugin_pass >= self._pass_id: @@ -317,7 +317,8 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer): self.print_log(" ==== begin delta:%s ========" % pass_id, {'master': True, 'stdout': stdout_str}) train_begin_time = time.time() - cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, {'master': True, 'log_format': 'load into memory done, cost %s sec', 'stdout': stdout_str}) + cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, \ + {'master': True, 'log_format': 'load into memory done, cost %s sec', 'stdout': stdout_str}) current_dataset = {} for name in self._dataset: current_dataset[name] = self._dataset[name].load_dataset({