diff --git a/fleetrec/models/base.py b/fleetrec/models/base.py index 763c36663ea03eb21c16823e575bd3b6474a1256..99e822dcb8f8984a355db999256f9143883f7476 100644 --- a/fleetrec/models/base.py +++ b/fleetrec/models/base.py @@ -63,7 +63,7 @@ def create(config): model = None if config['mode'] == 'fluid': model = YamlModel(config) - model.build_model() + model.net() return model @@ -94,13 +94,13 @@ class Model(object): return self._fetch_interval @abc.abstractmethod - def shrink(self, params): + def net(self): """R """ pass @abc.abstractmethod - def build_model(self): + def shrink(self, params): """R """ pass @@ -140,7 +140,7 @@ class YamlModel(Model): self._build_param = {'layer': {}, 'inner_layer': {}, 'layer_extend': {}, 'model': {}} self._inference_meta = {'dependency': {}, 'params': {}} - def build_model(self): + def net(self): """R build a fluid model with config Return: @@ -287,4 +287,4 @@ class YamlModel(Model): dependency_list = copy.deepcopy(dependencys) for dependency in dependencys: dependency_list = dependency_list + self.get_dependency(layer_graph, dependency) - return list(set(dependency_list)) \ No newline at end of file + return list(set(dependency_list)) diff --git a/fleetrec/models/ctr_dnn/dataloader.py b/fleetrec/models/ctr_dnn/dataloader.py deleted file mode 100644 index 7806fc4604d60b09c1a3786c31dc2dfaf6d64621..0000000000000000000000000000000000000000 --- a/fleetrec/models/ctr_dnn/dataloader.py +++ /dev/null @@ -1,83 +0,0 @@ -# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from ...utils import envs - -# There are 13 integer features and 26 categorical features -continous_features = range(1, 14) -categorial_features = range(14, 40) -continous_clip = [20, 600, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50] - - -class CriteoDataset(object): - def __init__(self, sparse_feature_dim): - self.cont_min_ = [0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] - self.cont_max_ = [ - 20, 600, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50 - ] - self.cont_diff_ = [ - 20, 603, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50 - ] - self.hash_dim_ = sparse_feature_dim - # here, training data are lines with line_index < train_idx_ - self.train_idx_ = 41256555 - self.continuous_range_ = range(1, 14) - self.categorical_range_ = range(14, 40) - - def _reader_creator(self, file_list, is_train, trainer_num, trainer_id): - def reader(): - for file in file_list: - with open(file, 'r') as f: - line_idx = 0 - for line in f: - line_idx += 1 - features = line.rstrip('\n').split('\t') - dense_feature = [] - sparse_feature = [] - for idx in self.continuous_range_: - if features[idx] == '': - dense_feature.append(0.0) - else: - dense_feature.append( - (float(features[idx]) - - self.cont_min_[idx - 1]) / - self.cont_diff_[idx - 1]) - for idx in self.categorical_range_: - sparse_feature.append([ - hash(str(idx) + features[idx]) % self.hash_dim_ - ]) - - label = [int(features[0])] - yield [dense_feature] + sparse_feature + [label] - - return reader - - def train(self, file_list, trainer_num, trainer_id): - return self._reader_creator(file_list, True, trainer_num, trainer_id) - - def test(self, file_list): - return self._reader_creator(file_list, False, 1, 0) - - -def Train(): - sparse_feature_number = envs.get_global_env("sparse_feature_number") - train_generator = CriteoDataset(sparse_feature_number) - return train_generator.train - - -def Evaluate(): - sparse_feature_number = envs.get_global_env("sparse_feature_number") - train_generator = CriteoDataset(sparse_feature_number) - return train_generator.test - diff --git a/fleetrec/models/ctr_dnn/dataset.py b/fleetrec/models/ctr_dnn/dataset.py deleted file mode 100644 index 67df683cc5cb29eb9559e2982a71c3a6dd0c2e43..0000000000000000000000000000000000000000 --- a/fleetrec/models/ctr_dnn/dataset.py +++ /dev/null @@ -1,69 +0,0 @@ -# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -from __future__ import print_function -import sys - -import paddle.fluid.incubate.data_generator as dg - - -cont_min_ = [0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] -cont_max_ = [20, 600, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50] -cont_diff_ = [20, 603, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50] -hash_dim_ = 1000001 -continuous_range_ = range(1, 14) -categorical_range_ = range(14, 40) - - -class CriteoDataset(dg.MultiSlotDataGenerator): - """ - DacDataset: inheritance MultiSlotDataGeneratior, Implement data reading - Help document: http://wiki.baidu.com/pages/viewpage.action?pageId=728820675 - """ - def generate_sample(self, line): - """ - Read the data line by line and process it as a dictionary - """ - def reader(): - """ - This function needs to be implemented by the user, based on data format - """ - features = line.rstrip('\n').split('\t') - - dense_feature = [] - sparse_feature = [] - for idx in continuous_range_: - if features[idx] == "": - dense_feature.append(0.0) - else: - dense_feature.append( - (float(features[idx]) - cont_min_[idx - 1]) / - cont_diff_[idx - 1]) - - for idx in categorical_range_: - sparse_feature.append( - [hash(str(idx) + features[idx]) % hash_dim_]) - label = [int(features[0])] - process_line = dense_feature, sparse_feature, label - feature_name = ["dense_input"] - for idx in categorical_range_: - feature_name.append("C" + str(idx - 13)) - feature_name.append("label") - - yield zip(feature_name, [dense_feature] + sparse_feature + [label]) - - return reader - - -d = CriteoDataset() -d.run_from_stdin() diff --git a/fleetrec/models/ctr_dnn/model.py b/fleetrec/models/ctr_dnn/model.py index f65a60e8203c2b1db52cab6da6f39977d01c11eb..5c0f6e9bcd455ea816a459be1b70bab7c1655538 100644 --- a/fleetrec/models/ctr_dnn/model.py +++ b/fleetrec/models/ctr_dnn/model.py @@ -19,7 +19,7 @@ from fleetrec.utils import envs from fleetrec.models.base import Model -class Train(Model): +class TrainModel(Model): def __init__(self, config): Model.__init__(self, config) self.namespace = "train.model" @@ -34,7 +34,7 @@ class Train(Model): lod_level=1, dtype="int64") for i in range(1, ids) ] - return sparse_input_ids, [var.name for var in sparse_input_ids] + return sparse_input_ids def dense_input(): dim = envs.get_global_env("hyper_parameters.dense_input_dim", None, self.namespace) @@ -42,23 +42,20 @@ class Train(Model): dense_input_var = fluid.layers.data(name="dense_input", shape=[dim], dtype="float32") - return dense_input_var, dense_input_var.name + return dense_input_var def label_input(): label = fluid.layers.data(name="label", shape=[1], dtype="int64") - return label, label.name + return label - self.sparse_inputs, self.sparse_input_varnames = sparse_inputs() - self.dense_input, self.dense_input_varname = dense_input() - self.label_input, self.label_input_varname = label_input() + self.sparse_inputs = sparse_inputs() + self.dense_input = dense_input() + self.label_input = label_input() - def input_vars(self): + def inputs(self): return [self.dense_input] + self.sparse_inputs + [self.label_input] - def input_varnames(self): - return [input.name for input in self.input_vars()] - - def build_model(self): + def net(self): def embedding_layer(input): sparse_feature_number = envs.get_global_env("hyper_parameters.sparse_feature_number", None, self.namespace) sparse_feature_dim = envs.get_global_env("hyper_parameters.sparse_feature_dim", None, self.namespace) @@ -120,20 +117,8 @@ class Train(Model): optimizer = fluid.optimizer.Adam(learning_rate, lazy_mode=True) return optimizer - def dump_model_program(self, path): - pass - - def dump_inference_param(self, params): - pass - - def dump_inference_program(self, inference_layer, path): - pass - - def shrink(self, params): - pass - -class Evaluate(object): +class EvaluateModel(object): def input(self): pass diff --git a/fleetrec/trainer/cluster_trainer.py b/fleetrec/trainer/cluster_trainer.py index b82799a0de6dad6bb7b153ae73725e4d0bced52c..6b3e6471841bb526d2cc95c1a2b328833d82570e 100644 --- a/fleetrec/trainer/cluster_trainer.py +++ b/fleetrec/trainer/cluster_trainer.py @@ -32,11 +32,7 @@ logger = logging.getLogger("fluid") logger.setLevel(logging.INFO) -class ClusterTrainerWithDataloader(TranspileTrainer): - pass - - -class ClusterTrainerWithDataset(TranspileTrainer): +class ClusterTrainer(TranspileTrainer): def processor_register(self): role = PaddleCloudRoleMaker() fleet.init(role) @@ -71,7 +67,7 @@ class ClusterTrainerWithDataset(TranspileTrainer): def init(self, context): self.model.input() - self.model.build_model() + self.model.net() self.model.metrics() self.model.avg_loss() optimizer = self.model.optimizer() diff --git a/fleetrec/trainer/factory.py b/fleetrec/trainer/factory.py index 6434d7da82b46055dd2aa91ff1b5b633b3f2dc34..74369eec9fc15dd35e67837ec39bfa43267a49be 100644 --- a/fleetrec/trainer/factory.py +++ b/fleetrec/trainer/factory.py @@ -10,46 +10,19 @@ # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and -# limitations under the License.# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and # limitations under the License. import os import sys import yaml - -from fleetrec.trainer.single_trainer import SingleTrainerWithDataloader -from fleetrec.trainer.single_trainer import SingleTrainerWithDataset - -from fleetrec.trainer.cluster_trainer import ClusterTrainerWithDataloader -from fleetrec.trainer.cluster_trainer import ClusterTrainerWithDataset - from fleetrec.trainer.local_engine import Launch +from fleetrec.trainer.single_trainer import SingleTrainer +from fleetrec.trainer.cluster_trainer import ClusterTrainer from fleetrec.trainer.ctr_trainer import CtrPaddleTrainer from fleetrec.utils import envs - - -def str2bool(v): - if isinstance(v, bool): - return v - if v.lower() in ('yes', 'true', 't', 'y', '1'): - return True - elif v.lower() in ('no', 'false', 'f', 'n', '0'): - return False - else: - raise ValueError('Boolean value expected.') +from fleetrec.utils import util class TrainerFactory(object): @@ -61,21 +34,10 @@ class TrainerFactory(object): print(envs.pretty_print_envs(envs.get_global_envs())) train_mode = envs.get_global_env("train.trainer") - reader_mode = envs.get_global_env("train.reader.mode") if train_mode == "SingleTraining": - if reader_mode == "dataset": - trainer = SingleTrainerWithDataset() - elif reader_mode == "dataloader": - trainer = SingleTrainerWithDataloader() - else: - raise ValueError("reader only support dataset/dataloader") + trainer = SingleTrainer() elif train_mode == "ClusterTraining": - if reader_mode == "dataset": - trainer = ClusterTrainerWithDataset() - elif reader_mode == "dataloader": - trainer = ClusterTrainerWithDataloader() - else: - raise ValueError("reader only support dataset/dataloader") + trainer = ClusterTrainer() elif train_mode == "CtrTrainer": trainer = CtrPaddleTrainer(config) else: @@ -108,7 +70,7 @@ class TrainerFactory(object): envs.set_global_envs(_config) mode = envs.get_global_env("train.trainer") container = envs.get_global_env("train.container") - instance = str2bool(os.getenv("CLUSTER_INSTANCE", "0")) + instance = util.str2bool(os.getenv("CLUSTER_INSTANCE", "0")) if mode == "ClusterTraining" and container == "local" and not instance: trainer = TrainerFactory._build_engine(config) @@ -124,4 +86,3 @@ if __name__ == "__main__": raise ValueError("need a yaml file path argv") trainer = TrainerFactory.create(sys.argv[1]) trainer.run() - diff --git a/fleetrec/trainer/single_trainer.py b/fleetrec/trainer/single_trainer.py index ed03e42a63fdeafbdb9e3eb1b4230b2d72edcc03..8cb0be63ba1ea08b459a24779d4e45735c57ebae 100644 --- a/fleetrec/trainer/single_trainer.py +++ b/fleetrec/trainer/single_trainer.py @@ -17,25 +17,18 @@ Training use fluid with one node only. """ from __future__ import print_function -import os -import time -import numpy as np import logging import paddle.fluid as fluid -from .transpiler_trainer import TranspileTrainer -from ..utils import envs +from fleetrec.trainer.transpiler_trainer import TranspileTrainer +from fleetrec.utils import envs logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s") logger = logging.getLogger("fluid") logger.setLevel(logging.INFO) -class SingleTrainerWithDataloader(TranspileTrainer): - pass - - -class SingleTrainerWithDataset(TranspileTrainer): +class SingleTrainer(TranspileTrainer): def processor_register(self): self.regist_context_processor('uninit', self.instance) self.regist_context_processor('init_pass', self.init) diff --git a/fleetrec/trainer/trainer.py b/fleetrec/trainer/trainer.py index 0819c84f50ec476d2bd877850bf4ca9c91deb343..b4b3dc57f63e49c494daf66ef8c2c8678e1838ad 100755 --- a/fleetrec/trainer/trainer.py +++ b/fleetrec/trainer/trainer.py @@ -14,11 +14,8 @@ import abc import time -import yaml from paddle import fluid -from ..utils import envs - class Trainer(object): """R diff --git a/fleetrec/trainer/transpiler_trainer.py b/fleetrec/trainer/transpiler_trainer.py index fa568851986e02585f3f556c9f204247116ee1db..44571962ffeb67aeeef8f73ad70513629c7fd11f 100644 --- a/fleetrec/trainer/transpiler_trainer.py +++ b/fleetrec/trainer/transpiler_trainer.py @@ -18,10 +18,9 @@ Training use fluid with DistributeTranspiler import os import paddle.fluid as fluid - from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet -from fleetrec.trainer import Trainer +from fleetrec.trainer.trainer import Trainer from fleetrec.utils import envs @@ -39,15 +38,18 @@ class TranspileTrainer(Trainer): def _get_dataset(self): namespace = "train.reader" - inputs = self.model.input_vars() + inputs = self.model.inputs() threads = envs.get_global_env("train.threads", None) batch_size = envs.get_global_env("batch_size", None, namespace) - pipe_command = envs.get_global_env("pipe_command", None, namespace) + reader_class = envs.get_global_env("class", None, namespace) + abs_dir = os.path.dirname(os.path.abspath(__file__)) + reader = os.path.join(abs_dir, '..', 'reader_implement.py') + pipe_cmd = "python {} {} {}".format(reader, reader_class, "TRAIN") train_data_path = envs.get_global_env("train_data_path", None, namespace) dataset = fluid.DatasetFactory().create_dataset() dataset.set_use_var(inputs) - dataset.set_pipe_command(pipe_command) + dataset.set_pipe_command(pipe_cmd) dataset.set_batch_size(batch_size) dataset.set_thread(threads) file_list = [ diff --git a/fleetrec/utils/util.py b/fleetrec/utils/util.py index 950aa59c4786328c90a711baebab752b51b78b17..8b5971d0fe3c0b3fd9b9221a0e6c2c7bf16c68fb 100755 --- a/fleetrec/utils/util.py +++ b/fleetrec/utils/util.py @@ -15,7 +15,18 @@ import os import time import datetime -from .. utils import fs as fs +from ..utils import fs as fs + + +def str2bool(v): + if isinstance(v, bool): + return v + if v.lower() in ('yes', 'true', 't', 'y', '1'): + return True + elif v.lower() in ('no', 'false', 'f', 'n', '0'): + return False + else: + raise ValueError('Boolean value expected.') def get_env_value(env_name):