未验证 提交 8887832e 编写于 作者: S SunAhong1993 提交者: GitHub

Merge pull request #4 from PaddlePaddle/develop

00
# download pre-compiled paddle encrypt
ENCRYPTION_URL=https://bj.bcebos.com/paddlex/tools/paddlex-encryption.zip
if [ ! -d "./paddlex-encryption" ]; then
wget -c ${ENCRYPTION_URL}
unzip paddlex-encryption.zip
rm -rf paddlex-encryption.zip
fi
# download pre-compiled opencv lib
OPENCV_URL=https://paddleseg.bj.bcebos.com/deploy/docker/opencv3gcc4.8.tar.bz2
if [ ! -d "./deps/opencv3gcc4.8" ]; then
......
......@@ -4,26 +4,27 @@ WITH_GPU=OFF
WITH_MKL=ON
# 是否集成 TensorRT(仅WITH_GPU=ON 有效)
WITH_TENSORRT=OFF
# TensorRT 的lib路径
# TensorRT 的路径
TENSORRT_DIR=/path/to/TensorRT/
# Paddle 预测库路径
PADDLE_DIR=/path/to/fluid_inference/
PADDLE_DIR=/docker/jiangjiajun/PaddleDetection/deploy/cpp/fluid_inference
# Paddle 的预测库是否使用静态库来编译
# 使用TensorRT时,Paddle的预测库通常为动态库
WITH_STATIC_LIB=OFF
# CUDA 的 lib 路径
CUDA_LIB=/path/to/cuda/lib/
CUDA_LIB=/usr/local/cuda/lib64
# CUDNN 的 lib 路径
CUDNN_LIB=/path/to/cudnn/lib/
CUDNN_LIB=/usr/local/cuda/lib64
# 是否加载加密后的模型
WITH_ENCRYPTION=OFF
# 加密工具的路径
ENCRYPTION_DIR=/path/to/encryption_tool/
WITH_ENCRYPTION=ON
# 加密工具的路径, 如果使用自带预编译版本可不修改
sh $(pwd)/scripts/bootstrap.sh # 下载预编译版本的加密工具
ENCRYPTION_DIR=$(pwd)/paddlex-encryption
# OPENCV 路径, 如果使用自带预编译版本可不修改
sh $(pwd)/scripts/bootstrap.sh # 下载预编译版本的opencv
OPENCV_DIR=$(pwd)/deps/opencv3gcc4.8/
sh $(pwd)/scripts/bootstrap.sh
# 以下无需改动
rm -rf build
......
......@@ -31,6 +31,8 @@ void Model::create_predictor(const std::string& model_dir,
std::string params_file = model_dir + OS_PATH_SEP + "__params__";
#ifdef WITH_ENCRYPTION
if (key != ""){
model_file = model_dir + OS_PATH_SEP + "__model__.encrypted";
params_file = model_dir + OS_PATH_SEP + "__params__.encrypted";
paddle_security_load_model(&config, key.c_str(), model_file.c_str(), params_file.c_str());
}
#endif
......
......@@ -8,7 +8,7 @@ paddlex.slim.cal_params_sensitivities(model, save_file, eval_dataset, batch_size
1. 获取模型中可裁剪卷积Kernel的名称。
2. 计算每个可裁剪卷积Kernel不同裁剪率下的敏感度。
【注意】卷积的敏感度是指在不同裁剪率下评估数据集预测精度的损失,通过得到的敏感度,可以决定最终模型需要裁剪的参数列表和各裁剪参数对应的裁剪率。
[查看使用示例](https://github.com/PaddlePaddle/PaddleX/blob/develop/tutorials/compress/classification/cal_sensitivities_file.py#L33)
[查看使用示例](https://github.com/PaddlePaddle/PaddleX/blob/develop/tutorials/compress/classification/cal_sensitivities_file.py#L33) [查看裁剪教程](../tutorials/compress/classification.md)
### 参数
......
......@@ -67,6 +67,12 @@ CUDA_LIB=/path/to/cuda/lib/
# CUDNN 的 lib 路径
CUDNN_LIB=/path/to/cudnn/lib/
# 是否加载加密后的模型
WITH_ENCRYPTION=ON
# 加密工具的路径, 如果使用自带预编译版本可不修改
sh $(pwd)/scripts/bootstrap.sh # 下载预编译版本的加密工具
ENCRYPTION_DIR=$(pwd)/paddlex-encryption
# OPENCV 路径, 如果使用自带预编译版本可不修改
OPENCV_DIR=$(pwd)/deps/opencv3gcc4.8/
sh $(pwd)/scripts/bootstrap.sh
......@@ -79,11 +85,13 @@ cmake .. \
-DWITH_GPU=${WITH_GPU} \
-DWITH_MKL=${WITH_MKL} \
-DWITH_TENSORRT=${WITH_TENSORRT} \
-DWITH_ENCRYPTION=${WITH_ENCRYPTION} \
-DTENSORRT_DIR=${TENSORRT_DIR} \
-DPADDLE_DIR=${PADDLE_DIR} \
-DWITH_STATIC_LIB=${WITH_STATIC_LIB} \
-DCUDA_LIB=${CUDA_LIB} \
-DCUDNN_LIB=${CUDNN_LIB} \
-DENCRYPTION_DIR=${ENCRYPTION_DIR} \
-DOPENCV_DIR=${OPENCV_DIR}
make
......
......@@ -2,48 +2,76 @@
飞桨团队推出模型加密方案,使用业内主流的AES加密技术对最终模型进行加密。飞桨用户可以通过PaddleX导出模型后,使用该方案对模型进行加密,预测时使用解密SDK进行模型解密并完成推理,大大提升AI应用安全性和开发效率。
## 1. 方案介绍
**注意:目前加密方案仅支持Linux系统**
### 1.1 工具组成
## 1. 方案简介
[PaddleX模型加密SDK下载](https://bj.bcebos.com/paddlex/tools/paddlex-encryption.zip)
### 1.1 加密工具
下载并解压后,目录包含内容为:
[PaddleX模型加密工具](https://bj.bcebos.com/paddlex/tools/paddlex-encryption.zip)。在编译部署代码时,编译脚本会自动下载加密工具,您也可以选择手动下载。
加密工具包含内容为:
```
paddle_model_encrypt
├── include # 头文件:paddle_model_decrypt.h(解密)和paddle_model_encrypt.h(加密)
paddlex-encryption
├── include # 头文件:paddle_model_decrypt.h(解密)和paddle_model_encrypt.h(加密)
|
├── lib # libpmodel-encrypt.so和libpmodel-decrypt.so动态库
|
└── tool # paddle_encrypt_tool
└── tool # paddlex_encrypt_tool
```
### 1.2 二进制工具
#### 1.2.1 生成密钥
### 1.2 加密PaddleX模型
产生随机密钥信息(用于AES加解密使用)(32字节key + 16字节iv, 注意这里产生的key是经过base64编码后的,这样可以扩充选取key的范围)
对模型完成加密后,加密工具会产生随机密钥信息(用于AES加解密使用),需要在后续加密部署时传入该密钥来用于解密。
> 密钥由32字节key + 16字节iv组成, 注意这里产生的key是经过base64编码后的,这样可以扩充key的选取范围
```
paddle_encrypt_tool -g
./paddlex-encryption/tool/paddlex_encrypt_tool -model_dir /path/to/paddlex_inference_model -save_dir /path/to/paddlex_encrypted_model
```
#### 1.2.1 文件加密
`-model_dir`用于指定inference模型路径,可使用[导出小度熊识别模型](deploy.md#导出inference模型)中导出的`inference_model`。加密完成后,加密过的模型会保存至指定的`-save_dir`下,包含`__model__.encrypted``__params__.encrypted``model.yml`三个文件,同时生成密钥信息,命令输出如下图所示,密钥为`kLAl1qOs5uRbFt0/RrIDTZW2+tOf5bzvUIaHGF8lJ1c=`
![](images/encryt.png)
## 2. PaddleX C++加密部署
参考[Linux平台编译指南](deploy_cpp_linux.md)编译C++部署代码。编译成功后,预测demo的可执行程序分别为`build/demo/detector``build/demo/classifer``build/demo/segmenter`,用户可根据自己的模型类型选择,其主要命令参数说明如下:
| 参数 | 说明 |
| ---- | ---- |
| model_dir | 导出的预测模型所在路径 |
| image | 要预测的图片文件路径 |
| image_list | 按行存储图片路径的.txt文件 |
| use_gpu | 是否使用 GPU 预测, 支持值为0或1(默认值为0) |
| use_trt | 是否使用 TensorTr 预测, 支持值为0或1(默认值为0) |
| gpu_id | GPU 设备ID, 默认值为0 |
| save_dir | 保存可视化结果的路径, 默认值为"output",classfier无该参数 |
| key | 加密过程中产生的密钥信息,默认值为""表示加载的是未加密的模型 |
## 样例
可使用[导出小度熊识别模型](deploy.md#导出inference模型)中的测试图片进行预测。
`样例一`
不使用`GPU`测试图片 `/path/to/xiaoduxiong.jpeg`
```shell
./build/demo/detector --model_dir=/path/to/inference_model --image=/path/to/xiaoduxiong.jpeg --save_dir=output --key=kLAl1qOs5uRbFt0/RrIDTZW2+tOf5bzvUIaHGF8lJ1c=
```
paddle_encrypt_tool -e -key keydata -infile infile -outfile outfile
```
`--key`传入加密工具输出的密钥,例如`kLAl1qOs5uRbFt0/RrIDTZW2+tOf5bzvUIaHGF8lJ1c=`, 图片文件`可视化预测结果`会保存在`save_dir`参数设置的目录下。
#### 1.3 SDK
`样例二`:
使用`GPU`预测多个图片`/path/to/image_list.txt`,image_list.txt内容的格式如下:
```
// 加密API
int paddle_encrypt_model(const char* keydata, const char* infile, const char* outfile);
// 加载加密模型API:
int paddle_security_load_model(
paddle::AnalysisConfig *config,
const char *key,
const char *model_file,
const char *param_file);
/path/to/images/xiaoduxiong1.jpeg
/path/to/images/xiaoduxiong2.jpeg
...
/path/to/images/xiaoduxiongn.jpeg
```
## 2. PaddleX C++加密部署
```shell
./build/demo/detector --model_dir=/path/to/models/inference_model --image_list=/root/projects/images_list.txt --use_gpu=1 --save_dir=output --key=kLAl1qOs5uRbFt0/RrIDTZW2+tOf5bzvUIaHGF8lJ1c=
```
`--key`传入加密工具输出的密钥,例如`kLAl1qOs5uRbFt0/RrIDTZW2+tOf5bzvUIaHGF8lJ1c=`, 图片文件`可视化预测结果`会保存在`save_dir`参数设置的目录下。
......@@ -29,6 +29,8 @@ from . import cls
from . import slim
from . import convertor
from . import tools
from . import interpret
from . import deploy
try:
import pycocotools
......@@ -40,9 +42,9 @@ except:
"[WARNING] pycocotools install: https://github.com/PaddlePaddle/PaddleX/blob/develop/docs/install.md"
)
#import paddlehub as hub
#if hub.version.hub_version < '1.6.2':
# raise Exception("[ERROR] paddlehub >= 1.6.2 is required")
import paddlehub as hub
if hub.version.hub_version < '1.6.2':
raise Exception("[ERROR] paddlehub >= 1.6.2 is required")
env_info = get_environ_info()
load_model = cv.models.load_model
......
......@@ -27,7 +27,6 @@ from .base import BaseAPI
class BaseClassifier(BaseAPI):
"""构建分类器,并实现其训练、评估、预测和模型导出。
Args:
model_name (str): 分类器的模型名字,取值范围为['ResNet18',
'ResNet34', 'ResNet50', 'ResNet101',
......@@ -65,6 +64,8 @@ class BaseClassifier(BaseAPI):
softmax_out = fluid.layers.softmax(net_out, use_cudnn=False)
inputs = OrderedDict([('image', image)])
outputs = OrderedDict([('predict', softmax_out)])
if mode == 'test':
self.interpretation_feats = OrderedDict([('logits', net_out)])
if mode != 'test':
cost = fluid.layers.cross_entropy(input=softmax_out, label=label)
avg_cost = fluid.layers.mean(cost)
......@@ -115,7 +116,6 @@ class BaseClassifier(BaseAPI):
early_stop_patience=5,
resume_checkpoint=None):
"""训练。
Args:
num_epochs (int): 训练迭代轮数。
train_dataset (paddlex.datasets): 训练数据读取器。
......@@ -139,7 +139,6 @@ class BaseClassifier(BaseAPI):
early_stop_patience (int): 当使用提前终止训练策略时,如果验证集精度在`early_stop_patience`个epoch内
连续下降或持平,则终止训练。默认值为5。
resume_checkpoint (str): 恢复训练时指定上次训练保存的模型路径。若为None,则不会恢复训练。默认值为None。
Raises:
ValueError: 模型从inference model进行加载。
"""
......@@ -183,13 +182,11 @@ class BaseClassifier(BaseAPI):
epoch_id=None,
return_details=False):
"""评估。
Args:
eval_dataset (paddlex.datasets): 验证数据读取器。
batch_size (int): 验证数据批大小。默认为1。
epoch_id (int): 当前评估模型所在的训练轮数。
return_details (bool): 是否返回详细信息。
Returns:
dict: 当return_details为False时,返回dict, 包含关键字:'acc1'、'acc5',
分别表示最大值的accuracy、前5个最大值的accuracy。
......@@ -248,12 +245,10 @@ class BaseClassifier(BaseAPI):
def predict(self, img_file, transforms=None, topk=1):
"""预测。
Args:
img_file (str): 预测图像路径。
transforms (paddlex.cls.transforms): 数据预处理操作。
topk (int): 预测时前k个最大值。
Returns:
list: 其中元素均为字典。字典的关键字为'category_id'、'category'、'score',
分别对应预测类别id、预测类别标签、预测得分。
......
......@@ -136,10 +136,8 @@ class DarkNet(object):
def __call__(self, input):
"""
Get the backbone of DarkNet, that is output for the 5 stages.
Args:
input (Variable): input variable.
Returns:
The last variables of each stage.
"""
......
......@@ -24,7 +24,6 @@ from paddle.fluid.regularizer import L2Decay
class MobileNetV1(object):
"""
MobileNet v1, see https://arxiv.org/abs/1704.04861
Args:
norm_type (str): normalization type, 'bn' and 'sync_bn' are supported
norm_decay (float): weight decay for normalization layer weights
......
# copyright (c) 2020 PaddlePaddle Authors. All Rights Reserve.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import os.path as osp
import cv2
import numpy as np
import yaml
import paddlex
import paddle.fluid as fluid
class Predictor:
def __init__(self,
model_dir,
use_gpu=True,
gpu_id=0,
use_mkl=False,
use_trt=False,
use_glog=False,
memory_optimize=True):
""" 创建Paddle Predictor
Args:
model_dir: 模型路径(必须是导出的部署或量化模型)
use_gpu: 是否使用gpu,默认True
gpu_id: 使用gpu的id,默认0
use_mkl: 是否使用mkldnn计算库,CPU情况下使用,默认False
use_trt: 是否使用TensorRT,默认False
use_glog: 是否启用glog日志, 默认False
memory_optimize: 是否启动内存优化,默认True
"""
if not osp.isdir(model_dir):
raise Exception("[ERROR] Path {} not exist.".format(model_dir))
if not osp.exists(osp.join(model_dir, "model.yml")):
raise Exception("There's not model.yml in {}".format(model_dir))
with open(osp.join(model_dir, "model.yml")) as f:
self.info = yaml.load(f.read(), Loader=yaml.Loader)
self.status = self.info['status']
if self.status != "Quant" and self.status != "Infer":
raise Exception("[ERROR] Only quantized model or exported "
"inference model is supported.")
self.model_dir = model_dir
self.model_type = self.info['_Attributes']['model_type']
self.model_name = self.info['Model']
self.num_classes = self.info['_Attributes']['num_classes']
self.labels = self.info['_Attributes']['labels']
if self.info['Model'] == 'MaskRCNN':
if self.info['_init_params']['with_fpn']:
self.mask_head_resolution = 28
else:
self.mask_head_resolution = 14
transforms_mode = self.info.get('TransformsMode', 'RGB')
if transforms_mode == 'RGB':
to_rgb = True
else:
to_rgb = False
self.transforms = self.build_transforms(self.info['Transforms'],
to_rgb)
self.predictor = self.create_predictor(
use_gpu, gpu_id, use_mkl, use_trt, use_glog, memory_optimize)
def create_predictor(self,
use_gpu=True,
gpu_id=0,
use_mkl=False,
use_trt=False,
use_glog=False,
memory_optimize=True):
config = fluid.core.AnalysisConfig(
os.path.join(self.model_dir, '__model__'),
os.path.join(self.model_dir, '__params__'))
if use_gpu:
# 设置GPU初始显存(单位M)和Device ID
config.enable_use_gpu(100, gpu_id)
else:
config.disable_gpu()
if use_mkl:
config.enable_mkldnn()
if use_glog:
config.enable_glog_info()
else:
config.disable_glog_info()
if memory_optimize:
config.enable_memory_optim()
else:
config.diable_memory_optim()
# 开启计算图分析优化,包括OP融合等
config.switch_ir_optim(True)
# 关闭feed和fetch OP使用,使用ZeroCopy接口必须设置此项
config.switch_use_feed_fetch_ops(False)
predictor = fluid.core.create_paddle_predictor(config)
return predictor
def build_transforms(self, transforms_info, to_rgb=True):
if self.model_type == "classifier":
from paddlex.cls import transforms
elif self.model_type == "detector":
from paddlex.det import transforms
elif self.model_type == "segmenter":
from paddlex.seg import transforms
op_list = list()
for op_info in transforms_info:
op_name = list(op_info.keys())[0]
op_attr = op_info[op_name]
if not hasattr(transforms, op_name):
raise Exception(
"There's no operator named '{}' in transforms of {}".
format(op_name, self.model_type))
op_list.append(getattr(transforms, op_name)(**op_attr))
eval_transforms = transforms.Compose(op_list)
if hasattr(eval_transforms, 'to_rgb'):
eval_transforms.to_rgb = to_rgb
self.arrange_transforms(eval_transforms)
return eval_transforms
def arrange_transforms(self, transforms):
if self.model_type == 'classifier':
arrange_transform = paddlex.cls.transforms.ArrangeClassifier
elif self.model_type == 'segmenter':
arrange_transform = paddlex.seg.transforms.ArrangeSegmenter
elif self.model_type == 'detector':
arrange_name = 'Arrange{}'.format(self.model_name)
arrange_transform = getattr(paddlex.det.transforms, arrange_name)
else:
raise Exception("Unrecognized model type: {}".format(
self.model_type))
if type(transforms.transforms[-1]).__name__.startswith('Arrange'):
transforms.transforms[-1] = arrange_transform(mode='test')
else:
transforms.transforms.append(arrange_transform(mode='test'))
def preprocess(self, image):
""" 对图像做预处理
Args:
image(str|np.ndarray): 图片路径或np.ndarray,如为后者,要求是BGR格式
"""
res = dict()
if self.model_type == "classifier":
im, = self.transforms(image)
im = np.expand_dims(im, axis=0).copy()
res['image'] = im
elif self.model_type == "detector":
if self.model_name == "YOLOv3":
im, im_shape = self.transforms(image)
im = np.expand_dims(im, axis=0).copy()
im_shape = np.expand_dims(im_shape, axis=0).copy()
res['image'] = im
res['im_size'] = im_shape
if self.model_name.count('RCNN') > 0:
im, im_resize_info, im_shape = self.transforms(image)
im = np.expand_dims(im, axis=0).copy()
im_resize_info = np.expand_dims(im_resize_info, axis=0).copy()
im_shape = np.expand_dims(im_shape, axis=0).copy()
res['image'] = im
res['im_info'] = im_resize_info
res['im_shape'] = im_shape
elif self.model_type == "segmenter":
im, im_info = self.transforms(image)
im = np.expand_dims(im, axis=0).copy()
res['image'] = im
res['im_info'] = im_info
return res
def raw_predict(self, inputs):
""" 接受预处理过后的数据进行预测
Args:
inputs(tuple): 预处理过后的数据
"""
for k, v in inputs.items():
try:
tensor = self.predictor.get_input_tensor(k)
except:
continue
tensor.copy_from_cpu(v)
self.predictor.zero_copy_run()
output_names = self.predictor.get_output_names()
output_results = list()
for name in output_names:
output_tensor = self.predictor.get_output_tensor(name)
output_results.append(output_tensor.copy_to_cpu())
return output_results
def classifier_postprocess(self, preds, topk=1):
""" 对分类模型的预测结果做后处理
"""
true_topk = min(self.num_classes, topk)
pred_label = np.argsort(preds[0][0])[::-1][:true_topk]
result = [{
'category_id': l,
'category': self.labels[l],
'score': preds[0][0, l],
} for l in pred_label]
return result
def segmenter_postprocess(self, preds, preprocessed_inputs):
""" 对语义分割结果做后处理
"""
label_map = np.squeeze(preds[0]).astype('uint8')
score_map = np.squeeze(preds[1])
score_map = np.transpose(score_map, (1, 2, 0))
im_info = preprocessed_inputs['im_info']
for info in im_info[::-1]:
if info[0] == 'resize':
w, h = info[1][1], info[1][0]
label_map = cv2.resize(label_map, (w, h), cv2.INTER_NEAREST)
score_map = cv2.resize(score_map, (w, h), cv2.INTER_LINEAR)
elif info[0] == 'padding':
w, h = info[1][1], info[1][0]
label_map = label_map[0:h, 0:w]
score_map = score_map[0:h, 0:w, :]
else:
raise Exception("Unexpected info '{}' in im_info".format(info[
0]))
return {'label_map': label_map, 'score_map': score_map}
def detector_postprocess(self, preds, preprocessed_inputs):
""" 对目标检测和实例分割结果做后处理
"""
bboxes = {'bbox': (np.array(preds[0]), [[len(preds[0])]])}
bboxes['im_id'] = (np.array([[0]]).astype('int32'), [])
clsid2catid = dict({i: i for i in range(self.num_classes)})
xywh_results = paddlex.cv.models.utils.detection_eval.bbox2out(
[bboxes], clsid2catid)
results = list()
for xywh_res in xywh_results:
del xywh_res['image_id']
xywh_res['category'] = self.labels[xywh_res['category_id']]
results.append(xywh_res)
if len(preds) > 1:
im_shape = preprocessed_inputs['im_shape']
bboxes['im_shape'] = (im_shape, [])
bboxes['mask'] = (np.array(preds[1]), [[len(preds[1])]])
segm_results = paddlex.cv.models.utils.detection_eval.mask2out(
[bboxes], clsid2catid, self.mask_head_resolution)
import pycocotools.mask as mask_util
for i in range(len(results)):
results[i]['mask'] = mask_util.decode(segm_results[i][
'segmentation'])
return results
def predict(self, image, topk=1, threshold=0.5):
""" 图片预测
Args:
image(str|np.ndarray): 图片路径或np.ndarray格式,如果后者,要求为BGR输入格式
topk(int): 分类预测时使用,表示预测前topk的结果
"""
preprocessed_input = self.preprocess(image)
model_pred = self.raw_predict(preprocessed_input)
if self.model_type == "classifier":
results = self.classifier_postprocess(model_pred, topk)
elif self.model_type == "detector":
results = self.detector_postprocess(model_pred, preprocessed_input)
elif self.model_type == "segmenter":
results = self.segmenter_postprocess(model_pred,
preprocessed_input)
return results
# copyright (c) 2020 PaddlePaddle Authors. All Rights Reserve.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import
from . import visualize
visualize = visualize.visualize
\ No newline at end of file
# copyright (c) 2020 PaddlePaddle Authors. All Rights Reserve.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#copyright (c) 2020 PaddlePaddle Authors. All Rights Reserve.
#
#Licensed under the Apache License, Version 2.0 (the "License");
#you may not use this file except in compliance with the License.
#You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#Unless required by applicable law or agreed to in writing, software
#distributed under the License is distributed on an "AS IS" BASIS,
#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#See the License for the specific language governing permissions and
#limitations under the License.
import os
def _find_classes(dir):
# Faster and available in Python 3.5 and above
classes = [d.name for d in os.scandir(dir) if d.is_dir()]
classes.sort()
class_to_idx = {classes[i]: i for i in range(len(classes))}
return classes, class_to_idx
\ No newline at end of file
#copyright (c) 2020 PaddlePaddle Authors. All Rights Reserve.
#
#Licensed under the Apache License, Version 2.0 (the "License");
#you may not use this file except in compliance with the License.
#You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#Unless required by applicable law or agreed to in writing, software
#distributed under the License is distributed on an "AS IS" BASIS,
#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#See the License for the specific language governing permissions and
#limitations under the License.
import os
import sys
import cv2
import numpy as np
import six
import glob
from .data_path_utils import _find_classes
from PIL import Image
def resize_short(img, target_size, interpolation=None):
"""resize image
Args:
img: image data
target_size: resize short target size
interpolation: interpolation mode
Returns:
resized image data
"""
percent = float(target_size) / min(img.shape[0], img.shape[1])
resized_width = int(round(img.shape[1] * percent))
resized_height = int(round(img.shape[0] * percent))
if interpolation:
resized = cv2.resize(
img, (resized_width, resized_height), interpolation=interpolation)
else:
resized = cv2.resize(img, (resized_width, resized_height))
return resized
def crop_image(img, target_size, center=True):
"""crop image
Args:
img: images data
target_size: crop target size
center: crop mode
Returns:
img: cropped image data
"""
height, width = img.shape[:2]
size = target_size
if center:
w_start = (width - size) // 2
h_start = (height - size) // 2
else:
w_start = np.random.randint(0, width - size + 1)
h_start = np.random.randint(0, height - size + 1)
w_end = w_start + size
h_end = h_start + size
img = img[h_start:h_end, w_start:w_end, :]
return img
def preprocess_image(img, random_mirror=False):
"""
centered, scaled by 1/255.
:param img: np.array: shape: [ns, h, w, 3], color order: rgb.
:return: np.array: shape: [ns, h, w, 3]
"""
mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]
# transpose to [ns, 3, h, w]
img = img.astype('float32').transpose((0, 3, 1, 2)) / 255
img_mean = np.array(mean).reshape((3, 1, 1))
img_std = np.array(std).reshape((3, 1, 1))
img -= img_mean
img /= img_std
if random_mirror:
mirror = int(np.random.uniform(0, 2))
if mirror == 1:
img = img[:, :, ::-1, :]
return img
def read_image(img_path, target_size=256, crop_size=224):
"""
resize_short to 256, then center crop to 224.
:param img_path: one image path
:return: np.array: shape: [1, h, w, 3], color order: rgb.
"""
if isinstance(img_path, str):
with open(img_path, 'rb') as f:
img = Image.open(f)
img = img.convert('RGB')
img = np.array(img)
# img = cv2.imread(img_path)
img = resize_short(img, target_size, interpolation=None)
img = crop_image(img, target_size=crop_size, center=True)
# img = img[:, :, ::-1]
img = np.expand_dims(img, axis=0)
return img
elif isinstance(img_path, np.ndarray):
assert len(img_path.shape) == 4
return img_path
else:
ValueError(f"Not recognized data type {type(img_path)}.")
class ReaderConfig(object):
"""
A generic data loader where the images are arranged in this way:
root/train/dog/xxy.jpg
root/train/dog/xxz.jpg
...
root/train/cat/nsdf3.jpg
root/train/cat/asd932_.jpg
...
root/test/dog/xxx.jpg
...
root/test/cat/123.jpg
...
"""
def __init__(self, dataset_dir, is_test):
image_paths, labels, self.num_classes = self.get_dataset_info(dataset_dir, is_test)
random_per = np.random.permutation(range(len(image_paths)))
self.image_paths = image_paths[random_per]
self.labels = labels[random_per]
self.is_test = is_test
def get_reader(self):
def reader():
IMG_EXTENSIONS = ('.jpg', '.jpeg', '.png', '.ppm', '.bmp', '.pgm', '.tif', '.tiff', '.webp')
target_size = 256
crop_size = 224
for i, img_path in enumerate(self.image_paths):
if not img_path.lower().endswith(IMG_EXTENSIONS):
continue
img = cv2.imread(img_path)
if img is None:
print(img_path)
continue
img = resize_short(img, target_size, interpolation=None)
img = crop_image(img, crop_size, center=self.is_test)
img = img[:, :, ::-1]
img = np.expand_dims(img, axis=0)
img = preprocess_image(img, not self.is_test)
yield img, self.labels[i]
return reader
def get_dataset_info(self, dataset_dir, is_test=False):
IMG_EXTENSIONS = ('.jpg', '.jpeg', '.png', '.ppm', '.bmp', '.pgm', '.tif', '.tiff', '.webp')
# read
if is_test:
datasubset_dir = os.path.join(dataset_dir, 'test')
else:
datasubset_dir = os.path.join(dataset_dir, 'train')
class_names, class_to_idx = _find_classes(datasubset_dir)
# num_classes = len(class_names)
image_paths = []
labels = []
for class_name in class_names:
classes_dir = os.path.join(datasubset_dir, class_name)
for img_path in glob.glob(os.path.join(classes_dir, '*')):
if not img_path.lower().endswith(IMG_EXTENSIONS):
continue
image_paths.append(img_path)
labels.append(class_to_idx[class_name])
image_paths = np.array(image_paths)
labels = np.array(labels)
return image_paths, labels, len(class_names)
def create_reader(list_image_path, list_label=None, is_test=False):
def reader():
IMG_EXTENSIONS = ('.jpg', '.jpeg', '.png', '.ppm', '.bmp', '.pgm', '.tif', '.tiff', '.webp')
target_size = 256
crop_size = 224
for i, img_path in enumerate(list_image_path):
if not img_path.lower().endswith(IMG_EXTENSIONS):
continue
img = cv2.imread(img_path)
if img is None:
print(img_path)
continue
img = resize_short(img, target_size, interpolation=None)
img = crop_image(img, crop_size, center=is_test)
img = img[:, :, ::-1]
img_show = np.expand_dims(img, axis=0)
img = preprocess_image(img_show, not is_test)
label = 0 if list_label is None else list_label[i]
yield img_show, img, label
return reader
\ No newline at end of file
# copyright (c) 2020 PaddlePaddle Authors. All Rights Reserve.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#copyright (c) 2020 PaddlePaddle Authors. All Rights Reserve.
#
#Licensed under the Apache License, Version 2.0 (the "License");
#you may not use this file except in compliance with the License.
#You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#Unless required by applicable law or agreed to in writing, software
#distributed under the License is distributed on an "AS IS" BASIS,
#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#See the License for the specific language governing permissions and
#limitations under the License.
import os
import paddle.fluid as fluid
import numpy as np
from paddle.fluid.param_attr import ParamAttr
from ..as_data_reader.readers import preprocess_image
root_path = os.environ['HOME']
root_path = os.path.join(root_path, '.paddlex')
h_pre_models = os.path.join(root_path, "pre_models")
h_pre_models_kmeans = os.path.join(h_pre_models, "kmeans_model.pkl")
def paddle_get_fc_weights(var_name="fc_0.w_0"):
fc_weights = fluid.global_scope().find_var(var_name).get_tensor()
return np.array(fc_weights)
def paddle_resize(extracted_features, outsize):
resized_features = fluid.layers.resize_bilinear(extracted_features, outsize)
return resized_features
def compute_features_for_kmeans(data_content):
def conv_bn_layer(input,
num_filters,
filter_size,
stride=1,
groups=1,
act=None,
name=None,
is_test=True,
global_name=''):
conv = fluid.layers.conv2d(
input=input,
num_filters=num_filters,
filter_size=filter_size,
stride=stride,
padding=(filter_size - 1) // 2,
groups=groups,
act=None,
param_attr=ParamAttr(name=global_name + name + "_weights"),
bias_attr=False,
name=global_name + name + '.conv2d.output.1')
if name == "conv1":
bn_name = "bn_" + name
else:
bn_name = "bn" + name[3:]
return fluid.layers.batch_norm(
input=conv,
act=act,
name=global_name + bn_name + '.output.1',
param_attr=ParamAttr(global_name + bn_name + '_scale'),
bias_attr=ParamAttr(global_name + bn_name + '_offset'),
moving_mean_name=global_name + bn_name + '_mean',
moving_variance_name=global_name + bn_name + '_variance',
use_global_stats=is_test
)
startup_prog = fluid.default_startup_program().clone(for_test=True)
prog = fluid.Program()
with fluid.program_guard(prog, startup_prog):
with fluid.unique_name.guard():
image_op = fluid.data(name='image', shape=[None, 3, 224, 224], dtype='float32')
conv = conv_bn_layer(
input=image_op,
num_filters=32,
filter_size=3,
stride=2,
act='relu',
name='conv1_1')
conv = conv_bn_layer(
input=conv,
num_filters=32,
filter_size=3,
stride=1,
act='relu',
name='conv1_2')
conv = conv_bn_layer(
input=conv,
num_filters=64,
filter_size=3,
stride=1,
act='relu',
name='conv1_3')
extracted_features = conv
resized_features = fluid.layers.resize_bilinear(extracted_features, image_op.shape[2:])
gpu_id = int(os.environ.get('FLAGS_selected_gpus', 0))
place = fluid.CUDAPlace(gpu_id)
# place = fluid.CPUPlace()
exe = fluid.Executor(place)
exe.run(startup_prog)
fluid.io.load_persistables(exe, h_pre_models, prog)
images = preprocess_image(data_content) # transpose to [N, 3, H, W], scaled to [0.0, 1.0]
result = exe.run(prog, fetch_list=[resized_features], feed={'image': images})
return result[0][0]
\ No newline at end of file
#copyright (c) 2020 PaddlePaddle Authors. All Rights Reserve.
#
#Licensed under the Apache License, Version 2.0 (the "License");
#you may not use this file except in compliance with the License.
#You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#Unless required by applicable law or agreed to in writing, software
#distributed under the License is distributed on an "AS IS" BASIS,
#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#See the License for the specific language governing permissions and
#limitations under the License.
from .interpretation_algorithms import CAM, LIME, NormLIME
from .normlime_base import precompute_normlime_weights
class Interpretation(object):
"""
Base class for all interpretation algorithms.
"""
def __init__(self, interpretation_algorithm_name, predict_fn, label_names, **kwargs):
supported_algorithms = {
'cam': CAM,
'lime': LIME,
'normlime': NormLIME
}
self.algorithm_name = interpretation_algorithm_name.lower()
assert self.algorithm_name in supported_algorithms.keys()
self.predict_fn = predict_fn
# initialization for the interpretation algorithm.
self.algorithm = supported_algorithms[self.algorithm_name](
self.predict_fn, label_names, **kwargs
)
def interpret(self, data_, visualization=True, save_to_disk=True, save_dir='./tmp'):
"""
Args:
data_: data_ can be a path or numpy.ndarray.
visualization: whether to show using matplotlib.
save_to_disk: whether to save the figure in local disk.
save_dir: dir to save figure if save_to_disk is True.
Returns:
"""
return self.algorithm.interpret(data_, visualization, save_to_disk, save_dir)
#copyright (c) 2020 PaddlePaddle Authors. All Rights Reserve.
#
#Licensed under the Apache License, Version 2.0 (the "License");
#you may not use this file except in compliance with the License.
#You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#Unless required by applicable law or agreed to in writing, software
#distributed under the License is distributed on an "AS IS" BASIS,
#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#See the License for the specific language governing permissions and
#limitations under the License.
import os
import numpy as np
import time
from . import lime_base
from ..as_data_reader.readers import read_image
from ._session_preparation import paddle_get_fc_weights, compute_features_for_kmeans, h_pre_models_kmeans
from .normlime_base import combine_normlime_and_lime, get_feature_for_kmeans, load_kmeans_model
import cv2
class CAM(object):
def __init__(self, predict_fn, label_names):
"""
Args:
predict_fn: input: images_show [N, H, W, 3], RGB range(0, 255)
output: [
logits [N, num_classes],
feature map before global average pooling [N, num_channels, h_, w_]
]
"""
self.predict_fn = predict_fn
self.label_names = label_names
def preparation_cam(self, data_):
image_show = read_image(data_)
result = self.predict_fn(image_show)
logit = result[0][0]
if abs(np.sum(logit) - 1.0) > 1e-4:
# softmax
logit = logit - np.max(logit)
exp_result = np.exp(logit)
probability = exp_result / np.sum(exp_result)
else:
probability = logit
# only interpret top 1
pred_label = np.argsort(probability)
pred_label = pred_label[-1:]
self.predicted_label = pred_label[0]
self.predicted_probability = probability[pred_label[0]]
self.image = image_show[0]
self.labels = pred_label
fc_weights = paddle_get_fc_weights()
feature_maps = result[1]
l = pred_label[0]
ln = l
if self.label_names is not None:
ln = self.label_names[l]
print(f'predicted result: {ln} with probability {probability[pred_label[0]]:.3f}')
return feature_maps, fc_weights
def interpret(self, data_, visualization=True, save_to_disk=True, save_outdir=None):
feature_maps, fc_weights = self.preparation_cam(data_)
cam = get_cam(self.image, feature_maps, fc_weights, self.predicted_label)
if visualization or save_to_disk:
import matplotlib.pyplot as plt
from skimage.segmentation import mark_boundaries
l = self.labels[0]
ln = l
if self.label_names is not None:
ln = self.label_names[l]
psize = 5
nrows = 1
ncols = 2
plt.close()
f, axes = plt.subplots(nrows, ncols, figsize=(psize * ncols, psize * nrows))
for ax in axes.ravel():
ax.axis("off")
axes = axes.ravel()
axes[0].imshow(self.image)
axes[0].set_title(f"label {ln}, proba: {self.predicted_probability: .3f}")
axes[1].imshow(cam)
axes[1].set_title("CAM")
if save_to_disk and save_outdir is not None:
os.makedirs(save_outdir, exist_ok=True)
save_fig(data_, save_outdir, 'cam')
if visualization:
plt.show()
return
class LIME(object):
def __init__(self, predict_fn, label_names, num_samples=3000, batch_size=50):
"""
LIME wrapper. See lime_base.py for the detailed LIME implementation.
Args:
predict_fn: from image [N, H, W, 3] to logits [N, num_classes], this is necessary for computing LIME.
num_samples: the number of samples that LIME takes for fitting.
batch_size: batch size for model inference each time.
"""
self.num_samples = num_samples
self.batch_size = batch_size
self.predict_fn = predict_fn
self.labels = None
self.image = None
self.lime_interpreter = None
self.label_names = label_names
def preparation_lime(self, data_):
image_show = read_image(data_)
result = self.predict_fn(image_show)
result = result[0] # only one image here.
if abs(np.sum(result) - 1.0) > 1e-4:
# softmax
result = result - np.max(result)
exp_result = np.exp(result)
probability = exp_result / np.sum(exp_result)
else:
probability = result
# only interpret top 1
pred_label = np.argsort(probability)
pred_label = pred_label[-1:]
self.predicted_label = pred_label[0]
self.predicted_probability = probability[pred_label[0]]
self.image = image_show[0]
self.labels = pred_label
l = pred_label[0]
ln = l
if self.label_names is not None:
ln = self.label_names[l]
print(f'predicted result: {ln} with probability {probability[pred_label[0]]:.3f}')
end = time.time()
algo = lime_base.LimeImageInterpreter()
interpreter = algo.interpret_instance(self.image, self.predict_fn, self.labels, 0,
num_samples=self.num_samples, batch_size=self.batch_size)
self.lime_interpreter = interpreter
print('lime time: ', time.time() - end, 's.')
def interpret(self, data_, visualization=True, save_to_disk=True, save_outdir=None):
if self.lime_interpreter is None:
self.preparation_lime(data_)
if visualization or save_to_disk:
import matplotlib.pyplot as plt
from skimage.segmentation import mark_boundaries
l = self.labels[0]
ln = l
if self.label_names is not None:
ln = self.label_names[l]
psize = 5
nrows = 2
weights_choices = [0.6, 0.7, 0.75, 0.8, 0.85]
ncols = len(weights_choices)
plt.close()
f, axes = plt.subplots(nrows, ncols, figsize=(psize * ncols, psize * nrows))
for ax in axes.ravel():
ax.axis("off")
axes = axes.ravel()
axes[0].imshow(self.image)
axes[0].set_title(f"label {ln}, proba: {self.predicted_probability: .3f}")
axes[1].imshow(mark_boundaries(self.image, self.lime_interpreter.segments))
axes[1].set_title("superpixel segmentation")
# LIME visualization
for i, w in enumerate(weights_choices):
num_to_show = auto_choose_num_features_to_show(self.lime_interpreter, l, w)
temp, mask = self.lime_interpreter.get_image_and_mask(
l, positive_only=False, hide_rest=False, num_features=num_to_show
)
axes[ncols + i].imshow(mark_boundaries(temp, mask))
axes[ncols + i].set_title(f"label {ln}, first {num_to_show} superpixels")
if save_to_disk and save_outdir is not None:
os.makedirs(save_outdir, exist_ok=True)
save_fig(data_, save_outdir, 'lime', self.num_samples)
if visualization:
plt.show()
return
class NormLIME(object):
def __init__(self, predict_fn, label_names, num_samples=3000, batch_size=50,
kmeans_model_for_normlime=None, normlime_weights=None):
if kmeans_model_for_normlime is None:
try:
self.kmeans_model = load_kmeans_model(h_pre_models_kmeans)
except:
raise ValueError("NormLIME needs the KMeans model, where we provided a default one in "
"pre_models/kmeans_model.pkl.")
else:
print("Warning: It is *strongly* suggested to use the default KMeans model in pre_models/kmeans_model.pkl. "
"Use another one will change the final result.")
self.kmeans_model = load_kmeans_model(kmeans_model_for_normlime)
self.num_samples = num_samples
self.batch_size = batch_size
try:
self.normlime_weights = np.load(normlime_weights, allow_pickle=True).item()
except:
self.normlime_weights = None
print("Warning: not find the correct precomputed Normlime result.")
self.predict_fn = predict_fn
self.labels = None
self.image = None
self.label_names = label_names
def predict_cluster_labels(self, feature_map, segments):
return self.kmeans_model.predict(get_feature_for_kmeans(feature_map, segments))
def predict_using_normlime_weights(self, pred_labels, predicted_cluster_labels):
# global weights
g_weights = {y: [] for y in pred_labels}
for y in pred_labels:
cluster_weights_y = self.normlime_weights.get(y, {})
g_weights[y] = [
(i, cluster_weights_y.get(k, 0.0)) for i, k in enumerate(predicted_cluster_labels)
]
g_weights[y] = sorted(g_weights[y],
key=lambda x: np.abs(x[1]), reverse=True)
return g_weights
def preparation_normlime(self, data_):
self._lime = LIME(
self.predict_fn,
self.label_names,
self.num_samples,
self.batch_size
)
self._lime.preparation_lime(data_)
image_show = read_image(data_)
self.predicted_label = self._lime.predicted_label
self.predicted_probability = self._lime.predicted_probability
self.image = image_show[0]
self.labels = self._lime.labels
# print(f'predicted result: {self.predicted_label} with probability {self.predicted_probability: .3f}')
print('performing NormLIME operations ...')
cluster_labels = self.predict_cluster_labels(
compute_features_for_kmeans(image_show).transpose((1, 2, 0)), self._lime.lime_interpreter.segments
)
g_weights = self.predict_using_normlime_weights(self.labels, cluster_labels)
return g_weights
def interpret(self, data_, visualization=True, save_to_disk=True, save_outdir=None):
if self.normlime_weights is None:
raise ValueError("Not find the correct precomputed NormLIME result. \n"
"\t Try to call compute_normlime_weights() first or load the correct path.")
g_weights = self.preparation_normlime(data_)
lime_weights = self._lime.lime_interpreter.local_weights
if visualization or save_to_disk:
import matplotlib.pyplot as plt
from skimage.segmentation import mark_boundaries
l = self.labels[0]
ln = l
if self.label_names is not None:
ln = self.label_names[l]
psize = 5
nrows = 4
weights_choices = [0.6, 0.7, 0.75, 0.8, 0.85]
nums_to_show = []
ncols = len(weights_choices)
plt.close()
f, axes = plt.subplots(nrows, ncols, figsize=(psize * ncols, psize * nrows))
for ax in axes.ravel():
ax.axis("off")
axes = axes.ravel()
axes[0].imshow(self.image)
axes[0].set_title(f"label {ln}, proba: {self.predicted_probability: .3f}")
axes[1].imshow(mark_boundaries(self.image, self._lime.lime_interpreter.segments))
axes[1].set_title("superpixel segmentation")
# LIME visualization
for i, w in enumerate(weights_choices):
num_to_show = auto_choose_num_features_to_show(self._lime.lime_interpreter, l, w)
nums_to_show.append(num_to_show)
temp, mask = self._lime.lime_interpreter.get_image_and_mask(
l, positive_only=False, hide_rest=False, num_features=num_to_show
)
axes[ncols + i].imshow(mark_boundaries(temp, mask))
axes[ncols + i].set_title(f"LIME: first {num_to_show} superpixels")
# NormLIME visualization
self._lime.lime_interpreter.local_weights = g_weights
for i, num_to_show in enumerate(nums_to_show):
temp, mask = self._lime.lime_interpreter.get_image_and_mask(
l, positive_only=False, hide_rest=False, num_features=num_to_show
)
axes[ncols * 2 + i].imshow(mark_boundaries(temp, mask))
axes[ncols * 2 + i].set_title(f"NormLIME: first {num_to_show} superpixels")
# NormLIME*LIME visualization
combined_weights = combine_normlime_and_lime(lime_weights, g_weights)
self._lime.lime_interpreter.local_weights = combined_weights
for i, num_to_show in enumerate(nums_to_show):
temp, mask = self._lime.lime_interpreter.get_image_and_mask(
l, positive_only=False, hide_rest=False, num_features=num_to_show
)
axes[ncols * 3 + i].imshow(mark_boundaries(temp, mask))
axes[ncols * 3 + i].set_title(f"Combined: first {num_to_show} superpixels")
self._lime.lime_interpreter.local_weights = lime_weights
if save_to_disk and save_outdir is not None:
os.makedirs(save_outdir, exist_ok=True)
save_fig(data_, save_outdir, 'normlime', self.num_samples)
if visualization:
plt.show()
def auto_choose_num_features_to_show(lime_interpreter, label, percentage_to_show):
segments = lime_interpreter.segments
lime_weights = lime_interpreter.local_weights[label]
num_pixels_threshold_in_a_sp = segments.shape[0] * segments.shape[1] // len(np.unique(segments)) // 8
# l1 norm with filtered weights.
used_weights = [(tuple_w[0], tuple_w[1]) for i, tuple_w in enumerate(lime_weights) if tuple_w[1] > 0]
norm = np.sum([tuple_w[1] for i, tuple_w in enumerate(used_weights)])
normalized_weights = [(tuple_w[0], tuple_w[1] / norm) for i, tuple_w in enumerate(lime_weights)]
a = 0.0
n = 0
for i, tuple_w in enumerate(normalized_weights):
if tuple_w[1] < 0:
continue
if len(np.where(segments == tuple_w[0])[0]) < num_pixels_threshold_in_a_sp:
continue
a += tuple_w[1]
if a > percentage_to_show:
n = i + 1
break
if percentage_to_show <= 0.0:
return 5
if n == 0:
return auto_choose_num_features_to_show(lime_interpreter, label, percentage_to_show-0.1)
return n
def get_cam(image_show, feature_maps, fc_weights, label_index, cam_min=None, cam_max=None):
_, nc, h, w = feature_maps.shape
cam = feature_maps * fc_weights[:, label_index].reshape(1, nc, 1, 1)
cam = cam.sum((0, 1))
if cam_min is None:
cam_min = np.min(cam)
if cam_max is None:
cam_max = np.max(cam)
cam = cam - cam_min
cam = cam / cam_max
cam = np.uint8(255 * cam)
cam_img = cv2.resize(cam, image_show.shape[0:2], interpolation=cv2.INTER_LINEAR)
heatmap = cv2.applyColorMap(np.uint8(255 * cam_img), cv2.COLORMAP_JET)
heatmap = np.float32(heatmap)
cam = heatmap + np.float32(image_show)
cam = cam / np.max(cam)
return cam
def save_fig(data_, save_outdir, algorithm_name, num_samples=3000):
import matplotlib.pyplot as plt
if isinstance(data_, str):
if algorithm_name == 'cam':
f_out = f"{algorithm_name}_{data_.split('/')[-1]}.png"
else:
f_out = f"{algorithm_name}_{data_.split('/')[-1]}_s{num_samples}.png"
plt.savefig(
os.path.join(save_outdir, f_out)
)
else:
n = 0
if algorithm_name == 'cam':
f_out = f'cam-{n}.png'
else:
f_out = f'{algorithm_name}_s{num_samples}-{n}.png'
while os.path.exists(
os.path.join(save_outdir, f_out)
):
n += 1
if algorithm_name == 'cam':
f_out = f'cam-{n}.png'
else:
f_out = f'{algorithm_name}_s{num_samples}-{n}.png'
continue
plt.savefig(
os.path.join(
save_outdir, f_out
)
)
"""
Copyright (c) 2016, Marco Tulio Correia Ribeiro
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
"""
The code in this file (lime_base.py) is modified from https://github.com/marcotcr/lime.
"""
import numpy as np
import scipy as sp
import sklearn
import sklearn.preprocessing
from skimage.color import gray2rgb
from sklearn.linear_model import Ridge, lars_path
from sklearn.utils import check_random_state
import copy
from functools import partial
from skimage.segmentation import quickshift
from skimage.measure import regionprops
class LimeBase(object):
"""Class for learning a locally linear sparse model from perturbed data"""
def __init__(self,
kernel_fn,
verbose=False,
random_state=None):
"""Init function
Args:
kernel_fn: function that transforms an array of distances into an
array of proximity values (floats).
verbose: if true, print local prediction values from linear model.
random_state: an integer or numpy.RandomState that will be used to
generate random numbers. If None, the random state will be
initialized using the internal numpy seed.
"""
self.kernel_fn = kernel_fn
self.verbose = verbose
self.random_state = check_random_state(random_state)
@staticmethod
def generate_lars_path(weighted_data, weighted_labels):
"""Generates the lars path for weighted data.
Args:
weighted_data: data that has been weighted by kernel
weighted_label: labels, weighted by kernel
Returns:
(alphas, coefs), both are arrays corresponding to the
regularization parameter and coefficients, respectively
"""
x_vector = weighted_data
alphas, _, coefs = lars_path(x_vector,
weighted_labels,
method='lasso',
verbose=False)
return alphas, coefs
def forward_selection(self, data, labels, weights, num_features):
"""Iteratively adds features to the model"""
clf = Ridge(alpha=0, fit_intercept=True, random_state=self.random_state)
used_features = []
for _ in range(min(num_features, data.shape[1])):
max_ = -100000000
best = 0
for feature in range(data.shape[1]):
if feature in used_features:
continue
clf.fit(data[:, used_features + [feature]], labels,
sample_weight=weights)
score = clf.score(data[:, used_features + [feature]],
labels,
sample_weight=weights)
if score > max_:
best = feature
max_ = score
used_features.append(best)
return np.array(used_features)
def feature_selection(self, data, labels, weights, num_features, method):
"""Selects features for the model. see interpret_instance_with_data to
understand the parameters."""
if method == 'none':
return np.array(range(data.shape[1]))
elif method == 'forward_selection':
return self.forward_selection(data, labels, weights, num_features)
elif method == 'highest_weights':
clf = Ridge(alpha=0.01, fit_intercept=True,
random_state=self.random_state)
clf.fit(data, labels, sample_weight=weights)
coef = clf.coef_
if sp.sparse.issparse(data):
coef = sp.sparse.csr_matrix(clf.coef_)
weighted_data = coef.multiply(data[0])
# Note: most efficient to slice the data before reversing
sdata = len(weighted_data.data)
argsort_data = np.abs(weighted_data.data).argsort()
# Edge case where data is more sparse than requested number of feature importances
# In that case, we just pad with zero-valued features
if sdata < num_features:
nnz_indexes = argsort_data[::-1]
indices = weighted_data.indices[nnz_indexes]
num_to_pad = num_features - sdata
indices = np.concatenate((indices, np.zeros(num_to_pad, dtype=indices.dtype)))
indices_set = set(indices)
pad_counter = 0
for i in range(data.shape[1]):
if i not in indices_set:
indices[pad_counter + sdata] = i
pad_counter += 1
if pad_counter >= num_to_pad:
break
else:
nnz_indexes = argsort_data[sdata - num_features:sdata][::-1]
indices = weighted_data.indices[nnz_indexes]
return indices
else:
weighted_data = coef * data[0]
feature_weights = sorted(
zip(range(data.shape[1]), weighted_data),
key=lambda x: np.abs(x[1]),
reverse=True)
return np.array([x[0] for x in feature_weights[:num_features]])
elif method == 'lasso_path':
weighted_data = ((data - np.average(data, axis=0, weights=weights))
* np.sqrt(weights[:, np.newaxis]))
weighted_labels = ((labels - np.average(labels, weights=weights))
* np.sqrt(weights))
nonzero = range(weighted_data.shape[1])
_, coefs = self.generate_lars_path(weighted_data,
weighted_labels)
for i in range(len(coefs.T) - 1, 0, -1):
nonzero = coefs.T[i].nonzero()[0]
if len(nonzero) <= num_features:
break
used_features = nonzero
return used_features
elif method == 'auto':
if num_features <= 6:
n_method = 'forward_selection'
else:
n_method = 'highest_weights'
return self.feature_selection(data, labels, weights,
num_features, n_method)
def interpret_instance_with_data(self,
neighborhood_data,
neighborhood_labels,
distances,
label,
num_features,
feature_selection='auto',
model_regressor=None):
"""Takes perturbed data, labels and distances, returns interpretation.
Args:
neighborhood_data: perturbed data, 2d array. first element is
assumed to be the original data point.
neighborhood_labels: corresponding perturbed labels. should have as
many columns as the number of possible labels.
distances: distances to original data point.
label: label for which we want an interpretation
num_features: maximum number of features in interpretation
feature_selection: how to select num_features. options are:
'forward_selection': iteratively add features to the model.
This is costly when num_features is high
'highest_weights': selects the features that have the highest
product of absolute weight * original data point when
learning with all the features
'lasso_path': chooses features based on the lasso
regularization path
'none': uses all features, ignores num_features
'auto': uses forward_selection if num_features <= 6, and
'highest_weights' otherwise.
model_regressor: sklearn regressor to use in interpretation.
Defaults to Ridge regression if None. Must have
model_regressor.coef_ and 'sample_weight' as a parameter
to model_regressor.fit()
Returns:
(intercept, exp, score, local_pred):
intercept is a float.
exp is a sorted list of tuples, where each tuple (x,y) corresponds
to the feature id (x) and the local weight (y). The list is sorted
by decreasing absolute value of y.
score is the R^2 value of the returned interpretation
local_pred is the prediction of the interpretation model on the original instance
"""
weights = self.kernel_fn(distances)
labels_column = neighborhood_labels[:, label]
used_features = self.feature_selection(neighborhood_data,
labels_column,
weights,
num_features,
feature_selection)
if model_regressor is None:
model_regressor = Ridge(alpha=1, fit_intercept=True,
random_state=self.random_state)
easy_model = model_regressor
easy_model.fit(neighborhood_data[:, used_features],
labels_column, sample_weight=weights)
prediction_score = easy_model.score(
neighborhood_data[:, used_features],
labels_column, sample_weight=weights)
local_pred = easy_model.predict(neighborhood_data[0, used_features].reshape(1, -1))
if self.verbose:
print('Intercept', easy_model.intercept_)
print('Prediction_local', local_pred,)
print('Right:', neighborhood_labels[0, label])
return (easy_model.intercept_,
sorted(zip(used_features, easy_model.coef_),
key=lambda x: np.abs(x[1]), reverse=True),
prediction_score, local_pred)
class ImageInterpretation(object):
def __init__(self, image, segments):
"""Init function.
Args:
image: 3d numpy array
segments: 2d numpy array, with the output from skimage.segmentation
"""
self.image = image
self.segments = segments
self.intercept = {}
self.local_weights = {}
self.local_pred = None
def get_image_and_mask(self, label, positive_only=True, negative_only=False, hide_rest=False,
num_features=5, min_weight=0.):
"""Init function.
Args:
label: label to interpret
positive_only: if True, only take superpixels that positively contribute to
the prediction of the label.
negative_only: if True, only take superpixels that negatively contribute to
the prediction of the label. If false, and so is positive_only, then both
negativey and positively contributions will be taken.
Both can't be True at the same time
hide_rest: if True, make the non-interpretation part of the return
image gray
num_features: number of superpixels to include in interpretation
min_weight: minimum weight of the superpixels to include in interpretation
Returns:
(image, mask), where image is a 3d numpy array and mask is a 2d
numpy array that can be used with
skimage.segmentation.mark_boundaries
"""
if label not in self.local_weights:
raise KeyError('Label not in interpretation')
if positive_only & negative_only:
raise ValueError("Positive_only and negative_only cannot be true at the same time.")
segments = self.segments
image = self.image
local_weights_label = self.local_weights[label]
mask = np.zeros(segments.shape, segments.dtype)
if hide_rest:
temp = np.zeros(self.image.shape)
else:
temp = self.image.copy()
if positive_only:
fs = [x[0] for x in local_weights_label
if x[1] > 0 and x[1] > min_weight][:num_features]
if negative_only:
fs = [x[0] for x in local_weights_label
if x[1] < 0 and abs(x[1]) > min_weight][:num_features]
if positive_only or negative_only:
for f in fs:
temp[segments == f] = image[segments == f].copy()
mask[segments == f] = 1
return temp, mask
else:
for f, w in local_weights_label[:num_features]:
if np.abs(w) < min_weight:
continue
c = 0 if w < 0 else 1
mask[segments == f] = -1 if w < 0 else 1
temp[segments == f] = image[segments == f].copy()
temp[segments == f, c] = np.max(image)
return temp, mask
def get_rendered_image(self, label, min_weight=0.005):
"""
Args:
label: label to interpret
min_weight:
Returns:
image, is a 3d numpy array
"""
if label not in self.local_weights:
raise KeyError('Label not in interpretation')
from matplotlib import cm
segments = self.segments
image = self.image
local_weights_label = self.local_weights[label]
temp = np.zeros_like(image)
weight_max = abs(local_weights_label[0][1])
local_weights_label = [(f, w/weight_max) for f, w in local_weights_label]
local_weights_label = sorted(local_weights_label, key=lambda x: x[1], reverse=True) # negatives are at last.
cmaps = cm.get_cmap('Spectral')
colors = cmaps(np.linspace(0, 1, len(local_weights_label)))
colors = colors[:, :3]
for i, (f, w) in enumerate(local_weights_label):
if np.abs(w) < min_weight:
continue
temp[segments == f] = image[segments == f].copy()
temp[segments == f] = colors[i] * 255
return temp
class LimeImageInterpreter(object):
"""Interpres predictions on Image (i.e. matrix) data.
For numerical features, perturb them by sampling from a Normal(0,1) and
doing the inverse operation of mean-centering and scaling, according to the
means and stds in the training data. For categorical features, perturb by
sampling according to the training distribution, and making a binary
feature that is 1 when the value is the same as the instance being
interpreted."""
def __init__(self, kernel_width=.25, kernel=None, verbose=False,
feature_selection='auto', random_state=None):
"""Init function.
Args:
kernel_width: kernel width for the exponential kernel.
If None, defaults to sqrt(number of columns) * 0.75.
kernel: similarity kernel that takes euclidean distances and kernel
width as input and outputs weights in (0,1). If None, defaults to
an exponential kernel.
verbose: if true, print local prediction values from linear model
feature_selection: feature selection method. can be
'forward_selection', 'lasso_path', 'none' or 'auto'.
See function 'einterpret_instance_with_data' in lime_base.py for
details on what each of the options does.
random_state: an integer or numpy.RandomState that will be used to
generate random numbers. If None, the random state will be
initialized using the internal numpy seed.
"""
kernel_width = float(kernel_width)
if kernel is None:
def kernel(d, kernel_width):
return np.sqrt(np.exp(-(d ** 2) / kernel_width ** 2))
kernel_fn = partial(kernel, kernel_width=kernel_width)
self.random_state = check_random_state(random_state)
self.feature_selection = feature_selection
self.base = LimeBase(kernel_fn, verbose, random_state=self.random_state)
def interpret_instance(self, image, classifier_fn, labels=(1,),
hide_color=None,
num_features=100000, num_samples=1000,
batch_size=10,
distance_metric='cosine',
model_regressor=None
):
"""Generates interpretations for a prediction.
First, we generate neighborhood data by randomly perturbing features
from the instance (see __data_inverse). We then learn locally weighted
linear models on this neighborhood data to interpret each of the classes
in an interpretable way (see lime_base.py).
Args:
image: 3 dimension RGB image. If this is only two dimensional,
we will assume it's a grayscale image and call gray2rgb.
classifier_fn: classifier prediction probability function, which
takes a numpy array and outputs prediction probabilities. For
ScikitClassifiers , this is classifier.predict_proba.
labels: iterable with labels to be interpreted.
hide_color: TODO
num_features: maximum number of features present in interpretation
num_samples: size of the neighborhood to learn the linear model
batch_size: TODO
distance_metric: the distance metric to use for weights.
model_regressor: sklearn regressor to use in interpretation. Defaults
to Ridge regression in LimeBase. Must have model_regressor.coef_
and 'sample_weight' as a parameter to model_regressor.fit()
Returns:
An ImageIinterpretation object (see lime_image.py) with the corresponding
interpretations.
"""
if len(image.shape) == 2:
image = gray2rgb(image)
try:
segments = quickshift(image, sigma=1)
except ValueError as e:
raise e
self.segments = segments
fudged_image = image.copy()
if hide_color is None:
# if no hide_color, use the mean
for x in np.unique(segments):
mx = np.mean(image[segments == x], axis=0)
fudged_image[segments == x] = mx
elif hide_color == 'avg_from_neighbor':
from scipy.spatial.distance import cdist
n_features = np.unique(segments).shape[0]
regions = regionprops(segments + 1)
centroids = np.zeros((n_features, 2))
for i, x in enumerate(regions):
centroids[i] = np.array(x.centroid)
d = cdist(centroids, centroids, 'sqeuclidean')
for x in np.unique(segments):
# print(np.argmin(d[x]))
a = [image[segments == i] for i in np.argsort(d[x])[1:6]]
mx = np.mean(np.concatenate(a), axis=0)
fudged_image[segments == x] = mx
else:
fudged_image[:] = 0
top = labels
data, labels = self.data_labels(image, fudged_image, segments,
classifier_fn, num_samples,
batch_size=batch_size)
distances = sklearn.metrics.pairwise_distances(
data,
data[0].reshape(1, -1),
metric=distance_metric
).ravel()
interpretation_image = ImageInterpretation(image, segments)
for label in top:
(interpretation_image.intercept[label],
interpretation_image.local_weights[label],
interpretation_image.score, interpretation_image.local_pred) = self.base.interpret_instance_with_data(
data, labels, distances, label, num_features,
model_regressor=model_regressor,
feature_selection=self.feature_selection)
return interpretation_image
def data_labels(self,
image,
fudged_image,
segments,
classifier_fn,
num_samples,
batch_size=10):
"""Generates images and predictions in the neighborhood of this image.
Args:
image: 3d numpy array, the image
fudged_image: 3d numpy array, image to replace original image when
superpixel is turned off
segments: segmentation of the image
classifier_fn: function that takes a list of images and returns a
matrix of prediction probabilities
num_samples: size of the neighborhood to learn the linear model
batch_size: classifier_fn will be called on batches of this size.
Returns:
A tuple (data, labels), where:
data: dense num_samples * num_superpixels
labels: prediction probabilities matrix
"""
n_features = np.unique(segments).shape[0]
data = self.random_state.randint(0, 2, num_samples * n_features) \
.reshape((num_samples, n_features))
labels = []
data[0, :] = 1
imgs = []
for row in data:
temp = copy.deepcopy(image)
zeros = np.where(row == 0)[0]
mask = np.zeros(segments.shape).astype(bool)
for z in zeros:
mask[segments == z] = True
temp[mask] = fudged_image[mask]
imgs.append(temp)
if len(imgs) == batch_size:
preds = classifier_fn(np.array(imgs))
labels.extend(preds)
imgs = []
if len(imgs) > 0:
preds = classifier_fn(np.array(imgs))
labels.extend(preds)
return data, np.array(labels)
#copyright (c) 2020 PaddlePaddle Authors. All Rights Reserve.
#
#Licensed under the Apache License, Version 2.0 (the "License");
#you may not use this file except in compliance with the License.
#You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#Unless required by applicable law or agreed to in writing, software
#distributed under the License is distributed on an "AS IS" BASIS,
#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#See the License for the specific language governing permissions and
#limitations under the License.
import os
import numpy as np
import glob
from ..as_data_reader.readers import read_image
from . import lime_base
from ._session_preparation import compute_features_for_kmeans, h_pre_models_kmeans
def load_kmeans_model(fname):
import pickle
with open(fname, 'rb') as f:
kmeans_model = pickle.load(f)
return kmeans_model
def combine_normlime_and_lime(lime_weights, g_weights):
pred_labels = lime_weights.keys()
combined_weights = {y: [] for y in pred_labels}
for y in pred_labels:
normlized_lime_weights_y = lime_weights[y]
lime_weights_dict = {tuple_w[0]: tuple_w[1] for tuple_w in normlized_lime_weights_y}
normlized_g_weight_y = g_weights[y]
normlime_weights_dict = {tuple_w[0]: tuple_w[1] for tuple_w in normlized_g_weight_y}
combined_weights[y] = [
(seg_k, lime_weights_dict[seg_k] * normlime_weights_dict[seg_k])
for seg_k in lime_weights_dict.keys()
]
combined_weights[y] = sorted(combined_weights[y],
key=lambda x: np.abs(x[1]), reverse=True)
return combined_weights
def avg_using_superpixels(features, segments):
one_list = np.zeros((len(np.unique(segments)), features.shape[2]))
for x in np.unique(segments):
one_list[x] = np.mean(features[segments == x], axis=0)
return one_list
def centroid_using_superpixels(features, segments):
from skimage.measure import regionprops
regions = regionprops(segments + 1)
one_list = np.zeros((len(np.unique(segments)), features.shape[2]))
for i, r in enumerate(regions):
one_list[i] = features[int(r.centroid[0] + 0.5), int(r.centroid[1] + 0.5), :]
# print(one_list.shape)
return one_list
def get_feature_for_kmeans(feature_map, segments):
from sklearn.preprocessing import normalize
centroid_feature = centroid_using_superpixels(feature_map, segments)
avg_feature = avg_using_superpixels(feature_map, segments)
x = np.concatenate((centroid_feature, avg_feature), axis=-1)
x = normalize(x)
return x
def precompute_normlime_weights(list_data_, predict_fn, num_samples=3000, batch_size=50, save_dir='./tmp'):
# save lime weights and kmeans cluster labels
precompute_lime_weights(list_data_, predict_fn, num_samples, batch_size, save_dir)
# load precomputed results, compute normlime weights and save.
fname_list = glob.glob(os.path.join(save_dir, f'lime_weights_s{num_samples}*.npy'))
return compute_normlime_weights(fname_list, save_dir, num_samples)
def save_one_lime_predict_and_kmean_labels(lime_all_weights, image_pred_labels, cluster_labels, save_path):
lime_weights = {}
for label in image_pred_labels:
lime_weights[label] = lime_all_weights[label]
for_normlime_weights = {
'lime_weights': lime_weights, # a dict: class_label: (seg_label, weight)
'cluster': cluster_labels # a list with segments as indices.
}
np.save(save_path, for_normlime_weights)
def precompute_lime_weights(list_data_, predict_fn, num_samples, batch_size, save_dir):
kmeans_model = load_kmeans_model(h_pre_models_kmeans)
for data_index, each_data_ in enumerate(list_data_):
if isinstance(each_data_, str):
save_path = f"lime_weights_s{num_samples}_{each_data_.split('/')[-1].split('.')[0]}.npy"
save_path = os.path.join(save_dir, save_path)
else:
save_path = f"lime_weights_s{num_samples}_{data_index}.npy"
save_path = os.path.join(save_dir, save_path)
if os.path.exists(save_path):
print(f'{save_path} exists, not computing this one.')
continue
print('processing', each_data_ if isinstance(each_data_, str) else data_index,
f', {data_index}/{len(list_data_)}')
image_show = read_image(each_data_)
result = predict_fn(image_show)
result = result[0] # only one image here.
if abs(np.sum(result) - 1.0) > 1e-4:
# softmax
exp_result = np.exp(result)
probability = exp_result / np.sum(exp_result)
else:
probability = result
pred_label = np.argsort(probability)[::-1]
# top_k = argmin(top_n) > threshold
threshold = 0.05
top_k = 0
for l in pred_label:
if probability[l] < threshold or top_k == 5:
break
top_k += 1
if top_k == 0:
top_k = 1
pred_label = pred_label[:top_k]
algo = lime_base.LimeImageInterpreter()
interpreter = algo.interpret_instance(image_show[0], predict_fn, pred_label, 0,
num_samples=num_samples, batch_size=batch_size)
cluster_labels = kmeans_model.predict(
get_feature_for_kmeans(compute_features_for_kmeans(image_show).transpose((1, 2, 0)), interpreter.segments)
)
save_one_lime_predict_and_kmean_labels(
interpreter.local_weights, pred_label,
cluster_labels,
save_path
)
def compute_normlime_weights(a_list_lime_fnames, save_dir, lime_num_samples):
normlime_weights_all_labels = {}
for f in a_list_lime_fnames:
try:
lime_weights_and_cluster = np.load(f, allow_pickle=True).item()
lime_weights = lime_weights_and_cluster['lime_weights']
cluster = lime_weights_and_cluster['cluster']
except:
print('When loading precomputed LIME result, skipping', f)
continue
print('Loading precomputed LIME result,', f)
pred_labels = lime_weights.keys()
for y in pred_labels:
normlime_weights = normlime_weights_all_labels.get(y, {})
w_f_y = [abs(w[1]) for w in lime_weights[y]]
w_f_y_l1norm = sum(w_f_y)
for w in lime_weights[y]:
seg_label = w[0]
weight = w[1] * w[1] / w_f_y_l1norm
a = normlime_weights.get(cluster[seg_label], [])
a.append(weight)
normlime_weights[cluster[seg_label]] = a
normlime_weights_all_labels[y] = normlime_weights
# compute normlime
for y in normlime_weights_all_labels:
normlime_weights = normlime_weights_all_labels.get(y, {})
for k in normlime_weights:
normlime_weights[k] = sum(normlime_weights[k]) / len(normlime_weights[k])
# check normlime
if len(normlime_weights_all_labels.keys()) < max(normlime_weights_all_labels.keys()) + 1:
print(
"\n"
"Warning: !!! \n"
f"There are at least {max(normlime_weights_all_labels.keys()) + 1} classes, "
f"but the NormLIME has results of only {len(normlime_weights_all_labels.keys())} classes. \n"
"It may have cause unstable results in the later computation"
" but can be improved by computing more test samples."
"\n"
)
n = 0
f_out = f'normlime_weights_s{lime_num_samples}_samples_{len(a_list_lime_fnames)}-{n}.npy'
while os.path.exists(
os.path.join(save_dir, f_out)
):
n += 1
f_out = f'normlime_weights_s{lime_num_samples}_samples_{len(a_list_lime_fnames)}-{n}.npy'
continue
np.save(
os.path.join(save_dir, f_out),
normlime_weights_all_labels
)
return os.path.join(save_dir, f_out)
# copyright (c) 2020 PaddlePaddle Authors. All Rights Reserve.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
def interpretation_predict(model, images):
model.arrange_transforms(
transforms=model.test_transforms, mode='test')
new_imgs = []
for i in range(images.shape[0]):
img = images[i]
new_imgs.append(model.test_transforms(img)[0])
new_imgs = np.array(new_imgs)
result = model.exe.run(
model.test_prog,
feed={'image': new_imgs},
fetch_list=list(model.interpretation_feats.values()))
return result
\ No newline at end of file
#copyright (c) 2020 PaddlePaddle Authors. All Rights Reserve.
#
#Licensed under the Apache License, Version 2.0 (the "License");
#you may not use this file except in compliance with the License.
#You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
#Unless required by applicable law or agreed to in writing, software
#distributed under the License is distributed on an "AS IS" BASIS,
#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#See the License for the specific language governing permissions and
#limitations under the License.
import os
import cv2
import copy
import os.path as osp
import numpy as np
import paddlex as pdx
from .interpretation_predict import interpretation_predict
from .core.interpretation import Interpretation
from .core.normlime_base import precompute_normlime_weights
def visualize(img_file,
model,
dataset=None,
algo='lime',
num_samples=3000,
batch_size=50,
save_dir='./'):
"""可解释性可视化。
Args:
img_file (str): 预测图像路径。
model (paddlex.cv.models): paddlex中的模型。
dataset (paddlex.datasets): 数据集读取器,默认为None。
algo (str): 可解释性方式,当前可选'lime'和'normlime'。
num_samples (int): LIME用于学习线性模型的采样数,默认为3000。
batch_size (int): 预测数据batch大小,默认为50。
save_dir (str): 可解释性可视化结果(保存为png格式文件)和中间文件存储路径。
"""
assert model.model_type == 'classifier', \
'Now the interpretation visualize only be supported in classifier!'
if model.status != 'Normal':
raise Exception('The interpretation only can deal with the Normal model')
model.arrange_transforms(
transforms=model.test_transforms, mode='test')
tmp_transforms = copy.deepcopy(model.test_transforms)
tmp_transforms.transforms = tmp_transforms.transforms[:-2]
img = tmp_transforms(img_file)[0]
img = np.around(img).astype('uint8')
img = np.expand_dims(img, axis=0)
interpreter = None
if algo == 'lime':
interpreter = get_lime_interpreter(img, model, dataset, num_samples=num_samples, batch_size=batch_size)
elif algo == 'normlime':
if dataset is None:
raise Exception('The dataset is None. Cannot implement this kind of interpretation')
interpreter = get_normlime_interpreter(img, model, dataset,
num_samples=num_samples, batch_size=batch_size,
save_dir=save_dir)
else:
raise Exception('The {} interpretation method is not supported yet!'.format(algo))
img_name = osp.splitext(osp.split(img_file)[-1])[0]
interpreter.interpret(img, save_dir=save_dir)
def get_lime_interpreter(img, model, dataset, num_samples=3000, batch_size=50):
def predict_func(image):
image = image.astype('float32')
for i in range(image.shape[0]):
image[i] = cv2.cvtColor(image[i], cv2.COLOR_RGB2BGR)
tmp_transforms = copy.deepcopy(model.test_transforms.transforms)
model.test_transforms.transforms = model.test_transforms.transforms[-2:]
out = interpretation_predict(model, image)
model.test_transforms.transforms = tmp_transforms
return out[0]
labels_name = None
if dataset is not None:
labels_name = dataset.labels
interpreter = Interpretation('lime',
predict_func,
labels_name,
num_samples=num_samples,
batch_size=batch_size)
return interpreter
def get_normlime_interpreter(img, model, dataset, num_samples=3000, batch_size=50, save_dir='./'):
def precompute_predict_func(image):
image = image.astype('float32')
tmp_transforms = copy.deepcopy(model.test_transforms.transforms)
model.test_transforms.transforms = model.test_transforms.transforms[-2:]
out = interpretation_predict(model, image)
model.test_transforms.transforms = tmp_transforms
return out[0]
def predict_func(image):
image = image.astype('float32')
for i in range(image.shape[0]):
image[i] = cv2.cvtColor(image[i], cv2.COLOR_RGB2BGR)
tmp_transforms = copy.deepcopy(model.test_transforms.transforms)
model.test_transforms.transforms = model.test_transforms.transforms[-2:]
out = interpretation_predict(model, image)
model.test_transforms.transforms = tmp_transforms
return out[0]
labels_name = None
if dataset is not None:
labels_name = dataset.labels
root_path = os.environ['HOME']
root_path = osp.join(root_path, '.paddlex')
pre_models_path = osp.join(root_path, "pre_models")
if not osp.exists(pre_models_path):
os.makedirs(pre_models_path)
url = "https://bj.bcebos.com/paddlex/interpret/pre_models.tar.gz"
pdx.utils.download_and_decompress(url, path=pre_models_path)
npy_dir = precompute_for_normlime(precompute_predict_func,
dataset,
num_samples=num_samples,
batch_size=batch_size,
save_dir=save_dir)
interpreter = Interpretation('normlime',
predict_func,
labels_name,
num_samples=num_samples,
batch_size=batch_size,
normlime_weights=npy_dir)
return interpreter
def precompute_for_normlime(predict_func, dataset, num_samples=3000, batch_size=50, save_dir='./'):
image_list = []
for item in dataset.file_list:
image_list.append(item[0])
return precompute_normlime_weights(
image_list,
predict_func,
num_samples=num_samples,
batch_size=batch_size,
save_dir=save_dir)
......@@ -14,11 +14,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from .x2imagenet import EasyData2ImageNet
from .x2coco import LabelMe2COCO
from .x2coco import EasyData2COCO
from .x2voc import LabelMe2VOC
from .x2voc import EasyData2VOC
from .x2seg import JingLing2Seg
from .x2seg import LabelMe2Seg
from .x2seg import EasyData2Seg
from .convert import *
\ No newline at end of file
#!/usr/bin/env python
# coding: utf-8
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .x2imagenet import EasyData2ImageNet
from .x2coco import LabelMe2COCO
from .x2coco import EasyData2COCO
from .x2voc import LabelMe2VOC
from .x2voc import EasyData2VOC
from .x2seg import JingLing2Seg
from .x2seg import LabelMe2Seg
from .x2seg import EasyData2Seg
easydata2imagenet = EasyData2ImageNet().convert
labelme2coco = LabelMe2COCO().convert
easydata2coco = EasyData2COCO().convert
labelme2voc = LabelMe2VOC().convert
easydata2voc = EasyData2VOC().convert
jingling2seg = JingLing2Seg().convert
labelme2seg = LabelMe2Seg().convert
easydata2seg = EasyData2Seg().convert
......@@ -30,7 +30,7 @@ setuptools.setup(
setup_requires=['cython', 'numpy'],
install_requires=[
"pycocotools;platform_system!='Windows'", 'pyyaml', 'colorama', 'tqdm',
'paddleslim==1.0.1', 'visualdl==2.0.0a2'
'paddleslim==1.0.1', 'visualdl>=2.0.0a2'
],
classifiers=[
"Programming Language :: Python :: 3",
......
import os
# 选择使用0号卡
os.environ['CUDA_VISIBLE_DEVICES'] = '0'
import os.path as osp
import paddlex as pdx
# 下载和解压Imagenet果蔬分类数据集
veg_dataset = 'https://bj.bcebos.com/paddlex/interpret/mini_imagenet_veg.tar.gz'
pdx.utils.download_and_decompress(veg_dataset, path='./')
# 下载和解压已训练好的MobileNetV2模型
model_file = 'https://bj.bcebos.com/paddlex/interpret/mini_imagenet_veg_mobilenetv2.tar.gz'
pdx.utils.download_and_decompress(model_file, path='./')
# 加载模型
model = pdx.load_model('mini_imagenet_veg_mobilenetv2')
# 定义测试所用的数据集
test_dataset = pdx.datasets.ImageNet(
data_dir='mini_imagenet_veg',
file_list=osp.join('mini_imagenet_veg', 'test_list.txt'),
label_list=osp.join('mini_imagenet_veg', 'labels.txt'),
transforms=model.test_transforms)
# 可解释性可视化
# LIME算法
pdx.interpret.visualize(
'mini_imagenet_veg/mushroom/n07734744_1106.JPEG',
model,
test_dataset,
algo='lime',
save_dir='./')
# NormLIME算法
pdx.interpret.visualize(
'mini_imagenet_veg/mushroom/n07734744_1106.JPEG',
model,
test_dataset,
algo='normlime',
save_dir='./')
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册