# Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
https://github.com/LCFractal/AIC21-MTMC/tree/main/reid/reid-matching/tools
"""

import os

import numpy as np

# Minimum fraction of a reference width/height that a bbox must exceed to be
# kept by ``Zone.filter_bbox``.
BBOX_B = 10 / 15


class Zone(object):
    """Zone-map based filtering, splitting and merging of tracklets.

    Throughout this class a "mot_list" is a dict ``{tracklet_id: tracklet}``
    and a tracklet is a dict ``{frame: record}``. The methods read the record
    keys ``'bbox'`` (indexed 0..3; ``[2] - [0]`` is used as width and
    ``[3] - [1]`` as height), ``'zone'``, ``'feat'`` and ``'id'``.

    The camera ids 41..46 are hard-coded in several rules below.
    NOTE(review): these presumably belong to one specific dataset scene;
    confirm before using the class with other cameras.
    """

    def __init__(self, zone_path='datasets/zone'):
        """Load one zone image per camera from ``zone_path``.

        The camera number is parsed from the last three characters of each
        file name's stem (the part before the first ``'.'``).

        Args:
            zone_path: directory containing the zone images.

        Raises:
            AssertionError: if ``zone_path`` is an empty string.
        """
        # Zone colours (from the original notes): white / red zones are
        # "not high speed", blue / green zones are "high speed".
        assert zone_path != '', "Error: zone_path must not be empty!"

        # Imported at the point of use so that the pure-Python tracklet
        # filters below remain importable without OpenCV installed.
        import cv2

        zones = {}
        for img_name in os.listdir(zone_path):
            camnum = int(img_name.split('.')[0][-3:])
            zones[camnum] = cv2.imread(os.path.join(zone_path, img_name))
        self.zones = zones
        self.current_cam = 0

    def set_cam(self, cam):
        """Select the camera whose zone image ``get_zone`` will look up."""
        self.current_cam = cam

    def get_zone(self, bbox):
        """Return the zone number under the centre of ``bbox``.

        The pixel at (centre - 1), clamped at 0, of the current camera's zone
        image is classified by thresholding its three channels at 50.

        Returns:
            1 for white, 2 for red, 3 for green, 4 for blue, 0 otherwise
            (channel order 0, 1, 2 is treated as blue, green, red).
        """
        cx = int((bbox[0] + bbox[2]) / 2)
        cy = int((bbox[1] + bbox[3]) / 2)
        pix = self.zones[self.current_cam][max(cy - 1, 0), max(cx - 1, 0), :]
        c0, c1, c2 = pix[0], pix[1], pix[2]
        if c0 > 50 and c1 > 50 and c2 > 50:  # w
            return 1
        if c0 < 50 and c1 < 50 and c2 > 50:  # r
            return 2
        if c0 < 50 and c1 > 50 and c2 < 50:  # g
            return 3
        if c0 > 50 and c1 < 50 and c2 < 50:  # b
            return 4
        return 0

    def is_ignore(self, zone_list, frame_list, cid):
        """Classify a tracklet from its zone / frame sequence.

        Args:
            zone_list: zone number for every frame, in frame order.
            frame_list: sorted frame numbers of the tracklet.
            cid: camera id.

        Returns:
            0, 1 or 2. ``filter_mot`` keeps tracklets rated 0 and drops the
            others.
        """
        # Zone codes: 0 not in any crossroad, 1 white, 2 red, 3 green, 4 blue.
        zs, ze = zone_list[0], zone_list[-1]
        fs, fe = frame_list[0], frame_list[-1]

        if zs == ze:
            # The tracklet starts and ends in the same zone.
            if ze in [1, 2]:
                return 2
            if zs != 0 and 0 in zone_list:
                return 0
            if fe - fs > 1500:
                return 2
            if fs < 2 and cid in [45]:
                return 1 if ze in [3, 4] else 2
            if fe > 1999 and cid in [41]:
                return 2 if ze not in [3] else 0
            if (fs < 2 or fe > 1999) and ze in [3, 4]:
                return 0
            if ze in [3, 4]:
                return 1
            return 2

        # The tracklet moves from one zone to another.
        if cid in [41, 42, 43, 44, 45, 46]:
            # Coming from the road extension: excluded.
            if (zs == 1 and ze == 2) or (zs == 2 and ze == 1):
                return 2
        if cid in [41]:
            # On camera 41, no vehicle comes into camera 42.
            if (zs in [1, 2]) and ze == 4:
                return 2
            if zs == 4 and (ze in [1, 2]):
                return 2
        if cid in [46]:
            # On camera 46, no vehicle comes into camera 45.
            if (zs in [1, 2]) and ze == 3:
                return 2
            if zs == 3 and (ze in [1, 2]):
                return 2
        return 0

    def filter_mot(self, mot_list, cid):
        """Return only the tracklets that ``is_ignore`` rates 0."""
        new_mot_list = dict()
        for tracklet in mot_list:
            tracklet_dict = mot_list[tracklet]
            frame_list = sorted(tracklet_dict)
            zone_list = [tracklet_dict[f]['zone'] for f in frame_list]
            # Evaluate the rule set once per tracklet.
            if self.is_ignore(zone_list, frame_list, cid) == 0:
                new_mot_list[tracklet] = tracklet_dict
        return new_mot_list

    def filter_bbox(self, mot_list, cid):
        """Drop small boxes from tracklets that touch the image border.

        Tracklets with no box whose ``bbox[0]`` or ``bbox[1]`` equals 0 are
        passed through unchanged. For the others, frames whose box is not
        larger than ``BBOX_B`` times a reference box size are removed; some
        tracklets are dropped entirely (see inline comments).

        Args:
            mot_list: ``{tracklet_id: {frame: record}}``.
            cid: camera id; must be a key of ``self.zones``.

        Returns:
            A new ``{tracklet_id: {frame: record}}`` dict.
        """
        new_mot_list = dict()
        yh = self.zones[cid].shape[0]
        for tracklet in mot_list:
            tracklet_dict = mot_list[tracklet]
            frame_list = sorted(tracklet_dict)
            bbox_list = [tracklet_dict[f]['bbox'] for f in frame_list]
            bbox_x = [b[0] for b in bbox_list]
            bbox_y = [b[1] for b in bbox_list]
            bbox_w = [b[2] - b[0] for b in bbox_list]
            bbox_h = [b[3] - b[1] for b in bbox_list]

            if not (0 in bbox_x or 0 in bbox_y):
                new_mot_list[tracklet] = tracklet_dict
                continue

            n_frames = len(frame_list)
            # Indices of frames whose box is within 5 px of the left edge or
            # of the bottom edge of the zone image.
            b0 = [
                i for i in range(n_frames)
                if bbox_x[i] < 5 or bbox_y[i] + bbox_h[i] > yh - 5
            ]
            new_frame_list = list()
            if len(b0) == n_frames:
                # Every frame is at the border.
                if cid in [41, 42, 44, 45, 46]:
                    continue  # tracklet dropped
                max_w = max(bbox_w)
                max_h = max(bbox_h)
                for i, f in enumerate(frame_list):
                    if (bbox_w[i] > max_w * BBOX_B and
                            bbox_h[i] > max_h * BBOX_B):
                        new_frame_list.append(f)
            else:
                if len(b0) == 0:
                    continue  # tracklet dropped
                l_i, r_i = 0, n_frames - 1
                if b0[0] == 0:
                    # Advance l_i over the consecutive border frames at the
                    # start of the tracklet.
                    for i in range(len(b0) - 1):
                        if b0[i] + 1 == b0[i + 1]:
                            l_i = b0[i + 1]
                        else:
                            break
                if b0[-1] == n_frames - 1:
                    # Move r_i back over the consecutive border frames at the
                    # end of the tracklet.
                    for i in range(len(b0) - 1, 0, -1):
                        if b0[i] - 1 == b0[i - 1]:
                            r_i = b0[i - 1]
                        else:
                            break

                max_lw, max_lh = bbox_w[l_i], bbox_h[l_i]
                max_rw, max_rh = bbox_w[r_i], bbox_h[r_i]
                for i, f in enumerate(frame_list):
                    if i < l_i:
                        if (bbox_w[i] > max_lw * BBOX_B and
                                bbox_h[i] > max_lh * BBOX_B):
                            new_frame_list.append(f)
                    elif i > r_i:
                        if (bbox_w[i] > max_rw * BBOX_B and
                                bbox_h[i] > max_rh * BBOX_B):
                            new_frame_list.append(f)
                    else:
                        new_frame_list.append(f)

            new_mot_list[tracklet] = {f: tracklet_dict[f] for f in new_frame_list}
        return new_mot_list

    def break_mot(self, mot_list, cid):
        """Split tracklets that revisit a zone or contain a long frame gap.

        Args:
            mot_list: ``{tracklet_id: {frame: record}}`` with comparable,
                addable ids (new ids are allocated from ``max(id) + 1``).
            cid: camera id; the zone scan only runs for cameras 41, 44, 45,
                46 and the frame-gap split only for cameras 44 and 45.

        Returns:
            A new ``{tracklet_id: {frame: record}}`` dict. An empty input
            yields an empty dict.
        """
        new_mot_list = dict()
        if not mot_list:
            # max() of an empty mapping would raise ValueError.
            return new_mot_list

        new_num_tracklets = max(mot_list) + 1
        for tracklet in mot_list:
            tracklet_dict = mot_list[tracklet]
            frame_list = sorted(tracklet_dict)
            zone_list = []
            back_tracklet = False
            new_zone_f = 0
            pre_frame = frame_list[0]
            time_break = False
            for f in frame_list:
                if f - pre_frame > 100 and cid in [44, 45]:
                    time_break = True
                    break
                if cid not in [41, 44, 45, 46]:
                    break
                pre_frame = f
                new_zone = tracklet_dict[f]['zone']
                if len(zone_list) > 0 and zone_list[-1] == new_zone:
                    continue
                if new_zone_f > 1:
                    # A zone is recorded only after the counter exceeds 1;
                    # seeing an already recorded zone again marks the
                    # tracklet as one that turned back.
                    if len(zone_list) > 1 and new_zone in zone_list:
                        back_tracklet = True
                    zone_list.append(new_zone)
                    new_zone_f = 0
                else:
                    new_zone_f += 1

            if back_tracklet:
                # Split once, at the first reversal of horizontal direction
                # that happens after more than 15 frames were collected.
                new_tracklet_dict = dict()
                pre_bbox = None
                pre_arrow = 0
                have_break = False
                for f in frame_list:
                    now_bbox = tracklet_dict[f]['bbox']
                    if pre_bbox is None:
                        pre_bbox = now_bbox
                    now_arrow = now_bbox[0] - pre_bbox[0]
                    if (pre_arrow * now_arrow < 0 and
                            len(new_tracklet_dict) > 15 and not have_break):
                        new_mot_list[tracklet] = new_tracklet_dict
                        new_tracklet_dict = dict()
                        have_break = True
                    if have_break:
                        tracklet_dict[f]['id'] = new_num_tracklets
                    new_tracklet_dict[f] = tracklet_dict[f]
                    pre_bbox, pre_arrow = now_bbox, now_arrow

                if have_break:
                    new_mot_list[new_num_tracklets] = new_tracklet_dict
                    new_num_tracklets += 1
                else:
                    new_mot_list[tracklet] = new_tracklet_dict
            elif time_break:
                # Split at gaps of more than 100 frames.
                # NOTE(review): with several gaps, each split overwrites
                # new_mot_list[tracklet], so only the segment before the last
                # gap and the segment after it survive; the records' 'id' is
                # also not updated here. Confirm whether that is intended.
                new_tracklet_dict = dict()
                have_break = False
                pre_frame = frame_list[0]
                for f in frame_list:
                    if f - pre_frame > 100:
                        new_mot_list[tracklet] = new_tracklet_dict
                        new_tracklet_dict = dict()
                        have_break = True
                    new_tracklet_dict[f] = tracklet_dict[f]
                    pre_frame = f

                if have_break:
                    new_mot_list[new_num_tracklets] = new_tracklet_dict
                    new_num_tracklets += 1
                else:
                    new_mot_list[tracklet] = new_tracklet_dict
            else:
                new_mot_list[tracklet] = tracklet_dict
        return new_mot_list

    def intra_matching(self, mot_list, sub_mot_list):
        """Merge tracklets of ``sub_mot_list`` into ``mot_list``.

        Thin wrapper around ``do_intra_matching2``; both arguments are
        modified in place and the updated ``mot_list`` is returned.
        """
        new_mot_list, _ = self.do_intra_matching2(mot_list, sub_mot_list)
        return new_mot_list

    @staticmethod
    def _get_trac_info(tracklet):
        """Return start/end frame, zone and bbox centre of a tracklet.

        Returns:
            ``(fs, fe, zs, ze, centre_s, centre_e)`` where the centres are
            ``[x, y]`` lists computed from the first and last frame's bbox.
        """
        frames = sorted(tracklet)
        fs, fe = frames[0], frames[-1]
        zs = tracklet[fs]['zone']
        ze = tracklet[fe]['zone']
        box_s = tracklet[fs]['bbox']
        box_e = tracklet[fe]['bbox']
        centre_s = [(box_s[2] + box_s[0]) / 2, (box_s[3] + box_s[1]) / 2]
        centre_e = [(box_e[2] + box_e[0]) / 2, (box_e[3] + box_e[1]) / 2]
        return fs, fe, zs, ze, centre_s, centre_e

    def do_intra_matching2(self, mot_list, sub_list):
        """Attach each tracklet of ``sub_list`` to its best ``mot_list`` match.

        A ``mot_list`` tracklet is a candidate when it is adjacent in zone,
        time (< 5 frames) and position (< 50 in x and y) to the sub tracklet.
        The candidate with the highest feature dot product is chosen and, if
        that similarity exceeds 0.5, the sub tracklet's frames that are not
        already present are copied into it (their ``'id'`` is rewritten) and
        the ``sub_list`` entry is replaced by ``-1``.

        Both arguments are modified in place.

        Returns:
            ``(mot_list, sub_list)``.
        """
        for t1id in sub_list:
            tracklet1 = sub_list[t1id]
            if tracklet1 == -1:  # already merged
                continue
            t1_fs, t1_fe, t1_zs, t1_ze, t1_boxs, t1_boxe = \
                self._get_trac_info(tracklet1)
            sim_dict = dict()
            for t2id in mot_list:
                tracklet2 = mot_list[t2id]
                t2_fs, t2_fe, t2_zs, t2_ze, t2_boxs, t2_boxe = \
                    self._get_trac_info(tracklet2)
                # NOTE(review): the frame test pairs t1's end with t2's start
                # but the position test pairs t1's start with t2's end (and
                # vice versa below). Kept as found; confirm the intent.
                if t1_ze == t2_zs:
                    if (abs(t2_fs - t1_fe) < 5 and
                            abs(t2_boxe[0] - t1_boxs[0]) < 50 and
                            abs(t2_boxe[1] - t1_boxs[1]) < 50):
                        t1_feat = tracklet1[t1_fe]['feat']
                        t2_feat = tracklet2[t2_fs]['feat']
                        sim_dict[t2id] = np.matmul(t1_feat, t2_feat)
                if t1_zs == t2_ze:
                    if (abs(t2_fe - t1_fs) < 5 and
                            abs(t2_boxs[0] - t1_boxe[0]) < 50 and
                            abs(t2_boxs[1] - t1_boxe[1]) < 50):
                        t1_feat = tracklet1[t1_fs]['feat']
                        t2_feat = tracklet2[t2_fe]['feat']
                        sim_dict[t2id] = np.matmul(t1_feat, t2_feat)

            if not sim_dict:
                continue

            # Pick the candidate with the highest similarity. (The previous
            # code assigned the running maximum INTO sim_dict instead of
            # reading from it, so the maximum stayed 0 and nothing merged.)
            max_id = max(sim_dict, key=sim_dict.get)
            max_sim = sim_dict[max_id]
            if max_sim > 0.5:
                t2 = mot_list[max_id]
                for t1f in tracklet1:
                    if t1f not in t2:
                        tracklet1[t1f]['id'] = max_id
                        t2[t1f] = tracklet1[t1f]
                mot_list[max_id] = t2
                sub_list[t1id] = -1
        return mot_list, sub_list

    def do_intra_matching(self, sub_zone_dict, sub_zone):
        """Cluster the tracklets of ``sub_zone_dict`` by appearance.

        Builds a pairwise similarity matrix between tracklets that are
        adjacent in time and position, converts it to a distance matrix and
        runs complete-linkage agglomerative clustering on it.

        Args:
            sub_zone_dict: ``{tracklet_id: {frame: record}}``.
            sub_zone: unused.

        Returns:
            A dict keyed by cluster label.

        NOTE(review): nothing else in this file calls this method and its
        result handling looks unfinished (see inline notes); the logic is
        kept as found.
        """
        # Imported at the point of use so that scikit-learn is only required
        # by this method.
        from sklearn.cluster import AgglomerativeClustering

        id_list = list(sub_zone_dict)
        id2index = {tid: index for index, tid in enumerate(id_list)}

        sim_matrix = np.zeros([len(id_list), len(id_list)])
        for t1id in sub_zone_dict:
            tracklet1 = sub_zone_dict[t1id]
            t1_fs, t1_fe, t1_zs, t1_ze, t1_boxs, t1_boxe = \
                self._get_trac_info(tracklet1)
            t1_feat = tracklet1[t1_fe]['feat']
            for t2id in sub_zone_dict:
                if t1id == t2id:
                    continue
                tracklet2 = sub_zone_dict[t2id]
                t2_fs, t2_fe, t2_zs, t2_ze, t2_boxs, t2_boxe = \
                    self._get_trac_info(tracklet2)
                if (t1_zs != t1_ze and t2_ze != t2_zs) or t1_fe > t2_fs:
                    continue
                if (abs(t1_boxe[0] - t2_boxs[0]) > 50 or
                        abs(t1_boxe[1] - t2_boxs[1]) > 50):
                    continue
                if t2_fs - t1_fe > 5:
                    continue
                t2_feat = tracklet2[t2_fs]['feat']
                sim = np.matmul(t1_feat, t2_feat)
                sim_matrix[id2index[t1id], id2index[t2id]] = sim
                sim_matrix[id2index[t2id], id2index[t1id]] = sim

        # Similarity -> distance.
        sim_matrix = 1 - sim_matrix
        # NOTE(review): newer scikit-learn releases rename ``affinity`` to
        # ``metric``; check the installed version before relying on this.
        cluster_labels = AgglomerativeClustering(
            n_clusters=None,
            distance_threshold=0.7,
            affinity='precomputed',
            linkage='complete').fit_predict(sim_matrix)

        new_zone_dict = dict()
        label2id = dict()
        for index, label in enumerate(cluster_labels):
            tracklet = sub_zone_dict[id_list[index]]
            # NOTE(review): label2id is never filled, so this branch is taken
            # for every tracklet, and new_id is a whole frame record rather
            # than an id. Presumably unfinished; confirm before use.
            if label not in label2id:
                new_id = tracklet[list(tracklet)[0]]
                new_tracklet = dict()
            else:
                new_id = label2id[label]
                new_tracklet = new_zone_dict[label2id[label]]
            for tf in tracklet:
                tracklet[tf]['id'] = new_id
                new_tracklet[tf] = tracklet[tf]
            new_zone_dict[label] = new_tracklet
        return new_zone_dict