提交 af3dad94 编写于 作者: T tangwei

code fix

上级 47720eac
...@@ -85,7 +85,7 @@ class Model(object): ...@@ -85,7 +85,7 @@ class Model(object):
self._cost = None self._cost = None
self._metrics = {} self._metrics = {}
self._data_var = [] self._data_var = []
pass self._fetch_interval = 10
def get_cost_op(self): def get_cost_op(self):
"""R """R
...@@ -97,6 +97,9 @@ class Model(object): ...@@ -97,6 +97,9 @@ class Model(object):
""" """
return self._metrics return self._metrics
def get_fetch_period(self):
return self._fetch_interval
@abc.abstractmethod @abc.abstractmethod
def shrink(self, params): def shrink(self, params):
"""R """R
...@@ -169,6 +172,7 @@ class Model(object): ...@@ -169,6 +172,7 @@ class Model(object):
class YamlModel(Model): class YamlModel(Model):
"""R """R
""" """
def __init__(self, config): def __init__(self, config):
"""R """R
""" """
...@@ -218,7 +222,7 @@ class YamlModel(Model): ...@@ -218,7 +222,7 @@ class YamlModel(Model):
inference_param = extend_output['inference_param'] inference_param = extend_output['inference_param']
param_name = inference_param['name'] param_name = inference_param['name']
if param_name not in self._build_param['table']: if param_name not in self._build_param['table']:
self._build_param['table'][param_name] = {'params' :[]} self._build_param['table'][param_name] = {'params': []}
table_meta = table.TableMeta.alloc_new_table(inference_param['table_id']) table_meta = table.TableMeta.alloc_new_table(inference_param['table_id'])
self._build_param['table'][param_name]['_meta'] = table_meta self._build_param['table'][param_name]['_meta'] = table_meta
self._build_param['table'][param_name]['params'] += inference_param['params'] self._build_param['table'][param_name]['params'] += inference_param['params']
......
...@@ -16,24 +16,17 @@ import math ...@@ -16,24 +16,17 @@ import math
import paddle.fluid as fluid import paddle.fluid as fluid
from fleetrec.utils import envs from fleetrec.utils import envs
from fleetrec.models.base import Model
class Train(object): class Train(Model):
def __init__(self, config):
def __init__(self): super().__init__(config)
self.sparse_inputs = []
self.dense_input = None
self.label_input = None
self.sparse_input_varnames = []
self.dense_input_varname = None
self.label_input_varname = None
self.namespace = "train.model" self.namespace = "train.model"
def input(self): def input(self):
def sparse_inputs(): def sparse_inputs():
ids = envs.get_global_env("hyper_parameters.sparse_inputs_slots", None ,self.namespace) ids = envs.get_global_env("hyper_parameters.sparse_inputs_slots", None, self.namespace)
sparse_input_ids = [ sparse_input_ids = [
fluid.layers.data(name="C" + str(i), fluid.layers.data(name="C" + str(i),
...@@ -44,7 +37,7 @@ class Train(object): ...@@ -44,7 +37,7 @@ class Train(object):
return sparse_input_ids, [var.name for var in sparse_input_ids] return sparse_input_ids, [var.name for var in sparse_input_ids]
def dense_input(): def dense_input():
dim = envs.get_global_env("hyper_parameters.dense_input_dim", None ,self.namespace) dim = envs.get_global_env("hyper_parameters.dense_input_dim", None, self.namespace)
dense_input_var = fluid.layers.data(name="dense_input", dense_input_var = fluid.layers.data(name="dense_input",
shape=[dim], shape=[dim],
...@@ -65,10 +58,10 @@ class Train(object): ...@@ -65,10 +58,10 @@ class Train(object):
def input_varnames(self): def input_varnames(self):
return [input.name for input in self.input_vars()] return [input.name for input in self.input_vars()]
def net(self): def build_model(self):
def embedding_layer(input): def embedding_layer(input):
sparse_feature_number = envs.get_global_env("hyper_parameters.sparse_feature_number", None ,self.namespace) sparse_feature_number = envs.get_global_env("hyper_parameters.sparse_feature_number", None, self.namespace)
sparse_feature_dim = envs.get_global_env("hyper_parameters.sparse_feature_dim", None ,self.namespace) sparse_feature_dim = envs.get_global_env("hyper_parameters.sparse_feature_dim", None, self.namespace)
emb = fluid.layers.embedding( emb = fluid.layers.embedding(
input=input, input=input,
...@@ -94,7 +87,7 @@ class Train(object): ...@@ -94,7 +87,7 @@ class Train(object):
concated = fluid.layers.concat(sparse_embed_seq + [self.dense_input], axis=1) concated = fluid.layers.concat(sparse_embed_seq + [self.dense_input], axis=1)
fcs = [concated] fcs = [concated]
hidden_layers = envs.get_global_env("hyper_parameters.fc_sizes", None ,self.namespace) hidden_layers = envs.get_global_env("hyper_parameters.fc_sizes", None, self.namespace)
for size in hidden_layers: for size in hidden_layers:
fcs.append(fc(fcs[-1], size)) fcs.append(fc(fcs[-1], size))
...@@ -111,30 +104,34 @@ class Train(object): ...@@ -111,30 +104,34 @@ class Train(object):
def avg_loss(self): def avg_loss(self):
cost = fluid.layers.cross_entropy(input=self.predict, label=self.label_input) cost = fluid.layers.cross_entropy(input=self.predict, label=self.label_input)
avg_cost = fluid.layers.reduce_sum(cost) avg_cost = fluid.layers.reduce_mean(cost)
self.loss = avg_cost self._cost = avg_cost
return avg_cost
def metrics(self): def metrics(self):
auc, batch_auc, _ = fluid.layers.auc(input=self.predict, auc, batch_auc, _ = fluid.layers.auc(input=self.predict,
label=self.label_input, label=self.label_input,
num_thresholds=2 ** 12, num_thresholds=2 ** 12,
slide_steps=20) slide_steps=20)
self.metrics = (auc, batch_auc) self._metrics["AUC"] = auc
self._metrics["BATCH_AUC"] = batch_auc
return self.metrics
def metric_extras(self):
self.metric_vars = [self.metrics[0]]
self.metric_alias = ["AUC"]
self.fetch_interval_batchs = 10
return (self.metric_vars, self.metric_alias, self.fetch_interval_batchs)
def optimizer(self): def optimizer(self):
learning_rate = envs.get_global_env("hyper_parameters.learning_rate", None ,self.namespace) learning_rate = envs.get_global_env("hyper_parameters.learning_rate", None, self.namespace)
optimizer = fluid.optimizer.Adam(learning_rate, lazy_mode=True) optimizer = fluid.optimizer.Adam(learning_rate, lazy_mode=True)
return optimizer return optimizer
def dump_model_program(self, path):
pass
def dump_inference_param(self, params):
pass
def dump_inference_program(self, inference_layer, path):
pass
def shrink(self, params):
pass
class Evaluate(object): class Evaluate(object):
def input(self): def input(self):
......
...@@ -70,24 +70,27 @@ class ClusterTrainerWithDataset(TranspileTrainer): ...@@ -70,24 +70,27 @@ class ClusterTrainerWithDataset(TranspileTrainer):
return strategy return strategy
def init(self, context): def init(self, context):
print("init pass")
self.model.input() self.model.input()
self.model.net() self.model.net()
self.metrics = self.model.metrics() self.model.metrics()
self.metric_extras = self.model.metric_extras() self.model.avg_loss()
loss = self.model.avg_loss()
optimizer = self.model.optimizer() optimizer = self.model.optimizer()
strategy = self.build_strategy() strategy = self.build_strategy()
optimizer = fleet.distributed_optimizer(optimizer, strategy) optimizer = fleet.distributed_optimizer(optimizer, strategy)
optimizer.minimize(loss) optimizer.minimize(self.model._cost)
if fleet.is_server(): if fleet.is_server():
context['status'] = 'server_pass' context['status'] = 'server_pass'
else: else:
self.fetch_vars = []
self.fetch_alias = []
self.fetch_period = self.model.get_fetch_period()
metrics = self.model.get_metrics()
if metrics:
self.fetch_vars = metrics.values()
self.fetch_alias = metrics.keys()
context['status'] = 'train_pass' context['status'] = 'train_pass'
def server(self, context): def server(self, context):
...@@ -95,23 +98,19 @@ class ClusterTrainerWithDataset(TranspileTrainer): ...@@ -95,23 +98,19 @@ class ClusterTrainerWithDataset(TranspileTrainer):
fleet.run_server() fleet.run_server()
context['is_exit'] = True context['is_exit'] = True
def terminal(self, context):
fleet.stop_worker()
context['is_exit'] = True
def train(self, context): def train(self, context):
self.exe.run(fleet.startup_program) self._exe.run(fleet.startup_program)
fleet.init_worker() fleet.init_worker()
dataset = self._get_dataset() dataset = self._get_dataset()
epochs = envs.get_global_env("train.epochs") epochs = envs.get_global_env("train.epochs")
for i in range(epochs): for i in range(epochs):
self.exe.train_from_dataset(program=fluid.default_main_program(), self._exe.train_from_dataset(program=fluid.default_main_program(),
dataset=dataset, dataset=dataset,
fetch_list=self.metric_extras[0], fetch_list=self.fetch_vars,
fetch_info=self.metric_extras[1], fetch_info=self.fetch_alias,
print_period=self.metric_extras[2]) print_period=self.fetch_period)
self.save(i, "train", is_fleet=True) self.save(i, "train", is_fleet=True)
context['status'] = 'infer_pass' context['status'] = 'infer_pass'
fleet.stop_worker() fleet.stop_worker()
......
...@@ -82,10 +82,6 @@ class CtrPaddleTrainer(Trainer): ...@@ -82,10 +82,6 @@ class CtrPaddleTrainer(Trainer):
config['output_path'] = util.get_absolute_path( config['output_path'] = util.get_absolute_path(
config['output_path'], config['io']['afs']) config['output_path'], config['io']['afs'])
self._place = fluid.CPUPlace()
self._exe = fluid.Executor(self._place)
self._exector_context = {}
self.global_config = config self.global_config = config
self._metrics = {} self._metrics = {}
......
...@@ -46,29 +46,35 @@ class SingleTrainerWithDataset(TranspileTrainer): ...@@ -46,29 +46,35 @@ class SingleTrainerWithDataset(TranspileTrainer):
def init(self, context): def init(self, context):
self.model.input() self.model.input()
self.model.net() self.model.net()
self.metrics = self.model.metrics() self.model.metrics()
self.metric_extras = self.model.metric_extras() self.model.avg_loss()
loss = self.model.avg_loss()
optimizer = self.model.optimizer() optimizer = self.model.optimizer()
optimizer.minimize(loss) optimizer.minimize(self.model._cost)
self.fetch_vars = []
self.fetch_alias = []
self.fetch_period = self.model.get_fetch_period()
metrics = self.model.get_metrics()
if metrics:
self.fetch_vars = metrics.values()
self.fetch_alias = metrics.keys()
context['status'] = 'train_pass' context['status'] = 'train_pass'
def train(self, context): def train(self, context):
# run startup program at once # run startup program at once
self.exe.run(fluid.default_startup_program()) self._exe.run(fluid.default_startup_program())
dataset = self._get_dataset() dataset = self._get_dataset()
epochs = envs.get_global_env("train.epochs") epochs = envs.get_global_env("train.epochs")
for i in range(epochs): for i in range(epochs):
self.exe.train_from_dataset(program=fluid.default_main_program(), self._exe.train_from_dataset(program=fluid.default_main_program(),
dataset=dataset, dataset=dataset,
fetch_list=self.metric_extras[0], fetch_list=self.fetch_vars,
fetch_info=self.metric_extras[1], fetch_info=self.fetch_alias,
print_period=self.metric_extras[2]) print_period=self.fetch_period)
self.save(i, "train", is_fleet=False) self.save(i, "train", is_fleet=False)
context['status'] = 'infer_pass' context['status'] = 'infer_pass'
......
...@@ -15,8 +15,9 @@ ...@@ -15,8 +15,9 @@
import abc import abc
import time import time
import yaml import yaml
from paddle import fluid
from .. utils import envs from ..utils import envs
class Trainer(object): class Trainer(object):
...@@ -26,6 +27,9 @@ class Trainer(object): ...@@ -26,6 +27,9 @@ class Trainer(object):
def __init__(self, config=None): def __init__(self, config=None):
self._status_processor = {} self._status_processor = {}
self._place = fluid.CPUPlace()
self._exe = fluid.Executor(self._place)
self._exector_context = {}
self._context = {'status': 'uninit', 'is_exit': False} self._context = {'status': 'uninit', 'is_exit': False}
def regist_context_processor(self, status_name, processor): def regist_context_processor(self, status_name, processor):
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册