From 192682addc2ec62d72c9420b297d4148a8ab1061 Mon Sep 17 00:00:00 2001
From: xiexionghang
Date: Mon, 23 Mar 2020 11:10:21 +0800
Subject: [PATCH] add gloo support and fix some barrier bug

---
 kagle/kagle_fs.py               |  5 ++--
 kagle/kagle_metric.py           | 11 +++++++
 kagle/kagle_util.py             | 26 +++++++----------
 kagle/trainer/abacus_trainer.py | 51 ++++++++++++++++++++++-----------
 kagle/trainer/kagle_trainer.py  |  2 +-
 5 files changed, 60 insertions(+), 35 deletions(-)

diff --git a/kagle/kagle_fs.py b/kagle/kagle_fs.py
index d2e51017..53352354 100755
--- a/kagle/kagle_fs.py
+++ b/kagle/kagle_fs.py
@@ -133,9 +133,7 @@ class FileHandler(object):
         """R
         """
         if is_afs_path(path):
-            print("xxh go cat " + path)
             hdfs_cat = self._hdfs_client.cat(path)
-            print(hdfs_cat)
             return hdfs_cat
         else:
             return self._local_fs_client.cat(path)
@@ -146,9 +144,10 @@ class FileHandler(object):
         files = []
         if is_afs_path(path):
             files = self._hdfs_client.ls(path)
+            files = [path + '/' + self.get_file_name(fi) for fi in files] # absulte path
         else:
             files = self._local_fs_client.ls(path)
-        files = [path + '/' + fi for fi in files]
+            files = [path + '/' + fi for fi in files] # absulte path
         return files

     def cp(self, org_path, dest_path):
diff --git a/kagle/kagle_metric.py b/kagle/kagle_metric.py
index a0e66cfb..3919c6bf 100755
--- a/kagle/kagle_metric.py
+++ b/kagle/kagle_metric.py
@@ -199,6 +199,17 @@ class PaddleAUCMetric(Metric):
         self._metric_dict = params['metric_dict']
         fleet._role_maker._barrier_worker()
         result = self.get_global_metrics(scope, self._metric_dict)
+        if result['total_ins_num'] == 0:
+            self._result = result
+            self._result['auc'] = 0
+            self._result['bucket_error'] = 0
+            self._result['actual_ctr'] = 0
+            self._result['predict_ctr'] = 0
+            self._result['mae'] = 0
+            self._result['rmse'] = 0
+            self._result['copc'] = 0
+            self._result['mean_q'] = 0
+            return self._result
         if 'stat_pos' in result and 'stat_neg' in result:
             result['auc'] = self.calculate_auc(result['stat_pos'], result['stat_neg'])
             result['bucket_error'] = self.calculate_auc(result['stat_pos'], result['stat_neg'])
diff --git a/kagle/kagle_util.py b/kagle/kagle_util.py
index 3e7a99e8..b64f7076 100755
--- a/kagle/kagle_util.py
+++ b/kagle/kagle_util.py
@@ -53,46 +53,43 @@ def make_datetime(date_str, fmt=None):
     return datetime.datetime.strptime(date_str, fmt)


-def wroker_numric_opt(value, opt):
+def wroker_numric_opt(value, env, opt):
     """
     numric count opt for workers
     Args:
         value: value for count
+        env: mpi/gloo
         opt: count operator, SUM/MAX/MIN/AVG
     Return:
         count result
     """
     local_value = np.array([value])
     global_value = np.copy(local_value) * 0
-    fleet._role_maker._node_type_comm.Allreduce(local_value, global_value, op=opt)
+    fleet._role_maker.all_reduce_worker(local_value, global_value, opt)
     return global_value[0]

-
-def worker_numric_sum(value):
+def worker_numric_sum(value, env="mpi"):
     """R
     """
-    from mpi4py import MPI
-    return wroker_numric_opt(value, MPI.SUM)
+    return wroker_numric_opt(value, env, "sum")


-def worker_numric_avg(value):
+def worker_numric_avg(value, env="mpi"):
     """R
     """
