From afec7a4914c527e7441ce9ae9c5bb275a220cf95 Mon Sep 17 00:00:00 2001
From: tangwei12
Date: Thu, 2 Apr 2020 06:41:35 +0000
Subject: [PATCH] debug ctr-dnn local training

---
 examples/ctr-dnn_train.yaml                   |  8 +--
 models/ctr_dnn/__init__.py                    | 13 ++++
 .../ctr_dnn/{ => data/test}/sample_test.txt   |  0
 .../ctr_dnn/{ => data/train}/sample_train.txt |  0
 models/ctr_dnn/{reader.py => dataloader.py}   |  0
 models/ctr_dnn/dataset.py                     | 65 +++++++++++++++++++
 models/ctr_dnn/model.py                       | 26 ++++----
 trainer/factory.py                            |  3 +
 trainer/single_train.py                       | 36 ++++++----
 utils/envs.py                                 | 61 +++++++++++++----
 10 files changed, 170 insertions(+), 42 deletions(-)
 create mode 100755 models/ctr_dnn/__init__.py
 rename models/ctr_dnn/{ => data/test}/sample_test.txt (100%)
 rename models/ctr_dnn/{ => data/train}/sample_train.txt (100%)
 rename models/ctr_dnn/{reader.py => dataloader.py} (100%)
 create mode 100644 models/ctr_dnn/dataset.py

diff --git a/examples/ctr-dnn_train.yaml b/examples/ctr-dnn_train.yaml
index 98a533cb..22f03c97 100644
--- a/examples/ctr-dnn_train.yaml
+++ b/examples/ctr-dnn_train.yaml
@@ -25,7 +25,6 @@
 # limitations under the License.
 
 train:
-  batch_size: 32
   threads: 12
   epochs: 10
   trainer: "SingleTraining"
@@ -35,11 +34,12 @@ train:
   reader:
     mode: "dataset"
-    pipe_command: "python reader.py dataset"
-    train_data_path: "raw_data"
+    batch_size: 32
+    pipe_command: "python /paddle/eleps/models/ctr_dnn/dataset.py"
+    train_data_path: "/paddle/eleps/models/ctr_dnn/data/train"
 
   model:
-    models: "eleps.models.ctr_dnn.model.py"
+    models: "eleps.models.ctr_dnn.model"
 
   hyper_parameters:
     sparse_inputs_slots: 27
     sparse_feature_number: 1000001
