# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # Reference: https://github.com/CAPTAIN-WHU/DOTA_devkit from __future__ import absolute_import from __future__ import division from __future__ import print_function import os import math import copy from numbers import Number from multiprocessing import Pool import cv2 import numpy as np from tqdm import tqdm import shapely.geometry as shgeo def choose_best_pointorder_fit_another(poly1, poly2): """ To make the two polygons best fit with each point """ x1, y1, x2, y2, x3, y3, x4, y4 = poly1 combinate = [ np.array([x1, y1, x2, y2, x3, y3, x4, y4]), np.array([x2, y2, x3, y3, x4, y4, x1, y1]), np.array([x3, y3, x4, y4, x1, y1, x2, y2]), np.array([x4, y4, x1, y1, x2, y2, x3, y3]) ] dst_coordinate = np.array(poly2) distances = np.array( [np.sum((coord - dst_coordinate)**2) for coord in combinate]) sorted = distances.argsort() return combinate[sorted[0]] def cal_line_length(point1, point2): return math.sqrt( math.pow(point1[0] - point2[0], 2) + math.pow(point1[1] - point2[1], 2)) class SliceBase(object): def __init__(self, gap=512, subsize=1024, thresh=0.7, choosebestpoint=True, ext='.png', padding=True, num_process=8, image_only=False): self.gap = gap self.subsize = subsize self.slide = subsize - gap self.thresh = thresh self.choosebestpoint = choosebestpoint self.ext = ext self.padding = padding self.num_process = num_process self.image_only = image_only def get_windows(self, height, width): windows = [] left, up = 0, 0 while (left < width): if (left + self.subsize >= width): left = max(width - self.subsize, 0) up = 0 while (up < height): if (up + self.subsize >= height): up = max(height - self.subsize, 0) right = min(left + self.subsize, width - 1) down = min(up + self.subsize, height - 1) windows.append((left, up, right, down)) if (up + self.subsize >= height): break else: up = up + self.slide if (left + self.subsize >= width): break else: left = left + self.slide return windows def slice_image_single(self, image, windows, output_dir, output_name): image_dir = os.path.join(output_dir, 'images') for (left, up, right, down) in windows: image_name = output_name + str(left) + '___' + str(up) + self.ext subimg = copy.deepcopy(image[up:up + self.subsize, left:left + self.subsize]) h, w, c = subimg.shape if (self.padding): outimg = np.zeros((self.subsize, self.subsize, 3)) outimg[0:h, 0:w, :] = subimg cv2.imwrite(os.path.join(image_dir, image_name), outimg) else: cv2.imwrite(os.path.join(image_dir, image_name), subimg) def iof(self, poly1, poly2): inter_poly = poly1.intersection(poly2) inter_area = inter_poly.area poly1_area = poly1.area half_iou = inter_area / poly1_area return inter_poly, half_iou def translate(self, poly, left, up): n = len(poly) out_poly = np.zeros(n) for i in range(n // 2): out_poly[i * 2] = int(poly[i * 2] - left) out_poly[i * 2 + 1] = int(poly[i * 2 + 1] - up) return out_poly def get_poly4_from_poly5(self, poly): distances = [ cal_line_length((poly[i * 2], poly[i * 2 + 1]), (poly[(i + 1) * 2], poly[(i + 1) * 2 + 1])) for i in range(int(len(poly) / 2 - 1)) ] distances.append( cal_line_length((poly[0], poly[1]), (poly[8], poly[9]))) pos = np.array(distances).argsort()[0] count = 0 out_poly = [] while count < 5: if (count == pos): out_poly.append( (poly[count * 2] + poly[(count * 2 + 2) % 10]) / 2) out_poly.append( (poly[(count * 2 + 1) % 10] + poly[(count * 2 + 3) % 10]) / 2) count = count + 1 elif (count == (pos + 1) % 5): count = count + 1 continue else: out_poly.append(poly[count * 2]) out_poly.append(poly[count * 2 + 1]) count = count + 1 return out_poly def slice_anno_single(self, annos, windows, output_dir, output_name): anno_dir = os.path.join(output_dir, 'labelTxt') for (left, up, right, down) in windows: image_poly = shgeo.Polygon( [(left, up), (right, up), (right, down), (left, down)]) anno_file = output_name + str(left) + '___' + str(up) + '.txt' with open(os.path.join(anno_dir, anno_file), 'w') as f: for anno in annos: gt_poly = shgeo.Polygon( [(anno['poly'][0], anno['poly'][1]), (anno['poly'][2], anno['poly'][3]), (anno['poly'][4], anno['poly'][5]), (anno['poly'][6], anno['poly'][7])]) if gt_poly.area <= 0: continue inter_poly, iof = self.iof(gt_poly, image_poly) if iof == 1: final_poly = self.translate(anno['poly'], left, up) elif iof > 0: inter_poly = shgeo.polygon.orient(inter_poly, sign=1) out_poly = list(inter_poly.exterior.coords)[0:-1] if len(out_poly) < 4 or len(out_poly) > 5: continue final_poly = [] for p in out_poly: final_poly.append(p[0]) final_poly.append(p[1]) if len(out_poly) == 5: final_poly = self.get_poly4_from_poly5(final_poly) if self.choosebestpoint: final_poly = choose_best_pointorder_fit_another( final_poly, anno['poly']) final_poly = self.translate(final_poly, left, up) final_poly = np.clip(final_poly, 1, self.subsize) else: continue outline = ' '.join(list(map(str, final_poly))) if iof >= self.thresh: outline = outline + ' ' + anno['name'] + ' ' + str(anno[ 'difficult']) else: outline = outline + ' ' + anno['name'] + ' ' + '2' f.write(outline + '\n') def slice_data_single(self, info, rate, output_dir): file_name = info['image_file'] base_name = os.path.splitext(os.path.split(file_name)[-1])[0] base_name = base_name + '__' + str(rate) + '__' img = cv2.imread(file_name) if img.shape == (): return if (rate != 1): resize_img = cv2.resize( img, None, fx=rate, fy=rate, interpolation=cv2.INTER_CUBIC) else: resize_img = img height, width, _ = resize_img.shape windows = self.get_windows(height, width) self.slice_image_single(resize_img, windows, output_dir, base_name) if not self.image_only: self.slice_anno_single(info['annotation'], windows, output_dir, base_name) def check_or_mkdirs(self, path): if not os.path.exists(path): os.makedirs(path, exist_ok=True) def slice_data(self, infos, rates, output_dir): """ Args: infos (list[dict]): data_infos rates (float, list): scale rates output_dir (str): output directory """ if isinstance(rates, Number): rates = [rates, ] self.check_or_mkdirs(output_dir) self.check_or_mkdirs(os.path.join(output_dir, 'images')) if not self.image_only: self.check_or_mkdirs(os.path.join(output_dir, 'labelTxt')) pbar = tqdm(total=len(rates) * len(infos), desc='slicing data') if self.num_process <= 1: for rate in rates: for info in infos: self.slice_data_single(info, rate, output_dir) pbar.update() else: pool = Pool(self.num_process) for rate in rates: for info in infos: pool.apply_async( self.slice_data_single, (info, rate, output_dir), callback=lambda x: pbar.update()) pool.close() pool.join() pbar.close()