提交 cd167ce2 编写于 作者: L LielinJiang

fix conflict

import os
import sys
cur_path = os.path.abspath(os.path.dirname(__file__))
sys.path.append(cur_path)
import time
import glob
import numpy as np
from imageio import imread, imsave
from tqdm import tqdm
import cv2
import paddle.fluid as fluid
from paddle.utils.download import get_path_from_url
from ppgan.utils.video import video2frames, frames2video
from util import *
from my_args import parser
DAIN_WEIGHT_URL = 'https://paddlegan.bj.bcebos.com/applications/DAIN_weight.tar'
def infer_engine(model_dir,
                 run_mode='fluid',
                 batch_size=1,
                 use_gpu=False,
                 min_subgraph_size=3):
    """Create a Paddle analysis predictor for the model in ``model_dir``.

    Args:
        model_dir: Directory containing the ``model`` and ``params`` files.
        run_mode: ``'fluid'``, or ``'trt_fp32'`` / ``'trt_fp16'`` to enable
            the TensorRT engine with the matching precision.
        batch_size: Forwarded to TensorRT as ``max_batch_size``.
        use_gpu: Enable GPU device 0 when true, otherwise disable the GPU.
        min_subgraph_size: Forwarded to the TensorRT engine.

    Returns:
        The predictor built by ``fluid.core.create_paddle_predictor``.

    Raises:
        ValueError: If a run mode other than ``'fluid'`` is requested while
            ``use_gpu`` is false.
    """
    if run_mode != 'fluid' and not use_gpu:
        raise ValueError(
            "Predict by TensorRT mode: {}, expect use_gpu==True, but use_gpu == {}"
            .format(run_mode, use_gpu))

    trt_precisions = {
        'trt_fp32': fluid.core.AnalysisConfig.Precision.Float32,
        'trt_fp16': fluid.core.AnalysisConfig.Precision.Half
    }

    model_file = os.path.join(model_dir, 'model')
    params_file = os.path.join(model_dir, 'params')
    config = fluid.core.AnalysisConfig(model_file, params_file)

    if not use_gpu:
        config.disable_gpu()
    else:
        # Arguments: initial GPU memory (MB), device id.
        config.enable_use_gpu(100, 0)
        # Turn on graph optimization / op fusion.
        config.switch_ir_optim(True)

    if run_mode in trt_precisions:
        config.enable_tensorrt_engine(
            workspace_size=1 << 10,
            max_batch_size=batch_size,
            min_subgraph_size=min_subgraph_size,
            precision_mode=trt_precisions[run_mode],
            use_static=False,
            use_calib_mode=False)

    # Silence glog output during prediction.
    config.disable_glog_info()
    # Enable memory optimization.
    config.enable_memory_optim()
    # Feed/fetch ops must be disabled for zero_copy_run.
    config.switch_use_feed_fetch_ops(False)
    return fluid.core.create_paddle_predictor(config)
def executor(model_dir, use_gpu=False):
    """Load the inference model in ``model_dir`` into a fluid executor.

    Args:
        model_dir: Directory containing the ``model`` and ``params`` files.
        use_gpu: Run on ``CUDAPlace(0)`` when true, otherwise on the CPU.

    Returns:
        Tuple ``(exe, program, fetch_targets)``; the feed names reported by
        ``load_inference_model`` are discarded.
    """
    if use_gpu:
        device = fluid.CUDAPlace(0)
    else:
        device = fluid.CPUPlace()
    runner = fluid.Executor(device)

    loaded = fluid.io.load_inference_model(dirname=model_dir,
                                           executor=runner,
                                           model_filename='model',
                                           params_filename='params')
    inference_program, _feed_names, fetch_vars = loaded
    return runner, inference_program, fetch_vars
class VideoFrameInterp(object):
    """Frame interpolation of ``.mp4`` videos with a DAIN inference model.

    Args:
        time_step: Temporal step between generated frames;
            ``int(1 / time_step) - 1`` frames are written per input pair.
        model_path: Directory of the inference model. When ``None`` the
            weights are fetched from ``DAIN_WEIGHT_URL``.
        video_path: A single ``.mp4`` file or a directory searched for
            ``*.mp4`` files.
        use_gpu: Whether the executor runs on GPU.
        key_frame_thread: Correlation threshold; a frame pair whose grayscale
            correlation is below it is treated as a scene change and the
            first frame is repeated instead of running the model.
        output_path: Root output directory; results go to
            ``<output_path>/DAIN``.
    """

    def __init__(self,
                 time_step,
                 model_path,
                 video_path,
                 use_gpu=True,
                 key_frame_thread=0.,
                 output_path='output'):
        self.video_path = video_path
        self.output_path = os.path.join(output_path, 'DAIN')
        if model_path is None:
            model_path = get_path_from_url(DAIN_WEIGHT_URL, cur_path)

        self.model_path = model_path
        self.time_step = time_step
        self.key_frame_thread = key_frame_thread

        self.exe, self.program, self.fetch_targets = executor(model_path,
                                                              use_gpu=use_gpu)

    def run(self):
        """Interpolate every selected video and write the re-timed result.

        Returns:
            Tuple ``(frame_pattern_combined, video_pattern_output)`` of the
            last video that was fully processed, or ``(None, None)`` when no
            video was processed.
        """
        # NOTE(review): the block nesting below was reconstructed from the
        # data shapes each statement requires; confirm against upstream.
        frame_path_input = os.path.join(self.output_path, 'frames-input')
        frame_path_interpolated = os.path.join(self.output_path,
                                               'frames-interpolated')
        frame_path_combined = os.path.join(self.output_path, 'frames-combined')
        video_path_output = os.path.join(self.output_path, 'videos-output')

        for directory in (self.output_path, frame_path_input,
                          frame_path_interpolated, frame_path_combined,
                          video_path_output):
            os.makedirs(directory, exist_ok=True)

        timestep = self.time_step
        num_frames = int(1.0 / timestep) - 1

        if self.video_path.endswith('.mp4'):
            videos = [self.video_path]
        else:
            videos = sorted(glob.glob(os.path.join(self.video_path, '*.mp4')))

        # Defined up front so an empty video list does not hit an unbound
        # local at the final return.
        frame_pattern_combined = None
        video_pattern_output = None

        for vid in videos:
            print("Interpolating video:", vid)
            # The capture is only needed for the frame rate; release it
            # right away so the handle is not leaked.
            cap = cv2.VideoCapture(vid)
            try:
                fps = cap.get(cv2.CAP_PROP_FPS)
            finally:
                cap.release()
            print("Old fps (frame rate): ", fps)

            times_interp = int(1.0 / timestep)
            r2 = str(int(fps) * times_interp)
            print("New fps (frame rate): ", r2)

            out_path = video2frames(vid, frame_path_input)

            vidname = vid.split('/')[-1].split('.')[0]

            tot_timer = AverageMeter()
            proc_timer = AverageMeter()
            end = time.time()

            frames = sorted(glob.glob(os.path.join(out_path, '*.png')))

            img = imread(frames[0])

            int_width = img.shape[1]
            int_height = img.shape[0]
            channel = img.shape[2]
            if not channel == 3:
                continue

            # Pad each spatial dimension up to the next multiple of 128;
            # already-aligned dimensions get a fixed 32-pixel border.
            if int_width != ((int_width >> 7) << 7):
                int_width_pad = (
                    ((int_width >> 7) + 1) << 7)  # more than necessary
                padding_left = int((int_width_pad - int_width) / 2)
                padding_right = int_width_pad - int_width - padding_left
            else:
                int_width_pad = int_width
                padding_left = 32
                padding_right = 32

            if int_height != ((int_height >> 7) << 7):
                int_height_pad = (
                    ((int_height >> 7) + 1) << 7)  # more than necessary
                padding_top = int((int_height_pad - int_height) / 2)
                padding_bottom = int_height_pad - int_height - padding_top
            else:
                int_height_pad = int_height
                padding_top = 32
                padding_bottom = 32

            frame_num = len(frames)
            print('processing {} frames, from video: {}'.format(frame_num, vid))

            os.makedirs(os.path.join(frame_path_interpolated, vidname),
                        exist_ok=True)
            os.makedirs(os.path.join(frame_path_combined, vidname),
                        exist_ok=True)

            for i in tqdm(range(frame_num - 1)):
                first = frames[i]
                second = frames[i + 1]

                img_first = imread(first)
                img_second = imread(second)

                # ---- Frame change test: correlate the two grayscale frames.
                img_first_gray = np.dot(img_first[..., :3],
                                        [0.299, 0.587, 0.114])
                img_second_gray = np.dot(img_second[..., :3],
                                         [0.299, 0.587, 0.114])

                img_first_gray = img_first_gray.flatten(order='C')
                img_second_gray = img_second_gray.flatten(order='C')
                corr = np.corrcoef(img_first_gray, img_second_gray)[0, 1]
                key_frame = False
                if corr < self.key_frame_thread:
                    key_frame = True
                # ----

                X0 = img_first.astype('float32').transpose((2, 0, 1)) / 255
                X1 = img_second.astype('float32').transpose((2, 0, 1)) / 255

                if key_frame:
                    # Scene change: repeat the first frame rather than
                    # interpolating across the cut.
                    y_ = [
                        np.transpose(255.0 * X0.clip(0, 1.0), (1, 2, 0))
                        for _ in range(num_frames)
                    ]
                else:
                    assert (X0.shape[1] == X1.shape[1])
                    assert (X0.shape[2] == X1.shape[2])

                    X0 = np.pad(X0, ((0, 0), (padding_top, padding_bottom),
                                     (padding_left, padding_right)),
                                mode='edge')
                    X1 = np.pad(X1, ((0, 0), (padding_top, padding_bottom),
                                     (padding_left, padding_right)),
                                mode='edge')

                    # Two leading axes are added to each frame before the
                    # pair is stacked along axis 0.
                    X0 = np.expand_dims(X0, axis=0)
                    X1 = np.expand_dims(X1, axis=0)

                    X0 = np.expand_dims(X0, axis=0)
                    X1 = np.expand_dims(X1, axis=0)

                    X = np.concatenate((X0, X1), axis=0)

                    proc_end = time.time()
                    o = self.exe.run(self.program,
                                     fetch_list=self.fetch_targets,
                                     feed={"image": X})
                    y_ = o[0]

                    proc_timer.update(time.time() - proc_end)
                    tot_timer.update(time.time() - end)
                    end = time.time()

                    # Crop the padding away and convert to HWC in [0, 255].
                    y_ = [
                        np.transpose(
                            255.0 * item.clip(
                                0, 1.0)[0, :,
                                        padding_top:padding_top + int_height,
                                        padding_left:padding_left + int_width],
                            (1, 2, 0)) for item in y_
                    ]

                time_offsets = [
                    kk * timestep for kk in range(1, 1 + num_frames, 1)
                ]

                count = 1
                # zip() limits the number of saved frames to num_frames.
                for item, time_offset in zip(y_, time_offsets):
                    out_dir = os.path.join(
                        frame_path_interpolated, vidname,
                        "{:0>6d}_{:0>4d}.png".format(i, count))
                    count = count + 1
                    imsave(out_dir, np.round(item).astype(np.uint8))

            input_dir = os.path.join(frame_path_input, vidname)
            interpolated_dir = os.path.join(frame_path_interpolated, vidname)
            combined_dir = os.path.join(frame_path_combined, vidname)
            combine_frames(input_dir, interpolated_dir, combined_dir,
                           num_frames)

            frame_pattern_combined = os.path.join(frame_path_combined, vidname,
                                                  '%08d.png')
            video_pattern_output = os.path.join(video_path_output,
                                                vidname + '.mp4')
            if os.path.exists(video_pattern_output):
                os.remove(video_pattern_output)
            frames2video(frame_pattern_combined, video_pattern_output, r2)

        return frame_pattern_combined, video_pattern_output
if __name__ == '__main__':
    args = parser.parse_args()
    # output_path must be passed by keyword: the fourth positional parameter
    # of VideoFrameInterp is use_gpu, so a positional output path would be
    # consumed as the GPU flag and the default output directory used instead.
    predictor = VideoFrameInterp(args.time_step,
                                 args.saved_model,
                                 args.video_path,
                                 output_path=args.output_path)
    predictor.run()
# 模型说明
# 目前包含DAIN(插帧模型),DeOldify(上色模型),DeepRemaster(去噪与上色模型),EDVR(基于连续帧(视频)超分辨率模型),RealSR(基于图片的超分辨率模型)
# 参数说明
# input 输入视频的路径
# output 输出视频保存的路径
# proccess_order 要使用的模型及顺序
python tools/video-enhance.py \
--input input.mp4 --output output --proccess_order DeOldify RealSR
# copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
import argparse
from pathlib import Path
from PIL import Image
from fire import Fire
import numpy as np
import paddle
import paddle.vision.transforms as T
import ppgan.faceutils as futils
from ppgan.utils.options import parse_args
from ppgan.utils.config import get_config
from ppgan.utils.setup import setup
from ppgan.utils.filesystem import load
from ppgan.engine.trainer import Trainer
from ppgan.models.builder import build_model
from ppgan.utils.preprocess import *
def toImage(net_output):
    """Turn a network output into an RGB ``PIL.Image``.

    The leading axis is squeezed away, the result is transposed from
    channel-first to channel-last ([1,c,h,w] -> [h,w,c]), scaled by 255,
    clipped to [0, 255] and cast to ``uint8``.
    """
    hwc = net_output.squeeze(0).transpose((1, 2, 0)).numpy()
    scaled = (hwc * 255.0).clip(0, 255)
    pixels = np.uint8(scaled)
    return Image.fromarray(pixels, mode='RGB')
def mask2image(mask: np.array, format="HWC"):
    """Render a 2-D label mask as an image with one random color per label.

    Args:
        mask: 2-D array of non-negative class labels.
        format: Unused; kept so existing callers keep working.

    Returns:
        ``uint8`` array of shape ``(H, W, 3)``. Every label from 0 up to and
        including ``mask.max()`` receives its own random color.
    """
    H, W = mask.shape
    canvas = np.zeros((H, W, 3), dtype=np.uint8)
    if mask.size == 0:
        # Nothing to color; mask.max() would raise on an empty array.
        return canvas

    # "+ 1" so the largest label is colored too; range(max) left it black.
    for label in range(int(mask.max()) + 1):
        color = (np.random.rand(1, 1, 3) * 255).astype(np.uint8)
        canvas += (mask == label)[:, :, None] * color
    return canvas
