# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import sys import os import os.path as osp import time import cv2 import numpy as np import yaml from six import text_type as _text_type from openvino.inference_engine import IECore class Predictor: def __init__(self, model_xml, model_yaml, device="CPU"): self.device = device if not osp.exists(model_xml): print("model xml file is not exists in {}".format(model_xml)) self.model_xml = model_xml self.model_bin = osp.splitext(model_xml)[0] + ".bin" if not osp.exists(model_yaml): print("model yaml file is not exists in {}".format(model_yaml)) with open(model_yaml) as f: self.info = yaml.load(f.read(), Loader=yaml.Loader) self.model_type = self.info['_Attributes']['model_type'] self.model_name = self.info['Model'] self.num_classes = self.info['_Attributes']['num_classes'] self.labels = self.info['_Attributes']['labels'] if self.info['Model'] == 'MaskRCNN': if self.info['_init_params']['with_fpn']: self.mask_head_resolution = 28 else: self.mask_head_resolution = 14 transforms_mode = self.info.get('TransformsMode', 'RGB') if transforms_mode == 'RGB': to_rgb = True else: to_rgb = False self.transforms = self.build_transforms(self.info['Transforms'], to_rgb) self.predictor, self.net = self.create_predictor() self.total_time = 0 self.count_num = 0 def create_predictor(self): #initialization for specified device print("Creating Inference Engine") ie = IECore() print("Loading network files:\n\t{}\n\t{}".format(self.model_xml, self.model_bin)) net = ie.read_network(model=self.model_xml, weights=self.model_bin) net.batch_size = 1 network_config = {} if self.device == "MYRIAD": network_config = {'VPU_HW_STAGES_OPTIMIZATION': 'NO'} exec_net = ie.load_network( network=net, device_name=self.device, config=network_config) return exec_net, net def build_transforms(self, transforms_info, to_rgb=True): if self.model_type == "classifier": import transforms.cls_transforms as transforms elif self.model_type == "detector": import transforms.det_transforms as transforms elif self.model_type == "segmenter": import transforms.seg_transforms as transforms op_list = list() for op_info in transforms_info: op_name = list(op_info.keys())[0] op_attr = op_info[op_name] if not hasattr(transforms, op_name): raise Exception( "There's no operator named '{}' in transforms of {}". format(op_name, self.model_type)) op_list.append(getattr(transforms, op_name)(**op_attr)) eval_transforms = transforms.Compose(op_list) if hasattr(eval_transforms, 'to_rgb'): eval_transforms.to_rgb = to_rgb self.arrange_transforms(eval_transforms) return eval_transforms def arrange_transforms(self, eval_transforms): if self.model_type == 'classifier': import transforms.cls_transforms as transforms arrange_transform = transforms.ArrangeClassifier elif self.model_type == 'segmenter': import transforms.seg_transforms as transforms arrange_transform = transforms.ArrangeSegmenter elif self.model_type == 'detector': import transforms.det_transforms as transforms arrange_name = 'Arrange{}'.format(self.model_name) arrange_transform = getattr(transforms, arrange_name) else: raise Exception("Unrecognized model type: {}".format( self.model_type)) if type(eval_transforms.transforms[-1]).__name__.startswith('Arrange'): eval_transforms.transforms[-1] = arrange_transform(mode='test') else: eval_transforms.transforms.append(arrange_transform(mode='test')) def raw_predict(self, preprocessed_input): self.count_num += 1 feed_dict = {} if self.model_name == "YOLOv3": inputs = self.net.inputs for name in inputs: if (len(inputs[name].shape) == 2): feed_dict[name] = preprocessed_input['im_size'] elif (len(inputs[name].shape) == 4): feed_dict[name] = preprocessed_input['image'] else: pass else: input_blob = next(iter(self.net.inputs)) feed_dict[input_blob] = preprocessed_input['image'] #Start sync inference print("Starting inference in synchronous mode") res = self.predictor.infer(inputs=feed_dict) #Processing output blob print("Processing output blob") return res def preprocess(self, image): res = dict() if self.model_type == "classifier": im, = self.transforms(image) im = np.expand_dims(im, axis=0).copy() res['image'] = im elif self.model_type == "detector": if self.model_name == "YOLOv3": im, im_shape = self.transforms(image) im = np.expand_dims(im, axis=0).copy() im_shape = np.expand_dims(im_shape, axis=0).copy() res['image'] = im res['im_size'] = im_shape if self.model_name.count('RCNN') > 0: im, im_resize_info, im_shape = self.transforms(image) im = np.expand_dims(im, axis=0).copy() im_resize_info = np.expand_dims(im_resize_info, axis=0).copy() im_shape = np.expand_dims(im_shape, axis=0).copy() res['image'] = im res['im_info'] = im_resize_info res['im_shape'] = im_shape elif self.model_type == "segmenter": im, im_info = self.transforms(image) im = np.expand_dims(im, axis=0).copy() res['image'] = im res['im_info'] = im_info return res def classifier_postprocess(self, preds, topk=1): """ 对分类模型的预测结果做后处理 """ true_topk = min(self.num_classes, topk) output_name = next(iter(self.net.outputs)) pred_label = np.argsort(-preds[output_name][0])[:true_topk] result = [{ 'category_id': l, 'category': self.labels[l], 'score': preds[output_name][0][l], } for l in pred_label] print(result) return result def segmenter_postprocess(self, preds, preprocessed_inputs): """ 对语义分割结果做后处理 """ it = iter(self.net.outputs) next(it) score_name = next(it) score_map = np.squeeze(preds[score_name]) score_map = np.transpose(score_map, (1, 2, 0)) label_name = next(it) label_map = np.squeeze(preds[label_name]).astype('uint8') im_info = preprocessed_inputs['im_info'] for info in im_info[::-1]: if info[0] == 'resize': w, h = info[1][1], info[1][0] label_map = cv2.resize(label_map, (w, h), cv2.INTER_NEAREST) score_map = cv2.resize(score_map, (w, h), cv2.INTER_LINEAR) elif info[0] == 'padding': w, h = info[1][1], info[1][0] label_map = label_map[0:h, 0:w] score_map = score_map[0:h, 0:w, :] else: raise Exception("Unexpected info '{}' in im_info".format(info[ 0])) return {'label_map': label_map, 'score_map': score_map} def detector_postprocess(self, preds, preprocessed_inputs): """对图像检测结果做后处理 """ output_name = next(iter(self.net.outputs)) outputs = preds[output_name][0] result = [] for out in outputs: if (out[0] > 0): result.append(out.tolist()) else: pass print(result) return result def predict(self, image, topk=1, threshold=0.5): preprocessed_input = self.preprocess(image) model_pred = self.raw_predict(preprocessed_input) if self.model_type == "classifier": results = self.classifier_postprocess(model_pred, topk) elif self.model_type == "detector": results = self.detector_postprocess(model_pred, preprocessed_input) elif self.model_type == "segmenter": results = self.segmenter_postprocess(model_pred, preprocessed_input)