Commit 355bec54 authored by Eric.Lee2021 🚴🏻

create light pose

Parent: eb84acb7
# light pose
Human body keypoint (pose) detection
* step 1: python prepare_train_labels.py  (convert the COCO keypoint annotations to the internal .pkl format)
* step 2: python make_val_subset.py  (sample a small validation subset from the COCO val labels)
* step 3: python train.py
* step 4: python inference_video.py  (run pose inference on a video; a minimal single-image sketch is shown below)
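A minimal single-image inference sketch (not part of this commit). It assumes the classes defined in inference_video.py are importable, a CUDA device is available, and that `test.jpg` is a placeholder image path:

```python
# Minimal sketch, assuming inference_video.py, a trained checkpoint and a CUDA device.
import cv2
import numpy as np
from inference_video import light_pose_model, draw_one_pose

model = light_pose_model(model_path='finetune_model/light_pose.pth', heatmaps_thr=0.08)
img = cv2.imread('test.jpg')            # placeholder image path
pose_dict = model.predict(img)          # returns None when no person is detected
if pose_dict is not None:
    for pose in pose_dict['data']:
        draw_one_pose(img, np.array(pose['keypoints']))
cv2.imwrite('test_pose.jpg', img)
```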
import copy
import json
import math
import os
import pickle
import cv2
import numpy as np
import pycocotools
from torch.utils.data.dataset import Dataset
BODY_PARTS_KPT_IDS = [[1, 8], [8, 9], [9, 10], [1, 11], [11, 12], [12, 13], [1, 2], [2, 3], [3, 4], [2, 16],
[1, 5], [5, 6], [6, 7], [5, 17], [1, 0], [0, 14], [0, 15], [14, 16], [15, 17]]
def get_mask(segmentations, mask):
for segmentation in segmentations:
rle = pycocotools.mask.frPyObjects(segmentation, mask.shape[0], mask.shape[1])
mask[pycocotools.mask.decode(rle) > 0.5] = 0
return mask
class CocoTrainDataset(Dataset):
def __init__(self, labels, images_folder, stride, sigma, paf_thickness, transform=None):
super().__init__()
self._images_folder = images_folder
self._stride = stride
self._sigma = sigma
self._paf_thickness = paf_thickness
self._transform = transform
with open(labels, 'rb') as f:
self._labels = pickle.load(f)
def __getitem__(self, idx):
label = copy.deepcopy(self._labels[idx]) # label modified in transform
image = cv2.imread(os.path.join(self._images_folder, label['img_paths']), cv2.IMREAD_COLOR)
mask = np.ones(shape=(label['img_height'], label['img_width']), dtype=np.float32)
mask = get_mask(label['segmentations'], mask)
sample = {
'label': label,
'image': image,
'mask': mask
}
if self._transform:
sample = self._transform(sample)
mask = cv2.resize(sample['mask'], dsize=None, fx=1/self._stride, fy=1/self._stride, interpolation=cv2.INTER_AREA)
keypoint_maps = self._generate_keypoint_maps(sample)
sample['keypoint_maps'] = keypoint_maps
keypoint_mask = np.zeros(shape=keypoint_maps.shape, dtype=np.float32)
for idx in range(keypoint_mask.shape[0]):
keypoint_mask[idx] = mask
sample['keypoint_mask'] = keypoint_mask
paf_maps = self._generate_paf_maps(sample)
sample['paf_maps'] = paf_maps
paf_mask = np.zeros(shape=paf_maps.shape, dtype=np.float32)
for idx in range(paf_mask.shape[0]):
paf_mask[idx] = mask
sample['paf_mask'] = paf_mask
image = sample['image'].astype(np.float32)
image = (image - 128) / 256
sample['image'] = image.transpose((2, 0, 1))
del sample['label']
return sample
def __len__(self):
return len(self._labels)
def _generate_keypoint_maps(self, sample):
n_keypoints = 18
n_rows, n_cols, _ = sample['image'].shape
keypoint_maps = np.zeros(shape=(n_keypoints + 1,
n_rows // self._stride, n_cols // self._stride), dtype=np.float32) # +1 for bg
label = sample['label']
for keypoint_idx in range(n_keypoints):
keypoint = label['keypoints'][keypoint_idx]
if keypoint[2] <= 1:
self._add_gaussian(keypoint_maps[keypoint_idx], keypoint[0], keypoint[1], self._stride, self._sigma)
for another_annotation in label['processed_other_annotations']:
keypoint = another_annotation['keypoints'][keypoint_idx]
if keypoint[2] <= 1:
self._add_gaussian(keypoint_maps[keypoint_idx], keypoint[0], keypoint[1], self._stride, self._sigma)
keypoint_maps[-1] = 1 - keypoint_maps.max(axis=0)
return keypoint_maps
def _add_gaussian(self, keypoint_map, x, y, stride, sigma):
n_sigma = 4
tl = [int(x - n_sigma * sigma), int(y - n_sigma * sigma)]
tl[0] = max(tl[0], 0)
tl[1] = max(tl[1], 0)
br = [int(x + n_sigma * sigma), int(y + n_sigma * sigma)]
map_h, map_w = keypoint_map.shape
br[0] = min(br[0], map_w * stride)
br[1] = min(br[1], map_h * stride)
shift = stride / 2 - 0.5
for map_y in range(tl[1] // stride, br[1] // stride):
for map_x in range(tl[0] // stride, br[0] // stride):
d2 = (map_x * stride + shift - x) * (map_x * stride + shift - x) + \
(map_y * stride + shift - y) * (map_y * stride + shift - y)
exponent = d2 / 2 / sigma / sigma
if exponent > 4.6052: # threshold, ln(100), ~0.01
continue
keypoint_map[map_y, map_x] += math.exp(-exponent)
if keypoint_map[map_y, map_x] > 1:
keypoint_map[map_y, map_x] = 1
def _generate_paf_maps(self, sample):
n_pafs = len(BODY_PARTS_KPT_IDS)
n_rows, n_cols, _ = sample['image'].shape
paf_maps = np.zeros(shape=(n_pafs * 2, n_rows // self._stride, n_cols // self._stride), dtype=np.float32)
label = sample['label']
for paf_idx in range(n_pafs):
keypoint_a = label['keypoints'][BODY_PARTS_KPT_IDS[paf_idx][0]]
keypoint_b = label['keypoints'][BODY_PARTS_KPT_IDS[paf_idx][1]]
if keypoint_a[2] <= 1 and keypoint_b[2] <= 1:
self._set_paf(paf_maps[paf_idx * 2:paf_idx * 2 + 2],
keypoint_a[0], keypoint_a[1], keypoint_b[0], keypoint_b[1],
self._stride, self._paf_thickness)
for another_annotation in label['processed_other_annotations']:
keypoint_a = another_annotation['keypoints'][BODY_PARTS_KPT_IDS[paf_idx][0]]
keypoint_b = another_annotation['keypoints'][BODY_PARTS_KPT_IDS[paf_idx][1]]
if keypoint_a[2] <= 1 and keypoint_b[2] <= 1:
self._set_paf(paf_maps[paf_idx * 2:paf_idx * 2 + 2],
keypoint_a[0], keypoint_a[1], keypoint_b[0], keypoint_b[1],
self._stride, self._paf_thickness)
return paf_maps
def _set_paf(self, paf_map, x_a, y_a, x_b, y_b, stride, thickness):
x_a /= stride
y_a /= stride
x_b /= stride
y_b /= stride
x_ba = x_b - x_a
y_ba = y_b - y_a
_, h_map, w_map = paf_map.shape
x_min = int(max(min(x_a, x_b) - thickness, 0))
x_max = int(min(max(x_a, x_b) + thickness, w_map))
y_min = int(max(min(y_a, y_b) - thickness, 0))
y_max = int(min(max(y_a, y_b) + thickness, h_map))
norm_ba = (x_ba * x_ba + y_ba * y_ba) ** 0.5
if norm_ba < 1e-7: # Same points, no paf
return
x_ba /= norm_ba
y_ba /= norm_ba
for y in range(y_min, y_max):
for x in range(x_min, x_max):
x_ca = x - x_a
y_ca = y - y_a
d = math.fabs(x_ca * y_ba - y_ca * x_ba)
if d <= thickness:
paf_map[0, y, x] = x_ba
paf_map[1, y, x] = y_ba
class CocoValDataset(Dataset):
def __init__(self, labels, images_folder):
super().__init__()
with open(labels, 'r') as f:
self._labels = json.load(f)
self._images_folder = images_folder
def __getitem__(self, idx):
file_name = self._labels['images'][idx]['file_name']
img = cv2.imread(os.path.join(self._images_folder, file_name), cv2.IMREAD_COLOR)
return {
'img': img,
'file_name': file_name
}
def __len__(self):
return len(self._labels['images'])
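To see what the training dataset actually yields per sample, here is a quick inspection sketch. It assumes `prepared_train_annotation.pkl` (from step 1) and the COCO train2017 images are in place, and uses the same transform chain as train.py:

```python
# Sketch: inspect one training sample produced by CocoTrainDataset.
from torchvision import transforms
from datasets.coco import CocoTrainDataset
from datasets.transformations import ConvertKeypoints, Scale, Rotate, CropPad, Flip

dataset = CocoTrainDataset('prepared_train_annotation.pkl', './coco2017/train2017/',
                           stride=8, sigma=7, paf_thickness=1,
                           transform=transforms.Compose([
                               ConvertKeypoints(), Scale(),
                               Rotate(pad=(128, 128, 128)),
                               CropPad(pad=(128, 128, 128)), Flip()]))
sample = dataset[0]
print(sample['image'].shape)          # (3, 368, 368) after CropPad and normalization
print(sample['keypoint_maps'].shape)  # (19, 46, 46): 18 keypoints + background channel
print(sample['paf_maps'].shape)       # (38, 46, 46): x/y channels for each of 19 limbs
```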
import random
import cv2
import numpy as np
class ConvertKeypoints:
def __call__(self, sample):
label = sample['label']
h, w, _ = sample['image'].shape
keypoints = label['keypoints']
for keypoint in keypoints: # keypoint[2] == 0: occluded, == 1: visible, == 2: not in image
if keypoint[0] == keypoint[1] == 0:
keypoint[2] = 2
if (keypoint[0] < 0
or keypoint[0] >= w
or keypoint[1] < 0
or keypoint[1] >= h):
keypoint[2] = 2
for other_label in label['processed_other_annotations']:
keypoints = other_label['keypoints']
for keypoint in keypoints:
if keypoint[0] == keypoint[1] == 0:
keypoint[2] = 2
if (keypoint[0] < 0
or keypoint[0] >= w
or keypoint[1] < 0
or keypoint[1] >= h):
keypoint[2] = 2
label['keypoints'] = self._convert(label['keypoints'], w, h)
for other_label in label['processed_other_annotations']:
other_label['keypoints'] = self._convert(other_label['keypoints'], w, h)
return sample
def _convert(self, keypoints, w, h):
# Nose, Neck, R hand, L hand, R leg, L leg, Eyes, Ears
reorder_map = [1, 7, 9, 11, 6, 8, 10, 13, 15, 17, 12, 14, 16, 3, 2, 5, 4]
converted_keypoints = list(keypoints[i - 1] for i in reorder_map)
converted_keypoints.insert(1, [(keypoints[5][0] + keypoints[6][0]) / 2,
(keypoints[5][1] + keypoints[6][1]) / 2, 0]) # Add neck as a mean of shoulders
if keypoints[5][2] == 2 or keypoints[6][2] == 2:
converted_keypoints[1][2] = 2
elif keypoints[5][2] == 1 and keypoints[6][2] == 1:
converted_keypoints[1][2] = 1
if (converted_keypoints[1][0] < 0
or converted_keypoints[1][0] >= w
or converted_keypoints[1][1] < 0
or converted_keypoints[1][1] >= h):
converted_keypoints[1][2] = 2
return converted_keypoints
class Scale:
def __init__(self, prob=1, min_scale=0.5, max_scale=1.1, target_dist=0.6):
self._prob = prob
self._min_scale = min_scale
self._max_scale = max_scale
self._target_dist = target_dist
def __call__(self, sample):
prob = random.random()
scale_multiplier = 1
if prob <= self._prob:
prob = random.random()
scale_multiplier = (self._max_scale - self._min_scale) * prob + self._min_scale
label = sample['label']
scale_abs = self._target_dist / label['scale_provided']
scale = scale_abs * scale_multiplier
sample['image'] = cv2.resize(sample['image'], dsize=(0, 0), fx=scale, fy=scale)
label['img_height'], label['img_width'], _ = sample['image'].shape
sample['mask'] = cv2.resize(sample['mask'], dsize=(0, 0), fx=scale, fy=scale)
label['objpos'][0] *= scale
label['objpos'][1] *= scale
for keypoint in sample['label']['keypoints']:
keypoint[0] *= scale
keypoint[1] *= scale
for other_annotation in sample['label']['processed_other_annotations']:
other_annotation['objpos'][0] *= scale
other_annotation['objpos'][1] *= scale
for keypoint in other_annotation['keypoints']:
keypoint[0] *= scale
keypoint[1] *= scale
return sample
class Rotate:
def __init__(self, pad, max_rotate_degree=40):
self._pad = pad
self._max_rotate_degree = max_rotate_degree
def __call__(self, sample):
prob = random.random()
degree = (prob - 0.5) * 2 * self._max_rotate_degree
h, w, _ = sample['image'].shape
img_center = (w / 2, h / 2)
R = cv2.getRotationMatrix2D(img_center, degree, 1)
abs_cos = abs(R[0, 0])
abs_sin = abs(R[0, 1])
bound_w = int(h * abs_sin + w * abs_cos)
bound_h = int(h * abs_cos + w * abs_sin)
dsize = (bound_w, bound_h)
R[0, 2] += dsize[0] / 2 - img_center[0]
R[1, 2] += dsize[1] / 2 - img_center[1]
sample['image'] = cv2.warpAffine(sample['image'], R, dsize=dsize,
borderMode=cv2.BORDER_CONSTANT, borderValue=self._pad)
sample['label']['img_height'], sample['label']['img_width'], _ = sample['image'].shape
sample['mask'] = cv2.warpAffine(sample['mask'], R, dsize=dsize,
borderMode=cv2.BORDER_CONSTANT, borderValue=(1, 1, 1)) # border is ok
label = sample['label']
label['objpos'] = self._rotate(label['objpos'], R)
for keypoint in label['keypoints']:
point = [keypoint[0], keypoint[1]]
point = self._rotate(point, R)
keypoint[0], keypoint[1] = point[0], point[1]
for other_annotation in label['processed_other_annotations']:
for keypoint in other_annotation['keypoints']:
point = [keypoint[0], keypoint[1]]
point = self._rotate(point, R)
keypoint[0], keypoint[1] = point[0], point[1]
return sample
def _rotate(self, point, R):
return [R[0, 0] * point[0] + R[0, 1] * point[1] + R[0, 2],
R[1, 0] * point[0] + R[1, 1] * point[1] + R[1, 2]]
class CropPad:
def __init__(self, pad, center_perterb_max=40, crop_x=368, crop_y=368):
self._pad = pad
self._center_perterb_max = center_perterb_max
self._crop_x = crop_x
self._crop_y = crop_y
def __call__(self, sample):
prob_x = random.random()
prob_y = random.random()
offset_x = int((prob_x - 0.5) * 2 * self._center_perterb_max)
offset_y = int((prob_y - 0.5) * 2 * self._center_perterb_max)
label = sample['label']
shifted_center = (label['objpos'][0] + offset_x, label['objpos'][1] + offset_y)
offset_left = -int(shifted_center[0] - self._crop_x / 2)
offset_up = -int(shifted_center[1] - self._crop_y / 2)
cropped_image = np.empty(shape=(self._crop_y, self._crop_x, 3), dtype=np.uint8)
for i in range(3):
cropped_image[:, :, i].fill(self._pad[i])
cropped_mask = np.empty(shape=(self._crop_y, self._crop_x), dtype=np.uint8)
cropped_mask.fill(1)
image_x_start = int(shifted_center[0] - self._crop_x / 2)
image_y_start = int(shifted_center[1] - self._crop_y / 2)
image_x_finish = image_x_start + self._crop_x
image_y_finish = image_y_start + self._crop_y
crop_x_start = 0
crop_y_start = 0
crop_x_finish = self._crop_x
crop_y_finish = self._crop_y
w, h = label['img_width'], label['img_height']
should_crop = True
if image_x_start < 0: # Adjust crop area
crop_x_start -= image_x_start
image_x_start = 0
if image_x_start >= w:
should_crop = False
if image_y_start < 0:
crop_y_start -= image_y_start
image_y_start = 0
if image_y_start >= h: # the vertical start is checked against the image height
should_crop = False
if image_x_finish > w:
diff = image_x_finish - w
image_x_finish -= diff
crop_x_finish -= diff
if image_x_finish < 0:
should_crop = False
if image_y_finish > h:
diff = image_y_finish - h
image_y_finish -= diff
crop_y_finish -= diff
if image_y_finish < 0:
should_crop = False
if should_crop:
cropped_image[crop_y_start:crop_y_finish, crop_x_start:crop_x_finish, :] =\
sample['image'][image_y_start:image_y_finish, image_x_start:image_x_finish, :]
cropped_mask[crop_y_start:crop_y_finish, crop_x_start:crop_x_finish] =\
sample['mask'][image_y_start:image_y_finish, image_x_start:image_x_finish]
sample['image'] = cropped_image
sample['mask'] = cropped_mask
label['img_width'] = self._crop_x
label['img_height'] = self._crop_y
label['objpos'][0] += offset_left
label['objpos'][1] += offset_up
for keypoint in label['keypoints']:
keypoint[0] += offset_left
keypoint[1] += offset_up
for other_annotation in label['processed_other_annotations']:
for keypoint in other_annotation['keypoints']:
keypoint[0] += offset_left
keypoint[1] += offset_up
return sample
def _inside(self, point, width, height):
if point[0] < 0 or point[1] < 0:
return False
if point[0] >= width or point[1] >= height:
return False
return True
class Flip:
def __init__(self, prob=0.5):
self._prob = prob
def __call__(self, sample):
prob = random.random()
do_flip = prob <= self._prob
if not do_flip:
return sample
sample['image'] = cv2.flip(sample['image'], 1)
sample['mask'] = cv2.flip(sample['mask'], 1)
label = sample['label']
w, h = label['img_width'], label['img_height']
label['objpos'][0] = w - 1 - label['objpos'][0]
for keypoint in label['keypoints']:
keypoint[0] = w - 1 - keypoint[0]
label['keypoints'] = self._swap_left_right(label['keypoints'])
for other_annotation in label['processed_other_annotations']:
other_annotation['objpos'][0] = w - 1 - other_annotation['objpos'][0]
for keypoint in other_annotation['keypoints']:
keypoint[0] = w - 1 - keypoint[0]
other_annotation['keypoints'] = self._swap_left_right(other_annotation['keypoints'])
return sample
def _swap_left_right(self, keypoints):
right = [2, 3, 4, 8, 9, 10, 14, 16]
left = [5, 6, 7, 11, 12, 13, 15, 17]
for r, l in zip(right, left):
keypoints[r], keypoints[l] = keypoints[l], keypoints[r]
return keypoints
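The transform that differs most from plain augmentation is ConvertKeypoints: it remaps COCO's 17 keypoints into the 18-keypoint OpenPose order and synthesizes a neck keypoint as the midpoint of the two shoulders. A small sketch on a dummy annotation (all values are made up):

```python
# Sketch: ConvertKeypoints turns 17 COCO keypoints into 18 with an inserted neck.
import numpy as np
from datasets.transformations import ConvertKeypoints

label = {
    'keypoints': [[(i + 1) * 10.0, (i + 1) * 10.0, 1] for i in range(17)],  # dummy visible points
    'processed_other_annotations': [],
}
sample = {'label': label, 'image': np.zeros((200, 200, 3), dtype=np.uint8)}
sample = ConvertKeypoints()(sample)
print(len(sample['label']['keypoints']))  # 18
print(sample['label']['keypoints'][1])    # neck = midpoint of the shoulders (COCO indices 5 and 6)
```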
#-*-coding:utf-8-*-
# date:2020-09-23
# Author: Eric.Lee
# function: inference pose video
import os
import cv2
import numpy as np
import torch
import time
from models.with_mobilenet import PoseEstimationWithMobileNet
from modules.keypoints import extract_keypoints, group_keypoints
from modules.load_state import load_state
from modules.pose import Pose, track_poses
from val import normalize, pad_width
import random
from modules.keypoints import BODY_PARTS_KPT_IDS, BODY_PARTS_PAF_IDS
def process_data(img, img_size=416): # image preprocessing; relies on a letterbox() helper that is not defined in this commit and appears unused here
img, _, _, _ = letterbox(img, height=img_size)
# Normalize RGB
img = img[:, :, ::-1].transpose(2, 0, 1) # BGR to RGB
img = np.ascontiguousarray(img, dtype=np.float32) # uint8 to float32
img /= 255.0 # 0 - 255 to 0.0 - 1.0
return img
def infer_fast(net, img, net_input_height_size, stride, upsample_ratio, cpu,
pad_value=(0, 0, 0), img_mean=(128, 128, 128), img_scale=1/256):
height, width, _ = img.shape
scale = net_input_height_size / height
scaled_img = cv2.resize(img, (0, 0), fx=scale, fy=scale, interpolation=cv2.INTER_CUBIC)
scaled_img = normalize(scaled_img, img_mean, img_scale)
min_dims = [net_input_height_size, max(scaled_img.shape[1], net_input_height_size)]
padded_img, pad = pad_width(scaled_img, stride, pad_value, min_dims)
tensor_img = torch.from_numpy(padded_img).permute(2, 0, 1).unsqueeze(0).float()
if not cpu:
tensor_img = tensor_img.cuda()
stages_output = net(tensor_img)
stage2_heatmaps = stages_output[-2]
heatmaps = np.transpose(stage2_heatmaps.squeeze().cpu().data.numpy(), (1, 2, 0))
heatmaps = cv2.resize(heatmaps, (0, 0), fx=upsample_ratio, fy=upsample_ratio, interpolation=cv2.INTER_CUBIC)
stage2_pafs = stages_output[-1]
pafs = np.transpose(stage2_pafs.squeeze().cpu().data.numpy(), (1, 2, 0))
pafs = cv2.resize(pafs, (0, 0), fx=upsample_ratio, fy=upsample_ratio, interpolation=cv2.INTER_CUBIC)
return heatmaps, pafs, scale, pad
#---------------------------------------------------------
class light_pose_model(object):
def __init__(self,
model_path='finetune_model/light_pose.pth',
heatmaps_thr = 0.05,
track = 1,
smooth = 1,
):
self.model_path=model_path
self.height_size=256
self.track = track
self.smooth = smooth
self.net = PoseEstimationWithMobileNet()
checkpoint = torch.load(self.model_path, map_location='cpu')
load_state(self.net, checkpoint)
self.net = self.net.eval()
self.net = self.net.cuda()
self.stride = 8
self.upsample_ratio = 4
self.num_keypoints = Pose.num_kpts
self.previous_poses = []
self.dict_id_color = {}
self.heatmaps_thr = heatmaps_thr
def predict(self, img,vis = False):
with torch.no_grad():
heatmaps, pafs, scale, pad = infer_fast(self.net, img, self.height_size, self.stride, self.upsample_ratio, False)
total_keypoints_num = 0
all_keypoints_by_type = []
for kpt_idx in range(self.num_keypoints): # 19th for bg
total_keypoints_num += extract_keypoints(heatmaps[:, :, kpt_idx],self.heatmaps_thr, all_keypoints_by_type, total_keypoints_num)
pose_entries, all_keypoints = group_keypoints(all_keypoints_by_type, pafs, demo=True)
for kpt_id in range(all_keypoints.shape[0]):
all_keypoints[kpt_id, 0] = (all_keypoints[kpt_id, 0] * self.stride / self.upsample_ratio - pad[1]) / scale
all_keypoints[kpt_id, 1] = (all_keypoints[kpt_id, 1] * self.stride / self.upsample_ratio - pad[0]) / scale
current_poses = []
Flag_Pose = False
for n in range(len(pose_entries)):
if len(pose_entries[n]) == 0:
continue
pose_keypoints = np.ones((self.num_keypoints, 2), dtype=np.int32) * -1
for kpt_id in range(self.num_keypoints):
if pose_entries[n][kpt_id] != -1.0: # keypoint was found
pose_keypoints[kpt_id, 0] = int(all_keypoints[int(pose_entries[n][kpt_id]), 0])
pose_keypoints[kpt_id, 1] = int(all_keypoints[int(pose_entries[n][kpt_id]), 1])
pose = Pose(pose_keypoints, pose_entries[n][18])
current_poses.append(pose)
Flag_Pose = True
if Flag_Pose == False:
return None
if self.track:
track_poses(self.previous_poses, current_poses, smooth=self.smooth)
self.previous_poses = current_poses
dict_id_color_r = {}
for id_ in self.dict_id_color.keys():
flag_track = False
for pose in current_poses:
if id_ == pose.id:
flag_track = True
break
if flag_track:
dict_id_color_r[pose.id] = self.dict_id_color[pose.id]
self.dict_id_color = dict_id_color_r # drop colors of poses that are no longer tracked
for pose in current_poses:
if pose.id not in self.dict_id_color.keys():
R_ = random.randint(30,255)
G_ = random.randint(30,255)
B_ = random.randint(30,255)
self.dict_id_color[pose.id] = [[B_,G_,R_],1]
else:
self.dict_id_color[pose.id][1] += 1
pose_dict = {}
pose_dict['data'] = []
for pose in current_poses:
keypoints_list = []
for k in range(pose.keypoints.shape[0]):
keypoints_list.append((float(pose.keypoints[k][0]),float(pose.keypoints[k][1])))
dict_ = {
'bbox':(float(pose.bbox[0]),float(pose.bbox[1]),float(pose.bbox[2]),float(pose.bbox[3])),
'id': str(pose.id),
'keypoints':keypoints_list,
'color':(float(self.dict_id_color[pose.id][0][0]),float(self.dict_id_color[pose.id][0][1]),float(self.dict_id_color[pose.id][0][2])),
}
pose_dict['data'].append(dict_)
if vis:
for pose in pose_dict['data']:
bbox = pose['bbox']
cv2.rectangle(img, (int(bbox[0]), int(bbox[1])),
(int(bbox[0] + bbox[2]), int(bbox[1] + bbox[3])), (255, 255, 0),3)
cv2.putText(img, 'ID: {}'.format(pose['id']), (int(bbox[0]), int(bbox[1]) - 16),
cv2.FONT_HERSHEY_COMPLEX, 0.5, (255, 0, 0),4)
cv2.putText(img, 'ID: {}'.format(pose['id']), (int(bbox[0]), int(bbox[1] - 16)),
cv2.FONT_HERSHEY_COMPLEX, 0.5, (0, 0, 255))
draw_one_pose(img,np.array(pose['keypoints']),(int(pose['color'][0]),int(pose['color'][1]),int(pose['color'][2])))
return pose_dict
def draw_one_pose(img,keypoints,color_x = [255, 0, 0]):
color = [0, 224, 255]
for part_id in range(len(BODY_PARTS_PAF_IDS) - 2):
kpt_a_id = BODY_PARTS_KPT_IDS[part_id][0]
global_kpt_a_id = keypoints[kpt_a_id, 0]
if global_kpt_a_id != -1:
x_a, y_a = keypoints[kpt_a_id]
cv2.circle(img, (int(x_a), int(y_a)), 3, color, -1)
kpt_b_id = BODY_PARTS_KPT_IDS[part_id][1]
global_kpt_b_id = keypoints[kpt_b_id, 0]
if global_kpt_b_id != -1:
x_b, y_b = keypoints[kpt_b_id]
cv2.circle(img, (int(x_b), int(y_b)), 3, color, -1)
if global_kpt_a_id != -1 and global_kpt_b_id != -1:
cv2.line(img, (int(x_a), int(y_a)), (int(x_b), int(y_b)), (255,60,60), 5)
cv2.line(img, (int(x_a), int(y_a)), (int(x_b), int(y_b)), color_x, 2)
if __name__ == '__main__':
video_path = "./video/rw_7.mp4" # 加载视频
# video_path = 0 # 加载相机
model_path = "finetune_model/light_pose.pth"
model_pose = light_pose_model(model_path = model_path,heatmaps_thr = 0.08) # 定义模型推理类
video_capture = cv2.VideoCapture(video_path)
flag_write_video = True # 是否记录推理 demo 视频
print('flag_write_video',flag_write_video)
flag_video_start = False
video_writer = None
while True:
ret, im0 = video_capture.read()
if ret:
if flag_video_start == False and flag_write_video:
loc_time = time.localtime()
str_time = time.strftime("%Y-%m-%d_%H-%M-%S", loc_time)
video_writer = cv2.VideoWriter("./demo/demo_{}.mp4".format(str_time), cv2.VideoWriter_fourcc(*"mp4v"), fps=25, frameSize=(int(im0.shape[1]), int(im0.shape[0])))
flag_video_start = True
pose_dict = model_pose.predict(im0.copy())
if pose_dict is not None:
for pose in pose_dict['data']:
bbox = pose['bbox']
cv2.rectangle(im0, (int(bbox[0]), int(bbox[1])),
(int(bbox[0] + bbox[2]), int(bbox[1] + bbox[3])), (25, 155, 255),2)
cv2.putText(im0, 'ID: {}'.format(pose['id']), (int(bbox[0]), int(bbox[1]) - 16),
cv2.FONT_HERSHEY_COMPLEX, 0.5, (255, 0, 0),4)
cv2.putText(im0, 'ID: {}'.format(pose['id']), (int(bbox[0]), int(bbox[1] - 16)),
cv2.FONT_HERSHEY_COMPLEX, 0.5, (0, 0, 255))
draw_one_pose(im0,np.array(pose['keypoints']),(int(pose['color'][0]),int(pose['color'][1]),int(pose['color'][2])))
cv2.namedWindow('image',0)
cv2.imshow('image',im0)
if flag_write_video and flag_video_start:
video_writer.write(im0)
if cv2.waitKey(1) == 27:
break
cv2.destroyAllWindows()
if flag_write_video and video_writer is not None:
video_writer.release()
import argparse
import json
import random
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--labels', type=str, default='./coco2017/annotations/person_keypoints_val2017.json', help='path to json with keypoints val labels')
parser.add_argument('--output-name', type=str, default='val_subset.json',
help='name of output file with subset of val labels')
parser.add_argument('--num-images', type=int, default=250, help='number of images in subset')
args = parser.parse_args()
with open(args.labels, 'r') as f:
data = json.load(f)
random.seed(0)
total_val_images = 5000
idxs = list(range(total_val_images))
random.shuffle(idxs)
images_by_id = {}
for idx in idxs[:args.num_images]:
images_by_id[data['images'][idx]['id']] = data['images'][idx]
annotations_by_image_id = {}
for annotation in data['annotations']:
if annotation['image_id'] in images_by_id:
if not annotation['image_id'] in annotations_by_image_id:
annotations_by_image_id[annotation['image_id']] = []
annotations_by_image_id[annotation['image_id']].append(annotation)
subset = {
'info': data['info'],
'licenses': data['licenses'],
'images': [],
'annotations': [],
'categories': data['categories']
}
for image_id, image in images_by_id.items():
subset['images'].append(image)
if image_id in annotations_by_image_id: # image has at least 1 annotation
subset['annotations'].extend(annotations_by_image_id[image_id])
with open(args.output_name, 'w') as f:
json.dump(subset, f, indent=4)
import torch
from torch import nn
from modules.conv import conv, conv_dw, conv_dw_no_bn
class Cpm(nn.Module):
def __init__(self, in_channels, out_channels):
super().__init__()
self.align = conv(in_channels, out_channels, kernel_size=1, padding=0, bn=False)
self.trunk = nn.Sequential(
conv_dw_no_bn(out_channels, out_channels),
conv_dw_no_bn(out_channels, out_channels),
conv_dw_no_bn(out_channels, out_channels)
)
self.conv = conv(out_channels, out_channels, bn=False)
def forward(self, x):
x = self.align(x)
x = self.conv(x + self.trunk(x))
return x
class InitialStage(nn.Module):
def __init__(self, num_channels, num_heatmaps, num_pafs):
super().__init__()
self.trunk = nn.Sequential(
conv(num_channels, num_channels, bn=False),
conv(num_channels, num_channels, bn=False),
conv(num_channels, num_channels, bn=False)
)
self.heatmaps = nn.Sequential(
conv(num_channels, 512, kernel_size=1, padding=0, bn=False),
conv(512, num_heatmaps, kernel_size=1, padding=0, bn=False, relu=False)
)
self.pafs = nn.Sequential(
conv(num_channels, 512, kernel_size=1, padding=0, bn=False),
conv(512, num_pafs, kernel_size=1, padding=0, bn=False, relu=False)
)
def forward(self, x):
trunk_features = self.trunk(x)
heatmaps = self.heatmaps(trunk_features)
pafs = self.pafs(trunk_features)
return [heatmaps, pafs]
class RefinementStageBlock(nn.Module):
def __init__(self, in_channels, out_channels):
super().__init__()
self.initial = conv(in_channels, out_channels, kernel_size=1, padding=0, bn=False)
self.trunk = nn.Sequential(
conv(out_channels, out_channels),
conv(out_channels, out_channels, dilation=2, padding=2)
)
def forward(self, x):
initial_features = self.initial(x)
trunk_features = self.trunk(initial_features)
return initial_features + trunk_features
class RefinementStage(nn.Module):
def __init__(self, in_channels, out_channels, num_heatmaps, num_pafs):
super().__init__()
self.trunk = nn.Sequential(
RefinementStageBlock(in_channels, out_channels),
RefinementStageBlock(out_channels, out_channels),
RefinementStageBlock(out_channels, out_channels),
RefinementStageBlock(out_channels, out_channels),
RefinementStageBlock(out_channels, out_channels)
)
self.heatmaps = nn.Sequential(
conv(out_channels, out_channels, kernel_size=1, padding=0, bn=False),
conv(out_channels, num_heatmaps, kernel_size=1, padding=0, bn=False, relu=False)
)
self.pafs = nn.Sequential(
conv(out_channels, out_channels, kernel_size=1, padding=0, bn=False),
conv(out_channels, num_pafs, kernel_size=1, padding=0, bn=False, relu=False)
)
def forward(self, x):
trunk_features = self.trunk(x)
heatmaps = self.heatmaps(trunk_features)
pafs = self.pafs(trunk_features)
return [heatmaps, pafs]
class PoseEstimationWithMobileNet(nn.Module):
def __init__(self, num_refinement_stages=1, num_channels=128, num_heatmaps=19, num_pafs=38):
super().__init__()
self.model = nn.Sequential(
conv( 3, 32, stride=2, bias=False),
conv_dw( 32, 64),
conv_dw( 64, 128, stride=2),
conv_dw(128, 128),
conv_dw(128, 256, stride=2),
conv_dw(256, 256),
conv_dw(256, 512), # conv4_2
conv_dw(512, 512, dilation=2, padding=2),
conv_dw(512, 512),
conv_dw(512, 512),
conv_dw(512, 512),
conv_dw(512, 512) # conv5_5
)
self.cpm = Cpm(512, num_channels)
self.initial_stage = InitialStage(num_channels, num_heatmaps, num_pafs)
self.refinement_stages = nn.ModuleList()
for idx in range(num_refinement_stages):
self.refinement_stages.append(RefinementStage(num_channels + num_heatmaps + num_pafs, num_channels,
num_heatmaps, num_pafs))
def forward(self, x):
backbone_features = self.model(x)
backbone_features = self.cpm(backbone_features)
stages_output = self.initial_stage(backbone_features)
for refinement_stage in self.refinement_stages:
stages_output.extend(
refinement_stage(torch.cat([backbone_features, stages_output[-2], stages_output[-1]], dim=1)))
return stages_output
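A quick shape check of the network outputs (a sketch, not part of the commit). With one refinement stage the forward pass returns four tensors, heatmaps and PAFs for the initial and the refined stage, all at 1/8 of the input resolution:

```python
# Sketch: dummy forward pass through PoseEstimationWithMobileNet.
import torch
from models.with_mobilenet import PoseEstimationWithMobileNet

net = PoseEstimationWithMobileNet(num_refinement_stages=1).eval()
with torch.no_grad():
    stages_output = net(torch.zeros(1, 3, 256, 256))
for out in stages_output:
    print(tuple(out.shape))
# (1, 19, 32, 32) initial heatmaps, (1, 38, 32, 32) initial pafs,
# (1, 19, 32, 32) refined heatmaps, (1, 38, 32, 32) refined pafs
```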
from torch import nn
def conv(in_channels, out_channels, kernel_size=3, padding=1, bn=True, dilation=1, stride=1, relu=True, bias=True):
modules = [nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding, dilation, bias=bias)]
if bn:
modules.append(nn.BatchNorm2d(out_channels))
if relu:
modules.append(nn.ReLU(inplace=True))
return nn.Sequential(*modules)
def conv_dw(in_channels, out_channels, kernel_size=3, padding=1, stride=1, dilation=1):
return nn.Sequential(
nn.Conv2d(in_channels, in_channels, kernel_size, stride, padding, dilation=dilation, groups=in_channels, bias=False),
nn.BatchNorm2d(in_channels),
nn.ReLU(inplace=True),
nn.Conv2d(in_channels, out_channels, 1, 1, 0, bias=False),
nn.BatchNorm2d(out_channels),
nn.ReLU(inplace=True),
)
def conv_dw_no_bn(in_channels, out_channels, kernel_size=3, padding=1, stride=1, dilation=1):
return nn.Sequential(
nn.Conv2d(in_channels, in_channels, kernel_size, stride, padding, dilation=dilation, groups=in_channels, bias=False),
nn.ELU(inplace=True),
nn.Conv2d(in_channels, out_channels, 1, 1, 0, bias=False),
nn.ELU(inplace=True),
)
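The depthwise-separable block (conv_dw) is what keeps the backbone light. A small sketch comparing its parameter count with a standard 3x3 convolution of the same width:

```python
# Sketch: parameter count of conv_dw vs. a standard 3x3 convolution.
from modules.conv import conv, conv_dw

def n_params(module):
    return sum(p.numel() for p in module.parameters())

print(n_params(conv(128, 256)))     # standard 3x3 conv + BN: ~296k parameters
print(n_params(conv_dw(128, 256)))  # depthwise 3x3 + pointwise 1x1 + BN: ~35k parameters
```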
from torch import nn
def get_parameters(model, predicate):
for module in model.modules():
for param_name, param in module.named_parameters():
if predicate(module, param_name):
yield param
def get_parameters_conv(model, name):
return get_parameters(model, lambda m, p: isinstance(m, nn.Conv2d) and m.groups == 1 and p == name)
def get_parameters_conv_depthwise(model, name):
return get_parameters(model, lambda m, p: isinstance(m, nn.Conv2d)
and m.groups == m.in_channels
and m.in_channels == m.out_channels
and p == name)
def get_parameters_bn(model, name):
return get_parameters(model, lambda m, p: isinstance(m, nn.BatchNorm2d) and p == name)
import math
import numpy as np
from operator import itemgetter
BODY_PARTS_KPT_IDS = [[1, 2], [1, 5], [2, 3], [3, 4], [5, 6], [6, 7], [1, 8], [8, 9], [9, 10], [1, 11],
[11, 12], [12, 13], [1, 0], [0, 14], [14, 16], [0, 15], [15, 17], [2, 16], [5, 17]]
BODY_PARTS_PAF_IDS = ([12, 13], [20, 21], [14, 15], [16, 17], [22, 23], [24, 25], [0, 1], [2, 3], [4, 5],
[6, 7], [8, 9], [10, 11], [28, 29], [30, 31], [34, 35], [32, 33], [36, 37], [18, 19], [26, 27])
def linspace2d(start, stop, n=10):
points = 1 / (n - 1) * (stop - start)
return points[:, None] * np.arange(n) + start[:, None]
def extract_keypoints(heatmap, heatmaps_thr,all_keypoints, total_keypoint_num):
heatmap[heatmap < heatmaps_thr] = 0
heatmap_with_borders = np.pad(heatmap, [(2, 2), (2, 2)], mode='constant')
heatmap_center = heatmap_with_borders[1:heatmap_with_borders.shape[0]-1, 1:heatmap_with_borders.shape[1]-1]
heatmap_left = heatmap_with_borders[1:heatmap_with_borders.shape[0]-1, 2:heatmap_with_borders.shape[1]]
heatmap_right = heatmap_with_borders[1:heatmap_with_borders.shape[0]-1, 0:heatmap_with_borders.shape[1]-2]
heatmap_up = heatmap_with_borders[2:heatmap_with_borders.shape[0], 1:heatmap_with_borders.shape[1]-1]
heatmap_down = heatmap_with_borders[0:heatmap_with_borders.shape[0]-2, 1:heatmap_with_borders.shape[1]-1]
heatmap_peaks = (heatmap_center > heatmap_left) &\
(heatmap_center > heatmap_right) &\
(heatmap_center > heatmap_up) &\
(heatmap_center > heatmap_down)
heatmap_peaks = heatmap_peaks[1:heatmap_center.shape[0]-1, 1:heatmap_center.shape[1]-1]
keypoints = list(zip(np.nonzero(heatmap_peaks)[1], np.nonzero(heatmap_peaks)[0])) # (w, h)
keypoints = sorted(keypoints, key=itemgetter(0))
suppressed = np.zeros(len(keypoints), np.uint8)
keypoints_with_score_and_id = []
keypoint_num = 0
for i in range(len(keypoints)):
if suppressed[i]:
continue
for j in range(i+1, len(keypoints)):
if math.sqrt((keypoints[i][0] - keypoints[j][0]) ** 2 +
(keypoints[i][1] - keypoints[j][1]) ** 2) < 6:
suppressed[j] = 1
keypoint_with_score_and_id = (keypoints[i][0], keypoints[i][1], heatmap[keypoints[i][1], keypoints[i][0]],
total_keypoint_num + keypoint_num)
keypoints_with_score_and_id.append(keypoint_with_score_and_id)
keypoint_num += 1
all_keypoints.append(keypoints_with_score_and_id)
return keypoint_num
def group_keypoints(all_keypoints_by_type, pafs, pose_entry_size=20, min_paf_score=0.05, demo=False):
pose_entries = []
all_keypoints = np.array([item for sublist in all_keypoints_by_type for item in sublist])
for part_id in range(len(BODY_PARTS_PAF_IDS)):
part_pafs = pafs[:, :, BODY_PARTS_PAF_IDS[part_id]]
kpts_a = all_keypoints_by_type[BODY_PARTS_KPT_IDS[part_id][0]]
kpts_b = all_keypoints_by_type[BODY_PARTS_KPT_IDS[part_id][1]]
num_kpts_a = len(kpts_a)
num_kpts_b = len(kpts_b)
kpt_a_id = BODY_PARTS_KPT_IDS[part_id][0]
kpt_b_id = BODY_PARTS_KPT_IDS[part_id][1]
if num_kpts_a == 0 and num_kpts_b == 0: # no keypoints for such body part
continue
elif num_kpts_a == 0: # body part has just 'b' keypoints
for i in range(num_kpts_b):
num = 0
for j in range(len(pose_entries)): # check if already in some pose, was added by another body part
if pose_entries[j][kpt_b_id] == kpts_b[i][3]:
num += 1
continue
if num == 0:
pose_entry = np.ones(pose_entry_size) * -1
pose_entry[kpt_b_id] = kpts_b[i][3] # keypoint idx
pose_entry[-1] = 1 # num keypoints in pose
pose_entry[-2] = kpts_b[i][2] # pose score
pose_entries.append(pose_entry)
continue
elif num_kpts_b == 0: # body part has just 'a' keypoints
for i in range(num_kpts_a):
num = 0
for j in range(len(pose_entries)):
if pose_entries[j][kpt_a_id] == kpts_a[i][3]:
num += 1
continue
if num == 0:
pose_entry = np.ones(pose_entry_size) * -1
pose_entry[kpt_a_id] = kpts_a[i][3]
pose_entry[-1] = 1
pose_entry[-2] = kpts_a[i][2]
pose_entries.append(pose_entry)
continue
connections = []
for i in range(num_kpts_a):
kpt_a = np.array(kpts_a[i][0:2])
for j in range(num_kpts_b):
kpt_b = np.array(kpts_b[j][0:2])
mid_point = [(), ()]
mid_point[0] = (int(round((kpt_a[0] + kpt_b[0]) * 0.5)),
int(round((kpt_a[1] + kpt_b[1]) * 0.5)))
mid_point[1] = mid_point[0]
vec = [kpt_b[0] - kpt_a[0], kpt_b[1] - kpt_a[1]]
vec_norm = math.sqrt(vec[0] ** 2 + vec[1] ** 2)
if vec_norm == 0:
continue
vec[0] /= vec_norm
vec[1] /= vec_norm
cur_point_score = (vec[0] * part_pafs[mid_point[0][1], mid_point[0][0], 0] +
vec[1] * part_pafs[mid_point[1][1], mid_point[1][0], 1])
height_n = pafs.shape[0] // 2
success_ratio = 0
point_num = 10 # number of points to integration over paf
if cur_point_score > -100:
passed_point_score = 0
passed_point_num = 0
x, y = linspace2d(kpt_a, kpt_b)
for point_idx in range(point_num):
if not demo:
px = int(round(x[point_idx]))
py = int(round(y[point_idx]))
else:
px = int(x[point_idx])
py = int(y[point_idx])
paf = part_pafs[py, px, 0:2]
cur_point_score = vec[0] * paf[0] + vec[1] * paf[1]
if cur_point_score > min_paf_score:
passed_point_score += cur_point_score
passed_point_num += 1
success_ratio = passed_point_num / point_num
ratio = 0
if passed_point_num > 0:
ratio = passed_point_score / passed_point_num
ratio += min(height_n / vec_norm - 1, 0)
if ratio > 0 and success_ratio > 0.8:
score_all = ratio + kpts_a[i][2] + kpts_b[j][2]
connections.append([i, j, ratio, score_all])
if len(connections) > 0:
connections = sorted(connections, key=itemgetter(2), reverse=True)
num_connections = min(num_kpts_a, num_kpts_b)
has_kpt_a = np.zeros(num_kpts_a, dtype=np.int32)
has_kpt_b = np.zeros(num_kpts_b, dtype=np.int32)
filtered_connections = []
for row in range(len(connections)):
if len(filtered_connections) == num_connections:
break
i, j, cur_point_score = connections[row][0:3]
if not has_kpt_a[i] and not has_kpt_b[j]:
filtered_connections.append([kpts_a[i][3], kpts_b[j][3], cur_point_score])
has_kpt_a[i] = 1
has_kpt_b[j] = 1
connections = filtered_connections
if len(connections) == 0:
continue
if part_id == 0:
pose_entries = [np.ones(pose_entry_size) * -1 for _ in range(len(connections))]
for i in range(len(connections)):
pose_entries[i][BODY_PARTS_KPT_IDS[0][0]] = connections[i][0]
pose_entries[i][BODY_PARTS_KPT_IDS[0][1]] = connections[i][1]
pose_entries[i][-1] = 2
pose_entries[i][-2] = np.sum(all_keypoints[connections[i][0:2], 2]) + connections[i][2]
elif part_id == 17 or part_id == 18:
kpt_a_id = BODY_PARTS_KPT_IDS[part_id][0]
kpt_b_id = BODY_PARTS_KPT_IDS[part_id][1]
for i in range(len(connections)):
for j in range(len(pose_entries)):
if pose_entries[j][kpt_a_id] == connections[i][0] and pose_entries[j][kpt_b_id] == -1:
pose_entries[j][kpt_b_id] = connections[i][1]
elif pose_entries[j][kpt_b_id] == connections[i][1] and pose_entries[j][kpt_a_id] == -1:
pose_entries[j][kpt_a_id] = connections[i][0]
continue
else:
kpt_a_id = BODY_PARTS_KPT_IDS[part_id][0]
kpt_b_id = BODY_PARTS_KPT_IDS[part_id][1]
for i in range(len(connections)):
num = 0
for j in range(len(pose_entries)):
if pose_entries[j][kpt_a_id] == connections[i][0]:
pose_entries[j][kpt_b_id] = connections[i][1]
num += 1
pose_entries[j][-1] += 1
pose_entries[j][-2] += all_keypoints[connections[i][1], 2] + connections[i][2]
if num == 0:
pose_entry = np.ones(pose_entry_size) * -1
pose_entry[kpt_a_id] = connections[i][0]
pose_entry[kpt_b_id] = connections[i][1]
pose_entry[-1] = 2
pose_entry[-2] = np.sum(all_keypoints[connections[i][0:2], 2]) + connections[i][2]
pose_entries.append(pose_entry)
filtered_entries = []
for i in range(len(pose_entries)):
if pose_entries[i][-1] < 3 or (pose_entries[i][-2] / pose_entries[i][-1] < 0.2):
continue
filtered_entries.append(pose_entries[i])
pose_entries = np.asarray(filtered_entries)
return pose_entries, all_keypoints
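Note that this version of extract_keypoints takes the peak threshold (heatmaps_thr) as an explicit second argument. A toy sketch of the peak extraction:

```python
# Sketch: peak extraction on a toy heatmap with a single local maximum.
import numpy as np
from modules.keypoints import extract_keypoints

heatmap = np.zeros((20, 20), dtype=np.float32)
heatmap[5, 7] = 0.9  # one local maximum
all_keypoints_by_type = []
num = extract_keypoints(heatmap, 0.1, all_keypoints_by_type, 0)
print(num)                       # 1
print(all_keypoints_by_type[0])  # [(7, 5, 0.9, 0)] -> (x, y, score, global keypoint id)
```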
import collections
def load_state(net, checkpoint):
# source_state = checkpoint['state_dict']
source_state = checkpoint
target_state = net.state_dict()
new_target_state = collections.OrderedDict()
for target_key, target_value in target_state.items():
if target_key in source_state and source_state[target_key].size() == target_state[target_key].size():
new_target_state[target_key] = source_state[target_key]
else:
new_target_state[target_key] = target_state[target_key]
print('[WARNING] Not found pre-trained parameters for {}'.format(target_key))
net.load_state_dict(new_target_state)
def l2_loss(input, target, mask, batch_size):
loss = (input - target) * mask
loss = (loss * loss) / 2 / batch_size
return loss.sum()
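A tiny sketch of the masked L2 loss on dummy tensors: positions where the mask is zero contribute nothing, and the summed squared error is halved and divided by the batch size:

```python
# Sketch: masked L2 loss on dummy heatmap-sized tensors.
import torch
from modules.loss import l2_loss

pred   = torch.ones(2, 19, 46, 46)
target = torch.zeros(2, 19, 46, 46)
mask   = torch.ones(2, 19, 46, 46)
mask[:, :, :23, :] = 0  # masked-out rows are ignored by the loss
print(l2_loss(pred, target, mask, batch_size=2).item())
# = sum of squared errors over unmasked positions / 2 / batch_size
```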
import math
def get_alpha(rate=30, cutoff=1):
tau = 1 / (2 * math.pi * cutoff)
te = 1 / rate
return 1 / (1 + tau / te)
class LowPassFilter:
def __init__(self):
self.x_previous = None
def __call__(self, x, alpha=0.5):
if self.x_previous is None:
self.x_previous = x
return x
x_filtered = alpha * x + (1 - alpha) * self.x_previous
self.x_previous = x_filtered
return x_filtered
class OneEuroFilter:
def __init__(self, freq=15, mincutoff=1, beta=0.05, dcutoff=1):
self.freq = freq
self.mincutoff = mincutoff
self.beta = beta
self.dcutoff = dcutoff
self.filter_x = LowPassFilter()
self.filter_dx = LowPassFilter()
self.x_previous = None
self.dx = None
def __call__(self, x):
if self.dx is None:
self.dx = 0
else:
self.dx = (x - self.x_previous) * self.freq
dx_smoothed = self.filter_dx(self.dx, get_alpha(self.freq, self.dcutoff))
cutoff = self.mincutoff + self.beta * abs(dx_smoothed)
x_filtered = self.filter_x(x, get_alpha(self.freq, cutoff))
self.x_previous = x
return x_filtered
if __name__ == '__main__':
filter = OneEuroFilter(freq=15, beta=0.1)
for val in range(10):
x = val + (-1)**(val % 2)
x_filtered = filter(x)
print(x_filtered, x)
import cv2
import numpy as np
from modules.keypoints import BODY_PARTS_KPT_IDS, BODY_PARTS_PAF_IDS
from modules.one_euro_filter import OneEuroFilter
class Pose:
num_kpts = 18
kpt_names = ['nose', 'neck',
'r_sho', 'r_elb', 'r_wri', 'l_sho', 'l_elb', 'l_wri',
'r_hip', 'r_knee', 'r_ank', 'l_hip', 'l_knee', 'l_ank',
'r_eye', 'l_eye',
'r_ear', 'l_ear']
sigmas = np.array([.26, .79, .79, .72, .62, .79, .72, .62, 1.07, .87, .89, 1.07, .87, .89, .25, .25, .35, .35],
dtype=np.float32) / 10.0
vars = (sigmas * 2) ** 2
last_id = -1
color = [0, 224, 255]
def __init__(self, keypoints, confidence):
super().__init__()
self.keypoints = keypoints
self.confidence = confidence
self.bbox = Pose.get_bbox(self.keypoints)
self.id = None
self.filters = [[OneEuroFilter(), OneEuroFilter()] for _ in range(Pose.num_kpts)]
@staticmethod
def get_bbox(keypoints):
found_keypoints = np.zeros((np.count_nonzero(keypoints[:, 0] != -1), 2), dtype=np.int32)
found_kpt_id = 0
for kpt_id in range(Pose.num_kpts):
if keypoints[kpt_id, 0] == -1:
continue
found_keypoints[found_kpt_id] = keypoints[kpt_id]
found_kpt_id += 1
bbox = cv2.boundingRect(found_keypoints)
return bbox
def update_id(self, id=None):
self.id = id
if self.id is None:
self.id = Pose.last_id + 1
Pose.last_id += 1
def draw(self, img,color_x = [255, 0, 0]):
assert self.keypoints.shape == (Pose.num_kpts, 2)
for part_id in range(len(BODY_PARTS_PAF_IDS) - 2):
kpt_a_id = BODY_PARTS_KPT_IDS[part_id][0]
global_kpt_a_id = self.keypoints[kpt_a_id, 0]
if global_kpt_a_id != -1:
x_a, y_a = self.keypoints[kpt_a_id]
cv2.circle(img, (int(x_a), int(y_a)), 3, Pose.color, -1)
kpt_b_id = BODY_PARTS_KPT_IDS[part_id][1]
global_kpt_b_id = self.keypoints[kpt_b_id, 0]
if global_kpt_b_id != -1:
x_b, y_b = self.keypoints[kpt_b_id]
cv2.circle(img, (int(x_b), int(y_b)), 3, Pose.color, -1)
if global_kpt_a_id != -1 and global_kpt_b_id != -1:
cv2.line(img, (int(x_a), int(y_a)), (int(x_b), int(y_b)), (255,60,60), 9)
cv2.line(img, (int(x_a), int(y_a)), (int(x_b), int(y_b)), color_x, 4)
def get_similarity(a, b, threshold=0.5):
num_similar_kpt = 0
for kpt_id in range(Pose.num_kpts):
if a.keypoints[kpt_id, 0] != -1 and b.keypoints[kpt_id, 0] != -1:
distance = np.sum((a.keypoints[kpt_id] - b.keypoints[kpt_id]) ** 2)
area = max(a.bbox[2] * a.bbox[3], b.bbox[2] * b.bbox[3])
similarity = np.exp(-distance / (2 * (area + np.spacing(1)) * Pose.vars[kpt_id]))
if similarity > threshold:
num_similar_kpt += 1
return num_similar_kpt
def track_poses(previous_poses, current_poses, threshold=3, smooth=False):
"""Propagate poses ids from previous frame results. Id is propagated,
if there are at least `threshold` similar keypoints between pose from previous frame and current.
If correspondence between pose on previous and current frame was established, pose keypoints are smoothed.
:param previous_poses: poses from previous frame with ids
:param current_poses: poses from current frame to assign ids
:param threshold: minimal number of similar keypoints between poses
:param smooth: smooth pose keypoints between frames
:return: None
"""
current_poses = sorted(current_poses, key=lambda pose: pose.confidence, reverse=True) # match confident poses first
mask = np.ones(len(previous_poses), dtype=np.int32)
for current_pose in current_poses:
best_matched_id = None
best_matched_pose_id = None
best_matched_iou = 0
for id, previous_pose in enumerate(previous_poses):
if not mask[id]:
continue
iou = get_similarity(current_pose, previous_pose)
if iou > best_matched_iou:
best_matched_iou = iou
best_matched_pose_id = previous_pose.id
best_matched_id = id
if best_matched_iou >= threshold:
mask[best_matched_id] = 0
else: # pose not similar to any previous
best_matched_pose_id = None
current_pose.update_id(best_matched_pose_id)
if smooth:
for kpt_id in range(Pose.num_kpts):
if current_pose.keypoints[kpt_id, 0] == -1:
continue
# reuse filter if previous pose has valid filter
if (best_matched_pose_id is not None
and previous_poses[best_matched_id].keypoints[kpt_id, 0] != -1):
current_pose.filters[kpt_id] = previous_poses[best_matched_id].filters[kpt_id]
current_pose.keypoints[kpt_id, 0] = current_pose.filters[kpt_id][0](current_pose.keypoints[kpt_id, 0])
current_pose.keypoints[kpt_id, 1] = current_pose.filters[kpt_id][1](current_pose.keypoints[kpt_id, 1])
current_pose.bbox = Pose.get_bbox(current_pose.keypoints)
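A small sketch of id propagation with track_poses (dummy keypoints; Pose.last_id is a class-level counter, so the printed id depends on prior calls):

```python
# Sketch: a pose shifted by one pixel between frames keeps its id.
import numpy as np
from modules.pose import Pose, track_poses

kpts = np.stack([np.arange(18) * 10, np.arange(18) * 10], axis=1).astype(np.int32)
prev = Pose(kpts.copy(), confidence=18.0)
prev.update_id()  # first call assigns id 0
curr = Pose(kpts.copy() + 1, confidence=18.0)  # same pose, shifted by one pixel
track_poses([prev], [curr], threshold=3, smooth=False)
print(prev.id, curr.id)  # the current pose inherits the previous pose's id
```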
import argparse
import json
import pickle
def prepare_annotations(annotations_per_image, images_info, net_input_size):
"""Prepare labels for training. For each annotated person calculates center
to perform crop around it during the training. Also converts data to the internal format.
:param annotations_per_image: all annotations for specified image id
:param images_info: auxiliary information about all images
:param net_input_size: network input size during training
:return: list of prepared annotations
"""
prepared_annotations = []
for _, annotations in annotations_per_image.items():
previous_centers = []
for annotation in annotations[0]:
if (annotation['num_keypoints'] < 5
or annotation['area'] < 32 * 32):
continue
person_center = [annotation['bbox'][0] + annotation['bbox'][2] / 2,
annotation['bbox'][1] + annotation['bbox'][3] / 2]
is_close = False
for previous_center in previous_centers:
distance_to_previous = ((person_center[0] - previous_center[0]) ** 2
+ (person_center[1] - previous_center[1]) ** 2) ** 0.5
if distance_to_previous < previous_center[2] * 0.3:
is_close = True
break
if is_close:
continue
prepared_annotation = {
'img_paths': images_info[annotation['image_id']]['file_name'],
'img_width': images_info[annotation['image_id']]['width'],
'img_height': images_info[annotation['image_id']]['height'],
'objpos': person_center,
'image_id': annotation['image_id'],
'bbox': annotation['bbox'],
'segment_area': annotation['area'],
'scale_provided': annotation['bbox'][3] / net_input_size,
'num_keypoints': annotation['num_keypoints'],
'segmentations': annotations[1]
}
keypoints = []
for i in range(len(annotation['keypoints']) // 3):
keypoint = [annotation['keypoints'][i * 3], annotation['keypoints'][i * 3 + 1], 2]
if annotation['keypoints'][i * 3 + 2] == 1:
keypoint[2] = 0
elif annotation['keypoints'][i * 3 + 2] == 2:
keypoint[2] = 1
keypoints.append(keypoint)
prepared_annotation['keypoints'] = keypoints
prepared_other_annotations = []
for other_annotation in annotations[0]:
if other_annotation == annotation:
continue
prepared_other_annotation = {
'objpos': [other_annotation['bbox'][0] + other_annotation['bbox'][2] / 2,
other_annotation['bbox'][1] + other_annotation['bbox'][3] / 2],
'bbox': other_annotation['bbox'],
'segment_area': other_annotation['area'],
'scale_provided': other_annotation['bbox'][3] / net_input_size,
'num_keypoints': other_annotation['num_keypoints']
}
keypoints = []
for i in range(len(other_annotation['keypoints']) // 3):
keypoint = [other_annotation['keypoints'][i * 3], other_annotation['keypoints'][i * 3 + 1], 2]
if other_annotation['keypoints'][i * 3 + 2] == 1:
keypoint[2] = 0
elif other_annotation['keypoints'][i * 3 + 2] == 2:
keypoint[2] = 1
keypoints.append(keypoint)
prepared_other_annotation['keypoints'] = keypoints
prepared_other_annotations.append(prepared_other_annotation)
prepared_annotation['processed_other_annotations'] = prepared_other_annotations
prepared_annotations.append(prepared_annotation)
previous_centers.append((person_center[0], person_center[1], annotation['bbox'][2], annotation['bbox'][3]))
return prepared_annotations
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--labels', type=str, default='./coco2017/annotations/person_keypoints_train2017.json', help='path to json with keypoints train labels')
parser.add_argument('--output-name', type=str, default='prepared_train_annotation.pkl',
help='name of output file with prepared keypoints annotation')
parser.add_argument('--net-input-size', type=int, default=368, help='network input size')
args = parser.parse_args()
with open(args.labels, 'r') as f:
data = json.load(f)
annotations_per_image_mapping = {}
for annotation in data['annotations']:
if annotation['num_keypoints'] != 0 and not annotation['iscrowd']:
if annotation['image_id'] not in annotations_per_image_mapping:
annotations_per_image_mapping[annotation['image_id']] = [[], []]
annotations_per_image_mapping[annotation['image_id']][0].append(annotation)
crowd_segmentations_per_image_mapping = {}
for annotation in data['annotations']:
if annotation['iscrowd']:
if annotation['image_id'] not in crowd_segmentations_per_image_mapping:
crowd_segmentations_per_image_mapping[annotation['image_id']] = []
crowd_segmentations_per_image_mapping[annotation['image_id']].append(annotation['segmentation'])
for image_id, crowd_segmentations in crowd_segmentations_per_image_mapping.items():
if image_id in annotations_per_image_mapping:
annotations_per_image_mapping[image_id][1] = crowd_segmentations
images_info = {}
for image_info in data['images']:
images_info[image_info['id']] = image_info
prepared_annotations = prepare_annotations(annotations_per_image_mapping, images_info, args.net_input_size)
with open(args.output_name, 'wb') as f:
pickle.dump(prepared_annotations, f)
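A quick way to inspect the internal format this script writes (assuming the default output name):

```python
# Sketch: load the prepared annotations and look at one entry's keys.
import pickle

with open('prepared_train_annotation.pkl', 'rb') as f:
    prepared = pickle.load(f)
print(len(prepared))               # number of prepared single-person annotations
print(sorted(prepared[0].keys()))  # img_paths, img_width, img_height, objpos, keypoints,
                                   # scale_provided, segmentations, processed_other_annotations, ...
```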
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "0"
import argparse
import cv2
import torch
from torch.nn import DataParallel
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import transforms
from datasets.coco import CocoTrainDataset
from datasets.transformations import ConvertKeypoints, Scale, Rotate, CropPad, Flip
from modules.get_parameters import get_parameters_conv, get_parameters_bn, get_parameters_conv_depthwise
from models.with_mobilenet import PoseEstimationWithMobileNet
from modules.loss import l2_loss
from modules.load_state import load_state
from val import evaluate
cv2.setNumThreads(0)
cv2.ocl.setUseOpenCL(False) # To prevent freeze of DataLoader
def train(prepared_train_labels, train_images_folder, num_refinement_stages, base_lr, batch_size, batches_per_iter,
num_workers, checkpoint_path, weights_only, checkpoints_folder, log_after,
val_labels, val_images_folder, val_output_name, checkpoint_after, val_after):
net = PoseEstimationWithMobileNet(num_refinement_stages)
stride = 8
sigma = 7
paf_thickness = 1
dataset = CocoTrainDataset(prepared_train_labels, train_images_folder,
stride, sigma, paf_thickness,
transform=transforms.Compose([
ConvertKeypoints(),
Scale(),
Rotate(pad=(128, 128, 128)),
CropPad(pad=(128, 128, 128)),
Flip()]))
train_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True, num_workers=num_workers)
optimizer = optim.Adam([
{'params': get_parameters_conv(net.model, 'weight')},
{'params': get_parameters_conv_depthwise(net.model, 'weight'), 'weight_decay': 0},
{'params': get_parameters_bn(net.model, 'weight'), 'weight_decay': 0},
{'params': get_parameters_bn(net.model, 'bias'), 'lr': base_lr * 2, 'weight_decay': 0},
{'params': get_parameters_conv(net.cpm, 'weight'), 'lr': base_lr},
{'params': get_parameters_conv(net.cpm, 'bias'), 'lr': base_lr * 2, 'weight_decay': 0},
{'params': get_parameters_conv_depthwise(net.cpm, 'weight'), 'weight_decay': 0},
{'params': get_parameters_conv(net.initial_stage, 'weight'), 'lr': base_lr},
{'params': get_parameters_conv(net.initial_stage, 'bias'), 'lr': base_lr * 2, 'weight_decay': 0},
{'params': get_parameters_conv(net.refinement_stages, 'weight'), 'lr': base_lr * 4},
{'params': get_parameters_conv(net.refinement_stages, 'bias'), 'lr': base_lr * 8, 'weight_decay': 0},
{'params': get_parameters_bn(net.refinement_stages, 'weight'), 'weight_decay': 0},
{'params': get_parameters_bn(net.refinement_stages, 'bias'), 'lr': base_lr * 2, 'weight_decay': 0},
], lr=base_lr, weight_decay=5e-4)
num_iter = 0
current_epoch = 0
drop_after_epoch = [100, 200, 260]
scheduler = optim.lr_scheduler.MultiStepLR(optimizer, milestones=drop_after_epoch, gamma=0.333)
if checkpoint_path:
checkpoint = torch.load(checkpoint_path)
load_state(net, checkpoint)
print("load : {}".format(checkpoint_path))
net = net.cuda()
net.train()
flag_start = False
for epochId in range(current_epoch, 280):
if flag_start:
scheduler.step()
flag_start = True
total_losses = [0, 0] * (num_refinement_stages + 1) # heatmaps loss, paf loss per stage
batch_per_iter_idx = 0
for batch_data in train_loader:
if batch_per_iter_idx == 0:
optimizer.zero_grad()
images = batch_data['image'].cuda()
keypoint_masks = batch_data['keypoint_mask'].cuda()
paf_masks = batch_data['paf_mask'].cuda()
keypoint_maps = batch_data['keypoint_maps'].cuda()
paf_maps = batch_data['paf_maps'].cuda()
stages_output = net(images)
losses = []
for loss_idx in range(len(total_losses) // 2):
losses.append(l2_loss(stages_output[loss_idx * 2], keypoint_maps, keypoint_masks, images.shape[0]))
losses.append(l2_loss(stages_output[loss_idx * 2 + 1], paf_maps, paf_masks, images.shape[0]))
total_losses[loss_idx * 2] += losses[-2].item() / batches_per_iter
total_losses[loss_idx * 2 + 1] += losses[-1].item() / batches_per_iter
loss = losses[0]
for loss_idx in range(1, len(losses)):
loss += losses[loss_idx]
loss /= batches_per_iter
loss.backward()
batch_per_iter_idx += 1
if batch_per_iter_idx == batches_per_iter:
optimizer.step()
batch_per_iter_idx = 0
num_iter += 1
else:
continue
if num_iter % log_after == 0:
print('Iter: {}'.format(num_iter))
for loss_idx in range(len(total_losses) // 2):
print('\n'.join(['stage{}_pafs_loss: {}', 'stage{}_heatmaps_loss: {}']).format(
loss_idx + 1, total_losses[loss_idx * 2 + 1] / log_after,
loss_idx + 1, total_losses[loss_idx * 2] / log_after))
for loss_idx in range(len(total_losses)):
total_losses[loss_idx] = 0
if num_iter % checkpoint_after == 0:
snapshot_name = '{}/light_pose-iter_{}.pth'.format(checkpoints_folder, num_iter)
torch.save(net.state_dict(),snapshot_name)
if num_iter % val_after == 0:
print('Validation...')
evaluate(val_labels, val_output_name, val_images_folder, net)
net.train()
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--prepared-train-labels', type=str, default='prepared_train_annotation.pkl',
help='path to the file with prepared annotations')
parser.add_argument('--train-images-folder', type=str, default='./coco2017/train2017/', help='path to COCO train images folder')
parser.add_argument('--num-refinement-stages', type=int, default=3, help='number of refinement stages')
parser.add_argument('--base-lr', type=float, default=4e-5, help='initial learning rate')
parser.add_argument('--batch-size', type=int, default=8, help='batch size')
parser.add_argument('--batches-per-iter', type=int, default=1, help='number of batches to accumulate gradient from')
parser.add_argument('--num-workers', type=int, default=8, help='number of workers')
parser.add_argument('--checkpoint-path', type=str, default='./finetune_model/light_pose.pth', help='path to the checkpoint to continue training from')
parser.add_argument('--weights-only', type=bool,default=True,
help='just initialize layers with pre-trained weights and start training from the beginning')
parser.add_argument('--experiment-name', type=str, default='light_pose',
help='experiment name to create folder for checkpoints')
parser.add_argument('--log-after', type=int, default=100, help='number of iterations to print train loss')
parser.add_argument('--val-labels', type=str, default='val_subset.json', help='path to json with keypoints val labels')
parser.add_argument('--val-images-folder', type=str, default='./coco2017/val2017/', help='path to COCO val images folder')
parser.add_argument('--val-output-name', type=str, default='detections.json',
help='name of output json file with detected keypoints')
parser.add_argument('--checkpoint-after', type=int, default=1000,
help='number of iterations to save checkpoint')
parser.add_argument('--val-after', type=int, default=10000,
help='number of iterations to run validation')
args = parser.parse_args()
checkpoints_folder = '{}_checkpoints'.format(args.experiment_name)
if not os.path.exists(checkpoints_folder):
os.makedirs(checkpoints_folder)
train(args.prepared_train_labels, args.train_images_folder, args.num_refinement_stages, args.base_lr, args.batch_size,
args.batches_per_iter, args.num_workers, args.checkpoint_path, args.weights_only,
checkpoints_folder, args.log_after, args.val_labels, args.val_images_folder, args.val_output_name,
args.checkpoint_after, args.val_after)
import argparse
import cv2
import json
import math
import numpy as np
from pycocotools.coco import COCO
from pycocotools.cocoeval import COCOeval
import torch
from datasets.coco import CocoValDataset
from models.with_mobilenet import PoseEstimationWithMobileNet
from modules.keypoints import extract_keypoints, group_keypoints
from modules.load_state import load_state
def run_coco_eval(gt_file_path, dt_file_path):
annotation_type = 'keypoints'
print('Running test for {} results.'.format(annotation_type))
coco_gt = COCO(gt_file_path)
coco_dt = coco_gt.loadRes(dt_file_path)
result = COCOeval(coco_gt, coco_dt, annotation_type)
result.evaluate()
result.accumulate()
result.summarize()
def normalize(img, img_mean, img_scale):
img = np.array(img, dtype=np.float32)
img = (img - img_mean) * img_scale
return img
def pad_width(img, stride, pad_value, min_dims):
h, w, _ = img.shape
h = min(min_dims[0], h)
min_dims[0] = math.ceil(min_dims[0] / float(stride)) * stride
min_dims[1] = max(min_dims[1], w)
min_dims[1] = math.ceil(min_dims[1] / float(stride)) * stride
pad = []
pad.append(int(math.floor((min_dims[0] - h) / 2.0)))
pad.append(int(math.floor((min_dims[1] - w) / 2.0)))
pad.append(int(min_dims[0] - h - pad[0]))
pad.append(int(min_dims[1] - w - pad[1]))
padded_img = cv2.copyMakeBorder(img, pad[0], pad[2], pad[1], pad[3],
cv2.BORDER_CONSTANT, value=pad_value)
return padded_img, pad
def convert_to_coco_format(pose_entries, all_keypoints):
coco_keypoints = []
scores = []
for n in range(len(pose_entries)):
if len(pose_entries[n]) == 0:
continue
keypoints = [0] * 17 * 3
to_coco_map = [0, -1, 6, 8, 10, 5, 7, 9, 12, 14, 16, 11, 13, 15, 2, 1, 4, 3]
person_score = pose_entries[n][-2]
position_id = -1
for keypoint_id in pose_entries[n][:-2]:
position_id += 1
if position_id == 1: # no 'neck' in COCO
continue
cx, cy, score, visibility = 0, 0, 0, 0 # keypoint not found
if keypoint_id != -1:
cx, cy, score = all_keypoints[int(keypoint_id), 0:3]
cx = cx + 0.5
cy = cy + 0.5
visibility = 1
keypoints[to_coco_map[position_id] * 3 + 0] = cx
keypoints[to_coco_map[position_id] * 3 + 1] = cy
keypoints[to_coco_map[position_id] * 3 + 2] = visibility
coco_keypoints.append(keypoints)
scores.append(person_score * max(0, (pose_entries[n][-1] - 1))) # -1 for 'neck'
return coco_keypoints, scores
def infer(net, img, scales, base_height, stride, pad_value=(0, 0, 0), img_mean=(128, 128, 128), img_scale=1/256):
normed_img = normalize(img, img_mean, img_scale)
height, width, _ = normed_img.shape
scales_ratios = [scale * base_height / float(height) for scale in scales]
avg_heatmaps = np.zeros((height, width, 19), dtype=np.float32)
avg_pafs = np.zeros((height, width, 38), dtype=np.float32)
for ratio in scales_ratios:
scaled_img = cv2.resize(normed_img, (0, 0), fx=ratio, fy=ratio, interpolation=cv2.INTER_CUBIC)
min_dims = [base_height, max(scaled_img.shape[1], base_height)]
padded_img, pad = pad_width(scaled_img, stride, pad_value, min_dims)
tensor_img = torch.from_numpy(padded_img).permute(2, 0, 1).unsqueeze(0).float().cuda()
stages_output = net(tensor_img)
stage2_heatmaps = stages_output[-2]
heatmaps = np.transpose(stage2_heatmaps.squeeze().cpu().data.numpy(), (1, 2, 0))
heatmaps = cv2.resize(heatmaps, (0, 0), fx=stride, fy=stride, interpolation=cv2.INTER_CUBIC)
heatmaps = heatmaps[pad[0]:heatmaps.shape[0] - pad[2], pad[1]:heatmaps.shape[1] - pad[3], :]
heatmaps = cv2.resize(heatmaps, (width, height), interpolation=cv2.INTER_CUBIC)
avg_heatmaps = avg_heatmaps + heatmaps / len(scales_ratios)
stage2_pafs = stages_output[-1]
pafs = np.transpose(stage2_pafs.squeeze().cpu().data.numpy(), (1, 2, 0))
pafs = cv2.resize(pafs, (0, 0), fx=stride, fy=stride, interpolation=cv2.INTER_CUBIC)
pafs = pafs[pad[0]:pafs.shape[0] - pad[2], pad[1]:pafs.shape[1] - pad[3], :]
pafs = cv2.resize(pafs, (width, height), interpolation=cv2.INTER_CUBIC)
avg_pafs = avg_pafs + pafs / len(scales_ratios)
return avg_heatmaps, avg_pafs
def evaluate(labels, output_name, images_folder, net, multiscale=False, visualize=False):
net = net.cuda().eval()
base_height = 368
scales = [1]
if multiscale:
scales = [0.5, 1.0, 1.5, 2.0]
stride = 8
dataset = CocoValDataset(labels, images_folder)
coco_result = []
for sample in dataset:
file_name = sample['file_name']
img = sample['img']
avg_heatmaps, avg_pafs = infer(net, img, scales, base_height, stride)
total_keypoints_num = 0
all_keypoints_by_type = []
for kpt_idx in range(18): # 19th for bg
total_keypoints_num += extract_keypoints(avg_heatmaps[:, :, kpt_idx], 0.1, all_keypoints_by_type, total_keypoints_num) # extract_keypoints in this repo takes a peak threshold (heatmaps_thr) as its second argument; 0.1 is an assumed default
pose_entries, all_keypoints = group_keypoints(all_keypoints_by_type, avg_pafs)
coco_keypoints, scores = convert_to_coco_format(pose_entries, all_keypoints)
image_id = int(file_name[0:file_name.rfind('.')])
for idx in range(len(coco_keypoints)):
coco_result.append({
'image_id': image_id,
'category_id': 1, # person
'keypoints': coco_keypoints[idx],
'score': scores[idx]
})
if visualize:
for keypoints in coco_keypoints:
for idx in range(len(keypoints) // 3):
cv2.circle(img, (int(keypoints[idx * 3]), int(keypoints[idx * 3 + 1])),
3, (255, 0, 255), -1)
cv2.imshow('keypoints', img)
key = cv2.waitKey()
if key == 27: # esc
return
with open(output_name, 'w') as f:
json.dump(coco_result, f, indent=4)
run_coco_eval(labels, output_name)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--labels', type=str, required=True, help='path to json with keypoints val labels')
parser.add_argument('--output-name', type=str, default='detections.json',
help='name of output json file with detected keypoints')
parser.add_argument('--images-folder', type=str, required=True, help='path to COCO val images folder')
parser.add_argument('--checkpoint-path', type=str, required=True, help='path to the checkpoint')
parser.add_argument('--multiscale', action='store_true', help='average inference results over multiple scales')
parser.add_argument('--visualize', action='store_true', help='show keypoints')
args = parser.parse_args()
net = PoseEstimationWithMobileNet()
checkpoint = torch.load(args.checkpoint_path)
load_state(net, checkpoint)
evaluate(args.labels, args.output_name, args.images_folder, net, args.multiscale, args.visualize)