# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import time

import numpy as np
from PIL import Image, ImageDraw
import cv2
import yaml
import copy
import argparse
import logging
import paddle.fluid as fluid
import json

FORMAT = '%(asctime)s-%(levelname)s: %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)
logger = logging.getLogger(__name__)

# Maps the textual TensorRT precision names to Paddle's AnalysisConfig enums.
precision_map = {
    'trt_int8': fluid.core.AnalysisConfig.Precision.Int8,
    'trt_fp32': fluid.core.AnalysisConfig.Precision.Float32,
    'trt_fp16': fluid.core.AnalysisConfig.Precision.Half
}


def _resolve_transform(name):
    """Return the module-level callable called ``name``.

    Used instead of ``eval`` so that the ``type`` field of a preprocessing
    config can only select a name defined in this module and can never run
    an arbitrary expression.

    Raises:
        ValueError: if ``name`` is not a string naming a callable defined in
            this module.
    """
    op = globals().get(name) if isinstance(name, str) else None
    if not callable(op):
        raise ValueError('Unsupported preprocess op: {!r}'.format(name))
    return op


def _text_size(draw, text):
    """Return ``(width, height)`` of ``text`` drawn with the default font.

    ``ImageDraw.textsize`` was removed in Pillow 10; prefer it when it is
    still available (identical output to the historical code) and fall back
    to ``textbbox`` otherwise.
    """
    if hasattr(draw, 'textsize'):
        return draw.textsize(text)
    left, top, right, bottom = draw.textbbox((0, 0), text)
    return right - left, bottom - top


class Resize(object):
    """Resize an H x W x C image.

    Args:
        target_size: target edge length. When ``max_size`` is 0 both edges
            are resized to this value; otherwise the short edge is.
        max_size: when non-zero, upper bound for the long edge; the aspect
            ratio is kept.
        interp: interpolation flag handed to the resize backend.
        use_cv2: resize with OpenCV when True, with PIL otherwise.
        image_shape: when not None (and ``max_size`` is non-zero) the result
            is zero-padded to ``max_size x max_size``.
    """

    def __init__(self,
                 target_size,
                 max_size=0,
                 interp=cv2.INTER_LINEAR,
                 use_cv2=True,
                 image_shape=None):
        super(Resize, self).__init__()
        self.target_size = target_size
        self.max_size = max_size
        self.interp = interp
        self.use_cv2 = use_cv2
        self.image_shape = image_shape

    def __call__(self, im):
        """Resize ``im`` and return ``(resized_image, x_scale)``.

        Raises:
            TypeError: if ``max_size`` is set while ``use_cv2`` is False.
        """
        origin_shape = im.shape[:2]
        im_c = im.shape[2]
        if self.max_size != 0:
            # Keep the aspect ratio: scale the short edge to target_size
            # unless that would push the long edge past max_size.
            im_size_min = np.min(origin_shape[0:2])
            im_size_max = np.max(origin_shape[0:2])
            im_scale = float(self.target_size) / float(im_size_min)
            if np.round(im_scale * im_size_max) > self.max_size:
                im_scale = float(self.max_size) / float(im_size_max)
            im_scale_x = im_scale
            im_scale_y = im_scale
            resize_w = int(im_scale_x * float(origin_shape[1]))
            resize_h = int(im_scale_y * float(origin_shape[0]))
        else:
            # Square output: each axis gets its own scale factor.
            im_scale_x = float(self.target_size) / float(origin_shape[1])
            im_scale_y = float(self.target_size) / float(origin_shape[0])
            resize_w = self.target_size
            resize_h = self.target_size

        if self.use_cv2:
            im = cv2.resize(
                im,
                None,
                None,
                fx=im_scale_x,
                fy=im_scale_y,
                interpolation=self.interp)
        else:
            if self.max_size != 0:
                raise TypeError(
                    'If you set max_size to cap the maximum size of image,'
                    'please set use_cv2 to True to resize the image.')
            # NOTE(review): self.interp defaults to an OpenCV flag; PIL's
            # resampling constants use different numeric values - confirm
            # callers pass a PIL constant when use_cv2 is False.
            im = im.astype('uint8')
            im = Image.fromarray(im)
            im = im.resize((int(resize_w), int(resize_h)), self.interp)
            im = np.array(im)

        # padding im
        if self.max_size != 0 and self.image_shape is not None:
            padding_im = np.zeros(
                (self.max_size, self.max_size, im_c), dtype=np.float32)
            im_h, im_w = im.shape[:2]
            padding_im[:im_h, :im_w, :] = im
            im = padding_im
        return im, im_scale_x