-    return worker_numric_sum(value) / fleet.worker_num()
+    return worker_numric_sum(value, env) / fleet.worker_num()


-def worker_numric_min(value):
+def worker_numric_min(value, env="mpi"):
     """R
     """
-    from mpi4py import MPI
-    return wroker_numric_opt(value, MPI.MIN)
+    return wroker_numric_opt(value, env, "min")


-def worker_numric_max(value):
+def worker_numric_max(value, env="mpi"):
     """R
     """
-    from mpi4py import MPI
-    return wroker_numric_opt(value, MPI.MAX)
+    return wroker_numric_opt(value, env, "max")


 def rank0_print(log_str):
@@ -267,7 +264,6 @@ class TimeTrainPass(object):
         self._pass_id = pass_id
         mins = self._interval_per_pass * (pass_id - 1)
         self._current_train_time = date_time + datetime.timedelta(minutes=mins)
-        print(self._current_train_time)

     def init_pass_by_time(self, datetime_str):
         """
diff --git a/kagle/trainer/abacus_trainer.py b/kagle/trainer/abacus_trainer.py
index c9218dc7..641c85e3 100755
--- a/kagle/trainer/abacus_trainer.py
+++ b/kagle/trainer/abacus_trainer.py
@@ -16,6 +16,7 @@ import kagle.kagle_metric as kagle_metric
 import kagle.kagle_dataset as kagle_dataset
 import kagle.trainer.kagle_trainer as kagle_trainer
 from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
+from paddle.fluid.incubate.fleet.base.role_maker import GeneralRoleMaker

 class AbacusPaddleTrainer(kagle_trainer.Trainer):
     """R
@@ -52,7 +53,14 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer):
     def init(self, context):
         """R
         """
-        fleet.init(self._exe)
+        role_maker = None
+        if self.global_config.get('process_mode', 'mpi') == 'brilliant_cpu':
+            afs_config = self.global_config['io']['afs']
+            role_maker = fluid.incubate.fleet.base.role_maker.GeneralRoleMaker(
+                hdfs_name=afs_config['fs_name'], hdfs_ugi=afs_config['fs_ugi'],
+                path=self.global_config['output_path'] + "/gloo",
+                init_timeout_seconds=1200, run_timeout_seconds=1200)
+        fleet.init(role_maker)
         data_var_list = []
         data_var_name_dict = {}
         runnnable_scope = []
@@ -136,7 +144,8 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer):
             save_mode = 3 # unseen_day++, save all
         kagle_util.rank0_print("going to save_model %s" % model_path)
         fleet.save_persistables(None, model_path, mode=save_mode)
-        self._train_pass.save_train_progress(day, pass_index, base_key, model_path, is_checkpoint=True)
+        if fleet._role_maker.is_first_worker():
+            self._train_pass.save_train_progress(day, pass_index, base_key, model_path, is_checkpoint=True)
         cost_printer.done()
         return model_path

@@ -180,17 +189,19 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer):
         }
         cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, {'master': True,
                                               'log_format': 'save dense model cost %s sec', 'stdout': stdout_str})
-        for executor in self.global_config['executor']:
-            if 'layer_for_inference' not in executor:
-                continue
-            executor_name = executor['name']
-            model = self._exector_context[executor_name]['model']
-            save_env_param['inference_list'] = executor['layer_for_inference']
-            save_env_param['scope'] = self._exector_context[executor_name]['scope']
-            model.dump_inference_param(save_env_param)
-            for dnn_layer in executor['layer_for_inference']:
-                model_file_handler.cp(dnn_layer['save_file_name'],
-                                      model_path + '/dnn_plugin/' + dnn_layer['save_file_name'])
+        if fleet._role_maker.is_first_worker():
+            for executor in self.global_config['executor']:
+                if 'layer_for_inference' not in executor:
+                    continue
+                executor_name = executor['name']
+                model = self._exector_context[executor_name]['model']
+                save_env_param['inference_list'] = executor['layer_for_inference']
+                save_env_param['scope'] = self._exector_context[executor_name]['scope']
+                model.dump_inference_param(save_env_param)
+                for dnn_layer in executor['layer_for_inference']:
+                    model_file_handler.cp(dnn_layer['save_file_name'],
+                                          model_path + '/dnn_plugin/' + dnn_layer['save_file_name'])
+        fleet._role_maker._barrier_worker()
         cost_printer.done()

         xbox_done_info = {
@@ -206,9 +217,11 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer):
             "job_id": kagle_util.get_env_value("JOB_ID"),
             "job_name": kagle_util.get_env_value("JOB_NAME")
         }
