diff --git a/fleetrec/models/base.py b/fleetrec/models/base.py
index 47b48c79b72d6d49310a07c6a9c9c6767b4e6dd5..a309d37fafa83bf9fd146ddf9e2def8ac61c1538 100644
--- a/fleetrec/models/base.py
+++ b/fleetrec/models/base.py
@@ -85,7 +85,7 @@ class Model(object):
         self._cost = None
         self._metrics = {}
         self._data_var = []
-        pass
+        self._fetch_interval = 10
 
     def get_cost_op(self):
         """R
@@ -97,6 +97,9 @@ class Model(object):
         """
         return self._metrics
 
+    def get_fetch_period(self):
+        return self._fetch_interval
+
     @abc.abstractmethod
     def shrink(self, params):
         """R
@@ -169,6 +172,7 @@ class Model(object):
 class YamlModel(Model):
     """R
     """
+
     def __init__(self, config):
         """R
         """
@@ -218,7 +222,7 @@ class YamlModel(Model):
             inference_param = extend_output['inference_param']
             param_name = inference_param['name']
             if param_name not in self._build_param['table']:
-                self._build_param['table'][param_name] = {'params' :[]}
+                self._build_param['table'][param_name] = {'params': []}
             table_meta = table.TableMeta.alloc_new_table(inference_param['table_id'])
             self._build_param['table'][param_name]['_meta'] = table_meta
             self._build_param['table'][param_name]['params'] += inference_param['params']
diff --git a/fleetrec/models/ctr_dnn/model.py b/fleetrec/models/ctr_dnn/model.py
index cd3c9df5926cece7495d56c6f2419785b5039829..c6b3997102ff42b06b14d8130d4dad2ce5d754b7 100644
--- a/fleetrec/models/ctr_dnn/model.py
+++ b/fleetrec/models/ctr_dnn/model.py
@@ -16,24 +16,17 @@ import math
 
 import paddle.fluid as fluid
 from fleetrec.utils import envs
+from fleetrec.models.base import Model
 
 
-class Train(object):
-
-    def __init__(self):
-        self.sparse_inputs = []
-        self.dense_input = None
-        self.label_input = None
-
-        self.sparse_input_varnames = []
-        self.dense_input_varname = None
-        self.label_input_varname = None
-
+class Train(Model):
+    def __init__(self, config):
+        super().__init__(config)
         self.namespace = "train.model"
 
     def input(self):
         def sparse_inputs():
-            ids = envs.get_global_env("hyper_parameters.sparse_inputs_slots", None ,self.namespace)
+            ids = envs.get_global_env("hyper_parameters.sparse_inputs_slots", None, self.namespace)
 
             sparse_input_ids = [
                 fluid.layers.data(name="C" + str(i),
@@ -44,7 +37,7 @@ class Train(object):
             return sparse_input_ids, [var.name for var in sparse_input_ids]
 
         def dense_input():
-            dim = envs.get_global_env("hyper_parameters.dense_input_dim", None ,self.namespace)
+            dim = envs.get_global_env("hyper_parameters.dense_input_dim", None, self.namespace)
 
             dense_input_var = fluid.layers.data(name="dense_input",
                                                 shape=[dim],
@@ -65,10 +58,10 @@ class Train(object):
     def input_varnames(self):
         return [input.name for input in self.input_vars()]
 
-    def net(self):
+    def build_model(self):
         def embedding_layer(input):
-            sparse_feature_number = envs.get_global_env("hyper_parameters.sparse_feature_number", None ,self.namespace)
-            sparse_feature_dim = envs.get_global_env("hyper_parameters.sparse_feature_dim", None ,self.namespace)
+            sparse_feature_number = envs.get_global_env("hyper_parameters.sparse_feature_number", None, self.namespace)
+            sparse_feature_dim = envs.get_global_env("hyper_parameters.sparse_feature_dim", None, self.namespace)
 
             emb = fluid.layers.embedding(
                 input=input,
@@ -94,7 +87,7 @@ class Train(object):
         concated = fluid.layers.concat(sparse_embed_seq + [self.dense_input], axis=1)
 
         fcs = [concated]
-        hidden_layers = envs.get_global_env("hyper_parameters.fc_sizes", None ,self.namespace)
+        hidden_layers = envs.get_global_env("hyper_parameters.fc_sizes", None, self.namespace)
 
         for size in hidden_layers:
             fcs.append(fc(fcs[-1], size))
@@ -111,30 +104,34 @@ class Train(object):
 
     def avg_loss(self):
         cost = fluid.layers.cross_entropy(input=self.predict, label=self.label_input)
-        avg_cost = fluid.layers.reduce_sum(cost)
-        self.loss = avg_cost
-        return avg_cost
+        avg_cost = fluid.layers.reduce_mean(cost)
+        self._cost = avg_cost
 
     def metrics(self):
         auc, batch_auc, _ = fluid.layers.auc(input=self.predict,
                                              label=self.label_input,
                                              num_thresholds=2 ** 12,
                                              slide_steps=20)
-        self.metrics = (auc, batch_auc)
-
-        return self.metrics
-
-    def metric_extras(self):
-        self.metric_vars = [self.metrics[0]]
-        self.metric_alias = ["AUC"]
-        self.fetch_interval_batchs = 10
-        return (self.metric_vars, self.metric_alias, self.fetch_interval_batchs)
+        self._metrics["AUC"] = auc
+        self._metrics["BATCH_AUC"] = batch_auc
 
     def optimizer(self):
-        learning_rate = envs.get_global_env("hyper_parameters.learning_rate", None ,self.namespace)
+        learning_rate = envs.get_global_env("hyper_parameters.learning_rate", None, self.namespace)
         optimizer = fluid.optimizer.Adam(learning_rate, lazy_mode=True)
         return optimizer
 
+    def dump_model_program(self, path):
+        pass
+
+    def dump_inference_param(self, params):
+        pass
+
+    def dump_inference_program(self, inference_layer, path):
+        pass
+
+    def shrink(self, params):
+        pass
+
 
 class Evaluate(object):
     def input(self):
diff --git a/fleetrec/trainer/cluster_trainer.py b/fleetrec/trainer/cluster_trainer.py
index 0ecc1eb6892afe02cc3e2d8133b164e021a0de24..791b7b5919953cacd8242ce198b9579ea6d4dd70 100644
--- a/fleetrec/trainer/cluster_trainer.py
+++ b/fleetrec/trainer/cluster_trainer.py
@@ -70,24 +70,27 @@ class ClusterTrainerWithDataset(TranspileTrainer):
         return strategy
 
     def init(self, context):
-
-        print("init pass")
-
         self.model.input()
         self.model.net()
-        self.metrics = self.model.metrics()
-        self.metric_extras = self.model.metric_extras()
-
-        loss = self.model.avg_loss()
+        self.model.metrics()
+        self.model.avg_loss()
         optimizer = self.model.optimizer()
 
         strategy = self.build_strategy()
         optimizer = fleet.distributed_optimizer(optimizer, strategy)
-        optimizer.minimize(loss)
+        optimizer.minimize(self.model._cost)
 
         if fleet.is_server():
             context['status'] = 'server_pass'
         else:
+            self.fetch_vars = []
+            self.fetch_alias = []
+            self.fetch_period = self.model.get_fetch_period()
+
+            metrics = self.model.get_metrics()
+            if metrics:
+                self.fetch_vars = metrics.values()
+                self.fetch_alias = metrics.keys()
             context['status'] = 'train_pass'
 
     def server(self, context):
@@ -95,23 +98,19 @@ class ClusterTrainerWithDataset(TranspileTrainer):
         fleet.run_server()
         context['is_exit'] = True
 
-    def terminal(self, context):
-        fleet.stop_worker()
-        context['is_exit'] = True
-
     def train(self, context):
-        self.exe.run(fleet.startup_program)
+        self._exe.run(fleet.startup_program)
         fleet.init_worker()
 
         dataset = self._get_dataset()
 
         epochs = envs.get_global_env("train.epochs")
 
         for i in range(epochs):
-            self.exe.train_from_dataset(program=fluid.default_main_program(),
-                                        dataset=dataset,
-                                        fetch_list=self.metric_extras[0],
-                                        fetch_info=self.metric_extras[1],
-                                        print_period=self.metric_extras[2])
+            self._exe.train_from_dataset(program=fluid.default_main_program(),
+                                         dataset=dataset,
+                                         fetch_list=self.fetch_vars,
+                                         fetch_info=self.fetch_alias,
+                                         print_period=self.fetch_period)
             self.save(i, "train", is_fleet=True)
         context['status'] = 'infer_pass'
         fleet.stop_worker()
diff --git a/fleetrec/trainer/ctr_trainer.py b/fleetrec/trainer/ctr_trainer.py
index 582d60f8e72472ea1d27847a2b3512c8fdc1543f..a83b39026dc4eea904747c71825f24889cb98ba3 100755
--- a/fleetrec/trainer/ctr_trainer.py
+++ b/fleetrec/trainer/ctr_trainer.py
@@ -82,10 +82,6 @@ class CtrPaddleTrainer(Trainer):
         config['output_path'] = util.get_absolute_path(
             config['output_path'], config['io']['afs'])
 
-        self._place = fluid.CPUPlace()
-        self._exe = fluid.Executor(self._place)
-        self._exector_context = {}
-
         self.global_config = config
         self._metrics = {}
diff --git a/fleetrec/trainer/single_trainer.py b/fleetrec/trainer/single_trainer.py
index cfea592719d5799493e804beab48cd770e6ceae3..ed03e42a63fdeafbdb9e3eb1b4230b2d72edcc03 100644
--- a/fleetrec/trainer/single_trainer.py
+++ b/fleetrec/trainer/single_trainer.py
@@ -46,29 +46,35 @@ class SingleTrainerWithDataset(TranspileTrainer):
     def init(self, context):
         self.model.input()
         self.model.net()
-        self.metrics = self.model.metrics()
-        self.metric_extras = self.model.metric_extras()
-
-        loss = self.model.avg_loss()
+        self.model.metrics()
+        self.model.avg_loss()
         optimizer = self.model.optimizer()
-        optimizer.minimize(loss)
+        optimizer.minimize(self.model._cost)
+
+        self.fetch_vars = []
+        self.fetch_alias = []
+        self.fetch_period = self.model.get_fetch_period()
+
+        metrics = self.model.get_metrics()
+        if metrics:
+            self.fetch_vars = metrics.values()
+            self.fetch_alias = metrics.keys()
         context['status'] = 'train_pass'
 
     def train(self, context):
         # run startup program at once
-        self.exe.run(fluid.default_startup_program())
+        self._exe.run(fluid.default_startup_program())
 
         dataset = self._get_dataset()
 
         epochs = envs.get_global_env("train.epochs")
 
         for i in range(epochs):
-            self.exe.train_from_dataset(program=fluid.default_main_program(),
-                                        dataset=dataset,
-                                        fetch_list=self.metric_extras[0],
-                                        fetch_info=self.metric_extras[1],
-                                        print_period=self.metric_extras[2])
+            self._exe.train_from_dataset(program=fluid.default_main_program(),
+                                         dataset=dataset,
+                                         fetch_list=self.fetch_vars,
+                                         fetch_info=self.fetch_alias,
+                                         print_period=self.fetch_period)
             self.save(i, "train", is_fleet=False)
         context['status'] = 'infer_pass'
diff --git a/fleetrec/trainer/trainer.py b/fleetrec/trainer/trainer.py
index 5aee3bf1b0282b1006a26979160926739de0996f..0819c84f50ec476d2bd877850bf4ca9c91deb343 100755
--- a/fleetrec/trainer/trainer.py
+++ b/fleetrec/trainer/trainer.py
@@ -15,19 +15,23 @@ import abc
 import time
 
 import yaml
+from paddle import fluid
 
-from .. utils import envs
+from ..utils import envs
 
 
 class Trainer(object):
     """R
-    """ 
+    """
     __metaclass__ = abc.ABCMeta
 
     def __init__(self, config=None):
         self._status_processor = {}
+        self._place = fluid.CPUPlace()
+        self._exe = fluid.Executor(self._place)
+        self._exector_context = {}
         self._context = {'status': 'uninit', 'is_exit': False}
-    
+
     def regist_context_processor(self, status_name, processor):
         """
         regist a processor for specify status
@@ -46,7 +50,7 @@ class Trainer(object):
             self._status_processor[context['status']](context)
         else:
             self.other_status_processor(context)
-    
+
     def other_status_processor(self, context):
         """
         if no processor match context.status, use defalut processor
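Below is a minimal usage sketch (not part of the patch) of the contract this diff introduces: a `Model` subclass records its loss in `self._cost` and its metric variables in the `self._metrics` dict, and a trainer derives `fetch_list` / `fetch_info` / `print_period` from `get_metrics()` and `get_fetch_period()` instead of the removed `metric_extras()` tuple. `MyModel`, its layers, and the empty config dict are hypothetical stand-ins, and the sketch assumes `Model.__init__` accepts a config argument, as `Train`'s `super().__init__(config)` call suggests.

```python
import paddle.fluid as fluid
from fleetrec.models.base import Model


class MyModel(Model):
    """Hypothetical Model subclass, only to illustrate the new contract."""

    def build_model(self):
        x = fluid.layers.data(name="x", shape=[13], dtype="float32")
        y = fluid.layers.data(name="y", shape=[1], dtype="int64")
        predict = fluid.layers.fc(input=x, size=2, act="softmax")

        # The loss goes into self._cost (what the trainers now minimize) ...
        cost = fluid.layers.cross_entropy(input=predict, label=y)
        self._cost = fluid.layers.reduce_mean(cost)

        # ... and metric variables go into the self._metrics dict, keyed by
        # the alias the trainer will print them under.
        auc, batch_auc, _ = fluid.layers.auc(input=predict, label=y)
        self._metrics["AUC"] = auc
        self._metrics["BATCH_AUC"] = batch_auc

    # Abstract hooks from Model that this sketch does not exercise.
    def shrink(self, params):
        pass

    def dump_model_program(self, path):
        pass

    def dump_inference_param(self, params):
        pass

    def dump_inference_program(self, inference_layer, path):
        pass


model = MyModel(config={})
model.build_model()

# What the trainers' init() now does with the accessors:
metrics = model.get_metrics()
fetch_vars = list(metrics.values())      # becomes fetch_list
fetch_alias = list(metrics.keys())       # becomes fetch_info
fetch_period = model.get_fetch_period()  # becomes print_period (defaults to 10)
```

Note that the trainers in the patch pass `metrics.values()` / `metrics.keys()` straight through; under Python 3 those are dict views, so wrapping them in `list(...)` as above is the safer form to hand to `train_from_dataset`.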