Commit afec7a49 authored by tangwei12

debug ctr-dnn local training

Parent 987e86a6
@@ -25,7 +25,6 @@
# limitations under the License.
train:
batch_size: 32
threads: 12
epochs: 10
trainer: "SingleTraining"
@@ -35,11 +34,12 @@ train:
reader:
mode: "dataset"
pipe_command: "python reader.py dataset"
train_data_path: "raw_data"
batch_size: 32
pipe_command: "python /paddle/eleps/models/ctr_dnn/dataset.py"
train_data_path: "/paddle/eleps/models/ctr_dnn/data/train"
model:
models: "eleps.models.ctr_dnn.model.py"
models: "eleps.models.ctr_dnn.model"
hyper_parameters:
sparse_inputs_slots: 27
sparse_feature_number: 1000001
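For orientation, here is a minimal sketch (structure assumed from this diff) of how the nested YAML above is consumed: the rewritten envs module further down flattens it into dotted keys, which the model and reader then look up under a namespace.

# Hypothetical illustration of the key flattening; see envs.py later in this diff.
config = {
    "train": {
        "reader": {"batch_size": 32},
        "model": {"hyper_parameters": {"sparse_inputs_slots": 27}},
    }
}
# After envs.set_global_envs(config), values are fetched with a dotted key
# plus an optional namespace:
#   envs.get_global_env("batch_size", None, "train.reader")                           -> 32
#   envs.get_global_env("hyper_parameters.sparse_inputs_slots", None, "train.model")  -> 27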
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle.fluid.incubate.data_generator as dg
# Normalization bounds for the 13 continuous Criteo features, and the hash
# bucket size applied to the 26 categorical features.
cont_min_ = [0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
cont_max_ = [20, 600, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50]
cont_diff_ = [20, 603, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50]
hash_dim_ = 1000001
continuous_range_ = range(1, 14)    # tab-separated field indices 1..13
categorical_range_ = range(14, 40)  # tab-separated field indices 14..39
class CriteoDataset(dg.MultiSlotDataGenerator):
"""
DacDataset: inherits MultiSlotDataGenerator and implements data reading.
Help document: http://wiki.baidu.com/pages/viewpage.action?pageId=728820675
"""
def generate_sample(self, line):
"""
Read the data line by line and process it as a dictionary
"""
def reader():
"""
This function must be implemented by the user, according to the data format
"""
features = line.rstrip('\n').split('\t')
dense_feature = []
sparse_feature = []
for idx in continuous_range_:
if features[idx] == "":
dense_feature.append(0.0)
else:
dense_feature.append(
(float(features[idx]) - cont_min_[idx - 1]) /
cont_diff_[idx - 1])
for idx in categorical_range_:
sparse_feature.append(
[hash(str(idx) + features[idx]) % hash_dim_])
label = [int(features[0])]
feature_name = ["dense_input"]
for idx in categorical_range_:
feature_name.append("C" + str(idx - 13))
feature_name.append("label")
yield zip(feature_name, [dense_feature] + sparse_feature + [label])
return reader
d = CriteoDataset()
d.run_from_stdin()
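Because run_from_stdin() consumes raw sample lines from standard input, the generator can be smoke-tested without the trainer. A hypothetical check with one fabricated Criteo-style line (1 label + 13 continuous + 26 categorical tab-separated fields):

# Hypothetical local check: fabricate a line and print the slots it yields.
fake_line = "\t".join(["1"] + ["0.5"] * 13 + ["68fd1e64"] * 26)
reader = CriteoDataset().generate_sample(fake_line)
for record in reader():
    for name, value in record:
        print(name, value)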
@@ -15,7 +15,7 @@
import math
import paddle.fluid as fluid
from ...utils import envs
from eleps.utils import envs
class Train(object):
@@ -28,10 +28,12 @@ class Train(object):
self.sparse_input_varnames = []
self.dense_input_varname = None
self.label_input_varname = None
self.namespace = "train.model"
def input(self):
def sparse_inputs():
ids = envs.get_global_env("sparse_inputs_counts")
ids = envs.get_global_env("hyper_parameters.sparse_inputs_slots", None ,self.namespace)
sparse_input_ids = [
fluid.layers.data(name="C" + str(i),
@@ -42,10 +44,10 @@ class Train(object):
return sparse_input_ids, [var.name for var in sparse_input_ids]
def dense_input():
dense_input_dim = envs.get_global_env("dense_input_dim")
dim = envs.get_global_env("hyper_parameters.dense_input_dim", None, self.namespace)
dense_input_var = fluid.layers.data(name="dense_input",
shape=dense_input_dim,
shape=[dim],
dtype="float32")
return dense_input_var, dense_input_var.name
@@ -65,13 +67,13 @@ class Train(object):
def net(self):
def embedding_layer(input):
sparse_feature_number = envs.get_global_env("sparse_feature_number")
sparse_feature_dim = envs.get_global_env("sparse_feature_dim")
sparse_feature_number = envs.get_global_env("hyper_parameters.sparse_feature_number", None ,self.namespace)
sparse_feature_dim = envs.get_global_env("hyper_parameters.sparse_feature_dim", None ,self.namespace)
emb = fluid.layers.embedding(
input=input,
is_sparse=True,
size=[{sparse_feature_number}, {sparse_feature_dim}],
size=[sparse_feature_number, sparse_feature_dim],
param_attr=fluid.ParamAttr(
name="SparseFeatFactors",
initializer=fluid.initializer.Uniform()),
@@ -92,7 +94,7 @@ class Train(object):
concated = fluid.layers.concat(sparse_embed_seq + [self.dense_input], axis=1)
fcs = [concated]
hidden_layers = envs.get_global_env("fc_sizes")
hidden_layers = envs.get_global_env("hyper_parameters.fc_sizes", None ,self.namespace)
for size in hidden_layers:
fcs.append(fc(fcs[-1], size))
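The fc helper called here lives in context collapsed out of this hunk. A plausible reconstruction, assuming the usual relu tower with 1/sqrt(fan_in) weight initialization (not confirmed by this diff):

def fc(input, output_size):
    # Assumed shape of the collapsed helper: a single relu layer whose
    # weight initializer is scaled by the input width.
    return fluid.layers.fc(
        input=input,
        size=output_size,
        act="relu",
        param_attr=fluid.ParamAttr(
            initializer=fluid.initializer.Normal(
                scale=1.0 / math.sqrt(input.shape[1]))))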
@@ -107,8 +109,8 @@ class Train(object):
self.predict = predict
def avg_loss(self, predict):
cost = fluid.layers.cross_entropy(input=predict, label=self.label_input)
def avg_loss(self):
cost = fluid.layers.cross_entropy(input=self.predict, label=self.label_input)
avg_cost = fluid.layers.reduce_sum(cost)
self.loss = avg_cost
return avg_cost
@@ -120,8 +122,10 @@ class Train(object):
slide_steps=20)
self.metrics = (auc, batch_auc)
return self.metrics
def optimizer(self):
learning_rate = envs.get_global_env("learning_rate")
learning_rate = envs.get_global_env("hyper_parameters.learning_rate", None ,self.namespace)
optimizer = fluid.optimizer.Adam(learning_rate, lazy_mode=True)
return optimizer
@@ -80,6 +80,9 @@ class TrainerFactory(object):
raise ValueError("unknown config about eleps")
envs.set_global_envs(_config)
print(envs.pretty_print_envs())
trainer = TrainerFactory._build_trainer(_config)
return trainer
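Usage sketch (config loading and the trainer's run entry point are assumed, not shown in this hunk):

# Hypothetical driver; the real CLI wiring is outside this diff.
import yaml

with open("ctr_dnn.yaml") as f:
    _config = yaml.safe_load(f)

envs.set_global_envs(_config)                     # mirror what the factory method above does
trainer = TrainerFactory._build_trainer(_config)  # name taken from this hunk
trainer.run()                                     # run() assumed from the Trainer base class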
@@ -51,7 +51,10 @@ class SingleTrainer(Trainer):
self.regist_context_processor('terminal_pass', self.terminal)
def instance(self, context):
model_package = __import__(envs.get_global_env("train.model.models"))
models = envs.get_global_env("train.model.models")
model_package = __import__(models, globals(), locals(), models.split("."))
train_model = getattr(model_package, 'Train')
self.model = train_model()
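The fromlist argument is what makes this fix work: plain __import__("a.b.c") returns the top-level package a, whereas a non-empty fromlist returns the leaf module a.b.c itself. That is standard Python behavior; importlib expresses the same thing more readably:

# Equivalent, clearer spelling of the dynamic import above.
import importlib

models = envs.get_global_env("train.model.models")  # e.g. "eleps.models.ctr_dnn.model"
model_package = importlib.import_module(models)     # returns the leaf module directly
train_model = getattr(model_package, "Train")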
@@ -64,7 +67,7 @@ class SingleTrainer(Trainer):
self.metrics = self.model.metrics()
loss = self.model.avg_loss()
optimizer = self.model.get_optimizer()
optimizer = self.model.optimizer()
optimizer.minimize(loss)
# run startup program at once
@@ -89,15 +92,24 @@ class SingleTrainerWithDataloader(SingleTrainer):
class SingleTrainerWithDataset(SingleTrainer):
def _get_dataset(self, inputs, threads, batch_size, pipe_command, train_files_path):
def _get_dataset(self):
namespace = "train.reader"
inputs = self.model.input_vars()
threads = envs.get_global_env("train.threads", None)
batch_size = envs.get_global_env("batch_size", None, namespace)
pipe_command = envs.get_global_env("pipe_command", None, namespace)
train_data_path = envs.get_global_env("train_data_path", None, namespace)
dataset = fluid.DatasetFactory().create_dataset()
dataset.set_use_var(inputs)
dataset.set_pipe_command(pipe_command)
dataset.set_batch_size(batch_size)
dataset.set_thread(threads)
file_list = [
os.path.join(train_files_path, x)
for x in os.listdir(train_files_path)
os.path.join(train_data_path, x)
for x in os.listdir(train_data_path)
]
dataset.set_filelist(file_list)
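Conceptually, the dataset engine streams each file in file_list through pipe_command in a worker thread and parses the generator's stdout back into the variables registered with set_use_var. A simplified stand-in for that plumbing (not the actual Paddle internals):

import subprocess

def simulate_pipe(data_file, pipe_command):
    # Rough equivalent of what one dataset worker does per file:
    #   cat <data_file> | <pipe_command>
    with open(data_file, "rb") as f:
        out = subprocess.run(pipe_command.split(), stdin=f,
                             capture_output=True, check=True)
    return out.stdout  # slot-encoded samples that the trainer consumes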
@@ -146,21 +158,17 @@ class SingleTrainerWithDataset(SingleTrainer):
save_inference_model()
def train(self, context):
inputs = self.model.input_vars()
threads = envs.get_global_env("threads")
batch_size = envs.get_global_env("batch_size")
pipe_command = envs.get_global_env("pipe_command")
train_data_path = envs.get_global_env("train_data_path")
dataset = self._get_dataset()
dataset = self._get_dataset(inputs, threads, batch_size, pipe_command, train_data_path)
epochs = envs.get_global_env("train.epochs")
epochs = envs.get_global_env("epochs")
print("fetch_list: {}".format(len(self.metrics)))
for i in range(epochs):
self.exe.train_from_dataset(program=fluid.default_main_program(),
dataset=dataset,
fetch_list=[self.metrics],
fetch_info=["epoch {} auc ".format(i)],
fetch_list=self.metrics,
fetch_info=["auc ", "batch auc"],
print_period=100)
context['status'] = 'infer_pass'
@@ -14,27 +14,62 @@
import os
import copy
global_envs = {}
def encode_value(v):
return v
def set_global_envs(envs):
assert isinstance(envs, dict)
def decode_value(v):
return v
def fatten_env_namespace(namespace_nests, local_envs):
for k, v in local_envs.items():
if isinstance(v, dict):
nests = copy.deepcopy(namespace_nests)
nests.append(k)
fatten_env_namespace(nests, v)
else:
global_k = ".".join(namespace_nests + [k])
global_envs[global_k] = v
for k, v in envs.items():
fatten_env_namespace([k], v)
def set_global_envs(yaml):
for k, v in yaml.items():
os.environ[k] = encode_value(v)
def get_global_env(env_name, default_value=None):
def get_global_env(env_name, default_value=None, namespace=None):
"""
Get a global env value by dotted key, optionally under a namespace.
"""
if env_name not in os.environ:
return default_value
_env_name = env_name if namespace is None else ".".join([namespace, env_name])
return global_envs.get(_env_name, default_value)
def pretty_print_envs():
spacing = 5
max_k = 45
max_v = 20
for k, v in global_envs.items():
max_k = max(max_k, len(k))
max_v = max(max_v, len(str(v)))
h_format = "{{:^{}s}}{{:<{}s}}\n".format(max_k, max_v)
l_format = "{{:<{}s}}{{}}{{:<{}s}}\n".format(max_k, max_v)
length = max_k + max_v + spacing
border = "".join(["="] * length)
line = "".join(["-"] * length)
draws = ""
draws += border + "\n"
draws += h_format.format("Eleps Global Envs", "Value")
draws += line + "\n"
for k, v in global_envs.items():
draws += l_format.format(k, " " * spacing, str(v))
draws += border
_str = "\n{}\n".format(draws)
return _str
v = os.environ[env_name]
return decode_value(v)
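A round trip through the rewritten module, assuming the behavior of the code above (the trailing removed lines are the old os.environ-backed lookup):

# Illustrative use of the new dict-backed envs.
set_global_envs({"train": {"reader": {"batch_size": 32}}})

get_global_env("train.reader.batch_size")           # -> 32, fully dotted key
get_global_env("batch_size", None, "train.reader")  # -> 32, key plus namespace
get_global_env("missing_key", "fallback")           # -> "fallback"
print(pretty_print_envs())                          # bordered key/value table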