# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from __future__ import absolute_import from __future__ import division from __future__ import print_function import os import copy import numpy as np import paddle import paddle.nn.functional as F from ppdet.modeling.bbox_utils import bbox_iou_np_expand from .map_utils import ap_per_class from .metrics import Metric from ppdet.utils.logger import setup_logger logger = setup_logger(__name__) __all__ = ['MOTEvaluator', 'MOTMetric', 'JDEDetMetric'] def read_mot_results(filename, is_gt=False, is_ignore=False): valid_labels = {1} ignore_labels = {2, 7, 8, 12} results_dict = dict() if os.path.isfile(filename): with open(filename, 'r') as f: for line in f.readlines(): linelist = line.split(',') if len(linelist) < 7: continue fid = int(linelist[0]) if fid < 1: continue results_dict.setdefault(fid, list()) box_size = float(linelist[4]) * float(linelist[5]) if is_gt: if 'MOT16-' in filename or 'MOT17-' in filename or 'MOT15-' in filename or 'MOT20-' in filename: label = int(float(linelist[7])) mark = int(float(linelist[6])) if mark == 0 or label not in valid_labels: continue score = 1 elif is_ignore: if 'MOT16-' in filename or 'MOT17-' in filename or 'MOT15-' in filename or 'MOT20-' in filename: label = int(float(linelist[7])) vis_ratio = float(linelist[8]) if label not in ignore_labels and vis_ratio >= 0: continue else: continue score = 1 else: score = float(linelist[6]) tlwh = tuple(map(float, linelist[2:6])) target_id = int(linelist[1]) results_dict[fid].append((tlwh, target_id, score)) return results_dict """ labels={'ped', ... % 1 'person_on_vhcl', ... % 2 'car', ... % 3 'bicycle', ... % 4 'mbike', ... % 5 'non_mot_vhcl', ... % 6 'static_person', ... % 7 'distractor', ... % 8 'occluder', ... % 9 'occluder_on_grnd', ... % 10 'occluder_full', ... % 11 'reflection', ... % 12 'crowd' ... % 13 }; """ def unzip_objs(objs): if len(objs) > 0: tlwhs, ids, scores = zip(*objs) else: tlwhs, ids, scores = [], [], [] tlwhs = np.asarray(tlwhs, dtype=float).reshape(-1, 4) return tlwhs, ids, scores class MOTEvaluator(object): def __init__(self, data_root, seq_name, data_type): self.data_root = data_root self.seq_name = seq_name self.data_type = data_type self.load_annotations() self.reset_accumulator() def load_annotations(self): assert self.data_type == 'mot' gt_filename = os.path.join(self.data_root, self.seq_name, 'gt', 'gt.txt') self.gt_frame_dict = read_mot_results(gt_filename, is_gt=True) self.gt_ignore_frame_dict = read_mot_results( gt_filename, is_ignore=True) def reset_accumulator(self): import motmetrics as mm mm.lap.default_solver = 'lap' self.acc = mm.MOTAccumulator(auto_id=True) def eval_frame(self, frame_id, trk_tlwhs, trk_ids, rtn_events=False): import motmetrics as mm mm.lap.default_solver = 'lap' # results trk_tlwhs = np.copy(trk_tlwhs) trk_ids = np.copy(trk_ids) # gts gt_objs = self.gt_frame_dict.get(frame_id, []) gt_tlwhs, gt_ids = unzip_objs(gt_objs)[:2] # ignore boxes ignore_objs = self.gt_ignore_frame_dict.get(frame_id, []) ignore_tlwhs = unzip_objs(ignore_objs)[0] # remove ignored results keep = np.ones(len(trk_tlwhs), dtype=bool) iou_distance = mm.distances.iou_matrix( ignore_tlwhs, trk_tlwhs, max_iou=0.5) if len(iou_distance) > 0: match_is, match_js = mm.lap.linear_sum_assignment(iou_distance) match_is, match_js = map(lambda a: np.asarray(a, dtype=int), [match_is, match_js]) match_ious = iou_distance[match_is, match_js] match_js = np.asarray(match_js, dtype=int) match_js = match_js[np.logical_not(np.isnan(match_ious))] keep[match_js] = False trk_tlwhs = trk_tlwhs[keep] trk_ids = trk_ids[keep] # get distance matrix iou_distance = mm.distances.iou_matrix(gt_tlwhs, trk_tlwhs, max_iou=0.5) # acc self.acc.update(gt_ids, trk_ids, iou_distance) if rtn_events and iou_distance.size > 0 and hasattr(self.acc, 'last_mot_events'): events = self.acc.last_mot_events # only supported by https://github.com/longcw/py-motmetrics else: events = None return events def eval_file(self, filename): self.reset_accumulator() result_frame_dict = read_mot_results(filename, is_gt=False) frames = sorted(list(set(result_frame_dict.keys()))) for frame_id in frames: trk_objs = result_frame_dict.get(frame_id, []) trk_tlwhs, trk_ids = unzip_objs(trk_objs)[:2] self.eval_frame(frame_id, trk_tlwhs, trk_ids, rtn_events=False) return self.acc @staticmethod def get_summary(accs, names, metrics=('mota', 'num_switches', 'idp', 'idr', 'idf1', 'precision', 'recall')): import motmetrics as mm mm.lap.default_solver = 'lap' names = copy.deepcopy(names) if metrics is None: metrics = mm.metrics.motchallenge_metrics metrics = copy.deepcopy(metrics) mh = mm.metrics.create() summary = mh.compute_many( accs, metrics=metrics, names=names, generate_overall=True) return summary @staticmethod def save_summary(summary, filename): import pandas as pd writer = pd.ExcelWriter(filename) summary.to_excel(writer) writer.save() class MOTMetric(Metric): def __init__(self, save_summary=False): self.save_summary = save_summary self.MOTEvaluator = MOTEvaluator self.result_root = None self.reset() def reset(self): self.accs = [] self.seqs = [] def update(self, data_root, seq, data_type, result_root, result_filename): evaluator = self.MOTEvaluator(data_root, seq, data_type) self.accs.append(evaluator.eval_file(result_filename)) self.seqs.append(seq) self.result_root = result_root def accumulate(self): import motmetrics as mm import openpyxl metrics = mm.metrics.motchallenge_metrics mh = mm.metrics.create() summary = self.MOTEvaluator.get_summary(self.accs, self.seqs, metrics) self.strsummary = mm.io.render_summary( summary, formatters=mh.formatters, namemap=mm.io.motchallenge_metric_names) if self.save_summary: self.MOTEvaluator.save_summary( summary, os.path.join(self.result_root, 'summary.xlsx')) def log(self): print(self.strsummary) def get_results(self): return self.strsummary class JDEDetMetric(Metric): # Note this detection AP metric is different from COCOMetric or VOCMetric, # and the bboxes coordinates are not scaled to the original image def __init__(self, overlap_thresh=0.5): self.overlap_thresh = overlap_thresh self.reset() def reset(self): self.AP_accum = np.zeros(1) self.AP_accum_count = np.zeros(1) def update(self, inputs, outputs): bboxes = outputs['bbox'][:, 2:].numpy() scores = outputs['bbox'][:, 1].numpy() labels = outputs['bbox'][:, 0].numpy() bbox_lengths = outputs['bbox_num'].numpy() if bboxes.shape[0] == 1 and bboxes.sum() == 0.0: return gt_boxes = inputs['gt_bbox'].numpy()[0] gt_labels = inputs['gt_class'].numpy()[0] if gt_labels.shape[0] == 0: return correct = [] detected = [] for i in range(bboxes.shape[0]): obj_pred = 0 pred_bbox = bboxes[i].reshape(1, 4) # Compute iou with target boxes iou = bbox_iou_np_expand(pred_bbox, gt_boxes, x1y1x2y2=True)[0] # Extract index of largest overlap best_i = np.argmax(iou) # If overlap exceeds threshold and classification is correct mark as correct if iou[best_i] > self.overlap_thresh and obj_pred == gt_labels[ best_i] and best_i not in detected: correct.append(1) detected.append(best_i) else: correct.append(0) # Compute Average Precision (AP) per class target_cls = list(gt_labels.T[0]) AP, AP_class, R, P = ap_per_class( tp=correct, conf=scores, pred_cls=np.zeros_like(scores), target_cls=target_cls) self.AP_accum_count += np.bincount(AP_class, minlength=1) self.AP_accum += np.bincount(AP_class, minlength=1, weights=AP) def accumulate(self): logger.info("Accumulating evaluatation results...") self.map_stat = self.AP_accum[0] / (self.AP_accum_count[0] + 1E-16) def log(self): map_stat = 100. * self.map_stat logger.info("mAP({:.2f}) = {:.2f}%".format(self.overlap_thresh, map_stat)) def get_results(self): return self.map_stat