提交 a59e0631 编写于 作者: F frankwhzhang

fix mmoe

上级 91fda308
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
from paddlerec.core.reader import Reader
class EvaluateReader(Reader):
def init(self):
pass
def generate_sample(self, line):
"""
Read the data line by line and process it as a dictionary
"""
def reader():
"""
This function needs to be implemented by the user, based on data format
"""
l = line.strip().split(',')
l = list(map(float, l))
label_income = []
label_marital = []
data = l[2:]
if int(l[1]) == 0:
label_income = [1, 0]
elif int(l[1]) == 1:
label_income = [0, 1]
if int(l[0]) == 0:
label_marital = [1, 0]
elif int(l[0]) == 1:
label_marital = [0, 1]
feature_name = ["input", "label_income", "label_marital"]
yield zip(feature_name, [data] + [label_income] + [label_marital])
return reader
...@@ -12,43 +12,57 @@ ...@@ -12,43 +12,57 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
evaluate: workspace: "paddlerec.models.multitask.mmoe"
reader:
batch_size: 1
class: "{workspace}/census_infer_reader.py"
test_data_path: "{workspace}/data/train"
train: dataset:
trainer: - name: dataset_train
# for cluster training batch_size: 1
strategy: "async" type: QueueDataset
data_path: "{workspace}/data/train"
data_converter: "{workspace}/census_reader.py"
- name: dataset_infer
batch_size: 1
type: QueueDataset
data_path: "{workspace}/data/train"
data_converter: "{workspace}/census_reader.py"
epochs: 3 hyper_parameters:
workspace: "paddlerec.models.multitask.mmoe" feature_size: 499
device: cpu expert_num: 8
gate_num: 2
expert_size: 16
tower_size: 8
optimizer:
class: adam
learning_rate: 0.001
strategy: async
reader: #use infer_runner mode and modify 'phase' below if infer
batch_size: 1 mode: train_runner
class: "{workspace}/census_reader.py" #mode: infer_runner
train_data_path: "{workspace}/data/train"
model: runner:
models: "{workspace}/model.py" - name: train_runner
hyper_parameters: class: single_train
feature_size: 499 device: cpu
expert_num: 8 epochs: 3
gate_num: 2 save_checkpoint_interval: 2
expert_size: 16 save_inference_interval: 4
tower_size: 8 save_checkpoint_path: "increment"
learning_rate: 0.001 save_inference_path: "inference"
optimizer: adam print_interval: 10
- name: infer_runner
class: single_infer
init_model_path: "increment/0"
device: cpu
epochs: 3
save: phase:
increment: - name: train
dirname: "increment" model: "{workspace}/model.py"
epoch_interval: 2 dataset_name: dataset_train
save_last: True thread_num: 1
inference: #- name: infer
dirname: "inference" # model: "{workspace}/model.py"
epoch_interval: 4 # dataset_name: dataset_infer
save_last: True # thread_num: 1
...@@ -22,53 +22,51 @@ class Model(ModelBase): ...@@ -22,53 +22,51 @@ class Model(ModelBase):
def __init__(self, config): def __init__(self, config):
ModelBase.__init__(self, config) ModelBase.__init__(self, config)
def MMOE(self, is_infer=False): def _init_hyper_parameters(self):
feature_size = envs.get_global_env("hyper_parameters.feature_size", self.feature_size = envs.get_global_env(
None, self._namespace) "hyper_parameters.feature_size")
expert_num = envs.get_global_env("hyper_parameters.expert_num", None, self.expert_num = envs.get_global_env("hyper_parameters.expert_num")
self._namespace) self.gate_num = envs.get_global_env("hyper_parameters.gate_num")
gate_num = envs.get_global_env("hyper_parameters.gate_num", None, self.expert_size = envs.get_global_env("hyper_parameters.expert_size")
self._namespace) self.tower_size = envs.get_global_env("hyper_parameters.tower_size")
expert_size = envs.get_global_env("hyper_parameters.expert_size", None,
self._namespace) def input_data(self, is_infer=False, **kwargs):
tower_size = envs.get_global_env("hyper_parameters.tower_size", None, inputs = fluid.data(
self._namespace) name="input", shape=[-1, self.feature_size], dtype="float32")
input_data = fluid.data(
name="input", shape=[-1, feature_size], dtype="float32")
label_income = fluid.data( label_income = fluid.data(
name="label_income", shape=[-1, 2], dtype="float32", lod_level=0) name="label_income", shape=[-1, 2], dtype="float32", lod_level=0)
label_marital = fluid.data( label_marital = fluid.data(
name="label_marital", shape=[-1, 2], dtype="float32", lod_level=0) name="label_marital", shape=[-1, 2], dtype="float32", lod_level=0)
if is_infer: if is_infer:
self._infer_data_var = [input_data, label_income, label_marital] return [inputs, label_income, label_marital]
self._infer_data_loader = fluid.io.DataLoader.from_generator( else:
feed_list=self._infer_data_var, return [inputs, label_income, label_marital]
capacity=64,
use_double_buffer=False, def net(self, inputs, is_infer=False):
iterable=False) input_data = inputs[0]
label_income = inputs[1]
self._data_var.extend([input_data, label_income, label_marital]) label_marital = inputs[2]
# f_{i}(x) = activation(W_{i} * x + b), where activation is ReLU according to the paper # f_{i}(x) = activation(W_{i} * x + b), where activation is ReLU according to the paper
expert_outputs = [] expert_outputs = []
for i in range(0, expert_num): for i in range(0, self.expert_num):
expert_output = fluid.layers.fc( expert_output = fluid.layers.fc(
input=input_data, input=input_data,
size=expert_size, size=self.expert_size,
act='relu', act='relu',
bias_attr=fluid.ParamAttr(learning_rate=1.0), bias_attr=fluid.ParamAttr(learning_rate=1.0),
name='expert_' + str(i)) name='expert_' + str(i))
expert_outputs.append(expert_output) expert_outputs.append(expert_output)
expert_concat = fluid.layers.concat(expert_outputs, axis=1) expert_concat = fluid.layers.concat(expert_outputs, axis=1)
expert_concat = fluid.layers.reshape(expert_concat, expert_concat = fluid.layers.reshape(
[-1, expert_num, expert_size]) expert_concat, [-1, self.expert_num, self.expert_size])
# g^{k}(x) = activation(W_{gk} * x + b), where activation is softmax according to the paper # g^{k}(x) = activation(W_{gk} * x + b), where activation is softmax according to the paper
output_layers = [] output_layers = []
for i in range(0, gate_num): for i in range(0, self.gate_num):
cur_gate = fluid.layers.fc( cur_gate = fluid.layers.fc(
input=input_data, input=input_data,
size=expert_num, size=self.expert_num,
act='softmax', act='softmax',
bias_attr=fluid.ParamAttr(learning_rate=1.0), bias_attr=fluid.ParamAttr(learning_rate=1.0),
name='gate_' + str(i)) name='gate_' + str(i))
...@@ -78,7 +76,7 @@ class Model(ModelBase): ...@@ -78,7 +76,7 @@ class Model(ModelBase):
cur_gate_expert = fluid.layers.reduce_sum(cur_gate_expert, dim=1) cur_gate_expert = fluid.layers.reduce_sum(cur_gate_expert, dim=1)
# Build tower layer # Build tower layer
cur_tower = fluid.layers.fc(input=cur_gate_expert, cur_tower = fluid.layers.fc(input=cur_gate_expert,
size=tower_size, size=self.tower_size,
act='relu', act='relu',
name='task_layer_' + str(i)) name='task_layer_' + str(i))
out = fluid.layers.fc(input=cur_tower, out = fluid.layers.fc(input=cur_tower,
...@@ -127,8 +125,5 @@ class Model(ModelBase): ...@@ -127,8 +125,5 @@ class Model(ModelBase):
self._metrics["AUC_marital"] = auc_marital self._metrics["AUC_marital"] = auc_marital
self._metrics["BATCH_AUC_marital"] = batch_auc_2 self._metrics["BATCH_AUC_marital"] = batch_auc_2
def train_net(self):
self.MMOE()
def infer_net(self): def infer_net(self):
self.MMOE(is_infer=True) pass
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册