From f9ef376e0c94b00351240833eba63c31e648cd9b Mon Sep 17 00:00:00 2001 From: tangwei Date: Wed, 20 May 2020 14:20:59 +0800 Subject: [PATCH] fix code style --- core/modules/modul/build.py | 76 ++++++++++----- core/modules/modul/layers.py | 113 +++++++++++++++++------ core/trainer.py | 6 +- core/trainers/cluster_trainer.py | 41 ++++---- core/trainers/online_learning_trainer.py | 42 +++++---- core/trainers/single_trainer.py | 30 +++--- core/trainers/tdm_cluster_trainer.py | 40 ++++---- core/trainers/tdm_single_trainer.py | 55 ++++++----- core/trainers/transpiler_trainer.py | 73 ++++++++------- core/utils/dataset_holder.py | 65 +++++++++---- core/utils/dataset_instance.py | 5 +- core/utils/envs.py | 13 ++- core/utils/util.py | 51 ++++++---- 13 files changed, 383 insertions(+), 227 deletions(-) diff --git a/core/modules/modul/build.py b/core/modules/modul/build.py index 0263cbf6..dae77717 100755 --- a/core/modules/modul/build.py +++ b/core/modules/modul/build.py @@ -31,6 +31,7 @@ def create(config): Model Instance """ model = None + if config['mode'] == 'fluid': model = YamlModel(config) model.train_net() @@ -50,7 +51,12 @@ class YamlModel(Model): f = open(config['layer_file'], 'r') self._build_nodes = yaml.safe_load(f.read()) self._build_phase = ['input', 'param', 'summary', 'layer'] - self._build_param = {'layer': {}, 'inner_layer': {}, 'layer_extend': {}, 'model': {}} + self._build_param = { + 'layer': {}, + 'inner_layer': {}, + 'layer_extend': {}, + 'model': {} + } self._inference_meta = {'dependency': {}, 'params': {}} def train_net(self): @@ -76,10 +82,12 @@ class YamlModel(Model): if self._build_nodes[phase] is None: continue for node in self._build_nodes[phase]: - exec("""layer=layer.{}(node)""".format(node['class'])) - layer_output, extend_output = layer.generate(self._config['mode'], self._build_param) + exec ("""layer=layer.{}(node)""".format(node['class'])) + layer_output, extend_output = layer.generate( + self._config['mode'], self._build_param) self._build_param['layer'][node['name']] = layer_output - self._build_param['layer_extend'][node['name']] = extend_output + self._build_param['layer_extend'][node[ + 'name']] = extend_output if extend_output is None: continue if 'loss' in extend_output: @@ -89,17 +97,24 @@ class YamlModel(Model): self._cost += extend_output['loss'] if 'data_var' in extend_output: self._data_var += extend_output['data_var'] - if 'metric_label' in extend_output and extend_output['metric_label'] is not None: - self._metrics[extend_output['metric_label']] = extend_output['metric_dict'] + if 'metric_label' in extend_output and extend_output[ + 'metric_label'] is not None: + self._metrics[extend_output[ + 'metric_label']] = extend_output['metric_dict'] if 'inference_param' in extend_output: inference_param = extend_output['inference_param'] param_name = inference_param['name'] if param_name not in self._build_param['table']: - self._build_param['table'][param_name] = {'params': []} - table_meta = table.TableMeta.alloc_new_table(inference_param['table_id']) - self._build_param['table'][param_name]['_meta'] = table_meta - self._build_param['table'][param_name]['params'] += inference_param['params'] + self._build_param['table'][param_name] = { + 'params': [] + } + table_meta = table.TableMeta.alloc_new_table( + inference_param['table_id']) + self._build_param['table'][param_name][ + '_meta'] = table_meta + self._build_param['table'][param_name][ + 'params'] += inference_param['params'] pass @classmethod @@ -114,20 +129,25 @@ class YamlModel(Model): metrics = params['metrics'] for name in metrics: model_metrics = metrics[name] - stat_var_names += [model_metrics[metric]['var'].name for metric in model_metrics] + stat_var_names += [ + model_metrics[metric]['var'].name + for metric in model_metrics + ] strategy['stat_var_names'] = list(set(stat_var_names)) optimizer_generator = 'optimizer = fluid.optimizer.' + optimizer_conf['class'] + \ '(learning_rate=' + str(optimizer_conf['learning_rate']) + ')' - exec(optimizer_generator) + exec (optimizer_generator) optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy) return optimizer def dump_model_program(self, path): """R """ - with open(path + '/' + self._name + '_main_program.pbtxt', "w") as fout: + with open(path + '/' + self._name + '_main_program.pbtxt', + "w") as fout: print >> fout, self._build_param['model']['train_program'] - with open(path + '/' + self._name + '_startup_program.pbtxt', "w") as fout: + with open(path + '/' + self._name + '_startup_program.pbtxt', + "w") as fout: print >> fout, self._build_param['model']['startup_program'] pass @@ -137,7 +157,8 @@ class YamlModel(Model): scope = params['scope'] decay = params['decay'] for param_table in self._build_param['table']: - table_id = self._build_param['table'][param_table]['_meta']._table_id + table_id = self._build_param['table'][param_table][ + '_meta']._table_id fleet.shrink_dense_table(decay, scope=scope, table_id=table_id) def dump_inference_program(self, inference_layer, path): @@ -152,17 +173,25 @@ class YamlModel(Model): executor = params['executor'] program = self._build_param['model']['train_program'] for table_name, table in self._build_param['table'].items(): - fleet._fleet_ptr.pull_dense(scope, table['_meta']._table_id, table['params']) + fleet._fleet_ptr.pull_dense(scope, table['_meta']._table_id, + table['params']) for infernce_item in params['inference_list']: - params_name_list = self.inference_params(infernce_item['layer_name']) - params_var_list = [program.global_block().var(i) for i in params_name_list] + params_name_list = self.inference_params(infernce_item[ + 'layer_name']) + params_var_list = [ + program.global_block().var(i) for i in params_name_list + ] params_file_name = infernce_item['save_file_name'] with fluid.scope_guard(scope): if params['save_combine']: fluid.io.save_vars(executor, "./", \ program, vars=params_var_list, filename=params_file_name) else: - fluid.io.save_vars(executor, params_file_name, program, vars=params_var_list) + fluid.io.save_vars( + executor, + params_file_name, + program, + vars=params_var_list) def inference_params(self, inference_layer): """ @@ -177,11 +206,13 @@ class YamlModel(Model): return self._inference_meta['params'][layer] self._inference_meta['params'][layer] = [] - self._inference_meta['dependency'][layer] = self.get_dependency(self._build_param['inner_layer'], layer) + self._inference_meta['dependency'][layer] = self.get_dependency( + self._build_param['inner_layer'], layer) for node in self._build_nodes['layer']: if node['name'] not in self._inference_meta['dependency'][layer]: continue - if 'inference_param' in self._build_param['layer_extend'][node['name']]: + if 'inference_param' in self._build_param['layer_extend'][node[ + 'name']]: self._inference_meta['params'][layer] += \ self._build_param['layer_extend'][node['name']]['inference_param']['params'] return self._inference_meta['params'][layer] @@ -199,5 +230,6 @@ class YamlModel(Model): dependencys = copy.deepcopy(layer_graph[dest_layer]['input']) dependency_list = copy.deepcopy(dependencys) for dependency in dependencys: - dependency_list = dependency_list + self.get_dependency(layer_graph, dependency) + dependency_list = dependency_list + self.get_dependency( + layer_graph, dependency) return list(set(dependency_list)) diff --git a/core/modules/modul/layers.py b/core/modules/modul/layers.py index 060c023f..008ce6e4 100755 --- a/core/modules/modul/layers.py +++ b/core/modules/modul/layers.py @@ -18,7 +18,7 @@ from paddlerec.core.layer import Layer class EmbeddingFuseLayer(Layer): - """R + """embedding + sequence + concat """ def __init__(self, config): @@ -40,7 +40,8 @@ class EmbeddingFuseLayer(Layer): show_clk.stop_gradient = True data_var = [] for slot in self._slots: - l = fluid.layers.data(name=slot, shape=[1], dtype="int64", lod_level=1) + l = fluid.layers.data( + name=slot, shape=[1], dtype="int64", lod_level=1) data_var.append(l) emb = fluid.layers.embedding(input=l, size=[10, self._emb_dim], \ is_sparse=True, is_distributed=True, @@ -48,7 +49,8 @@ class EmbeddingFuseLayer(Layer): emb = fluid.layers.sequence_pool(input=emb, pool_type='sum') emb = fluid.layers.continuous_value_model(emb, show_clk, self._cvm) self._emb_layers.append(emb) - output = fluid.layers.concat(input=self._emb_layers, axis=1, name=self._name) + output = fluid.layers.concat( + input=self._emb_layers, axis=1, name=self._name) return output, {'data_var': data_var} @@ -111,7 +113,13 @@ class ParamLayer(Layer): def generate(self, param): """R """ - return self._config, {'inference_param': {'name': 'param', 'params': [], 'table_id': self._table_id}} + return self._config, { + 'inference_param': { + 'name': 'param', + 'params': [], + 'table_id': self._table_id + } + } class SummaryLayer(Layer): @@ -129,7 +137,13 @@ class SummaryLayer(Layer): def generate(self, param): """R """ - return self._config, {'inference_param': {'name': 'summary', 'params': [], 'table_id': self._table_id}} + return self._config, { + 'inference_param': { + 'name': 'summary', + 'params': [], + 'table_id': self._table_id + } + } class NormalizationLayer(Layer): @@ -152,9 +166,19 @@ class NormalizationLayer(Layer): if len(self._input) > 0: input_list = [param['layer'][i] for i in self._input] input_layer = fluid.layers.concat(input=input_list, axis=1) - bn = fluid.layers.data_norm(input=input_layer, name=self._name, epsilon=1e-4, param_attr={ - "batch_size": 1e4, "batch_sum_default": 0.0, "batch_square": 1e4}) - inference_param = [self._name + '.batch_size', self._name + '.batch_sum', self._name + '.batch_square_sum'] + bn = fluid.layers.data_norm( + input=input_layer, + name=self._name, + epsilon=1e-4, + param_attr={ + "batch_size": 1e4, + "batch_sum_default": 0.0, + "batch_square": 1e4 + }) + inference_param = [ + self._name + '.batch_size', self._name + '.batch_sum', + self._name + '.batch_square_sum' + ] return bn, {'inference_param': {'name': 'summary', \ 'params': inference_param, 'table_id': summary_layer.get('table_id', -1)}} @@ -181,11 +205,13 @@ class FCLayer(Layer): input_list = [param['layer'][i] for i in self._input] input_layer = fluid.layers.concat(input=input_list, axis=1) input_coln = input_layer.shape[1] - scale = param_layer['init_range'] / (input_coln ** 0.5) + scale = param_layer['init_range'] / (input_coln**0.5) bias = None if self._bias: - bias = fluid.ParamAttr(learning_rate=1.0, - initializer=fluid.initializer.NormalInitializer(loc=0.0, scale=scale)) + bias = fluid.ParamAttr( + learning_rate=1.0, + initializer=fluid.initializer.NormalInitializer( + loc=0.0, scale=scale)) fc = fluid.layers.fc( name=self._name, input=input_layer, @@ -216,18 +242,46 @@ class LogLossLayer(Layer): self._extend_output = { 'metric_label': self._metric_label, 'metric_dict': { - 'auc': {'var': None}, - 'batch_auc': {'var': None}, - 'stat_pos': {'var': None, 'data_type': 'int64'}, - 'stat_neg': {'var': None, 'data_type': 'int64'}, - 'batch_stat_pos': {'var': None, 'data_type': 'int64'}, - 'batch_stat_neg': {'var': None, 'data_type': 'int64'}, - 'pos_ins_num': {'var': None}, - 'abserr': {'var': None}, - 'sqrerr': {'var': None}, - 'prob': {'var': None}, - 'total_ins_num': {'var': None}, - 'q': {'var': None} + 'auc': { + 'var': None + }, + 'batch_auc': { + 'var': None + }, + 'stat_pos': { + 'var': None, + 'data_type': 'int64' + }, + 'stat_neg': { + 'var': None, + 'data_type': 'int64' + }, + 'batch_stat_pos': { + 'var': None, + 'data_type': 'int64' + }, + 'batch_stat_neg': { + 'var': None, + 'data_type': 'int64' + }, + 'pos_ins_num': { + 'var': None + }, + 'abserr': { + 'var': None + }, + 'sqrerr': { + 'var': None + }, + 'prob': { + 'var': None + }, + 'total_ins_num': { + 'var': None + }, + 'q': { + 'var': None + } } } @@ -236,9 +290,12 @@ class LogLossLayer(Layer): """ input_layer = param['layer'][self._input[0]] label_layer = param['layer'][self._label] - output = fluid.layers.clip(input_layer, self._bound[0], self._bound[1], name=self._name) + output = fluid.layers.clip( + input_layer, self._bound[0], self._bound[1], name=self._name) norm = fluid.layers.sigmoid(output, name=self._name) - output = fluid.layers.log_loss(norm, fluid.layers.cast(x=label_layer, dtype='float32')) + output = fluid.layers.log_loss( + norm, fluid.layers.cast( + x=label_layer, dtype='float32')) if self._weight: weight_layer = param['layer'][self._weight] output = fluid.layers.elementwise_mul(output, weight_layer) @@ -248,7 +305,11 @@ class LogLossLayer(Layer): # For AUC Metric metric = self._extend_output['metric_dict'] binary_predict = fluid.layers.concat( - input=[fluid.layers.elementwise_sub(fluid.layers.ceil(norm), norm), norm], axis=1) + input=[ + fluid.layers.elementwise_sub(fluid.layers.ceil(norm), norm), + norm + ], + axis=1) metric['auc']['var'], metric['batch_auc']['var'], [metric['batch_stat_pos']['var'], \ metric['batch_stat_neg']['var'], metric['stat_pos']['var'], metric['stat_neg']['var']] = \ diff --git a/core/trainer.py b/core/trainer.py index 40fc35de..b7c22ea8 100755 --- a/core/trainer.py +++ b/core/trainer.py @@ -30,8 +30,10 @@ class Trainer(object): def __init__(self, config=None): self._status_processor = {} + self._place = fluid.CPUPlace() self._exe = fluid.Executor(self._place) + self._exector_context = {} self._context = {'status': 'uninit', 'is_exit': False} self._config_yaml = config @@ -95,6 +97,6 @@ def user_define_engine(engine_yaml): train_dirname = os.path.dirname(train_location) base_name = os.path.splitext(os.path.basename(train_location))[0] sys.path.append(train_dirname) - trainer_class = envs.lazy_instance_by_fliename( - base_name, "UserDefineTraining") + trainer_class = envs.lazy_instance_by_fliename(base_name, + "UserDefineTraining") return trainer_class diff --git a/core/trainers/cluster_trainer.py b/core/trainers/cluster_trainer.py index faa96035..792b897f 100755 --- a/core/trainers/cluster_trainer.py +++ b/core/trainers/cluster_trainer.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - """ Training use fluid with one node only. """ @@ -43,11 +42,14 @@ class ClusterTrainer(TranspileTrainer): self.regist_context_processor('uninit', self.instance) self.regist_context_processor('init_pass', self.init) self.regist_context_processor('startup_pass', self.startup) - if envs.get_platform() == "LINUX" and envs.get_global_env("dataset_class", None, "train.reader") != "DataLoader": + + if envs.get_platform() == "LINUX" and envs.get_global_env( + "dataset_class", None, "train.reader") != "DataLoader": self.regist_context_processor('train_pass', self.dataset_train) else: - self.regist_context_processor( - 'train_pass', self.dataloader_train) + self.regist_context_processor('train_pass', + self.dataloader_train) + self.regist_context_processor('infer_pass', self.infer) self.regist_context_processor('terminal_pass', self.terminal) @@ -75,8 +77,8 @@ class ClusterTrainer(TranspileTrainer): def init(self, context): self.model.train_net() optimizer = self.model.optimizer() - optimizer_name = envs.get_global_env( - "hyper_parameters.optimizer", None, "train.model") + optimizer_name = envs.get_global_env("hyper_parameters.optimizer", + None, "train.model") if optimizer_name not in ["", "sgd", "SGD", "Sgd"]: os.environ["FLAGS_communicator_is_sgd_optimizer"] = '0' @@ -114,9 +116,9 @@ class ClusterTrainer(TranspileTrainer): program = fluid.compiler.CompiledProgram( fleet.main_program).with_data_parallel( - loss_name=self.model.get_avg_cost().name, - build_strategy=self.strategy.get_build_strategy(), - exec_strategy=self.strategy.get_execute_strategy()) + loss_name=self.model.get_avg_cost().name, + build_strategy=self.strategy.get_build_strategy(), + exec_strategy=self.strategy.get_execute_strategy()) metrics_varnames = [] metrics_format = [] @@ -135,9 +137,8 @@ class ClusterTrainer(TranspileTrainer): batch_id = 0 try: while True: - metrics_rets = self._exe.run( - program=program, - fetch_list=metrics_varnames) + metrics_rets = self._exe.run(program=program, + fetch_list=metrics_varnames) metrics = [epoch, batch_id] metrics.extend(metrics_rets) @@ -162,14 +163,16 @@ class ClusterTrainer(TranspileTrainer): for i in range(epochs): begin_time = time.time() - self._exe.train_from_dataset(program=fluid.default_main_program(), - dataset=dataset, - fetch_list=self.fetch_vars, - fetch_info=self.fetch_alias, - print_period=self.fetch_period) + self._exe.train_from_dataset( + program=fluid.default_main_program(), + dataset=dataset, + fetch_list=self.fetch_vars, + fetch_info=self.fetch_alias, + print_period=self.fetch_period) end_time = time.time() - times = end_time-begin_time - print("epoch {} using time {}, speed {:.2f} lines/s".format(i, times, ins/times)) + times = end_time - begin_time + print("epoch {} using time {}, speed {:.2f} lines/s".format( + i, times, ins / times)) self.save(i, "train", is_fleet=True) fleet.stop_worker() diff --git a/core/trainers/online_learning_trainer.py b/core/trainers/online_learning_trainer.py index 0303e96a..b2856844 100755 --- a/core/trainers/online_learning_trainer.py +++ b/core/trainers/online_learning_trainer.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - """ Training use fluid with one node only. """ @@ -44,11 +43,14 @@ class OnlineLearningTrainer(TranspileTrainer): self.regist_context_processor('uninit', self.instance) self.regist_context_processor('init_pass', self.init) self.regist_context_processor('startup_pass', self.startup) - if envs.get_platform() == "LINUX" and envs.get_global_env("dataset_class", None, "train.reader") != "DataLoader": + + if envs.get_platform() == "LINUX" and envs.get_global_env( + "dataset_class", None, "train.reader") != "DataLoader": self.regist_context_processor('train_pass', self.dataset_train) else: - self.regist_context_processor( - 'train_pass', self.dataloader_train) + self.regist_context_processor('train_pass', + self.dataloader_train) + self.regist_context_processor('infer_pass', self.infer) self.regist_context_processor('terminal_pass', self.terminal) @@ -110,27 +112,27 @@ class OnlineLearningTrainer(TranspileTrainer): if state == "TRAIN": inputs = self.model.get_inputs() namespace = "train.reader" - train_data_path = envs.get_global_env( - "train_data_path", None, namespace) + train_data_path = envs.get_global_env("train_data_path", None, + namespace) else: inputs = self.model.get_infer_inputs() namespace = "evaluate.reader" - train_data_path = envs.get_global_env( - "test_data_path", None, namespace) + train_data_path = envs.get_global_env("test_data_path", None, + namespace) threads = int(envs.get_runtime_environ("train.trainer.threads")) batch_size = envs.get_global_env("batch_size", None, namespace) reader_class = envs.get_global_env("class", None, namespace) abs_dir = os.path.dirname(os.path.abspath(__file__)) reader = os.path.join(abs_dir, '../utils', 'dataset_instance.py') - pipe_cmd = "python {} {} {} {}".format( - reader, reader_class, state, self._config_yaml) + pipe_cmd = "python {} {} {} {}".format(reader, reader_class, state, + self._config_yaml) if train_data_path.startswith("paddlerec::"): package_base = envs.get_runtime_environ("PACKAGE_BASE") assert package_base is not None - train_data_path = os.path.join( - package_base, train_data_path.split("::")[1]) + train_data_path = os.path.join(package_base, + train_data_path.split("::")[1]) dataset = fluid.DatasetFactory().create_dataset() dataset.set_use_var(inputs) @@ -166,14 +168,16 @@ class OnlineLearningTrainer(TranspileTrainer): ins = self._get_dataset_ins() begin_time = time.time() - self._exe.train_from_dataset(program=fluid.default_main_program(), - dataset=dataset, - fetch_list=self.fetch_vars, - fetch_info=self.fetch_alias, - print_period=self.fetch_period) + self._exe.train_from_dataset( + program=fluid.default_main_program(), + dataset=dataset, + fetch_list=self.fetch_vars, + fetch_info=self.fetch_alias, + print_period=self.fetch_period) end_time = time.time() - times = end_time-begin_time - print("epoch {} using time {}, speed {:.2f} lines/s".format(i, times, ins/times)) + times = end_time - begin_time + print("epoch {} using time {}, speed {:.2f} lines/s".format( + i, times, ins / times)) self.save(i, "train", is_fleet=True) fleet.stop_worker() diff --git a/core/trainers/single_trainer.py b/core/trainers/single_trainer.py index 8079377b..a564ba55 100755 --- a/core/trainers/single_trainer.py +++ b/core/trainers/single_trainer.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - """ Training use fluid with one node only. """ @@ -36,8 +35,9 @@ class SingleTrainer(TranspileTrainer): self.regist_context_processor('uninit', self.instance) self.regist_context_processor('init_pass', self.init) self.regist_context_processor('startup_pass', self.startup) - if envs.get_platform() == "LINUX" and envs.get_global_env("dataset_class", None, - "train.reader") != "DataLoader": + + if envs.get_platform() == "LINUX" and envs.get_global_env( + "dataset_class", None, "train.reader") != "DataLoader": self.regist_context_processor('train_pass', self.dataset_train) else: self.regist_context_processor('train_pass', self.dataloader_train) @@ -73,9 +73,8 @@ class SingleTrainer(TranspileTrainer): reader = self._get_dataloader("TRAIN") epochs = envs.get_global_env("train.epochs") - program = fluid.compiler.CompiledProgram( - fluid.default_main_program()).with_data_parallel( - loss_name=self.model.get_avg_cost().name) + program = fluid.compiler.CompiledProgram(fluid.default_main_program( + )).with_data_parallel(loss_name=self.model.get_avg_cost().name) metrics_varnames = [] metrics_format = [] @@ -94,9 +93,8 @@ class SingleTrainer(TranspileTrainer): batch_id = 0 try: while True: - metrics_rets = self._exe.run( - program=program, - fetch_list=metrics_varnames) + metrics_rets = self._exe.run(program=program, + fetch_list=metrics_varnames) metrics = [epoch, batch_id] metrics.extend(metrics_rets) @@ -117,14 +115,16 @@ class SingleTrainer(TranspileTrainer): epochs = envs.get_global_env("train.epochs") for i in range(epochs): begin_time = time.time() - self._exe.train_from_dataset(program=fluid.default_main_program(), - dataset=dataset, - fetch_list=self.fetch_vars, - fetch_info=self.fetch_alias, - print_period=self.fetch_period) + self._exe.train_from_dataset( + program=fluid.default_main_program(), + dataset=dataset, + fetch_list=self.fetch_vars, + fetch_info=self.fetch_alias, + print_period=self.fetch_period) end_time = time.time() times = end_time - begin_time - print("epoch {} using time {}, speed {:.2f} lines/s".format(i, times, ins / times)) + print("epoch {} using time {}, speed {:.2f} lines/s".format( + i, times, ins / times)) self.save(i, "train", is_fleet=False) context['status'] = 'infer_pass' diff --git a/core/trainers/tdm_cluster_trainer.py b/core/trainers/tdm_cluster_trainer.py index 3bd1ad33..a7e8f97e 100755 --- a/core/trainers/tdm_cluster_trainer.py +++ b/core/trainers/tdm_cluster_trainer.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - """ Training use fluid with one node only. """ @@ -36,8 +35,8 @@ special_param = ["TDM_Tree_Travel", "TDM_Tree_Layer", "TDM_Tree_Info"] class TDMClusterTrainer(ClusterTrainer): def server(self, context): namespace = "train.startup" - init_model_path = envs.get_global_env( - "cluster.init_model_path", "", namespace) + init_model_path = envs.get_global_env("cluster.init_model_path", "", + namespace) assert init_model_path != "", "Cluster train must has init_model for TDM" fleet.init_server(init_model_path) logger.info("TDM: load model from {}".format(init_model_path)) @@ -48,24 +47,27 @@ class TDMClusterTrainer(ClusterTrainer): self._exe.run(fleet.startup_program) namespace = "train.startup" - load_tree = envs.get_global_env( - "tree.load_tree", True, namespace) - self.tree_layer_path = envs.get_global_env( - "tree.tree_layer_path", "", namespace) - self.tree_travel_path = envs.get_global_env( - "tree.tree_travel_path", "", namespace) - self.tree_info_path = envs.get_global_env( - "tree.tree_info_path", "", namespace) - - save_init_model = envs.get_global_env( - "cluster.save_init_model", False, namespace) - init_model_path = envs.get_global_env( - "cluster.init_model_path", "", namespace) + load_tree = envs.get_global_env("tree.load_tree", True, namespace) + + self.tree_layer_path = envs.get_global_env("tree.tree_layer_path", "", + namespace) + + self.tree_travel_path = envs.get_global_env("tree.tree_travel_path", + "", namespace) + + self.tree_info_path = envs.get_global_env("tree.tree_info_path", "", + namespace) + + save_init_model = envs.get_global_env("cluster.save_init_model", False, + namespace) + init_model_path = envs.get_global_env("cluster.init_model_path", "", + namespace) if load_tree: # covert tree to tensor, set it into Fluid's variable. for param_name in special_param: - param_t = fluid.global_scope().find_var(param_name).get_tensor() + param_t = fluid.global_scope().find_var(param_name).get_tensor( + ) param_array = self._tdm_prepare(param_name) param_t.set(param_array.astype('int32'), self._place) @@ -93,8 +95,8 @@ class TDMClusterTrainer(ClusterTrainer): def _tdm_travel_prepare(self): """load tdm tree param from npy/list file""" travel_array = np.load(self.tree_travel_path) - logger.info("TDM Tree leaf node nums: {}".format( - travel_array.shape[0])) + logger.info("TDM Tree leaf node nums: {}".format(travel_array.shape[ + 0])) return travel_array def _tdm_layer_prepare(self): diff --git a/core/trainers/tdm_single_trainer.py b/core/trainers/tdm_single_trainer.py index 21be66a6..c0f23fc3 100755 --- a/core/trainers/tdm_single_trainer.py +++ b/core/trainers/tdm_single_trainer.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - """ Training use fluid with one node only. """ @@ -27,33 +26,38 @@ from paddlerec.core.utils import envs logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s") logger = logging.getLogger("fluid") logger.setLevel(logging.INFO) -special_param = ["TDM_Tree_Travel", "TDM_Tree_Layer", - "TDM_Tree_Info", "TDM_Tree_Emb"] +special_param = [ + "TDM_Tree_Travel", "TDM_Tree_Layer", "TDM_Tree_Info", "TDM_Tree_Emb" +] class TDMSingleTrainer(SingleTrainer): def startup(self, context): namespace = "train.startup" - load_persistables = envs.get_global_env( - "single.load_persistables", False, namespace) + load_persistables = envs.get_global_env("single.load_persistables", + False, namespace) + persistables_model_path = envs.get_global_env( "single.persistables_model_path", "", namespace) - load_tree = envs.get_global_env( - "tree.load_tree", False, namespace) - self.tree_layer_path = envs.get_global_env( - "tree.tree_layer_path", "", namespace) - self.tree_travel_path = envs.get_global_env( - "tree.tree_travel_path", "", namespace) - self.tree_info_path = envs.get_global_env( - "tree.tree_info_path", "", namespace) - self.tree_emb_path = envs.get_global_env( - "tree.tree_emb_path", "", namespace) - - save_init_model = envs.get_global_env( - "single.save_init_model", False, namespace) - init_model_path = envs.get_global_env( - "single.init_model_path", "", namespace) + load_tree = envs.get_global_env("tree.load_tree", False, namespace) + + self.tree_layer_path = envs.get_global_env("tree.tree_layer_path", "", + namespace) + + self.tree_travel_path = envs.get_global_env("tree.tree_travel_path", + "", namespace) + + self.tree_info_path = envs.get_global_env("tree.tree_info_path", "", + namespace) + + self.tree_emb_path = envs.get_global_env("tree.tree_emb_path", "", + namespace) + + save_init_model = envs.get_global_env("single.save_init_model", False, + namespace) + init_model_path = envs.get_global_env("single.init_model_path", "", + namespace) self._exe.run(fluid.default_startup_program()) if load_persistables: @@ -68,7 +72,8 @@ class TDMSingleTrainer(SingleTrainer): if load_tree: # covert tree to tensor, set it into Fluid's variable. for param_name in special_param: - param_t = fluid.global_scope().find_var(param_name).get_tensor() + param_t = fluid.global_scope().find_var(param_name).get_tensor( + ) param_array = self._tdm_prepare(param_name) if param_name == 'TDM_Tree_Emb': param_t.set(param_array.astype('float32'), self._place) @@ -102,15 +107,15 @@ class TDMSingleTrainer(SingleTrainer): def _tdm_travel_prepare(self): """load tdm tree param from npy/list file""" travel_array = np.load(self.tree_travel_path) - logger.info("TDM Tree leaf node nums: {}".format( - travel_array.shape[0])) + logger.info("TDM Tree leaf node nums: {}".format(travel_array.shape[ + 0])) return travel_array def _tdm_emb_prepare(self): """load tdm tree param from npy/list file""" emb_array = np.load(self.tree_emb_path) - logger.info("TDM Tree node nums from emb: {}".format( - emb_array.shape[0])) + logger.info("TDM Tree node nums from emb: {}".format(emb_array.shape[ + 0])) return emb_array def _tdm_layer_prepare(self): diff --git a/core/trainers/transpiler_trainer.py b/core/trainers/transpiler_trainer.py index a67d4759..c121b4ab 100755 --- a/core/trainers/transpiler_trainer.py +++ b/core/trainers/transpiler_trainer.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - """ Training use fluid with DistributeTranspiler """ @@ -39,9 +38,12 @@ class TranspileTrainer(Trainer): self.increment_models = [] def processor_register(self): - print("Need implement by trainer, `self.regist_context_processor('uninit', self.instance)` must be the first") + print( + "Need implement by trainer, `self.regist_context_processor('uninit', self.instance)` must be the first" + ) def _get_dataloader(self, state="TRAIN"): + if state == "TRAIN": dataloader = self.model._data_loader namespace = "train.reader" @@ -59,12 +61,14 @@ class TranspileTrainer(Trainer): if sparse_slots is None and dense_slots is None: reader_class = envs.get_global_env("class", None, namespace) - reader = dataloader_instance.dataloader( - reader_class, state, self._config_yaml) - reader_class = envs.lazy_instance_by_fliename(reader_class, class_name) + reader = dataloader_instance.dataloader(reader_class, state, + self._config_yaml) + reader_class = envs.lazy_instance_by_fliename(reader_class, + class_name) reader_ins = reader_class(self._config_yaml) else: - reader = dataloader_instance.slotdataloader("", state, self._config_yaml) + reader = dataloader_instance.slotdataloader("", state, + self._config_yaml) reader_ins = SlotReader(self._config_yaml) if hasattr(reader_ins, 'generate_batch_from_trainfiles'): @@ -94,13 +98,13 @@ class TranspileTrainer(Trainer): if state == "TRAIN": inputs = self.model.get_inputs() namespace = "train.reader" - train_data_path = envs.get_global_env( - "train_data_path", None, namespace) + train_data_path = envs.get_global_env("train_data_path", None, + namespace) else: inputs = self.model.get_infer_inputs() namespace = "evaluate.reader" - train_data_path = envs.get_global_env( - "test_data_path", None, namespace) + train_data_path = envs.get_global_env("test_data_path", None, + namespace) sparse_slots = envs.get_global_env("sparse_slots", None, namespace) dense_slots = envs.get_global_env("dense_slots", None, namespace) @@ -112,8 +116,8 @@ class TranspileTrainer(Trainer): reader = os.path.join(abs_dir, '../utils', 'dataset_instance.py') if sparse_slots is None and dense_slots is None: - pipe_cmd = "python {} {} {} {}".format( - reader, reader_class, state, self._config_yaml) + pipe_cmd = "python {} {} {} {}".format(reader, reader_class, state, + self._config_yaml) else: padding = envs.get_global_env("padding", 0, namespace) pipe_cmd = "python {} {} {} {} {} {} {} {}".format( @@ -123,8 +127,8 @@ class TranspileTrainer(Trainer): if train_data_path.startswith("paddlerec::"): package_base = envs.get_runtime_environ("PACKAGE_BASE") assert package_base is not None - train_data_path = os.path.join( - package_base, train_data_path.split("::")[1]) + train_data_path = os.path.join(package_base, + train_data_path.split("::")[1]) dataset = fluid.DatasetFactory().create_dataset() dataset.set_use_var(inputs) @@ -140,11 +144,11 @@ class TranspileTrainer(Trainer): debug_mode = envs.get_global_env("reader_debug_mode", False, namespace) if debug_mode: - print( - "--- Dataset Debug Mode Begin , show pre 10 data of {}---".format(file_list[0])) + print("--- Dataset Debug Mode Begin , show pre 10 data of {}---". + format(file_list[0])) os.system("cat {} | {} | head -10".format(file_list[0], pipe_cmd)) - print( - "--- Dataset Debug Mode End , show pre 10 data of {}---".format(file_list[0])) + print("--- Dataset Debug Mode End , show pre 10 data of {}---". + format(file_list[0])) exit(0) return dataset @@ -166,27 +170,29 @@ class TranspileTrainer(Trainer): if not need_save(epoch_id, save_interval, False): return - feed_varnames = envs.get_global_env( - "save.inference.feed_varnames", None, namespace) + feed_varnames = envs.get_global_env("save.inference.feed_varnames", + None, namespace) fetch_varnames = envs.get_global_env( "save.inference.fetch_varnames", None, namespace) if feed_varnames is None or fetch_varnames is None: return - fetch_vars = [fluid.default_main_program().global_block().vars[varname] - for varname in fetch_varnames] - dirname = envs.get_global_env( - "save.inference.dirname", None, namespace) + fetch_vars = [ + fluid.default_main_program().global_block().vars[varname] + for varname in fetch_varnames + ] + dirname = envs.get_global_env("save.inference.dirname", None, + namespace) assert dirname is not None dirname = os.path.join(dirname, str(epoch_id)) if is_fleet: - fleet.save_inference_model( - self._exe, dirname, feed_varnames, fetch_vars) + fleet.save_inference_model(self._exe, dirname, feed_varnames, + fetch_vars) else: - fluid.io.save_inference_model( - dirname, feed_varnames, fetch_vars, self._exe) + fluid.io.save_inference_model(dirname, feed_varnames, + fetch_vars, self._exe) self.inference_models.append((epoch_id, dirname)) def save_persistables(): @@ -196,8 +202,8 @@ class TranspileTrainer(Trainer): if not need_save(epoch_id, save_interval, False): return - dirname = envs.get_global_env( - "save.increment.dirname", None, namespace) + dirname = envs.get_global_env("save.increment.dirname", None, + namespace) assert dirname is not None dirname = os.path.join(dirname, str(epoch_id)) @@ -275,10 +281,9 @@ class TranspileTrainer(Trainer): batch_id = 0 try: while True: - metrics_rets = self._exe.run( - program=program, - fetch_list=metrics_varnames, - return_numpy=is_return_numpy) + metrics_rets = self._exe.run(program=program, + fetch_list=metrics_varnames, + return_numpy=is_return_numpy) metrics = [epoch, batch_id] metrics.extend(metrics_rets) diff --git a/core/utils/dataset_holder.py b/core/utils/dataset_holder.py index cd195450..a75d52b6 100755 --- a/core/utils/dataset_holder.py +++ b/core/utils/dataset_holder.py @@ -24,7 +24,7 @@ from paddlerec.core.utils import util as util class DatasetHolder(object): """ - Dataset Base + Dataset Holder """ __metaclass__ = abc.ABCMeta @@ -74,11 +74,17 @@ class TimeSplitDatasetHolder(DatasetHolder): Dataset.__init__(self, config) if 'data_donefile' not in config or config['data_donefile'] is None: config['data_donefile'] = config['data_path'] + "/to.hadoop.done" - self._path_generator = util.PathGenerator({'templates': [ - {'name': 'data_path', 'template': config['data_path']}, - {'name': 'donefile_path', 'template': config['data_donefile']} - ]}) - self._split_interval = config['split_interval'] # data split N mins per dir + self._path_generator = util.PathGenerator({ + 'templates': [{ + 'name': 'data_path', + 'template': config['data_path'] + }, { + 'name': 'donefile_path', + 'template': config['data_donefile'] + }] + }) + self._split_interval = config[ + 'split_interval'] # data split N mins per dir self._data_file_handler = fs.FileHandler(config) def _format_data_time(self, daytime_str, time_window_mins): @@ -91,7 +97,8 @@ class TimeSplitDatasetHolder(DatasetHolder): return None, 0 if mins_of_day % self._split_interval != 0: - skip_mins = self._split_interval - (mins_of_day % self._split_interval) + skip_mins = self._split_interval - (mins_of_day % + self._split_interval) data_time = data_time + datetime.timedelta(minutes=skip_mins) time_window_mins = time_window_mins - skip_mins return data_time, time_window_mins @@ -106,17 +113,24 @@ class TimeSplitDatasetHolder(DatasetHolder): True/False """ is_ready = True - data_time, windows_mins = self._format_data_time(daytime_str, time_window_mins) + data_time, windows_mins = self._format_data_time(daytime_str, + time_window_mins) while time_window_mins > 0: - file_path = self._path_generator.generate_path('donefile_path', {'time_format': data_time}) + file_path = self._path_generator.generate_path( + 'donefile_path', {'time_format': data_time}) if not self._data_file_handler.is_exist(file_path): is_ready = False break time_window_mins = time_window_mins - self._split_interval - data_time = data_time + datetime.timedelta(minutes=self._split_interval) + data_time = data_time + datetime.timedelta( + minutes=self._split_interval) return is_ready - def get_file_list(self, daytime_str, time_window_mins, node_num=1, node_idx=0): + def get_file_list(self, + daytime_str, + time_window_mins, + node_num=1, + node_idx=0): """ data in [daytime_str, daytime_str + time_window_mins], random shard to node_num, return shard[node_idx] Args: @@ -128,26 +142,32 @@ class TimeSplitDatasetHolder(DatasetHolder): list, data_shard[node_idx] """ data_file_list = [] - data_time, windows_mins = self._format_data_time(daytime_str, time_window_mins) + data_time, windows_mins = self._format_data_time(daytime_str, + time_window_mins) while time_window_mins > 0: - file_path = self._path_generator.generate_path('data_path', {'time_format': data_time}) + file_path = self._path_generator.generate_path( + 'data_path', {'time_format': data_time}) sub_file_list = self._data_file_handler.ls(file_path) for sub_file in sub_file_list: sub_file_name = self._data_file_handler.get_file_name(sub_file) - if not sub_file_name.startswith(self._config['filename_prefix']): + if not sub_file_name.startswith(self._config[ + 'filename_prefix']): continue if hash(sub_file_name) % node_num == node_idx: data_file_list.append(sub_file) time_window_mins = time_window_mins - self._split_interval - data_time = data_time + datetime.timedelta(minutes=self._split_interval) + data_time = data_time + datetime.timedelta( + minutes=self._split_interval) return data_file_list def _alloc_dataset(self, file_list): """ """ - dataset = fluid.DatasetFactory().create_dataset(self._config['dataset_type']) + dataset = fluid.DatasetFactory().create_dataset(self._config[ + 'dataset_type']) dataset.set_batch_size(self._config['batch_size']) dataset.set_thread(self._config['load_thread']) - dataset.set_hdfs_config(self._config['fs_name'], self._config['fs_ugi']) + dataset.set_hdfs_config(self._config['fs_name'], + self._config['fs_ugi']) dataset.set_pipe_command(self._config['data_converter']) dataset.set_filelist(file_list) dataset.set_use_var(self._config['data_vars']) @@ -163,7 +183,9 @@ class TimeSplitDatasetHolder(DatasetHolder): while self.check_ready(begin_time, windown_min) == False: print("dataset not ready, time:" + begin_time) time.sleep(30) - file_list = self.get_file_list(begin_time, windown_min, params['node_num'], params['node_idx']) + file_list = self.get_file_list(begin_time, windown_min, + params['node_num'], + params['node_idx']) self._datasets[begin_time] = self._alloc_dataset(file_list) self._datasets[begin_time].load_into_memory() else: @@ -176,9 +198,12 @@ class TimeSplitDatasetHolder(DatasetHolder): windown_min = params['time_window_min'] if begin_time not in self._datasets: if self.check_ready(begin_time, windown_min): - file_list = self.get_file_list(begin_time, windown_min, params['node_num'], params['node_idx']) + file_list = self.get_file_list(begin_time, windown_min, + params['node_num'], + params['node_idx']) self._datasets[begin_time] = self._alloc_dataset(file_list) - self._datasets[begin_time].preload_into_memory(self._config['preload_thread']) + self._datasets[begin_time].preload_into_memory(self._config[ + 'preload_thread']) return True return False diff --git a/core/utils/dataset_instance.py b/core/utils/dataset_instance.py index f5175c48..2e6082dc 100755 --- a/core/utils/dataset_instance.py +++ b/core/utils/dataset_instance.py @@ -17,10 +17,11 @@ import sys from paddlerec.core.utils.envs import lazy_instance_by_fliename from paddlerec.core.reader import SlotReader -from paddlerec.core.utils import envs if len(sys.argv) < 4: - raise ValueError("reader only accept 3 argument: 1. reader_class 2.train/evaluate/slotreader 3.yaml_abs_path") + raise ValueError( + "reader only accept 3 argument: 1. reader_class 2.train/evaluate/slotreader 3.yaml_abs_path" + ) reader_package = sys.argv[1] diff --git a/core/utils/envs.py b/core/utils/envs.py index 7093d897..bc222e90 100755 --- a/core/utils/envs.py +++ b/core/utils/envs.py @@ -95,7 +95,7 @@ def path_adapter(path): l_p = path.split("paddlerec.")[1].replace(".", "/") return os.path.join(package, l_p) else: - return path + return path def windows_path_converter(path): @@ -159,8 +159,8 @@ def pretty_print_envs(envs, header=None): def lazy_instance_by_package(package, class_name): models = get_global_env("train.model.models") - model_package = __import__( - package, globals(), locals(), package.split(".")) + model_package = __import__(package, + globals(), locals(), package.split(".")) instance = getattr(model_package, class_name) return instance @@ -170,8 +170,8 @@ def lazy_instance_by_fliename(abs, class_name): sys.path.append(dirname) package = os.path.splitext(os.path.basename(abs))[0] - model_package = __import__( - package, globals(), locals(), package.split(".")) + model_package = __import__(package, + globals(), locals(), package.split(".")) instance = getattr(model_package, class_name) return instance @@ -189,8 +189,7 @@ def get_platform(): def find_free_port(): def __free_port(): - with closing(socket.socket(socket.AF_INET, - socket.SOCK_STREAM)) as s: + with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s: s.bind(('', 0)) return s.getsockname()[1] diff --git a/core/utils/util.py b/core/utils/util.py index bd632848..34f26c6d 100755 --- a/core/utils/util.py +++ b/core/utils/util.py @@ -22,6 +22,7 @@ from paddlerec.core.utils import fs as fs def save_program_proto(path, program=None): + if program is None: _program = fluid.default_main_program() else: @@ -175,7 +176,8 @@ class PathGenerator(object): """ if template_name in self._templates: if 'time_format' in param: - str = param['time_format'].strftime(self._templates[template_name]) + str = param['time_format'].strftime(self._templates[ + template_name]) return str.format(**param) return self._templates[template_name].format(**param) else: @@ -198,31 +200,39 @@ class TimeTrainPass(object): self._begin_day = make_datetime(day_fields[0].strip()) if len(day_fields) == 1 or len(day_fields[1]) == 0: # 100 years, meaning to continuous running - self._end_day = self._begin_day + datetime.timedelta(days=36500) + self._end_day = self._begin_day + datetime.timedelta( + days=36500) else: # example: 2020212+10 run_day = int(day_fields[1].strip()) - self._end_day = self._begin_day + datetime.timedelta(days=run_day) + self._end_day = self._begin_day + datetime.timedelta( + days=run_day) else: # example: {20191001..20191031} - days = os.popen("echo -n " + self._config['days']).read().split(" ") + days = os.popen("echo -n " + self._config['days']).read().split( + " ") self._begin_day = make_datetime(days[0]) self._end_day = make_datetime(days[len(days) - 1]) self._checkpoint_interval = self._config['checkpoint_interval'] self._dump_inference_interval = self._config['dump_inference_interval'] - self._interval_per_pass = self._config['train_time_interval'] # train N min data per pass + self._interval_per_pass = self._config[ + 'train_time_interval'] # train N min data per pass self._pass_id = 0 self._inference_pass_id = 0 self._pass_donefile_handler = None if 'pass_donefile_name' in self._config: - self._train_pass_donefile = global_config['output_path'] + '/' + self._config['pass_donefile_name'] + self._train_pass_donefile = global_config[ + 'output_path'] + '/' + self._config['pass_donefile_name'] if fs.is_afs_path(self._train_pass_donefile): - self._pass_donefile_handler = fs.FileHandler(global_config['io']['afs']) + self._pass_donefile_handler = fs.FileHandler(global_config[ + 'io']['afs']) else: - self._pass_donefile_handler = fs.FileHandler(global_config['io']['local_fs']) + self._pass_donefile_handler = fs.FileHandler(global_config[ + 'io']['local_fs']) - last_done = self._pass_donefile_handler.cat(self._train_pass_donefile).strip().split('\n')[-1] + last_done = self._pass_donefile_handler.cat( + self._train_pass_donefile).strip().split('\n')[-1] done_fileds = last_done.split('\t') if len(done_fileds) > 4: self._base_key = done_fileds[1] @@ -236,15 +246,18 @@ class TimeTrainPass(object): """ return 24 * 60 / self._interval_per_pass - def save_train_progress(self, day, pass_id, base_key, model_path, is_checkpoint): + def save_train_progress(self, day, pass_id, base_key, model_path, + is_checkpoint): """R """ if is_checkpoint: self._checkpoint_pass_id = pass_id self._checkpoint_model_path = model_path - done_content = "%s\t%s\t%s\t%s\t%d\n" % (day, base_key, - self._checkpoint_model_path, self._checkpoint_pass_id, pass_id) - self._pass_donefile_handler.write(done_content, self._train_pass_donefile, 'a') + done_content = "%s\t%s\t%s\t%s\t%d\n" % ( + day, base_key, self._checkpoint_model_path, + self._checkpoint_pass_id, pass_id) + self._pass_donefile_handler.write(done_content, + self._train_pass_donefile, 'a') pass def init_pass_by_id(self, date_str, pass_id): @@ -286,12 +299,14 @@ class TimeTrainPass(object): if self._pass_id < 1: self.init_pass_by_time(self._begin_day.strftime("%Y%m%d%H%M")) else: - next_time = self._current_train_time + datetime.timedelta(minutes=self._interval_per_pass) + next_time = self._current_train_time + datetime.timedelta( + minutes=self._interval_per_pass) if (next_time - self._end_day).total_seconds() > 0: has_next = False else: self.init_pass_by_time(next_time.strftime("%Y%m%d%H%M")) - if has_next and (self._inference_pass_id < self._pass_id or self._pass_id < old_pass_id): + if has_next and (self._inference_pass_id < self._pass_id or + self._pass_id < old_pass_id): self._inference_pass_id = self._pass_id - 1 return has_next @@ -319,9 +334,11 @@ class TimeTrainPass(object): Return: date(current_train_time + delta_day) """ - return (self._current_train_time + datetime.timedelta(days=delta_day)).strftime("%Y%m%d") + return (self._current_train_time + datetime.timedelta(days=delta_day) + ).strftime("%Y%m%d") def timestamp(self, delta_day=0): """R """ - return (self._current_train_time + datetime.timedelta(days=delta_day)).timestamp() + return (self._current_train_time + datetime.timedelta(days=delta_day) + ).timestamp() -- GitLab