From 77dd584a5c3ea57693301186529dc4fc82b2913f Mon Sep 17 00:00:00 2001
From: "Eric.Lee2021" <305141918@qq.com>
Date: Thu, 22 Apr 2021 05:17:35 +0800
Subject: [PATCH] add components\insight_face

---
 components/insight_face/face_verify.py | 117 ++++++++++
 components/insight_face/model.py       | 305 +++++++++++++++++++++++++
 components/insight_face/utils.py       | 147 ++++++++++++
 3 files changed, 569 insertions(+)
 create mode 100644 components/insight_face/face_verify.py
 create mode 100644 components/insight_face/model.py
 create mode 100644 components/insight_face/utils.py

diff --git a/components/insight_face/face_verify.py b/components/insight_face/face_verify.py
new file mode 100644
index 0000000..3fbac74
--- /dev/null
+++ b/components/insight_face/face_verify.py
@@ -0,0 +1,117 @@
+#-*-coding:utf-8-*-
+# date:2021-04-16
+# Author: Eric.Lee
+# function: face verify
+
+import warnings
+warnings.filterwarnings("ignore")
+import os
+import torch
+from insight_face.model import Backbone,MobileFaceNet
+from insight_face.utils import load_facebank,infer
+from pathlib import Path
+from PIL import Image
+import cv2
+
+class insight_face_model(object):
+    def __init__(self,
+                 net_mode = "ir_se", # [ir, ir_se, mobilefacenet]
+                 net_depth = 50, # [50, 100, 152]
+                 backbone_model_path = "./components/insight_face/weights/model_ir_se-50.pth",
+                 facebank_path = "./components/insight_face/facebank", # face verification gallery (facebank)
+                 tta = False,
+                 threshold = 1.2,
+                 embedding_size = 512,
+                 ):
+
+        self.threshold = threshold
+        self.tta = tta
+        device_ = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
+
+        if net_mode == "mobilefacenet":
+            model_ = MobileFaceNet(embedding_size).to(device_)
+            print('MobileFaceNet model generated')
+        else:
+            model_ = Backbone(net_depth, 1., net_mode).to(device_)
+            print('{}_{} model generated'.format(net_mode, net_depth))
+
+        if os.access(backbone_model_path,os.F_OK):
+            model_.load_state_dict(torch.load(backbone_model_path, map_location=device_))
+            print("-------->>> load model : {}".format(backbone_model_path))
+
+        model_.eval()
+        self.model_ = model_
+        self.device_ = device_
+        #------------------- load the face verification gallery (facebank)
+        targets, names = load_facebank(facebank_path)
+
+        self.face_targets = targets.to(device_) # keep gallery embeddings on the same device as the model
+        self.face_names = names
+
+        print("faces verify names : \n {}".format(self.face_names))
+        print("targets size : {}".format(self.face_targets.size()))
+
+    def predict(self, faces_identify, vis = False):
+        with torch.no_grad():
+
+            results, face_dst = infer(self.model_, self.device_, faces_identify, self.face_targets, threshold = self.threshold, tta=self.tta)
+            # print(results, face_dst)
+
+            return results, face_dst
+    # print("names : {}".format(names))
+    # print("targets size : {}".format(targets.size()))
+    #
+    # #---------------------------------------------------------------------------
+    # if True:
+    #     print("\n---------------------------\n")
+    #     faces_identify = []
+    #     idx = 0
+    #     for file in os.listdir(args.example):
+    #         img = cv2.imread(args.example + file) # image must be 112*112
+    #         faces_identify.append(Image.fromarray(img))
+    #
+    #         results, face_dst = infer(model_, device_, faces_identify, targets, threshold = 1.2 ,tta=False)
+    #
+    #         face_dst = list(face_dst.cpu().detach().numpy())
+    #
+    #         print("{}) recognize:{} ,dst : {}".format(idx+1,names[results[idx] + 1],face_dst[idx]))
+    #
+    #         cv2.putText(img, names[results[idx] + 1], (2,13),cv2.FONT_HERSHEY_DUPLEX, 0.38, (55, 0, 220),5)
+    #         cv2.putText(img, names[results[idx] + 1], (2,13),cv2.FONT_HERSHEY_DUPLEX, 0.38, (255, 50, 50),1)
+    #
+    #         cv2.namedWindow("imag_face",0)
+    #         cv2.imshow("imag_face",img)
+    #         cv2.waitKey(0)
+    #
+    #         idx += 1
+    #     cv2.destroyAllWindows()
+    # else:
+    #     #---------------------------------------------------------------------------
+    #     print("\n---------------------------\n")
+    #     faces_identify = []
+    #     idx = 0
+    #     sum = 0
+    #     r_ = 0
+    #     for doc_ in os.listdir(args.example):
+    #         for file in os.listdir(args.example + doc_):
+    #             img = cv2.imread(args.example + doc_ + "/" + file) # image must be 112*112
+    #             faces_identify.append(Image.fromarray(img))
+    #
+    #             results, face_dst = infer(model_, device_, faces_identify, targets, threshold = 1.2 ,tta=False)
+    #
+    #             face_dst = list(face_dst.cpu().detach().numpy())
+    #
+    #             print("{}) gt : {} ~ recognize:{} , dst : {}".format(idx+1,doc_,names[results[idx] + 1],face_dst[idx]))
+    #
+    #             #
+    #             sum += 1
+    #             if doc_ == names[results[idx] + 1]:
+    #                 r_ += 1
+    #             print(" {}- {} -->> precision : {}".format(r_,sum,r_/sum))
+    #
+    #             idx += 1
+    #
+    #     cv2.namedWindow("imag_face",0)
+    #     cv2.imshow("imag_face",img)
+    #     cv2.waitKey(1)
+    #     cv2.destroyAllWindows()
diff --git a/components/insight_face/model.py b/components/insight_face/model.py
new file mode 100644
index 0000000..237751d
--- /dev/null
+++ b/components/insight_face/model.py
@@ -0,0 +1,305 @@
+from torch.nn import Linear, Conv2d, BatchNorm1d, BatchNorm2d, PReLU, ReLU, Sigmoid, Dropout2d, Dropout, AvgPool2d, MaxPool2d, AdaptiveAvgPool2d, Sequential, Module, Parameter
+import torch.nn.functional as F
+import torch
+from collections import namedtuple
+import math
+import pdb
+
+################################## Original Arcface Model #############################################################
+
+class Flatten(Module):
+    def forward(self, input):
+        return input.view(input.size(0), -1)
+
+def l2_norm(input,axis=1):
+    norm = torch.norm(input,2,axis,True)
+    output = torch.div(input, norm)
+    return output
+
+class SEModule(Module):
+    def __init__(self, channels, reduction):
+        super(SEModule, self).__init__()
+        self.avg_pool = AdaptiveAvgPool2d(1)
+        self.fc1 = Conv2d(
+            channels, channels // reduction, kernel_size=1, padding=0, bias=False)
+        self.relu = ReLU(inplace=True)
+        self.fc2 = Conv2d(
+            channels // reduction, channels, kernel_size=1, padding=0, bias=False)
+        self.sigmoid = Sigmoid()
+
+    def forward(self, x):
+        module_input = x
+        x = self.avg_pool(x)
+        x = self.fc1(x)
+        x = self.relu(x)
+        x = self.fc2(x)
+        x = self.sigmoid(x)
+        return module_input * x
+
+class bottleneck_IR(Module):
+    def __init__(self, in_channel, depth, stride):
+        super(bottleneck_IR, self).__init__()
+        if in_channel == depth:
+            self.shortcut_layer = MaxPool2d(1, stride)
+        else:
+            self.shortcut_layer = Sequential(
+                Conv2d(in_channel, depth, (1, 1), stride, bias=False), BatchNorm2d(depth))
+        self.res_layer = Sequential(
+            BatchNorm2d(in_channel),
+            Conv2d(in_channel, depth, (3, 3), (1, 1), 1, bias=False), PReLU(depth),
+            Conv2d(depth, depth, (3, 3), stride, 1, bias=False), BatchNorm2d(depth))
+
+    def forward(self, x):
+        shortcut = self.shortcut_layer(x)
+        res = self.res_layer(x)
+        return res + shortcut
+
+class bottleneck_IR_SE(Module):
+    def __init__(self, in_channel, depth, stride):
+        super(bottleneck_IR_SE, self).__init__()
+        if in_channel == depth:
+            self.shortcut_layer = MaxPool2d(1, stride)
+        else:
+            self.shortcut_layer = Sequential(
+                Conv2d(in_channel, depth, (1, 1), stride, bias=False),
+                BatchNorm2d(depth))
+        self.res_layer = Sequential(
+            BatchNorm2d(in_channel),
+            Conv2d(in_channel, depth, (3,3), (1,1), 1, bias=False),
+            PReLU(depth),
+            Conv2d(depth, depth, (3,3), stride, 1, bias=False),
+            BatchNorm2d(depth),
+            SEModule(depth,16)
+            )
+    def forward(self,x):
+        shortcut = self.shortcut_layer(x)
+        res = self.res_layer(x)
+        return res + shortcut
+
+class Bottleneck(namedtuple('Block', ['in_channel', 'depth', 'stride'])):
+    '''A named tuple describing a ResNet block.'''
+
+def get_block(in_channel, depth, num_units, stride = 2):
+    return [Bottleneck(in_channel, depth, stride)] + [Bottleneck(depth, depth, 1) for i in range(num_units-1)]
+
+def get_blocks(num_layers):
+    if num_layers == 50:
+        blocks = [
+            get_block(in_channel=64, depth=64, num_units = 3),
+            get_block(in_channel=64, depth=128, num_units=4),
+            get_block(in_channel=128, depth=256, num_units=14),
+            get_block(in_channel=256, depth=512, num_units=3)
+        ]
+    elif num_layers == 100:
+        blocks = [
+            get_block(in_channel=64, depth=64, num_units=3),
+            get_block(in_channel=64, depth=128, num_units=13),
+            get_block(in_channel=128, depth=256, num_units=30),
+            get_block(in_channel=256, depth=512, num_units=3)
+        ]
+    elif num_layers == 152:
+        blocks = [
+            get_block(in_channel=64, depth=64, num_units=3),
+            get_block(in_channel=64, depth=128, num_units=8),
+            get_block(in_channel=128, depth=256, num_units=36),
+            get_block(in_channel=256, depth=512, num_units=3)
+        ]
+    return blocks
+
+class Backbone(Module):
+    def __init__(self, num_layers, drop_ratio, mode='ir'):
+        super(Backbone, self).__init__()
+        assert num_layers in [50, 100, 152], 'num_layers should be 50, 100, or 152'
+        assert mode in ['ir', 'ir_se'], 'mode should be ir or ir_se'
+        blocks = get_blocks(num_layers)
+        if mode == 'ir':
+            unit_module = bottleneck_IR
+        elif mode == 'ir_se':
+            unit_module = bottleneck_IR_SE
+        self.input_layer = Sequential(Conv2d(3, 64, (3, 3), 1, 1, bias=False),
+                                      BatchNorm2d(64),
+                                      PReLU(64))
+        self.output_layer = Sequential(BatchNorm2d(512),
+                                       Dropout(drop_ratio),
+                                       Flatten(),
+                                       Linear(512 * 7 * 7, 512),
+                                       BatchNorm1d(512))
+        modules = []
+        for block in blocks:
+            for bottleneck in block:
+                modules.append(
+                    unit_module(bottleneck.in_channel,
+                                bottleneck.depth,
+                                bottleneck.stride))
+        self.body = Sequential(*modules)
+
+    def forward(self,x):
+        x = self.input_layer(x)
+        x = self.body(x)
+        x = self.output_layer(x)
+        return l2_norm(x)
+
+################################## MobileFaceNet #############################################################
+
+class Conv_block(Module):
+    def __init__(self, in_c, out_c, kernel=(1, 1), stride=(1, 1), padding=(0, 0), groups=1):
+        super(Conv_block, self).__init__()
+        self.conv = Conv2d(in_c, out_channels=out_c, kernel_size=kernel, groups=groups, stride=stride, padding=padding, bias=False)
+        self.bn = BatchNorm2d(out_c)
+        self.prelu = PReLU(out_c)
+    def forward(self, x):
+        x = self.conv(x)
+        x = self.bn(x)
+        x = self.prelu(x)
+        return x
+
+class Linear_block(Module):
+    def __init__(self, in_c, out_c, kernel=(1, 1), stride=(1, 1), padding=(0, 0), groups=1):
+        super(Linear_block, self).__init__()
+        self.conv = Conv2d(in_c, out_channels=out_c, kernel_size=kernel, groups=groups, stride=stride, padding=padding, bias=False)
+        self.bn = BatchNorm2d(out_c)
+    def forward(self, x):
+        x = self.conv(x)
+        x = self.bn(x)
+        return x
+
+class Depth_Wise(Module):
+    def __init__(self, in_c, out_c, residual = False, kernel=(3, 3), stride=(2, 2), padding=(1, 1), groups=1):
+        super(Depth_Wise, self).__init__()
+        self.conv = Conv_block(in_c, out_c=groups, kernel=(1, 1), padding=(0, 0), stride=(1, 1))
+        self.conv_dw = Conv_block(groups, groups, groups=groups, kernel=kernel, padding=padding,
+                                  stride=stride)
+        self.project = Linear_block(groups, out_c, kernel=(1, 1), padding=(0, 0), stride=(1, 1))
+        self.residual = residual
+    def forward(self, x):
+        if self.residual:
+            short_cut = x
+        x = self.conv(x)
+        x = self.conv_dw(x)
+        x = self.project(x)
+        if self.residual:
+            output = short_cut + x
+        else:
+            output = x
+        return output
+
+class Residual(Module):
+    def __init__(self, c, num_block, groups, kernel=(3, 3), stride=(1, 1), padding=(1, 1)):
+        super(Residual, self).__init__()
+        modules = []
+        for _ in range(num_block):
+            modules.append(Depth_Wise(c, c, residual=True, kernel=kernel, padding=padding, stride=stride, groups=groups))
+        self.model = Sequential(*modules)
+    def forward(self, x):
+        return self.model(x)
+
+class MobileFaceNet(Module):
+    def __init__(self, embedding_size):
+        super(MobileFaceNet, self).__init__()
+        self.conv1 = Conv_block(3, 64, kernel=(3, 3), stride=(2, 2), padding=(1, 1))
+        self.conv2_dw = Conv_block(64, 64, kernel=(3, 3), stride=(1, 1), padding=(1, 1), groups=64)
+        self.conv_23 = Depth_Wise(64, 64, kernel=(3, 3), stride=(2, 2), padding=(1, 1), groups=128)
+        self.conv_3 = Residual(64, num_block=4, groups=128, kernel=(3, 3), stride=(1, 1), padding=(1, 1))
+        self.conv_34 = Depth_Wise(64, 128, kernel=(3, 3), stride=(2, 2), padding=(1, 1), groups=256)
+        self.conv_4 = Residual(128, num_block=6, groups=256, kernel=(3, 3), stride=(1, 1), padding=(1, 1))
+        self.conv_45 = Depth_Wise(128, 128, kernel=(3, 3), stride=(2, 2), padding=(1, 1), groups=512)
+        self.conv_5 = Residual(128, num_block=2, groups=256, kernel=(3, 3), stride=(1, 1), padding=(1, 1))
+        self.conv_6_sep = Conv_block(128, 512, kernel=(1, 1), stride=(1, 1), padding=(0, 0))
+        self.conv_6_dw = Linear_block(512, 512, groups=512, kernel=(7,7), stride=(1, 1), padding=(0, 0))
+        self.conv_6_flatten = Flatten()
+        self.linear = Linear(512, embedding_size, bias=False)
+        self.bn = BatchNorm1d(embedding_size)
+
+    def forward(self, x):
+        out = self.conv1(x)
+        out = self.conv2_dw(out)
+        out = self.conv_23(out)
+        out = self.conv_3(out)
+        out = self.conv_34(out)
+        out = self.conv_4(out)
+        out = self.conv_45(out)
+        out = self.conv_5(out)
+        out = self.conv_6_sep(out)
+        out = self.conv_6_dw(out)
+        out = self.conv_6_flatten(out)
+        out = self.linear(out)
+        out = self.bn(out)
+        return l2_norm(out)
+
+################################## Arcface head #############################################################
+
+class Arcface(Module):
+    # implementation of the ArcFace additive angular margin loss, https://arxiv.org/abs/1801.07698
+    def __init__(self, embedding_size=512, classnum=51332, s=64., m=0.5):
+        super(Arcface, self).__init__()
+        self.classnum = classnum
+        self.kernel = Parameter(torch.Tensor(embedding_size,classnum))
+        # initial kernel
+        self.kernel.data.uniform_(-1, 1).renorm_(2,1,1e-5).mul_(1e5)
+        self.m = m # the margin value, default is 0.5
+        self.s = s # scalar value default is 64, see normface https://arxiv.org/abs/1704.06369
+        self.cos_m = math.cos(m)
+        self.sin_m = math.sin(m)
+        self.mm = self.sin_m * m # issue 1
+        self.threshold = math.cos(math.pi - m)
+    def forward(self, embbedings, label):
+        # weights norm
+        nB = len(embbedings)
+        kernel_norm = l2_norm(self.kernel,axis=0)
+        # cos(theta+m)
+        cos_theta = torch.mm(embbedings,kernel_norm)
+#        output = torch.mm(embbedings,kernel_norm)
+        cos_theta = cos_theta.clamp(-1,1) # for numerical stability
+        cos_theta_2 = torch.pow(cos_theta, 2)
+        sin_theta_2 = 1 - cos_theta_2
+        sin_theta = torch.sqrt(sin_theta_2)
+        cos_theta_m = (cos_theta * self.cos_m -
+                       sin_theta * self.sin_m)
+        # this condition controls the theta+m should be in range [0, pi]
+        #      0 <= theta+m <= pi
+        #     -m <= theta   <= pi-m
+        cond_v = cos_theta - self.threshold
+        cond_mask = cond_v <= 0
+        keep_val = (cos_theta - self.mm) # when theta not in [0,pi], use cosface instead
+        cos_theta_m[cond_mask] = keep_val[cond_mask]
+        output = cos_theta * 1.0 # a little bit hacky way to prevent in_place operation on cos_theta
+        idx_ = torch.arange(0, nB, dtype=torch.long)
+        output[idx_, label] = cos_theta_m[idx_, label]
+        output *= self.s # scale up in order to make softmax work, first introduced in normface
+        return output
+
+################################## Cosface head #############################################################
+
+class Am_softmax(Module):
+    # implementation of additive margin softmax loss in https://arxiv.org/abs/1801.05599
+    def __init__(self,embedding_size=512,classnum=51332):
+        super(Am_softmax, self).__init__()
+        self.classnum = classnum
+        self.kernel = Parameter(torch.Tensor(embedding_size,classnum))
+        # initial kernel
+        self.kernel.data.uniform_(-1, 1).renorm_(2,1,1e-5).mul_(1e5)
+        self.m = 0.35 # additive margin recommended by the paper
+        self.s = 30. # see normface https://arxiv.org/abs/1704.06369
+    def forward(self,embbedings,label):
+        kernel_norm = l2_norm(self.kernel,axis=0)
+        cos_theta = torch.mm(embbedings,kernel_norm)
+        cos_theta = cos_theta.clamp(-1,1) # for numerical stability
+        phi = cos_theta - self.m
+        label = label.view(-1,1) #size=(B,1)
+        index = cos_theta.data * 0.0 #size=(B,Classnum)
+        index.scatter_(1,label.data.view(-1,1),1)
+        index = index.bool() # boolean mask (uint8 tensor indexing is deprecated in recent PyTorch)
+        output = cos_theta * 1.0
+        output[index] = phi[index] #only change the correct predicted output
+        output *= self.s # scale up in order to make softmax work, first introduced in normface
+        return output
diff --git a/components/insight_face/utils.py b/components/insight_face/utils.py
new file mode 100644
index 0000000..add8601
--- /dev/null
+++ b/components/insight_face/utils.py
@@ -0,0 +1,147 @@
+from datetime import datetime
+from PIL import Image
+import numpy as np
+import io
+from torchvision import transforms as trans
+
+import torch
+from insight_face.model import l2_norm
+import pdb
+import cv2
+
+def separate_bn_paras(modules):
+    if not isinstance(modules, list):
+        modules = [*modules.modules()]
+    paras_only_bn = []
+    paras_wo_bn = []
+    for layer in modules:
+        if 'model' in str(layer.__class__):
+            continue
+        if 'container' in str(layer.__class__):
+            continue
+        else:
+            if 'batchnorm' in str(layer.__class__):
+                paras_only_bn.extend([*layer.parameters()])
+            else:
+                paras_wo_bn.extend([*layer.parameters()])
+    return paras_only_bn, paras_wo_bn
+
+def prepare_facebank(path_images, facebank_path, model, mtcnn, device, tta = True):
+    #
+    test_transform_ = trans.Compose([
+                    trans.ToTensor(),
+                    trans.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])
+                ])
+    #
+    model.eval()
+    embeddings = []
+    names = ['Unknown']
+    idx = 0
+    for path in path_images.iterdir():
+        if path.is_file():
+            continue
+        else:
+            idx += 1
+            print()
+            embs = []
+            for file in path.iterdir():
+                if not file.is_file():
+                    continue
+                else:
+                    try:
+                        img = Image.open(file)
+                        print(" {}) {}".format(idx+1,file))
+                    except:
+                        continue
+                    if img.size != (112, 112):
+                        try:
+                            img = mtcnn.align(img)
+                        except:
+                            continue
+                    with torch.no_grad():
+                        if tta:
+                            mirror = trans.functional.hflip(img)
+                            emb = model(test_transform_(img).to(device).unsqueeze(0))
+                            emb_mirror = model(test_transform_(mirror).to(device).unsqueeze(0))
+                            embs.append(l2_norm(emb + emb_mirror))
+                        else:
+                            embs.append(model(test_transform_(img).to(device).unsqueeze(0)))
+            if len(embs) == 0:
+                continue
+            embedding = torch.cat(embs).mean(0,keepdim=True)
+            embeddings.append(embedding)
+            names.append(path.name)
+    embeddings = torch.cat(embeddings)
+    names = np.array(names)
+    torch.save(embeddings, facebank_path+'/facebank.pth')
+    np.save(facebank_path + '/names', names)
+    return embeddings, names
+
+def load_facebank(facebank_path):
+    embeddings = torch.load(facebank_path + '/facebank.pth')
+    names = np.load(facebank_path + '/names.npy')
+    return embeddings, names
+
+def de_preprocess(tensor):
+    return tensor*0.5 + 0.5
+
+hflip = trans.Compose([
+            de_preprocess,
+            trans.ToPILImage(),
+            trans.functional.hflip,
+            trans.ToTensor(),
+            trans.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])
+        ])
+
+def hflip_batch(imgs_tensor):
+    hfliped_imgs = torch.empty_like(imgs_tensor)
+    for i, img_ten in enumerate(imgs_tensor):
+        hfliped_imgs[i] = hflip(img_ten)
+    return hfliped_imgs
+
+def draw_box_name(bbox,name,frame):
+    frame = cv2.rectangle(frame,(bbox[0],bbox[1]),(bbox[2],bbox[3]),(0,0,255),6)
+    frame = cv2.putText(frame,
+                    name,
+                    (bbox[0],bbox[1]),
+                    cv2.FONT_HERSHEY_SIMPLEX,
+                    2,
+                    (0,255,0),
+                    3,
+                    cv2.LINE_AA)
+    return frame
+
+
+def infer(model, device, faces, target_embs, threshold = 1.2, tta=False):
+    '''
+    faces : list of PIL Image
+    target_embs : [n, 512] computed embeddings of faces in facebank
+    names : recorded names of faces in facebank
+    tta : test time augmentation (hflip, that's all)
+    '''
+    test_transform = trans.Compose([
+                    trans.ToTensor(),
+                    trans.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])
+                ])
+
+    #
+    embs = []
+
+    with torch.no_grad():
+        for img in faces:
+            if tta:
+                mirror = trans.functional.hflip(img)
+                emb = model(test_transform(img).to(device).unsqueeze(0))
+                emb_mirror = model(test_transform(mirror).to(device).unsqueeze(0))
+                embs.append(l2_norm(emb + emb_mirror))
+            else:
+                embs.append(model(test_transform(img).to(device).unsqueeze(0)))
+    source_embs = torch.cat(embs)
+
+    diff = source_embs.unsqueeze(-1) - target_embs.transpose(1,0).unsqueeze(0)
+    dist = torch.sum(torch.pow(diff, 2), dim=1)
+
+    minimum, min_idx = torch.min(dist, dim=1)
+    min_idx[minimum > threshold] = -1 # if no match, set idx to -1
+
+    return min_idx, minimum
--
GitLab
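Reviewer note, not part of the patch: a minimal usage sketch of the insight_face_model class added above. It assumes components/ is on the Python path (matching the relative imports inside the files), that the default weight and facebank paths exist and the facebank was already prepared (e.g. with prepare_facebank), and that inputs are pre-aligned 112x112 face crops; the file name "face_0.jpg" is a hypothetical example.

# Hypothetical usage sketch; paths and file names are placeholders.
import cv2
from PIL import Image
from insight_face.face_verify import insight_face_model

verifier = insight_face_model()                     # loads backbone weights and the facebank

img = cv2.imread("face_0.jpg")                      # pre-aligned 112x112 face crop (placeholder path)
ids, dists = verifier.predict([Image.fromarray(img)])

for i, d in zip(ids, dists):
    # index -1 means "no match under threshold"; face_names[0] is 'Unknown'
    print(verifier.face_names[int(i) + 1], float(d))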