diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
new file mode 100644
index 0000000000000000000000000000000000000000..6d715f5a61674c72082313517cc40f13b1ea85ed
--- /dev/null
+++ b/.pre-commit-config.yaml
@@ -0,0 +1,28 @@
+repos:
+- repo: https://github.com/Lucas-C/pre-commit-hooks.git
+  sha: v1.0.1
+  hooks:
+    - id: remove-crlf
+      files: (?!.*third_party)^.*$ | (?!.*book)^.*$
+- repo: https://github.com/PaddlePaddle/mirrors-yapf.git
+  sha: 0d79c0c469bab64f7229c9aca2b1186ef47f0e37
+  hooks:
+    - id: yapf
+      files: (.*\.(py|bzl)|BUILD|.*\.BUILD|WORKSPACE)$
+- repo: https://github.com/pre-commit/pre-commit-hooks
+  sha: 5bf6c09bfa1297d3692cadd621ef95f1284e33c0
+  hooks:
+    - id: check-added-large-files
+    - id: check-merge-conflict
+    - id: check-symlinks
+    - id: detect-private-key
+      files: (?!.*third_party)^.*$ | (?!.*book)^.*$
+    - id: end-of-file-fixer
+- repo: local
+  hooks:
+    - id: copyright_checker
+      name: copyright_checker
+      entry: python ./tools/codestyle/copyright.hook
+      language: system
+      files: \.(c|cc|cxx|cpp|cu|h|hpp|hxx|proto|py)$
+      exclude: (?!.*third_party)^.*$ | (?!.*book)^.*$
diff --git a/.travis.yml b/.travis.yml
new file mode 100644
index 0000000000000000000000000000000000000000..5b00ebbf73523eb310c16dcef60f78df9ab48156
--- /dev/null
+++ b/.travis.yml
@@ -0,0 +1,30 @@
+language: generic
+sudo: required
+dist: trusty
+
+services:
+  - docker
+
+os:
+  - linux
+
+env:
+  - JOB=check_style
+
+before_install:
+  # For pylint dockstring checker
+  - sudo pip install pylint pytest astroid isort pre-commit
+  - |
+    function timeout() { perl -e 'alarm shift; exec @ARGV' "$@"; }
+
+script:
+  - "travis_wait 30 sleep 1800 &"
+  - |
+    # 43min timeout
+    tools/build_script.sh ${JOB}
+    if [ $? -eq 0 ] || [ $? -eq 142 ]; then true; else exit 1; fi;
+
+notifications:
+  email:
+    on_success: change
+    on_failure: always
diff --git a/core/__init__.py b/core/__init__.py
index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..abf198b97e6e818e1fbe59006f98492640bcee54 100755
--- a/core/__init__.py
+++ b/core/__init__.py
@@ -0,0 +1,13 @@
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/core/engine/__init__.py b/core/engine/__init__.py
index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..abf198b97e6e818e1fbe59006f98492640bcee54 100755
--- a/core/engine/__init__.py
+++ b/core/engine/__init__.py
@@ -0,0 +1,13 @@
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/core/engine/cluster/__init__.py b/core/engine/cluster/__init__.py
index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..abf198b97e6e818e1fbe59006f98492640bcee54 100644
--- a/core/engine/cluster/__init__.py
+++ b/core/engine/cluster/__init__.py
@@ -0,0 +1,13 @@
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/core/engine/cluster/cloud/__init__.py b/core/engine/cluster/cloud/__init__.py
index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..abf198b97e6e818e1fbe59006f98492640bcee54 100644
--- a/core/engine/cluster/cloud/__init__.py
+++ b/core/engine/cluster/cloud/__init__.py
@@ -0,0 +1,13 @@
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/core/engine/cluster/cluster.py b/core/engine/cluster/cluster.py index 6dfcec3a929ad8124192014f48270ebd1862dc2c..8c45335799afb165b66c133bd217caf3320f703f 100644 --- a/core/engine/cluster/cluster.py +++ b/core/engine/cluster/cluster.py @@ -27,6 +27,7 @@ from paddlerec.core.utils import envs class ClusterEngine(Engine): def __init_impl__(self): abs_dir = os.path.dirname(os.path.abspath(__file__)) + backend = envs.get_runtime_environ("engine_backend") if backend == "PaddleCloud": self.submit_script = os.path.join(abs_dir, "cloud/cluster.sh") @@ -57,4 +58,5 @@ class ClusterEngine(Engine): self.start_worker_procs() else: - raise ValueError("role {} error, must in MASTER/WORKER".format(role)) + raise ValueError("role {} error, must in MASTER/WORKER".format( + role)) diff --git a/core/engine/local_cluster.py b/core/engine/local_cluster.py index 4cf614f02315acbff2a3c21126d8c061c10ba8ad..89ceafa973c9488a727aecb2e01a74f2574a81f9 100755 --- a/core/engine/local_cluster.py +++ b/core/engine/local_cluster.py @@ -46,10 +46,13 @@ class LocalClusterEngine(Engine): ports.append(new_port) break user_endpoints = ",".join(["127.0.0.1:" + str(x) for x in ports]) - user_endpoints_ips = [x.split(":")[0] - for x in user_endpoints.split(",")] - user_endpoints_port = [x.split(":")[1] - for x in user_endpoints.split(",")] + + user_endpoints_ips = [ + x.split(":")[0] for x in user_endpoints.split(",") + ] + user_endpoints_port = [ + x.split(":")[1] for x in user_endpoints.split(",") + ] factory = "paddlerec.core.factory" cmd = [sys.executable, "-u", "-m", factory, self.trainer] @@ -97,8 +100,10 @@ class LocalClusterEngine(Engine): if len(log_fns) > 0: log_fns[i].close() procs[i].terminate() - print("all workers already completed, you can view logs under the `{}` directory".format(logs_dir), - file=sys.stderr) + print( + "all workers already completed, you can view logs under the `{}` directory". 
+ format(logs_dir), + file=sys.stderr) def run(self): self.start_procs() diff --git a/core/engine/local_mpi.py b/core/engine/local_mpi.py index 49db821fe5764ae9ef7f42cbd3ca2fe77b83a1d1..830bf28c4957e342d317070ab2060cde1de6d6a6 100755 --- a/core/engine/local_mpi.py +++ b/core/engine/local_mpi.py @@ -26,7 +26,6 @@ from paddlerec.core.engine.engine import Engine class LocalMPIEngine(Engine): def start_procs(self): logs_dir = self.envs["log_dir"] - default_env = os.environ.copy() current_env = copy.copy(default_env) current_env.pop("http_proxy", None) @@ -42,7 +41,8 @@ class LocalMPIEngine(Engine): os.system("mkdir -p {}".format(logs_dir)) fn = open("%s/job.log" % logs_dir, "w") log_fns.append(fn) - proc = subprocess.Popen(cmd, env=current_env, stdout=fn, stderr=fn, cwd=os.getcwd()) + proc = subprocess.Popen( + cmd, env=current_env, stdout=fn, stderr=fn, cwd=os.getcwd()) else: proc = subprocess.Popen(cmd, env=current_env, cwd=os.getcwd()) procs.append(proc) @@ -51,7 +51,9 @@ class LocalMPIEngine(Engine): if len(log_fns) > 0: log_fns[i].close() procs[i].wait() - print("all workers and parameter servers already completed", file=sys.stderr) + print( + "all workers and parameter servers already completed", + file=sys.stderr) def run(self): self.start_procs() diff --git a/core/factory.py b/core/factory.py index 4c08f1f6bbd70cc65011e8430e3acf039d7b6c8f..470b3a025e51d8c9fd6b2b3bcbb118fb8a619d77 100755 --- a/core/factory.py +++ b/core/factory.py @@ -19,24 +19,23 @@ import yaml from paddlerec.core.utils import envs -trainer_abs = os.path.join(os.path.dirname( - os.path.abspath(__file__)), "trainers") +trainer_abs = os.path.join( + os.path.dirname(os.path.abspath(__file__)), "trainers") trainers = {} def trainer_registry(): - trainers["SingleTrainer"] = os.path.join( - trainer_abs, "single_trainer.py") - trainers["ClusterTrainer"] = os.path.join( - trainer_abs, "cluster_trainer.py") - trainers["CtrCodingTrainer"] = os.path.join( - trainer_abs, "ctr_coding_trainer.py") - trainers["CtrModulTrainer"] = os.path.join( - trainer_abs, "ctr_modul_trainer.py") - trainers["TDMSingleTrainer"] = os.path.join( - trainer_abs, "tdm_single_trainer.py") - trainers["TDMClusterTrainer"] = os.path.join( - trainer_abs, "tdm_cluster_trainer.py") + trainers["SingleTrainer"] = os.path.join(trainer_abs, "single_trainer.py") + trainers["ClusterTrainer"] = os.path.join(trainer_abs, + "cluster_trainer.py") + trainers["CtrCodingTrainer"] = os.path.join(trainer_abs, + "ctr_coding_trainer.py") + trainers["CtrModulTrainer"] = os.path.join(trainer_abs, + "ctr_modul_trainer.py") + trainers["TDMSingleTrainer"] = os.path.join(trainer_abs, + "tdm_single_trainer.py") + trainers["TDMClusterTrainer"] = os.path.join(trainer_abs, + "tdm_cluster_trainer.py") trainer_registry() @@ -55,8 +54,8 @@ class TrainerFactory(object): if trainer_abs is None: if not os.path.isfile(train_mode): - raise IOError( - "trainer {} can not be recognized".format(train_mode)) + raise IOError("trainer {} can not be recognized".format( + train_mode)) trainer_abs = train_mode train_mode = "UserDefineTrainer" diff --git a/core/metrics/auc_metrics.py b/core/metrics/auc_metrics.py index 5dd16cc078aa43d8fb07a50a4b006d4fdae3b2e9..085c84990e4a0a3a3e606ef707fef5d90387e8b0 100755 --- a/core/metrics/auc_metrics.py +++ b/core/metrics/auc_metrics.py @@ -22,7 +22,7 @@ from paddlerec.core.metric import Metric class AUCMetric(Metric): """ - Metric For Paddle Model + Metric For Fluid Model """ def __init__(self, config, fleet): @@ -83,7 +83,8 @@ class AUCMetric(Metric): if 
scope.find_var(metric_item['var'].name) is None: result[metric_name] = None continue - result[metric_name] = self.get_metric(scope, metric_item['var'].name) + result[metric_name] = self.get_metric(scope, + metric_item['var'].name) return result def calculate_auc(self, global_pos, global_neg): @@ -178,14 +179,18 @@ class AUCMetric(Metric): self._result['mean_q'] = 0 return self._result if 'stat_pos' in result and 'stat_neg' in result: - result['auc'] = self.calculate_auc(result['stat_pos'], result['stat_neg']) - result['bucket_error'] = self.calculate_auc(result['stat_pos'], result['stat_neg']) + result['auc'] = self.calculate_auc(result['stat_pos'], + result['stat_neg']) + result['bucket_error'] = self.calculate_auc(result['stat_pos'], + result['stat_neg']) if 'pos_ins_num' in result: - result['actual_ctr'] = result['pos_ins_num'] / result['total_ins_num'] + result['actual_ctr'] = result['pos_ins_num'] / result[ + 'total_ins_num'] if 'abserr' in result: result['mae'] = result['abserr'] / result['total_ins_num'] if 'sqrerr' in result: - result['rmse'] = math.sqrt(result['sqrerr'] / result['total_ins_num']) + result['rmse'] = math.sqrt(result['sqrerr'] / + result['total_ins_num']) if 'prob' in result: result['predict_ctr'] = result['prob'] / result['total_ins_num'] if abs(result['predict_ctr']) > 1e-6: diff --git a/core/model.py b/core/model.py index b4150155db9677124eabab079f003743fc6c4d8b..82b41ebc4b7ea752e708b9d7246b6bf7d5025db4 100755 --- a/core/model.py +++ b/core/model.py @@ -20,7 +20,7 @@ from paddlerec.core.utils import envs class Model(object): - """R + """Base Model """ __metaclass__ = abc.ABCMeta @@ -38,6 +38,45 @@ class Model(object): self._namespace = "train.model" self._platform = envs.get_platform() + def _init_slots(self): + sparse_slots = envs.get_global_env("sparse_slots", None, + "train.reader") + dense_slots = envs.get_global_env("dense_slots", None, "train.reader") + + if sparse_slots is not None or dense_slots is not None: + sparse_slots = sparse_slots.strip().split(" ") + dense_slots = dense_slots.strip().split(" ") + dense_slots_shape = [[ + int(j) for j in i.split(":")[1].strip("[]").split(",") + ] for i in dense_slots] + dense_slots = [i.split(":")[0] for i in dense_slots] + self._dense_data_var = [] + for i in range(len(dense_slots)): + l = fluid.layers.data( + name=dense_slots[i], + shape=dense_slots_shape[i], + dtype="float32") + self._data_var.append(l) + self._dense_data_var.append(l) + self._sparse_data_var = [] + for name in sparse_slots: + l = fluid.layers.data( + name=name, shape=[1], lod_level=1, dtype="int64") + self._data_var.append(l) + self._sparse_data_var.append(l) + + dataset_class = envs.get_global_env("dataset_class", None, + "train.reader") + if dataset_class == "DataLoader": + self._init_dataloader() + + def _init_dataloader(self): + self._data_loader = fluid.io.DataLoader.from_generator( + feed_list=self._data_var, + capacity=64, + use_double_buffer=False, + iterable=False) + def get_inputs(self): return self._data_var @@ -68,8 +107,8 @@ class Model(object): "configured optimizer can only supported SGD/Adam/Adagrad") if name == "SGD": - reg = envs.get_global_env( - "hyper_parameters.reg", 0.0001, self._namespace) + reg = envs.get_global_env("hyper_parameters.reg", 0.0001, + self._namespace) optimizer_i = fluid.optimizer.SGD( lr, regularization=fluid.regularizer.L2DecayRegularizer(reg)) elif name == "ADAM": @@ -83,10 +122,10 @@ class Model(object): return optimizer_i def optimizer(self): - learning_rate = envs.get_global_env( - 
"hyper_parameters.learning_rate", None, self._namespace) - optimizer = envs.get_global_env( - "hyper_parameters.optimizer", None, self._namespace) + learning_rate = envs.get_global_env("hyper_parameters.learning_rate", + None, self._namespace) + optimizer = envs.get_global_env("hyper_parameters.optimizer", None, + self._namespace) print(">>>>>>>>>>>.learnig rate: %s" % learning_rate) return self._build_optimizer(optimizer, learning_rate) diff --git a/core/modules/__init__.py b/core/modules/__init__.py index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..abf198b97e6e818e1fbe59006f98492640bcee54 100755 --- a/core/modules/__init__.py +++ b/core/modules/__init__.py @@ -0,0 +1,13 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/core/modules/coding/__init__.py b/core/modules/coding/__init__.py index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..abf198b97e6e818e1fbe59006f98492640bcee54 100755 --- a/core/modules/coding/__init__.py +++ b/core/modules/coding/__init__.py @@ -0,0 +1,13 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/core/modules/coding/layers.py b/core/modules/coding/layers.py index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..abf198b97e6e818e1fbe59006f98492640bcee54 100755 --- a/core/modules/coding/layers.py +++ b/core/modules/coding/layers.py @@ -0,0 +1,13 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/core/modules/modul/__init__.py b/core/modules/modul/__init__.py index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..abf198b97e6e818e1fbe59006f98492640bcee54 100755 --- a/core/modules/modul/__init__.py +++ b/core/modules/modul/__init__.py @@ -0,0 +1,13 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/core/modules/modul/build.py b/core/modules/modul/build.py index 0263cbf60e3b1647a05cbc471b7bbff1840f88ba..dae777176e49831bde4f6f9938637a3289a0a218 100755 --- a/core/modules/modul/build.py +++ b/core/modules/modul/build.py @@ -31,6 +31,7 @@ def create(config): Model Instance """ model = None + if config['mode'] == 'fluid': model = YamlModel(config) model.train_net() @@ -50,7 +51,12 @@ class YamlModel(Model): f = open(config['layer_file'], 'r') self._build_nodes = yaml.safe_load(f.read()) self._build_phase = ['input', 'param', 'summary', 'layer'] - self._build_param = {'layer': {}, 'inner_layer': {}, 'layer_extend': {}, 'model': {}} + self._build_param = { + 'layer': {}, + 'inner_layer': {}, + 'layer_extend': {}, + 'model': {} + } self._inference_meta = {'dependency': {}, 'params': {}} def train_net(self): @@ -76,10 +82,12 @@ class YamlModel(Model): if self._build_nodes[phase] is None: continue for node in self._build_nodes[phase]: - exec("""layer=layer.{}(node)""".format(node['class'])) - layer_output, extend_output = layer.generate(self._config['mode'], self._build_param) + exec ("""layer=layer.{}(node)""".format(node['class'])) + layer_output, extend_output = layer.generate( + self._config['mode'], self._build_param) self._build_param['layer'][node['name']] = layer_output - self._build_param['layer_extend'][node['name']] = extend_output + self._build_param['layer_extend'][node[ + 'name']] = extend_output if extend_output is None: continue if 'loss' in extend_output: @@ -89,17 +97,24 @@ class YamlModel(Model): self._cost += extend_output['loss'] if 'data_var' in extend_output: self._data_var += extend_output['data_var'] - if 'metric_label' in extend_output and extend_output['metric_label'] is not None: - self._metrics[extend_output['metric_label']] = extend_output['metric_dict'] + if 'metric_label' in extend_output and extend_output[ + 'metric_label'] is not None: + self._metrics[extend_output[ + 'metric_label']] = extend_output['metric_dict'] if 'inference_param' in extend_output: inference_param = extend_output['inference_param'] param_name = inference_param['name'] if param_name not in self._build_param['table']: - self._build_param['table'][param_name] = {'params': []} - table_meta = table.TableMeta.alloc_new_table(inference_param['table_id']) - self._build_param['table'][param_name]['_meta'] = table_meta - self._build_param['table'][param_name]['params'] += inference_param['params'] + self._build_param['table'][param_name] = { + 'params': [] + } + table_meta = table.TableMeta.alloc_new_table( + inference_param['table_id']) + self._build_param['table'][param_name][ + '_meta'] = table_meta + self._build_param['table'][param_name][ + 'params'] += inference_param['params'] pass @classmethod @@ -114,20 +129,25 @@ class YamlModel(Model): metrics = params['metrics'] for name in metrics: model_metrics = metrics[name] - stat_var_names += [model_metrics[metric]['var'].name for metric in model_metrics] + stat_var_names += [ + model_metrics[metric]['var'].name + for metric in model_metrics + ] strategy['stat_var_names'] = list(set(stat_var_names)) optimizer_generator = 
'optimizer = fluid.optimizer.' + optimizer_conf['class'] + \ '(learning_rate=' + str(optimizer_conf['learning_rate']) + ')' - exec(optimizer_generator) + exec (optimizer_generator) optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy) return optimizer def dump_model_program(self, path): """R """ - with open(path + '/' + self._name + '_main_program.pbtxt', "w") as fout: + with open(path + '/' + self._name + '_main_program.pbtxt', + "w") as fout: print >> fout, self._build_param['model']['train_program'] - with open(path + '/' + self._name + '_startup_program.pbtxt', "w") as fout: + with open(path + '/' + self._name + '_startup_program.pbtxt', + "w") as fout: print >> fout, self._build_param['model']['startup_program'] pass @@ -137,7 +157,8 @@ class YamlModel(Model): scope = params['scope'] decay = params['decay'] for param_table in self._build_param['table']: - table_id = self._build_param['table'][param_table]['_meta']._table_id + table_id = self._build_param['table'][param_table][ + '_meta']._table_id fleet.shrink_dense_table(decay, scope=scope, table_id=table_id) def dump_inference_program(self, inference_layer, path): @@ -152,17 +173,25 @@ class YamlModel(Model): executor = params['executor'] program = self._build_param['model']['train_program'] for table_name, table in self._build_param['table'].items(): - fleet._fleet_ptr.pull_dense(scope, table['_meta']._table_id, table['params']) + fleet._fleet_ptr.pull_dense(scope, table['_meta']._table_id, + table['params']) for infernce_item in params['inference_list']: - params_name_list = self.inference_params(infernce_item['layer_name']) - params_var_list = [program.global_block().var(i) for i in params_name_list] + params_name_list = self.inference_params(infernce_item[ + 'layer_name']) + params_var_list = [ + program.global_block().var(i) for i in params_name_list + ] params_file_name = infernce_item['save_file_name'] with fluid.scope_guard(scope): if params['save_combine']: fluid.io.save_vars(executor, "./", \ program, vars=params_var_list, filename=params_file_name) else: - fluid.io.save_vars(executor, params_file_name, program, vars=params_var_list) + fluid.io.save_vars( + executor, + params_file_name, + program, + vars=params_var_list) def inference_params(self, inference_layer): """ @@ -177,11 +206,13 @@ class YamlModel(Model): return self._inference_meta['params'][layer] self._inference_meta['params'][layer] = [] - self._inference_meta['dependency'][layer] = self.get_dependency(self._build_param['inner_layer'], layer) + self._inference_meta['dependency'][layer] = self.get_dependency( + self._build_param['inner_layer'], layer) for node in self._build_nodes['layer']: if node['name'] not in self._inference_meta['dependency'][layer]: continue - if 'inference_param' in self._build_param['layer_extend'][node['name']]: + if 'inference_param' in self._build_param['layer_extend'][node[ + 'name']]: self._inference_meta['params'][layer] += \ self._build_param['layer_extend'][node['name']]['inference_param']['params'] return self._inference_meta['params'][layer] @@ -199,5 +230,6 @@ class YamlModel(Model): dependencys = copy.deepcopy(layer_graph[dest_layer]['input']) dependency_list = copy.deepcopy(dependencys) for dependency in dependencys: - dependency_list = dependency_list + self.get_dependency(layer_graph, dependency) + dependency_list = dependency_list + self.get_dependency( + layer_graph, dependency) return list(set(dependency_list)) diff --git a/core/modules/modul/layers.py b/core/modules/modul/layers.py index 
26ee98a816a63c4121428b2dd5d2c835d05f7216..008ce6e40987a6a3adf6605590aa2b8fe53f034a 100755 --- a/core/modules/modul/layers.py +++ b/core/modules/modul/layers.py @@ -18,7 +18,7 @@ from paddlerec.core.layer import Layer class EmbeddingFuseLayer(Layer): - """R + """embedding + sequence + concat """ def __init__(self, config): @@ -40,7 +40,8 @@ class EmbeddingFuseLayer(Layer): show_clk.stop_gradient = True data_var = [] for slot in self._slots: - l = fluid.layers.data(name=slot, shape=[1], dtype="int64", lod_level=1) + l = fluid.layers.data( + name=slot, shape=[1], dtype="int64", lod_level=1) data_var.append(l) emb = fluid.layers.embedding(input=l, size=[10, self._emb_dim], \ is_sparse=True, is_distributed=True, @@ -48,7 +49,8 @@ class EmbeddingFuseLayer(Layer): emb = fluid.layers.sequence_pool(input=emb, pool_type='sum') emb = fluid.layers.continuous_value_model(emb, show_clk, self._cvm) self._emb_layers.append(emb) - output = fluid.layers.concat(input=self._emb_layers, axis=1, name=self._name) + output = fluid.layers.concat( + input=self._emb_layers, axis=1, name=self._name) return output, {'data_var': data_var} @@ -111,7 +113,13 @@ class ParamLayer(Layer): def generate(self, param): """R """ - return self._config, {'inference_param': {'name': 'param', 'params': [], 'table_id': self._table_id}} + return self._config, { + 'inference_param': { + 'name': 'param', + 'params': [], + 'table_id': self._table_id + } + } class SummaryLayer(Layer): @@ -129,10 +137,16 @@ class SummaryLayer(Layer): def generate(self, param): """R """ - return self._config, {'inference_param': {'name': 'summary', 'params': [], 'table_id': self._table_id}} + return self._config, { + 'inference_param': { + 'name': 'summary', + 'params': [], + 'table_id': self._table_id + } + } -class NormalizetionLayer(Layer): +class NormalizationLayer(Layer): """R """ @@ -152,9 +166,19 @@ class NormalizetionLayer(Layer): if len(self._input) > 0: input_list = [param['layer'][i] for i in self._input] input_layer = fluid.layers.concat(input=input_list, axis=1) - bn = fluid.layers.data_norm(input=input_layer, name=self._name, epsilon=1e-4, param_attr={ - "batch_size": 1e4, "batch_sum_default": 0.0, "batch_square": 1e4}) - inference_param = [self._name + '.batch_size', self._name + '.batch_sum', self._name + '.batch_square_sum'] + bn = fluid.layers.data_norm( + input=input_layer, + name=self._name, + epsilon=1e-4, + param_attr={ + "batch_size": 1e4, + "batch_sum_default": 0.0, + "batch_square": 1e4 + }) + inference_param = [ + self._name + '.batch_size', self._name + '.batch_sum', + self._name + '.batch_square_sum' + ] return bn, {'inference_param': {'name': 'summary', \ 'params': inference_param, 'table_id': summary_layer.get('table_id', -1)}} @@ -181,11 +205,13 @@ class FCLayer(Layer): input_list = [param['layer'][i] for i in self._input] input_layer = fluid.layers.concat(input=input_list, axis=1) input_coln = input_layer.shape[1] - scale = param_layer['init_range'] / (input_coln ** 0.5) + scale = param_layer['init_range'] / (input_coln**0.5) bias = None if self._bias: - bias = fluid.ParamAttr(learning_rate=1.0, - initializer=fluid.initializer.NormalInitializer(loc=0.0, scale=scale)) + bias = fluid.ParamAttr( + learning_rate=1.0, + initializer=fluid.initializer.NormalInitializer( + loc=0.0, scale=scale)) fc = fluid.layers.fc( name=self._name, input=input_layer, @@ -216,18 +242,46 @@ class LogLossLayer(Layer): self._extend_output = { 'metric_label': self._metric_label, 'metric_dict': { - 'auc': {'var': None}, - 'batch_auc': {'var': None}, - 
'stat_pos': {'var': None, 'data_type': 'int64'}, - 'stat_neg': {'var': None, 'data_type': 'int64'}, - 'batch_stat_pos': {'var': None, 'data_type': 'int64'}, - 'batch_stat_neg': {'var': None, 'data_type': 'int64'}, - 'pos_ins_num': {'var': None}, - 'abserr': {'var': None}, - 'sqrerr': {'var': None}, - 'prob': {'var': None}, - 'total_ins_num': {'var': None}, - 'q': {'var': None} + 'auc': { + 'var': None + }, + 'batch_auc': { + 'var': None + }, + 'stat_pos': { + 'var': None, + 'data_type': 'int64' + }, + 'stat_neg': { + 'var': None, + 'data_type': 'int64' + }, + 'batch_stat_pos': { + 'var': None, + 'data_type': 'int64' + }, + 'batch_stat_neg': { + 'var': None, + 'data_type': 'int64' + }, + 'pos_ins_num': { + 'var': None + }, + 'abserr': { + 'var': None + }, + 'sqrerr': { + 'var': None + }, + 'prob': { + 'var': None + }, + 'total_ins_num': { + 'var': None + }, + 'q': { + 'var': None + } } } @@ -236,9 +290,12 @@ class LogLossLayer(Layer): """ input_layer = param['layer'][self._input[0]] label_layer = param['layer'][self._label] - output = fluid.layers.clip(input_layer, self._bound[0], self._bound[1], name=self._name) + output = fluid.layers.clip( + input_layer, self._bound[0], self._bound[1], name=self._name) norm = fluid.layers.sigmoid(output, name=self._name) - output = fluid.layers.log_loss(norm, fluid.layers.cast(x=label_layer, dtype='float32')) + output = fluid.layers.log_loss( + norm, fluid.layers.cast( + x=label_layer, dtype='float32')) if self._weight: weight_layer = param['layer'][self._weight] output = fluid.layers.elementwise_mul(output, weight_layer) @@ -248,7 +305,11 @@ class LogLossLayer(Layer): # For AUC Metric metric = self._extend_output['metric_dict'] binary_predict = fluid.layers.concat( - input=[fluid.layers.elementwise_sub(fluid.layers.ceil(norm), norm), norm], axis=1) + input=[ + fluid.layers.elementwise_sub(fluid.layers.ceil(norm), norm), + norm + ], + axis=1) metric['auc']['var'], metric['batch_auc']['var'], [metric['batch_stat_pos']['var'], \ metric['batch_stat_neg']['var'], metric['stat_pos']['var'], metric['stat_neg']['var']] = \ diff --git a/core/reader.py b/core/reader.py index 955afa5e2f9a4d430912ad798de95df3160a3a69..85c0c4f9a57eea194343a6e1af6bfad2d07dd5a0 100755 --- a/core/reader.py +++ b/core/reader.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
+ from __future__ import print_function import abc @@ -44,3 +45,65 @@ class Reader(dg.MultiSlotDataGenerator): @abc.abstractmethod def generate_sample(self, line): pass + + +class SlotReader(dg.MultiSlotDataGenerator): + __metaclass__ = abc.ABCMeta + + def __init__(self, config): + dg.MultiSlotDataGenerator.__init__(self) + if os.path.isfile(config): + with open(config, 'r') as rb: + _config = yaml.load(rb.read(), Loader=yaml.FullLoader) + else: + raise ValueError("reader config only support yaml") + envs.set_global_envs(_config) + envs.update_workspace() + + def init(self, sparse_slots, dense_slots, padding=0): + from operator import mul + self.sparse_slots = sparse_slots.strip().split(" ") + self.dense_slots = dense_slots.strip().split(" ") + self.dense_slots_shape = [ + reduce(mul, + [int(j) for j in i.split(":")[1].strip("[]").split(",")]) + for i in self.dense_slots + ] + self.dense_slots = [i.split(":")[0] for i in self.dense_slots] + self.slots = self.dense_slots + self.sparse_slots + self.slot2index = {} + self.visit = {} + for i in range(len(self.slots)): + self.slot2index[self.slots[i]] = i + self.visit[self.slots[i]] = False + self.padding = padding + + def generate_sample(self, l): + def reader(): + line = l.strip().split(" ") + output = [(i, []) for i in self.slots] + for i in line: + slot_feasign = i.split(":") + slot = slot_feasign[0] + if slot not in self.slots: + continue + if slot in self.sparse_slots: + feasign = int(slot_feasign[1]) + else: + feasign = float(slot_feasign[1]) + output[self.slot2index[slot]][1].append(feasign) + self.visit[slot] = True + for i in self.visit: + slot = i + if not self.visit[slot]: + if i in self.dense_slots: + output[self.slot2index[i]][1].extend( + [self.padding] * + self.dense_slots_shape[self.slot2index[i]]) + else: + output[self.slot2index[i]][1].extend([self.padding]) + else: + self.visit[slot] = False + yield output + + return reader diff --git a/core/trainer.py b/core/trainer.py index 40fc35de973ce7841bfdf28dfc6c6a3751484be7..b7c22ea89bd279a2e1e233edeb4d8cf11b8aa5c0 100755 --- a/core/trainer.py +++ b/core/trainer.py @@ -30,8 +30,10 @@ class Trainer(object): def __init__(self, config=None): self._status_processor = {} + self._place = fluid.CPUPlace() self._exe = fluid.Executor(self._place) + self._exector_context = {} self._context = {'status': 'uninit', 'is_exit': False} self._config_yaml = config @@ -95,6 +97,6 @@ def user_define_engine(engine_yaml): train_dirname = os.path.dirname(train_location) base_name = os.path.splitext(os.path.basename(train_location))[0] sys.path.append(train_dirname) - trainer_class = envs.lazy_instance_by_fliename( - base_name, "UserDefineTraining") + trainer_class = envs.lazy_instance_by_fliename(base_name, + "UserDefineTraining") return trainer_class diff --git a/core/trainers/__init__.py b/core/trainers/__init__.py index cd9c9db5e6b93fd6171bca0a5b0f97f69306aedc..f14704cad8f3859746f95353ba68753f857ff78d 100755 --- a/core/trainers/__init__.py +++ b/core/trainers/__init__.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - """ trainer implement. 
@@ -22,5 +21,3 @@ Trainer ↘ (for online learning training) OnlineLearningTrainer """ - - diff --git a/core/trainers/cluster_trainer.py b/core/trainers/cluster_trainer.py index faa960359bc82d6130302002a99fb664c7374249..792b897f779b82a0989d6c25dd79663d52d05abd 100755 --- a/core/trainers/cluster_trainer.py +++ b/core/trainers/cluster_trainer.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - """ Training use fluid with one node only. """ @@ -43,11 +42,14 @@ class ClusterTrainer(TranspileTrainer): self.regist_context_processor('uninit', self.instance) self.regist_context_processor('init_pass', self.init) self.regist_context_processor('startup_pass', self.startup) - if envs.get_platform() == "LINUX" and envs.get_global_env("dataset_class", None, "train.reader") != "DataLoader": + + if envs.get_platform() == "LINUX" and envs.get_global_env( + "dataset_class", None, "train.reader") != "DataLoader": self.regist_context_processor('train_pass', self.dataset_train) else: - self.regist_context_processor( - 'train_pass', self.dataloader_train) + self.regist_context_processor('train_pass', + self.dataloader_train) + self.regist_context_processor('infer_pass', self.infer) self.regist_context_processor('terminal_pass', self.terminal) @@ -75,8 +77,8 @@ class ClusterTrainer(TranspileTrainer): def init(self, context): self.model.train_net() optimizer = self.model.optimizer() - optimizer_name = envs.get_global_env( - "hyper_parameters.optimizer", None, "train.model") + optimizer_name = envs.get_global_env("hyper_parameters.optimizer", + None, "train.model") if optimizer_name not in ["", "sgd", "SGD", "Sgd"]: os.environ["FLAGS_communicator_is_sgd_optimizer"] = '0' @@ -114,9 +116,9 @@ class ClusterTrainer(TranspileTrainer): program = fluid.compiler.CompiledProgram( fleet.main_program).with_data_parallel( - loss_name=self.model.get_avg_cost().name, - build_strategy=self.strategy.get_build_strategy(), - exec_strategy=self.strategy.get_execute_strategy()) + loss_name=self.model.get_avg_cost().name, + build_strategy=self.strategy.get_build_strategy(), + exec_strategy=self.strategy.get_execute_strategy()) metrics_varnames = [] metrics_format = [] @@ -135,9 +137,8 @@ class ClusterTrainer(TranspileTrainer): batch_id = 0 try: while True: - metrics_rets = self._exe.run( - program=program, - fetch_list=metrics_varnames) + metrics_rets = self._exe.run(program=program, + fetch_list=metrics_varnames) metrics = [epoch, batch_id] metrics.extend(metrics_rets) @@ -162,14 +163,16 @@ class ClusterTrainer(TranspileTrainer): for i in range(epochs): begin_time = time.time() - self._exe.train_from_dataset(program=fluid.default_main_program(), - dataset=dataset, - fetch_list=self.fetch_vars, - fetch_info=self.fetch_alias, - print_period=self.fetch_period) + self._exe.train_from_dataset( + program=fluid.default_main_program(), + dataset=dataset, + fetch_list=self.fetch_vars, + fetch_info=self.fetch_alias, + print_period=self.fetch_period) end_time = time.time() - times = end_time-begin_time - print("epoch {} using time {}, speed {:.2f} lines/s".format(i, times, ins/times)) + times = end_time - begin_time + print("epoch {} using time {}, speed {:.2f} lines/s".format( + i, times, ins / times)) self.save(i, "train", is_fleet=True) fleet.stop_worker() diff --git a/core/trainers/ctr_coding_trainer.py b/core/trainers/ctr_coding_trainer.py index 
3bfec28cfd149bdbe47fdc202107c7ed7af58fdd..7dc51f340147aec933ce8bffd0be080b7be984c6 100755 --- a/core/trainers/ctr_coding_trainer.py +++ b/core/trainers/ctr_coding_trainer.py @@ -59,8 +59,10 @@ class CtrTrainer(Trainer): reader_class = envs.get_global_env("class", None, namespace) abs_dir = os.path.dirname(os.path.abspath(__file__)) reader = os.path.join(abs_dir, '../utils', 'dataset_instance.py') - pipe_cmd = "python {} {} {} {}".format(reader, reader_class, "TRAIN", self._config_yaml) - train_data_path = envs.get_global_env("train_data_path", None, namespace) + pipe_cmd = "python {} {} {} {}".format(reader, reader_class, "TRAIN", + self._config_yaml) + train_data_path = envs.get_global_env("train_data_path", None, + namespace) dataset = fluid.DatasetFactory().create_dataset() dataset.set_use_var(inputs) @@ -87,7 +89,8 @@ class CtrTrainer(Trainer): self.model.train_net() optimizer = self.model.optimizer() - optimizer = fleet.distributed_optimizer(optimizer, strategy={"use_cvm": False}) + optimizer = fleet.distributed_optimizer( + optimizer, strategy={"use_cvm": False}) optimizer.minimize(self.model.get_avg_cost()) if fleet.is_server(): @@ -118,16 +121,18 @@ class CtrTrainer(Trainer): gs = shuf * 0 fleet._role_maker._node_type_comm.Allreduce(shuf, gs) - print("trainer id: {}, trainers: {}, gs: {}".format(fleet.worker_index(), fleet.worker_num(), gs)) + print("trainer id: {}, trainers: {}, gs: {}".format(fleet.worker_index( + ), fleet.worker_num(), gs)) epochs = envs.get_global_env("train.epochs") for i in range(epochs): - self._exe.train_from_dataset(program=fluid.default_main_program(), - dataset=dataset, - fetch_list=self.fetch_vars, - fetch_info=self.fetch_alias, - print_period=self.fetch_period) + self._exe.train_from_dataset( + program=fluid.default_main_program(), + dataset=dataset, + fetch_list=self.fetch_vars, + fetch_info=self.fetch_alias, + print_period=self.fetch_period) context['status'] = 'terminal_pass' fleet.stop_worker() diff --git a/core/trainers/ctr_modul_trainer.py b/core/trainers/ctr_modul_trainer.py index 7b3bd7874359059c3b03289cc10da7d7756ac35b..af8f3f3a2c3fb59fc6db60e3e4cd050ca3d8ad8a 100755 --- a/core/trainers/ctr_modul_trainer.py +++ b/core/trainers/ctr_modul_trainer.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. 
- import datetime import json import sys @@ -23,7 +22,6 @@ import paddle.fluid as fluid from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet from paddle.fluid.incubate.fleet.base.role_maker import GeneralRoleMaker - from paddlerec.core.utils import fs as fs from paddlerec.core.utils import util as util from paddlerec.core.metrics.auc_metrics import AUCMetric @@ -80,20 +78,31 @@ class CtrTrainer(Trainer): """R """ Trainer.__init__(self, config) - config['output_path'] = util.get_absolute_path( - config['output_path'], config['io']['afs']) + config['output_path'] = util.get_absolute_path(config['output_path'], + config['io']['afs']) self.global_config = config self._metrics = {} self._path_generator = util.PathGenerator({ - 'templates': [ - {'name': 'xbox_base_done', 'template': config['output_path'] + '/xbox_base_done.txt'}, - {'name': 'xbox_delta_done', 'template': config['output_path'] + '/xbox_patch_done.txt'}, - {'name': 'xbox_base', 'template': config['output_path'] + '/xbox/{day}/base/'}, - {'name': 'xbox_delta', 'template': config['output_path'] + '/xbox/{day}/delta-{pass_id}/'}, - {'name': 'batch_model', 'template': config['output_path'] + '/batch_model/{day}/{pass_id}/'} - ] + 'templates': [{ + 'name': 'xbox_base_done', + 'template': config['output_path'] + '/xbox_base_done.txt' + }, { + 'name': 'xbox_delta_done', + 'template': config['output_path'] + '/xbox_patch_done.txt' + }, { + 'name': 'xbox_base', + 'template': config['output_path'] + '/xbox/{day}/base/' + }, { + 'name': 'xbox_delta', + 'template': + config['output_path'] + '/xbox/{day}/delta-{pass_id}/' + }, { + 'name': 'batch_model', + 'template': + config['output_path'] + '/batch_model/{day}/{pass_id}/' + }] }) if 'path_generator' in config: self._path_generator.add_path_template(config['path_generator']) @@ -111,9 +120,11 @@ class CtrTrainer(Trainer): if self.global_config.get('process_mode', 'mpi') == 'brilliant_cpu': afs_config = self.global_config['io']['afs'] role_maker = GeneralRoleMaker( - hdfs_name=afs_config['fs_name'], hdfs_ugi=afs_config['fs_ugi'], + hdfs_name=afs_config['fs_name'], + hdfs_ugi=afs_config['fs_ugi'], path=self.global_config['output_path'] + "/gloo", - init_timeout_seconds=1200, run_timeout_seconds=1200) + init_timeout_seconds=1200, + run_timeout_seconds=1200) fleet.init(role_maker) data_var_list = [] data_var_name_dict = {} @@ -125,7 +136,8 @@ class CtrTrainer(Trainer): scope = fluid.Scope() self._exector_context[executor['name']] = {} self._exector_context[executor['name']]['scope'] = scope - self._exector_context[executor['name']]['model'] = model_basic.create(executor) + self._exector_context[executor['name']][ + 'model'] = model_basic.create(executor) model = self._exector_context[executor['name']]['model'] self._metrics.update(model.get_metrics()) runnnable_scope.append(scope) @@ -146,9 +158,12 @@ class CtrTrainer(Trainer): model = self._exector_context[executor['name']]['model'] program = model._build_param['model']['train_program'] if not executor['is_update_sparse']: - program._fleet_opt["program_configs"][str(id(model.get_avg_cost().block.program))]["push_sparse"] = [] + program._fleet_opt["program_configs"][str( + id(model.get_avg_cost().block.program))][ + "push_sparse"] = [] if 'train_thread_num' not in executor: - executor['train_thread_num'] = self.global_config['train_thread_num'] + executor['train_thread_num'] = self.global_config[ + 'train_thread_num'] with fluid.scope_guard(scope): self._exe.run(model._build_param['model']['startup_program']) 
model.dump_model_program('./') @@ -162,7 +177,8 @@ class CtrTrainer(Trainer): dataset_item['data_vars'] = data_var_list dataset_item.update(self.global_config['io']['afs']) dataset_item["batch_size"] = self.global_config['batch_size'] - self._dataset[dataset_item['name']] = dataset.FluidTimeSplitDataset(dataset_item) + self._dataset[dataset_item[ + 'name']] = dataset.FluidTimeSplitDataset(dataset_item) # if config.need_reqi_changeslot and config.reqi_dnn_plugin_day >= last_day and config.reqi_dnn_plugin_pass >= last_pass: # util.reqi_changeslot(config.hdfs_dnn_plugin_path, join_save_params, common_save_params, update_save_params, scope2, scope3) fleet.init_worker() @@ -190,23 +206,30 @@ class CtrTrainer(Trainer): metric_param = {'label': metric, 'metric_dict': metrics[metric]} metric_calculator.calculate(scope, metric_param) metric_result = metric_calculator.get_result_to_string() - self.print_log(metric_result, {'master': True, 'stdout': stdout_str}) + self.print_log(metric_result, + {'master': True, + 'stdout': stdout_str}) monitor_data += metric_result metric_calculator.clear(scope, metric_param) def save_model(self, day, pass_index, base_key): """R """ - cost_printer = util.CostPrinter(util.print_cost, - {'master': True, 'log_format': 'save model cost %s sec'}) - model_path = self._path_generator.generate_path('batch_model', {'day': day, 'pass_id': pass_index}) + cost_printer = util.CostPrinter(util.print_cost, { + 'master': True, + 'log_format': 'save model cost %s sec' + }) + model_path = self._path_generator.generate_path( + 'batch_model', {'day': day, + 'pass_id': pass_index}) save_mode = 0 # just save all if pass_index < 1: # batch_model save_mode = 3 # unseen_day++, save all util.rank0_print("going to save_model %s" % model_path) fleet.save_persistables(None, model_path, mode=save_mode) if fleet._role_maker.is_first_worker(): - self._train_pass.save_train_progress(day, pass_index, base_key, model_path, is_checkpoint=True) + self._train_pass.save_train_progress( + day, pass_index, base_key, model_path, is_checkpoint=True) cost_printer.done() return model_path @@ -225,46 +248,58 @@ class CtrTrainer(Trainer): if pass_index < 1: save_mode = 2 xbox_patch_id = xbox_base_key - model_path = self._path_generator.generate_path('xbox_base', {'day': day}) - xbox_model_donefile = self._path_generator.generate_path('xbox_base_done', {'day': day}) + model_path = self._path_generator.generate_path('xbox_base', + {'day': day}) + xbox_model_donefile = self._path_generator.generate_path( + 'xbox_base_done', {'day': day}) else: save_mode = 1 - model_path = self._path_generator.generate_path('xbox_delta', {'day': day, 'pass_id': pass_index}) - xbox_model_donefile = self._path_generator.generate_path('xbox_delta_done', {'day': day}) - total_save_num = fleet.save_persistables(None, model_path, mode=save_mode) + model_path = self._path_generator.generate_path( + 'xbox_delta', {'day': day, + 'pass_id': pass_index}) + xbox_model_donefile = self._path_generator.generate_path( + 'xbox_delta_done', {'day': day}) + total_save_num = fleet.save_persistables( + None, model_path, mode=save_mode) cost_printer.done() - cost_printer = util.CostPrinter(util.print_cost, {'master': True, - 'log_format': 'save cache model cost %s sec', - 'stdout': stdout_str}) + cost_printer = util.CostPrinter(util.print_cost, { + 'master': True, + 'log_format': 'save cache model cost %s sec', + 'stdout': stdout_str + }) model_file_handler = fs.FileHandler(self.global_config['io']['afs']) if self.global_config['save_cache_model']: - 
cache_save_num = fleet.save_cache_model(None, model_path, mode=save_mode) + cache_save_num = fleet.save_cache_model( + None, model_path, mode=save_mode) model_file_handler.write( "file_prefix:part\npart_num:16\nkey_num:%d\n" % cache_save_num, model_path + '/000_cache/sparse_cache.meta', 'w') cost_printer.done() - util.rank0_print("save xbox cache model done, key_num=%s" % cache_save_num) + util.rank0_print("save xbox cache model done, key_num=%s" % + cache_save_num) - save_env_param = { - 'executor': self._exe, - 'save_combine': True - } - cost_printer = util.CostPrinter(util.print_cost, {'master': True, - 'log_format': 'save dense model cost %s sec', - 'stdout': stdout_str}) + save_env_param = {'executor': self._exe, 'save_combine': True} + cost_printer = util.CostPrinter(util.print_cost, { + 'master': True, + 'log_format': 'save dense model cost %s sec', + 'stdout': stdout_str + }) if fleet._role_maker.is_first_worker(): for executor in self.global_config['executor']: if 'layer_for_inference' not in executor: continue executor_name = executor['name'] model = self._exector_context[executor_name]['model'] - save_env_param['inference_list'] = executor['layer_for_inference'] - save_env_param['scope'] = self._exector_context[executor_name]['scope'] + save_env_param['inference_list'] = executor[ + 'layer_for_inference'] + save_env_param['scope'] = self._exector_context[executor_name][ + 'scope'] model.dump_inference_param(save_env_param) for dnn_layer in executor['layer_for_inference']: model_file_handler.cp(dnn_layer['save_file_name'], - model_path + '/dnn_plugin/' + dnn_layer['save_file_name']) + model_path + '/dnn_plugin/' + + dnn_layer['save_file_name']) fleet._role_maker._barrier_worker() cost_printer.done() @@ -282,9 +317,15 @@ class CtrTrainer(Trainer): "job_name": util.get_env_value("JOB_NAME") } if fleet._role_maker.is_first_worker(): - model_file_handler.write(json.dumps(xbox_done_info) + "\n", xbox_model_donefile, 'a') + model_file_handler.write( + json.dumps(xbox_done_info) + "\n", xbox_model_donefile, 'a') if pass_index > 0: - self._train_pass.save_train_progress(day, pass_index, xbox_base_key, model_path, is_checkpoint=False) + self._train_pass.save_train_progress( + day, + pass_index, + xbox_base_key, + model_path, + is_checkpoint=False) fleet._role_maker._barrier_worker() return stdout_str @@ -301,21 +342,28 @@ class CtrTrainer(Trainer): util.rank0_print("Begin " + executor_name + " pass") begin = time.time() program = model._build_param['model']['train_program'] - self._exe.train_from_dataset(program, dataset, scope, - thread=executor_config['train_thread_num'], debug=self.global_config['debug']) + self._exe.train_from_dataset( + program, + dataset, + scope, + thread=executor_config['train_thread_num'], + debug=self.global_config['debug']) end = time.time() local_cost = (end - begin) / 60.0 avg_cost = worker_numric_avg(local_cost) min_cost = worker_numric_min(local_cost) max_cost = worker_numric_max(local_cost) - util.rank0_print("avg train time %s mins, min %s mins, max %s mins" % (avg_cost, min_cost, max_cost)) + util.rank0_print("avg train time %s mins, min %s mins, max %s mins" + % (avg_cost, min_cost, max_cost)) self._exector_context[executor_name]['cost'] = max_cost monitor_data = "" self.print_global_metrics(scope, model, monitor_data, stdout_str) util.rank0_print("End " + executor_name + " pass") - if self._train_pass.need_dump_inference(pass_id) and executor_config['dump_inference_model']: - stdout_str += self.save_xbox_model(day, pass_id, xbox_base_key, monitor_data) 
+ if self._train_pass.need_dump_inference( + pass_id) and executor_config['dump_inference_model']: + stdout_str += self.save_xbox_model(day, pass_id, xbox_base_key, + monitor_data) fleet._role_maker._barrier_worker() def startup(self, context): @@ -328,10 +376,14 @@ class CtrTrainer(Trainer): stdout_str = "" self._train_pass = util.TimeTrainPass(self.global_config) if not self.global_config['cold_start']: - cost_printer = util.CostPrinter(util.print_cost, - {'master': True, 'log_format': 'load model cost %s sec', - 'stdout': stdout_str}) - self.print_log("going to load model %s" % self._train_pass._checkpoint_model_path, {'master': True}) + cost_printer = util.CostPrinter(util.print_cost, { + 'master': True, + 'log_format': 'load model cost %s sec', + 'stdout': stdout_str + }) + self.print_log("going to load model %s" % + self._train_pass._checkpoint_model_path, + {'master': True}) # if config.need_reqi_changeslot and config.reqi_dnn_plugin_day >= self._train_pass.date() # and config.reqi_dnn_plugin_pass >= self._pass_id: # fleet.load_one_table(0, self._train_pass._checkpoint_model_path) @@ -340,9 +392,12 @@ class CtrTrainer(Trainer): cost_printer.done() if self.global_config['save_first_base']: self.print_log("save_first_base=True", {'master': True}) - self.print_log("going to save xbox base model", {'master': True, 'stdout': stdout_str}) + self.print_log("going to save xbox base model", + {'master': True, + 'stdout': stdout_str}) self._train_pass._base_key = int(time.time()) - stdout_str += self.save_xbox_model(self._train_pass.date(), 0, self._train_pass._base_key, "") + stdout_str += self.save_xbox_model(self._train_pass.date(), 0, + self._train_pass._base_key, "") context['status'] = 'begin_day' def begin_day(self, context): @@ -353,7 +408,9 @@ class CtrTrainer(Trainer): context['is_exit'] = True day = self._train_pass.date() pass_id = self._train_pass._pass_id - self.print_log("======== BEGIN DAY:%s ========" % day, {'master': True, 'stdout': stdout_str}) + self.print_log("======== BEGIN DAY:%s ========" % day, + {'master': True, + 'stdout': stdout_str}) if pass_id == self._train_pass.max_pass_num_day(): context['status'] = 'end_day' else: @@ -368,8 +425,10 @@ class CtrTrainer(Trainer): context['status'] = 'begin_day' util.rank0_print("shrink table") - cost_printer = util.CostPrinter(util.print_cost, - {'master': True, 'log_format': 'shrink table done, cost %s sec'}) + cost_printer = util.CostPrinter(util.print_cost, { + 'master': True, + 'log_format': 'shrink table done, cost %s sec' + }) fleet.shrink_sparse_table() for executor in self._exector_context: self._exector_context[executor]['model'].shrink({ @@ -394,7 +453,9 @@ class CtrTrainer(Trainer): pass_id = self._train_pass._pass_id base_key = self._train_pass._base_key pass_time = self._train_pass._current_train_time.strftime("%Y%m%d%H%M") - self.print_log(" ==== begin delta:%s ========" % pass_id, {'master': True, 'stdout': stdout_str}) + self.print_log(" ==== begin delta:%s ========" % pass_id, + {'master': True, + 'stdout': stdout_str}) train_begin_time = time.time() cost_printer = util.CostPrinter(util.print_cost, \ @@ -403,35 +464,46 @@ class CtrTrainer(Trainer): current_dataset = {} for name in self._dataset: current_dataset[name] = self._dataset[name].load_dataset({ - 'node_num': fleet.worker_num(), 'node_idx': fleet.worker_index(), - 'begin_time': pass_time, 'time_window_min': self._train_pass._interval_per_pass + 'node_num': fleet.worker_num(), + 'node_idx': fleet.worker_index(), + 'begin_time': pass_time, + 
'time_window_min': self._train_pass._interval_per_pass }) fleet._role_maker._barrier_worker() cost_printer.done() util.rank0_print("going to global shuffle") cost_printer = util.CostPrinter(util.print_cost, { - 'master': True, 'stdout': stdout_str, - 'log_format': 'global shuffle done, cost %s sec'}) + 'master': True, + 'stdout': stdout_str, + 'log_format': 'global shuffle done, cost %s sec' + }) for name in current_dataset: - current_dataset[name].global_shuffle(fleet, self.global_config['dataset']['shuffle_thread']) + current_dataset[name].global_shuffle( + fleet, self.global_config['dataset']['shuffle_thread']) cost_printer.done() # str(dataset.get_shuffle_data_size(fleet)) fleet._role_maker._barrier_worker() if self.global_config['prefetch_data']: - next_pass_time = (self._train_pass._current_train_time + - datetime.timedelta(minutes=self._train_pass._interval_per_pass)).strftime("%Y%m%d%H%M") + next_pass_time = ( + self._train_pass._current_train_time + datetime.timedelta( + minutes=self._train_pass._interval_per_pass) + ).strftime("%Y%m%d%H%M") for name in self._dataset: self._dataset[name].preload_dataset({ - 'node_num': fleet.worker_num(), 'node_idx': fleet.worker_index(), - 'begin_time': next_pass_time, 'time_window_min': self._train_pass._interval_per_pass + 'node_num': fleet.worker_num(), + 'node_idx': fleet.worker_index(), + 'begin_time': next_pass_time, + 'time_window_min': self._train_pass._interval_per_pass }) fleet._role_maker._barrier_worker() pure_train_begin = time.time() for executor in self.global_config['executor']: - self.run_executor(executor, current_dataset[executor['dataset_name']], stdout_str) + self.run_executor(executor, + current_dataset[executor['dataset_name']], + stdout_str) cost_printer = util.CostPrinter(util.print_cost, \ {'master': True, 'log_format': 'release_memory cost %s sec'}) for name in current_dataset: @@ -444,9 +516,11 @@ class CtrTrainer(Trainer): train_end_time = time.time() train_cost = train_end_time - train_begin_time other_cost = train_cost - pure_train_cost - log_str = "finished train day %s pass %s time cost:%s sec job time cost:" % (day, pass_id, train_cost) + log_str = "finished train day %s pass %s time cost:%s sec job time cost:" % ( + day, pass_id, train_cost) for executor in self._exector_context: - log_str += '[' + executor + ':' + str(self._exector_context[executor]['cost']) + ']' + log_str += '[' + executor + ':' + str(self._exector_context[ + executor]['cost']) + ']' log_str += '[other_cost:' + str(other_cost) + ']' util.rank0_print(log_str) stdout_str += util.now_time_str() + log_str diff --git a/core/trainers/online_learning_trainer.py b/core/trainers/online_learning_trainer.py index 0303e96ac0bb20b1f46cdc9f5836d18fa73b9a8e..b285684464ed2cd1d8bfd7710d6f28d30de3f936 100755 --- a/core/trainers/online_learning_trainer.py +++ b/core/trainers/online_learning_trainer.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - """ Training use fluid with one node only. 
""" @@ -44,11 +43,14 @@ class OnlineLearningTrainer(TranspileTrainer): self.regist_context_processor('uninit', self.instance) self.regist_context_processor('init_pass', self.init) self.regist_context_processor('startup_pass', self.startup) - if envs.get_platform() == "LINUX" and envs.get_global_env("dataset_class", None, "train.reader") != "DataLoader": + + if envs.get_platform() == "LINUX" and envs.get_global_env( + "dataset_class", None, "train.reader") != "DataLoader": self.regist_context_processor('train_pass', self.dataset_train) else: - self.regist_context_processor( - 'train_pass', self.dataloader_train) + self.regist_context_processor('train_pass', + self.dataloader_train) + self.regist_context_processor('infer_pass', self.infer) self.regist_context_processor('terminal_pass', self.terminal) @@ -110,27 +112,27 @@ class OnlineLearningTrainer(TranspileTrainer): if state == "TRAIN": inputs = self.model.get_inputs() namespace = "train.reader" - train_data_path = envs.get_global_env( - "train_data_path", None, namespace) + train_data_path = envs.get_global_env("train_data_path", None, + namespace) else: inputs = self.model.get_infer_inputs() namespace = "evaluate.reader" - train_data_path = envs.get_global_env( - "test_data_path", None, namespace) + train_data_path = envs.get_global_env("test_data_path", None, + namespace) threads = int(envs.get_runtime_environ("train.trainer.threads")) batch_size = envs.get_global_env("batch_size", None, namespace) reader_class = envs.get_global_env("class", None, namespace) abs_dir = os.path.dirname(os.path.abspath(__file__)) reader = os.path.join(abs_dir, '../utils', 'dataset_instance.py') - pipe_cmd = "python {} {} {} {}".format( - reader, reader_class, state, self._config_yaml) + pipe_cmd = "python {} {} {} {}".format(reader, reader_class, state, + self._config_yaml) if train_data_path.startswith("paddlerec::"): package_base = envs.get_runtime_environ("PACKAGE_BASE") assert package_base is not None - train_data_path = os.path.join( - package_base, train_data_path.split("::")[1]) + train_data_path = os.path.join(package_base, + train_data_path.split("::")[1]) dataset = fluid.DatasetFactory().create_dataset() dataset.set_use_var(inputs) @@ -166,14 +168,16 @@ class OnlineLearningTrainer(TranspileTrainer): ins = self._get_dataset_ins() begin_time = time.time() - self._exe.train_from_dataset(program=fluid.default_main_program(), - dataset=dataset, - fetch_list=self.fetch_vars, - fetch_info=self.fetch_alias, - print_period=self.fetch_period) + self._exe.train_from_dataset( + program=fluid.default_main_program(), + dataset=dataset, + fetch_list=self.fetch_vars, + fetch_info=self.fetch_alias, + print_period=self.fetch_period) end_time = time.time() - times = end_time-begin_time - print("epoch {} using time {}, speed {:.2f} lines/s".format(i, times, ins/times)) + times = end_time - begin_time + print("epoch {} using time {}, speed {:.2f} lines/s".format( + i, times, ins / times)) self.save(i, "train", is_fleet=True) fleet.stop_worker() diff --git a/core/trainers/single_trainer.py b/core/trainers/single_trainer.py index 8079377ba257041e4946d6e452cacaa388ca36ce..a564ba5585c313a163542f028fa158f8c50c8d2a 100755 --- a/core/trainers/single_trainer.py +++ b/core/trainers/single_trainer.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - """ Training use fluid with one node only. 
""" @@ -36,8 +35,9 @@ class SingleTrainer(TranspileTrainer): self.regist_context_processor('uninit', self.instance) self.regist_context_processor('init_pass', self.init) self.regist_context_processor('startup_pass', self.startup) - if envs.get_platform() == "LINUX" and envs.get_global_env("dataset_class", None, - "train.reader") != "DataLoader": + + if envs.get_platform() == "LINUX" and envs.get_global_env( + "dataset_class", None, "train.reader") != "DataLoader": self.regist_context_processor('train_pass', self.dataset_train) else: self.regist_context_processor('train_pass', self.dataloader_train) @@ -73,9 +73,8 @@ class SingleTrainer(TranspileTrainer): reader = self._get_dataloader("TRAIN") epochs = envs.get_global_env("train.epochs") - program = fluid.compiler.CompiledProgram( - fluid.default_main_program()).with_data_parallel( - loss_name=self.model.get_avg_cost().name) + program = fluid.compiler.CompiledProgram(fluid.default_main_program( + )).with_data_parallel(loss_name=self.model.get_avg_cost().name) metrics_varnames = [] metrics_format = [] @@ -94,9 +93,8 @@ class SingleTrainer(TranspileTrainer): batch_id = 0 try: while True: - metrics_rets = self._exe.run( - program=program, - fetch_list=metrics_varnames) + metrics_rets = self._exe.run(program=program, + fetch_list=metrics_varnames) metrics = [epoch, batch_id] metrics.extend(metrics_rets) @@ -117,14 +115,16 @@ class SingleTrainer(TranspileTrainer): epochs = envs.get_global_env("train.epochs") for i in range(epochs): begin_time = time.time() - self._exe.train_from_dataset(program=fluid.default_main_program(), - dataset=dataset, - fetch_list=self.fetch_vars, - fetch_info=self.fetch_alias, - print_period=self.fetch_period) + self._exe.train_from_dataset( + program=fluid.default_main_program(), + dataset=dataset, + fetch_list=self.fetch_vars, + fetch_info=self.fetch_alias, + print_period=self.fetch_period) end_time = time.time() times = end_time - begin_time - print("epoch {} using time {}, speed {:.2f} lines/s".format(i, times, ins / times)) + print("epoch {} using time {}, speed {:.2f} lines/s".format( + i, times, ins / times)) self.save(i, "train", is_fleet=False) context['status'] = 'infer_pass' diff --git a/core/trainers/tdm_cluster_trainer.py b/core/trainers/tdm_cluster_trainer.py index 3bd1ad3367f340019333e8f83cf5abdd3b36b25f..a7e8f97e446bc266a733fc12a798c505ee4d9ec5 100755 --- a/core/trainers/tdm_cluster_trainer.py +++ b/core/trainers/tdm_cluster_trainer.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - """ Training use fluid with one node only. 
""" @@ -36,8 +35,8 @@ special_param = ["TDM_Tree_Travel", "TDM_Tree_Layer", "TDM_Tree_Info"] class TDMClusterTrainer(ClusterTrainer): def server(self, context): namespace = "train.startup" - init_model_path = envs.get_global_env( - "cluster.init_model_path", "", namespace) + init_model_path = envs.get_global_env("cluster.init_model_path", "", + namespace) assert init_model_path != "", "Cluster train must has init_model for TDM" fleet.init_server(init_model_path) logger.info("TDM: load model from {}".format(init_model_path)) @@ -48,24 +47,27 @@ class TDMClusterTrainer(ClusterTrainer): self._exe.run(fleet.startup_program) namespace = "train.startup" - load_tree = envs.get_global_env( - "tree.load_tree", True, namespace) - self.tree_layer_path = envs.get_global_env( - "tree.tree_layer_path", "", namespace) - self.tree_travel_path = envs.get_global_env( - "tree.tree_travel_path", "", namespace) - self.tree_info_path = envs.get_global_env( - "tree.tree_info_path", "", namespace) - - save_init_model = envs.get_global_env( - "cluster.save_init_model", False, namespace) - init_model_path = envs.get_global_env( - "cluster.init_model_path", "", namespace) + load_tree = envs.get_global_env("tree.load_tree", True, namespace) + + self.tree_layer_path = envs.get_global_env("tree.tree_layer_path", "", + namespace) + + self.tree_travel_path = envs.get_global_env("tree.tree_travel_path", + "", namespace) + + self.tree_info_path = envs.get_global_env("tree.tree_info_path", "", + namespace) + + save_init_model = envs.get_global_env("cluster.save_init_model", False, + namespace) + init_model_path = envs.get_global_env("cluster.init_model_path", "", + namespace) if load_tree: # covert tree to tensor, set it into Fluid's variable. for param_name in special_param: - param_t = fluid.global_scope().find_var(param_name).get_tensor() + param_t = fluid.global_scope().find_var(param_name).get_tensor( + ) param_array = self._tdm_prepare(param_name) param_t.set(param_array.astype('int32'), self._place) @@ -93,8 +95,8 @@ class TDMClusterTrainer(ClusterTrainer): def _tdm_travel_prepare(self): """load tdm tree param from npy/list file""" travel_array = np.load(self.tree_travel_path) - logger.info("TDM Tree leaf node nums: {}".format( - travel_array.shape[0])) + logger.info("TDM Tree leaf node nums: {}".format(travel_array.shape[ + 0])) return travel_array def _tdm_layer_prepare(self): diff --git a/core/trainers/tdm_single_trainer.py b/core/trainers/tdm_single_trainer.py index 21be66a677750f6e817b63794819b14ed72d9fa2..c0f23fc361e907ca5732a3531fc7c460ddc5aad3 100755 --- a/core/trainers/tdm_single_trainer.py +++ b/core/trainers/tdm_single_trainer.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - """ Training use fluid with one node only. 
""" @@ -27,33 +26,38 @@ from paddlerec.core.utils import envs logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s") logger = logging.getLogger("fluid") logger.setLevel(logging.INFO) -special_param = ["TDM_Tree_Travel", "TDM_Tree_Layer", - "TDM_Tree_Info", "TDM_Tree_Emb"] +special_param = [ + "TDM_Tree_Travel", "TDM_Tree_Layer", "TDM_Tree_Info", "TDM_Tree_Emb" +] class TDMSingleTrainer(SingleTrainer): def startup(self, context): namespace = "train.startup" - load_persistables = envs.get_global_env( - "single.load_persistables", False, namespace) + load_persistables = envs.get_global_env("single.load_persistables", + False, namespace) + persistables_model_path = envs.get_global_env( "single.persistables_model_path", "", namespace) - load_tree = envs.get_global_env( - "tree.load_tree", False, namespace) - self.tree_layer_path = envs.get_global_env( - "tree.tree_layer_path", "", namespace) - self.tree_travel_path = envs.get_global_env( - "tree.tree_travel_path", "", namespace) - self.tree_info_path = envs.get_global_env( - "tree.tree_info_path", "", namespace) - self.tree_emb_path = envs.get_global_env( - "tree.tree_emb_path", "", namespace) - - save_init_model = envs.get_global_env( - "single.save_init_model", False, namespace) - init_model_path = envs.get_global_env( - "single.init_model_path", "", namespace) + load_tree = envs.get_global_env("tree.load_tree", False, namespace) + + self.tree_layer_path = envs.get_global_env("tree.tree_layer_path", "", + namespace) + + self.tree_travel_path = envs.get_global_env("tree.tree_travel_path", + "", namespace) + + self.tree_info_path = envs.get_global_env("tree.tree_info_path", "", + namespace) + + self.tree_emb_path = envs.get_global_env("tree.tree_emb_path", "", + namespace) + + save_init_model = envs.get_global_env("single.save_init_model", False, + namespace) + init_model_path = envs.get_global_env("single.init_model_path", "", + namespace) self._exe.run(fluid.default_startup_program()) if load_persistables: @@ -68,7 +72,8 @@ class TDMSingleTrainer(SingleTrainer): if load_tree: # covert tree to tensor, set it into Fluid's variable. for param_name in special_param: - param_t = fluid.global_scope().find_var(param_name).get_tensor() + param_t = fluid.global_scope().find_var(param_name).get_tensor( + ) param_array = self._tdm_prepare(param_name) if param_name == 'TDM_Tree_Emb': param_t.set(param_array.astype('float32'), self._place) @@ -102,15 +107,15 @@ class TDMSingleTrainer(SingleTrainer): def _tdm_travel_prepare(self): """load tdm tree param from npy/list file""" travel_array = np.load(self.tree_travel_path) - logger.info("TDM Tree leaf node nums: {}".format( - travel_array.shape[0])) + logger.info("TDM Tree leaf node nums: {}".format(travel_array.shape[ + 0])) return travel_array def _tdm_emb_prepare(self): """load tdm tree param from npy/list file""" emb_array = np.load(self.tree_emb_path) - logger.info("TDM Tree node nums from emb: {}".format( - emb_array.shape[0])) + logger.info("TDM Tree node nums from emb: {}".format(emb_array.shape[ + 0])) return emb_array def _tdm_layer_prepare(self): diff --git a/core/trainers/transpiler_trainer.py b/core/trainers/transpiler_trainer.py index 81591056c94dc414fdeeba12d449f18aaaa0e216..c121b4abb624503936faca8e77902a97e3f0cf82 100755 --- a/core/trainers/transpiler_trainer.py +++ b/core/trainers/transpiler_trainer.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# See the License for the specific language governing permissions and # limitations under the License. - """ Training use fluid with DistributeTranspiler """ @@ -23,6 +22,7 @@ from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import f from paddlerec.core.trainer import Trainer from paddlerec.core.utils import envs from paddlerec.core.utils import dataloader_instance +from paddlerec.core.reader import SlotReader class TranspileTrainer(Trainer): @@ -38,9 +38,12 @@ class TranspileTrainer(Trainer): self.increment_models = [] def processor_register(self): - print("Need implement by trainer, `self.regist_context_processor('uninit', self.instance)` must be the first") + print( + "Need implement by trainer, `self.regist_context_processor('uninit', self.instance)` must be the first" + ) def _get_dataloader(self, state="TRAIN"): + if state == "TRAIN": dataloader = self.model._data_loader namespace = "train.reader" @@ -50,14 +53,24 @@ class TranspileTrainer(Trainer): namespace = "evaluate.reader" class_name = "EvaluateReader" + sparse_slots = envs.get_global_env("sparse_slots", None, namespace) + dense_slots = envs.get_global_env("dense_slots", None, namespace) + batch_size = envs.get_global_env("batch_size", None, namespace) - reader_class = envs.get_global_env("class", None, namespace) print("batch_size: {}".format(batch_size)) - reader = dataloader_instance.dataloader( - reader_class, state, self._config_yaml) - reader_class = envs.lazy_instance_by_fliename(reader_class, class_name) - reader_ins = reader_class(self._config_yaml) + if sparse_slots is None and dense_slots is None: + reader_class = envs.get_global_env("class", None, namespace) + reader = dataloader_instance.dataloader(reader_class, state, + self._config_yaml) + reader_class = envs.lazy_instance_by_fliename(reader_class, + class_name) + reader_ins = reader_class(self._config_yaml) + else: + reader = dataloader_instance.slotdataloader("", state, + self._config_yaml) + reader_ins = SlotReader(self._config_yaml) + if hasattr(reader_ins, 'generate_batch_from_trainfiles'): dataloader.set_sample_list_generator(reader) else: @@ -85,27 +98,37 @@ class TranspileTrainer(Trainer): if state == "TRAIN": inputs = self.model.get_inputs() namespace = "train.reader" - train_data_path = envs.get_global_env( - "train_data_path", None, namespace) + train_data_path = envs.get_global_env("train_data_path", None, + namespace) else: inputs = self.model.get_infer_inputs() namespace = "evaluate.reader" - train_data_path = envs.get_global_env( - "test_data_path", None, namespace) + train_data_path = envs.get_global_env("test_data_path", None, + namespace) + + sparse_slots = envs.get_global_env("sparse_slots", None, namespace) + dense_slots = envs.get_global_env("dense_slots", None, namespace) threads = int(envs.get_runtime_environ("train.trainer.threads")) batch_size = envs.get_global_env("batch_size", None, namespace) reader_class = envs.get_global_env("class", None, namespace) abs_dir = os.path.dirname(os.path.abspath(__file__)) reader = os.path.join(abs_dir, '../utils', 'dataset_instance.py') - pipe_cmd = "python {} {} {} {}".format( - reader, reader_class, state, self._config_yaml) + + if sparse_slots is None and dense_slots is None: + pipe_cmd = "python {} {} {} {}".format(reader, reader_class, state, + self._config_yaml) + else: + padding = envs.get_global_env("padding", 0, namespace) + pipe_cmd = "python {} {} {} {} {} {} {} {}".format( + reader, "slot", "slot", self._config_yaml, namespace, \ + sparse_slots.replace(" ", "#"), 
dense_slots.replace(" ", "#"), str(padding)) if train_data_path.startswith("paddlerec::"): package_base = envs.get_runtime_environ("PACKAGE_BASE") assert package_base is not None - train_data_path = os.path.join( - package_base, train_data_path.split("::")[1]) + train_data_path = os.path.join(package_base, + train_data_path.split("::")[1]) dataset = fluid.DatasetFactory().create_dataset() dataset.set_use_var(inputs) @@ -121,11 +144,11 @@ class TranspileTrainer(Trainer): debug_mode = envs.get_global_env("reader_debug_mode", False, namespace) if debug_mode: - print( - "--- Dataset Debug Mode Begin , show pre 10 data of {}---".format(file_list[0])) + print("--- Dataset Debug Mode Begin , show pre 10 data of {}---". + format(file_list[0])) os.system("cat {} | {} | head -10".format(file_list[0], pipe_cmd)) - print( - "--- Dataset Debug Mode End , show pre 10 data of {}---".format(file_list[0])) + print("--- Dataset Debug Mode End , show pre 10 data of {}---". + format(file_list[0])) exit(0) return dataset @@ -147,30 +170,29 @@ class TranspileTrainer(Trainer): if not need_save(epoch_id, save_interval, False): return - # print("save inference model is not supported now.") - # return - - feed_varnames = envs.get_global_env( - "save.inference.feed_varnames", None, namespace) + feed_varnames = envs.get_global_env("save.inference.feed_varnames", + None, namespace) fetch_varnames = envs.get_global_env( "save.inference.fetch_varnames", None, namespace) if feed_varnames is None or fetch_varnames is None: return - fetch_vars = [fluid.default_main_program().global_block().vars[varname] - for varname in fetch_varnames] - dirname = envs.get_global_env( - "save.inference.dirname", None, namespace) + fetch_vars = [ + fluid.default_main_program().global_block().vars[varname] + for varname in fetch_varnames + ] + dirname = envs.get_global_env("save.inference.dirname", None, + namespace) assert dirname is not None dirname = os.path.join(dirname, str(epoch_id)) if is_fleet: - fleet.save_inference_model( - self._exe, dirname, feed_varnames, fetch_vars) + fleet.save_inference_model(self._exe, dirname, feed_varnames, + fetch_vars) else: - fluid.io.save_inference_model( - dirname, feed_varnames, fetch_vars, self._exe) + fluid.io.save_inference_model(dirname, feed_varnames, + fetch_vars, self._exe) self.inference_models.append((epoch_id, dirname)) def save_persistables(): @@ -180,8 +202,8 @@ class TranspileTrainer(Trainer): if not need_save(epoch_id, save_interval, False): return - dirname = envs.get_global_env( - "save.increment.dirname", None, namespace) + dirname = envs.get_global_env("save.increment.dirname", None, + namespace) assert dirname is not None dirname = os.path.join(dirname, str(epoch_id)) @@ -259,10 +281,9 @@ class TranspileTrainer(Trainer): batch_id = 0 try: while True: - metrics_rets = self._exe.run( - program=program, - fetch_list=metrics_varnames, - return_numpy=is_return_numpy) + metrics_rets = self._exe.run(program=program, + fetch_list=metrics_varnames, + return_numpy=is_return_numpy) metrics = [epoch, batch_id] metrics.extend(metrics_rets) diff --git a/core/utils/dataloader_instance.py b/core/utils/dataloader_instance.py index 6234882f4d49191d3b55770078863910a356e9cd..8d4db6f82c05a41a0945f2c882caefd2a3c83d36 100755 --- a/core/utils/dataloader_instance.py +++ b/core/utils/dataloader_instance.py @@ -18,6 +18,7 @@ import os from paddlerec.core.utils.envs import lazy_instance_by_fliename from paddlerec.core.utils.envs import get_global_env from paddlerec.core.utils.envs import get_runtime_environ 
+from paddlerec.core.reader import SlotReader def dataloader(readerclass, train, yaml_file): @@ -62,3 +63,49 @@ def dataloader(readerclass, train, yaml_file): if hasattr(reader, 'generate_batch_from_trainfiles'): return gen_batch_reader() return gen_reader + + +def slotdataloader(readerclass, train, yaml_file): + if train == "TRAIN": + reader_name = "SlotReader" + namespace = "train.reader" + data_path = get_global_env("train_data_path", None, namespace) + else: + reader_name = "SlotReader" + namespace = "evaluate.reader" + data_path = get_global_env("test_data_path", None, namespace) + + if data_path.startswith("paddlerec::"): + package_base = get_runtime_environ("PACKAGE_BASE") + assert package_base is not None + data_path = os.path.join(package_base, data_path.split("::")[1]) + + files = [str(data_path) + "/%s" % x for x in os.listdir(data_path)] + + sparse = get_global_env("sparse_slots", None, namespace) + dense = get_global_env("dense_slots", None, namespace) + padding = get_global_env("padding", 0, namespace) + reader = SlotReader(yaml_file) + reader.init(sparse, dense, int(padding)) + + def gen_reader(): + for file in files: + with open(file, 'r') as f: + for line in f: + line = line.rstrip('\n') + iter = reader.generate_sample(line) + for parsed_line in iter(): + if parsed_line is None: + continue + else: + values = [] + for pased in parsed_line: + values.append(pased[1]) + yield values + + def gen_batch_reader(): + return reader.generate_batch_from_trainfiles(files) + + if hasattr(reader, 'generate_batch_from_trainfiles'): + return gen_batch_reader() + return gen_reader diff --git a/core/utils/dataset_holder.py b/core/utils/dataset_holder.py index cd195450336cac0265f76670ca0e3fa24c45a7ba..a75d52b60440f924acbb45ff7ff9125eaa121e36 100755 --- a/core/utils/dataset_holder.py +++ b/core/utils/dataset_holder.py @@ -24,7 +24,7 @@ from paddlerec.core.utils import util as util class DatasetHolder(object): """ - Dataset Base + Dataset Holder """ __metaclass__ = abc.ABCMeta @@ -74,11 +74,17 @@ class TimeSplitDatasetHolder(DatasetHolder): Dataset.__init__(self, config) if 'data_donefile' not in config or config['data_donefile'] is None: config['data_donefile'] = config['data_path'] + "/to.hadoop.done" - self._path_generator = util.PathGenerator({'templates': [ - {'name': 'data_path', 'template': config['data_path']}, - {'name': 'donefile_path', 'template': config['data_donefile']} - ]}) - self._split_interval = config['split_interval'] # data split N mins per dir + self._path_generator = util.PathGenerator({ + 'templates': [{ + 'name': 'data_path', + 'template': config['data_path'] + }, { + 'name': 'donefile_path', + 'template': config['data_donefile'] + }] + }) + self._split_interval = config[ + 'split_interval'] # data split N mins per dir self._data_file_handler = fs.FileHandler(config) def _format_data_time(self, daytime_str, time_window_mins): @@ -91,7 +97,8 @@ class TimeSplitDatasetHolder(DatasetHolder): return None, 0 if mins_of_day % self._split_interval != 0: - skip_mins = self._split_interval - (mins_of_day % self._split_interval) + skip_mins = self._split_interval - (mins_of_day % + self._split_interval) data_time = data_time + datetime.timedelta(minutes=skip_mins) time_window_mins = time_window_mins - skip_mins return data_time, time_window_mins @@ -106,17 +113,24 @@ class TimeSplitDatasetHolder(DatasetHolder): True/False """ is_ready = True - data_time, windows_mins = self._format_data_time(daytime_str, time_window_mins) + data_time, windows_mins = 
self._format_data_time(daytime_str, + time_window_mins) while time_window_mins > 0: - file_path = self._path_generator.generate_path('donefile_path', {'time_format': data_time}) + file_path = self._path_generator.generate_path( + 'donefile_path', {'time_format': data_time}) if not self._data_file_handler.is_exist(file_path): is_ready = False break time_window_mins = time_window_mins - self._split_interval - data_time = data_time + datetime.timedelta(minutes=self._split_interval) + data_time = data_time + datetime.timedelta( + minutes=self._split_interval) return is_ready - def get_file_list(self, daytime_str, time_window_mins, node_num=1, node_idx=0): + def get_file_list(self, + daytime_str, + time_window_mins, + node_num=1, + node_idx=0): """ data in [daytime_str, daytime_str + time_window_mins], random shard to node_num, return shard[node_idx] Args: @@ -128,26 +142,32 @@ class TimeSplitDatasetHolder(DatasetHolder): list, data_shard[node_idx] """ data_file_list = [] - data_time, windows_mins = self._format_data_time(daytime_str, time_window_mins) + data_time, windows_mins = self._format_data_time(daytime_str, + time_window_mins) while time_window_mins > 0: - file_path = self._path_generator.generate_path('data_path', {'time_format': data_time}) + file_path = self._path_generator.generate_path( + 'data_path', {'time_format': data_time}) sub_file_list = self._data_file_handler.ls(file_path) for sub_file in sub_file_list: sub_file_name = self._data_file_handler.get_file_name(sub_file) - if not sub_file_name.startswith(self._config['filename_prefix']): + if not sub_file_name.startswith(self._config[ + 'filename_prefix']): continue if hash(sub_file_name) % node_num == node_idx: data_file_list.append(sub_file) time_window_mins = time_window_mins - self._split_interval - data_time = data_time + datetime.timedelta(minutes=self._split_interval) + data_time = data_time + datetime.timedelta( + minutes=self._split_interval) return data_file_list def _alloc_dataset(self, file_list): """ """ - dataset = fluid.DatasetFactory().create_dataset(self._config['dataset_type']) + dataset = fluid.DatasetFactory().create_dataset(self._config[ + 'dataset_type']) dataset.set_batch_size(self._config['batch_size']) dataset.set_thread(self._config['load_thread']) - dataset.set_hdfs_config(self._config['fs_name'], self._config['fs_ugi']) + dataset.set_hdfs_config(self._config['fs_name'], + self._config['fs_ugi']) dataset.set_pipe_command(self._config['data_converter']) dataset.set_filelist(file_list) dataset.set_use_var(self._config['data_vars']) @@ -163,7 +183,9 @@ class TimeSplitDatasetHolder(DatasetHolder): while self.check_ready(begin_time, windown_min) == False: print("dataset not ready, time:" + begin_time) time.sleep(30) - file_list = self.get_file_list(begin_time, windown_min, params['node_num'], params['node_idx']) + file_list = self.get_file_list(begin_time, windown_min, + params['node_num'], + params['node_idx']) self._datasets[begin_time] = self._alloc_dataset(file_list) self._datasets[begin_time].load_into_memory() else: @@ -176,9 +198,12 @@ class TimeSplitDatasetHolder(DatasetHolder): windown_min = params['time_window_min'] if begin_time not in self._datasets: if self.check_ready(begin_time, windown_min): - file_list = self.get_file_list(begin_time, windown_min, params['node_num'], params['node_idx']) + file_list = self.get_file_list(begin_time, windown_min, + params['node_num'], + params['node_idx']) self._datasets[begin_time] = self._alloc_dataset(file_list) - 
self._datasets[begin_time].preload_into_memory(self._config['preload_thread']) + self._datasets[begin_time].preload_into_memory(self._config[ + 'preload_thread']) return True return False diff --git a/core/utils/dataset_instance.py b/core/utils/dataset_instance.py index 731b3b47169d9e67735953c8488469d4d60cb296..2e6082dc5e381b6ac2fc46f7fb6fbe73d4214b69 100755 --- a/core/utils/dataset_instance.py +++ b/core/utils/dataset_instance.py @@ -16,19 +16,34 @@ from __future__ import print_function import sys from paddlerec.core.utils.envs import lazy_instance_by_fliename +from paddlerec.core.reader import SlotReader -if len(sys.argv) != 4: - raise ValueError("reader only accept 3 argument: 1. reader_class 2.train/evaluate 3.yaml_abs_path") +if len(sys.argv) < 4: + raise ValueError( + "reader only accept 3 argument: 1. reader_class 2.train/evaluate/slotreader 3.yaml_abs_path" + ) reader_package = sys.argv[1] -if sys.argv[2] == "TRAIN": +if sys.argv[2].upper() == "TRAIN": reader_name = "TrainReader" -else: +elif sys.argv[2].upper() == "EVALUATE": reader_name = "EvaluateReader" +else: + reader_name = "SlotReader" + namespace = sys.argv[4] + sparse_slots = sys.argv[5].replace("#", " ") + dense_slots = sys.argv[6].replace("#", " ") + padding = int(sys.argv[7]) yaml_abs_path = sys.argv[3] -reader_class = lazy_instance_by_fliename(reader_package, reader_name) -reader = reader_class(yaml_abs_path) -reader.init() -reader.run_from_stdin() + +if reader_name != "SlotReader": + reader_class = lazy_instance_by_fliename(reader_package, reader_name) + reader = reader_class(yaml_abs_path) + reader.init() + reader.run_from_stdin() +else: + reader = SlotReader(yaml_abs_path) + reader.init(sparse_slots, dense_slots, padding) + reader.run_from_stdin() diff --git a/core/utils/envs.py b/core/utils/envs.py index 7093d897e780c525e91516a0058bc90319d4e918..bc222e906448435031024281a0a80298073d3979 100755 --- a/core/utils/envs.py +++ b/core/utils/envs.py @@ -95,7 +95,7 @@ def path_adapter(path): l_p = path.split("paddlerec.")[1].replace(".", "/") return os.path.join(package, l_p) else: - return path + return path def windows_path_converter(path): @@ -159,8 +159,8 @@ def pretty_print_envs(envs, header=None): def lazy_instance_by_package(package, class_name): models = get_global_env("train.model.models") - model_package = __import__( - package, globals(), locals(), package.split(".")) + model_package = __import__(package, + globals(), locals(), package.split(".")) instance = getattr(model_package, class_name) return instance @@ -170,8 +170,8 @@ def lazy_instance_by_fliename(abs, class_name): sys.path.append(dirname) package = os.path.splitext(os.path.basename(abs))[0] - model_package = __import__( - package, globals(), locals(), package.split(".")) + model_package = __import__(package, + globals(), locals(), package.split(".")) instance = getattr(model_package, class_name) return instance @@ -189,8 +189,7 @@ def get_platform(): def find_free_port(): def __free_port(): - with closing(socket.socket(socket.AF_INET, - socket.SOCK_STREAM)) as s: + with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s: s.bind(('', 0)) return s.getsockname()[1] diff --git a/core/utils/fs.py b/core/utils/fs.py index 836c6f598b9c423b0922e30f536a669c55e83098..fab84496c5761e4214f4e5bb3666960408abf68c 100755 --- a/core/utils/fs.py +++ b/core/utils/fs.py @@ -18,7 +18,7 @@ from paddle.fluid.incubate.fleet.utils.hdfs import HDFSClient def is_afs_path(path): - """R + """is_afs_path """ if path.startswith("afs") or path.startswith("hdfs"): return 
True @@ -133,8 +133,9 @@ class FileHandler(object): if mode.find('a') >= 0: org_content = self._hdfs_client.cat(dest_path) content = content + org_content - self._local_fs_client.write(content, temp_local_file, - mode) # fleet hdfs_client only support upload, so write tmp file + self._local_fs_client.write( + content, temp_local_file, mode + ) # fleet hdfs_client only support upload, so write tmp file self._hdfs_client.delete(dest_path + ".tmp") self._hdfs_client.upload(dest_path + ".tmp", temp_local_file) self._hdfs_client.delete(dest_path + ".bak") @@ -158,7 +159,8 @@ class FileHandler(object): files = [] if is_afs_path(path): files = self._hdfs_client.ls(path) - files = [path + '/' + self.get_file_name(fi) for fi in files] # absulte path + files = [path + '/' + self.get_file_name(fi) + for fi in files] # absulte path else: files = self._local_fs_client.ls(path) files = [path + '/' + fi for fi in files] # absulte path diff --git a/core/utils/util.py b/core/utils/util.py index bd63284873b6c6be80c9849f40535cebe1b7fb14..34f26c6d113faf4739ff621d1087da475414c46f 100755 --- a/core/utils/util.py +++ b/core/utils/util.py @@ -22,6 +22,7 @@ from paddlerec.core.utils import fs as fs def save_program_proto(path, program=None): + if program is None: _program = fluid.default_main_program() else: @@ -175,7 +176,8 @@ class PathGenerator(object): """ if template_name in self._templates: if 'time_format' in param: - str = param['time_format'].strftime(self._templates[template_name]) + str = param['time_format'].strftime(self._templates[ + template_name]) return str.format(**param) return self._templates[template_name].format(**param) else: @@ -198,31 +200,39 @@ class TimeTrainPass(object): self._begin_day = make_datetime(day_fields[0].strip()) if len(day_fields) == 1 or len(day_fields[1]) == 0: # 100 years, meaning to continuous running - self._end_day = self._begin_day + datetime.timedelta(days=36500) + self._end_day = self._begin_day + datetime.timedelta( + days=36500) else: # example: 2020212+10 run_day = int(day_fields[1].strip()) - self._end_day = self._begin_day + datetime.timedelta(days=run_day) + self._end_day = self._begin_day + datetime.timedelta( + days=run_day) else: # example: {20191001..20191031} - days = os.popen("echo -n " + self._config['days']).read().split(" ") + days = os.popen("echo -n " + self._config['days']).read().split( + " ") self._begin_day = make_datetime(days[0]) self._end_day = make_datetime(days[len(days) - 1]) self._checkpoint_interval = self._config['checkpoint_interval'] self._dump_inference_interval = self._config['dump_inference_interval'] - self._interval_per_pass = self._config['train_time_interval'] # train N min data per pass + self._interval_per_pass = self._config[ + 'train_time_interval'] # train N min data per pass self._pass_id = 0 self._inference_pass_id = 0 self._pass_donefile_handler = None if 'pass_donefile_name' in self._config: - self._train_pass_donefile = global_config['output_path'] + '/' + self._config['pass_donefile_name'] + self._train_pass_donefile = global_config[ + 'output_path'] + '/' + self._config['pass_donefile_name'] if fs.is_afs_path(self._train_pass_donefile): - self._pass_donefile_handler = fs.FileHandler(global_config['io']['afs']) + self._pass_donefile_handler = fs.FileHandler(global_config[ + 'io']['afs']) else: - self._pass_donefile_handler = fs.FileHandler(global_config['io']['local_fs']) + self._pass_donefile_handler = fs.FileHandler(global_config[ + 'io']['local_fs']) - last_done = 
self._pass_donefile_handler.cat(self._train_pass_donefile).strip().split('\n')[-1] + last_done = self._pass_donefile_handler.cat( + self._train_pass_donefile).strip().split('\n')[-1] done_fileds = last_done.split('\t') if len(done_fileds) > 4: self._base_key = done_fileds[1] @@ -236,15 +246,18 @@ class TimeTrainPass(object): """ return 24 * 60 / self._interval_per_pass - def save_train_progress(self, day, pass_id, base_key, model_path, is_checkpoint): + def save_train_progress(self, day, pass_id, base_key, model_path, + is_checkpoint): """R """ if is_checkpoint: self._checkpoint_pass_id = pass_id self._checkpoint_model_path = model_path - done_content = "%s\t%s\t%s\t%s\t%d\n" % (day, base_key, - self._checkpoint_model_path, self._checkpoint_pass_id, pass_id) - self._pass_donefile_handler.write(done_content, self._train_pass_donefile, 'a') + done_content = "%s\t%s\t%s\t%s\t%d\n" % ( + day, base_key, self._checkpoint_model_path, + self._checkpoint_pass_id, pass_id) + self._pass_donefile_handler.write(done_content, + self._train_pass_donefile, 'a') pass def init_pass_by_id(self, date_str, pass_id): @@ -286,12 +299,14 @@ class TimeTrainPass(object): if self._pass_id < 1: self.init_pass_by_time(self._begin_day.strftime("%Y%m%d%H%M")) else: - next_time = self._current_train_time + datetime.timedelta(minutes=self._interval_per_pass) + next_time = self._current_train_time + datetime.timedelta( + minutes=self._interval_per_pass) if (next_time - self._end_day).total_seconds() > 0: has_next = False else: self.init_pass_by_time(next_time.strftime("%Y%m%d%H%M")) - if has_next and (self._inference_pass_id < self._pass_id or self._pass_id < old_pass_id): + if has_next and (self._inference_pass_id < self._pass_id or + self._pass_id < old_pass_id): self._inference_pass_id = self._pass_id - 1 return has_next @@ -319,9 +334,11 @@ class TimeTrainPass(object): Return: date(current_train_time + delta_day) """ - return (self._current_train_time + datetime.timedelta(days=delta_day)).strftime("%Y%m%d") + return (self._current_train_time + datetime.timedelta(days=delta_day) + ).strftime("%Y%m%d") def timestamp(self, delta_day=0): """R """ - return (self._current_train_time + datetime.timedelta(days=delta_day)).timestamp() + return (self._current_train_time + datetime.timedelta(days=delta_day) + ).timestamp() diff --git a/doc/__init__.py b/doc/__init__.py index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..abf198b97e6e818e1fbe59006f98492640bcee54 100755 --- a/doc/__init__.py +++ b/doc/__init__.py @@ -0,0 +1,13 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
diff --git a/doc/contribute.md b/doc/contribute.md index a9bd1910021e78573f1d9fd99f66404b77927737..26770d8ac0b64e9835f7398768d0f81de9383132 100644 --- a/doc/contribute.md +++ b/doc/contribute.md @@ -1,2 +1,2 @@ # Contributing code to PaddleRec -> Placeholder \ No newline at end of file +> Placeholder diff --git a/doc/custom_dataset_reader.md b/doc/custom_dataset_reader.md index 7f29c21c3a32e2cf17b775741707e4ba83e90373..82b0fd12d4f52fe83155fd371f6041be52c8bcba 100644 --- a/doc/custom_dataset_reader.md +++ b/doc/custom_dataset_reader.md @@ -1,3 +1,73 @@ +# PaddleRec recommendation dataset format + +When your dataset follows the [slot:feasign]* pattern, or can be preprocessed into that pattern, you can use PaddleRec's built-in Reader directly. +The benefit is that you do not need to write your own Reader, and the data format can be kept consistent across all models. + +## Data format description + +Suppose your original data is in the following format + +```bash +