class Normalize(object):
    """Scale to [0, 1] (optional), subtract ``mean`` and divide by ``std``.

    Args:
        mean: per-channel mean values.
        std: per-channel standard deviations.
        is_scale: divide by 255 before normalising when True.
        is_channel_first: True for C x H x W input, False for H x W x C.
    """

    def __init__(self, mean, std, is_scale=True, is_channel_first=False):
        super(Normalize, self).__init__()
        self.mean = mean
        self.std = std
        self.is_scale = is_scale
        self.is_channel_first = is_channel_first

    def __call__(self, im):
        """Return a normalised float32 copy of ``im``."""
        # Always copy: the arithmetic below is in place and must not leak
        # into the caller's array when it is already float32.
        im = np.array(im, dtype=np.float32)
        if self.is_channel_first:
            mean = np.array(self.mean)[:, np.newaxis, np.newaxis]
            std = np.array(self.std)[:, np.newaxis, np.newaxis]
        else:
            mean = np.array(self.mean)[np.newaxis, np.newaxis, :]
            std = np.array(self.std)[np.newaxis, np.newaxis, :]
        if self.is_scale:
            im /= 255.0
        im -= mean
        im /= std
        return im


class Permute(object):
    """Optionally move channels first and/or reverse the channel order.

    Args:
        to_bgr: reverse the order of the three channels when True.
        channel_first: transpose H x W x C to C x H x W when True.
    """

    def __init__(self, to_bgr=False, channel_first=True):
        self.to_bgr = to_bgr
        self.channel_first = channel_first

    def __call__(self, im):
        """Return a permuted copy of ``im``."""
        if self.channel_first:
            im = im.transpose((2, 0, 1))
        if self.to_bgr:
            # The channel axis is 0 after the transpose and 2 otherwise;
            # indexing axis 0 unconditionally would reorder image rows.
            if self.channel_first:
                im = im[[2, 1, 0], :, :]
            else:
                im = im[:, :, [2, 1, 0]]
        return im.copy()


class PadStride(object):
    """Zero-pad a C x H x W image so H and W are multiples of ``stride``.

    Args:
        stride: the stride to align to; 0 disables padding.
    """

    def __init__(self, stride=0):
        assert stride >= 0, "Unsupported stride: {}," \
            " the stride in PadStride must be greater " \
            "or equal to 0".format(stride)
        self.coarsest_stride = stride

    def __call__(self, im):
        """Return ``im`` padded at the bottom/right (float32), or ``im``
        itself when the stride is 0."""
        coarsest_stride = self.coarsest_stride
        if coarsest_stride == 0:
            return im
        im_c, im_h, im_w = im.shape
        pad_h = int(np.ceil(float(im_h) / coarsest_stride) * coarsest_stride)
        pad_w = int(np.ceil(float(im_w) / coarsest_stride) * coarsest_stride)
        padding_im = np.zeros((im_c, pad_h, pad_w), dtype=np.float32)
        padding_im[:, :im_h, :im_w] = im
        return padding_im


