提交 f8d55c9f 编写于 作者: T tangwei

code clean

上级 357f0da7
...@@ -63,7 +63,7 @@ def create(config): ...@@ -63,7 +63,7 @@ def create(config):
model = None model = None
if config['mode'] == 'fluid': if config['mode'] == 'fluid':
model = YamlModel(config) model = YamlModel(config)
model.build_model() model.net()
return model return model
...@@ -94,13 +94,13 @@ class Model(object): ...@@ -94,13 +94,13 @@ class Model(object):
return self._fetch_interval return self._fetch_interval
@abc.abstractmethod @abc.abstractmethod
def shrink(self, params): def net(self):
"""R """R
""" """
pass pass
@abc.abstractmethod @abc.abstractmethod
def build_model(self): def shrink(self, params):
"""R """R
""" """
pass pass
...@@ -140,7 +140,7 @@ class YamlModel(Model): ...@@ -140,7 +140,7 @@ class YamlModel(Model):
self._build_param = {'layer': {}, 'inner_layer': {}, 'layer_extend': {}, 'model': {}} self._build_param = {'layer': {}, 'inner_layer': {}, 'layer_extend': {}, 'model': {}}
self._inference_meta = {'dependency': {}, 'params': {}} self._inference_meta = {'dependency': {}, 'params': {}}
def build_model(self): def net(self):
"""R """R
build a fluid model with config build a fluid model with config
Return: Return:
...@@ -287,4 +287,4 @@ class YamlModel(Model): ...@@ -287,4 +287,4 @@ class YamlModel(Model):
dependency_list = copy.deepcopy(dependencys) dependency_list = copy.deepcopy(dependencys)
for dependency in dependencys: for dependency in dependencys:
dependency_list = dependency_list + self.get_dependency(layer_graph, dependency) dependency_list = dependency_list + self.get_dependency(layer_graph, dependency)
return list(set(dependency_list)) return list(set(dependency_list))
\ No newline at end of file
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ...utils import envs
# There are 13 integer features and 26 categorical features
# Column indices (1-based after the label column) of the continuous and
# categorical Criteo features.
continous_features = range(1, 14)
categorial_features = range(14, 40)
# Clip caps for the 13 continuous columns, in the same order as
# continous_features.
# NOTE(review): these module-level ranges/caps appear unused in this file;
# CriteoDataset below keeps its own copies — confirm against callers before
# removing.
continous_clip = [20, 600, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50]
class CriteoDataset(object):
    """File-based reader for Criteo click-through data.

    Each input line is expected to be tab-separated: a 0/1 label, then
    13 continuous features, then 26 categorical features.  Continuous
    values are min-shifted and scaled to roughly [0, 1]; categorical
    values are hashed into ``sparse_feature_dim`` buckets.
    """

    def __init__(self, sparse_feature_dim):
        # Per-column minima used to shift the 13 continuous features.
        self.cont_min_ = [0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
        self.cont_max_ = [
            20, 600, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50
        ]
        # Scaling denominators (approximately max - min) per column.
        self.cont_diff_ = [
            20, 603, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50
        ]
        # Number of hash buckets for categorical features.
        self.hash_dim_ = sparse_feature_dim
        # here, training data are lines with line_index < train_idx_
        # NOTE(review): train_idx_ is not consulted by _reader_creator, so
        # train/test currently read every line of their files — confirm
        # whether the train/test split was meant to be enforced here.
        self.train_idx_ = 41256555
        self.continuous_range_ = range(1, 14)
        self.categorical_range_ = range(14, 40)

    def _reader_creator(self, file_list, is_train, trainer_num, trainer_id):
        """Return a generator function yielding one sample per input line.

        Each sample is ``[dense_feature] + sparse_feature + [label]``: a
        13-float list, 26 single-element hashed-id lists, and a one-int
        label list.  ``is_train``, ``trainer_num`` and ``trainer_id`` are
        accepted for interface compatibility but are not used here.
        """

        def reader():
            for path in file_list:  # renamed from `file` (shadowed builtin)
                with open(path, 'r') as f:
                    for line in f:
                        features = line.rstrip('\n').split('\t')
                        dense_feature = []
                        sparse_feature = []
                        for idx in self.continuous_range_:
                            if features[idx] == '':
                                # Missing continuous values default to 0.0.
                                dense_feature.append(0.0)
                            else:
                                dense_feature.append(
                                    (float(features[idx]) -
                                     self.cont_min_[idx - 1]) /
                                    self.cont_diff_[idx - 1])
                        for idx in self.categorical_range_:
                            # Hash "<column index><raw value>" into buckets.
                            sparse_feature.append([
                                hash(str(idx) + features[idx]) % self.hash_dim_
                            ])
                        label = [int(features[0])]
                        yield [dense_feature] + sparse_feature + [label]

        return reader

    def train(self, file_list, trainer_num, trainer_id):
        """Return a reader over ``file_list`` for training."""
        return self._reader_creator(file_list, True, trainer_num, trainer_id)

    def test(self, file_list):
        """Return a reader over ``file_list`` for evaluation (one trainer)."""
        return self._reader_creator(file_list, False, 1, 0)
def Train():
    """Return the training reader factory for the configured hash dimension.

    Reads ``sparse_feature_number`` from the global environment config and
    returns the bound ``CriteoDataset.train`` method.
    """
    hash_dim = envs.get_global_env("sparse_feature_number")
    dataset = CriteoDataset(hash_dim)
    return dataset.train
def Evaluate():
    """Return the evaluation reader factory for the configured hash dimension.

    Reads ``sparse_feature_number`` from the global environment config and
    returns the bound ``CriteoDataset.test`` method.
    """
    hash_dim = envs.get_global_env("sparse_feature_number")
    dataset = CriteoDataset(hash_dim)
    return dataset.test
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import sys
import paddle.fluid.incubate.data_generator as dg
# Per-column minima used to shift the 13 continuous Criteo features.
cont_min_ = [0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
cont_max_ = [20, 600, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50]
# Scaling denominators (approximately max - min) per continuous column.
cont_diff_ = [20, 603, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50]
# Number of hash buckets for categorical features (fixed here, unlike the
# configurable reader variant elsewhere in this project).
hash_dim_ = 1000001
# 1-based column ranges: label is column 0, continuous 1-13, categorical 14-39.
continuous_range_ = range(1, 14)
categorical_range_ = range(14, 40)
class CriteoDataset(dg.MultiSlotDataGenerator):
    """Criteo (DAC) sample generator for Paddle's MultiSlotDataGenerator.

    Parses one tab-separated line (label, 13 continuous, 26 categorical
    columns) into named slots: "dense_input", "C1".."C26", and "label".
    """

    def generate_sample(self, line):
        """Return a generator that converts ``line`` into (name, value) slots."""

        def reader():
            """Parse the line per the Criteo format and yield named slots."""
            features = line.rstrip('\n').split('\t')
            dense_feature = []
            sparse_feature = []
            for idx in continuous_range_:
                if features[idx] == "":
                    # Missing continuous values default to 0.0.
                    dense_feature.append(0.0)
                else:
                    dense_feature.append(
                        (float(features[idx]) - cont_min_[idx - 1]) /
                        cont_diff_[idx - 1])
            for idx in categorical_range_:
                # Hash "<column index><raw value>" into hash_dim_ buckets.
                sparse_feature.append(
                    [hash(str(idx) + features[idx]) % hash_dim_])
            label = [int(features[0])]
            # Removed dead assignment `process_line = dense_feature,
            # sparse_feature, label` — it was never read.
            feature_name = ["dense_input"]
            for idx in categorical_range_:
                # Categorical slots are named C1..C26.
                feature_name.append("C" + str(idx - 13))
            feature_name.append("label")
            yield zip(feature_name, [dense_feature] + sparse_feature + [label])

        return reader
# Pipe entry point: MultiSlotDataGenerator reads raw lines from stdin and
# writes processed slots to stdout (intended for use as a dataset
# pipe_command; runs unconditionally on import — no __main__ guard).
d = CriteoDataset()
d.run_from_stdin()
...@@ -19,7 +19,7 @@ from fleetrec.utils import envs ...@@ -19,7 +19,7 @@ from fleetrec.utils import envs
from fleetrec.models.base import Model from fleetrec.models.base import Model
class Train(Model): class TrainModel(Model):
def __init__(self, config): def __init__(self, config):
Model.__init__(self, config) Model.__init__(self, config)
self.namespace = "train.model" self.namespace = "train.model"
...@@ -34,7 +34,7 @@ class Train(Model): ...@@ -34,7 +34,7 @@ class Train(Model):
lod_level=1, lod_level=1,
dtype="int64") for i in range(1, ids) dtype="int64") for i in range(1, ids)
] ]
return sparse_input_ids, [var.name for var in sparse_input_ids] return sparse_input_ids
def dense_input(): def dense_input():
dim = envs.get_global_env("hyper_parameters.dense_input_dim", None, self.namespace) dim = envs.get_global_env("hyper_parameters.dense_input_dim", None, self.namespace)
...@@ -42,23 +42,20 @@ class Train(Model): ...@@ -42,23 +42,20 @@ class Train(Model):
dense_input_var = fluid.layers.data(name="dense_input", dense_input_var = fluid.layers.data(name="dense_input",
shape=[dim], shape=[dim],
dtype="float32") dtype="float32")
return dense_input_var, dense_input_var.name return dense_input_var
def label_input(): def label_input():
label = fluid.layers.data(name="label", shape=[1], dtype="int64") label = fluid.layers.data(name="label", shape=[1], dtype="int64")
return label, label.name return label
self.sparse_inputs, self.sparse_input_varnames = sparse_inputs() self.sparse_inputs = sparse_inputs()
self.dense_input, self.dense_input_varname = dense_input() self.dense_input = dense_input()
self.label_input, self.label_input_varname = label_input() self.label_input = label_input()
def input_vars(self): def inputs(self):
return [self.dense_input] + self.sparse_inputs + [self.label_input] return [self.dense_input] + self.sparse_inputs + [self.label_input]
def input_varnames(self): def net(self):
return [input.name for input in self.input_vars()]
def build_model(self):
def embedding_layer(input): def embedding_layer(input):
sparse_feature_number = envs.get_global_env("hyper_parameters.sparse_feature_number", None, self.namespace) sparse_feature_number = envs.get_global_env("hyper_parameters.sparse_feature_number", None, self.namespace)
sparse_feature_dim = envs.get_global_env("hyper_parameters.sparse_feature_dim", None, self.namespace) sparse_feature_dim = envs.get_global_env("hyper_parameters.sparse_feature_dim", None, self.namespace)
...@@ -120,20 +117,8 @@ class Train(Model): ...@@ -120,20 +117,8 @@ class Train(Model):
optimizer = fluid.optimizer.Adam(learning_rate, lazy_mode=True) optimizer = fluid.optimizer.Adam(learning_rate, lazy_mode=True)
return optimizer return optimizer
def dump_model_program(self, path):
pass
def dump_inference_param(self, params):
pass
def dump_inference_program(self, inference_layer, path):
pass
def shrink(self, params):
pass
class Evaluate(object): class EvaluateModel(object):
def input(self): def input(self):
pass pass
......
...@@ -32,11 +32,7 @@ logger = logging.getLogger("fluid") ...@@ -32,11 +32,7 @@ logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO) logger.setLevel(logging.INFO)
class ClusterTrainerWithDataloader(TranspileTrainer): class ClusterTrainer(TranspileTrainer):
pass
class ClusterTrainerWithDataset(TranspileTrainer):
def processor_register(self): def processor_register(self):
role = PaddleCloudRoleMaker() role = PaddleCloudRoleMaker()
fleet.init(role) fleet.init(role)
...@@ -71,7 +67,7 @@ class ClusterTrainerWithDataset(TranspileTrainer): ...@@ -71,7 +67,7 @@ class ClusterTrainerWithDataset(TranspileTrainer):
def init(self, context): def init(self, context):
self.model.input() self.model.input()
self.model.build_model() self.model.net()
self.model.metrics() self.model.metrics()
self.model.avg_loss() self.model.avg_loss()
optimizer = self.model.optimizer() optimizer = self.model.optimizer()
......
...@@ -10,46 +10,19 @@ ...@@ -10,46 +10,19 @@
# distributed under the License is distributed on an "AS IS" BASIS, # distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License.# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import os import os
import sys import sys
import yaml import yaml
from fleetrec.trainer.single_trainer import SingleTrainerWithDataloader
from fleetrec.trainer.single_trainer import SingleTrainerWithDataset
from fleetrec.trainer.cluster_trainer import ClusterTrainerWithDataloader
from fleetrec.trainer.cluster_trainer import ClusterTrainerWithDataset
from fleetrec.trainer.local_engine import Launch from fleetrec.trainer.local_engine import Launch
from fleetrec.trainer.single_trainer import SingleTrainer
from fleetrec.trainer.cluster_trainer import ClusterTrainer
from fleetrec.trainer.ctr_trainer import CtrPaddleTrainer from fleetrec.trainer.ctr_trainer import CtrPaddleTrainer
from fleetrec.utils import envs from fleetrec.utils import envs
from fleetrec.utils import util
def str2bool(v):
if isinstance(v, bool):
return v
if v.lower() in ('yes', 'true', 't', 'y', '1'):
return True
elif v.lower() in ('no', 'false', 'f', 'n', '0'):
return False
else:
raise ValueError('Boolean value expected.')
class TrainerFactory(object): class TrainerFactory(object):
...@@ -61,21 +34,10 @@ class TrainerFactory(object): ...@@ -61,21 +34,10 @@ class TrainerFactory(object):
print(envs.pretty_print_envs(envs.get_global_envs())) print(envs.pretty_print_envs(envs.get_global_envs()))
train_mode = envs.get_global_env("train.trainer") train_mode = envs.get_global_env("train.trainer")
reader_mode = envs.get_global_env("train.reader.mode")
if train_mode == "SingleTraining": if train_mode == "SingleTraining":
if reader_mode == "dataset": trainer = SingleTrainer()
trainer = SingleTrainerWithDataset()
elif reader_mode == "dataloader":
trainer = SingleTrainerWithDataloader()
else:
raise ValueError("reader only support dataset/dataloader")
elif train_mode == "ClusterTraining": elif train_mode == "ClusterTraining":
if reader_mode == "dataset": trainer = ClusterTrainer()
trainer = ClusterTrainerWithDataset()
elif reader_mode == "dataloader":
trainer = ClusterTrainerWithDataloader()
else:
raise ValueError("reader only support dataset/dataloader")
elif train_mode == "CtrTrainer": elif train_mode == "CtrTrainer":
trainer = CtrPaddleTrainer(config) trainer = CtrPaddleTrainer(config)
else: else:
...@@ -108,7 +70,7 @@ class TrainerFactory(object): ...@@ -108,7 +70,7 @@ class TrainerFactory(object):
envs.set_global_envs(_config) envs.set_global_envs(_config)
mode = envs.get_global_env("train.trainer") mode = envs.get_global_env("train.trainer")
container = envs.get_global_env("train.container") container = envs.get_global_env("train.container")
instance = str2bool(os.getenv("CLUSTER_INSTANCE", "0")) instance = util.str2bool(os.getenv("CLUSTER_INSTANCE", "0"))
if mode == "ClusterTraining" and container == "local" and not instance: if mode == "ClusterTraining" and container == "local" and not instance:
trainer = TrainerFactory._build_engine(config) trainer = TrainerFactory._build_engine(config)
...@@ -124,4 +86,3 @@ if __name__ == "__main__": ...@@ -124,4 +86,3 @@ if __name__ == "__main__":
raise ValueError("need a yaml file path argv") raise ValueError("need a yaml file path argv")
trainer = TrainerFactory.create(sys.argv[1]) trainer = TrainerFactory.create(sys.argv[1])
trainer.run() trainer.run()
...@@ -17,25 +17,18 @@ Training use fluid with one node only. ...@@ -17,25 +17,18 @@ Training use fluid with one node only.
""" """
from __future__ import print_function from __future__ import print_function
import os
import time
import numpy as np
import logging import logging
import paddle.fluid as fluid import paddle.fluid as fluid
from .transpiler_trainer import TranspileTrainer from fleetrec.trainer.transpiler_trainer import TranspileTrainer
from ..utils import envs from fleetrec.utils import envs
logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s") logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger("fluid") logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO) logger.setLevel(logging.INFO)
class SingleTrainerWithDataloader(TranspileTrainer): class SingleTrainer(TranspileTrainer):
pass
class SingleTrainerWithDataset(TranspileTrainer):
def processor_register(self): def processor_register(self):
self.regist_context_processor('uninit', self.instance) self.regist_context_processor('uninit', self.instance)
self.regist_context_processor('init_pass', self.init) self.regist_context_processor('init_pass', self.init)
......
...@@ -14,11 +14,8 @@ ...@@ -14,11 +14,8 @@
import abc import abc
import time import time
import yaml
from paddle import fluid from paddle import fluid
from ..utils import envs
class Trainer(object): class Trainer(object):
"""R """R
......
...@@ -18,10 +18,9 @@ Training use fluid with DistributeTranspiler ...@@ -18,10 +18,9 @@ Training use fluid with DistributeTranspiler
import os import os
import paddle.fluid as fluid import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
from fleetrec.trainer import Trainer from fleetrec.trainer.trainer import Trainer
from fleetrec.utils import envs from fleetrec.utils import envs
...@@ -39,15 +38,18 @@ class TranspileTrainer(Trainer): ...@@ -39,15 +38,18 @@ class TranspileTrainer(Trainer):
def _get_dataset(self): def _get_dataset(self):
namespace = "train.reader" namespace = "train.reader"
inputs = self.model.input_vars() inputs = self.model.inputs()
threads = envs.get_global_env("train.threads", None) threads = envs.get_global_env("train.threads", None)
batch_size = envs.get_global_env("batch_size", None, namespace) batch_size = envs.get_global_env("batch_size", None, namespace)
pipe_command = envs.get_global_env("pipe_command", None, namespace) reader_class = envs.get_global_env("class", None, namespace)
abs_dir = os.path.dirname(os.path.abspath(__file__))
reader = os.path.join(abs_dir, '..', 'reader_implement.py')
pipe_cmd = "python {} {} {}".format(reader, reader_class, "TRAIN")
train_data_path = envs.get_global_env("train_data_path", None, namespace) train_data_path = envs.get_global_env("train_data_path", None, namespace)
dataset = fluid.DatasetFactory().create_dataset() dataset = fluid.DatasetFactory().create_dataset()
dataset.set_use_var(inputs) dataset.set_use_var(inputs)
dataset.set_pipe_command(pipe_command) dataset.set_pipe_command(pipe_cmd)
dataset.set_batch_size(batch_size) dataset.set_batch_size(batch_size)
dataset.set_thread(threads) dataset.set_thread(threads)
file_list = [ file_list = [
......
...@@ -15,7 +15,18 @@ ...@@ -15,7 +15,18 @@
import os import os
import time import time
import datetime import datetime
from .. utils import fs as fs from ..utils import fs as fs
def str2bool(v):
    """Coerce a bool or a common boolean string spelling to a bool.

    Accepts (case-insensitively) 'yes'/'true'/'t'/'y'/'1' as True and
    'no'/'false'/'f'/'n'/'0' as False; a bool is returned unchanged.
    Raises ValueError for anything else.
    """
    if isinstance(v, bool):
        return v
    normalized = v.lower()
    if normalized in {'yes', 'true', 't', 'y', '1'}:
        return True
    if normalized in {'no', 'false', 'f', 'n', '0'}:
        return False
    raise ValueError('Boolean value expected.')
def get_env_value(env_name): def get_env_value(env_name):
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册