提交 9042cb45 编写于 作者: X xiexionghang

depend on paddle with bcloud

上级 61b1fd00
......@@ -77,6 +77,7 @@ class Model(object):
"""R
"""
pass
@abc.abstractmethod
def dump_inference_program(self, inference_layer, path):
"""R
......@@ -101,7 +102,8 @@ class Model(object):
if node['name'] not in self._inference_meta['dependency'][layer]:
continue
if 'inference_param' in self._build_param['layer_extend'][node['name']]:
self._inference_meta['params'][layer] += self._build_param['layer_extend'][node['name']]['inference_param']['params']
self._inference_meta['params'][layer] += \
self._build_param['layer_extend'][node['name']]['inference_param']['params']
return self._inference_meta['params'][layer]
def get_dependency(self, layer_graph, dest_layer):
......@@ -192,10 +194,10 @@ class FluidModel(Model):
metrics = params['metrics']
for name in metrics:
model_metrics = metrics[name]
stat_var_names += [ model_metrics[metric]['var'].name for metric in model_metrics]
stat_var_names += [model_metrics[metric]['var'].name for metric in model_metrics]
strategy['stat_var_names'] = list(set(stat_var_names))
optimizer_generator = 'optimizer = fluid.optimizer.' + optimizer_conf['class'] + \
'(learning_rate=' + str(optimizer_conf['learning_rate']) + ')'
'(learning_rate=' + str(optimizer_conf['learning_rate']) + ')'
exec(optimizer_generator)
optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
return optimizer
......@@ -233,12 +235,12 @@ class FluidModel(Model):
fleet._fleet_ptr.pull_dense(scope, table['_meta']._table_id, table['params'])
for infernce_item in params['inference_list']:
params_name_list = self.inference_params(infernce_item['layer_name'])
params_var_list = [ program.global_block().var(i) for i in params_name_list ]
params_var_list = [program.global_block().var(i) for i in params_name_list]
params_file_name = infernce_item['save_file_name']
with fluid.scope_guard(scope):
if params['save_combine']:
fluid.io.save_vars(
executor, "./", program, vars=params_var_list, filename=params_file_name)
fluid.io.save_vars(executor, "./", \
program, vars=params_var_list, filename=params_file_name)
else:
fluid.io.save_vars(executor, params_file_name, program, vars=params_var_list)
pass
"""
Util lib
"""
import os
import sys
import time
import datetime
import kagle_fs
import kagle.kagle_fs
import numpy as np
from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
def get_env_value(env_name):
"""
get os environment value
"""
return os.popen("echo -n ${" + env_name + "}").read().strip()
def now_time_str():
return "\n" + time.strftime("%Y-%m-%d %H:%M:%S",time.localtime()) + "[0]:"
"""
get current format str_time
"""
return "\n" + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + "[0]:"
def get_absolute_path(path, params):
"""R
"""
if path.startswith('afs:') or path.startswith('hdfs:'):
sub_path = path.split('fs:')[1]
if ':' in sub_path: #such as afs://xxx:prot/xxxx
......@@ -23,6 +34,14 @@ def get_absolute_path(path, params):
return path
def make_datetime(date_str, fmt = None):
"""
create a datetime instance by date_string
Args:
date_str: such as 2020-01-14
date_str_format: "%Y-%m-%d"
Return:
datetime
"""
if fmt is None:
if len(date_str) == 8: #%Y%m%d
return datetime.datetime.strptime(date_str, '%Y%m%d')
......@@ -32,28 +51,51 @@ def make_datetime(date_str, fmt = None):
def wroker_numric_opt(value, opt):
"""
numric count opt for workers
Args:
value: value for count
opt: count operator, SUM/MAX/MIN/AVG
Return:
count result
"""
local_value = np.array([value])
global_value = np.copy(local_value) * 0
fleet._role_maker._node_type_comm.Allreduce(local_value, global_value, op=opt)
return global_value[0]
def worker_numric_sum(value):
"""R
"""
from mpi4py import MPI
return wroker_numric_opt(value, MPI.SUM)
def worker_numric_avg(value):
"""R
"""
return worker_numric_sum(value) / fleet.worker_num()
def worker_numric_min(value):
"""R
"""
from mpi4py import MPI
return wroker_numric_opt(value, MPI.MIN)
def worker_numric_max(value):
"""R
"""
from mpi4py import MPI
return wroker_numric_opt(value, MPI.MAX)
def rank0_print(log_str):
"""R
"""
print_log(log_str, {'master': True})
def print_log(log_str, params):
"""R
"""
if params['master']:
if fleet.worker_index() == 0:
print(log_str)
......@@ -64,22 +106,33 @@ def print_log(log_str, params):
params['stdout'] += str(datetime.datetime.now()) + log_str
def print_cost(cost, params):
"""R
"""
log_str = params['log_format'] % cost
print_log(log_str, params)
return log_str
class CostPrinter:
"""
For count cost time && print cost log
"""
def __init__(self, callback, callback_params):
"""R
"""
self.reset(callback, callback_params)
pass
def __del__(self):
"""R
"""
if not self._done:
self.done()
pass
def reset(self, callback, callback_params):
"""R
"""
self._done = False
self._callback = callback
self._callback_params = callback_params
......@@ -87,24 +140,35 @@ class CostPrinter:
pass
def done(self):
"""R
"""
cost = time.time() - self._begin_time
log_str = self._callback(cost, self._callback_params) #cost(s)
self._done = True
return cost, log_str
class PathGenerator:
"""
generate path with template & runtime variables
"""
def __init__(self, config):
"""R
"""
self._templates = {}
self.add_path_template(config)
pass
def add_path_template(self, config):
"""R
"""
if 'templates' in config:
for template in config['templates']:
self._templates[template['name']] = template['template']
pass
def generate_path(self, template_name, param):
"""R
"""
if template_name in self._templates:
if 'time_format' in param:
str = param['time_format'].strftime(self._templates[template_name])
......@@ -113,8 +177,15 @@ class PathGenerator:
else:
return ""
class TimeTrainPass:
"""
timely pass
define pass time_interval && start_time && end_time
"""
def __init__(self, global_config):
"""R
"""
self._config = global_config['epoch']
if '+' in self._config['days']:
day_str = self._config['days'].replace(' ', '')
......@@ -156,9 +227,13 @@ class TimeTrainPass:
self.init_pass_by_id(done_fileds[0], self._checkpoint_pass_id)
def max_pass_num_day(self):
"""R
"""
return 24 * 60 / self._interval_per_pass
def save_train_progress(self, day, pass_id, base_key, model_path, is_checkpoint):
"""R
"""
if is_checkpoint:
self._checkpoint_pass_id = pass_id
self._checkpoint_model_path = model_path
......@@ -168,6 +243,12 @@ class TimeTrainPass:
pass
def init_pass_by_id(self, date_str, pass_id):
"""
init pass context with pass_id
Args:
date_str: example "20200110"
pass_id(int): pass_id of date
"""
date_time = make_datetime(date_str)
if pass_id < 1:
pass_id = 0
......@@ -179,14 +260,23 @@ class TimeTrainPass:
print(self._current_train_time)
def init_pass_by_time(self, datetime_str):
"""
init pass context with datetime
Args:
date_str: example "20200110000" -> "%Y%m%d%H%M"
"""
self._current_train_time = make_datetime(datetime_str)
minus = self._current_train_time.hour * 60 + self._current_train_time.minute;
self._pass_id = minus / self._interval_per_pass + 1
def current_pass():
def current_pass(self):
"""R
"""
return self._pass_id
def next(self):
"""R
"""
has_next = True
old_pass_id = self._pass_id
if self._pass_id < 1:
......@@ -202,6 +292,8 @@ class TimeTrainPass:
return has_next
def is_checkpoint_pass(self, pass_id):
"""R
"""
if pass_id < 1:
return True
if pass_id == self.max_pass_num_day():
......@@ -211,10 +303,21 @@ class TimeTrainPass:
return False
def need_dump_inference(self, pass_id):
"""R
"""
return self._inference_pass_id < pass_id and pass_id % self._dump_inference_interval == 0
def date(self, delta_day=0):
"""
get train date
Args:
delta_day(int): n day afer current_train_date
Return:
date(current_train_time + delta_day)
"""
return (self._current_train_time + datetime.timedelta(days=delta_day)).strftime("%Y%m%d")
def timestamp(self, delta_day=0):
"""R
"""
return (self._current_train_time + datetime.timedelta(days=delta_day)).timestamp()
"""
A paddle trainer Adapt to Abacus
"""
import abc
import sys
import copy
import yaml
import time
import json
import datetime
import kagle_trainer
from .. import kagle_fs
from .. import kagle_util
from .. import kagle_model
from .. import kagle_metric
from .. import kagle_dataset
import kagle.kagle_fs
import kagle.kagle_util
import kagle.kagle_model
import kagle.kagle_metric
import kagle.kagle_dataset
import kagle.trainer.kagle_trainer
import paddle.fluid as fluid
from abc import ABCMeta, abstractmethod
from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
class AbacusPaddleTrainer(kagle_trainer.Trainer):
"""R
"""
def __init__(self, config):
"""R
"""
kagle_trainer.Trainer.__init__(self, config)
config['output_path'] = kagle_util.get_absolute_path(
config['output_path'], config['io']['afs'])
......@@ -43,6 +50,8 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer):
self.regist_context_processor('end_day', self.end_day)
def init(self, context):
"""R
"""
fleet.init(self._exe)
data_var_list = []
data_var_name_dict = {}
......@@ -77,7 +86,7 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer):
if not executor['is_update_sparse']:
program._fleet_opt["program_configs"][str(id(model.get_cost_op().block.program))]["push_sparse"] = []
if 'train_thread_num' not in executor:
executor['train_thread_num'] = global_config['train_thread_num']
executor['train_thread_num'] = self.global_config['train_thread_num']
with fluid.scope_guard(scope):
self._exe.run(model._build_param['model']['startup_program'])
model.dump_model_program('./')
......@@ -98,23 +107,29 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer):
pass
def print_log(self, log_str, params):
"""R
"""
params['index'] = fleet.worker_index()
return kagle_util.print_log(log_str, params)
def print_global_metrics(self, scope, model, monitor_data, stdout_str):
"""R
"""
metrics = model.get_metrics()
metric_calculator = kagle_metric.PaddleAUCMetric(None)
for metric in metrics:
metric_param = {'label' : metric, 'metric_dict' : metrics[metric]}
metric_param = {'label': metric, 'metric_dict': metrics[metric]}
metric_calculator.calculate(scope, metric_param)
metric_result = metric_calculator.get_result_to_string()
self.print_log(metric_result, {'master': True, 'stdout' : stdout_str})
self.print_log(metric_result, {'master': True, 'stdout': stdout_str})
monitor_data += metric_result
metric_calculator.clear(scope, metric_param)
def save_model(self, day, pass_index, base_key):
"""R
"""
cost_printer = kagle_util.CostPrinter(kagle_util.print_cost,
{'master': True, 'log_format' : 'save model cost %s sec'})
{'master': True, 'log_format': 'save model cost %s sec'})
model_path = self._path_generator.generate_path('batch_model', {'day': day, 'pass_id': pass_index})
save_mode = 0 # just save all
if pass_index < 1: #batch_model
......@@ -126,27 +141,30 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer):
return model_path
def save_xbox_model(self, day, pass_index, xbox_base_key, monitor_data):
"""R
"""
stdout_str = ""
xbox_patch_id = str(int(time.time()))
kagle_util.rank0_print("begin save delta model")
model_path = ""
xbox_model_donefile = ""
cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, {'master': True, 'log_format' : 'save xbox model cost %s sec', 'stdout' : stdout_str})
cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, {'master': True, \
'log_format': 'save xbox model cost %s sec', 'stdout': stdout_str})
if pass_index < 1:
save_mode = 2
xbox_patch_id = xbox_base_key
model_path = self._path_generator.generate_path('xbox_base', {'day' : day})
xbox_model_donefile = self._path_generator.generate_path('xbox_base_done', {'day' : day})
model_path = self._path_generator.generate_path('xbox_base', {'day': day})
xbox_model_donefile = self._path_generator.generate_path('xbox_base_done', {'day': day})
else:
save_mode = 1
model_path = self._path_generator.generate_path('xbox_delta', {'day' : day, 'pass_id':pass_index})
xbox_model_donefile = self._path_generator.generate_path('xbox_delta_done', {'day' : day})
model_path = self._path_generator.generate_path('xbox_delta', {'day': day, 'pass_id': pass_index})
xbox_model_donefile = self._path_generator.generate_path('xbox_delta_done', {'day': day})
total_save_num = fleet.save_persistables(None, model_path, mode=save_mode)
cost_printer.done()
cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, {'master': True,
'log_format' : 'save cache model cost %s sec', 'stdout' : stdout_str})
'log_format': 'save cache model cost %s sec', 'stdout': stdout_str})
model_file_handler = kagle_fs.FileHandler(self.global_config['io']['afs'])
if self.global_config['save_cache_model']:
cache_save_num = fleet.save_cache_model(None, model_path, mode=save_mode)
......@@ -161,7 +179,7 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer):
'save_combine': True
}
cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, {'master': True,
'log_format' : 'save dense model cost %s sec', 'stdout' : stdout_str})
'log_format': 'save dense model cost %s sec', 'stdout': stdout_str})
for executor in self.global_config['executor']:
if 'layer_for_inference' not in executor:
continue
......@@ -176,17 +194,17 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer):
cost_printer.done()
xbox_done_info = {
"id" : xbox_patch_id,
"key" : xbox_base_key,
"ins_path" : "",
"ins_tag" : "feasign",
"partition_type" : "2",
"record_count" : "111111",
"monitor_data" : monitor_data,
"mpi_size" : str(fleet.worker_num()),
"input" : model_path.rstrip("/") + "/000",
"job_id" : kagle_util.get_env_value("JOB_ID"),
"job_name" : kagle_util.get_env_value("JOB_NAME")
"id": xbox_patch_id,
"key": xbox_base_key,
"ins_path": "",
"ins_tag": "feasign",
"partition_type": "2",
"record_count": "111111",
"monitor_data": monitor_data,
"mpi_size": str(fleet.worker_num()),
"input": model_path.rstrip("/") + "/000",
"job_id": kagle_util.get_env_value("JOB_ID"),
"job_name": kagle_util.get_env_value("JOB_NAME")
}
model_file_handler.write(json.dumps(xbox_done_info) + "\n", xbox_model_donefile, 'a')
if pass_index > 0:
......@@ -194,6 +212,8 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer):
return stdout_str
def run_executor(self, executor_config, dataset, stdout_str):
"""R
"""
day = self._train_pass.date()
pass_id = self._train_pass._pass_id
xbox_base_key = self._train_pass._base_key
......@@ -221,6 +241,8 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer):
stdout_str += self.save_xbox_model(day, pass_id, xbox_base_key, monitor_data)
def startup(self, context):
"""R
"""
if fleet.is_server():
fleet.run_server()
context['status'] = 'wait'
......@@ -239,24 +261,28 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer):
cost_printer.done()
if self.global_config['save_first_base']:
self.print_log("save_first_base=True", {'master': True})
self.print_log("going to save xbox base model", {'master': True, 'stdout' : stdout_str})
self.print_log("going to save xbox base model", {'master': True, 'stdout': stdout_str})
self._train_pass._base_key = int(time.time())
stdout_str += self.save_xbox_model(day, 0, self._train_pass._base_key, "")
stdout_str += self.save_xbox_model(self._train_pass.date(), 0, self._train_pass._base_key, "")
context['status'] = 'begin_day'
def begin_day(self, context):
"""R
"""
stdout_str = ""
if not self._train_pass.next():
context['is_exit'] = True
day = self._train_pass.date()
pass_id = self._train_pass._pass_id
self.print_log("======== BEGIN DAY:%s ========" % day, {'master': True, 'stdout' : stdout_str})
self.print_log("======== BEGIN DAY:%s ========" % day, {'master': True, 'stdout': stdout_str})
if pass_id == self._train_pass.max_pass_num_day():
context['status'] = 'end_day'
else:
context['status'] = 'train_pass'
def end_day(self, context):
"""R
"""
day = self._train_pass.date()
pass_id = self._train_pass._pass_id
xbox_base_key = int(time.time())
......@@ -264,7 +290,7 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer):
kagle_util.rank0_print("shrink table")
cost_printer = kagle_util.CostPrinter(kagle_util.print_cost,
{'master': True, 'log_format' : 'shrink table done, cost %s sec'})
{'master': True, 'log_format': 'shrink table done, cost %s sec'})
fleet.shrink_sparse_table()
for executor in self._exector_context:
self._exector_context[executor]['model'].shrink({
......@@ -281,27 +307,29 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer):
self._train_pass._base_key = xbox_base_key
def train_pass(self, context):
"""R
"""
stdout_str = ""
day = self._train_pass.date()
pass_id = self._train_pass._pass_id
base_key = self._train_pass._base_key
pass_time = self._train_pass._current_train_time.strftime("%Y%m%d%H%M")
self.print_log(" ==== begin delta:%s ========" % pass_id, {'master': True, 'stdout' : stdout_str})
self.print_log(" ==== begin delta:%s ========" % pass_id, {'master': True, 'stdout': stdout_str})
train_begin_time = time.time()
cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, {'master': True, 'log_format' : 'load into memory done, cost %s sec', 'stdout' : stdout_str})
cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, {'master': True, 'log_format': 'load into memory done, cost %s sec', 'stdout': stdout_str})
current_dataset = {}
for name in self._dataset:
current_dataset[name] = self._dataset[name].load_dataset({
'node_num': fleet.worker_num(), 'node_idx': fleet.worker_index(),
'begin_time': pass_time, 'time_window_min': self._train_pass._interval_per_pass
'begin_time': pass_time, 'time_window_min': self._train_pass._interval_per_pass
})
cost_printer.done()
kagle_util.rank0_print("going to global shuffle")
cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, {
'master': True, 'stdout' : stdout_str,
'log_format' : 'global shuffle done, cost %s sec'})
'master': True, 'stdout': stdout_str,
'log_format': 'global shuffle done, cost %s sec'})
for name in current_dataset:
current_dataset[name].global_shuffle(fleet, self.global_config['dataset']['shuffle_thread'])
cost_printer.done()
......@@ -313,13 +341,14 @@ class AbacusPaddleTrainer(kagle_trainer.Trainer):
for name in self._dataset:
self._dataset[name].preload_dataset({
'node_num': fleet.worker_num(), 'node_idx': fleet.worker_index(),
'begin_time': next_pass_time, 'time_window_min': self._train_pass._interval_per_pass
'begin_time': next_pass_time, 'time_window_min': self._train_pass._interval_per_pass
})
pure_train_begin = time.time()
for executor in self.global_config['executor']:
self.run_executor(executor, current_dataset[executor['dataset_name']], stdout_str)
cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, {'master': True, 'log_format' : 'release_memory cost %s sec'})
cost_printer = kagle_util.CostPrinter(kagle_util.print_cost, \
{'master': True, 'log_format': 'release_memory cost %s sec'})
for name in current_dataset:
current_dataset[name].release_memory()
pure_train_cost = time.time() - pure_train_begin
......
......@@ -8,7 +8,7 @@ import time
class Trainer(object):
"""R
"""
__metaclass__ = self.ABCMeta
__metaclass__ = abc.ABCMeta
def __init__(self, config):
"""R
"""
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册