diff --git a/core/engine/cluster/cluster.py b/core/engine/cluster/cluster.py index 6dfcec3a929ad8124192014f48270ebd1862dc2c..8c45335799afb165b66c133bd217caf3320f703f 100644 --- a/core/engine/cluster/cluster.py +++ b/core/engine/cluster/cluster.py @@ -27,6 +27,7 @@ from paddlerec.core.utils import envs class ClusterEngine(Engine): def __init_impl__(self): abs_dir = os.path.dirname(os.path.abspath(__file__)) + backend = envs.get_runtime_environ("engine_backend") if backend == "PaddleCloud": self.submit_script = os.path.join(abs_dir, "cloud/cluster.sh") @@ -57,4 +58,5 @@ class ClusterEngine(Engine): self.start_worker_procs() else: - raise ValueError("role {} error, must in MASTER/WORKER".format(role)) + raise ValueError("role {} error, must in MASTER/WORKER".format( + role)) diff --git a/core/engine/local_cluster.py b/core/engine/local_cluster.py index 4cf614f02315acbff2a3c21126d8c061c10ba8ad..89ceafa973c9488a727aecb2e01a74f2574a81f9 100755 --- a/core/engine/local_cluster.py +++ b/core/engine/local_cluster.py @@ -46,10 +46,13 @@ class LocalClusterEngine(Engine): ports.append(new_port) break user_endpoints = ",".join(["127.0.0.1:" + str(x) for x in ports]) - user_endpoints_ips = [x.split(":")[0] - for x in user_endpoints.split(",")] - user_endpoints_port = [x.split(":")[1] - for x in user_endpoints.split(",")] + + user_endpoints_ips = [ + x.split(":")[0] for x in user_endpoints.split(",") + ] + user_endpoints_port = [ + x.split(":")[1] for x in user_endpoints.split(",") + ] factory = "paddlerec.core.factory" cmd = [sys.executable, "-u", "-m", factory, self.trainer] @@ -97,8 +100,10 @@ class LocalClusterEngine(Engine): if len(log_fns) > 0: log_fns[i].close() procs[i].terminate() - print("all workers already completed, you can view logs under the `{}` directory".format(logs_dir), - file=sys.stderr) + print( + "all workers already completed, you can view logs under the `{}` directory". + format(logs_dir), + file=sys.stderr) def run(self): self.start_procs() diff --git a/core/engine/local_mpi.py b/core/engine/local_mpi.py index 49db821fe5764ae9ef7f42cbd3ca2fe77b83a1d1..830bf28c4957e342d317070ab2060cde1de6d6a6 100755 --- a/core/engine/local_mpi.py +++ b/core/engine/local_mpi.py @@ -26,7 +26,6 @@ from paddlerec.core.engine.engine import Engine class LocalMPIEngine(Engine): def start_procs(self): logs_dir = self.envs["log_dir"] - default_env = os.environ.copy() current_env = copy.copy(default_env) current_env.pop("http_proxy", None) @@ -42,7 +41,8 @@ class LocalMPIEngine(Engine): os.system("mkdir -p {}".format(logs_dir)) fn = open("%s/job.log" % logs_dir, "w") log_fns.append(fn) - proc = subprocess.Popen(cmd, env=current_env, stdout=fn, stderr=fn, cwd=os.getcwd()) + proc = subprocess.Popen( + cmd, env=current_env, stdout=fn, stderr=fn, cwd=os.getcwd()) else: proc = subprocess.Popen(cmd, env=current_env, cwd=os.getcwd()) procs.append(proc) @@ -51,7 +51,9 @@ class LocalMPIEngine(Engine): if len(log_fns) > 0: log_fns[i].close() procs[i].wait() - print("all workers and parameter servers already completed", file=sys.stderr) + print( + "all workers and parameter servers already completed", + file=sys.stderr) def run(self): self.start_procs() diff --git a/core/factory.py b/core/factory.py index 4c08f1f6bbd70cc65011e8430e3acf039d7b6c8f..470b3a025e51d8c9fd6b2b3bcbb118fb8a619d77 100755 --- a/core/factory.py +++ b/core/factory.py @@ -19,24 +19,23 @@ import yaml from paddlerec.core.utils import envs -trainer_abs = os.path.join(os.path.dirname( - os.path.abspath(__file__)), "trainers") +trainer_abs = os.path.join( + os.path.dirname(os.path.abspath(__file__)), "trainers") trainers = {} def trainer_registry(): - trainers["SingleTrainer"] = os.path.join( - trainer_abs, "single_trainer.py") - trainers["ClusterTrainer"] = os.path.join( - trainer_abs, "cluster_trainer.py") - trainers["CtrCodingTrainer"] = os.path.join( - trainer_abs, "ctr_coding_trainer.py") - trainers["CtrModulTrainer"] = os.path.join( - trainer_abs, "ctr_modul_trainer.py") - trainers["TDMSingleTrainer"] = os.path.join( - trainer_abs, "tdm_single_trainer.py") - trainers["TDMClusterTrainer"] = os.path.join( - trainer_abs, "tdm_cluster_trainer.py") + trainers["SingleTrainer"] = os.path.join(trainer_abs, "single_trainer.py") + trainers["ClusterTrainer"] = os.path.join(trainer_abs, + "cluster_trainer.py") + trainers["CtrCodingTrainer"] = os.path.join(trainer_abs, + "ctr_coding_trainer.py") + trainers["CtrModulTrainer"] = os.path.join(trainer_abs, + "ctr_modul_trainer.py") + trainers["TDMSingleTrainer"] = os.path.join(trainer_abs, + "tdm_single_trainer.py") + trainers["TDMClusterTrainer"] = os.path.join(trainer_abs, + "tdm_cluster_trainer.py") trainer_registry() @@ -55,8 +54,8 @@ class TrainerFactory(object): if trainer_abs is None: if not os.path.isfile(train_mode): - raise IOError( - "trainer {} can not be recognized".format(train_mode)) + raise IOError("trainer {} can not be recognized".format( + train_mode)) trainer_abs = train_mode train_mode = "UserDefineTrainer" diff --git a/core/metrics/auc_metrics.py b/core/metrics/auc_metrics.py index 5dd16cc078aa43d8fb07a50a4b006d4fdae3b2e9..085c84990e4a0a3a3e606ef707fef5d90387e8b0 100755 --- a/core/metrics/auc_metrics.py +++ b/core/metrics/auc_metrics.py @@ -22,7 +22,7 @@ from paddlerec.core.metric import Metric class AUCMetric(Metric): """ - Metric For Paddle Model + Metric For Fluid Model """ def __init__(self, config, fleet): @@ -83,7 +83,8 @@ class AUCMetric(Metric): if scope.find_var(metric_item['var'].name) is None: result[metric_name] = None continue - result[metric_name] = self.get_metric(scope, metric_item['var'].name) + result[metric_name] = self.get_metric(scope, + metric_item['var'].name) return result def calculate_auc(self, global_pos, global_neg): @@ -178,14 +179,18 @@ class AUCMetric(Metric): self._result['mean_q'] = 0 return self._result if 'stat_pos' in result and 'stat_neg' in result: - result['auc'] = self.calculate_auc(result['stat_pos'], result['stat_neg']) - result['bucket_error'] = self.calculate_auc(result['stat_pos'], result['stat_neg']) + result['auc'] = self.calculate_auc(result['stat_pos'], + result['stat_neg']) + result['bucket_error'] = self.calculate_auc(result['stat_pos'], + result['stat_neg']) if 'pos_ins_num' in result: - result['actual_ctr'] = result['pos_ins_num'] / result['total_ins_num'] + result['actual_ctr'] = result['pos_ins_num'] / result[ + 'total_ins_num'] if 'abserr' in result: result['mae'] = result['abserr'] / result['total_ins_num'] if 'sqrerr' in result: - result['rmse'] = math.sqrt(result['sqrerr'] / result['total_ins_num']) + result['rmse'] = math.sqrt(result['sqrerr'] / + result['total_ins_num']) if 'prob' in result: result['predict_ctr'] = result['prob'] / result['total_ins_num'] if abs(result['predict_ctr']) > 1e-6: diff --git a/core/model.py b/core/model.py index 212db44c8dc60a20f6e5ed3f7c338b5336f41e2a..82b41ebc4b7ea752e708b9d7246b6bf7d5025db4 100755 --- a/core/model.py +++ b/core/model.py @@ -20,7 +20,7 @@ from paddlerec.core.utils import envs class Model(object): - """R + """Base Model """ __metaclass__ = abc.ABCMeta @@ -39,32 +39,43 @@ class Model(object): self._platform = envs.get_platform() def _init_slots(self): - sparse_slots = envs.get_global_env("sparse_slots", None, "train.reader") + sparse_slots = envs.get_global_env("sparse_slots", None, + "train.reader") dense_slots = envs.get_global_env("dense_slots", None, "train.reader") if sparse_slots is not None or dense_slots is not None: sparse_slots = sparse_slots.strip().split(" ") dense_slots = dense_slots.strip().split(" ") - dense_slots_shape = [[int(j) for j in i.split(":")[1].strip("[]").split(",")] for i in dense_slots] + dense_slots_shape = [[ + int(j) for j in i.split(":")[1].strip("[]").split(",") + ] for i in dense_slots] dense_slots = [i.split(":")[0] for i in dense_slots] self._dense_data_var = [] for i in range(len(dense_slots)): - l = fluid.layers.data(name=dense_slots[i], shape=dense_slots_shape[i], dtype="float32") + l = fluid.layers.data( + name=dense_slots[i], + shape=dense_slots_shape[i], + dtype="float32") self._data_var.append(l) self._dense_data_var.append(l) self._sparse_data_var = [] for name in sparse_slots: - l = fluid.layers.data(name=name, shape=[1], lod_level=1, dtype="int64") + l = fluid.layers.data( + name=name, shape=[1], lod_level=1, dtype="int64") self._data_var.append(l) self._sparse_data_var.append(l) - dataset_class = envs.get_global_env("dataset_class", None, "train.reader") + dataset_class = envs.get_global_env("dataset_class", None, + "train.reader") if dataset_class == "DataLoader": self._init_dataloader() def _init_dataloader(self): self._data_loader = fluid.io.DataLoader.from_generator( - feed_list=self._data_var, capacity=64, use_double_buffer=False, iterable=False) + feed_list=self._data_var, + capacity=64, + use_double_buffer=False, + iterable=False) def get_inputs(self): return self._data_var @@ -96,8 +107,8 @@ class Model(object): "configured optimizer can only supported SGD/Adam/Adagrad") if name == "SGD": - reg = envs.get_global_env( - "hyper_parameters.reg", 0.0001, self._namespace) + reg = envs.get_global_env("hyper_parameters.reg", 0.0001, + self._namespace) optimizer_i = fluid.optimizer.SGD( lr, regularization=fluid.regularizer.L2DecayRegularizer(reg)) elif name == "ADAM": @@ -111,10 +122,10 @@ class Model(object): return optimizer_i def optimizer(self): - learning_rate = envs.get_global_env( - "hyper_parameters.learning_rate", None, self._namespace) - optimizer = envs.get_global_env( - "hyper_parameters.optimizer", None, self._namespace) + learning_rate = envs.get_global_env("hyper_parameters.learning_rate", + None, self._namespace) + optimizer = envs.get_global_env("hyper_parameters.optimizer", None, + self._namespace) print(">>>>>>>>>>>.learnig rate: %s" % learning_rate) return self._build_optimizer(optimizer, learning_rate)