提交 f47be426 编写于 作者: T tangwei

code reformat

上级 f2282996
......@@ -45,8 +45,8 @@ class TrainerFactory(object):
trainer_abs = trainers.get(train_mode, None)
if trainer_abs is None:
if not os.path.exists(train_mode) or not os.path.isfile(train_mode):
raise ValueError("trainer {} can not be recognized".format(train_mode))
if not os.path.isfile(train_mode):
raise FileNotFoundError("trainer {} can not be recognized".format(train_mode))
trainer_abs = train_mode
train_mode = "UserDefineTrainer"
......@@ -60,7 +60,7 @@ class TrainerFactory(object):
@staticmethod
def create(config):
_config = None
if os.path.exists(config) and os.path.isfile(config):
if os.path.isfile(config):
with open(config, 'r') as rb:
_config = yaml.load(rb.read(), Loader=yaml.FullLoader)
else:
......
......@@ -13,6 +13,7 @@ class Model(object):
self._metrics = {}
self._data_var = []
self._fetch_interval = 20
self._namespace = "train.model"
def get_inputs(self):
return self._data_var
......
......@@ -28,7 +28,7 @@ class Reader(dg.MultiSlotDataGenerator):
def __init__(self, config):
dg.MultiSlotDataGenerator.__init__(self)
if os.path.exists(config) and os.path.isfile(config):
if os.path.isfile(config):
with open(config, 'r') as rb:
_config = yaml.load(rb.read(), Loader=yaml.FullLoader)
else:
......
......@@ -123,3 +123,14 @@ def lazy_instance(package, class_name):
model_package = __import__(package, globals(), locals(), package.split("."))
instance = getattr(model_package, class_name)
return instance
def get_platform():
    """Return a coarse, upper-case name for the host operating system.

    Returns:
        "LINUX", "DARWIN" or "WINDOWS"; None for any other system.
    """
    import platform

    # platform.system() names the OS family directly. Substring matching on
    # platform.platform() is unreliable: since Python 3.8 that string starts
    # with "macOS-..." rather than "Darwin-..." on macOS, so a 'Darwin'
    # substring test never matches there.
    names = {"Linux": "LINUX", "Darwin": "DARWIN", "Windows": "WINDOWS"}
    return names.get(platform.system())
