From 03dfc4133bb61cc8e60256a4613ccc227c67a7d2 Mon Sep 17 00:00:00 2001 From: xjqbest <173596896@qq.com> Date: Wed, 27 May 2020 22:26:14 +0800 Subject: [PATCH] fix --- core/factory.py | 4 - core/model.py | 37 +++- core/reader.py | 10 +- core/trainers/single_auc_yamlopt.py | 214 ------------------ core/trainers/single_trainer.py | 282 ++++++++++++++++++------ core/trainers/single_trainer_yamlopt.py | 214 ------------------ core/trainers/transpiler_trainer.py | 4 + core/utils/dataloader_instance.py | 92 +++++++- core/utils/envs.py | 2 - models/rank/dnn/config.yaml | 7 +- run.py | 32 +-- 11 files changed, 351 insertions(+), 547 deletions(-) delete mode 100755 core/trainers/single_auc_yamlopt.py delete mode 100755 core/trainers/single_trainer_yamlopt.py diff --git a/core/factory.py b/core/factory.py index de9c8c49..53d43e34 100755 --- a/core/factory.py +++ b/core/factory.py @@ -36,10 +36,6 @@ def trainer_registry(): "tdm_single_trainer.py") trainers["TDMClusterTrainer"] = os.path.join(trainer_abs, "tdm_cluster_trainer.py") - trainers["SingleTrainerYamlOpt"] = os.path.join(trainer_abs, - "single_trainer_yamlopt.py") - trainers["SingleAucYamlOpt"] = os.path.join(trainer_abs, - "single_auc_yamlopt.py") trainer_registry() diff --git a/core/model.py b/core/model.py index 04061b39..30d7c2a9 100755 --- a/core/model.py +++ b/core/model.py @@ -39,15 +39,32 @@ class Model(object): self._platform = envs.get_platform() self._init_hyper_parameters() self._env = config + self._slot_inited = False def _init_hyper_parameters(self): pass - def _init_slots(self): - sparse_slots = envs.get_global_env("sparse_slots", None, - "train.reader") - dense_slots = envs.get_global_env("dense_slots", None, "train.reader") - + def _init_slots(self, **kargs): + if self._slot_inited: + return + self._slot_inited = True + dataset = {} + model_dict = {}#self._env["executor"]#[kargs["name"]] + for i in self._env["executor"]: + if i["name"] == kargs["name"]: + model_dict = i + break + for i in self._env["dataset"]: + if i["name"] == model_dict["dataset_name"]: + dataset = i + break + name = "dataset." + dataset["name"] + "." 
+ sparse_slots = envs.get_global_env(name + "sparse_slots")#"sparse_slots", None, + #"train.reader") + dense_slots = envs.get_global_env(name + "dense_slots") + #"dense_slots", None, "train.reader") + #print(sparse_slots) + #print(dense_slots) if sparse_slots is not None or dense_slots is not None: sparse_slots = sparse_slots.strip().split(" ") dense_slots = dense_slots.strip().split(" ") @@ -70,12 +87,13 @@ class Model(object): self._data_var.append(l) self._sparse_data_var.append(l) - dataset_class = envs.get_global_env("dataset_class", None, - "train.reader") - if dataset_class == "DataLoader": - self._init_dataloader() + #dataset_class = dataset["type"]#envs.get_global_env("dataset_class", None, + # "train.reader") + #if dataset_class == "DataLoader": + # self._init_dataloader() def _init_dataloader(self): + #print(self._data_var) self._data_loader = fluid.io.DataLoader.from_generator( feed_list=self._data_var, capacity=64, @@ -131,7 +149,6 @@ class Model(object): None, self._namespace) optimizer = envs.get_global_env("hyper_parameters.optimizer", None, self._namespace) - print(">>>>>>>>>>>.learnig rate: %s" % learning_rate) return self._build_optimizer(optimizer, learning_rate) def input_data(self, is_infer=False, **kwargs): diff --git a/core/reader.py b/core/reader.py index 28f47256..91048ee2 100755 --- a/core/reader.py +++ b/core/reader.py @@ -35,8 +35,6 @@ class Reader(dg.MultiSlotDataGenerator): else: raise ValueError("reader config only support yaml") - envs.set_global_envs(_config) - envs.update_workspace() @abc.abstractmethod def init(self): @@ -63,8 +61,12 @@ class SlotReader(dg.MultiSlotDataGenerator): def init(self, sparse_slots, dense_slots, padding=0): from operator import mul - self.sparse_slots = sparse_slots.strip().split(" ") - self.dense_slots = dense_slots.strip().split(" ") + self.sparse_slots = [] + if sparse_slots.strip() != "#": + self.sparse_slots = sparse_slots.strip().split(" ") + self.dense_slots = [] + if dense_slots.strip() != "#": + self.dense_slots = dense_slots.strip().split(" ") self.dense_slots_shape = [ reduce(mul, [int(j) for j in i.split(":")[1].strip("[]").split(",")]) diff --git a/core/trainers/single_auc_yamlopt.py b/core/trainers/single_auc_yamlopt.py deleted file mode 100755 index 634b5a90..00000000 --- a/core/trainers/single_auc_yamlopt.py +++ /dev/null @@ -1,214 +0,0 @@ -# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -""" -Training use fluid with one node only. 
-""" - -from __future__ import print_function - -import time -import logging -import os -import paddle.fluid as fluid - -from paddlerec.core.trainers.transpiler_trainer import TranspileTrainer -from paddlerec.core.utils import envs -from paddlerec.core.reader import SlotReader - -logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s") -logger = logging.getLogger("fluid") -logger.setLevel(logging.INFO) - - -class SingleAucYamlOpt(TranspileTrainer): - def __init__(self, config=None): - super(TranspileTrainer, self).__init__(config) - self._env = self._config - device = envs.get_global_env("device") - if device == 'gpu': - self._place = fluid.CUDAPlace(0) - elif device == 'cpu': - self._place = fluid.CPUPlace() - self._exe = fluid.Executor(self._place) - self.processor_register() - self._model = {} - self._dataset = {} - envs.set_global_envs(self._config) - envs.update_workspace() - - def processor_register(self): - self.regist_context_processor('uninit', self.instance) - self.regist_context_processor('init_pass', self.init) - self.regist_context_processor('startup_pass', self.startup) - self.regist_context_processor('train_pass', self.executor_train) - self.regist_context_processor('terminal_pass', self.terminal) - - def instance(self, context): - context['status'] = 'init_pass' - - def _create_dataset(self, dataset_name): - name = "dataset." + dataset_name + "." - sparse_slots = envs.get_global_env(name + "sparse_slots") - dense_slots = envs.get_global_env(name + "dense_slots") - thread_num = envs.get_global_env(name + "thread_num") - batch_size = envs.get_global_env(name + "batch_size") - reader_type = envs.get_global_env(name + "type") - if envs.get_platform() != "LINUX": - print("platform ", envs.get_platform(), " change reader to DataLoader") - reader_type = "DataLoader" - padding = 0 - - reader = envs.path_adapter("paddlerec.core.utils") + "/dataset_instance.py" - pipe_cmd = "python {} {} {} {} {} {} {} {}".format( - reader, "slot", "slot", self._config_yaml, "fake", \ - sparse_slots.replace(" ", "#"), dense_slots.replace(" ", "#"), str(padding)) - - type_name = envs.get_global_env(name + "type") - if type_name == "QueueDataset": - dataset = fluid.DatasetFactory().create_dataset() - dataset.set_batch_size(envs.get_global_env(name + "batch_size")) - dataset.set_pipe_command(pipe_cmd) - train_data_path = envs.get_global_env(name + "data_path") - file_list = [ - os.path.join(train_data_path, x) - for x in os.listdir(train_data_path) - ] - dataset.set_filelist(file_list) - for model_dict in self._env["executor"]: - if model_dict["dataset_name"] == dataset_name: - model = self._model[model_dict["name"]][3] - inputs = model.get_inputs() - dataset.set_use_var(inputs) - break - else: - pass - - return dataset - - def init(self, context): - for model_dict in self._env["executor"]: - self._model[model_dict["name"]] = [None] * 4 - train_program = fluid.Program() - startup_program = fluid.Program() - scope = fluid.Scope() - opt_name = envs.get_global_env("hyper_parameters.optimizer.class") - opt_lr = envs.get_global_env("hyper_parameters.optimizer.learning_rate") - opt_strategy = envs.get_global_env("hyper_parameters.optimizer.strategy") - with fluid.program_guard(train_program, startup_program): - with fluid.unique_name.guard(): - model_path = model_dict["model"].replace("{workspace}", envs.path_adapter(self._env["workspace"])) - model = envs.lazy_instance_by_fliename(model_path, "Model")(self._env) - model._data_var = model.input_data(dataset_name=model_dict["dataset_name"]) - 
model.net(None, is_infer=True) - optimizer = model._build_optimizer(opt_name, opt_lr, opt_strategy) - optimizer.minimize(model._cost) - self._model[model_dict["name"]][0] = train_program - self._model[model_dict["name"]][1] = startup_program - self._model[model_dict["name"]][2] = scope - self._model[model_dict["name"]][3] = model - - for dataset in self._env["dataset"]: - self._dataset[dataset["name"]] = self._create_dataset(dataset["name"]) - - context['status'] = 'startup_pass' - - def startup(self, context): - for model_dict in self._env["executor"]: - with fluid.scope_guard(self._model[model_dict["name"]][2]): - self._exe.run(self._model[model_dict["name"]][1]) - context['status'] = 'train_pass' - - def executor_train(self, context): - epochs = int(self._env["epochs"]) - for j in range(epochs): - for model_dict in self._env["executor"]: - reader_name = model_dict["dataset_name"] - name = "dataset." + reader_name + "." - begin_time = time.time() - if envs.get_global_env(name + "type") == "DataLoader": - self._executor_dataloader_train(model_dict) - else: - self._executor_dataset_train(model_dict) - end_time = time.time() - seconds = end_time - begin_time - print("epoch {} done, time elasped: {}".format(j, seconds)) - context['status'] = "terminal_pass" - - def _executor_dataset_train(self, model_dict): - reader_name = model_dict["dataset_name"] - model_name = model_dict["name"] - model_class = self._model[model_name][3] - fetch_vars = [] - fetch_alias = [] - fetch_period = 20 - metrics = model_class.get_metrics() - if metrics: - fetch_vars = metrics.values() - fetch_alias = metrics.keys() - scope = self._model[model_name][2] - program = self._model[model_name][0] - reader = self._dataset[reader_name] - with fluid.scope_guard(scope): - self._exe.train_from_dataset( - program=program, - dataset=reader, - fetch_list=fetch_vars, - fetch_info=fetch_alias, - print_period=fetch_period) - - - def _executor_dataloader_train(self, model_dict): - reader_name = model_dict["dataset_name"] - model_name = model_dict["name"] - model_class = self._model[model_name][3] - self._model[model_name][1] = fluid.compiler.CompiledProgram( - self._model[model_name][1]).with_data_parallel(loss_name=model_class.get_avg_cost().name) - fetch_vars = [] - fetch_alias = [] - fetch_period = 20 - metrics = model_class.get_metrics() - if metrics: - fetch_vars = metrics.values() - fetch_alias = metrics.keys() - metrics_varnames = [] - metrics_format = [] - metrics_format.append("{}: {{}}".format("epoch")) - metrics_format.append("{}: {{}}".format("batch")) - for name, var in model_class.items(): - metrics_varnames.append(var.name) - metrics_format.append("{}: {{}}".format(name)) - metrics_format = ", ".join(metrics_format) - - reader = self._dataset[reader_name] - reader.start() - batch_id = 0 - scope = self._model[model_name][2] - prorgram = self._model[model_name][0] - with fluid.scope_guard(scope): - try: - while True: - metrics_rets = self._exe.run(program=program, - fetch_list=metrics_varnames) - - metrics = [epoch, batch_id] - metrics.extend(metrics_rets) - - if batch_id % self.fetch_period == 0 and batch_id != 0: - print(metrics_format.format(*metrics)) - batch_id += 1 - except fluid.core.EOFException: - reader.reset() - - def terminal(self, context): - context['is_exit'] = True diff --git a/core/trainers/single_trainer.py b/core/trainers/single_trainer.py index a564ba55..ce1aa987 100755 --- a/core/trainers/single_trainer.py +++ b/core/trainers/single_trainer.py @@ -19,83 +19,263 @@ from __future__ import 
print_function import time import logging - +import os import paddle.fluid as fluid from paddlerec.core.trainers.transpiler_trainer import TranspileTrainer from paddlerec.core.utils import envs +from paddlerec.core.reader import SlotReader +from paddlerec.core.utils import dataloader_instance logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s") logger = logging.getLogger("fluid") logger.setLevel(logging.INFO) -class SingleTrainer(TranspileTrainer): +class SingleTrainerYamlOpt(TranspileTrainer): + def __init__(self, config=None): + super(TranspileTrainer, self).__init__(config) + self._env = self._config + device = envs.get_global_env("device") + if device == 'gpu': + self._place = fluid.CUDAPlace(0) + elif device == 'cpu': + self._place = fluid.CPUPlace() + self._exe = fluid.Executor(self._place) + self.processor_register() + self._model = {} + self._dataset = {} + envs.set_global_envs(self._config) + envs.update_workspace() + def processor_register(self): self.regist_context_processor('uninit', self.instance) self.regist_context_processor('init_pass', self.init) self.regist_context_processor('startup_pass', self.startup) + self.regist_context_processor('train_pass', self.executor_train) + self.regist_context_processor('terminal_pass', self.terminal) + + def instance(self, context): + context['status'] = 'init_pass' + + def _get_dataset(self, dataset_name): + name = "dataset." + dataset_name + "." + sparse_slots = envs.get_global_env(name + "sparse_slots") + dense_slots = envs.get_global_env(name + "dense_slots") + thread_num = envs.get_global_env(name + "thread_num") + batch_size = envs.get_global_env(name + "batch_size") + reader_class = envs.get_global_env("data_convertor") + abs_dir = os.path.dirname(os.path.abspath(__file__)) + reader = os.path.join(abs_dir, '../utils', 'dataset_instance.py') + + if sparse_slots is None and dense_slots is None: + pipe_cmd = "python {} {} {} {}".format(reader, reader_class, "fake", self._config_yaml) - if envs.get_platform() == "LINUX" and envs.get_global_env( - "dataset_class", None, "train.reader") != "DataLoader": - self.regist_context_processor('train_pass', self.dataset_train) else: - self.regist_context_processor('train_pass', self.dataloader_train) + if sparse_slots is None: + sparse_slots = "#" + if dense_slots is None: + dense_slots = "#" + padding = envs.get_global_env(name +"padding", 0) + pipe_cmd = "python {} {} {} {} {} {} {} {}".format( + reader, "slot", "slot", self._config_yaml, "fake", \ + sparse_slots.replace(" ", "#"), dense_slots.replace(" ", "#"), str(padding)) + + dataset = fluid.DatasetFactory().create_dataset() + dataset.set_batch_size(envs.get_global_env(name + "batch_size")) + dataset.set_pipe_command(pipe_cmd) + train_data_path = envs.get_global_env(name + "data_path") + file_list = [ + os.path.join(train_data_path, x) + for x in os.listdir(train_data_path) + ] + dataset.set_filelist(file_list) + for model_dict in self._env["executor"]: + if model_dict["dataset_name"] == dataset_name: + model = self._model[model_dict["name"]][3] + inputs = model.get_inputs() + dataset.set_use_var(inputs) + break + return dataset - self.regist_context_processor('infer_pass', self.infer) - self.regist_context_processor('terminal_pass', self.terminal) + def _get_dataloader(self, dataset_name): + name = "dataset." + dataset_name + "." 
+ sparse_slots = envs.get_global_env(name + "sparse_slots") + dense_slots = envs.get_global_env(name + "dense_slots") + thread_num = envs.get_global_env(name + "thread_num") + batch_size = envs.get_global_env(name + "batch_size") + reader_class = envs.get_global_env("data_convertor") + abs_dir = os.path.dirname(os.path.abspath(__file__)) + #reader = os.path.join(abs_dir, '../utils', 'dataset_instance.py') + if sparse_slots is None and dense_slots is None: + #reader_class = envs.get_global_env("class") + reader = dataloader_instance.dataloader_by_name(reader_class, dataset_name, self._config_yaml) + reader_class = envs.lazy_instance_by_fliename(reader_class, "TrainReader") + reader_ins = reader_class(self._config_yaml) + else: + reader = dataloader_instance.slotdataloader_by_name("", dataset_name, self._config_yaml) + reader_ins = SlotReader(self._config_yaml) + if hasattr(reader_ins, 'generate_batch_from_trainfiles'): + dataloader.set_sample_list_generator(reader) + else: + dataloader.set_sample_generator(reader, batch_size) + return dataloader - def init(self, context): - self.model.train_net() - optimizer = self.model.optimizer() - optimizer.minimize((self.model.get_avg_cost())) - self.fetch_vars = [] - self.fetch_alias = [] - self.fetch_period = self.model.get_fetch_period() + def _create_dataset(self, dataset_name): + name = "dataset." + dataset_name + "." + sparse_slots = envs.get_global_env(name + "sparse_slots") + dense_slots = envs.get_global_env(name + "dense_slots") + thread_num = envs.get_global_env(name + "thread_num") + batch_size = envs.get_global_env(name + "batch_size") + type_name = envs.get_global_env(name + "type") + if envs.get_platform() != "LINUX": + print("platform ", envs.get_platform(), " change reader to DataLoader") + type_name = "DataLoader" + padding = 0 - metrics = self.model.get_metrics() - if metrics: - self.fetch_vars = metrics.values() - self.fetch_alias = metrics.keys() - evaluate_only = envs.get_global_env( - 'evaluate_only', False, namespace='evaluate') - if evaluate_only: - context['status'] = 'infer_pass' + if type_name == "DataLoader": + return None#self._get_dataloader(dataset_name) + else: + return self._get_dataset(dataset_name) + + + reader = envs.path_adapter("paddlerec.core.utils") + "/dataset_instance.py" + pipe_cmd = "python {} {} {} {} {} {} {} {}".format( + reader, "slot", "slot", self._config_yaml, "fake", \ + sparse_slots.replace(" ", "#"), dense_slots.replace(" ", "#"), str(padding)) + + if type_name == "QueueDataset": + dataset = fluid.DatasetFactory().create_dataset() + dataset.set_batch_size(envs.get_global_env(name + "batch_size")) + dataset.set_pipe_command(pipe_cmd) + train_data_path = envs.get_global_env(name + "data_path") + file_list = [ + os.path.join(train_data_path, x) + for x in os.listdir(train_data_path) + ] + dataset.set_filelist(file_list) + for model_dict in self._env["executor"]: + if model_dict["dataset_name"] == dataset_name: + model = self._model[model_dict["name"]][3] + inputs = model.get_inputs() + dataset.set_use_var(inputs) + break else: - context['status'] = 'startup_pass' + pass + + return dataset + + def init(self, context): + for model_dict in self._env["executor"]: + self._model[model_dict["name"]] = [None] * 4 + train_program = fluid.Program() + startup_program = fluid.Program() + scope = fluid.Scope() + dataset_name = model_dict["dataset_name"] + opt_name = envs.get_global_env("hyper_parameters.optimizer.class") + opt_lr = envs.get_global_env("hyper_parameters.optimizer.learning_rate") + opt_strategy = 
envs.get_global_env("hyper_parameters.optimizer.strategy") + with fluid.program_guard(train_program, startup_program): + with fluid.unique_name.guard(): + model_path = model_dict["model"].replace("{workspace}", envs.path_adapter(self._env["workspace"])) + model = envs.lazy_instance_by_fliename(model_path, "Model")(self._env) + model._data_var = model.input_data(dataset_name=model_dict["dataset_name"]) + #model._init_slots(name=model_dict["name"]) + if envs.get_global_env("dataset." + dataset_name + ".type") == "DataLoader": + model._init_dataloader() + model.net(model._data_var) + optimizer = model._build_optimizer(opt_name, opt_lr, opt_strategy) + optimizer.minimize(model._cost) + self._model[model_dict["name"]][0] = train_program + self._model[model_dict["name"]][1] = startup_program + self._model[model_dict["name"]][2] = scope + self._model[model_dict["name"]][3] = model + + for dataset in self._env["dataset"]: + if dataset["type"] != "DataLoader": + self._dataset[dataset["name"]] = self._create_dataset(dataset["name"]) + + context['status'] = 'startup_pass' def startup(self, context): - self._exe.run(fluid.default_startup_program()) + for model_dict in self._env["executor"]: + with fluid.scope_guard(self._model[model_dict["name"]][2]): + self._exe.run(self._model[model_dict["name"]][1]) context['status'] = 'train_pass' - def dataloader_train(self, context): - reader = self._get_dataloader("TRAIN") - epochs = envs.get_global_env("train.epochs") + def executor_train(self, context): + epochs = int(self._env["epochs"]) + for j in range(epochs): + for model_dict in self._env["executor"]: + reader_name = model_dict["dataset_name"] + name = "dataset." + reader_name + "." + begin_time = time.time() + if envs.get_global_env(name + "type") == "DataLoader": + self._executor_dataloader_train(model_dict) + else: + self._executor_dataset_train(model_dict) + end_time = time.time() + seconds = end_time - begin_time + print("epoch {} done, time elasped: {}".format(j, seconds)) + context['status'] = "terminal_pass" - program = fluid.compiler.CompiledProgram(fluid.default_main_program( - )).with_data_parallel(loss_name=self.model.get_avg_cost().name) + def _executor_dataset_train(self, model_dict): + reader_name = model_dict["dataset_name"] + model_name = model_dict["name"] + model_class = self._model[model_name][3] + fetch_vars = [] + fetch_alias = [] + fetch_period = 20 + metrics = model_class.get_metrics() + if metrics: + fetch_vars = metrics.values() + fetch_alias = metrics.keys() + scope = self._model[model_name][2] + program = self._model[model_name][0] + reader = self._dataset[reader_name] + with fluid.scope_guard(scope): + self._exe.train_from_dataset( + program=program, + dataset=reader, + fetch_list=fetch_vars, + fetch_info=fetch_alias, + print_period=fetch_period) + + def _executor_dataloader_train(self, model_dict): + reader_name = model_dict["dataset_name"] + model_name = model_dict["name"] + model_class = self._model[model_name][3] + self._model[model_name][0] = fluid.compiler.CompiledProgram( + self._model[model_name][0]).with_data_parallel(loss_name=model_class.get_avg_cost().name) + fetch_vars = [] + fetch_alias = [] + fetch_period = 20 + metrics = model_class.get_metrics() + if metrics: + fetch_vars = metrics.values() + fetch_alias = metrics.keys() metrics_varnames = [] metrics_format = [] - metrics_format.append("{}: {{}}".format("epoch")) metrics_format.append("{}: {{}}".format("batch")) - - for name, var in self.model.get_metrics().items(): + for name, var in 
model_class.get_metrics().items(): metrics_varnames.append(var.name) metrics_format.append("{}: {{}}".format(name)) - metrics_format = ", ".join(metrics_format) - for epoch in range(epochs): - reader.start() - batch_id = 0 + reader = self._model[model_name][3]._data_loader + reader.start() + batch_id = 0 + scope = self._model[model_name][2] + program = self._model[model_name][0] + #print(metrics_varnames) + with fluid.scope_guard(scope): try: while True: metrics_rets = self._exe.run(program=program, fetch_list=metrics_varnames) - metrics = [epoch, batch_id] metrics.extend(metrics_rets) @@ -104,32 +284,6 @@ class SingleTrainer(TranspileTrainer): batch_id += 1 except fluid.core.EOFException: reader.reset() - self.save(epoch, "train", is_fleet=False) - - context['status'] = 'infer_pass' - - def dataset_train(self, context): - dataset = self._get_dataset("TRAIN") - ins = self._get_dataset_ins() - - epochs = envs.get_global_env("train.epochs") - for i in range(epochs): - begin_time = time.time() - self._exe.train_from_dataset( - program=fluid.default_main_program(), - dataset=dataset, - fetch_list=self.fetch_vars, - fetch_info=self.fetch_alias, - print_period=self.fetch_period) - end_time = time.time() - times = end_time - begin_time - print("epoch {} using time {}, speed {:.2f} lines/s".format( - i, times, ins / times)) - - self.save(i, "train", is_fleet=False) - context['status'] = 'infer_pass' def terminal(self, context): - for model in self.increment_models: - print("epoch :{}, dir: {}".format(model[0], model[1])) context['is_exit'] = True diff --git a/core/trainers/single_trainer_yamlopt.py b/core/trainers/single_trainer_yamlopt.py deleted file mode 100755 index efe04f59..00000000 --- a/core/trainers/single_trainer_yamlopt.py +++ /dev/null @@ -1,214 +0,0 @@ -# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -""" -Training use fluid with one node only. 
-""" - -from __future__ import print_function - -import time -import logging -import os -import paddle.fluid as fluid - -from paddlerec.core.trainers.transpiler_trainer import TranspileTrainer -from paddlerec.core.utils import envs -from paddlerec.core.reader import SlotReader - -logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s") -logger = logging.getLogger("fluid") -logger.setLevel(logging.INFO) - - -class SingleTrainerYamlOpt(TranspileTrainer): - def __init__(self, config=None): - super(TranspileTrainer, self).__init__(config) - self._env = self._config - device = envs.get_global_env("device") - if device == 'gpu': - self._place = fluid.CUDAPlace(0) - elif device == 'cpu': - self._place = fluid.CPUPlace() - self._exe = fluid.Executor(self._place) - self.processor_register() - self._model = {} - self._dataset = {} - envs.set_global_envs(self._config) - envs.update_workspace() - - def processor_register(self): - self.regist_context_processor('uninit', self.instance) - self.regist_context_processor('init_pass', self.init) - self.regist_context_processor('startup_pass', self.startup) - self.regist_context_processor('train_pass', self.executor_train) - self.regist_context_processor('terminal_pass', self.terminal) - - def instance(self, context): - context['status'] = 'init_pass' - - def _create_dataset(self, dataset_name): - name = "dataset." + dataset_name + "." - sparse_slots = envs.get_global_env(name + "sparse_slots") - dense_slots = envs.get_global_env(name + "dense_slots") - thread_num = envs.get_global_env(name + "thread_num") - batch_size = envs.get_global_env(name + "batch_size") - reader_type = envs.get_global_env(name + "type") - if envs.get_platform() != "LINUX": - print("platform ", envs.get_platform(), " change reader to DataLoader") - reader_type = "DataLoader" - padding = 0 - - reader = envs.path_adapter("paddlerec.core.utils") + "/dataset_instance.py" - pipe_cmd = "python {} {} {} {} {} {} {} {}".format( - reader, "slot", "slot", self._config_yaml, "fake", \ - sparse_slots.replace(" ", "#"), dense_slots.replace(" ", "#"), str(padding)) - - type_name = envs.get_global_env(name + "type") - if type_name == "QueueDataset": - dataset = fluid.DatasetFactory().create_dataset() - dataset.set_batch_size(envs.get_global_env(name + "batch_size")) - dataset.set_pipe_command(pipe_cmd) - train_data_path = envs.get_global_env(name + "data_path") - file_list = [ - os.path.join(train_data_path, x) - for x in os.listdir(train_data_path) - ] - dataset.set_filelist(file_list) - for model_dict in self._env["executor"]: - if model_dict["dataset_name"] == dataset_name: - model = self._model[model_dict["name"]][3] - inputs = model.get_inputs() - dataset.set_use_var(inputs) - break - else: - pass - - return dataset - - def init(self, context): - for model_dict in self._env["executor"]: - self._model[model_dict["name"]] = [None] * 4 - train_program = fluid.Program() - startup_program = fluid.Program() - scope = fluid.Scope() - opt_name = envs.get_global_env("hyper_parameters.optimizer.class") - opt_lr = envs.get_global_env("hyper_parameters.optimizer.learning_rate") - opt_strategy = envs.get_global_env("hyper_parameters.optimizer.strategy") - with fluid.program_guard(train_program, startup_program): - with fluid.unique_name.guard(): - model_path = model_dict["model"].replace("{workspace}", envs.path_adapter(self._env["workspace"])) - model = envs.lazy_instance_by_fliename(model_path, "Model")(self._env) - model._data_var = model.input_data(dataset_name=model_dict["dataset_name"]) 
- model.net(None) - optimizer = model._build_optimizer(opt_name, opt_lr, opt_strategy) - optimizer.minimize(model._cost) - self._model[model_dict["name"]][0] = train_program - self._model[model_dict["name"]][1] = startup_program - self._model[model_dict["name"]][2] = scope - self._model[model_dict["name"]][3] = model - - for dataset in self._env["dataset"]: - self._dataset[dataset["name"]] = self._create_dataset(dataset["name"]) - - context['status'] = 'startup_pass' - - def startup(self, context): - for model_dict in self._env["executor"]: - with fluid.scope_guard(self._model[model_dict["name"]][2]): - self._exe.run(self._model[model_dict["name"]][1]) - context['status'] = 'train_pass' - - def executor_train(self, context): - epochs = int(self._env["epochs"]) - for j in range(epochs): - for model_dict in self._env["executor"]: - reader_name = model_dict["dataset_name"] - name = "dataset." + reader_name + "." - begin_time = time.time() - if envs.get_global_env(name + "type") == "DataLoader": - self._executor_dataloader_train(model_dict) - else: - self._executor_dataset_train(model_dict) - end_time = time.time() - seconds = end_time - begin_time - print("epoch {} done, time elasped: {}".format(j, seconds)) - context['status'] = "terminal_pass" - - def _executor_dataset_train(self, model_dict): - reader_name = model_dict["dataset_name"] - model_name = model_dict["name"] - model_class = self._model[model_name][3] - fetch_vars = [] - fetch_alias = [] - fetch_period = 20 - metrics = model_class.get_metrics() - if metrics: - fetch_vars = metrics.values() - fetch_alias = metrics.keys() - scope = self._model[model_name][2] - program = self._model[model_name][0] - reader = self._dataset[reader_name] - with fluid.scope_guard(scope): - self._exe.train_from_dataset( - program=program, - dataset=reader, - fetch_list=fetch_vars, - fetch_info=fetch_alias, - print_period=fetch_period) - - - def _executor_dataloader_train(self, model_dict): - reader_name = model_dict["dataset_name"] - model_name = model_dict["name"] - model_class = self._model[model_name][3] - self._model[model_name][1] = fluid.compiler.CompiledProgram( - self._model[model_name][1]).with_data_parallel(loss_name=model_class.get_avg_cost().name) - fetch_vars = [] - fetch_alias = [] - fetch_period = 20 - metrics = model_class.get_metrics() - if metrics: - fetch_vars = metrics.values() - fetch_alias = metrics.keys() - metrics_varnames = [] - metrics_format = [] - metrics_format.append("{}: {{}}".format("epoch")) - metrics_format.append("{}: {{}}".format("batch")) - for name, var in model_class.items(): - metrics_varnames.append(var.name) - metrics_format.append("{}: {{}}".format(name)) - metrics_format = ", ".join(metrics_format) - - reader = self._dataset[reader_name] - reader.start() - batch_id = 0 - scope = self._model[model_name][2] - prorgram = self._model[model_name][0] - with fluid.scope_guard(scope): - try: - while True: - metrics_rets = self._exe.run(program=program, - fetch_list=metrics_varnames) - - metrics = [epoch, batch_id] - metrics.extend(metrics_rets) - - if batch_id % self.fetch_period == 0 and batch_id != 0: - print(metrics_format.format(*metrics)) - batch_id += 1 - except fluid.core.EOFException: - reader.reset() - - def terminal(self, context): - context['is_exit'] = True diff --git a/core/trainers/transpiler_trainer.py b/core/trainers/transpiler_trainer.py index c121b4ab..b3c09797 100755 --- a/core/trainers/transpiler_trainer.py +++ b/core/trainers/transpiler_trainer.py @@ -119,6 +119,10 @@ class 
TranspileTrainer(Trainer): pipe_cmd = "python {} {} {} {}".format(reader, reader_class, state, self._config_yaml) else: + if sparse_slots is None: + sparse_slots = "#" + if dense_slots is None: + dense_slots = "#" padding = envs.get_global_env("padding", 0, namespace) pipe_cmd = "python {} {} {} {} {} {} {} {}".format( reader, "slot", "slot", self._config_yaml, namespace, \ diff --git a/core/utils/dataloader_instance.py b/core/utils/dataloader_instance.py index 8d4db6f8..b68cbf76 100755 --- a/core/utils/dataloader_instance.py +++ b/core/utils/dataloader_instance.py @@ -14,12 +14,102 @@ from __future__ import print_function import os - from paddlerec.core.utils.envs import lazy_instance_by_fliename from paddlerec.core.utils.envs import get_global_env from paddlerec.core.utils.envs import get_runtime_environ from paddlerec.core.reader import SlotReader +def dataloader_by_name(readerclass, dataset_name, yaml_file): + reader_class = lazy_instance_by_fliename(readerclass, "TrainReader") + name = "dataset." + dataset_name + "." + data_path = get_global_env(name + "data_path") + #else: + # reader_name = "SlotReader" + # namespace = "evaluate.reader" + # data_path = get_global_env("test_data_path", None, namespace) + + if data_path.startswith("paddlerec::"): + package_base = get_runtime_environ("PACKAGE_BASE") + assert package_base is not None + data_path = os.path.join(package_base, data_path.split("::")[1]) + + files = [str(data_path) + "/%s" % x for x in os.listdir(data_path)] + + reader = reader_class(yaml_file) + reader.init() + def gen_reader(): + for file in files: + with open(file, 'r') as f: + for line in f: + line = line.rstrip('\n') + iter = reader.generate_sample(line) + for parsed_line in iter(): + if parsed_line is None: + continue + else: + values = [] + for pased in parsed_line: + values.append(pased[1]) + yield values + + def gen_batch_reader(): + return reader.generate_batch_from_trainfiles(files) + + if hasattr(reader, 'generate_batch_from_trainfiles'): + return gen_batch_reader() + return gen_reader + + +def slotdataloader_by_name(readerclass, dataset_name, yaml_file): + name = "dataset." + dataset_name + "." + #if train == "TRAIN": + reader_name = "SlotReader" + # namespace = "train.reader" + print(name) + data_path = get_global_env(name + "data_path") + #else: + # reader_name = "SlotReader" + # namespace = "evaluate.reader" + # data_path = get_global_env("test_data_path", None, namespace) + + if data_path.startswith("paddlerec::"): + package_base = get_runtime_environ("PACKAGE_BASE") + assert package_base is not None + data_path = os.path.join(package_base, data_path.split("::")[1]) + + files = [str(data_path) + "/%s" % x for x in os.listdir(data_path)] + + #sparse = get_global_env("sparse_slots", None, namespace) + #dense = get_global_env("dense_slots", None, namespace) + #padding = get_global_env("padding", 0, namespace) + #name = "dataset." + dataset_name + "." 
+ sparse = get_global_env(name + "sparse_slots") + dense = get_global_env(name + "dense_slots") + padding = get_global_env(name + "padding", 0) + reader = SlotReader(yaml_file) + reader.init(sparse, dense, int(padding)) + + def gen_reader(): + for file in files: + with open(file, 'r') as f: + for line in f: + line = line.rstrip('\n') + iter = reader.generate_sample(line) + for parsed_line in iter(): + if parsed_line is None: + continue + else: + values = [] + for pased in parsed_line: + values.append(pased[1]) + yield values + + def gen_batch_reader(): + return reader.generate_batch_from_trainfiles(files) + + if hasattr(reader, 'generate_batch_from_trainfiles'): + return gen_batch_reader() + return gen_reader def dataloader(readerclass, train, yaml_file): if train == "TRAIN": diff --git a/core/utils/envs.py b/core/utils/envs.py index ac9f2d0f..9937956c 100755 --- a/core/utils/envs.py +++ b/core/utils/envs.py @@ -80,8 +80,6 @@ def set_global_envs(envs): global_envs[global_k] = v fatten_env_namespace([], envs) - for i in global_envs: - print i,":",global_envs[i] def get_global_env(env_name, default_value=None, namespace=None): """ diff --git a/models/rank/dnn/config.yaml b/models/rank/dnn/config.yaml index ac871d0c..f5863498 100755 --- a/models/rank/dnn/config.yaml +++ b/models/rank/dnn/config.yaml @@ -21,7 +21,8 @@ workspace: "paddlerec.models.rank.dnn" dataset: - name: dataset_2 batch_size: 2 - type: QueueDataset + #type: QueueDataset + type: DataLoader data_path: "{workspace}/data/sample_data/train" sparse_slots: "click 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26" dense_slots: "dense_var:13" @@ -38,8 +39,8 @@ hyper_parameters: fc_sizes: [512, 256, 128, 32] epoch: - trainer_class: single_yamlopt - #trainer_class: single_auc_yamlopt + name: + trainer_class: single save_checkpoint_interval: 2 save_inference_interval: 4 save_checkpoint_path: "increment" diff --git a/run.py b/run.py index 4920ca6a..fb5c9a94 100755 --- a/run.py +++ b/run.py @@ -28,7 +28,7 @@ device = ["CPU", "GPU"] clusters = ["SINGLE", "LOCAL_CLUSTER", "CLUSTER"] engine_choices = [ "SINGLE", "LOCAL_CLUSTER", "CLUSTER", "TDM_SINGLE", "TDM_LOCAL_CLUSTER", - "TDM_CLUSTER", "SINGLE_YAMLOPT", "SINGLE_AUC_YAMLOPT" + "TDM_CLUSTER" ] custom_model = ['TDM'] model_name = "" @@ -41,10 +41,6 @@ def engine_registry(): engines["TRANSPILER"]["SINGLE"] = single_engine engines["TRANSPILER"]["LOCAL_CLUSTER"] = local_cluster_engine engines["TRANSPILER"]["CLUSTER"] = cluster_engine - - engines["TRANSPILER"]["SINGLE_YAMLOPT"] = single_yamlopt_engine - engines["TRANSPILER"]["SINGLE_AUC_YAMLOPT"] = single_auc_yamlopt_engine - engines["PSLIB"]["SINGLE"] = local_mpi_engine engines["PSLIB"]["LOCAL_CLUSTER"] = local_mpi_engine engines["PSLIB"]["CLUSTER"] = cluster_mpi_engine @@ -139,32 +135,6 @@ def single_engine(args): trainer = TrainerFactory.create(args.model) return trainer -def single_yamlopt_engine(args): - trainer = get_trainer_prefix(args) + "SingleTrainerYamlOpt" - single_envs = {} - single_envs["train.trainer.trainer"] = trainer - single_envs["train.trainer.threads"] = "2" - single_envs["train.trainer.engine"] = "single_yamlopt" - single_envs["train.trainer.platform"] = envs.get_platform() - print("use {} engine to run model: {}".format(trainer, args.model)) - - set_runtime_envs(single_envs, args.model) - trainer = TrainerFactory.create(args.model) - return trainer - -def single_auc_yamlopt_engine(args): - trainer = get_trainer_prefix(args) + "SingleAucYamlOpt" - single_envs = {} - single_envs["train.trainer.trainer"] 
= trainer - single_envs["train.trainer.threads"] = "2" - single_envs["train.trainer.engine"] = "single_yamlopt" - single_envs["train.trainer.platform"] = envs.get_platform() - print("use {} engine to run model: {}".format(trainer, args.model)) - - set_runtime_envs(single_envs, args.model) - trainer = TrainerFactory.create(args.model) - return trainer - def cluster_engine(args): def update_workspace(cluster_envs): workspace = cluster_envs.get("engine_workspace", None) -- GitLab
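
For reference, a minimal sketch of the lookup pattern the refactored single_trainer.py relies on: after envs.set_global_envs(self._config) flattens config.yaml, every per-dataset field is read back through "dataset.<name>.<field>" keys. The helper name below is illustrative only (it does not exist in the patch); the keys and the non-Linux DataLoader fallback mirror _create_dataset() in the new code.

    # Illustrative sketch, not part of the patch: how per-dataset settings are
    # resolved from the flattened YAML config used by the refactored trainer.
    from paddlerec.core.utils import envs

    def resolve_dataset_settings(dataset_name):
        # Keys follow the "dataset.<name>.<field>" convention produced by
        # envs.set_global_envs() when it flattens config.yaml.
        name = "dataset." + dataset_name + "."
        settings = {
            "type": envs.get_global_env(name + "type"),          # "QueueDataset" or "DataLoader"
            "batch_size": envs.get_global_env(name + "batch_size"),
            "thread_num": envs.get_global_env(name + "thread_num"),
            "data_path": envs.get_global_env(name + "data_path"),
            "sparse_slots": envs.get_global_env(name + "sparse_slots"),
            "dense_slots": envs.get_global_env(name + "dense_slots"),
        }
        # Non-Linux platforms fall back to DataLoader, as _create_dataset() does.
        if envs.get_platform() != "LINUX":
            settings["type"] = "DataLoader"
        return settings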