# copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import os.path as osp
import psutil
import cv2
import numpy as np
import yaml
import paddlex
import paddle.fluid as fluid
from paddlex.cv.transforms import build_transforms
from paddlex.cv.models import BaseClassifier
from paddlex.cv.models import PPYOLO, FasterRCNN, MaskRCNN
from paddlex.cv.models import DeepLabv3p


class Predictor:
    """Inference wrapper around an exported (or quantized) PaddleX model.

    The model directory's ``model.yml`` is read to decide which
    pre-/post-processing routines to use, and a Paddle analysis predictor
    is built from the ``__model__`` / ``__params__`` files next to it.
    """

    def __init__(self,
                 model_dir,
                 use_gpu=True,
                 gpu_id=0,
                 use_mkl=False,
                 mkl_thread_num=psutil.cpu_count(),
                 use_trt=False,
                 use_glog=False,
                 memory_optimize=True):
        """Create a Paddle predictor.

        Args:
            model_dir: Model directory (must be an exported deployment
                model or a quantized model).
            use_gpu: Whether to run on GPU. Defaults to True.
            gpu_id: Id of the GPU to use. Defaults to 0.
            use_mkl: Whether to use the MKL-DNN library (CPU only).
                Defaults to False.
            mkl_thread_num: Number of MKL-DNN compute threads. Defaults to
                ``psutil.cpu_count()``, evaluated once when the class is
                defined.
            use_trt: Whether to use TensorRT. Defaults to False. The value
                is forwarded to ``create_predictor``, which currently does
                not apply it.
            use_glog: Whether to enable glog logging. Defaults to False.
            memory_optimize: Whether to enable memory optimization.
                Defaults to True.

        Raises:
            Exception: If ``model_dir`` is not a directory, contains no
                ``model.yml``, or the model status is neither ``"Quant"``
                nor ``"Infer"``.
        """
        if not osp.isdir(model_dir):
            raise Exception("[ERROR] Path {} not exist.".format(model_dir))
        model_yml = osp.join(model_dir, "model.yml")
        if not osp.exists(model_yml):
            raise Exception("There's not model.yml in {}".format(model_dir))
        with open(model_yml) as f:
            # NOTE(security): yaml.Loader can construct arbitrary Python
            # objects. Only load model directories from trusted sources.
            self.info = yaml.load(f.read(), Loader=yaml.Loader)

        self.status = self.info['status']
        if self.status not in ("Quant", "Infer"):
            raise Exception("[ERROR] Only quantized model or exported "
                            "inference model is supported.")

        attributes = self.info['_Attributes']
        self.model_dir = model_dir
        self.model_type = attributes['model_type']
        self.model_name = self.info['Model']
        self.num_classes = attributes['num_classes']
        self.labels = attributes['labels']
        if self.model_name == 'MaskRCNN':
            # The mask head resolution depends on whether FPN is enabled.
            with_fpn = self.info['_init_params']['with_fpn']
            self.mask_head_resolution = 28 if with_fpn else 14

        to_rgb = self.info.get('TransformsMode', 'RGB') == 'RGB'
        self.transforms = build_transforms(self.model_type,
                                           self.info['Transforms'], to_rgb)
        self.predictor = self.create_predictor(use_gpu, gpu_id, use_mkl,
                                               mkl_thread_num, use_trt,
                                               use_glog, memory_optimize)

    def create_predictor(self,
                         use_gpu=True,
                         gpu_id=0,
                         use_mkl=False,
                         mkl_thread_num=psutil.cpu_count(),
                         use_trt=False,
                         use_glog=False,
                         memory_optimize=True):
        """Build the Paddle analysis predictor for ``self.model_dir``.

        Args:
            use_gpu: Whether to run on GPU; otherwise GPU is disabled.
            gpu_id: Id of the GPU to use.
            use_mkl: Whether to enable MKL-DNN. It is skipped for the
                "HRNet" and "DeepLabv3p" models.
            mkl_thread_num: Number of CPU math library threads, set only
                when MKL-DNN is actually enabled.
            use_trt: Accepted for interface compatibility; not applied by
                this method.
            use_glog: Whether to enable glog output.
            memory_optimize: Whether to enable memory optimization.

        Returns:
            The predictor created by ``fluid.core.create_paddle_predictor``.
        """
        config = fluid.core.AnalysisConfig(
            os.path.join(self.model_dir, '__model__'),
            os.path.join(self.model_dir, '__params__'))

        if use_gpu:
            # Set the initial GPU memory (in MB) and the device id.
            config.enable_use_gpu(100, gpu_id)
        else:
            config.disable_gpu()

        if use_mkl and self.model_name not in ["HRNet", "DeepLabv3p"]:
            config.enable_mkldnn()
            config.set_cpu_math_library_num_threads(mkl_thread_num)

        if use_glog:
            config.enable_glog_info()
        else:
            config.disable_glog_info()

        if memory_optimize:
            config.enable_memory_optim()

        # Enable computation-graph analysis optimizations (OP fusion etc.).
        config.switch_ir_optim(True)
        # Disable feed/fetch OPs; required when using the ZeroCopy API.
        config.switch_use_feed_fetch_ops(False)
        return fluid.core.create_paddle_predictor(config)

    def preprocess(self, image, thread_num=1):
        """Preprocess a batch of images for the loaded model.

        Args:
            image (list|tuple): Each element is either an image path or a
                decoded array in (H, W, C) layout, float32, BGR.
            thread_num (int): Number of threads passed on to the model's
                ``_preprocess`` routine.

        Returns:
            dict: Always keyed by input name. Holds ``'image'`` plus, by
            model, ``'im_size'`` (PPYOLO/YOLOv3), ``'im_info'`` and
            ``'im_shape'`` (RCNN models), or ``'im_info'`` (segmenters).
            Empty if the model type or detector name is not recognised.
        """
        res = dict()
        common_args = (image, self.transforms, self.model_type,
                       self.model_name)
        if self.model_type == "classifier":
            res['image'] = BaseClassifier._preprocess(
                *common_args, thread_num=thread_num)
        elif self.model_type == "detector":
            if self.model_name in ["PPYOLO", "YOLOv3"]:
                im, im_size = PPYOLO._preprocess(
                    *common_args, thread_num=thread_num)
                res['image'] = im
                res['im_size'] = im_size
            if self.model_name.count('RCNN') > 0:
                im, im_resize_info, im_shape = FasterRCNN._preprocess(
                    *common_args, thread_num=thread_num)
                res['image'] = im
                res['im_info'] = im_resize_info
                res['im_shape'] = im_shape
        elif self.model_type == "segmenter":
            im, im_info = DeepLabv3p._preprocess(
                *common_args, thread_num=thread_num)
            res['image'] = im
            res['im_info'] = im_info
        return res

    def postprocess(self,
                    results,
                    topk=1,
                    batch_size=1,
                    im_shape=None,
                    im_info=None):
        """Post-process raw predictor outputs.

        Args:
            results (list): Raw outputs as returned by ``raw_predict``;
                each entry is ``[array, lod]``.
            topk (int): Number of top classes to keep for classifiers
                (capped at ``self.num_classes``).
            batch_size (int): Number of images in the predicted batch.
            im_shape (list): Input image sizes; used for MaskRCNN.
            im_info (list): Original image size information; consumed here
                by the segmenter branch.

        Returns:
            The predictions produced by the model's ``_postprocess``.

        Raises:
            Exception: If the model type or detector name is unsupported.
        """

        def offset_to_lengths(lod):
            # Convert the first LoD level from offsets to lengths.
            offset = lod[0]
            lengths = [end - start
                       for start, end in zip(offset, offset[1:])]
            return [lengths]

        if self.model_type == "classifier":
            true_topk = min(self.num_classes, topk)
            preds = BaseClassifier._postprocess([results[0][0]], true_topk,
                                                self.labels)
        elif self.model_type == "detector":
            res = {
                'bbox': (results[0][0], offset_to_lengths(results[0][1])),
            }
            res['im_id'] = (np.array(
                [[i] for i in range(batch_size)]).astype('int32'), [[]])
            if self.model_name in ["PPYOLO", "YOLOv3"]:
                preds = PPYOLO._postprocess(res, batch_size,
                                            self.num_classes, self.labels)
            elif self.model_name == "FasterRCNN":
                preds = FasterRCNN._postprocess(res, batch_size,
                                                self.num_classes,
                                                self.labels)
            elif self.model_name == "MaskRCNN":
                res['mask'] = (results[1][0],
                               offset_to_lengths(results[1][1]))
                res['im_shape'] = (im_shape, [])
                preds = MaskRCNN._postprocess(
                    res, batch_size, self.num_classes,
                    self.mask_head_resolution, self.labels)
            else:
                # Fail clearly instead of hitting an unbound `preds` below.
                raise Exception("[ERROR] Unsupported detector: {}".format(
                    self.model_name))
        elif self.model_type == "segmenter":
            res = [results[0][0], results[1][0]]
            preds = DeepLabv3p._postprocess(res, im_info)
        else:
            raise Exception("[ERROR] Unsupported model type: {}".format(
                self.model_type))
        return preds

    def raw_predict(self, inputs):
        """Run the predictor on already-preprocessed data.

        Args:
            inputs (dict): Preprocessed data keyed by input tensor name,
                as returned by ``preprocess``.

        Returns:
            list: One ``[array, lod]`` pair per predictor output, in the
            order given by ``get_output_names()``.
        """
        for name, value in inputs.items():
            try:
                tensor = self.predictor.get_input_tensor(name)
            except Exception:
                # Best effort: keys without a matching input tensor are
                # skipped (presumably extras such as 'im_shape' that only
                # post-processing needs -- confirm against the model).
                continue
            tensor.copy_from_cpu(value)
        self.predictor.zero_copy_run()

        output_results = list()
        for name in self.predictor.get_output_names():
            output_tensor = self.predictor.get_output_tensor(name)
            output_tensor_lod = output_tensor.lod()
            output_results.append(
                [output_tensor.copy_to_cpu(), output_tensor_lod])
        return output_results

    def predict(self, image, topk=1):
        """Predict a single image.

        Args:
            image (str|np.ndarray): Image path, or a decoded array in
                (H, W, C) layout, float32, BGR.
            topk (int): For classifiers, the number of top results to
                return.

        Returns:
            The first (only) element of the post-processed batch results.
        """
        preprocessed_input = self.preprocess([image])
        model_pred = self.raw_predict(preprocessed_input)
        results = self.postprocess(
            model_pred,
            topk=topk,
            batch_size=1,
            im_shape=preprocessed_input.get('im_shape'),
            im_info=preprocessed_input.get('im_info'))
        return results[0]

    def batch_predict(self, image_list, topk=1, thread_num=2):
        """Predict a batch of images in one run.

        Args:
            image_list (list|tuple): Images to predict together; each
                element is an image path or a decoded array in (H, W, C)
                layout, float32, BGR.
            topk (int): For classifiers, the number of top results to
                return.
            thread_num (int): Number of threads used to preprocess the
                images concurrently.

        Returns:
            The post-processed results for the whole batch.
        """
        # Forward thread_num so the caller's setting takes effect.
        preprocessed_input = self.preprocess(
            image_list, thread_num=thread_num)
        model_pred = self.raw_predict(preprocessed_input)
        results = self.postprocess(
            model_pred,
            topk=topk,
            batch_size=len(image_list),
            im_shape=preprocessed_input.get('im_shape'),
            im_info=preprocessed_input.get('im_info'))
        return results