import os
import base64

import cv2
import numpy as np
from PIL import Image, ImageDraw
import paddle.fluid as fluid


def create_inputs(im, im_info):
    """Generate the input feed dict for the detection model.

    Args:
        im (np.ndarray): preprocessed image
        im_info (dict): info of the preprocessed image
    Returns:
        inputs (dict): input of model
    """
    inputs = {}
    inputs['image'] = im
    origin_shape = list(im_info['origin_shape'])
    resize_shape = list(im_info['resize_shape'])
    pad_shape = list(im_info['pad_shape']) if im_info[
        'pad_shape'] is not None else list(im_info['resize_shape'])
    scale_x, scale_y = im_info['scale']
    scale = scale_x
    im_info = np.array([resize_shape + [scale]]).astype('float32')
    inputs['im_info'] = im_info
    return inputs


def visualize_box_mask(im, results, labels=None, mask_resolution=14,
                       threshold=0.5):
    """
    Args:
        im (str/np.ndarray): path of image / np.ndarray read by cv2
        results (dict): include 'boxes': np.ndarray: shape:[N,6], N: number of boxes,
                        matrix element: [class, score, x_min, y_min, x_max, y_max];
                        MaskRCNN's results also include 'masks': np.ndarray:
                        shape: [N, class_num, mask_resolution, mask_resolution]
        labels (list): labels: ['class1', ..., 'classn']
        mask_resolution (int): shape of a mask is: [mask_resolution, mask_resolution]
        threshold (float): threshold of score
    Returns:
        im (PIL.Image.Image): visualized image
    """
    if not labels:
        labels = [
            'background', 'person', 'bicycle', 'car', 'motorcycle', 'airplane',
            'bus', 'train', 'truck', 'boat', 'traffic light', 'fire hydrant',
            'stop sign', 'parking meter', 'bench', 'bird', 'cat', 'dog',
            'horse', 'sheep', 'cow', 'elephant', 'bear', 'zebra', 'giraffe',
            'backpack', 'umbrella', 'handbag', 'tie', 'suitcase', 'frisbee',
            'skis', 'snowboard', 'sports ball', 'kite', 'baseball bat',
            'baseball glove', 'skateboard', 'surfboard', 'tennis racket',
            'bottle', 'wine glass', 'cup', 'fork', 'knife', 'spoon', 'bowl',
            'banana', 'apple', 'sandwich', 'orange', 'broccoli', 'carrot',
            'hot dog', 'pizza', 'donut', 'cake', 'chair', 'couch',
            'potted plant', 'bed', 'dining table', 'toilet', 'tv', 'laptop',
            'mouse', 'remote', 'keyboard', 'cell phone', 'microwave', 'oven',
            'toaster', 'sink', 'refrigerator', 'book', 'clock', 'vase',
            'scissors', 'teddy bear', 'hair drier', 'toothbrush'
        ]
    if isinstance(im, str):
        im = Image.open(im).convert('RGB')
    else:
        im = cv2.cvtColor(im, cv2.COLOR_BGR2RGB)
        im = Image.fromarray(im)
    if 'masks' in results and 'boxes' in results:
        im = draw_mask(
            im,
            results['boxes'],
            results['masks'],
            labels,
            resolution=mask_resolution)
    if 'boxes' in results:
        im = draw_box(im, results['boxes'], labels)
    if 'segm' in results:
        im = draw_segm(
            im,
            results['segm'],
            results['label'],
            results['score'],
            labels,
            threshold=threshold)
    return im


def get_color_map_list(num_classes):
    """
    Args:
        num_classes (int): number of classes
    Returns:
        color_map (list): RGB color list
    """
    color_map = num_classes * [0, 0, 0]
    for i in range(0, num_classes):
        j = 0
        lab = i
        while lab:
            color_map[i * 3] |= (((lab >> 0) & 1) << (7 - j))
            color_map[i * 3 + 1] |= (((lab >> 1) & 1) << (7 - j))
            color_map[i * 3 + 2] |= (((lab >> 2) & 1) << (7 - j))
            j += 1
            lab >>= 3
    color_map = [color_map[i:i + 3] for i in range(0, len(color_map), 3)]
    return color_map


def expand_boxes(boxes, scale=0.0):
    """
    Args:
        boxes (np.ndarray): shape:[N,4], N: number of boxes,
                            matrix element: [x_min, y_min, x_max, y_max]
        scale (float): scale of boxes
    Returns:
        boxes_exp (np.ndarray): expanded boxes
    """
    w_half = (boxes[:, 2] - boxes[:, 0]) * .5
    h_half = (boxes[:, 3] - boxes[:, 1]) * .5
    x_c = (boxes[:, 2] + boxes[:, 0]) * .5
    y_c = (boxes[:, 3] + boxes[:, 1]) * .5
    w_half *= scale
    h_half *= scale
    boxes_exp = np.zeros(boxes.shape)
    boxes_exp[:, 0] = x_c - w_half
    boxes_exp[:, 2] = x_c + w_half
    boxes_exp[:, 1] = y_c - h_half
    boxes_exp[:, 3] = y_c + h_half
    return boxes_exp


