From 765a2adaf540e3b533f23bac9ce8b21bd9a96e33 Mon Sep 17 00:00:00 2001 From: ziyoujiyi <73728031+ziyoujiyi@users.noreply.github.com> Date: Mon, 14 Feb 2022 14:18:41 +0800 Subject: [PATCH] =?UTF-8?q?=E7=BB=9F=E4=B8=80ps=EF=BC=9Aheter=20ps=20?= =?UTF-8?q?=E4=BA=8C=E9=98=B6=E6=AE=B5=E5=8D=95=E6=B5=8B=E9=80=9A=E8=BF=87?= =?UTF-8?q?=20(#39468)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * delete gloo connect retry * the_one_ps dirs reconstruct * . * . * create the_one_ps dirs * create the_one_ps dirs * create the_one_ps dirs * create the_one_ps dirs * create the_one_ps dirs * create the_one_ps dirs * the one ps dirs modify * the one ps dirs modify * the one ps dirs modify * the one ps dirs modify * refactor ps optimize * refactor ps optimize * refactor ps optimize * . * . * . * . * . * . * refactor theoneps * the_one_ps * add ps pass unittest * add ps pass unittest * ps unitest frame * ps unittest frame * ps unittest frame * ps unittest frame * ps unittest frame * ps unittest frame * ps unittest frame * ps unittest frame * ps unittest frame * ps unittest frame * ps unittest frame * ps unittest frame * ps unittest frame * ps unittest frame * ps unittest frame * ps unittest frame * ps unittest frame * add cpu_async_ps_mode test * add cpu_async_ps_mode test * add cpu_async_ps_mode test * ps unittest ready * ps unittest ready * solve dist_pass init conflict * solve import CommContext error * unittest ok * implement AllocateFrom * solve setup.py.in conflict * solve conflict * solve conflict * solve conflict * . * . * cpu-async-ps minimize test ok & gpu minimize test ok * add heter 2stage unittest * add heter 2stage unittest * add heter 2stage unittest Co-authored-by: zkh2016 --- .../distributed/passes/ps_trainer_pass.py | 125 ++++++++++++--- .../ps/utils/ps_program_builder.py | 11 +- python/paddle/distributed/ps/utils/public.py | 148 ++++++++++++++++++ .../distributed_passes/ps_pass_test_base.py | 14 +- .../test_ps_trainer_pass.py | 21 +++ .../tests/unittests/ps/heter_ps_config.yaml | 36 +++++ .../tests/unittests/ps/ps_dnn_trainer.py | 6 +- .../fluid/tests/unittests/ps_dnn_model.py | 10 +- 8 files changed, 336 insertions(+), 35 deletions(-) create mode 100644 python/paddle/fluid/tests/unittests/ps/heter_ps_config.yaml mode change 100644 => 100755 python/paddle/fluid/tests/unittests/ps_dnn_model.py diff --git a/python/paddle/distributed/passes/ps_trainer_pass.py b/python/paddle/distributed/passes/ps_trainer_pass.py index 28cfa873b96..fff10a2d468 100755 --- a/python/paddle/distributed/passes/ps_trainer_pass.py +++ b/python/paddle/distributed/passes/ps_trainer_pass.py @@ -21,25 +21,6 @@ from .pass_base import PassBase, register_pass from paddle.fluid.transpiler.details.program_utils import delete_ops from paddle.fluid.transpiler.collective import SingleProcessMultiThread -OP_NAME_SCOPE = "op_namescope" -CLIP_OP_NAME_SCOPE = "gradient_clip" -STEP_COUNTER = "@PS_STEP_COUNTER@" -OP_ROLE_VAR_ATTR_NAME = core.op_proto_and_checker_maker.kOpRoleVarAttrName() -RPC_OP_ROLE_ATTR_VALUE = core.op_proto_and_checker_maker.OpRole.RPC -LR_SCHED_OP_ROLE_ATTR_VALUE = core.op_proto_and_checker_maker.OpRole.LRSched -OPT_OP_ROLE_ATTR_VALUE = core.op_proto_and_checker_maker.OpRole.Optimize -op_role_attr_name = core.op_proto_and_checker_maker.kOpRoleAttrName() -backward = core.op_proto_and_checker_maker.OpRole.Backward - -SPARSE_OP_TYPE_DICT = {"lookup_table": "W", "lookup_table_v2": "W"} -SPARSE_GRAD_OP_TYPE_DICT = { - "lookup_table_grad": "W", - "lookup_table_v2_grad": "W" -} -DEVICE_LIST = ["cpu", "gpu", "xpu"] -COMMUNICATE_OPS_TYPE = ["send", "recv", "fetch_barrier", "send_barrier"] -DEFAULT_DEVICE = 'cpu' - @register_pass("append_send_ops_pass") class AppendSendOpsPass(PassBase): # 该 pass 被多种模式复用 @@ -894,6 +875,100 @@ class SplitTrainerOpsPass(PassBase): def _check_conflict(self, other_pass): return True + def _replace_ops_by_communicate_op(self, program, attrs, heter_block_index, + ops_list, block_var_detail): + all_op = program.global_block().ops + start_op = ops_list[0] + first_op_idx = -1 + for op in all_op: + if str(op) == str(start_op): + first_op_idx = all_op.index(op) + break + assert first_op_idx != -1 + self._delete_same_ops(program.global_block(), ops_list) + + entrance_var = [] + role_maker = attrs['role_maker'] + if heter_block_index == 1: + next_heter_worker_endpoints = get_next_stage_trainers(role_maker) + + entrance_var = block_var_detail[heter_block_index]["forward"][ + "entrance"] + + comm_info = get_communicate_var_info(program, heter_block_index + 1, + entrance_var) + program.global_block()._insert_op( + index=first_op_idx, + type="send_and_recv", + inputs={"X": program.global_block().vars[entrance_var[0]]}, + outputs={"Out": []}, + attrs={ + "mode": "forward", + "send_var_name": entrance_var + ["microbatch_id"], + "recv_var_name": [], + "message_name": comm_info["block_input_var_name"], + "next_endpoints": next_heter_worker_endpoints, + "previous_endpoints": [], + "trainer_id": get_role_id(role_maker), + RPC_OP_ROLE_ATTR_NAME: RPC_OP_ROLE_ATTR_VALUE + }) + + return entrance_var + + def _delete_same_ops(self, block, ops): + for op in ops: + try: + for origin_op in block.ops: + if str(origin_op) == str(op): + idx = list(block.ops).index(origin_op) + block._remove_op(idx) + break + except Exception as e: + print(e) + + def _remove_var_pair_by_grad(self, var_name, attrs): + for index, pair in enumerate(attrs['merged_variables_pairs']): + var = pair[0] + var_grad = pair[1] + if var_grad.merged_var.name == var_name: + del attrs['merged_variables_pairs'][index] + + for index, pair in enumerate(attrs['merged_dense_pairs']): + var = pair[0] + var_grad = pair[1] + if var_grad.merged_var.name == var_name: + del attrs['merged_dense_pairs'][index] + return + + for index, pair in enumerate(attrs['merged_sparse_pairs']): + var = pair[0] + var_grad = pair[1] + if var_grad.merged_var.name == var_name: + del attrs['merged_sparse_pairs'][index] + return + + def _remove_trainer_send_op(self, program, attrs, heter_block_index, + block_var_detail): + # if trainer do FF->BP->SEND, it has follow vars: var, var@GRAD + # if trainer only do SEND, it has one var: var@GRAD + # Delete Send op ,if trainer doesn't has pair var (var<->var@GRAD) + persistables = block_var_detail[heter_block_index]["forward"]["persistables"] + \ + block_var_detail[heter_block_index]["backward"]["persistables"] + need_remove_send_op = [] + need_remove_grad_var = [] + for op in find_send_op(program): + input_list, _ = find_op_input_output(program, + program.global_block(), op) + for var_name in input_list: + origin_var_name = var_name.split("@GRAD")[0] + if origin_var_name in persistables: + need_remove_send_op.append(op) + need_remove_grad_var.append(var_name) + need_remove_send_op = list(set(need_remove_send_op)) + delete_ops(program.global_block(), need_remove_send_op) + for grad_var_name in need_remove_grad_var: + self._remove_var_pair_by_grad(grad_var_name, attrs) + def _create_trainer_program(self, program, origin_program, attrs, program_block_ops_list, block_var_detail): # This function mainly includes the following contents: @@ -911,18 +986,18 @@ class SplitTrainerOpsPass(PassBase): ops_list = program_block_ops_list[heter_block_index][ "forward"] + program_block_ops_list[heter_block_index][ "backward"] - static_var += replace_ops_by_communicate_op( + static_var += self._replace_ops_by_communicate_op( program, attrs, heter_block_index, ops_list, block_var_detail) - remove_trainer_send_op(program, attrs, heter_block_index, - block_var_detail) + self._remove_trainer_send_op(program, attrs, heter_block_index, + block_var_detail) optimizer_block = [] grad_to_block_id = [] bp_ops_list = program_block_ops_list[0]["backward"] - delete_same_ops(program.global_block(), bp_ops_list) - delete_trainer_useless_var(attrs, program, static_var) - backward_block = create_backward_block(program, origin_program, attrs, + self._delete_same_ops(program.global_block(), bp_ops_list) + delete_trainer_useless_var(program, static_var) + backward_block = create_backward_block(program, origin_program, bp_ops_list, block_var_detail) bp_entrance_vars = block_var_detail[0]["backward"]["entrance"] diff --git a/python/paddle/distributed/ps/utils/ps_program_builder.py b/python/paddle/distributed/ps/utils/ps_program_builder.py index d978adaaba0..d649a74e4d6 100755 --- a/python/paddle/distributed/ps/utils/ps_program_builder.py +++ b/python/paddle/distributed/ps/utils/ps_program_builder.py @@ -186,10 +186,10 @@ class HeterAsyncPsProgramBuilder(PsProgramBuilder): add_lr_decay_table_pass.apply([], [], self.pass_ctx) distributed_ops_pass = new_pass("distributed_ops_pass", self.attrs) - distributed_ops_pass.apply([self.cloned_main], [], self.pass_ctx) + distributed_ops_pass.apply([self.cloned_main], [None], self.pass_ctx) delete_optimizer_pass = new_pass("delete_optimizer_pass", self.attrs) - delete_optimizer_pass.apply([None], [_startup], self.pass_ctx) + delete_optimizer_pass.apply([self.cloned_main], [None], self.pass_ctx) append_send_ops_pass = new_pass("append_send_ops_pass", self.attrs) append_send_ops_pass.apply([self.cloned_main], [None], self.pass_ctx) @@ -210,12 +210,13 @@ class HeterAsyncPsProgramBuilder(PsProgramBuilder): else: split_trainer_ops_pass = new_pass("split_trainer_ops_pass", self.attrs) - split_trainer_ops_pass([self.cloned_main], [], self.pass_ctx) + split_trainer_ops_pass.apply([self.cloned_main], [None], + self.pass_ctx) set_heter_pipeline_opt_pass = new_pass('set_heter_pipeline_opt_pass', self.attrs) set_heter_pipeline_opt_pass.apply([self.cloned_main], - [self.cloned_startup], pass_ctx) + [self.cloned_startup], self.pass_ctx) if self.launch_barrier and self.launch_barrier_flag: wait_server_ready(server_endpoints) @@ -228,7 +229,7 @@ class HeterAsyncPsProgramBuilder(PsProgramBuilder): ps_set_heter_pipeline_opt_pass = new_pass( "set_heter_pipeline_opt_pass", self.attrs) ps_set_heter_pipeline_opt_pass.apply( - [self.loss.block.program], [startup_program], self.pass_ctx) + [self.cloned_main], [self.cloned_startup], self.pass_ctx) elif self.attrs['is_server']: self._build_pserver_programs() diff --git a/python/paddle/distributed/ps/utils/public.py b/python/paddle/distributed/ps/utils/public.py index 3c883a0158a..a8587874776 100755 --- a/python/paddle/distributed/ps/utils/public.py +++ b/python/paddle/distributed/ps/utils/public.py @@ -42,9 +42,17 @@ RPC_OP_ROLE_ATTR_VALUE = core.op_proto_and_checker_maker.OpRole.RPC op_role_attr_name = core.op_proto_and_checker_maker.kOpRoleAttrName() LR_SCHED_OP_ROLE_ATTR_VALUE = core.op_proto_and_checker_maker.OpRole.LRSched OPT_OP_ROLE_ATTR_VALUE = core.op_proto_and_checker_maker.OpRole.Optimize +backward = core.op_proto_and_checker_maker.OpRole.Backward +DEVICE_LIST = ["cpu", "gpu", "xpu"] +COMMUNICATE_OPS_TYPE = ["send", "recv", "fetch_barrier", "send_barrier"] SPARSE_OP_LIST = ["lookup_table", "lookup_table_v2"] SPARSE_OP_TYPE_DICT = {"lookup_table": "W", "lookup_table_v2": "W"} +SPARSE_GRAD_OP_TYPE_DICT = { + "lookup_table_grad": "W", + "lookup_table_v2_grad": "W" +} +DEFAULT_DEVICE = 'cpu' def logger_config(log_path, logging_name): @@ -640,6 +648,20 @@ def find_block_joints(program, program_block_ops_list, heter_ops): return block_var_detail +def find_ops_list_input_output(program, ops_list): + input_var_list = [] + output_var_list = [] + for op in ops_list: + inputs = _get_input_map_from_op(program.global_block().vars, op) + input_var_list += get_varlist_from_op_map(inputs) + outputs = _get_output_map_from_op(program.global_block().vars, op) + output_var_list += get_varlist_from_op_map(outputs) + + input_var_list = list(set(input_var_list)) + output_var_list = list(set(output_var_list)) + return input_var_list, output_var_list + + def find_entrance_exit_private(program, program_block_ops_list): block_var_detail = [] persistables = [] @@ -850,6 +872,54 @@ def _get_output_map_from_op(varmap, op): return iomap +def get_varlist_from_op_map(var_map): + var_list = [] + for key, varlist in six.iteritems(var_map): + if not isinstance(varlist, list): + varlist = [varlist] + for i in range(len(varlist)): + var = varlist[i] + var_list.append(var.name) + return var_list + + +def _get_input_map_from_op(varmap, op): + """Returns a dict from op input name to the vars in varmap.""" + iomap = collections.OrderedDict() + for key in op.input_names: + vars = [] + for varname in op.input(key): + if varname == "@EMPTY@": + continue + if "lod_tensor_blocking_queue" in varname: + continue + vars.append(varmap[varname]) + if len(vars) == 1: + iomap[key] = vars[0] + else: + iomap[key] = vars + return iomap + + +def screen_persistables(program, var_list): + need_remove = [] + for var_name in var_list: + if "@GRAD" in var_name: + if "GRAD" != var_name.split("@")[-1]: + continue + origin_var_name = var_name.split("@GRAD")[0] + var = program.global_block().vars[origin_var_name] + else: + var = program.global_block().vars[var_name] + + if fluid.io.is_persistable(var): + need_remove.append(var_name) + + for var_name in need_remove: + var_list.remove(var_name) + return need_remove + + def block_append_op(program, origin_program, block, op): merge_ordereddict = origin_program.global_block().vars.copy() merge_ordereddict.update(block.vars) @@ -1154,6 +1224,84 @@ def get_param_grads(origin_program): return sparse_param_grads, dense_param_grads +def delete_ops(block, ops): + for op in ops: + try: + idx = list(block.ops).index(op) + block._remove_op(idx) + except Exception as e: + print(e) + + +def find_send_op(program): + send_op_list = [] + for op in program.global_block().ops: + if op.type == "send": + send_op_list.append(op) + return send_op_list + + +def find_op_input_output(program, block, op): + input_var_list = [] + output_var_list = [] + inputs = _get_input_map_from_op(block.vars, op) + input_var_list += get_varlist_from_op_map(inputs) + outputs = _get_output_map_from_op(block.vars, op) + output_var_list += get_varlist_from_op_map(outputs) + input_var_list = list(set(input_var_list)) + output_var_list = list(set(output_var_list)) + return input_var_list, output_var_list + + +def get_vars_name_in_block(block): + vars_list = block.vars.keys() + vars_name_list = [var_name for var_name in vars_list] + return vars_name_list + + +def delete_trainer_useless_var(program, static_var): + static_var = list(set(static_var)) + program_useful_var_list = [] + for op in program.global_block().ops: + input_var_list, output_var_list = find_op_input_output( + program, program.global_block(), op) + op_var_list = list(set(input_var_list).union(set(output_var_list))) + program_useful_var_list = list( + set(program_useful_var_list).union(set(op_var_list))) + program_useful_var_list += static_var + program_useless_var_list = list( + set(get_vars_name_in_block(program.global_block())).difference( + set(program_useful_var_list))) + for var in program_useless_var_list: + program.global_block()._remove_var(var) + return program_useless_var_list + + +def create_backward_block(program, origin_program, bp_ops_list, + block_var_detail): + pre_block_idx = program.num_blocks - 1 + heter_block = program._create_block(pre_block_idx) + + for _, op in enumerate(bp_ops_list): + if op.type == "send": + send_varnames = op.attr('send_varnames') + is_skip = False + for varname in send_varnames: + if varname not in program.global_block( + ).vars and varname not in heter_block.vars: + is_skip = True + break + if is_skip == True: + continue + block_append_op(program, origin_program, heter_block, op) + + entrance_vars = block_var_detail[0]["backward"]["entrance"] + add_vars_by_var_list(entrance_vars, origin_program, program, heter_block) + exit_vars = block_var_detail[0]["backward"]["exit"] + add_vars_by_var_list(exit_vars, origin_program, program, heter_block) + return heter_block + + def debug_program(file, program, is_trainer): if is_trainer: with open(file, 'w+') as f: diff --git a/python/paddle/fluid/tests/unittests/distributed_passes/ps_pass_test_base.py b/python/paddle/fluid/tests/unittests/distributed_passes/ps_pass_test_base.py index 1848fa04b4f..63dd4b8e21e 100755 --- a/python/paddle/fluid/tests/unittests/distributed_passes/ps_pass_test_base.py +++ b/python/paddle/fluid/tests/unittests/distributed_passes/ps_pass_test_base.py @@ -22,6 +22,7 @@ import inspect import unittest import numpy as np from collections import OrderedDict +from paddle.distributed.ps.utils.public import logger from dist_pass_test_base import prepare_python_path_and_return_module, remove_path_if_exists import paddle.distributed.fleet as fleet @@ -37,7 +38,7 @@ class PsPassTestBase(unittest.TestCase): print('Ps tearDown...') def ps_launch(self, config, ps_mode="cpu-ps"): - if ps_mode == "cpu-ps": + if ps_mode == "cpu-ps" or ps_mode == 'heter-ps': os.environ['WITH_DISTRIBUTE'] = 'ON' cmd = [ @@ -45,7 +46,16 @@ class PsPassTestBase(unittest.TestCase): "-u", ] + [ "-m", "launch", "--log_dir", config['log_dir'], "--worker_num", - config['worker_num'], "--server_num", config['server_num'], + config['worker_num'], "--server_num", config['server_num'] + ] + if ps_mode == 'heter-ps': + os.environ['FLAGS_START_PORT'] = '12004' + cmd += [ + '--heter_worker_num', config['heter_worker_num'], + '--heter_devices', config['heter_devices'] + ] + + cmd += [ "../ps/ps_dnn_trainer.py", "-m", config['ps_mode_config'], "--run_minimize", config['run_minimize'], "--run_single_pass", config['run_single_pass'], "--debug_new_pass", diff --git a/python/paddle/fluid/tests/unittests/distributed_passes/test_ps_trainer_pass.py b/python/paddle/fluid/tests/unittests/distributed_passes/test_ps_trainer_pass.py index ac6dd17359e..f28e99fc00d 100755 --- a/python/paddle/fluid/tests/unittests/distributed_passes/test_ps_trainer_pass.py +++ b/python/paddle/fluid/tests/unittests/distributed_passes/test_ps_trainer_pass.py @@ -63,6 +63,27 @@ class TestPsTrainerPass(PsPassTestBase): self.check() + # heter ps 三阶段待测 + def test_ps_optimizer_minimize_heter(self): + self.init() + self.config['worker_num'] = "2" + self.config['server_num'] = "2" + self.config['heter_worker_num'] = '2' + self.config['heter_devices'] = 'gpu' + + self.config['run_minimize'] = '1' + self.config['ps_mode_config'] = "../ps/heter_ps_config.yaml" + + self.config['debug_new_minimize'] = '0' + self.config['log_dir'] = "/heter_log_old_minimize" + remove_path_if_exists(self.config['log_dir']) + self.ps_launch(self.config, 'heter-ps') + + self.config['debug_new_minimize'] = '1' + self.config['log_dir'] = "/heter_log_new_minimize" + remove_path_if_exists(self.config['log_dir']) + self.ps_launch(self.config, 'heter-ps') + def test_ps_optimizer_minimize_gpu(self): self.init() self.config['run_minimize'] = '1' diff --git a/python/paddle/fluid/tests/unittests/ps/heter_ps_config.yaml b/python/paddle/fluid/tests/unittests/ps/heter_ps_config.yaml new file mode 100644 index 00000000000..d0c48e242d9 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/ps/heter_ps_config.yaml @@ -0,0 +1,36 @@ +# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +hyper_parameters: + optimizer: + class: Adam + learning_rate: 0.0001 + strategy: async # 有用 + sparse_inputs_slots: 27 + sparse_feature_number: 1024 + sparse_feature_dim: 11 + dense_input_dim: 13 + fc_sizes: [512, 256, 128, 32] + distributed_embedding: 0 + +runner: + sync_mode: "heter" + thread_num: 8 + micro_num: 8 # micro batch num for each thread + pipeline: True + + model_path: "../ps_dnn_model.py" + + diff --git a/python/paddle/fluid/tests/unittests/ps/ps_dnn_trainer.py b/python/paddle/fluid/tests/unittests/ps/ps_dnn_trainer.py index 2b6ce2e7113..8f8ff65af54 100755 --- a/python/paddle/fluid/tests/unittests/ps/ps_dnn_trainer.py +++ b/python/paddle/fluid/tests/unittests/ps/ps_dnn_trainer.py @@ -23,7 +23,6 @@ import yaml, six, copy import paddle import os import warnings -import logging import ast import numpy as np import struct @@ -176,6 +175,10 @@ def get_user_defined_strategy(config): strategy = paddle.distributed.fleet.DistributedStrategy() strategy.a_sync = True strategy.a_sync_configs = {"heter_worker_device_guard": "gpu"} + strategy.pipeline = True + strategy.pipeline_configs = { + "accumulate_steps": config.get('runner.micro_num') + } elif sync_mode == "gpubox": print("sync_mode = {}".format(sync_mode)) strategy = paddle.distributed.fleet.DistributedStrategy() @@ -328,6 +331,7 @@ class DnnTrainer(object): if self.config['debug_new_minimize'] == 1: logger.info("entering run_minimize -- new") + self.role_maker._generate_role() # 必要 from paddle.distributed.fleet.meta_optimizers.ps_optimizer import ParameterServerOptimizer ps_optimizer = ParameterServerOptimizer(inner_optimizer) ps_optimizer._set_basic_info(loss, self.role_maker, inner_optimizer, diff --git a/python/paddle/fluid/tests/unittests/ps_dnn_model.py b/python/paddle/fluid/tests/unittests/ps_dnn_model.py old mode 100644 new mode 100755 index 1a42df030b3..0a147334dab --- a/python/paddle/fluid/tests/unittests/ps_dnn_model.py +++ b/python/paddle/fluid/tests/unittests/ps_dnn_model.py @@ -17,6 +17,7 @@ import paddle.nn as nn import paddle.nn.functional as F import math import paddle.distributed.fleet as fleet +from paddle.distributed.ps.utils.public import logger class DNNLayer(nn.Layer): @@ -77,8 +78,13 @@ class DNNLayer(nn.Layer): y_dnn = paddle.concat(x=sparse_embs + [dense_inputs], axis=1) - for n_layer in self._mlp_layers: - y_dnn = n_layer(y_dnn) + if self.sync_mode == 'heter': + with paddle.fluid.device_guard('gpu'): + for n_layer in self._mlp_layers: + y_dnn = n_layer(y_dnn) + else: + for n_layer in self._mlp_layers: + y_dnn = n_layer(y_dnn) return y_dnn -- GitLab