class Detection():
    """Pre- and post-processing for a detection model driven by a YAML config.

    ``preprocess`` turns an image file into a feed dict; ``postprocess``
    converts the fetched outputs into COCO-style results, optionally drawing
    them on the image and dumping them as JSON into ``output_dir``.

    Args:
        config_path: path to the YAML config. The code reads the keys
            ``arch``, ``Preprocess``, ``with_background``, ``label_list`` and,
            for mask models, ``mask_resolution``.
        output_dir: directory for visualisations and JSON dumps.
    """

    def __init__(self, config_path, output_dir):
        self.config_path = config_path
        self.if_visualize = True
        self.if_dump_result = True
        self.output_dir = output_dir
        # State handed from preprocess() to postprocess()/visualize().
        self.infer_img = None
        self._im_shape = None

    def DecodeImage(self, im_path):
        """Read ``im_path`` and return it as an RGB H x W x 3 uint8 array.

        Raises:
            AssertionError: if the path does not exist.
            ValueError: if the file cannot be decoded as an image.
        """
        assert os.path.exists(im_path), "Image path {} can not be found".format(
            im_path)
        with open(im_path, 'rb') as f:
            im = f.read()
        data = np.frombuffer(im, dtype='uint8')
        im = cv2.imdecode(data, 1)  # BGR mode, but need RGB mode
        if im is None:
            # imdecode signals failure with None; fail here with a clear
            # message instead of inside cvtColor.
            raise ValueError(
                'Image {} could not be decoded'.format(im_path))
        im = cv2.cvtColor(im, cv2.COLOR_BGR2RGB)
        return im

    def Preprocess(self, img_path, arch, config):
        """Run the configured transform pipeline on one image.

        Args:
            img_path: path of the image to load.
            arch: architecture name, forwarded to ``get_extra_info``.
            config: list of dicts, each with a ``type`` key naming a
                transform class of this module; the remaining keys are
                passed to its constructor.

        Returns:
            A list whose first item is the batched image (a leading axis of
            size 1 is added) followed by the entries of ``get_extra_info``.

        Raises:
            ValueError: if a ``type`` does not name a callable in this module.
        """
        img = self.DecodeImage(img_path)
        orig_shape = img.shape
        scale = 1.
        data = []
        # Deep copy: 'type' is popped from every entry below.
        data_config = copy.deepcopy(config)
        for data_aug_conf in data_config:
            obj = data_aug_conf.pop('type')
            preprocess = _resolve_transform(obj)(**data_aug_conf)
            if obj == 'Resize':
                # Resize additionally reports the applied scale.
                img, scale = preprocess(img)
            else:
                img = preprocess(img)

        img = img[np.newaxis, :]  # N, C, H, W
        data.append(img)
        extra_info = self.get_extra_info(img, arch, orig_shape, scale)
        data += extra_info
        return data

    def expand_boxes(self, boxes, scale):
        """
        Expand an array of [xmin, ymin, xmax, ymax] boxes by a given scale
        around their centres and return the result as a new array.
        """
        w_half = (boxes[:, 2] - boxes[:, 0]) * .5
        h_half = (boxes[:, 3] - boxes[:, 1]) * .5
        x_c = (boxes[:, 2] + boxes[:, 0]) * .5
        y_c = (boxes[:, 3] + boxes[:, 1]) * .5

        w_half *= scale
        h_half *= scale

        boxes_exp = np.zeros(boxes.shape)
        boxes_exp[:, 0] = x_c - w_half
        boxes_exp[:, 2] = x_c + w_half
        boxes_exp[:, 1] = y_c - h_half
        boxes_exp[:, 3] = y_c + h_half
        return boxes_exp

    def clip_bbox(self, bbox):
        """Clamp a normalised [xmin, ymin, xmax, ymax] box to [0, 1].

        Returns the clamped coordinates as a 4-tuple.
        """
        xmin = max(min(bbox[0], 1.), 0.)
        ymin = max(min(bbox[1], 1.), 0.)
        xmax = max(min(bbox[2], 1.), 0.)
        ymax = max(min(bbox[3], 1.), 0.)
        return xmin, ymin, xmax, ymax

    def mask2out(self, results, clsid2catid, resolution, thresh_binarize=0.5):
        """Convert raw mask outputs into COCO-style segmentation results.

        Args:
            results: list of dicts with keys ``bbox`` (``(array, [lengths])``),
                ``mask`` (``(array, ...)``) and ``im_shape`` (indexable per
                sample, giving height then width).
            clsid2catid: class id to category id map.
            resolution: edge length of the predicted mask.
            thresh_binarize: threshold applied to the resized soft mask.

        Returns:
            A list of dicts with ``category_id``, ``segmentation`` (RLE with
            ``counts`` decoded to ``str``) and ``score``.
        """
        import pycocotools.mask as mask_util
        # The mask gets a one pixel border, so the box grows accordingly.
        scale = (resolution + 2.0) / resolution

        segm_res = []

        for t in results:
            bboxes = t['bbox'][0]
            lengths = t['bbox'][1][0]
            # Test for None first; .shape on None would raise.
            if bboxes is None or bboxes.shape == (1, 1):
                continue
            if len(bboxes) == 0:
                continue

            masks = t['mask'][0]

            s = 0
            # for each sample
            for i in range(len(lengths)):
                num = lengths[i]
                im_shape = t['im_shape'][i]

                # Row layout: [class id, score, xmin, ymin, xmax, ymax].
                bbox = bboxes[s:s + num][:, 2:]
                clsid_scores = bboxes[s:s + num][:, 0:2]
                mask = masks[s:s + num]
                s += num

                im_h = int(im_shape[0])
                im_w = int(im_shape[1])

                expand_bbox = self.expand_boxes(bbox, scale)
                expand_bbox = expand_bbox.astype(np.int32)

                padded_mask = np.zeros(
                    (resolution + 2, resolution + 2), dtype=np.float32)

                for j in range(num):
                    xmin, ymin, xmax, ymax = expand_bbox[j].tolist()
                    clsid, score = clsid_scores[j].tolist()
                    clsid = int(clsid)
                    padded_mask[1:-1, 1:-1] = mask[j, clsid, :, :]

                    catid = clsid2catid[clsid]

                    # Degenerate boxes still need a 1 x 1 target for resize.
                    w = max(xmax - xmin + 1, 1)
                    h = max(ymax - ymin + 1, 1)

                    resized_mask = cv2.resize(padded_mask, (w, h))
                    resized_mask = np.array(
                        resized_mask > thresh_binarize, dtype=np.uint8)
                    im_mask = np.zeros((im_h, im_w), dtype=np.uint8)

                    # Clip the box to the image before pasting the mask.
                    x0 = min(max(xmin, 0), im_w)
                    x1 = min(max(xmax + 1, 0), im_w)
                    y0 = min(max(ymin, 0), im_h)
                    y1 = min(max(ymax + 1, 0), im_h)

                    im_mask[y0:y1, x0:x1] = resized_mask[(y0 - ymin):(
                        y1 - ymin), (x0 - xmin):(x1 - xmin)]
                    segm = mask_util.encode(
                        np.array(
                            im_mask[:, :, np.newaxis], order='F'))[0]
                    # bytes are not JSON serialisable
                    segm['counts'] = segm['counts'].decode('utf8')

                    coco_res = {
                        'category_id': catid,
                        'segmentation': segm,
                        'score': score
                    }
                    segm_res.append(coco_res)
        return segm_res

    def draw_bbox(self, image, catid2name, bboxes, threshold, color_list):
        """
        Draw every box whose score reaches ``threshold`` on ``image``.

        Args:
            image: PIL image, modified in place and returned.
            catid2name: category id to label text.
            bboxes: iterable of dicts with ``category_id``, ``bbox``
                (``[xmin, ymin, w, h]``) and ``score``.
            threshold: minimum score for a box to be drawn.
            color_list: per-category colours, indexed by category id.
        """
        draw = ImageDraw.Draw(image)

        for dt in bboxes:
            catid, bbox, score = dt['category_id'], dt['bbox'], dt['score']
            if score < threshold:
                continue

            xmin, ymin, w, h = bbox
            xmax = xmin + w
            ymax = ymin + h

            color = tuple(color_list[catid])

            # draw bbox
            draw.line(
                [(xmin, ymin), (xmin, ymax), (xmax, ymax), (xmax, ymin),
                 (xmin, ymin)],
                width=2,
                fill=color)

            # draw label
            text = "{} {:.2f}".format(catid2name[catid], score)
            tw, th = _text_size(draw, text)
            draw.rectangle(
                [(xmin + 1, ymin - th), (xmin + tw + 1, ymin)], fill=color)
            draw.text((xmin + 1, ymin - th), text, fill=(255, 255, 255))

        return image

    def draw_mask(self, image, masks, threshold, color_list, alpha=0.7):
        """
        Blend every mask whose score reaches ``threshold`` into ``image``.

        Args:
            image: PIL image (left untouched; a new image is returned).
            masks: iterable of dicts with ``segmentation`` (RLE) and ``score``.
            threshold: minimum score for a mask to be drawn.
            color_list: 2-D array of colours; rows are used round-robin.
            alpha: blending weight of the mask colour.
        """
        mask_color_id = 0
        w_ratio = .4
        img_array = np.array(image).astype('float32')
        for dt in masks:
            segm, score = dt['segmentation'], dt['score']
            if score < threshold:
                continue
            import pycocotools.mask as mask_util
            mask = mask_util.decode(segm) * 255
            # Copy: the slice is a view and whitening it in place would
            # permanently alter the caller's colour table.
            color_mask = color_list[mask_color_id % len(color_list),
                                    0:3].copy()
            mask_color_id += 1
            for c in range(3):
                # Blend the colour towards white.
                color_mask[c] = color_mask[c] * (1 - w_ratio) + w_ratio * 255
            idx = np.nonzero(mask)
            img_array[idx[0], idx[1], :] *= 1.0 - alpha
            img_array[idx[0], idx[1], :] += alpha * color_mask
        return Image.fromarray(img_array.astype('uint8'))

    def get_extra_info(self, im, arch, shape, scale):
        """Build the architecture-specific auxiliary model inputs.

        Args:
            im: the batched, preprocessed image.
            arch: architecture name; matched by substring against
                ``YOLO``, ``SSD``, ``RetinaNet`` and ``RCNN`` in that order.
            shape: shape of the original image.
            scale: scale factor reported by the resize step.

        Returns:
            A list of extra inputs; empty (with an error logged) for an
            unsupported architecture.
        """
        info = []
        input_shape = []
        im_shape = []
        logger.info('The architecture is {}'.format(arch))
        if 'YOLO' in arch:
            im_size = np.array([shape[:2]]).astype('int32')
            logger.info('Extra info: im_size')
            info.append(im_size)
        elif 'SSD' in arch:
            im_shape = np.array([shape[:2]]).astype('int32')
            logger.info('Extra info: im_shape')
            # Wrapped in a list: bbox2out indexes it as im_shape[0][i].
            info.append([im_shape])
        elif 'RetinaNet' in arch:
            input_shape.extend(im.shape[2:])
            im_info = np.array([input_shape + [scale]]).astype('float32')
            logger.info('Extra info: im_info')
            info.append(im_info)
        elif 'RCNN' in arch:
            input_shape.extend(im.shape[2:])
            im_shape.extend(shape[:2])
            im_info = np.array([input_shape + [scale]]).astype('float32')
            im_shape = np.array([im_shape + [1.]]).astype('float32')
            logger.info('Extra info: im_info, im_shape')
            info.append(im_info)
            info.append(im_shape)
        else:
            logger.error(
                "Unsupported arch: {}, expect YOLO, SSD, RetinaNet and RCNN".
                format(arch))
        return info

    def offset_to_lengths(self, lod):
        """Convert the first level of offsets in ``lod`` to segment lengths.

        Returns the lengths wrapped in a one-element list.
        """
        offset = lod[0]
        lengths = [end - start for start, end in zip(offset[:-1], offset[1:])]
        return [lengths]

    def bbox2out(self, results, clsid2catid, is_bbox_normalized=False):
        """
        Args:
            results: request a dict, should include: `bbox`, `im_id`,
                     if is_bbox_normalized=True, also need `im_shape`.
            clsid2catid: class id to category id map of COCO2017 dataset.
            is_bbox_normalized: whether or not bbox is normalized.

        Returns:
            A list of dicts with ``category_id``, ``bbox``
            (``[xmin, ymin, w, h]``) and ``score``.
        """
        xywh_res = []
        for t in results:
            bboxes = t['bbox'][0]
            lengths = t['bbox'][1][0]
            # Test for None first; .shape on None would raise.
            if bboxes is None or bboxes.shape == (1, 1):
                continue

            k = 0
            for i in range(len(lengths)):
                num = lengths[i]
                for j in range(num):
                    dt = bboxes[k]
                    clsid, score, xmin, ymin, xmax, ymax = dt.tolist()
                    catid = (clsid2catid[int(clsid)])

                    if is_bbox_normalized:
                        xmin, ymin, xmax, ymax = \
                            self.clip_bbox([xmin, ymin, xmax, ymax])
                        w = xmax - xmin
                        h = ymax - ymin
                        # Scale the normalised box back to pixels.
                        im_shape = t['im_shape'][0][i].tolist()
                        im_height, im_width = int(im_shape[0]), int(im_shape[1])
                        xmin *= im_width
                        ymin *= im_height
                        w *= im_width
                        h *= im_height
                    else:
                        w = xmax - xmin + 1
                        h = ymax - ymin + 1

                    bbox = [xmin, ymin, w, h]
                    coco_res = {
                        'category_id': catid,
                        'bbox': bbox,
                        'score': score
                    }
                    xywh_res.append(coco_res)
                    k += 1
        return xywh_res

    def get_bbox_result(self, fetch_map, fetch_name, result, conf, clsid2catid):
        """Extract the box output from ``fetch_map`` and convert it.

        Reads ``fetch_map[fetch_name]`` and ``fetch_map[fetch_name + '.lod']``,
        stores ``bbox`` and ``im_id`` into ``result`` (mutated in place) and
        returns the output of ``bbox2out``. Boxes are treated as normalised
        when ``conf['arch']`` contains ``SSD``.
        """
        is_bbox_normalized = 'SSD' in conf['arch']
        output = fetch_map[fetch_name]
        lod = [fetch_map[fetch_name + '.lod']]
        lengths = self.offset_to_lengths(lod)
        np_data = np.array(output)
        result['bbox'] = (np_data, lengths)
        result['im_id'] = np.array([[0]])

        bbox_results = self.bbox2out([result], clsid2catid, is_bbox_normalized)
        return bbox_results

    def get_mask_result(self, fetch_map, fetch_var_names, result, conf,
                        clsid2catid):
        """Extract box and mask outputs from ``fetch_map`` and convert them.

        Args:
            fetch_map: mapping of fetched outputs.
            fetch_var_names: either a sequence whose first two entries name
                the box and mask outputs, or (legacy) a single key whose
                value is a ``(bbox, mask)`` pair with ``bbox.lod()``.
            result: dict updated in place with ``bbox`` and ``mask``; it must
                already hold ``im_shape``.
            conf: parsed config; ``mask_resolution`` is read.
            clsid2catid: class id to category id map.

        Returns:
            The output of ``mask2out``.
        """
        resolution = conf['mask_resolution']

        try:
            legacy_key = fetch_var_names in fetch_map
        except TypeError:
            # An unhashable list of names can never be a key.
            legacy_key = False

        if legacy_key:
            bbox_out, mask_out = fetch_map[fetch_var_names]
            lod = bbox_out.lod()
        else:
            bbox_name, mask_name = fetch_var_names[0], fetch_var_names[1]
            bbox_out = fetch_map[bbox_name]
            mask_out = fetch_map[mask_name]
            if hasattr(bbox_out, 'lod'):
                lod = bbox_out.lod()
            else:
                # Same '<name>.lod' convention that get_bbox_result uses.
                lod = [fetch_map[bbox_name + '.lod']]

        lengths = self.offset_to_lengths(lod)
        bbox = np.array(bbox_out)
        mask = np.array(mask_out)
        result['bbox'] = (bbox, lengths)
        result['mask'] = (mask, lengths)
        mask_results = self.mask2out([result], clsid2catid, resolution)
        return mask_results

    def get_category_info(self, with_background, label_list):
        """Build the id maps for ``label_list``.

        When ``with_background`` is true and the list does not start with
        ``'background'``, that label is inserted into ``label_list`` in place
        (callers observe the longer list). When ``with_background`` is false,
        a leading ``'background'`` is dropped from a local copy only.

        Returns:
            ``(clsid2catid, catid2name)``; the first is the identity map over
            the label indices, the second maps index to label.
        """
        if label_list[0] != 'background' and with_background:
            label_list.insert(0, 'background')
        if label_list[0] == 'background' and not with_background:
            label_list = label_list[1:]
        clsid2catid = {i: i for i in range(len(label_list))}
        catid2name = {i: name for i, name in enumerate(label_list)}
        return clsid2catid, catid2name

    def color_map(self, num_classes):
        """Return a ``(num_classes, 3)`` array of colours derived from the
        bits of each class index."""
        color_map = num_classes * [0, 0, 0]
        for i in range(0, num_classes):
            j = 0
            lab = i
            while lab:
                # Spread successive bit triples of the label over the
                # three channels, most significant bit first.
                color_map[i * 3] |= (((lab >> 0) & 1) << (7 - j))
                color_map[i * 3 + 1] |= (((lab >> 1) & 1) << (7 - j))
                color_map[i * 3 + 2] |= (((lab >> 2) & 1) << (7 - j))
                j += 1
                lab >>= 3
        color_map = np.array(color_map).reshape(-1, 3)
        return color_map

    def visualize(self,
                  bbox_results,
                  catid2name,
                  num_classes,
                  mask_results=None):
        """Draw the results on ``self.infer_img`` and save the image.

        Boxes and masks below a score of 0.5 are skipped. The image is
        written to ``self.output_dir`` under the input's file name; the
        directory is created when missing.
        """
        image = Image.open(self.infer_img).convert('RGB')
        color_list = self.color_map(num_classes)
        image = self.draw_bbox(image, catid2name, bbox_results, 0.5, color_list)
        if mask_results is not None:
            image = self.draw_mask(image, mask_results, 0.5, color_list)
        image_path = os.path.split(self.infer_img)[-1]
        if not os.path.exists(self.output_dir):
            os.makedirs(self.output_dir)
        out_path = os.path.join(self.output_dir, image_path)
        image.save(out_path, quality=95)
        logger.info('Save visualize result to {}'.format(out_path))

    def preprocess(self, feed_var_names, image_file):
        """Load ``image_file`` and build the model feed dict.

        Remembers the image path and the image-shape input on the instance
        for the following ``postprocess`` call.

        Args:
            feed_var_names: names of the model inputs, in feed order.
            image_file: path of the image to run on.

        Returns:
            A dict mapping each feed name to its input with the leading
            batch axis removed.

        Raises:
            AssertionError: if no config path was given.
            ValueError: if the number of feed names and inputs differ.
        """
        self.infer_img = image_file
        config_path = self.config_path
        assert config_path is not None, "Config path: {} des not exist!".format(
            config_path)
        with open(config_path) as f:
            conf = yaml.safe_load(f)

        img_data = self.Preprocess(image_file, conf['arch'], conf['Preprocess'])
        if 'SSD' in conf['arch']:
            # im_shape is only needed for post-processing, it is not fed.
            img_data, im_shape = img_data
            img_data = [img_data]
        else:
            # postprocess reads the last input as im_shape for mask models.
            im_shape = img_data[-1]
        self._im_shape = im_shape

        if len(feed_var_names) != len(img_data):
            raise ValueError(
                'the length of feed vars does not equals the length of preprocess of img data, please check your feed dict'
            )

        # v[0] drops the batch axis added by Preprocess.
        feed_dict = {
            k: np.array(v[0])
            for k, v in zip(feed_var_names, img_data)
        }
        return feed_dict

    def postprocess(self, fetch_map, fetch_var_names):
        """Convert fetched outputs to results; visualise and dump them.

        Args:
            fetch_map: mapping of fetched outputs.
            fetch_var_names: names of the fetched outputs; the first one is
                the box output.

        Raises:
            RuntimeError: if the model needs the image shape but
                ``preprocess`` has not been called on this instance.
        """
        config_path = self.config_path
        res = {}
        with open(config_path) as f:
            conf = yaml.safe_load(f)

        has_mask = 'mask_resolution' in conf
        if 'SSD' in conf['arch'] or has_mask:
            im_shape = getattr(self, '_im_shape', None)
            if im_shape is None:
                raise RuntimeError(
                    'preprocess() must be called before postprocess()')
            res['im_shape'] = im_shape

        clsid2catid, catid2name = self.get_category_info(
            conf['with_background'], conf['label_list'])
        bbox_result = self.get_bbox_result(fetch_map, fetch_var_names[0], res,
                                           conf, clsid2catid)
        mask_result = None
        if has_mask:
            mask_result = self.get_mask_result(fetch_map, fetch_var_names, res,
                                               conf, clsid2catid)

        if (self.if_visualize or self.if_dump_result) and \
                not os.path.isdir(self.output_dir):
            # makedirs (not mkdir) so nested output paths work.
            os.makedirs(self.output_dir)

        if self.if_visualize:
            self.visualize(bbox_result, catid2name,
                           len(conf['label_list']), mask_result)
        if self.if_dump_result:
            bbox_file = os.path.join(self.output_dir, 'bbox.json')
            logger.info('dump bbox to {}'.format(bbox_file))
            with open(bbox_file, 'w') as f:
                json.dump(bbox_result, f, indent=4)
            if mask_result is not None:
                mask_file = os.path.join(self.output_dir, 'mask.json')
                logger.info('dump mask to {}'.format(mask_file))
                with open(mask_file, 'w') as f:
                    json.dump(mask_result, f, indent=4)