diff --git a/models/ctr_dnn/__init__.py b/models/ctr_dnn/__init__.py
new file mode 100755
index 00000000..abf198b9
--- /dev/null
+++ b/models/ctr_dnn/__init__.py
@@ -0,0 +1,13 @@
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/models/ctr_dnn/sample_test.txt b/models/ctr_dnn/data/test/sample_test.txt
similarity index 100%
rename from models/ctr_dnn/sample_test.txt
rename to models/ctr_dnn/data/test/sample_test.txt
diff --git a/models/ctr_dnn/sample_train.txt b/models/ctr_dnn/data/train/sample_train.txt
similarity index 100%
rename from models/ctr_dnn/sample_train.txt
rename to models/ctr_dnn/data/train/sample_train.txt
diff --git a/models/ctr_dnn/reader.py b/models/ctr_dnn/dataloader.py
similarity index 100%
rename from models/ctr_dnn/reader.py
rename to models/ctr_dnn/dataloader.py
diff --git a/models/ctr_dnn/dataset.py b/models/ctr_dnn/dataset.py
new file mode 100644
index 00000000..925ab6b7
--- /dev/null
+++ b/models/ctr_dnn/dataset.py
@@ -0,0 +1,65 @@
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import paddle.fluid.incubate.data_generator as dg
+
+
+cont_min_ = [0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
+cont_max_ = [20, 600, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50]
+cont_diff_ = [20, 603, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50]
+hash_dim_ = 1000001
+continuous_range_ = range(1, 14)
+categorical_range_ = range(14, 40)
+
+
+class CriteoDataset(dg.MultiSlotDataGenerator):
+    """
+    CriteoDataset: inherits MultiSlotDataGenerator and implements data reading.
+    Help document: http://wiki.baidu.com/pages/viewpage.action?pageId=728820675
+    """
+    def generate_sample(self, line):
+        """
+        Read the data line by line and process it as a dictionary
+        """
+        def reader():
+            """
+            This function needs to be implemented by the user, based on data format
+            """
+            features = line.rstrip('\n').split('\t')
+            dense_feature = []
+            sparse_feature = []
+            for idx in continuous_range_:
+                if features[idx] == "":
+                    dense_feature.append(0.0)
+                else:
+                    dense_feature.append(
+                        (float(features[idx]) - cont_min_[idx - 1]) /
+                        cont_diff_[idx - 1])
+            for idx in categorical_range_:
+                sparse_feature.append(
+                    [hash(str(idx) + features[idx]) % hash_dim_])
+            label = [int(features[0])]
+            process_line = dense_feature, sparse_feature, label
+            feature_name = ["dense_input"]
+            for idx in categorical_range_:
+                feature_name.append("C" + str(idx - 13))
+            feature_name.append("label")
+
+            yield zip(feature_name, [dense_feature] + sparse_feature + [label])
+
+        return reader
+
+
+d = CriteoDataset()
+d.run_from_stdin()
diff --git a/models/ctr_dnn/model.py b/models/ctr_dnn/model.py
index 44e0087d..45b5274b 100644
--- a/models/ctr_dnn/model.py
+++ b/models/ctr_dnn/model.py
@@ -15,7 +15,7 @@
 import math
 
 import paddle.fluid as fluid
-from ...utils import envs
+from eleps.utils import envs
 
 
 class Train(object):
@@ -28,10 +28,12 @@ class Train(object):
         self.sparse_input_varnames = []
         self.dense_input_varname = None
         self.label_input_varname = None
+
+        self.namespace = "train.model"
 
     def input(self):
         def sparse_inputs():
-            ids = envs.get_global_env("sparse_inputs_counts")
+            ids = envs.get_global_env("hyper_parameters.sparse_inputs_slots", None, self.namespace)
 
             sparse_input_ids = [
                 fluid.layers.data(name="C" + str(i),
@@ -42,10 +44,10 @@ class Train(object):
             return sparse_input_ids, [var.name for var in sparse_input_ids]
 
         def dense_input():
-            dense_input_dim = envs.get_global_env("dense_input_dim")
+            dim = envs.get_global_env("hyper_parameters.dense_input_dim", None, self.namespace)
 
             dense_input_var = fluid.layers.data(name="dense_input",
-                                                shape=dense_input_dim,
+                                                shape=[dim],
                                                 dtype="float32")
             return dense_input_var, dense_input_var.name
 
@@ -65,13 +67,13 @@ class Train(object):
 
     def net(self):
         def embedding_layer(input):
-            sparse_feature_number = envs.get_global_env("sparse_feature_number")
-            sparse_feature_dim = envs.get_global_env("sparse_feature_dim")
+            sparse_feature_number = envs.get_global_env("hyper_parameters.sparse_feature_number", None, self.namespace)
+            sparse_feature_dim = envs.get_global_env("hyper_parameters.sparse_feature_dim", None, self.namespace)
 
             emb = fluid.layers.embedding(
                 input=input,
                 is_sparse=True,
-                size=[{sparse_feature_number}, {sparse_feature_dim}],
+                size=[sparse_feature_number, sparse_feature_dim],
                 param_attr=fluid.ParamAttr(
                     name="SparseFeatFactors",
                     initializer=fluid.initializer.Uniform()),
@@ -92,7 +94,7 @@ class Train(object):
             concated = fluid.layers.concat(sparse_embed_seq + [self.dense_input],
                                            axis=1)
             fcs = [concated]
-            hidden_layers = envs.get_global_env("fc_sizes")
+            hidden_layers = envs.get_global_env("hyper_parameters.fc_sizes", None, self.namespace)
 
             for size in hidden_layers:
                 fcs.append(fc(fcs[-1], size))
@@ -107,8 +109,8 @@ class Train(object):
 
         self.predict = predict
 
-    def avg_loss(self, predict):
-        cost = fluid.layers.cross_entropy(input=predict, label=self.label_input)
+    def avg_loss(self):
+        cost = fluid.layers.cross_entropy(input=self.predict, label=self.label_input)
         avg_cost = fluid.layers.reduce_sum(cost)
         self.loss = avg_cost
         return avg_cost
@@ -120,8 +122,10 @@ class Train(object):
                                                slide_steps=20)
         self.metrics = (auc, batch_auc)
 
+        return self.metrics
+
     def optimizer(self):
-        learning_rate = envs.get_global_env("learning_rate")
+        learning_rate = envs.get_global_env("hyper_parameters.learning_rate", None, self.namespace)
 
         optimizer = fluid.optimizer.Adam(learning_rate, lazy_mode=True)
         return optimizer
diff --git a/trainer/factory.py b/trainer/factory.py
index 393a2d44..e6c4d1db 100644
--- a/trainer/factory.py
+++ b/trainer/factory.py
@@ -80,6 +80,9 @@ class TrainerFactory(object):
             raise ValueError("unknown config about eleps")
 
         envs.set_global_envs(_config)
+
+        print(envs.pretty_print_envs())
+
         trainer = TrainerFactory._build_trainer(_config)
         return trainer
diff --git a/trainer/single_train.py b/trainer/single_train.py
index 0865b419..3b157936 100644
--- a/trainer/single_train.py
+++ b/trainer/single_train.py
@@ -51,7 +51,10 @@ class SingleTrainer(Trainer):
         self.regist_context_processor('terminal_pass', self.terminal)
 
     def instance(self, context):
-        model_package = __import__(envs.get_global_env("train.model.models"))
+
+        models = envs.get_global_env("train.model.models")
+        model_package = __import__(models, globals(), locals(), models.split("."))
+
         train_model = getattr(model_package, 'Train')
 
         self.model = train_model()
@@ -64,7 +67,7 @@ class SingleTrainer(Trainer):
         self.metrics = self.model.metrics()
 
         loss = self.model.avg_loss()
-        optimizer = self.model.get_optimizer()
+        optimizer = self.model.optimizer()
         optimizer.minimize(loss)
 
         # run startup program at once
@@ -89,15 +92,24 @@ class SingleTrainerWithDataloader(SingleTrainer):
 
 
 class SingleTrainerWithDataset(SingleTrainer):
-    def _get_dataset(self, inputs, threads, batch_size, pipe_command, train_files_path):
+    def _get_dataset(self):
+        namespace = "train.reader"
+
+        inputs = self.model.input_vars()
+        threads = envs.get_global_env("train.threads", None)
+        batch_size = envs.get_global_env("batch_size", None, namespace)
+        pipe_command = envs.get_global_env("pipe_command", None, namespace)
+        train_data_path = envs.get_global_env("train_data_path", None, namespace)
+
+
         dataset = fluid.DatasetFactory().create_dataset()
         dataset.set_use_var(inputs)
         dataset.set_pipe_command(pipe_command)
         dataset.set_batch_size(batch_size)
         dataset.set_thread(threads)
         file_list = [
-            os.path.join(train_files_path, x)
-            for x in os.listdir(train_files_path)
+            os.path.join(train_data_path, x)
+            for x in os.listdir(train_data_path)
         ]
 
         dataset.set_filelist(file_list)
@@ -146,21 +158,17 @@ class SingleTrainerWithDataset(SingleTrainer):
         save_inference_model()
 
     def train(self, context):
-        inputs = self.model.input_vars()
-        threads = envs.get_global_env("threads")
envs.get_global_env("threads") - batch_size = envs.get_global_env("batch_size") - pipe_command = envs.get_global_env("pipe_command") - train_data_path = envs.get_global_env("train_data_path") + dataset = self._get_dataset() - dataset = self._get_dataset(inputs, threads, batch_size, pipe_command, train_data_path) + epochs = envs.get_global_env("train.epochs") - epochs = envs.get_global_env("epochs") + print("fetch_list: {}".format(len(self.metrics))) for i in range(epochs): self.exe.train_from_dataset(program=fluid.default_main_program(), dataset=dataset, - fetch_list=[self.metrics], - fetch_info=["epoch {} auc ".format(i)], + fetch_list=self.metrics, + fetch_info=["auc ", "batch auc"], print_period=100) context['status'] = 'infer_pass' diff --git a/utils/envs.py b/utils/envs.py index 7bdfb988..b4ea3457 100644 --- a/utils/envs.py +++ b/utils/envs.py @@ -14,27 +14,62 @@ import os +import copy +global_envs = {} -def encode_value(v): - return v +def set_global_envs(envs): + assert isinstance(envs, dict) -def decode_value(v): - return v + def fatten_env_namespace(namespace_nests, local_envs): + for k, v in local_envs.items(): + if isinstance(v, dict): + nests = copy.deepcopy(namespace_nests) + nests.append(k) + fatten_env_namespace(nests, v) + else: + global_k = ".".join(namespace_nests + [k]) + global_envs[global_k] = v + for k, v in envs.items(): + fatten_env_namespace([k], v) -def set_global_envs(yaml): - for k, v in yaml.items(): - os.environ[k] = encode_value(v) - -def get_global_env(env_name, default_value=None): +def get_global_env(env_name, default_value=None, namespace=None): """ get os environment value """ - if env_name not in os.environ: - return default_value + _env_name = env_name if namespace is None else ".".join([namespace, env_name]) + return global_envs.get(_env_name, default_value) + + +def pretty_print_envs(): + spacing = 5 + max_k = 45 + max_v = 20 + + for k, v in global_envs.items(): + max_k = max(max_k, len(k)) + max_v = max(max_v, len(str(v))) + + h_format = "{{:^{}s}}{{:<{}s}}\n".format(max_k, max_v) + l_format = "{{:<{}s}}{{}}{{:<{}s}}\n".format(max_k, max_v) + length = max_k + max_v + spacing + + border = "".join(["="] * length) + line = "".join(["-"] * length) + + draws = "" + draws += border + "\n" + draws += h_format.format("Eleps Global Envs", "Value") + draws += line + "\n" + + for k, v in global_envs.items(): + draws += l_format.format(k, " " * spacing, str(v)) + + draws += border + + _str = "\n{}\n".format(draws) + return _str - v = os.environ[env_name] - return decode_value(v) -- GitLab