diff --git a/core/model.py b/core/model.py
index 847a5d23362e4c3db8ef0cfa59fb83c4ed9a4c91..52526659c9633b5c061db5467e9221948c442853 100755
--- a/core/model.py
+++ b/core/model.py
@@ -38,6 +38,7 @@ class Model(object):
         self._namespace = "train.model"
         self._platform = envs.get_platform()
         self._init_hyper_parameters()
+        self._env = config
 
     def _init_hyper_parameters(self):
         pass
@@ -103,7 +104,7 @@ class Model(object):
     def get_fetch_period(self):
         return self._fetch_interval
 
-    def _build_optimizer(self, name, lr):
+    def _build_optimizer(self, name, lr, strategy=None):
         name = name.upper()
         optimizers = ["SGD", "ADAM", "ADAGRAD"]
         if name not in optimizers:
@@ -133,10 +134,18 @@ class Model(object):
         print(">>>>>>>>>>>.learnig rate: %s" % learning_rate)
         return self._build_optimizer(optimizer, learning_rate)
 
-    def input_data(self, is_infer=False):
-        sparse_slots = envs.get_global_env("sparse_slots", None,
-                                           "train.reader")
-        dense_slots = envs.get_global_env("dense_slots", None, "train.reader")
+    def input_data(self, is_infer=False, dataset_name=None, program=None):
+        dataset = {}
+        for i in self._env["dataset"]:
+            if i["name"] == dataset_name:
+                dataset = i
+                break
+        sparse_slots = dataset.get("sparse_slots", None)
+        #sparse_slots = envs.get_global_env("sparse_slots", None,
+        #                                   "train.reader")
+        #dense_slots = envs.get_global_env("dense_slots", None, "train.reader")
+        dense_slots = dataset.get("dense_slots", None)
         if sparse_slots is not None or dense_slots is not None:
             sparse_slots = sparse_slots.strip().split(" ")
             dense_slots = dense_slots.strip().split(" ")
@@ -159,6 +168,8 @@ class Model(object):
                     name=name, shape=[1], lod_level=1, dtype="int64")
                 data_var_.append(l)
                 self._sparse_data_var.append(l)
+            print(self._dense_data_var)
+            print(self._sparse_data_var)
 
             return data_var_
         else:
diff --git a/core/reader.py b/core/reader.py
index dc0b9b0b7fd784fdc86e7db5e0b488c35b9021dc..28f47256c74df2b20c29942afd79485ddf3647a8 100755
--- a/core/reader.py
+++ b/core/reader.py
@@ -58,8 +58,8 @@ class SlotReader(dg.MultiSlotDataGenerator):
             _config = yaml.load(rb.read(), Loader=yaml.FullLoader)
         else:
             raise ValueError("reader config only support yaml")
-        envs.set_global_envs(_config)
-        envs.update_workspace()
+        #envs.set_global_envs(_config)
+        #envs.update_workspace()
 
     def init(self, sparse_slots, dense_slots, padding=0):
         from operator import mul
diff --git a/core/trainers/single_trainer.py b/core/trainers/single_trainer.py
index a564ba5585c313a163542f028fa158f8c50c8d2a..526ede424246daa48cee7881b07c33405362fe26 100755
--- a/core/trainers/single_trainer.py
+++ b/core/trainers/single_trainer.py
@@ -19,11 +19,12 @@ from __future__ import print_function
 
 import time
 import logging
-
+import os
 import paddle.fluid as fluid
 
 from paddlerec.core.trainers.transpiler_trainer import TranspileTrainer
 from paddlerec.core.utils import envs
+from paddlerec.core.reader import SlotReader
 
 logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s")
 logger = logging.getLogger("fluid")
@@ -31,47 +32,253 @@ logger.setLevel(logging.INFO)
 
 
 class SingleTrainer(TranspileTrainer):
+    def __init__(self, config=None):
+        super(TranspileTrainer, self).__init__(config)
+        self._env = self._config  #envs.get_global_envs()
+        #device = envs.get_global_env("train.device", "cpu")
+        device = self._env["device"]
+        if device == 'gpu':
+            self._place = fluid.CUDAPlace(0)
+        elif device == 'cpu':
+            self._place = fluid.CPUPlace()
+        self._exe = fluid.Executor(self._place)
+        self.processor_register()
+        self._model = {}
+        self._dataset = {}
+        #self.inference_models = []
+        #self.increment_models = []
+
     def processor_register(self):
         self.regist_context_processor('uninit', self.instance)
         self.regist_context_processor('init_pass', self.init)
         self.regist_context_processor('startup_pass', self.startup)
-        if envs.get_platform() == "LINUX" and envs.get_global_env(
-                "dataset_class", None, "train.reader") != "DataLoader":
-            self.regist_context_processor('train_pass', self.dataset_train)
-        else:
-            self.regist_context_processor('train_pass', self.dataloader_train)
+        #if envs.get_platform() == "LINUX" and envs.get_global_env(
+        #        "dataset_class", None, "train.reader") != "DataLoader":
+
+        self.regist_context_processor('train_pass', self.executor_train)
+#        if envs.get_platform() == "LINUX" and envs.get_global_env(
+#                ""
+#            self.regist_context_processor('train_pass', self.dataset_train)
+#        else:
+#            self.regist_context_processor('train_pass', self.dataloader_train)
 
-        self.regist_context_processor('infer_pass', self.infer)
+        #self.regist_context_processor('infer_pass', self.infer)
         self.regist_context_processor('terminal_pass', self.terminal)
 
-    def init(self, context):
-        self.model.train_net()
-        optimizer = self.model.optimizer()
-        optimizer.minimize((self.model.get_avg_cost()))
+    def instance(self, context):
+        context['status'] = 'init_pass'
 
-        self.fetch_vars = []
-        self.fetch_alias = []
-        self.fetch_period = self.model.get_fetch_period()
+    def dataloader_train(self, context):
+        pass
 
-        metrics = self.model.get_metrics()
-        if metrics:
-            self.fetch_vars = metrics.values()
-            self.fetch_alias = metrics.keys()
-        evaluate_only = envs.get_global_env(
-            'evaluate_only', False, namespace='evaluate')
-        if evaluate_only:
-            context['status'] = 'infer_pass'
+    def dataset_train(self, context):
+        pass
+
+    #def _get_optmizer(self, cost):
+    #    if self._env["hyper_parameters"]["optimizer"]["class"] == "Adam":
+
+    def _create_dataset(self, dataset_name):
+        config_dict = None
+        for i in self._env["dataset"]:
+            if i["name"] == dataset_name:
+                config_dict = i
+                break
+        #reader_ins = SlotReader(self._config_yaml)
+        sparse_slots = config_dict["sparse_slots"]
+        dense_slots = config_dict["dense_slots"]
+        padding = 0
+        reader = envs.path_adapter("paddlerec.core.utils") + "/dataset_instance.py"
+        #reader = "{workspace}/paddlerec/core/utils/dataset_instance.py".replace("{workspace}", envs.path_adapter(self._env["workspace"]))
+        pipe_cmd = "python {} {} {} {} {} {} {} {}".format(
+            reader, "slot", "slot", self._config_yaml, "fake",
+            sparse_slots.replace(" ", "#"), dense_slots.replace(" ", "#"), str(padding))
+
+        if config_dict["type"] == "QueueDataset":
+            dataset = fluid.DatasetFactory().create_dataset(config_dict["type"])
+            dataset.set_batch_size(config_dict["batch_size"])
+            #dataset.set_thread(config_dict["thread_num"])
+            #dataset.set_hdfs_config(config_dict["data_fs_name"], config_dict["data_fs_ugi"])
+            dataset.set_pipe_command(pipe_cmd)
+            train_data_path = config_dict["data_path"].replace("{workspace}", envs.path_adapter(self._env["workspace"]))
+            file_list = [
+                os.path.join(train_data_path, x)
+                for x in os.listdir(train_data_path)
+            ]
+            dataset.set_filelist(file_list)
+            for model_dict in self._env["executor"]:
+                if model_dict["dataset_name"] == dataset_name:
+                    model = self._model[model_dict["name"]][3]
+                    inputs = model.get_inputs()
+                    dataset.set_use_var(inputs)
+                    break
         else:
-            context['status'] = 'startup_pass'
+            dataset = None
+
+        return dataset
+
+    def init(self, context):
+        #self.model.train_net()
+        for model_dict in self._env["executor"]:
+            self._model[model_dict["name"]] = [None] * 4
+#            self._model[model_dict["name"]][0] = fluid.Program()  #train_program
+#            self._model[model_dict["name"]][1] = fluid.Program()  #startup_program
+#            self._model[model_dict["name"]][2] = fluid.Scope()  #scope
+            train_program = fluid.Program()
+            startup_program = fluid.Program()
+            scope = fluid.Scope()
+            opt_name = self._env["hyper_parameters"]["optimizer"]["class"]
+            opt_lr = self._env["hyper_parameters"]["optimizer"]["learning_rate"]
+            opt_strategy = self._env["hyper_parameters"]["optimizer"]["strategy"]
+            with fluid.program_guard(train_program, startup_program):
+                with fluid.unique_name.guard():
+                    model_path = model_dict["model"].replace("{workspace}", envs.path_adapter(self._env["workspace"]))
+                    model = envs.lazy_instance_by_fliename(model_path, "Model")(self._env)
+                    model._data_var = model.input_data(dataset_name=model_dict["dataset_name"])
+                    model.net(None)
+                    optimizer = model._build_optimizer(opt_name, opt_lr, opt_strategy)
+                    optimizer.minimize(model._cost)
+            self._model[model_dict["name"]][0] = train_program
+            self._model[model_dict["name"]][1] = startup_program
+            self._model[model_dict["name"]][2] = scope
+            self._model[model_dict["name"]][3] = model
+
+        for dataset in self._env["dataset"]:
+            self._dataset[dataset["name"]] = self._create_dataset(dataset["name"])
+
+#        self.fetch_vars = []
+#        self.fetch_alias = []
+#        self.fetch_period = self.model.get_fetch_period()
+
+#        metrics = self.model.get_metrics()
+#        if metrics:
+#            self.fetch_vars = metrics.values()
+#            self.fetch_alias = metrics.keys()
+        #evaluate_only = envs.get_global_env(
+        #    'evaluate_only', False, namespace='evaluate')
+        #if evaluate_only:
+        #    context['status'] = 'infer_pass'
+        #else:
+        context['status'] = 'startup_pass'
 
     def startup(self, context):
-        self._exe.run(fluid.default_startup_program())
+        for model_dict in self._env["executor"]:
+            with fluid.scope_guard(self._model[model_dict["name"]][2]):
+                self._exe.run(self._model[model_dict["name"]][1])
         context['status'] = 'train_pass'
 
+    def executor_train(self, context):
+        epochs = int(self._env["epochs"])
+        for j in range(epochs):
+            for model_dict in self._env["executor"]:
+                reader_name = model_dict["dataset_name"]
+                #print(self._dataset)
+                #print(reader_name)
+                dataset = None
+                for i in self._env["dataset"]:
+                    if i["name"] == reader_name:
+                        dataset = i
+                        break
+                if dataset["type"] == "DataLoader":
+                    self._executor_dataloader_train(model_dict)
+                else:
+                    self._executor_dataset_train(model_dict)
+            print("epoch %s done" % j)
+#        self._model[model_dict["name"]][1] = fluid.compiler.CompiledProgram(
+#            self._model[model_dict["name"]][1]).with_data_parallel(loss_name=self._model.get_avg_cost().name)
+#        fetch_vars = []
+#        fetch_alias = []
+#        fetch_period = self._model.get_fetch_period()
+#        metrics = self._model.get_metrics()
+#        if metrics:
+#            fetch_vars = metrics.values()
+#            fetch_alias = metrics.keys()
+#        metrics_varnames = []
+        context['status'] = "terminal_pass"
+
+    def _executor_dataset_train(self, model_dict):
+#        dataset = self._get_dataset("TRAIN")
+#        ins = self._get_dataset_ins()
+
+#        epochs = envs.get_global_env("train.epochs")
+#        for i in range(epochs):
+        reader_name = model_dict["dataset_name"]
+        model_name = model_dict["name"]
+        model_class = self._model[model_name][3]
+        fetch_vars = []
+        fetch_alias = []
+        fetch_period = 1  #model_class.get_fetch_period()
+        metrics = model_class.get_metrics()
+        if metrics:
+            fetch_vars = metrics.values()
+            fetch_alias = metrics.keys()
+        scope = self._model[model_name][2]
+        program = self._model[model_name][0]
+        reader = self._dataset[reader_name]
+        with fluid.scope_guard(scope):
+            begin_time = time.time()
+            self._exe.train_from_dataset(
+                program=program,
+                dataset=reader,
+                fetch_list=fetch_vars,
+                fetch_info=fetch_alias,
+                print_period=fetch_period)
+            end_time = time.time()
+            times = end_time - begin_time
+            #print("epoch {} using time {}".format(i, times))
+            #print("epoch {} using time {}, speed {:.2f} lines/s".format(
+            #    i, times, ins / times))
+
+
+    def _executor_dataloader_train(self, model_dict):
+        reader_name = model_dict["dataset_name"]
+        model_name = model_dict["name"]
+        model_class = self._model[model_name][3]
+        program = fluid.compiler.CompiledProgram(
+            self._model[model_name][0]).with_data_parallel(
+                loss_name=model_class.get_avg_cost().name)
+        fetch_vars = []
+        fetch_alias = []
+        fetch_period = model_class.get_fetch_period()
+        metrics = model_class.get_metrics()
+        if metrics:
+            fetch_vars = metrics.values()
+            fetch_alias = metrics.keys()
+        metrics_varnames = []
+        metrics_format = []
+        metrics_format.append("{}: {{}}".format("batch"))
+        for name, var in metrics.items():
+            metrics_varnames.append(var.name)
+            metrics_format.append("{}: {{}}".format(name))
+        metrics_format = ", ".join(metrics_format)
+
+        reader = self._dataset[reader_name]
+        reader.start()
+        batch_id = 0
+        scope = self._model[model_name][2]
+        with fluid.scope_guard(scope):
+            try:
+                while True:
+                    metrics_rets = self._exe.run(program=program,
+                                                 fetch_list=metrics_varnames)
+
+                    metrics = [batch_id]
+                    metrics.extend(metrics_rets)
+
+                    if batch_id % fetch_period == 0 and batch_id != 0:
+                        print(metrics_format.format(*metrics))
+                    batch_id += 1
+            except fluid.core.EOFException:
+                reader.reset()
+
     def dataloader_train(self, context):
-        reader = self._get_dataloader("TRAIN")
-        epochs = envs.get_global_env("train.epochs")
+
+        exit(-1)
+
+        reader = self._get_dataloader(self._env["TRAIN"])
+        epochs = self._env["epochs"]
 
         program = fluid.compiler.CompiledProgram(fluid.default_main_program(
         )).with_data_parallel(loss_name=self.model.get_avg_cost().name)
@@ -130,6 +337,6 @@ class SingleTrainer(TranspileTrainer):
         context['status'] = 'infer_pass'
 
     def terminal(self, context):
-        for model in self.increment_models:
-            print("epoch :{}, dir: {}".format(model[0], model[1]))
+        #for model in self.increment_models:
+        #    print("epoch :{}, dir: {}".format(model[0], model[1]))
         context['is_exit'] = True
diff --git a/core/utils/envs.py b/core/utils/envs.py
index bc222e906448435031024281a0a80298073d3979..ab631ac4f517c7cbb190c317c5fdc5667a953d91 100755
--- a/core/utils/envs.py
+++ b/core/utils/envs.py
@@ -20,6 +20,7 @@ import sys
 
 global_envs = {}
 
+global_envs_raw = {}
 
 def flatten_environs(envs, separator="."):
     flatten_dict = {}
@@ -62,6 +63,10 @@ def get_trainer():
 
 def set_global_envs(envs):
     assert isinstance(envs, dict)
+    global global_envs_raw
+    global_envs_raw = envs
+
+    return
+
     def fatten_env_namespace(namespace_nests, local_envs):
         for k, v in local_envs.items():
             if isinstance(v, dict):
diff --git a/models/rank/dnn/config.yaml b/models/rank/dnn/config.yaml
index ff59d9ef023da6dbaee01fda519abd9cfa75b1e3..9329c9bacfd26b9c9d7a68e5ec98520935a30e5c 100755
--- a/models/rank/dnn/config.yaml
+++ b/models/rank/dnn/config.yaml
@@ -12,39 +12,48 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-train:
-  epochs: 10
-  engine: single
-  workspace: "paddlerec.models.rank.dnn"
+debug: false
+cold_start: true
+epochs: 10
+device: cpu
+workspace: "paddlerec.models.rank.dnn"
 
-  trainer:
-    # for cluster training
-    strategy: "async"
+dataset:
+#- name: dataset_1
+#  batch_size: 2
+#  type: DataLoader
+#  data_path: "{workspace}/data/sample_data/train"
+#  sparse_slots: "click 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26"
+#  dense_slots: "dense_var:13"
 
-  reader:
-    batch_size: 2
-    train_data_path: "{workspace}/data/sample_data/train"
-    reader_debug_mode: False
-    sparse_slots: "click 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26"
-    dense_slots: "dense_var:13"
+- name: dataset_2
+  batch_size: 2
+  type: QueueDataset
+  data_path: "{workspace}/data/sample_data/train"
+#  data_path: "{workspace}/data/sample_data/train"
+  sparse_slots: "click 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26"
+  dense_slots: "dense_var:13"
 
-  model:
-    models: "{workspace}/model.py"
-    hyper_parameters:
-      sparse_inputs_slots: 27
-      sparse_feature_number: 1000001
-      sparse_feature_dim: 9
-      dense_input_dim: 13
-      fc_sizes: [512, 256, 128, 32]
-      learning_rate: 0.001
-      optimizer: adam
+hyper_parameters:
+  optimizer:
+    class: Adam
+    learning_rate: 0.001
+    strategy: async
+  sparse_inputs_slots: 27
+  sparse_feature_number: 1000001
+  sparse_feature_dim: 9
+  dense_input_dim: 13
+  fc_sizes: [512, 256, 128, 32]
 
-  save:
-    increment:
-      dirname: "increment"
-      epoch_interval: 2
-      save_last: True
-    inference:
-      dirname: "inference"
-      epoch_interval: 4
-      save_last: True
+epoch:
+  trainer_class: Single
+  save_checkpoint_interval: 2
+  save_inference_interval: 4
+  save_checkpoint_path: "increment"
+  save_inference_path: "inference"
+
+executor:
+  - name: train
+    model: "{workspace}/model.py"
+    dataset_name: dataset_2
+    thread_num: 1
diff --git a/models/rank/dnn/model.py b/models/rank/dnn/model.py
index 644bbd7c98ec8360faa22969e245f284946947d8..e45c4a181ab16b52861511a332c6cd72310a32c4 100755
--- a/models/rank/dnn/model.py
+++ b/models/rank/dnn/model.py
@@ -27,12 +27,12 @@ class Model(ModelBase):
     def _init_hyper_parameters(self):
         self.is_distributed = True if envs.get_trainer(
         ) == "CtrTrainer" else False
-        self.sparse_feature_number = envs.get_global_env(
-            "hyper_parameters.sparse_feature_number", None, self._namespace)
-        self.sparse_feature_dim = envs.get_global_env(
-            "hyper_parameters.sparse_feature_dim", None, self._namespace)
-        self.learning_rate = envs.get_global_env(
-            "hyper_parameters.learning_rate", None, self._namespace)
+        self.sparse_feature_number = 1000001  #envs.get_global_env(
+        #    "hyper_parameters.sparse_feature_number", None, self._namespace)
+        self.sparse_feature_dim = 9  #envs.get_global_env(
+        #    "hyper_parameters.sparse_feature_dim", None, self._namespace)
+        self.learning_rate = 0.001  #envs.get_global_env(
+        #    "hyper_parameters.learning_rate", None, self._namespace)
 
     def net(self, input, is_infer=False):
         self.sparse_inputs = self._sparse_data_var[1:]
@@ -56,8 +56,8 @@ class Model(ModelBase):
             sparse_embed_seq + [self.dense_input], axis=1)
         fcs = [concated]
 
-        hidden_layers = envs.get_global_env("hyper_parameters.fc_sizes", None,
-                                            self._namespace)
+        hidden_layers = [512, 256, 128, 32]  #envs.get_global_env("hyper_parameters.fc_sizes", None,
+        #                                                         self._namespace)
 
         for size in hidden_layers:
             output = fluid.layers.fc(
@@ -82,6 +82,8 @@ class Model(ModelBase):
             input=self.predict, label=self.label_input)
         avg_cost = fluid.layers.reduce_mean(cost)
         self._cost = avg_cost
+
+        auc, batch_auc, _ = fluid.layers.auc(input=self.predict, label=self.label_input, num_thresholds=2**12,
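
The core of the single_trainer.py change is the per-executor bookkeeping: every entry under the new "executor" config section gets its own train Program, startup Program, and Scope, stored together as [train_program, startup_program, scope, model] and always entered via fluid.scope_guard. The standalone sketch below (not part of the patch) illustrates that pattern with plain paddle.fluid calls; the toy network, the random feed, and the helper name build_toy_net are illustrative assumptions rather than code from this repository.

import numpy as np
import paddle.fluid as fluid


def build_toy_net():
    # Stand-in for Model.net(); the real networks come from the "model"
    # path configured per executor entry.
    x = fluid.layers.data(name="x", shape=[13], dtype="float32")
    y = fluid.layers.data(name="y", shape=[1], dtype="float32")
    fc = fluid.layers.fc(input=x, size=32, act="relu")
    pred = fluid.layers.fc(input=fc, size=1)
    return fluid.layers.reduce_mean(fluid.layers.square_error_cost(pred, y))


place = fluid.CPUPlace()
exe = fluid.Executor(place)
models = {}

for name in ["train"]:  # mirrors iterating self._env["executor"]
    train_program = fluid.Program()
    startup_program = fluid.Program()
    scope = fluid.Scope()
    # Build each model inside its own programs so several executors can
    # coexist without variable-name clashes.
    with fluid.program_guard(train_program, startup_program):
        with fluid.unique_name.guard():
            cost = build_toy_net()
            fluid.optimizer.Adam(learning_rate=0.001).minimize(cost)
    # Same layout idea as self._model[name] in the patch:
    # index 0 = train program, 1 = startup program, 2 = scope.
    models[name] = [train_program, startup_program, scope, cost]

# startup pass: run each startup program (index 1) inside its own scope
for name, (train_prog, startup_prog, scope, cost) in models.items():
    with fluid.scope_guard(scope):
        exe.run(startup_prog)

# train pass: run the train program (index 0) under the same scope
for name, (train_prog, startup_prog, scope, cost) in models.items():
    with fluid.scope_guard(scope):
        feed = {"x": np.random.rand(8, 13).astype("float32"),
                "y": np.random.rand(8, 1).astype("float32")}
        loss, = exe.run(train_prog, feed=feed, fetch_list=[cost.name])
        print(name, "loss:", loss)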