def draw_mask(im, np_boxes, np_masks, labels, resolution=14, threshold=0.5):
    """
    Args:
        im (PIL.Image.Image): PIL image
        np_boxes (np.ndarray): shape:[N,6], N: number of boxes,
                               matrix element: [class, score, x_min, y_min, x_max, y_max]
        np_masks (np.ndarray): shape:[N, class_num, resolution, resolution]
        labels (list): labels: ['class1', ..., 'classn']
        resolution (int): shape of a mask is: [resolution, resolution]
        threshold (float): threshold of mask
    Returns:
        im (PIL.Image.Image): visualized image
    """
    color_list = get_color_map_list(len(labels))
    scale = (resolution + 2.0) / resolution
    im_w, im_h = im.size
    w_ratio = 0.4
    alpha = 0.7
    im = np.array(im).astype('float32')
    rects = np_boxes[:, 2:]
    expand_rects = expand_boxes(rects, scale)
    expand_rects = expand_rects.astype(np.int32)
    clsid_scores = np_boxes[:, 0:2]
    padded_mask = np.zeros((resolution + 2, resolution + 2), dtype=np.float32)
    clsid2color = {}
    for idx in range(len(np_boxes)):
        clsid, score = clsid_scores[idx].tolist()
        clsid = int(clsid)
        xmin, ymin, xmax, ymax = expand_rects[idx].tolist()
        w = xmax - xmin + 1
        h = ymax - ymin + 1
        w = np.maximum(w, 1)
        h = np.maximum(h, 1)
        padded_mask[1:-1, 1:-1] = np_masks[idx, int(clsid), :, :]
        resized_mask = cv2.resize(padded_mask, (w, h))
        resized_mask = np.array(resized_mask > threshold, dtype=np.uint8)
        x0 = min(max(xmin, 0), im_w)
        x1 = min(max(xmax + 1, 0), im_w)
        y0 = min(max(ymin, 0), im_h)
        y1 = min(max(ymax + 1, 0), im_h)
        im_mask = np.zeros((im_h, im_w), dtype=np.uint8)
        im_mask[y0:y1, x0:x1] = resized_mask[(y0 - ymin):(y1 - ymin), (
            x0 - xmin):(x1 - xmin)]
        if clsid not in clsid2color:
            clsid2color[clsid] = color_list[clsid]
        color_mask = clsid2color[clsid]
        for c in range(3):
            color_mask[c] = color_mask[c] * (1 - w_ratio) + w_ratio * 255
        idx = np.nonzero(im_mask)
        color_mask = np.array(color_mask)
        im[idx[0], idx[1], :] *= 1.0 - alpha
        im[idx[0], idx[1], :] += alpha * color_mask
    return Image.fromarray(im.astype('uint8'))


def draw_box(im, np_boxes, labels):
    """
    Args:
        im (PIL.Image.Image): PIL image
        np_boxes (np.ndarray): shape:[N,6], N: number of boxes,
                               matrix element: [class, score, x_min, y_min, x_max, y_max]
        labels (list): labels: ['class1', ..., 'classn']
    Returns:
        im (PIL.Image.Image): visualized image
    """
    draw_thickness = min(im.size) // 320
    draw = ImageDraw.Draw(im)
    clsid2color = {}
    color_list = get_color_map_list(len(labels))
    for dt in np_boxes:
        clsid, bbox, score = int(dt[0]), dt[2:], dt[1]
        xmin, ymin, xmax, ymax = bbox
        w = xmax - xmin
        h = ymax - ymin
        if clsid not in clsid2color:
            clsid2color[clsid] = color_list[clsid]
        color = tuple(clsid2color[clsid])
        # draw bbox
        draw.line(
            [(xmin, ymin), (xmin, ymax), (xmax, ymax), (xmax, ymin),
             (xmin, ymin)],
            width=draw_thickness,
            fill=color)
        # draw label
        text = "{} {:.4f}".format(labels[clsid], score)
        tw, th = draw.textsize(text)
        draw.rectangle(
            [(xmin + 1, ymin - th), (xmin + tw + 1, ymin)], fill=color)
        draw.text((xmin + 1, ymin - th), text, fill=(255, 255, 255))
    return im


