未验证 提交 2e42f73d 编写于 作者: D Dong Daxiang 提交者: GitHub

Merge pull request #7 from fuyinno4/master

remove nouse trainer
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import numpy as np
import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
from paddle.fluid.incubate.fleet.base.role_maker import MPISymetricRoleMaker
from paddlerec.core.utils import envs
from paddlerec.core.trainer import Trainer
class CtrTrainer(Trainer):
"""R
"""
def __init__(self, config):
"""R
"""
Trainer.__init__(self, config)
self.global_config = config
self._metrics = {}
self.processor_register()
def processor_register(self):
role = MPISymetricRoleMaker()
fleet.init(role)
if fleet.is_server():
self.regist_context_processor('uninit', self.instance)
self.regist_context_processor('init_pass', self.init)
self.regist_context_processor('server_pass', self.server)
else:
self.regist_context_processor('uninit', self.instance)
self.regist_context_processor('init_pass', self.init)
self.regist_context_processor('train_pass', self.train)
self.regist_context_processor('terminal_pass', self.terminal)
def _get_dataset(self):
namespace = "train.reader"
inputs = self.model.get_inputs()
threads = envs.get_global_env("train.threads", None)
batch_size = envs.get_global_env("batch_size", None, namespace)
reader_class = envs.get_global_env("class", None, namespace)
abs_dir = os.path.dirname(os.path.abspath(__file__))
reader = os.path.join(abs_dir, '../utils', 'dataset_instance.py')
pipe_cmd = "python {} {} {} {}".format(reader, reader_class, "TRAIN", self._config_yaml)
train_data_path = envs.get_global_env("train_data_path", None, namespace)
dataset = fluid.DatasetFactory().create_dataset()
dataset.set_use_var(inputs)
dataset.set_pipe_command(pipe_cmd)
dataset.set_batch_size(batch_size)
dataset.set_thread(threads)
file_list = [
os.path.join(train_data_path, x)
for x in os.listdir(train_data_path)
]
dataset.set_filelist(file_list)
return dataset
def instance(self, context):
models = envs.get_global_env("train.model.models")
model_class = envs.lazy_instance_by_fliename(models, "Model")
self.model = model_class(None)
context['status'] = 'init_pass'
def init(self, context):
"""R
"""
self.model.train_net()
optimizer = self.model.optimizer()
optimizer = fleet.distributed_optimizer(optimizer, strategy={"use_cvm": False})
optimizer.minimize(self.model.get_avg_cost())
if fleet.is_server():
context['status'] = 'server_pass'
else:
self.fetch_vars = []
self.fetch_alias = []
self.fetch_period = self.model.get_fetch_period()
metrics = self.model.get_metrics()
if metrics:
self.fetch_vars = metrics.values()
self.fetch_alias = metrics.keys()
context['status'] = 'train_pass'
def server(self, context):
fleet.run_server()
fleet.stop_worker()
context['is_exit'] = True
def train(self, context):
self._exe.run(fluid.default_startup_program())
fleet.init_worker()
dataset = self._get_dataset()
shuf = np.array([fleet.worker_index()])
gs = shuf * 0
fleet._role_maker._node_type_comm.Allreduce(shuf, gs)
print("trainer id: {}, trainers: {}, gs: {}".format(fleet.worker_index(), fleet.worker_num(), gs))
epochs = envs.get_global_env("train.epochs")
for i in range(epochs):
self._exe.train_from_dataset(program=fluid.default_main_program(),
dataset=dataset,
fetch_list=self.fetch_vars,
fetch_info=self.fetch_alias,
print_period=self.fetch_period)
context['status'] = 'terminal_pass'
fleet.stop_worker()
def terminal(self, context):
print("terminal ended.")
context['is_exit'] = True
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import json
import sys
import time
import numpy as np
import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
from paddle.fluid.incubate.fleet.base.role_maker import GeneralRoleMaker
from paddlerec.core.utils import fs as fs
from paddlerec.core.utils import util as util
from paddlerec.core.metrics.auc_metrics import AUCMetric
from paddlerec.core.modules.modul import build as model_basic
from paddlerec.core.utils import dataset
from paddlerec.core.trainer import Trainer
def wroker_numric_opt(value, env, opt):
"""
numric count opt for workers
Args:
value: value for count
env: mpi/gloo
opt: count operator, SUM/MAX/MIN/AVG
Return:
count result
"""
local_value = np.array([value])
global_value = np.copy(local_value) * 0
fleet._role_maker.all_reduce_worker(local_value, global_value, opt)
return global_value[0]
def worker_numric_sum(value, env="mpi"):
"""R
"""
return wroker_numric_opt(value, env, "sum")
def worker_numric_avg(value, env="mpi"):
"""R
"""
return worker_numric_sum(value, env) / fleet.worker_num()
def worker_numric_min(value, env="mpi"):
"""R
"""
return wroker_numric_opt(value, env, "min")
def worker_numric_max(value, env="mpi"):
"""R
"""
return wroker_numric_opt(value, env, "max")
class CtrTrainer(Trainer):
"""R
"""
def __init__(self, config):
"""R
"""
Trainer.__init__(self, config)
config['output_path'] = util.get_absolute_path(
config['output_path'], config['io']['afs'])
self.global_config = config
self._metrics = {}
self._path_generator = util.PathGenerator({
'templates': [
{'name': 'xbox_base_done', 'template': config['output_path'] + '/xbox_base_done.txt'},
{'name': 'xbox_delta_done', 'template': config['output_path'] + '/xbox_patch_done.txt'},
{'name': 'xbox_base', 'template': config['output_path'] + '/xbox/{day}/base/'},
{'name': 'xbox_delta', 'template': config['output_path'] + '/xbox/{day}/delta-{pass_id}/'},
{'name': 'batch_model', 'template': config['output_path'] + '/batch_model/{day}/{pass_id}/'}
]
})
if 'path_generator' in config:
self._path_generator.add_path_template(config['path_generator'])
self.regist_context_processor('uninit', self.init)
self.regist_context_processor('startup', self.startup)
self.regist_context_processor('begin_day', self.begin_day)
self.regist_context_processor('train_pass', self.train_pass)
self.regist_context_processor('end_day', self.end_day)
def init(self, context):
"""R
"""
role_maker = None
if self.global_config.get('process_mode', 'mpi') == 'brilliant_cpu':
afs_config = self.global_config['io']['afs']
role_maker = GeneralRoleMaker(
hdfs_name=afs_config['fs_name'], hdfs_ugi=afs_config['fs_ugi'],
path=self.global_config['output_path'] + "/gloo",
init_timeout_seconds=1200, run_timeout_seconds=1200)
fleet.init(role_maker)
data_var_list = []
data_var_name_dict = {}
runnnable_scope = []
runnnable_cost_op = []
context['status'] = 'startup'
for executor in self.global_config['executor']:
scope = fluid.Scope()
self._exector_context[executor['name']] = {}
self._exector_context[executor['name']]['scope'] = scope
self._exector_context[executor['name']]['model'] = model_basic.create(executor)
model = self._exector_context[executor['name']]['model']
self._metrics.update(model.get_metrics())
runnnable_scope.append(scope)
runnnable_cost_op.append(model.get_avg_cost())
for var in model._data_var:
if var.name in data_var_name_dict:
continue
data_var_list.append(var)
data_var_name_dict[var.name] = var
optimizer = model_basic.YamlModel.build_optimizer({
'metrics': self._metrics,
'optimizer_conf': self.global_config['optimizer']
})
optimizer.minimize(runnnable_cost_op, runnnable_scope)
for executor in self.global_config['executor']:
scope = self._exector_context[executor['name']]['scope']
model = self._exector_context[executor['name']]['model']
program = model._build_param['model']['train_program']
if not executor['is_update_sparse']:
program._fleet_opt["program_configs"][str(id(model.get_avg_cost().block.program))]["push_sparse"] = []
if 'train_thread_num' not in executor:
executor['train_thread_num'] = self.global_config['train_thread_num']
with fluid.scope_guard(scope):
self._exe.run(model._build_param['model']['startup_program'])
model.dump_model_program('./')
# server init done
if fleet.is_server():
return 0
self._dataset = {}
for dataset_item in self.global_config['dataset']['data_list']:
dataset_item['data_vars'] = data_var_list
dataset_item.update(self.global_config['io']['afs'])
dataset_item["batch_size"] = self.global_config['batch_size']
self._dataset[dataset_item['name']] = dataset.FluidTimeSplitDataset(dataset_item)
# if config.need_reqi_changeslot and config.reqi_dnn_plugin_day >= last_day and config.reqi_dnn_plugin_pass >= last_pass:
# util.reqi_changeslot(config.hdfs_dnn_plugin_path, join_save_params, common_save_params, update_save_params, scope2, scope3)
fleet.init_worker()
pass
def print_log(self, log_str, params):
"""R
"""
params['index'] = fleet.worker_index()
if params['master']:
if fleet.worker_index() == 0:
print(log_str)
sys.stdout.flush()
else:
print(log_str)
if 'stdout' in params:
params['stdout'] += str(datetime.datetime.now()) + log_str
def print_global_metrics(self, scope, model, monitor_data, stdout_str):
"""R
"""
metrics = model.get_metrics()
metric_calculator = AUCMetric(None)
for metric in metrics:
metric_param = {'label': metric, 'metric_dict': metrics[metric]}
metric_calculator.calculate(scope, metric_param)
metric_result = metric_calculator.get_result_to_string()
self.print_log(metric_result, {'master': True, 'stdout': stdout_str})
monitor_data += metric_result
metric_calculator.clear(scope, metric_param)
def save_model(self, day, pass_index, base_key):
"""R
"""
cost_printer = util.CostPrinter(util.print_cost,
{'master': True, 'log_format': 'save model cost %s sec'})
model_path = self._path_generator.generate_path('batch_model', {'day': day, 'pass_id': pass_index})
save_mode = 0 # just save all
if pass_index < 1: # batch_model
save_mode = 3 # unseen_day++, save all
util.rank0_print("going to save_model %s" % model_path)
fleet.save_persistables(None, model_path, mode=save_mode)
if fleet._role_maker.is_first_worker():
self._train_pass.save_train_progress(day, pass_index, base_key, model_path, is_checkpoint=True)
cost_printer.done()
return model_path
def save_xbox_model(self, day, pass_index, xbox_base_key, monitor_data):
"""R
"""
stdout_str = ""
xbox_patch_id = str(int(time.time()))
util.rank0_print("begin save delta model")
model_path = ""
xbox_model_donefile = ""
cost_printer = util.CostPrinter(util.print_cost, {'master': True, \
'log_format': 'save xbox model cost %s sec',
'stdout': stdout_str})
if pass_index < 1:
save_mode = 2
xbox_patch_id = xbox_base_key
model_path = self._path_generator.generate_path('xbox_base', {'day': day})
xbox_model_donefile = self._path_generator.generate_path('xbox_base_done', {'day': day})
else:
save_mode = 1
model_path = self._path_generator.generate_path('xbox_delta', {'day': day, 'pass_id': pass_index})
xbox_model_donefile = self._path_generator.generate_path('xbox_delta_done', {'day': day})
total_save_num = fleet.save_persistables(None, model_path, mode=save_mode)
cost_printer.done()
cost_printer = util.CostPrinter(util.print_cost, {'master': True,
'log_format': 'save cache model cost %s sec',
'stdout': stdout_str})
model_file_handler = fs.FileHandler(self.global_config['io']['afs'])
if self.global_config['save_cache_model']:
cache_save_num = fleet.save_cache_model(None, model_path, mode=save_mode)
model_file_handler.write(
"file_prefix:part\npart_num:16\nkey_num:%d\n" % cache_save_num,
model_path + '/000_cache/sparse_cache.meta', 'w')
cost_printer.done()
util.rank0_print("save xbox cache model done, key_num=%s" % cache_save_num)
save_env_param = {
'executor': self._exe,
'save_combine': True
}
cost_printer = util.CostPrinter(util.print_cost, {'master': True,
'log_format': 'save dense model cost %s sec',
'stdout': stdout_str})
if fleet._role_maker.is_first_worker():
for executor in self.global_config['executor']:
if 'layer_for_inference' not in executor:
continue
executor_name = executor['name']
model = self._exector_context[executor_name]['model']
save_env_param['inference_list'] = executor['layer_for_inference']
save_env_param['scope'] = self._exector_context[executor_name]['scope']
model.dump_inference_param(save_env_param)
for dnn_layer in executor['layer_for_inference']:
model_file_handler.cp(dnn_layer['save_file_name'],
model_path + '/dnn_plugin/' + dnn_layer['save_file_name'])
fleet._role_maker._barrier_worker()
cost_printer.done()
xbox_done_info = {
"id": xbox_patch_id,
"key": xbox_base_key,
"ins_path": "",
"ins_tag": "feasign",
"partition_type": "2",
"record_count": "111111",
"monitor_data": monitor_data,
"mpi_size": str(fleet.worker_num()),
"input": model_path.rstrip("/") + "/000",
"job_id": util.get_env_value("JOB_ID"),
"job_name": util.get_env_value("JOB_NAME")
}
if fleet._role_maker.is_first_worker():
model_file_handler.write(json.dumps(xbox_done_info) + "\n", xbox_model_donefile, 'a')
if pass_index > 0:
self._train_pass.save_train_progress(day, pass_index, xbox_base_key, model_path, is_checkpoint=False)
fleet._role_maker._barrier_worker()
return stdout_str
def run_executor(self, executor_config, dataset, stdout_str):
"""R
"""
day = self._train_pass.date()
pass_id = self._train_pass._pass_id
xbox_base_key = self._train_pass._base_key
executor_name = executor_config['name']
scope = self._exector_context[executor_name]['scope']
model = self._exector_context[executor_name]['model']
with fluid.scope_guard(scope):
util.rank0_print("Begin " + executor_name + " pass")
begin = time.time()
program = model._build_param['model']['train_program']
self._exe.train_from_dataset(program, dataset, scope,
thread=executor_config['train_thread_num'], debug=self.global_config['debug'])
end = time.time()
local_cost = (end - begin) / 60.0
avg_cost = worker_numric_avg(local_cost)
min_cost = worker_numric_min(local_cost)
max_cost = worker_numric_max(local_cost)
util.rank0_print("avg train time %s mins, min %s mins, max %s mins" % (avg_cost, min_cost, max_cost))
self._exector_context[executor_name]['cost'] = max_cost
monitor_data = ""
self.print_global_metrics(scope, model, monitor_data, stdout_str)
util.rank0_print("End " + executor_name + " pass")
if self._train_pass.need_dump_inference(pass_id) and executor_config['dump_inference_model']:
stdout_str += self.save_xbox_model(day, pass_id, xbox_base_key, monitor_data)
fleet._role_maker._barrier_worker()
def startup(self, context):
"""R
"""
if fleet.is_server():
fleet.run_server()
context['status'] = 'wait'
return
stdout_str = ""
self._train_pass = util.TimeTrainPass(self.global_config)
if not self.global_config['cold_start']:
cost_printer = util.CostPrinter(util.print_cost,
{'master': True, 'log_format': 'load model cost %s sec',
'stdout': stdout_str})
self.print_log("going to load model %s" % self._train_pass._checkpoint_model_path, {'master': True})
# if config.need_reqi_changeslot and config.reqi_dnn_plugin_day >= self._train_pass.date()
# and config.reqi_dnn_plugin_pass >= self._pass_id:
# fleet.load_one_table(0, self._train_pass._checkpoint_model_path)
# else:
fleet.init_server(self._train_pass._checkpoint_model_path, mode=0)
cost_printer.done()
if self.global_config['save_first_base']:
self.print_log("save_first_base=True", {'master': True})
self.print_log("going to save xbox base model", {'master': True, 'stdout': stdout_str})
self._train_pass._base_key = int(time.time())
stdout_str += self.save_xbox_model(self._train_pass.date(), 0, self._train_pass._base_key, "")
context['status'] = 'begin_day'
def begin_day(self, context):
"""R
"""
stdout_str = ""
if not self._train_pass.next():
context['is_exit'] = True
day = self._train_pass.date()
pass_id = self._train_pass._pass_id
self.print_log("======== BEGIN DAY:%s ========" % day, {'master': True, 'stdout': stdout_str})
if pass_id == self._train_pass.max_pass_num_day():
context['status'] = 'end_day'
else:
context['status'] = 'train_pass'
def end_day(self, context):
"""R
"""
day = self._train_pass.date()
pass_id = self._train_pass._pass_id
xbox_base_key = int(time.time())
context['status'] = 'begin_day'
util.rank0_print("shrink table")
cost_printer = util.CostPrinter(util.print_cost,
{'master': True, 'log_format': 'shrink table done, cost %s sec'})
fleet.shrink_sparse_table()
for executor in self._exector_context:
self._exector_context[executor]['model'].shrink({
'scope': self._exector_context[executor]['scope'],
'decay': self.global_config['optimizer']['dense_decay_rate']
})
cost_printer.done()
next_date = self._train_pass.date(delta_day=1)
util.rank0_print("going to save xbox base model")
self.save_xbox_model(next_date, 0, xbox_base_key, "")
util.rank0_print("going to save batch model")
self.save_model(next_date, 0, xbox_base_key)
self._train_pass._base_key = xbox_base_key
fleet._role_maker._barrier_worker()
def train_pass(self, context):
"""R
"""
stdout_str = ""
day = self._train_pass.date()
pass_id = self._train_pass._pass_id
base_key = self._train_pass._base_key
pass_time = self._train_pass._current_train_time.strftime("%Y%m%d%H%M")
self.print_log(" ==== begin delta:%s ========" % pass_id, {'master': True, 'stdout': stdout_str})
train_begin_time = time.time()
cost_printer = util.CostPrinter(util.print_cost, \
{'master': True, 'log_format': 'load into memory done, cost %s sec',
'stdout': stdout_str})
current_dataset = {}
for name in self._dataset:
current_dataset[name] = self._dataset[name].load_dataset({
'node_num': fleet.worker_num(), 'node_idx': fleet.worker_index(),
'begin_time': pass_time, 'time_window_min': self._train_pass._interval_per_pass
})
fleet._role_maker._barrier_worker()
cost_printer.done()
util.rank0_print("going to global shuffle")
cost_printer = util.CostPrinter(util.print_cost, {
'master': True, 'stdout': stdout_str,
'log_format': 'global shuffle done, cost %s sec'})
for name in current_dataset:
current_dataset[name].global_shuffle(fleet, self.global_config['dataset']['shuffle_thread'])
cost_printer.done()
# str(dataset.get_shuffle_data_size(fleet))
fleet._role_maker._barrier_worker()
if self.global_config['prefetch_data']:
next_pass_time = (self._train_pass._current_train_time +
datetime.timedelta(minutes=self._train_pass._interval_per_pass)).strftime("%Y%m%d%H%M")
for name in self._dataset:
self._dataset[name].preload_dataset({
'node_num': fleet.worker_num(), 'node_idx': fleet.worker_index(),
'begin_time': next_pass_time, 'time_window_min': self._train_pass._interval_per_pass
})
fleet._role_maker._barrier_worker()
pure_train_begin = time.time()
for executor in self.global_config['executor']:
self.run_executor(executor, current_dataset[executor['dataset_name']], stdout_str)
cost_printer = util.CostPrinter(util.print_cost, \
{'master': True, 'log_format': 'release_memory cost %s sec'})
for name in current_dataset:
current_dataset[name].release_memory()
pure_train_cost = time.time() - pure_train_begin
if self._train_pass.is_checkpoint_pass(pass_id):
self.save_model(day, pass_id, base_key)
train_end_time = time.time()
train_cost = train_end_time - train_begin_time
other_cost = train_cost - pure_train_cost
log_str = "finished train day %s pass %s time cost:%s sec job time cost:" % (day, pass_id, train_cost)
for executor in self._exector_context:
log_str += '[' + executor + ':' + str(self._exector_context[executor]['cost']) + ']'
log_str += '[other_cost:' + str(other_cost) + ']'
util.rank0_print(log_str)
stdout_str += util.now_time_str() + log_str
sys.stdout.write(stdout_str)
fleet._role_maker._barrier_worker()
stdout_str = ""
if pass_id == self._train_pass.max_pass_num_day():
context['status'] = 'end_day'
return
elif not self._train_pass.next():
context['is_exit'] = True
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册