Commit a1e598ec authored by KP, committed by GitHub

Merge pull request #1815 from haoyuying/add_model

Add 10 segmentation models
# ann_resnet50_cityscapes
|Module Name|ann_resnet50_cityscapes|
| :--- | :---: |
|Category|Image Segmentation|
|Network|ann_resnet50vd|
|Dataset|Cityscapes|
|Fine-tuning supported or not|Yes|
|Module Size|228MB|
|Data indicators|-|
|Latest update date|2022-03-22|
## I. Basic Information
- Sample results:
<p align="center">
<img src="https://user-images.githubusercontent.com/35907364/159212111-df341f2a-e994-45d7-92d6-2288d666079c.png" width = "420" height = "505" hspace='10'/> <img src="https://user-images.githubusercontent.com/35907364/159212188-2db40b29-2943-47ce-9ad2-36a6fb85ba3e.png" width = "420" height = "505" hspace='10'/>
</p>
- ### Module Introduction
- This example shows how to use PaddleHub to fine-tune the pre-trained model and complete prediction.
- For more information, please refer to: [ann](https://arxiv.org/pdf/1908.07678.pdf)
## II. Installation
- ### 1. Environment Dependencies
- paddlepaddle >= 2.0.0
- paddlehub >= 2.0.0
- ### 2. Installation
- ```shell
$ hub install ann_resnet50_cityscapes
```
- In case of any problems during installation, please refer to: [Windows Quickstart](../../../../docs/docs_ch/get_start/windows_quickstart.md)
| [Linux Quickstart](../../../../docs/docs_ch/get_start/linux_quickstart.md) | [MacOS Quickstart](../../../../docs/docs_ch/get_start/mac_quickstart.md)
## III. Module API Prediction
- ### 1. Prediction Code Example
- ```python
import cv2
import paddle
import paddlehub as hub
if __name__ == '__main__':
model = hub.Module(name='ann_resnet50_cityscapes')
img = cv2.imread("/PATH/TO/IMAGE")
result = model.predict(images=[img], visualization=True)
```
- ### 2. How to Start Fine-tuning
- After installing PaddlePaddle and PaddleHub, run `python train.py` to fine-tune the ann_resnet50_cityscapes model on the OpticDiscSeg dataset. The content of `train.py` is as follows:
- Steps:
- Step1: Define the data preprocessing method
- ```python
from paddlehub.vision.segmentation_transforms import Compose, Resize, Normalize
transform = Compose([Resize(target_size=(512, 512)), Normalize()])
```
- The `segmentation_transforms` module provides a rich set of preprocessing methods for image segmentation data; users can substitute the preprocessing they need.
- Step2: Download and load the dataset
- ```python
from paddlehub.datasets import OpticDiscSeg
train_reader = OpticDiscSeg(transform, mode='train')
```
- `transforms`: data preprocessing methods.
- `mode`: dataset split to load; options are `train`, `test`, and `val`. Default is `train`.
- The dataset preparation code can be found in [opticdiscseg.py](../../paddlehub/datasets/opticdiscseg.py). `hub.datasets.OpticDiscSeg()` automatically downloads the dataset and extracts it to `$HOME/.paddlehub/dataset` under the user directory.
- Step3: Load the pre-trained model
- ```python
import paddlehub as hub
model = hub.Module(name='ann_resnet50_cityscapes', num_classes=2, pretrained=None)
```
- `name`: name of the pre-trained model.
- `pretrained`: whether to load a self-trained checkpoint; if `None`, the provided default parameters are loaded.
- Step4: Choose the optimization strategy and runtime configuration
- ```python
import paddle
from paddlehub.finetune.trainer import Trainer
scheduler = paddle.optimizer.lr.PolynomialDecay(learning_rate=0.01, decay_steps=1000, power=0.9, end_lr=0.0001)
optimizer = paddle.optimizer.Adam(learning_rate=scheduler, parameters=model.parameters())
trainer = Trainer(model, optimizer, checkpoint_dir='test_ckpt_img_seg', use_gpu=True)
trainer.train(train_reader, epochs=10, batch_size=4, log_interval=10, save_interval=4)
```
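- To select the best model on the validation split during training, the split can also be passed to the trainer. A minimal sketch, assuming `Trainer.train` accepts an `eval_dataset` argument as in other PaddleHub fine-tuning examples:
- ```python
# Hypothetical addition: also evaluate on the val split while training
eval_reader = OpticDiscSeg(transform, mode='val')
trainer.train(train_reader, epochs=10, batch_size=4, eval_dataset=eval_reader, log_interval=10, save_interval=4)
```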
- Model prediction
- When fine-tuning is complete, the model that performed best on the validation set is saved under `${CHECKPOINT_DIR}/best_model`, where `${CHECKPOINT_DIR}` is the checkpoint directory chosen for fine-tuning. We use this model for prediction. The `predict.py` script is as follows:
```python
import paddle
import cv2
import paddlehub as hub
if __name__ == '__main__':
model = hub.Module(name='ann_resnet50_cityscapes', pretrained='/PATH/TO/CHECKPOINT')
img = cv2.imread("/PATH/TO/IMAGE")
model.predict(images=[img], visualization=True)
```
- Once the parameters are configured, run the script with `python predict.py`.
- **Args**
* `images`: image path or image data in BGR format;
* `visualization`: whether to visualize the result; default is True;
* `save_path`: path to save the results; default is 'seg_result'.
**NOTE:** For prediction, the module, checkpoint_dir, and dataset must match those used for fine-tuning.
## IV. Server Deployment
- PaddleHub Serving can deploy an online image segmentation service.
- ### Step 1: Start PaddleHub Serving
- Run the startup command:
- ```shell
$ hub serving start -m ann_resnet50_cityscapes
```
- This deploys the image segmentation service API, with the default port 8866.
- **NOTE:** If GPU is used for prediction, set the CUDA_VISIBLE_DEVICES environment variable before starting the service; otherwise it does not need to be set.
- ### Step 2: Send a prediction request
- With the server configured, the following few lines of code send a prediction request and obtain the result:
```python
import requests
import json
import cv2
import base64
import numpy as np
def cv2_to_base64(image):
    data = cv2.imencode('.jpg', image)[1]
    return base64.b64encode(data.tobytes()).decode('utf8')
def base64_to_cv2(b64str):
    data = base64.b64decode(b64str.encode('utf8'))
    data = np.frombuffer(data, np.uint8)
    data = cv2.imdecode(data, cv2.IMREAD_COLOR)
    return data
# Send an HTTP request
org_im = cv2.imread('/PATH/TO/IMAGE')
data = {'images':[cv2_to_base64(org_im)]}
headers = {"Content-type": "application/json"}
url = "http://127.0.0.1:8866/predict/ann_resnet50_cityscapes"
r = requests.post(url=url, headers=headers, data=json.dumps(data))
mask = base64_to_cv2(r.json()["results"][0])
```
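- The decoded `mask` is an ordinary OpenCV image (a BGR `numpy` array) and can be saved directly, for example:
```python
cv2.imwrite('mask.png', mask)  # persist the returned segmentation result
```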
## V. Release Note
* 1.0.0
  First release
# ann_resnet50_cityscapes
|Module Name|ann_resnet50_cityscapes|
| :--- | :---: |
|Category|Image Segmentation|
|Network|ann_resnet50vd|
|Dataset|Cityscapes|
|Fine-tuning supported or not|Yes|
|Module Size|228MB|
|Data indicators|-|
|Latest update date|2022-03-22|
## I. Basic Information
- ### Application Effect Display
- Sample results:
<p align="center">
<img src="https://user-images.githubusercontent.com/35907364/159212111-df341f2a-e994-45d7-92d6-2288d666079c.png" width = "420" height = "505" hspace='10'/> <img src="https://user-images.githubusercontent.com/35907364/159212188-2db40b29-2943-47ce-9ad2-36a6fb85ba3e.png" width = "420" height = "505" hspace='10'/>
</p>
- ### Module Introduction
- We will show how to use PaddleHub to finetune the pre-trained model and complete the prediction.
- For more information, please refer to: [ann](https://arxiv.org/pdf/1908.07678.pdf)
## II. Installation
- ### 1、Environment Dependencies
- paddlepaddle >= 2.0.0
- paddlehub >= 2.0.0
- ### 2、Installation
- ```shell
$ hub install ann_resnet50_cityscapes
```
- In case of any problems during installation, please refer to: [Windows_Quickstart](../../../../docs/docs_en/get_start/windows_quickstart.md)
| [Linux_Quickstart](../../../../docs/docs_en/get_start/linux_quickstart.md) | [Mac_Quickstart](../../../../docs/docs_en/get_start/mac_quickstart.md)
## III. Module API Prediction
- ### 1、Prediction Code Example
- ```python
import cv2
import paddle
import paddlehub as hub
if __name__ == '__main__':
model = hub.Module(name='ann_resnet50_cityscapes')
img = cv2.imread("/PATH/TO/IMAGE")
result = model.predict(images=[img], visualization=True)
```
- ### 2.Fine-tune and Encapsulation
- After completing the installation of PaddlePaddle and PaddleHub, you can start fine-tuning the ann_resnet50_cityscapes model on datasets such as OpticDiscSeg by running `python train.py`.
- Steps:
- Step1: Define the data preprocessing method
- ```python
from paddlehub.vision.segmentation_transforms import Compose, Resize, Normalize
transform = Compose([Resize(target_size=(512, 512)), Normalize()])
```
- `segmentation_transforms`: The data augmentation module defines lots of preprocessing methods for image segmentation data. Users can replace these preprocessing methods according to their needs.
- Step2: Download the dataset
- ```python
from paddlehub.datasets import OpticDiscSeg
train_reader = OpticDiscSeg(transform, mode='train')
```
* `transforms`: data preprocessing methods.
* `mode`: Select the data mode, the options are `train`, `test`, `val`. Default is `train`.
* Dataset preparation can be referred to [opticdiscseg.py](../../paddlehub/datasets/opticdiscseg.py). `hub.datasets.OpticDiscSeg()` automatically downloads the dataset and decompresses it to the `$HOME/.paddlehub/dataset` directory under the user directory.
- Step3: Load the pre-trained model
- ```python
import paddlehub as hub
model = hub.Module(name='ann_resnet50_cityscapes', num_classes=2, pretrained=None)
```
- `name`: model name.
- `pretrained`: whether to load a self-trained model; if it is None, the provided pre-trained parameters are loaded.
- Step4: Optimization strategy
- ```python
import paddle
from paddlehub.finetune.trainer import Trainer
scheduler = paddle.optimizer.lr.PolynomialDecay(learning_rate=0.01, decay_steps=1000, power=0.9, end_lr=0.0001)
optimizer = paddle.optimizer.Adam(learning_rate=scheduler, parameters=model.parameters())
trainer = Trainer(model, optimizer, checkpoint_dir='test_ckpt_img_seg', use_gpu=True)
trainer.train(train_reader, epochs=10, batch_size=4, log_interval=10, save_interval=4)
```
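- To track performance on the validation split while training, it can be passed to the trainer as well. A minimal sketch, assuming `Trainer.train` supports the `eval_dataset` argument as in other PaddleHub fine-tuning examples:
- ```python
# Hypothetical addition: evaluate on the val split during training
eval_reader = OpticDiscSeg(transform, mode='val')
trainer.train(train_reader, epochs=10, batch_size=4, eval_dataset=eval_reader, log_interval=10, save_interval=4)
```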
- Model prediction
- When fine-tuning is completed, the model with the best performance on the validation set will be saved in the `${CHECKPOINT_DIR}/best_model` directory, where `${CHECKPOINT_DIR}` is the checkpoint directory chosen for fine-tuning. We use this model to make predictions. The `predict.py` script is as follows:
```python
import paddle
import cv2
import paddlehub as hub
if __name__ == '__main__':
model = hub.Module(name='ann_resnet50_cityscapes', pretrained='/PATH/TO/CHECKPOINT')
img = cv2.imread("/PATH/TO/IMAGE")
model.predict(images=[img], visualization=True)
```
- **Args**
* `images`: Image path or ndarray data with format [H, W, C], BGR.
* `visualization`: Whether to save the prediction results as picture files.
* `save_path`: Save path of the result, default is 'seg_result'.
## IV. Server Deployment
- PaddleHub Serving can deploy an online service of image segmentation.
- ### Step 1: Start PaddleHub Serving
- Run the startup command:
- ```shell
$ hub serving start -m ann_resnet50_cityscapes
```
- The service API is now deployed, and the default port number is 8866.
- **NOTE:** If GPU is used for prediction, set the CUDA_VISIBLE_DEVICES environment variable before starting the service; otherwise it does not need to be set.
- ### Step 2: Send a predictive request
- With a configured server, use the following lines of code to send the prediction request and obtain the result:
```python
import requests
import json
import cv2
import base64
import numpy as np
def cv2_to_base64(image):
    data = cv2.imencode('.jpg', image)[1]
    return base64.b64encode(data.tobytes()).decode('utf8')
def base64_to_cv2(b64str):
    data = base64.b64decode(b64str.encode('utf8'))
    data = np.frombuffer(data, np.uint8)
    data = cv2.imdecode(data, cv2.IMREAD_COLOR)
    return data
org_im = cv2.imread('/PATH/TO/IMAGE')
data = {'images':[cv2_to_base64(org_im)]}
headers = {"Content-type": "application/json"}
url = "http://127.0.0.1:8866/predict/ann_resnet50_cityscapes"
r = requests.post(url=url, headers=headers, data=json.dumps(data))
mask = base64_to_cv2(r.json()["results"][0])
```
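- The decoded `mask` is a regular OpenCV image (a BGR `numpy` array), so it can be saved directly, e.g.:
```python
cv2.imwrite('mask.png', mask)  # save the returned segmentation result
```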
## V. Release Note
- 1.0.0
First release
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle
import paddle.nn as nn
import paddle.nn.functional as F
from paddle.nn.layer import activation
from paddle.nn import Conv2D, AvgPool2D
def SyncBatchNorm(*args, **kwargs):
"""In cpu environment nn.SyncBatchNorm does not have kernel so use nn.BatchNorm2D instead"""
if paddle.get_device() == 'cpu':
return nn.BatchNorm2D(*args, **kwargs)
else:
return nn.SyncBatchNorm(*args, **kwargs)
class SeparableConvBNReLU(nn.Layer):
"""Depthwise Separable Convolution."""
def __init__(self,
in_channels: int,
out_channels: int,
kernel_size: int,
padding: str = 'same',
**kwargs: dict):
super(SeparableConvBNReLU, self).__init__()
self.depthwise_conv = ConvBN(
in_channels,
out_channels=in_channels,
kernel_size=kernel_size,
padding=padding,
groups=in_channels,
**kwargs)
        self.pointwise_conv = ConvBNReLU(
            in_channels, out_channels, kernel_size=1, groups=1)
    def forward(self, x: paddle.Tensor) -> paddle.Tensor:
        x = self.depthwise_conv(x)
        x = self.pointwise_conv(x)
return x
class ConvBN(nn.Layer):
"""Basic conv bn layer"""
def __init__(self,
in_channels: int,
out_channels: int,
kernel_size: int,
padding: str = 'same',
**kwargs: dict):
super(ConvBN, self).__init__()
self._conv = Conv2D(
in_channels, out_channels, kernel_size, padding=padding, **kwargs)
self._batch_norm = SyncBatchNorm(out_channels)
def forward(self, x: paddle.Tensor) -> paddle.Tensor:
x = self._conv(x)
x = self._batch_norm(x)
return x
class ConvBNReLU(nn.Layer):
"""Basic conv bn relu layer."""
def __init__(self,
in_channels: int,
out_channels: int,
kernel_size: int,
padding: str = 'same',
**kwargs: dict):
super(ConvBNReLU, self).__init__()
self._conv = Conv2D(
in_channels, out_channels, kernel_size, padding=padding, **kwargs)
self._batch_norm = SyncBatchNorm(out_channels)
def forward(self, x: paddle.Tensor) -> paddle.Tensor:
x = self._conv(x)
x = self._batch_norm(x)
x = F.relu(x)
return x
class Activation(nn.Layer):
"""
The wrapper of activations.
Args:
act (str, optional): The activation name in lowercase. It must be one of ['elu', 'gelu',
'hardshrink', 'tanh', 'hardtanh', 'prelu', 'relu', 'relu6', 'selu', 'leakyrelu', 'sigmoid',
'softmax', 'softplus', 'softshrink', 'softsign', 'tanhshrink', 'logsigmoid', 'logsoftmax',
'hsigmoid']. Default: None, means identical transformation.
Returns:
A callable object of Activation.
Raises:
KeyError: When parameter `act` is not in the optional range.
Examples:
from paddleseg.models.common.activation import Activation
relu = Activation("relu")
print(relu)
# <class 'paddle.nn.layer.activation.ReLU'>
sigmoid = Activation("sigmoid")
print(sigmoid)
# <class 'paddle.nn.layer.activation.Sigmoid'>
not_exit_one = Activation("not_exit_one")
# KeyError: "not_exit_one does not exist in the current dict_keys(['elu', 'gelu', 'hardshrink',
# 'tanh', 'hardtanh', 'prelu', 'relu', 'relu6', 'selu', 'leakyrelu', 'sigmoid', 'softmax',
# 'softplus', 'softshrink', 'softsign', 'tanhshrink', 'logsigmoid', 'logsoftmax', 'hsigmoid'])"
"""
def __init__(self, act: str = None):
super(Activation, self).__init__()
self._act = act
upper_act_names = activation.__dict__.keys()
lower_act_names = [act.lower() for act in upper_act_names]
act_dict = dict(zip(lower_act_names, upper_act_names))
if act is not None:
if act in act_dict.keys():
act_name = act_dict[act]
self.act_func = eval("activation.{}()".format(act_name))
else:
raise KeyError("{} does not exist in the current {}".format(
act, act_dict.keys()))
def forward(self, x: paddle.Tensor) -> paddle.Tensor:
if self._act is not None:
return self.act_func(x)
else:
return x
class ASPPModule(nn.Layer):
"""
Atrous Spatial Pyramid Pooling.
Args:
        aspp_ratios (tuple): The dilation rates used in the ASPP module.
in_channels (int): The number of input channels.
out_channels (int): The number of output channels.
align_corners (bool): An argument of F.interpolate. It should be set to False when the output size of feature
is even, e.g. 1024x512, otherwise it is True, e.g. 769x769.
use_sep_conv (bool, optional): If using separable conv in ASPP module. Default: False.
image_pooling (bool, optional): If augmented with image-level features. Default: False
"""
def __init__(self,
aspp_ratios: tuple,
in_channels: int,
out_channels: int,
align_corners: bool,
use_sep_conv: bool = False,
image_pooling: bool = False):
super().__init__()
self.align_corners = align_corners
self.aspp_blocks = nn.LayerList()
for ratio in aspp_ratios:
if use_sep_conv and ratio > 1:
conv_func = SeparableConvBNReLU
else:
conv_func = ConvBNReLU
block = conv_func(
in_channels=in_channels,
out_channels=out_channels,
kernel_size=1 if ratio == 1 else 3,
dilation=ratio,
padding=0 if ratio == 1 else ratio)
self.aspp_blocks.append(block)
out_size = len(self.aspp_blocks)
if image_pooling:
self.global_avg_pool = nn.Sequential(
nn.AdaptiveAvgPool2D(output_size=(1, 1)),
ConvBNReLU(in_channels, out_channels, kernel_size=1, bias_attr=False))
out_size += 1
self.image_pooling = image_pooling
self.conv_bn_relu = ConvBNReLU(
in_channels=out_channels * out_size,
out_channels=out_channels,
kernel_size=1)
self.dropout = nn.Dropout(p=0.1) # drop rate
def forward(self, x: paddle.Tensor) -> paddle.Tensor:
outputs = []
for block in self.aspp_blocks:
y = block(x)
y = F.interpolate(
y,
x.shape[2:],
mode='bilinear',
align_corners=self.align_corners)
outputs.append(y)
if self.image_pooling:
img_avg = self.global_avg_pool(x)
img_avg = F.interpolate(
img_avg,
x.shape[2:],
mode='bilinear',
align_corners=self.align_corners)
outputs.append(img_avg)
x = paddle.concat(outputs, axis=1)
x = self.conv_bn_relu(x)
x = self.dropout(x)
return x
class AuxLayer(nn.Layer):
"""
The auxiliary layer implementation for auxiliary loss.
Args:
in_channels (int): The number of input channels.
inter_channels (int): The intermediate channels.
out_channels (int): The number of output channels, and usually it is num_classes.
dropout_prob (float, optional): The drop rate. Default: 0.1.
"""
def __init__(self,
in_channels: int,
inter_channels: int,
out_channels: int,
dropout_prob: float = 0.1,
**kwargs):
super().__init__()
self.conv_bn_relu = ConvBNReLU(
in_channels=in_channels,
out_channels=inter_channels,
kernel_size=3,
padding=1,
**kwargs)
self.dropout = nn.Dropout(p=dropout_prob)
self.conv = nn.Conv2D(
in_channels=inter_channels,
out_channels=out_channels,
kernel_size=1)
def forward(self, x: paddle.Tensor) -> paddle.Tensor:
x = self.conv_bn_relu(x)
x = self.dropout(x)
x = self.conv(x)
return x
class Add(nn.Layer):
def __init__(self):
super().__init__()
    def forward(self, x: paddle.Tensor, y: paddle.Tensor, name: str = None) -> paddle.Tensor:
return paddle.add(x, y, name)
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from typing import Union, List, Tuple
import numpy as np
import paddle
import paddle.nn as nn
import paddle.nn.functional as F
from paddlehub.module.module import moduleinfo
import paddlehub.vision.segmentation_transforms as T
from paddlehub.module.cv_module import ImageSegmentationModule
from ann_resnet50_cityscapes.resnet import ResNet50_vd
import ann_resnet50_cityscapes.layers as layers
@moduleinfo(
name="ann_resnet50_cityscapes",
type="CV/semantic_segmentation",
author="paddlepaddle",
author_email="",
summary="ANNResnet50 is a segmentation model.",
version="1.0.0",
meta=ImageSegmentationModule)
class ANN(nn.Layer):
"""
The ANN implementation based on PaddlePaddle.
The original article refers to
    Zhu, Zhen, et al. "Asymmetric Non-local Neural Networks for Semantic Segmentation"
(https://arxiv.org/pdf/1908.07678.pdf).
Args:
num_classes (int): The unique number of target classes.
backbone_indices (tuple, optional): Two values in the tuple indicate the indices of output of backbone.
key_value_channels (int, optional): The key and value channels of self-attention map in both AFNB and APNB modules.
Default: 256.
inter_channels (int, optional): Both input and output channels of APNB modules. Default: 512.
psp_size (tuple, optional): The out size of pooled feature maps. Default: (1, 3, 6, 8).
align_corners (bool, optional): An argument of F.interpolate. It should be set to False when the feature size is even,
e.g. 1024x512, otherwise it is True, e.g. 769x769. Default: False.
pretrained (str, optional): The path or url of pretrained model. Default: None.
"""
def __init__(self,
num_classes: int = 19,
backbone_indices: Tuple[int] = (2, 3),
key_value_channels: int = 256,
inter_channels: int = 512,
psp_size: Tuple[int] = (1, 3, 6, 8),
align_corners: bool = False,
pretrained: str = None):
super(ANN, self).__init__()
self.backbone = ResNet50_vd()
backbone_channels = [
self.backbone.feat_channels[i] for i in backbone_indices
]
self.head = ANNHead(num_classes, backbone_indices, backbone_channels,
key_value_channels, inter_channels, psp_size)
self.align_corners = align_corners
self.transforms = T.Compose([T.Normalize()])
if pretrained is not None:
model_dict = paddle.load(pretrained)
self.set_dict(model_dict)
print("load custom parameters success")
else:
checkpoint = os.path.join(self.directory, 'model.pdparams')
model_dict = paddle.load(checkpoint)
self.set_dict(model_dict)
print("load pretrained parameters success")
def transform(self, img: Union[np.ndarray, str]) -> Union[np.ndarray, str]:
return self.transforms(img)
def forward(self, x: paddle.Tensor) -> List[paddle.Tensor]:
feat_list = self.backbone(x)
logit_list = self.head(feat_list)
return [
F.interpolate(
logit,
paddle.shape(x)[2:],
mode='bilinear',
align_corners=self.align_corners) for logit in logit_list
]
class ANNHead(nn.Layer):
"""
The ANNHead implementation.
It mainly consists of AFNB and APNB modules.
Args:
num_classes (int): The unique number of target classes.
backbone_indices (tuple): Two values in the tuple indicate the indices of output of backbone.
The first index will be taken as low-level features; the second one will be
taken as high-level features in AFNB module. Usually backbone consists of four
downsampling stage, such as ResNet, and return an output of each stage. If it is (2, 3),
it means taking feature map of the third stage and the fourth stage in backbone.
backbone_channels (tuple): The same length with "backbone_indices". It indicates the channels of corresponding index.
key_value_channels (int): The key and value channels of self-attention map in both AFNB and APNB modules.
inter_channels (int): Both input and output channels of APNB modules.
psp_size (tuple): The out size of pooled feature maps.
enable_auxiliary_loss (bool, optional): A bool value indicates whether adding auxiliary loss. Default: False
"""
def __init__(self,
num_classes: int,
backbone_indices: Tuple[int],
backbone_channels: Tuple[int],
key_value_channels: int,
inter_channels: int,
psp_size: Tuple[int],
enable_auxiliary_loss: bool = False):
super().__init__()
low_in_channels = backbone_channels[0]
high_in_channels = backbone_channels[1]
self.fusion = AFNB(
low_in_channels=low_in_channels,
high_in_channels=high_in_channels,
out_channels=high_in_channels,
key_channels=key_value_channels,
value_channels=key_value_channels,
dropout_prob=0.05,
repeat_sizes=([1]),
psp_size=psp_size)
self.context = nn.Sequential(
layers.ConvBNReLU(
in_channels=high_in_channels,
out_channels=inter_channels,
kernel_size=3,
padding=1),
APNB(
in_channels=inter_channels,
out_channels=inter_channels,
key_channels=key_value_channels,
value_channels=key_value_channels,
dropout_prob=0.05,
repeat_sizes=([1]),
psp_size=psp_size))
self.cls = nn.Conv2D(
in_channels=inter_channels, out_channels=num_classes, kernel_size=1)
self.auxlayer = layers.AuxLayer(
in_channels=low_in_channels,
inter_channels=low_in_channels // 2,
out_channels=num_classes,
dropout_prob=0.05)
self.backbone_indices = backbone_indices
self.enable_auxiliary_loss = enable_auxiliary_loss
def forward(self, feat_list: List[paddle.Tensor]) -> List[paddle.Tensor]:
logit_list = []
low_level_x = feat_list[self.backbone_indices[0]]
high_level_x = feat_list[self.backbone_indices[1]]
x = self.fusion(low_level_x, high_level_x)
x = self.context(x)
logit = self.cls(x)
logit_list.append(logit)
if self.enable_auxiliary_loss:
auxiliary_logit = self.auxlayer(low_level_x)
logit_list.append(auxiliary_logit)
return logit_list
class AFNB(nn.Layer):
"""
Asymmetric Fusion Non-local Block.
Args:
low_in_channels (int): Low-level-feature channels.
high_in_channels (int): High-level-feature channels.
out_channels (int): Out channels of AFNB module.
key_channels (int): The key channels in self-attention block.
value_channels (int): The value channels in self-attention block.
dropout_prob (float): The dropout rate of output.
repeat_sizes (tuple, optional): The number of AFNB modules. Default: ([1]).
        psp_size (tuple, optional): The out size of pooled feature maps. Default: (1, 3, 6, 8).
"""
def __init__(self,
low_in_channels: int,
high_in_channels: int,
out_channels: int,
key_channels: int,
value_channels: int,
dropout_prob: float,
repeat_sizes: Tuple[int] = ([1]),
psp_size: Tuple[int] = (1, 3, 6, 8)):
super().__init__()
self.psp_size = psp_size
self.stages = nn.LayerList([
SelfAttentionBlock_AFNB(low_in_channels, high_in_channels,
key_channels, value_channels, out_channels,
size) for size in repeat_sizes
])
self.conv_bn = layers.ConvBN(
in_channels=out_channels + high_in_channels,
out_channels=out_channels,
kernel_size=1)
self.dropout = nn.Dropout(p=dropout_prob)
    def forward(self, low_feats: paddle.Tensor, high_feats: paddle.Tensor) -> paddle.Tensor:
priors = [stage(low_feats, high_feats) for stage in self.stages]
context = priors[0]
for i in range(1, len(priors)):
context += priors[i]
output = self.conv_bn(paddle.concat([context, high_feats], axis=1))
output = self.dropout(output)
return output
class APNB(nn.Layer):
"""
Asymmetric Pyramid Non-local Block.
Args:
in_channels (int): The input channels of APNB module.
out_channels (int): Out channels of APNB module.
key_channels (int): The key channels in self-attention block.
value_channels (int): The value channels in self-attention block.
dropout_prob (float): The dropout rate of output.
        repeat_sizes (tuple, optional): The number of APNB modules. Default: ([1]).
psp_size (tuple, optional): The out size of pooled feature maps. Default: (1, 3, 6, 8).
"""
def __init__(self,
in_channels: int,
out_channels: int,
key_channels: int,
value_channels: int,
dropout_prob: float,
repeat_sizes: Tuple[int] = ([1]),
psp_size: Tuple[int] = (1, 3, 6, 8)):
super().__init__()
self.psp_size = psp_size
self.stages = nn.LayerList([
SelfAttentionBlock_APNB(in_channels, out_channels, key_channels,
value_channels, size)
for size in repeat_sizes
])
self.conv_bn = layers.ConvBNReLU(
in_channels=in_channels * 2,
out_channels=out_channels,
kernel_size=1)
self.dropout = nn.Dropout(p=dropout_prob)
def forward(self, x: paddle.Tensor) -> paddle.Tensor:
priors = [stage(x) for stage in self.stages]
context = priors[0]
for i in range(1, len(priors)):
context += priors[i]
output = self.conv_bn(paddle.concat([context, x], axis=1))
output = self.dropout(output)
return output
def _pp_module(x: paddle.Tensor, psp_size: List[int]) -> paddle.Tensor:
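    # Pyramid pooling: adaptive-average-pool the feature map to each size in
    # psp_size (1x1, 3x3, 6x6, 8x8 by default), flatten, and concatenate, so the
    # attention keys/values cover sum(s*s for s in psp_size) = 110 positions
    # rather than all h*w pixels.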
n, c, h, w = x.shape
priors = []
for size in psp_size:
feat = F.adaptive_avg_pool2d(x, size)
feat = paddle.reshape(feat, shape=(0, c, -1))
priors.append(feat)
center = paddle.concat(priors, axis=-1)
return center
class SelfAttentionBlock_AFNB(nn.Layer):
"""
Self-Attention Block for AFNB module.
Args:
low_in_channels (int): Low-level-feature channels.
high_in_channels (int): High-level-feature channels.
key_channels (int): The key channels in self-attention block.
value_channels (int): The value channels in self-attention block.
out_channels (int, optional): Out channels of AFNB module. Default: None.
scale (int, optional): Pooling size. Default: 1.
psp_size (tuple, optional): The out size of pooled feature maps. Default: (1, 3, 6, 8).
"""
def __init__(self,
low_in_channels: int,
high_in_channels: int,
key_channels: int,
value_channels: int,
out_channels: int = None,
scale: int = 1,
psp_size: Tuple[int] = (1, 3, 6, 8)):
super().__init__()
self.scale = scale
self.in_channels = low_in_channels
self.out_channels = out_channels
self.key_channels = key_channels
self.value_channels = value_channels
        if out_channels is None:
self.out_channels = high_in_channels
self.pool = nn.MaxPool2D(scale)
self.f_key = layers.ConvBNReLU(
in_channels=low_in_channels,
out_channels=key_channels,
kernel_size=1)
self.f_query = layers.ConvBNReLU(
in_channels=high_in_channels,
out_channels=key_channels,
kernel_size=1)
self.f_value = nn.Conv2D(
in_channels=low_in_channels,
out_channels=value_channels,
kernel_size=1)
        self.W = nn.Conv2D(
            in_channels=value_channels,
            out_channels=self.out_channels,
            kernel_size=1)
self.psp_size = psp_size
    def forward(self, low_feats: paddle.Tensor, high_feats: paddle.Tensor) -> paddle.Tensor:
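        # Queries come from the high-level features, while keys and values come
        # from the low-level features and are pyramid-pooled by _pp_module to a
        # fixed number of positions; this is what makes the fusion asymmetric.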
batch_size, _, h, w = high_feats.shape
value = self.f_value(low_feats)
value = _pp_module(value, self.psp_size)
value = paddle.transpose(value, (0, 2, 1))
query = self.f_query(high_feats)
query = paddle.reshape(query, shape=(0, self.key_channels, -1))
query = paddle.transpose(query, perm=(0, 2, 1))
key = self.f_key(low_feats)
key = _pp_module(key, self.psp_size)
sim_map = paddle.matmul(query, key)
sim_map = (self.key_channels**-.5) * sim_map
sim_map = F.softmax(sim_map, axis=-1)
context = paddle.matmul(sim_map, value)
context = paddle.transpose(context, perm=(0, 2, 1))
hf_shape = paddle.shape(high_feats)
context = paddle.reshape(
context, shape=[0, self.value_channels, hf_shape[2], hf_shape[3]])
context = self.W(context)
return context
class SelfAttentionBlock_APNB(nn.Layer):
"""
Self-Attention Block for APNB module.
Args:
in_channels (int): The input channels of APNB module.
out_channels (int): The out channels of APNB module.
key_channels (int): The key channels in self-attention block.
value_channels (int): The value channels in self-attention block.
scale (int, optional): Pooling size. Default: 1.
psp_size (tuple, optional): The out size of pooled feature maps. Default: (1, 3, 6, 8).
"""
def __init__(self,
in_channels: int,
out_channels: int,
key_channels: int,
value_channels: int,
scale: int = 1,
psp_size: Tuple[int] = (1, 3, 6, 8)):
super().__init__()
self.scale = scale
self.in_channels = in_channels
self.out_channels = out_channels
self.key_channels = key_channels
self.value_channels = value_channels
self.pool = nn.MaxPool2D(scale)
self.f_key = layers.ConvBNReLU(
in_channels=self.in_channels,
out_channels=self.key_channels,
kernel_size=1)
self.f_query = self.f_key
self.f_value = nn.Conv2D(
in_channels=self.in_channels,
out_channels=self.value_channels,
kernel_size=1)
self.W = nn.Conv2D(
in_channels=self.value_channels,
out_channels=self.out_channels,
kernel_size=1)
self.psp_size = psp_size
def forward(self, x: paddle.Tensor) -> paddle.Tensor:
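        # Standard self-attention over x, except that keys and values are
        # pyramid-pooled to a fixed number of positions, so the similarity map
        # is (N, h*w, 110) for the default psp_size instead of (N, h*w, h*w).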
batch_size, _, h, w = x.shape
if self.scale > 1:
x = self.pool(x)
value = self.f_value(x)
value = _pp_module(value, self.psp_size)
value = paddle.transpose(value, perm=(0, 2, 1))
query = self.f_query(x)
query = paddle.reshape(query, shape=(0, self.key_channels, -1))
query = paddle.transpose(query, perm=(0, 2, 1))
key = self.f_key(x)
key = _pp_module(key, self.psp_size)
sim_map = paddle.matmul(query, key)
sim_map = (self.key_channels**-.5) * sim_map
sim_map = F.softmax(sim_map, axis=-1)
context = paddle.matmul(sim_map, value)
context = paddle.transpose(context, perm=(0, 2, 1))
x_shape = paddle.shape(x)
context = paddle.reshape(
context, shape=[0, self.value_channels, x_shape[2], x_shape[3]])
context = self.W(context)
return context
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Union, List, Tuple
import paddle
import paddle.nn as nn
import paddle.nn.functional as F
import ann_resnet50_cityscapes.layers as layers
class ConvBNLayer(nn.Layer):
def __init__(self,
in_channels: int,
out_channels: int,
kernel_size: int,
stride: int = 1,
dilation: int = 1,
groups: int = 1,
is_vd_mode: bool = False,
act: str = None,
data_format: str = 'NCHW'):
super(ConvBNLayer, self).__init__()
        if dilation != 1 and kernel_size != 3:
            raise RuntimeError(
                "When the dilation isn't 1, the kernel_size should be 3.")
self.is_vd_mode = is_vd_mode
self._pool2d_avg = nn.AvgPool2D(
kernel_size=2,
stride=2,
padding=0,
ceil_mode=True,
data_format=data_format)
self._conv = nn.Conv2D(
in_channels=in_channels,
out_channels=out_channels,
kernel_size=kernel_size,
stride=stride,
padding=(kernel_size - 1) // 2 \
if dilation == 1 else dilation,
dilation=dilation,
groups=groups,
bias_attr=False,
data_format=data_format)
self._batch_norm = layers.SyncBatchNorm(
out_channels, data_format=data_format)
self._act_op = layers.Activation(act=act)
def forward(self, inputs: paddle.Tensor) -> paddle.Tensor:
if self.is_vd_mode:
inputs = self._pool2d_avg(inputs)
y = self._conv(inputs)
y = self._batch_norm(y)
y = self._act_op(y)
return y
class BottleneckBlock(nn.Layer):
def __init__(self,
in_channels: int,
out_channels: int,
stride: int,
shortcut: bool = True,
if_first: bool = False,
dilation: int = 1,
data_format: str = 'NCHW'):
super(BottleneckBlock, self).__init__()
self.data_format = data_format
self.conv0 = ConvBNLayer(
in_channels=in_channels,
out_channels=out_channels,
kernel_size=1,
act='relu',
data_format=data_format)
self.dilation = dilation
self.conv1 = ConvBNLayer(
in_channels=out_channels,
out_channels=out_channels,
kernel_size=3,
stride=stride,
act='relu',
dilation=dilation,
data_format=data_format)
self.conv2 = ConvBNLayer(
in_channels=out_channels,
out_channels=out_channels * 4,
kernel_size=1,
act=None,
data_format=data_format)
if not shortcut:
self.short = ConvBNLayer(
in_channels=in_channels,
out_channels=out_channels * 4,
kernel_size=1,
stride=1,
is_vd_mode=False if if_first or stride == 1 else True,
data_format=data_format)
self.shortcut = shortcut
# NOTE: Use the wrap layer for quantization training
self.add = layers.Add()
self.relu = layers.Activation(act="relu")
def forward(self, inputs: paddle.Tensor) -> paddle.Tensor:
y = self.conv0(inputs)
conv1 = self.conv1(y)
conv2 = self.conv2(conv1)
if self.shortcut:
short = inputs
else:
short = self.short(inputs)
y = self.add(short, conv2)
y = self.relu(y)
return y
class BasicBlock(nn.Layer):
def __init__(self,
in_channels: int,
out_channels: int,
stride: int,
dilation: int = 1,
shortcut: bool = True,
if_first: bool = False,
data_format: str = 'NCHW'):
super(BasicBlock, self).__init__()
self.conv0 = ConvBNLayer(
in_channels=in_channels,
out_channels=out_channels,
kernel_size=3,
stride=stride,
dilation=dilation,
act='relu',
data_format=data_format)
self.conv1 = ConvBNLayer(
in_channels=out_channels,
out_channels=out_channels,
kernel_size=3,
dilation=dilation,
act=None,
data_format=data_format)
if not shortcut:
self.short = ConvBNLayer(
in_channels=in_channels,
out_channels=out_channels,
kernel_size=1,
stride=1,
is_vd_mode=False if if_first or stride == 1 else True,
data_format=data_format)
self.shortcut = shortcut
self.dilation = dilation
self.data_format = data_format
self.add = layers.Add()
self.relu = layers.Activation(act="relu")
def forward(self, inputs: paddle.Tensor) -> paddle.Tensor:
y = self.conv0(inputs)
conv1 = self.conv1(y)
if self.shortcut:
short = inputs
else:
short = self.short(inputs)
y = self.add(short, conv1)
y = self.relu(y)
return y
class ResNet_vd(nn.Layer):
"""
The ResNet_vd implementation based on PaddlePaddle.
    The original article refers to
    Tong He, et al. "Bag of Tricks for Image Classification with Convolutional Neural Networks"
    (https://arxiv.org/pdf/1812.01187.pdf).
Args:
layers (int, optional): The layers of ResNet_vd. The supported layers are (18, 34, 50, 101, 152, 200). Default: 50.
output_stride (int, optional): The stride of output features compared to input images. It is 8 or 16. Default: 8.
        multi_grid (tuple|list, optional): The grid of stage4. Default: (1, 1, 1).
pretrained (str, optional): The path of pretrained model.
"""
def __init__(self,
layers: int = 50,
output_stride: int = 8,
multi_grid: Tuple[int] = (1, 1, 1),
pretrained: str = None,
data_format: str = 'NCHW'):
super(ResNet_vd, self).__init__()
self.data_format = data_format
self.conv1_logit = None # for gscnn shape stream
self.layers = layers
supported_layers = [18, 34, 50, 101, 152, 200]
assert layers in supported_layers, \
"supported layers are {} but input layer is {}".format(
supported_layers, layers)
if layers == 18:
depth = [2, 2, 2, 2]
elif layers == 34 or layers == 50:
depth = [3, 4, 6, 3]
elif layers == 101:
depth = [3, 4, 23, 3]
elif layers == 152:
depth = [3, 8, 36, 3]
elif layers == 200:
depth = [3, 12, 48, 3]
num_channels = [64, 256, 512, 1024
] if layers >= 50 else [64, 64, 128, 256]
num_filters = [64, 128, 256, 512]
# for channels of four returned stages
self.feat_channels = [c * 4 for c in num_filters
] if layers >= 50 else num_filters
dilation_dict = None
if output_stride == 8:
dilation_dict = {2: 2, 3: 4}
elif output_stride == 16:
dilation_dict = {3: 2}
self.conv1_1 = ConvBNLayer(
in_channels=3,
out_channels=32,
kernel_size=3,
stride=2,
act='relu',
data_format=data_format)
self.conv1_2 = ConvBNLayer(
in_channels=32,
out_channels=32,
kernel_size=3,
stride=1,
act='relu',
data_format=data_format)
self.conv1_3 = ConvBNLayer(
in_channels=32,
out_channels=64,
kernel_size=3,
stride=1,
act='relu',
data_format=data_format)
self.pool2d_max = nn.MaxPool2D(
kernel_size=3, stride=2, padding=1, data_format=data_format)
# self.block_list = []
self.stage_list = []
if layers >= 50:
for block in range(len(depth)):
shortcut = False
block_list = []
for i in range(depth[block]):
if layers in [101, 152] and block == 2:
if i == 0:
conv_name = "res" + str(block + 2) + "a"
else:
conv_name = "res" + str(block + 2) + "b" + str(i)
else:
conv_name = "res" + str(block + 2) + chr(97 + i)
###############################################################################
# Add dilation rate for some segmentation tasks, if dilation_dict is not None.
dilation_rate = dilation_dict[
block] if dilation_dict and block in dilation_dict else 1
# Actually block here is 'stage', and i is 'block' in 'stage'
                    # At stage 4, expand the dilation_rate if multi_grid is given
if block == 3:
dilation_rate = dilation_rate * multi_grid[i]
###############################################################################
bottleneck_block = self.add_sublayer(
'bb_%d_%d' % (block, i),
BottleneckBlock(
in_channels=num_channels[block]
if i == 0 else num_filters[block] * 4,
out_channels=num_filters[block],
stride=2 if i == 0 and block != 0
and dilation_rate == 1 else 1,
shortcut=shortcut,
if_first=block == i == 0,
dilation=dilation_rate,
data_format=data_format))
block_list.append(bottleneck_block)
shortcut = True
self.stage_list.append(block_list)
else:
for block in range(len(depth)):
shortcut = False
block_list = []
for i in range(depth[block]):
dilation_rate = dilation_dict[block] \
if dilation_dict and block in dilation_dict else 1
if block == 3:
dilation_rate = dilation_rate * multi_grid[i]
basic_block = self.add_sublayer(
'bb_%d_%d' % (block, i),
BasicBlock(
in_channels=num_channels[block]
if i == 0 else num_filters[block],
out_channels=num_filters[block],
stride=2 if i == 0 and block != 0 \
and dilation_rate == 1 else 1,
dilation=dilation_rate,
shortcut=shortcut,
if_first=block == i == 0,
data_format=data_format))
block_list.append(basic_block)
shortcut = True
self.stage_list.append(block_list)
self.pretrained = pretrained
def forward(self, inputs: paddle.Tensor) -> List[paddle.Tensor]:
y = self.conv1_1(inputs)
y = self.conv1_2(y)
y = self.conv1_3(y)
self.conv1_logit = y.clone()
y = self.pool2d_max(y)
# A feature list saves the output feature map of each stage.
feat_list = []
for stage in self.stage_list:
for block in stage:
y = block(y)
feat_list.append(y)
return feat_list
def ResNet50_vd(**args):
model = ResNet_vd(layers=50, **args)
return model
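
# Minimal smoke test (illustrative only, not part of the released module): with
# the default output_stride=8, the backbone returns one feature map per stage
# and feat_channels == [256, 512, 1024, 2048].
if __name__ == '__main__':
    net = ResNet50_vd()
    feats = net(paddle.randn([1, 3, 512, 512]))
    # Expected shapes: [1, 256, 128, 128], [1, 512, 64, 64],
    # [1, 1024, 64, 64], [1, 2048, 64, 64]
    print([list(f.shape) for f in feats])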
# ann_resnet50_voc
|Module Name|ann_resnet50_voc|
| :--- | :---: |
|Category|Image Segmentation|
|Network|ann_resnet50vd|
|Dataset|PascalVOC2012|
|Fine-tuning supported or not|Yes|
|Module Size|228MB|
|Data indicators|-|
|Latest update date|2022-03-21|
## I. Basic Information
- Sample results:
<p align="center">
<img src="https://user-images.githubusercontent.com/35907364/159212097-443a5a65-2f2e-4126-9c07-d7c3c220e55f.jpg" width = "420" height = "505" hspace='10'/> <img src="https://user-images.githubusercontent.com/35907364/159212375-52e123af-4699-4c25-8f50-4240bbb714b4.png" width = "420" height = "505" hspace='10'/>
</p>
- ### Module Introduction
- This example shows how to use PaddleHub to fine-tune the pre-trained model and complete prediction.
- For more information, please refer to: [ann](https://arxiv.org/pdf/1908.07678.pdf)
## II. Installation
- ### 1. Environment Dependencies
- paddlepaddle >= 2.0.0
- paddlehub >= 2.0.0
- ### 2. Installation
- ```shell
$ hub install ann_resnet50_voc
```
- In case of any problems during installation, please refer to: [Windows Quickstart](../../../../docs/docs_ch/get_start/windows_quickstart.md)
| [Linux Quickstart](../../../../docs/docs_ch/get_start/linux_quickstart.md) | [MacOS Quickstart](../../../../docs/docs_ch/get_start/mac_quickstart.md)
## III. Module API Prediction
- ### 1. Prediction Code Example
- ```python
import cv2
import paddle
import paddlehub as hub
if __name__ == '__main__':
model = hub.Module(name='ann_resnet50_voc')
img = cv2.imread("/PATH/TO/IMAGE")
result = model.predict(images=[img], visualization=True)
```
- ### 2. How to Start Fine-tuning
- After installing PaddlePaddle and PaddleHub, run `python train.py` to fine-tune the ann_resnet50_voc model on the OpticDiscSeg dataset. The content of `train.py` is as follows:
- Steps:
- Step1: Define the data preprocessing method
- ```python
from paddlehub.vision.segmentation_transforms import Compose, Resize, Normalize
transform = Compose([Resize(target_size=(512, 512)), Normalize()])
```
- The `segmentation_transforms` module provides a rich set of preprocessing methods for image segmentation data; users can substitute the preprocessing they need.
- Step2: Download and load the dataset
- ```python
from paddlehub.datasets import OpticDiscSeg
train_reader = OpticDiscSeg(transform, mode='train')
```
- `transforms`: data preprocessing methods.
- `mode`: dataset split to load; options are `train`, `test`, and `val`. Default is `train`.
- The dataset preparation code can be found in [opticdiscseg.py](../../paddlehub/datasets/opticdiscseg.py). `hub.datasets.OpticDiscSeg()` automatically downloads the dataset and extracts it to `$HOME/.paddlehub/dataset` under the user directory.
- Step3: Load the pre-trained model
- ```python
import paddlehub as hub
model = hub.Module(name='ann_resnet50_voc', num_classes=2, pretrained=None)
```
- `name`: name of the pre-trained model.
- `pretrained`: whether to load a self-trained checkpoint; if `None`, the provided default parameters are loaded.
- Step4: Choose the optimization strategy and runtime configuration
- ```python
import paddle
from paddlehub.finetune.trainer import Trainer
scheduler = paddle.optimizer.lr.PolynomialDecay(learning_rate=0.01, decay_steps=1000, power=0.9, end_lr=0.0001)
optimizer = paddle.optimizer.Adam(learning_rate=scheduler, parameters=model.parameters())
trainer = Trainer(model, optimizer, checkpoint_dir='test_ckpt_img_seg', use_gpu=True)
trainer.train(train_reader, epochs=10, batch_size=4, log_interval=10, save_interval=4)
```
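- To select the best model on the validation split during training, the split can also be passed to the trainer. A minimal sketch, assuming `Trainer.train` accepts an `eval_dataset` argument as in other PaddleHub fine-tuning examples:
- ```python
# Hypothetical addition: also evaluate on the val split while training
eval_reader = OpticDiscSeg(transform, mode='val')
trainer.train(train_reader, epochs=10, batch_size=4, eval_dataset=eval_reader, log_interval=10, save_interval=4)
```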
- Model prediction
- When fine-tuning is complete, the model that performed best on the validation set is saved under `${CHECKPOINT_DIR}/best_model`, where `${CHECKPOINT_DIR}` is the checkpoint directory chosen for fine-tuning. We use this model for prediction. The `predict.py` script is as follows:
```python
import paddle
import cv2
import paddlehub as hub
if __name__ == '__main__':
model = hub.Module(name='ann_resnet50_voc', pretrained='/PATH/TO/CHECKPOINT')
img = cv2.imread("/PATH/TO/IMAGE")
model.predict(images=[img], visualization=True)
```
- Once the parameters are configured, run the script with `python predict.py`.
- **Args**
* `images`: image path or image data in BGR format;
* `visualization`: whether to visualize the result; default is True;
* `save_path`: path to save the results; default is 'seg_result'.
**NOTE:** For prediction, the module, checkpoint_dir, and dataset must match those used for fine-tuning.
## IV. Server Deployment
- PaddleHub Serving can deploy an online image segmentation service.
- ### Step 1: Start PaddleHub Serving
- Run the startup command:
- ```shell
$ hub serving start -m ann_resnet50_voc
```
- This deploys the image segmentation service API, with the default port 8866.
- **NOTE:** If GPU is used for prediction, set the CUDA_VISIBLE_DEVICES environment variable before starting the service; otherwise it does not need to be set.
- ### Step 2: Send a prediction request
- With the server configured, the following few lines of code send a prediction request and obtain the result:
```python
import requests
import json
import cv2
import base64
import numpy as np
def cv2_to_base64(image):
    data = cv2.imencode('.jpg', image)[1]
    return base64.b64encode(data.tobytes()).decode('utf8')
def base64_to_cv2(b64str):
    data = base64.b64decode(b64str.encode('utf8'))
    data = np.frombuffer(data, np.uint8)
    data = cv2.imdecode(data, cv2.IMREAD_COLOR)
    return data
# Send an HTTP request
org_im = cv2.imread('/PATH/TO/IMAGE')
data = {'images':[cv2_to_base64(org_im)]}
headers = {"Content-type": "application/json"}
url = "http://127.0.0.1:8866/predict/ann_resnet50_voc"
r = requests.post(url=url, headers=headers, data=json.dumps(data))
mask = base64_to_cv2(r.json()["results"][0])
```
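- The decoded `mask` is an ordinary OpenCV image (a BGR `numpy` array) and can be saved directly, for example:
```python
cv2.imwrite('mask.png', mask)  # persist the returned segmentation result
```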
## V. Release Note
* 1.0.0
  First release
# ann_resnet50_voc
|Module Name|ann_resnet50_voc|
| :--- | :---: |
|Category|Image Segmentation|
|Network|ann_resnet50vd|
|Dataset|PascalVOC2012|
|Fine-tuning supported or not|Yes|
|Module Size|228MB|
|Data indicators|-|
|Latest update date|2022-03-22|
## I. Basic Information
- ### Application Effect Display
- Sample results:
<p align="center">
<img src="https://user-images.githubusercontent.com/35907364/159212097-443a5a65-2f2e-4126-9c07-d7c3c220e55f.jpg" width = "420" height = "505" hspace='10'/> <img src="https://user-images.githubusercontent.com/35907364/159212375-52e123af-4699-4c25-8f50-4240bbb714b4.png" width = "420" height = "505" hspace='10'/>
</p>
- ### Module Introduction
- We will show how to use PaddleHub to finetune the pre-trained model and complete the prediction.
- For more information, please refer to: [ann](https://arxiv.org/pdf/1908.07678.pdf)
## II. Installation
- ### 1、Environment Dependencies
- paddlepaddle >= 2.0.0
- paddlehub >= 2.0.0
- ### 2、Installation
- ```shell
$ hub install ann_resnet50_voc
```
- In case of any problems during installation, please refer to: [Windows_Quickstart](../../../../docs/docs_en/get_start/windows_quickstart.md)
| [Linux_Quickstart](../../../../docs/docs_en/get_start/linux_quickstart.md) | [Mac_Quickstart](../../../../docs/docs_en/get_start/mac_quickstart.md)
## III. Module API Prediction
- ### 1、Prediction Code Example
- ```python
import cv2
import paddle
import paddlehub as hub
if __name__ == '__main__':
model = hub.Module(name='ann_resnet50_voc')
img = cv2.imread("/PATH/TO/IMAGE")
result = model.predict(images=[img], visualization=True)
```
- ### 2.Fine-tune and Encapsulation
- After completing the installation of PaddlePaddle and PaddleHub, you can start fine-tuning the ann_resnet50_voc model on datasets such as OpticDiscSeg by running `python train.py`.
- Steps:
- Step1: Define the data preprocessing method
- ```python
from paddlehub.vision.segmentation_transforms import Compose, Resize, Normalize
transform = Compose([Resize(target_size=(512, 512)), Normalize()])
```
- `segmentation_transforms`: The data augmentation module defines lots of preprocessing methods for image segmentation data. Users can replace these preprocessing methods according to their needs.
- Step2: Download the dataset
- ```python
from paddlehub.datasets import OpticDiscSeg
train_reader = OpticDiscSeg(transform, mode='train')
```
* `transforms`: data preprocessing methods.
* `mode`: Select the data mode, the options are `train`, `test`, `val`. Default is `train`.
* Dataset preparation can be referred to [opticdiscseg.py](../../paddlehub/datasets/opticdiscseg.py). `hub.datasets.OpticDiscSeg()` automatically downloads the dataset and decompresses it to the `$HOME/.paddlehub/dataset` directory under the user directory.
- Step3: Load the pre-trained model
- ```python
import paddlehub as hub
model = hub.Module(name='ann_resnet50_voc', num_classes=2, pretrained=None)
```
- `name`: model name.
- `pretrained`: whether to load a self-trained model; if it is None, the provided pre-trained parameters are loaded.
- Step4: Optimization strategy
- ```python
import paddle
from paddlehub.finetune.trainer import Trainer
scheduler = paddle.optimizer.lr.PolynomialDecay(learning_rate=0.01, decay_steps=1000, power=0.9, end_lr=0.0001)
optimizer = paddle.optimizer.Adam(learning_rate=scheduler, parameters=model.parameters())
trainer = Trainer(model, optimizer, checkpoint_dir='test_ckpt_img_seg', use_gpu=True)
trainer.train(train_reader, epochs=10, batch_size=4, log_interval=10, save_interval=4)
```
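- To track performance on the validation split while training, it can be passed to the trainer as well. A minimal sketch, assuming `Trainer.train` supports the `eval_dataset` argument as in other PaddleHub fine-tuning examples:
- ```python
# Hypothetical addition: evaluate on the val split during training
eval_reader = OpticDiscSeg(transform, mode='val')
trainer.train(train_reader, epochs=10, batch_size=4, eval_dataset=eval_reader, log_interval=10, save_interval=4)
```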
- Model prediction
- When fine-tuning is completed, the model with the best performance on the validation set will be saved in the `${CHECKPOINT_DIR}/best_model` directory, where `${CHECKPOINT_DIR}` is the checkpoint directory chosen for fine-tuning. We use this model to make predictions. The `predict.py` script is as follows:
```python
import paddle
import cv2
import paddlehub as hub
if __name__ == '__main__':
model = hub.Module(name='ann_resnet50_voc', pretrained='/PATH/TO/CHECKPOINT')
img = cv2.imread("/PATH/TO/IMAGE")
model.predict(images=[img], visualization=True)
```
- **Args**
* `images`: Image path or ndarray data with format [H, W, C], BGR.
* `visualization`: Whether to save the prediction results as picture files.
* `save_path`: Save path of the result, default is 'seg_result'.
## IV. Server Deployment
- PaddleHub Serving can deploy an online service of image segmentation.
- ### Step 1: Start PaddleHub Serving
- Run the startup command:
- ```shell
$ hub serving start -m ann_resnet50_voc
```
- The service API is now deployed, and the default port number is 8866.
- **NOTE:** If GPU is used for prediction, set the CUDA_VISIBLE_DEVICES environment variable before starting the service; otherwise it does not need to be set.
- ### Step 2: Send a predictive request
- With a configured server, use the following lines of code to send the prediction request and obtain the result:
```python
import requests
import json
import cv2
import base64
import numpy as np
def cv2_to_base64(image):
    data = cv2.imencode('.jpg', image)[1]
    return base64.b64encode(data.tobytes()).decode('utf8')
def base64_to_cv2(b64str):
    data = base64.b64decode(b64str.encode('utf8'))
    data = np.frombuffer(data, np.uint8)
    data = cv2.imdecode(data, cv2.IMREAD_COLOR)
    return data
org_im = cv2.imread('/PATH/TO/IMAGE')
data = {'images':[cv2_to_base64(org_im)]}
headers = {"Content-type": "application/json"}
url = "http://127.0.0.1:8866/predict/ann_resnet50_voc"
r = requests.post(url=url, headers=headers, data=json.dumps(data))
mask = base64_to_cv2(r.json()["results"][0])
```
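- The decoded `mask` is a regular OpenCV image (a BGR `numpy` array), so it can be saved directly, e.g.:
```python
cv2.imwrite('mask.png', mask)  # save the returned segmentation result
```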
## V. Release Note
- 1.0.0
First release
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Union, List, Tuple
import paddle
import paddle.nn as nn
import paddle.nn.functional as F
from paddle.nn.layer import activation
from paddle.nn import Conv2D, AvgPool2D
def SyncBatchNorm(*args, **kwargs):
"""In cpu environment nn.SyncBatchNorm does not have kernel so use nn.BatchNorm2D instead"""
if paddle.get_device() == 'cpu':
return nn.BatchNorm2D(*args, **kwargs)
else:
return nn.SyncBatchNorm(*args, **kwargs)
class SeparableConvBNReLU(nn.Layer):
"""Depthwise Separable Convolution."""
def __init__(self,
in_channels: int,
out_channels: int,
kernel_size: int,
padding: str = 'same',
**kwargs: dict):
super(SeparableConvBNReLU, self).__init__()
self.depthwise_conv = ConvBN(
in_channels,
out_channels=in_channels,
kernel_size=kernel_size,
padding=padding,
groups=in_channels,
**kwargs)
        self.pointwise_conv = ConvBNReLU(
            in_channels, out_channels, kernel_size=1, groups=1)
    def forward(self, x: paddle.Tensor) -> paddle.Tensor:
        x = self.depthwise_conv(x)
        x = self.pointwise_conv(x)
return x
class ConvBN(nn.Layer):
"""Basic conv bn layer"""
def __init__(self,
in_channels: int,
out_channels: int,
kernel_size: int,
padding: str = 'same',
**kwargs: dict):
super(ConvBN, self).__init__()
self._conv = Conv2D(
in_channels, out_channels, kernel_size, padding=padding, **kwargs)
self._batch_norm = SyncBatchNorm(out_channels)
def forward(self, x: paddle.Tensor) -> paddle.Tensor:
x = self._conv(x)
x = self._batch_norm(x)
return x
class ConvBNReLU(nn.Layer):
"""Basic conv bn relu layer."""
def __init__(self,
in_channels: int,
out_channels: int,
kernel_size: int,
padding: str = 'same',
**kwargs: dict):
super(ConvBNReLU, self).__init__()
self._conv = Conv2D(
in_channels, out_channels, kernel_size, padding=padding, **kwargs)
self._batch_norm = SyncBatchNorm(out_channels)
def forward(self, x: paddle.Tensor) -> paddle.Tensor:
x = self._conv(x)
x = self._batch_norm(x)
x = F.relu(x)
return x
class Activation(nn.Layer):
"""
The wrapper of activations.
Args:
act (str, optional): The activation name in lowercase. It must be one of ['elu', 'gelu',
'hardshrink', 'tanh', 'hardtanh', 'prelu', 'relu', 'relu6', 'selu', 'leakyrelu', 'sigmoid',
'softmax', 'softplus', 'softshrink', 'softsign', 'tanhshrink', 'logsigmoid', 'logsoftmax',
'hsigmoid']. Default: None, means identical transformation.
Returns:
A callable object of Activation.
Raises:
KeyError: When parameter `act` is not in the optional range.
Examples:
from paddleseg.models.common.activation import Activation
relu = Activation("relu")
print(relu)
# <class 'paddle.nn.layer.activation.ReLU'>
sigmoid = Activation("sigmoid")
print(sigmoid)
# <class 'paddle.nn.layer.activation.Sigmoid'>
not_exit_one = Activation("not_exit_one")
# KeyError: "not_exit_one does not exist in the current dict_keys(['elu', 'gelu', 'hardshrink',
# 'tanh', 'hardtanh', 'prelu', 'relu', 'relu6', 'selu', 'leakyrelu', 'sigmoid', 'softmax',
# 'softplus', 'softshrink', 'softsign', 'tanhshrink', 'logsigmoid', 'logsoftmax', 'hsigmoid'])"
"""
def __init__(self, act: str = None):
super(Activation, self).__init__()
self._act = act
upper_act_names = activation.__dict__.keys()
lower_act_names = [act.lower() for act in upper_act_names]
act_dict = dict(zip(lower_act_names, upper_act_names))
if act is not None:
if act in act_dict.keys():
act_name = act_dict[act]
self.act_func = eval("activation.{}()".format(act_name))
else:
raise KeyError("{} does not exist in the current {}".format(
act, act_dict.keys()))
def forward(self, x: paddle.Tensor) -> paddle.Tensor:
if self._act is not None:
return self.act_func(x)
else:
return x
class ASPPModule(nn.Layer):
"""
Atrous Spatial Pyramid Pooling.
Args:
        aspp_ratios (tuple): The dilation rates used in the ASPP module.
in_channels (int): The number of input channels.
out_channels (int): The number of output channels.
align_corners (bool): An argument of F.interpolate. It should be set to False when the output size of feature
is even, e.g. 1024x512, otherwise it is True, e.g. 769x769.
use_sep_conv (bool, optional): If using separable conv in ASPP module. Default: False.
image_pooling (bool, optional): If augmented with image-level features. Default: False
"""
def __init__(self,
aspp_ratios: tuple,
in_channels: int,
out_channels: int,
align_corners: bool,
use_sep_conv: bool = False,
image_pooling: bool = False):
super().__init__()
self.align_corners = align_corners
self.aspp_blocks = nn.LayerList()
for ratio in aspp_ratios:
if use_sep_conv and ratio > 1:
conv_func = SeparableConvBNReLU
else:
conv_func = ConvBNReLU
block = conv_func(
in_channels=in_channels,
out_channels=out_channels,
kernel_size=1 if ratio == 1 else 3,
dilation=ratio,
padding=0 if ratio == 1 else ratio)
self.aspp_blocks.append(block)
out_size = len(self.aspp_blocks)
if image_pooling:
self.global_avg_pool = nn.Sequential(
nn.AdaptiveAvgPool2D(output_size=(1, 1)),
ConvBNReLU(in_channels, out_channels, kernel_size=1, bias_attr=False))
out_size += 1
self.image_pooling = image_pooling
self.conv_bn_relu = ConvBNReLU(
in_channels=out_channels * out_size,
out_channels=out_channels,
kernel_size=1)
self.dropout = nn.Dropout(p=0.1) # drop rate
def forward(self, x: paddle.Tensor) -> paddle.Tensor:
outputs = []
for block in self.aspp_blocks:
y = block(x)
y = F.interpolate(
y,
x.shape[2:],
mode='bilinear',
align_corners=self.align_corners)
outputs.append(y)
if self.image_pooling:
img_avg = self.global_avg_pool(x)
img_avg = F.interpolate(
img_avg,
x.shape[2:],
mode='bilinear',
align_corners=self.align_corners)
outputs.append(img_avg)
x = paddle.concat(outputs, axis=1)
x = self.conv_bn_relu(x)
x = self.dropout(x)
return x
class AuxLayer(nn.Layer):
"""
The auxiliary layer implementation for auxiliary loss.
Args:
in_channels (int): The number of input channels.
inter_channels (int): The intermediate channels.
out_channels (int): The number of output channels, and usually it is num_classes.
dropout_prob (float, optional): The drop rate. Default: 0.1.
"""
def __init__(self,
in_channels: int,
inter_channels: int,
out_channels: int,
dropout_prob: float = 0.1,
**kwargs):
super().__init__()
self.conv_bn_relu = ConvBNReLU(
in_channels=in_channels,
out_channels=inter_channels,
kernel_size=3,
padding=1,
**kwargs)
self.dropout = nn.Dropout(p=dropout_prob)
self.conv = nn.Conv2D(
in_channels=inter_channels,
out_channels=out_channels,
kernel_size=1)
def forward(self, x: paddle.Tensor) -> paddle.Tensor:
x = self.conv_bn_relu(x)
x = self.dropout(x)
x = self.conv(x)
return x
class Add(nn.Layer):
def __init__(self):
super().__init__()
def forward(self, x: paddle.Tensor, y: paddle.Tensor, name: str = None) -> paddle.Tensor:
return paddle.add(x, y, name)
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from typing import Union, List, Tuple
import numpy as np
import paddle
import paddle.nn as nn
import paddle.nn.functional as F
from paddlehub.module.module import moduleinfo
import paddlehub.vision.segmentation_transforms as T
from paddlehub.module.cv_module import ImageSegmentationModule
from ann_resnet50_voc.resnet import ResNet50_vd
import ann_resnet50_voc.layers as layers
@moduleinfo(
name="ann_resnet50_voc",
type="CV/semantic_segmentation",
author="paddlepaddle",
author_email="",
summary="ANNResnet50 is a segmentation model.",
version="1.0.0",
meta=ImageSegmentationModule)
class ANN(nn.Layer):
"""
The ANN implementation based on PaddlePaddle.
The original article refers to
    Zhu, Zhen, et al. "Asymmetric Non-local Neural Networks for Semantic Segmentation"
(https://arxiv.org/pdf/1908.07678.pdf).
Args:
num_classes (int): The unique number of target classes.
backbone_indices (tuple, optional): Two values in the tuple indicate the indices of output of backbone.
key_value_channels (int, optional): The key and value channels of self-attention map in both AFNB and APNB modules.
Default: 256.
inter_channels (int, optional): Both input and output channels of APNB modules. Default: 512.
psp_size (tuple, optional): The out size of pooled feature maps. Default: (1, 3, 6, 8).
align_corners (bool, optional): An argument of F.interpolate. It should be set to False when the feature size is even,
e.g. 1024x512, otherwise it is True, e.g. 769x769. Default: False.
pretrained (str, optional): The path or url of pretrained model. Default: None.
"""
def __init__(self,
num_classes: int = 21,
backbone_indices: Tuple[int] = (2, 3),
key_value_channels: int = 256,
inter_channels: int = 512,
psp_size: Tuple[int] = (1, 3, 6, 8),
align_corners: bool = False,
pretrained: str = None):
super(ANN, self).__init__()
self.backbone = ResNet50_vd()
backbone_channels = [
self.backbone.feat_channels[i] for i in backbone_indices
]
self.head = ANNHead(num_classes, backbone_indices, backbone_channels,
key_value_channels, inter_channels, psp_size)
self.align_corners = align_corners
self.transforms = T.Compose([T.Normalize()])
if pretrained is not None:
model_dict = paddle.load(pretrained)
self.set_dict(model_dict)
print("load custom parameters success")
else:
checkpoint = os.path.join(self.directory, 'model.pdparams')
model_dict = paddle.load(checkpoint)
self.set_dict(model_dict)
print("load pretrained parameters success")
def transform(self, img: Union[np.ndarray, str]) -> Union[np.ndarray, str]:
return self.transforms(img)
def forward(self, x: paddle.Tensor) -> List[paddle.Tensor]:
feat_list = self.backbone(x)
logit_list = self.head(feat_list)
return [
F.interpolate(
logit,
paddle.shape(x)[2:],
mode='bilinear',
align_corners=self.align_corners) for logit in logit_list
]
class ANNHead(nn.Layer):
"""
The ANNHead implementation.
It mainly consists of AFNB and APNB modules.
Args:
num_classes (int): The unique number of target classes.
backbone_indices (tuple): Two values in the tuple indicate the indices of output of backbone.
The first index will be taken as low-level features; the second one will be
            taken as high-level features in AFNB module. Usually the backbone consists of four
            downsampling stages, such as ResNet, and returns an output of each stage. If it is (2, 3),
it means taking feature map of the third stage and the fourth stage in backbone.
backbone_channels (tuple): The same length with "backbone_indices". It indicates the channels of corresponding index.
key_value_channels (int): The key and value channels of self-attention map in both AFNB and APNB modules.
inter_channels (int): Both input and output channels of APNB modules.
psp_size (tuple): The out size of pooled feature maps.
        enable_auxiliary_loss (bool, optional): Whether to add an auxiliary loss. Default: False.
"""
def __init__(self,
num_classes: int,
backbone_indices: Tuple[int],
backbone_channels: Tuple[int],
key_value_channels: int,
inter_channels: int,
psp_size: Tuple[int],
enable_auxiliary_loss: bool = False):
super().__init__()
low_in_channels = backbone_channels[0]
high_in_channels = backbone_channels[1]
self.fusion = AFNB(
low_in_channels=low_in_channels,
high_in_channels=high_in_channels,
out_channels=high_in_channels,
key_channels=key_value_channels,
value_channels=key_value_channels,
dropout_prob=0.05,
repeat_sizes=([1]),
psp_size=psp_size)
self.context = nn.Sequential(
layers.ConvBNReLU(
in_channels=high_in_channels,
out_channels=inter_channels,
kernel_size=3,
padding=1),
APNB(
in_channels=inter_channels,
out_channels=inter_channels,
key_channels=key_value_channels,
value_channels=key_value_channels,
dropout_prob=0.05,
repeat_sizes=([1]),
psp_size=psp_size))
self.cls = nn.Conv2D(
in_channels=inter_channels, out_channels=num_classes, kernel_size=1)
self.auxlayer = layers.AuxLayer(
in_channels=low_in_channels,
inter_channels=low_in_channels // 2,
out_channels=num_classes,
dropout_prob=0.05)
self.backbone_indices = backbone_indices
self.enable_auxiliary_loss = enable_auxiliary_loss
def forward(self, feat_list: List[paddle.Tensor]) -> List[paddle.Tensor]:
logit_list = []
low_level_x = feat_list[self.backbone_indices[0]]
high_level_x = feat_list[self.backbone_indices[1]]
x = self.fusion(low_level_x, high_level_x)
x = self.context(x)
logit = self.cls(x)
logit_list.append(logit)
if self.enable_auxiliary_loss:
auxiliary_logit = self.auxlayer(low_level_x)
logit_list.append(auxiliary_logit)
return logit_list
class AFNB(nn.Layer):
"""
Asymmetric Fusion Non-local Block.
Args:
low_in_channels (int): Low-level-feature channels.
high_in_channels (int): High-level-feature channels.
out_channels (int): Out channels of AFNB module.
key_channels (int): The key channels in self-attention block.
value_channels (int): The value channels in self-attention block.
dropout_prob (float): The dropout rate of output.
repeat_sizes (tuple, optional): The number of AFNB modules. Default: ([1]).
        psp_size (tuple, optional): The out size of pooled feature maps. Default: (1, 3, 6, 8).
"""
def __init__(self,
low_in_channels: int,
high_in_channels: int,
out_channels: int,
key_channels: int,
value_channels: int,
dropout_prob: float,
repeat_sizes: Tuple[int] = ([1]),
psp_size: Tuple[int] = (1, 3, 6, 8)):
super().__init__()
self.psp_size = psp_size
self.stages = nn.LayerList([
SelfAttentionBlock_AFNB(low_in_channels, high_in_channels,
key_channels, value_channels, out_channels,
size) for size in repeat_sizes
])
self.conv_bn = layers.ConvBN(
in_channels=out_channels + high_in_channels,
out_channels=out_channels,
kernel_size=1)
self.dropout = nn.Dropout(p=dropout_prob)
    def forward(self, low_feats: paddle.Tensor, high_feats: paddle.Tensor) -> paddle.Tensor:
priors = [stage(low_feats, high_feats) for stage in self.stages]
context = priors[0]
for i in range(1, len(priors)):
context += priors[i]
output = self.conv_bn(paddle.concat([context, high_feats], axis=1))
output = self.dropout(output)
return output
class APNB(nn.Layer):
"""
Asymmetric Pyramid Non-local Block.
Args:
in_channels (int): The input channels of APNB module.
out_channels (int): Out channels of APNB module.
key_channels (int): The key channels in self-attention block.
value_channels (int): The value channels in self-attention block.
dropout_prob (float): The dropout rate of output.
        repeat_sizes (tuple, optional): The number of APNB modules. Default: ([1]).
psp_size (tuple, optional): The out size of pooled feature maps. Default: (1, 3, 6, 8).
"""
def __init__(self,
in_channels: int,
out_channels: int,
key_channels: int,
value_channels: int,
dropout_prob: float,
repeat_sizes: Tuple[int] = ([1]),
psp_size: Tuple[int] = (1, 3, 6, 8)):
super().__init__()
self.psp_size = psp_size
self.stages = nn.LayerList([
SelfAttentionBlock_APNB(in_channels, out_channels, key_channels,
value_channels, size)
for size in repeat_sizes
])
self.conv_bn = layers.ConvBNReLU(
in_channels=in_channels * 2,
out_channels=out_channels,
kernel_size=1)
self.dropout = nn.Dropout(p=dropout_prob)
def forward(self, x: paddle.Tensor) -> paddle.Tensor:
priors = [stage(x) for stage in self.stages]
context = priors[0]
for i in range(1, len(priors)):
context += priors[i]
output = self.conv_bn(paddle.concat([context, x], axis=1))
output = self.dropout(output)
return output
def _pp_module(x: paddle.Tensor, psp_size: List[int]) -> paddle.Tensor:
n, c, h, w = x.shape
priors = []
for size in psp_size:
feat = F.adaptive_avg_pool2d(x, size)
feat = paddle.reshape(feat, shape=(0, c, -1))
priors.append(feat)
center = paddle.concat(priors, axis=-1)
return center
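# Note: with the default psp_size=(1, 3, 6, 8) the pooled grids contribute
# 1 + 9 + 36 + 64 = 110 anchor positions, so `center` has shape (n, c, 110).
# Attending over these anchors instead of all h*w positions is what makes the
# non-local blocks below "asymmetric" and cheap.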
class SelfAttentionBlock_AFNB(nn.Layer):
"""
Self-Attention Block for AFNB module.
Args:
low_in_channels (int): Low-level-feature channels.
high_in_channels (int): High-level-feature channels.
key_channels (int): The key channels in self-attention block.
value_channels (int): The value channels in self-attention block.
out_channels (int, optional): Out channels of AFNB module. Default: None.
scale (int, optional): Pooling size. Default: 1.
psp_size (tuple, optional): The out size of pooled feature maps. Default: (1, 3, 6, 8).
"""
def __init__(self,
low_in_channels: int,
high_in_channels: int,
key_channels: int,
value_channels: int,
out_channels: int = None,
scale: int = 1,
psp_size: Tuple[int] = (1, 3, 6, 8)):
super().__init__()
self.scale = scale
self.in_channels = low_in_channels
self.out_channels = out_channels
self.key_channels = key_channels
self.value_channels = value_channels
        if out_channels is None:
self.out_channels = high_in_channels
self.pool = nn.MaxPool2D(scale)
self.f_key = layers.ConvBNReLU(
in_channels=low_in_channels,
out_channels=key_channels,
kernel_size=1)
self.f_query = layers.ConvBNReLU(
in_channels=high_in_channels,
out_channels=key_channels,
kernel_size=1)
self.f_value = nn.Conv2D(
in_channels=low_in_channels,
out_channels=value_channels,
kernel_size=1)
        self.W = nn.Conv2D(
            in_channels=value_channels,
            out_channels=self.out_channels,
            kernel_size=1)
self.psp_size = psp_size
    def forward(self, low_feats: paddle.Tensor, high_feats: paddle.Tensor) -> paddle.Tensor:
batch_size, _, h, w = high_feats.shape
value = self.f_value(low_feats)
value = _pp_module(value, self.psp_size)
value = paddle.transpose(value, (0, 2, 1))
query = self.f_query(high_feats)
query = paddle.reshape(query, shape=(0, self.key_channels, -1))
query = paddle.transpose(query, perm=(0, 2, 1))
key = self.f_key(low_feats)
key = _pp_module(key, self.psp_size)
sim_map = paddle.matmul(query, key)
sim_map = (self.key_channels**-.5) * sim_map
sim_map = F.softmax(sim_map, axis=-1)
context = paddle.matmul(sim_map, value)
context = paddle.transpose(context, perm=(0, 2, 1))
hf_shape = paddle.shape(high_feats)
context = paddle.reshape(
context, shape=[0, self.value_channels, hf_shape[2], hf_shape[3]])
context = self.W(context)
return context
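# Shape sketch for the forward pass above (default psp_size): query is
# (n, h*w, key_channels), key is (n, key_channels, 110) and value is
# (n, 110, value_channels), so sim_map is (n, h*w, 110) instead of the
# (n, h*w, h*w) map of a standard non-local block.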
class SelfAttentionBlock_APNB(nn.Layer):
"""
Self-Attention Block for APNB module.
Args:
in_channels (int): The input channels of APNB module.
out_channels (int): The out channels of APNB module.
key_channels (int): The key channels in self-attention block.
value_channels (int): The value channels in self-attention block.
scale (int, optional): Pooling size. Default: 1.
psp_size (tuple, optional): The out size of pooled feature maps. Default: (1, 3, 6, 8).
"""
def __init__(self,
in_channels: int,
out_channels: int,
key_channels: int,
value_channels: int,
scale: int = 1,
psp_size: Tuple[int] = (1, 3, 6, 8)):
super().__init__()
self.scale = scale
self.in_channels = in_channels
self.out_channels = out_channels
self.key_channels = key_channels
self.value_channels = value_channels
self.pool = nn.MaxPool2D(scale)
self.f_key = layers.ConvBNReLU(
in_channels=self.in_channels,
out_channels=self.key_channels,
kernel_size=1)
self.f_query = self.f_key
self.f_value = nn.Conv2D(
in_channels=self.in_channels,
out_channels=self.value_channels,
kernel_size=1)
self.W = nn.Conv2D(
in_channels=self.value_channels,
out_channels=self.out_channels,
kernel_size=1)
self.psp_size = psp_size
def forward(self, x: paddle.Tensor) -> paddle.Tensor:
batch_size, _, h, w = x.shape
if self.scale > 1:
x = self.pool(x)
value = self.f_value(x)
value = _pp_module(value, self.psp_size)
value = paddle.transpose(value, perm=(0, 2, 1))
query = self.f_query(x)
query = paddle.reshape(query, shape=(0, self.key_channels, -1))
query = paddle.transpose(query, perm=(0, 2, 1))
key = self.f_key(x)
key = _pp_module(key, self.psp_size)
sim_map = paddle.matmul(query, key)
sim_map = (self.key_channels**-.5) * sim_map
sim_map = F.softmax(sim_map, axis=-1)
context = paddle.matmul(sim_map, value)
context = paddle.transpose(context, perm=(0, 2, 1))
x_shape = paddle.shape(x)
context = paddle.reshape(
context, shape=[0, self.value_channels, x_shape[2], x_shape[3]])
context = self.W(context)
return context
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Union, List, Tuple
import paddle
import paddle.nn as nn
import paddle.nn.functional as F
import ann_resnet50_voc.layers as layers
class ConvBNLayer(nn.Layer):
def __init__(self,
in_channels: int,
out_channels: int,
kernel_size: int,
stride: int = 1,
dilation: int = 1,
groups: int = 1,
is_vd_mode: bool = False,
act: str = None,
data_format: str = 'NCHW'):
super(ConvBNLayer, self).__init__()
if dilation != 1 and kernel_size != 3:
raise RuntimeError("When the dilation isn't 1," \
"the kernel_size should be 3.")
self.is_vd_mode = is_vd_mode
self._pool2d_avg = nn.AvgPool2D(
kernel_size=2,
stride=2,
padding=0,
ceil_mode=True,
data_format=data_format)
self._conv = nn.Conv2D(
in_channels=in_channels,
out_channels=out_channels,
kernel_size=kernel_size,
stride=stride,
padding=(kernel_size - 1) // 2 \
if dilation == 1 else dilation,
dilation=dilation,
groups=groups,
bias_attr=False,
data_format=data_format)
self._batch_norm = layers.SyncBatchNorm(
out_channels, data_format=data_format)
self._act_op = layers.Activation(act=act)
def forward(self, inputs: paddle.Tensor) -> paddle.Tensor:
if self.is_vd_mode:
inputs = self._pool2d_avg(inputs)
y = self._conv(inputs)
y = self._batch_norm(y)
y = self._act_op(y)
return y
class BottleneckBlock(nn.Layer):
def __init__(self,
in_channels: int,
out_channels: int,
stride: int,
shortcut: bool = True,
if_first: bool = False,
dilation: int = 1,
data_format: str = 'NCHW'):
super(BottleneckBlock, self).__init__()
self.data_format = data_format
self.conv0 = ConvBNLayer(
in_channels=in_channels,
out_channels=out_channels,
kernel_size=1,
act='relu',
data_format=data_format)
self.dilation = dilation
self.conv1 = ConvBNLayer(
in_channels=out_channels,
out_channels=out_channels,
kernel_size=3,
stride=stride,
act='relu',
dilation=dilation,
data_format=data_format)
self.conv2 = ConvBNLayer(
in_channels=out_channels,
out_channels=out_channels * 4,
kernel_size=1,
act=None,
data_format=data_format)
if not shortcut:
self.short = ConvBNLayer(
in_channels=in_channels,
out_channels=out_channels * 4,
kernel_size=1,
stride=1,
is_vd_mode=False if if_first or stride == 1 else True,
data_format=data_format)
self.shortcut = shortcut
# NOTE: Use the wrap layer for quantization training
self.add = layers.Add()
self.relu = layers.Activation(act="relu")
def forward(self, inputs: paddle.Tensor) -> paddle.Tensor:
y = self.conv0(inputs)
conv1 = self.conv1(y)
conv2 = self.conv2(conv1)
if self.shortcut:
short = inputs
else:
short = self.short(inputs)
y = self.add(short, conv2)
y = self.relu(y)
return y
class BasicBlock(nn.Layer):
def __init__(self,
in_channels: int,
out_channels: int,
stride: int,
dilation: int = 1,
shortcut: bool = True,
if_first: bool = False,
data_format: str = 'NCHW'):
super(BasicBlock, self).__init__()
self.conv0 = ConvBNLayer(
in_channels=in_channels,
out_channels=out_channels,
kernel_size=3,
stride=stride,
dilation=dilation,
act='relu',
data_format=data_format)
self.conv1 = ConvBNLayer(
in_channels=out_channels,
out_channels=out_channels,
kernel_size=3,
dilation=dilation,
act=None,
data_format=data_format)
if not shortcut:
self.short = ConvBNLayer(
in_channels=in_channels,
out_channels=out_channels,
kernel_size=1,
stride=1,
is_vd_mode=False if if_first or stride == 1 else True,
data_format=data_format)
self.shortcut = shortcut
self.dilation = dilation
self.data_format = data_format
self.add = layers.Add()
self.relu = layers.Activation(act="relu")
def forward(self, inputs: paddle.Tensor) -> paddle.Tensor:
y = self.conv0(inputs)
conv1 = self.conv1(y)
if self.shortcut:
short = inputs
else:
short = self.short(inputs)
y = self.add(short, conv1)
y = self.relu(y)
return y
class ResNet_vd(nn.Layer):
"""
The ResNet_vd implementation based on PaddlePaddle.
    The original article refers to
    Tong He, et al. "Bag of Tricks for Image Classification with Convolutional Neural Networks"
(https://arxiv.org/pdf/1812.01187.pdf).
Args:
layers (int, optional): The layers of ResNet_vd. The supported layers are (18, 34, 50, 101, 152, 200). Default: 50.
output_stride (int, optional): The stride of output features compared to input images. It is 8 or 16. Default: 8.
        multi_grid (tuple|list, optional): The grid of stage4. Default: (1, 1, 1).
pretrained (str, optional): The path of pretrained model.
"""
def __init__(self,
layers: int = 50,
output_stride: int = 8,
multi_grid: Tuple[int]=(1, 1, 1),
pretrained: str = None,
data_format: str = 'NCHW'):
super(ResNet_vd, self).__init__()
self.data_format = data_format
self.conv1_logit = None # for gscnn shape stream
self.layers = layers
supported_layers = [18, 34, 50, 101, 152, 200]
assert layers in supported_layers, \
"supported layers are {} but input layer is {}".format(
supported_layers, layers)
if layers == 18:
depth = [2, 2, 2, 2]
elif layers == 34 or layers == 50:
depth = [3, 4, 6, 3]
elif layers == 101:
depth = [3, 4, 23, 3]
elif layers == 152:
depth = [3, 8, 36, 3]
elif layers == 200:
depth = [3, 12, 48, 3]
num_channels = [64, 256, 512, 1024
] if layers >= 50 else [64, 64, 128, 256]
num_filters = [64, 128, 256, 512]
# for channels of four returned stages
self.feat_channels = [c * 4 for c in num_filters
] if layers >= 50 else num_filters
dilation_dict = None
if output_stride == 8:
dilation_dict = {2: 2, 3: 4}
elif output_stride == 16:
dilation_dict = {3: 2}
self.conv1_1 = ConvBNLayer(
in_channels=3,
out_channels=32,
kernel_size=3,
stride=2,
act='relu',
data_format=data_format)
self.conv1_2 = ConvBNLayer(
in_channels=32,
out_channels=32,
kernel_size=3,
stride=1,
act='relu',
data_format=data_format)
self.conv1_3 = ConvBNLayer(
in_channels=32,
out_channels=64,
kernel_size=3,
stride=1,
act='relu',
data_format=data_format)
self.pool2d_max = nn.MaxPool2D(
kernel_size=3, stride=2, padding=1, data_format=data_format)
# self.block_list = []
self.stage_list = []
if layers >= 50:
for block in range(len(depth)):
shortcut = False
block_list = []
for i in range(depth[block]):
if layers in [101, 152] and block == 2:
if i == 0:
conv_name = "res" + str(block + 2) + "a"
else:
conv_name = "res" + str(block + 2) + "b" + str(i)
else:
conv_name = "res" + str(block + 2) + chr(97 + i)
###############################################################################
# Add dilation rate for some segmentation tasks, if dilation_dict is not None.
dilation_rate = dilation_dict[
block] if dilation_dict and block in dilation_dict else 1
# Actually block here is 'stage', and i is 'block' in 'stage'
                    # At stage 4, expand the dilation_rate if multi_grid is given
if block == 3:
dilation_rate = dilation_rate * multi_grid[i]
###############################################################################
bottleneck_block = self.add_sublayer(
'bb_%d_%d' % (block, i),
BottleneckBlock(
in_channels=num_channels[block]
if i == 0 else num_filters[block] * 4,
out_channels=num_filters[block],
stride=2 if i == 0 and block != 0
and dilation_rate == 1 else 1,
shortcut=shortcut,
if_first=block == i == 0,
dilation=dilation_rate,
data_format=data_format))
block_list.append(bottleneck_block)
shortcut = True
self.stage_list.append(block_list)
else:
for block in range(len(depth)):
shortcut = False
block_list = []
for i in range(depth[block]):
dilation_rate = dilation_dict[block] \
if dilation_dict and block in dilation_dict else 1
if block == 3:
dilation_rate = dilation_rate * multi_grid[i]
basic_block = self.add_sublayer(
'bb_%d_%d' % (block, i),
BasicBlock(
in_channels=num_channels[block]
if i == 0 else num_filters[block],
out_channels=num_filters[block],
stride=2 if i == 0 and block != 0 \
and dilation_rate == 1 else 1,
dilation=dilation_rate,
shortcut=shortcut,
if_first=block == i == 0,
data_format=data_format))
block_list.append(basic_block)
shortcut = True
self.stage_list.append(block_list)
self.pretrained = pretrained
def forward(self, inputs: paddle.Tensor) -> List[paddle.Tensor]:
y = self.conv1_1(inputs)
y = self.conv1_2(y)
y = self.conv1_3(y)
self.conv1_logit = y.clone()
y = self.pool2d_max(y)
# A feature list saves the output feature map of each stage.
feat_list = []
for stage in self.stage_list:
for block in stage:
y = block(y)
feat_list.append(y)
return feat_list
def ResNet50_vd(**args):
model = ResNet_vd(layers=50, **args)
return model
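# Usage sketch: backbone = ResNet50_vd(output_stride=8) keeps stages 3 and 4
# at 1/8 resolution via dilation; a forward pass returns four stage feature
# maps with channels [256, 512, 1024, 2048] (see self.feat_channels above).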
# danet_resnet50_cityscapes
|模型名称|danet_resnet50_cityscapes|
| :--- | :---: |
|类别|图像-图像分割|
|网络|danet_resnet50vd|
|数据集|Cityscapes|
|是否支持Fine-tuning|是|
|模型大小|272MB|
|指标|-|
|最新更新日期|2022-03-21|
## 一、模型基本信息
- 样例结果示例:
<p align="center">
<img src="https://user-images.githubusercontent.com/35907364/159212111-df341f2a-e994-45d7-92d6-2288d666079c.png" width = "420" height = "505" hspace='10'/> <img src="https://user-images.githubusercontent.com/35907364/159212188-2db40b29-2943-47ce-9ad2-36a6fb85ba3e.png" width = "420" height = "505" hspace='10'/>
</p>
- ### 模型介绍
- 本示例将展示如何使用PaddleHub对预训练模型进行finetune并完成预测任务。
- 更多详情请参考:[danet](https://arxiv.org/pdf/1809.02983.pdf)
## 二、安装
- ### 1、环境依赖
- paddlepaddle >= 2.0.0
- paddlehub >= 2.0.0
- ### 2、安装
- ```shell
$ hub install danet_resnet50_cityscapes
```
- 如您安装时遇到问题,可参考:[零基础windows安装](../../../../docs/docs_ch/get_start/windows_quickstart.md)
| [零基础Linux安装](../../../../docs/docs_ch/get_start/linux_quickstart.md) | [零基础MacOS安装](../../../../docs/docs_ch/get_start/mac_quickstart.md)
## 三、模型API预测
- ### 1.预测代码示例
- ```python
import cv2
import paddle
import paddlehub as hub
if __name__ == '__main__':
model = hub.Module(name='danet_resnet50_cityscapes')
img = cv2.imread("/PATH/TO/IMAGE")
result = model.predict(images=[img], visualization=True)
```
- ### 2.如何开始Fine-tune
- 在完成安装PaddlePaddle与PaddleHub后,通过执行`python train.py`即可开始使用danet_resnet50_cityscapes模型对OpticDiscSeg数据集进行Fine-tune。 `train.py`内容如下:
- 代码步骤
- Step1: 定义数据预处理方式
- ```python
from paddlehub.vision.segmentation_transforms import Compose, Resize, Normalize
transform = Compose([Resize(target_size=(512, 512)), Normalize()])
```
- `segmentation_transforms` 数据增强模块定义了丰富的针对图像分割数据的预处理方式,用户可按照需求替换自己需要的数据预处理方式。
- Step2: 下载数据集并使用
- ```python
from paddlehub.datasets import OpticDiscSeg
train_reader = OpticDiscSeg(transform, mode='train')
```
- `transforms`: 数据预处理方式。
  - `mode`: 选择数据模式,可选项有 `train`, `test`, `val`, 默认为`train`。
- 数据集的准备代码可以参考 [opticdiscseg.py](../../paddlehub/datasets/opticdiscseg.py)。`hub.datasets.OpticDiscSeg()`会自动从网络下载数据集并解压到用户目录下`$HOME/.paddlehub/dataset`目录。
- Step3: 加载预训练模型
- ```python
import paddlehub as hub
model = hub.Module(name='danet_resnet50_cityscapes', num_classes=2, pretrained=None)
```
- `name`: 选择预训练模型的名字。
- `load_checkpoint`: 是否加载自己训练的模型,若为None,则加载提供的模型默认参数。
- Step4: 选择优化策略和运行配置
- ```python
import paddle
from paddlehub.finetune.trainer import Trainer
scheduler = paddle.optimizer.lr.PolynomialDecay(learning_rate=0.01, decay_steps=1000, power=0.9, end_lr=0.0001)
optimizer = paddle.optimizer.Adam(learning_rate=scheduler, parameters=model.parameters())
trainer = Trainer(model, optimizer, checkpoint_dir='test_ckpt_img_seg', use_gpu=True)
trainer.train(train_reader, epochs=10, batch_size=4, log_interval=10, save_interval=4)
```
- 模型预测
- 当完成Fine-tune后,Fine-tune过程在验证集上表现最优的模型会被保存在`${CHECKPOINT_DIR}/best_model`目录下,其中`${CHECKPOINT_DIR}`目录为Fine-tune时所选择的保存checkpoint的目录。我们使用该模型来进行预测。predict.py脚本如下:
```python
import paddle
import cv2
import paddlehub as hub
if __name__ == '__main__':
model = hub.Module(name='danet_resnet50_cityscapes', pretrained='/PATH/TO/CHECKPOINT')
img = cv2.imread("/PATH/TO/IMAGE")
model.predict(images=[img], visualization=True)
```
- 参数配置正确后,请执行脚本`python predict.py`。
- **Args**
* `images`:原始图像路径或BGR格式图片;
* `visualization`: 是否可视化,默认为True;
* `save_path`: 保存结果的路径,默认保存路径为'seg_result'。
**NOTE:** 进行预测时,所选择的module,checkpoint_dir,dataset必须和Fine-tune所用的一样。
## 四、服务部署
- PaddleHub Serving可以部署一个在线图像分割服务。
- ### 第一步:启动PaddleHub Serving
- 运行启动命令:
- ```shell
$ hub serving start -m danet_resnet50_cityscapes
```
- 这样就完成了一个图像分割服务化API的部署,默认端口号为8866。
  - **NOTE:** 如使用GPU预测,则需要在启动服务之前设置CUDA_VISIBLE_DEVICES环境变量,否则不用设置。
- ### 第二步:发送预测请求
- 配置好服务端,以下数行代码即可实现发送预测请求,获取预测结果
```python
import requests
import json
import cv2
import base64
import numpy as np
def cv2_to_base64(image):
    data = cv2.imencode('.jpg', image)[1]
    return base64.b64encode(data.tobytes()).decode('utf8')
def base64_to_cv2(b64str):
    data = base64.b64decode(b64str.encode('utf8'))
    data = np.frombuffer(data, np.uint8)
    data = cv2.imdecode(data, cv2.IMREAD_COLOR)
    return data
# 发送HTTP请求
org_im = cv2.imread('/PATH/TO/IMAGE')
data = {'images':[cv2_to_base64(org_im)]}
headers = {"Content-type": "application/json"}
url = "http://127.0.0.1:8866/predict/ginet_resnet50vd_voc"
r = requests.post(url=url, headers=headers, data=json.dumps(data))
mask = base64_to_cv2(r.json()["results"][0])
```
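- 上述代码返回的`mask`为解码后的分割结果图像数组,可按需保存(以下输出路径仅为示意):
```python
cv2.imwrite('/PATH/TO/OUTPUT.png', mask)  # 输出路径仅为示意
```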
## 五、更新历史
* 1.0.0
初始发布
# danet_resnet50_cityscapes
|Module Name|danet_resnet50_cityscapes|
| :--- | :---: |
|Category|Image Segmentation|
|Network|danet_resnet50vd|
|Dataset|Cityscapes|
|Fine-tuning supported or not|Yes|
|Module Size|272MB|
|Data indicators|-|
|Latest update date|2022-03-21|
## I. Basic Information
- ### Application Effect Display
- Sample results:
<p align="center">
<img src="https://user-images.githubusercontent.com/35907364/159212111-df341f2a-e994-45d7-92d6-2288d666079c.png" width = "420" height = "505" hspace='10'/> <img src="https://user-images.githubusercontent.com/35907364/159212188-2db40b29-2943-47ce-9ad2-36a6fb85ba3e.png" width = "420" height = "505" hspace='10'/>
</p>
- ### Module Introduction
- We will show how to use PaddleHub to finetune the pre-trained model and complete the prediction.
- For more information, please refer to: [danet](https://arxiv.org/pdf/1809.02983.pdf)
## II. Installation
- ### 1、Environmental Dependence
- paddlepaddle >= 2.0.0
- paddlehub >= 2.0.0
- ### 2、Installation
- ```shell
$ hub install danet_resnet50_cityscapes
```
- In case of any problems during installation, please refer to: [Windows_Quickstart](../../../../docs/docs_en/get_start/windows_quickstart.md)
| [Linux_Quickstart](../../../../docs/docs_en/get_start/linux_quickstart.md) | [Mac_Quickstart](../../../../docs/docs_en/get_start/mac_quickstart.md)
## III. Module API Prediction
- ### 1、Prediction Code Example
- ```python
import cv2
import paddle
import paddlehub as hub
if __name__ == '__main__':
model = hub.Module(name='danet_resnet50_cityscapes')
img = cv2.imread("/PATH/TO/IMAGE")
result = model.predict(images=[img], visualization=True)
```
- ### 2.Fine-tune and Encapsulation
- After completing the installation of PaddlePaddle and PaddleHub, you can start fine-tuning the danet_resnet50_cityscapes model on datasets such as OpticDiscSeg by executing `python train.py`.
- Steps:
- Step1: Define the data preprocessing method
- ```python
from paddlehub.vision.segmentation_transforms import Compose, Resize, Normalize
transform = Compose([Resize(target_size=(512, 512)), Normalize()])
```
  - `segmentation_transforms`: This data augmentation module defines many preprocessing methods for image segmentation data. Users can substitute the preprocessing methods according to their needs.
- Step2: Download the dataset
- ```python
from paddlehub.datasets import OpticDiscSeg
train_reader = OpticDiscSeg(transform, mode='train')
```
* `transforms`: data preprocessing methods.
* `mode`: Select the data mode, the options are `train`, `test`, `val`. Default is `train`.
    * Dataset preparation can be referred to [opticdiscseg.py](../../paddlehub/datasets/opticdiscseg.py). `hub.datasets.OpticDiscSeg()` will automatically download and decompress the dataset to the `$HOME/.paddlehub/dataset` directory.
- Step3: Load the pre-trained model
- ```python
import paddlehub as hub
model = hub.Module(name='danet_resnet50_cityscapes', num_classes=2, pretrained=None)
```
- `name`: model name.
  - `load_checkpoint`: Whether to load a self-trained model; if it is None, the provided default parameters are loaded.
- Step4: Optimization strategy
- ```python
import paddle
from paddlehub.finetune.trainer import Trainer
scheduler = paddle.optimizer.lr.PolynomialDecay(learning_rate=0.01, decay_steps=1000, power=0.9, end_lr=0.0001)
optimizer = paddle.optimizer.Adam(learning_rate=scheduler, parameters=model.parameters())
trainer = Trainer(model, optimizer, checkpoint_dir='test_ckpt_img_seg', use_gpu=True)
trainer.train(train_reader, epochs=10, batch_size=4, log_interval=10, save_interval=4)
```
- Model prediction
  - When fine-tuning is completed, the model with the best performance on the validation set will be saved in the `${CHECKPOINT_DIR}/best_model` directory. We use this model to make predictions. The `predict.py` script is as follows:
```python
import paddle
import cv2
import paddlehub as hub
if __name__ == '__main__':
model = hub.Module(name='danet_resnet50_cityscapes', pretrained='/PATH/TO/CHECKPOINT')
img = cv2.imread("/PATH/TO/IMAGE")
model.predict(images=[img], visualization=True)
```
- **Args**
* `images`: Image path or ndarray data with format [H, W, C], BGR.
    * `visualization`: Whether to visualize and save the segmentation results as image files.
* `save_path`: Save path of the result, default is 'seg_result'.
## IV. Server Deployment
- PaddleHub Serving can deploy an online service of image segmentation.
- ### Step 1: Start PaddleHub Serving
- Run the startup command:
- ```shell
$ hub serving start -m danet_resnet50_cityscapes
```
  - The segmentation service API is now deployed, and the default port number is 8866.
  - **NOTE:** If GPU is used for prediction, set the CUDA_VISIBLE_DEVICES environment variable before starting the service; otherwise, it need not be set.
- ### Step 2: Send a predictive request
- With a configured server, use the following lines of code to send the prediction request and obtain the result:
```python
import requests
import json
import cv2
import base64
import numpy as np
def cv2_to_base64(image):
    data = cv2.imencode('.jpg', image)[1]
    return base64.b64encode(data.tobytes()).decode('utf8')
def base64_to_cv2(b64str):
    data = base64.b64decode(b64str.encode('utf8'))
    data = np.frombuffer(data, np.uint8)
    data = cv2.imdecode(data, cv2.IMREAD_COLOR)
    return data
org_im = cv2.imread('/PATH/TO/IMAGE')
data = {'images':[cv2_to_base64(org_im)]}
headers = {"Content-type": "application/json"}
url = "http://127.0.0.1:8866/predict/danet_resnet50_cityscapes"
r = requests.post(url=url, headers=headers, data=json.dumps(data))
mask = base64_to_cv2(r.json()["results"][0])
```
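- The `mask` returned above is the decoded segmentation result as an image array and can be saved as needed (the output path below is illustrative only):
```python
cv2.imwrite('/PATH/TO/OUTPUT.png', mask)  # illustrative output path
```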
## V. Release Note
- 1.0.0
First release
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle
import paddle.nn as nn
import paddle.nn.functional as F
from paddle.nn.layer import activation
from paddle.nn import Conv2D, AvgPool2D
def SyncBatchNorm(*args, **kwargs):
"""In cpu environment nn.SyncBatchNorm does not have kernel so use nn.BatchNorm2D instead"""
if paddle.get_device() == 'cpu':
return nn.BatchNorm2D(*args, **kwargs)
else:
return nn.SyncBatchNorm(*args, **kwargs)
class ConvBNLayer(nn.Layer):
"""Basic conv bn relu layer."""
def __init__(
self,
in_channels: int,
out_channels: int,
kernel_size: int,
stride: int = 1,
dilation: int = 1,
groups: int = 1,
is_vd_mode: bool = False,
act: str = None,
name: str = None):
super(ConvBNLayer, self).__init__()
self.is_vd_mode = is_vd_mode
self._pool2d_avg = AvgPool2D(
kernel_size=2, stride=2, padding=0, ceil_mode=True)
self._conv = Conv2D(
in_channels=in_channels,
out_channels=out_channels,
kernel_size=kernel_size,
stride=stride,
padding=(kernel_size - 1) // 2 if dilation == 1 else 0,
dilation=dilation,
groups=groups,
bias_attr=False)
self._batch_norm = SyncBatchNorm(out_channels)
self._act_op = Activation(act=act)
def forward(self, inputs: paddle.Tensor) -> paddle.Tensor:
if self.is_vd_mode:
inputs = self._pool2d_avg(inputs)
y = self._conv(inputs)
y = self._batch_norm(y)
y = self._act_op(y)
return y
class BottleneckBlock(nn.Layer):
"""Residual bottleneck block"""
def __init__(self,
in_channels: int,
out_channels: int,
stride: int,
shortcut: bool = True,
if_first: bool = False,
dilation: int = 1,
name: str = None):
super(BottleneckBlock, self).__init__()
self.conv0 = ConvBNLayer(
in_channels=in_channels,
out_channels=out_channels,
kernel_size=1,
act='relu',
name=name + "_branch2a")
self.dilation = dilation
self.conv1 = ConvBNLayer(
in_channels=out_channels,
out_channels=out_channels,
kernel_size=3,
stride=stride,
act='relu',
dilation=dilation,
name=name + "_branch2b")
self.conv2 = ConvBNLayer(
in_channels=out_channels,
out_channels=out_channels * 4,
kernel_size=1,
act=None,
name=name + "_branch2c")
if not shortcut:
self.short = ConvBNLayer(
in_channels=in_channels,
out_channels=out_channels * 4,
kernel_size=1,
stride=1,
is_vd_mode=False if if_first or stride == 1 else True,
name=name + "_branch1")
self.shortcut = shortcut
def forward(self, inputs: paddle.Tensor) -> paddle.Tensor:
y = self.conv0(inputs)
if self.dilation > 1:
padding = self.dilation
y = F.pad(y, [padding, padding, padding, padding])
conv1 = self.conv1(y)
conv2 = self.conv2(conv1)
if self.shortcut:
short = inputs
else:
short = self.short(inputs)
y = paddle.add(x=short, y=conv2)
y = F.relu(y)
return y
class SeparableConvBNReLU(nn.Layer):
"""Depthwise Separable Convolution."""
def __init__(self,
in_channels: int,
out_channels: int,
kernel_size: int,
padding: str = 'same',
**kwargs: dict):
super(SeparableConvBNReLU, self).__init__()
self.depthwise_conv = ConvBN(
in_channels,
out_channels=in_channels,
kernel_size=kernel_size,
padding=padding,
groups=in_channels,
**kwargs)
        self.pointwise_conv = ConvBNReLU(
            in_channels, out_channels, kernel_size=1, groups=1)
    def forward(self, x: paddle.Tensor) -> paddle.Tensor:
        x = self.depthwise_conv(x)
        x = self.pointwise_conv(x)
        return x
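# Parameter sketch: a 3x3 separable conv from 256 to 256 channels costs
# 3*3*256 (depthwise) + 256*256 (pointwise) = 67,840 weights versus
# 3*3*256*256 = 589,824 for a dense conv -- roughly an 8.7x reduction.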
class ConvBN(nn.Layer):
"""Basic conv bn layer"""
def __init__(self,
in_channels: int,
out_channels: int,
kernel_size: int,
padding: str = 'same',
**kwargs: dict):
super(ConvBN, self).__init__()
self._conv = Conv2D(
in_channels, out_channels, kernel_size, padding=padding, **kwargs)
self._batch_norm = SyncBatchNorm(out_channels)
def forward(self, x: paddle.Tensor) -> paddle.Tensor:
x = self._conv(x)
x = self._batch_norm(x)
return x
class ConvBNReLU(nn.Layer):
"""Basic conv bn relu layer."""
def __init__(self,
in_channels: int,
out_channels: int,
kernel_size: int,
padding: str = 'same',
**kwargs: dict):
super(ConvBNReLU, self).__init__()
self._conv = Conv2D(
in_channels, out_channels, kernel_size, padding=padding, **kwargs)
self._batch_norm = SyncBatchNorm(out_channels)
def forward(self, x: paddle.Tensor) -> paddle.Tensor:
x = self._conv(x)
x = self._batch_norm(x)
x = F.relu(x)
return x
class Activation(nn.Layer):
"""
The wrapper of activations.
Args:
act (str, optional): The activation name in lowercase. It must be one of ['elu', 'gelu',
'hardshrink', 'tanh', 'hardtanh', 'prelu', 'relu', 'relu6', 'selu', 'leakyrelu', 'sigmoid',
'softmax', 'softplus', 'softshrink', 'softsign', 'tanhshrink', 'logsigmoid', 'logsoftmax',
'hsigmoid']. Default: None, means identical transformation.
Returns:
A callable object of Activation.
Raises:
KeyError: When parameter `act` is not in the optional range.
Examples:
from paddleseg.models.common.activation import Activation
relu = Activation("relu")
print(relu)
# <class 'paddle.nn.layer.activation.ReLU'>
sigmoid = Activation("sigmoid")
print(sigmoid)
# <class 'paddle.nn.layer.activation.Sigmoid'>
not_exit_one = Activation("not_exit_one")
# KeyError: "not_exit_one does not exist in the current dict_keys(['elu', 'gelu', 'hardshrink',
# 'tanh', 'hardtanh', 'prelu', 'relu', 'relu6', 'selu', 'leakyrelu', 'sigmoid', 'softmax',
# 'softplus', 'softshrink', 'softsign', 'tanhshrink', 'logsigmoid', 'logsoftmax', 'hsigmoid'])"
"""
def __init__(self, act: str = None):
super(Activation, self).__init__()
self._act = act
upper_act_names = activation.__dict__.keys()
lower_act_names = [act.lower() for act in upper_act_names]
act_dict = dict(zip(lower_act_names, upper_act_names))
if act is not None:
if act in act_dict.keys():
act_name = act_dict[act]
self.act_func = eval("activation.{}()".format(act_name))
else:
raise KeyError("{} does not exist in the current {}".format(
act, act_dict.keys()))
def forward(self, x: paddle.Tensor) -> paddle.Tensor:
if self._act is not None:
return self.act_func(x)
else:
return x
class ASPPModule(nn.Layer):
"""
Atrous Spatial Pyramid Pooling.
Args:
        aspp_ratios (tuple): The dilation rates used in the ASPP module.
in_channels (int): The number of input channels.
out_channels (int): The number of output channels.
align_corners (bool): An argument of F.interpolate. It should be set to False when the output size of feature
is even, e.g. 1024x512, otherwise it is True, e.g. 769x769.
use_sep_conv (bool, optional): If using separable conv in ASPP module. Default: False.
        image_pooling (bool, optional): If augmented with image-level features. Default: False.
"""
def __init__(self,
aspp_ratios: tuple,
in_channels: int,
out_channels: int,
align_corners: bool,
                 use_sep_conv: bool = False,
image_pooling: bool = False):
super().__init__()
self.align_corners = align_corners
self.aspp_blocks = nn.LayerList()
for ratio in aspp_ratios:
if use_sep_conv and ratio > 1:
conv_func = SeparableConvBNReLU
else:
conv_func = ConvBNReLU
block = conv_func(
in_channels=in_channels,
out_channels=out_channels,
kernel_size=1 if ratio == 1 else 3,
dilation=ratio,
padding=0 if ratio == 1 else ratio)
self.aspp_blocks.append(block)
out_size = len(self.aspp_blocks)
if image_pooling:
self.global_avg_pool = nn.Sequential(
nn.AdaptiveAvgPool2D(output_size=(1, 1)),
ConvBNReLU(in_channels, out_channels, kernel_size=1, bias_attr=False))
out_size += 1
self.image_pooling = image_pooling
self.conv_bn_relu = ConvBNReLU(
in_channels=out_channels * out_size,
out_channels=out_channels,
kernel_size=1)
self.dropout = nn.Dropout(p=0.1) # drop rate
def forward(self, x: paddle.Tensor) -> paddle.Tensor:
outputs = []
for block in self.aspp_blocks:
y = block(x)
y = F.interpolate(
y,
x.shape[2:],
mode='bilinear',
align_corners=self.align_corners)
outputs.append(y)
if self.image_pooling:
img_avg = self.global_avg_pool(x)
img_avg = F.interpolate(
img_avg,
x.shape[2:],
mode='bilinear',
align_corners=self.align_corners)
outputs.append(img_avg)
x = paddle.concat(outputs, axis=1)
x = self.conv_bn_relu(x)
x = self.dropout(x)
return x
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from typing import Union, List, Tuple
import paddle
from paddle import nn
import paddle.nn.functional as F
import numpy as np
from paddlehub.module.module import moduleinfo
import paddlehub.vision.segmentation_transforms as T
from paddlehub.module.cv_module import ImageSegmentationModule
from danet_resnet50_cityscapes.resnet import ResNet50_vd
import danet_resnet50_cityscapes.layers as L
@moduleinfo(
name="danet_resnet50_cityscapes",
type="CV/semantic_segmentation",
author="paddlepaddle",
author_email="",
summary="DANetResnet50 is a segmentation model.",
version="1.0.0",
meta=ImageSegmentationModule)
class DANet(nn.Layer):
"""
The DANet implementation based on PaddlePaddle.
The original article refers to
    Fu, Jun, et al. "Dual Attention Network for Scene Segmentation"
(https://arxiv.org/pdf/1809.02983.pdf)
Args:
num_classes (int): The unique number of target classes.
backbone_indices (tuple): The values in the tuple indicate the indices of
output of backbone.
align_corners (bool): An argument of F.interpolate. It should be set to False when the output size of feature
is even, e.g. 1024x512, otherwise it is True, e.g. 769x769. Default: False.
pretrained (str, optional): The path or url of pretrained model. Default: None.
"""
def __init__(self,
num_classes: int = 19,
backbone_indices: Tuple[int] = (2, 3),
align_corners: bool = False,
pretrained: str = None):
super(DANet, self).__init__()
self.backbone = ResNet50_vd()
self.backbone_indices = backbone_indices
in_channels = [self.backbone.feat_channels[i] for i in backbone_indices]
self.head = DAHead(num_classes=num_classes, in_channels=in_channels)
self.align_corners = align_corners
self.transforms = T.Compose([T.Normalize()])
if pretrained is not None:
model_dict = paddle.load(pretrained)
self.set_dict(model_dict)
print("load custom parameters success")
else:
checkpoint = os.path.join(self.directory, 'model.pdparams')
model_dict = paddle.load(checkpoint)
self.set_dict(model_dict)
print("load pretrained parameters success")
def forward(self, x: paddle.Tensor) -> List[paddle.Tensor]:
feats = self.backbone(x)
feats = [feats[i] for i in self.backbone_indices]
logit_list = self.head(feats)
if not self.training:
logit_list = [logit_list[0]]
logit_list = [
F.interpolate(
logit,
paddle.shape(x)[2:],
mode='bilinear',
align_corners=self.align_corners,
align_mode=1) for logit in logit_list
]
return logit_list
def transform(self, img: Union[np.ndarray, str]) -> Union[np.ndarray, str]:
return self.transforms(img)
class DAHead(nn.Layer):
"""
The Dual attention head.
Args:
num_classes (int): The unique number of target classes.
in_channels (tuple): The number of input channels.
"""
    def __init__(self, num_classes: int, in_channels: Tuple[int]):
super().__init__()
in_channels = in_channels[-1]
inter_channels = in_channels // 4
self.channel_conv = L.ConvBNReLU(in_channels, inter_channels, 3)
self.position_conv = L.ConvBNReLU(in_channels, inter_channels, 3)
self.pam = PAM(inter_channels)
self.cam = CAM(inter_channels)
self.conv1 = L.ConvBNReLU(inter_channels, inter_channels, 3)
self.conv2 = L.ConvBNReLU(inter_channels, inter_channels, 3)
self.aux_head = nn.Sequential(
nn.Dropout2D(0.1), nn.Conv2D(in_channels, num_classes, 1))
self.aux_head_pam = nn.Sequential(
nn.Dropout2D(0.1), nn.Conv2D(inter_channels, num_classes, 1))
self.aux_head_cam = nn.Sequential(
nn.Dropout2D(0.1), nn.Conv2D(inter_channels, num_classes, 1))
self.cls_head = nn.Sequential(
nn.Dropout2D(0.1), nn.Conv2D(inter_channels, num_classes, 1))
def forward(self, feat_list: List[paddle.Tensor]) -> List[paddle.Tensor]:
feats = feat_list[-1]
channel_feats = self.channel_conv(feats)
channel_feats = self.cam(channel_feats)
channel_feats = self.conv1(channel_feats)
position_feats = self.position_conv(feats)
position_feats = self.pam(position_feats)
position_feats = self.conv2(position_feats)
feats_sum = position_feats + channel_feats
logit = self.cls_head(feats_sum)
if not self.training:
return [logit]
cam_logit = self.aux_head_cam(channel_feats)
        pam_logit = self.aux_head_pam(position_feats)
aux_logit = self.aux_head(feats)
return [logit, cam_logit, pam_logit, aux_logit]
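# Note: in eval mode only the fused logit is returned; during training the
# CAM, PAM and backbone auxiliary logits are also emitted so that auxiliary
# losses can supervise each attention branch separately.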
class PAM(nn.Layer):
"""Position attention module."""
def __init__(self, in_channels: int):
super().__init__()
mid_channels = in_channels // 8
self.mid_channels = mid_channels
self.in_channels = in_channels
self.query_conv = nn.Conv2D(in_channels, mid_channels, 1, 1)
self.key_conv = nn.Conv2D(in_channels, mid_channels, 1, 1)
self.value_conv = nn.Conv2D(in_channels, in_channels, 1, 1)
self.gamma = self.create_parameter(
shape=[1],
dtype='float32',
default_initializer=nn.initializer.Constant(0))
def forward(self, x: paddle.Tensor) -> paddle.Tensor:
x_shape = paddle.shape(x)
# query: n, h * w, c1
query = self.query_conv(x)
query = paddle.reshape(query, (0, self.mid_channels, -1))
query = paddle.transpose(query, (0, 2, 1))
# key: n, c1, h * w
key = self.key_conv(x)
key = paddle.reshape(key, (0, self.mid_channels, -1))
# sim: n, h * w, h * w
sim = paddle.bmm(query, key)
sim = F.softmax(sim, axis=-1)
value = self.value_conv(x)
value = paddle.reshape(value, (0, self.in_channels, -1))
sim = paddle.transpose(sim, (0, 2, 1))
# feat: from (n, c2, h * w) -> (n, c2, h, w)
feat = paddle.bmm(value, sim)
feat = paddle.reshape(feat,
(0, self.in_channels, x_shape[2], x_shape[3]))
out = self.gamma * feat + x
return out
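# `gamma` is initialized to zero, so PAM starts out as an identity mapping
# and the attention branch is blended in gradually as gamma is learned.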
class CAM(nn.Layer):
"""Channel attention module."""
def __init__(self, channels: int):
super().__init__()
self.channels = channels
self.gamma = self.create_parameter(
shape=[1],
dtype='float32',
default_initializer=nn.initializer.Constant(0))
def forward(self, x: paddle.Tensor) -> paddle.Tensor:
x_shape = paddle.shape(x)
# query: n, c, h * w
query = paddle.reshape(x, (0, self.channels, -1))
# key: n, h * w, c
key = paddle.reshape(x, (0, self.channels, -1))
key = paddle.transpose(key, (0, 2, 1))
# sim: n, c, c
sim = paddle.bmm(query, key)
# The danet author claims that this can avoid gradient divergence
sim = paddle.max(
sim, axis=-1, keepdim=True).tile([1, 1, self.channels]) - sim
sim = F.softmax(sim, axis=-1)
# feat: from (n, c, h * w) to (n, c, h, w)
value = paddle.reshape(x, (0, self.channels, -1))
feat = paddle.bmm(sim, value)
feat = paddle.reshape(feat, (0, self.channels, x_shape[2], x_shape[3]))
out = self.gamma * feat + x
return out
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Union, List, Tuple
import paddle
import paddle.nn as nn
import danet_resnet50_cityscapes.layers as layers
class ConvBNLayer(nn.Layer):
def __init__(self,
in_channels: int,
out_channels: int,
kernel_size: int,
stride: int = 1,
dilation: int = 1,
groups: int = 1,
is_vd_mode: bool = False,
act: str = None,
data_format: str = 'NCHW'):
super(ConvBNLayer, self).__init__()
if dilation != 1 and kernel_size != 3:
raise RuntimeError("When the dilation isn't 1," \
"the kernel_size should be 3.")
self.is_vd_mode = is_vd_mode
self._pool2d_avg = nn.AvgPool2D(
kernel_size=2,
stride=2,
padding=0,
ceil_mode=True,
data_format=data_format)
self._conv = nn.Conv2D(
in_channels=in_channels,
out_channels=out_channels,
kernel_size=kernel_size,
stride=stride,
padding=(kernel_size - 1) // 2 \
if dilation == 1 else dilation,
dilation=dilation,
groups=groups,
bias_attr=False,
data_format=data_format)
self._batch_norm = layers.SyncBatchNorm(
out_channels, data_format=data_format)
self._act_op = layers.Activation(act=act)
def forward(self, inputs: paddle.Tensor) -> paddle.Tensor:
if self.is_vd_mode:
inputs = self._pool2d_avg(inputs)
y = self._conv(inputs)
y = self._batch_norm(y)
y = self._act_op(y)
return y
class BottleneckBlock(nn.Layer):
def __init__(self,
in_channels: int,
out_channels: int,
stride: int,
shortcut: bool = True,
if_first: bool = False,
dilation: int = 1,
data_format: str = 'NCHW'):
super(BottleneckBlock, self).__init__()
self.data_format = data_format
self.conv0 = ConvBNLayer(
in_channels=in_channels,
out_channels=out_channels,
kernel_size=1,
act='relu',
data_format=data_format)
self.dilation = dilation
self.conv1 = ConvBNLayer(
in_channels=out_channels,
out_channels=out_channels,
kernel_size=3,
stride=stride,
act='relu',
dilation=dilation,
data_format=data_format)
self.conv2 = ConvBNLayer(
in_channels=out_channels,
out_channels=out_channels * 4,
kernel_size=1,
act=None,
data_format=data_format)
if not shortcut:
self.short = ConvBNLayer(
in_channels=in_channels,
out_channels=out_channels * 4,
kernel_size=1,
stride=1,
is_vd_mode=False if if_first or stride == 1 else True,
data_format=data_format)
self.shortcut = shortcut
# NOTE: Use the wrap layer for quantization training
self.add = layers.Add()
self.relu = layers.Activation(act="relu")
def forward(self, inputs: paddle.Tensor) -> paddle.Tensor:
y = self.conv0(inputs)
conv1 = self.conv1(y)
conv2 = self.conv2(conv1)
if self.shortcut:
short = inputs
else:
short = self.short(inputs)
y = self.add(short, conv2)
y = self.relu(y)
return y
class BasicBlock(nn.Layer):
def __init__(self,
in_channels: int,
out_channels: int,
stride: int,
dilation: int = 1,
shortcut: bool = True,
if_first: bool = False,
data_format: str = 'NCHW'):
super(BasicBlock, self).__init__()
self.conv0 = ConvBNLayer(
in_channels=in_channels,
out_channels=out_channels,
kernel_size=3,
stride=stride,
dilation=dilation,
act='relu',
data_format=data_format)
self.conv1 = ConvBNLayer(
in_channels=out_channels,
out_channels=out_channels,
kernel_size=3,
dilation=dilation,
act=None,
data_format=data_format)
if not shortcut:
self.short = ConvBNLayer(
in_channels=in_channels,
out_channels=out_channels,
kernel_size=1,
stride=1,
is_vd_mode=False if if_first or stride == 1 else True,
data_format=data_format)
self.shortcut = shortcut
self.dilation = dilation
self.data_format = data_format
self.add = layers.Add()
self.relu = layers.Activation(act="relu")
def forward(self, inputs: paddle.Tensor) -> paddle.Tensor:
y = self.conv0(inputs)
conv1 = self.conv1(y)
if self.shortcut:
short = inputs
else:
short = self.short(inputs)
y = self.add(short, conv1)
y = self.relu(y)
return y
class ResNet_vd(nn.Layer):
"""
The ResNet_vd implementation based on PaddlePaddle.
    The original article refers to
    Tong He, et al. "Bag of Tricks for Image Classification with Convolutional Neural Networks"
(https://arxiv.org/pdf/1812.01187.pdf).
Args:
layers (int, optional): The layers of ResNet_vd. The supported layers are (18, 34, 50, 101, 152, 200). Default: 50.
output_stride (int, optional): The stride of output features compared to input images. It is 8 or 16. Default: 8.
        multi_grid (tuple|list, optional): The grid of stage4. Default: (1, 1, 1).
pretrained (str, optional): The path of pretrained model.
"""
def __init__(self,
layers: int = 50,
output_stride: int = 8,
multi_grid: Tuple[int] = (1, 1, 1),
pretrained: str = None,
data_format: str = 'NCHW'):
super(ResNet_vd, self).__init__()
self.data_format = data_format
self.conv1_logit = None # for gscnn shape stream
self.layers = layers
supported_layers = [18, 34, 50, 101, 152, 200]
assert layers in supported_layers, \
"supported layers are {} but input layer is {}".format(
supported_layers, layers)
if layers == 18:
depth = [2, 2, 2, 2]
elif layers == 34 or layers == 50:
depth = [3, 4, 6, 3]
elif layers == 101:
depth = [3, 4, 23, 3]
elif layers == 152:
depth = [3, 8, 36, 3]
elif layers == 200:
depth = [3, 12, 48, 3]
num_channels = [64, 256, 512, 1024
] if layers >= 50 else [64, 64, 128, 256]
num_filters = [64, 128, 256, 512]
# for channels of four returned stages
self.feat_channels = [c * 4 for c in num_filters
] if layers >= 50 else num_filters
dilation_dict = None
if output_stride == 8:
dilation_dict = {2: 2, 3: 4}
elif output_stride == 16:
dilation_dict = {3: 2}
self.conv1_1 = ConvBNLayer(
in_channels=3,
out_channels=32,
kernel_size=3,
stride=2,
act='relu',
data_format=data_format)
self.conv1_2 = ConvBNLayer(
in_channels=32,
out_channels=32,
kernel_size=3,
stride=1,
act='relu',
data_format=data_format)
self.conv1_3 = ConvBNLayer(
in_channels=32,
out_channels=64,
kernel_size=3,
stride=1,
act='relu',
data_format=data_format)
self.pool2d_max = nn.MaxPool2D(
kernel_size=3, stride=2, padding=1, data_format=data_format)
# self.block_list = []
self.stage_list = []
if layers >= 50:
for block in range(len(depth)):
shortcut = False
block_list = []
for i in range(depth[block]):
if layers in [101, 152] and block == 2:
if i == 0:
conv_name = "res" + str(block + 2) + "a"
else:
conv_name = "res" + str(block + 2) + "b" + str(i)
else:
conv_name = "res" + str(block + 2) + chr(97 + i)
###############################################################################
# Add dilation rate for some segmentation tasks, if dilation_dict is not None.
dilation_rate = dilation_dict[
block] if dilation_dict and block in dilation_dict else 1
# Actually block here is 'stage', and i is 'block' in 'stage'
                    # At stage 4, expand the dilation_rate if multi_grid is given
if block == 3:
dilation_rate = dilation_rate * multi_grid[i]
###############################################################################
bottleneck_block = self.add_sublayer(
'bb_%d_%d' % (block, i),
BottleneckBlock(
in_channels=num_channels[block]
if i == 0 else num_filters[block] * 4,
out_channels=num_filters[block],
stride=2 if i == 0 and block != 0
and dilation_rate == 1 else 1,
shortcut=shortcut,
if_first=block == i == 0,
dilation=dilation_rate,
data_format=data_format))
block_list.append(bottleneck_block)
shortcut = True
self.stage_list.append(block_list)
else:
for block in range(len(depth)):
shortcut = False
block_list = []
for i in range(depth[block]):
dilation_rate = dilation_dict[block] \
if dilation_dict and block in dilation_dict else 1
if block == 3:
dilation_rate = dilation_rate * multi_grid[i]
basic_block = self.add_sublayer(
'bb_%d_%d' % (block, i),
BasicBlock(
in_channels=num_channels[block]
if i == 0 else num_filters[block],
out_channels=num_filters[block],
stride=2 if i == 0 and block != 0 \
and dilation_rate == 1 else 1,
dilation=dilation_rate,
shortcut=shortcut,
if_first=block == i == 0,
data_format=data_format))
block_list.append(basic_block)
shortcut = True
self.stage_list.append(block_list)
self.pretrained = pretrained
def forward(self, inputs: paddle.Tensor) -> List[paddle.Tensor]:
y = self.conv1_1(inputs)
y = self.conv1_2(y)
y = self.conv1_3(y)
self.conv1_logit = y.clone()
y = self.pool2d_max(y)
# A feature list saves the output feature map of each stage.
feat_list = []
for stage in self.stage_list:
for block in stage:
y = block(y)
feat_list.append(y)
return feat_list
def ResNet50_vd(**args):
model = ResNet_vd(layers=50, **args)
return model
# danet_resnet50_voc
|模型名称|danet_resnet50_voc|
| :--- | :---: |
|类别|图像-图像分割|
|网络|danet_resnet50vd|
|数据集|PascalVOC2012|
|是否支持Fine-tuning|是|
|模型大小|273MB|
|指标|-|
|最新更新日期|2022-03-21|
## 一、模型基本信息
- 样例结果示例:
<p align="center">
<img src="https://user-images.githubusercontent.com/35907364/159212097-443a5a65-2f2e-4126-9c07-d7c3c220e55f.jpg" width = "420" height = "505" hspace='10'/> <img src="https://user-images.githubusercontent.com/35907364/159212375-52e123af-4699-4c25-8f50-4240bbb714b4.png" width = "420" height = "505" hspace='10'/>
</p>
- ### Module Introduction
- We will show how to use PaddleHub to finetune the pre-trained model and complete the prediction.
- For more information, please refer to: [danet](https://arxiv.org/pdf/1809.02983.pdf)
## II. Installation
- ### 1、Environmental Dependence
- paddlepaddle >= 2.0.0
- paddlehub >= 2.0.0
- ### 2、Installation
- ```shell
$ hub install danet_resnet50_voc
```
- In case of any problems during installation, please refer to: [Windows_Quickstart](../../../../docs/docs_ch/get_start/windows_quickstart.md)
| [Linux_Quickstart](../../../../docs/docs_ch/get_start/linux_quickstart.md) | [Mac_Quickstart](../../../../docs/docs_ch/get_start/mac_quickstart.md)
## III. Module API Prediction
- ### 1.Prediction Code Example
- ```python
import cv2
import paddle
import paddlehub as hub
if __name__ == '__main__':
model = hub.Module(name='danet_resnet50_voc')
img = cv2.imread("/PATH/TO/IMAGE")
result = model.predict(images=[img], visualization=True)
```
- ### 2.How to Start Fine-tuning
- After completing the installation of PaddlePaddle and PaddleHub, you can start fine-tuning the danet_resnet50_voc model on the OpticDiscSeg dataset by running `python train.py`. The content of `train.py` is as follows:
- Steps:
- Step1: Define the data preprocessing method
- ```python
from paddlehub.vision.segmentation_transforms import Compose, Resize, Normalize
transform = Compose([Resize(target_size=(512, 512)), Normalize()])
```
- The `segmentation_transforms` module defines a rich set of preprocessing methods for image segmentation data. Users can replace them with their own preprocessing methods as needed.
- Step2: Download and use the dataset
- ```python
from paddlehub.datasets import OpticDiscSeg
train_reader = OpticDiscSeg(transform, mode='train')
```
- `transforms`: data preprocessing methods.
- `mode`: Select the data mode, the options are `train`, `test`, `val`. Default is `train`.
- Dataset preparation can refer to [opticdiscseg.py](../../paddlehub/datasets/opticdiscseg.py). `hub.datasets.OpticDiscSeg()` will automatically download the dataset from the network and decompress it to the `$HOME/.paddlehub/dataset` directory under the user directory.
- Step3: Load the pre-trained model
- ```python
import paddlehub as hub
model = hub.Module(name='danet_resnet50_voc', num_classes=2, pretrained=None)
```
- `name`: model name.
- `pretrained`: Whether to load your own fine-tuned checkpoint; if it is None, the provided default parameters are loaded.
- Step4: Optimization strategy and runtime configuration
- ```python
import paddle
from paddlehub.finetune.trainer import Trainer
scheduler = paddle.optimizer.lr.PolynomialDecay(learning_rate=0.01, decay_steps=1000, power=0.9, end_lr=0.0001)
optimizer = paddle.optimizer.Adam(learning_rate=scheduler, parameters=model.parameters())
trainer = Trainer(model, optimizer, checkpoint_dir='test_ckpt_img_seg', use_gpu=True)
trainer.train(train_reader, epochs=10, batch_size=4, log_interval=10, save_interval=4)
```
- Model prediction
- When Fine-tune is completed, the model with the best performance on the validation set will be saved in the `${CHECKPOINT_DIR}/best_model` directory, where `${CHECKPOINT_DIR}` is the checkpoint directory chosen at Fine-tune time. We use this model to make predictions. The `predict.py` script is as follows:
```python
import paddle
import cv2
import paddlehub as hub
if __name__ == '__main__':
model = hub.Module(name='danet_resnet50_voc', pretrained='/PATH/TO/CHECKPOINT')
img = cv2.imread("/PATH/TO/IMAGE")
model.predict(images=[img], visualization=True)
```
- After the parameters are configured correctly, run the script `python predict.py`.
- **Args**
* `images`: Image path or image data in BGR format.
* `visualization`: Whether to visualize the result. Default is True.
* `save_path`: Save path of the result, default is 'seg_result'.
**NOTE:** When making predictions, the module, checkpoint_dir, and dataset must be the same as those used for Fine-tune.
## IV. Server Deployment
- PaddleHub Serving can deploy an online service of image segmentation.
- ### Step 1: Start PaddleHub Serving
- Run the startup command:
- ```shell
$ hub serving start -m danet_resnet50_voc
```
- This completes the deployment of an image segmentation API service; the default port number is 8866.
- **NOTE:** If GPU is used for prediction, set the CUDA_VISIBLE_DEVICES environment variable before starting the service; otherwise it need not be set.
- ### Step 2: Send a predictive request
- With a configured server, the following lines of code send a prediction request and obtain the result:
```python
import requests
import json
import cv2
import base64
import numpy as np
def cv2_to_base64(image):
    data = cv2.imencode('.jpg', image)[1]
    return base64.b64encode(data.tobytes()).decode('utf8')
def base64_to_cv2(b64str):
    data = base64.b64decode(b64str.encode('utf8'))
    data = np.frombuffer(data, np.uint8)
    data = cv2.imdecode(data, cv2.IMREAD_COLOR)
    return data
# Send an HTTP request
org_im = cv2.imread('/PATH/TO/IMAGE')
data = {'images':[cv2_to_base64(org_im)]}
headers = {"Content-type": "application/json"}
url = "http://127.0.0.1:8866/predict/danet_resnet50_voc"
r = requests.post(url=url, headers=headers, data=json.dumps(data))
mask = base64_to_cv2(r.json()["results"][0])
```
## V. Release Note
* 1.0.0
  First release
# danet_resnet50_voc
|Module Name|danet_resnet50_voc|
| :--- | :---: |
|Category|Image Segmentation|
|Network|danet_resnet50vd|
|Dataset|PascalVOC2012|
|Fine-tuning supported or not|Yes|
|Module Size|273MB|
|Data indicators|-|
|Latest update date|2022-03-22|
## I. Basic Information
- ### Application Effect Display
- Sample results:
<p align="center">
<img src="https://user-images.githubusercontent.com/35907364/159212097-443a5a65-2f2e-4126-9c07-d7c3c220e55f.jpg" width = "420" height = "505" hspace='10'/> <img src="https://user-images.githubusercontent.com/35907364/159212375-52e123af-4699-4c25-8f50-4240bbb714b4.png" width = "420" height = "505" hspace='10'/>
</p>
- ### Module Introduction
- We will show how to use PaddleHub to finetune the pre-trained model and complete the prediction.
- For more information, please refer to: [danet](https://arxiv.org/pdf/1809.02983.pdf)
## II. Installation
- ### 1、Environmental Dependence
- paddlepaddle >= 2.0.0
- paddlehub >= 2.0.0
- ### 2、Installation
- ```shell
$ hub install danet_resnet50_voc
```
- In case of any problems during installation, please refer to: [Windows_Quickstart](../../../../docs/docs_en/get_start/windows_quickstart.md)
| [Linux_Quickstart](../../../../docs/docs_en/get_start/linux_quickstart.md) | [Mac_Quickstart](../../../../docs/docs_en/get_start/mac_quickstart.md)
## III. Module API Prediction
- ### 1、Prediction Code Example
- ```python
import cv2
import paddle
import paddlehub as hub
if __name__ == '__main__':
model = hub.Module(name='danet_resnet50_voc')
img = cv2.imread("/PATH/TO/IMAGE")
result = model.predict(images=[img], visualization=True)
```
- ### 2.Fine-tune and Encapsulation
- After completing the installation of PaddlePaddle and PaddleHub, you can start fine-tuning the danet_resnet50_voc model on datasets such as OpticDiscSeg.
- Steps:
- Step1: Define the data preprocessing method
- ```python
from paddlehub.vision.segmentation_transforms import Compose, Resize, Normalize
transform = Compose([Resize(target_size=(512, 512)), Normalize()])
```
- The `segmentation_transforms` module defines a rich set of preprocessing methods for image segmentation data. Users can replace them with their own preprocessing methods as needed.
- Step2: Download the dataset
- ```python
from paddlehub.datasets import OpticDiscSeg
train_reader = OpticDiscSeg(transform, mode='train')
```
* `transforms`: data preprocessing methods.
* `mode`: Select the data mode, the options are `train`, `test`, `val`. Default is `train`.
* Dataset preparation can refer to [opticdiscseg.py](../../paddlehub/datasets/opticdiscseg.py). `hub.datasets.OpticDiscSeg()` will automatically download the dataset from the network and decompress it to the `$HOME/.paddlehub/dataset` directory under the user directory.
- Step3: Load the pre-trained model
- ```python
import paddlehub as hub
model = hub.Module(name='danet_resnet50_voc', num_classes=2, pretrained=None)
```
- `name`: model name.
- `pretrained`: Whether to load your own fine-tuned checkpoint; if it is None, the provided default parameters are loaded.
- Step4: Optimization strategy
- ```python
import paddle
from paddlehub.finetune.trainer import Trainer
scheduler = paddle.optimizer.lr.PolynomialDecay(learning_rate=0.01, decay_steps=1000, power=0.9, end_lr=0.0001)
optimizer = paddle.optimizer.Adam(learning_rate=scheduler, parameters=model.parameters())
trainer = Trainer(model, optimizer, checkpoint_dir='test_ckpt_img_seg', use_gpu=True)
trainer.train(train_reader, epochs=10, batch_size=4, log_interval=10, save_interval=4)
```
- Model prediction
- When Fine-tune is completed, the model with the best performance on the validation set will be saved in the `${CHECKPOINT_DIR}/best_model` directory, where `${CHECKPOINT_DIR}` is the checkpoint directory chosen at Fine-tune time. We use this model to make predictions. The `predict.py` script is as follows:
```python
import paddle
import cv2
import paddlehub as hub
if __name__ == '__main__':
model = hub.Module(name='danet_resnet50_voc', pretrained='/PATH/TO/CHECKPOINT')
img = cv2.imread("/PATH/TO/IMAGE")
model.predict(images=[img], visualization=True)
```
- **Args**
* `images`: Image path or ndarray data with format [H, W, C], BGR.
* `visualization`: Whether to save the recognition results as picture files.
* `save_path`: Save path of the result, default is 'seg_result'.
## IV. Server Deployment
- PaddleHub Serving can deploy an online service of image segmentation.
- ### Step 1: Start PaddleHub Serving
- Run the startup command:
- ```shell
$ hub serving start -m danet_resnet50_voc
```
- This completes the deployment of the image segmentation API service; the default port number is 8866.
- **NOTE:** If GPU is used for prediction, set the CUDA_VISIBLE_DEVICES environment variable before starting the service; otherwise it need not be set.
- ### Step 2: Send a predictive request
- With a configured server, use the following lines of code to send the prediction request and obtain the result:
```python
import requests
import json
import cv2
import base64
import numpy as np
def cv2_to_base64(image):
    data = cv2.imencode('.jpg', image)[1]
    return base64.b64encode(data.tobytes()).decode('utf8')
def base64_to_cv2(b64str):
    data = base64.b64decode(b64str.encode('utf8'))
    data = np.frombuffer(data, np.uint8)
    data = cv2.imdecode(data, cv2.IMREAD_COLOR)
    return data
org_im = cv2.imread('/PATH/TO/IMAGE')
data = {'images':[cv2_to_base64(org_im)]}
headers = {"Content-type": "application/json"}
url = "http://127.0.0.1:8866/predict/danet_resnet50_voc"
r = requests.post(url=url, headers=headers, data=json.dumps(data))
mask = base64_to_cv2(r.json()["results"][0])
```
## V. Release Note
- 1.0.0
First release
# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle
import paddle.nn as nn
import paddle.nn.functional as F
from paddle.nn.layer import activation
from paddle.nn import Conv2D, AvgPool2D
def SyncBatchNorm(*args, **kwargs):
"""In cpu environment nn.SyncBatchNorm does not have kernel so use nn.BatchNorm2D instead"""
if paddle.get_device() == 'cpu':
return nn.BatchNorm2D(*args, **kwargs)
else:
return nn.SyncBatchNorm(*args, **kwargs)
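# Illustrative usage (a sketch, not part of the original file): the factory is
# called exactly like a batch-norm layer, e.g.
#
#   bn = SyncBatchNorm(64)   # nn.BatchNorm2D on CPU, nn.SyncBatchNorm otherwise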
class ConvBNLayer(nn.Layer):
"""Basic conv bn relu layer."""
def __init__(
self,
in_channels: int,
out_channels: int,
kernel_size: int,
stride: int = 1,
dilation: int = 1,
groups: int = 1,
is_vd_mode: bool = False,
act: str = None,
name: str = None):
super(ConvBNLayer, self).__init__()
self.is_vd_mode = is_vd_mode
self._pool2d_avg = AvgPool2D(
kernel_size=2, stride=2, padding=0, ceil_mode=True)
self._conv = Conv2D(
in_channels=in_channels,
out_channels=out_channels,
kernel_size=kernel_size,
stride=stride,
padding=(kernel_size - 1) // 2 if dilation == 1 else 0,
dilation=dilation,
groups=groups,
bias_attr=False)
self._batch_norm = SyncBatchNorm(out_channels)
self._act_op = Activation(act=act)
def forward(self, inputs: paddle.Tensor) -> paddle.Tensor:
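        # ResNet-D ("vd") trick: when enabled, downsample with a 2x2 average
        # pool before the conv instead of relying on a strided conv, which
        # preserves more spatial information.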
if self.is_vd_mode:
inputs = self._pool2d_avg(inputs)
y = self._conv(inputs)
y = self._batch_norm(y)
y = self._act_op(y)
return y
class BottleneckBlock(nn.Layer):
"""Residual bottleneck block"""
def __init__(self,
in_channels: int,
out_channels: int,
stride: int,
shortcut: bool = True,
if_first: bool = False,
dilation: int = 1,
name: str = None):
super(BottleneckBlock, self).__init__()
self.conv0 = ConvBNLayer(
in_channels=in_channels,
out_channels=out_channels,
kernel_size=1,
act='relu',
name=name + "_branch2a")
self.dilation = dilation
self.conv1 = ConvBNLayer(
in_channels=out_channels,
out_channels=out_channels,
kernel_size=3,
stride=stride,
act='relu',
dilation=dilation,
name=name + "_branch2b")
self.conv2 = ConvBNLayer(
in_channels=out_channels,
out_channels=out_channels * 4,
kernel_size=1,
act=None,
name=name + "_branch2c")
if not shortcut:
self.short = ConvBNLayer(
in_channels=in_channels,
out_channels=out_channels * 4,
kernel_size=1,
stride=1,
is_vd_mode=False if if_first or stride == 1 else True,
name=name + "_branch1")
self.shortcut = shortcut
def forward(self, inputs: paddle.Tensor) -> paddle.Tensor:
y = self.conv0(inputs)
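        # conv1 was built with padding=0 when dilation > 1, so pad the input
        # explicitly here to keep the 3x3 dilated conv size-preserving.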
if self.dilation > 1:
padding = self.dilation
y = F.pad(y, [padding, padding, padding, padding])
conv1 = self.conv1(y)
conv2 = self.conv2(conv1)
if self.shortcut:
short = inputs
else:
short = self.short(inputs)
y = paddle.add(x=short, y=conv2)
y = F.relu(y)
return y
class SeparableConvBNReLU(nn.Layer):
"""Depthwise Separable Convolution."""
def __init__(self,
in_channels: int,
out_channels: int,
kernel_size: int,
padding: str = 'same',
**kwargs: dict):
super(SeparableConvBNReLU, self).__init__()
self.depthwise_conv = ConvBN(
in_channels,
out_channels=in_channels,
kernel_size=kernel_size,
padding=padding,
groups=in_channels,
**kwargs)
self.piontwise_conv = ConvBNReLU(
in_channels, out_channels, kernel_size=1, groups=1)
def forward(self, x: paddle.Tensor) -> paddle.Tensor:
x = self.depthwise_conv(x)
x = self.piontwise_conv(x)
return x
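# Illustrative shape check (a sketch; 'same' padding keeps H and W fixed while
# the 1x1 pointwise conv changes the channel count):
#
#   sep = SeparableConvBNReLU(in_channels=32, out_channels=64, kernel_size=3)
#   y = sep(paddle.randn([1, 32, 56, 56]))
#   print(y.shape)  # [1, 64, 56, 56]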
class ConvBN(nn.Layer):
"""Basic conv bn layer"""
def __init__(self,
in_channels: int,
out_channels: int,
kernel_size: int,
padding: str = 'same',
**kwargs: dict):
super(ConvBN, self).__init__()
self._conv = Conv2D(
in_channels, out_channels, kernel_size, padding=padding, **kwargs)
self._batch_norm = SyncBatchNorm(out_channels)
def forward(self, x: paddle.Tensor) -> paddle.Tensor:
x = self._conv(x)
x = self._batch_norm(x)
return x
class ConvBNReLU(nn.Layer):
"""Basic conv bn relu layer."""
def __init__(self,
in_channels: int,
out_channels: int,
kernel_size: int,
padding: str = 'same',
**kwargs: dict):
super(ConvBNReLU, self).__init__()
self._conv = Conv2D(
in_channels, out_channels, kernel_size, padding=padding, **kwargs)
self._batch_norm = SyncBatchNorm(out_channels)
def forward(self, x: paddle.Tensor) -> paddle.Tensor:
x = self._conv(x)
x = self._batch_norm(x)
x = F.relu(x)
return x
class Activation(nn.Layer):
"""
The wrapper of activations.
Args:
act (str, optional): The activation name in lowercase. It must be one of ['elu', 'gelu',
'hardshrink', 'tanh', 'hardtanh', 'prelu', 'relu', 'relu6', 'selu', 'leakyrelu', 'sigmoid',
'softmax', 'softplus', 'softshrink', 'softsign', 'tanhshrink', 'logsigmoid', 'logsoftmax',
'hsigmoid']. Default: None, means identical transformation.
Returns:
A callable object of Activation.
Raises:
KeyError: When parameter `act` is not in the optional range.
Examples:
from paddleseg.models.common.activation import Activation
relu = Activation("relu")
print(relu)
# <class 'paddle.nn.layer.activation.ReLU'>
sigmoid = Activation("sigmoid")
print(sigmoid)
# <class 'paddle.nn.layer.activation.Sigmoid'>
not_exit_one = Activation("not_exit_one")
# KeyError: "not_exit_one does not exist in the current dict_keys(['elu', 'gelu', 'hardshrink',
# 'tanh', 'hardtanh', 'prelu', 'relu', 'relu6', 'selu', 'leakyrelu', 'sigmoid', 'softmax',
# 'softplus', 'softshrink', 'softsign', 'tanhshrink', 'logsigmoid', 'logsoftmax', 'hsigmoid'])"
"""
def __init__(self, act: str = None):
super(Activation, self).__init__()
self._act = act
upper_act_names = activation.__dict__.keys()
lower_act_names = [act.lower() for act in upper_act_names]
act_dict = dict(zip(lower_act_names, upper_act_names))
if act is not None:
if act in act_dict.keys():
act_name = act_dict[act]
                self.act_func = getattr(activation, act_name)()
else:
raise KeyError("{} does not exist in the current {}".format(
act, act_dict.keys()))
def forward(self, x: paddle.Tensor) -> paddle.Tensor:
if self._act is not None:
return self.act_func(x)
else:
return x
class ASPPModule(nn.Layer):
"""
Atrous Spatial Pyramid Pooling.
Args:
        aspp_ratios (tuple): The dilation rates used in the ASPP module.
in_channels (int): The number of input channels.
out_channels (int): The number of output channels.
align_corners (bool): An argument of F.interpolate. It should be set to False when the output size of feature
is even, e.g. 1024x512, otherwise it is True, e.g. 769x769.
        use_sep_conv (bool, optional): Whether to use separable convolutions in the ASPP module. Default: False.
        image_pooling (bool, optional): Whether to augment with image-level features. Default: False.
"""
def __init__(self,
aspp_ratios: tuple,
in_channels: int,
out_channels: int,
align_corners: bool,
use_sep_conv: bool= False,
image_pooling: bool = False):
super().__init__()
self.align_corners = align_corners
self.aspp_blocks = nn.LayerList()
for ratio in aspp_ratios:
if use_sep_conv and ratio > 1:
conv_func = SeparableConvBNReLU
else:
conv_func = ConvBNReLU
block = conv_func(
in_channels=in_channels,
out_channels=out_channels,
kernel_size=1 if ratio == 1 else 3,
dilation=ratio,
padding=0 if ratio == 1 else ratio)
self.aspp_blocks.append(block)
out_size = len(self.aspp_blocks)
if image_pooling:
self.global_avg_pool = nn.Sequential(
nn.AdaptiveAvgPool2D(output_size=(1, 1)),
ConvBNReLU(in_channels, out_channels, kernel_size=1, bias_attr=False))
out_size += 1
self.image_pooling = image_pooling
self.conv_bn_relu = ConvBNReLU(
in_channels=out_channels * out_size,
out_channels=out_channels,
kernel_size=1)
self.dropout = nn.Dropout(p=0.1) # drop rate
def forward(self, x: paddle.Tensor) -> paddle.Tensor:
outputs = []
for block in self.aspp_blocks:
y = block(x)
y = F.interpolate(
y,
x.shape[2:],
mode='bilinear',
align_corners=self.align_corners)
outputs.append(y)
if self.image_pooling:
img_avg = self.global_avg_pool(x)
img_avg = F.interpolate(
img_avg,
x.shape[2:],
mode='bilinear',
align_corners=self.align_corners)
outputs.append(img_avg)
x = paddle.concat(outputs, axis=1)
x = self.conv_bn_relu(x)
x = self.dropout(x)
return x
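# Illustrative usage (a sketch; the rates below are an assumption chosen to
# match an output_stride=8 backbone, not values mandated by this file):
#
#   aspp = ASPPModule(aspp_ratios=(1, 12, 24, 36), in_channels=2048,
#                     out_channels=256, align_corners=False, image_pooling=True)
#   out = aspp(paddle.randn([1, 2048, 64, 64]))
#   print(out.shape)  # [1, 256, 64, 64]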