def draw_segm(im, np_segms, np_label, np_score, labels, threshold=0.5,
              alpha=0.7):
    """
    Draw segmentation on image.
    """
    mask_color_id = 0
    w_ratio = .4
    color_list = get_color_map_list(len(labels))
    im = np.array(im).astype('float32')
    clsid2color = {}
    np_segms = np_segms.astype(np.uint8)
    for i in range(np_segms.shape[0]):
        mask, score, clsid = np_segms[i], np_score[i], np_label[i] + 1
        if score < threshold:
            continue

        if clsid not in clsid2color:
            clsid2color[clsid] = color_list[clsid]
        color_mask = clsid2color[clsid]
        for c in range(3):
            color_mask[c] = color_mask[c] * (1 - w_ratio) + w_ratio * 255
        idx = np.nonzero(mask)
        color_mask = np.array(color_mask)
        im[idx[0], idx[1], :] *= 1.0 - alpha
        im[idx[0], idx[1], :] += alpha * color_mask
        sum_x = np.sum(mask, axis=0)
        x = np.where(sum_x > 0.5)[0]
        sum_y = np.sum(mask, axis=1)
        y = np.where(sum_y > 0.5)[0]
        x0, x1, y0, y1 = x[0], x[-1], y[0], y[-1]
        cv2.rectangle(im, (x0, y0), (x1, y1),
                      tuple(color_mask.astype('int32').tolist()), 1)
        bbox_text = '%s %.2f' % (labels[clsid], score)
        t_size = cv2.getTextSize(bbox_text, 0, 0.3, thickness=1)[0]
        cv2.rectangle(im, (x0, y0), (x0 + t_size[0], y0 - t_size[1] - 3),
                      tuple(color_mask.astype('int32').tolist()), -1)
        cv2.putText(
            im,
            bbox_text, (x0, y0 - 2),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.3, (0, 0, 0),
            1,
            lineType=cv2.LINE_AA)
    return Image.fromarray(im.astype('uint8'))


def load_predictor(model_dir,
                   run_mode='fluid',
                   batch_size=1,
                   use_gpu=False,
                   min_subgraph_size=3):
    """Set AnalysisConfig and generate an AnalysisPredictor.

    Args:
        model_dir (str): root path of __model__ and __params__
        run_mode (str): 'fluid', 'trt_fp32', 'trt_fp16' or 'trt_int8'
        batch_size (int): max batch size for the TensorRT engine
        use_gpu (bool): whether to use GPU
        min_subgraph_size (int): minimum subgraph size for TensorRT
    Returns:
        predictor (PaddlePredictor): AnalysisPredictor
    Raises:
        ValueError: predicting with TensorRT requires use_gpu == True.
    """
    if not use_gpu and not run_mode == 'fluid':
        raise ValueError(
            "Predict by TensorRT mode: {}, expect use_gpu == True, but use_gpu == {}"
            .format(run_mode, use_gpu))
    if run_mode == 'trt_int8':
        raise ValueError("TensorRT int8 mode is not supported now, "
                         "please use trt_fp32 or trt_fp16 instead.")
    precision_map = {
        'trt_int8': fluid.core.AnalysisConfig.Precision.Int8,
        'trt_fp32': fluid.core.AnalysisConfig.Precision.Float32,
        'trt_fp16': fluid.core.AnalysisConfig.Precision.Half
    }
    config = fluid.core.AnalysisConfig(
        os.path.join(model_dir, '__model__'),
        os.path.join(model_dir, '__params__'))
    if use_gpu:
        # initial GPU memory (MB), device ID
        config.enable_use_gpu(100, 0)
        # optimize graph and fuse ops
        config.switch_ir_optim(True)
    else:
        config.disable_gpu()
    if run_mode in precision_map.keys():
        config.enable_tensorrt_engine(
            workspace_size=1 << 10,
            max_batch_size=batch_size,
            min_subgraph_size=min_subgraph_size,
            precision_mode=precision_map[run_mode],
            use_static=False,
            use_calib_mode=False)
    # disable printing glog messages during prediction
    config.disable_glog_info()
    # enable memory optimization
    config.enable_memory_optim()
    # disable feed and fetch OPs, required by zero_copy_run
    config.switch_use_feed_fetch_ops(False)
    predictor = fluid.core.create_paddle_predictor(config)
    return predictor


def cv2_to_base64(image: np.ndarray):
    """Encode a BGR image array as a base64 JPEG string."""
    data = cv2.imencode('.jpg', image)[1]
    return base64.b64encode(data.tobytes()).decode('utf8')


def base64_to_cv2(b64str: str):
    """Decode a base64 JPEG string back into a BGR image array."""
    data = base64.b64decode(b64str.encode('utf8'))
    data = np.frombuffer(data, np.uint8)
    data = cv2.imdecode(data, cv2.IMREAD_COLOR)
    return data