class PreProcess:
    """Detect, crop and encode a face image into the model's input features.

    Args:
        config: Accepted for interface compatibility; not read here.
        need_parser: When true, a ``FaceParser`` is created and stored as
            ``self.face_parser`` (required by ``__call__``).
    """

    def __init__(self, config, need_parser=True):
        self.img_size = 256
        self.transform = T.Compose([
            T.Resize(size=256),
            T.Permute(to_rgb=False),
        ])
        self.norm = T.Normalize([127.5, 127.5, 127.5], [127.5, 127.5, 127.5])
        if need_parser:
            self.face_parser = futils.mask.FaceParser()
        # Margins handed to futils.dlib.crop.
        self.up_ratio = 0.6 / 0.85
        self.down_ratio = 0.2 / 0.85
        self.width_ratio = 0.2 / 0.85

    def __call__(self, image):
        """Preprocess ``image`` (a PIL image).

        Returns:
            ``([normalized_image, mask_aug, P, mask], face_on_image,
            crop_face)`` for the first detected face, or
            ``(None, None, None)`` when no face is detected, so callers can
            always unpack three values.
        """
        face = futils.dlib.detect(image)

        if not face:
            return None, None, None

        face_on_image = face[0]

        image, face, crop_face = futils.dlib.crop(image, face_on_image,
                                                  self.up_ratio,
                                                  self.down_ratio,
                                                  self.width_ratio)
        np_image = np.array(image)
        # The parser asserts a 512x512 input.
        mask = self.face_parser.parse(
            np.float32(cv2.resize(np_image, (512, 512))))
        mask = cv2.resize(mask.numpy(), (self.img_size, self.img_size),
                          interpolation=cv2.INTER_NEAREST)
        mask = mask.astype(np.uint8)
        # Debug artifact: a colorized mask is written to the working dir.
        mask_color = mask2image(mask)
        cv2.imwrite('mask_temp.png', mask_color)

        # Rescale landmarks from the cropped image to img_size.
        lms = futils.dlib.landmarks(image, face) * self.img_size / image.width
        lms = lms.round()

        P_np = generate_P_from_lmks(lms, self.img_size, self.img_size,
                                    self.img_size)

        mask_aug = generate_mask_aug(mask, lms)

        image = self.transform(np_image)

        return [
            self.norm(image),
            np.float32(mask_aug),
            np.float32(P_np),
            np.float32(mask)
        ], face_on_image, crop_face
class PostProcess:
    """Restore full-resolution detail on a low-resolution transfer result.

    Args:
        config: Accepted for interface compatibility; not read here.
    """

    def __init__(self, config):
        self.denoise = True
        self.img_size = 256

    def __call__(self, source: Image, result: Image):
        """Upscale ``result`` to ``source``'s size and add back lost detail.

        The detail term is the difference between ``source`` and a copy of
        it that was downscaled to ``img_size`` and upscaled again.

        Returns:
            The post-processed image as an RGB ``PIL.Image``.
        """
        # TODO: Refract -> name, resize
        source = np.array(source)
        result = np.array(result)

        height, width = source.shape[:2]
        small_source = cv2.resize(source, (self.img_size, self.img_size))
        # np.float64 replaces the removed np.float alias (same dtype).
        laplacian_diff = source.astype(np.float64) - cv2.resize(
            small_source, (width, height)).astype(np.float64)
        result = (cv2.resize(result,
                             (width, height)) + laplacian_diff).round().clip(
                                 0, 255).astype(np.uint8)
        if self.denoise:
            result = cv2.fastNlMeansDenoisingColored(result)
        result = Image.fromarray(result).convert('RGB')
        return result
class Inference:
    """Run makeup transfer from a reference face onto a source face.

    Args:
        config: Model configuration passed to ``build_model`` and
            ``PreProcess``.
        model_path: Path of the checkpoint loaded on each ``transfer`` call.
    """

    def __init__(self, config, model_path=''):
        self.model = build_model(config)
        self.preprocess = PreProcess(config)
        self.model_path = model_path

    def transfer(self, source, reference, with_face=False):
        """Transfer the makeup of ``reference`` onto ``source``.

        Args:
            source: Source PIL image.
            reference: Reference PIL image.
            with_face: When true, also return the crop rectangle of the
                source face.

        Returns:
            ``(image, crop_face)`` when ``with_face`` is true, otherwise the
            image (which is also saved as ``before.png``). If preprocessing
            yields no input for either picture the result is
            ``(None, None)`` or ``None`` respectively.
        """
        source_out = self.preprocess(source)
        reference_out = self.preprocess(reference)

        # Validate before touching the inputs. Preprocessing may hand back
        # None or a tuple of Nones when it finds no face.
        if (not source_out or not reference_out or not source_out[0]
                or not reference_out[0]):
            if with_face:
                return None, None
            return

        # Keep the crop rectangle of the *source*: callers use it to crop
        # the source image, so it must not be replaced by the reference's.
        source_input, _, crop_face = source_out
        reference_input = reference_out[0]

        consis_mask = np.float32(
            calculate_consis_mask(source_input[1], reference_input[1]))
        consis_mask = paddle.to_tensor(np.expand_dims(consis_mask, 0))

        # Every entry except the last is batched and turned into a tensor.
        for i in range(len(source_input) - 1):
            source_input[i] = paddle.to_tensor(
                np.expand_dims(source_input[i], 0))

        for i in range(len(reference_input) - 1):
            reference_input[i] = paddle.to_tensor(
                np.expand_dims(reference_input[i], 0))

        input_data = {
            'image_A': source_input[0],
            'image_B': reference_input[0],
            'mask_A_aug': source_input[1],
            'mask_B_aug': reference_input[1],
            'P_A': source_input[2],
            'P_B': reference_input[2],
            'consis_mask': consis_mask
        }
        state_dicts = load(self.model_path)
        net = getattr(self.model, 'netG')
        net.set_dict(state_dicts['netG'])
        result, _ = self.model.test(input_data)
        print('result shape: ', result.shape)
        # Min-max normalize the output into [0, 1) before conversion.
        min_, max_ = result.min(), result.max()
        result += -min_
        result = paddle.divide(result, max_ - min_ + 1e-5)
        img = toImage(result)

        if with_face:
            return img, crop_face
        img.save('before.png')

        return img
def main(args, cfg, save_path='transferred_image.png'):
    """Apply the makeup of each reference image to the source image.

    Args:
        args: Parsed CLI options; ``model_path``, ``source_path`` and
            ``reference_dir`` are read here.
        cfg: Model configuration.
        save_path: Output file; it is rewritten for every reference, so the
            last successful transfer is what remains on disk.
    """
    setup(args, cfg)

    inference = Inference(cfg, args.model_path)
    postprocess = PostProcess(cfg)

    source = Image.open(args.source_path).convert("RGB")
    reference_paths = list(Path(args.reference_dir).glob("*"))
    np.random.shuffle(reference_paths)
    for reference_path in reference_paths:
        if not reference_path.is_file():
            print(reference_path, "is not a valid file.")
            continue

        reference = Image.open(reference_path).convert("RGB")

        # Transfer the psgan from reference to source.
        image, face = inference.transfer(source, reference, with_face=True)
        if image is None:
            # transfer() reports a failed preprocessing as (None, None).
            print(reference_path, "skipped: no transfer result.")
            continue
        image.save('before.png')
        source_crop = source.crop(
            (face.left(), face.top(), face.right(), face.bottom()))
        image = postprocess(source_crop, image)
        image.save(save_path)
if __name__ == '__main__':
    # Parse the CLI options, load the config file they name, run the transfer.
    cli_args = parse_args()
    main(cli_args, get_config(cli_args.config_file))
epochs: 100
isTrain: True
output_dir: tmp
checkpoints_dir: checkpoints
lambda_A: 10.0
lambda_B: 10.0
lambda_identity: 0.5
model:
name: MakeupModel
generator:
name: GeneratorPSGANAttention
conv_dim: 64
repeat_num: 6
discriminator:
name: NLayerDiscriminator
ndf: 64
n_layers: 3
input_nc: 3
norm_type: spectral
gan_mode: lsgan
dataset:
train:
name: MakeupDataset
trans_size: 256
dataroot: MT-Dataset
cls_list: [non-makeup, makeup]
phase: train
pool_size: 16
test:
name: MakeupDataset
trans_size: 256
dataroot: MT-Dataset
cls_list: [non-makeup, makeup]
phase: test
pool_size: 16
optimizer:
name: Adam
beta1: 0.5
lr_scheduler:
name: linear
learning_rate: 0.0002
start_epoch: 100
decay_epochs: 100
log_config:
interval: 10
visiual_interval: 500
snapshot_config:
interval: 1
...@@ -84,9 +84,14 @@ class DAINPredictor(BasePredictor): ...@@ -84,9 +84,14 @@ class DAINPredictor(BasePredictor):
vidname = video_path.split('/')[-1].split('.')[0] vidname = video_path.split('/')[-1].split('.')[0]
frames = sorted(glob.glob(os.path.join(out_path, '*.png'))) frames = sorted(glob.glob(os.path.join(out_path, '*.png')))
orig_frames = len(frames)
need_frames = orig_frames * times_interp
if self.remove_duplicates: if self.remove_duplicates:
frames = self.remove_duplicate_frames(out_path) frames = self.remove_duplicate_frames(out_path)
left_frames = len(frames)
timestep = left_frames / need_frames
num_frames = int(1.0 / timestep) - 1
img = imread(frames[0]) img = imread(frames[0])
......
# copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .unpaired_dataset import UnpairedDataset from .unpaired_dataset import UnpairedDataset
from .single_dataset import SingleDataset from .single_dataset import SingleDataset
from .paired_dataset import PairedDataset from .paired_dataset import PairedDataset
from .sr_image_dataset import SRImageDataset from .sr_image_dataset import SRImageDataset
from .makeup_dataset import MakeupDataset
# copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import cv2
import os.path
from .base_dataset import BaseDataset, get_transform
from .transforms.makeup_transforms import get_makeup_transform
import paddle.vision.transforms as T
from PIL import Image
import random
import numpy as np
from ..utils.preprocess import *
from .builder import DATASETS
@DATASETS.register()
class MakeupDataset(BaseDataset):
    """Dataset of image / mask / landmark triples for two classes.

    For each class ``cls`` in ``cfg.cls_list`` a list file
    ``<dataroot>/<phase>_<cls>.txt`` is read; every line holds three
    whitespace-separated relative paths: image, mask and landmark file.
    """

    def __init__(self, cfg):
        """Initialize this dataset class.

        Parameters:
            cfg -- dataset config; ``dataroot``, ``phase``, ``trans_size``
                   and ``cls_list`` are read here.
        """
        BaseDataset.__init__(self, cfg)
        self.image_path = cfg.dataroot
        self.mode = cfg.phase
        self.transform = get_makeup_transform(cfg)

        self.norm = T.Normalize([127.5, 127.5, 127.5], [127.5, 127.5, 127.5])
        self.transform_mask = get_makeup_transform(cfg, pic="mask")
        self.trans_size = cfg.trans_size
        self.cls_list = cfg.cls_list
        self.cls_A = self.cls_list[0]
        self.cls_B = self.cls_list[1]
        for cls in self.cls_list:
            list_path = os.path.join(self.image_path,
                                     self.mode + '_' + cls + ".txt")
            # Close the list file deterministically after reading it.
            with open(list_path, 'r') as list_file:
                lines = list_file.readlines()
            setattr(self, cls + "_list_path", list_path)
            setattr(self, cls + "_lines", lines)
            setattr(self, "num_of_" + cls + "_data", len(lines))
        print('Start preprocessing dataset..!')
        self.preprocess()
        print('Finished preprocessing dataset..!')

    def preprocess(self):
        """Shuffle each class list and split it into per-kind path lists."""
        for cls in self.cls_list:
            setattr(self, cls + "_filenames", [])
            setattr(self, cls + "_mask_filenames", [])
            setattr(self, cls + "_lmks_filenames", [])

            lines = getattr(self, cls + "_lines")
            random.shuffle(lines)

            for line in lines:
                splits = line.split()
                getattr(self, cls + "_filenames").append(splits[0])
                getattr(self, cls + "_mask_filenames").append(splits[1])
                getattr(self, cls + "_lmks_filenames").append(splits[2])

    def _item_path(self, cls, kind, index):
        """Return the absolute path of entry ``index`` of ``<cls><kind>``."""
        return os.path.join(self.image_path, getattr(self, cls + kind)[index])

    def __getitem__(self, index):
        """Return MANet and MDNet needed params.

        Parameters:
            index (int) -- sample index. In 'test' mode it enumerates all
                           (A, B) pairs; otherwise A and B are drawn at
                           random and ``index`` is only used for retries.

        Returns a dictionary that contains needed params. If anything fails
        while building the sample, the error is printed and the sample at
        ``index + 1`` is tried instead.
        """
        try:
            # randint() includes its upper bound, so use "count - 1" to stay
            # inside the filename lists.
            index_A = random.randint(
                0, getattr(self, "num_of_" + self.cls_A + "_data") - 1)
            index_B = random.randint(
                0, getattr(self, "num_of_" + self.cls_B + "_data") - 1)

            if self.mode == 'test':
                num_b = getattr(self, 'num_of_' + self.cls_list[1] + '_data')
                index_A = int(index / num_b)
                index_B = int(index % num_b)
            image_A = Image.open(
                self._item_path(self.cls_A, "_filenames",
                                index_A)).convert("RGB")
            image_B = Image.open(
                self._item_path(self.cls_B, "_filenames",
                                index_B)).convert("RGB")
            # Only mask_B is converted to mode 'L'; mask_A is used as stored.
            mask_A = np.array(
                Image.open(
                    self._item_path(self.cls_A, "_mask_filenames", index_A)))
            mask_B = np.array(
                Image.open(
                    self._item_path(self.cls_B, "_mask_filenames",
                                    index_B)).convert('L'))
            image_A = np.array(image_A)
            image_B = np.array(image_B)

            image_A = self.transform(image_A)
            image_B = self.transform(image_B)

            mask_A = cv2.resize(mask_A, (256, 256),
                                interpolation=cv2.INTER_NEAREST)
            mask_B = cv2.resize(mask_B, (256, 256),
                                interpolation=cv2.INTER_NEAREST)

            lmks_A = np.loadtxt(
                self._item_path(self.cls_A, "_lmks_filenames", index_A))
            lmks_B = np.loadtxt(
                self._item_path(self.cls_B, "_lmks_filenames", index_B))
            lmks_A = lmks_A / image_A.shape[:2] * self.trans_size
            lmks_B = lmks_B / image_B.shape[:2] * self.trans_size

            P_A = generate_P_from_lmks(lmks_A, self.trans_size,
                                       image_A.shape[0], image_A.shape[1])

            P_B = generate_P_from_lmks(lmks_B, self.trans_size,
                                       image_B.shape[0], image_B.shape[1])

            mask_A_aug = generate_mask_aug(mask_A, lmks_A)
            mask_B_aug = generate_mask_aug(mask_B, lmks_B)

            consis_mask = calculate_consis_mask(mask_A_aug, mask_B_aug)
            consis_mask_idt_A = calculate_consis_mask(mask_A_aug, mask_A_aug)
            # The identity mask of B is B against itself, mirroring
            # consis_mask_idt_A.
            consis_mask_idt_B = calculate_consis_mask(mask_B_aug, mask_B_aug)

        except Exception as e:
            print(e)
            return self.__getitem__(index + 1)
        return {
            'image_A': self.norm(image_A),
            'image_B': self.norm(image_B),
            'mask_A': np.float32(mask_A),
            'mask_B': np.float32(mask_B),
            'consis_mask': np.float32(consis_mask),
            'P_A': np.float32(P_A),
            'P_B': np.float32(P_B),
            'consis_mask_idt_A': np.float32(consis_mask_idt_A),
            'consis_mask_idt_B': np.float32(consis_mask_idt_B),
            'mask_A_aug': np.float32(mask_A_aug),
            'mask_B_aug': np.float32(mask_B_aug)
        }

    def __len__(self):
        """Return the total number of samples in the dataset.

        In 'test' mode every (A, B) pair is a sample, so the length is the
        product of both class sizes; in every other mode it is the larger
        of the two class sizes.
        """
        num_A = getattr(self, 'num_of_' + self.cls_list[0] + '_data')
        num_B = getattr(self, 'num_of_' + self.cls_list[1] + '_data')
        if self.mode == "test":
            return num_A * num_B
        # 'train' and any other phase: the former fallback read A_size and
        # B_size, which this class never defines.
        return max(num_A, num_B)
# copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle.vision.transforms as T
import cv2
def get_makeup_transform(cfg, pic="image"):
    """Build the transform used for images or for masks.

    ``pic == "image"`` yields a resize to ``cfg.trans_size`` followed by
    ``T.Permute(to_rgb=False)``; any other value yields a nearest-neighbour
    resize to ``cfg.trans_size``.
    """
    if pic != "image":
        return T.Resize(size=cfg.trans_size, interpolation=cv2.INTER_NEAREST)

    image_steps = [
        T.Resize(size=cfg.trans_size),
        T.Permute(to_rgb=False),
    ]
    return T.Compose(image_steps)
# copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from . import dlibutils as dlib
from . import mask
from . import image
from .dlib_utils import detect, crop, landmarks, crop_from_array
# copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os.path as osp
import numpy as np
from PIL import Image
import dlib
import cv2
from ..image import resize_by_max
# Module-level dlib models, created once at import time and used by the
# helpers defined in this module.
detector = dlib.get_frontal_face_detector()
# The landmark predictor is loaded from 'lms.dat' in this module's directory.
predictor = dlib.shape_predictor(
    osp.split(osp.realpath(__file__))[0] + '/lms.dat')
def detect(image: Image):
    """Detect faces and return them in the coordinates of ``image``.

    Detection runs on the output of ``resize_by_max(image, 361)``; each hit
    is scaled back to the original size with rounding to the nearest
    integer.

    Returns:
        ``dlib.rectangles`` with one rectangle per detected face.
    """
    pixels = np.asarray(image)
    h, w = pixels.shape[:2]
    small = resize_by_max(pixels, 361)
    actual_h, actual_w = small.shape[:2]

    def to_full_x(value):
        return int(value / actual_w * w + 0.5)

    def to_full_y(value):
        return int(value / actual_h * h + 0.5)

    faces = dlib.rectangles()
    for hit in detector(small, 1):
        rect = dlib.rectangle(to_full_x(hit.left()), to_full_y(hit.top()),
                              to_full_x(hit.right()), to_full_y(hit.bottom()))
        faces.append(rect)
    return faces
def crop(image: Image, face, up_ratio, down_ratio, width_ratio):
    """Crop ``image`` around ``face`` with margins, then square the crop.

    Args:
        image: PIL image containing the face.
        face: Face rectangle in ``image`` coordinates.
        up_ratio: Margin above the face, as a fraction of the face height.
        down_ratio: Margin below the face, as a fraction of the face height.
        width_ratio: Horizontal margin factor (multiplied by the image
            width, see the note below).

    Returns:
        ``(image, face, crop_face)``: the square crop, the face rectangle
        relative to that crop, and the crop rectangle in the coordinates of
        the input image.
    """
    width, height = image.size
    face_height = face.height()
    face_width = face.width()
    delta_up = up_ratio * face_height
    delta_down = down_ratio * face_height
    # NOTE(review): the horizontal margin scales with the image width, not
    # face_width (which is unused) -- confirm this is intended.
    delta_width = width_ratio * width

    # Expanded face box, clamped to the image bounds.
    img_left = int(max(0, face.left() - delta_width))
    img_top = int(max(0, face.top() - delta_up))
    img_right = int(min(width, face.right() + delta_width))
    img_bottom = int(min(height, face.bottom() + delta_down))
    image = image.crop((img_left, img_top, img_right, img_bottom))
    # Re-express the face rectangle relative to the first crop.
    face = dlib.rectangle(face.left() - img_left,
                          face.top() - img_top,
                          face.right() - img_left,
                          face.bottom() - img_top)
    face_expand = dlib.rectangle(img_left, img_top, img_right, img_bottom)
    center = face_expand.center()
    width, height = image.size
    # crop_* track the crop rectangle in the original image's coordinates.
    crop_left = img_left
    crop_top = img_top
    crop_right = img_right
    crop_bottom = img_bottom
    if width > height:
        # Wider than tall: cut a height x height window around center.x,
        # shifted back inside the image if it would overflow.
        left = int(center.x - height / 2)
        right = int(center.x + height / 2)
        if left < 0:
            left, right = 0, height
        elif right > width:
            left, right = width - height, width
        image = image.crop((left, 0, right, height))
        face = dlib.rectangle(face.left() - left, face.top(),
                              face.right() - left, face.bottom())
        crop_left += left
        crop_right = crop_left + height
    elif width < height:
        # Taller than wide: cut a width x width window around center.y.
        top = int(center.y - width / 2)
        bottom = int(center.y + width / 2)
        if top < 0:
            top, bottom = 0, width
        elif bottom > height:
            top, bottom = height - width, height
        image = image.crop((0, top, width, bottom))
        face = dlib.rectangle(face.left(),
                              face.top() - top, face.right(),
                              face.bottom() - top)
        crop_top += top
        crop_bottom = crop_top + width
    crop_face = dlib.rectangle(crop_left, crop_top, crop_right, crop_bottom)
    return image, face, crop_face
def crop_by_image_size(image: Image, face):
    """Cut ``image`` down to a square centred on ``face`` where possible.

    The square's side is the shorter image side. The window is centred on
    the face centre along the longer side and shifted back inside the image
    if it would overflow. An already square image is returned unchanged.

    Returns:
        ``(image, face)`` with ``face`` expressed relative to the new image.
    """
    center = face.center()
    width, height = image.size

    if width == height:
        return image, face

    if width > height:
        side = height
        x0 = int(center.x - side / 2)
        x1 = int(center.x + side / 2)
        if x0 < 0:
            x0, x1 = 0, side
        elif x1 > width:
            x0, x1 = width - side, width
        squared = image.crop((x0, 0, x1, side))
        shifted = dlib.rectangle(face.left() - x0, face.top(),
                                 face.right() - x0, face.bottom())
        return squared, shifted

    side = width
    y0 = int(center.y - side / 2)
    y1 = int(center.y + side / 2)
    if y0 < 0:
        y0, y1 = 0, side
    elif y1 > height:
        y0, y1 = height - side, height
    squared = image.crop((0, y0, side, y1))
    shifted = dlib.rectangle(face.left(),
                             face.top() - y0, face.right(),
                             face.bottom() - y0)
    return squared, shifted
def landmarks(image: Image, face):
    """Return the predicted landmarks of ``face`` as an array of ``[y, x]`` rows."""
    points = predictor(np.asarray(image), face).parts()
    coords = []
    for point in points:
        coords.append([point.y, point.x])
    return np.array(coords)
def crop_from_array(image: np.array, face):
    """Crop an array image around ``face`` with margins, then square it.

    Args:
        image: Image array whose first two axes are height and width.
        face: Face rectangle in ``image`` coordinates.

    Returns:
        ``(image, face)``: the square crop and the face rectangle relative
        to that crop.
    """
    ratio = 0.20 / 0.85  # delta_size / face_size
    height, width = image.shape[:2]
    face_height = face.height()
    face_width = face.width()
    delta_height = ratio * face_height
    # NOTE(review): the horizontal margin scales with the image width, not
    # face_width (which is unused) -- confirm this is intended.
    delta_width = ratio * width

    # Expanded face box, clamped to the image bounds.
    img_left = int(max(0, face.left() - delta_width))
    img_top = int(max(0, face.top() - delta_height))
    img_right = int(min(width, face.right() + delta_width))
    img_bottom = int(min(height, face.bottom() + delta_height))
    image = image[img_top:img_bottom, img_left:img_right]
    # Re-express the face rectangle relative to the first crop.
    face = dlib.rectangle(face.left() - img_left,
                          face.top() - img_top,
                          face.right() - img_left,
                          face.bottom() - img_top)
    center = face.center()
    height, width = image.shape[:2]
    if width > height:
        # Wider than tall: keep a height x height window around center.x,
        # shifted back inside the image if it would overflow.
        left = int(center.x - height / 2)
        right = int(center.x + height / 2)
        if left < 0:
            left, right = 0, height
        elif right > width:
            left, right = width - height, width
        image = image[0:height, left:right]
        face = dlib.rectangle(face.left() - left, face.top(),
                              face.right() - left, face.bottom())
    elif width < height:
        # Taller than wide: keep a width x width window around center.y.
        top = int(center.y - width / 2)
        bottom = int(center.y + width / 2)
        if top < 0:
            top, bottom = 0, width
        elif bottom > height:
            top, bottom = height - width, height
        image = image[top:bottom, 0:width]
        face = dlib.rectangle(face.left(),
                              face.top() - top, face.right(),
                              face.bottom() - top)
    return image, face
import numpy as np
import cv2
from io import BytesIO
def resize_by_max(image, max_side=512, force=False):
    """Resize ``image`` so that its longer side becomes ``max_side``.

    An image whose longer side is already below ``max_side`` is returned
    unchanged (the same object) unless ``force`` is true.
    """
    h, w = image.shape[:2]
    longest = max(h, w)
    if not force and longest < max_side:
        return image

    ratio = longest / max_side
    # cv2.resize takes the target size as (width, height).
    target = (int(w / ratio + 0.5), int(h / ratio + 0.5))
    return cv2.resize(image, target)
# copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .face_parser import FaceParser
import os.path as osp
import numpy as np
import cv2
from PIL import Image
import paddle
import paddle.vision.transforms as T
import pickle
from .model import BiSeNet
class FaceParser:
    """Face parsing with BiSeNet, remapped to this project's label set.

    Args:
        device: Unused; kept for interface compatibility.
    """

    def __init__(self, device="cpu"):
        # Maps the network's 19 output classes to the labels returned by
        # parse(); several network classes collapse to 0.
        self.mapper = {
            0: 0,
            1: 1,
            2: 2,
            3: 3,
            4: 4,
            5: 5,
            6: 0,
            7: 11,
            8: 12,
            9: 0,
            10: 6,
            11: 8,
            12: 7,
            13: 9,
            14: 13,
            15: 0,
            16: 0,
            17: 10,
            18: 0
        }
        self.save_pth = osp.split(
            osp.realpath(__file__))[0] + '/resnet.pdparams'

        self.net = BiSeNet(n_classes=19)

        self.transforms = T.Compose([
            T.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)),
        ])
        # Weights are read from disk on the first parse() call only.
        self._weights_loaded = False

    def parse(self, image):
        """Parse a 512x512 HWC image into a label map.

        Args:
            image: Array whose first two dimensions are ``(512, 512)``.

        Returns:
            A float32 paddle tensor holding the remapped label of every
            pixel of the network output.
        """
        assert image.shape[:2] == (512, 512)
        image = image / 255.0
        image = image.transpose((2, 0, 1))
        image = self.transforms(image)

        if not getattr(self, '_weights_loaded', False):
            # Load the checkpoint once instead of on every call.
            state_dict, _ = paddle.load(self.save_pth)
            self.net.set_dict(state_dict)
            self._weights_loaded = True
        self.net.eval()

        with paddle.no_grad():
            image = paddle.to_tensor(image)
            image = image.unsqueeze(0)
            out = self.net(image)[0]
            parsing = out.squeeze(0).argmax(0)

        parse_np = parsing.numpy()
        # Vectorized remap through a lookup table; replaces the per-pixel
        # Python loop and keeps the float64 intermediate dtype.
        lookup = np.zeros(max(self.mapper) + 1, dtype=np.float64)
        for source_label, target_label in self.mapper.items():
            lookup[source_label] = target_label
        result = lookup[parse_np]

        result = paddle.to_tensor(result).astype('float32')
        return result
# copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle
from paddle import nn
import paddle.nn.functional as F
from paddle.utils.download import get_weights_path_from_url
import numpy as np
from .resnet import resnet18
class ConvBNReLU(paddle.nn.Layer):
    """Bias-free 2-D convolution followed by batch norm and ReLU.

    Extra positional and keyword arguments are accepted and ignored.
    """

    def __init__(self,
                 in_chan,
                 out_chan,
                 ks=3,
                 stride=1,
                 padding=1,
                 *args,
                 **kwargs):
        super(ConvBNReLU, self).__init__()
        conv_options = dict(kernel_size=ks,
                            stride=stride,
                            padding=padding,
                            bias_attr=False)
        self.conv = nn.Conv2d(in_chan, out_chan, **conv_options)
        self.bn = nn.BatchNorm2d(out_chan)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Apply conv -> batch norm -> ReLU to ``x``."""
        return self.relu(self.bn(self.conv(x)))
class BiSeNetOutput(paddle.nn.Layer):
    """Output head: a 3x3 ``ConvBNReLU`` then a bias-free 1x1 convolution
    producing ``n_classes`` channels.

    Extra positional and keyword arguments are accepted and ignored.
    """

    def __init__(self, in_chan, mid_chan, n_classes, *args, **kwargs):
        super(BiSeNetOutput, self).__init__()
        self.conv = ConvBNReLU(in_chan, mid_chan, ks=3, stride=1, padding=1)
        self.conv_out = nn.Conv2d(mid_chan,
                                  n_classes,
                                  bias_attr=False,
                                  kernel_size=1)

    def forward(self, x):
        """Return the per-class maps computed from ``x``."""
        hidden = self.conv(x)
        return self.conv_out(hidden)
class AttentionRefinementModule(paddle.nn.Layer):
    """Re-weights convolved features with a globally pooled sigmoid gate."""

    def __init__(self, in_chan, out_chan, *args, **kwargs):
        """Create the feature conv and the 1x1 conv / batch-norm / sigmoid gate.

        Extra positional and keyword arguments are accepted but not used.
        """
        super(AttentionRefinementModule, self).__init__()
        self.conv = ConvBNReLU(in_chan, out_chan, ks=3, stride=1, padding=1)
        self.conv_atten = nn.Conv2d(out_chan,
                                    out_chan,
                                    kernel_size=1,
                                    bias_attr=False)
        self.bn_atten = nn.BatchNorm(out_chan)
        self.sigmoid_atten = nn.Sigmoid()

    def forward(self, x):
        """Return ``conv(x)`` scaled by its attention gate."""
        feat = self.conv(x)
        # Pool over the full spatial extent of the feature map.
        pooled = F.avg_pool2d(feat, feat.shape[2:])
        gate = self.sigmoid_atten(self.bn_atten(self.conv_atten(pooled)))
        return feat * gate
class ContextPath(paddle.nn.Layer):
    """Context branch: ResNet-18 features refined and merged top-down."""

    def __init__(self, *args, **kwargs):
        """Create the backbone, refinement modules and head convolutions.

        Positional and keyword arguments are accepted but not used.
        """
        super(ContextPath, self).__init__()
        self.resnet = resnet18()
        self.arm16 = AttentionRefinementModule(256, 128)
        self.arm32 = AttentionRefinementModule(512, 128)
        self.conv_head32 = ConvBNReLU(128, 128, ks=3, stride=1, padding=1)
        self.conv_head16 = ConvBNReLU(128, 128, ks=3, stride=1, padding=1)
        self.conv_avg = ConvBNReLU(512, 128, ks=1, stride=1, padding=0)

    def forward(self, x):
        """Return ``(feat8, feat16_up, feat32_up)`` for the input ``x``.

        ``feat8`` is the raw backbone feature; ``feat16_up`` is resized to
        the size of ``feat8`` and ``feat32_up`` to the size of ``feat16``.
        """
        # Not used below; kept so the input-shape unpacking still happens.
        in_h, in_w = x.shape[2:]
        feat8, feat16, feat32 = self.resnet(x)
        h8, w8 = feat8.shape[2:]
        h16, w16 = feat16.shape[2:]
        h32, w32 = feat32.shape[2:]

        # Global context: pool the deepest feature, project it, resize back.
        pooled = self.conv_avg(F.avg_pool2d(feat32, feat32.shape[2:]))
        pooled_up = F.interpolate(pooled, size=(h32, w32), mode='nearest')

        merged32 = self.arm32(feat32) + pooled_up
        feat32_up = self.conv_head32(
            F.interpolate(merged32, size=(h16, w16), mode='nearest'))

        merged16 = self.arm16(feat16) + feat32_up
        feat16_up = self.conv_head16(
            F.interpolate(merged16, size=(h8, w8), mode='nearest'))

        return feat8, feat16_up, feat32_up  # x8, x8, x16
class SpatialPath(paddle.nn.Layer):
    """Three stride-2 ConvBNReLU layers followed by a 1x1 projection."""

    def __init__(self, *args, **kwargs):
        """Create the four layers; arguments are accepted but not used."""
        super(SpatialPath, self).__init__()
        self.conv1 = ConvBNReLU(3, 64, ks=7, stride=2, padding=3)
        self.conv2 = ConvBNReLU(64, 64, ks=3, stride=2, padding=1)
        self.conv3 = ConvBNReLU(64, 64, ks=3, stride=2, padding=1)
        self.conv_out = ConvBNReLU(64, 128, ks=1, stride=1, padding=0)

    def forward(self, x):
        """Run ``x`` through conv1, conv2, conv3 and conv_out in order."""
        stages = (self.conv1, self.conv2, self.conv3, self.conv_out)
        feat = x
        for stage in stages:
            feat = stage(feat)
        return feat
class FeatureFusionModule(paddle.nn.Layer):
    """Fuses two feature maps by concatenation plus channel attention."""

    def __init__(self, in_chan, out_chan, *args, **kwargs):
        """Create the fusing 1x1 block and the squeeze/expand attention convs.

        ``in_chan`` is the channel count after concatenation. Extra positional
        and keyword arguments are accepted but not used.
        """
        super(FeatureFusionModule, self).__init__()
        self.convblk = ConvBNReLU(in_chan, out_chan, ks=1, stride=1, padding=0)
        pointwise = dict(kernel_size=1, stride=1, padding=0, bias_attr=False)
        self.conv1 = nn.Conv2d(out_chan, out_chan // 4, **pointwise)
        self.conv2 = nn.Conv2d(out_chan // 4, out_chan, **pointwise)
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()

    def forward(self, fsp, fcp):
        """Return the fused feature ``feat * atten + feat``."""
        feat = self.convblk(paddle.concat([fsp, fcp], axis=1))
        # Channel attention computed from the globally pooled feature.
        squeezed = F.avg_pool2d(feat, feat.shape[2:])
        atten = self.sigmoid(self.conv2(self.relu(self.conv1(squeezed))))
        feat_atten = feat * atten
        return feat_atten + feat
class BiSeNet(paddle.nn.Layer):
    """Segmentation network with one main and two auxiliary output heads."""

    def __init__(self, n_classes, *args, **kwargs):
        """Create the context path, fusion module and three output heads.

        Extra positional and keyword arguments are accepted but not used.
        """
        super(BiSeNet, self).__init__()
        self.cp = ContextPath()
        self.ffm = FeatureFusionModule(256, 256)
        self.conv_out = BiSeNetOutput(256, 256, n_classes)
        self.conv_out16 = BiSeNetOutput(128, 64, n_classes)
        self.conv_out32 = BiSeNetOutput(128, 64, n_classes)

    def forward(self, x):
        """Return three outputs, each resized to the input's spatial size."""
        target_size = tuple(x.shape[2:4])
        # The context path also returns the res3b1 feature, which is used
        # here in place of a separate spatial-path feature.
        feat_res8, feat_cp8, feat_cp16 = self.cp(x)
        fused = self.ffm(feat_res8, feat_cp8)

        heads = (
            self.conv_out(fused),
            self.conv_out16(feat_cp8),
            self.conv_out32(feat_cp16),
        )
        feat_out, feat_out16, feat_out32 = [
            F.interpolate(head, size=target_size) for head in heads
        ]
        return feat_out, feat_out16, feat_out32
#copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import division
from __future__ import print_function
import paddle
from paddle import nn
import paddle.nn.functional as F
from paddle.utils.download import get_weights_path_from_url
import numpy as np
import math
# Maps an architecture name to a (weights URL, checksum string) pair.
model_urls = dict(
    resnet18=(
        'https://paddle-hapi.bj.bcebos.com/models/resnet18.pdparams',
        '0ba53eea9bc970962d0ef96f7b94057e',
    ),
)
def conv3x3(in_planes, out_planes, stride=1):
    """Return a bias-free 3x3 ``nn.Conv2d`` with padding 1 and the given stride."""
    conv_options = dict(kernel_size=3,
                        stride=stride,
                        padding=1,
                        bias_attr=False)
    return nn.Conv2d(in_planes, out_planes, **conv_options)
class BasicBlock(paddle.nn.Layer):
    """Two 3x3 conv / batch-norm layers with a residual shortcut."""

    def __init__(self, in_chan, out_chan, stride=1):
        """Create the block; a 1x1 projection shortcut is added when needed.

        The projection is created whenever the channel count changes or
        ``stride`` is not 1; otherwise the shortcut is the identity.
        """
        super(BasicBlock, self).__init__()
        self.conv1 = conv3x3(in_chan, out_chan, stride)
        self.bn1 = nn.BatchNorm(out_chan)
        self.conv2 = conv3x3(out_chan, out_chan)
        self.bn2 = nn.BatchNorm(out_chan)
        self.relu = nn.ReLU()
        self.downsample = None
        identity_fits = in_chan == out_chan and stride == 1
        if not identity_fits:
            projection = nn.Conv2d(in_chan,
                                   out_chan,
                                   kernel_size=1,
                                   stride=stride,
                                   bias_attr=False)
            self.downsample = nn.Sequential(projection,
                                            nn.BatchNorm(out_chan))

    def forward(self, x):
        """Return ``relu(shortcut(x) + residual(x))``."""
        residual = self.relu(self.bn1(self.conv1(x)))
        residual = self.bn2(self.conv2(residual))

        shortcut = x if self.downsample is None else self.downsample(x)
        return self.relu(shortcut + residual)
def create_layer_basic(in_chan, out_chan, bnum, stride=1):
    """Stack ``bnum`` BasicBlocks into an ``nn.Sequential``.

    Only the first block changes the channel count and applies ``stride``;
    the remaining ``bnum - 1`` blocks keep ``out_chan`` channels at stride 1.
    """
    blocks = [BasicBlock(in_chan, out_chan, stride=stride)]
    blocks.extend(
        BasicBlock(out_chan, out_chan, stride=1) for _ in range(bnum - 1))
    return nn.Sequential(*blocks)
class Resnet18(paddle.nn.Layer):
    """ResNet-18 trunk that returns the outputs of its last three stages."""

    def __init__(self):
        """Create the stem (7x7 conv, batch norm, ReLU, max-pool) and 4 stages."""
        super(Resnet18, self).__init__()
        stem_options = dict(kernel_size=7,
                            stride=2,
                            padding=3,
                            bias_attr=False)
        self.conv1 = nn.Conv2d(3, 64, **stem_options)
        self.bn1 = nn.BatchNorm(64)
        self.relu = nn.ReLU()
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        self.layer1 = create_layer_basic(64, 64, bnum=2, stride=1)
        self.layer2 = create_layer_basic(64, 128, bnum=2, stride=2)
        self.layer3 = create_layer_basic(128, 256, bnum=2, stride=2)
        self.layer4 = create_layer_basic(256, 512, bnum=2, stride=2)

    def forward(self, x):
        """Return ``(feat8, feat16, feat32)`` from layer2, layer3 and layer4."""
        stem = self.maxpool(self.relu(self.bn1(self.conv1(x))))
        feat8 = self.layer2(self.layer1(stem))  # 1/8
        feat16 = self.layer3(feat8)  # 1/16
        feat32 = self.layer4(feat16)  # 1/32
        return feat8, feat16, feat32
def resnet18(pretrained=False, **kwargs):
    """Build a ``Resnet18``; optionally load weights from a local file.

    Args:
        pretrained: when true, parameters are loaded from
            ``./resnet.pdparams`` relative to the working directory.
        **kwargs: accepted but not used.

    NOTE(review): ``model_urls`` is not consulted here -- confirm whether the
    hard-coded local path is intentional.
    """
    model = Resnet18()
    if not pretrained:
        return model
    param, _ = paddle.load('./resnet.pdparams')
    model.set_dict(param)
    return model
# copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .base_model import BaseModel from .base_model import BaseModel
from .cycle_gan_model import CycleGANModel from .cycle_gan_model import CycleGANModel
from .pix2pix_model import Pix2PixModel from .pix2pix_model import Pix2PixModel
from .srgan_model import SRGANModel from .srgan_model import SRGANModel
from .sr_model import SRModel from .sr_model import SRModel
from .makeup_model import MakeupModel
from .vgg import vgg16
import functools # copyright (c) 2020 PaddlePaddle Authors. All Rights Reserve.
import numpy as np #
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle import paddle
import functools
import numpy as np
import paddle.nn as nn import paddle.nn as nn
import paddle.nn.functional as F
from ...modules.nn import Spectralnorm
from ...modules.norm import build_norm_layer from ...modules.norm import build_norm_layer
from .builder import DISCRIMINATORS from .builder import DISCRIMINATORS
...@@ -14,7 +30,7 @@ class NLayerDiscriminator(nn.Layer): ...@@ -14,7 +30,7 @@ class NLayerDiscriminator(nn.Layer):
def __init__(self, input_nc, ndf=64, n_layers=3, norm_type='instance'): def __init__(self, input_nc, ndf=64, n_layers=3, norm_type='instance'):
"""Construct a PatchGAN discriminator """Construct a PatchGAN discriminator
Args: Parameters:
input_nc (int) -- the number of channels in input images input_nc (int) -- the number of channels in input images
ndf (int) -- the number of filters in the last conv layer ndf (int) -- the number of filters in the last conv layer
n_layers (int) -- the number of conv layers in the discriminator n_layers (int) -- the number of conv layers in the discriminator
...@@ -22,22 +38,52 @@ class NLayerDiscriminator(nn.Layer): ...@@ -22,22 +38,52 @@ class NLayerDiscriminator(nn.Layer):
""" """
super(NLayerDiscriminator, self).__init__() super(NLayerDiscriminator, self).__init__()
norm_layer = build_norm_layer(norm_type) norm_layer = build_norm_layer(norm_type)
if type(norm_layer) == functools.partial: if type(
use_bias = norm_layer.func == nn.InstanceNorm norm_layer
) == functools.partial: # no need to use bias as BatchNorm2d has affine parameters
use_bias = norm_layer.func == nn.InstanceNorm2d
else: else:
use_bias = norm_layer == nn.InstanceNorm use_bias = norm_layer == nn.InstanceNorm2d
kw = 4 kw = 4
padw = 1 padw = 1
if norm_type == 'spectral':
sequence = [
Spectralnorm(
nn.Conv2d(input_nc,
ndf,
kernel_size=kw,
stride=2,
padding=padw)),
nn.LeakyReLU(0.01)
]
else:
sequence = [ sequence = [
nn.Conv2d(input_nc, ndf, kernel_size=kw, stride=2, padding=padw), nn.Conv2d(input_nc,
ndf,
kernel_size=kw,
stride=2,
padding=padw,
bias_attr=use_bias),
nn.LeakyReLU(0.2) nn.LeakyReLU(0.2)
] ]
nf_mult = 1 nf_mult = 1
nf_mult_prev = 1 nf_mult_prev = 1
for n in range(1, n_layers): for n in range(1, n_layers): # gradually increase the number of filters
nf_mult_prev = nf_mult nf_mult_prev = nf_mult
nf_mult = min(2**n, 8) nf_mult = min(2**n, 8)
if norm_type == 'spectral':
sequence += [
Spectralnorm(
nn.Conv2d(ndf * nf_mult_prev,
ndf * nf_mult,
kernel_size=kw,
stride=2,
padding=padw)),
nn.LeakyReLU(0.01)
]
else:
sequence += [ sequence += [
nn.Conv2d(ndf * nf_mult_prev, nn.Conv2d(ndf * nf_mult_prev,
ndf * nf_mult, ndf * nf_mult,
...@@ -51,6 +97,17 @@ class NLayerDiscriminator(nn.Layer): ...@@ -51,6 +97,17 @@ class NLayerDiscriminator(nn.Layer):
nf_mult_prev = nf_mult nf_mult_prev = nf_mult
nf_mult = min(2**n_layers, 8) nf_mult = min(2**n_layers, 8)
if norm_type == 'spectral':
sequence += [
Spectralnorm(
nn.Conv2d(ndf * nf_mult_prev,
ndf * nf_mult,
kernel_size=kw,
stride=1,
padding=padw)),
nn.LeakyReLU(0.01)
]
else:
sequence += [ sequence += [
nn.Conv2d(ndf * nf_mult_prev, nn.Conv2d(ndf * nf_mult_prev,
ndf * nf_mult, ndf * nf_mult,
...@@ -62,9 +119,26 @@ class NLayerDiscriminator(nn.Layer): ...@@ -62,9 +119,26 @@ class NLayerDiscriminator(nn.Layer):
nn.LeakyReLU(0.2) nn.LeakyReLU(0.2)
] ]
if norm_type == 'spectral':
sequence += [ sequence += [
nn.Conv2d(ndf * nf_mult, 1, kernel_size=kw, stride=1, padding=padw) Spectralnorm(
] nn.Conv2d(ndf * nf_mult,
1,
kernel_size=kw,
stride=1,
padding=padw,
bias_attr=False))
] # output 1 channel prediction map
else:
sequence += [
nn.Conv2d(ndf * nf_mult,
1,
kernel_size=kw,
stride=1,
padding=padw,
bias_attr=False)
] # output 1 channel prediction map
self.model = nn.Sequential(*sequence) self.model = nn.Sequential(*sequence)
def forward(self, input): def forward(self, input):
......
# copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .resnet import ResnetGenerator from .resnet import ResnetGenerator
from .unet import UnetGenerator from .unet import UnetGenerator
from .rrdb_net import RRDBNet from .rrdb_net import RRDBNet
from .makeup import GeneratorPSGANAttention
# copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle
import paddle.nn as nn
import paddle.nn.functional as F
import functools
import numpy as np
from ...modules.norm import build_norm_layer
from .builder import GENERATORS
class PONO(paddle.nn.Layer):
    """Normalises each position to zero mean / unit variance along axis 1."""

    def __init__(self, eps=1e-5):
        """Store ``eps``, the constant added to the variance before the sqrt."""
        super(PONO, self).__init__()
        self.eps = eps

    def forward(self, x):
        """Return ``(x - mean) / sqrt(var + eps)`` with statistics over axis 1."""
        mean = paddle.mean(x, axis=1, keepdim=True)
        centered = x - mean
        var = paddle.mean(paddle.square(centered), axis=1, keepdim=True)
        return centered / paddle.sqrt(var + self.eps)
class ResidualBlock(paddle.nn.Layer):
    """Residual block: two bias-free 3x3 convs, each followed by InstanceNorm2d."""

    def __init__(self, dim_in, dim_out, mode=None):
        """Create the block.

        Args:
            dim_in: input channel count.
            dim_out: output channel count (the skip connection in ``forward``
                adds the input to the output, so shapes must be compatible).
            mode: ``'t'`` passes ``weight_attr=False`` / ``bias_attr=False``
                to the instance-norm layers; ``'p'`` or ``None`` passes
                ``None`` for both (the framework defaults).

        Raises:
            ValueError: if ``mode`` is not ``'t'``, ``'p'`` or ``None``.
        """
        super(ResidualBlock, self).__init__()

        if mode == 't':
            weight_attr = False
            bias_attr = False
        elif mode == 'p' or (mode is None):
            weight_attr = None
            bias_attr = None
        else:
            # Previously an unknown mode left both names unbound and failed
            # later with an obscure UnboundLocalError.
            raise ValueError(
                "ResidualBlock mode must be 't', 'p' or None, got {!r}".format(
                    mode))

        self.main = nn.Sequential(
            nn.Conv2d(dim_in,
                      dim_out,
                      kernel_size=3,
                      stride=1,
                      padding=1,
                      bias_attr=False),
            nn.InstanceNorm2d(dim_out,
                              weight_attr=weight_attr,
                              bias_attr=bias_attr), nn.ReLU(),
            nn.Conv2d(dim_out,
                      dim_out,
                      kernel_size=3,
                      stride=1,
                      padding=1,
                      bias_attr=False),
            nn.InstanceNorm2d(dim_out,
                              weight_attr=weight_attr,
                              bias_attr=bias_attr))

    def forward(self, x):
        """Return ``x + main(x)``."""
        return x + self.main(x)
class StyleResidualBlock(paddle.nn.Layer):
    """Residual block with PONO normalisation, modulated by a second input."""

    def __init__(self, dim_in, dim_out):
        """Create two conv + PONO stages and their gamma / beta convolutions.

        All gamma / beta convolutions take ``dim_in`` input channels.
        """
        super(StyleResidualBlock, self).__init__()
        ks = 3
        pw = ks // 2

        def conv_no_bias(channels_in):
            # 3x3, stride 1, padding 1, no bias -- shared by both stages.
            return nn.Conv2d(channels_in,
                             dim_out,
                             kernel_size=3,
                             stride=1,
                             padding=1,
                             bias_attr=False)

        self.block1 = nn.Sequential(conv_no_bias(dim_in), PONO())
        self.beta1 = nn.Conv2d(dim_in, dim_out, kernel_size=ks, padding=pw)
        self.gamma1 = nn.Conv2d(dim_in, dim_out, kernel_size=ks, padding=pw)
        self.block2 = nn.Sequential(nn.ReLU(), conv_no_bias(dim_out), PONO())
        self.beta2 = nn.Conv2d(dim_in, dim_out, kernel_size=ks, padding=pw)
        self.gamma2 = nn.Conv2d(dim_in, dim_out, kernel_size=ks, padding=pw)

    def forward(self, x, y):
        """Return ``x`` plus the two-stage output modulated by ``y``.

        After each stage the result is scaled by ``gamma(y) + 1`` and shifted
        by ``beta(y)``.
        """
        out = self.block1(x)
        shift = self.beta1(y)
        scale = self.gamma1(y)
        out = (scale + 1) * out + shift

        out = self.block2(out)
        shift = self.beta2(y)
        scale = self.gamma2(y)
        out = (scale + 1) * out + shift
        return x + out
class MDNet(paddle.nn.Layer):
    """MDNet in PSGAN"""

    def __init__(self, conv_dim=64, repeat_num=3):
        """Create the encoder.

        Layout: a 7x7 stem, two stride-2 stages that each double the channel
        count, then ``repeat_num`` residual blocks. Instance-norm layers are
        built with ``weight_attr=None`` / ``bias_attr=None``.
        """
        super(MDNet, self).__init__()

        stack = [
            nn.Conv2d(3,
                      conv_dim,
                      kernel_size=7,
                      stride=1,
                      padding=3,
                      bias_attr=False),
            nn.InstanceNorm2d(conv_dim, weight_attr=None, bias_attr=None),
            nn.ReLU(),
        ]

        # Down-Sampling
        channels = conv_dim
        for _ in range(2):
            doubled = channels * 2
            stack += [
                nn.Conv2d(channels,
                          doubled,
                          kernel_size=4,
                          stride=2,
                          padding=1,
                          bias_attr=False),
                nn.InstanceNorm2d(doubled, weight_attr=None, bias_attr=None),
                nn.ReLU(),
            ]
            channels = doubled

        # Bottleneck
        for _ in range(repeat_num):
            stack.append(ResidualBlock(dim_in=channels, dim_out=channels))

        self.main = nn.Sequential(*stack)

    def forward(self, x):
        """Return the encoded feature map for ``x``."""
        return self.main(x)
class TNetDown(paddle.nn.Layer):
    """Encoder with the MDNet layout but non-affine instance normalisation."""

    def __init__(self, conv_dim=64, repeat_num=3):
        """Create the encoder.

        Layout: a 7x7 stem, two stride-2 stages that each double the channel
        count, then ``repeat_num`` residual blocks in mode ``'t'``.
        Instance-norm layers are built with ``weight_attr=False`` /
        ``bias_attr=False``.
        """
        super(TNetDown, self).__init__()

        stack = [
            nn.Conv2d(3,
                      conv_dim,
                      kernel_size=7,
                      stride=1,
                      padding=3,
                      bias_attr=False),
            nn.InstanceNorm2d(conv_dim, weight_attr=False, bias_attr=False),
            nn.ReLU(),
        ]

        # Down-Sampling
        channels = conv_dim
        for _ in range(2):
            doubled = channels * 2
            stack += [
                nn.Conv2d(channels,
                          doubled,
                          kernel_size=4,
                          stride=2,
                          padding=1,
                          bias_attr=False),
                nn.InstanceNorm2d(doubled,
                                  weight_attr=False,
                                  bias_attr=False),
                nn.ReLU(),
            ]
            channels = doubled

        # Bottleneck
        for _ in range(repeat_num):
            stack.append(
                ResidualBlock(dim_in=channels, dim_out=channels, mode='t'))

        self.main = nn.Sequential(*stack)

    def forward(self, x):
        """Return the encoded feature map for ``x``."""
        return self.main(x)
class GetMatrix(paddle.nn.Layer):
    """Produces a (gamma, beta) pair from one input via two 1x1 convolutions.

    Derives from ``paddle.nn.Layer`` like every other layer in this module
    (it previously used the ``paddle.fluid.dygraph.Layer`` spelling).
    """

    def __init__(self, dim_in, dim_out):
        """Create the two bias-free 1x1 convolutions ``dim_in`` -> ``dim_out``."""
        super(GetMatrix, self).__init__()
        self.get_gamma = nn.Conv2d(dim_in,
                                   dim_out,
                                   kernel_size=1,
                                   stride=1,
                                   padding=0,
                                   bias_attr=False)
        self.get_beta = nn.Conv2d(dim_in,
                                  dim_out,
                                  kernel_size=1,
                                  stride=1,
                                  padding=0,
                                  bias_attr=False)

    def forward(self, x):
        """Return ``(gamma, beta)`` computed from ``x``."""
        gamma = self.get_gamma(x)
        beta = self.get_beta(x)
        return gamma, beta
class MANet(paddle.nn.Layer):
    """MANet in PSGAN"""

    def __init__(self, conv_dim=64, repeat_num=3, w=0.01):
        """Create the encoder, makeup-matrix head, bottlenecks and decoder.

        Args:
            conv_dim: channel count of the encoder stem; the bottleneck runs
                at ``conv_dim * 4`` channels.
            repeat_num: number of residual bottleneck blocks.
            w: factor applied to the flattened features before the attention
                product in ``forward``.
        """
        super(MANet, self).__init__()
        self.encoder = TNetDown(conv_dim=conv_dim, repeat_num=repeat_num)
        curr_dim = conv_dim * 4
        self.w = w
        # NOTE(review): ``beta`` / ``gamma`` (and ``up_betas_*`` /
        # ``up_gammas_*`` below) are not used by ``forward``; they are kept
        # so the set of parameters stays unchanged.
        self.beta = nn.Conv2d(curr_dim, curr_dim, kernel_size=3, padding=1)
        self.gamma = nn.Conv2d(curr_dim, curr_dim, kernel_size=3, padding=1)
        self.simple_spade = GetMatrix(curr_dim, 1)  # get the makeup matrix
        self.repeat_num = repeat_num
        for i in range(repeat_num):
            setattr(self, "bottlenecks_" + str(i),
                    ResidualBlock(dim_in=curr_dim, dim_out=curr_dim, mode='t'))

        # Up-Sampling
        self.upsamplers = []
        self.up_betas = []
        self.up_gammas = []
        self.up_acts = []
        y_dim = curr_dim
        for i in range(2):
            layers = []
            layers.append(
                nn.ConvTranspose2d(curr_dim,
                                   curr_dim // 2,
                                   kernel_size=4,
                                   stride=2,
                                   padding=1,
                                   bias_attr=False))
            layers.append(
                nn.InstanceNorm2d(curr_dim // 2,
                                  weight_attr=False,
                                  bias_attr=False))

            setattr(self, "up_acts_" + str(i), nn.ReLU())
            setattr(
                self, "up_betas_" + str(i),
                nn.ConvTranspose2d(y_dim,
                                   curr_dim // 2,
                                   kernel_size=4,
                                   stride=2,
                                   padding=1))
            setattr(
                self, "up_gammas_" + str(i),
                nn.ConvTranspose2d(y_dim,
                                   curr_dim // 2,
                                   kernel_size=4,
                                   stride=2,
                                   padding=1))
            setattr(self, "up_samplers_" + str(i), nn.Sequential(*layers))
            curr_dim = curr_dim // 2

        self.img_reg = [
            nn.Conv2d(curr_dim,
                      3,
                      kernel_size=7,
                      stride=1,
                      padding=3,
                      bias_attr=False)
        ]
        self.img_reg = nn.Sequential(*self.img_reg)

    def forward(self, x, y, x_p, y_p, consistency_mask, mask_x, mask_y):
        """Transfer the reference features ``y`` onto the source image ``x``.

        Args:
            x: source image, encoded here by ``self.encoder``.
            y: reference feature map (already encoded by the caller).
            x_p, y_p: optional extra features concatenated along axis 1 to
                the flattened source / reference features; may be ``None``.
            consistency_mask: optional mask; where it is 0 the attention
                logits are lowered by 100 before the softmax. May be ``None``.
            mask_x, mask_y: accepted but not used by this method.

        Returns:
            ``(out, a)``: the tanh-activated output image and the attention
            matrix of shape ``[batch, h * w, h2 * w2]``.
        """
        # y -> ref feature
        # x -> src img
        x = self.encoder(x)
        _, c, h, w = x.shape
        x_flat = x.reshape([-1, c, h * w])
        x_flat = self.w * x_flat
        if x_p is not None:
            x_flat = paddle.concat([x_flat, x_p], axis=1)

        _, c2, h2, w2 = y.shape
        y_flat = y.reshape([-1, c2, h2 * w2])
        y_flat = self.w * y_flat
        if y_p is not None:
            y_flat = paddle.concat([y_flat, y_p], axis=1)

        # [batch, h*w, h2*w2]: one row per source position.
        a_ = paddle.matmul(x_flat, y_flat, transpose_x=True) * 200.0

        # mask softmax
        if consistency_mask is not None:
            a_ = a_ - 100.0 * (1 - consistency_mask)
        a = F.softmax(a_, axis=-1)

        gamma, beta = self.simple_spade(y)

        # ``a @ beta`` has h*w rows (source positions), so the result is laid
        # out on the source grid (h, w), not the reference grid (h2, w2).
        beta = beta.reshape([-1, h2 * w2, 1])
        beta = paddle.matmul(a, beta)
        beta = beta.reshape([-1, 1, h, w])
        gamma = gamma.reshape([-1, h2 * w2, 1])
        gamma = paddle.matmul(a, gamma)
        gamma = gamma.reshape([-1, 1, h, w])
        x = x * (1 + gamma) + beta

        for i in range(self.repeat_num):
            layer = getattr(self, "bottlenecks_" + str(i))
            x = layer(x)

        for idx in range(2):
            layer = getattr(self, "up_samplers_" + str(idx))
            x = layer(x)
            layer = getattr(self, "up_acts_" + str(idx))
            x = layer(x)

        x = self.img_reg(x)
        x = paddle.tanh(x)
        return x, a
@GENERATORS.register()
class GeneratorPSGANAttention(paddle.nn.Layer):
    """Generator that chains ``MDNet`` (reference encoder) and ``MANet``."""

    def __init__(self, conv_dim=64, repeat_num=3):
        """Create both sub-networks with the same ``conv_dim`` / ``repeat_num``."""
        super(GeneratorPSGANAttention, self).__init__()
        shared = dict(conv_dim=conv_dim, repeat_num=repeat_num)
        self.ma_net = MANet(**shared)
        self.md_net = MDNet(**shared)

    def forward(self, x, y, x_p, y_p, consistency_mask, mask_x, mask_y):
        """Encode ``y`` with ``md_net`` and return ``ma_net``'s ``(out, a)``."""
        ref_feat = self.md_net(y)
        out, a = self.ma_net(x, ref_feat, x_p, y_p, consistency_mask, mask_x,
                             mask_y)
        return out, a
...@@ -45,7 +45,6 @@ class GANLoss(nn.Layer): ...@@ -45,7 +45,6 @@ class GANLoss(nn.Layer):
Returns: Returns:
A label tensor filled with ground truth label, and with the size of the input A label tensor filled with ground truth label, and with the size of the input
""" """
if target_is_real: if target_is_real:
if not hasattr(self, 'target_real_tensor'): if not hasattr(self, 'target_real_tensor'):
self.target_real_tensor = paddle.fill_constant( self.target_real_tensor = paddle.fill_constant(
......
# copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle
import paddle.nn as nn
import paddle.nn.functional as F
from .base_model import BaseModel
from .builder import MODELS
from .generators.builder import build_generator
from .discriminators.builder import build_discriminator
from .losses import GANLoss
from ..modules.init import init_weights
from ..solver import build_optimizer
from ..utils.image_pool import ImagePool
from ..utils.preprocess import *
from ..datasets.makeup_dataset import MakeupDataset
import numpy as np
from .vgg import vgg16
@MODELS.register()
class MakeupModel(BaseModel):
"""
This class implements the CycleGAN model, for learning image-to-image translation without paired data.
The model training requires '--dataset_mode unaligned' dataset.
By default, it uses a '--netG resnet_9blocks' ResNet generator,
a '--netD basic' discriminator (PatchGAN introduced by pix2pix),
and a least-square GANs objective ('--gan_mode lsgan').
CycleGAN paper: https://arxiv.org/pdf/1703.10593.pdf
"""
def __init__(self, opt):
    """Build the generator and, in training mode, everything needed to train.

    Parameters:
        opt -- experiment configuration. This method reads
            ``model.generator``, ``model.discriminator``, ``model.gan_mode``,
            ``dataset.train.pool_size`` and ``optimizer`` from it, and
            ``lambda_identity`` from ``self.opt``.
    """
    BaseModel.__init__(self, opt)

    # Names of the losses reported during training.
    self.loss_names = [
        'D_A', 'G_A', 'rec', 'idt', 'D_B', 'G_B',
        'G_A_his', 'G_B_his', 'G_bg_consis', 'A_vgg', 'B_vgg',
    ]

    # Names of the images to save / display, domain A first, then domain B.
    shown_A = ['real_A', 'fake_A', 'rec_A']
    shown_B = ['real_B', 'fake_B', 'rec_B']
    if self.isTrain and self.opt.lambda_identity > 0.0:
        # Identity outputs are only shown when the identity loss is enabled.
        shown_A.append('idt_B')
        shown_B.append('idt_A')
    self.visual_names = shown_A + shown_B

    self.vgg = vgg16(pretrained=True)

    # Networks to save / load: only the generator is needed at test time.
    self.model_names = ['G', 'D_A', 'D_B'] if self.isTrain else ['G']

    # Generator
    self.netG = build_generator(opt.model.generator)
    init_weights(self.netG, init_type='xavier', init_gain=1.0)

    # Discriminators (training only)
    if self.isTrain:
        self.netD_A = build_discriminator(opt.model.discriminator)
        self.netD_B = build_discriminator(opt.model.discriminator)
        for discriminator in (self.netD_A, self.netD_B):
            init_weights(discriminator, init_type='xavier', init_gain=1.0)

    if self.isTrain:
        # Buffers of previously generated images.
        self.fake_A_pool = ImagePool(opt.dataset.train.pool_size)
        self.fake_B_pool = ImagePool(opt.dataset.train.pool_size)

        # Loss functions
        self.criterionGAN = GANLoss(opt.model.gan_mode)
        self.criterionCycle = paddle.nn.L1Loss()
        self.criterionIdt = paddle.nn.L1Loss()
        self.criterionL1 = paddle.nn.L1Loss()
        self.criterionL2 = paddle.nn.MSELoss()

        # One optimizer per network, all on the same LR scheduler.
        self.build_lr_scheduler()
        self.optimizer_G = build_optimizer(
            opt.optimizer,
            self.lr_scheduler,
            parameter_list=self.netG.parameters())
        self.optimizer_DA = build_optimizer(
            opt.optimizer,
            self.lr_scheduler,
            parameter_list=self.netD_A.parameters())
        self.optimizer_DB = build_optimizer(
            opt.optimizer,
            self.lr_scheduler,
            parameter_list=self.netD_B.parameters())
        for optimizer in (self.optimizer_G, self.optimizer_DA,
                          self.optimizer_DB):
            self.optimizers.append(optimizer)
        self.optimizer_names.extend(
            ['optimizer_G', 'optimizer_DA', 'optimizer_DB'])
def set_input(self, input):
    """Convert one dataloader batch to tensors and store them on the model.

    Parameters:
        input (dict): must provide 'image_A', 'image_B', 'consis_mask', 'P_A',
            'P_B', 'mask_A_aug' and 'mask_B_aug'; in training mode also
            'mask_A', 'mask_B', 'consis_mask_idt_A' and 'consis_mask_idt_B'.
    """
    always_read = (
        ('real_A', 'image_A'),
        ('real_B', 'image_B'),
        ('c_m', 'consis_mask'),
        ('P_A', 'P_A'),
        ('P_B', 'P_B'),
        ('mask_A_aug', 'mask_A_aug'),
        ('mask_B_aug', 'mask_B_aug'),
    )
    for attr_name, key in always_read:
        setattr(self, attr_name, paddle.to_tensor(input[key]))

    # Consistency mask with its last two axes swapped, for the B -> A pass.
    self.c_m_t = paddle.transpose(self.c_m, perm=[0, 2, 1])

    if self.isTrain:
        train_only = (
            ('mask_A', 'mask_A'),
            ('mask_B', 'mask_B'),
            ('c_m_idt_a', 'consis_mask_idt_A'),
            ('c_m_idt_b', 'consis_mask_idt_B'),
        )
        for attr_name, key in train_only:
            setattr(self, attr_name, paddle.to_tensor(input[key]))
def forward(self):
    """Run the generator four times and store fake_A, fake_B, rec_A, rec_B.

    Every call passes ``mask_A_aug`` and ``mask_B_aug`` in that order. The
    attention matrix returned alongside each image is discarded.
    """
    masks = (self.mask_A_aug, self.mask_B_aug)

    # Source A with reference B.
    self.fake_A, _ = self.netG(self.real_A, self.real_B, self.P_A, self.P_B,
                               self.c_m, *masks)
    # Source B with reference A, using the transposed consistency mask.
    self.fake_B, _ = self.netG(self.real_B, self.real_A, self.P_B, self.P_A,
                               self.c_m_t, *masks)
    # fake_A passed back through with real_A as the reference.
    self.rec_A, _ = self.netG(self.fake_A, self.real_A, self.P_A, self.P_A,
                              self.c_m_idt_a, *masks)
    # fake_B passed back through with real_B as the reference.
    self.rec_B, _ = self.netG(self.fake_B, self.real_B, self.P_B, self.P_B,
                              self.c_m_idt_b, *masks)
def forward_test(self, input):
    """Call the generator directly on the entries of ``input``.

    ``input`` must provide 'image_A', 'image_B', 'P_A', 'P_B', 'consis_mask',
    'mask_A_aug' and 'mask_B_aug'. The generator's return value is passed
    through unchanged.
    """
    argument_keys = ('image_A', 'image_B', 'P_A', 'P_B', 'consis_mask',
                     'mask_A_aug', 'mask_B_aug')
    return self.netG(*[input[key] for key in argument_keys])
def test(self, input):
    """Run ``forward_test`` inside ``paddle.no_grad()`` and return its result.

    Gradients are disabled so intermediate values are not kept for backprop.
    """
    with paddle.no_grad():
        result = self.forward_test(input)
    return result
def backward_D_basic(self, netD, real, fake):
    """Compute a discriminator's GAN loss and back-propagate it.

    Parameters:
        netD (network)      -- the discriminator to train
        real (tensor array) -- real images
        fake (tensor array) -- generated images; detached before use so no
                               gradient reaches the generator

    Returns the loss, the mean of the real and fake terms, after calling
    ``backward()`` on it.
    """
    # Real images should be classified as real.
    loss_D_real = self.criterionGAN(netD(real), True)
    # Generated images should be classified as fake.
    loss_D_fake = self.criterionGAN(netD(fake.detach()), False)

    loss_D = (loss_D_real + loss_D_fake) * 0.5
    loss_D.backward()
    return loss_D
def backward_D_A(self):
    """Update ``loss_D_A``: D_A on real_B versus a fake_B from the image pool."""
    pooled_fake_B = self.fake_B_pool.query(self.fake_B)
    self.loss_D_A = self.backward_D_basic(self.netD_A, self.real_B,
                                          pooled_fake_B)
def backward_D_B(self):
    """Update ``loss_D_B``: D_B on real_A versus a fake_A from the image pool."""
    pooled_fake_A = self.fake_A_pool.query(self.fake_A)
    self.loss_D_B = self.backward_D_basic(self.netD_B, self.real_A,
                                          pooled_fake_A)
def backward_G(self):
"""Calculate the loss for generators G_A and G_B"""
'''
self.loss_names = [
'G_A_vgg',
'G_B_vgg',
'G_bg_consis'
]
# specify the images you want to save/display. The training/test scripts will call <BaseModel.get_current_visuals>
visual_names_A = ['real_A', 'fake_B', 'rec_A', 'amm_a']
visual_names_B = ['real_B', 'fake_A', 'rec_B', 'amm_b']
'''
lambda_idt = self.opt.lambda_identity
lambda_A = self.opt.lambda_A
lambda_B = self.opt.lambda_B
lambda_vgg = 5e-3
# Identity loss
if lambda_idt > 0:
self.idt_A, _ = self.netG(self.real_A, self.real_A, self.P_A,
self.P_A, self.c_m_idt_a, self.mask_A_aug,
self.mask_B_aug) # G_A(A)
self.loss_idt_A = self.criterionIdt(
self.idt_A, self.real_A) * lambda_A * lambda_idt
self.idt_B, _ = self.netG(self.real_B, self.real_B, self.P_B,
self.P_B, self.c_m_idt_b, self.mask_A_aug,
self.mask_B_aug) # G_A(A)
self.loss_idt_B = self.criterionIdt(
self.idt_B, self.real_B) * lambda_B * lambda_idt
else:
self.loss_idt_A = 0
self.loss_idt_B = 0
# GAN loss D_A(G_A(A))
self.loss_G_A = self.criterionGAN(self.netD_A(self.fake_A), True)
# GAN loss D_B(G_B(B))
self.loss_G_B = self.criterionGAN(self.netD_B(self.fake_B), True)
# Forward cycle loss || G_B(G_A(A)) - A||
self.loss_cycle_A = self.criterionCycle(self.rec_A,
self.real_A) * lambda_A
# Backward cycle loss || G_A(G_B(B)) - B||
self.loss_cycle_B = self.criterionCycle(self.rec_B,
self.real_B) * lambda_B
mask_A_lip = self.mask_A_aug[:, 0].unsqueeze(1)
mask_B_lip = self.mask_B_aug[:, 0].unsqueeze(1)
mask_A_lip_np = mask_A_lip.numpy().squeeze()
mask_B_lip_np = mask_B_lip.numpy().squeeze()
mask_A_lip_np, mask_B_lip_np, index_A_lip, index_B_lip = mask_preprocess(
mask_A_lip_np, mask_B_lip_np)
real_A = paddle.nn.clip((self.real_A + 1.0) / 2.0, 0.0, 1.0) * 255.0
real_A_np = real_A.numpy().squeeze()
real_B = paddle.nn.clip((self.real_B + 1.0) / 2.0, 0.0, 1.0) * 255.0
real_B_np = real_B.numpy().squeeze()
fake_A = paddle.nn.clip((self.fake_A + 1.0) / 2.0, 0.0, 1.0) * 255.0
fake_A_np = fake_A.numpy().squeeze()
fake_B = paddle.nn.clip((self.fake_B + 1.0) / 2.0, 0.0, 1.0) * 255.0
fake_B_np = fake_B.numpy().squeeze()
fake_match_lip_A = hisMatch(fake_A_np, real_B_np, mask_A_lip_np,
mask_B_lip_np, index_A_lip)
fake_match_lip_B = hisMatch(fake_B_np, real_A_np, mask_B_lip_np,
mask_A_lip_np, index_B_lip)
fake_match_lip_A = paddle.to_tensor(fake_match_lip_A)
fake_match_lip_A.stop_gradient = True
fake_match_lip_A = fake_match_lip_A.unsqueeze(0)
fake_match_lip_B = paddle.to_tensor(fake_match_lip_B)
fake_match_lip_B.stop_gradient = True
fake_match_lip_B = fake_match_lip_B.unsqueeze(0)
fake_A_lip_masked = fake_A * mask_A_lip
fake_B_lip_masked = fake_B * mask_B_lip
g_A_lip_loss_his = self.criterionL1(fake_A_lip_masked, fake_match_lip_A)
g_B_lip_loss_his = self.criterionL1(fake_B_lip_masked, fake_match_lip_B)
#skin
mask_A_skin = self.mask_A_aug[:, 1].unsqueeze(1)
mask_B_skin = self.mask_B_aug[:, 1].unsqueeze(1)
mask_A_skin_np = mask_A_skin.numpy().squeeze()
mask_B_skin_np = mask_B_skin.numpy().squeeze()
mask_A_skin_np, mask_B_skin_np, index_A_skin, index_B_skin = mask_preprocess(
mask_A_skin_np, mask_B_skin_np)
fake_match_skin_A = hisMatch(fake_A_np, real_B_np, mask_A_skin_np,
mask_B_skin_np, index_A_skin)
fake_match_skin_B = hisMatch(fake_B_np, real_A_np, mask_B_skin_np,
mask_A_skin_np, index_B_skin)
fake_match_skin_A = paddle.to_tensor(fake_match_skin_A)
fake_match_skin_A.stop_gradient = True
fake_match_skin_A = fake_match_skin_A.unsqueeze(0)
fake_match_skin_B = paddle.to_tensor(fake_match_skin_B)
fake_match_skin_B.stop_gradient = True
fake_match_skin_B = fake_match_skin_B.unsqueeze(0)
fake_A_skin_masked = fake_A * mask_A_skin
fake_B_skin_masked = fake_B * mask_B_skin
g_A_skin_loss_his = self.criterionL1(fake_A_skin_masked,
fake_match_skin_A)
g_B_skin_loss_his = self.criterionL1(fake_B_skin_masked,
fake_match_skin_B)
#eye
mask_A_eye = self.mask_A_aug[:, 2].unsqueeze(1)
mask_B_eye = self.mask_B_aug[:, 2].unsqueeze(1)
mask_A_eye_np = mask_A_eye.numpy().squeeze()
mask_B_eye_np = mask_B_eye.numpy().squeeze()
mask_A_eye_np, mask_B_eye_np, index_A_eye, index_B_eye = mask_preprocess(
mask_A_eye_np, mask_B_eye_np)
fake_match_eye_A = hisMatch(fake_A_np, real_B_np, mask_A_eye_np,
mask_B_eye_np, index_A_eye)
fake_match_eye_B = hisMatch(fake_B_np, real_A_np, mask_B_eye_np,
mask_A_eye_np, index_B_eye)
fake_match_eye_A = paddle.to_tensor(fake_match_eye_A)
fake_match_eye_A.stop_gradient = True
fake_match_eye_A = fake_match_eye_A.unsqueeze(0)
fake_match_eye_B = paddle.to_tensor(fake_match_eye_B)
fake_match_eye_B.stop_gradient = True
fake_match_eye_B = fake_match_eye_B.unsqueeze(0)
fake_A_eye_masked = fake_A * mask_A_eye
fake_B_eye_masked = fake_B * mask_B_eye
g_A_eye_loss_his = self.criterionL1(fake_A_eye_masked, fake_match_eye_A)
g_B_eye_loss_his = self.criterionL1(fake_B_eye_masked, fake_match_eye_B)
self.loss_G_A_his = (g_A_eye_loss_his + g_A_lip_loss_his +
g_A_skin_loss_his * 0.1) * 0.01
self.loss_G_B_his = (g_B_eye_loss_his + g_B_lip_loss_his +
g_B_skin_loss_his * 0.1) * 0.01
#self.loss_G_A_his = self.criterionL1(tmp_1, tmp_2) * 2048 * 255
#tmp_3 = self.hm_gt_B*self.hm_mask_weight_B
#tmp_4 = self.fake_B*self.hm_mask_weight_B
#self.loss_G_B_his = self.criterionL1(tmp_3, tmp_4) * 2048 * 255
#vgg loss
vgg_s = self.vgg(self.real_A)
vgg_s.stop_gradient = True
vgg_fake_A = self.vgg(self.fake_A)
self.loss_A_vgg = self.criterionL2(vgg_fake_A,
vgg_s) * lambda_A * lambda_vgg
vgg_r = self.vgg(self.real_B)
vgg_r.stop_gradient = True
vgg_fake_B = self.vgg(self.fake_B)
self.loss_B_vgg = self.criterionL2(vgg_fake_B,
vgg_r) * lambda_B * lambda_vgg
self.loss_rec = (self.loss_cycle_A + self.loss_cycle_B +
self.loss_A_vgg + self.loss_B_vgg) * 0.2
self.loss_idt = (self.loss_idt_A + self.loss_idt_B) * 0.2
# bg consistency loss
mask_A_consis = paddle.cast(
(self.mask_A == 0), dtype='float32') + paddle.cast(
(self.mask_A == 10), dtype='float32') + paddle.cast(
(self.mask_A == 8), dtype='float32')
mask_A_consis = paddle.unsqueeze(paddle.clip(mask_A_consis, 0, 1), 1)
self.loss_G_bg_consis = self.criterionL1(
self.real_A * mask_A_consis, self.fake_A * mask_A_consis) * 0.1
# combined loss and calculate gradients
self.loss_G = self.loss_G_A + self.loss_G_B + self.loss_rec + self.loss_idt + self.loss_G_A_his + self.loss_G_B_his + self.loss_G_bg_consis
self.loss_G.backward()
def optimize_parameters(self):
    """Run one training iteration: forward pass, then update G, D_A and D_B in turn."""

    def _apply_step(optimizer, loss):
        # Apply the gradients that were just computed, then reset them so
        # they do not leak into the next backward pass.
        optimizer.minimize(loss)
        optimizer.clear_gradients()

    # Compute fake images and reconstruction images.
    self.forward()

    # Generators G_A and G_B: the discriminators need no gradients here.
    self.set_requires_grad([self.netD_A, self.netD_B], False)
    self.backward_G()
    _apply_step(self.optimizer_G, self.loss_G)

    # Discriminator D_A.
    self.set_requires_grad(self.netD_A, True)
    self.backward_D_A()
    _apply_step(self.optimizer_DA, self.loss_D_A)

    # Discriminator D_B.
    self.set_requires_grad(self.netD_B, True)
    self.backward_D_B()
    _apply_step(self.optimizer_DB, self.loss_D_B)
...@@ -81,8 +81,8 @@ class Pix2PixModel(BaseModel): ...@@ -81,8 +81,8 @@ class Pix2PixModel(BaseModel):
""" """
AtoB = self.opt.dataset.train.direction == 'AtoB' AtoB = self.opt.dataset.train.direction == 'AtoB'
self.real_A = paddle.to_variable(input['A' if AtoB else 'B']) self.real_A = paddle.to_tensor(input['A' if AtoB else 'B'])
self.real_B = paddle.to_variable(input['B' if AtoB else 'A']) self.real_B = paddle.to_tensor(input['B' if AtoB else 'A'])
self.image_paths = input['A_paths' if AtoB else 'B_paths'] self.image_paths = input['A_paths' if AtoB else 'B_paths']
......
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle
import paddle.nn as nn
from paddle.utils.download import get_weights_path_from_url
from paddle.vision.models.vgg import make_layers
# Layer specification handed to make_layers() when vgg16() builds the feature
# extractor: integer entries and 'M' markers, listed in network order.
# NOTE(review): presumably the integers are conv output channels and 'M' marks
# a max-pool step (standard VGG-16 layout) -- confirm against make_layers.
cfg = [
64, 64, 'M', 128, 128, 'M', 256, 256, 256, 'M', 512, 512, 512, 'M', 512,
512, 512, 'M'
]
# Per-model download info: (weights URL, second argument passed to
# get_weights_path_from_url). The second value looks like an MD5 digest of
# the weights file -- TODO confirm.
model_urls = {
'vgg16': ('https://paddle-hapi.bj.bcebos.com/models/vgg16.pdparams',
'89bbffc0f87d260be9b8cdc169c991c4')
}
class VGG(nn.Layer):
    """Thin layer that only runs its input through the given ``features`` stack."""

    def __init__(self, features):
        super().__init__()
        # Sub-layer applied in forward(); built by the caller.
        self.features = features

    def forward(self, x):
        """Return ``self.features`` applied to ``x``."""
        return self.features(x)
def vgg16(pretrained=False):
    """Build the VGG-16 feature extractor described by ``cfg``.

    Args:
        pretrained: When true, fetch the weights listed under
            ``model_urls['vgg16']`` and load them into the model.

    Returns:
        A ``VGG`` layer wrapping the layers produced by ``make_layers(cfg)``.
    """
    model = VGG(make_layers(cfg))
    if pretrained:
        url, md5sum = model_urls['vgg16']
        weight_path = get_weights_path_from_url(url, md5sum)
        loaded = paddle.load(weight_path)
        # Pre-release Paddle 2.0 returned a (param_dict, optimizer_dict)
        # pair, while later releases return the state dict itself.
        # Unconditionally unpacking two values breaks on the latter, so
        # accept both shapes.
        if isinstance(loaded, (tuple, list)):
            param = loaded[0]
        else:
            param = loaded
        model.load_dict(param)
    return model
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle import paddle
import paddle.nn as nn import paddle.nn as nn
import math
class _SpectralNorm(nn.SpectralNorm): class _SpectralNorm(nn.SpectralNorm):
......
import paddle import paddle
import functools import functools
import paddle.nn as nn import paddle.nn as nn
from .nn import Spectralnorm
class Identity(nn.Layer): class Identity(nn.Layer):
...@@ -35,6 +36,8 @@ def build_norm_layer(norm_type='instance'): ...@@ -35,6 +36,8 @@ def build_norm_layer(norm_type='instance'):
bias_attr=paddle.ParamAttr(initializer=nn.initializer.Constant(0.0), bias_attr=paddle.ParamAttr(initializer=nn.initializer.Constant(0.0),
learning_rate=0.0, learning_rate=0.0,
trainable=False)) trainable=False))
elif norm_type == 'spectral':
norm_layer = functools.partial(Spectralnorm)
elif norm_type == 'none': elif norm_type == 'none':
def norm_layer(x): def norm_layer(x):
......
...@@ -12,25 +12,8 @@ def build_lr_scheduler(cfg): ...@@ -12,25 +12,8 @@ def build_lr_scheduler(cfg):
0, epoch + 1 - cfg.start_epoch) / float(cfg.decay_epochs + 1) 0, epoch + 1 - cfg.start_epoch) / float(cfg.decay_epochs + 1)
return lr_l return lr_l
scheduler = paddle.optimizer.lr_scheduler.LambdaLR( scheduler = paddle.optimizer.lr.LambdaDecay(cfg.learning_rate,
cfg.learning_rate, lr_lambda=lambda_rule) lr_lambda=lambda_rule)
return scheduler return scheduler
else: else:
raise NotImplementedError raise NotImplementedError
# paddle.optimizer.lr_scheduler
class LinearDecay(paddle.optimizer.lr_scheduler._LRScheduler):
    """Learning rate that stays flat, then decays linearly per epoch.

    The multiplier is ``1 - max(0, epoch + 1 - start_epoch) / (decay_epochs + 1)``
    where ``epoch`` is derived from the step counter and ``step_per_epoch``.
    """

    def __init__(self, learning_rate, step_per_epoch, start_epoch,
                 decay_epochs):
        super().__init__()
        self.learning_rate = learning_rate
        self.start_epoch = start_epoch
        self.decay_epochs = decay_epochs
        self.step_per_epoch = step_per_epoch

    def step(self):
        """Return the learning-rate variable for the current step."""
        # NOTE(review): ``step_num`` and ``create_lr_var`` are not defined in
        # this class; they are expected to come from the base class -- confirm.
        epoch_now = int(self.step_num // self.step_per_epoch)
        epochs_into_decay = max(0, epoch_now + 1 - self.start_epoch)
        decay_rate = 1.0 - epochs_into_decay / float(self.decay_epochs + 1)
        return self.create_lr_var(decay_rate * self.learning_rate)
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse import argparse
def parse_args(argv=None):
    """Parse the command-line options for training, evaluation and inference.

    Args:
        argv: Optional list of argument strings. When ``None`` (the default)
            the arguments are taken from ``sys.argv[1:]``, as before.

    Returns:
        The ``argparse.Namespace`` holding the parsed options. Everything
        left over after the named options is collected in ``opts``.
    """
    parser = argparse.ArgumentParser(description='Segmentron')
    parser.add_argument('--config-file',
                        metavar="FILE",
                        help='config file path')
    # cuda setting
    parser.add_argument('--no-cuda',
                        action='store_true',
                        default=False,
                        help='disables CUDA training')
    # checkpoint and log
    parser.add_argument('--resume',
                        type=str,
                        default=None,
                        help='put the path to resuming file if needed')
    # NOTE(review): help text is identical to --resume -- confirm intent.
    parser.add_argument('--load',
                        type=str,
                        default=None,
                        help='put the path to resuming file if needed')
    # for evaluation
    parser.add_argument('--val-interval',
                        type=int,
                        default=1,
                        help='run validation every interval')
    # NOTE(review): help text does not obviously match the flag name -- confirm.
    parser.add_argument('--evaluate-only',
                        action='store_true',
                        default=False,
                        help='skip validation during training')
    # config options
    parser.add_argument('opts',
                        help='See config for all options',
                        default=None,
                        nargs=argparse.REMAINDER)
    # for inference
    parser.add_argument("--source_path",
                        default="",
                        metavar="FILE",
                        help="path to source image")
    parser.add_argument("--reference_dir",
                        default="",
                        help="path to reference images")
    parser.add_argument("--model_path", default="", help="model for loading")
    args = parser.parse_args(argv)
    return args
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import cv2
import numpy as np
def generate_P_from_lmks(lmks, resize, w, h):
    """Build per-pixel, unit-normalised offsets to the 68 landmarks.

    Args:
        lmks: Landmark array; it is transposed and flattened to 136 values,
            so a (68, 2) array is expected. Column 0 is subtracted from the
            row-coordinate grid and column 1 from the column-coordinate grid.
        resize: Side length of the square coordinate grid.
        w: Unused.
        h: Unused.

    Returns:
        Array of shape (136, 64 * 64): landmark offsets sampled on a 64x64
        grid, each pixel's 136-vector divided by its L2 norm.
    """
    out_size = (64, 64)
    axis = np.linspace(0, resize - 1, resize)
    grid_x, grid_y = np.meshgrid(axis, axis)
    grid_x = np.repeat(grid_x[None], 68, axis=0)
    grid_y = np.repeat(grid_y[None], 68, axis=0)
    # Row coordinates first, then column coordinates, matching the flattened
    # landmark layout below.
    grid = np.concatenate([grid_y, grid_x], axis=0)
    offsets = grid - lmks.transpose(1, 0).reshape(-1, 1, 1)
    # cv2.resize works on channel-last data, hence the transposes around it.
    offsets = cv2.resize(offsets.transpose(1, 2, 0),
                         out_size,
                         interpolation=cv2.INTER_NEAREST)
    offsets = offsets.transpose(2, 0, 1).reshape(136, -1)
    return offsets / np.linalg.norm(offsets, axis=0)
def copy_area(tar, src, lms):
    """Move the landmark bounding box (padded by 16 px) from ``src`` to ``tar``.

    The box is taken from the min/max of ``lms[:, 0]`` (rows) and
    ``lms[:, 1]`` (columns). That region of ``src`` is copied into ``tar``
    and then zeroed in ``src``. Both arrays are modified in place.

    Args:
        tar: Destination array, indexed as [row, column].
        src: Source array with the same layout; cleared inside the box.
        lms: Landmark array indexable as ``lms[:, 0]`` / ``lms[:, 1]``.
    """
    margin = 16
    # Clamp the lower bounds at 0: a negative slice start would be read as
    # "counted from the end" and silently select the wrong (usually empty)
    # region when landmarks sit within ``margin`` pixels of the border.
    top = max(int(min(lms[:, 0])) - margin, 0)
    left = max(int(min(lms[:, 1])) - margin, 0)
    # Upper bounds past the array edge are already truncated by slicing.
    bottom = int(max(lms[:, 0])) + margin + 1
    right = int(max(lms[:, 1])) + margin + 1
    region = (slice(top, bottom), slice(left, right))
    tar[region] = src[region]
    src[region] = 0
def rebound_box(mask, mask_B, mask_face):
    """Replace each mask's padded bounding box with the face mask content.

    For ``mask`` and ``mask_B`` separately, the bounding box of the non-zero
    entries is grown by 16 px on every side and that region is overwritten
    with the same region of ``mask_face``. The inputs are not modified.

    Args:
        mask: 2-D mask array.
        mask_B: 2-D mask array for the second image.
        mask_face: Array supplying the values written inside the boxes.

    Returns:
        Tuple ``(mask_temp, mask_B_temp)`` of modified copies.

    Raises:
        ValueError: If a mask has no non-zero entry (min/max of an empty
            sequence).
    """
    pad = 16

    def _padded_bbox(region_mask):
        # Bounding box of the non-zero entries, grown by ``pad``. The lower
        # bounds are clamped at 0 because a negative slice start would wrap
        # around to the end of the axis and select the wrong region.
        nonzero = region_mask.nonzero()
        rows, cols = nonzero[0], nonzero[1]
        return (slice(max(min(rows) - pad, 0), max(rows) + pad + 1),
                slice(max(min(cols) - pad, 0), max(cols) + pad + 1))

    box_a = _padded_bbox(mask)
    box_b = _padded_bbox(mask_B)
    mask_temp = np.copy(mask)
    mask_B_temp = np.copy(mask_B)
    mask_temp[box_a] = mask_face[box_a]
    mask_B_temp[box_b] = mask_face[box_b]
    return mask_temp, mask_B_temp
def calculate_consis_mask(mask, mask_B):
    """calculate consistency mask between images

    Both inputs are channel-first arrays whose channels 0, 1, 2 are read as
    lip, skin and eye. Each is downsampled by 4 (nearest neighbour), turned
    into an (H*W, 3) table with columns skin, eye, lip, and the two tables
    are multiplied to give an (Ha*Wa, Hb*Wb) matrix clipped to [0, 1].
    """

    def _downsample(m, height, width):
        # cv2.resize expects channel-last input, so transpose around it.
        small = cv2.resize(np.transpose(m, (1, 2, 0)),
                           dsize=(width // 4, height // 4),
                           interpolation=cv2.INTER_NEAREST)
        return np.transpose(small, (2, 0, 1))

    def _region_table(m):
        height, width = m.shape[1:]
        table = np.zeros((height * width, 3))
        # Column order is skin, eye, lip; channel order is lip, skin, eye.
        for column, channel in enumerate((1, 2, 0)):
            table[:, column] = m[channel].flatten()
        return table

    h_a, w_a = mask.shape[1:]
    h_b, w_b = mask_B.shape[1:]
    mask = _downsample(mask, h_a, w_a)
    mask_B = _downsample(mask_B, h_b, w_b)

    table_a = _region_table(mask)
    table_b = _region_table(mask_B)
    return np.clip(np.matmul(table_a, np.transpose(table_b)), 0, 1)
def cal_hist(image):
    """Compute the cumulative, normalised histogram of the first 3 channels.

    Args:
        image: Sequence whose items 0, 1, 2 are arrays of pixel values.

    Returns:
        List of three lists with 256 entries each: the running sum of the
        256-bin histogram over the range (0, 255), divided by the number of
        counted values. A channel with no counted values yields NaN entries.
    """
    hists = []
    for channel_idx in range(3):
        counts, _ = np.histogram(image[channel_idx],
                                 bins=256,
                                 range=(0, 255))
        # Normalise first, then accumulate, so the rounding matches a plain
        # left-to-right running sum of the per-bin probabilities.
        cdf = np.cumsum(counts / counts.sum())
        hists.append(cdf.tolist())
    return hists
def cal_trans(ref, adj):
    """Build the 256-entry level mapping between two cumulative histograms.

    For every level ``i`` in 1..255 the entry is the smallest ``j`` in
    1..255 with ``adj[j - 1] <= ref[i] <= adj[j]``; levels without such a
    ``j`` map to themselves. Entry 0 is always 0 and entry 255 is always 255.
    (Algorithm: see the "Histogram matching" article on Wikipedia.)
    """
    levels = range(1, 256)
    table = list(range(256))
    for level in levels:
        wanted = ref[level]
        hit = next(
            (j for j in levels if adj[j - 1] <= wanted <= adj[j]), None)
        if hit is not None:
            table[level] = hit
    table[255] = 255
    return table
def histogram_matching(dstImg, refImg, index):
    """Match the histogram of selected ``dstImg`` pixels to ``refImg``.

    Args:
        dstImg: Array indexed as [channel, row, column]; channels 0..2 are
            modified in place at the selected pixels.
        refImg: Array with the same layout supplying the target histogram.
        index: ``index[0], index[1]`` are the row/column indices of the
            pixels to transform in ``dstImg``; ``index[2], index[3]`` are
            the row/column indices of the pixels used to compute the
            histogram in ``refImg``.

    Returns:
        ``dstImg`` (the same object that was passed in).
    """
    dst_rows, dst_cols = index[0], index[1]
    ref_rows, ref_cols = index[2], index[3]
    # Fancy indexing returns copies, so these stay untouched by the
    # write-back below.
    dst_align = [dstImg[c, dst_rows, dst_cols] for c in range(3)]
    ref_align = [refImg[c, ref_rows, ref_cols] for c in range(3)]
    hist_ref = cal_hist(ref_align)
    hist_dst = cal_hist(dst_align)
    for c in range(3):
        # Apply the level mapping as a lookup table in one indexing step
        # instead of a Python loop over every pixel. Values are truncated
        # toward zero to get the level, as int() did.
        lut = np.asarray(cal_trans(hist_dst[c], hist_ref[c]))
        levels = dst_align[c].astype(np.int64)
        dstImg[c, dst_rows, dst_cols] = lut[levels]
    return dstImg
def hisMatch(input_data, target_data, mask_src, mask_tar, index):
    """Histogram-match the masked input to the masked target.

    Both masks are clipped to [0, 1] and multiplied onto their image as
    float32; the masked input is then passed to ``histogram_matching``
    together with the masked target and ``index``, and its result is
    returned.
    """
    src_weight = np.asarray(np.clip(mask_src, 0, 1), dtype=np.float32)
    tar_weight = np.asarray(np.clip(mask_tar, 0, 1), dtype=np.float32)
    masked_input = np.asarray(input_data, dtype=np.float32) * src_weight
    masked_target = np.asarray(target_data, dtype=np.float32) * tar_weight
    return histogram_matching(masked_input, masked_target, index)
def mask_preprocess(mask, mask_B):
    """Collect the non-zero pixel indices of two masks.

    Returns:
        ``[mask, mask_B, index, index_2]`` where ``index`` is
        ``[rows_A, cols_A, rows_B, cols_B]`` and ``index_2`` holds the same
        arrays with the two masks swapped. The masks are returned as passed.
    """
    nonzero_a = mask.nonzero()
    nonzero_b = mask_B.nonzero()
    rows_a, cols_a = nonzero_a[0], nonzero_a[1]
    rows_b, cols_b = nonzero_b[0], nonzero_b[1]
    a_then_b = [rows_a, cols_a, rows_b, cols_b]
    b_then_a = [rows_b, cols_b, rows_a, cols_a]
    return [mask, mask_B, a_then_b, b_then_a]
def generate_mask_aug(mask, lmks):
    """Stack lip, skin and eye masks derived from a label map.

    Labels 1 and 6 form the face mask and labels 7 and 9 the lip mask. The
    areas around landmarks 42..47 and 36..41 are moved out of the face mask
    by ``copy_area`` into the eye mask; what is left of the face mask is the
    skin mask.

    Returns:
        Array with a new leading axis of size 3, ordered lip, skin, eye.
    """
    face = (mask == 1).astype(np.float32) + (mask == 6).astype(np.float32)

    # copy_area() clears the copied region in ``face``, so the remainder
    # ends up being the skin mask. Left eye first, then right eye.
    eye_parts = []
    for start, stop in ((42, 48), (36, 42)):
        eye_part = np.zeros_like(mask)
        copy_area(eye_part, face, lmks[start:stop])
        eye_parts.append(eye_part)

    lip = (mask == 7).astype(np.float32) + (mask == 9).astype(np.float32)
    eye = eye_parts[0] + eye_parts[1]
    return np.stack((lip, face, eye), axis=0)
...@@ -11,6 +11,7 @@ ...@@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import os import os
import sys import sys
...@@ -50,4 +51,3 @@ if __name__ == '__main__': ...@@ -50,4 +51,3 @@ if __name__ == '__main__':
cfg = get_config(args.config_file) cfg = get_config(args.config_file)
main(args, cfg) main(args, cfg)
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册