Unverified commit aef60376, authored by haoyuying, committed by GitHub

hrnet_w18_samll_v1_humanseg and shufflenet_humanseg (#778)

Parent: b7ac690c
## Model Overview
HumanSeg_lite is built on the ShuffleNetV2 architecture with further reductions in network size: the model is only 541 KB, and only 187 KB after quantization. It targets portrait segmentation for mobile selfies and supports real-time segmentation on mobile devices.
## Command-Line Prediction
```
hub run humanseg_lite --input_path "/PATH/TO/IMAGE"
```
## API
```python
def segment(images=None,
paths=None,
batch_size=1,
use_gpu=False,
visualization=True,
output_dir='humanseg_output')
```
Prediction API for human segmentation.
**Parameters**
* images (list\[numpy.ndarray\]): image data, with ndarray.shape \[H, W, C\] in BGR format;
* paths (list\[str\]): paths to the images;
* batch\_size (int): batch size;
* use\_gpu (bool): whether to predict on GPU; if so, set the CUDA_VISIBLE_DEVICES environment variable before predicting, otherwise leave it unset;
* visualization (bool): whether to save the result as an image file;
* output\_dir (str): directory in which to save the images.
**Return**
* res (list\[dict\]): list of results; each element is a dict whose keys are 'save\_path' and 'data':
    * save\_path (str, optional): path of the saved visualization image (present only when visualization=True);
    * data (numpy.ndarray): the segmentation result as a single alpha channel with values 0-255 (0 fully transparent, 255 opaque); the larger a pixel's value, the more likely it belongs to a person, and the smaller, the more likely it is background (see the compositing sketch below).
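For illustration, a minimal sketch (not part of the module API) of compositing the returned alpha mask over the original image; the file path and the white background are placeholder choices:
```python
import cv2
import numpy as np
import paddlehub as hub

human_seg = hub.Module(name='humanseg_lite')
im = cv2.imread('/PATH/TO/IMAGE')                   # BGR image
mask = human_seg.segment(images=[im])[0]['data']    # alpha mask, values 0-255, same H x W as im

# Blend the person onto a white background, weighted by the soft mask.
alpha = (mask.astype(np.float32) / 255.0)[:, :, np.newaxis]
background = np.full_like(im, 255)
composite = (alpha * im + (1 - alpha) * background).astype(np.uint8)
cv2.imwrite('composite.png', composite)
```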
```python
def save_inference_model(dirname,
model_filename=None,
params_filename=None,
combined=True)
```
Saves the model to the specified directory.
**Parameters**
* dirname: directory in which to save the model
* model\_filename: model file name, defaults to \_\_model\_\_
* params\_filename: parameters file name, defaults to \_\_params\_\_ (effective only when `combined` is True)
* combined: whether to save the parameters in a single combined file (see the usage sketch below)
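A minimal usage sketch (the target directory name is illustrative):
```python
import paddlehub as hub

human_seg = hub.Module(name='humanseg_lite')
# Export the combined inference model (__model__ plus __params__) into ./humanseg_lite_model
human_seg.save_inference_model(dirname='humanseg_lite_model', combined=True)
```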
## Code Example
```python
import cv2
import paddlehub as hub
human_seg = hub.Module(name='humanseg_lite')
im = cv2.imread('/PATH/TO/IMAGE')
res = human_seg.segment(images=[im], visualization=True)
```
## Service Deployment
PaddleHub Serving can deploy an online human segmentation service.
## Step 1: Start PaddleHub Serving
Run the start command:
```shell
$ hub serving start -m humanseg_lite
```
This deploys a human segmentation service API, listening on port 8866 by default.
**NOTE:** To predict on GPU, set the CUDA_VISIBLE_DEVICES environment variable before starting the service; otherwise it need not be set.
## Step 2: Send a Prediction Request
With the server up, the few lines of code below send a prediction request and fetch the result:
```python
import requests
import json
import base64
import cv2
import numpy as np
def cv2_to_base64(image):
data = cv2.imencode('.jpg', image)[1]
    return base64.b64encode(data.tobytes()).decode('utf8')
def base64_to_cv2(b64str):
data = base64.b64decode(b64str.encode('utf8'))
    data = np.frombuffer(data, np.uint8)
data = cv2.imdecode(data, cv2.IMREAD_COLOR)
return data
# Send the HTTP request
data = {'images':[cv2_to_base64(cv2.imread('/PATH/TO/IMAGE'))]}
headers = {"Content-type": "application/json"}
url = "http://127.0.0.1:8866/predict/humanseg_lite"
r = requests.post(url=url, headers=headers, data=json.dumps(data))
# Print the prediction result
print(base64_to_cv2(r.json()["results"][0]['data']))
```
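The returned mask can also be written straight to disk. A short follow-up sketch that reuses the `base64_to_cv2` helper and the response `r` from the snippet above (the output file name is illustrative):
```python
# Decode the mask from the service response and save it as an image file.
mask = base64_to_cv2(r.json()["results"][0]['data'])
cv2.imwrite('humanseg_lite_mask.png', mask)
```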
### Dependencies
paddlepaddle >= 1.8.1
paddlehub >= 1.7.1
# -*- coding:utf-8 -*-
import os
import time
from collections import OrderedDict
import cv2
import numpy as np
__all__ = ['reader']
def reader(images=None, paths=None):
"""
Preprocess to yield image.
Args:
images (list(numpy.ndarray)): images data, shape of each is [H, W, C]
paths (list[str]): paths to images.
Yield:
each (collections.OrderedDict): info of original image, preprocessed image.
"""
component = list()
if paths:
for im_path in paths:
each = OrderedDict()
assert os.path.isfile(
im_path), "The {} isn't a valid file path.".format(im_path)
im = cv2.imread(im_path).astype('float32')
each['org_im'] = im
each['org_im_path'] = im_path
each['org_im_shape'] = im.shape
component.append(each)
if images is not None:
assert type(images) is list, "images should be a list."
for im in images:
each = OrderedDict()
each['org_im'] = im
each['org_im_path'] = 'ndarray_time={}'.format(
round(time.time(), 6) * 1e6)
each['org_im_shape'] = im.shape
component.append(each)
for element in component:
img = element['org_im'].copy()
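        # resize to the fixed 192x192 input, convert HWC -> CHW, scale to [0, 1], then normalize to [-1, 1]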
img = cv2.resize(img, (192, 192)).astype(np.float32)
img_mean = np.array([0.5, 0.5, 0.5]).reshape((3, 1, 1))
img_std = np.array([0.5, 0.5, 0.5]).reshape((3, 1, 1))
img = img.transpose((2, 0, 1)) / 255
img -= img_mean
img /= img_std
element['image'] = img
yield element
# -*- coding:utf-8 -*-
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import ast
import os
import argparse
import numpy as np
import paddle.fluid as fluid
import paddlehub as hub
from paddle.fluid.core import PaddleTensor, AnalysisConfig, create_paddle_predictor
from paddlehub.module.module import moduleinfo, runnable, serving
from humanseg_lite.processor import postprocess, base64_to_cv2, cv2_to_base64, check_dir
from humanseg_lite.data_feed import reader
@moduleinfo(
name="humanseg_lite",
type="CV/semantic_segmentation",
author="paddlepaddle",
author_email="",
summary="humanseg_lite is a semantic segmentation model.",
version="1.0.0")
class ShufflenetHumanSeg(hub.Module):
def _initialize(self):
self.default_pretrained_model_path = os.path.join(
self.directory, "humanseg_lite_inference")
self._set_config()
def _set_config(self):
"""
predictor config setting
"""
self.model_file_path = os.path.join(self.default_pretrained_model_path,
'__model__')
self.params_file_path = os.path.join(self.default_pretrained_model_path,
'__params__')
cpu_config = AnalysisConfig(self.model_file_path, self.params_file_path)
cpu_config.disable_glog_info()
cpu_config.disable_gpu()
self.cpu_predictor = create_paddle_predictor(cpu_config)
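        # additionally build a GPU predictor when CUDA_VISIBLE_DEVICES names a valid device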
try:
_places = os.environ["CUDA_VISIBLE_DEVICES"]
int(_places[0])
use_gpu = True
        except Exception:
use_gpu = False
if use_gpu:
gpu_config = AnalysisConfig(self.model_file_path,
self.params_file_path)
gpu_config.disable_glog_info()
gpu_config.enable_use_gpu(
memory_pool_init_size_mb=1000, device_id=0)
self.gpu_predictor = create_paddle_predictor(gpu_config)
def segment(self,
images=None,
paths=None,
data=None,
batch_size=1,
use_gpu=False,
visualization=True,
output_dir='humanseg_output'):
"""
API for human segmentation.
Args:
images (list(numpy.ndarray)): images data, shape of each is [H, W, C], the color space is BGR.
paths (list[str]): The paths of images.
data (dict): key is 'image', the corresponding value is the path to image.
batch_size (int): batch size.
use_gpu (bool): Whether to use gpu.
visualization (bool): Whether to save image or not.
output_dir (str): The path to store output images.
Returns:
res (list[dict]): each element in the list is a dict, the keys and values are:
save_path (str, optional): the path to save images. (Exists only if visualization is True)
data (numpy.ndarray): data of post processed image.
"""
if use_gpu:
try:
_places = os.environ["CUDA_VISIBLE_DEVICES"]
int(_places[0])
            except Exception:
                raise RuntimeError(
                    "Environment Variable CUDA_VISIBLE_DEVICES is not set correctly. "
                    "If you want to use GPU, please set CUDA_VISIBLE_DEVICES to the cuda device id."
                )
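        # compatibility with older versions: 'data' may carry image paths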
if data and 'image' in data:
if paths is None:
paths = list()
paths += data['image']
all_data = list()
for yield_data in reader(images, paths):
all_data.append(yield_data)
total_num = len(all_data)
loop_num = int(np.ceil(total_num / batch_size))
res = list()
for iter_id in range(loop_num):
            handle_id = iter_id * batch_size
            # the slice is simply shorter for the final batch, so no padding or try/except is needed
            batch_data = all_data[handle_id:handle_id + batch_size]
# feed batch image
batch_image = np.array([data['image'] for data in batch_data])
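            # wrap the NCHW float32 batch in a PaddleTensor for the predictor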
batch_image = PaddleTensor(batch_image.copy())
output = self.gpu_predictor.run([
batch_image
]) if use_gpu else self.cpu_predictor.run([batch_image])
output = np.expand_dims(output[0].as_ndarray(), axis=1)
# postprocess one by one
for i in range(len(batch_data)):
out = postprocess(
data_out=output[i],
org_im=batch_data[i]['org_im'],
org_im_shape=batch_data[i]['org_im_shape'],
org_im_path=batch_data[i]['org_im_path'],
output_dir=output_dir,
visualization=visualization)
res.append(out)
return res
def save_inference_model(self,
dirname,
model_filename=None,
params_filename=None,
combined=True):
if combined:
model_filename = "__model__" if not model_filename else model_filename
params_filename = "__params__" if not params_filename else params_filename
place = fluid.CPUPlace()
exe = fluid.Executor(place)
program, feeded_var_names, target_vars = fluid.io.load_inference_model(
dirname=self.default_pretrained_model_path,
model_filename=model_filename,
params_filename=params_filename,
executor=exe)
fluid.io.save_inference_model(
dirname=dirname,
main_program=program,
executor=exe,
feeded_var_names=feeded_var_names,
target_vars=target_vars,
model_filename=model_filename,
params_filename=params_filename)
@serving
def serving_method(self, images, **kwargs):
"""
Run as a service.
"""
images_decode = [base64_to_cv2(image) for image in images]
results = self.segment(images=images_decode, **kwargs)
results = [{
'data': cv2_to_base64(result['data'])
} for result in results]
return results
@runnable
def run_cmd(self, argvs):
"""
Run as a command.
"""
self.parser = argparse.ArgumentParser(
description="Run the {} module.".format(self.name),
prog='hub run {}'.format(self.name),
usage='%(prog)s',
add_help=True)
self.arg_input_group = self.parser.add_argument_group(
title="Input options", description="Input data. Required")
self.arg_config_group = self.parser.add_argument_group(
title="Config options",
description=
"Run configuration for controlling module behavior, not required.")
self.add_module_config_arg()
self.add_module_input_arg()
args = self.parser.parse_args(argvs)
results = self.segment(
paths=[args.input_path],
batch_size=args.batch_size,
use_gpu=args.use_gpu,
output_dir=args.output_dir,
visualization=args.visualization)
if args.save_dir is not None:
check_dir(args.save_dir)
self.save_inference_model(args.save_dir)
return results
def add_module_config_arg(self):
"""
Add the command config options.
"""
self.arg_config_group.add_argument(
'--use_gpu',
type=ast.literal_eval,
default=False,
help="whether use GPU or not")
self.arg_config_group.add_argument(
'--output_dir',
type=str,
default='humanseg_output',
help="The directory to save output images.")
self.arg_config_group.add_argument(
'--save_dir',
type=str,
default='humanseg_model',
help="The directory to save model.")
self.arg_config_group.add_argument(
'--visualization',
type=ast.literal_eval,
default=True,
help="whether to save output as images.")
self.arg_config_group.add_argument(
'--batch_size',
type=ast.literal_eval,
default=1,
help="batch size.")
def add_module_input_arg(self):
"""
Add the command input options.
"""
self.arg_input_group.add_argument(
'--input_path', type=str, help="path to image.")
if __name__ == "__main__":
m = ShufflenetHumanSeg()
import cv2
img = cv2.imread('./meditation.jpg')
res = m.segment(images=[img])
# -*- coding:utf-8 -*-
import os
import time
import base64
import cv2
import numpy as np
__all__ = ['cv2_to_base64', 'base64_to_cv2', 'postprocess']
def cv2_to_base64(image):
data = cv2.imencode('.jpg', image)[1]
    return base64.b64encode(data.tobytes()).decode('utf8')
def base64_to_cv2(b64str):
data = base64.b64decode(b64str.encode('utf8'))
    data = np.frombuffer(data, np.uint8)
data = cv2.imdecode(data, cv2.IMREAD_COLOR)
return data
def postprocess(data_out,
org_im,
org_im_shape,
org_im_path,
output_dir,
visualization,
thresh=120):
"""
    Postprocess the network output, one image at a time.
Args:
data_out (numpy.ndarray): output of network.
org_im (numpy.ndarray): original image.
        org_im_shape (list): shape of the original image.
        org_im_path (str): path of the original image.
output_dir (str): output directory to store image.
visualization (bool): whether to save image or not.
thresh (float): threshold.
Returns:
result (dict): The data of processed image.
"""
result = dict()
for logit in data_out:
logit = np.squeeze(logit * 255, axis=2).astype(np.uint8)
logit = cv2.resize(logit, (org_im_shape[1], org_im_shape[0]))
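        # attach the mask as a fourth (alpha) channel so the saved PNG is transparent outside the person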
rgba = np.concatenate((org_im, np.expand_dims(logit, axis=2)), axis=2)
if visualization:
check_dir(output_dir)
save_im_path = get_save_image_name(org_im, org_im_path, output_dir)
cv2.imwrite(save_im_path, rgba)
result['save_path'] = save_im_path
result['data'] = logit
else:
result['data'] = logit
return result
def check_dir(dir_path):
if not os.path.exists(dir_path):
os.makedirs(dir_path)
elif os.path.isfile(dir_path):
os.remove(dir_path)
os.makedirs(dir_path)
def get_save_image_name(org_im, org_im_path, output_dir):
"""
Get save image name from source image path.
"""
    # name prefix of the original image
org_im_name = os.path.split(org_im_path)[-1]
im_prefix = os.path.splitext(org_im_name)[0]
ext = '.png'
# save image path
save_im_path = os.path.join(output_dir, im_prefix + ext)
if os.path.exists(save_im_path):
save_im_path = os.path.join(
output_dir, im_prefix + 'time={}'.format(int(time.time())) + ext)
return save_im_path
## Model Overview
HumanSeg-mobile is a human segmentation network based on HRNet (Deep High-Resolution Representation Learning for Visual Recognition). HRNet keeps high-resolution information throughout feature extraction, preserving object detail, and the model size can be tuned by controlling the channel count of each branch. HumanSeg-mobile adopts the HRNet_w18_small_v1 architecture; the model is only 5.8 MB, suitable for front-camera scenarios on mobile devices or server-side CPUs.
## Command-Line Prediction
```
hub run humanseg_mobile --input_path "/PATH/TO/IMAGE"
```
## API
```python
def segment(images=None,
paths=None,
batch_size=1,
use_gpu=False,
visualization=True,
output_dir='humanseg_output')
```
Prediction API for human segmentation.
**Parameters**
* images (list\[numpy.ndarray\]): image data, with ndarray.shape \[H, W, C\] in BGR format;
* paths (list\[str\]): paths to the images;
* batch\_size (int): batch size;
* use\_gpu (bool): whether to predict on GPU; if so, set the CUDA_VISIBLE_DEVICES environment variable before predicting, otherwise leave it unset;
* visualization (bool): whether to save the result as an image file;
* output\_dir (str): directory in which to save the images.
**Return**
* res (list\[dict\]): list of results; each element is a dict whose keys are 'save\_path' and 'data':
    * save\_path (str, optional): path of the saved visualization image (present only when visualization=True);
    * data (numpy.ndarray): the segmentation result as a single alpha channel with values 0-255 (0 fully transparent, 255 opaque); the larger a pixel's value, the more likely it belongs to a person, and the smaller, the more likely it is background (see the thresholding sketch below).
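For illustration, a minimal sketch (not part of the module API) of binarizing the soft mask into a hard person/background mask; the threshold 120 follows the `thresh` default in this module's `postprocess` signature, and the file paths are placeholders:
```python
import cv2
import paddlehub as hub

human_seg = hub.Module(name='humanseg_mobile')
im = cv2.imread('/PATH/TO/IMAGE')
mask = human_seg.segment(images=[im])[0]['data']   # soft mask, values 0-255

# Pixels above the threshold become person (255); everything else becomes background (0).
_, binary_mask = cv2.threshold(mask, 120, 255, cv2.THRESH_BINARY)
cv2.imwrite('binary_mask.png', binary_mask)
```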
```python
def save_inference_model(dirname,
model_filename=None,
params_filename=None,
combined=True)
```
Saves the model to the specified directory.
**Parameters**
* dirname: directory in which to save the model
* model\_filename: model file name, defaults to \_\_model\_\_
* params\_filename: parameters file name, defaults to \_\_params\_\_ (effective only when `combined` is True)
* combined: whether to save the parameters in a single combined file (see the loading sketch below)
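Conversely, a minimal sketch of loading the exported model back with the Fluid API, mirroring the loading logic inside this module (the directory name is illustrative):
```python
import paddle.fluid as fluid

place = fluid.CPUPlace()
exe = fluid.Executor(place)
# Load the combined inference model exported by save_inference_model.
program, feed_names, fetch_targets = fluid.io.load_inference_model(
    dirname='humanseg_mobile_model',
    model_filename='__model__',
    params_filename='__params__',
    executor=exe)
```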
## Code Example
```python
import cv2
import paddlehub as hub
human_seg = hub.Module(name='humanseg_mobile')
im = cv2.imread('/PATH/TO/IMAGE')
res = human_seg.segment(images=[im], visualization=True)
```
## Service Deployment
PaddleHub Serving can deploy an online human segmentation service.
## Step 1: Start PaddleHub Serving
Run the start command:
```shell
$ hub serving start -m humanseg_mobile
```
This deploys a human segmentation service API, listening on port 8866 by default.
**NOTE:** To predict on GPU, set the CUDA_VISIBLE_DEVICES environment variable before starting the service; otherwise it need not be set.
## Step 2: Send a Prediction Request
With the server up, the few lines of code below send a prediction request and fetch the result:
```python
import requests
import json
import base64
import cv2
import numpy as np
def cv2_to_base64(image):
data = cv2.imencode('.jpg', image)[1]
    return base64.b64encode(data.tobytes()).decode('utf8')
def base64_to_cv2(b64str):
data = base64.b64decode(b64str.encode('utf8'))
    data = np.frombuffer(data, np.uint8)
data = cv2.imdecode(data, cv2.IMREAD_COLOR)
return data
# Send the HTTP request
data = {'images':[cv2_to_base64(cv2.imread('/PATH/TO/IMAGE'))]}
headers = {"Content-type": "application/json"}
url = "http://127.0.0.1:8866/predict/humanseg_mobile"
r = requests.post(url=url, headers=headers, data=json.dumps(data))
# Print the prediction result
print(base64_to_cv2(r.json()["results"][0]['data']))
```
### Dependencies
paddlepaddle >= 1.8.1
paddlehub >= 1.7.1
# -*- coding:utf-8 -*-
import os
import time
from collections import OrderedDict
import cv2
import numpy as np
__all__ = ['reader']
def reader(images=None, paths=None):
"""
Preprocess to yield image.
Args:
images (list(numpy.ndarray)): images data, shape of each is [H, W, C]
paths (list[str]): paths to images.
Yield:
each (collections.OrderedDict): info of original image, preprocessed image.
"""
component = list()
if paths:
for im_path in paths:
each = OrderedDict()
assert os.path.isfile(
im_path), "The {} isn't a valid file path.".format(im_path)
im = cv2.imread(im_path).astype('float32')
each['org_im'] = im
each['org_im_path'] = im_path
each['org_im_shape'] = im.shape
component.append(each)
if images is not None:
assert type(images) is list, "images should be a list."
for im in images:
each = OrderedDict()
each['org_im'] = im
each['org_im_path'] = 'ndarray_time={}'.format(
round(time.time(), 6) * 1e6)
each['org_im_shape'] = im.shape
component.append(each)
for element in component:
img = element['org_im'].copy()
img = cv2.resize(img, (192, 192)).astype(np.float32)
img_mean = np.array([0.5, 0.5, 0.5]).reshape((3, 1, 1))
img_std = np.array([0.5, 0.5, 0.5]).reshape((3, 1, 1))
img = img.transpose((2, 0, 1)) / 255
img -= img_mean
img /= img_std
element['image'] = img
yield element
# -*- coding:utf-8 -*-
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import ast
import os
import argparse
import numpy as np
import paddle.fluid as fluid
import paddlehub as hub
from paddle.fluid.core import PaddleTensor, AnalysisConfig, create_paddle_predictor
from paddlehub.module.module import moduleinfo, runnable, serving
from humanseg_mobile.processor import postprocess, base64_to_cv2, cv2_to_base64, check_dir
from humanseg_mobile.data_feed import reader
@moduleinfo(
name="humanseg_mobile",
type="CV/semantic_segmentation",
author="paddlepaddle",
author_email="",
    summary="HRNet_w18_small_v1 is a semantic segmentation model.",
version="1.0.0")
class HRNetw18samllv1humanseg(hub.Module):
def _initialize(self):
self.default_pretrained_model_path = os.path.join(
self.directory, "humanseg_mobile_inference")
self._set_config()
def _set_config(self):
"""
predictor config setting
"""
self.model_file_path = os.path.join(self.default_pretrained_model_path,
'__model__')
self.params_file_path = os.path.join(self.default_pretrained_model_path,
'__params__')
cpu_config = AnalysisConfig(self.model_file_path, self.params_file_path)
cpu_config.disable_glog_info()
cpu_config.disable_gpu()
self.cpu_predictor = create_paddle_predictor(cpu_config)
try:
_places = os.environ["CUDA_VISIBLE_DEVICES"]
int(_places[0])
use_gpu = True
        except Exception:
use_gpu = False
if use_gpu:
gpu_config = AnalysisConfig(self.model_file_path,
self.params_file_path)
gpu_config.disable_glog_info()
gpu_config.enable_use_gpu(
memory_pool_init_size_mb=1000, device_id=0)
self.gpu_predictor = create_paddle_predictor(gpu_config)
def segment(self,
images=None,
paths=None,
data=None,
batch_size=1,
use_gpu=False,
visualization=False,
output_dir='humanseg_output'):
"""
API for human segmentation.
Args:
images (list(numpy.ndarray)): images data, shape of each is [H, W, C], the color space is BGR.
paths (list[str]): The paths of images.
data (dict): key is 'image', the corresponding value is the path to image.
batch_size (int): batch size.
use_gpu (bool): Whether to use gpu.
visualization (bool): Whether to save image or not.
output_dir (str): The path to store output images.
Returns:
res (list[dict]): each element in the list is a dict, the keys and values are:
save_path (str, optional): the path to save images. (Exists only if visualization is True)
data (numpy.ndarray): data of post processed image.
"""
if use_gpu:
try:
_places = os.environ["CUDA_VISIBLE_DEVICES"]
int(_places[0])
            except Exception:
                raise RuntimeError(
                    "Environment Variable CUDA_VISIBLE_DEVICES is not set correctly. "
                    "If you want to use GPU, please set CUDA_VISIBLE_DEVICES to the cuda device id."
                )
# compatibility with older versions
if data and 'image' in data:
if paths is None:
paths = list()
paths += data['image']
all_data = list()
for yield_data in reader(images, paths):
all_data.append(yield_data)
total_num = len(all_data)
loop_num = int(np.ceil(total_num / batch_size))
res = list()
for iter_id in range(loop_num):
            handle_id = iter_id * batch_size
            # the slice is simply shorter for the final batch, so no padding or try/except is needed
            batch_data = all_data[handle_id:handle_id + batch_size]
# feed batch image
batch_image = np.array([data['image'] for data in batch_data])
batch_image = PaddleTensor(batch_image.copy())
output = self.gpu_predictor.run([
batch_image
]) if use_gpu else self.cpu_predictor.run([batch_image])
output = np.expand_dims(output[0].as_ndarray(), axis=1)
# postprocess one by one
for i in range(len(batch_data)):
out = postprocess(
data_out=output[i],
org_im=batch_data[i]['org_im'],
org_im_shape=batch_data[i]['org_im_shape'],
org_im_path=batch_data[i]['org_im_path'],
output_dir=output_dir,
visualization=visualization)
res.append(out)
return res
def save_inference_model(self,
dirname,
model_filename=None,
params_filename=None,
combined=True):
if combined:
model_filename = "__model__" if not model_filename else model_filename
params_filename = "__params__" if not params_filename else params_filename
place = fluid.CPUPlace()
exe = fluid.Executor(place)
program, feeded_var_names, target_vars = fluid.io.load_inference_model(
dirname=self.default_pretrained_model_path,
model_filename=model_filename,
params_filename=params_filename,
executor=exe)
fluid.io.save_inference_model(
dirname=dirname,
main_program=program,
executor=exe,
feeded_var_names=feeded_var_names,
target_vars=target_vars,
model_filename=model_filename,
params_filename=params_filename)
@serving
def serving_method(self, images, **kwargs):
"""
Run as a service.
"""
images_decode = [base64_to_cv2(image) for image in images]
results = self.segment(images=images_decode, **kwargs)
results = [{
'data': cv2_to_base64(result['data'])
} for result in results]
return results
@runnable
def run_cmd(self, argvs):
"""
Run as a command.
"""
self.parser = argparse.ArgumentParser(
description="Run the {} module.".format(self.name),
prog='hub run {}'.format(self.name),
usage='%(prog)s',
add_help=True)
self.arg_input_group = self.parser.add_argument_group(
title="Input options", description="Input data. Required")
self.arg_config_group = self.parser.add_argument_group(
title="Config options",
description=
"Run configuration for controlling module behavior, not required.")
self.add_module_config_arg()
self.add_module_input_arg()
args = self.parser.parse_args(argvs)
results = self.segment(
paths=[args.input_path],
batch_size=args.batch_size,
use_gpu=args.use_gpu,
output_dir=args.output_dir,
visualization=args.visualization)
if args.save_dir is not None:
check_dir(args.save_dir)
self.save_inference_model(args.save_dir)
return results
def add_module_config_arg(self):
"""
Add the command config options.
"""
self.arg_config_group.add_argument(
'--use_gpu',
type=ast.literal_eval,
default=False,
help="whether use GPU or not")
self.arg_config_group.add_argument(
'--output_dir',
type=str,
default='humanseg_output',
help="The directory to save output images.")
self.arg_config_group.add_argument(
'--save_dir',
type=str,
default='humanseg_model',
help="The directory to save model.")
self.arg_config_group.add_argument(
'--visualization',
type=ast.literal_eval,
default=False,
help="whether to save output as images.")
self.arg_config_group.add_argument(
'--batch_size',
type=ast.literal_eval,
default=1,
help="batch size.")
def add_module_input_arg(self):
"""
Add the command input options.
"""
self.arg_input_group.add_argument(
'--input_path', type=str, help="path to image.")
if __name__ == "__main__":
m = HRNetw18samllv1humanseg()
import cv2
img = cv2.imread('./meditation.jpg')
res = m.segment(images=[img], visualization=True)
print(res[0]['data'])
# -*- coding:utf-8 -*-
import os
import time
import base64
import cv2
import numpy as np
__all__ = ['cv2_to_base64', 'base64_to_cv2', 'postprocess']
def cv2_to_base64(image):
data = cv2.imencode('.jpg', image)[1]
    return base64.b64encode(data.tobytes()).decode('utf8')
def base64_to_cv2(b64str):
data = base64.b64decode(b64str.encode('utf8'))
    data = np.frombuffer(data, np.uint8)
data = cv2.imdecode(data, cv2.IMREAD_COLOR)
return data
def postprocess(data_out,
org_im,
org_im_shape,
org_im_path,
output_dir,
visualization,
thresh=120):
"""
    Postprocess the network output, one image at a time.
Args:
data_out (numpy.ndarray): output of network.
org_im (numpy.ndarray): original image.
        org_im_shape (list): shape of the original image.
        org_im_path (str): path of the original image.
output_dir (str): output directory to store image.
visualization (bool): whether to save image or not.
thresh (float): threshold.
Returns:
result (dict): The data of processed image.
"""
result = dict()
for logit in data_out:
logit = np.squeeze(logit * 255, axis=2).astype(np.uint8)
logit = cv2.resize(logit, (org_im_shape[1], org_im_shape[0]))
rgba = np.concatenate((org_im, np.expand_dims(logit, axis=2)), axis=2)
if visualization:
check_dir(output_dir)
save_im_path = get_save_image_name(org_im, org_im_path, output_dir)
cv2.imwrite(save_im_path, rgba)
result['save_path'] = save_im_path
result['data'] = logit
else:
result['data'] = logit
return result
def check_dir(dir_path):
if not os.path.exists(dir_path):
os.makedirs(dir_path)
elif os.path.isfile(dir_path):
os.remove(dir_path)
os.makedirs(dir_path)
def get_save_image_name(org_im, org_im_path, output_dir):
"""
Get save image name from source image path.
"""
    # name prefix of the original image
org_im_name = os.path.split(org_im_path)[-1]
im_prefix = os.path.splitext(org_im_name)[0]
ext = '.png'
# save image path
save_im_path = os.path.join(output_dir, im_prefix + ext)
if os.path.exists(save_im_path):
save_im_path = os.path.join(
output_dir, im_prefix + 'time={}'.format(int(time.time())) + ext)
return save_im_path
name: humanseg_lite
dir: "modules/image/semantic_segmentation/humanseg_lite"
exclude:
- README.md
resources:
-
url: https://bj.bcebos.com/paddlehub/model/image/semantic_segmentation/humanseg_lite_inference.tar.gz
dest: .
uncompress: True
name: humanseg_mobile
dir: "modules/image/semantic_segmentation/humanseg_mobile"
exclude:
- README.md
resources:
-
url: https://bj.bcebos.com/paddlehub/model/image/semantic_segmentation/humanseg_mobile_inference.tar.gz
dest: .
uncompress: True
../image_dataset/human_segmentation/image/ache-adult-depression-expression-41253.jpg
../image_dataset/human_segmentation/image/allergy-cold-disease-flu-41284.jpg
../image_dataset/human_segmentation/image/bored-female-girl-people-41321.jpg
../image_dataset/human_segmentation/image/colors-hairdresser-cutting-colorimetry-159780.jpg
../image_dataset/human_segmentation/image/pexels-photo-206339.jpg
# coding=utf-8
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import time
import unittest
import cv2
import numpy as np
import paddle.fluid as fluid
import paddlehub as hub
pic_dir = '../image_dataset/human_segmentation/image/'
imgpath = [
'../image_dataset/human_segmentation/image/ache-adult-depression-expression-41253.jpg',
'../image_dataset/human_segmentation/image/allergy-cold-disease-flu-41284.jpg',
'../image_dataset/human_segmentation/image/bored-female-girl-people-41321.jpg',
'../image_dataset/human_segmentation/image/colors-hairdresser-cutting-colorimetry-159780.jpg',
'../image_dataset/human_segmentation/image/pexels-photo-206339.jpg'
]
class TestHumanSeg(unittest.TestCase):
@classmethod
    def setUpClass(cls):
        """Prepare the environment once before execution of all tests."""
        cls.human_seg = hub.Module(name="humanseg_lite")
    @classmethod
    def tearDownClass(cls):
        """Clean up the environment after the execution of all tests."""
        cls.human_seg = None
    def setUp(self):
        """Prepare the environment before each test."""
        self.test_prog = fluid.Program()
    def tearDown(self):
        """Restore the environment after each test."""
        self.test_prog = None
def test_single_pic(self):
with fluid.program_guard(self.test_prog):
pics_path_list = [
os.path.join(pic_dir, f) for f in os.listdir(pic_dir)
]
img = cv2.imread(pics_path_list[0])
result = self.human_seg.segment(
images=[img], use_gpu=False, visualization=True)
print(result[0]['data'])
def test_batch(self):
with fluid.program_guard(self.test_prog):
result = self.human_seg.segment(
paths=imgpath,
batch_size=2,
output_dir='batch_output_shuffle',
use_gpu=False,
visualization=True)
print(result)
def test_ndarray(self):
with fluid.program_guard(self.test_prog):
pics_path_list = [
os.path.join(pic_dir, f) for f in os.listdir(pic_dir)
]
for pic_path in pics_path_list:
img = cv2.imread(pic_path)
result = self.human_seg.segment(
images=[img],
output_dir='ndarray_output_shuffle',
use_gpu=False,
visualization=True)
def test_save_inference_model(self):
with fluid.program_guard(self.test_prog):
self.human_seg.save_inference_model(
dirname='humanseg_lite', combined=True)
if __name__ == "__main__":
suite = unittest.TestSuite()
suite.addTest(TestHumanSeg('test_single_pic'))
suite.addTest(TestHumanSeg('test_batch'))
suite.addTest(TestHumanSeg('test_ndarray'))
suite.addTest(TestHumanSeg('test_save_inference_model'))
runner = unittest.TextTestRunner(verbosity=2)
runner.run(suite)
# coding=utf-8
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import time
import unittest
import cv2
import numpy as np
import paddle.fluid as fluid
import paddlehub as hub
pic_dir = '../image_dataset/human_segmentation/image/'
imgpath = [
'../image_dataset/human_segmentation/image/ache-adult-depression-expression-41253.jpg',
'../image_dataset/human_segmentation/image/allergy-cold-disease-flu-41284.jpg',
'../image_dataset/human_segmentation/image/bored-female-girl-people-41321.jpg',
'../image_dataset/human_segmentation/image/colors-hairdresser-cutting-colorimetry-159780.jpg',
'../image_dataset/human_segmentation/image/pexels-photo-206339.jpg'
]
class TestHumanSeg(unittest.TestCase):
@classmethod
    def setUpClass(cls):
        """Prepare the environment once before execution of all tests."""
        cls.human_seg = hub.Module(name="humanseg_mobile")
    @classmethod
    def tearDownClass(cls):
        """Clean up the environment after the execution of all tests."""
        cls.human_seg = None
    def setUp(self):
        """Prepare the environment before each test."""
        self.test_prog = fluid.Program()
    def tearDown(self):
        """Restore the environment after each test."""
        self.test_prog = None
def test_single_pic(self):
with fluid.program_guard(self.test_prog):
pics_path_list = [
os.path.join(pic_dir, f) for f in os.listdir(pic_dir)
]
img = cv2.imread(pics_path_list[0])
result = self.human_seg.segment(
images=[img], use_gpu=False, visualization=True)
print(result[0]['data'])
def test_batch(self):
with fluid.program_guard(self.test_prog):
result = self.human_seg.segment(
paths=imgpath,
batch_size=2,
output_dir='batch_output_hrnet',
use_gpu=False,
visualization=True)
print(result)
def test_ndarray(self):
with fluid.program_guard(self.test_prog):
pics_path_list = [
os.path.join(pic_dir, f) for f in os.listdir(pic_dir)
]
for pic_path in pics_path_list:
result = self.human_seg.segment(
images=[cv2.imread(pic_path)],
output_dir='ndarray_output_hrnet',
use_gpu=False,
visualization=True)
def test_save_inference_model(self):
with fluid.program_guard(self.test_prog):
self.human_seg.save_inference_model(
dirname='humanseg_mobile', combined=True)
if __name__ == "__main__":
suite = unittest.TestSuite()
suite.addTest(TestHumanSeg('test_single_pic'))
suite.addTest(TestHumanSeg('test_batch'))
suite.addTest(TestHumanSeg('test_ndarray'))
suite.addTest(TestHumanSeg('test_save_inference_model'))
runner = unittest.TextTestRunner(verbosity=2)
runner.run(suite)