## Model Overview
DeOldify is a colorization model for images and videos that restores color to black-and-white photos and footage.
## API
```python
def predict(self, input):
```
Colorization API; returns the colorized image or video.
**Parameters**
* input (str): path to the image or video.
**Returns**
If the input is an image, the return values are:
* pred_img (np.ndarray): image data in BGR format;
* out_path (str): path where the image is saved.
If the input is a video, the return values are:
* frame_pattern_combined (str): path pattern of the colorized video frames;
* vid_out_path (str): path where the video is saved.
```python
def run_image(self, img):
```
Image colorization API; returns the colorized image.
**Parameters**
* img (str|np.ndarray): image path or an image in BGR format.
**Returns**
* pred_img (np.ndarray): image data in BGR format.
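A minimal usage sketch (the image path is a placeholder):
```python
import cv2
import paddlehub as hub

model = hub.Module(name='deoldify')
# run_image accepts either a file path or a BGR ndarray.
pred_img = model.run_image('/PATH/TO/IMAGE')
cv2.imwrite('colorized.png', pred_img)
```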
```python
def run_video(self, video):
```
Video colorization API; returns the colorized video.
**Parameters**
* video (str): path to the video to be processed.
**Returns**
* frame_pattern_combined (str): path pattern of the colorized video frames;
* vid_out_path (str): path where the video is saved.
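A minimal usage sketch (the video path is a placeholder):
```python
import paddlehub as hub

model = hub.Module(name='deoldify')
frame_pattern_combined, vid_out_path = model.run_video('/PATH/TO/VIDEO')
print(vid_out_path)  # location of the colorized video
```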
## Prediction Example
```python
import paddlehub as hub
model = hub.Module(name='deoldify')
model.predict('/PATH/TO/IMAGE/OR/VIDEO')
```
## Serving Deployment
PaddleHub Serving can deploy an online photo colorization service.
## Step 1: Start PaddleHub Serving
Run the start command:
```shell
$ hub serving start -m deoldify
```
This deploys an online image colorization API service, with the default port 8866.
**NOTE:** To predict on GPU, set the CUDA\_VISIBLE\_DEVICES environment variable before starting the service; otherwise, it does not need to be set.
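For example, to serve on the first GPU (a sketch; adjust the device id to your machine):
```shell
$ export CUDA_VISIBLE_DEVICES=0
$ hub serving start -m deoldify
```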
## Step 2: Send a Prediction Request
With the server up, the few lines of code below send a prediction request and fetch the result.
```python
import requests
import json
import base64
import cv2
import numpy as np
def cv2_to_base64(image):
    data = cv2.imencode('.jpg', image)[1]
    return base64.b64encode(data.tobytes()).decode('utf8')
def base64_to_cv2(b64str):
    data = base64.b64decode(b64str.encode('utf8'))
    data = np.frombuffer(data, np.uint8)
    data = cv2.imdecode(data, cv2.IMREAD_COLOR)
    return data
# Send the HTTP request
org_im = cv2.imread('/PATH/TO/ORIGIN/IMAGE')
data = {'images':cv2_to_base64(org_im)}
headers = {"Content-type": "application/json"}
url = "http://127.0.0.1:8866/predict/deoldify"
r = requests.post(url=url, headers=headers, data=json.dumps(data))
img = base64_to_cv2(r.json()["results"])
cv2.imwrite('/PATH/TO/SAVE/IMAGE', img)
```
## Model Information
### Model Code
https://github.com/jantic/DeOldify
### Dependencies
paddlepaddle >= 2.0.0rc
paddlehub >= 1.8.3
import paddle
import numpy as np
import paddle.nn as nn
import paddle.nn.functional as F
from paddle.vision.models import resnet101
import deoldify.utils as U
class SequentialEx(nn.Layer):
"Like `nn.Sequential`, but with ModuleList semantics, and can access module input"
def __init__(self, *layers):
super().__init__()
self.layers = nn.LayerList(layers)
def forward(self, x):
res = x
for l in self.layers:
if isinstance(l, MergeLayer):
l.orig = x
nres = l(res)
# We have to remove res.orig to avoid hanging refs and therefore memory leaks
# l.orig = None
res = nres
return res
def __getitem__(self, i):
return self.layers[i]
def append(self, l):
return self.layers.append(l)
def extend(self, l):
return self.layers.extend(l)
def insert(self, i, l):
return self.layers.insert(i, l)
class Deoldify(SequentialEx):
def __init__(self,
encoder,
n_classes,
blur=False,
blur_final=True,
self_attention=False,
y_range=None,
last_cross=True,
bottle=False,
norm_type='Batch',
nf_factor=1,
**kwargs):
imsize = (256, 256)
sfs_szs = U.model_sizes(encoder, size=imsize)
sfs_idxs = list(reversed(_get_sfs_idxs(sfs_szs)))
self.sfs = U.hook_outputs([encoder[i] for i in sfs_idxs], detach=False)
x = U.dummy_eval(encoder, imsize).detach()
nf = 512 * nf_factor
extra_bn = norm_type == 'Spectral'
ni = sfs_szs[-1][1]
middle_conv = nn.Sequential(
custom_conv_layer(ni, ni * 2, norm_type=norm_type, extra_bn=extra_bn),
custom_conv_layer(ni * 2, ni, norm_type=norm_type, extra_bn=extra_bn),
)
layers = [encoder, nn.BatchNorm(ni), nn.ReLU(), middle_conv]
for i, idx in enumerate(sfs_idxs):
not_final = i != len(sfs_idxs) - 1
up_in_c, x_in_c = int(x.shape[1]), int(sfs_szs[idx][1])
do_blur = blur and (not_final or blur_final)
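            # Self-attention is enabled only on the third-from-last upsampling block.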
sa = self_attention and (i == len(sfs_idxs) - 3)
n_out = nf if not_final else nf // 2
unet_block = UnetBlockWide(
up_in_c,
x_in_c,
n_out,
self.sfs[i],
final_div=not_final,
blur=blur,
self_attention=sa,
norm_type=norm_type,
extra_bn=extra_bn,
**kwargs)
unet_block.eval()
layers.append(unet_block)
x = unet_block(x)
ni = x.shape[1]
        if imsize != tuple(sfs_szs[0][-2:]):
layers.append(PixelShuffle_ICNR(ni, **kwargs))
if last_cross:
layers.append(MergeLayer(dense=True))
ni += 3
layers.append(res_block(ni, bottle=bottle, norm_type=norm_type, **kwargs))
layers += [custom_conv_layer(ni, n_classes, ks=1, use_activ=False, norm_type=norm_type)]
if y_range is not None:
layers.append(SigmoidRange(*y_range))
super().__init__(*layers)
def custom_conv_layer(ni: int,
nf: int,
ks: int = 3,
stride: int = 1,
padding: int = None,
bias: bool = None,
is_1d: bool = False,
norm_type='Batch',
use_activ: bool = True,
leaky: float = None,
transpose: bool = False,
self_attention: bool = False,
extra_bn: bool = False,
**kwargs):
"Create a sequence of convolutional (`ni` to `nf`), ReLU (if `use_activ`) and batchnorm (if `bn`) layers."
if padding is None:
padding = (ks - 1) // 2 if not transpose else 0
    bn = norm_type in ('Batch', 'BatchZero') or extra_bn
    if bias is None:
        bias = not bn
    conv_func = nn.Conv2DTranspose if transpose else nn.Conv1D if is_1d else nn.Conv2D
conv = conv_func(ni, nf, kernel_size=ks, bias_attr=bias, stride=stride, padding=padding)
if norm_type == 'Weight':
conv = nn.utils.weight_norm(conv)
elif norm_type == 'Spectral':
conv = U.Spectralnorm(conv)
layers = [conv]
if use_activ:
layers.append(relu(True, leaky=leaky))
if bn:
        layers.append(nn.BatchNorm(nf))
if self_attention:
layers.append(SelfAttention(nf))
return nn.Sequential(*layers)
def relu(inplace: bool = False, leaky: float = None):
"Return a relu activation, maybe `leaky` and `inplace`."
return nn.LeakyReLU(leaky) if leaky is not None else nn.ReLU()
class UnetBlockWide(nn.Layer):
"A quasi-UNet block, using `PixelShuffle_ICNR upsampling`."
def __init__(self,
up_in_c: int,
x_in_c: int,
n_out: int,
hook,
final_div: bool = True,
blur: bool = False,
leaky: float = None,
self_attention: bool = False,
**kwargs):
super().__init__()
self.hook = hook
up_out = x_out = n_out // 2
self.shuf = CustomPixelShuffle_ICNR(up_in_c, up_out, blur=blur, leaky=leaky, **kwargs)
self.bn = nn.BatchNorm(x_in_c)
ni = up_out + x_in_c
self.conv = custom_conv_layer(ni, x_out, leaky=leaky, self_attention=self_attention, **kwargs)
self.relu = relu(leaky=leaky)
def forward(self, up_in):
s = self.hook.stored
up_out = self.shuf(up_in)
ssh = s.shape[-2:]
if ssh != up_out.shape[-2:]:
up_out = F.interpolate(up_out, s.shape[-2:], mode='nearest')
cat_x = self.relu(paddle.concat([up_out, self.bn(s)], axis=1))
return self.conv(cat_x)
class UnetBlockDeep(nn.Layer):
"A quasi-UNet block, using `PixelShuffle_ICNR upsampling`."
def __init__(
self,
up_in_c: int,
x_in_c: int,
            hook,
final_div: bool = True,
blur: bool = False,
leaky: float = None,
self_attention: bool = False,
nf_factor: float = 1.0,
**kwargs):
        super().__init__()
        self.hook = hook
        self.shuf = CustomPixelShuffle_ICNR(up_in_c, up_in_c // 2, blur=blur, leaky=leaky, **kwargs)
self.bn = nn.BatchNorm(x_in_c)
ni = up_in_c // 2 + x_in_c
nf = int((ni if final_div else ni // 2) * nf_factor)
self.conv1 = custom_conv_layer(ni, nf, leaky=leaky, **kwargs)
self.conv2 = custom_conv_layer(nf, nf, leaky=leaky, self_attention=self_attention, **kwargs)
self.relu = relu(leaky=leaky)
def forward(self, up_in):
s = self.hook.stored
up_out = self.shuf(up_in)
ssh = s.shape[-2:]
if ssh != up_out.shape[-2:]:
up_out = F.interpolate(up_out, s.shape[-2:], mode='nearest')
cat_x = self.relu(paddle.concat([up_out, self.bn(s)], axis=1))
return self.conv2(self.conv1(cat_x))
def ifnone(a, b):
"`a` if `a` is not None, otherwise `b`."
return b if a is None else a
class PixelShuffle_ICNR(nn.Layer):
"Upsample by `scale` from `ni` filters to `nf` (default `ni`), using `nn.PixelShuffle`, \
`icnr` init, and `weight_norm`."
def __init__(self,
ni: int,
nf: int = None,
scale: int = 2,
blur: bool = False,
norm_type='Weight',
leaky: float = None):
super().__init__()
nf = ifnone(nf, ni)
self.conv = conv_layer(ni, nf * (scale**2), ks=1, norm_type=norm_type, use_activ=False)
self.shuf = PixelShuffle(scale)
        self.pad = ReplicationPad2d([1, 0, 1, 0])
        # Gate the blur on the `blur` flag; the pooling layer itself is always truthy.
        self.do_blur = blur
        self.blur = nn.AvgPool2D(2, stride=1)
        self.relu = relu(True, leaky=leaky)
    def forward(self, x):
        x = self.shuf(self.relu(self.conv(x)))
        return self.blur(self.pad(x)) if self.do_blur else x
def conv_layer(ni: int,
nf: int,
ks: int = 3,
stride: int = 1,
padding: int = None,
bias: bool = None,
is_1d: bool = False,
norm_type='Batch',
use_activ: bool = True,
leaky: float = None,
transpose: bool = False,
init=None,
self_attention: bool = False):
"Create a sequence of convolutional (`ni` to `nf`), ReLU (if `use_activ`) and batchnorm (if `bn`) layers."
if padding is None: padding = (ks - 1) // 2 if not transpose else 0
bn = norm_type in ('Batch', 'BatchZero')
if bias is None: bias = not bn
    conv_func = nn.Conv2DTranspose if transpose else nn.Conv1D if is_1d else nn.Conv2D
conv = conv_func(ni, nf, kernel_size=ks, bias_attr=bias, stride=stride, padding=padding)
if norm_type == 'Weight':
conv = nn.utils.weight_norm(conv)
elif norm_type == 'Spectral':
conv = U.Spectralnorm(conv)
layers = [conv]
if use_activ: layers.append(relu(True, leaky=leaky))
    if bn: layers.append(nn.BatchNorm(nf))
if self_attention: layers.append(SelfAttention(nf))
return nn.Sequential(*layers)
class CustomPixelShuffle_ICNR(nn.Layer):
"Upsample by `scale` from `ni` filters to `nf` (default `ni`), using `nn.PixelShuffle`, `icnr` init, \
and `weight_norm`."
def __init__(self, ni: int, nf: int = None, scale: int = 2, blur: bool = False, leaky: float = None, **kwargs):
super().__init__()
nf = ifnone(nf, ni)
self.conv = custom_conv_layer(ni, nf * (scale**2), ks=1, use_activ=False, **kwargs)
self.shuf = PixelShuffle(scale)
        self.pad = ReplicationPad2d([1, 0, 1, 0])
        # Gate the blur on the `blur` flag; the pooling layer itself is always truthy.
        self.do_blur = blur
        self.blur = paddle.nn.AvgPool2D(2, stride=1)
        self.relu = nn.LeakyReLU(leaky) if leaky is not None else nn.ReLU()
    def forward(self, x):
        x = self.shuf(self.relu(self.conv(x)))
        return self.blur(self.pad(x)) if self.do_blur else x
class MergeLayer(nn.Layer):
"Merge a shortcut with the result of the module by adding them or concatenating thme if `dense=True`."
def __init__(self, dense: bool = False):
super().__init__()
self.dense = dense
self.orig = None
def forward(self, x):
out = paddle.concat([x, self.orig], axis=1) if self.dense else (x + self.orig)
self.orig = None
return out
def res_block(nf, dense: bool = False, norm_type='Batch', bottle: bool = False, **conv_kwargs):
"Resnet block of `nf` features. `conv_kwargs` are passed to `conv_layer`."
norm2 = norm_type
if not dense and (norm_type == 'Batch'): norm2 = 'BatchZero'
nf_inner = nf // 2 if bottle else nf
return SequentialEx(
conv_layer(nf, nf_inner, norm_type=norm_type, **conv_kwargs),
conv_layer(nf_inner, nf, norm_type=norm2, **conv_kwargs), MergeLayer(dense))
class SigmoidRange(nn.Layer):
"Sigmoid module with range `(low,x_max)`"
def __init__(self, low, high):
super().__init__()
self.low, self.high = low, high
def forward(self, x):
return sigmoid_range(x, self.low, self.high)
def sigmoid_range(x, low, high):
"Sigmoid function with range `(low, high)`"
return F.sigmoid(x) * (high - low) + low
class PixelShuffle(nn.Layer):
def __init__(self, upscale_factor):
super(PixelShuffle, self).__init__()
self.upscale_factor = upscale_factor
def forward(self, x):
return F.pixel_shuffle(x, self.upscale_factor)
class ReplicationPad2d(nn.Layer):
def __init__(self, size):
super(ReplicationPad2d, self).__init__()
self.size = size
def forward(self, x):
return F.pad(x, self.size, mode="replicate")
def conv1d(ni: int, no: int, ks: int = 1, stride: int = 1, padding: int = 0, bias: bool = False):
"Create and initialize a `nn.Conv1d` layer with spectral normalization."
conv = nn.Conv1D(ni, no, ks, stride=stride, padding=padding, bias_attr=bias)
return U.Spectralnorm(conv)
class SelfAttention(nn.Layer):
"Self attention layer for nd."
def __init__(self, n_channels):
super().__init__()
self.query = conv1d(n_channels, n_channels // 8)
self.key = conv1d(n_channels, n_channels // 8)
self.value = conv1d(n_channels, n_channels)
self.gamma = self.create_parameter(
shape=[1], default_initializer=paddle.nn.initializer.Constant(0.0)) # nn.Parameter(tensor([0.]))
def forward(self, x):
# Notation from https://arxiv.org/pdf/1805.08318.pdf
size = x.shape
x = paddle.reshape(x, list(size[:2]) + [-1])
f, g, h = self.query(x), self.key(x), self.value(x)
beta = paddle.nn.functional.softmax(paddle.bmm(paddle.transpose(f, [0, 2, 1]), g), axis=1)
o = self.gamma * paddle.bmm(h, beta) + x
return paddle.reshape(o, size)
def _get_sfs_idxs(sizes):
"Get the indexes of the layers where the size of the activation changes."
feature_szs = [size[-1] for size in sizes]
sfs_idxs = list(np.where(np.array(feature_szs[:-1]) != np.array(feature_szs[1:]))[0])
if feature_szs[0] != feature_szs[1]:
sfs_idxs = [0] + sfs_idxs
return sfs_idxs
def build_model():
backbone = resnet101()
cut = -2
encoder = nn.Sequential(*list(backbone.children())[:cut])
model = Deoldify(encoder, 3, blur=True, y_range=(-3, 3), norm_type='Spectral', self_attention=True, nf_factor=2)
return model
# coding:utf-8
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import glob
import cv2
import paddle
import paddle.nn as nn
import numpy as np
from PIL import Image
from tqdm import tqdm
import deoldify.utils as U
from paddlehub.module.module import moduleinfo, serving, Module
from deoldify.base_module import build_model
@moduleinfo(
name="deoldify",
type="CV/image_editing",
author="paddlepaddle",
author_email="",
summary="Deoldify is a colorizaton model",
version="1.0.0")
class DeOldifyPredictor(Module):
    def _initialize(self, render_factor: int = 32, output_path: str = 'result', load_checkpoint: str = None):
#super(DeOldifyPredictor, self).__init__()
self.model = build_model()
self.render_factor = render_factor
self.output = os.path.join(output_path, 'DeOldify')
if not os.path.exists(self.output):
os.makedirs(self.output)
if load_checkpoint is not None:
state_dict = paddle.load(load_checkpoint)
self.model.load_dict(state_dict)
print("load custom checkpoint success")
else:
checkpoint = os.path.join(self.directory, 'DeOldify_stable.pdparams')
state_dict = paddle.load(checkpoint)
self.model.load_dict(state_dict)
print("load pretrained checkpoint success")
def norm(self, img, render_factor=32, render_base=16):
target_size = render_factor * render_base
img = img.resize((target_size, target_size), resample=Image.BILINEAR)
img = np.array(img).transpose([2, 0, 1]).astype('float32') / 255.0
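        # Normalize with the standard ImageNet channel mean and std.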
img_mean = np.array([0.485, 0.456, 0.406]).reshape((3, 1, 1))
img_std = np.array([0.229, 0.224, 0.225]).reshape((3, 1, 1))
img -= img_mean
img /= img_std
return img.astype('float32')
def denorm(self, img):
img_mean = np.array([0.485, 0.456, 0.406]).reshape((3, 1, 1))
img_std = np.array([0.229, 0.224, 0.225]).reshape((3, 1, 1))
img *= img_std
img += img_mean
img = img.transpose((1, 2, 0))
return (img * 255).clip(0, 255).astype('uint8')
def post_process(self, raw_color, orig):
color_np = np.asarray(raw_color)
orig_np = np.asarray(orig)
color_yuv = cv2.cvtColor(color_np, cv2.COLOR_BGR2YUV)
orig_yuv = cv2.cvtColor(orig_np, cv2.COLOR_BGR2YUV)
hires = np.copy(orig_yuv)
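        # Keep the original luminance (Y); take only the predicted chrominance (UV).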
hires[:, :, 1:3] = color_yuv[:, :, 1:3]
final = cv2.cvtColor(hires, cv2.COLOR_YUV2BGR)
return final
def run_image(self, img):
if isinstance(img, str):
ori_img = Image.open(img).convert('LA').convert('RGB')
elif isinstance(img, np.ndarray):
ori_img = Image.fromarray(img).convert('LA').convert('RGB')
        elif isinstance(img, Image.Image):
            ori_img = img
        else:
            raise TypeError('img must be a file path, a BGR numpy.ndarray, or a PIL.Image')
img = self.norm(ori_img, self.render_factor)
x = paddle.to_tensor(img[np.newaxis, ...])
out = self.model(x)
pred_img = self.denorm(out.numpy()[0])
pred_img = Image.fromarray(pred_img)
pred_img = pred_img.resize(ori_img.size, resample=Image.BILINEAR)
pred_img = self.post_process(pred_img, ori_img)
pred_img = cv2.cvtColor(pred_img, cv2.COLOR_RGB2BGR)
return pred_img
def run_video(self, video):
base_name = os.path.basename(video).split('.')[0]
output_path = os.path.join(self.output, base_name)
pred_frame_path = os.path.join(output_path, 'frames_pred')
if not os.path.exists(output_path):
os.makedirs(output_path)
if not os.path.exists(pred_frame_path):
os.makedirs(pred_frame_path)
cap = cv2.VideoCapture(video)
fps = cap.get(cv2.CAP_PROP_FPS)
out_path = U.video2frames(video, output_path)
frames = sorted(glob.glob(os.path.join(out_path, '*.png')))
for frame in tqdm(frames):
pred_img = self.run_image(frame)
pred_img = cv2.cvtColor(pred_img, cv2.COLOR_BGR2RGB)
pred_img = Image.fromarray(pred_img)
frame_name = os.path.basename(frame)
pred_img.save(os.path.join(pred_frame_path, frame_name))
frame_pattern_combined = os.path.join(pred_frame_path, '%08d.png')
vid_out_path = os.path.join(output_path, '{}_deoldify_out.mp4'.format(base_name))
U.frames2video(frame_pattern_combined, vid_out_path, str(int(fps)))
        print('Video result saved at {}.'.format(vid_out_path))
return frame_pattern_combined, vid_out_path
def predict(self, input):
if not os.path.exists(self.output):
os.makedirs(self.output)
if not U.is_image(input):
return self.run_video(input)
else:
pred_img = self.run_image(input)
if self.output:
base_name = os.path.splitext(os.path.basename(input))[0]
out_path = os.path.join(self.output, base_name + '.png')
cv2.imwrite(out_path, pred_img)
return pred_img, out_path
@serving
def serving_method(self, images, **kwargs):
"""
Run as a service.
"""
images_decode = U.base64_to_cv2(images)
results = self.run_image(img=images_decode)
results = U.cv2_to_base64(results)
return results
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import division
from __future__ import print_function
import math
import paddle.fluid as fluid
from paddle.fluid.dygraph.nn import Conv2D, Pool2D, BatchNorm, Linear
from paddle.fluid.dygraph.container import Sequential
from paddle.utils.download import get_weights_path_from_url
__all__ = ['ResNet', 'resnet18', 'resnet34', 'resnet50', 'resnet101', 'resnet152']
model_urls = {
'resnet18': ('https://paddle-hapi.bj.bcebos.com/models/resnet18.pdparams', '0ba53eea9bc970962d0ef96f7b94057e'),
'resnet34': ('https://paddle-hapi.bj.bcebos.com/models/resnet34.pdparams', '46bc9f7c3dd2e55b7866285bee91eff3'),
'resnet50': ('https://paddle-hapi.bj.bcebos.com/models/resnet50.pdparams', '5ce890a9ad386df17cf7fe2313dca0a1'),
'resnet101': ('https://paddle-hapi.bj.bcebos.com/models/resnet101.pdparams', 'fb07a451df331e4b0bb861ed97c3a9b9'),
'resnet152': ('https://paddle-hapi.bj.bcebos.com/models/resnet152.pdparams', 'f9c700f26d3644bb76ad2226ed5f5713'),
}
class ConvBNLayer(fluid.dygraph.Layer):
def __init__(self, num_channels, num_filters, filter_size, stride=1, groups=1, act=None):
super(ConvBNLayer, self).__init__()
self._conv = Conv2D(
num_channels=num_channels,
num_filters=num_filters,
filter_size=filter_size,
stride=stride,
padding=(filter_size - 1) // 2,
groups=groups,
act=None,
bias_attr=False)
self._batch_norm = BatchNorm(num_filters, act=act)
def forward(self, inputs):
x = self._conv(inputs)
x = self._batch_norm(x)
return x
class BasicBlock(fluid.dygraph.Layer):
"""residual block of resnet18 and resnet34
"""
expansion = 1
def __init__(self, num_channels, num_filters, stride, shortcut=True):
super(BasicBlock, self).__init__()
self.conv0 = ConvBNLayer(num_channels=num_channels, num_filters=num_filters, filter_size=3, act='relu')
self.conv1 = ConvBNLayer(
num_channels=num_filters, num_filters=num_filters, filter_size=3, stride=stride, act='relu')
if not shortcut:
self.short = ConvBNLayer(num_channels=num_channels, num_filters=num_filters, filter_size=1, stride=stride)
self.shortcut = shortcut
def forward(self, inputs):
y = self.conv0(inputs)
conv1 = self.conv1(y)
if self.shortcut:
short = inputs
else:
short = self.short(inputs)
y = short + conv1
return fluid.layers.relu(y)
class BottleneckBlock(fluid.dygraph.Layer):
"""residual block of resnet50, resnet101 amd resnet152
"""
expansion = 4
def __init__(self, num_channels, num_filters, stride, shortcut=True):
super(BottleneckBlock, self).__init__()
self.conv0 = ConvBNLayer(num_channels=num_channels, num_filters=num_filters, filter_size=1, act='relu')
self.conv1 = ConvBNLayer(
num_channels=num_filters, num_filters=num_filters, filter_size=3, stride=stride, act='relu')
self.conv2 = ConvBNLayer(
num_channels=num_filters, num_filters=num_filters * self.expansion, filter_size=1, act=None)
if not shortcut:
self.short = ConvBNLayer(
num_channels=num_channels, num_filters=num_filters * self.expansion, filter_size=1, stride=stride)
self.shortcut = shortcut
self._num_channels_out = num_filters * self.expansion
def forward(self, inputs):
x = self.conv0(inputs)
conv1 = self.conv1(x)
conv2 = self.conv2(conv1)
if self.shortcut:
short = inputs
else:
short = self.short(inputs)
x = fluid.layers.elementwise_add(x=short, y=conv2)
return fluid.layers.relu(x)
class ResNet(fluid.dygraph.Layer):
"""ResNet model from
`"Deep Residual Learning for Image Recognition" <https://arxiv.org/pdf/1512.03385.pdf>`_
Args:
Block (BasicBlock|BottleneckBlock): block module of model.
depth (int): layers of resnet, default: 50.
num_classes (int): output dim of last fc layer. If num_classes <=0, last fc layer
will not be defined. Default: 1000.
with_pool (bool): use pool before the last fc layer or not. Default: True.
classifier_activation (str): activation for the last fc layer. Default: 'softmax'.
Examples:
.. code-block:: python
from paddle.vision.models import ResNet
from paddle.vision.models.resnet import BottleneckBlock, BasicBlock
resnet50 = ResNet(BottleneckBlock, 50)
resnet18 = ResNet(BasicBlock, 18)
"""
def __init__(self, Block, depth=50, num_classes=1000, with_pool=True, classifier_activation='softmax'):
super(ResNet, self).__init__()
self.num_classes = num_classes
self.with_pool = with_pool
layer_config = {
18: [2, 2, 2, 2],
34: [3, 4, 6, 3],
50: [3, 4, 6, 3],
101: [3, 4, 23, 3],
152: [3, 8, 36, 3],
}
assert depth in layer_config.keys(), \
"supported depth are {} but input layer is {}".format(
layer_config.keys(), depth)
layers = layer_config[depth]
in_channels = 64
out_channels = [64, 128, 256, 512]
self.conv = ConvBNLayer(num_channels=3, num_filters=64, filter_size=7, stride=2, act='relu')
self.pool = Pool2D(pool_size=3, pool_stride=2, pool_padding=1, pool_type='max')
self.layers = []
for idx, num_blocks in enumerate(layers):
blocks = []
shortcut = False
for b in range(num_blocks):
if b == 1:
in_channels = out_channels[idx] * Block.expansion
block = Block(
num_channels=in_channels,
num_filters=out_channels[idx],
stride=2 if b == 0 and idx != 0 else 1,
shortcut=shortcut)
blocks.append(block)
shortcut = True
layer = self.add_sublayer("layer_{}".format(idx), Sequential(*blocks))
self.layers.append(layer)
if with_pool:
self.global_pool = Pool2D(pool_size=7, pool_type='avg', global_pooling=True)
if num_classes > 0:
stdv = 1.0 / math.sqrt(out_channels[-1] * Block.expansion * 1.0)
self.fc_input_dim = out_channels[-1] * Block.expansion * 1 * 1
self.fc = Linear(
self.fc_input_dim,
num_classes,
act=classifier_activation,
param_attr=fluid.param_attr.ParamAttr(initializer=fluid.initializer.Uniform(-stdv, stdv)))
def forward(self, inputs):
x = self.conv(inputs)
x = self.pool(x)
for layer in self.layers:
x = layer(x)
if self.with_pool:
x = self.global_pool(x)
        if self.num_classes > 0:
x = fluid.layers.reshape(x, shape=[-1, self.fc_input_dim])
x = self.fc(x)
return x
def _resnet(arch, Block, depth, pretrained, **kwargs):
model = ResNet(Block, depth, **kwargs)
if pretrained:
        assert arch in model_urls, "{} model does not have a pretrained model now, you should set pretrained=False".format(
            arch)
weight_path = get_weights_path_from_url(model_urls[arch][0], model_urls[arch][1])
assert weight_path.endswith('.pdparams'), "suffix of weight must be .pdparams"
param, _ = fluid.load_dygraph(weight_path)
model.set_dict(param)
return model
def resnet18(pretrained=False, **kwargs):
"""ResNet 18-layer model
Args:
pretrained (bool): If True, returns a model pre-trained on ImageNet
Examples:
.. code-block:: python
from paddle.vision.models import resnet18
# build model
model = resnet18()
# build model and load imagenet pretrained weight
# model = resnet18(pretrained=True)
"""
return _resnet('resnet18', BasicBlock, 18, pretrained, **kwargs)
def resnet34(pretrained=False, **kwargs):
"""ResNet 34-layer model
Args:
pretrained (bool): If True, returns a model pre-trained on ImageNet
Examples:
.. code-block:: python
from paddle.vision.models import resnet34
# build model
model = resnet34()
# build model and load imagenet pretrained weight
# model = resnet34(pretrained=True)
"""
return _resnet('resnet34', BasicBlock, 34, pretrained, **kwargs)
def resnet50(pretrained=False, **kwargs):
"""ResNet 50-layer model
Args:
pretrained (bool): If True, returns a model pre-trained on ImageNet
Examples:
.. code-block:: python
from paddle.vision.models import resnet50
# build model
model = resnet50()
# build model and load imagenet pretrained weight
# model = resnet50(pretrained=True)
"""
return _resnet('resnet50', BottleneckBlock, 50, pretrained, **kwargs)
def resnet101(pretrained=False, **kwargs):
"""ResNet 101-layer model
Args:
pretrained (bool): If True, returns a model pre-trained on ImageNet
Examples:
.. code-block:: python
from paddle.vision.models import resnet101
# build model
model = resnet101()
# build model and load imagenet pretrained weight
# model = resnet101(pretrained=True)
"""
return _resnet('resnet101', BottleneckBlock, 101, pretrained, **kwargs)
def resnet152(pretrained=False, **kwargs):
"""ResNet 152-layer model
Args:
pretrained (bool): If True, returns a model pre-trained on ImageNet
Examples:
.. code-block:: python
from paddle.vision.models import resnet152
# build model
model = resnet152()
# build model and load imagenet pretrained weight
# model = resnet152(pretrained=True)
"""
return _resnet('resnet152', BottleneckBlock, 152, pretrained, **kwargs)
import os
import sys
import base64
import cv2
import numpy as np
import paddle
import paddle.nn as nn
from PIL import Image
def cv2_to_base64(image):
    data = cv2.imencode('.jpg', image)[1]
    return base64.b64encode(data.tobytes()).decode('utf8')
def base64_to_cv2(b64str):
    data = base64.b64decode(b64str.encode('utf8'))
    data = np.frombuffer(data, np.uint8)
    data = cv2.imdecode(data, cv2.IMREAD_COLOR)
    return data
def is_listy(x):
return isinstance(x, (tuple, list))
class Hook():
"Create a hook on `m` with `hook_func`."
def __init__(self, m, hook_func, is_forward=True, detach=True):
self.hook_func, self.detach, self.stored = hook_func, detach, None
f = m.register_forward_post_hook if is_forward else m.register_backward_hook
self.hook = f(self.hook_fn)
self.removed = False
def hook_fn(self, module, input, output):
"Applies `hook_func` to `module`, `input`, `output`."
if self.detach:
input = (o.detach() for o in input) if is_listy(input) else input.detach()
output = (o.detach() for o in output) if is_listy(output) else output.detach()
self.stored = self.hook_func(module, input, output)
def remove(self):
"Remove the hook from the model."
if not self.removed:
self.hook.remove()
self.removed = True
def __enter__(self, *args):
return self
def __exit__(self, *args):
self.remove()
class Hooks():
"Create several hooks on the modules in `ms` with `hook_func`."
def __init__(self, ms, hook_func, is_forward=True, detach=True):
self.hooks = []
try:
for m in ms:
self.hooks.append(Hook(m, hook_func, is_forward, detach))
        except Exception:
            # Swallow iteration errors and keep whatever hooks were registered so far.
            pass
def __getitem__(self, i: int) -> Hook:
return self.hooks[i]
def __len__(self) -> int:
return len(self.hooks)
def __iter__(self):
return iter(self.hooks)
@property
def stored(self):
return [o.stored for o in self]
def remove(self):
"Remove the hooks from the model."
for h in self.hooks:
h.remove()
def __enter__(self, *args):
return self
def __exit__(self, *args):
self.remove()
def _hook_inner(m, i, o):
return o if isinstance(o, paddle.fluid.framework.Variable) else o if is_listy(o) else list(o)
def hook_output(module, detach=True, grad=False):
"Return a `Hook` that stores activations of `module` in `self.stored`"
return Hook(module, _hook_inner, detach=detach, is_forward=not grad)
def hook_outputs(modules, detach=True, grad=False):
"Return `Hooks` that store activations of all `modules` in `self.stored`"
return Hooks(modules, _hook_inner, detach=detach, is_forward=not grad)
def model_sizes(m, size=(64, 64)):
"Pass a dummy input through the model `m` to get the various sizes of activations."
with hook_outputs(m) as hooks:
x = dummy_eval(m, size)
return [o.stored.shape for o in hooks]
def dummy_eval(m, size=(64, 64)):
"Pass a `dummy_batch` in evaluation mode in `m` with `size`."
m.eval()
return m(dummy_batch(size))
def dummy_batch(size=(64, 64), ch_in=3):
"Create a dummy batch to go through `m` with `size`."
arr = np.random.rand(1, ch_in, *size).astype('float32') * 2 - 1
return paddle.to_tensor(arr)
class _SpectralNorm(nn.SpectralNorm):
def __init__(self, weight_shape, dim=0, power_iters=1, eps=1e-12, dtype='float32'):
super(_SpectralNorm, self).__init__(weight_shape, dim, power_iters, eps, dtype)
def forward(self, weight):
inputs = {'Weight': weight, 'U': self.weight_u, 'V': self.weight_v}
out = self._helper.create_variable_for_type_inference(self._dtype)
_power_iters = self._power_iters if self.training else 0
self._helper.append_op(
type="spectral_norm",
inputs=inputs,
outputs={
"Out": out,
},
attrs={
"dim": self._dim,
"power_iters": _power_iters,
"eps": self._eps,
})
return out
class Spectralnorm(paddle.nn.Layer):
def __init__(self, layer, dim=0, power_iters=1, eps=1e-12, dtype='float32'):
super(Spectralnorm, self).__init__()
self.spectral_norm = _SpectralNorm(layer.weight.shape, dim, power_iters, eps, dtype)
self.dim = dim
self.power_iters = power_iters
self.eps = eps
self.layer = layer
weight = layer._parameters['weight']
del layer._parameters['weight']
self.weight_orig = self.create_parameter(weight.shape, dtype=weight.dtype)
self.weight_orig.set_value(weight)
def forward(self, x):
weight = self.spectral_norm(self.weight_orig)
self.layer.weight = weight
out = self.layer(x)
return out
def video2frames(video_path, outpath, **kargs):
def _dict2str(kargs):
cmd_str = ''
for k, v in kargs.items():
cmd_str += (' ' + str(k) + ' ' + str(v))
return cmd_str
ffmpeg = ['ffmpeg ', ' -y -loglevel ', ' error ']
vid_name = video_path.split('/')[-1].split('.')[0]
out_full_path = os.path.join(outpath, vid_name)
if not os.path.exists(out_full_path):
os.makedirs(out_full_path)
# video file name
outformat = out_full_path + '/%08d.png'
    cmd = ffmpeg + [' -i ', video_path, ' -start_number ', ' 0 ', outformat]
cmd = ''.join(cmd) + _dict2str(kargs)
if os.system(cmd) != 0:
raise RuntimeError('ffmpeg process video: {} error'.format(vid_name))
sys.stdout.flush()
return out_full_path
def frames2video(frame_path, video_path, r):
ffmpeg = ['ffmpeg ', ' -y -loglevel ', ' error ']
cmd = ffmpeg + [
' -r ', r, ' -f ', ' image2 ', ' -i ', frame_path, ' -vcodec ', ' libx264 ', ' -pix_fmt ', ' yuv420p ',
' -crf ', ' 16 ', video_path
]
cmd = ''.join(cmd)
if os.system(cmd) != 0:
raise RuntimeError('ffmpeg process video: {} error'.format(video_path))
sys.stdout.flush()
def is_image(input):
    try:
        img = Image.open(input)
        _ = img.size
        return True
    except Exception:
        return False
## Model Overview
photo_restoration is a model for restoring old photos. It consists of two parts, colorization and super resolution: colorization is based on deoldify and super resolution on realsr, and you can apply either or both according to your needs. Before using this model, please install the deoldify and realsr modules, e.g. as shown below.
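A sketch of installing the two prerequisite modules with the PaddleHub command line:
```shell
$ hub install deoldify
$ hub install realsr
```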
## API
```python
def run_image(self,
input,
model_select= ['Colorization', 'SuperResolution'],
save_path = 'photo_restoration'):
```
Prediction API for photo restoration.
**Parameters**
* input (numpy.ndarray|str): image data, as a numpy.ndarray or a str. An ndarray has shape \[H, W, C\] in BGR format; a str is the image path.
* model_select (list\[str\]): operations to apply to the image: \['Colorization'\] only colorizes the image, \['SuperResolution'\] only applies super resolution;
the default is \['Colorization', 'SuperResolution'\].
* save_path (str): path to save the image, default 'photo_restoration'.
**Returns**
* output (numpy.ndarray): restoration result, an ndarray of shape \[H, W, C\] in BGR format.
## Code Example
Photo restoration example:
```python
import cv2
import paddlehub as hub
model = hub.Module(name='photo_restoration', visualization=True)
im = cv2.imread('/PATH/TO/IMAGE')
res = model.run_image(im)
```
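To run only one of the two stages, pass `model_select` explicitly; a sketch (the image path is a placeholder):
```python
import cv2
import paddlehub as hub

model = hub.Module(name='photo_restoration', visualization=True)
im = cv2.imread('/PATH/TO/IMAGE')
# Colorize only; skip the super-resolution stage.
res = model.run_image(im, model_select=['Colorization'])
```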
## Serving Deployment
PaddleHub Serving can deploy an online photo restoration service.
## Step 1: Start PaddleHub Serving
Run the start command:
```shell
$ hub serving start -m photo_restoration
```
This deploys a photo restoration API service, with the default port 8866.
**NOTE:** To predict on GPU, set the CUDA\_VISIBLE\_DEVICES environment variable before starting the service; otherwise, it does not need to be set.
## Step 2: Send a Prediction Request
With the server up, the few lines of code below send a prediction request and fetch the result.
```python
import requests
import json
import base64
import cv2
import numpy as np
def cv2_to_base64(image):
    data = cv2.imencode('.jpg', image)[1]
    return base64.b64encode(data.tobytes()).decode('utf8')
def base64_to_cv2(b64str):
    data = base64.b64decode(b64str.encode('utf8'))
    data = np.frombuffer(data, np.uint8)
    data = cv2.imdecode(data, cv2.IMREAD_COLOR)
    return data
# Send the HTTP request
org_im = cv2.imread('PATH/TO/IMAGE')
data = {'images':cv2_to_base64(org_im), 'model_select': ['Colorization', 'SuperResolution']}
headers = {"Content-type": "application/json"}
url = "http://127.0.0.1:8866/predict/photo_restoration"
r = requests.post(url=url, headers=headers, data=json.dumps(data))
img = base64_to_cv2(r.json()["results"])
cv2.imwrite('PATH/TO/SAVE/IMAGE', img)
```
### Dependencies
paddlepaddle >= 2.0.0rc
paddlehub >= 1.8.2
# coding:utf-8
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import time
import cv2
import paddle.nn as nn
import paddlehub as hub
from paddlehub.module.module import moduleinfo, serving, Module
import photo_restoration.utils as U
@moduleinfo(
name="photo_restoration",
type="CV/image_editing",
author="paddlepaddle",
author_email="",
summary="photo_restoration is a photo restoration model based on deoldify and realsr.",
version="1.0.0")
class PhotoRestoreModel(Module):
"""
PhotoRestoreModel
Args:
        visualization (bool): Whether to save the restoration result. Default is False.
"""
def _initialize(self, visualization: bool = False):
#super(PhotoRestoreModel, self).__init__()
self.deoldify = hub.Module(name='deoldify')
self.realsr = hub.Module(name='realsr')
self.visualization = visualization
def run_image(self,
input,
model_select: list = ['Colorization', 'SuperResolution'],
save_path: str = 'photo_restoration'):
self.models = []
for model in model_select:
            print('\n {} model processing starts...'.format(model))
if model == 'Colorization':
self.deoldify.eval()
self.models.append(self.deoldify)
if model == 'SuperResolution':
self.realsr.eval()
self.models.append(self.realsr)
for model in self.models:
output = model.run_image(input)
input = output
if self.visualization:
if not os.path.exists(save_path):
os.mkdir(save_path)
img_name = str(time.time()) + '.png'
save_img = os.path.join(save_path, img_name)
cv2.imwrite(save_img, output)
print("save result at: ", save_img)
return output
@serving
def serving_method(self, images, model_select):
"""
Run as a service.
"""
print(model_select)
images_decode = U.base64_to_cv2(images)
results = self.run_image(input=images_decode, model_select=model_select)
results = U.cv2_to_base64(results)
return results
import base64
import cv2
import numpy as np
def cv2_to_base64(image):
    data = cv2.imencode('.jpg', image)[1]
    return base64.b64encode(data.tobytes()).decode('utf8')
def base64_to_cv2(b64str):
    data = base64.b64decode(b64str.encode('utf8'))
    data = np.frombuffer(data, np.uint8)
    data = cv2.imdecode(data, cv2.IMREAD_COLOR)
    return data
## Model Overview
DCSCN is a lightweight super-resolution model based on "Fast and Accurate Image Super Resolution by Deep CNN with Skip Connection and Network in Network". The model extracts local and global features with a residual structure and skip connections, and uses parallel 1×1 convolutions to learn detail features that improve performance. The model provides 2x super resolution.
## Command-Line Prediction
```
$ hub run dcscn --input_path "/PATH/TO/IMAGE"
```
## API
```python
def reconstruct(self,
images=None,
paths=None,
use_gpu=False,
visualization=False,
output_dir="dcscn_output")
```
Prediction API for image super resolution.
**Parameters**
* images (list\[numpy.ndarray\]): image data; each ndarray has shape \[H, W, C\] in BGR format;
* paths (list\[str\]): paths of the images;
* use\_gpu (bool): whether to predict on GPU; if so, set the CUDA_VISIBLE_DEVICES environment variable before predicting, otherwise it does not need to be set;
* visualization (bool): whether to save the result as an image file;
* output\_dir (str): directory in which to save the images.
**Returns**
* res (list\[dict\]): list of results; each element is a dict whose keys are 'save\_path' and 'data', with values:
* save\_path (str, optional): path of the visualized image (exists only when visualization=True);
* data (numpy.ndarray): the super-resolved image.
```python
def save_inference_model(self,
dirname='dcscn_save_model',
model_filename=None,
params_filename=None,
combined=False)
```
Save the model to the specified directory.
**Parameters**
* dirname: directory in which to save the model
* model\_filename: model file name, defaults to \_\_model\_\_
* params\_filename: parameter file name, defaults to \_\_params\_\_ (only takes effect when `combined` is True)
* combined: whether to save the parameters into a single file
## Code Example
```python
import cv2
import paddlehub as hub
sr_model = hub.Module(name='dcscn')
im = cv2.imread('/PATH/TO/IMAGE').astype('float32')
# visualization=True lets you inspect the super-resolution result; set it to False for faster runs.
res = sr_model.reconstruct(images=[im], visualization=True)
print(res[0]['data'])
sr_model.save_inference_model()
```
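When `combined=True`, the parameters are exported into a single `__params__` file alongside `__model__`; a sketch (the directory name is a placeholder):
```python
sr_model.save_inference_model(dirname='dcscn_combined_model', combined=True)
```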
## Serving Deployment
PaddleHub Serving can deploy an online image super-resolution service.
## Step 1: Start PaddleHub Serving
Run the start command:
```shell
$ hub serving start -m dcscn
```
This deploys a super-resolution API service, with the default port 8866.
**NOTE:** To predict on GPU, set the CUDA_VISIBLE_DEVICES environment variable before starting the service; otherwise, it does not need to be set.
## Step 2: Send a Prediction Request
With the server up, the few lines of code below send a prediction request and fetch the result.
```python
import requests
import json
import base64
import cv2
import numpy as np
def cv2_to_base64(image):
    data = cv2.imencode('.jpg', image)[1]
    return base64.b64encode(data.tobytes()).decode('utf8')
def base64_to_cv2(b64str):
    data = base64.b64decode(b64str.encode('utf8'))
    data = np.frombuffer(data, np.uint8)
    data = cv2.imdecode(data, cv2.IMREAD_COLOR)
    return data
# Send the HTTP request
org_im = cv2.imread('/PATH/TO/IMAGE')
data = {'images':[cv2_to_base64(org_im)]}
headers = {"Content-type": "application/json"}
url = "http://127.0.0.1:8866/predict/dcscn"
r = requests.post(url=url, headers=headers, data=json.dumps(data))
sr = np.expand_dims(cv2.cvtColor(base64_to_cv2(r.json()["results"][0]['data']), cv2.COLOR_BGR2GRAY), axis=2)
shape = sr.shape
org_im = cv2.cvtColor(org_im, cv2.COLOR_BGR2YUV)
uv = cv2.resize(org_im[...,1:], (shape[1], shape[0]), interpolation=cv2.INTER_CUBIC)
combine_im = cv2.cvtColor(np.concatenate((sr, uv), axis=2), cv2.COLOR_YUV2BGR)
cv2.imwrite('dcscn_X2.png', combine_im)
print("save image as dcscn_X2.png")
```
### Code
https://github.com/jiny2001/dcscn-super-resolution
### Dependencies
paddlepaddle >= 1.8.0
paddlehub >= 1.7.1
# -*- coding:utf-8 -*-
import os
import time
from collections import OrderedDict
import cv2
import numpy as np
from PIL import Image
__all__ = ['reader']
def reader(images=None, paths=None):
"""
Preprocess to yield image.
Args:
images (list(numpy.ndarray)): images data, shape of each is [H, W, C]
paths (list[str]): paths to images.
Yield:
each (collections.OrderedDict): info of original image, preprocessed image.
"""
component = list()
if paths:
for im_path in paths:
each = OrderedDict()
assert os.path.isfile(im_path), "The {} isn't a valid file path.".format(im_path)
im = cv2.imread(im_path).astype('float32')
each['org_im'] = im
each['org_im_path'] = im_path
each['org_im_shape'] = im.shape
component.append(each)
if images is not None:
assert type(images) is list, "images should be a list."
for im in images:
im = im.astype(np.float32)
each = OrderedDict()
each['org_im'] = im
each['org_im_path'] = 'ndarray_time={}'.format(round(time.time(), 6) * 1e6)
each['org_im_shape'] = im.shape
component.append(each)
for element in component:
img = element['org_im'].copy()
img = cv2.cvtColor(img, cv2.COLOR_BGR2YUV)
shape = img.shape
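        # DCSCN consumes the Y channel at its original size plus a bicubic 2x-upscaled copy.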
img_x = np.expand_dims(img[:, :, 0], axis=2)
img_x2 = np.expand_dims(cv2.resize(img_x, (shape[1] * 2, shape[0] * 2), interpolation=cv2.INTER_CUBIC), axis=2)
img_x = img_x.transpose((2, 0, 1)) / 255
img_x2 = img_x2.transpose(2, 0, 1) / 255
img_x = img_x.astype(np.float32)
img_x2 = img_x2.astype(np.float32)
element['img_x'] = img_x
element['img_x2'] = img_x2
yield element
if __name__ == "__main__":
path = ['photo.jpg']
reader(paths=path)
# -*- coding:utf-8 -*-
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import ast
import os
import argparse
import numpy as np
import paddle.fluid as fluid
import paddlehub as hub
from paddle.fluid.core import PaddleTensor, AnalysisConfig, create_paddle_predictor
from paddlehub.module.module import moduleinfo, runnable, serving
from dcscn.data_feed import reader
from dcscn.processor import postprocess, base64_to_cv2, cv2_to_base64, check_dir
@moduleinfo(
name="dcscn",
type="CV/image_editing",
author="paddlepaddle",
author_email="",
summary="dcscn is a super resolution model.",
version="1.0.0")
class Dcscn(hub.Module):
def _initialize(self):
self.default_pretrained_model_path = os.path.join(self.directory, "dcscn_model")
self._set_config()
def _set_config(self):
"""
predictor config setting
"""
self.model_file_path = self.default_pretrained_model_path
cpu_config = AnalysisConfig(self.model_file_path)
cpu_config.disable_glog_info()
cpu_config.disable_gpu()
self.cpu_predictor = create_paddle_predictor(cpu_config)
try:
_places = os.environ["CUDA_VISIBLE_DEVICES"]
int(_places[0])
use_gpu = True
except:
use_gpu = False
if use_gpu:
gpu_config = AnalysisConfig(self.model_file_path)
gpu_config.disable_glog_info()
gpu_config.enable_use_gpu(memory_pool_init_size_mb=1000, device_id=0)
self.gpu_predictor = create_paddle_predictor(gpu_config)
def reconstruct(self, images=None, paths=None, use_gpu=False, visualization=False, output_dir="dcscn_output"):
"""
API for super resolution.
Args:
images (list(numpy.ndarray)): images data, shape of each is [H, W, C], the color space is BGR.
paths (list[str]): The paths of images.
use_gpu (bool): Whether to use gpu.
visualization (bool): Whether to save image or not.
output_dir (str): The path to store output images.
Returns:
res (list[dict]): each element in the list is a dict, the keys and values are:
save_path (str, optional): the path to save images. (Exists only if visualization is True)
data (numpy.ndarray): data of post processed image.
"""
if use_gpu:
try:
_places = os.environ["CUDA_VISIBLE_DEVICES"]
int(_places[0])
except:
                raise RuntimeError(
                    "Environment Variable CUDA_VISIBLE_DEVICES is not set correctly. If you want to use GPU, please set CUDA_VISIBLE_DEVICES as cuda_device_id."
                )
all_data = list()
for yield_data in reader(images, paths):
all_data.append(yield_data)
total_num = len(all_data)
res = list()
for i in range(total_num):
image_x = np.array([all_data[i]['img_x']])
image_x2 = np.array([all_data[i]['img_x2']])
            # The inference graph only consumes the two image inputs, so no dropout tensor is fed.
            image_x = PaddleTensor(image_x.copy())
            image_x2 = PaddleTensor(image_x2.copy())
output = self.gpu_predictor.run([image_x, image_x2]) if use_gpu else self.cpu_predictor.run(
[image_x, image_x2])
output = np.expand_dims(output[0].as_ndarray(), axis=1)
out = postprocess(
data_out=output,
org_im=all_data[i]['org_im'],
org_im_shape=all_data[i]['org_im_shape'],
org_im_path=all_data[i]['org_im_path'],
output_dir=output_dir,
visualization=visualization)
res.append(out)
return res
def save_inference_model(self,
dirname='dcscn_save_model',
model_filename=None,
params_filename=None,
combined=False):
if combined:
model_filename = "__model__" if not model_filename else model_filename
params_filename = "__params__" if not params_filename else params_filename
place = fluid.CPUPlace()
exe = fluid.Executor(place)
program, feeded_var_names, target_vars = fluid.io.load_inference_model(
dirname=self.default_pretrained_model_path, executor=exe)
fluid.io.save_inference_model(
dirname=dirname,
main_program=program,
executor=exe,
feeded_var_names=feeded_var_names,
target_vars=target_vars,
model_filename=model_filename,
params_filename=params_filename)
@serving
def serving_method(self, images, **kwargs):
"""
Run as a service.
"""
images_decode = [base64_to_cv2(image) for image in images]
results = self.reconstruct(images=images_decode, **kwargs)
results = [{'data': cv2_to_base64(result['data'])} for result in results]
return results
@runnable
def run_cmd(self, argvs):
"""
Run as a command.
"""
self.parser = argparse.ArgumentParser(
description="Run the {} module.".format(self.name),
prog='hub run {}'.format(self.name),
usage='%(prog)s',
add_help=True)
self.arg_input_group = self.parser.add_argument_group(title="Input options", description="Input data. Required")
self.arg_config_group = self.parser.add_argument_group(
title="Config options", description="Run configuration for controlling module behavior, not required.")
self.add_module_config_arg()
self.add_module_input_arg()
args = self.parser.parse_args(argvs)
results = self.reconstruct(
paths=[args.input_path], use_gpu=args.use_gpu, output_dir=args.output_dir, visualization=args.visualization)
if args.save_dir is not None:
check_dir(args.save_dir)
self.save_inference_model(args.save_dir)
return results
def add_module_config_arg(self):
"""
Add the command config options.
"""
self.arg_config_group.add_argument(
'--use_gpu', type=ast.literal_eval, default=False, help="whether use GPU or not")
self.arg_config_group.add_argument(
'--output_dir', type=str, default='dcscn_output', help="The directory to save output images.")
self.arg_config_group.add_argument(
'--save_dir', type=str, default='dcscn_save_model', help="The directory to save model.")
self.arg_config_group.add_argument(
'--visualization', type=ast.literal_eval, default=True, help="whether to save output as images.")
def add_module_input_arg(self):
"""
Add the command input options.
"""
self.arg_input_group.add_argument('--input_path', type=str, help="path to image.")
if __name__ == "__main__":
module = Dcscn()
#module.reconstruct(paths=["BSD100_001.png","BSD100_002.png"])
import cv2
img = cv2.imread("BSD100_001.png").astype('float32')
res = module.reconstruct(images=[img])
module.save_inference_model()
# -*- coding:utf-8 -*-
import os
import time
import base64
import cv2
import numpy as np
__all__ = ['cv2_to_base64', 'base64_to_cv2', 'postprocess']
def cv2_to_base64(image):
    data = cv2.imencode('.jpg', image)[1]
    return base64.b64encode(data.tobytes()).decode('utf8')
def base64_to_cv2(b64str):
    data = base64.b64decode(b64str.encode('utf8'))
    data = np.frombuffer(data, np.uint8)
    data = cv2.imdecode(data, cv2.IMREAD_COLOR)
    return data
def postprocess(data_out, org_im, org_im_shape, org_im_path, output_dir, visualization):
"""
Postprocess output of network. one image at a time.
Args:
data_out (numpy.ndarray): output of network.
org_im (numpy.ndarray): original image.
        org_im_shape (list): shape of original image.
        org_im_path (list): path of original image.
output_dir (str): output directory to store image.
visualization (bool): whether to save image or not.
Returns:
result (dict): The data of processed image.
"""
result = dict()
for sr in data_out:
sr = np.squeeze(sr, 0)
sr = np.clip(sr * 255, 0, 255)
sr = sr.astype(np.uint8)
shape = sr.shape
if visualization:
org_im = cv2.cvtColor(org_im, cv2.COLOR_BGR2YUV)
uv = cv2.resize(org_im[..., 1:], (shape[1], shape[0]), interpolation=cv2.INTER_CUBIC)
combine_im = cv2.cvtColor(np.concatenate((sr, uv), axis=2), cv2.COLOR_YUV2BGR)
check_dir(output_dir)
save_im_path = get_save_image_name(org_im, org_im_path, output_dir)
cv2.imwrite(save_im_path, combine_im)
print("save image at: ", save_im_path)
result['save_path'] = save_im_path
result['data'] = sr
else:
result['data'] = sr
return result
def check_dir(dir_path):
if not os.path.exists(dir_path):
os.makedirs(dir_path)
elif os.path.isfile(dir_path):
os.remove(dir_path)
os.makedirs(dir_path)
def get_save_image_name(org_im, org_im_path, output_dir):
"""
Get save image name from source image path.
"""
    # name prefix of original image
org_im_name = os.path.split(org_im_path)[-1]
im_prefix = os.path.splitext(org_im_name)[0]
ext = '.png'
# save image path
save_im_path = os.path.join(output_dir, im_prefix + ext)
if os.path.exists(save_im_path):
save_im_path = os.path.join(output_dir, im_prefix + 'time={}'.format(int(time.time())) + ext)
return save_im_path
## Model Overview
falsr_a is a lightweight super-resolution model based on "Fast, Accurate and Lightweight Super-Resolution with Neural Architecture Search". The model treats super resolution as a multi-objective problem and uses an elastic search strategy with a hybrid controller to improve performance. The model provides 2x super resolution.
## Command-Line Prediction
```
$ hub run falsr_a --input_path "/PATH/TO/IMAGE"
```
## API
```python
def reconstruct(self,
images=None,
paths=None,
use_gpu=False,
visualization=False,
output_dir="falsr_a_output")
```
Prediction API for image super resolution.
**Parameters**
* images (list\[numpy.ndarray\]): image data; each ndarray has shape \[H, W, C\] in BGR format;
* paths (list\[str\]): paths of the images;
* use\_gpu (bool): whether to predict on GPU; if so, set the CUDA_VISIBLE_DEVICES environment variable before predicting, otherwise it does not need to be set;
* visualization (bool): whether to save the result as an image file;
* output\_dir (str): directory in which to save the images.
**Returns**
* res (list\[dict\]): list of results; each element is a dict whose keys are 'save\_path' and 'data', with values:
* save\_path (str, optional): path of the visualized image (exists only when visualization=True);
* data (numpy.ndarray): the super-resolved image.
```python
def save_inference_model(self,
dirname='falsr_a_save_model',
model_filename=None,
params_filename=None,
combined=False)
```
Save the model to the specified directory.
**Parameters**
* dirname: directory in which to save the model
* model\_filename: model file name, defaults to \_\_model\_\_
* params\_filename: parameter file name, defaults to \_\_params\_\_ (only takes effect when `combined` is True)
* combined: whether to save the parameters into a single file
## Code Example
```python
import cv2
import paddlehub as hub
sr_model = hub.Module(name='falsr_a')
im = cv2.imread('/PATH/TO/IMAGE').astype('float32')
# visualization=True lets you inspect the super-resolution result; set it to False for faster runs.
res = sr_model.reconstruct(images=[im], visualization=True)
print(res[0]['data'])
sr_model.save_inference_model()
```
## Serving Deployment
PaddleHub Serving can deploy an online image super-resolution service.
## Step 1: Start PaddleHub Serving
Run the start command:
```shell
$ hub serving start -m falsr_a
```
This deploys a super-resolution API service, with the default port 8866.
**NOTE:** To predict on GPU, set the CUDA_VISIBLE_DEVICES environment variable before starting the service; otherwise, it does not need to be set.
## Step 2: Send a Prediction Request
With the server up, the few lines of code below send a prediction request and fetch the result.
```python
import requests
import json
import base64
import cv2
import numpy as np
def cv2_to_base64(image):
    data = cv2.imencode('.jpg', image)[1]
    return base64.b64encode(data.tobytes()).decode('utf8')
def base64_to_cv2(b64str):
    data = base64.b64decode(b64str.encode('utf8'))
    data = np.frombuffer(data, np.uint8)
    data = cv2.imdecode(data, cv2.IMREAD_COLOR)
    return data
# Send the HTTP request
org_im = cv2.imread('/PATH/TO/IMAGE')
data = {'images':[cv2_to_base64(org_im)]}
headers = {"Content-type": "application/json"}
url = "http://127.0.0.1:8866/predict/falsr_a"
r = requests.post(url=url, headers=headers, data=json.dumps(data))
sr = base64_to_cv2(r.json()["results"][0]['data'])
cv2.imwrite('falsr_a_X2.png', sr)
print("save image as falsr_a_X2.png")
```
### Code
https://github.com/xiaomi-automl/FALSR
### Dependencies
paddlepaddle >= 1.8.0
paddlehub >= 1.7.1
# -*- coding:utf-8 -*-
import os
import time
from collections import OrderedDict
import cv2
import numpy as np
from PIL import Image
__all__ = ['reader']
def reader(images=None, paths=None):
"""
Preprocess to yield image.
Args:
images (list(numpy.ndarray)): images data, shape of each is [H, W, C]
paths (list[str]): paths to images.
Yield:
each (collections.OrderedDict): info of original image, preprocessed image.
"""
component = list()
if paths:
for im_path in paths:
each = OrderedDict()
assert os.path.isfile(im_path), "The {} isn't a valid file path.".format(im_path)
im = cv2.imread(im_path).astype('float32')
each['org_im'] = im
each['org_im_path'] = im_path
each['org_im_shape'] = im.shape
component.append(each)
if images is not None:
assert type(images) is list, "images should be a list."
for im in images:
im = im.astype(np.float32)
each = OrderedDict()
each['org_im'] = im
each['org_im_path'] = 'ndarray_time={}'.format(round(time.time(), 6) * 1e6)
each['org_im_shape'] = im.shape
component.append(each)
for element in component:
img = element['org_im'].copy()
img = cv2.cvtColor(img, cv2.COLOR_BGR2YUV)
shape = img.shape
img_scale = cv2.resize(img, (shape[1] * 2, shape[0] * 2), interpolation=cv2.INTER_CUBIC)
img_y = np.expand_dims(img[:, :, 0], axis=2)
img_scale_pbpr = img_scale[..., 1:]
img_y = img_y.transpose((2, 0, 1)) / 255
img_scale_pbpr = img_scale_pbpr.transpose(2, 0, 1) / 255
element['img_y'] = img_y
element['img_scale_pbpr'] = img_scale_pbpr
yield element
if __name__ == "__main__":
path = ['BSD100_001.png']
reader(paths=path)
# -*- coding:utf-8 -*-
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import ast
import os
import argparse
import numpy as np
import paddle.fluid as fluid
import paddlehub as hub
from paddle.fluid.core import PaddleTensor, AnalysisConfig, create_paddle_predictor
from paddlehub.module.module import moduleinfo, runnable, serving
from falsr_a.data_feed import reader
from falsr_a.processor import postprocess, base64_to_cv2, cv2_to_base64, check_dir
@moduleinfo(
name="falsr_a",
type="CV/image_editing",
author="paddlepaddle",
author_email="",
summary="falsr_a is a super resolution model.",
version="1.0.0")
class Falsr_A(hub.Module):
def _initialize(self):
self.default_pretrained_model_path = os.path.join(self.directory, "falsr_a_model")
self._set_config()
def _set_config(self):
"""
predictor config setting
"""
self.model_file_path = self.default_pretrained_model_path
cpu_config = AnalysisConfig(self.model_file_path)
cpu_config.disable_glog_info()
cpu_config.disable_gpu()
self.cpu_predictor = create_paddle_predictor(cpu_config)
try:
_places = os.environ["CUDA_VISIBLE_DEVICES"]
int(_places[0])
use_gpu = True
except:
use_gpu = False
if use_gpu:
gpu_config = AnalysisConfig(self.model_file_path)
gpu_config.disable_glog_info()
gpu_config.enable_use_gpu(memory_pool_init_size_mb=1000, device_id=0)
self.gpu_predictor = create_paddle_predictor(gpu_config)
def reconstruct(self, images=None, paths=None, use_gpu=False, visualization=False, output_dir="falsr_a_output"):
"""
API for super resolution.
Args:
images (list(numpy.ndarray)): images data, shape of each is [H, W, C], the color space is BGR.
paths (list[str]): The paths of images.
use_gpu (bool): Whether to use gpu.
visualization (bool): Whether to save image or not.
output_dir (str): The path to store output images.
Returns:
res (list[dict]): each element in the list is a dict, the keys and values are:
save_path (str, optional): the path to save images. (Exists only if visualization is True)
data (numpy.ndarray): data of post processed image.
"""
if use_gpu:
try:
_places = os.environ["CUDA_VISIBLE_DEVICES"]
int(_places[0])
except:
                raise RuntimeError(
                    "Environment Variable CUDA_VISIBLE_DEVICES is not set correctly. If you want to use GPU, please set CUDA_VISIBLE_DEVICES as cuda_device_id."
                )
all_data = list()
for yield_data in reader(images, paths):
all_data.append(yield_data)
total_num = len(all_data)
res = list()
for i in range(total_num):
image_y = np.array([all_data[i]['img_y']])
image_scale_pbpr = np.array([all_data[i]['img_scale_pbpr']])
image_y = PaddleTensor(image_y.copy())
image_scale_pbpr = PaddleTensor(image_scale_pbpr.copy())
output = self.gpu_predictor.run([image_y, image_scale_pbpr]) if use_gpu else self.cpu_predictor.run(
[image_y, image_scale_pbpr])
output = np.expand_dims(output[0].as_ndarray(), axis=1)
out = postprocess(
data_out=output,
org_im=all_data[i]['org_im'],
org_im_shape=all_data[i]['org_im_shape'],
org_im_path=all_data[i]['org_im_path'],
output_dir=output_dir,
visualization=visualization)
res.append(out)
return res
def save_inference_model(self,
dirname='falsr_a_save_model',
model_filename=None,
params_filename=None,
combined=False):
if combined:
model_filename = "__model__" if not model_filename else model_filename
params_filename = "__params__" if not params_filename else params_filename
place = fluid.CPUPlace()
exe = fluid.Executor(place)
program, feeded_var_names, target_vars = fluid.io.load_inference_model(
dirname=self.default_pretrained_model_path, executor=exe)
fluid.io.save_inference_model(
dirname=dirname,
main_program=program,
executor=exe,
feeded_var_names=feeded_var_names,
target_vars=target_vars,
model_filename=model_filename,
params_filename=params_filename)
@serving
def serving_method(self, images, **kwargs):
"""
Run as a service.
"""
images_decode = [base64_to_cv2(image) for image in images]
results = self.reconstruct(images=images_decode, **kwargs)
results = [{'data': cv2_to_base64(result['data'])} for result in results]
return results
@runnable
def run_cmd(self, argvs):
"""
Run as a command.
"""
self.parser = argparse.ArgumentParser(
description="Run the {} module.".format(self.name),
prog='hub run {}'.format(self.name),
usage='%(prog)s',
add_help=True)
self.arg_input_group = self.parser.add_argument_group(title="Input options", description="Input data. Required")
self.arg_config_group = self.parser.add_argument_group(
title="Config options", description="Run configuration for controlling module behavior, not required.")
self.add_module_config_arg()
self.add_module_input_arg()
args = self.parser.parse_args(argvs)
results = self.reconstruct(
paths=[args.input_path], use_gpu=args.use_gpu, output_dir=args.output_dir, visualization=args.visualization)
if args.save_dir is not None:
check_dir(args.save_dir)
self.save_inference_model(args.save_dir)
return results
def add_module_config_arg(self):
"""
Add the command config options.
"""
self.arg_config_group.add_argument(
            '--use_gpu', type=ast.literal_eval, default=False, help="whether to use GPU or not")
self.arg_config_group.add_argument(
'--output_dir', type=str, default='falsr_a_output', help="The directory to save output images.")
self.arg_config_group.add_argument(
'--save_dir', type=str, default='falsr_a_save_model', help="The directory to save model.")
self.arg_config_group.add_argument(
'--visualization', type=ast.literal_eval, default=True, help="whether to save output as images.")
def add_module_input_arg(self):
"""
Add the command input options.
"""
self.arg_input_group.add_argument('--input_path', type=str, help="path to image.")
if __name__ == "__main__":
module = Falsr_A()
module.reconstruct(paths=["BSD100_001.png", "BSD100_002.png", "Set5_003.png"])
module.save_inference_model()
# -*- coding:utf-8 -*-
import os
import time
import base64
import cv2
import numpy as np
__all__ = ['cv2_to_base64', 'base64_to_cv2', 'postprocess']
def cv2_to_base64(image):
    data = cv2.imencode('.jpg', image)[1]
    return base64.b64encode(data.tobytes()).decode('utf8')
def base64_to_cv2(b64str):
    data = base64.b64decode(b64str.encode('utf8'))
    data = np.frombuffer(data, np.uint8)
    data = cv2.imdecode(data, cv2.IMREAD_COLOR)
    return data
def postprocess(data_out, org_im, org_im_shape, org_im_path, output_dir, visualization):
"""
Postprocess output of network. one image at a time.
Args:
data_out (numpy.ndarray): output of network.
org_im (numpy.ndarray): original image.
        org_im_shape (list): shape of original image.
        org_im_path (str): path of original image.
output_dir (str): output directory to store image.
visualization (bool): whether to save image or not.
Returns:
result (dict): The data of processed image.
"""
result = dict()
for sr in data_out:
sr = np.squeeze(sr, 0)
sr = np.clip(sr * 255, 0, 255)
sr = sr.astype(np.uint8)
sr = cv2.cvtColor(sr, cv2.COLOR_RGB2BGR)
if visualization:
check_dir(output_dir)
save_im_path = get_save_image_name(org_im, org_im_path, output_dir)
cv2.imwrite(save_im_path, sr)
print("save image at: ", save_im_path)
result['save_path'] = save_im_path
result['data'] = sr
else:
result['data'] = sr
return result
def check_dir(dir_path):
if not os.path.exists(dir_path):
os.makedirs(dir_path)
elif os.path.isfile(dir_path):
os.remove(dir_path)
os.makedirs(dir_path)
def get_save_image_name(org_im, org_im_path, output_dir):
"""
Get save image name from source image path.
"""
    # name prefix of original image
org_im_name = os.path.split(org_im_path)[-1]
im_prefix = os.path.splitext(org_im_name)[0]
ext = '.png'
# save image path
save_im_path = os.path.join(output_dir, im_prefix + ext)
if os.path.exists(save_im_path):
save_im_path = os.path.join(output_dir, im_prefix + 'time={}'.format(int(time.time())) + ext)
return save_im_path
## Model Overview
falsr_b is a lightweight super-resolution model based on "Fast, Accurate and Lightweight Super-Resolution with Neural Architecture Search", and is lighter than falsr_a. The model treats super-resolution as a multi-objective problem and applies an elastic search strategy driven by a hybrid controller to improve performance. It upscales images by a factor of 2.
## Command-Line Prediction
```
$ hub run falsr_b --input_path "/PATH/TO/IMAGE"
```
## API
```python
def reconstruct(self,
images=None,
paths=None,
use_gpu=False,
                visualization=False,
output_dir="falsr_b_output")
```
Prediction API for image super-resolution.
**Parameters**
* images (list\[numpy.ndarray\]): image data with ndarray.shape \[H, W, C\] in BGR format;
* paths (list\[str\]): paths to the images;
* use\_gpu (bool): whether to use GPU for prediction; if so, set the CUDA_VISIBLE_DEVICES environment variable before predicting, otherwise it need not be set;
* visualization (bool): whether to save the results as image files;
* output\_dir (str): directory in which to save the images.
**Returns**
* res (list\[dict\]): list of results; each element is a dict whose keys are 'save\_path' and 'data':
    * save\_path (str, optional): path of the saved visualization (present only when visualization=True);
    * data (numpy.ndarray): the super-resolved image.
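As a quick illustration (the path is a placeholder), `reconstruct` also accepts file paths directly, in which case reading and preprocessing happen inside the module:
```python
import paddlehub as hub

sr_model = hub.Module(name='falsr_b')
# Feed file paths instead of decoded arrays.
res = sr_model.reconstruct(paths=['/PATH/TO/IMAGE'], visualization=False)
print(res[0]['data'].shape)  # the 2x super-resolved BGR image
```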
```python
def save_inference_model(self,
dirname='falsr_b_save_model',
model_filename=None,
params_filename=None,
combined=False)
```
Saves the model to the specified directory.
**Parameters**
* dirname: directory in which the model is stored
* model\_filename: name of the model file, defaults to \_\_model\_\_
* params\_filename: name of the parameters file, defaults to \_\_params\_\_ (effective only when `combined` is True)
* combined: whether to save the parameters into a single file
## Code Example
```python
import cv2
import paddlehub as hub
sr_model = hub.Module(name='falsr_b')
im = cv2.imread('/PATH/TO/IMAGE').astype('float32')
# visualization=True saves the super-resolved image for inspection; set it to False for faster runs.
res = sr_model.reconstruct(images=[im], visualization=True)
print(res[0]['data'])
sr_model.save_inference_model()
```
## Serving Deployment
PaddleHub Serving can deploy an online image super-resolution service.
## Step 1: Start PaddleHub Serving
Run the start-up command:
```shell
$ hub serving start -m falsr_b
```
This deploys a super-resolution service API, with the default port number 8866.
**NOTE:** If you want to use GPU for prediction, set the CUDA_VISIBLE_DEVICES environment variable before starting the service; otherwise it need not be set.
## Step 2: Send a Prediction Request
With the server configured, the few lines of code below send a prediction request and obtain the result.
```python
import requests
import json
import base64
import cv2
import numpy as np
def cv2_to_base64(image):
    data = cv2.imencode('.jpg', image)[1]
    return base64.b64encode(data.tobytes()).decode('utf8')
def base64_to_cv2(b64str):
    data = base64.b64decode(b64str.encode('utf8'))
    data = np.frombuffer(data, np.uint8)
    data = cv2.imdecode(data, cv2.IMREAD_COLOR)
    return data
# Send the HTTP request
org_im = cv2.imread('/PATH/TO/IMAGE')
data = {'images':[cv2_to_base64(org_im)]}
headers = {"Content-type": "application/json"}
url = "http://127.0.0.1:8866/predict/falsr_b"
r = requests.post(url=url, headers=headers, data=json.dumps(data))
sr = base64_to_cv2(r.json()["results"][0]['data'])
cv2.imwrite('falsr_b_X2.png', sr)
print("save image as falsr_b_X2.png")
```
### Code
https://github.com/xiaomi-automl/FALSR
### Dependencies
paddlepaddle >= 1.8.0
paddlehub >= 1.7.1
# -*- coding:utf-8 -*-
import os
import time
from collections import OrderedDict
import cv2
import numpy as np
from PIL import Image
__all__ = ['reader']
def reader(images=None, paths=None):
"""
Preprocess to yield image.
Args:
images (list(numpy.ndarray)): images data, shape of each is [H, W, C]
paths (list[str]): paths to images.
Yield:
each (collections.OrderedDict): info of original image, preprocessed image.
"""
component = list()
if paths:
for im_path in paths:
each = OrderedDict()
assert os.path.isfile(im_path), "The {} isn't a valid file path.".format(im_path)
im = cv2.imread(im_path).astype('float32')
each['org_im'] = im
each['org_im_path'] = im_path
each['org_im_shape'] = im.shape
component.append(each)
if images is not None:
assert type(images) is list, "images should be a list."
for im in images:
im = im.astype(np.float32)
each = OrderedDict()
each['org_im'] = im
each['org_im_path'] = 'ndarray_time={}'.format(round(time.time(), 6) * 1e6)
each['org_im_shape'] = im.shape
component.append(each)
for element in component:
img = element['org_im'].copy()
img = cv2.cvtColor(img, cv2.COLOR_BGR2YUV)
shape = img.shape
img_scale = cv2.resize(img, (shape[1] * 2, shape[0] * 2), interpolation=cv2.INTER_CUBIC)
img_y = np.expand_dims(img[:, :, 0], axis=2)
img_scale_pbpr = img_scale[..., 1:]
img_y = img_y.transpose((2, 0, 1)) / 255
img_scale_pbpr = img_scale_pbpr.transpose(2, 0, 1) / 255
element['img_y'] = img_y
element['img_scale_pbpr'] = img_scale_pbpr
yield element
if __name__ == "__main__":
path = ['BSD100_001.png']
reader(paths=path)
# -*- coding:utf-8 -*-
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import ast
import os
import argparse
import numpy as np
import paddle.fluid as fluid
import paddlehub as hub
from paddle.fluid.core import PaddleTensor, AnalysisConfig, create_paddle_predictor
from paddlehub.module.module import moduleinfo, runnable, serving
from falsr_b.data_feed import reader
from falsr_b.processor import postprocess, base64_to_cv2, cv2_to_base64, check_dir
@moduleinfo(
name="falsr_b",
type="CV/image_editing",
author="paddlepaddle",
author_email="",
summary="falsr_b is a super resolution model.",
version="1.0.0")
class Falsr_B(hub.Module):
def _initialize(self):
self.default_pretrained_model_path = os.path.join(self.directory, "falsr_b_model")
self._set_config()
def _set_config(self):
"""
predictor config setting
"""
self.model_file_path = self.default_pretrained_model_path
cpu_config = AnalysisConfig(self.model_file_path)
cpu_config.disable_glog_info()
cpu_config.disable_gpu()
self.cpu_predictor = create_paddle_predictor(cpu_config)
try:
_places = os.environ["CUDA_VISIBLE_DEVICES"]
int(_places[0])
use_gpu = True
except:
use_gpu = False
if use_gpu:
gpu_config = AnalysisConfig(self.model_file_path)
gpu_config.disable_glog_info()
gpu_config.enable_use_gpu(memory_pool_init_size_mb=1000, device_id=0)
self.gpu_predictor = create_paddle_predictor(gpu_config)
def reconstruct(self, images=None, paths=None, use_gpu=False, visualization=False, output_dir="falsr_b_output"):
"""
API for super resolution.
Args:
images (list(numpy.ndarray)): images data, shape of each is [H, W, C], the color space is BGR.
paths (list[str]): The paths of images.
use_gpu (bool): Whether to use gpu.
visualization (bool): Whether to save image or not.
output_dir (str): The path to store output images.
Returns:
res (list[dict]): each element in the list is a dict, the keys and values are:
save_path (str, optional): the path to save images. (Exists only if visualization is True)
data (numpy.ndarray): data of post processed image.
"""
if use_gpu:
try:
_places = os.environ["CUDA_VISIBLE_DEVICES"]
int(_places[0])
except:
raise RuntimeError(
"Environment Variable CUDA_VISIBLE_DEVICES is not set correctly. If you wanna use gpu, please set CUDA_VISIBLE_DEVICES as cuda_device_id."
)
all_data = list()
for yield_data in reader(images, paths):
all_data.append(yield_data)
total_num = len(all_data)
res = list()
for i in range(total_num):
image_y = np.array([all_data[i]['img_y']])
image_scale_pbpr = np.array([all_data[i]['img_scale_pbpr']])
image_y = PaddleTensor(image_y.copy())
image_scale_pbpr = PaddleTensor(image_scale_pbpr.copy())
output = self.gpu_predictor.run([image_y, image_scale_pbpr]) if use_gpu else self.cpu_predictor.run(
[image_y, image_scale_pbpr])
output = np.expand_dims(output[0].as_ndarray(), axis=1)
out = postprocess(
data_out=output,
org_im=all_data[i]['org_im'],
org_im_shape=all_data[i]['org_im_shape'],
org_im_path=all_data[i]['org_im_path'],
output_dir=output_dir,
visualization=visualization)
res.append(out)
return res
def save_inference_model(self,
dirname='falsr_b_save_model',
model_filename=None,
params_filename=None,
combined=False):
if combined:
model_filename = "__model__" if not model_filename else model_filename
params_filename = "__params__" if not params_filename else params_filename
place = fluid.CPUPlace()
exe = fluid.Executor(place)
program, feeded_var_names, target_vars = fluid.io.load_inference_model(
dirname=self.default_pretrained_model_path, executor=exe)
fluid.io.save_inference_model(
dirname=dirname,
main_program=program,
executor=exe,
feeded_var_names=feeded_var_names,
target_vars=target_vars,
model_filename=model_filename,
params_filename=params_filename)
@serving
def serving_method(self, images, **kwargs):
"""
Run as a service.
"""
images_decode = [base64_to_cv2(image) for image in images]
results = self.reconstruct(images=images_decode, **kwargs)
results = [{'data': cv2_to_base64(result['data'])} for result in results]
return results
@runnable
def run_cmd(self, argvs):
"""
Run as a command.
"""
self.parser = argparse.ArgumentParser(
description="Run the {} module.".format(self.name),
prog='hub run {}'.format(self.name),
usage='%(prog)s',
add_help=True)
self.arg_input_group = self.parser.add_argument_group(title="Input options", description="Input data. Required")
self.arg_config_group = self.parser.add_argument_group(
title="Config options", description="Run configuration for controlling module behavior, not required.")
self.add_module_config_arg()
self.add_module_input_arg()
args = self.parser.parse_args(argvs)
results = self.reconstruct(
paths=[args.input_path], use_gpu=args.use_gpu, output_dir=args.output_dir, visualization=args.visualization)
if args.save_dir is not None:
check_dir(args.save_dir)
self.save_inference_model(args.save_dir)
return results
def add_module_config_arg(self):
"""
Add the command config options.
"""
self.arg_config_group.add_argument(
            '--use_gpu', type=ast.literal_eval, default=False, help="whether to use GPU or not")
self.arg_config_group.add_argument(
'--output_dir', type=str, default='falsr_b_output', help="The directory to save output images.")
self.arg_config_group.add_argument(
'--save_dir', type=str, default='falsr_b_save_model', help="The directory to save model.")
self.arg_config_group.add_argument(
'--visualization', type=ast.literal_eval, default=True, help="whether to save output as images.")
def add_module_input_arg(self):
"""
Add the command input options.
"""
self.arg_input_group.add_argument('--input_path', type=str, help="path to image.")
if __name__ == "__main__":
module = Falsr_B()
module.reconstruct(paths=["BSD100_001.png", "BSD100_002.png", "Set5_003.png"])
module.save_inference_model()
# -*- coding:utf-8 -*-
import os
import time
import base64
import cv2
import numpy as np
__all__ = ['cv2_to_base64', 'base64_to_cv2', 'postprocess']
def cv2_to_base64(image):
    data = cv2.imencode('.jpg', image)[1]
    return base64.b64encode(data.tobytes()).decode('utf8')
def base64_to_cv2(b64str):
    data = base64.b64decode(b64str.encode('utf8'))
    data = np.frombuffer(data, np.uint8)
    data = cv2.imdecode(data, cv2.IMREAD_COLOR)
    return data
def postprocess(data_out, org_im, org_im_shape, org_im_path, output_dir, visualization):
"""
Postprocess output of network. one image at a time.
Args:
data_out (numpy.ndarray): output of network.
org_im (numpy.ndarray): original image.
        org_im_shape (list): shape of original image.
        org_im_path (str): path of original image.
output_dir (str): output directory to store image.
visualization (bool): whether to save image or not.
Returns:
result (dict): The data of processed image.
"""
result = dict()
for sr in data_out:
sr = np.squeeze(sr, 0)
sr = np.clip(sr * 255, 0, 255)
sr = sr.astype(np.uint8)
sr = cv2.cvtColor(sr, cv2.COLOR_RGB2BGR)
if visualization:
check_dir(output_dir)
save_im_path = get_save_image_name(org_im, org_im_path, output_dir)
cv2.imwrite(save_im_path, sr)
print("save image at: ", save_im_path)
result['save_path'] = save_im_path
result['data'] = sr
else:
result['data'] = sr
return result
def check_dir(dir_path):
if not os.path.exists(dir_path):
os.makedirs(dir_path)
elif os.path.isfile(dir_path):
os.remove(dir_path)
os.makedirs(dir_path)
def get_save_image_name(org_im, org_im_path, output_dir):
"""
Get save image name from source image path.
"""
    # name prefix of original image
org_im_name = os.path.split(org_im_path)[-1]
im_prefix = os.path.splitext(org_im_name)[0]
ext = '.png'
# save image path
save_im_path = os.path.join(output_dir, im_prefix + ext)
if os.path.exists(save_im_path):
save_im_path = os.path.join(output_dir, im_prefix + 'time={}'.format(int(time.time())) + ext)
return save_im_path
## Model Overview
falsr_c is a lightweight super-resolution model based on "Fast, Accurate and Lightweight Super-Resolution with Neural Architecture Search". The model treats super-resolution as a multi-objective problem and applies an elastic search strategy driven by a hybrid controller to improve performance. It upscales images by a factor of 2.
## Command-Line Prediction
```
$ hub run falsr_c --input_path "/PATH/TO/IMAGE"
```
## API
```python
def reconstruct(self,
images=None,
paths=None,
use_gpu=False,
visualization=False,
output_dir="falsr_c_output")
```
Prediction API for image super-resolution.
**Parameters**
* images (list\[numpy.ndarray\]): image data with ndarray.shape \[H, W, C\] in BGR format;
* paths (list\[str\]): paths to the images;
* use\_gpu (bool): whether to use GPU for prediction; if so, set the CUDA_VISIBLE_DEVICES environment variable before predicting, otherwise it need not be set;
* visualization (bool): whether to save the results as image files;
* output\_dir (str): directory in which to save the images.
**Returns**
* res (list\[dict\]): list of results; each element is a dict whose keys are 'save\_path' and 'data':
    * save\_path (str, optional): path of the saved visualization (present only when visualization=True);
    * data (numpy.ndarray): the super-resolved image.
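A short sketch of batching several files in one call (the directory and pattern are illustrative):
```python
import glob
import paddlehub as hub

sr_model = hub.Module(name='falsr_c')
# Super-resolve every PNG in a folder; results are written to output_dir.
paths = sorted(glob.glob('/PATH/TO/IMAGES/*.png'))
res = sr_model.reconstruct(paths=paths, visualization=True, output_dir='falsr_c_output')
print(len(res), 'images processed')
```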
```python
def save_inference_model(self,
dirname='falsr_c_save_model',
model_filename=None,
params_filename=None,
combined=False)
```
Saves the model to the specified directory.
**Parameters**
* dirname: directory in which the model is stored
* model\_filename: name of the model file, defaults to \_\_model\_\_
* params\_filename: name of the parameters file, defaults to \_\_params\_\_ (effective only when `combined` is True)
* combined: whether to save the parameters into a single file
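For example (the directory name is illustrative), exporting the model and parameters as two single files:
```python
import paddlehub as hub

sr_model = hub.Module(name='falsr_c')
# combined=True writes __model__ and __params__ under the target directory.
sr_model.save_inference_model(dirname='falsr_c_save_model', combined=True)
```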
## Code Example
```python
import cv2
import paddlehub as hub
sr_model = hub.Module(name='falsr_c')
im = cv2.imread('/PATH/TO/IMAGE').astype('float32')
# visualization=True saves the super-resolved image for inspection; set it to False for faster runs.
res = sr_model.reconstruct(images=[im], visualization=True)
print(res[0]['data'])
sr_model.save_inference_model()
```
## Serving Deployment
PaddleHub Serving can deploy an online image super-resolution service.
## Step 1: Start PaddleHub Serving
Run the start-up command:
```shell
$ hub serving start -m falsr_c
```
This deploys a super-resolution service API, with the default port number 8866.
**NOTE:** If you want to use GPU for prediction, set the CUDA_VISIBLE_DEVICES environment variable before starting the service; otherwise it need not be set.
## Step 2: Send a Prediction Request
With the server configured, the few lines of code below send a prediction request and obtain the result.
```python
import requests
import json
import base64
import cv2
import numpy as np
def cv2_to_base64(image):
    data = cv2.imencode('.jpg', image)[1]
    return base64.b64encode(data.tobytes()).decode('utf8')
def base64_to_cv2(b64str):
    data = base64.b64decode(b64str.encode('utf8'))
    data = np.frombuffer(data, np.uint8)
    data = cv2.imdecode(data, cv2.IMREAD_COLOR)
    return data
# Send the HTTP request
org_im = cv2.imread('/PATH/TO/IMAGE')
data = {'images':[cv2_to_base64(org_im)]}
headers = {"Content-type": "application/json"}
url = "http://127.0.0.1:8866/predict/falsr_c"
r = requests.post(url=url, headers=headers, data=json.dumps(data))
sr = base64_to_cv2(r.json()["results"][0]['data'])
cv2.imwrite('falsr_c_X2.png', sr)
print("save image as falsr_c_X2.png")
```
### Code
https://github.com/xiaomi-automl/FALSR
### Dependencies
paddlepaddle >= 1.8.0
paddlehub >= 1.7.1
# -*- coding:utf-8 -*-
import os
import time
from collections import OrderedDict
import cv2
import numpy as np
from PIL import Image
__all__ = ['reader']
def reader(images=None, paths=None):
"""
Preprocess to yield image.
Args:
images (list(numpy.ndarray)): images data, shape of each is [H, W, C]
paths (list[str]): paths to images.
Yield:
each (collections.OrderedDict): info of original image, preprocessed image.
"""
component = list()
if paths:
for im_path in paths:
each = OrderedDict()
assert os.path.isfile(im_path), "The {} isn't a valid file path.".format(im_path)
im = cv2.imread(im_path).astype('float32')
each['org_im'] = im
each['org_im_path'] = im_path
each['org_im_shape'] = im.shape
component.append(each)
if images is not None:
assert type(images) is list, "images should be a list."
for im in images:
im = im.astype(np.float32)
each = OrderedDict()
each['org_im'] = im
each['org_im_path'] = 'ndarray_time={}'.format(round(time.time(), 6) * 1e6)
each['org_im_shape'] = im.shape
component.append(each)
for element in component:
img = element['org_im'].copy()
img = cv2.cvtColor(img, cv2.COLOR_BGR2YUV)
shape = img.shape
img_scale = cv2.resize(img, (shape[1] * 2, shape[0] * 2), interpolation=cv2.INTER_CUBIC)
img_y = np.expand_dims(img[:, :, 0], axis=2)
img_scale_pbpr = img_scale[..., 1:]
img_y = img_y.transpose((2, 0, 1)) / 255
img_scale_pbpr = img_scale_pbpr.transpose(2, 0, 1) / 255
element['img_y'] = img_y
element['img_scale_pbpr'] = img_scale_pbpr
yield element
if __name__ == "__main__":
path = ['BSD100_001.png']
reader(paths=path)
# -*- coding:utf-8 -*-
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import ast
import os
import argparse
import numpy as np
import paddle.fluid as fluid
import paddlehub as hub
from paddle.fluid.core import PaddleTensor, AnalysisConfig, create_paddle_predictor
from paddlehub.module.module import moduleinfo, runnable, serving
from falsr_c.data_feed import reader
from falsr_c.processor import postprocess, base64_to_cv2, cv2_to_base64, check_dir
@moduleinfo(
name="falsr_c",
type="CV/image_editing",
author="paddlepaddle",
author_email="",
summary="falsr_c is a super resolution model.",
version="1.0.0")
class Falsr_C(hub.Module):
def _initialize(self):
self.default_pretrained_model_path = os.path.join(self.directory, "falsr_c_model")
self._set_config()
def _set_config(self):
"""
predictor config setting
"""
self.model_file_path = self.default_pretrained_model_path
cpu_config = AnalysisConfig(self.model_file_path)
cpu_config.disable_glog_info()
cpu_config.disable_gpu()
self.cpu_predictor = create_paddle_predictor(cpu_config)
try:
_places = os.environ["CUDA_VISIBLE_DEVICES"]
int(_places[0])
use_gpu = True
except:
use_gpu = False
if use_gpu:
gpu_config = AnalysisConfig(self.model_file_path)
gpu_config.disable_glog_info()
gpu_config.enable_use_gpu(memory_pool_init_size_mb=1000, device_id=0)
self.gpu_predictor = create_paddle_predictor(gpu_config)
def reconstruct(self, images=None, paths=None, use_gpu=False, visualization=False, output_dir="falsr_c_output"):
"""
API for super resolution.
Args:
images (list(numpy.ndarray)): images data, shape of each is [H, W, C], the color space is BGR.
paths (list[str]): The paths of images.
use_gpu (bool): Whether to use gpu.
visualization (bool): Whether to save image or not.
output_dir (str): The path to store output images.
Returns:
res (list[dict]): each element in the list is a dict, the keys and values are:
save_path (str, optional): the path to save images. (Exists only if visualization is True)
data (numpy.ndarray): data of post processed image.
"""
if use_gpu:
try:
_places = os.environ["CUDA_VISIBLE_DEVICES"]
int(_places[0])
except:
raise RuntimeError(
"Environment Variable CUDA_VISIBLE_DEVICES is not set correctly. If you wanna use gpu, please set CUDA_VISIBLE_DEVICES as cuda_device_id."
)
all_data = list()
for yield_data in reader(images, paths):
all_data.append(yield_data)
total_num = len(all_data)
res = list()
for i in range(total_num):
image_y = np.array([all_data[i]['img_y']])
image_scale_pbpr = np.array([all_data[i]['img_scale_pbpr']])
image_y = PaddleTensor(image_y.copy())
image_scale_pbpr = PaddleTensor(image_scale_pbpr.copy())
output = self.gpu_predictor.run([image_y, image_scale_pbpr]) if use_gpu else self.cpu_predictor.run(
[image_y, image_scale_pbpr])
output = np.expand_dims(output[0].as_ndarray(), axis=1)
out = postprocess(
data_out=output,
org_im=all_data[i]['org_im'],
org_im_shape=all_data[i]['org_im_shape'],
org_im_path=all_data[i]['org_im_path'],
output_dir=output_dir,
visualization=visualization)
res.append(out)
return res
def save_inference_model(self,
dirname='falsr_c_save_model',
model_filename=None,
params_filename=None,
combined=False):
if combined:
model_filename = "__model__" if not model_filename else model_filename
params_filename = "__params__" if not params_filename else params_filename
place = fluid.CPUPlace()
exe = fluid.Executor(place)
program, feeded_var_names, target_vars = fluid.io.load_inference_model(
dirname=self.default_pretrained_model_path, executor=exe)
fluid.io.save_inference_model(
dirname=dirname,
main_program=program,
executor=exe,
feeded_var_names=feeded_var_names,
target_vars=target_vars,
model_filename=model_filename,
params_filename=params_filename)
@serving
def serving_method(self, images, **kwargs):
"""
Run as a service.
"""
images_decode = [base64_to_cv2(image) for image in images]
results = self.reconstruct(images=images_decode, **kwargs)
results = [{'data': cv2_to_base64(result['data'])} for result in results]
return results
@runnable
def run_cmd(self, argvs):
"""
Run as a command.
"""
self.parser = argparse.ArgumentParser(
description="Run the {} module.".format(self.name),
prog='hub run {}'.format(self.name),
usage='%(prog)s',
add_help=True)
self.arg_input_group = self.parser.add_argument_group(title="Input options", description="Input data. Required")
self.arg_config_group = self.parser.add_argument_group(
title="Config options", description="Run configuration for controlling module behavior, not required.")
self.add_module_config_arg()
self.add_module_input_arg()
args = self.parser.parse_args(argvs)
results = self.reconstruct(
paths=[args.input_path], use_gpu=args.use_gpu, output_dir=args.output_dir, visualization=args.visualization)
if args.save_dir is not None:
check_dir(args.save_dir)
self.save_inference_model(args.save_dir)
return results
def add_module_config_arg(self):
"""
Add the command config options.
"""
self.arg_config_group.add_argument(
            '--use_gpu', type=ast.literal_eval, default=False, help="whether to use GPU or not")
self.arg_config_group.add_argument(
'--output_dir', type=str, default='falsr_c_output', help="The directory to save output images.")
self.arg_config_group.add_argument(
'--save_dir', type=str, default='falsr_c_save_model', help="The directory to save model.")
self.arg_config_group.add_argument(
'--visualization', type=ast.literal_eval, default=True, help="whether to save output as images.")
def add_module_input_arg(self):
"""
Add the command input options.
"""
self.arg_input_group.add_argument('--input_path', type=str, help="path to image.")
if __name__ == "__main__":
module = Falsr_C()
#module.reconstruct(paths=["BSD100_001.png","BSD100_002.png", "Set5_003.png"])
import cv2
img = cv2.imread("BSD100_001.png").astype('float32')
res = module.reconstruct(images=[img])
module.save_inference_model()
# -*- coding:utf-8 -*-
import os
import time
import base64
import cv2
import numpy as np
__all__ = ['cv2_to_base64', 'base64_to_cv2', 'postprocess']
def cv2_to_base64(image):
    data = cv2.imencode('.jpg', image)[1]
    return base64.b64encode(data.tobytes()).decode('utf8')
def base64_to_cv2(b64str):
    data = base64.b64decode(b64str.encode('utf8'))
    data = np.frombuffer(data, np.uint8)
    data = cv2.imdecode(data, cv2.IMREAD_COLOR)
    return data
def postprocess(data_out, org_im, org_im_shape, org_im_path, output_dir, visualization):
"""
Postprocess output of network. one image at a time.
Args:
data_out (numpy.ndarray): output of network.
org_im (numpy.ndarray): original image.
        org_im_shape (list): shape of original image.
        org_im_path (str): path of original image.
output_dir (str): output directory to store image.
visualization (bool): whether to save image or not.
Returns:
result (dict): The data of processed image.
"""
result = dict()
for sr in data_out:
sr = np.squeeze(sr, 0)
sr = np.clip(sr * 255, 0, 255)
sr = sr.astype(np.uint8)
sr = cv2.cvtColor(sr, cv2.COLOR_RGB2BGR)
if visualization:
check_dir(output_dir)
save_im_path = get_save_image_name(org_im, org_im_path, output_dir)
cv2.imwrite(save_im_path, sr)
print("save image at: ", save_im_path)
result['save_path'] = save_im_path
result['data'] = sr
else:
result['data'] = sr
print("result['data'] shape", result['data'].shape)
return result
def check_dir(dir_path):
if not os.path.exists(dir_path):
os.makedirs(dir_path)
elif os.path.isfile(dir_path):
os.remove(dir_path)
os.makedirs(dir_path)
def get_save_image_name(org_im, org_im_path, output_dir):
"""
Get save image name from source image path.
"""
    # name prefix of original image
org_im_name = os.path.split(org_im_path)[-1]
im_prefix = os.path.splitext(org_im_name)[0]
ext = '.png'
# save image path
save_im_path = os.path.join(output_dir, im_prefix + ext)
if os.path.exists(save_im_path):
save_im_path = os.path.join(output_dir, im_prefix + 'time={}'.format(int(time.time())) + ext)
return save_im_path
## Model Overview
realsr is a super-resolution model for images and videos, based on "Toward Real-World Single Image Super-Resolution: A New Benchmark and A New Model". It upscales input images and videos by a factor of four.
## API Description
```python
def predict(self, input):
```
Super-resolution API; returns the super-resolved image or video.
**Parameters**
* input (str): path to the image or video;
**Returns**
If the input is an image, the return values are:
* pred_img (np.ndarray): BGR image data;
* out_path (str): path where the image is saved.
If the input is a video, the return values are:
* frame_pattern_combined (str): path pattern of the per-frame results after super-resolution;
* vid_out_path (str): path where the video is saved.
```python
def run_image(self, img):
```
Image super-resolution API; returns the super-resolved image.
**Parameters**
* img (str|np.ndarray): image path, or an image in BGR format.
**Returns**
* pred_img (np.ndarray): BGR image data;
```python
def run_video(self, video):
```
Video super-resolution API; returns the super-resolved video.
**Parameters**
* video (str): path to the video to be processed.
**Returns**
* frame_pattern_combined (str): path pattern of the per-frame results after super-resolution;
* vid_out_path (str): path where the video is saved.
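A minimal sketch of calling the two lower-level APIs directly (paths are placeholders); `predict` below dispatches between them based on the input type:
```python
import cv2
import paddlehub as hub

model = hub.Module(name='realsr')
# Image: pass a BGR array (or a file path) and receive the 4x result.
sr = model.run_image(cv2.imread('/PATH/TO/IMAGE'))
cv2.imwrite('realsr_x4.png', sr)
# Video: frames are extracted with ffmpeg, upscaled one by one, then re-encoded.
frame_pattern, vid_out = model.run_video('/PATH/TO/VIDEO')
```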
## Prediction Code Example
```python
import paddlehub as hub
model = hub.Module(name='realsr')
model.predict('/PATH/TO/IMAGE/OR/VIDEO')
```
## Serving Deployment
PaddleHub Serving can deploy an online photo super-resolution service.
## Step 1: Start PaddleHub Serving
Run the start-up command:
```shell
$ hub serving start -m realsr
```
This deploys an online image super-resolution service API, with the default port number 8866.
**NOTE:** If you want to use GPU for prediction, set the CUDA_VISIBLE_DEVICES environment variable before starting the service; otherwise it need not be set.
## Step 2: Send a Prediction Request
With the server configured, the few lines of code below send a prediction request and obtain the result.
```python
import requests
import json
import base64
import cv2
import numpy as np
def cv2_to_base64(image):
    data = cv2.imencode('.jpg', image)[1]
    return base64.b64encode(data.tobytes()).decode('utf8')
def base64_to_cv2(b64str):
    data = base64.b64decode(b64str.encode('utf8'))
    data = np.frombuffer(data, np.uint8)
    data = cv2.imdecode(data, cv2.IMREAD_COLOR)
    return data
# Send the HTTP request
org_im = cv2.imread('/PATH/TO/IMAGE')
data = {'images':cv2_to_base64(org_im)}
headers = {"Content-type": "application/json"}
url = "http://127.0.0.1:8866/predict/realsr"
r = requests.post(url=url, headers=headers, data=json.dumps(data))
img = base64_to_cv2(r.json()["results"])
cv2.imwrite('/PATH/TO/SAVE/IMAGE', img)
```
## Model Information
### Model Code
https://github.com/csjcai/RealSR
### Dependencies
paddlepaddle >= 2.0.0rc
paddlehub >= 1.8.3
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import cv2
import glob
from tqdm import tqdm
import numpy as np
from PIL import Image
import paddle
import paddle.nn as nn
from paddlehub.module.module import moduleinfo, serving, Module
from realsr.rrdb import RRDBNet
import realsr.utils as U
@moduleinfo(
name="realsr",
type="CV/image_editing",
author="paddlepaddle",
author_email="",
summary="realsr is a super resolution model",
version="1.0.0")
class RealSRPredictor(Module):
def _initialize(self, output='output', weight_path=None, load_checkpoint: str = None):
self.output = os.path.join(output, 'RealSR')
self.model = RRDBNet(3, 3, 64, 23)
if load_checkpoint is not None:
state_dict = paddle.load(load_checkpoint)
self.model.load_dict(state_dict)
print("load custom checkpoint success")
else:
checkpoint = os.path.join(self.directory, 'DF2K_JPEG.pdparams')
state_dict = paddle.load(checkpoint)
self.model.load_dict(state_dict)
print("load pretrained checkpoint success")
self.model.eval()
def norm(self, img):
img = np.array(img).transpose([2, 0, 1]).astype('float32') / 255.0
return img.astype('float32')
def denorm(self, img):
img = img.transpose((1, 2, 0))
return (img * 255).clip(0, 255).astype('uint8')
def run_image(self, img):
if isinstance(img, str):
ori_img = Image.open(img).convert('RGB')
elif isinstance(img, np.ndarray):
# ori_img = Image.fromarray(img).convert('RGB')
ori_img = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
        elif isinstance(img, Image.Image):
            ori_img = img
        else:
            raise TypeError('img should be a str path, a BGR numpy.ndarray or a PIL.Image.')
img = self.norm(ori_img)
x = paddle.to_tensor(img[np.newaxis, ...])
out = self.model(x)
pred_img = self.denorm(out.numpy()[0])
# pred_img = Image.fromarray(pred_img)
pred_img = cv2.cvtColor(pred_img, cv2.COLOR_RGB2BGR)
return pred_img
def run_video(self, video):
base_name = os.path.basename(video).split('.')[0]
output_path = os.path.join(self.output, base_name)
pred_frame_path = os.path.join(output_path, 'frames_pred')
if not os.path.exists(output_path):
os.makedirs(output_path)
if not os.path.exists(pred_frame_path):
os.makedirs(pred_frame_path)
cap = cv2.VideoCapture(video)
fps = cap.get(cv2.CAP_PROP_FPS)
out_path = U.video2frames(video, output_path)
frames = sorted(glob.glob(os.path.join(out_path, '*.png')))
for frame in tqdm(frames):
pred_img = self.run_image(frame)
pred_img = cv2.cvtColor(pred_img, cv2.COLOR_BGR2RGB)
pred_img = Image.fromarray(pred_img)
frame_name = os.path.basename(frame)
pred_img.save(os.path.join(pred_frame_path, frame_name))
frame_pattern_combined = os.path.join(pred_frame_path, '%08d.png')
vid_out_path = os.path.join(output_path, '{}_realsr_out.mp4'.format(base_name))
U.frames2video(frame_pattern_combined, vid_out_path, str(int(fps)))
print("save result at {}".format(vid_out_path))
return frame_pattern_combined, vid_out_path
def predict(self, input):
if not os.path.exists(self.output):
os.makedirs(self.output)
if not U.is_image(input):
return self.run_video(input)
else:
pred_img = self.run_image(input)
out_path = None
if self.output:
final = cv2.cvtColor(pred_img, cv2.COLOR_BGR2RGB)
final = Image.fromarray(final)
base_name = os.path.splitext(os.path.basename(input))[0]
out_path = os.path.join(self.output, base_name + '.png')
final.save(out_path)
print('save result at {}'.format(out_path))
return pred_img, out_path
@serving
def serving_method(self, images, **kwargs):
"""
Run as a service.
"""
images_decode = U.base64_to_cv2(images)
results = self.run_image(img=images_decode)
results = U.cv2_to_base64(results)
return results
import functools
import paddle
import paddle.nn as nn
import paddle.nn.functional as F
class Registry(object):
"""
The registry that provides name -> object mapping, to support third-party users' custom modules.
To create a registry (inside segmentron):
.. code-block:: python
BACKBONE_REGISTRY = Registry('BACKBONE')
To register an object:
.. code-block:: python
@BACKBONE_REGISTRY.register()
class MyBackbone():
...
Or:
.. code-block:: python
BACKBONE_REGISTRY.register(MyBackbone)
"""
def __init__(self, name):
"""
Args:
name (str): the name of this registry
"""
self._name = name
self._obj_map = {}
def _do_register(self, name, obj):
assert (name not in self._obj_map), "An object named '{}' was already registered in '{}' registry!".format(
name, self._name)
self._obj_map[name] = obj
def register(self, obj=None, name=None):
"""
        Register the given object under the name `obj.__name__`.
Can be used as either a decorator or not. See docstring of this class for usage.
"""
if obj is None:
# used as a decorator
def deco(func_or_class, name=name):
if name is None:
name = func_or_class.__name__
self._do_register(name, func_or_class)
return func_or_class
return deco
# used as a function call
if name is None:
name = obj.__name__
self._do_register(name, obj)
def get(self, name):
ret = self._obj_map.get(name)
if ret is None:
raise KeyError("No object named '{}' found in '{}' registry!".format(name, self._name))
return ret
class ResidualDenseBlock_5C(nn.Layer):
def __init__(self, nf=64, gc=32, bias=True):
super(ResidualDenseBlock_5C, self).__init__()
# gc: growth channel, i.e. intermediate channels
self.conv1 = nn.Conv2D(nf, gc, 3, 1, 1, bias_attr=bias)
self.conv2 = nn.Conv2D(nf + gc, gc, 3, 1, 1, bias_attr=bias)
self.conv3 = nn.Conv2D(nf + 2 * gc, gc, 3, 1, 1, bias_attr=bias)
self.conv4 = nn.Conv2D(nf + 3 * gc, gc, 3, 1, 1, bias_attr=bias)
self.conv5 = nn.Conv2D(nf + 4 * gc, nf, 3, 1, 1, bias_attr=bias)
self.lrelu = nn.LeakyReLU(negative_slope=0.2)
def forward(self, x):
x1 = self.lrelu(self.conv1(x))
x2 = self.lrelu(self.conv2(paddle.concat((x, x1), 1)))
x3 = self.lrelu(self.conv3(paddle.concat((x, x1, x2), 1)))
x4 = self.lrelu(self.conv4(paddle.concat((x, x1, x2, x3), 1)))
x5 = self.conv5(paddle.concat((x, x1, x2, x3, x4), 1))
return x5 * 0.2 + x
class RRDB(nn.Layer):
'''Residual in Residual Dense Block'''
def __init__(self, nf, gc=32):
super(RRDB, self).__init__()
self.RDB1 = ResidualDenseBlock_5C(nf, gc)
self.RDB2 = ResidualDenseBlock_5C(nf, gc)
self.RDB3 = ResidualDenseBlock_5C(nf, gc)
def forward(self, x):
out = self.RDB1(x)
out = self.RDB2(out)
out = self.RDB3(out)
return out * 0.2 + x
def make_layer(block, n_layers):
layers = []
for _ in range(n_layers):
layers.append(block())
return nn.Sequential(*layers)
GENERATORS = Registry("GENERATOR")
@GENERATORS.register()
class RRDBNet(nn.Layer):
def __init__(self, in_nc, out_nc, nf, nb, gc=32):
super(RRDBNet, self).__init__()
RRDB_block_f = functools.partial(RRDB, nf=nf, gc=gc)
self.conv_first = nn.Conv2D(in_nc, nf, 3, 1, 1, bias_attr=True)
self.RRDB_trunk = make_layer(RRDB_block_f, nb)
self.trunk_conv = nn.Conv2D(nf, nf, 3, 1, 1, bias_attr=True)
#### upsampling
self.upconv1 = nn.Conv2D(nf, nf, 3, 1, 1, bias_attr=True)
self.upconv2 = nn.Conv2D(nf, nf, 3, 1, 1, bias_attr=True)
self.HRconv = nn.Conv2D(nf, nf, 3, 1, 1, bias_attr=True)
self.conv_last = nn.Conv2D(nf, out_nc, 3, 1, 1, bias_attr=True)
self.lrelu = nn.LeakyReLU(negative_slope=0.2)
def forward(self, x):
fea = self.conv_first(x)
trunk = self.trunk_conv(self.RRDB_trunk(fea))
fea = fea + trunk
fea = self.lrelu(self.upconv1(F.interpolate(fea, scale_factor=2, mode='nearest')))
fea = self.lrelu(self.upconv2(F.interpolate(fea, scale_factor=2, mode='nearest')))
out = self.conv_last(self.lrelu(self.HRconv(fea)))
return out
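# Quick sanity check (a sketch; run this file directly): RRDBNet(3, 3, 64, 23),
# the configuration used by the realsr module above, upscales 4x through its two
# nearest-neighbor upsampling stages.
if __name__ == '__main__':
    x = paddle.randn([1, 3, 32, 32])
    print(RRDBNet(3, 3, 64, 23)(x).shape)  # [1, 3, 128, 128]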
import os
import sys
import base64
import cv2
from PIL import Image
import numpy as np
def video2frames(video_path, outpath, **kargs):
def _dict2str(kargs):
cmd_str = ''
for k, v in kargs.items():
cmd_str += (' ' + str(k) + ' ' + str(v))
return cmd_str
ffmpeg = ['ffmpeg ', ' -y -loglevel ', ' error ']
vid_name = video_path.split('/')[-1].split('.')[0]
out_full_path = os.path.join(outpath, vid_name)
if not os.path.exists(out_full_path):
os.makedirs(out_full_path)
    # frame output pattern
    outformat = out_full_path + '/%08d.png'
    cmd = ffmpeg + [' -i ', video_path, ' -start_number ', ' 0 ', outformat]
cmd = ''.join(cmd) + _dict2str(kargs)
if os.system(cmd) != 0:
        raise RuntimeError('ffmpeg failed to process video: {}'.format(vid_name))
sys.stdout.flush()
return out_full_path
def frames2video(frame_path, video_path, r):
ffmpeg = ['ffmpeg ', ' -y -loglevel ', ' error ']
cmd = ffmpeg + [' -r ', r, ' -f ', ' image2 ', ' -i ', frame_path, ' -pix_fmt ', ' yuv420p ', video_path]
cmd = ''.join(cmd)
if os.system(cmd) != 0:
        raise RuntimeError('ffmpeg failed to process video: {}'.format(video_path))
sys.stdout.flush()
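# Illustrative round trip with the two helpers above (paths and frame rate are
# placeholders; ffmpeg must be available on the PATH):
#
#   frames_dir = video2frames('/PATH/TO/VIDEO.mp4', 'output')
#   frames2video(os.path.join(frames_dir, '%08d.png'), 'output/restored.mp4', '25')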
def is_image(input):
try:
img = Image.open(input)
_ = img.size
return True
except:
return False
def cv2_to_base64(image):
    data = cv2.imencode('.jpg', image)[1]
    return base64.b64encode(data.tobytes()).decode('utf8')
def base64_to_cv2(b64str):
    data = base64.b64decode(b64str.encode('utf8'))
    data = np.frombuffer(data, np.uint8)
    data = cv2.imdecode(data, cv2.IMREAD_COLOR)
    return data
# stgan_bald
A baldness generator based on PaddleHub.
# Model Overview
The baldness generator (stgan_bald) automatically renders 1-year, 3-year, and 5-year baldness effects from an input portrait.
# Model Effects
For details, see this link: https://aistudio.baidu.com/aistudio/projectdetail/1145381
A mini program is also provided; you are welcome to try it out.
![image](https://github.com/1084667371/stgan_bald/blob/main/images/code.jpg)
# Installation
```shell
$ hub install stgan_bald==1.0.0
```
# Module API
```python
def bald(self,
         images=None,
         paths=None,
         use_gpu=False,
         visualization=True):
```
Baldness generator prediction API: takes one portrait as input and outputs three baldness renderings (1 year, 3 years, and 5 years).
## Parameters
* images (list(numpy.ndarray)): image data, each with shape [H, W, C] in BGR color space.
* paths (list[str]): paths to the images.
* use_gpu (bool): whether to use GPU.
* visualization (bool): whether to save the images.
## Returns
* data_0 (numpy.ndarray): predicted image after one year of baldness.
* data_1 (numpy.ndarray): predicted image after three years of baldness.
* data_2 (numpy.ndarray): predicted image after five years of baldness.
# API Prediction Example
```python
import paddlehub as hub
import cv2

stgan_bald = hub.Module(name='stgan_bald')
im = cv2.imread('/PATH/TO/IMAGE')
res = stgan_bald.bald(images=[im], visualization=True)
```
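The three renderings can then be written to disk; a minimal sketch (the RGB-to-BGR conversion mirrors the serving example below, since the returned arrays are RGB):
```python
for key in ('data_0', 'data_1', 'data_2'):
    # cv2.imwrite expects BGR, while the module returns RGB arrays.
    cv2.imwrite('stgan_bald_{}.png'.format(key), cv2.cvtColor(res[0][key], cv2.COLOR_RGB2BGR))
```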
# Serving Deployment
## Step 1: Start PaddleHub Serving
```shell
$ hub serving start -m stgan_bald
```
## Step 2: Send a Prediction Request
```python
import requests
import json
import base64
import cv2
import numpy as np

def cv2_to_base64(image):
    data = cv2.imencode('.jpg', image)[1]
    return base64.b64encode(data.tobytes()).decode('utf8')

def base64_to_cv2(b64str):
    data = base64.b64decode(b64str.encode('utf8'))
    data = np.frombuffer(data, np.uint8)
    data = cv2.imdecode(data, cv2.IMREAD_COLOR)
    return data

# Send the HTTP request
org_im = cv2.imread('/PATH/TO/IMAGE')
data = {'images': [cv2_to_base64(org_im)]}
headers = {"Content-type": "application/json"}
url = "http://127.0.0.1:8866/predict/stgan_bald"
r = requests.post(url=url, headers=headers, data=json.dumps(data))

# Save the 1-year, 3-year, and 5-year results
one_year = cv2.cvtColor(base64_to_cv2(r.json()["results"]['data_0']), cv2.COLOR_RGB2BGR)
three_year = cv2.cvtColor(base64_to_cv2(r.json()["results"]['data_1']), cv2.COLOR_RGB2BGR)
five_year = cv2.cvtColor(base64_to_cv2(r.json()["results"]['data_2']), cv2.COLOR_RGB2BGR)
cv2.imwrite("stgan_bald_server.png", one_year)
```
# Contributors
刘炫、彭兆帅、郑博培
# Dependencies
paddlepaddle >= 1.8.2
paddlehub >= 1.8.0
# Code
[Baldness generator based on PaddleHub](https://github.com/PaddlePaddle/PaddleHub/tree/release/v1.8/hub_module/modules/image/gan/stgan_bald)
# -*- coding:utf-8 -*-
import os
import time
from collections import OrderedDict
from PIL import Image, ImageOps
import numpy as np
import cv2
__all__ = ['reader']
def reader(images=None, paths=None, org_labels=None, target_labels=None):
"""
Preprocess to yield image.
Args:
        images (list(numpy.ndarray)): images data, shape of each is [H, W, C]
        paths (list[str]): paths to images.
        org_labels (list): original attribute labels for each image.
        target_labels (list): target attribute labels for each image; defaults to org_labels.
Yield:
each (collections.OrderedDict): info of original image, preprocessed image.
"""
component = list()
if paths:
for i, im_path in enumerate(paths):
each = OrderedDict()
assert os.path.isfile(im_path), "The {} isn't a valid file path.".format(im_path)
im = cv2.imread(im_path)
each['org_im'] = im
each['org_im_path'] = im_path
each['org_label'] = np.array(org_labels[i]).astype('float32')
if not target_labels:
each['target_label'] = np.array(org_labels[i]).astype('float32')
else:
each['target_label'] = np.array(target_labels[i]).astype('float32')
component.append(each)
if images is not None:
assert type(images) is list, "images should be a list."
for i, im in enumerate(images):
each = OrderedDict()
each['org_im'] = im
each['org_im_path'] = 'ndarray_time={}'.format(round(time.time(), 6) * 1e6)
each['org_label'] = np.array(org_labels[i]).astype('float32')
if not target_labels:
each['target_label'] = np.array(org_labels[i]).astype('float32')
else:
each['target_label'] = np.array(target_labels[i]).astype('float32')
component.append(each)
for element in component:
img = cv2.cvtColor(element['org_im'], cv2.COLOR_BGR2RGB)
img = cv2.resize(img, (128, 128), interpolation=cv2.INTER_LINEAR)
img = (img.astype('float32') / 255.0 - 0.5) / 0.5
img = img.transpose([2, 0, 1])
element['img'] = img[np.newaxis, :, :, :]
yield element
# -*- coding:utf-8 -*-
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import ast
import os
import argparse
import copy
import numpy as np
import paddle.fluid as fluid
import paddlehub as hub
from paddle.fluid.core import PaddleTensor, AnalysisConfig, create_paddle_predictor
from paddlehub.module.module import moduleinfo, runnable, serving
from stgan_bald.data_feed import reader
from stgan_bald.processor import postprocess, base64_to_cv2, cv2_to_base64, check_dir
def check_attribute_conflict(label_batch):
''' Based on https://github.com/LynnHo/AttGAN-Tensorflow'''
attrs = "Bald,Bangs,Black_Hair,Blond_Hair,Brown_Hair,Bushy_Eyebrows,Eyeglasses,Male,Mouth_Slightly_Open,Mustache,No_Beard,Pale_Skin,Young".split(
',')
def _set(label, value, attr):
if attr in attrs:
label[attrs.index(attr)] = value
attr_id = attrs.index('Bald')
    for label in label_batch:
        if label[attr_id] != 0:
            _set(label, 0, 'Bangs')
return label_batch
@moduleinfo(
name="stgan_bald",
version="1.0.0",
summary="Baldness generator",
author="Arrow, 七年期限,Mr.郑先生_",
author_email="1084667371@qq.com,2733821739@qq.com",
type="image/gan")
class StganBald(hub.Module):
def _initialize(self):
self.default_pretrained_model_path = os.path.join(self.directory, "module")
self._set_config()
def _set_config(self):
"""
predictor config setting
"""
self.model_file_path = os.path.join(self.default_pretrained_model_path, '__model__')
self.params_file_path = os.path.join(self.default_pretrained_model_path, '__params__')
cpu_config = AnalysisConfig(self.model_file_path, self.params_file_path)
cpu_config.disable_glog_info()
cpu_config.disable_gpu()
self.cpu_predictor = create_paddle_predictor(cpu_config)
try:
_places = os.environ["CUDA_VISIBLE_DEVICES"]
int(_places[0])
use_gpu = True
self.place = fluid.CUDAPlace(0)
except:
use_gpu = False
self.place = fluid.CPUPlace()
if use_gpu:
gpu_config = AnalysisConfig(self.model_file_path, self.params_file_path)
gpu_config.disable_glog_info()
gpu_config.enable_use_gpu(memory_pool_init_size_mb=1000, device_id=0)
self.gpu_predictor = create_paddle_predictor(gpu_config)
def bald(self,
images=None,
paths=None,
data=None,
use_gpu=False,
org_labels=[[0., 0., 1., 0., 0., 1., 1., 1., 0., 0., 0., 0., 1.]],
target_labels=None,
visualization=True,
output_dir="bald_output"):
"""
        API for baldness generation: outputs three renderings (after 1, 3, and 5 years).
Args:
images (list(numpy.ndarray)): images data, shape of each is [H, W, C], the color space is BGR.
paths (list[str]): The paths of images.
data (dict): key is 'image', the corresponding value is the path to image.
use_gpu (bool): Whether to use gpu.
visualization (bool): Whether to save image or not.
output_dir (str): The path to store output images.
Returns:
res (list[dict]): each element in the list is a dict, the keys and values are:
save_path (str, optional): the path to save images. (Exists only if visualization is True)
data (numpy.ndarray): data of post processed image.
"""
if use_gpu:
try:
_places = os.environ["CUDA_VISIBLE_DEVICES"]
int(_places[0])
except:
raise RuntimeError(
"Environment Variable CUDA_VISIBLE_DEVICES is not set correctly. If you wanna use gpu, please set CUDA_VISIBLE_DEVICES as cuda_device_id."
)
if data and 'image' in data:
if paths is None:
paths = list()
paths += data['image']
all_data = list()
for yield_data in reader(images, paths, org_labels, target_labels):
all_data.append(yield_data)
total_num = len(all_data)
res = list()
outputs = []
for i in range(total_num):
image_np = all_data[i]['img']
org_label_np = [all_data[i]['org_label']]
target_label_np = [all_data[i]['target_label']]
for j in range(5):
if j % 2 == 0:
label_trg_tmp = copy.deepcopy(target_label_np)
new_i = 0
label_trg_tmp[0][new_i] = 1.0 - label_trg_tmp[0][new_i]
label_trg_tmp = check_attribute_conflict(label_trg_tmp)
change_num = j * 0.02 + 0.3
label_org_tmp = list(map(lambda x: ((x * 2) - 1) * change_num, org_label_np))
label_trg_tmp = list(map(lambda x: ((x * 2) - 1) * change_num, label_trg_tmp))
image = PaddleTensor(image_np.copy())
org_label = PaddleTensor(np.array(label_org_tmp).astype('float32'))
target_label = PaddleTensor(np.array(label_trg_tmp).astype('float32'))
output = self.gpu_predictor.run([
image, target_label, org_label
]) if use_gpu else self.cpu_predictor.run([image, org_label, target_label])
outputs.append(output)
out = postprocess(
data_out=outputs,
org_im=all_data[i]['org_im'],
org_im_path=all_data[i]['org_im_path'],
output_dir=output_dir,
visualization=visualization)
res.append(out)
return res
@serving
def serving_method(self, images, **kwargs):
"""
Run as a service.
"""
images_decode = [base64_to_cv2(image) for image in images]
results = self.bald(images=images_decode, **kwargs)
output = {}
for key, value in results[0].items():
output[key] = cv2_to_base64(value)
return output
# -*- coding:utf-8 -*-
import os
import time
import base64
import cv2
from PIL import Image
import numpy as np
__all__ = ['cv2_to_base64', 'base64_to_cv2', 'postprocess']
def cv2_to_base64(image):
    data = cv2.imencode('.jpg', image)[1]
    return base64.b64encode(data.tobytes()).decode('utf8')
def base64_to_cv2(b64str):
    data = base64.b64decode(b64str.encode('utf8'))
    data = np.frombuffer(data, np.uint8)
    data = cv2.imdecode(data, cv2.IMREAD_COLOR)
    return data
def postprocess(data_out, org_im, org_im_path, output_dir, visualization, thresh=120):
"""
Postprocess output of network. one image at a time.
Args:
data_out (numpy.ndarray): output of network.
org_im (numpy.ndarray): original image.
        org_im_path (str): path of original image.
output_dir (str): output directory to store image.
visualization (bool): whether to save image or not.
thresh (float): threshold.
Returns:
result (dict): The data of processed image.
"""
result = dict()
for i, img in enumerate(data_out):
img = np.squeeze(img[0].as_ndarray(), 0).transpose((1, 2, 0))
img = ((img + 1) * 127.5).astype(np.uint8)
        img = cv2.resize(img, (256, 341), interpolation=cv2.INTER_CUBIC)
fake_image = Image.fromarray(img)
if visualization:
check_dir(output_dir)
save_im_path = get_save_image_name(org_im_path, output_dir, i)
img_name = '{}.png'.format(i)
fake_image.save(os.path.join(output_dir, img_name))
result['data_{}'.format(i)] = img
return result
def check_dir(dir_path):
if not os.path.exists(dir_path):
os.makedirs(dir_path)
elif os.path.isfile(dir_path):
os.remove(dir_path)
os.makedirs(dir_path)
def get_save_image_name(org_im_path, output_dir, num):
"""
Get save image name from source image path.
"""
    # name prefix of original image
org_im_name = os.path.split(org_im_path)[-1]
im_prefix = os.path.splitext(org_im_name)[0]
ext = '.png'
# save image path
save_im_path = os.path.join(output_dir, im_prefix + ext)
if os.path.exists(save_im_path):
save_im_path = os.path.join(output_dir, im_prefix + str(num) + ext)
return save_im_path
## Model Overview
UGATIT image style transfer model.
The model converts an input face image into anime style.
The model weights come from the UGATIT-Paddle open-source project; the checkpoint used is genA2B_1000000.
For model details, see the [UGATIT-Paddle open-source project](https://github.com/miraiwk/UGATIT-paddle)
## Model Installation
```shell
$ hub install UGATIT_100w
```
## API Description
```python
def style_transfer(
self,
images=None,
paths=None,
batch_size=1,
output_dir='output',
visualization=False
)
```
Style transfer API: converts an input face image into anime style.
Example results:
![input image](https://ai-studio-static-online.cdn.bcebos.com/d130fabd8bd34e53b2f942b3766eb6bbd3c19c0676d04abfbd5cc4b83b66f8b6)
![output image](https://ai-studio-static-online.cdn.bcebos.com/8538af03b3f14b1884fcf4eec48965baf939e35a783d40129085102057438c77)
**Parameters**
* images (list\[numpy.ndarray\]): image data with ndarray.shape \[H, W, C\], defaults to None;
* paths (list\[str\]): paths to the images, defaults to None;
* batch\_size (int): batch size, defaults to 1;
* visualization (bool): whether to save the results as image files, defaults to False;
* output\_dir (str): directory in which to save the images, defaults to output.
**Returns**
* res (list\[numpy.ndarray\]): output image data with ndarray.shape \[H, W, C\]
## Prediction Code Example
```python
import cv2
import paddlehub as hub
# Load the module
# use_gpu: whether to run prediction on GPU
model = hub.Module(name='UGATIT_100w', use_gpu=False)
# Run prediction
result = model.style_transfer(images=[cv2.imread('/PATH/TO/IMAGE')])
# or
# result = model.style_transfer(paths=['/PATH/TO/IMAGE'])
```
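The returned list holds plain ndarrays, so the results can also be written out manually; a short sketch (setting visualization=True writes them to output_dir instead):
```python
import cv2
for i, img in enumerate(result):
    cv2.imwrite('ugatit_{}.png'.format(i), img)
```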
## Serving Deployment
PaddleHub Serving can deploy an online image style transfer service.
## Step 1: Start PaddleHub Serving
Run the start-up command:
```shell
$ hub serving start -m UGATIT_100w
```
This deploys an online image style transfer service API, with the default port number 8866.
**NOTE:** If you want to use GPU for prediction, set the CUDA_VISIBLE_DEVICES environment variable before starting the service; otherwise it need not be set.
## Step 2: Send a Prediction Request
With the server configured, the few lines of code below send a prediction request and obtain the result.
```python
import requests
import json
import cv2
import base64
def cv2_to_base64(image):
    data = cv2.imencode('.jpg', image)[1]
    return base64.b64encode(data.tobytes()).decode('utf8')
# Send the HTTP request
data = {'images':[cv2_to_base64(cv2.imread("/PATH/TO/IMAGE"))]}
headers = {"Content-type": "application/json"}
url = "http://127.0.0.1:8866/predict/UGATIT_100w"
r = requests.post(url=url, headers=headers, data=json.dumps(data))
# Print the prediction results
print(r.json()["results"])
```
## Model Information
### Model Code
https://github.com/miraiwk/UGATIT-paddle
### Dependencies
paddlepaddle >= 1.8.0
paddlehub >= 1.8.0
import os
import numpy as np
from paddle.fluid.core import AnalysisConfig, create_paddle_predictor
__all__ = ['Model']
class Model():
    # Constructor
    def __init__(self, modelpath, use_gpu):
        # Load the model predictor
        self.predictor = self.load_model(modelpath, use_gpu)
        # Get the model's input and output handles
        self.input_names = self.predictor.get_input_names()
        self.output_names = self.predictor.get_output_names()
        self.input_tensor = self.predictor.get_input_tensor(self.input_names[0])
        self.output_tensor = self.predictor.get_output_tensor(self.output_names[0])
    # Model loading function
    def load_model(self, modelpath, use_gpu):
        # Configure the execution device
        if use_gpu:
            try:
                places = os.environ["CUDA_VISIBLE_DEVICES"]
                places = int(places[0])
            except Exception as e:
                print('Error: %s. Please set the environment variable "CUDA_VISIBLE_DEVICES".' % e)
                use_gpu = False
        # Load the model parameters
        config = AnalysisConfig(modelpath)
        # Configure the predictor
        if use_gpu:
            config.enable_use_gpu(100, places)
        else:
            config.disable_gpu()
        config.disable_glog_info()
        config.switch_ir_optim(True)
        config.enable_memory_optim()
        config.switch_use_feed_fetch_ops(False)
        config.switch_specify_input_names(True)
        # Create the predictor from the config
        predictor = create_paddle_predictor(config)
        # Return the predictor
        return predictor
    # Prediction function
    def predict(self, input_datas):
        outputs = []
        # Iterate over the input batches and run prediction
        for input_data in input_datas:
            inputs = input_data.copy()
            self.input_tensor.copy_from_cpu(inputs)
            self.predictor.zero_copy_run()
            output = self.output_tensor.copy_to_cpu()
            outputs.append(output)
        # Concatenate the batch results
        outputs = np.concatenate(outputs, 0)
        # Return the predictions
        return outputs
import os
from paddlehub import Module
from paddlehub.module.module import moduleinfo, serving
from UGATIT_100w.model import Model
from UGATIT_100w.processor import base64_to_cv2, cv2_to_base64, Processor
@moduleinfo(
    name="UGATIT_100w",  # module name
    type="CV/style_transfer",  # module type
    author="jm12138",  # author name
    author_email="jm12138@qq.com",  # author email
    summary="UGATIT_100w",  # module summary
    version="1.0.0"  # version number
)
class UGATIT_100w(Module):
    # Initialization function
    def _initialize(self, use_gpu=False):
        # Set the model path
        self.model_path = os.path.join(self.directory, "UGATIT_100w")
        # Load the model
        self.model = Model(self.model_path, use_gpu)
# 关键点检测函数
def style_transfer(self, images=None, paths=None, batch_size=1, output_dir='output', visualization=False):
# 加载数据处理器
processor = Processor(images, paths, output_dir, batch_size)
# 模型预测
outputs = self.model.predict(processor.input_datas)
# 结果后处理
results = processor.postprocess(outputs, visualization)
# 返回结果
return results
# Hub Serving
@serving
def serving_method(self, images, **kwargs):
# 获取输入数据
images_decode = [base64_to_cv2(image) for image in images]
# 图片风格转换
results = self.style_transfer(images_decode, **kwargs)
# 对输出图片进行编码
encodes = []
for result in results:
encode = cv2_to_base64(result)
encodes.append(encode)
# 返回结果
return encodes
import os
import cv2
import time
import base64
import numpy as np
__all__ = ['base64_to_cv2', 'cv2_to_base64', 'Processor']
def check_dir(dir_path):
    # Make sure the output directory exists
    if not os.path.exists(dir_path):
        os.makedirs(dir_path)
    elif os.path.isfile(dir_path):
        os.remove(dir_path)
        os.makedirs(dir_path)

def base64_to_cv2(b64str):
    # Convert a base64 string to a cv2 (BGR) image
    data = base64.b64decode(b64str.encode('utf8'))
    data = np.frombuffer(data, np.uint8)
    data = cv2.imdecode(data, cv2.IMREAD_COLOR)
    return data

def cv2_to_base64(image):
    # Convert a cv2 (BGR) image to a base64-encoded JPEG string
    data = cv2.imencode('.jpg', image)[1]
    return base64.b64encode(data.tobytes()).decode('utf8')

class Processor():
    # Constructor
    def __init__(self, images=None, paths=None, output_dir='output', batch_size=1):
        # Store the settings
        self.images = images
        self.paths = paths
        self.output_dir = output_dir
        self.batch_size = batch_size
        # Load the raw input data
        self.datas = self.load_datas()
        # Preprocess the raw input data
        self.input_datas = self.preprocess()

    # Data loading function
    def load_datas(self):
        datas = []
        # Read images from paths
        if self.paths is not None:
            for im_path in self.paths:
                assert os.path.isfile(im_path), "The {} isn't a valid file path.".format(im_path)
                im = cv2.imread(im_path)
                datas.append(im)
        # Images passed directly take precedence
        if self.images is not None:
            datas = self.images
        # Return the data list
        return datas

    # Preprocessing function
    def preprocess(self):
        input_datas = []
        for i, img in enumerate(self.datas):
            # Resize
            img = cv2.resize(img, (256, 256))
            # Normalize to [-1, 1]
            img = (img.astype('float32') / 255.0 - 0.5) / 0.5
            # HWC -> CHW
            img = img.transpose((2, 0, 1))
            # Add the batch dimension
            img = np.expand_dims(img, axis=0)
            # Append to the input list
            input_datas.append(img)
        # Split the data into batches of batch_size
        input_datas = np.concatenate(input_datas, 0)
        split_num = len(self.datas) // self.batch_size + 1 if len(self.datas) % self.batch_size != 0 else len(
            self.datas) // self.batch_size
        input_datas = np.array_split(input_datas, split_num)
        # Return the preprocessed batches
        return input_datas
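    # A worked example of the batching above: with 5 input images and
    # batch_size=2, split_num = 5 // 2 + 1 = 3, and np.array_split yields
    # batches of shapes (2, 3, 256, 256), (2, 3, 256, 256) and (1, 3, 256, 256).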
    def postprocess(self, outputs, visualization):
        results = []
        for im_id, output in enumerate(outputs):
            # Map the output back to pixel values
            img = (output * 0.5 + 0.5) * 255.
            # Clip to [0, 255]
            img = np.clip(img, 0, 255).astype(np.uint8)
            # CHW -> HWC
            img = img.transpose((1, 2, 0))
            # Visualization
            if visualization:
                # Make sure the output directory exists
                check_dir(self.output_dir)
                # Write the output image
                cv2.imwrite(os.path.join(self.output_dir, '%d_%d.jpg' % (im_id, time.time())), img)
            results.append(img)
        # Return the results
        return results
## Model Overview
UGATIT image style transfer model.
The model converts an input face image into anime style.
The model weights come from the UGATIT-Paddle open-source project.
The weights used are genA2B_0835000.
For details, see the [UGATIT-Paddle open-source project](https://github.com/miraiwk/UGATIT-paddle)
## Model Installation
```shell
$ hub install UGATIT_83w
```
## API Description
```python
def style_transfer(
    self,
    images=None,
    paths=None,
    batch_size=1,
    output_dir='output',
    visualization=False
)
```
Style transfer API that converts an input face image into anime style.
Example results:
![Input image](https://ai-studio-static-online.cdn.bcebos.com/d130fabd8bd34e53b2f942b3766eb6bbd3c19c0676d04abfbd5cc4b83b66f8b6)
![Output image](https://ai-studio-static-online.cdn.bcebos.com/78653331ee2d472b81ff5bbccd6a904a80d2c5208f9c42c789b4f09a1ef46332)
**Parameters**
* images (list\[numpy.ndarray\]): image data, with ndarray.shape \[H, W, C\]; defaults to None;
* paths (list\[str\]): image file paths; defaults to None;
* batch\_size (int): batch size; defaults to 1;
* visualization (bool): whether to save the results as image files; defaults to False;
* output\_dir (str): directory where output images are saved; defaults to output.
**Returns**
* res (list\[numpy.ndarray\]): output image data, with ndarray.shape \[H, W, C\].
## Prediction Code Example
```python
import cv2
import paddlehub as hub
# Load the module
# use_gpu: whether to run prediction on GPU
model = hub.Module(name='UGATIT_83w', use_gpu=False)
# Run prediction
result = model.style_transfer(images=[cv2.imread('/PATH/TO/IMAGE')])
# or
# result = model.style_transfer(paths=['/PATH/TO/IMAGE'])
```
## Service Deployment
PaddleHub Serving can deploy an online image style transfer service.
## Step 1: Start PaddleHub Serving
Run the start command:
```shell
$ hub serving start -m UGATIT_83w
```
This deploys an online style transfer API service, listening on port 8866 by default.
**NOTE:** To predict on GPU, set the CUDA\_VISIBLE\_DEVICES environment variable before starting the service; otherwise it does not need to be set.
## Step 2: Send a Prediction Request
With the server configured, the few lines of code below send a prediction request and retrieve the result:
```python
import requests
import json
import cv2
import base64
def cv2_to_base64(image):
    data = cv2.imencode('.jpg', image)[1]
    return base64.b64encode(data.tobytes()).decode('utf8')
# Send the HTTP request
data = {'images':[cv2_to_base64(cv2.imread("/PATH/TO/IMAGE"))]}
headers = {"Content-type": "application/json"}
url = "http://127.0.0.1:8866/predict/UGATIT_83w"
r = requests.post(url=url, headers=headers, data=json.dumps(data))
# Print the prediction results
print(r.json()["results"])
```
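As with UGATIT_100w above, each entry of `results` is a base64-encoded JPEG string; a minimal decoding sketch (the save path is illustrative):
```python
import cv2
import base64
import numpy as np

imgs = [cv2.imdecode(np.frombuffer(base64.b64decode(b), np.uint8), cv2.IMREAD_COLOR)
        for b in r.json()["results"]]
cv2.imwrite('/PATH/TO/SAVE/IMAGE', imgs[0])
```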
## Model Information
### Model Code
https://github.com/miraiwk/UGATIT-paddle
### Dependencies
paddlepaddle >= 1.8.0
paddlehub >= 1.8.0
import os
import numpy as np
from paddle.fluid.core import AnalysisConfig, create_paddle_predictor
__all__ = ['Model']
class Model():
    # Constructor
    def __init__(self, modelpath, use_gpu):
        # Load the model predictor
        self.predictor = self.load_model(modelpath, use_gpu)
        # Get the model's input and output handles
        self.input_names = self.predictor.get_input_names()
        self.output_names = self.predictor.get_output_names()
        self.input_tensor = self.predictor.get_input_tensor(self.input_names[0])
        self.output_tensor = self.predictor.get_output_tensor(self.output_names[0])

    # Model loading function
    def load_model(self, modelpath, use_gpu):
        # Configure the execution device
        if use_gpu:
            try:
                places = os.environ["CUDA_VISIBLE_DEVICES"]
                # Use the first device listed in CUDA_VISIBLE_DEVICES
                places = int(places.split(',')[0])
            except Exception as e:
                print('Error: %s. Please set the environment variable "CUDA_VISIBLE_DEVICES".' % e)
                use_gpu = False
        # Load the model parameters
        config = AnalysisConfig(modelpath)
        # Set inference options
        if use_gpu:
            config.enable_use_gpu(100, places)
        else:
            config.disable_gpu()
        config.disable_glog_info()
        config.switch_ir_optim(True)
        config.enable_memory_optim()
        config.switch_use_feed_fetch_ops(False)
        config.switch_specify_input_names(True)
        # Create the predictor from the config
        predictor = create_paddle_predictor(config)
        # Return the predictor
        return predictor

    # Prediction function
    def predict(self, input_datas):
        outputs = []
        # Run prediction batch by batch
        for input_data in input_datas:
            inputs = input_data.copy()
            self.input_tensor.copy_from_cpu(inputs)
            self.predictor.zero_copy_run()
            output = self.output_tensor.copy_to_cpu()
            outputs.append(output)
        # Concatenate the per-batch results
        outputs = np.concatenate(outputs, 0)
        # Return the predictions
        return outputs
import os
from paddlehub import Module
from paddlehub.module.module import moduleinfo, serving
from UGATIT_83w.model import Model
from UGATIT_83w.processor import base64_to_cv2, cv2_to_base64, Processor
@moduleinfo(
    name="UGATIT_83w",  # model name
    type="CV/style_transfer",  # model type
    author="jm12138",  # author name
    author_email="jm12138@qq.com",  # author email
    summary="UGATIT",  # model summary
    version="1.0.0"  # version number
)
class UGATIT_83w(Module):
    # Initialization function
    def _initialize(self, use_gpu=False):
        # Set the model path
        self.model_path = os.path.join(self.directory, "UGATIT_83w")
        # Load the model
        self.model = Model(self.model_path, use_gpu)

    # Style transfer function
    def style_transfer(self, images=None, paths=None, batch_size=1, output_dir='output', visualization=False):
        # Build the data processor
        processor = Processor(images, paths, output_dir, batch_size)
        # Run prediction
        outputs = self.model.predict(processor.input_datas)
        # Post-process the results
        results = processor.postprocess(outputs, visualization)
        # Return the results
        return results

    # Hub Serving
    @serving
    def serving_method(self, images, **kwargs):
        # Decode the input data
        images_decode = [base64_to_cv2(image) for image in images]
        # Run style transfer
        results = self.style_transfer(images_decode, **kwargs)
        # Encode the output images
        encodes = []
        for result in results:
            encode = cv2_to_base64(result)
            encodes.append(encode)
        # Return the results
        return encodes
import os
import cv2
import time
import base64
import numpy as np
__all__ = ['base64_to_cv2', 'cv2_to_base64', 'Processor']
def check_dir(dir_path):
    # Make sure the output directory exists
    if not os.path.exists(dir_path):
        os.makedirs(dir_path)
    elif os.path.isfile(dir_path):
        os.remove(dir_path)
        os.makedirs(dir_path)

def base64_to_cv2(b64str):
    # Convert a base64 string to a cv2 (BGR) image
    data = base64.b64decode(b64str.encode('utf8'))
    data = np.frombuffer(data, np.uint8)
    data = cv2.imdecode(data, cv2.IMREAD_COLOR)
    return data

def cv2_to_base64(image):
    # Convert a cv2 (BGR) image to a base64-encoded JPEG string
    data = cv2.imencode('.jpg', image)[1]
    return base64.b64encode(data.tobytes()).decode('utf8')

class Processor():
    # Constructor
    def __init__(self, images=None, paths=None, output_dir='output', batch_size=1):
        # Store the settings
        self.images = images
        self.paths = paths
        self.output_dir = output_dir
        self.batch_size = batch_size
        # Load the raw input data
        self.datas = self.load_datas()
        # Preprocess the raw input data
        self.input_datas = self.preprocess()

    # Data loading function
    def load_datas(self):
        datas = []
        # Read images from paths
        if self.paths is not None:
            for im_path in self.paths:
                assert os.path.isfile(im_path), "The {} isn't a valid file path.".format(im_path)
                im = cv2.imread(im_path)
                datas.append(im)
        # Images passed directly take precedence
        if self.images is not None:
            datas = self.images
        # Return the data list
        return datas

    # Preprocessing function
    def preprocess(self):
        input_datas = []
        for i, img in enumerate(self.datas):
            # Resize
            img = cv2.resize(img, (256, 256))
            # Normalize to [-1, 1]
            img = (img.astype('float32') / 255.0 - 0.5) / 0.5
            # HWC -> CHW
            img = img.transpose((2, 0, 1))
            # Add the batch dimension
            img = np.expand_dims(img, axis=0)
            # Append to the input list
            input_datas.append(img)
        # Split the data into batches of batch_size
        input_datas = np.concatenate(input_datas, 0)
        split_num = len(self.datas) // self.batch_size + 1 if len(self.datas) % self.batch_size != 0 else len(
            self.datas) // self.batch_size
        input_datas = np.array_split(input_datas, split_num)
        # Return the preprocessed batches
        return input_datas

    def postprocess(self, outputs, visualization):
        results = []
        for im_id, output in enumerate(outputs):
            # Map the output back to pixel values
            img = (output * 0.5 + 0.5) * 255.
            # Clip to [0, 255]
            img = np.clip(img, 0, 255).astype(np.uint8)
            # CHW -> HWC
            img = img.transpose((1, 2, 0))
            # Visualization
            if visualization:
                # Make sure the output directory exists
                check_dir(self.output_dir)
                # Write the output image
                cv2.imwrite(os.path.join(self.output_dir, '%d_%d.jpg' % (im_id, time.time())), img)
            results.append(img)
        # Return the results
        return results
import os
import numpy as np
from paddle.fluid.core import AnalysisConfig, create_paddle_predictor
__all__ = ['Model']
class Model():
    # Constructor
    def __init__(self, modelpath, use_gpu):
        # Load the model predictor
        self.predictor = self.load_model(modelpath, use_gpu)
        # Get the model's input and output handles
        self.input_names = self.predictor.get_input_names()
        self.output_names = self.predictor.get_output_names()
        self.input_tensor = self.predictor.get_input_tensor(self.input_names[0])
        self.output_tensor = self.predictor.get_output_tensor(self.output_names[0])

    # Model loading function
    def load_model(self, modelpath, use_gpu):
        # Configure the execution device
        if use_gpu:
            try:
                places = os.environ["CUDA_VISIBLE_DEVICES"]
                # Use the first device listed in CUDA_VISIBLE_DEVICES
                places = int(places.split(',')[0])
            except Exception as e:
                print('Error: %s. Please set the environment variable "CUDA_VISIBLE_DEVICES".' % e)
                use_gpu = False
        # Load the model parameters
        config = AnalysisConfig(modelpath)
        # Set inference options
        if use_gpu:
            config.enable_use_gpu(100, places)
        else:
            config.disable_gpu()
        config.disable_glog_info()
        config.switch_ir_optim(True)
        config.enable_memory_optim()
        config.switch_use_feed_fetch_ops(False)
        config.switch_specify_input_names(True)
        # Create the predictor from the config
        predictor = create_paddle_predictor(config)
        # Return the predictor
        return predictor

    # Prediction function
    def predict(self, input_datas):
        outputs = []
        # Run prediction batch by batch
        for input_data in input_datas:
            inputs = input_data.copy()
            self.input_tensor.copy_from_cpu(inputs)
            self.predictor.zero_copy_run()
            output = self.output_tensor.copy_to_cpu()
            outputs.append(output)
        # Concatenate the per-batch results
        outputs = np.concatenate(outputs, 0)
        # Return the predictions
        return outputs