提交 bfce2242 编写于 作者: T tangwei

fix code style

上级 988a26aa
......@@ -27,6 +27,7 @@ from paddlerec.core.utils import envs
class ClusterEngine(Engine):
def __init_impl__(self):
abs_dir = os.path.dirname(os.path.abspath(__file__))
backend = envs.get_runtime_environ("engine_backend")
if backend == "PaddleCloud":
self.submit_script = os.path.join(abs_dir, "cloud/cluster.sh")
......@@ -57,4 +58,5 @@ class ClusterEngine(Engine):
self.start_worker_procs()
else:
raise ValueError("role {} error, must in MASTER/WORKER".format(role))
raise ValueError("role {} error, must in MASTER/WORKER".format(
role))
......@@ -46,10 +46,13 @@ class LocalClusterEngine(Engine):
ports.append(new_port)
break
user_endpoints = ",".join(["127.0.0.1:" + str(x) for x in ports])
user_endpoints_ips = [x.split(":")[0]
for x in user_endpoints.split(",")]
user_endpoints_port = [x.split(":")[1]
for x in user_endpoints.split(",")]
user_endpoints_ips = [
x.split(":")[0] for x in user_endpoints.split(",")
]
user_endpoints_port = [
x.split(":")[1] for x in user_endpoints.split(",")
]
factory = "paddlerec.core.factory"
cmd = [sys.executable, "-u", "-m", factory, self.trainer]
......@@ -97,8 +100,10 @@ class LocalClusterEngine(Engine):
if len(log_fns) > 0:
log_fns[i].close()
procs[i].terminate()
print("all workers already completed, you can view logs under the `{}` directory".format(logs_dir),
file=sys.stderr)
print(
"all workers already completed, you can view logs under the `{}` directory".
format(logs_dir),
file=sys.stderr)
def run(self):
self.start_procs()
......@@ -26,7 +26,6 @@ from paddlerec.core.engine.engine import Engine
class LocalMPIEngine(Engine):
def start_procs(self):
logs_dir = self.envs["log_dir"]
default_env = os.environ.copy()
current_env = copy.copy(default_env)
current_env.pop("http_proxy", None)
......@@ -42,7 +41,8 @@ class LocalMPIEngine(Engine):
os.system("mkdir -p {}".format(logs_dir))
fn = open("%s/job.log" % logs_dir, "w")
log_fns.append(fn)
proc = subprocess.Popen(cmd, env=current_env, stdout=fn, stderr=fn, cwd=os.getcwd())
proc = subprocess.Popen(
cmd, env=current_env, stdout=fn, stderr=fn, cwd=os.getcwd())
else:
proc = subprocess.Popen(cmd, env=current_env, cwd=os.getcwd())
procs.append(proc)
......@@ -51,7 +51,9 @@ class LocalMPIEngine(Engine):
if len(log_fns) > 0:
log_fns[i].close()
procs[i].wait()
print("all workers and parameter servers already completed", file=sys.stderr)
print(
"all workers and parameter servers already completed",
file=sys.stderr)
def run(self):
self.start_procs()
......@@ -19,24 +19,23 @@ import yaml
from paddlerec.core.utils import envs
trainer_abs = os.path.join(os.path.dirname(
os.path.abspath(__file__)), "trainers")
trainer_abs = os.path.join(
os.path.dirname(os.path.abspath(__file__)), "trainers")
trainers = {}
def trainer_registry():
trainers["SingleTrainer"] = os.path.join(
trainer_abs, "single_trainer.py")
trainers["ClusterTrainer"] = os.path.join(
trainer_abs, "cluster_trainer.py")
trainers["CtrCodingTrainer"] = os.path.join(
trainer_abs, "ctr_coding_trainer.py")
trainers["CtrModulTrainer"] = os.path.join(
trainer_abs, "ctr_modul_trainer.py")
trainers["TDMSingleTrainer"] = os.path.join(
trainer_abs, "tdm_single_trainer.py")
trainers["TDMClusterTrainer"] = os.path.join(
trainer_abs, "tdm_cluster_trainer.py")
trainers["SingleTrainer"] = os.path.join(trainer_abs, "single_trainer.py")
trainers["ClusterTrainer"] = os.path.join(trainer_abs,
"cluster_trainer.py")
trainers["CtrCodingTrainer"] = os.path.join(trainer_abs,
"ctr_coding_trainer.py")
trainers["CtrModulTrainer"] = os.path.join(trainer_abs,
"ctr_modul_trainer.py")
trainers["TDMSingleTrainer"] = os.path.join(trainer_abs,
"tdm_single_trainer.py")
trainers["TDMClusterTrainer"] = os.path.join(trainer_abs,
"tdm_cluster_trainer.py")
trainer_registry()
......@@ -55,8 +54,8 @@ class TrainerFactory(object):
if trainer_abs is None:
if not os.path.isfile(train_mode):
raise IOError(
"trainer {} can not be recognized".format(train_mode))
raise IOError("trainer {} can not be recognized".format(
train_mode))
trainer_abs = train_mode
train_mode = "UserDefineTrainer"
......
......@@ -22,7 +22,7 @@ from paddlerec.core.metric import Metric
class AUCMetric(Metric):
"""
Metric For Paddle Model
Metric For Fluid Model
"""
def __init__(self, config, fleet):
......@@ -83,7 +83,8 @@ class AUCMetric(Metric):
if scope.find_var(metric_item['var'].name) is None:
result[metric_name] = None
continue
result[metric_name] = self.get_metric(scope, metric_item['var'].name)
result[metric_name] = self.get_metric(scope,
metric_item['var'].name)
return result
def calculate_auc(self, global_pos, global_neg):
......@@ -178,14 +179,18 @@ class AUCMetric(Metric):
self._result['mean_q'] = 0
return self._result
if 'stat_pos' in result and 'stat_neg' in result:
result['auc'] = self.calculate_auc(result['stat_pos'], result['stat_neg'])
result['bucket_error'] = self.calculate_auc(result['stat_pos'], result['stat_neg'])
result['auc'] = self.calculate_auc(result['stat_pos'],
result['stat_neg'])
result['bucket_error'] = self.calculate_auc(result['stat_pos'],
result['stat_neg'])
if 'pos_ins_num' in result:
result['actual_ctr'] = result['pos_ins_num'] / result['total_ins_num']
result['actual_ctr'] = result['pos_ins_num'] / result[
'total_ins_num']
if 'abserr' in result:
result['mae'] = result['abserr'] / result['total_ins_num']
if 'sqrerr' in result:
result['rmse'] = math.sqrt(result['sqrerr'] / result['total_ins_num'])
result['rmse'] = math.sqrt(result['sqrerr'] /
result['total_ins_num'])
if 'prob' in result:
result['predict_ctr'] = result['prob'] / result['total_ins_num']
if abs(result['predict_ctr']) > 1e-6:
......
......@@ -20,7 +20,7 @@ from paddlerec.core.utils import envs
class Model(object):
"""R
"""Base Model
"""
__metaclass__ = abc.ABCMeta
......@@ -39,32 +39,43 @@ class Model(object):
self._platform = envs.get_platform()
def _init_slots(self):
sparse_slots = envs.get_global_env("sparse_slots", None, "train.reader")
sparse_slots = envs.get_global_env("sparse_slots", None,
"train.reader")
dense_slots = envs.get_global_env("dense_slots", None, "train.reader")
if sparse_slots is not None or dense_slots is not None:
sparse_slots = sparse_slots.strip().split(" ")
dense_slots = dense_slots.strip().split(" ")
dense_slots_shape = [[int(j) for j in i.split(":")[1].strip("[]").split(",")] for i in dense_slots]
dense_slots_shape = [[
int(j) for j in i.split(":")[1].strip("[]").split(",")
] for i in dense_slots]
dense_slots = [i.split(":")[0] for i in dense_slots]
self._dense_data_var = []
for i in range(len(dense_slots)):
l = fluid.layers.data(name=dense_slots[i], shape=dense_slots_shape[i], dtype="float32")
l = fluid.layers.data(
name=dense_slots[i],
shape=dense_slots_shape[i],
dtype="float32")
self._data_var.append(l)
self._dense_data_var.append(l)
self._sparse_data_var = []
for name in sparse_slots:
l = fluid.layers.data(name=name, shape=[1], lod_level=1, dtype="int64")
l = fluid.layers.data(
name=name, shape=[1], lod_level=1, dtype="int64")
self._data_var.append(l)
self._sparse_data_var.append(l)
dataset_class = envs.get_global_env("dataset_class", None, "train.reader")
dataset_class = envs.get_global_env("dataset_class", None,
"train.reader")
if dataset_class == "DataLoader":
self._init_dataloader()
def _init_dataloader(self):
self._data_loader = fluid.io.DataLoader.from_generator(
feed_list=self._data_var, capacity=64, use_double_buffer=False, iterable=False)
feed_list=self._data_var,
capacity=64,
use_double_buffer=False,
iterable=False)
def get_inputs(self):
return self._data_var
......@@ -96,8 +107,8 @@ class Model(object):
"configured optimizer can only supported SGD/Adam/Adagrad")
if name == "SGD":
reg = envs.get_global_env(
"hyper_parameters.reg", 0.0001, self._namespace)
reg = envs.get_global_env("hyper_parameters.reg", 0.0001,
self._namespace)
optimizer_i = fluid.optimizer.SGD(
lr, regularization=fluid.regularizer.L2DecayRegularizer(reg))
elif name == "ADAM":
......@@ -111,10 +122,10 @@ class Model(object):
return optimizer_i
def optimizer(self):
learning_rate = envs.get_global_env(
"hyper_parameters.learning_rate", None, self._namespace)
optimizer = envs.get_global_env(
"hyper_parameters.optimizer", None, self._namespace)
learning_rate = envs.get_global_env("hyper_parameters.learning_rate",
None, self._namespace)
optimizer = envs.get_global_env("hyper_parameters.optimizer", None,
self._namespace)
print(">>>>>>>>>>>.learnig rate: %s" % learning_rate)
return self._build_optimizer(optimizer, learning_rate)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册