Commit 192682ad authored by xiexionghang

add gloo support and fix some barrier bugs

Parent 520c7780
@@ -133,9 +133,7 @@ class FileHandler(object):
         """R
         """
         if is_afs_path(path):
-            print("xxh go cat " + path)
             hdfs_cat = self._hdfs_client.cat(path)
-            print(hdfs_cat)
             return hdfs_cat
         else:
             return self._local_fs_client.cat(path)
@@ -146,9 +144,10 @@ class FileHandler(object):
         files = []
         if is_afs_path(path):
             files = self._hdfs_client.ls(path)
+            files = [path + '/' + self.get_file_name(fi) for fi in files]  # absolute path
         else:
             files = self._local_fs_client.ls(path)
-            files = [path + '/' + fi for fi in files]
+            files = [path + '/' + fi for fi in files]  # absolute path
         return files

     def cp(self, org_path, dest_path):
......
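With this change, `ls` joins each entry onto the query path for AFS listings as well, so callers get absolute paths back regardless of filesystem. A minimal usage sketch; the handler construction and directory are hypothetical:

    handler = FileHandler(config)                   # hypothetical construction/config
    for fi in handler.ls('afs:/user/demo/input'):   # hypothetical AFS directory
        print(fi)  # now always absolute, e.g. afs:/user/demo/input/part-00000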
@@ -199,6 +199,17 @@ class PaddleAUCMetric(Metric):
         self._metric_dict = params['metric_dict']
         fleet._role_maker._barrier_worker()
         result = self.get_global_metrics(scope, self._metric_dict)
+        if result['total_ins_num'] == 0:
+            self._result = result
+            self._result['auc'] = 0
+            self._result['bucket_error'] = 0
+            self._result['actual_ctr'] = 0
+            self._result['predict_ctr'] = 0
+            self._result['mae'] = 0
+            self._result['rmse'] = 0
+            self._result['copc'] = 0
+            self._result['mean_q'] = 0
+            return self._result
         if 'stat_pos' in result and 'stat_neg' in result:
             result['auc'] = self.calculate_auc(result['stat_pos'], result['stat_neg'])
             result['bucket_error'] = self.calculate_auc(result['stat_pos'], result['stat_neg'])
......
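For context on the `stat_pos`/`stat_neg` branch: these are globally aggregated positive/negative counts per predicted-score bucket, from which AUC can be derived. A hedged standalone sketch of that computation; the function name and bucket layout are assumptions, not the class's actual `calculate_auc`:

    import numpy as np

    def auc_from_buckets(stat_pos, stat_neg):
        # stat_pos[i] / stat_neg[i]: counts of positive / negative instances
        # whose predicted score falls into bucket i (buckets ascend by score).
        total_pos = float(np.sum(stat_pos))
        total_neg = float(np.sum(stat_neg))
        if total_pos == 0 or total_neg == 0:
            return 0.0  # mirrors the zero-instance guard above
        area = 0.0
        neg_below = 0.0  # negatives scored strictly below the current bucket
        for pos, neg in zip(stat_pos, stat_neg):
            area += pos * (neg_below + neg / 2.0)  # in-bucket ties count half
            neg_below += neg
        return area / (total_pos * total_neg)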
@@ -53,46 +53,43 @@ def make_datetime(date_str, fmt=None):
     return datetime.datetime.strptime(date_str, fmt)


-def wroker_numric_opt(value, opt):
+def wroker_numric_opt(value, env, opt):
     """
     numric count opt for workers
     Args:
         value: value for count
+        env: mpi/gloo
         opt: count operator, SUM/MAX/MIN/AVG
     Return:
         count result
     """
     local_value = np.array([value])
     global_value = np.copy(local_value) * 0
-    fleet._role_maker._node_type_comm.Allreduce(local_value, global_value, op=opt)
+    fleet._role_maker.all_reduce_worker(local_value, global_value, opt)
     return global_value[0]


-def worker_numric_sum(value):
+def worker_numric_sum(value, env="mpi"):
     """R
     """
-    from mpi4py import MPI
-    return wroker_numric_opt(value, MPI.SUM)
+    return wroker_numric_opt(value, env, "sum")


-def worker_numric_avg(value):
+def worker_numric_avg(value, env="mpi"):
     """R
     """
-    return worker_numric_sum(value) / fleet.worker_num()
+    return worker_numric_sum(value, env) / fleet.worker_num()


-def worker_numric_min(value):
+def worker_numric_min(value, env="mpi"):
     """R
     """
-    from mpi4py import MPI
-    return wroker_numric_opt(value, MPI.MIN)
+    return wroker_numric_opt(value, env, "min")


-def worker_numric_max(value):
+def worker_numric_max(value, env="mpi"):
     """R
     """
-    from mpi4py import MPI
-    return wroker_numric_opt(value, MPI.MAX)
+    return wroker_numric_opt(value, env, "max")


 def rank0_print(log_str):
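These helpers now accept an `env` tag ('mpi' or 'gloo') and route the reduction through `fleet._role_maker.all_reduce_worker` instead of calling mpi4py directly, so the same code path works on both backends. A hedged usage sketch; the counter value is illustrative and fleet must already be initialized:

    local_ins_num = 12345  # hypothetical per-worker instance count
    total = worker_numric_sum(local_ins_num, env="gloo")    # sum across workers
    avg = worker_numric_avg(local_ins_num, env="gloo")      # total / fleet.worker_num()
    slowest = worker_numric_max(local_ins_num, env="gloo")  # largest single-worker count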
@@ -267,7 +264,6 @@ class TimeTrainPass(object):
         self._pass_id = pass_id
         mins = self._interval_per_pass * (pass_id - 1)
         self._current_train_time = date_time + datetime.timedelta(minutes=mins)
-        print(self._current_train_time)

     def init_pass_by_time(self, datetime_str):
         """
......
@@ -16,6 +16,7 @@ import kagle.kagle_metric as kagle_metric
 import kagle.kagle_dataset as kagle_dataset
 import kagle.trainer.kagle_trainer as kagle_trainer
 from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
+from paddle.fluid.incubate.fleet.base.role_maker import GeneralRoleMaker


 class AbacusPaddleTrainer(kagle_trainer.Trainer):
     """R
@@ -52,7 +53,14 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer):
     def init(self, context):
         """R
         """
-        fleet.init(self._exe)
+        role_maker = None
+        if self.global_config.get('process_mode', 'mpi') == 'brilliant_cpu':
+            afs_config = self.global_config['io']['afs']
+            role_maker = fluid.incubate.fleet.base.role_maker.GeneralRoleMaker(
+                hdfs_name=afs_config['fs_name'], hdfs_ugi=afs_config['fs_ugi'],
+                path=self.global_config['output_path'] + "/gloo",
+                init_timeout_seconds=1200, run_timeout_seconds=1200)
+        fleet.init(role_maker)
         data_var_list = []
         data_var_name_dict = {}
         runnnable_scope = []
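`init` now chooses the role maker from configuration: the default 'mpi' mode keeps the old MPI behavior (`fleet.init(None)`), while 'brilliant_cpu' builds a gloo-based GeneralRoleMaker that rendezvouses through a shared AFS/HDFS path under `output_path + "/gloo"`. A hedged sketch of the config keys this branch reads; every value below is illustrative:

    global_config = {
        'process_mode': 'brilliant_cpu',        # 'mpi' (default) or 'brilliant_cpu' for gloo
        'output_path': 'afs:/user/demo/model',  # hypothetical; gloo files land under .../gloo
        'io': {
            'afs': {
                'fs_name': 'afs://demo-fs:9902',  # hypothetical endpoint
                'fs_ugi': 'user,passwd',          # hypothetical credentials
            },
        },
    }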
@@ -136,7 +144,8 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer):
         save_mode = 3  # unseen_day++, save all
         kagle_util.rank0_print("going to save_model %s" % model_path)
         fleet.save_persistables(None, model_path, mode=save_mode)
-        self._train_pass.save_train_progress(day, pass_index, base_key, model_path, is_checkpoint=True)
+        if fleet._role_maker.is_first_worker():
+            self._train_pass.save_train_progress(day, pass_index, base_key, model_path, is_checkpoint=True)
         cost_printer.done()
         return model_path
@@ -180,17 +189,19 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer):
         }
         cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, {'master': True,
             'log_format': 'save dense model cost %s sec', 'stdout': stdout_str})
-        for executor in self.global_config['executor']:
-            if 'layer_for_inference' not in executor:
-                continue
-            executor_name = executor['name']
-            model = self._exector_context[executor_name]['model']
-            save_env_param['inference_list'] = executor['layer_for_inference']
-            save_env_param['scope'] = self._exector_context[executor_name]['scope']
-            model.dump_inference_param(save_env_param)
-            for dnn_layer in executor['layer_for_inference']:
-                model_file_handler.cp(dnn_layer['save_file_name'],
-                    model_path + '/dnn_plugin/' + dnn_layer['save_file_name'])
+        if fleet._role_maker.is_first_worker():
+            for executor in self.global_config['executor']:
+                if 'layer_for_inference' not in executor:
+                    continue
+                executor_name = executor['name']
+                model = self._exector_context[executor_name]['model']
+                save_env_param['inference_list'] = executor['layer_for_inference']
+                save_env_param['scope'] = self._exector_context[executor_name]['scope']
+                model.dump_inference_param(save_env_param)
+                for dnn_layer in executor['layer_for_inference']:
+                    model_file_handler.cp(dnn_layer['save_file_name'],
+                        model_path + '/dnn_plugin/' + dnn_layer['save_file_name'])
+        fleet._role_maker._barrier_worker()
         cost_printer.done()

         xbox_done_info = {
@@ -206,9 +217,11 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer):
             "job_id": kagle_util.get_env_value("JOB_ID"),
             "job_name": kagle_util.get_env_value("JOB_NAME")
         }
-        model_file_handler.write(json.dumps(xbox_done_info) + "\n", xbox_model_donefile, 'a')
-        if pass_index > 0:
-            self._train_pass.save_train_progress(day, pass_index, xbox_base_key, model_path, is_checkpoint=False)
+        if fleet._role_maker.is_first_worker():
+            model_file_handler.write(json.dumps(xbox_done_info) + "\n", xbox_model_donefile, 'a')
+            if pass_index > 0:
+                self._train_pass.save_train_progress(day, pass_index, xbox_base_key, model_path, is_checkpoint=False)
+        fleet._role_maker._barrier_worker()
         return stdout_str

     def run_executor(self, executor_config, dataset, stdout_str):
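The save paths above all follow the single-writer pattern this commit applies throughout the trainer: only the first worker performs the shared-filesystem side effect, then every worker meets at a barrier so no one races ahead against a half-written donefile. Schematically, with a hypothetical write_donefile helper standing in for the guarded work:

    if fleet._role_maker.is_first_worker():
        write_donefile()                 # hypothetical single-writer side effect
    fleet._role_maker._barrier_worker()  # all workers wait for worker 0 to finish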
@@ -239,6 +252,7 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer):
         kagle_util.rank0_print("End " + executor_name + " pass")
         if self._train_pass.need_dump_inference(pass_id) and executor_config['dump_inference_model']:
             stdout_str += self.save_xbox_model(day, pass_id, xbox_base_key, monitor_data)
+        fleet._role_maker._barrier_worker()

     def startup(self, context):
         """R
@@ -305,6 +319,7 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer):
             kagle_util.rank0_print("going to save batch model")
             self.save_model(next_date, 0, xbox_base_key)
         self._train_pass._base_key = xbox_base_key
+        fleet._role_maker._barrier_worker()

     def train_pass(self, context):
         """R
@@ -325,6 +340,7 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer):
             'node_num': fleet.worker_num(), 'node_idx': fleet.worker_index(),
             'begin_time': pass_time, 'time_window_min': self._train_pass._interval_per_pass
         })
+        fleet._role_maker._barrier_worker()
         cost_printer.done()

         kagle_util.rank0_print("going to global shuffle")
@@ -335,6 +351,7 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer):
             current_dataset[name].global_shuffle(fleet, self.global_config['dataset']['shuffle_thread'])
         cost_printer.done()
         # str(dataset.get_shuffle_data_size(fleet))
+        fleet._role_maker._barrier_worker()

         if self.global_config['prefetch_data']:
             next_pass_time = (self._train_pass._current_train_time +
@@ -345,6 +362,7 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer):
             'begin_time': next_pass_time, 'time_window_min': self._train_pass._interval_per_pass
         })
+        fleet._role_maker._barrier_worker()
         pure_train_begin = time.time()
         for executor in self.global_config['executor']:
             self.run_executor(executor, current_dataset[executor['dataset_name']], stdout_str)
@@ -367,6 +385,7 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer):
             kagle_util.rank0_print(log_str)
             stdout_str += kagle_util.now_time_str() + log_str
             sys.stdout.write(stdout_str)
+            fleet._role_maker._barrier_worker()
             stdout_str = ""
             if pass_id == self._train_pass.max_pass_num_day():
                 context['status'] = 'end_day'
......
@@ -41,7 +41,7 @@ class Trainer(object):
             None, just sleep in base
         """
         print('unknown context_status:%s, do nothing' % context['status'])
-        time.sleep(60)
+        time.sleep(60)

     def reload_train_context(self):
         """
......