## Overview
PLATO2 is a very large-scale generative open-domain dialogue model. It inherits PLATO's latent-variable mechanism for generating diverse responses and can hold fluent, in-depth conversations on open-domain topics. According to publicly reported results, it outperforms both Meena, released by Google in February 2020, and Blender, released by Facebook AI Research in April 2020. plato2_en_base has 310M parameters and can be used for one-line dialogue response prediction. This Module only supports prediction on GPU; CPU is not supported.
<p align="center">
<img src="https://image.jiqizhixin.com/uploads/editor/65107b78-0259-4121-b8c5-a090f9d3175b/640.png" hspace='10'/> <br />
</p>
For more details, see the paper [PLATO-2: Towards Building an Open-Domain Chatbot via Curriculum Learning](https://arxiv.org/abs/2006.16779).
## Command-Line Prediction
```shell
$ hub run plato2_en_base --input_text="Hello, how are you" --use_gpu
```
## API
```python
def generate(texts):
```
Prediction API: takes the dialogue context as input and returns the bot's reply.
**Parameters**
* texts (list\[str\] or str): Outside interactive mode, texts should be a list in which each element is the context of one conversation. The context should contain the utterances of both the human and the bot, with turns separated by "\t", for example ["Hello\thi, nice to meet you\tnice to meet you"]. This input contains one conversation: the bot replied "hi, nice to meet you", the human then said "nice to meet you", and it is now the bot's turn to reply. In interactive mode, texts should be a str, and the module will build the context automatically.
**Returns**
* results (list\[str\]): Each element is the bot's new reply for the corresponding conversation.
**Code Example**
```python
import paddlehub as hub
module = hub.Module(name="plato2_en_base")
test_texts = ["Hello","Hello\thi, nice to meet you\tnice to meet you"]
results = module.generate(texts=test_texts)
for result in results:
    print(result)
```
```python
def interactive_mode(max_turn=6):
```
Enter interactive mode. In interactive mode, the generate API accepts texts of type str.
**Parameters**
* max_turn (int): The number of dialogue turns the model can remember. When max_turn=1, the model only remembers the current utterance and cannot access earlier turns.
**Code Example**
```python
import paddlehub as hub
module = hub.Module(name="plato2_en_base")
with module.interactive_mode(max_turn=6):
    while True:
        human_utterance = input("[Human]: ").strip()
        robot_utterance = module.generate(human_utterance)
        print("[Bot]: %s" % robot_utterance[0])
```
## Serving Deployment
PaddleHub Serving can deploy the module as an online service.
### Step 1: Start PaddleHub Serving
Run the start command:
```shell
$ hub serving start -m plato2_en_base -p 8866
```
This deploys a serving API; the default port is 8866.
**NOTE:** Set the CUDA\_VISIBLE\_DEVICES environment variable before starting the service.
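For example (a minimal sketch, assuming the service should run on GPU 0):
```shell
$ export CUDA_VISIBLE_DEVICES=0
$ hub serving start -m plato2_en_base -p 8866
```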
### Step 2: Send Prediction Requests
Option 1: Send dialogue data with a custom script
Once the server is configured, the few lines of code below are enough to send a prediction request and retrieve the result:
```python
import requests
import json
# Send the HTTP request
data = {'texts':["Hello","Hello\thi, nice to meet you\tnice to meet you"]}
headers = {"Content-type": "application/json"}
url = "http://127.0.0.1:8866/predict/plato2_en_base"
r = requests.post(url=url, headers=headers, data=json.dumps(data))
# Read and print the results
results = r.json()["results"]
for result in results:
    print(result)
```
Option 2: Enter interactive mode with an interactive client
You can run the following client script to enter interactive mode:
```python
import requests
import json
ADDR = "127.0.0.1" # Your serving address
PORT = 8866 # Your serving port
MAX_TURN = 6 # The maximum dialogue turns
headers = {"Content-type": "application/json"}
url = "http://%s:%s/predict/plato2_en_base" % (ADDR, PORT)
context = []
while True:
    user_utt = input("[Human]: ").strip()
    if user_utt == "[NEXT]":
        context = []
        print("Restart")
    else:
        context.append(user_utt)
        data = {'texts': ["\t".join(context[-MAX_TURN:])]}
        r = requests.post(url=url, headers=headers, data=json.dumps(data))
        bot_response = r.json()["results"][0]
        print("[Bot]: %s" % bot_response)
        context.append(bot_response)
```
## View the Code
https://github.com/PaddlePaddle/Knover
### Dependencies
paddlepaddle >= 1.8.2
paddlehub >= 1.7.0
## Release History
* 1.0.0
  Initial release
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Define model."""
from plato2_en_base.models.model_base import Model
MODEL_REGISTRY = {}
__all__ = [
"MODEL_REGISTRY", "register_model", "create_model", "add_cmdline_args"
]
def register_model(name):
"""
Register a new model class.
"""
def __wrapped__(cls):
if name in MODEL_REGISTRY:
raise ValueError(f"Cannot register duplicate model ({name})")
if not issubclass(cls, Model):
raise ValueError(
f"Model ({name}: {cls.__name__}) must extend Model")
MODEL_REGISTRY[name] = cls
return cls
return __wrapped__
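# Illustrative usage (not part of the original file): a subclass of `Model`
# decorated with `register_model` becomes available to `create_model` by name,
# e.g.
#
#   @register_model("MyModel")   # "MyModel" is a hypothetical name
#   class MyModel(Model):
#       ...
#
# after which `create_model(args, place)` instantiates it when args.model == "MyModel".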
def create_model(args, place) -> Model:
"""
Create a model.
"""
return MODEL_REGISTRY[args.model](args, place)
def add_cmdline_args(parser):
""" Add cmdline argument of Model. """
group = parser.add_argument_group("Model")
# Model
group.add_argument("--model", type=str, required=True)
# Config
group.add_argument("--config_path", type=str, required=True)
# Model related.
args, _ = parser.parse_known_args()
if args.model not in MODEL_REGISTRY:
raise ValueError(f"Unknown model type: {args.model}")
MODEL_REGISTRY[args.model].add_cmdline_args(parser)
return group
import plato2_en_base.models.nsp_model
import plato2_en_base.models.plato
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Generator class"""
import numpy as np
import paddle.fluid.layers as layers
from plato2_en_base.utils.args import str2bool
class Generator(object):
"""
Generator class
Use generator in inference phase.
"""
@classmethod
def add_cmdline_args(cls, parser):
"""Add cmdline argurments."""
group = parser.add_argument_group("Generator")
group.add_argument("--min_dec_len", type=int, default=1)
group.add_argument("--max_dec_len", type=int, default=64)
group.add_argument(
"--decoding_strategy",
type=str,
default="topk_sampling",
choices=["beam_search", "topk_sampling", "topp_sampling"])
group.add_argument("--temperature", type=float, default=1.)
group.add_argument("--ignore_unk", type=str2bool, default=True)
# multi sampling
group.add_argument("--num_samples", type=int, default=None)
# top-k sampling
group.add_argument("--topk", type=int, default=10)
# top-p sampling
group.add_argument("--topp", type=float, default=0.9)
# beam search
group.add_argument("--beam_size", type=int, default=10)
group.add_argument("--length_average", type=str2bool, default=True)
group.add_argument("--length_penalty", type=float, default=0.0)
return group
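    # Note on the decoding strategies exposed above: "topk_sampling" samples the
    # next token from the `topk` highest-probability tokens, "topp_sampling"
    # (nucleus sampling) samples from the smallest prefix of the sorted
    # distribution whose cumulative probability reaches `topp`, and
    # "beam_search" keeps `beam_size` hypotheses scored by (optionally
    # length-normalized or length-penalized) log-probability.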
def __init__(self, args):
self.min_dec_len = args.min_dec_len
self.max_dec_len = args.max_dec_len
self.eos_id = args.eos_id
self.unk_id = args.unk_id
self.mask_id = args.mask_id
self.vocab_size = args.vocab_size
# model related
# basic settings
self.decoding_strategy = args.decoding_strategy
self.ignore_unk = args.ignore_unk
self.continuous_position = args.continuous_position
self.temperature = args.temperature
# reranking
self.num_samples = args.num_samples
# top-k sampling
self.topk = args.topk
# top-p sampling
self.topp = args.topp
# beam search
self.beam_size = args.beam_size
self.length_penalty = args.length_penalty
self.length_average = args.length_average
return
def inference(self, model, inputs, outputs):
"""
Run inference.
Args:
            model(object): A generation model. It needs to implement `_generation_network` and `_calc_logits`.
            inputs(dict): Each key is an input name (str) and each value is a Variable.
            outputs(dict): The model outputs produced by the forward pass.
        Returns:
            dict(str, Variable): Each key is an output name (str) and each value is a Variable.
"""
# prepare while loop
max_len = layers.fill_constant(
shape=[1], dtype="int64", value=self.max_dec_len, force_cpu=True)
min_len = layers.fill_constant(
shape=[1], dtype="int64", value=self.min_dec_len, force_cpu=True)
step_idx = layers.fill_constant(
shape=[1], dtype="int64", value=0, force_cpu=True)
ids = layers.array_write(
layers.reshape(inputs["tgt_ids"], (-1, 1)), step_idx)
pos_biases = layers.array_write(
layers.reshape(inputs["tgt_pos"], (-1, 1)), step_idx)
scores = layers.array_write(inputs["init_score"], step_idx)
tgt_generation_mask = layers.array_write(inputs["tgt_generation_mask"],
step_idx)
parent_idx = inputs["parent_idx"]
if self.decoding_strategy == "beam_search":
beam_size = self.beam_size
else:
beam_size = 1
eos_penalty = np.zeros(self.vocab_size, dtype="float32")
eos_penalty[self.eos_id] = -1e9
eos_penalty = layers.assign(eos_penalty)
token_penalty = np.zeros(self.vocab_size, dtype="float32")
token_penalty[self.unk_id] = -1e9
if self.mask_id >= 0:
token_penalty[self.mask_id] = -1e9
token_penalty = layers.assign(token_penalty)
# start while loop
cond = layers.less_than(x=step_idx, y=max_len)
while_op = layers.While(cond)
with while_op.block():
pre_ids = layers.array_read(array=ids, i=step_idx)
pre_ids = layers.reshape(pre_ids, (-1, 1, 1), inplace=True)
pre_scores = layers.array_read(array=scores, i=step_idx)
pos_bias = layers.array_read(array=pos_biases, i=step_idx)
pos_bias = layers.gather(input=pos_bias, index=parent_idx)
tmp_tgt_generation_mask = layers.array_read(
tgt_generation_mask, i=step_idx)
dtype = tmp_tgt_generation_mask.dtype
append_mask = layers.fill_constant_batch_size_like(
input=pre_ids, value=1.0, shape=[-1, 1, 1], dtype=dtype)
tmp_tgt_generation_mask = layers.concat(
[tmp_tgt_generation_mask, append_mask], axis=2)
pre_mask = tmp_tgt_generation_mask = layers.gather(
input=tmp_tgt_generation_mask, index=parent_idx)
pre_sent = layers.fill_constant_batch_size_like(
input=pre_mask, value=1, shape=[-1, 1, 1], dtype=pre_ids.dtype)
if self.continuous_position:
pre_pos = layers.elementwise_mul(
x=layers.fill_constant_batch_size_like(
input=pre_mask,
value=1,
shape=[-1, 1, 1],
dtype=pre_ids.dtype),
y=step_idx,
axis=0) + pos_bias
else:
pre_pos = layers.elementwise_mul(
x=layers.fill_constant_batch_size_like(
input=pre_mask,
value=1,
shape=[-1, 1, 1],
dtype=pre_ids.dtype),
y=step_idx,
axis=0)
dec_out, _ = model._generation_network(
token_ids=pre_ids,
type_ids=pre_sent,
pos_ids=pre_pos,
generation_mask=tmp_tgt_generation_mask,
gather_idx=parent_idx)
logits = model._calc_logits(dec_out)
# ignore unk and mask token
if self.ignore_unk:
logits = layers.elementwise_add(logits, token_penalty, axis=1)
# min dec length
min_len_cond = layers.less_than(x=step_idx, y=min_len)
def min_len_penalty():
"""Plus minimum length penalty."""
return layers.elementwise_add(logits, eos_penalty, axis=1)
def no_penalty():
"""No penalty."""
return logits
logits = layers.case([(min_len_cond, min_len_penalty)],
default=no_penalty)
# get probs
probs = layers.softmax(logits / self.temperature)
if self.decoding_strategy == "beam_search":
topk_scores, topk_indices = layers.topk(
input=probs, k=beam_size)
else:
if self.decoding_strategy.startswith("sampling"):
sampling_ids = layers.sampling_id(probs, dtype="int")
elif self.decoding_strategy.startswith("topk_sampling"):
topk_probs, _ = layers.topk(input=probs, k=self.topk)
ge_cond = layers.cast(
layers.greater_equal(
probs, layers.unsqueeze(topk_probs[:, -1], [1])),
"float32")
old_probs = probs
probs = probs * ge_cond / layers.reduce_sum(
topk_probs, dim=-1, keep_dim=True)
sampling_ids = layers.sampling_id(probs, dtype="int")
probs = old_probs
elif self.decoding_strategy.startswith("topp_sampling"):
sorted_probs, sorted_idx = layers.argsort(
probs, descending=True)
cum_sorted_probs = layers.cumsum(
sorted_probs, axis=1, exclusive=True)
lt_cond = layers.cast(
layers.less_than(
cum_sorted_probs,
layers.fill_constant_batch_size_like(
cum_sorted_probs, cum_sorted_probs.shape,
cum_sorted_probs.dtype, self.topp)), "float32")
old_probs = probs
candidate_probs = sorted_probs * lt_cond
probs = candidate_probs / layers.reduce_sum(
candidate_probs, dim=-1, keep_dim=True)
sampling_ids = layers.sampling_id(probs, dtype="int")
sampling_ids = layers.index_sample(
sorted_idx, layers.unsqueeze(sampling_ids, [1]))
sampling_ids = layers.squeeze(sampling_ids, [1])
probs = old_probs
else:
raise ValueError(self.decoding_strategy)
sampling_scores = layers.one_hot(
layers.unsqueeze(sampling_ids, [1]), probs.shape[1])
sampling_scores = sampling_scores * probs - (
1 - sampling_scores) * 1e3
topk_scores, topk_indices = layers.topk(
input=sampling_scores, k=1)
pre_len = layers.cast(step_idx, "float32")
layers.increment(x=step_idx, value=1.0, in_place=True)
cur_len = layers.cast(step_idx, "float32")
# update scores
if self.length_average:
accu_scores = layers.elementwise_add(
x=layers.log(topk_scores), y=pre_scores * pre_len,
axis=0) / cur_len
elif self.length_penalty > 0:
pre_lp = layers.pow((5 + pre_len) / 6, self.length_penalty)
cur_lp = layers.pow((5 + cur_len) / 6, self.length_penalty)
accu_scores = layers.elementwise_add(
x=layers.log(topk_scores), y=pre_scores * pre_lp,
axis=0) / cur_lp
else:
accu_scores = layers.elementwise_add(
x=layers.log(topk_scores), y=pre_scores, axis=0)
topk_indices = layers.lod_reset(topk_indices, pre_ids)
accu_scores = layers.lod_reset(accu_scores, pre_ids)
selected_ids, selected_scores, gather_idx = layers.beam_search(
pre_ids=pre_ids,
pre_scores=pre_scores,
ids=topk_indices,
scores=accu_scores,
beam_size=beam_size,
end_id=self.eos_id,
return_parent_idx=True)
layers.array_write(selected_ids, i=step_idx, array=ids)
layers.array_write(selected_scores, i=step_idx, array=scores)
layers.array_write(pre_mask, i=step_idx, array=tgt_generation_mask)
layers.array_write(pos_bias, i=step_idx, array=pos_biases)
layers.assign(gather_idx, parent_idx)
length_cond = layers.less_than(x=step_idx, y=max_len)
finish_cond = layers.logical_not(layers.is_empty(x=selected_ids))
layers.logical_and(x=length_cond, y=finish_cond, out=cond)
finished_ids, finished_scores = layers.beam_search_decode(
ids, scores, beam_size=beam_size, end_id=self.eos_id)
predictions = {
"finished_ids": finished_ids,
"finished_scores": finished_scores,
"token_ids": inputs["token_ids"],
"data_id": inputs["data_id"]
}
return predictions
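# Note: `Generator.inference` builds a static-graph While loop: at each step the
# model consumes the previously selected token, UNK/MASK logits are suppressed
# (and EOS is suppressed before `min_dec_len`), the next token is chosen by the
# configured decoding strategy, and `layers.beam_search` keeps `beam_size`
# hypotheses (beam_size is 1 for the sampling strategies). `beam_search_decode`
# then assembles the finished sequences and their scores.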
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Model base."""
from abc import abstractmethod, ABC
import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.collective import fleet, DistributedStrategy
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
import paddle.fluid.layers as layers
from plato2_en_base.models.optimizer import AdamW
from plato2_en_base.utils import init_pretraining_params, init_checkpoint, to_lodtensor
from plato2_en_base.utils.args import str2bool
class Model(ABC):
"""
Basic model wrapper for paddle.
"""
@classmethod
def add_cmdline_args(cls, parser):
"""Add cmdline argurments."""
group = parser.add_argument_group("Model")
# Init checkpoint
group.add_argument("--init_checkpoint", type=str, default="")
group.add_argument("--init_pretraining_params", type=str, default="")
# Optimizer
group.add_argument(
"-lr",
"--learning_rate",
type=float,
default=1e-5,
help="The learning rate for optimizer.")
group.add_argument(
"--warmup_steps", type=int, default=0, help="The warmup steps.")
group.add_argument(
"--weight_decay",
type=float,
default=0.0,
help="The weight decay for optimizer.")
group.add_argument(
"--max_grad_norm",
type=float,
default=.1,
help="The maximum norm of gradient.")
group.add_argument("--use_recompute", type=str2bool, default=False)
group.add_argument("--use_amp", type=str2bool, default=False)
group.add_argument("--amp_loss_scaling", type=float, default=12800)
return group
def __init__(self, args, place):
self.place = place
self.exe = fluid.Executor(place)
self.init_checkpoint = args.init_checkpoint
self.init_pretraining_params = args.init_pretraining_params
self.learning_rate = args.learning_rate
self.warmup_steps = args.warmup_steps
self.weight_decay = args.weight_decay
self.max_grad_norm = args.max_grad_norm
self.is_distributed = args.is_distributed
self.use_recompute = args.use_recompute
self.use_amp = args.use_amp
self.amp_loss_scaling = args.amp_loss_scaling
self.run_infer = args.get("run_infer", False)
self.batch_size = args.get("batch_size", 1)
self._build_programs()
return
def _build_programs(self):
"""
Build programs.
        Build train_program, eval_program and inference_program. Only used in static graph mode.
"""
if self.run_infer:
self.startup_program = fluid.Program()
# build infer program
self.infer_program = fluid.Program()
with fluid.program_guard(self.infer_program, self.startup_program):
with fluid.unique_name.guard():
self.infer_feed_dict = inputs = self._get_feed_dict(
is_infer=True)
outputs = self.forward(inputs, is_infer=True)
predictions = self.infer(inputs, outputs)
self.infer_fetch_dict = predictions
self.infer_program = self.infer_program.clone(for_test=True)
self.program = self.infer_program
else:
if self.is_distributed:
exec_strategy = fluid.ExecutionStrategy()
exec_strategy.use_experimental_executor = True
exec_strategy.num_threads = 4
exec_strategy.num_iteration_per_drop_scope = 1
dist_strategy = DistributedStrategy()
dist_strategy.exec_strategy = exec_strategy
dist_strategy.nccl_comm_num = 1
dist_strategy.fuse_all_reduce_ops = True
if self.use_recompute:
dist_strategy.forward_recompute = True
dist_strategy.enable_sequential_execution = True
if self.use_amp:
dist_strategy.use_amp = True
dist_strategy.amp_loss_scaling = self.amp_loss_scaling
self.dist_strategy = dist_strategy
self.startup_program = fluid.Program()
# build train program
self.train_program = fluid.Program()
with fluid.program_guard(self.train_program, self.startup_program):
with fluid.unique_name.guard():
self.feed_dict = inputs = self._get_feed_dict()
outputs = self.forward(inputs)
if self.is_distributed and self.use_recompute:
self.dist_strategy.recompute_checkpoints = outputs[
"checkpoints"]
metrics, statistics = self.get_metrics_and_statistics(
inputs, outputs)
# build eval program
self.eval_program = self.train_program.clone(for_test=True)
self.eval_fetch_dict = {**metrics, **statistics}
scheduled_lr = self.optimize(metrics)
metrics["scheduled_lr"] = scheduled_lr
self.train_fetch_dict = metrics
self.program = self.train_program
if self.is_distributed:
self.train_program = fleet.main_program
self.exe.run(self.startup_program)
if self.init_pretraining_params != "":
init_pretraining_params(self.exe, self.init_pretraining_params,
self.program)
elif self.init_checkpoint != "":
init_checkpoint(self.exe, self.init_checkpoint, self.program)
return
def load(self, model_dir, is_checkpoint=False):
"""
Load persistables or parameters.
"""
# TODO: support dygraph.
if is_checkpoint:
init_checkpoint(self.exe, model_dir, self.program)
else:
init_pretraining_params(self.exe, model_dir, self.program)
return
def save(self, model_dir, is_checkpoint=False):
"""
Save persistables or parameters.
"""
# TODO: support dygraph.
if is_checkpoint:
fluid.io.save_persistables(self.exe, model_dir, self.program)
else:
fluid.io.save_params(self.exe, model_dir, self.program)
return
@abstractmethod
def _get_feed_dict(self, is_infer=False):
"""
Return input feed list.
"""
pass
def _get_feed(self, inputs, is_infer=False):
"""
Convert `inputs` into model's feed data format.
"""
if isinstance(inputs, list):
            # return the list directly; it is used in `get_data_loader`.
return inputs
for k in inputs:
if isinstance(inputs[k], list):
inputs[k] = to_lodtensor(inputs[k], self.place)
return inputs
def get_data_loader(self, generator=None, is_infer=False):
"""
Return DataLoader.
        If generator is not `None`, the data loader uses it as the batch generator.
"""
# TODO: support dygraph.
if is_infer:
feed_name_list, feed_list = zip(*self.infer_feed_dict.items())
else:
feed_name_list, feed_list = zip(*self.feed_dict.items())
loader = fluid.io.DataLoader.from_generator(
feed_list=feed_list,
capacity=64,
use_double_buffer=True,
iterable=True)
if generator is not None:
def __wrapper__():
for batch in generator():
batch = self._get_feed(batch)
batch = [
batch[name] for name in feed_name_list if name in batch
]
yield batch
loader.set_batch_generator(__wrapper__, self.place)
return loader
@abstractmethod
def forward(self, inputs, is_infer=False):
"""
Run model main forward.
"""
pass
@abstractmethod
def get_metrics_and_statistics(self, inputs, outputs):
"""
Get metrics and statistics.
"""
pass
@abstractmethod
def infer(self, inputs, outputs):
"""
Run model inference.
"""
pass
def optimize(self, metrics):
"""
        Optimize the model using the metrics (mainly `metrics["loss"]`).
"""
# TODO: support dygraph
if self.warmup_steps > 0:
scheduled_lr = layers.learning_rate_scheduler.noam_decay(
1 / (self.warmup_steps * (self.learning_rate**2)),
self.warmup_steps)
else:
scheduled_lr = layers.create_global_var(
name=fluid.unique_name.generate("learning_rate"),
shape=[1],
value=self.learning_rate,
dtype="float32",
persistable=True)
grad_clip = fluid.clip.GradientClipByGlobalNorm(self.max_grad_norm)
self.optimizer = AdamW(
learning_rate=scheduled_lr,
grad_clip=grad_clip,
weight_decay=self.weight_decay)
if self.is_distributed:
self.optimizer = fleet.distributed_optimizer(
self.optimizer, strategy=self.dist_strategy)
self.optimizer.minimize(metrics["loss"])
return scheduled_lr
def _execute(self, program, feed, fetch_dict, **kwargs):
"""
Execute program.
"""
fetch_list = [var.name for var in fetch_dict.values()]
fetch_vars = self.exe.run(program, feed, fetch_list, **kwargs)
return dict(zip(fetch_dict.keys(), fetch_vars))
def train_step(self, inputs):
"""
Run one training step.
"""
# TODO: support dygraph.
return self._execute(
self.train_program,
self._get_feed(inputs),
self.train_fetch_dict,
use_program_cache=True)
def eval_step(self, inputs):
"""
Run one evaluation step.
"""
# TODO: support dygraph.
return self._execute(self.eval_program, self._get_feed(inputs),
self.eval_fetch_dict)
def infer_step(self, inputs):
"""
Run one inference step.
"""
# TODO: support dygraph.
return self._execute(self.infer_program,
self._get_feed(inputs, is_infer=True),
self.infer_fetch_dict)
def save_inference_model(self, inference_model_path):
"""
Save the inference model.
"""
feed_list = [var.name for var in self.infer_feed_dict.values()]
fetch_list = list(self.infer_fetch_dict.values())
fluid.io.save_inference_model(inference_model_path, feed_list,
fetch_list, self.exe, self.infer_program)
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""NSP model."""
import paddle.fluid as fluid
import paddle.fluid.layers as layers
from . import register_model
from .model_base import Model
from .unified_transformer import UnifiedTransformer
@register_model("NSPModel")
class NSPModel(UnifiedTransformer):
"""NSP model."""
def _get_feed_dict(self, is_infer=False):
"""
        Get the feed dict of the model.
        Args:
            is_infer(bool): True if running inference.
        Returns:
            dict(str, Variable): The feed dict, mapping each input name to its Variable.
"""
feed_dict = {}
feed_dict["token_ids"] = layers.data(
name="token_ids", shape=[-1, self.max_seq_len, 1], dtype="int64")
feed_dict["type_ids"] = layers.data(
name="type_ids", shape=[-1, self.max_seq_len, 1], dtype="int64")
feed_dict["pos_ids"] = layers.data(
name="pos_ids", shape=[-1, self.max_seq_len, 1], dtype="int64")
feed_dict["attention_mask"] = layers.data(
name="attention_mask",
shape=[-1, self.max_seq_len, self.max_seq_len],
dtype=self.dtype)
feed_dict["label_pos"] = layers.data(
name="label_pos", shape=[-1, 1], dtype="int64")
if not is_infer:
feed_dict["label"] = layers.data(
name="label", shape=[-1, 1], dtype="int64")
feed_dict["tgt_label"] = layers.data(
name="tgt_ids", shape=[-1, 1], dtype="int64")
feed_dict["tgt_pos"] = layers.data(
name="tgt_pos", shape=[-1, 1], dtype="int64")
feed_dict["data_id"] = layers.data(
name="data_id", shape=[-1, 1], dtype="int64")
return feed_dict
def _get_feed(self, inputs, is_infer=False):
return Model._get_feed(self, inputs, is_infer)
def forward(self, inputs, is_infer=False):
outputs = {}
self.generation_caches = None
outputs["enc_out"], self.checkpoints = self._generation_network(
token_ids=inputs["token_ids"],
type_ids=inputs["type_ids"],
pos_ids=inputs["pos_ids"],
generation_mask=inputs["attention_mask"])
return outputs
def _get_metrics(self, inputs, outputs):
metrics = {}
        fc_out = self._calc_logits(outputs["enc_out"], seq_pos=inputs["tgt_pos"])
        lm_loss = layers.softmax_with_cross_entropy(
            logits=fc_out, label=inputs["tgt_label"])
need_cal = layers.not_equal(
inputs["tgt_label"],
layers.fill_constant(shape=[1], dtype="int64", value=1))
need_cal = layers.cast(need_cal, self.dtype)
mean_lm_loss = layers.reduce_sum(
lm_loss * need_cal) / (layers.reduce_sum(need_cal) + 1e-10)
pooled_out = self._get_pooled_output(outputs["enc_out"],
inputs["label_pos"])
nsp_fc_out = layers.fc(
input=pooled_out,
size=2,
param_attr=fluid.ParamAttr(
name="next_sent_fc.w_0", initializer=self.param_initializer),
bias_attr="next_sent_fc.b_0")
nsp_loss, nsp_softmax = layers.softmax_with_cross_entropy(
logits=nsp_fc_out, label=inputs["label"], return_softmax=True)
nsp_acc = layers.accuracy(nsp_softmax, inputs["label"])
mean_nsp_loss = layers.mean(nsp_loss)
metrics["loss"] = mean_lm_loss + mean_nsp_loss
metrics["lm_loss"] = mean_lm_loss
metrics["nsp_loss"] = mean_nsp_loss
metrics["nsp_acc"] = nsp_acc
return metrics
def infer(self, inputs, outputs):
pooled_out = self._get_pooled_output(outputs["enc_out"],
inputs["label_pos"])
nsp_fc_out = layers.fc(
input=pooled_out,
size=2,
param_attr=fluid.ParamAttr(
name="next_sent_fc.w_0", initializer=self.param_initializer),
bias_attr="next_sent_fc.b_0")
scores = layers.softmax(nsp_fc_out)
predictions = {"scores": scores, "data_id": inputs["data_id"]}
return predictions
def infer_step(self, inputs):
return Model.infer_step(self, inputs)
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Optimizer."""
import re
import paddle.fluid as fluid
import paddle.fluid.layers as layers
class AdamW(fluid.optimizer.AdamOptimizer):
"""AdamW object for dygraph"""
def __init__(self, *args, **kwargs):
weight_decay = kwargs.pop('weight_decay', None)
var_name_to_exclude = kwargs.pop(
'var_name_to_exclude', '.*layer_norm_scale|.*layer_norm_bias|.*b_0')
super(AdamW, self).__init__(*args, **kwargs)
self.wd = weight_decay
self.pat = re.compile(var_name_to_exclude)
def apply_optimize(self, loss, startup_program, params_grads):
"""Update params with weight decay."""
super(AdamW, self).apply_optimize(loss, startup_program, params_grads)
for p, g in params_grads:
if not self.pat.match(p.name):
layers.assign(p * (1. - self.wd * self._learning_rate), p)
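# Note: decoupled weight decay is applied here as a post-update step,
# p <- p - lr * wd * p, for every parameter whose name does not match
# `var_name_to_exclude` (by default LayerNorm scales/biases and FC biases).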
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Plato model."""
import numpy as np
import paddle.fluid as fluid
import paddle.fluid.layers as layers
from . import register_model
from .model_base import Model
from .unified_transformer import UnifiedTransformer
from .transformer_block import encoder, pre_process_layer
from plato2_en_base.utils import repeat_array_or_tensor
from plato2_en_base.utils.args import str2bool
from .generator import Generator
@register_model("Plato")
class Plato(UnifiedTransformer):
"""Plato model."""
@classmethod
def add_cmdline_args(cls, parser):
"""Add cmdline argurments."""
group = UnifiedTransformer.add_cmdline_args(parser)
group.add_argument("--use_bow", type=str2bool, default=True)
group.add_argument("--use_entropy", type=str2bool, default=False)
return group
def __init__(self, args, place):
# latent related
self.mask_id = args.mask_id
self.latent_type_size = args.latent_type_size
self.latent_emb_name = "latent_embedding"
self.use_bow = args.use_bow
self.use_entropy = args.use_entropy
super(Plato, self).__init__(args, place)
def _get_feed_dict(self, is_infer=False):
feed_dict = {}
feed_dict["token_ids"] = layers.data(
name="token_ids", shape=[-1, self.max_seq_len, 1], dtype="int64")
feed_dict["type_ids"] = layers.data(
name="type_ids", shape=[-1, self.max_seq_len, 1], dtype="int64")
feed_dict["pos_ids"] = layers.data(
name="pos_ids", shape=[-1, self.max_seq_len, 1], dtype="int64")
if not is_infer:
feed_dict["recognition_mask"] = layers.data(
name="recognition_mask",
shape=[-1, self.max_seq_len + 1, self.max_seq_len + 1],
dtype=self.dtype)
feed_dict["generation_mask"] = layers.data(
name="generation_mask",
shape=[-1, self.max_seq_len + 1, self.max_seq_len + 1],
dtype=self.dtype)
if is_infer:
feed_dict["tgt_ids"] = layers.data(
name="tgt_ids",
shape=[-1, self.max_seq_len, 1],
dtype="int64",
lod_level=2)
feed_dict["tgt_pos"] = layers.data(
name="tgt_pos",
shape=[-1, self.max_seq_len, 1],
dtype="int64",
lod_level=2)
feed_dict["init_score"] = layers.data(
name="init_score", shape=[-1, 1], dtype="float32", lod_level=1)
feed_dict["parent_idx"] = layers.data(
name="parent_idx", shape=[-1], dtype="int64")
feed_dict["tgt_generation_mask"] = layers.data(
name="tgt_generation_mask",
shape=[-1, 1, self.max_seq_len + 1],
dtype="float32")
feed_dict["latent_id"] = layers.data(
name="latent_id", shape=[-1, 1], dtype="int64")
else:
feed_dict["tgt_label"] = layers.data(
name="tgt_label", shape=[-1, 1], dtype="int64")
feed_dict["tgt_pos"] = layers.data(
name="tgt_pos", shape=[-1, 1], dtype="int64")
if self.use_bow:
feed_dict["bow_label"] = layers.data(
name="bow_label", shape=[-1, 1], dtype="int64")
feed_dict["bow_pos"] = layers.data(
name="bow_pos", shape=[-1, 1], dtype="int64")
feed_dict["data_id"] = layers.data(
name="data_id", shape=[-1, 1], dtype="int64")
return feed_dict
def _recognition_network(self, token_ids, type_ids, pos_ids,
recognition_mask):
mask_id = layers.fill_constant_batch_size_like(
input=token_ids,
shape=[-1, 1, 1],
value=self.mask_id,
dtype="int64")
mask_emb = layers.embedding(
input=mask_id,
size=[self.vocab_size, self.emb_size],
dtype=self.dtype,
param_attr=fluid.ParamAttr(
name=self.token_emb_name, initializer=self.param_initializer))
emb_out, n_head_self_attn_mask = self._gen_input(
token_ids, type_ids, pos_ids, recognition_mask, aux_emb=mask_emb)
recognition_out, checkpoints = self._encode(emb_out,
n_head_self_attn_mask)
recognition_feat = layers.slice(
input=recognition_out, axes=[1], starts=[0], ends=[1])
recognition_feat = layers.fc(
input=recognition_feat,
size=self.hidden_size,
act="tanh",
param_attr=fluid.ParamAttr(
name="recognition_fc.w_0", initializer=self.param_initializer),
bias_attr="recognition_fc.b_0")
logits = layers.fc(
input=recognition_feat,
size=self.latent_type_size,
param_attr=fluid.ParamAttr(
name=self.latent_emb_name, initializer=self.param_initializer),
bias_attr="recognition_bias")
return logits, checkpoints
def _gumbel_softmax(self, logits, tau=0.67, eps=1e-10):
u = layers.uniform_random_batch_size_like(
logits, shape=[-1, self.latent_type_size], min=0.0, max=1.0)
u.stop_gradient = True
gumbel = 0.0 - layers.log(eps - layers.log(u + eps))
y = logits + gumbel
return layers.softmax(y / tau)
def forward(self, inputs, is_infer=False):
"""
Run model main forward.
"""
outputs = {}
if is_infer:
self.generation_caches = [{
"k":
layers.fill_constant_batch_size_like(
input=inputs["token_ids"],
shape=[-1, 0, self.d_key * self.n_head],
dtype=self.dtype,
value=0),
"v":
layers.fill_constant_batch_size_like(
input=inputs["token_ids"],
shape=[-1, 0, self.d_value * self.n_head],
dtype=self.dtype,
value=0),
} for i in range(self.n_layer)]
else:
self.generation_caches = None
latent_embeddings = layers.create_parameter(
shape=[self.emb_size, self.latent_type_size],
dtype=self.dtype,
attr=fluid.ParamAttr(
name=self.latent_emb_name, initializer=self.param_initializer))
if is_infer:
latent_id = inputs["latent_id"]
weights = layers.one_hot(latent_id, self.latent_type_size)
else:
logits, recognition_checkpoints = self._recognition_network(
token_ids=inputs["token_ids"],
type_ids=inputs["type_ids"],
pos_ids=inputs["pos_ids"],
recognition_mask=inputs["recognition_mask"],
)
outputs["post_probs"] = layers.softmax(logits)
weights = self._gumbel_softmax(logits)
outputs["checkpoints"] = recognition_checkpoints
latent_emb = layers.matmul(
x=weights, y=latent_embeddings, transpose_y=True)
outputs["enc_out"], generation_checkpoints = self._generation_network(
token_ids=inputs["token_ids"],
type_ids=inputs["type_ids"],
pos_ids=inputs["pos_ids"],
generation_mask=inputs["generation_mask"],
aux_emb=layers.unsqueeze(latent_emb, axes=[1]),
gather_idx=inputs.get("parent_idx", None),
)
if not is_infer:
outputs["checkpoints"].extend(generation_checkpoints)
return outputs
def _calc_bow_logits(self, enc_out, checkpoints, bow_pos):
"""Get the logits of generation."""
bow_feat = layers.slice(input=enc_out, axes=[1], starts=[0], ends=[1])
bow_feat = layers.reshape(x=bow_feat, shape=[-1, self.hidden_size])
bow_pos = layers.cast(x=bow_pos, dtype="int32")
bow_feat = layers.gather(input=bow_feat, index=bow_pos)
bow_trans_feat = layers.fc(
input=bow_feat,
size=self.emb_size,
act=self.hidden_act,
param_attr=fluid.ParamAttr(
name="bow_trans_fc.w_0", initializer=self.param_initializer),
bias_attr=fluid.ParamAttr(name="bow_trans_fc.b_0"))
bow_trans_feat = pre_process_layer(
bow_trans_feat, self.post_cls_cmd, name="bow_trans")
checkpoints.append(bow_trans_feat)
if self.weight_sharing:
fc_out = layers.matmul(
x=bow_trans_feat,
y=fluid.default_main_program().global_block().var(
self.token_emb_name),
transpose_y=True)
if self.cls_bias:
fc_out += layers.create_parameter(
shape=[self.vocab_size],
dtype=self.dtype,
attr=fluid.ParamAttr(name="bow_out_fc.b_0"),
is_bias=True)
else:
bow_out_bias_attr = fluid.ParamAttr(
name="bow_out_fc.b_0") if self.cls_bias else False
fc_out = layers.fc(
input=bow_trans_feat,
size=self.vocab_size,
param_attr=fluid.ParamAttr(
name="bow_out_fc.w_0", initializer=self.param_initializer),
bias_attr=bow_out_bias_attr)
return fc_out
def _get_metrics(self, inputs, outputs):
metrics = super(Plato, self)._get_metrics(inputs, outputs)
if self.use_bow:
fc_out = self._calc_bow_logits(
outputs["enc_out"], outputs["checkpoints"], inputs["bow_pos"])
bow_loss = layers.softmax_with_cross_entropy(
logits=fc_out, label=inputs["bow_label"])
mean_bow_loss = layers.mean(bow_loss)
metrics["token_bow_loss"] = mean_bow_loss
metrics["loss"] = metrics["loss"] + mean_bow_loss
entropy_loss = layers.reduce_sum(
outputs["post_probs"] * layers.log(outputs["post_probs"]), dim=1)
mean_entropy_loss = layers.mean(entropy_loss)
metrics["entropy_loss"] = mean_entropy_loss
if self.use_entropy:
metrics["loss"] = metrics["loss"] + mean_entropy_loss
return metrics
def infer_step(self, inputs):
"""
Run one inference step.
"""
if self.do_generation:
batch_size = len(inputs["data_id"])
new_bsz = batch_size * self.latent_type_size
inputs = {
name: repeat_array_or_tensor(array_or_tensor, self.place,
self.latent_type_size)
for name, array_or_tensor in inputs.items()
}
# Add latent_id
inputs["latent_id"] = np.array([
i for i in range(self.latent_type_size)
for _ in range(batch_size)
],
dtype="int64").reshape([-1, 1])
return super(Plato, self).infer_step(inputs)
else:
return self._execute(self.infer_program,
self._get_feed(inputs, is_infer=True),
self.infer_fetch_dict)
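# Note: during generation, `Plato.infer_step` enumerates every latent value: the
# batch is tiled `latent_type_size` times and each replica is assigned a distinct
# `latent_id`, so one candidate response is decoded per latent variable (in the
# full PLATO-2 pipeline these candidates can then be re-ranked, e.g. by the NSP
# model defined above).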
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Transformer block."""
from functools import partial
import paddle.fluid as fluid
import paddle.fluid.layers as layers
def multi_head_attention(queries,
keys,
values,
attn_bias,
d_key,
d_value,
d_model,
n_head=1,
dropout_rate=0.,
cache=None,
gather_idx=None,
store=False,
param_initializer=None,
name="multi_head_att"):
"""
    Multi-Head Attention. Note that attn_bias is added to the logits before
    computing the softmax activation to mask certain selected positions so
    that they will not be considered in the attention weights.
"""
keys = queries if keys is None else keys
values = keys if values is None else values
if not (len(queries.shape) == len(keys.shape) == len(values.shape) == 3):
raise ValueError(
"Inputs: quries, keys and values should all be 3-D tensors.")
def __compute_qkv(queries, keys, values, n_head, d_key, d_value):
"""
Add linear projection to queries, keys, and values.
"""
q = layers.fc(
input=queries,
size=d_key * n_head,
num_flatten_dims=2,
param_attr=fluid.ParamAttr(
name=name + "_query_fc.w_0", initializer=param_initializer),
bias_attr=name + "_query_fc.b_0")
k = layers.fc(
input=keys,
size=d_key * n_head,
num_flatten_dims=2,
param_attr=fluid.ParamAttr(
name=name + "_key_fc.w_0", initializer=param_initializer),
bias_attr=name + "_key_fc.b_0")
v = layers.fc(
input=values,
size=d_value * n_head,
num_flatten_dims=2,
param_attr=fluid.ParamAttr(
name=name + "_value_fc.w_0", initializer=param_initializer),
bias_attr=name + "_value_fc.b_0")
return q, k, v
def __split_heads(x, n_head):
"""
        Reshape the last dimension of input tensor x so that it becomes two
dimensions and then transpose. Specifically, input a tensor with shape
[bs, max_sequence_length, n_head * hidden_dim] then output a tensor
with shape [bs, n_head, max_sequence_length, hidden_dim].
"""
hidden_size = x.shape[-1]
# The value 0 in shape attr means copying the corresponding dimension
# size of the input as the output dimension size.
reshaped = layers.reshape(
x=x, shape=[0, 0, n_head, hidden_size // n_head], inplace=True)
        # permute the dimensions into:
# [batch_size, n_head, max_sequence_len, hidden_size_per_head]
return layers.transpose(x=reshaped, perm=[0, 2, 1, 3])
def __combine_heads(x):
"""
        Transpose and then reshape the last two dimensions of input tensor x
        so that it becomes one dimension, which is the reverse of __split_heads.
"""
if len(x.shape) == 3: return x
if len(x.shape) != 4:
raise ValueError("Input(x) should be a 4-D Tensor.")
trans_x = layers.transpose(x, perm=[0, 2, 1, 3])
# The value 0 in shape attr means copying the corresponding dimension
# size of the input as the output dimension size.
return layers.reshape(
x=trans_x,
shape=[0, 0, trans_x.shape[2] * trans_x.shape[3]],
inplace=True)
def scaled_dot_product_attention(q, k, v, attn_bias, d_key, dropout_rate):
"""
Scaled Dot-Product Attention
"""
scaled_q = layers.scale(x=q, scale=d_key**-0.5)
product = layers.matmul(x=scaled_q, y=k, transpose_y=True)
if attn_bias:
product += attn_bias
weights = layers.softmax(product, use_cudnn=True)
if dropout_rate:
weights = layers.dropout(
weights,
dropout_prob=dropout_rate,
dropout_implementation="upscale_in_train",
is_test=False)
out = layers.matmul(weights, v)
return out
q, k, v = __compute_qkv(queries, keys, values, n_head, d_key, d_value)
if cache is not None: # use cache and concat time steps
# Since the inplace reshape in __split_heads changes the shape of k and
# v, which is the cache input for next time step, reshape the cache
# input from the previous time step first.
cache_k, cache_v = cache["k"], cache["v"]
select_k = layers.gather(cache_k, index=gather_idx)
select_v = layers.gather(cache_v, index=gather_idx)
select_k = layers.reshape(select_k, shape=[0, 0, d_key * n_head])
select_v = layers.reshape(select_v, shape=[0, 0, d_value * n_head])
if store:
k = layers.concat([select_k, k], axis=1)
v = layers.concat([select_v, v], axis=1)
layers.assign(k, cache["k"])
layers.assign(v, cache["v"])
else:
#k = select_k
#v = select_v
tmp_k = layers.concat([select_k, k[:, :1]], axis=1)
tmp_v = layers.concat([select_v, v[:, :1]], axis=1)
layers.assign(tmp_k, cache["k"])
layers.assign(tmp_v, cache["v"])
k = layers.concat([select_k, k], axis=1)
v = layers.concat([select_v, v], axis=1)
q = __split_heads(q, n_head)
k = __split_heads(k, n_head)
v = __split_heads(v, n_head)
ctx_multiheads = scaled_dot_product_attention(q, k, v, attn_bias, d_key,
dropout_rate)
out = __combine_heads(ctx_multiheads)
# Project back to the model size.
proj_out = layers.fc(
input=out,
size=d_model,
num_flatten_dims=2,
param_attr=fluid.ParamAttr(
name=name + "_output_fc.w_0", initializer=param_initializer),
bias_attr=name + "_output_fc.b_0")
return proj_out
def positionwise_feed_forward(x,
d_inner_hid,
d_hid,
dropout_rate,
hidden_act,
param_initializer=None,
name="ffn"):
"""
Position-wise Feed-Forward Networks.
This module consists of two linear transformations with a ReLU activation
in between, which is applied to each position separately and identically.
"""
hidden = layers.fc(
input=x,
size=d_inner_hid,
num_flatten_dims=2,
act=hidden_act,
param_attr=fluid.ParamAttr(
name=name + "_fc_0.w_0", initializer=param_initializer),
bias_attr=name + "_fc_0.b_0")
if dropout_rate:
hidden = layers.dropout(
hidden,
dropout_prob=dropout_rate,
dropout_implementation="upscale_in_train",
is_test=False)
out = layers.fc(
input=hidden,
size=d_hid,
num_flatten_dims=2,
param_attr=fluid.ParamAttr(
name=name + "_fc_1.w_0", initializer=param_initializer),
bias_attr=name + "_fc_1.b_0")
return out
def pre_post_process_layer(prev_out,
out,
process_cmd,
dropout_rate=0.,
epsilon=1e-5,
name=""):
"""
    Add residual connection, layer normalization and dropout to the out tensor
optionally according to the value of process_cmd.
This will be used before or after multi-head attention and position-wise
feed-forward networks.
"""
for cmd in process_cmd:
if cmd == "a": # add residual connection
out = out + prev_out if prev_out else out
elif cmd == "n": # add layer normalization
out = layers.layer_norm(
out,
begin_norm_axis=len(out.shape) - 1,
param_attr=fluid.ParamAttr(
name=name + "_layer_norm_scale",
initializer=fluid.initializer.Constant(1.)),
bias_attr=fluid.ParamAttr(
name=name + "_layer_norm_bias",
initializer=fluid.initializer.Constant(0.)),
epsilon=epsilon)
elif cmd == "d": # add dropout
if dropout_rate:
out = layers.dropout(
out,
dropout_prob=dropout_rate,
dropout_implementation="upscale_in_train",
is_test=False)
return out
pre_process_layer = partial(pre_post_process_layer, None)
post_process_layer = pre_post_process_layer
def encoder_layer(input,
attn_bias,
n_head,
d_key,
d_value,
d_model,
d_inner_hid,
prepostprocess_dropout,
attention_dropout,
relu_dropout,
hidden_act,
preprocess_cmd="n",
postprocess_cmd="da",
param_initializer=None,
name="",
epsilon=1e-5,
cache=None,
gather_idx=None,
store=False):
"""
The encoder layers that can be stacked to form a deep encoder.
    This module consists of a multi-head (self) attention sub-layer followed by
    a position-wise feed-forward network, both wrapped with pre_process_layer /
    post_process_layer to add residual connections, layer normalization and
    dropout.
"""
attn_output = multi_head_attention(
pre_process_layer(
input,
preprocess_cmd,
prepostprocess_dropout,
epsilon=epsilon,
name=name + "_pre_att"),
None,
None,
attn_bias,
d_key,
d_value,
d_model,
n_head,
attention_dropout,
param_initializer=param_initializer,
name=name + "_multi_head_att",
cache=cache,
gather_idx=gather_idx,
store=store)
attn_output = post_process_layer(
input,
attn_output,
postprocess_cmd,
prepostprocess_dropout,
name=name + "_post_att",
epsilon=epsilon)
ffd_output = positionwise_feed_forward(
pre_process_layer(
attn_output,
preprocess_cmd,
prepostprocess_dropout,
epsilon=epsilon,
name=name + "_pre_ffn"),
d_inner_hid,
d_model,
relu_dropout,
hidden_act,
param_initializer=param_initializer,
name=name + "_ffn")
ffd_output = post_process_layer(
attn_output,
ffd_output,
postprocess_cmd,
prepostprocess_dropout,
name=name + "_post_ffn",
epsilon=epsilon)
return ffd_output, [attn_output, ffd_output]
def encoder(enc_input,
attn_bias,
n_layer,
n_head,
d_key,
d_value,
d_model,
d_inner_hid,
prepostprocess_dropout,
attention_dropout,
relu_dropout,
hidden_act,
preprocess_cmd="n",
postprocess_cmd="da",
param_initializer=None,
name="",
epsilon=1e-5,
n_layer_per_block=1,
param_share="normal",
caches=None,
gather_idx=None,
store=False):
"""
The encoder is composed of a stack of identical layers returned by calling
encoder_layer.
"""
checkpoints = []
names = []
if param_share == "inner_share":
for _ in range(n_layer // n_layer_per_block):
for i in range(n_layer_per_block):
names.append(name + "_layer_" + str(i))
else:
for i in range(n_layer // n_layer_per_block):
for _ in range(n_layer_per_block):
names.append(name + "_layer_" + str(i))
for i in range(n_layer):
enc_output, cps = encoder_layer(
enc_input,
attn_bias,
n_head,
d_key,
d_value,
d_model,
d_inner_hid,
prepostprocess_dropout,
attention_dropout,
relu_dropout,
hidden_act,
preprocess_cmd,
postprocess_cmd,
param_initializer=param_initializer,
epsilon=epsilon,
name=names[i],
cache=caches[i] if caches is not None else None,
gather_idx=gather_idx,
store=store)
checkpoints.extend(cps)
enc_input = enc_output
enc_output = pre_process_layer(
enc_output,
preprocess_cmd,
prepostprocess_dropout,
name="post_encoder",
epsilon=epsilon)
return enc_output, checkpoints
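# Note on parameter sharing in `encoder`: with param_share="inner_share" the
# i-th layer inside every block of `n_layer_per_block` layers reuses the same
# parameter names (they cycle through _layer_0.._layer_{k-1}), whereas in the
# default mode all layers within one block share parameters and each block gets
# its own; with n_layer_per_block=1 the default reduces to unshared layers.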
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Unified Transformer model."""
import numpy as np
import paddle.fluid as fluid
import paddle.fluid.layers as layers
from . import register_model
from .model_base import Model
from .transformer_block import encoder, pre_process_layer
from plato2_en_base.utils.args import str2bool
from plato2_en_base.utils import repeat_array_or_tensor, slice_array_or_tensor
from .generator import Generator
@register_model("UnifiedTransformer")
class UnifiedTransformer(Model):
"""Unified Transformer"""
@classmethod
def add_cmdline_args(cls, parser):
"""Add cmdline argurments."""
group = Model.add_cmdline_args(parser)
group.add_argument("--max_seq_len", type=int, default=256)
group.add_argument("--weight_sharing", type=str2bool, default=True)
group.add_argument("--mem_efficient", type=str2bool, default=False)
Generator.add_cmdline_args(parser)
return group
def __init__(self, args, place):
self.max_seq_len = args.max_seq_len
self.emb_size = args.emb_size or args.hidden_size
self.hidden_size = args.hidden_size
self.n_layer = args.num_hidden_layers
self.n_head = args.num_attention_heads
self.d_key = args.get("key_size", self.hidden_size // self.n_head)
self.d_value = args.get("value_size", self.hidden_size // self.n_head)
self.inner_hidden_size = args.get("inner_hidden_size",
self.hidden_size * 4)
self.vocab_size = args.vocab_size
self.max_position_seq_len = args.max_position_embeddings
self.type_size = args.type_vocab_size
self.token_emb_name = "word_embedding"
self.type_emb_name = "sent_embedding"
self.pos_emb_name = "pos_embedding"
self.epsilon = args.epsilon or 1e-5
self.n_layer_per_block = args.n_layer_per_block or 1
self.pre_encoder_cmd = args.get("pre_encoder_cmd", "nd")
self.preprocess_cmd = args.get("preprocess_cmd", "")
self.postprocess_cmd = args.get("postprocess_cmd", "dan")
self.post_cls_cmd = args.get("post_cls_cmd", "n")
self.cls_bias = args.get("cls_bias", True)
if self.hidden_size != self.emb_size:
self.emb_mapping_in = True
else:
self.emb_mapping_in = args.get("emb_mapping_in", False)
self.hidden_act = args.hidden_act
self.prepostprocess_dropout = args.hidden_dropout_prob
self.attention_dropout = args.attention_probs_dropout_prob
self.weight_sharing = args.weight_sharing
self.mem_efficient = args.mem_efficient
self.dtype = "float32"
        # Initialize all weights with a truncated normal initializer; all biases
        # are initialized to zero by default.
self.param_initializer = fluid.initializer.TruncatedNormal(
scale=args.initializer_range)
# task-related
self.generator = Generator(args)
self.do_generation = args.do_generation
super(UnifiedTransformer, self).__init__(args, place)
def _gen_input(self, token_ids, type_ids, pos_ids, input_mask,
aux_emb=None):
token_emb_out = layers.embedding(
input=token_ids,
size=[self.vocab_size, self.emb_size],
dtype=self.dtype,
param_attr=fluid.ParamAttr(
name=self.token_emb_name, initializer=self.param_initializer))
type_emb_out = layers.embedding(
input=type_ids,
size=[self.type_size, self.emb_size],
dtype=self.dtype,
param_attr=fluid.ParamAttr(
name=self.type_emb_name, initializer=self.param_initializer))
pos_emb_out = layers.embedding(
input=pos_ids,
size=[self.max_position_seq_len, self.emb_size],
dtype=self.dtype,
param_attr=fluid.ParamAttr(
name=self.pos_emb_name, initializer=self.param_initializer))
emb_out = token_emb_out + type_emb_out + pos_emb_out
# auxiliary memory embeddings
if aux_emb is not None:
emb_out = layers.concat([aux_emb, emb_out], axis=1)
# post process of embedding
emb_out = pre_process_layer(
emb_out,
self.pre_encoder_cmd,
self.prepostprocess_dropout,
name="pre_encoder",
epsilon=self.epsilon)
if self.emb_mapping_in:
emb_out = layers.fc(
input=emb_out,
num_flatten_dims=2,
size=self.hidden_size,
param_attr=fluid.ParamAttr(
name="emb_hidden_mapping",
initializer=self.param_initializer),
bias_attr="emb_hidden_mapping_bias")
# generate n-head self-attention mask
self_attn_mask = input_mask
self_attn_mask = layers.scale(
x=self_attn_mask, scale=1e4, bias=-1.0, bias_after_scale=False)
n_head_self_attn_mask = layers.stack(
x=[self_attn_mask] * self.n_head, axis=1)
n_head_self_attn_mask.stop_gradient = True
return emb_out, n_head_self_attn_mask
def _get_pooled_output(self, enc_out, pos):
enc_out = layers.reshape(x=enc_out, shape=[-1, self.hidden_size])
pos = layers.cast(x=pos, dtype="int32")
feat = layers.gather(input=enc_out, index=pos)
pooled_out = layers.fc(
input=feat,
size=self.hidden_size,
act="tanh",
param_attr=fluid.ParamAttr(
name="pooled_fc.w_0", initializer=self.param_initializer),
bias_attr="pooled_fc.b_0")
return pooled_out
def _generation_network(self,
token_ids,
type_ids,
pos_ids,
generation_mask,
aux_emb=None,
gather_idx=None):
emb_out, n_head_self_attn_mask = self._gen_input(
token_ids, type_ids, pos_ids, generation_mask, aux_emb=aux_emb)
return self._encode(
emb_out,
n_head_self_attn_mask,
self.generation_caches,
gather_idx=gather_idx)
def _encode(self,
emb_out,
n_head_self_attn_mask,
caches=None,
gather_idx=None):
return encoder(
enc_input=emb_out,
attn_bias=n_head_self_attn_mask,
n_layer=self.n_layer,
n_head=self.n_head,
d_key=self.d_key,
d_value=self.d_value,
d_model=self.hidden_size,
d_inner_hid=self.inner_hidden_size,
prepostprocess_dropout=self.prepostprocess_dropout,
attention_dropout=self.attention_dropout,
relu_dropout=0,
hidden_act=self.hidden_act,
preprocess_cmd=self.preprocess_cmd,
postprocess_cmd=self.postprocess_cmd,
param_initializer=self.param_initializer,
epsilon=self.epsilon,
n_layer_per_block=self.n_layer_per_block,
name="encoder",
caches=caches,
gather_idx=gather_idx,
store=caches is not None)
def _gumbel_softmax(self, logits, tau=0.67, eps=1e-10):
u = layers.uniform_random_batch_size_like(
logits, shape=[-1, self.latent_type_size], min=0.0, max=1.0)
u.stop_gradient = True
gumbel = 0.0 - layers.log(eps - layers.log(u + eps))
y = logits + gumbel
return layers.softmax(y / tau)
def _get_feed_dict(self, is_infer=False):
"""
        Get the feed dict of the model.
        Args:
            is_infer(bool): True if running inference.
        Returns:
            dict(str, Variable): The feed dict, mapping each input name to its Variable.
"""
feed_dict = {}
feed_dict["token_ids"] = layers.data(
name="token_ids", shape=[-1, self.max_seq_len, 1], dtype="int64")
feed_dict["type_ids"] = layers.data(
name="type_ids", shape=[-1, self.max_seq_len, 1], dtype="int64")
feed_dict["pos_ids"] = layers.data(
name="pos_ids", shape=[-1, self.max_seq_len, 1], dtype="int64")
feed_dict["generation_mask"] = layers.data(
name="generation_mask",
shape=[-1, self.max_seq_len, self.max_seq_len],
dtype=self.dtype)
if is_infer:
feed_dict["tgt_ids"] = layers.data(
name="tgt_ids",
shape=[-1, self.max_seq_len, 1],
dtype="int64",
lod_level=2)
feed_dict["tgt_pos"] = layers.data(
name="tgt_pos",
shape=[-1, self.max_seq_len, 1],
dtype="int64",
lod_level=2)
feed_dict["init_score"] = layers.data(
name="init_score", shape=[-1, 1], dtype="float32", lod_level=1)
feed_dict["parent_idx"] = layers.data(
name="parent_idx", shape=[-1], dtype="int64")
feed_dict["tgt_generation_mask"] = layers.data(
name="tgt_generation_mask",
shape=[-1, 1, self.max_seq_len],
dtype="float32")
else:
feed_dict["tgt_label"] = layers.data(
name="tgt_label", shape=[-1, 1], dtype="int64")
feed_dict["tgt_pos"] = layers.data(
name="tgt_pos", shape=[-1, 1], dtype="int64")
feed_dict["data_id"] = layers.data(
name="data_id", shape=[-1, 1], dtype="int64")
return feed_dict
def forward(self, inputs, is_infer=False):
"""
Run model main forward.
"""
outputs = {}
if is_infer:
self.generation_caches = [{
"k":
layers.fill_constant_batch_size_like(
input=inputs["token_ids"],
shape=[-1, 0, self.d_key * self.n_head],
dtype=self.dtype,
value=0),
"v":
layers.fill_constant_batch_size_like(
input=inputs["token_ids"],
shape=[-1, 0, self.d_value * self.n_head],
dtype=self.dtype,
value=0),
} for i in range(self.n_layer)]
else:
self.generation_caches = None
outputs["enc_out"], generation_checkpoints = self._generation_network(
token_ids=inputs["token_ids"],
type_ids=inputs["type_ids"],
pos_ids=inputs["pos_ids"],
generation_mask=inputs["generation_mask"],
gather_idx=inputs.get("parent_idx", None))
if not is_infer:
outputs["checkpoints"] = generation_checkpoints
return outputs
def _calc_logits(self, enc_out, checkpoints=None, seq_pos=None):
"""Get the logits of generation."""
enc_out = layers.reshape(x=enc_out, shape=[-1, self.hidden_size])
if seq_pos is not None:
seq_pos = layers.cast(x=seq_pos, dtype="int32")
seq_feat = layers.gather(input=enc_out, index=seq_pos)
else:
seq_feat = enc_out
seq_trans_feat = layers.fc(
input=seq_feat,
size=self.emb_size,
act=self.hidden_act,
param_attr=fluid.ParamAttr(
name="mask_lm_trans_fc.w_0",
initializer=self.param_initializer),
bias_attr=fluid.ParamAttr(name="mask_lm_trans_fc.b_0"))
seq_trans_feat = pre_process_layer(
seq_trans_feat, self.post_cls_cmd, name="mask_lm_trans")
if checkpoints is not None:
checkpoints.append(seq_trans_feat)
if self.weight_sharing:
fc_out = layers.matmul(
x=seq_trans_feat,
y=fluid.default_main_program().global_block().var(
self.token_emb_name),
transpose_y=True)
if self.cls_bias:
fc_out += layers.create_parameter(
shape=[self.vocab_size],
dtype=self.dtype,
attr=fluid.ParamAttr(name="mask_lm_out_fc.b_0"),
is_bias=True)
else:
seq_out_bias_attr = fluid.ParamAttr(
name="mask_lm_out_fc.b_0") if self.cls_bias else False
fc_out = layers.fc(
input=seq_trans_feat,
size=self.vocab_size,
param_attr=fluid.ParamAttr(
name="mask_lm_out_fc.w_0",
initializer=self.param_initializer),
bias_attr=seq_out_bias_attr)
return fc_out
def _get_metrics(self, inputs, outputs):
metrics = {}
fc_out = self._calc_logits(outputs["enc_out"], outputs["checkpoints"],
inputs["tgt_pos"])
tgt_lm_loss = layers.softmax_with_cross_entropy(
logits=fc_out, label=inputs["tgt_label"])
mean_tgt_lm_loss = layers.mean(tgt_lm_loss)
loss = mean_tgt_lm_loss
metrics["token_lm_loss"] = mean_tgt_lm_loss
metrics["loss"] = loss
return metrics
def _get_statistics(self, inputs, outputs):
statistics = {}
if "tgt_label" in inputs:
statistics["tokens_num"] = layers.reduce_sum(
layers.fill_constant_batch_size_like(
input=inputs["tgt_label"],
value=1.0,
shape=[-1],
dtype="int64"))
statistics["batch_size"] = layers.reduce_sum(
layers.fill_constant_batch_size_like(
input=inputs["token_ids"], value=1.0, shape=[-1],
dtype="int64"))
return statistics
def get_metrics_and_statistics(self, inputs, outputs):
"""
Get metrics and statistics.
"""
metrics = self._get_metrics(inputs, outputs)
statistics = self._get_statistics(inputs, outputs)
return metrics, statistics
def infer(self, inputs, outputs):
"""
Run model inference.
"""
if self.do_generation:
return self.generator.inference(self, inputs, outputs)
else:
raise NotImplementedError
def _run_generation(self, inputs):
"""
Run generation.
"""
batch_size = len(inputs["data_id"])
inputs["parent_idx"] = np.array(range(batch_size), dtype="int64")
outputs = self._execute(
self.infer_program,
self._get_feed(inputs, is_infer=True),
self.infer_fetch_dict,
return_numpy=False)
predictions = []
data_id_list = np.array(outputs["data_id"]).reshape(-1).tolist()
token_ids_list = np.array(outputs["token_ids"]).squeeze(2).tolist()
seq_ids = outputs["finished_ids"]
seq_ids_np = np.array(outputs["finished_ids"])
seq_scores_np = np.array(outputs["finished_scores"])
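# finished_ids is a 2-level LoD tensor: level 0 groups the candidate responses
# belonging to each input example, level 1 gives the token span of each candidate.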
for i, (data_id, token_ids) in enumerate(
zip(data_id_list, token_ids_list)):
start = seq_ids.lod()[0][i]
end = seq_ids.lod()[0][i + 1]
for j in range(start, end):
sub_start = seq_ids.lod()[1][j]
sub_end = seq_ids.lod()[1][j + 1]
info = {}
info["data_id"] = data_id
info["decode_score"] = float(seq_scores_np[sub_end - 1])
info["context_token_ids"] = token_ids
info["response_token_ids"] = seq_ids_np[sub_start:
sub_end].tolist()
predictions.append(info)
return predictions
def infer_step(self, inputs):
"""
Run one inference step.
"""
if self.do_generation:
if self.generator.num_samples:
inputs = {
name: repeat_array_or_tensor(array_or_tensor, self.place,
self.generator.num_samples)
for name, array_or_tensor in inputs.items()
}
if self.mem_efficient:
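# Memory-efficient mode: run generation on batch_size-sized slices of the
# (sample-repeated) inputs instead of the whole batch at once.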
predictions = []
for idx in range(0, len(inputs["data_id"]), self.batch_size):
part_inputs = {
name: slice_array_or_tensor(array_or_tensor, self.place,
idx, idx + self.batch_size)
for name, array_or_tensor in inputs.items()
}
part_outputs = self._run_generation(part_inputs)
predictions.extend(part_outputs)
else:
predictions = self._run_generation(inputs)
return predictions
else:
return self._execute(self.infer_program,
self._get_feed(inputs, is_infer=True),
self.infer_fetch_dict)
# coding:utf-8
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import ast
import os
import json
import sys
import argparse
import contextlib
from collections import namedtuple
import paddle.fluid as fluid
import paddlehub as hub
from paddlehub.module.module import runnable
from paddlehub.module.nlp_module import DataFormatError
from paddlehub.common.logger import logger
from paddlehub.module.module import moduleinfo, serving
import plato2_en_base.models as plato_models
from plato2_en_base.tasks.dialog_generation import DialogGeneration
from plato2_en_base.utils import check_cuda, Timer
from plato2_en_base.utils.args import parse_args
@moduleinfo(
name="plato2_en_base",
version="1.0.0",
summary=
"A novel pre-training model for dialogue generation, incorporated with latent discrete variables for one-to-many relationship modeling.",
author="baidu-nlp",
author_email="",
type="nlp/text_generation",
)
class Plato(hub.NLPPredictionModule):
def _initialize(self):
"""
initialize with the necessary elements
"""
if "CUDA_VISIBLE_DEVICES" not in os.environ:
raise RuntimeError(
"The module only support GPU. Please set the environment variable CUDA_VISIBLE_DEVICES."
)
args = self.setup_args()
self.task = DialogGeneration(args)
self.model = plato_models.create_model(args, fluid.CUDAPlace(0))
self.Example = namedtuple("Example", ["src", "data_id"])
self._interactive_mode = False
def setup_args(self):
"""
Setup arguments.
"""
assets_path = os.path.join(self.directory, "assets")
vocab_path = os.path.join(assets_path, "vocab.txt")
init_pretraining_params = os.path.join(assets_path, "24L", "Plato")
spm_model_file = os.path.join(assets_path, "spm.model")
nsp_inference_model_path = os.path.join(assets_path, "24L", "NSP")
config_path = os.path.join(assets_path, "24L.json")
# argparse parses sys.argv[1:] (the first element is dropped), so prepend a placeholder "" to sys.argv
sys.argv = [
"", "--model", "Plato", "--vocab_path",
"%s" % vocab_path, "--do_lower_case", "False",
"--init_pretraining_params",
"%s" % init_pretraining_params, "--spm_model_file",
"%s" % spm_model_file, "--nsp_inference_model_path",
"%s" % nsp_inference_model_path, "--ranking_score", "nsp_score",
"--do_generation", "True", "--batch_size", "1", "--config_path",
"%s" % config_path
]
parser = argparse.ArgumentParser()
plato_models.add_cmdline_args(parser)
DialogGeneration.add_cmdline_args(parser)
args = parse_args(parser)
args.load(args.config_path, "Model")
args.run_infer = True # only build infer program
return args
@serving
def generate(self, texts):
"""
Get the robot responses of the input texts.
Args:
texts(list or str): If not in the interactive mode, texts should be a list in which every element is one dialogue context, with utterances separated by '\t'.
Otherwise, texts should be a single sentence; the module builds the context automatically.
Returns:
results(list): the robot responses.
"""
if not texts:
return []
if self._interactive_mode:
if isinstance(texts, str):
self.context.append(texts.strip())
texts = [" [SEP] ".join(self.context[-self.max_turn:])]
else:
raise ValueError(
"In the interactive mode, the input data should be a string."
)
elif not isinstance(texts, list):
raise ValueError(
"If not in the interactive mode, the input data should be a list."
)
bot_responses = []
for i, text in enumerate(texts):
example = self.Example(src=text.replace("\t", " [SEP] "), data_id=i)
record = self.task.reader._convert_example_to_record(
example, is_infer=True)
data = self.task.reader._pad_batch_records([record], is_infer=True)
pred = self.task.infer_step(self.model, data)[0] # batch_size is 1
bot_response = pred["response"] # ignore data_id and score
bot_responses.append(bot_response)
if self._interactive_mode:
self.context.append(bot_responses[0].strip())
return bot_responses
@contextlib.contextmanager
def interactive_mode(self, max_turn=6):
"""
Enter the interactive mode.
Args:
max_turn(int): the maximum number of dialogue turns the model remembers. max_turn = 1 means the robot only remembers your last utterance.
"""
self._interactive_mode = True
self.max_turn = max_turn
self.context = []
yield
self.context = []
self._interactive_mode = False
@runnable
def run_cmd(self, argvs):
"""
Run as a command
"""
self.parser = argparse.ArgumentParser(
description='Run the %s module.' % self.name,
prog='hub run %s' % self.name,
usage='%(prog)s',
add_help=True)
self.arg_input_group = self.parser.add_argument_group(
title="Input options", description="Input data. Required")
self.arg_config_group = self.parser.add_argument_group(
title="Config options",
description=
"Run configuration for controlling module behavior, optional.")
self.add_module_input_arg()
args = self.parser.parse_args(argvs)
try:
input_data = self.check_input_data(args)
except (DataFormatError, RuntimeError):
self.parser.print_help()
return None
results = self.generate(texts=input_data)
return results
if __name__ == "__main__":
module = Plato()
for result in module.generate([
"Hello",
"Hello\thi, nice to meet you, my name is tom\tso your name is tom?"
]):
print(result)
with module.interactive_mode(max_turn=3):
while True:
human_utterance = input()
robot_utterance = module.generate(human_utterance)
print("Robot: %s" % robot_utterance[0])
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Dialogue Reader."""
import csv
from collections import namedtuple
from contextlib import contextmanager
import gzip
import os
import numpy as np
import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.collective import fleet
from plato2_en_base.utils import pad_batch_data
from plato2_en_base.utils.args import str2bool
from plato2_en_base.utils.masking import mask
import plato2_en_base.utils.tokenization as tokenization
class DialogReader(object):
"""The implement of DialogReader."""
@classmethod
def add_cmdline_args(cls, parser):
"""Add cmdline argurments."""
group = parser.add_argument_group("Reader")
group.add_argument("--max_src_len", type=int, default=128)
group.add_argument("--max_tgt_len", type=int, default=128)
group.add_argument(
"--truncate_first_turn", type=str2bool, default=False)
group.add_argument(
"--file_format",
type=str,
default="file",
choices=["file", "filelist"])
group.add_argument(
"--data_format",
type=str,
default="raw",
choices=["raw", "tokenized", "numerical"])
group.add_argument("--in_tokens", type=str2bool, default=False)
group.add_argument("--batch_size", type=int, default=16)
group.add_argument("--continuous_position", type=str2bool, default=True)
group.add_argument("--random_seed", type=int, default=11)
group.add_argument("--sort_pool_size", type=int, default=2**16)
group = parser.add_argument_group("Tokenizer")
group.add_argument(
"--tokenizer", type=str, default="SentencePieceTokenizer")
args, _ = parser.parse_known_args()
tokenizer_cls = getattr(tokenization, args.tokenizer)
tokenizer_cls.add_cmdline_args(parser)
return group
def __init__(self, args):
tokenizer_cls = getattr(tokenization, args.tokenizer)
self.tokenizer = tokenizer_cls(args)
self.vocab = self.tokenizer.vocab
self.pad_id = args.pad_id = self.vocab["[PAD]"]
self.bos_id = args.bos_id = self.vocab["[CLS]"]
self.eos_id = args.eos_id = self.vocab["[SEP]"]
self.unk_id = args.unk_id = self.vocab["[UNK]"]
self.mask_id = args.mask_id = self.vocab["[MASK]"]
self.vocab_size = args.get("vocab_size", 0)
self.max_src_len = args.max_src_len
self.max_tgt_len = args.max_tgt_len
self.truncate_first_turn = args.truncate_first_turn
self.file_format = args.file_format
self.data_format = args.data_format
self.in_tokens = args.in_tokens
self.batch_size = args.batch_size
self.continuous_position = args.continuous_position
self.sort_pool_size = args.sort_pool_size
# random_seed must be set for data slicing when using multi-gpu
self.global_rng = np.random.RandomState(args.random_seed)
# training progress
self.current_example = 0
self.current_epoch = 0
self.num_examples = 0
# model related
self.fields = ["token_ids", "type_ids", "pos_ids"]
self.num_numerical_fields = len(self.fields)
self.fields += ["tgt_start_idx", "data_id"]
self.sort_key = lambda record: [len(record.token_ids)]
self.Record = namedtuple(
"Record", self.fields, defaults=(None, ) * len(self.fields))
self.features = {}
return
def get_train_progress(self):
"""Gets progress for training phase."""
return self.current_epoch, self.current_file_index, self.total_file
def _convert_example_to_record(self, example, is_infer):
# process src
src_token_ids = []
src_pos_ids = []
# tokenize src
s_token_ids_list = []
for s in example.src.split("[SEP]"):
s = tokenization.convert_to_unicode(s).strip()
if self.data_format == "tokenized":
s_tokens = s.split(" ")
else:
s_tokens = self.tokenizer.tokenize(s)
s_token_ids = self.tokenizer.convert_tokens_to_ids(s_tokens) + [
self.eos_id
]
s_token_ids_list.append(s_token_ids)
# trim src
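# Keep as many of the most recent utterances as fit into max_src_len (the
# initial count of 1 accounts for the leading [CLS] token); optionally truncate
# the earliest kept turn instead of dropping it entirely.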
idx = len(s_token_ids_list) - 1
total_token_num = 1
while idx >= 0:
total_token_num += len(s_token_ids_list[idx])
if total_token_num > self.max_src_len:
if self.truncate_first_turn and idx == 0:
truncated_ids = s_token_ids_list[idx][:self.max_src_len -
total_token_num]
if len(truncated_ids) > 1:
s_token_ids_list[idx] = truncated_ids[:-1] + [
self.eos_id
]
idx -= 1
break
idx -= 1
for i, s_token_ids in enumerate(s_token_ids_list[idx + 1:], idx + 1):
src_token_ids += s_token_ids
src_pos_ids += list(range(1, len(s_token_ids) + 1))
src_token_ids = [self.bos_id] + src_token_ids
src_type_ids = [0] * len(src_token_ids)
src_pos_ids = [0] + src_pos_ids
assert len(src_token_ids) == len(src_type_ids) == len(src_pos_ids), \
"not len(src_token_ids) == len(src_type_ids) == len(src_pos_ids)"
token_ids = src_token_ids
type_ids = src_type_ids
pos_ids = src_pos_ids
tgt_start_idx = len(token_ids)
if not is_infer:
# process tgt
# tokenize tgt
tgt = tokenization.convert_to_unicode(example.tgt).strip()
if self.data_format == "tokenized":
tgt_tokens = tgt.split(" ")
else:
tgt_tokens = self.tokenizer.tokenize(tgt)
tgt_token_ids = self.tokenizer.convert_tokens_to_ids(tgt_tokens)
tgt_token_ids.append(self.eos_id)
# trim tgt
if len(tgt_token_ids) > self.max_tgt_len - 1:
tgt_token_ids = tgt_token_ids[:self.max_tgt_len - 1]
tgt_token_ids = [self.bos_id] + tgt_token_ids
tgt_type_ids = [1] * len(tgt_token_ids)
tgt_pos_ids = list(range(1, len(tgt_token_ids) + 1))
assert len(tgt_token_ids) == len(tgt_type_ids) == len(tgt_pos_ids), \
"not len(tgt_token_ids) == len(tgt_type_ids) == len(tgt_pos_ids)"
token_ids += tgt_token_ids
type_ids += tgt_type_ids
pos_ids += tgt_pos_ids
assert len(token_ids) == len(type_ids) == len(pos_ids), \
"not len(token_ids) == len(type_ids) == len(pos_ids)"
if self.continuous_position:
src_pos_ids = list(range(len(src_token_ids)))
if not is_infer:
tgt_pos_ids = list(range(len(tgt_token_ids)))
pos_ids = list(range(len(token_ids)))
field_values = {
"token_ids": src_token_ids,
"type_ids": src_type_ids,
"pos_ids": src_pos_ids
}
field_values["tgt_start_idx"] = tgt_start_idx
field_values["data_id"] = example.data_id
record = self.Record(**field_values)
return record
def _read_tsv(self, fp, phase, is_infer, delimiter="\t", quotechar=None):
"""Reads a tab separated value file."""
csv.field_size_limit(2**20)
reader = csv.reader(fp, delimiter=delimiter, quotechar=quotechar)
headers = next(reader)
headers.append("data_id")
Example = namedtuple("Example", headers)
for i, line in enumerate(reader):
example = Example(*line, data_id=i)
if is_infer or phase.endswith("test"):
self.features[phase][i] = example
record = self._convert_example_to_record(example, is_infer)
yield record
def _read_numerical_file(self, fp, delimiter=";"):
for i, line in enumerate(fp):
cols = tokenization.convert_to_unicode(line).strip().split(
delimiter)
cols = list(map(lambda x: list(map(int, x.split(" "))), cols))
if len(cols) > self.num_numerical_fields:
cols = cols[:self.num_numerical_fields]
tgt_start_idx = cols[0].index(self.bos_id, 1)
record = self.Record(*cols, tgt_start_idx=tgt_start_idx, data_id=i)
yield record
def _read_file(self, input_file, phase, is_infer):
def __wrapper__():
with open_file(input_file) as fp:
if self.data_format == "numerical":
records = self._read_numerical_file(fp)
else:
records = self._read_tsv(fp, phase, is_infer)
for record in records:
yield record
return __wrapper__
def _read_files(self, filelist, phase, is_infer, shuffle_files):
input_files = open(filelist).readlines()
def __wrapper__():
if shuffle_files:
self.global_rng.shuffle(input_files)
if phase == "train":
self.total_file = len(input_files)
for file_index, input_file in enumerate(input_files, 1):
if phase == "train":
self.current_file_index = file_index
self.current_file = input_file
file_reader = self._read_file(input_file.strip(), phase,
is_infer)
for record in file_reader():
yield record
return __wrapper__
def _batch_reader(self,
reader,
phase=None,
is_infer=False,
sort_pool_size=2**16):
"""Construct a batch reader."""
def update_max_lens(max_lens, record):
"""Update max_lens."""
if max_lens is None:
return self.sort_key(record)
else:
return [
max(max_len, l)
for max_len, l in zip(max_lens, self.sort_key(record))
]
def get_batch(reader):
"""Generate batches from reader."""
batch, max_lens = [], None
for record in reader():
if record is None:
yield batch
batch, max_lens = [], None
continue
self.current_example += 1
max_lens = update_max_lens(max_lens, record)
if self.in_tokens:
to_append = (
len(batch) + 1) * sum(max_lens) <= self.batch_size
else:
to_append = len(batch) < self.batch_size
if to_append:
batch.append(record)
else:
yield batch
batch, max_lens = [record], self.sort_key(record)
if len(batch) > 0:
yield batch
def get_sorted_batch(pool):
"""Generate sorted batches from pool."""
pool = sorted(pool, key=self.sort_key)
batches = []
batch, max_lens = [], None
for record in pool:
self.current_example += 1
max_lens = update_max_lens(max_lens, record)
if self.in_tokens:
to_append = (
len(batch) + 1) * sum(max_lens) <= self.batch_size
else:
to_append = len(batch) < self.batch_size
if to_append:
batch.append(record)
else:
batches.append(batch)
batch, max_lens = [record], self.sort_key(record)
if len(batch) > 0:
batches.append(batch)
self.global_rng.shuffle(batches)
for batch in batches:
yield batch
def __wrapper__():
if sort_pool_size > 0:
pool = []
for record in reader():
pool.append(record)
if len(pool) == sort_pool_size:
for batch in get_sorted_batch(pool):
yield batch
pool = []
if len(pool) > 0:
for batch in get_sorted_batch(pool):
yield batch
else:
for batch in get_batch(reader):
yield batch
return __wrapper__
def _distributed_batch_reader(self,
batch_reader,
num_part,
part_id,
is_test=False):
def __wrapper__():
batches = []
for batch in batch_reader():
batches.append(batch)
if len(batches) == num_part:
yield batches[part_id]
batches = []
if is_test and 0 <= part_id < len(batches):
yield batches[part_id]
return
return __wrapper__
def data_generator(self,
input_file=None,
reader=None,
num_epochs=1,
num_part=1,
part_id=0,
phase=None,
is_infer=False):
"""Data generator."""
def __wrapper__():
if is_infer or phase.endswith("test"):
self.features[phase] = {}
nonlocal reader
if reader is None:
if self.file_format == "filelist":
reader = self._read_files(input_file, phase, is_infer,
not phase.endswith("test"))
else:
if phase == "train":
self.total_file = 1
self.current_file_index = 1
self.current_file = input_file
reader = self._read_file(input_file, phase, is_infer)
batch_reader = self._batch_reader(
reader,
phase,
is_infer,
sort_pool_size=self.sort_pool_size if not is_infer else 0)
if phase == "train":
batch_reader = self._distributed_batch_reader(
batch_reader, num_part, part_id)
elif phase.startswith("distributed"):
batch_reader = self._distributed_batch_reader(
batch_reader, num_part, part_id, is_test=True)
for epoch_index in range(num_epochs):
if phase == "train":
self.current_example = 0
self.current_epoch = epoch_index + 1
for batch in batch_reader():
yield self._pad_batch_records(batch, is_infer)
return __wrapper__
def _gen_self_attn_mask(self,
batch_token_ids,
batch_tgt_start_idx=None,
is_unidirectional=True,
shift_len=0):
max_len = max(map(len, batch_token_ids))
input_mask_data = np.zeros((len(batch_token_ids), max_len + shift_len,
max_len + shift_len))
if is_unidirectional:
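# Each token may attend to the whole context segment [0, start); attention
# within the target segment itself is causal (see the np.tril below).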
for index, mask_data in enumerate(input_mask_data):
start = 0 if batch_tgt_start_idx is None else batch_tgt_start_idx[
index]
end = len(batch_token_ids[index])
mask_data[:end + shift_len, :start + shift_len] = 1.0
# Lower-triangular (causal) mask over the target segment
b = np.tril(np.ones([end - start, end - start]), 0)
mask_data[start + shift_len:end + shift_len, start +
shift_len:end + shift_len] = b
else:
for index, token_ids in enumerate(batch_token_ids):
input_mask_data[index, :len(token_ids) +
shift_len, :len(token_ids) + shift_len] = 1.0
return input_mask_data.astype("float32")
def _pad_batch_records(self, batch_records, is_infer):
"""
Pad batch records and construct the model's inputs.
"""
batch_size = len(batch_records)
batch = {}
batch_token_ids = [record.token_ids for record in batch_records]
batch_type_ids = [record.type_ids for record in batch_records]
batch_pos_ids = [record.pos_ids for record in batch_records]
batch["token_ids"] = pad_batch_data(batch_token_ids, pad_id=self.pad_id)
batch["type_ids"] = pad_batch_data(batch_type_ids, pad_id=self.pad_id)
batch["pos_ids"] = pad_batch_data(batch_pos_ids, pad_id=self.pad_id)
batch_tgt_start_idx = [record.tgt_start_idx for record in batch_records]
batch["generation_mask"] = self._gen_self_attn_mask(
batch_token_ids, batch_tgt_start_idx=batch_tgt_start_idx)
if is_infer:
tgt_ids = np.array(
[[[self.bos_id]]] * len(batch_token_ids), dtype="int64")
if self.continuous_position:
tgt_pos = np.array(batch_tgt_start_idx, dtype="int64")
else:
tgt_pos = np.zeros_like(batch_tgt_start_idx, dtype="int64")
tgt_pos = tgt_pos.reshape(-1, 1, 1)
batch["init_score"] = np.zeros_like(
tgt_ids, dtype="float32").reshape(-1, 1).tolist()
batch["tgt_ids"] = tgt_ids.tolist()
batch["tgt_pos"] = tgt_pos.tolist()
batch["tgt_generation_mask"] = batch[
"generation_mask"][:, 0:1, :].astype("float32")
else:
batch["tgt_label"], batch["tgt_pos"] = mask(
batch_tokens=batch_token_ids,
vocab_size=self.vocab_size,
sent_b_starts=batch_tgt_start_idx,
is_unidirectional=True)
batch_data_id = [record.data_id for record in batch_records]
batch["data_id"] = np.array(batch_data_id).astype("int64").reshape(
[-1, 1])
return batch
@contextmanager
def open_file(filename):
"""Open file."""
if filename.endswith(".gz"):
fp = gzip.open(filename, "rt")
else:
fp = open(filename)
yield fp
fp.close()
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""NSP Reader."""
from collections import namedtuple
import numpy as np
from plato2_en_base.readers.dialog_reader import DialogReader
from plato2_en_base.utils import pad_batch_data
from plato2_en_base.utils.args import str2bool
from plato2_en_base.utils.masking import mask
class NSPReader(DialogReader):
"""NSP Reader."""
@classmethod
def add_cmdline_args(cls, parser):
"""Add cmdline argurments."""
group = DialogReader.add_cmdline_args(parser)
group.add_argument(
"--attention_style",
type=str,
default="bidirectional",
choices=["bidirectional", "unidirectional"])
group.add_argument(
"--mix_negative_sample", type=str2bool, default=False)
return group
def __init__(self, args):
super(NSPReader, self).__init__(args)
self.fields.append("label")
self.Record = namedtuple(
"Record", self.fields, defaults=(None, ) * len(self.fields))
self.attention_style = args.attention_style
self.mix_negative_sample = args.mix_negative_sample
return
def _convert_example_to_record(self, example, is_infer):
record = super(NSPReader, self)._convert_example_to_record(
example, False)
if "label" in example._fields:
record = record._replace(label=int(example.label))
return record
def _mix_negative_sample(self, reader, neg_pool_size=2**16):
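# Build negative samples by pairing the context of example i with the response
# of example (i + 1) mod n; positives are labeled 1, negatives 0.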
def gen_from_pool(pool):
num_samples = len(pool)
if num_samples == 1:
# only one sample: a negative sample cannot be constructed
yield pool[0]._replace(label=1)
return
self.global_rng.shuffle(pool)
for i in range(num_samples):
pool[i] = pool[i]._replace(label=1)
j = (i + 1) % num_samples
idx_i = pool[i].tgt_start_idx
idx_j = pool[j].tgt_start_idx
field_values = {}
field_values["token_ids"] = pool[i].token_ids[:idx_i] + pool[
j].token_ids[idx_j:]
field_values["type_ids"] = pool[i].type_ids[:idx_i] + pool[
j].type_ids[idx_j:]
field_values["pos_ids"] = list(
range(len(field_values["token_ids"])))
neg_record = self.Record(
**field_values, tgt_start_idx=idx_i, data_id=-1, label=0)
pool.append(neg_record)
assert len(neg_record.token_ids) <= self.max_seq_len
self.global_rng.shuffle(pool)
for record in pool:
yield record
def __wrapper__():
pool = []
for record in reader():
pool.append(record)
if len(pool) == neg_pool_size:
for record in gen_from_pool(pool):
yield record
pool = []
if len(pool) > 0:
for record in gen_from_pool(pool):
yield record
return __wrapper__
def _batch_reader(self,
reader,
phase=None,
is_infer=False,
sort_pool_size=2**16):
if self.mix_negative_sample:
reader = self._mix_negative_sample(reader)
return super(NSPReader, self)._batch_reader(
reader,
phase=phase,
is_infer=is_infer,
sort_pool_size=sort_pool_size)
def _pad_batch_records(self, batch_records, is_infer):
"""
Pad batch records and construct the model's inputs.
"""
batch = {}
batch_token_ids = [record.token_ids for record in batch_records]
batch_type_ids = [record.type_ids for record in batch_records]
batch_pos_ids = [record.pos_ids for record in batch_records]
batch_tgt_start_idx = [record.tgt_start_idx for record in batch_records]
batch_label = [record.label for record in batch_records]
if self.attention_style == "unidirectional":
batch["token_ids"] = pad_batch_data(
batch_token_ids, pad_id=self.pad_id)
batch["type_ids"] = pad_batch_data(
batch_type_ids, pad_id=self.pad_id)
batch["pos_ids"] = pad_batch_data(batch_pos_ids, pad_id=self.pad_id)
tgt_label, tgt_pos, label_pos = mask(
batch_tokens=batch_token_ids,
vocab_size=self.vocab_size,
bos_id=self.bos_id,
sent_b_starts=batch_tgt_start_idx,
labels=batch_label,
is_unidirectional=True)
attention_mask = self._gen_self_attn_mask(batch_token_ids,
batch_tgt_start_idx)
else:
batch_mask_token_ids, tgt_label, tgt_pos, label_pos = mask(
batch_tokens=batch_token_ids,
vocab_size=self.vocab_size,
bos_id=self.bos_id,
eos_id=self.eos_id,
mask_id=self.mask_id,
sent_b_starts=batch_tgt_start_idx,
labels=batch_label,
is_unidirectional=False)
if not is_infer:
batch_token_ids = batch_mask_token_ids
batch["token_ids"] = pad_batch_data(
batch_token_ids, pad_id=self.pad_id)
batch["type_ids"] = pad_batch_data(
batch_type_ids, pad_id=self.pad_id)
batch["pos_ids"] = pad_batch_data(batch_pos_ids, pad_id=self.pad_id)
attention_mask = self._gen_self_attn_mask(
batch_token_ids, is_unidirectional=False)
batch["attention_mask"] = attention_mask
batch["label_pos"] = label_pos
if not is_infer:
batch_label = np.array(batch_label).astype("int64").reshape([-1, 1])
batch["label"] = batch_label
batch["tgt_label"] = tgt_label
batch["tgt_pos"] = tgt_pos
batch_data_id = [record.data_id for record in batch_records]
batch["data_id"] = np.array(batch_data_id).astype("int64").reshape(
[-1, 1])
return batch
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Plato Reader."""
import numpy as np
from plato2_en_base.readers.dialog_reader import DialogReader
from plato2_en_base.utils import pad_batch_data
from plato2_en_base.utils.masking import mask
class PlatoReader(DialogReader):
"""The implement of PlatoReader"""
def __init__(self, args):
super(PlatoReader, self).__init__(args)
self.latent_type_size = args.latent_type_size
self.use_bow = args.use_bow
def _pad_batch_records(self, batch_records, is_infer):
"""
Pad batch records and construct the model's inputs.
"""
batch = {}
batch_token_ids = [record.token_ids for record in batch_records]
batch_type_ids = [record.type_ids for record in batch_records]
batch_pos_ids = [record.pos_ids for record in batch_records]
batch_tgt_start_idx = [record.tgt_start_idx for record in batch_records]
batch_size = len(batch_token_ids)
# padding
batch["token_ids"] = pad_batch_data(batch_token_ids, pad_id=self.pad_id)
batch["type_ids"] = pad_batch_data(batch_type_ids, pad_id=self.pad_id)
batch["pos_ids"] = pad_batch_data(batch_pos_ids, pad_id=self.pad_id)
batch["generation_mask"] = self._gen_self_attn_mask(
batch_token_ids,
batch_tgt_start_idx=batch_tgt_start_idx,
is_unidirectional=True,
shift_len=1)
if not is_infer:
batch["recognition_mask"] = self._gen_self_attn_mask(
batch_token_ids, is_unidirectional=False, shift_len=1)
if is_infer:
tgt_ids = np.array([[[self.bos_id]]] * batch_size, dtype="int64")
if self.continuous_position:
tgt_pos = np.array(batch_tgt_start_idx, dtype="int64")
else:
tgt_pos = np.zeros_like(batch_tgt_start_idx, dtype="int64")
tgt_pos = tgt_pos.reshape(-1, 1, 1)
batch["init_score"] = np.zeros_like(
tgt_ids, dtype="float32").reshape(-1, 1).tolist()
batch["tgt_ids"] = tgt_ids.tolist()
batch["tgt_pos"] = tgt_pos.tolist()
batch["parent_idx"] = np.array(range(batch_size), dtype="int32")
batch["tgt_generation_mask"] = batch[
"generation_mask"][:, 0:1, :].astype("float32")
else:
mask_return_list = mask(
batch_tokens=batch_token_ids,
vocab_size=self.vocab_size,
sent_b_starts=batch_tgt_start_idx,
is_unidirectional=True,
use_latent=True,
use_bow=self.use_bow)
batch["tgt_label"] = mask_return_list[0]
batch["tgt_pos"] = mask_return_list[1]
if self.use_bow:
batch["bow_label"] = mask_return_list[2]
batch["bow_pos"] = mask_return_list[3]
batch_data_id = [record.data_id for record in batch_records]
batch["data_id"] = np.array(batch_data_id).astype("int64").reshape(
[-1, 1])
return batch
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Define task."""
from .task_base import Task
TASK_REGISTRY = {}
__all__ = ["TASK_REGISTRY", "register_task", "create_task", "add_cmdline_args"]
def register_task(name):
"""
Register a new task class.
"""
def __wrapped__(cls):
if name in TASK_REGISTRY:
raise ValueError(f"Cannot register duplicate task ({name})")
if not issubclass(cls, Task):
raise ValueError(f"Task ({name}: {cls.__name__}) must extend Task")
TASK_REGISTRY[name] = cls
return cls
return __wrapped__
def create_task(args) -> Task:
"""
Create a task.
"""
return TASK_REGISTRY[args.task](args)
def add_cmdline_args(parser):
"""
Add cmdline argument of Task.
"""
group = parser.add_argument_group("Task")
group.add_argument("--task", type=str, required=True)
args, _ = parser.parse_known_args()
if args.task not in TASK_REGISTRY:
raise ValueError(f"Unknown task type: {args.task}")
TASK_REGISTRY[args.task].add_cmdline_args(parser)
return group
import plato2_en_base.tasks.dialog_generation
import plato2_en_base.tasks.next_sentence_prediction
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Dialogue generation task."""
from collections import defaultdict
import math
from plato2_en_base.readers.dialog_reader import DialogReader
from plato2_en_base.readers.plato_reader import PlatoReader
from plato2_en_base.tasks import register_task
from plato2_en_base.tasks.task_base import Task
from plato2_en_base.utils.args import str2bool
from plato2_en_base.utils.inference import create_predictor
def post_process_context(token_ids, reader, merge=True):
"""Post-process the context sequence."""
context = []
utt = []
for tok_id in token_ids[1:]:
if tok_id == reader.eos_id:
utt = reader.tokenizer.convert_ids_to_tokens(utt)
if merge:
utt = reader.tokenizer.merge_subword(utt)
context.append(utt)
utt = []
else:
utt.append(tok_id)
return context
def post_process_response(token_ids, reader, merge=True):
"""
Post-process the decoded sequence: truncate at the first
<eos> and strip the <bos> and <eos> tokens.
"""
eos_pos = len(token_ids)
for i, tok_id in enumerate(token_ids):
if tok_id == reader.eos_id:
eos_pos = i
break
token_ids = token_ids[1:eos_pos]
response = reader.tokenizer.convert_ids_to_tokens(token_ids)
if merge:
response = reader.tokenizer.merge_subword(response)
return token_ids, response
def get_cross_turn_repetition(context, pred_tokens, eos_idx, is_cn=False):
"""Get cross-turn repetition."""
if len(pred_tokens) == 0:
return 1.0
if is_cn:
context = ["".join(utt) for utt in context]
pred_tokens = "".join(pred_tokens)
pred_tri_grams = set()
for i in range(len(pred_tokens) - 2):
tri_gram = tuple(pred_tokens[i:i + 3])
pred_tri_grams.add(tri_gram)
for utt in context:
for i in range(len(utt) - 2):
tri_gram = tuple(utt[i:i + 3])
if tri_gram in pred_tri_grams:
return 1.0
return 0.0
def get_in_turn_repetition(pred, is_cn=False):
"""Get in-turn repetition."""
if len(pred) == 0:
return 1.0
if isinstance(pred[0], str):
pred = [tok.lower() for tok in pred]
if is_cn:
pred = "".join(pred)
tri_grams = set()
for i in range(len(pred) - 2):
tri_gram = tuple(pred[i:i + 3])
if tri_gram in tri_grams:
return 1.0
tri_grams.add(tri_gram)
return 0.0
def get_nsp_score_batch(nsp_predictor, predictions):
"""
Get NSP scores of a batch.
"""
import argparse
from collections import namedtuple
from plato2_en_base.readers.nsp_reader import NSPReader
from plato2_en_base.utils.args import parse_args
from plato2_en_base.tasks.next_sentence_prediction import NextSentencePrediction
parser = argparse.ArgumentParser()
NextSentencePrediction.add_cmdline_args(parser)
parser.add_argument("--num_samples", type=int, default=None)
parser.add_argument("--config_path", type=str, required=True)
parser.add_argument("--mem_efficient", type=str2bool, default=False)
args = parse_args(parser, allow_unknown=True)
args.load(args.config_path)
if not args.mem_efficient:
if args.num_samples:
args.batch_size *= args.num_samples
if args.latent_type_size:
args.batch_size *= args.latent_type_size
args.tokenized_input = True
reader = NSPReader(args)
def __reader__():
headers = ["src", "tgt", "data_id"]
Example = namedtuple("Example", headers)
for i, info in enumerate(predictions):
context = post_process_context(
info["context_token_ids"], reader, merge=False)
context_tokenized_input = " [SEP] ".join(
" ".join(utt) for utt in context)
_, response = post_process_response(
info["response_token_ids"], reader, merge=False)
response_tokenized_input = " ".join(response)
example = Example(
src=context_tokenized_input,
tgt=response_tokenized_input,
data_id=i)
record = reader._convert_example_to_record(example, is_infer=True)
yield record
return
generator = reader.data_generator(
reader=__reader__,
is_infer=True,
phase="test",
)
steps = 0
for data in generator():
outputs = nsp_predictor(data)
for probs, data_id in zip(outputs[0], outputs[-1]):
data_id = data_id[0]
info = predictions[data_id]
info["nsp_score"] = float(probs[1])
return
@register_task("DialogGeneration")
class DialogGeneration(Task):
"""
Define dialogue response generation.
"""
@classmethod
def add_cmdline_args(cls, parser):
"""Add cmdline argurments."""
group = parser.add_argument_group("Task")
group.add_argument("--do_generation", type=str2bool, default=False)
group.add_argument("--is_cn", type=str2bool, default=False)
group.add_argument("--nsp_inference_model_path", type=str, default=None)
group.add_argument(
"--nsp_attention_style", type=str, default="bidirectional")
group.add_argument("--ranking_score", type=str, default="decode_score")
args, _ = parser.parse_known_args()
if args.model == "Plato":
PlatoReader.add_cmdline_args(parser)
else:
DialogReader.add_cmdline_args(parser)
return group
def __init__(self, args):
super(DialogGeneration, self).__init__(args)
self.do_generation = args.do_generation
self.is_cn = args.is_cn
if args.model == "Plato":
self.reader = PlatoReader(args)
else:
self.reader = DialogReader(args)
if args.nsp_inference_model_path:
self.nsp_predictor = create_predictor(args.nsp_inference_model_path,
args.is_distributed)
self.nsp_attention_style = args.nsp_attention_style
else:
self.nsp_predictor = None
self.ranking_score = args.ranking_score
self.max_dec_len = args.max_dec_len
return
def _post_process_generation_output(self, predictions):
"""
Post process generation output.
Calculate repetition statistics and rerank the candidates.
"""
for info in predictions:
tokens = post_process_context(info["context_token_ids"],
self.reader)
pred_token_ids, pred_tokens = post_process_response(
info["response_token_ids"], self.reader)
info["context"] = " [SEP] ".join(" ".join(u) for u in tokens)
info["response"] = " ".join(pred_tokens)
info["num_token"] = len(pred_token_ids)
info["cross_turn_repetition"] = get_cross_turn_repetition(
tokens, pred_tokens, self.reader.eos_id, self.is_cn)
info["in_turn_repetition"] = max(
get_in_turn_repetition(pred_tokens, self.is_cn),
get_in_turn_repetition(pred_token_ids))
if self.nsp_predictor is not None:
get_nsp_score_batch(self.nsp_predictor, predictions)
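# Rerank candidates per data_id: penalize unfinished generations and repetitive
# responses, then keep only the highest-scoring candidate for each example.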
group = defaultdict(list)
for info in predictions:
group[info["data_id"]].append(info)
predictions = []
for data_id in group:
infos = group[data_id]
for info in infos:
info["score"] = info[self.ranking_score]
if self.max_dec_len is not None and info[
"num_token"] >= self.max_dec_len: # not ending
info["score"] -= 1e3
elif info["cross_turn_repetition"] > 0:
info["score"] -= 1e3
elif info["in_turn_repetition"] > 0:
info["score"] -= 1e3
infos = sorted(infos, key=lambda info: -info["score"])
pred = infos[0]
keep_attr = ["data_id", "score", "response"]
pred = {k: pred[k] for k in keep_attr}
predictions.append(pred)
return predictions
def _post_process_scoring_output(self, predictions):
raise NotImplementedError
def _post_process_infer_output(self, predictions):
if self.do_generation:
return self._post_process_generation_output(predictions)
else:
return self._post_process_scoring_output(predictions)
def merge_mertrics_and_statistics(self, outputs, part_outputs):
"""
Merge two evaluation outputs.
"""
if outputs is None:
return part_outputs
if part_outputs is None:
return outputs
batch_size = outputs.pop("batch_size")
tokens_num = outputs.pop("tokens_num")
part_batch_size = part_outputs.pop("batch_size")
part_tokens_num = part_outputs.pop("tokens_num")
new_outputs = {
"batch_size": batch_size + part_batch_size,
"tokens_num": tokens_num + part_tokens_num
}
for k in outputs:
if k.startswith("token_"):
new_outputs[k] = (outputs[k] * tokens_num + part_outputs[k] *
part_tokens_num) / new_outputs["tokens_num"]
else:
new_outputs[k] = (outputs[k] * batch_size + part_outputs[k] *
part_batch_size) / new_outputs["batch_size"]
return new_outputs
def get_metrics(self, outputs):
"""
Get metrics.
"""
if outputs is None:
raise ValueError("metrics is None")
outputs = dict(outputs)
outputs.pop("batch_size", None)
outputs.pop("tokens_num", None)
metrics = {}
for k in outputs:
if k.startswith("token_"):
metrics[k[6:]] = outputs[k]
else:
metrics[k] = outputs[k]
if k == "token_lm_loss":
metrics["ppl"] = math.exp(outputs[k])
return metrics
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Next sentence prediction task."""
from plato2_en_base.readers.nsp_reader import NSPReader
from plato2_en_base.tasks import register_task
from plato2_en_base.tasks.task_base import Task
from plato2_en_base.utils.args import str2bool
@register_task("NextSentencePrediction")
class NextSentencePrediction(Task):
"""
Define next sentence prediction.
"""
@classmethod
def add_cmdline_args(cls, parser):
"""Add cmdline argurments."""
group = NSPReader.add_cmdline_args(parser)
return group
def __init__(self, args):
super(NextSentencePrediction, self).__init__(args)
self.reader = NSPReader(args)
return
def _post_process_infer_output(self, predictions):
predictions = [{
"data_id": data_id.tolist()[0],
"score": score.tolist()[1]
} for data_id, score in zip(predictions["data_id"],
predictions["scores"])]
return predictions
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Task base."""
from abc import abstractmethod, ABC
from plato2_en_base.models.model_base import Model
class Task(ABC):
"""
Basic task.
"""
def __init__(self, args):
return
def train_step(self, model: Model, inputs):
"""Run one training step."""
outputs = model.train_step(inputs)
outputs = {k: v.tolist()[0] for k, v in outputs.items()}
return outputs
def eval_step(self, model: Model, inputs):
"""Run one evaluation step"""
outputs = model.eval_step(inputs)
outputs = {k: v.tolist()[0] for k, v in outputs.items()}
return outputs
def infer_step(self, model: Model, inputs):
"""Run one inference step."""
predictions = model.infer_step(inputs)
outputs = self._post_process_infer_output(predictions)
return outputs
def _post_process_infer_output(self, predictions):
"""
Post-process inference output.
"""
return predictions
def merge_mertrics_and_statistics(self, outputs, part_outputs):
"""
Merge metrics and statistics.
"""
if outputs is None:
return part_outputs
if part_outputs is None:
return outputs
batch_size = outputs.pop("batch_size")
part_batch_size = part_outputs.pop("batch_size")
new_outputs = {
"batch_size": batch_size + part_batch_size,
}
for k in outputs:
new_outputs[k] = (outputs[k] * batch_size + part_outputs[k] *
part_batch_size) / new_outputs["batch_size"]
return new_outputs
def get_metrics(self, outputs):
"""
Get metrics.
"""
if outputs is None:
raise ValueError("metrics is None")
outputs = dict(outputs)
# pop statistics
outputs.pop("batch_size", None)
return outputs
def get_data_loader(self, model, *args, is_infer=False, **kwargs):
generator = self.reader.data_generator(
*args, is_infer=is_infer, **kwargs)
return model.get_data_loader(generator, is_infer)
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utils."""
from itertools import chain
import os
import time
import sys
import numpy as np
import paddle.fluid as fluid
def to_lodtensor(data, place):
"""Convert data to LoDTensor."""
if place is None:
return data
lengths = []
while isinstance(data[0], list):
lengths.append(list(map(len, data)))
data = [x for xs in data for x in xs]
if isinstance(data[0], float):
data = np.array(data, dtype="float32")
else:
data = np.array(data, dtype="int64")
data_tensor = fluid.LoDTensor()
data_tensor.set(data, place)
data_tensor.set_recursive_sequence_lengths(lengths)
return data_tensor
def pad_batch_data(insts, pad_id=0):
"""Pad the instances to the max sequence length in batch. """
max_len = max(map(len, insts))
inst_data = np.array(
[list(inst) + [pad_id] * (max_len - len(inst)) for inst in insts])
return inst_data.astype("int64").reshape([-1, max_len, 1])
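# Usage sketch (hypothetical values):
#   pad_batch_data([[1, 2, 3], [4, 5]], pad_id=0)
#   -> int64 array of shape [2, 3, 1], with the second row padded to [4, 5, 0]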
def convert_lodtensor_to_list(tensor):
data = np.array(tensor)
recursive_sequence_lengths = tensor.recursive_sequence_lengths()
recursive_sequence_lengths.reverse()
for i, lengths in enumerate(recursive_sequence_lengths):
shift = 0
new_data = []
for j, l in enumerate(lengths):
new_data.append(data[shift:shift + l])
shift += l
data = new_data
return data
def concatenate_lodtensors(tensors, place):
"""Concatenate LoD tensors."""
data = []
recursive_sequence_lengths = []
for tensor in tensors:
data.append(np.array(tensor))
recursive_sequence_lengths.append(tensor.recursive_sequence_lengths())
data = np.concatenate(data, axis=0)
recursive_sequence_lengths = [
sum(lens, []) for lens in zip(*recursive_sequence_lengths)
]
data_tensor = fluid.LoDTensor()
data_tensor.set(data, place)
data_tensor.set_recursive_sequence_lengths(recursive_sequence_lengths)
assert data_tensor.has_valid_recursive_sequence_lengths()
return data_tensor
def repeat_array_or_tensor(array_or_tensor, place, times):
"""Repeate numpy array or LoD tensor."""
if isinstance(array_or_tensor, fluid.LoDTensor):
data = [np.array(array_or_tensor)] * times
recursive_sequence_lengths = [
array_or_tensor.recursive_sequence_lengths()
] * times
data = np.concatenate(data, axis=0)
recursive_sequence_lengths = [
sum(lens, []) for lens in zip(*recursive_sequence_lengths)
]
data_tensor = fluid.LoDTensor()
data_tensor.set(data, place)
data_tensor.set_recursive_sequence_lengths(recursive_sequence_lengths)
assert data_tensor.has_valid_recursive_sequence_lengths()
return data_tensor
elif isinstance(array_or_tensor, list):
return list(chain(*([array_or_tensor] * times)))
else:
return np.concatenate([array_or_tensor] * times, axis=0)
def slice_array_or_tensor(array_or_tensor, place, begin, end):
"""Repeate numpy array or LoD tensor."""
if isinstance(array_or_tensor, fluid.LoDTensor):
data = convert_lodtensor_to_list(array_or_tensor)
data = data[begin:end]
return to_lodtensor(data, place)
else:
return array_or_tensor[begin:end]
def init_checkpoint(exe, init_checkpoint_path, main_program):
"""Initialize from checkpoint."""
assert os.path.exists(
init_checkpoint_path), "[%s] cann't be found." % init_checkpoint_path
def existed_persitables(var):
"""Whether var is a persistables."""
if not fluid.io.is_persistable(var):
return False
return os.path.exists(os.path.join(init_checkpoint_path, var.name))
fluid.io.load_vars(
exe,
init_checkpoint_path,
main_program=main_program,
predicate=existed_persitables)
print(f"Load model from {init_checkpoint_path}")
def init_pretraining_params(exe, pretraining_params_path, main_program):
"""Only initialize parameters."""
assert os.path.exists(pretraining_params_path
), "[%s] cann't be found." % pretraining_params_path
def existed_params(var):
"""Whether var is a parameter."""
if not isinstance(var, fluid.framework.Parameter):
return False
return os.path.exists(os.path.join(pretraining_params_path, var.name))
fluid.io.load_vars(
exe,
pretraining_params_path,
main_program=main_program,
predicate=existed_params)
print(f"Load pretraining parameters from {pretraining_params_path}.")
return
class Timer(object):
def __init__(self):
self._pass_time = 0
self._start_time = None
return
def start(self):
self._start_time = time.time()
def pause(self):
self._pass_time += time.time() - self._start_time
self._start_time = None
def reset(self):
self._pass_time = 0
@property
def pass_time(self):
if self._start_time is None:
return self._pass_time
else:
return self._pass_time + time.time() - self._start_time
ERROR_MESSAGE = "\nYou can not set use_cuda = True in the model because you are using paddlepaddle-cpu.\n \
Please: 1. Install paddlepaddle-gpu to run your models on GPU or 2. Set use_cuda = False to run models on CPU.\n"
def check_cuda(use_cuda, err=ERROR_MESSAGE):
"""Check CUDA."""
try:
if use_cuda and not fluid.is_compiled_with_cuda():
print(err)
sys.exit(1)
except Exception as e:
pass
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Parse argument."""
import argparse
import json
import sys
import paddle.fluid as fluid
def str2bool(v):
""" Support bool type for argparse. """
if v.lower() in ("yes", "true", "t", "y", "1"):
return True
elif v.lower() in ("no", "false", "f", "n", "0"):
return False
else:
raise argparse.ArgumentTypeError("Unsupported value encountered.")
class Args(dict):
""" Arguments class
Store arguments in training / infer / ... scripts.
"""
def __getattr__(self, name):
if name in self.keys():
return self[name]
for v in self.values():
if isinstance(v, Args):
if name in v:
return v[name]
return None
def get(self, key, default_value=None):
"""Get the value of corresponding key."""
if key in self.keys():
return self[key]
for v in self.values():
if isinstance(v, Args):
if key in v:
return v[key]
return default_value
def __setattr__(self, name, value):
self[name] = value
def save(self, filename):
with open(filename, "w") as fp:
json.dump(self, fp, ensure_ascii=False, indent=4, sort_keys=False)
def load(self, filename, group_name=None):
if group_name is not None:
if group_name not in self:
self[group_name] = Args()
self[group_name].load(filename)
return
with open(filename, "r") as fp:
params_dict = json.load(fp)
for k, v in params_dict.items():
if isinstance(v, dict):
self[k].update(Args(v))
else:
self[k] = v
def parse_args(parser: argparse.ArgumentParser, allow_unknown=False) -> Args:
""" Parse hyper-parameters from cmdline. """
if allow_unknown:
parsed, _ = parser.parse_known_args()
else:
parsed = parser.parse_args()
args = Args()
optional_args = parser._action_groups[1]
for action in optional_args._group_actions[1:]:
arg_name = action.dest
args[arg_name] = getattr(parsed, arg_name)
for group in parser._action_groups[2:]:
group_args = Args()
for action in group._group_actions:
arg_name = action.dest
group_args[arg_name] = getattr(parsed, arg_name)
if len(group_args) > 0:
if group.title in args:
args[group.title].update(group_args)
else:
args[group.title] = group_args
return args
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Inference utils."""
import os
import paddle.fluid as fluid
def create_predictor(inference_model_path, is_distributed=False):
"""Create predictor."""
if is_distributed:
dev_count = fluid.core.get_cuda_device_count()
gpu_id = int(os.getenv("FLAGS_selected_gpus"))
else:
dev_count = 1
gpu_id = 0
place = fluid.CUDAPlace(gpu_id)
exe = fluid.Executor(place)
scope = fluid.Scope()
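# Load the inference program into a private scope so its variables stay
# isolated from the default scope; __predict__ re-enters this scope per call.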
with fluid.scope_guard(scope):
inference_prog, feed_target_names, fetch_targets = fluid.io.load_inference_model(
inference_model_path, exe)
def __predict__(inputs):
with fluid.scope_guard(scope):
outputs = exe.run(
inference_prog,
feed=inputs,
fetch_list=fetch_targets,
return_numpy=True)
return outputs
return __predict__
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Reader utils."""
import numpy as np
import plato2_en_base.utils
def mask(batch_tokens,
vocab_size,
bos_id=1,
eos_id=2,
mask_id=3,
sent_b_starts=None,
labels=None,
is_unidirectional=False,
use_latent=False,
use_bow=False):
"""
Add masks to batch_tokens and return out, mask_label, mask_pos;
Note: mask_pos refers to positions in batch_tokens after padding;
"""
batch_tokens = np.copy(batch_tokens)
max_len = max(map(len, batch_tokens))
mask_label = []
mask_pos = []
if labels is not None:
label_pos = []
if is_unidirectional:
# unidirectional language model
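# The target tokens (after sent_b_start) become the labels, and mask_pos holds
# the positions that predict them in the flattened, padded batch.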
if use_latent:
max_len += 1
shift_len = 1
else:
shift_len = 0
for sent_index, sent in enumerate(batch_tokens):
sent_b_index = sent_b_starts[
sent_index] if sent_b_starts is not None else 0
need_cal = True
if labels is not None:
label_pos.append(sent_index * max_len + len(sent) - 1 +
shift_len)
if labels[sent_index] == 0:
need_cal = False
mask_label.extend(sent[sent_b_index + 1:])
mask_pos.extend([
sent_index * max_len + i + shift_len
for i in range(sent_b_index,
len(sent) - 1)
])
mask_label = np.array(mask_label).astype("int64").reshape([-1, 1])
mask_pos = np.array(mask_pos).astype("int64").reshape([-1, 1])
return_list = [mask_label, mask_pos]
# latent related (bow label and pos)
if use_latent and use_bow:
bow_label = []
bow_pos = []
for sent_index, sent in enumerate(batch_tokens):
sent_b_index = sent_b_starts[
sent_index] if sent_b_starts is not None else 0
def __filter__(tok_id):
# TODO: exclude [EOS] from bow loss
return True
bow_pos.extend([
sent_index for i in range(sent_b_index + 1, len(sent))
if __filter__(sent[i])
])
bow_label.extend([
sent[i] for i in range(sent_b_index + 1, len(sent))
if __filter__(sent[i])
])
bow_label = np.array(bow_label).astype("int64").reshape([-1, 1])
bow_pos = np.array(bow_pos).astype("int64").reshape([-1, 1])
return_list += [bow_label, bow_pos]
else:
# bidirectional mask language model
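# BERT-style masking: roughly 15% of tokens are selected; of those, 80% are
# replaced with [MASK], 10% with a random token, and 10% kept unchanged
# (see the prob thresholds below).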
total_token_num = sum(map(len, batch_tokens))
prob_mask = np.random.rand(total_token_num)
# TODO: fix replace_ids, include [UNK]
replace_ids = np.random.randint(
3, high=vocab_size, size=total_token_num)
prob_index = 0
for sent_index, sent in enumerate(batch_tokens):
# add pair label position
if labels is not None:
label_pos.append(sent_index * max_len)
# add mask label and position
for token_index, token in enumerate(sent):
if token == eos_id or token == bos_id:
continue
prob = prob_mask[prob_index + token_index]
if prob > 0.15:
continue
elif 0.03 < prob <= 0.15:
# mask
mask_label.append(sent[token_index])
sent[token_index] = mask_id
mask_pos.append(sent_index * max_len + token_index)
elif 0.015 < prob <= 0.03:
# random replace
mask_label.append(sent[token_index])
sent[token_index] = replace_ids[prob_index + token_index]
mask_pos.append(sent_index * max_len + token_index)
else:
# keep the original token
mask_label.append(sent[token_index])
mask_pos.append(sent_index * max_len + token_index)
prob_index += len(sent)
mask_label = np.array(mask_label).astype("int64").reshape([-1, 1])
mask_pos = np.array(mask_pos).astype("int64").reshape([-1, 1])
return_list = [batch_tokens, mask_label, mask_pos]
if labels is not None:
label_pos = np.array(label_pos).astype("int64").reshape([-1, 1])
assert len(labels) == len(label_pos)
return_list.append(label_pos)
return return_list
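# Illustrative sketch (not part of the module): unidirectional masking on a toy batch,
# using the default bos/eos/mask ids above. sent_b_starts marks where the response
# begins, so only response tokens become generation targets.
def _mask_example():
    toy_batch = [[1, 10, 11, 2, 20, 21, 2]]  # [BOS] context [EOS] response [EOS]
    mask_label, mask_pos = mask(
        toy_batch, vocab_size=100, sent_b_starts=[4], is_unidirectional=True)
    # mask_label -> [[21], [2]]; mask_pos -> [[4], [5]] (positions in the padded layout)
    return mask_label, mask_pos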
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tokenization classes."""
import collections
import json
import sentencepiece as spm
import six
import unicodedata
from plato2_en_base.utils.args import str2bool
SPIECE_UNDERLINE = u"▁".encode("utf-8")
def clean_text(text):
"""Performs invalid character removal and whitespace cleanup on text."""
text = text.replace(u"“", u'"')\
.replace(u'”', u'"')\
.replace(u'‘', "'")\
.replace(u'’', u"'")\
.replace(u'—', u'-')
output = []
for char in text:
if _is_control(char):
continue
if _is_whitespace(char):
output.append(" ")
else:
output.append(char)
return "".join(output)
def preprocess_text(inputs, remove_space=True, lower=False):
"""preprocess data by removing extra space and normalize data."""
outputs = inputs
if remove_space:
outputs = " ".join(inputs.strip().split())
outputs = unicodedata.normalize("NFKD", outputs)
outputs = "".join([c for c in outputs if not unicodedata.combining(c)])
if lower:
outputs = outputs.lower()
return outputs
def encode_pieces(spm_model, text, return_unicode=True, sample=False):
"""turn sentences into word pieces."""
    # liujiaxiang: added for ernie-albert, mainly because curly quotes and dashes cause too many unk tokens
text = clean_text(text)
if not sample:
pieces = spm_model.EncodeAsPieces(text)
else:
pieces = spm_model.SampleEncodeAsPieces(text, 64, 0.1)
return pieces
def encode_ids(spm_model, text, sample=False):
"""turn sentences into word pieces."""
pieces = encode_pieces(spm_model, text, return_unicode=False, sample=sample)
ids = [spm_model.PieceToId(piece) for piece in pieces]
return ids
def convert_to_unicode(text):
"""Converts `text` to Unicode (if it's not already), assuming utf-8 input."""
if six.PY3:
if isinstance(text, str):
return text
elif isinstance(text, bytes):
return text.decode("utf-8", "ignore")
else:
raise ValueError("Unsupported string type: %s" % (type(text)))
elif six.PY2:
if isinstance(text, str):
return text.decode("utf-8", "ignore")
elif isinstance(text, unicode):
return text
else:
raise ValueError("Unsupported string type: %s" % (type(text)))
else:
raise ValueError("Not running on Python2 or Python 3?")
def load_vocab(vocab_file):
"""Loads a vocabulary file into a dictionary."""
vocab = collections.OrderedDict()
    with open(vocab_file) as fin:
        for num, line in enumerate(fin):
            items = convert_to_unicode(line.rstrip()).split("\t")
            if len(items) > 2:
                break
            token = items[0]
            index = items[1] if len(items) == 2 else num
            token = token.strip()
            vocab[token] = int(index)
return vocab
def convert_by_vocab(vocab, items):
"""Converts a sequence of [tokens|ids] using the vocab."""
output = []
for item in items:
output.append(vocab[item])
return output
class SentencePieceTokenizer(object):
"""Runs end-to-end tokenziation."""
@classmethod
def add_cmdline_args(cls, parser):
"""Add cmdline argurments."""
group = parser.add_argument_group("Tokenizer")
group.add_argument("--vocab_path", type=str, required=True)
group.add_argument("--do_lower_case", type=str2bool, default=False)
group.add_argument("--spm_model_file", type=str, required=True)
return group
def __init__(self, args):
self.spm_model = spm.SentencePieceProcessor()
self.spm_model.Load(args.spm_model_file)
self.vocab = load_vocab(args.vocab_path)
self.do_lower_case = args.do_lower_case
self.inv_vocab = {v: k for k, v in self.vocab.items()}
def tokenize(self, text):
"""Tokenizes a piece of text."""
text = preprocess_text(text, lower=self.do_lower_case)
return encode_pieces(self.spm_model, text, return_unicode=True)
def convert_tokens_to_ids(self, tokens):
"""Convert tokens to ids."""
ret = []
unk_id = self.vocab["<unk>"]
for token in tokens:
if token in self.vocab:
ret.append(self.vocab[token])
else:
ret.append(unk_id)
return ret
def convert_ids_to_tokens(self, ids):
"""Convert ids to tokens."""
return convert_by_vocab(self.inv_vocab, ids)
def merge_subword(self, tokens):
"""Merge subword."""
ret = []
for token in tokens:
if token.startswith(u"▁"):
ret.append(token[1:])
else:
if len(ret):
ret[-1] += token
else:
ret.append(token)
ret = [token for token in ret if token]
return ret
def convert_ids_to_str(self, ids):
"""Convert ids to string."""
tokens = self.convert_ids_to_tokens(ids)
tokens = self.merge_subword(tokens)
res = " ".join(tokens).replace("<s>", "")
res = res.replace("</s>", "\n").replace("\n ", "\n").strip()
return res
def _is_whitespace(char):
"""Checks whether `chars` is a whitespace character."""
    # \t, \n, and \r are technically control characters but we treat them
# as whitespace since they are generally considered as such.
if char == " " or char == "\t" or char == "\n" or char == "\r":
return True
cat = unicodedata.category(char)
if cat == "Zs":
return True
return False
def _is_control(char):
"""Checks whether `chars` is a control character."""
# These are technically control characters but we count them as whitespace
# characters.
if char == "\t" or char == "\n" or char == "\r":
return False
cat = unicodedata.category(char)
if cat.startswith("C"):
return True
return False
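# Illustrative sketch (not part of the module): constructing the tokenizer from a
# hypothetical vocab/spm pair and round-tripping a sentence. The file paths below
# are placeholders.
def _tokenizer_example():
    from types import SimpleNamespace
    args = SimpleNamespace(
        spm_model_file="spm.model",   # placeholder path
        vocab_path="vocab.txt",       # placeholder path
        do_lower_case=False)
    tokenizer = SentencePieceTokenizer(args)
    ids = tokenizer.convert_tokens_to_ids(tokenizer.tokenize("Hello, how are you?"))
    return tokenizer.convert_ids_to_str(ids)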
## Overview
PLATO2 is a super-large-scale generative dialogue model. It inherits PLATO's latent-variable mechanism for generating diverse responses and can carry on fluent, in-depth conversations about open-domain topics. According to published results, it outperforms Meena, released by Google in February 2020, and Blender, released by Facebook AI Research in April 2020. plato2_en_large contains 1.6B parameters and can be used for one-command dialogue response prediction. This Module only supports GPU prediction; CPU is not supported.
<p align="center">
<img src="https://image.jiqizhixin.com/uploads/editor/65107b78-0259-4121-b8c5-a090f9d3175b/640.png" hspace='10'/> <br />
</p>
For more details, see the paper [PLATO-2: Towards Building an Open-Domain Chatbot via Curriculum Learning](https://arxiv.org/abs/2006.16779).
## Command-line prediction
```shell
$ hub run plato2_en_large --input_text="Hello, how are you" --use_gpu
```
## API
```python
def generate(texts):
```
Prediction API: takes the dialogue context as input and returns the bot's reply.
**Parameters**
* texts (list\[str\] or str): Outside interactive mode, texts should be a list in which each element is the context of one dialogue. The context should contain the utterances of both the human and the bot, with different speakers separated by the "\t" delimiter, e.g. ["Hello\thi, nice to meet you\tnice to meet you"]. This input contains one dialogue: the bot replied "hi, nice to meet you", the human then replied "nice to meet you", and it is now the bot's turn. In interactive mode, texts should be a str; the model builds the context automatically.
**Returns**
* results (list\[str\]): Each element is the bot's new reply for the corresponding dialogue.
**Code example**
```python
import paddlehub as hub
module = hub.Module(name="plato2_en_large")
test_texts = ["Hello","Hello\thi, nice to meet you\tnice to meet you"]
results = module.generate(texts=test_texts)
for result in results:
print(result)
```
```python
def interactive_mode(max_turn=6):
```
Enter interactive mode. In interactive mode, the texts argument of the generate API accepts a string.
**Parameters**
* max_turn (int): The number of dialogue turns the model can remember. When max_turn = 1, the model only remembers the current utterance and knows nothing about earlier turns.
**Code example**
```python
import paddlehub as hub
module = hub.Module(name="plato2_en_large")
with module.interactive_mode(max_turn=6):
while True:
human_utterance = input("[Human]: ").strip()
robot_utterance = module.generate(human_utterance)
print("[Bot]: %s"%robot_utterance[0])
```
## Serving deployment
PaddleHub Serving can deploy the model as an online service.
### Step 1: Start PaddleHub Serving
Run the start command:
```shell
$ hub serving start -m plato2_en_large -p 8866
```
This deploys a serving API; the default port is 8866.
**NOTE:** Please set the CUDA\_VISIBLE\_DEVICES environment variable before starting the service.
### Step 2: Send prediction requests
Option 1: Send dialogue data with a custom script
With the server configured, the few lines of code below send a prediction request and retrieve the result.
```python
import requests
import json
# Send the HTTP request
data = {'texts':["Hello","Hello\thi, nice to meet you\tnice to meet you"]}
headers = {"Content-type": "application/json"}
url = "http://127.0.0.1:8866/predict/plato2_en_large"
r = requests.post(url=url, headers=headers, data=json.dumps(data))
# Read the results
results = r.json()["results"]
for result in results:
print(result)
```
Option 2: Enter interactive mode through an interactive client
You can run the following client script to enter interactive mode:
```python
import requests
import json
ADDR = "127.0.0.1" # Your serving address
PORT = 8866 # Your serving port
MAX_TURN = 6 # The maximum dialogue turns
headers = {"Content-type": "application/json"}
url = "http://%s:%s/predict/plato2_en_large" % (ADDR, PORT)
context = []
while True:
user_utt = input("[Human]: ").strip()
if user_utt == "[NEXT]":
        context = []
print("Restart")
else:
context.append(user_utt)
data = {'texts': ["\t".join(context[-MAX_TURN:])]}
r = requests.post(url=url, headers=headers, data=json.dumps(data))
bot_response = r.json()["results"][0]
print("[Bot]: %s"%bot_response)
context.append(bot_response)
```
## View the code
https://github.com/PaddlePaddle/Knover
### Dependencies
paddlepaddle >= 1.8.2
paddlehub >= 1.7.0
## Release history
* 1.0.0
First release
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Define model."""
from plato2_en_large.models.model_base import Model
MODEL_REGISTRY = {}
__all__ = [
"MODEL_REGISTRY", "register_model", "create_model", "add_cmdline_args"
]
def register_model(name):
"""
Register a new model class.
"""
def __wrapped__(cls):
if name in MODEL_REGISTRY:
raise ValueError(f"Cannot register duplicate model ({name})")
if not issubclass(cls, Model):
raise ValueError(
f"Model ({name}: {cls.__name__}) must extend Model")
MODEL_REGISTRY[name] = cls
return cls
return __wrapped__
def create_model(args, place) -> Model:
"""
Create a model.
"""
return MODEL_REGISTRY[args.model](args, place)
def add_cmdline_args(parser):
""" Add cmdline argument of Model. """
group = parser.add_argument_group("Model")
# Model
group.add_argument("--model", type=str, required=True)
# Config
group.add_argument("--config_path", type=str, required=True)
# Model related.
args, _ = parser.parse_known_args()
if args.model not in MODEL_REGISTRY:
raise ValueError(f"Unknown model type: {args.model}")
MODEL_REGISTRY[args.model].add_cmdline_args(parser)
return group
import plato2_en_large.models.nsp_model
import plato2_en_large.models.plato
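# Illustrative sketch (not part of the module): registering a hypothetical model class
# and resolving it again by name, the same way create_model() looks up args.model.
def _registry_example():
    @register_model("MyHypotheticalModel")
    class MyHypotheticalModel(Model):
        pass
    assert MODEL_REGISTRY["MyHypotheticalModel"] is MyHypotheticalModel
    return MODEL_REGISTRY["MyHypotheticalModel"]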
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Generator class"""
import numpy as np
import paddle.fluid.layers as layers
from plato2_en_large.utils.args import str2bool
class Generator(object):
"""
Generator class
Use generator in inference phase.
"""
@classmethod
def add_cmdline_args(cls, parser):
"""Add cmdline argurments."""
group = parser.add_argument_group("Generator")
group.add_argument("--min_dec_len", type=int, default=1)
group.add_argument("--max_dec_len", type=int, default=64)
group.add_argument(
"--decoding_strategy",
type=str,
default="topk_sampling",
choices=["beam_search", "topk_sampling", "topp_sampling"])
group.add_argument("--temperature", type=float, default=1.)
group.add_argument("--ignore_unk", type=str2bool, default=True)
# multi sampling
group.add_argument("--num_samples", type=int, default=None)
# top-k sampling
group.add_argument("--topk", type=int, default=10)
# top-p sampling
group.add_argument("--topp", type=float, default=0.9)
# beam search
group.add_argument("--beam_size", type=int, default=10)
group.add_argument("--length_average", type=str2bool, default=True)
group.add_argument("--length_penalty", type=float, default=0.0)
return group
def __init__(self, args):
self.min_dec_len = args.min_dec_len
self.max_dec_len = args.max_dec_len
self.eos_id = args.eos_id
self.unk_id = args.unk_id
self.mask_id = args.mask_id
self.vocab_size = args.vocab_size
# model related
# basic settings
self.decoding_strategy = args.decoding_strategy
self.ignore_unk = args.ignore_unk
self.continuous_position = args.continuous_position
self.temperature = args.temperature
# reranking
self.num_samples = args.num_samples
# top-k sampling
self.topk = args.topk
# top-p sampling
self.topp = args.topp
# beam search
self.beam_size = args.beam_size
self.length_penalty = args.length_penalty
self.length_average = args.length_average
return
def inference(self, model, inputs, outputs):
"""
Run inference.
Args:
inputs(dict): Its key is input name(str) and its value is a Variable.
model(object): A generate model. Need to implement `_generation_network` and `_calc_logits`.
Returns:
dict(str:Variable): Its key is output name(str) and its value is a Variable.
"""
# prepare while loop
max_len = layers.fill_constant(
shape=[1], dtype="int64", value=self.max_dec_len, force_cpu=True)
min_len = layers.fill_constant(
shape=[1], dtype="int64", value=self.min_dec_len, force_cpu=True)
step_idx = layers.fill_constant(
shape=[1], dtype="int64", value=0, force_cpu=True)
ids = layers.array_write(
layers.reshape(inputs["tgt_ids"], (-1, 1)), step_idx)
pos_biases = layers.array_write(
layers.reshape(inputs["tgt_pos"], (-1, 1)), step_idx)
scores = layers.array_write(inputs["init_score"], step_idx)
tgt_generation_mask = layers.array_write(inputs["tgt_generation_mask"],
step_idx)
parent_idx = inputs["parent_idx"]
if self.decoding_strategy == "beam_search":
beam_size = self.beam_size
else:
beam_size = 1
eos_penalty = np.zeros(self.vocab_size, dtype="float32")
eos_penalty[self.eos_id] = -1e9
eos_penalty = layers.assign(eos_penalty)
token_penalty = np.zeros(self.vocab_size, dtype="float32")
token_penalty[self.unk_id] = -1e9
if self.mask_id >= 0:
token_penalty[self.mask_id] = -1e9
token_penalty = layers.assign(token_penalty)
# start while loop
cond = layers.less_than(x=step_idx, y=max_len)
while_op = layers.While(cond)
with while_op.block():
pre_ids = layers.array_read(array=ids, i=step_idx)
pre_ids = layers.reshape(pre_ids, (-1, 1, 1), inplace=True)
pre_scores = layers.array_read(array=scores, i=step_idx)
pos_bias = layers.array_read(array=pos_biases, i=step_idx)
pos_bias = layers.gather(input=pos_bias, index=parent_idx)
tmp_tgt_generation_mask = layers.array_read(
tgt_generation_mask, i=step_idx)
dtype = tmp_tgt_generation_mask.dtype
append_mask = layers.fill_constant_batch_size_like(
input=pre_ids, value=1.0, shape=[-1, 1, 1], dtype=dtype)
tmp_tgt_generation_mask = layers.concat(
[tmp_tgt_generation_mask, append_mask], axis=2)
pre_mask = tmp_tgt_generation_mask = layers.gather(
input=tmp_tgt_generation_mask, index=parent_idx)
pre_sent = layers.fill_constant_batch_size_like(
input=pre_mask, value=1, shape=[-1, 1, 1], dtype=pre_ids.dtype)
if self.continuous_position:
pre_pos = layers.elementwise_mul(
x=layers.fill_constant_batch_size_like(
input=pre_mask,
value=1,
shape=[-1, 1, 1],
dtype=pre_ids.dtype),
y=step_idx,
axis=0) + pos_bias
else:
pre_pos = layers.elementwise_mul(
x=layers.fill_constant_batch_size_like(
input=pre_mask,
value=1,
shape=[-1, 1, 1],
dtype=pre_ids.dtype),
y=step_idx,
axis=0)
dec_out, _ = model._generation_network(
token_ids=pre_ids,
type_ids=pre_sent,
pos_ids=pre_pos,
generation_mask=tmp_tgt_generation_mask,
gather_idx=parent_idx)
logits = model._calc_logits(dec_out)
# ignore unk and mask token
if self.ignore_unk:
logits = layers.elementwise_add(logits, token_penalty, axis=1)
# min dec length
min_len_cond = layers.less_than(x=step_idx, y=min_len)
def min_len_penalty():
"""Plus minimum length penalty."""
return layers.elementwise_add(logits, eos_penalty, axis=1)
def no_penalty():
"""No penalty."""
return logits
logits = layers.case([(min_len_cond, min_len_penalty)],
default=no_penalty)
# get probs
probs = layers.softmax(logits / self.temperature)
if self.decoding_strategy == "beam_search":
topk_scores, topk_indices = layers.topk(
input=probs, k=beam_size)
else:
if self.decoding_strategy.startswith("sampling"):
sampling_ids = layers.sampling_id(probs, dtype="int")
elif self.decoding_strategy.startswith("topk_sampling"):
topk_probs, _ = layers.topk(input=probs, k=self.topk)
ge_cond = layers.cast(
layers.greater_equal(
probs, layers.unsqueeze(topk_probs[:, -1], [1])),
"float32")
old_probs = probs
probs = probs * ge_cond / layers.reduce_sum(
topk_probs, dim=-1, keep_dim=True)
sampling_ids = layers.sampling_id(probs, dtype="int")
probs = old_probs
elif self.decoding_strategy.startswith("topp_sampling"):
sorted_probs, sorted_idx = layers.argsort(
probs, descending=True)
cum_sorted_probs = layers.cumsum(
sorted_probs, axis=1, exclusive=True)
lt_cond = layers.cast(
layers.less_than(
cum_sorted_probs,
layers.fill_constant_batch_size_like(
cum_sorted_probs, cum_sorted_probs.shape,
cum_sorted_probs.dtype, self.topp)), "float32")
old_probs = probs
candidate_probs = sorted_probs * lt_cond
probs = candidate_probs / layers.reduce_sum(
candidate_probs, dim=-1, keep_dim=True)
sampling_ids = layers.sampling_id(probs, dtype="int")
sampling_ids = layers.index_sample(
sorted_idx, layers.unsqueeze(sampling_ids, [1]))
sampling_ids = layers.squeeze(sampling_ids, [1])
probs = old_probs
else:
raise ValueError(self.decoding_strategy)
sampling_scores = layers.one_hot(
layers.unsqueeze(sampling_ids, [1]), probs.shape[1])
sampling_scores = sampling_scores * probs - (
1 - sampling_scores) * 1e3
topk_scores, topk_indices = layers.topk(
input=sampling_scores, k=1)
pre_len = layers.cast(step_idx, "float32")
layers.increment(x=step_idx, value=1.0, in_place=True)
cur_len = layers.cast(step_idx, "float32")
# update scores
if self.length_average:
accu_scores = layers.elementwise_add(
x=layers.log(topk_scores), y=pre_scores * pre_len,
axis=0) / cur_len
elif self.length_penalty > 0:
pre_lp = layers.pow((5 + pre_len) / 6, self.length_penalty)
cur_lp = layers.pow((5 + cur_len) / 6, self.length_penalty)
accu_scores = layers.elementwise_add(
x=layers.log(topk_scores), y=pre_scores * pre_lp,
axis=0) / cur_lp
else:
accu_scores = layers.elementwise_add(
x=layers.log(topk_scores), y=pre_scores, axis=0)
topk_indices = layers.lod_reset(topk_indices, pre_ids)
accu_scores = layers.lod_reset(accu_scores, pre_ids)
selected_ids, selected_scores, gather_idx = layers.beam_search(
pre_ids=pre_ids,
pre_scores=pre_scores,
ids=topk_indices,
scores=accu_scores,
beam_size=beam_size,
end_id=self.eos_id,
return_parent_idx=True)
layers.array_write(selected_ids, i=step_idx, array=ids)
layers.array_write(selected_scores, i=step_idx, array=scores)
layers.array_write(pre_mask, i=step_idx, array=tgt_generation_mask)
layers.array_write(pos_bias, i=step_idx, array=pos_biases)
layers.assign(gather_idx, parent_idx)
length_cond = layers.less_than(x=step_idx, y=max_len)
finish_cond = layers.logical_not(layers.is_empty(x=selected_ids))
layers.logical_and(x=length_cond, y=finish_cond, out=cond)
finished_ids, finished_scores = layers.beam_search_decode(
ids, scores, beam_size=beam_size, end_id=self.eos_id)
predictions = {
"finished_ids": finished_ids,
"finished_scores": finished_scores,
"token_ids": inputs["token_ids"],
"data_id": inputs["data_id"]
}
return predictions
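# Illustrative NumPy sketch (not part of the module) of the nucleus (top-p) filtering
# performed in the while-loop above: keep the smallest set of tokens whose exclusive
# cumulative probability stays below `topp`, renormalize, then sample one token id.
# `probs` is assumed to be a 1-D NumPy array of probabilities summing to 1.
def _topp_filter_example(probs, topp=0.9):
    order = np.argsort(-probs)                    # token ids sorted by descending probability
    sorted_probs = probs[order]
    cum = np.cumsum(sorted_probs) - sorted_probs  # exclusive cumulative sum
    keep = cum < topp                             # same condition as lt_cond above
    kept = sorted_probs * keep
    renorm = kept / kept.sum()
    return np.random.choice(order, p=renorm)      # sampled id in the original vocab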
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""NSP model."""
import paddle.fluid as fluid
import paddle.fluid.layers as layers
from . import register_model
from .model_base import Model
from .unified_transformer import UnifiedTransformer
@register_model("NSPModel")
class NSPModel(UnifiedTransformer):
"""NSP model."""
def _get_feed_dict(self, is_infer=False):
"""
Get the feed list of the model.
Args:
is_infer(bool): True if running inference.
Returns:
list(Variable): The feed list.
list(str): The name of each Variable in feed list.
"""
feed_dict = {}
feed_dict["token_ids"] = layers.data(
name="token_ids", shape=[-1, self.max_seq_len, 1], dtype="int64")
feed_dict["type_ids"] = layers.data(
name="type_ids", shape=[-1, self.max_seq_len, 1], dtype="int64")
feed_dict["pos_ids"] = layers.data(
name="pos_ids", shape=[-1, self.max_seq_len, 1], dtype="int64")
feed_dict["attention_mask"] = layers.data(
name="attention_mask",
shape=[-1, self.max_seq_len, self.max_seq_len],
dtype=self.dtype)
feed_dict["label_pos"] = layers.data(
name="label_pos", shape=[-1, 1], dtype="int64")
if not is_infer:
feed_dict["label"] = layers.data(
name="label", shape=[-1, 1], dtype="int64")
feed_dict["tgt_label"] = layers.data(
name="tgt_ids", shape=[-1, 1], dtype="int64")
feed_dict["tgt_pos"] = layers.data(
name="tgt_pos", shape=[-1, 1], dtype="int64")
feed_dict["data_id"] = layers.data(
name="data_id", shape=[-1, 1], dtype="int64")
return feed_dict
def _get_feed(self, inputs, is_infer=False):
return Model._get_feed(self, inputs, is_infer)
def forward(self, inputs, is_infer=False):
outputs = {}
self.generation_caches = None
outputs["enc_out"], self.checkpoints = self._generation_network(
token_ids=inputs["token_ids"],
type_ids=inputs["type_ids"],
pos_ids=inputs["pos_ids"],
generation_mask=inputs["attention_mask"])
return outputs
def _get_metrics(self, inputs, outputs):
metrics = {}
fc_out = self._calc_logits(outputs["enc_out"], inputs["tgt_pos"])
lm_loss = layers.softmax_with_cross_entropy(
            logits=fc_out, label=inputs["tgt_label"])
need_cal = layers.not_equal(
inputs["tgt_label"],
layers.fill_constant(shape=[1], dtype="int64", value=1))
need_cal = layers.cast(need_cal, self.dtype)
mean_lm_loss = layers.reduce_sum(
lm_loss * need_cal) / (layers.reduce_sum(need_cal) + 1e-10)
pooled_out = self._get_pooled_output(outputs["enc_out"],
inputs["label_pos"])
nsp_fc_out = layers.fc(
input=pooled_out,
size=2,
param_attr=fluid.ParamAttr(
name="next_sent_fc.w_0", initializer=self.param_initializer),
bias_attr="next_sent_fc.b_0")
nsp_loss, nsp_softmax = layers.softmax_with_cross_entropy(
logits=nsp_fc_out, label=inputs["label"], return_softmax=True)
nsp_acc = layers.accuracy(nsp_softmax, inputs["label"])
mean_nsp_loss = layers.mean(nsp_loss)
metrics["loss"] = mean_lm_loss + mean_nsp_loss
metrics["lm_loss"] = mean_lm_loss
metrics["nsp_loss"] = mean_nsp_loss
metrics["nsp_acc"] = nsp_acc
return metrics
def infer(self, inputs, outputs):
pooled_out = self._get_pooled_output(outputs["enc_out"],
inputs["label_pos"])
nsp_fc_out = layers.fc(
input=pooled_out,
size=2,
param_attr=fluid.ParamAttr(
name="next_sent_fc.w_0", initializer=self.param_initializer),
bias_attr="next_sent_fc.b_0")
scores = layers.softmax(nsp_fc_out)
predictions = {"scores": scores, "data_id": inputs["data_id"]}
return predictions
def infer_step(self, inputs):
return Model.infer_step(self, inputs)
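# Illustrative sketch (not part of the module): the coherence scores returned by infer()
# can be used to rerank candidate responses, keeping the candidate judged most coherent
# with the context. The inputs below are hypothetical; scores are assumed to come from
# predictions["scores"][:, 1] (the positive class of the 2-way classifier above).
def _rerank_example(candidates, coherence_scores):
    # candidates: list[str]; coherence_scores: list[float], one per candidate
    best = max(range(len(candidates)), key=lambda i: coherence_scores[i])
    return candidates[best]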
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Optimizer."""
import re
import paddle.fluid as fluid
import paddle.fluid.layers as layers
class AdamW(fluid.optimizer.AdamOptimizer):
"""AdamW object for dygraph"""
def __init__(self, *args, **kwargs):
weight_decay = kwargs.pop('weight_decay', None)
var_name_to_exclude = kwargs.pop(
'var_name_to_exclude', '.*layer_norm_scale|.*layer_norm_bias|.*b_0')
super(AdamW, self).__init__(*args, **kwargs)
self.wd = weight_decay
self.pat = re.compile(var_name_to_exclude)
def apply_optimize(self, loss, startup_program, params_grads):
"""Update params with weight decay."""
super(AdamW, self).apply_optimize(loss, startup_program, params_grads)
for p, g in params_grads:
if not self.pat.match(p.name):
layers.assign(p * (1. - self.wd * self._learning_rate), p)
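# Illustrative sketch (not part of the module): the loop above applies decoupled weight
# decay, scaling every non-excluded parameter by (1 - weight_decay * learning_rate)
# after the Adam step. A plain NumPy-style version of that single scaling step:
def _decoupled_decay_example(param, learning_rate=1e-3, weight_decay=0.01):
    # param: an array of weights (hypothetical); returns the decayed weights
    return param * (1. - weight_decay * learning_rate)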