# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import os import argparse import time import yaml from PIL import Image import cv2 import numpy as np import paddle.fluid as fluid from visualize import visualize_box_mask def decode_image(im_file, im_info): """read rgb image Args: im_file (str/np.ndarray): path of image/ np.ndarray read by cv2 im_info (dict): info of image Returns: im (np.ndarray): processed image (np.ndarray) im_info (dict): info of processed image """ if isinstance(im_file, str): with open(im_file, 'rb') as f: im_read = f.read() data = np.frombuffer(im_read, dtype='uint8') im = cv2.imdecode(data, 1) # BGR mode, but need RGB mode im = cv2.cvtColor(im, cv2.COLOR_BGR2RGB) im_info['origin_shape'] = im.shape[:2] im_info['resize_shape'] = im.shape[:2] else: im = im_file im_info['origin_shape'] = im.shape[:2] im_info['resize_shape'] = im.shape[:2] return im, im_info class Resize(object): """resize image by target_size and max_size Args: arch (str): model type target_size (int): the target size of image max_size (int): the max size of image use_cv2 (bool): whether us cv2 image_shape (list): input shape of model interp (int): method of resize """ def __init__(self, arch, target_size, max_size, use_cv2=True, image_shape=None, interp=cv2.INTER_LINEAR): self.target_size = target_size self.max_size = max_size self.image_shape = image_shape, self.arch = arch self.use_cv2 = use_cv2 self.interp = interp self.scale_set = {'RCNN', 'RetinaNet'} def __call__(self, im, im_info): """ Args: im (np.ndarray): image (np.ndarray) im_info (dict): info of image Returns: im (np.ndarray): processed image (np.ndarray) im_info (dict): info of processed image """ im_channel = im.shape[2] im_scale_x, im_scale_y = self.generate_scale(im) if self.use_cv2: im = cv2.resize( im, None, None, fx=im_scale_x, fy=im_scale_y, interpolation=self.interp) else: resize_w = int(im_scale_x * float(im.shape[1])) resize_h = int(im_scale_y * float(im.shape[0])) if self.max_size != 0: raise TypeError( 'If you set max_size to cap the maximum size of image,' 'please set use_cv2 to True to resize the image.') im = im.astype('uint8') im = Image.fromarray(im) im = im.resize((int(resize_w), int(resize_h)), self.interp) im = np.array(im) # padding im when image_shape fixed by infer_cfg.yml if self.max_size != 0 and self.image_shape is not None: padding_im = np.zeros( (self.max_size, self.max_size, im_channel), dtype=np.float32) im_h, im_w = im.shape[:2] padding_im[:im_h, :im_w, :] = im im = padding_im if self.arch in self.scale_set: im_info['scale'] = im_scale_x im_info['resize_shape'] = im.shape[:2] return im, im_info def generate_scale(self, im): """ Args: im (np.ndarray): image (np.ndarray) Returns: im_scale_x: the resize ratio of X im_scale_y: the resize ratio of Y """ origin_shape = im.shape[:2] im_c = im.shape[2] if self.max_size != 0 and self.arch in self.scale_set: im_size_min = np.min(origin_shape[0:2]) im_size_max = np.max(origin_shape[0:2]) im_scale = float(self.target_size) / float(im_size_min) if np.round(im_scale * im_size_max) > self.max_size: im_scale = float(self.max_size) / float(im_size_max) im_scale_x = im_scale im_scale_y = im_scale else: im_scale_x = float(self.target_size) / float(origin_shape[1]) im_scale_y = float(self.target_size) / float(origin_shape[0]) return im_scale_x, im_scale_y class Normalize(object): """normalize image Args: mean (list): im - mean std (list): im / std is_scale (bool): whether need im / 255 is_channel_first (bool): if True: image shape is CHW, else: HWC """ def __init__(self, mean, std, is_scale=True, is_channel_first=False): self.mean = mean self.std = std self.is_scale = is_scale self.is_channel_first = is_channel_first def __call__(self, im, im_info): """ Args: im (np.ndarray): image (np.ndarray) im_info (dict): info of image Returns: im (np.ndarray): processed image (np.ndarray) im_info (dict): info of processed image """ im = im.astype(np.float32, copy=False) if self.is_channel_first: mean = np.array(self.mean)[:, np.newaxis, np.newaxis] std = np.array(self.std)[:, np.newaxis, np.newaxis] else: mean = np.array(self.mean)[np.newaxis, np.newaxis, :] std = np.array(self.std)[np.newaxis, np.newaxis, :] if self.is_scale: im = im / 255.0 im -= mean im /= std return im, im_info class Permute(object): """permute image Args: to_bgr (bool): whether convert RGB to BGR channel_first (bool): whether convert HWC to CHW """ def __init__(self, to_bgr=False, channel_first=True): self.to_bgr = to_bgr self.channel_first = channel_first def __call__(self, im, im_info): """ Args: im (np.ndarray): image (np.ndarray) im_info (dict): info of image Returns: im (np.ndarray): processed image (np.ndarray) im_info (dict): info of processed image """ if self.channel_first: im = im.transpose((2, 0, 1)).copy() if self.to_bgr: im = im[[2, 1, 0], :, :] return im, im_info class PadStride(object): """ padding image for model with FPN Args: stride (bool): model with FPN need image shape % stride == 0 """ def __init__(self, stride=0): self.coarsest_stride = stride def __call__(self, im, im_info): """ Args: im (np.ndarray): image (np.ndarray) im_info (dict): info of image Returns: im (np.ndarray): processed image (np.ndarray) im_info (dict): info of processed image """ coarsest_stride = self.coarsest_stride if coarsest_stride == 0: return im im_c, im_h, im_w = im.shape pad_h = int(np.ceil(float(im_h) / coarsest_stride) * coarsest_stride) pad_w = int(np.ceil(float(im_w) / coarsest_stride) * coarsest_stride) padding_im = np.zeros((im_c, pad_h, pad_w), dtype=np.float32) padding_im[:, :im_h, :im_w] = im im_info['resize_shape'] = padding_im.shape[1:] return padding_im, im_info def create_inputs(im, im_info, model_arch='YOLO'): """generate input for different model type Args: im (np.ndarray): image (np.ndarray) im_info (dict): info of image model_arch (str): model type Returns: inputs (dict): input of model """ inputs = {} inputs['image'] = im origin_shape = list(im_info['origin_shape']) resize_shape = list(im_info['resize_shape']) scale = im_info['scale'] if 'YOLO' in model_arch: im_size = np.array([origin_shape]).astype('int32') inputs['im_size'] = im_size elif 'RetinaNet' in model_arch: im_info = np.array([resize_shape + [scale]]).astype('float32') inputs['im_info'] = im_info elif 'RCNN' in model_arch: im_info = np.array([resize_shape + [scale]]).astype('float32') im_shape = np.array([origin_shape + [1.]]).astype('float32') inputs['im_info'] = im_info inputs['im_shape'] = im_shape return inputs class Config(): """set config of preprocess, postprocess and visualize Args: model_dir (str): root path of model.yml """ support_models = ['YOLO', 'SSD', 'RetinaNet', 'RCNN', 'Face'] def __init__(self, model_dir): # parsing Yaml config for Preprocess deploy_file = os.path.join(model_dir, 'infer_cfg.yml') with open(deploy_file) as f: yml_conf = yaml.safe_load(f) self.check_model(yml_conf) self.arch = yml_conf['arch'] self.preprocess_infos = yml_conf['Preprocess'] self.use_python_inference = yml_conf['use_python_inference'] self.min_subgraph_size = yml_conf['min_subgraph_size'] self.labels = yml_conf['label_list'] self.mask_resolution = None if 'mask_resolution' in yml_conf: self.mask_resolution = yml_conf['mask_resolution'] def check_model(self, yml_conf): """ Raises: ValueError: loaded model not in supported model type """ for support_model in self.support_models: if support_model in yml_conf['arch']: return True raise ValueError( "Unsupported arch: {}, expect SSD, YOLO, RetinaNet, RCNN and Face". format(yml_conf['arch'])) def load_predictor(model_dir, run_mode='fluid', batch_size=1, use_gpu=False, min_subgraph_size=3): """set AnalysisConfig, generate AnalysisPredictor Args: model_dir (str): root path of __model__ and __params__ use_gpu (bool): whether use gpu Returns: predictor (PaddlePredictor): AnalysisPredictor Raises: ValueError: predict by TensorRT need use_gpu == True. """ if not use_gpu and not run_mode == 'fluid': raise ValueError( "Predict by TensorRT mode: {}, expect use_gpu==True, but use_gpu == {}" .format(run_mode, use_gpu)) if run_mode == 'trt_int8': raise ValueError("TensorRT int8 mode is not supported now, " "please use trt_fp32 or trt_fp16 instead.") precision_map = { 'trt_fp32': fluid.core.AnalysisConfig.Precision.Float32, 'trt_fp16': fluid.core.AnalysisConfig.Precision.Half } config = fluid.core.AnalysisConfig( os.path.join(model_dir, '__model__'), os.path.join(model_dir, '__params__')) if use_gpu: # initial GPU memory(M), device ID config.enable_use_gpu(100, 0) # optimize graph and fuse op config.switch_ir_optim(True) else: config.disable_gpu() if run_mode in precision_map.keys(): config.enable_tensorrt_engine( workspace_size=1 << 10, max_batch_size=batch_size, min_subgraph_size=min_subgraph_size, precision_mode=precision_map[run_mode], use_static=False, use_calib_mode=False) # disable print log when predict config.disable_glog_info() # enable shared memory config.enable_memory_optim() # disable feed, fetch OP, needed by zero_copy_run config.switch_use_feed_fetch_ops(False) predictor = fluid.core.create_paddle_predictor(config) return predictor def load_executor(model_dir, use_gpu=False): if use_gpu: place = fluid.CUDAPlace(0) else: place = fluid.CPUPlace() exe = fluid.Executor(place) program, feed_names, fetch_targets = fluid.io.load_inference_model( dirname=model_dir, executor=exe, model_filename='__model__', params_filename='__params__') return exe, program, fetch_targets def visualize(image_file, results, labels, mask_resolution=14, output_dir='output/'): # visualize the predict result im = visualize_box_mask( image_file, results, labels, mask_resolution=mask_resolution) img_name = os.path.split(image_file)[-1] if not os.path.exists(output_dir): os.makedirs(output_dir) out_path = os.path.join(output_dir, img_name) im.save(out_path, quality=95) print("save result to: " + out_path) class Detector(): """ Args: model_dir (str): root path of __model__, __params__ and infer_cfg.yml use_gpu (bool): whether use gpu """ def __init__(self, model_dir, use_gpu=False, run_mode='fluid', threshold=0.5): self.config = Config(model_dir) if self.config.use_python_inference: self.executor, self.program, self.fecth_targets = load_executor( model_dir, use_gpu=use_gpu) else: self.predictor = load_predictor( model_dir, run_mode=run_mode, min_subgraph_size=self.config.min_subgraph_size, use_gpu=use_gpu) self.preprocess_ops = [] for op_info in self.config.preprocess_infos: op_type = op_info.pop('type') if op_type == 'Resize': op_info['arch'] = self.config.arch self.preprocess_ops.append(eval(op_type)(**op_info)) def preprocess(self, im): # process image by preprocess_ops im_info = { 'scale': 1., 'origin_shape': None, 'resize_shape': None, } im, im_info = decode_image(im, im_info) for operator in self.preprocess_ops: im, im_info = operator(im, im_info) im = np.array((im, )).astype('float32') inputs = create_inputs(im, im_info, self.config.arch) return inputs, im_info def postprocess(self, np_boxes, np_masks, im_info, threshold=0.5): # postprocess output of predictor results = {} if self.config.arch in ['SSD', 'Face']: w, h = im_info['origin_shape'] np_boxes[:, 2] *= h np_boxes[:, 3] *= w np_boxes[:, 4] *= h np_boxes[:, 5] *= w expect_boxes = np_boxes[:, 1] > threshold np_boxes = np_boxes[expect_boxes, :] for box in np_boxes: print('class_id:{:d}, confidence:{:.2f},' 'left_top:[{:.2f},{:.2f}],' ' right_bottom:[{:.2f},{:.2f}]'.format( int(box[0]), box[1], box[2], box[3], box[4], box[5])) results['boxes'] = np_boxes if np_masks is not None: np_masks = np_masks[expect_boxes, :, :, :] results['masks'] = np_masks return results def predict(self, image, threshold=0.5): ''' Args: image (str/np.ndarray): path of image/ np.ndarray read by cv2 threshold (float): threshold of predicted box' score Returns: results (dict): include 'boxes': np.ndarray: shape:[N,6], N: number of box, matix element:[class, score, x_min, y_min, x_max, y_max] MaskRCNN's results include 'masks': np.ndarray: shape:[N, class_num, mask_resolution, mask_resolution] ''' inputs, im_info = self.preprocess(image) np_boxes, np_masks = None, None if self.config.use_python_inference: t1 = time.time() outs = self.executor.run(self.program, feed=inputs, fetch_list=self.fecth_targets, return_numpy=False) t2 = time.time() ms = (t2 - t1) * 1000.0 print("Inference: {} ms per batch image".format(ms)) np_boxes = np.array(outs[0]) if self.config.mask_resolution is not None: np_masks = np.array(outs[1]) else: input_names = self.predictor.get_input_names() for i in range(len(inputs)): input_tensor = self.predictor.get_input_tensor(input_names[i]) input_tensor.copy_from_cpu(inputs[input_names[i]]) t1 = time.time() self.predictor.zero_copy_run() t2 = time.time() output_names = self.predictor.get_output_names() boxes_tensor = self.predictor.get_output_tensor(output_names[0]) np_boxes = boxes_tensor.copy_to_cpu() if self.config.mask_resolution is not None: masks_tensor = self.predictor.get_output_tensor(output_names[1]) np_masks = masks_tensor.copy_to_cpu() ms = (t2 - t1) * 1000.0 print("Inference: {} ms per batch image".format(ms)) results = self.postprocess( np_boxes, np_masks, im_info, threshold=threshold) return results def predict_image(): detector = Detector( FLAGS.model_dir, use_gpu=FLAGS.use_gpu, run_mode=FLAGS.run_mode) results = detector.predict(FLAGS.image_file, FLAGS.threshold) visualize( FLAGS.image_file, results, detector.config.labels, mask_resolution=detector.config.mask_resolution, output_dir=FLAGS.output_dir) def predict_video(): detector = Detector( FLAGS.model_dir, use_gpu=FLAGS.use_gpu, run_mode=FLAGS.run_mode) capture = cv2.VideoCapture(FLAGS.video_file) fps = 30 width = int(capture.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(capture.get(cv2.CAP_PROP_FRAME_HEIGHT)) fourcc = cv2.VideoWriter_fourcc(*'mp4v') video_name = os.path.split(FLAGS.video_file)[-1] if not os.path.exists(FLAGS.output_dir): os.makedirs(FLAGS.output_dir) out_path = os.path.join(FLAGS.output_dir, video_name) writer = cv2.VideoWriter(out_path, fourcc, fps, (width, height)) index = 1 while (1): ret, frame = capture.read() if not ret: break print('detect frame:%d' % (index)) index += 1 results = detector.predict(frame, FLAGS.threshold) im = visualize_box_mask( frame, results, detector.config.labels, mask_resolution=detector.config.mask_resolution) im = np.array(im) writer.write(im) writer.release() if __name__ == '__main__': parser = argparse.ArgumentParser(description=__doc__) parser.add_argument( "--model_dir", type=str, default=None, help=("Directory include:'__model__', '__params__', " "'infer_cfg.yml', created by tools/export_model.py."), required=True) parser.add_argument( "--image_file", type=str, default='', help="Path of image file.") parser.add_argument( "--video_file", type=str, default='', help="Path of video file.") parser.add_argument( "--run_mode", type=str, default='fluid', help="mode of running(fluid/trt_fp32/trt_fp16)") parser.add_argument( "--use_gpu", default=False, help="Whether to predict with GPU.") parser.add_argument( "--threshold", type=float, default=0.5, help="Threshold of score.") parser.add_argument( "--output_dir", type=str, default="output", help="Directory of output visualization files.") FLAGS = parser.parse_args() if FLAGS.image_file != '' and FLAGS.video_file != '': assert "Cannot predict image and video at the same time" if FLAGS.image_file != '': predict_image() if FLAGS.video_file != '': predict_video()