Commit 894bcef1 authored by HypoX64

preview

Parent 708d9c62
import sys
import os
import random
import numpy as np
import cv2
import torch
from models import runmodel,loadmodel
from util import mosaic,util,ffmpeg
from util import image_processing as impro
from options.addmosaic_options import AddOptions
opt = AddOptions().getparse()
# Find the mosaic position in an image and add a mosaic to it
def add_mosaic_to_image(path):
    img = cv2.imread(path)
    mask = runmodel.run_unet_rectim(img, net, use_gpu=opt.use_gpu)
    mask = impro.mask_threshold(mask, opt.mask_extend, opt.mask_threshold)
    img = mosaic.addmosaic(img, mask, opt.mosaic_size, opt.output_size, model=opt.mosaic_mod)
    return img
net = loadmodel.unet(os.path.join(opt.model_dir,opt.model_name),use_gpu = opt.use_gpu)
filepaths = util.Traversal(opt.input_dir)
for path in filepaths:
if util.is_img(path):
img = add_mosaic_to_image(path)
cv2.imwrite(os.path.join(opt.result_dir,os.path.basename(path)),img)
elif util.is_video(path):
util.clean_tempfiles()
fps = ffmpeg.get_video_infos(path)[0]
ffmpeg.video2voice(path,'./tmp/voice_tmp.mp3')
ffmpeg.video2image(path,'./tmp/video2image/output_%05d.png')
for imagepath in os.listdir('./tmp/video2image'):
imagepath = os.path.join('./tmp/video2image',imagepath)
print(imagepath)
img = add_mosaic_to_image(imagepath)
cv2.imwrite(os.path.join('./tmp/addmosaic_image',
os.path.basename(imagepath)),img)
ffmpeg.image2video( fps,
'./tmp/addmosaic_image/output_%05d.png',
'./tmp/voice_tmp.mp3',
os.path.join(opt.result_dir,os.path.splitext(os.path.basename(path))[0]+'_AddMosaic.mp4'))
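
# Usage sketch (untested; flags follow the defaults in options/addmosaic_options.py):
#   python3 AddMosaic.py --input_dir ./video_or_image --result_dir ./result --use_gpu
# Images are processed directly; videos are split into frames with ffmpeg, each
# frame is mosaiced, and the frames are re-encoded together with the original audio.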
import os
import random
import numpy as np
import cv2
import torch
import scipy.signal as signal
from models import runmodel,loadmodel
from util import util,ffmpeg,data
from util import image_processing as impro
from options.cleanmosaic_options import CleanOptions
opt = CleanOptions().getparse()
def get_mosaic_position(img_origin):
    mask = runmodel.run_unet_rectim(img_origin, net_mosaic_pos, use_gpu=opt.use_gpu)
    mask = impro.mask_threshold(mask, 10, 128)
    x, y, size, area = impro.boundingSquare(mask, threshold=128, Ex_mul=1.5)
    # the mask is computed on a copy whose short side is 128px;
    # map the coordinates back to the original resolution
    rat = min(img_origin.shape[:2]) / 128.0
    x, y, size = int(rat * x), int(rat * y), int(rat * size)
    return x, y, size
def replace_mosaic(img_origin,img_fake,x,y,size):
img_fake = impro.resize(img_fake,size*2)
img_origin[y-size:y+size,x-size:x+size]=img_fake
return img_origin
netG = loadmodel.pix2pix(os.path.join(opt.model_dir,opt.model_name),opt.model_type_netG,use_gpu = opt.use_gpu)
net_mosaic_pos = loadmodel.unet(os.path.join(opt.model_dir,opt.mosaic_position_model_name),use_gpu = opt.use_gpu)
filepaths = util.Traversal(opt.input_dir)
for path in filepaths:
if util.is_img(path):
print('Clean Mosaic:',path)
img_origin = cv2.imread(path)
x,y,size = get_mosaic_position(img_origin)
img_result = img_origin.copy()
if size != 0 :
img_mosaic = img_origin[y-size:y+size,x-size:x+size]
img_fake=runmodel.run_pix2pix(img_mosaic,netG,use_gpu = opt.use_gpu)
img_result = replace_mosaic(img_origin,img_fake,x,y,size)
cv2.imwrite(os.path.join(opt.result_dir,os.path.basename(path)),img_result)
elif util.is_video(path):
util.clean_tempfiles()
fps = ffmpeg.get_video_infos(path)[0]
ffmpeg.video2voice(path,'./tmp/voice_tmp.mp3')
ffmpeg.video2image(path,'./tmp/video2image/output_%05d.png')
positions = []
imagepaths=os.listdir('./tmp/video2image')
imagepaths.sort()
for imagepath in imagepaths:
imagepath=os.path.join('./tmp/video2image',imagepath)
img_origin = cv2.imread(imagepath)
x,y,size = get_mosaic_position(img_origin)
positions.append([x,y,size])
print('Find Positions:',imagepath)
        positions = np.array(positions)
        # smooth the detected positions over time with a median filter
        for i in range(3):
            positions[:, i] = signal.medfilt(positions[:, i], 21)
for i,imagepath in enumerate(imagepaths,0):
imagepath=os.path.join('./tmp/video2image',imagepath)
x,y,size = positions[i][0],positions[i][1],positions[i][2]
img_origin = cv2.imread(imagepath)
img_result = img_origin.copy()
if size != 0:
img_mosaic = img_origin[y-size:y+size,x-size:x+size]
img_fake=runmodel.run_pix2pix(img_mosaic,netG,use_gpu = opt.use_gpu)
img_result = replace_mosaic(img_origin,img_fake,x,y,size)
cv2.imwrite(os.path.join('./tmp/replace_mosaic',os.path.basename(imagepath)),img_result)
print('Clean Mosaic:',imagepath)
ffmpeg.image2video( fps,
'./tmp/replace_mosaic/output_%05d.png',
'./tmp/voice_tmp.mp3',
os.path.join(opt.result_dir,os.path.splitext(os.path.basename(path))[0]+'_CleanMosaic.mp4'))
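
# Usage sketch (untested; flags follow the defaults in options/cleanmosaic_options.py):
#   python3 CleanMosaic.py --input_dir ./video_or_image --model_dir ./pretrained_models/CleanMosaic
# For videos, the mosaic position is detected per frame and median-filtered over
# time (window 21) before each region is restored by the pix2pix generator.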
# DeepMosaics
* You can use it to automatically add or remove mosaics in images or videos.
* I will finish this project in a few days.
from .pix2pix_model import *
from .unet_model import UNet
import torch
from .pix2pix_model import *
from .unet_model import UNet
def pix2pix(model_path,G_model_type,use_gpu = True):
gpu_ids=[]
if use_gpu:
gpu_ids=[0]
netG = define_G(3, 3, 64, G_model_type, norm='instance', init_type='normal', gpu_ids=gpu_ids)
netG.load_state_dict(torch.load(model_path))
netG.eval()
if use_gpu:
netG.cuda()
return netG
def unet(model_path,use_gpu = True):
net = UNet(n_channels = 3, n_classes = 1)
net.load_state_dict(torch.load(model_path))
net.eval()
if use_gpu:
net.cuda()
return net
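
# Usage sketch (comment-only; the paths below are the defaults from options/
# and are assumptions, not files shipped with this commit):
#   netG = pix2pix('./pretrained_models/CleanMosaic/hands_unet_128.pth',
#                  'unet_128', use_gpu=False)
#   net = unet('./pretrained_models/CleanMosaic/mosaic_position.pkl', use_gpu=False)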
import torch
import torch.nn as nn
from torch.nn import init
import functools
from torch.autograd import Variable
from torch.optim import lr_scheduler
###############################################################################
# Functions
###############################################################################
def weights_init_normal(m):
classname = m.__class__.__name__
# print(classname)
if classname.find('Conv') != -1:
init.normal(m.weight.data, 0.0, 0.02)
elif classname.find('Linear') != -1:
init.normal(m.weight.data, 0.0, 0.02)
elif classname.find('BatchNorm2d') != -1:
init.normal(m.weight.data, 1.0, 0.02)
init.constant(m.bias.data, 0.0)
def weights_init_xavier(m):
classname = m.__class__.__name__
# print(classname)
if classname.find('Conv') != -1:
init.xavier_normal(m.weight.data, gain=0.02)
elif classname.find('Linear') != -1:
init.xavier_normal(m.weight.data, gain=0.02)
elif classname.find('BatchNorm2d') != -1:
init.normal(m.weight.data, 1.0, 0.02)
init.constant(m.bias.data, 0.0)
def weights_init_kaiming(m):
classname = m.__class__.__name__
# print(classname)
if classname.find('Conv') != -1:
init.kaiming_normal(m.weight.data, a=0, mode='fan_in')
elif classname.find('Linear') != -1:
init.kaiming_normal(m.weight.data, a=0, mode='fan_in')
elif classname.find('BatchNorm2d') != -1:
init.normal(m.weight.data, 1.0, 0.02)
init.constant(m.bias.data, 0.0)
def weights_init_orthogonal(m):
classname = m.__class__.__name__
    # print(classname)
if classname.find('Conv') != -1:
init.orthogonal(m.weight.data, gain=1)
elif classname.find('Linear') != -1:
init.orthogonal(m.weight.data, gain=1)
elif classname.find('BatchNorm2d') != -1:
init.normal(m.weight.data, 1.0, 0.02)
init.constant(m.bias.data, 0.0)
def init_weights(net, init_type='normal'):
print('initialization method [%s]' % init_type)
if init_type == 'normal':
net.apply(weights_init_normal)
elif init_type == 'xavier':
net.apply(weights_init_xavier)
elif init_type == 'kaiming':
net.apply(weights_init_kaiming)
elif init_type == 'orthogonal':
net.apply(weights_init_orthogonal)
else:
raise NotImplementedError('initialization method [%s] is not implemented' % init_type)
def get_norm_layer(norm_type='instance'):
if norm_type == 'batch':
norm_layer = functools.partial(nn.BatchNorm2d, affine=True)
elif norm_type == 'instance':
norm_layer = functools.partial(nn.InstanceNorm2d, affine=False)
elif norm_type == 'none':
norm_layer = None
else:
raise NotImplementedError('normalization layer [%s] is not found' % norm_type)
return norm_layer
def get_scheduler(optimizer, opt):
if opt.lr_policy == 'lambda':
def lambda_rule(epoch):
lr_l = 1.0 - max(0, epoch + 1 + opt.epoch_count - opt.niter) / float(opt.niter_decay + 1)
return lr_l
scheduler = lr_scheduler.LambdaLR(optimizer, lr_lambda=lambda_rule)
elif opt.lr_policy == 'step':
scheduler = lr_scheduler.StepLR(optimizer, step_size=opt.lr_decay_iters, gamma=0.1)
elif opt.lr_policy == 'plateau':
scheduler = lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.2, threshold=0.01, patience=5)
    else:
        raise NotImplementedError('learning rate policy [%s] is not implemented' % opt.lr_policy)
return scheduler
def define_G(input_nc, output_nc, ngf, which_model_netG, norm='batch', use_dropout=False, init_type='normal', gpu_ids=[]):
netG = None
use_gpu = len(gpu_ids) > 0
norm_layer = get_norm_layer(norm_type=norm)
if use_gpu:
assert(torch.cuda.is_available())
if which_model_netG == 'resnet_9blocks':
netG = ResnetGenerator(input_nc, output_nc, ngf, norm_layer=norm_layer, use_dropout=use_dropout, n_blocks=9, gpu_ids=gpu_ids)
elif which_model_netG == 'resnet_6blocks':
netG = ResnetGenerator(input_nc, output_nc, ngf, norm_layer=norm_layer, use_dropout=use_dropout, n_blocks=6, gpu_ids=gpu_ids)
elif which_model_netG == 'unet_128':
netG = UnetGenerator(input_nc, output_nc, 7, ngf, norm_layer=norm_layer, use_dropout=use_dropout, gpu_ids=gpu_ids)
elif which_model_netG == 'unet_256':
netG = UnetGenerator(input_nc, output_nc, 8, ngf, norm_layer=norm_layer, use_dropout=use_dropout, gpu_ids=gpu_ids)
else:
raise NotImplementedError('Generator model name [%s] is not recognized' % which_model_netG)
if len(gpu_ids) > 0:
netG.cuda(gpu_ids[0])
init_weights(netG, init_type=init_type)
return netG
def define_D(input_nc, ndf, which_model_netD,
n_layers_D=3, norm='batch', use_sigmoid=False, init_type='normal', gpu_ids=[]):
netD = None
use_gpu = len(gpu_ids) > 0
norm_layer = get_norm_layer(norm_type=norm)
if use_gpu:
assert(torch.cuda.is_available())
if which_model_netD == 'basic':
netD = NLayerDiscriminator(input_nc, ndf, n_layers=3, norm_layer=norm_layer, use_sigmoid=use_sigmoid, gpu_ids=gpu_ids)
elif which_model_netD == 'n_layers':
netD = NLayerDiscriminator(input_nc, ndf, n_layers_D, norm_layer=norm_layer, use_sigmoid=use_sigmoid, gpu_ids=gpu_ids)
elif which_model_netD == 'pixel':
netD = PixelDiscriminator(input_nc, ndf, norm_layer=norm_layer, use_sigmoid=use_sigmoid, gpu_ids=gpu_ids)
else:
raise NotImplementedError('Discriminator model name [%s] is not recognized' %
which_model_netD)
if use_gpu:
netD.cuda(gpu_ids[0])
init_weights(netD, init_type=init_type)
return netD
def print_network(net):
num_params = 0
for param in net.parameters():
num_params += param.numel()
print(net)
print('Total number of parameters: %d' % num_params)
##############################################################################
# Classes
##############################################################################
# Defines the GAN loss which uses either LSGAN or the regular GAN.
# When LSGAN is used, it is basically same as MSELoss,
# but it abstracts away the need to create the target label tensor
# that has the same size as the input
class GANLoss(nn.Module):
def __init__(self, use_lsgan=True, target_real_label=1.0, target_fake_label=0.0,
tensor=torch.FloatTensor):
super(GANLoss, self).__init__()
self.real_label = target_real_label
self.fake_label = target_fake_label
self.real_label_var = None
self.fake_label_var = None
self.Tensor = tensor
if use_lsgan:
self.loss = nn.MSELoss()
else:
self.loss = nn.BCELoss()
def get_target_tensor(self, input, target_is_real):
target_tensor = None
if target_is_real:
create_label = ((self.real_label_var is None) or
(self.real_label_var.numel() != input.numel()))
if create_label:
real_tensor = self.Tensor(input.size()).fill_(self.real_label)
self.real_label_var = Variable(real_tensor, requires_grad=False)
target_tensor = self.real_label_var
else:
create_label = ((self.fake_label_var is None) or
(self.fake_label_var.numel() != input.numel()))
if create_label:
fake_tensor = self.Tensor(input.size()).fill_(self.fake_label)
self.fake_label_var = Variable(fake_tensor, requires_grad=False)
target_tensor = self.fake_label_var
return target_tensor
def __call__(self, input, target_is_real):
target_tensor = self.get_target_tensor(input, target_is_real)
return self.loss(input, target_tensor)
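
# Usage sketch (comment-only; netD, real_pair and fake_pair are illustrative names):
#   criterion = GANLoss(use_lsgan=True)
#   loss_G = criterion(netD(fake_pair), True)            # generator wants D to say "real"
#   loss_D = 0.5 * (criterion(netD(real_pair), True) +
#                   criterion(netD(fake_pair.detach()), False))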
# Defines the generator that consists of Resnet blocks between a few
# downsampling/upsampling operations.
# Code and idea originally from Justin Johnson's architecture.
# https://github.com/jcjohnson/fast-neural-style/
class ResnetGenerator(nn.Module):
def __init__(self, input_nc, output_nc, ngf=64, norm_layer=nn.BatchNorm2d, use_dropout=False, n_blocks=6, gpu_ids=[], padding_type='reflect'):
assert(n_blocks >= 0)
super(ResnetGenerator, self).__init__()
self.input_nc = input_nc
self.output_nc = output_nc
self.ngf = ngf
self.gpu_ids = gpu_ids
if type(norm_layer) == functools.partial:
use_bias = norm_layer.func == nn.InstanceNorm2d
else:
use_bias = norm_layer == nn.InstanceNorm2d
model = [nn.ReflectionPad2d(3),
nn.Conv2d(input_nc, ngf, kernel_size=7, padding=0,
bias=use_bias),
norm_layer(ngf),
nn.ReLU(True)]
n_downsampling = 2
for i in range(n_downsampling):
mult = 2**i
model += [nn.Conv2d(ngf * mult, ngf * mult * 2, kernel_size=3,
stride=2, padding=1, bias=use_bias),
norm_layer(ngf * mult * 2),
nn.ReLU(True)]
mult = 2**n_downsampling
for i in range(n_blocks):
model += [ResnetBlock(ngf * mult, padding_type=padding_type, norm_layer=norm_layer, use_dropout=use_dropout, use_bias=use_bias)]
for i in range(n_downsampling):
mult = 2**(n_downsampling - i)
model += [nn.ConvTranspose2d(ngf * mult, int(ngf * mult / 2),
kernel_size=3, stride=2,
padding=1, output_padding=1,
bias=use_bias),
norm_layer(int(ngf * mult / 2)),
nn.ReLU(True)]
model += [nn.ReflectionPad2d(3)]
model += [nn.Conv2d(ngf, output_nc, kernel_size=7, padding=0)]
model += [nn.Tanh()]
self.model = nn.Sequential(*model)
def forward(self, input):
if self.gpu_ids and isinstance(input.data, torch.cuda.FloatTensor):
return nn.parallel.data_parallel(self.model, input, self.gpu_ids)
else:
return self.model(input)
# Define a resnet block
class ResnetBlock(nn.Module):
def __init__(self, dim, padding_type, norm_layer, use_dropout, use_bias):
super(ResnetBlock, self).__init__()
self.conv_block = self.build_conv_block(dim, padding_type, norm_layer, use_dropout, use_bias)
def build_conv_block(self, dim, padding_type, norm_layer, use_dropout, use_bias):
conv_block = []
p = 0
if padding_type == 'reflect':
conv_block += [nn.ReflectionPad2d(1)]
elif padding_type == 'replicate':
conv_block += [nn.ReplicationPad2d(1)]
elif padding_type == 'zero':
p = 1
else:
raise NotImplementedError('padding [%s] is not implemented' % padding_type)
conv_block += [nn.Conv2d(dim, dim, kernel_size=3, padding=p, bias=use_bias),
norm_layer(dim),
nn.ReLU(True)]
if use_dropout:
conv_block += [nn.Dropout(0.5)]
p = 0
if padding_type == 'reflect':
conv_block += [nn.ReflectionPad2d(1)]
elif padding_type == 'replicate':
conv_block += [nn.ReplicationPad2d(1)]
elif padding_type == 'zero':
p = 1
else:
raise NotImplementedError('padding [%s] is not implemented' % padding_type)
conv_block += [nn.Conv2d(dim, dim, kernel_size=3, padding=p, bias=use_bias),
norm_layer(dim)]
return nn.Sequential(*conv_block)
def forward(self, x):
out = x + self.conv_block(x)
return out
# Defines the Unet generator.
# |num_downs|: number of downsamplings in UNet. For example,
# if |num_downs| == 7, image of size 128x128 will become of size 1x1
# at the bottleneck
class UnetGenerator(nn.Module):
def __init__(self, input_nc, output_nc, num_downs, ngf=64,
norm_layer=nn.BatchNorm2d, use_dropout=False, gpu_ids=[]):
super(UnetGenerator, self).__init__()
self.gpu_ids = gpu_ids
# construct unet structure
unet_block = UnetSkipConnectionBlock(ngf * 8, ngf * 8, input_nc=None, submodule=None, norm_layer=norm_layer, innermost=True)
for i in range(num_downs - 5):
unet_block = UnetSkipConnectionBlock(ngf * 8, ngf * 8, input_nc=None, submodule=unet_block, norm_layer=norm_layer, use_dropout=use_dropout)
unet_block = UnetSkipConnectionBlock(ngf * 4, ngf * 8, input_nc=None, submodule=unet_block, norm_layer=norm_layer)
unet_block = UnetSkipConnectionBlock(ngf * 2, ngf * 4, input_nc=None, submodule=unet_block, norm_layer=norm_layer)
unet_block = UnetSkipConnectionBlock(ngf, ngf * 2, input_nc=None, submodule=unet_block, norm_layer=norm_layer)
unet_block = UnetSkipConnectionBlock(output_nc, ngf, input_nc=input_nc, submodule=unet_block, outermost=True, norm_layer=norm_layer)
self.model = unet_block
def forward(self, input):
if self.gpu_ids and isinstance(input.data, torch.cuda.FloatTensor):
return nn.parallel.data_parallel(self.model, input, self.gpu_ids)
else:
return self.model(input)
# Defines the submodule with skip connection.
# X -------------------identity---------------------- X
# |-- downsampling -- |submodule| -- upsampling --|
class UnetSkipConnectionBlock(nn.Module):
def __init__(self, outer_nc, inner_nc, input_nc=None,
submodule=None, outermost=False, innermost=False, norm_layer=nn.BatchNorm2d, use_dropout=False):
super(UnetSkipConnectionBlock, self).__init__()
self.outermost = outermost
if type(norm_layer) == functools.partial:
use_bias = norm_layer.func == nn.InstanceNorm2d
else:
use_bias = norm_layer == nn.InstanceNorm2d
if input_nc is None:
input_nc = outer_nc
downconv = nn.Conv2d(input_nc, inner_nc, kernel_size=4,
stride=2, padding=1, bias=use_bias)
downrelu = nn.LeakyReLU(0.2, True)
downnorm = norm_layer(inner_nc)
uprelu = nn.ReLU(True)
upnorm = norm_layer(outer_nc)
if outermost:
upconv = nn.ConvTranspose2d(inner_nc * 2, outer_nc,
kernel_size=4, stride=2,
padding=1)
down = [downconv]
up = [uprelu, upconv, nn.Tanh()]
model = down + [submodule] + up
elif innermost:
upconv = nn.ConvTranspose2d(inner_nc, outer_nc,
kernel_size=4, stride=2,
padding=1, bias=use_bias)
down = [downrelu, downconv]
up = [uprelu, upconv, upnorm]
model = down + up
else:
upconv = nn.ConvTranspose2d(inner_nc * 2, outer_nc,
kernel_size=4, stride=2,
padding=1, bias=use_bias)
down = [downrelu, downconv, downnorm]
up = [uprelu, upconv, upnorm]
if use_dropout:
model = down + [submodule] + up + [nn.Dropout(0.5)]
else:
model = down + [submodule] + up
self.model = nn.Sequential(*model)
def forward(self, x):
if self.outermost:
return self.model(x)
else:
return torch.cat([x, self.model(x)], 1)
# Defines the PatchGAN discriminator with the specified arguments.
class NLayerDiscriminator(nn.Module):
def __init__(self, input_nc, ndf=64, n_layers=3, norm_layer=nn.BatchNorm2d, use_sigmoid=False, gpu_ids=[]):
super(NLayerDiscriminator, self).__init__()
self.gpu_ids = gpu_ids
if type(norm_layer) == functools.partial:
use_bias = norm_layer.func == nn.InstanceNorm2d
else:
use_bias = norm_layer == nn.InstanceNorm2d
kw = 4
padw = 1
sequence = [
nn.Conv2d(input_nc, ndf, kernel_size=kw, stride=2, padding=padw),
nn.LeakyReLU(0.2, True)
]
nf_mult = 1
nf_mult_prev = 1
for n in range(1, n_layers):
nf_mult_prev = nf_mult
nf_mult = min(2**n, 8)
sequence += [
nn.Conv2d(ndf * nf_mult_prev, ndf * nf_mult,
kernel_size=kw, stride=2, padding=padw, bias=use_bias),
norm_layer(ndf * nf_mult),
nn.LeakyReLU(0.2, True)
]
nf_mult_prev = nf_mult
nf_mult = min(2**n_layers, 8)
sequence += [
nn.Conv2d(ndf * nf_mult_prev, ndf * nf_mult,
kernel_size=kw, stride=1, padding=padw, bias=use_bias),
norm_layer(ndf * nf_mult),
nn.LeakyReLU(0.2, True)
]
sequence += [nn.Conv2d(ndf * nf_mult, 1, kernel_size=kw, stride=1, padding=padw)]
if use_sigmoid:
sequence += [nn.Sigmoid()]
self.model = nn.Sequential(*sequence)
def forward(self, input):
if len(self.gpu_ids) and isinstance(input.data, torch.cuda.FloatTensor):
return nn.parallel.data_parallel(self.model, input, self.gpu_ids)
else:
return self.model(input)
class PixelDiscriminator(nn.Module):
def __init__(self, input_nc, ndf=64, norm_layer=nn.BatchNorm2d, use_sigmoid=False, gpu_ids=[]):
super(PixelDiscriminator, self).__init__()
self.gpu_ids = gpu_ids
if type(norm_layer) == functools.partial:
use_bias = norm_layer.func == nn.InstanceNorm2d
else:
use_bias = norm_layer == nn.InstanceNorm2d
self.net = [
nn.Conv2d(input_nc, ndf, kernel_size=1, stride=1, padding=0),
nn.LeakyReLU(0.2, True),
nn.Conv2d(ndf, ndf * 2, kernel_size=1, stride=1, padding=0, bias=use_bias),
norm_layer(ndf * 2),
nn.LeakyReLU(0.2, True),
nn.Conv2d(ndf * 2, 1, kernel_size=1, stride=1, padding=0, bias=use_bias)]
if use_sigmoid:
self.net.append(nn.Sigmoid())
self.net = nn.Sequential(*self.net)
def forward(self, input):
if len(self.gpu_ids) and isinstance(input.data, torch.cuda.FloatTensor):
return nn.parallel.data_parallel(self.net, input, self.gpu_ids)
else:
return self.net(input)
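
# Minimal CPU smoke test for define_G (a sketch written against the
# PyTorch 0.4-era API this file targets; an empty gpu_ids list keeps it on CPU):
if __name__ == '__main__':
    netG = define_G(3, 3, 64, 'unet_128', norm='instance', init_type='normal', gpu_ids=[])
    x = torch.randn(1, 3, 128, 128)   # unet_128 expects 128x128 inputs
    print(netG(x).shape)              # -> torch.Size([1, 3, 128, 128])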
import sys
sys.path.append("..")
import util.image_processing as impro
from util import data
import torch
def run_unet(img, net, size=128, use_gpu=True):
    img = impro.image2float(img, 3)
    img = img.reshape(1, 3, size, size)
    img = torch.from_numpy(img)
    if use_gpu:
        img = img.cuda()
    pred = net(img)
    pred = (pred.cpu().detach().numpy() * 255)
    pred = pred.reshape(size, size).astype('uint8')
    return pred

def run_unet_rectim(img, net, size=128, use_gpu=True):
    img = impro.resize(img, size)
    img1, img2 = impro.splitimage(img)
    mask1 = run_unet(img1, net, size=128, use_gpu=use_gpu)
    mask2 = run_unet(img2, net, size=128, use_gpu=use_gpu)
    mask = impro.mergeimage(mask1, mask2, img)
    return mask

def run_pix2pix(img, net, size=128, use_gpu=True):
    img = impro.resize(img, size)
    img = data.im2tensor(img, use_gpu=use_gpu)
    img_fake = net(img)
    img_fake = data.tensor2im(img_fake)
    return img_fake
# full assembly of the sub-parts to form the complete net
import torch.nn.functional as F
from .unet_parts import *
class UNet(nn.Module):
def __init__(self, n_channels, n_classes):
super(UNet, self).__init__()
self.inc = inconv(n_channels, 64)
self.down1 = down(64, 128)
self.down2 = down(128, 256)
self.down3 = down(256, 512)
self.down4 = down(512, 512)
self.up1 = up(1024, 256)
self.up2 = up(512, 128)
self.up3 = up(256, 64)
self.up4 = up(128, 64)
self.outc = outconv(64, n_classes)
def forward(self, x):
x1 = self.inc(x)
x2 = self.down1(x1)
x3 = self.down2(x2)
x4 = self.down3(x3)
x5 = self.down4(x4)
x = self.up1(x5, x4)
x = self.up2(x, x3)
x = self.up3(x, x2)
x = self.up4(x, x1)
x = self.outc(x)
return F.sigmoid(x)
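
# Quick shape check for the segmentation net (a sketch; 128x128 matches the
# size used by runmodel.run_unet):
if __name__ == '__main__':
    import torch
    net = UNet(n_channels=3, n_classes=1)
    x = torch.randn(1, 3, 128, 128)
    print(net(x).shape)               # -> torch.Size([1, 1, 128, 128])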
# sub-parts of the U-Net model
import torch
import torch.nn as nn
import torch.nn.functional as F
class double_conv(nn.Module):
'''(conv => BN => ReLU) * 2'''
def __init__(self, in_ch, out_ch):
super(double_conv, self).__init__()
self.conv = nn.Sequential(
nn.Conv2d(in_ch, out_ch, 3, padding=1),
nn.BatchNorm2d(out_ch),
nn.ReLU(inplace=True),
nn.Conv2d(out_ch, out_ch, 3, padding=1),
nn.BatchNorm2d(out_ch),
nn.ReLU(inplace=True)
)
def forward(self, x):
x = self.conv(x)
return x
class inconv(nn.Module):
def __init__(self, in_ch, out_ch):
super(inconv, self).__init__()
self.conv = double_conv(in_ch, out_ch)
def forward(self, x):
x = self.conv(x)
return x
class down(nn.Module):
def __init__(self, in_ch, out_ch):
super(down, self).__init__()
self.mpconv = nn.Sequential(
nn.MaxPool2d(2),
double_conv(in_ch, out_ch)
)
def forward(self, x):
x = self.mpconv(x)
return x
class up(nn.Module):
def __init__(self, in_ch, out_ch, bilinear=True):
super(up, self).__init__()
# would be a nice idea if the upsampling could be learned too,
# but my machine do not have enough memory to handle all those weights
if bilinear:
self.up = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
else:
self.up = nn.ConvTranspose2d(in_ch//2, in_ch//2, 2, stride=2)
self.conv = double_conv(in_ch, out_ch)
def forward(self, x1, x2):
x1 = self.up(x1)
# input is CHW
diffY = x2.size()[2] - x1.size()[2]
diffX = x2.size()[3] - x1.size()[3]
x1 = F.pad(x1, (diffX // 2, diffX - diffX//2,
diffY // 2, diffY - diffY//2))
# for padding issues, see
# https://github.com/HaiyongJiang/U-Net-Pytorch-Unstructured-Buggy/commit/0e854509c2cea854e247a9c615f175f76fbb2e3a
# https://github.com/xiaopeng-liao/Pytorch-UNet/commit/8ebac70e633bac59fc22bb5195e513d5832fb3bd
x = torch.cat([x2, x1], dim=1)
x = self.conv(x)
return x
class outconv(nn.Module):
def __init__(self, in_ch, out_ch):
super(outconv, self).__init__()
self.conv = nn.Conv2d(in_ch, out_ch, 1)
def forward(self, x):
x = self.conv(x)
return x
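
# Shape sketch for the `up` block (sizes are illustrative; it shows how F.pad
# aligns the upsampled map with its skip connection before concatenation):
if __name__ == '__main__':
    blk = up(1024, 256)               # bilinear upsampling by default
    x1 = torch.randn(1, 512, 8, 8)    # from the deeper layer
    x2 = torch.randn(1, 512, 16, 16)  # skip connection
    print(blk(x1, x2).shape)          # -> torch.Size([1, 256, 16, 16])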
import argparse
import os
class AddOptions():
def __init__(self):
self.parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
self.initialized = False
def initialize(self):
        self.parser.add_argument('--use_gpu', action='store_true', help='if specified, use gpu')
        self.parser.add_argument('--input_dir', type=str, default='./video_or_image', help='put your videos or images here')
        self.parser.add_argument('--result_dir', type=str, default='./result', help='results will be saved here')
        self.parser.add_argument('--model_dir', type=str, default='./pretrained_models/AddMosaic',
                                 help='directory of the pretrained model')
        self.parser.add_argument('--model_name', type=str, default='hands_128.pth', help='name of the model used to add mosaics')
        self.parser.add_argument('--mosaic_mod', type=str, default='squa_avg', help='type of mosaic -> squa_avg | squa_random | squa_avg_circle_edge | rect_avg')
        self.parser.add_argument('--mosaic_size', type=int, default=20, help='mosaic block size')
        self.parser.add_argument('--mask_extend', type=int, default=20, help='extend the detected mask to cover more area')
        self.parser.add_argument('--mask_threshold', type=int, default=64, help='threshold (0~255) for the detected mask')
        self.parser.add_argument('--output_size', type=int, default=0, help='size of the output; 0 keeps the original size')
self.initialized = True
def getparse(self):
if not self.initialized:
self.initialize()
self.opt = self.parser.parse_args()
return self.opt
import argparse
import os
class CleanOptions():
def __init__(self):
self.parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
self.initialized = False
def initialize(self):
        self.parser.add_argument('--use_gpu', action='store_true', help='if specified, use gpu')
        self.parser.add_argument('--input_dir', type=str, default='./video_or_image', help='put your videos or images here')
        self.parser.add_argument('--result_dir', type=str, default='./result', help='results will be saved here')
        self.parser.add_argument('--model_dir', type=str, default='./pretrained_models/CleanMosaic',
                                 help='directory of the pretrained models: 1. the model used to find the mosaic position, 2. the model used to clean the mosaic')
        self.parser.add_argument('--model_name', type=str, default='hands_unet_128.pth', help='name of the model used to clean mosaics')
        self.parser.add_argument('--model_type_netG', type=str, default='unet_128', help='model architecture to use for netG')
        self.parser.add_argument('--mosaic_position_model_name', type=str, default='mosaic_position.pkl',
                                 help='name of the model used to find the mosaic position')
# self.parser.add_argument('--zoom_multiple', type=float, default=1.0,help='zoom video')
self.initialized = True
def getparse(self):
if not self.initialized:
self.initialize()
self.opt = self.parser.parse_args()
return self.opt
import numpy as np
import torch
import torchvision.transforms as transforms
transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize(mean = (0.5, 0.5, 0.5), std = (0.5, 0.5, 0.5))
]
)
def tensor2im(image_tensor, imtype=np.uint8, rgb2bgr = True):
image_tensor =image_tensor.data
image_numpy = image_tensor[0].cpu().float().numpy()
if image_numpy.shape[0] == 1:
image_numpy = np.tile(image_numpy, (3, 1, 1))
image_numpy = (np.transpose(image_numpy, (1, 2, 0)) + 1) / 2.0 * 255.0
if rgb2bgr:
image_numpy = image_numpy[...,::-1]-np.zeros_like(image_numpy)
return image_numpy.astype(imtype)
def im2tensor(image_numpy, imtype=np.uint8, bgr2rgb = True, reshape = True, use_gpu = True):
if bgr2rgb:
image_numpy = image_numpy[...,::-1]-np.zeros_like(image_numpy)
image_tensor = transform(image_numpy)
if reshape:
image_tensor=image_tensor.reshape(1,3,128,128)
if use_gpu:
image_tensor = image_tensor.cuda()
return image_tensor
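
# Round-trip sketch (CPU only; assumes a 128x128 BGR uint8 image, matching the
# hard-coded reshape above):
if __name__ == '__main__':
    img = np.random.randint(0, 256, (128, 128, 3), dtype=np.uint8)
    t = im2tensor(img, use_gpu=False)  # shape (1, 3, 128, 128), values in [-1, 1]
    back = tensor2im(t)                # BGR uint8 again, shape (128, 128, 3)
    print(t.shape, back.shape)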
import os,json
def video2image(videopath, imagepath):
    # quote paths so files with spaces do not break the shell command
    os.system('ffmpeg -i "'+videopath+'" -f image2 "'+imagepath+'"')

def video2voice(videopath, voicepath):
    os.system('ffmpeg -i "'+videopath+'" -f mp3 "'+voicepath+'"')

def image2video(fps, imagepath, voicepath, videopath):
    os.system('ffmpeg -y -r '+str(fps)+' -i "'+imagepath+'" -vcodec libx264 ./tmp/video_tmp.mp4')
    os.system('ffmpeg -i ./tmp/video_tmp.mp4 -i "'+voicepath+'" -vcodec copy -acodec copy "'+videopath+'"')
def get_video_infos(videopath):
cmd_str = 'ffprobe -v quiet -print_format json -show_format -show_streams -i "' + videopath + '"'
out_string = os.popen(cmd_str).read()
infos = json.loads(out_string)
    # avg_frame_rate is a fraction string like '30000/1001'
    num, den = infos['streams'][0]['avg_frame_rate'].split('/')
    fps = float(num) / float(den)
endtime = float(infos['streams'][0]['duration'])
width = int(infos['streams'][0]['width'])
height = int(infos['streams'][0]['height'])
return fps,endtime,width,height
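
# Usage sketch (comment-only; requires ffmpeg/ffprobe on PATH, and
# './video_or_image/demo.mp4' is an assumed example file):
#   fps, endtime, width, height = get_video_infos('./video_or_image/demo.mp4')
#   video2image('./video_or_image/demo.mp4', './tmp/video2image/output_%05d.png')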
import cv2
import numpy as np
def resize(img,size):
h, w = img.shape[:2]
if min(h, w) ==size:
return img
if w >= h:
res = cv2.resize(img,(int(size*w/h), size))
else:
res = cv2.resize(img,(size, int(size*h/w)))
return res
def channel_one2three(img):
#zeros = np.zeros(img.shape[:2], dtype = "uint8")
ret,thresh = cv2.threshold(img,127,255,cv2.THRESH_BINARY)
res = cv2.merge([thresh, thresh, thresh])
return res
def makedataset(target_image, origin_image):
    target_image = resize(target_image, 256)
    origin_image = resize(origin_image, 256)
    img = np.zeros((256, 512, 3), dtype="uint8")
    w = origin_image.shape[1]
    img[0:256, 0:256] = target_image[0:256, int(w/2-256/2):int(w/2+256/2)]
    img[0:256, 256:512] = origin_image[0:256, int(w/2-256/2):int(w/2+256/2)]
    return img
def image2float(img, ch):
    size = img.shape[0]
    if ch == 1:
        img = (img[:, :, 0].reshape(1, size, size)/255.0).astype(np.float32)
    else:
        img = (img.transpose((2, 0, 1))/255.0).astype(np.float32)
    return img
def splitimage(img):
    # split the image into two overlapping squares that cover the full frame
    h, w = img.shape[:2]
    size = min(h, w)
    if w >= h:
        img1 = img[:, 0:size]
        img2 = img[:, w-size:w]
    else:
        img1 = img[0:size, :]
        img2 = img[h-size:h, :]
    return img1, img2
def mergeimage(img1, img2, origin_image):
    h, w = origin_image.shape[:2]
    new_img1 = np.zeros((h, w), dtype="uint8")
    new_img2 = np.zeros((h, w), dtype="uint8")
    size = min(h, w)
    if w >= h:
        new_img1[:, 0:size] = img1
        new_img2[:, w-size:w] = img2
    else:
        new_img1[0:size, :] = img1
        new_img2[h-size:h, :] = img2
    result_img = cv2.add(new_img1, new_img2)
    return result_img
def boundingSquare(mask,threshold,Ex_mul):
# thresh = mask_threshold(mask,10,threshold)
area = mask_area(mask)
if area == 0 :
return 0,0,0,0
x,y,w,h = cv2.boundingRect(mask)
center = np.array([int(x+w/2),int(y+h/2)])
size = max(w,h)
point0=np.array([x,y])
point1=np.array([x+size,y+size])
h, w = mask.shape[:2]
if size*Ex_mul > min(h, w):
size = min(h, w)
halfsize = int(min(h, w)/2)
else:
size = Ex_mul*size
halfsize = int(size/2)
size = halfsize*2
point0 = center - halfsize
point1 = center + halfsize
if point0[0]<0:
point0[0]=0
point1[0]=size
if point0[1]<0:
point0[1]=0
point1[1]=size
if point1[0]>w:
point1[0]=w
point0[0]=w-size
if point1[1]>h:
point1[1]=h
point0[1]=h-size
center = ((point0+point1)/2).astype('int')
return center[0],center[1],halfsize,area
def mask_threshold(mask,blur,threshold):
mask = cv2.threshold(mask,threshold,255,cv2.THRESH_BINARY)[1]
mask = cv2.blur(mask, (blur, blur))
mask = cv2.threshold(mask,threshold/3,255,cv2.THRESH_BINARY)[1]
return mask
def mask_area(mask):
    # index [1] assumes the OpenCV 3.x API, where findContours returns
    # (image, contours, hierarchy)
    contours = cv2.findContours(mask, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)[1]
    try:
        area = cv2.contourArea(contours[0])
    except:
        area = 0
    return area
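
# Sketch with a synthetic mask (assumes the OpenCV 3.x findContours signature
# used by mask_area above; on OpenCV 4.x it would report area 0):
if __name__ == '__main__':
    mask = np.zeros((128, 128), dtype='uint8')
    cv2.rectangle(mask, (40, 40), (80, 90), 255, -1)   # filled white block
    x, y, halfsize, area = boundingSquare(mask, threshold=128, Ex_mul=1.5)
    print(x, y, halfsize, area)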
import cv2
import numpy as np
import os
import random
from .image_processing import resize,channel_one2three
def addmosaic(img,mask,n,out_size = 0,model = 'squa_avg'):
if out_size:
img = resize(img,out_size)
h, w = img.shape[:2]
mask = cv2.resize(mask,(w,h))
img_mosaic=img.copy()
if model=='squa_avg':
for i in range(int(h/n)):
for j in range(int(w/n)):
if mask[int(i*n+n/2),int(j*n+n/2)] == 255:
img_mosaic[i*n:(i+1)*n,j*n:(j+1)*n,:]=img[i*n:(i+1)*n,j*n:(j+1)*n,:].mean(0).mean(0)
elif model == 'squa_random':
for i in range(int(h/n)):
for j in range(int(w/n)):
if mask[int(i*n+n/2),int(j*n+n/2)] == 255:
img_mosaic[i*n:(i+1)*n,j*n:(j+1)*n,:]=img[int(i*n-n/2+n*random.random()),int(j*n-n/2+n*random.random()),:]
elif model == 'squa_avg_circle_edge':
for i in range(int(h/n)):
for j in range(int(w/n)):
img_mosaic[i*n:(i+1)*n,j*n:(j+1)*n,:]=img[i*n:(i+1)*n,j*n:(j+1)*n,:].mean(0).mean(0)
mask = channel_one2three(mask)
mask_inv = cv2.bitwise_not(mask)
imgroi1 = cv2.bitwise_and(mask,img_mosaic)
imgroi2 = cv2.bitwise_and(mask_inv,img)
img_mosaic = cv2.add(imgroi1,imgroi2)
elif model =='rect_avg':
rect_ratio=1+0.6*random.random()
n_h=n
n_w=int(n*rect_ratio)
for i in range(int(h/n_h)):
for j in range(int(w/n_w)):
if mask[int(i*n_h+n_h/2),int(j*n_w+n_w/2)] == 255:
img_mosaic[i*n_h:(i+1)*n_h,j*n_w:(j+1)*n_w,:]=img[i*n_h:(i+1)*n_h,j*n_w:(j+1)*n_w,:].mean(0).mean(0)
return img_mosaic
def random_mosaic_mod(img, mask, n):
    ran = random.random()
    if ran < 0.1:
        img = addmosaic(img, mask, n, model='squa_random')
    elif 0.1 <= ran < 0.3:
        img = addmosaic(img, mask, n, model='squa_avg')
    elif 0.3 <= ran < 0.5:
        img = addmosaic(img, mask, n, model='squa_avg_circle_edge')
    else:
        img = addmosaic(img, mask, n, model='rect_avg')
    return img
def random_mosaic(img,mask):
img = resize(img,512)
h,w = img.shape[:2]
mask = cv2.resize(mask,(w,h))
#area_avg=5925*4
image, contours, hierarchy = cv2.findContours(mask,cv2.RETR_TREE,cv2.CHAIN_APPROX_SIMPLE)
try:
area = cv2.contourArea(contours[0])
except:
area = 0
if area>50000:
img_mosaic = random_mosaic_mod(img,mask,random.randint(14,26))
elif 20000<area<=50000:
img_mosaic = random_mosaic_mod(img,mask,random.randint(10,18))
elif 5000<area<=20000:
img_mosaic = random_mosaic_mod(img,mask,random.randint(8,14))
elif 0<=area<=5000:
img_mosaic = random_mosaic_mod(img,mask,random.randint(4,8))
    else:
        # area is always >= 0, so this branch is unreachable; fall back anyway
        img_mosaic = img
return img_mosaic
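
# Sketch with synthetic data ('./mosaic_demo.png' is an illustrative output
# path; 'squa_avg' pixelates only where the mask is white):
if __name__ == '__main__':
    img = np.random.randint(0, 256, (256, 256, 3), dtype=np.uint8)
    mask = np.zeros((256, 256), dtype=np.uint8)
    mask[64:192, 64:192] = 255
    cv2.imwrite('./mosaic_demo.png', addmosaic(img, mask, 10, model='squa_avg'))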
import os
def Traversal(filedir):
    # os.walk already recurses into subdirectories, so no explicit recursion is needed
    file_list = []
    for root, dirs, files in os.walk(filedir):
        for file in files:
            file_list.append(os.path.join(root, file))
    return file_list
def is_img(path):
ext = os.path.splitext(path)[1]
ext = ext.lower()
if ext in ['.jpg','.png','.jpeg','.bmp']:
return True
else:
return False
def is_video(path):
ext = os.path.splitext(path)[1]
ext = ext.lower()
if ext in ['.mp4','.flv','.avi','.mov','.mkv','.wmv','.rmvb']:
return True
else:
return False
def writelog(path, log):
    # use a context manager so the file handle is always closed
    with open(path, 'a+') as f:
        f.write(log + '\n')
def clean_tempfiles():
if os.path.isdir('./tmp'):
os.system('rm -rf ./tmp')
os.makedirs('./tmp')
os.makedirs('./tmp/video2image')
os.makedirs('./tmp/addmosaic_image')
os.makedirs('./tmp/mosaic_crop')
os.makedirs('./tmp/replace_mosaic')
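
# Usage sketch (scans the current directory; output depends on its contents):
if __name__ == '__main__':
    for p in Traversal('.'):
        if is_img(p) or is_video(p):
            print(p)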