Commit 2f1f76f3 authored by chengmo

simple code

Parent a82671c4
......@@ -18,6 +18,7 @@ Training use fluid with one node only.
from __future__ import print_function
import os
import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory
......@@ -39,7 +40,7 @@ class ClusterTrainer(TranspileTrainer):
else:
self.regist_context_processor('uninit', self.instance)
self.regist_context_processor('init_pass', self.init)
self.regist_context_processor('startup_pass', self.startup)
if envs.get_platform() == "LINUX":
self.regist_context_processor('train_pass', self.dataset_train)
else:
......@@ -71,9 +72,11 @@ class ClusterTrainer(TranspileTrainer):
def init(self, context):
self.model.train_net()
optimizer = self.model.optimizer()
optimizer_name = envs.get_global_env("hyper_parameters.optimizer")
if optimizer_name in ['adam', 'ADAM', 'Adagrad', 'ADAGRAD']:
os.environ["FLAGS_communicator_is_sgd_optimizer"] = 0
optimizer_name = envs.get_global_env(
"hyper_parameters.optimizer", None, "train.model")
if optimizer_name not in ["", "sgd", "SGD", "Sgd"]:
os.environ["FLAGS_communicator_is_sgd_optimizer"] = '0'
strategy = self.build_strategy()
optimizer = fleet.distributed_optimizer(optimizer, strategy)
optimizer.minimize(self.model.get_cost_op())
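The switch above from the integer 0 to the string '0' matters: os.environ only accepts str values, so the old assignment would raise at runtime. A quick demonstration:

import os

os.environ["FLAGS_communicator_is_sgd_optimizer"] = '0'   # OK: env var values must be strings
# os.environ["FLAGS_communicator_is_sgd_optimizer"] = 0   # TypeError: str expected, not int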
......@@ -89,16 +92,18 @@ class ClusterTrainer(TranspileTrainer):
if metrics:
self.fetch_vars = metrics.values()
self.fetch_alias = metrics.keys()
context['status'] = 'train_pass'
context['status'] = 'startup_pass'
def server(self, context):
fleet.init_server()
fleet.run_server()
context['is_exit'] = True
def dataloader_train(self, context):
def startup(self, context):
self._exe.run(fleet.startup_program)
context['status'] = 'train_pass'
def dataloader_train(self, context):
fleet.init_worker()
reader = self._get_dataloader()
......@@ -144,7 +149,6 @@ class ClusterTrainer(TranspileTrainer):
context['status'] = 'terminal_pass'
def dataset_train(self, context):
self._exe.run(fleet.startup_program)
fleet.init_worker()
dataset = self._get_dataset()
......
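Both ClusterTrainer and SingleTrainer drive training as a small state machine: each pass is registered under a status name, does its work, and hands off by rewriting context['status'] until some pass sets context['is_exit']. A minimal, self-contained sketch of that dispatch pattern (the real loop lives in the Trainer base class, which this diff does not show; all names below are illustrative):

class MiniTrainer(object):
    def __init__(self):
        self._processors = {}
        self.regist_context_processor('uninit', self.instance)
        self.regist_context_processor('startup_pass', self.startup)
        self.regist_context_processor('train_pass', self.train)

    def regist_context_processor(self, status, processor):
        self._processors[status] = processor

    def run(self):
        # Keep invoking the processor for the current status until a pass
        # flags the context for exit.
        context = {'status': 'uninit', 'is_exit': False}
        while not context['is_exit']:
            self._processors[context['status']](context)

    def instance(self, context):
        context['status'] = 'startup_pass'

    def startup(self, context):
        # e.g. self._exe.run(fluid.default_startup_program()) in SingleTrainer
        context['status'] = 'train_pass'

    def train(self, context):
        context['is_exit'] = True

MiniTrainer().run()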
......@@ -33,7 +33,7 @@ class SingleTrainer(TranspileTrainer):
def processor_register(self):
self.regist_context_processor('uninit', self.instance)
self.regist_context_processor('init_pass', self.init)
self.regist_context_processor('startup_pass', self.startup)
if envs.get_platform() == "LINUX":
self.regist_context_processor('train_pass', self.dataset_train)
else:
......@@ -55,10 +55,14 @@ class SingleTrainer(TranspileTrainer):
if metrics:
self.fetch_vars = metrics.values()
self.fetch_alias = metrics.keys()
context['status'] = 'startup_pass'
def startup(self, context):
self._exe.run(fluid.default_startup_program())
context['status'] = 'train_pass'
def dataloader_train(self, context):
self._exe.run(fluid.default_startup_program())
self.model.custom_preprocess()
reader = self._get_dataloader()
......
......@@ -18,7 +18,7 @@ Training use fluid with one node only.
"""
from __future__ import print_function
import logging
import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory
......@@ -26,85 +26,28 @@ from paddle.fluid.incubate.fleet.base.role_maker import PaddleCloudRoleMaker
from fleetrec.core.utils import envs
from fleetrec.core.trainers.transpiler_trainer import TranspileTrainer
special_param = ["TDM_Tree_Travel", "TDM_Tree_Layer", "TDM_Tree_Info"]
class TDMClusterTrainer(TranspileTrainer):
def processor_register(self):
role = PaddleCloudRoleMaker()
fleet.init(role)
if fleet.is_server():
self.regist_context_processor('uninit', self.instance)
self.regist_context_processor('init_pass', self.init)
self.regist_context_processor('server_pass', self.server)
else:
self.regist_context_processor('uninit', self.instance)
self.regist_context_processor('init_pass', self.init)
self.regist_context_processor(
'trainer_startup_pass', self.trainer_startup)
if envs.get_platform() == "LINUX":
self.regist_context_processor('train_pass', self.dataset_train)
else:
self.regist_context_processor(
'train_pass', self.dataloader_train)
self.regist_context_processor('terminal_pass', self.terminal)
def build_strategy(self):
mode = envs.get_runtime_environ("train.trainer.strategy")
assert mode in ["async", "geo", "sync", "half_async"]
strategy = None
if mode == "async":
strategy = StrategyFactory.create_async_strategy()
elif mode == "geo":
push_num = envs.get_global_env("train.strategy.mode.push_num", 100)
strategy = StrategyFactory.create_geo_strategy(push_num)
elif mode == "sync":
strategy = StrategyFactory.create_sync_strategy()
elif mode == "half_async":
strategy = StrategyFactory.create_half_async_strategy()
assert strategy is not None
self.strategy = strategy
return strategy
def init(self, context):
self.model.train_net()
optimizer = self.model.optimizer()
optimizer_name = envs.get_global_env("hyper_parameters.optimizer")
if optimizer_name in ['adam', 'ADAM', 'Adagrad', 'ADAGRAD']:
os.environ["FLAGS_communicator_is_sgd_optimizer"] = 0
strategy = self.build_strategy()
optimizer = fleet.distributed_optimizer(optimizer, strategy)
optimizer.minimize(self.model.get_cost_op())
if fleet.is_server():
context['status'] = 'server_pass'
else:
self.fetch_vars = []
self.fetch_alias = []
self.fetch_period = self.model.get_fetch_period()
logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)
special_param = ["TDM_Tree_Travel", "TDM_Tree_Layer", "TDM_Tree_Info"]
metrics = self.model.get_metrics()
if metrics:
self.fetch_vars = metrics.values()
self.fetch_alias = metrics.keys()
context['status'] = 'trainer_startup_pass'
class TDMClusterTrainer(TranspileTrainer):
def server(self, context):
namespace = "train.startup"
model_path = envs.get_global_env(
"cluster.model_path", "", namespace)
assert not model_path, "Cluster train must have init_model for TDM"
fleet.init_server(model_path)
init_model_path = envs.get_global_env(
"cluster.init_model_path", "", namespace)
assert init_model_path != "", "Cluster train must have init_model for TDM"
fleet.init_server(init_model_path)
logger.info("TDM: load model from {}".format(init_model_path))
fleet.run_server()
context['is_exit'] = True
def trainer_startup(self, context):
def startup(self, context):
self._exe.run(fleet.startup_program)
namespace = "train.startup"
load_tree = envs.get_global_env(
"cluster.load_tree", True, namespace)
......@@ -119,7 +62,6 @@ class TDMClusterTrainer(TranspileTrainer):
"cluster.save_init_model", False, namespace)
init_model_path = envs.get_global_env(
"cluster.init_model_path", "", namespace)
self._exe.run(fluid.default_startup_program())
if load_tree:
# Set the plaintext tree structure and data into the Variables of the network
......@@ -137,74 +79,47 @@ class TDMClusterTrainer(TranspileTrainer):
context['status'] = 'train_pass'
def dataloader_train(self, context):
self._exe.run(fleet.startup_program)
fleet.init_worker()
reader = self._get_dataloader()
epochs = envs.get_global_env("train.epochs")
program = fluid.compiler.CompiledProgram(
fleet.main_program).with_data_parallel(
loss_name=self.model.get_cost_op().name,
build_strategy=self.strategy.get_build_strategy(),
exec_strategy=self.strategy.get_execute_strategy())
metrics_varnames = []
metrics_format = []
metrics_format.append("{}: {{}}".format("epoch"))
metrics_format.append("{}: {{}}".format("batch"))
for name, var in self.model.get_metrics().items():
metrics_varnames.append(var.name)
metrics_format.append("{}: {{}}".format(name))
metrics_format = ", ".join(metrics_format)
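# e.g. with metrics {"auc": ...} this becomes "epoch: {}, batch: {}, auc: {}"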
for epoch in range(epochs):
reader.start()
batch_id = 0
try:
while True:
metrics_rets = self._exe.run(
program=program,
fetch_list=metrics_varnames)
metrics = [epoch, batch_id]
metrics.extend(metrics_rets)
if batch_id % 10 == 0 and batch_id != 0:
print(metrics_format.format(*metrics))
batch_id += 1
except fluid.core.EOFException:
reader.reset()
fleet.stop_worker()
context['status'] = 'terminal_pass'
def dataset_train(self, context):
self._exe.run(fleet.startup_program)
fleet.init_worker()
dataset = self._get_dataset()
epochs = envs.get_global_env("train.epochs")
for i in range(epochs):
self._exe.train_from_dataset(program=fluid.default_main_program(),
dataset=dataset,
fetch_list=self.fetch_vars,
fetch_info=self.fetch_alias,
print_period=self.fetch_period)
self.save(i, "train", is_fleet=True)
fleet.stop_worker()
context['status'] = 'terminal_pass'
def infer(self, context):
context['status'] = 'terminal_pass'
def terminal(self, context):
for model in self.increment_models:
print("epoch :{}, dir: {}".format(model[0], model[1]))
context['is_exit'] = True
def tdm_prepare(self, param_name):
if param_name == "TDM_Tree_Travel":
travel_array = self.tdm_travel_prepare()
return travel_array
elif param_name == "TDM_Tree_Layer":
layer_array, _ = self.tdm_layer_prepare()
return layer_array
elif param_name == "TDM_Tree_Info":
info_array = self.tdm_info_prepare()
return info_array
else:
raise ValueError("{} is not a special tdm param name".format(param_name))
def tdm_travel_prepare(self):
"""load tdm tree param from npy/list file"""
travel_array = np.load(self.tree_travel_path)
logger.info("TDM Tree leaf node nums: {}".format(
travel_array.shape[0]))
return travel_array
def tdm_layer_prepare(self):
"""load tdm tree param from npy/list file"""
layer_list = []
layer_list_flat = []
with open(self.tree_layer_path, 'r') as fin:
for line in fin.readlines():
l = []
layer = (line.split('\n'))[0].split(',')
for node in layer:
if node:
layer_list_flat.append(node)
l.append(node)
layer_list.append(l)
layer_array = np.array(layer_list_flat)
layer_array = layer_array.reshape([-1, 1])
logger.info("TDM Tree max layer: {}".format(len(layer_list)))
logger.info("TDM Tree layer_node_num_list: {}".format(
[len(i) for i in layer_list]))
return layer_array, layer_list
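A worked example of the parsing in tdm_layer_prepare above, using hypothetical file contents (two layers, "1,2" and "3,4,5,6"):

import numpy as np

lines = ["1,2\n", "3,4,5,6\n"]          # stand-in for fin.readlines()
layer_list, layer_list_flat = [], []
for line in lines:
    layer = line.split('\n')[0].split(',')
    l = [node for node in layer if node]
    layer_list_flat.extend(l)
    layer_list.append(l)
layer_array = np.array(layer_list_flat).reshape([-1, 1])
assert layer_array.shape == (6, 1)      # six nodes, one column
assert [len(i) for i in layer_list] == [2, 4]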
def tdm_info_prepare(self):
"""load tdm tree param from list file"""
info_array = np.load(self.tree_info_path)
return info_array
......@@ -34,34 +34,6 @@ special_param = ["TDM_Tree_Travel", "TDM_Tree_Layer",
class TDMSingleTrainer(SingleTrainer):
def processor_register(self):
self.regist_context_processor('uninit', self.instance)
self.regist_context_processor('init_pass', self.init)
self.regist_context_processor('startup_pass', self.startup)
if envs.get_platform() == "LINUX":
self.regist_context_processor('train_pass', self.dataset_train)
else:
self.regist_context_processor('train_pass', self.dataloader_train)
self.regist_context_processor('infer_pass', self.infer)
self.regist_context_processor('terminal_pass', self.terminal)
def init(self, context):
self.model.train_net()
optimizer = self.model.optimizer()
optimizer.minimize(self.model.get_cost_op())
self.fetch_vars = []
self.fetch_alias = []
self.fetch_period = self.model.get_fetch_period()
metrics = self.model.get_metrics()
if metrics:
self.fetch_vars = metrics.values()
self.fetch_alias = metrics.keys()
context['status'] = 'startup_pass'
def startup(self, context):
namespace = "train.startup"
load_persistables = envs.get_global_env(
......@@ -114,67 +86,6 @@ class TDMSingleTrainer(SingleTrainer):
context['status'] = 'train_pass'
def dataloader_train(self, context):
reader = self._get_dataloader()
epochs = envs.get_global_env("train.epochs")
program = fluid.compiler.CompiledProgram(
fluid.default_main_program()).with_data_parallel(
loss_name=self.model.get_cost_op().name)
metrics_varnames = []
metrics_format = []
metrics_format.append("{}: {{}}".format("epoch"))
metrics_format.append("{}: {{}}".format("batch"))
for name, var in self.model.get_metrics().items():
metrics_varnames.append(var.name)
metrics_format.append("{}: {{}}".format(name))
metrics_format = ", ".join(metrics_format)
for epoch in range(epochs):
reader.start()
batch_id = 0
try:
while True:
metrics_rets = self._exe.run(
program=program,
fetch_list=metrics_varnames)
metrics = [epoch, batch_id]
metrics.extend(metrics_rets)
if batch_id % 10 == 0 and batch_id != 0:
print(metrics_format.format(*metrics))
batch_id += 1
except fluid.core.EOFException:
reader.reset()
context['status'] = 'infer_pass'
def dataset_train(self, context):
dataset = self._get_dataset()
epochs = envs.get_global_env("train.epochs")
for i in range(epochs):
self._exe.train_from_dataset(program=fluid.default_main_program(),
dataset=dataset,
fetch_list=self.fetch_vars,
fetch_info=self.fetch_alias,
print_period=self.fetch_period)
self.save(i, "train", is_fleet=False)
context['status'] = 'infer_pass'
def infer(self, context):
context['status'] = 'terminal_pass'
def terminal(self, context):
for model in self.increment_models:
print("epoch :{}, dir: {}".format(model[0], model[1]))
context['is_exit'] = True
def tdm_prepare(self, param_name):
if param_name == "TDM_Tree_Travel":
travel_array = self.tdm_travel_prepare()
......
......@@ -10,6 +10,7 @@ from fleetrec.core.utils import util
engines = {}
device = ["CPU", "GPU"]
clusters = ["SINGLE", "LOCAL_CLUSTER", "CLUSTER"]
custom_model = ['tdm']
def engine_registry():
......@@ -31,9 +32,12 @@ def engine_registry():
engines["GPU"] = gpu
def get_engine(engine, device):
def get_engine(args):
device = args.device
d_engine = engines[device]
transpiler = get_transpiler()
engine = get_custom_model_engine(args)
run_engine = d_engine[transpiler].get(engine, None)
if run_engine is None:
......@@ -42,6 +46,16 @@ def get_engine(engine, device):
return run_engine
def get_custom_model_engine(args):
model = args.model
model_name = model.split('.')[1]
if model_name in custom_model:
engine = "_".join((model_name.upper(), args.engine))
else:
engine = args.engine
return engine
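A hypothetical run of get_custom_model_engine, assuming args.model is dotted like "fleetrec.tdm" and args.engine is "single" (both are stand-ins, not values from this diff):

model, engine = "fleetrec.tdm", "single"
model_name = model.split('.')[1]                      # -> "tdm"
if model_name in ['tdm']:                             # custom_model
    engine = "_".join((model_name.upper(), engine))
print(engine)                                         # -> "TDM_single"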
def get_transpiler():
FNULL = open(os.devnull, 'w')
cmd = ["python", "-c",
......@@ -81,30 +95,23 @@ def set_runtime_envs(cluster_envs, engine_yaml):
print(envs.pretty_print_envs(need_print, ("Runtime Envs", "Value")))
def single_engine(args):
print("use single engine to run model: {}".format(args.model))
single_envs = {}
single_envs["train.trainer.trainer"] = "SingleTrainer"
single_envs["train.trainer.threads"] = "2"
single_envs["train.trainer.engine"] = "single"
single_envs["train.trainer.device"] = args.device
single_envs["train.trainer.platform"] = envs.get_platform()
set_runtime_envs(single_envs, args.model)
trainer = TrainerFactory.create(args.model)
return trainer
def get_trainer_prefix(args):
model = args.model
model_name = model.split('.')[1]
if model_name in custom_model:
return model_name.upper()
return ""
def tdm_single_engine(args):
print("use tdm single engine to run model: {}".format(args.model))
def single_engine(args):
trainer = get_trainer_prefix(args) + "SingleTrainer"
single_envs = {}
single_envs["train.trainer.trainer"] = "TDMSingleTrainer"
single_envs["train.trainer.trainer"] = trainer
single_envs["train.trainer.threads"] = "2"
single_envs["train.trainer.engine"] = "single"
single_envs["train.trainer.device"] = args.device
single_envs["train.trainer.platform"] = envs.get_platform()
print("use {} engine to run model: {}".format(trainer, args.model))
set_runtime_envs(single_envs, args.model)
trainer = TrainerFactory.create(args.model)
......@@ -112,31 +119,15 @@ def tdm_single_engine(args):
def cluster_engine(args):
print("launch cluster engine with cluster to run model: {}".format(args.model))
cluster_envs = {}
cluster_envs["train.trainer.trainer"] = "ClusterTrainer"
cluster_envs["train.trainer.engine"] = "cluster"
cluster_envs["train.trainer.device"] = args.device
cluster_envs["train.trainer.platform"] = envs.get_platform()
set_runtime_envs(cluster_envs, args.model)
trainer = TrainerFactory.create(args.model)
return trainer
def tdm_cluster_engine(args):
print("launch tdm cluster engine with cluster to run model: {}".format(args.model))
trainer = get_trainer_prefix(args) + "ClusterTrainer"
cluster_envs = {}
cluster_envs["train.trainer.trainer"] = "TDMClusterTrainer"
cluster_envs["train.trainer.trainer"] = trainer
cluster_envs["train.trainer.engine"] = "cluster"
cluster_envs["train.trainer.device"] = args.device
cluster_envs["train.trainer.platform"] = envs.get_platform()
print("launch {} engine with cluster to run model: {}".format(trainer, args.model))
set_runtime_envs(cluster_envs, args.model)
trainer = TrainerFactory.create(args.model)
return trainer
......@@ -156,40 +147,15 @@ def cluster_mpi_engine(args):
def local_cluster_engine(args):
print("launch cluster engine with cluster to run model: {}".format(args.model))
from fleetrec.core.engine.local_cluster_engine import LocalClusterEngine
cluster_envs = {}
cluster_envs["server_num"] = 1
cluster_envs["worker_num"] = 1
cluster_envs["start_port"] = 36001
cluster_envs["log_dir"] = "logs"
cluster_envs["train.trainer.trainer"] = "ClusterTrainer"
cluster_envs["train.trainer.strategy"] = "async"
cluster_envs["train.trainer.threads"] = "2"
cluster_envs["train.trainer.engine"] = "local_cluster"
cluster_envs["train.trainer.device"] = args.device
cluster_envs["train.trainer.platform"] = envs.get_platform()
cluster_envs["CPU_NUM"] = "2"
set_runtime_envs(cluster_envs, args.model)
launch = LocalClusterEngine(cluster_envs, args.model)
return launch
def tdm_local_cluster_engine(args):
print("launch tdm cluster engine with cluster to run model: {}".format(args.model))
from fleetrec.core.engine.local_cluster_engine import LocalClusterEngine
trainer = get_trainer_prefix(args) + "ClusterTrainer"
cluster_envs = {}
cluster_envs["server_num"] = 1
cluster_envs["worker_num"] = 1
cluster_envs["start_port"] = 36001
cluster_envs["log_dir"] = "logs"
cluster_envs["train.trainer.trainer"] = "TDMClusterTrainer"
cluster_envs["train.trainer.trainer"] = trainer
cluster_envs["train.trainer.strategy"] = "async"
cluster_envs["train.trainer.threads"] = "2"
cluster_envs["train.trainer.engine"] = "local_cluster"
......@@ -198,9 +164,9 @@ def tdm_local_cluster_engine(args):
cluster_envs["train.trainer.platform"] = envs.get_platform()
cluster_envs["CPU_NUM"] = "2"
print("launch {} engine with cluster to run model: {}".format(trainer, args.model))
set_runtime_envs(cluster_envs, args.model)
launch = LocalClusterEngine(cluster_envs, args.model)
return launch
......@@ -258,7 +224,7 @@ if __name__ == "__main__":
args.model = get_abs_model(args.model)
engine_registry()
which_engine = get_engine(args.engine, args.device)
which_engine = get_engine(args)
engine = which_engine(args)
engine.run()
......@@ -46,22 +46,22 @@ train:
startup:
single:
# It is recommended to load the tree only once, save it as a paddle tensor, and warm-start from the paddle model afterwards
load_persistables: False
persistables_model_path: ""
tree:
# For single-machine training it is recommended to load the tree only once, save it as a paddle tensor, and warm-start from the paddle model afterwards
# For distributed training, each trainer needs to load the tree independently
load_tree: True
tree_layer_path: "{workspace}/tree/layer_list.txt"
tree_travel_path: "{workspace}/tree/travel_list.npy"
tree_info_path: "{workspace}/tree/tree_info.npy"
tree_emb_path: "{workspace}/tree/tree_emb.npy"
single:
load_persistables: False
persistables_model_path: ""
save_init_model: True
init_model_path: ""
init_model_path: "{workspace}/init_model"
cluster:
load_persistables: True
persistables_model_path: ""
init_model_path: "{workspace}/init_model"
save:
increment:
......
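The trainers read this block through envs.get_global_env(key, default, namespace) under the "train.startup" namespace, as in trainer_startup above. A toy resolver mirroring that interface (an illustration only, not fleetrec's actual implementation; the flattened keys are assumptions):

config = {
    "train.startup.tree.load_tree": True,
    "train.startup.single.load_persistables": False,
    "train.startup.cluster.init_model_path": "{workspace}/init_model",
}

def get_global_env(key, default=None, namespace=""):
    return config.get(".".join((namespace, key)), default)

print(get_global_env("tree.load_tree", True, "train.startup"))            # True
print(get_global_env("single.save_init_model", False, "train.startup"))   # False (falls back to default)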