# copyright (c) 2021 PaddlePaddle Authors. All Rights Reserve. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import os import copy import shutil from functools import partial import importlib import numpy as np import paddle import paddle.nn.functional as F def build_postprocess(config): if config is None: return None mod = importlib.import_module(__name__) config = copy.deepcopy(config) main_indicator = config.pop( "main_indicator") if "main_indicator" in config else None main_indicator = main_indicator if main_indicator else "" func_list = [] for func in config: func_list.append(getattr(mod, func)(**config[func])) return PostProcesser(func_list, main_indicator) class PostProcesser(object): def __init__(self, func_list, main_indicator="Topk"): self.func_list = func_list self.main_indicator = main_indicator def __call__(self, x, image_file=None): rtn = None for func in self.func_list: tmp = func(x, image_file) if type(func).__name__ in self.main_indicator: rtn = tmp return rtn class Topk(object): def __init__(self, topk=1, class_id_map_file=None): assert isinstance(topk, (int, )) self.class_id_map = self.parse_class_id_map(class_id_map_file) self.topk = topk def parse_class_id_map(self, class_id_map_file): if class_id_map_file is None: return None if not os.path.exists(class_id_map_file): print( "Warning: If want to use your own label_dict, please input legal path!\nOtherwise label_names will be empty!" ) return None try: class_id_map = {} with open(class_id_map_file, "r") as fin: lines = fin.readlines() for line in lines: partition = line.split("\n")[0].partition(" ") class_id_map[int(partition[0])] = str(partition[-1]) except Exception as ex: print(ex) class_id_map = None return class_id_map def __call__(self, x, file_names=None, multilabel=False): if file_names is not None: assert x.shape[0] == len(file_names) y = [] for idx, probs in enumerate(x): index = probs.argsort(axis=0)[-self.topk:][::-1].astype( "int32") if not multilabel else np.where( probs >= 0.5)[0].astype("int32") clas_id_list = [] score_list = [] label_name_list = [] for i in index: clas_id_list.append(i.item()) score_list.append(probs[i].item()) if self.class_id_map is not None: label_name_list.append(self.class_id_map[i.item()]) result = { "class_ids": clas_id_list, "scores": np.around( score_list, decimals=5).tolist(), } if file_names is not None: result["file_name"] = file_names[idx] if label_name_list is not None: result["label_names"] = label_name_list y.append(result) return y class MultiLabelTopk(Topk): def __init__(self, topk=1, class_id_map_file=None): super().__init__() def __call__(self, x, file_names=None): return super().__call__(x, file_names, multilabel=True) class SavePreLabel(object): def __init__(self, save_dir): if save_dir is None: raise Exception( "Please specify save_dir if SavePreLabel specified.") self.save_dir = partial(os.path.join, save_dir) def __call__(self, x, file_names=None): if file_names is None: return assert x.shape[0] == len(file_names) for idx, probs in enumerate(x): index = probs.argsort(axis=0)[-1].astype("int32") self.save(index, file_names[idx]) def save(self, id, image_file): output_dir = self.save_dir(str(id)) os.makedirs(output_dir, exist_ok=True) shutil.copy(image_file, output_dir) class Binarize(object): def __init__(self, method="round"): self.method = method self.unit = np.array([[128, 64, 32, 16, 8, 4, 2, 1]]).T def __call__(self, x, file_names=None): if self.method == "round": x = np.round(x + 1).astype("uint8") - 1 if self.method == "sign": x = ((np.sign(x) + 1) / 2).astype("uint8") embedding_size = x.shape[1] assert embedding_size % 8 == 0, "The Binary index only support vectors with sizes multiple of 8" byte = np.zeros([x.shape[0], embedding_size // 8], dtype=np.uint8) for i in range(embedding_size // 8): byte[:, i:i + 1] = np.dot(x[:, i * 8:(i + 1) * 8], self.unit) return byte