-        model_file_handler.write(json.dumps(xbox_done_info) + "\n", xbox_model_donefile, 'a')
-        if pass_index > 0:
-            self._train_pass.save_train_progress(day, pass_index, xbox_base_key, model_path, is_checkpoint=False)
+        if fleet._role_maker.is_first_worker():
+            model_file_handler.write(json.dumps(xbox_done_info) + "\n", xbox_model_donefile, 'a')
+            if pass_index > 0:
+                self._train_pass.save_train_progress(day, pass_index, xbox_base_key, model_path, is_checkpoint=False)
+        fleet._role_maker._barrier_worker()
         return stdout_str

     def run_executor(self, executor_config, dataset, stdout_str):
@@ -239,6 +252,7 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer):
         kagle_util.rank0_print("End " + executor_name + " pass")
         if self._train_pass.need_dump_inference(pass_id) and executor_config['dump_inference_model']:
             stdout_str += self.save_xbox_model(day, pass_id, xbox_base_key, monitor_data)
+        fleet._role_maker._barrier_worker()

     def startup(self, context):
         """R
@@ -305,6 +319,7 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer):
             kagle_util.rank0_print("going to save batch model")
             self.save_model(next_date, 0, xbox_base_key)
         self._train_pass._base_key = xbox_base_key
+        fleet._role_maker._barrier_worker()

     def train_pass(self, context):
         """R
@@ -325,6 +340,7 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer):
                 'node_num': fleet.worker_num(), 'node_idx': fleet.worker_index(),
                 'begin_time': pass_time, 'time_window_min': self._train_pass._interval_per_pass
             })
+            fleet._role_maker._barrier_worker()
             cost_printer.done()

         kagle_util.rank0_print("going to global shuffle")
@@ -335,6 +351,7 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer):
             current_dataset[name].global_shuffle(fleet, self.global_config['dataset']['shuffle_thread'])
         cost_printer.done()
         # str(dataset.get_shuffle_data_size(fleet))
+        fleet._role_maker._barrier_worker()

         if self.global_config['prefetch_data']:
             next_pass_time = (self._train_pass._current_train_time +
@@ -345,6 +362,7 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer):
                 'begin_time': next_pass_time, 'time_window_min': self._train_pass._interval_per_pass
             })
+            fleet._role_maker._barrier_worker()

         pure_train_begin = time.time()
         for executor in self.global_config['executor']:
             self.run_executor(executor, current_dataset[executor['dataset_name']], stdout_str)
@@ -367,6 +385,7 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer):
         kagle_util.rank0_print(log_str)
         stdout_str += kagle_util.now_time_str() + log_str
         sys.stdout.write(stdout_str)
+        fleet._role_maker._barrier_worker()
         stdout_str = ""
         if pass_id == self._train_pass.max_pass_num_day():
             context['status'] = 'end_day'
diff --git a/kagle/trainer/kagle_trainer.py b/kagle/trainer/kagle_trainer.py
index d4fd239b..62e65a77 100755
--- a/kagle/trainer/kagle_trainer.py
+++ b/kagle/trainer/kagle_trainer.py
@@ -41,7 +41,7 @@ class Trainer(object):
             None, just sleep in base
         """
         print('unknow context_status:%s, do nothing' % context['status'])
-        time.sleep(60)
+        time.sleep(60)

     def reload_train_context(self):
         """
--
GitLab