提交 69701c7b 编写于 作者: X xiexionghang

depend on paddle with bcloud

上级 9042cb45
...@@ -231,7 +231,7 @@ class FluidModel(Model): ...@@ -231,7 +231,7 @@ class FluidModel(Model):
scope = params['scope'] scope = params['scope']
executor = params['executor'] executor = params['executor']
program = self._build_param['model']['train_program'] program = self._build_param['model']['train_program']
for table_name,table in self._build_param['table'].items(): for table_name, table in self._build_param['table'].items():
fleet._fleet_ptr.pull_dense(scope, table['_meta']._table_id, table['params']) fleet._fleet_ptr.pull_dense(scope, table['_meta']._table_id, table['params'])
for infernce_item in params['inference_list']: for infernce_item in params['inference_list']:
params_name_list = self.inference_params(infernce_item['layer_name']) params_name_list = self.inference_params(infernce_item['layer_name'])
......
...@@ -15,12 +15,14 @@ def get_env_value(env_name): ...@@ -15,12 +15,14 @@ def get_env_value(env_name):
""" """
return os.popen("echo -n ${" + env_name + "}").read().strip() return os.popen("echo -n ${" + env_name + "}").read().strip()
def now_time_str(): def now_time_str():
""" """
get current format str_time get current format str_time
""" """
return "\n" + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + "[0]:" return "\n" + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + "[0]:"
def get_absolute_path(path, params): def get_absolute_path(path, params):
"""R """R
""" """
...@@ -33,6 +35,7 @@ def get_absolute_path(path, params): ...@@ -33,6 +35,7 @@ def get_absolute_path(path, params):
else: else:
return path return path
def make_datetime(date_str, fmt = None): def make_datetime(date_str, fmt = None):
""" """
create a datetime instance by date_string create a datetime instance by date_string
...@@ -64,23 +67,27 @@ def wroker_numric_opt(value, opt): ...@@ -64,23 +67,27 @@ def wroker_numric_opt(value, opt):
fleet._role_maker._node_type_comm.Allreduce(local_value, global_value, op=opt) fleet._role_maker._node_type_comm.Allreduce(local_value, global_value, op=opt)
return global_value[0] return global_value[0]
def worker_numric_sum(value): def worker_numric_sum(value):
"""R """R
""" """
from mpi4py import MPI from mpi4py import MPI
return wroker_numric_opt(value, MPI.SUM) return wroker_numric_opt(value, MPI.SUM)
def worker_numric_avg(value): def worker_numric_avg(value):
"""R """R
""" """
return worker_numric_sum(value) / fleet.worker_num() return worker_numric_sum(value) / fleet.worker_num()
def worker_numric_min(value): def worker_numric_min(value):
"""R """R
""" """
from mpi4py import MPI from mpi4py import MPI
return wroker_numric_opt(value, MPI.MIN) return wroker_numric_opt(value, MPI.MIN)
def worker_numric_max(value): def worker_numric_max(value):
"""R """R
""" """
...@@ -93,6 +100,7 @@ def rank0_print(log_str): ...@@ -93,6 +100,7 @@ def rank0_print(log_str):
""" """
print_log(log_str, {'master': True}) print_log(log_str, {'master': True})
def print_log(log_str, params): def print_log(log_str, params):
"""R """R
""" """
...@@ -105,6 +113,7 @@ def print_log(log_str, params): ...@@ -105,6 +113,7 @@ def print_log(log_str, params):
if 'stdout' in params: if 'stdout' in params:
params['stdout'] += str(datetime.datetime.now()) + log_str params['stdout'] += str(datetime.datetime.now()) + log_str
def print_cost(cost, params): def print_cost(cost, params):
"""R """R
""" """
...@@ -113,7 +122,7 @@ def print_cost(cost, params): ...@@ -113,7 +122,7 @@ def print_cost(cost, params):
return log_str return log_str
class CostPrinter: class CostPrinter(object):
""" """
For count cost time && print cost log For count cost time && print cost log
""" """
...@@ -147,7 +156,8 @@ class CostPrinter: ...@@ -147,7 +156,8 @@ class CostPrinter:
self._done = True self._done = True
return cost, log_str return cost, log_str
class PathGenerator:
class PathGenerator(object):
""" """
generate path with template & runtime variables generate path with template & runtime variables
""" """
...@@ -178,7 +188,7 @@ class PathGenerator: ...@@ -178,7 +188,7 @@ class PathGenerator:
return "" return ""
class TimeTrainPass: class TimeTrainPass(object):
""" """
timely pass timely pass
define pass time_interval && start_time && end_time define pass time_interval && start_time && end_time
...@@ -266,7 +276,7 @@ class TimeTrainPass: ...@@ -266,7 +276,7 @@ class TimeTrainPass:
date_str: example "20200110000" -> "%Y%m%d%H%M" date_str: example "20200110000" -> "%Y%m%d%H%M"
""" """
self._current_train_time = make_datetime(datetime_str) self._current_train_time = make_datetime(datetime_str)
minus = self._current_train_time.hour * 60 + self._current_train_time.minute; minus = self._current_train_time.hour * 60 + self._current_train_time.minute
self._pass_id = minus / self._interval_per_pass + 1 self._pass_id = minus / self._interval_per_pass + 1
def current_pass(self): def current_pass(self):
......
...@@ -32,7 +32,7 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer): ...@@ -32,7 +32,7 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer):
self._exector_context = {} self._exector_context = {}
self._metrics = {} self._metrics = {}
self._path_generator = kagle_util.PathGenerator({ self._path_generator = kagle_util.PathGenerator({
'templates' : [ 'templates': [
{'name': 'xbox_base_done', 'template': config['output_path'] + '/xbox_base_done.txt'}, {'name': 'xbox_base_done', 'template': config['output_path'] + '/xbox_base_done.txt'},
{'name': 'xbox_delta_done', 'template': config['output_path'] + '/xbox_patch_done.txt'}, {'name': 'xbox_delta_done', 'template': config['output_path'] + '/xbox_patch_done.txt'},
{'name': 'xbox_base', 'template': config['output_path'] + '/xbox/{day}/base/'}, {'name': 'xbox_base', 'template': config['output_path'] + '/xbox/{day}/base/'},
...@@ -75,8 +75,8 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer): ...@@ -75,8 +75,8 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer):
data_var_name_dict[var.name] = var data_var_name_dict[var.name] = var
optimizer = kagle_model.FluidModel.build_optimizer({ optimizer = kagle_model.FluidModel.build_optimizer({
'metrics' : self._metrics, 'metrics': self._metrics,
'optimizer_conf' : self.global_config['optimizer'] 'optimizer_conf': self.global_config['optimizer']
}) })
optimizer.minimize(runnnable_cost_op, runnnable_scope) optimizer.minimize(runnnable_cost_op, runnnable_scope)
for executor in self.global_config['executor']: for executor in self.global_config['executor']:
...@@ -227,7 +227,7 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer): ...@@ -227,7 +227,7 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer):
self._exe.train_from_dataset(program, dataset, scope, self._exe.train_from_dataset(program, dataset, scope,
thread=executor_config['train_thread_num'], debug=self.global_config['debug']) thread=executor_config['train_thread_num'], debug=self.global_config['debug'])
end = time.time() end = time.time()
local_cost = (end-begin) / 60.0 local_cost = (end - begin) / 60.0
avg_cost = kagle_util.worker_numric_avg(local_cost) avg_cost = kagle_util.worker_numric_avg(local_cost)
min_cost = kagle_util.worker_numric_min(local_cost) min_cost = kagle_util.worker_numric_min(local_cost)
max_cost = kagle_util.worker_numric_max(local_cost) max_cost = kagle_util.worker_numric_max(local_cost)
...@@ -251,7 +251,7 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer): ...@@ -251,7 +251,7 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer):
self._train_pass = kagle_util.TimeTrainPass(self.global_config) self._train_pass = kagle_util.TimeTrainPass(self.global_config)
if not self.global_config['cold_start']: if not self.global_config['cold_start']:
cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, cost_printer = kagle_util.CostPrinter(kagle_util.print_cost,
{'master': True, 'log_format' : 'load model cost %s sec', 'stdout' : stdout_str}) {'master': True, 'log_format': 'load model cost %s sec', 'stdout': stdout_str})
self.print_log("going to load model %s" % self._train_pass._checkpoint_model_path, {'master': True}) self.print_log("going to load model %s" % self._train_pass._checkpoint_model_path, {'master': True})
#if config.need_reqi_changeslot and config.reqi_dnn_plugin_day >= self._train_pass.date() #if config.need_reqi_changeslot and config.reqi_dnn_plugin_day >= self._train_pass.date()
# and config.reqi_dnn_plugin_pass >= self._pass_id: # and config.reqi_dnn_plugin_pass >= self._pass_id:
...@@ -317,7 +317,8 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer): ...@@ -317,7 +317,8 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer):
self.print_log(" ==== begin delta:%s ========" % pass_id, {'master': True, 'stdout': stdout_str}) self.print_log(" ==== begin delta:%s ========" % pass_id, {'master': True, 'stdout': stdout_str})
train_begin_time = time.time() train_begin_time = time.time()
cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, {'master': True, 'log_format': 'load into memory done, cost %s sec', 'stdout': stdout_str}) cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, \
{'master': True, 'log_format': 'load into memory done, cost %s sec', 'stdout': stdout_str})
current_dataset = {} current_dataset = {}
for name in self._dataset: for name in self._dataset:
current_dataset[name] = self._dataset[name].load_dataset({ current_dataset[name] = self._dataset[name].load_dataset({
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册