......@@ -51,9 +51,9 @@ class TrainReader(Reader):
sparse_feature.append(
[hash(str(idx) + features[idx]) % self.hash_dim_])
label = [int(features[0])]
feature_name = ["dense_input"]
feature_name = ["D"]
for idx in self.categorical_range_:
feature_name.append("C" + str(idx - 13))
feature_name.append("S" + str(idx - 13))
feature_name.append("label")
yield zip(feature_name, [dense_feature] + sparse_feature + [label])
......
......@@ -22,14 +22,13 @@ from fleetrec.core.model import Model as ModelBase
class Model(ModelBase):
def __init__(self, config):
ModelBase.__init__(self, config)
self.namespace = "train.model"
def input(self):
def sparse_inputs():
ids = envs.get_global_env("hyper_parameters.sparse_inputs_slots", None, self.namespace)
ids = envs.get_global_env("hyper_parameters.sparse_inputs_slots", None, self._namespace)
sparse_input_ids = [
fluid.layers.data(name="C" + str(i),
fluid.layers.data(name="S" + str(i),
shape=[1],
lod_level=1,
dtype="int64") for i in range(1, ids)
......@@ -37,9 +36,9 @@ class Model(ModelBase):
return sparse_input_ids
def dense_input():
dim = envs.get_global_env("hyper_parameters.dense_input_dim", None, self.namespace)
dim = envs.get_global_env("hyper_parameters.dense_input_dim", None, self._namespace)
dense_input_var = fluid.layers.data(name="dense_input",
dense_input_var = fluid.layers.data(name="D",
shape=[dim],
dtype="float32")
return dense_input_var
......@@ -63,8 +62,8 @@ class Model(ModelBase):
trainer = envs.get_trainer()
is_distributed = True if trainer == "CtrTrainer" else False
sparse_feature_number = envs.get_global_env("hyper_parameters.sparse_feature_number", None, self.namespace)
sparse_feature_dim = envs.get_global_env("hyper_parameters.sparse_feature_dim", None, self.namespace)
sparse_feature_number = envs.get_global_env("hyper_parameters.sparse_feature_number", None, self._namespace)
sparse_feature_dim = envs.get_global_env("hyper_parameters.sparse_feature_dim", None, self._namespace)
sparse_feature_dim = 9 if trainer == "CtrTrainer" else sparse_feature_dim
def embedding_layer(input):
......@@ -93,7 +92,7 @@ class Model(ModelBase):
concated = fluid.layers.concat(sparse_embed_seq + [self.dense_input], axis=1)
fcs = [concated]
hidden_layers = envs.get_global_env("hyper_parameters.fc_sizes", None, self.namespace)
hidden_layers = envs.get_global_env("hyper_parameters.fc_sizes", None, self._namespace)
for size in hidden_layers:
fcs.append(fc(fcs[-1], size))
......@@ -128,7 +127,7 @@ class Model(ModelBase):
self.metrics()
def optimizer(self):
learning_rate = envs.get_global_env("hyper_parameters.learning_rate", None, self.namespace)
learning_rate = envs.get_global_env("hyper_parameters.learning_rate", None, self._namespace)
optimizer = fluid.optimizer.Adam(learning_rate, lazy_mode=True)
return optimizer
......
......@@ -7,20 +7,47 @@ from fleetrec.core.factory import TrainerFactory
from fleetrec.core.utils import envs
from fleetrec.core.utils import util
engines = {"TRAINSPILER": {}, "PSLIB": {}}
engines = {}
device = ["CPU", "GPU"]
clusters = ["SINGLE", "LOCAL_CLUSTER", "CLUSTER"]
def is_transpiler():
def engine_registry():
    """Fill the module-level ``engines`` table with launcher callables.

    The table is laid out as ``engines[device][backend][mode]`` where
    device is "CPU" or "GPU", backend is "TRANSPILER" or "PSLIB", and mode
    is one of "SINGLE", "LOCAL_CLUSTER", "CLUSTER".
    """
    cpu_table = {
        "TRANSPILER": {
            "SINGLE": single_engine,
            "LOCAL_CLUSTER": local_cluster_engine,
            "CLUSTER": cluster_engine,
        },
        # Both non-distributed PSLIB modes use the local MPI launcher.
        "PSLIB": {
            "SINGLE": local_mpi_engine,
            "LOCAL_CLUSTER": local_mpi_engine,
            "CLUSTER": cluster_mpi_engine,
        },
    }
    # On GPU only the single-process transpiler engine is registered; the
    # PSLIB entry is left empty so lookups there find nothing.
    gpu_table = {
        "TRANSPILER": {"SINGLE": single_engine},
        "PSLIB": {},
    }

    engines["CPU"] = cpu_table
    engines["GPU"] = gpu_table
def get_engine(engine, device):
    """Look up the launcher callable for *engine* on *device*.

    Args:
        engine: mode key looked up in the backend table (e.g. "SINGLE").
        device: key into the module-level ``engines`` table.

    Returns:
        The callable registered for this device/backend/mode combination.

    Raises:
        ValueError: if nothing is registered for *engine* on *device*.
    """
    per_device = engines[device]
    # The backend key is whatever get_transpiler() reports for this install.
    backend = get_transpiler()
    launcher = per_device[backend].get(engine)
    if launcher is not None:
        return launcher
    raise ValueError("engine {} can not be supported on device: {}".format(engine, device))
def get_transpiler():
FNULL = open(os.devnull, 'w')
cmd = ["python", "-c",
"import paddle.fluid as fluid; fleet_ptr = fluid.core.Fleet(); [fleet_ptr.copy_table_by_feasign(10, 10, [2020, 1010])];"]
proc = subprocess.Popen(cmd, stdout=FNULL, stderr=FNULL, cwd=os.getcwd())
ret = proc.wait()
if ret == -11:
return False
return "PSLIB"
else:
return True
return "TRANSPILER"
def set_runtime_envs(cluster_envs, engine_yaml):
......@@ -50,18 +77,6 @@ def set_runtime_envs(cluster_envs, engine_yaml):
print(envs.pretty_print_envs(need_print, ("Runtime Envs", "Value")))
def get_engine(engine):
engine = engine.upper()
if is_transpiler():
run_engine = engines["TRAINSPILER"].get(engine, None)
else:
run_engine = engines["PSLIB"].get(engine, None)
if run_engine is None:
raise ValueError("engine only support SINGLE/LOCAL_CLUSTER/CLUSTER")
return run_engine
def single_engine(args):
print("use single engine to run model: {}".format(args.model))
......@@ -69,6 +84,8 @@ def single_engine(args):
single_envs["train.trainer.trainer"] = "SingleTrainer"
single_envs["train.trainer.threads"] = "2"
single_envs["train.trainer.engine"] = "single"
single_envs["train.trainer.device"] = args.device
set_runtime_envs(single_envs, args.model)
trainer = TrainerFactory.create(args.model)
return trainer
......@@ -80,6 +97,8 @@ def cluster_engine(args):
cluster_envs = {}
cluster_envs["train.trainer.trainer"] = "ClusterTrainer"
cluster_envs["train.trainer.engine"] = "cluster"
cluster_envs["train.trainer.device"] = args.device
set_runtime_envs(cluster_envs, args.model)
trainer = TrainerFactory.create(args.model)
......@@ -91,6 +110,8 @@ def cluster_mpi_engine(args):
cluster_envs = {}
cluster_envs["train.trainer.trainer"] = "CtrCodingTrainer"
cluster_envs["train.trainer.device"] = args.device
set_runtime_envs(cluster_envs, args.model)
trainer = TrainerFactory.create(args.model)
......@@ -110,6 +131,7 @@ def local_cluster_engine(args):
cluster_envs["train.trainer.strategy"] = "async"
cluster_envs["train.trainer.threads"] = "2"
cluster_envs["train.trainer.engine"] = "local_cluster"
cluster_envs["train.trainer.device"] = args.device
cluster_envs["CPU_NUM"] = "2"
set_runtime_envs(cluster_envs, args.model)
......@@ -132,36 +154,25 @@ def local_mpi_engine(args):
cluster_envs["train.trainer.trainer"] = "CtrCodingTrainer"
cluster_envs["log_dir"] = "logs"
cluster_envs["train.trainer.engine"] = "local_cluster"
cluster_envs["train.trainer.device"] = args.device
set_runtime_envs(cluster_envs, args.model)
launch = LocalMPIEngine(cluster_envs, args.model)
return launch
def engine_registry():
engines["TRAINSPILER"]["SINGLE"] = single_engine
engines["TRAINSPILER"]["LOCAL_CLUSTER"] = local_cluster_engine
engines["TRAINSPILER"]["CLUSTER"] = cluster_engine
engines["PSLIB"]["SINGLE"] = local_mpi_engine
engines["PSLIB"]["LOCAL_CLUSTER"] = local_mpi_engine
engines["PSLIB"]["CLUSTER"] = cluster_mpi_engine
engine_registry()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='fleet-rec run')
parser.add_argument("-m", "--model", type=str)
parser.add_argument("-e", "--engine", type=str)
parser.add_argument("-e", "--engine", type=str, choices=clusters)
parser.add_argument("-d", "--device", type=str, choices=["CPU", "GPU"], default="CPU")
args = parser.parse_args()
if not os.path.exists(args.model) or not os.path.isfile(args.model):
raise ValueError("argument model: {} error, must specify an existed YAML file".format(args.model))
if args.engine.upper() not in clusters:
raise ValueError("argument engine: {} error, must in {}".format(args.engine, clusters))
if not os.path.isfile(args.model):
raise FileNotFoundError("argument model: {} do not exist".format(args.model))
engine_registry()
which_engine = get_engine(args.engine)
which_engine = get_engine(args.engine, args.device)
engine = which_engine(args)
engine.run()
......@@ -4,12 +4,21 @@ setup for fleet-rec.
from setuptools import setup
packages = ["fleetrec", "fleetrec.models", "fleetrec.models.ctr_dnn",
"fleetrec.examples", "fleetrec.core", "fleetrec.core.engine",
"fleetrec.core.metrics", "fleetrec.core.models", "fleetrec.core.trainers", "fleetrec.core.utils"]
models_ctr = ["fleetrec.models.ctr", "fleetrec.models.ctr.dnn", "fleetrec.models.ctr.deepfm"]
models_ot = ["fleetrec.models.other", "fleetrec.models.tdm", "fleetrec.models.multi_task"]
models_recall = ["fleetrec.models.recall", "fleetrec.models.recall.word2vec"]
models = ["fleetrec.models"] + models_ctr + models_ot + models_recall
core = ["fleetrec.core", "fleetrec.core.engine",
"fleetrec.core.metrics", "fleetrec.core.models",
"fleetrec.core.trainers", "fleetrec.core.utils"]
packages = ["fleetrec", "fleetrec.contrib", "fleetrec.dataset", "fleetrec.doc", "fleetrec.examples",
"fleetrec.tools"] + models + core
requires = [
"paddlepaddle >= 0.0.0"
"paddlepaddle >= 0.0.0",
"netron >= 0.0.0"
]
about = {}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册