Commit 151e10e8 authored by mindspore-ci-bot, committed by Gitee

!17 Add MNIST examples that can run on the CPU device.

Merge pull request !17 from jxlang910/master
......@@ -27,12 +27,12 @@ from mindarmour.attacks.carlini_wagner import CarliniWagnerL2Attack
from mindarmour.evaluations.attack_evaluation import AttackEvaluate
from mindarmour.utils.logger import LogUtil
context.set_context(mode=context.GRAPH_MODE, device_target="Ascend")
sys.path.append("..")
from data_processing import generate_mnist_dataset
LOGGER = LogUtil.get_instance()
LOGGER.set_level('INFO')
TAG = 'CW_Test'
......@@ -45,6 +45,80 @@ def test_carlini_wagner_attack():
"""
CW-Attack test
"""
context.set_context(mode=context.GRAPH_MODE, device_target="Ascend")
# load trained network
ckpt_name = './trained_ckpt_file/checkpoint_lenet-10_1875.ckpt'
net = LeNet5()
load_dict = load_checkpoint(ckpt_name)
load_param_into_net(net, load_dict)
# get test data
data_list = "./MNIST_unzip/test"
batch_size = 32
ds = generate_mnist_dataset(data_list, batch_size=batch_size)
# prediction accuracy before attack
model = Model(net)
batch_num = 3 # the number of batches of attacking samples
test_images = []
test_labels = []
predict_labels = []
i = 0
for data in ds.create_tuple_iterator():
i += 1
images = data[0].astype(np.float32)
labels = data[1]
test_images.append(images)
test_labels.append(labels)
pred_labels = np.argmax(model.predict(Tensor(images)).asnumpy(),
axis=1)
predict_labels.append(pred_labels)
if i >= batch_num:
break
predict_labels = np.concatenate(predict_labels)
true_labels = np.concatenate(test_labels)
accuracy = np.mean(np.equal(predict_labels, true_labels))
LOGGER.info(TAG, "prediction accuracy before attacking is : %s", accuracy)
# attacking
num_classes = 10
attack = CarliniWagnerL2Attack(net, num_classes, targeted=False)
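# CarliniWagnerL2Attack with targeted=False runs the untargeted C&W-L2 attack:
# it optimizes for a small L2 perturbation that flips each sample to any class
# other than its true label, and batch_generate below applies it batch by batch.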
start_time = time.clock()
adv_data = attack.batch_generate(np.concatenate(test_images),
np.concatenate(test_labels), batch_size=32)
stop_time = time.clock()
pred_logits_adv = model.predict(Tensor(adv_data)).asnumpy()
# rescale predict confidences into (0, 1).
pred_logits_adv = softmax(pred_logits_adv, axis=1)
pred_labels_adv = np.argmax(pred_logits_adv, axis=1)
accuracy_adv = np.mean(np.equal(pred_labels_adv, true_labels))
LOGGER.info(TAG, "prediction accuracy after attacking is : %s",
accuracy_adv)
test_labels = np.eye(10)[np.concatenate(test_labels)]
attack_evaluate = AttackEvaluate(np.concatenate(test_images).transpose(0, 2, 3, 1),
test_labels, adv_data.transpose(0, 2, 3, 1),
pred_logits_adv)
LOGGER.info(TAG, 'mis-classification rate of adversaries is : %s',
attack_evaluate.mis_classification_rate())
LOGGER.info(TAG, 'The average confidence of adversarial class is : %s',
attack_evaluate.avg_conf_adv_class())
LOGGER.info(TAG, 'The average confidence of true class is : %s',
attack_evaluate.avg_conf_true_class())
LOGGER.info(TAG, 'The average distance (l0, l2, linf) between original '
'samples and adversarial samples are: %s',
attack_evaluate.avg_lp_distance())
LOGGER.info(TAG, 'The average structural similarity between original '
'samples and adversarial samples are: %s',
attack_evaluate.avg_ssim())
LOGGER.info(TAG, 'The average costing time is %s',
(stop_time - start_time)/(batch_num*batch_size))
def test_carlini_wagner_attack_cpu():
"""
CW-Attack test for CPU device.
"""
context.set_context(mode=context.GRAPH_MODE, device_target="CPU")
# load trained network
ckpt_name = './trained_ckpt_file/checkpoint_lenet-10_1875.ckpt'
net = LeNet5()
......@@ -114,4 +188,4 @@ def test_carlini_wagner_attack():
if __name__ == '__main__':
test_carlini_wagner_attack()
test_carlini_wagner_attack_cpu()
......@@ -27,13 +27,12 @@ from mindarmour.attacks.deep_fool import DeepFool
from mindarmour.evaluations.attack_evaluation import AttackEvaluate
from mindarmour.utils.logger import LogUtil
context.set_context(mode=context.GRAPH_MODE, device_target="Ascend")
sys.path.append("..")
from data_processing import generate_mnist_dataset
LOGGER = LogUtil.get_instance()
LOGGER.set_level('INFO')
TAG = 'DeepFool_Test'
......@@ -46,6 +45,81 @@ def test_deepfool_attack():
"""
DeepFool-Attack test
"""
context.set_context(mode=context.GRAPH_MODE, device_target="Ascend")
# load trained network
ckpt_name = './trained_ckpt_file/checkpoint_lenet-10_1875.ckpt'
net = LeNet5()
load_dict = load_checkpoint(ckpt_name)
load_param_into_net(net, load_dict)
# get test data
data_list = "./MNIST_unzip/test"
batch_size = 32
ds = generate_mnist_dataset(data_list, batch_size=batch_size)
# prediction accuracy before attack
model = Model(net)
batch_num = 3 # the number of batches of attacking samples
test_images = []
test_labels = []
predict_labels = []
i = 0
for data in ds.create_tuple_iterator():
i += 1
images = data[0].astype(np.float32)
labels = data[1]
test_images.append(images)
test_labels.append(labels)
pred_labels = np.argmax(model.predict(Tensor(images)).asnumpy(),
axis=1)
predict_labels.append(pred_labels)
if i >= batch_num:
break
predict_labels = np.concatenate(predict_labels)
true_labels = np.concatenate(test_labels)
accuracy = np.mean(np.equal(predict_labels, true_labels))
LOGGER.info(TAG, "prediction accuracy before attacking is : %s", accuracy)
# attacking
classes = 10
attack = DeepFool(net, classes, norm_level=2,
bounds=(0.0, 1.0))
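# DeepFool iteratively pushes each sample across its nearest decision boundary;
# norm_level=2 measures the perturbation in L2 and bounds=(0.0, 1.0) keeps the
# adversarial pixels inside the valid image range.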
start_time = time.clock()
adv_data = attack.batch_generate(np.concatenate(test_images),
np.concatenate(test_labels), batch_size=32)
stop_time = time.clock()
pred_logits_adv = model.predict(Tensor(adv_data)).asnumpy()
# rescale predict confidences into (0, 1).
pred_logits_adv = softmax(pred_logits_adv, axis=1)
pred_labels_adv = np.argmax(pred_logits_adv, axis=1)
accuracy_adv = np.mean(np.equal(pred_labels_adv, true_labels))
LOGGER.info(TAG, "prediction accuracy after attacking is : %s",
accuracy_adv)
test_labels = np.eye(10)[np.concatenate(test_labels)]
attack_evaluate = AttackEvaluate(np.concatenate(test_images).transpose(0, 2, 3, 1),
test_labels, adv_data.transpose(0, 2, 3, 1),
pred_logits_adv)
LOGGER.info(TAG, 'mis-classification rate of adversaries is : %s',
attack_evaluate.mis_classification_rate())
LOGGER.info(TAG, 'The average confidence of adversarial class is : %s',
attack_evaluate.avg_conf_adv_class())
LOGGER.info(TAG, 'The average confidence of true class is : %s',
attack_evaluate.avg_conf_true_class())
LOGGER.info(TAG, 'The average distance (l0, l2, linf) between original '
'samples and adversarial samples are: %s',
attack_evaluate.avg_lp_distance())
LOGGER.info(TAG, 'The average structural similarity between original '
'samples and adversarial samples are: %s',
attack_evaluate.avg_ssim())
LOGGER.info(TAG, 'The average costing time is %s',
(stop_time - start_time)/(batch_num*batch_size))
def test_deepfool_attack_cpu():
"""
DeepFool-Attack test for CPU device.
"""
context.set_context(mode=context.GRAPH_MODE, device_target="CPU")
# load trained network
ckpt_name = './trained_ckpt_file/checkpoint_lenet-10_1875.ckpt'
net = LeNet5()
......@@ -116,4 +190,4 @@ def test_deepfool_attack():
if __name__ == '__main__':
test_deepfool_attack()
test_deepfool_attack_cpu()
......@@ -20,6 +20,7 @@ from mindspore import Model
from mindspore import Tensor
from mindspore import context
from mindspore.train.serialization import load_checkpoint, load_param_into_net
from mindspore.nn import SoftmaxCrossEntropyWithLogits
from scipy.special import softmax
from lenet5_net import LeNet5
......@@ -27,13 +28,12 @@ from mindarmour.attacks.gradient_method import FastGradientSignMethod
from mindarmour.evaluations.attack_evaluation import AttackEvaluate
from mindarmour.utils.logger import LogUtil
context.set_context(mode=context.GRAPH_MODE, device_target="Ascend")
sys.path.append("..")
from data_processing import generate_mnist_dataset
LOGGER = LogUtil.get_instance()
LOGGER.set_level('INFO')
TAG = 'FGSM_Test'
......@@ -46,6 +46,7 @@ def test_fast_gradient_sign_method():
"""
FGSM-Attack test
"""
context.set_context(mode=context.GRAPH_MODE, device_target="Ascend")
# load trained network
ckpt_name = './trained_ckpt_file/checkpoint_lenet-10_1875.ckpt'
net = LeNet5()
......@@ -113,5 +114,78 @@ def test_fast_gradient_sign_method():
(stop_time - start_time)/(batch_num*batch_size))
def test_fast_gradient_sign_method_cpu():
"""
FGSM-Attack test for CPU device.
"""
context.set_context(mode=context.GRAPH_MODE, device_target="CPU")
# load trained network
ckpt_name = './trained_ckpt_file/checkpoint_lenet-10_1875.ckpt'
net = LeNet5()
load_dict = load_checkpoint(ckpt_name)
load_param_into_net(net, load_dict)
# get test data
data_list = "./MNIST_unzip/test"
batch_size = 32
ds = generate_mnist_dataset(data_list, batch_size)
# prediction accuracy before attack
model = Model(net)
batch_num = 3 # the number of batches of attacking samples
test_images = []
test_labels = []
predict_labels = []
i = 0
for data in ds.create_tuple_iterator():
i += 1
images = data[0].astype(np.float32)
labels = data[1]
test_images.append(images)
test_labels.append(labels)
pred_labels = np.argmax(model.predict(Tensor(images)).asnumpy(),
axis=1)
predict_labels.append(pred_labels)
if i >= batch_num:
break
predict_labels = np.concatenate(predict_labels)
true_labels = np.concatenate(test_labels)
accuracy = np.mean(np.equal(predict_labels, true_labels))
LOGGER.info(TAG, "prediction accuracy before attacking is : %s", accuracy)
# attacking
loss = SoftmaxCrossEntropyWithLogits(is_grad=False, sparse=True)
attack = FastGradientSignMethod(net, eps=0.3, loss_fn=loss)
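# FGSM takes a single gradient-sign step of size eps=0.3 on the cross-entropy
# loss above; sparse=True lets the loss consume integer class labels directly.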
start_time = time.clock()
adv_data = attack.batch_generate(np.concatenate(test_images),
true_labels, batch_size=32)
stop_time = time.clock()
np.save('./adv_data', adv_data)
pred_logits_adv = model.predict(Tensor(adv_data)).asnumpy()
# rescale predict confidences into (0, 1).
pred_logits_adv = softmax(pred_logits_adv, axis=1)
pred_labels_adv = np.argmax(pred_logits_adv, axis=1)
accuracy_adv = np.mean(np.equal(pred_labels_adv, true_labels))
LOGGER.info(TAG, "prediction accuracy after attacking is : %s", accuracy_adv)
attack_evaluate = AttackEvaluate(np.concatenate(test_images).transpose(0, 2, 3, 1),
np.eye(10)[true_labels],
adv_data.transpose(0, 2, 3, 1),
pred_logits_adv)
LOGGER.info(TAG, 'mis-classification rate of adversaries is : %s',
attack_evaluate.mis_classification_rate())
LOGGER.info(TAG, 'The average confidence of adversarial class is : %s',
attack_evaluate.avg_conf_adv_class())
LOGGER.info(TAG, 'The average confidence of true class is : %s',
attack_evaluate.avg_conf_true_class())
LOGGER.info(TAG, 'The average distance (l0, l2, linf) between original '
'samples and adversarial samples are: %s',
attack_evaluate.avg_lp_distance())
LOGGER.info(TAG, 'The average structural similarity between original '
'samples and adversarial samples are: %s',
attack_evaluate.avg_ssim())
LOGGER.info(TAG, 'The average costing time is %s',
(stop_time - start_time)/(batch_num*batch_size))
if __name__ == '__main__':
test_fast_gradient_sign_method()
test_fast_gradient_sign_method_cpu()
......@@ -27,12 +27,12 @@ from mindarmour.attacks.black.genetic_attack import GeneticAttack
from mindarmour.evaluations.attack_evaluation import AttackEvaluate
from mindarmour.utils.logger import LogUtil
context.set_context(mode=context.GRAPH_MODE, device_target="Ascend")
sys.path.append("..")
from data_processing import generate_mnist_dataset
LOGGER = LogUtil.get_instance()
LOGGER.set_level('INFO')
TAG = 'Genetic_Attack'
......@@ -58,6 +58,87 @@ def test_genetic_attack_on_mnist():
"""
Genetic-Attack test
"""
context.set_context(mode=context.GRAPH_MODE, device_target="Ascend")
# load trained network
ckpt_name = './trained_ckpt_file/checkpoint_lenet-10_1875.ckpt'
net = LeNet5()
load_dict = load_checkpoint(ckpt_name)
load_param_into_net(net, load_dict)
# get test data
data_list = "./MNIST_unzip/test"
batch_size = 32
ds = generate_mnist_dataset(data_list, batch_size=batch_size)
# prediction accuracy before attack
model = ModelToBeAttacked(net)
batch_num = 3 # the number of batches of attacking samples
test_images = []
test_labels = []
predict_labels = []
i = 0
for data in ds.create_tuple_iterator():
i += 1
images = data[0].astype(np.float32)
labels = data[1]
test_images.append(images)
test_labels.append(labels)
pred_labels = np.argmax(model.predict(images), axis=1)
predict_labels.append(pred_labels)
if i >= batch_num:
break
predict_labels = np.concatenate(predict_labels)
true_labels = np.concatenate(test_labels)
accuracy = np.mean(np.equal(predict_labels, true_labels))
LOGGER.info(TAG, "prediction accuracy before attacking is : %g", accuracy)
# attacking
attack = GeneticAttack(model=model, pop_size=6, mutation_rate=0.05,
per_bounds=0.1, step_size=0.25, temp=0.1,
sparse=True)
targeted_labels = np.random.randint(0, 10, size=len(true_labels))
for i, true_l in enumerate(true_labels):
if targeted_labels[i] == true_l:
targeted_labels[i] = (targeted_labels[i] + 1) % 10
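# GeneticAttack is a score-based black-box attack: it only queries model.predict
# and evolves a small population (pop_size=6) of perturbations toward the target
# labels; the loop above resamples any target that collides with the true label.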
start_time = time.clock()
success_list, adv_data, query_list = attack.generate(
np.concatenate(test_images), targeted_labels)
stop_time = time.clock()
LOGGER.info(TAG, 'success_list: %s', success_list)
LOGGER.info(TAG, 'average of query times is : %s', np.mean(query_list))
pred_logits_adv = model.predict(adv_data)
# rescale predict confidences into (0, 1).
pred_logits_adv = softmax(pred_logits_adv, axis=1)
pred_labels_adv = np.argmax(pred_logits_adv, axis=1)
accuracy_adv = np.mean(np.equal(pred_labels_adv, true_labels))
LOGGER.info(TAG, "prediction accuracy after attacking is : %g",
accuracy_adv)
test_labels_onehot = np.eye(10)[true_labels]
attack_evaluate = AttackEvaluate(np.concatenate(test_images),
test_labels_onehot, adv_data,
pred_logits_adv, targeted=True,
target_label=targeted_labels)
LOGGER.info(TAG, 'mis-classification rate of adversaries is : %s',
attack_evaluate.mis_classification_rate())
LOGGER.info(TAG, 'The average confidence of adversarial class is : %s',
attack_evaluate.avg_conf_adv_class())
LOGGER.info(TAG, 'The average confidence of true class is : %s',
attack_evaluate.avg_conf_true_class())
LOGGER.info(TAG, 'The average distance (l0, l2, linf) between original '
'samples and adversarial samples are: %s',
attack_evaluate.avg_lp_distance())
LOGGER.info(TAG, 'The average structural similarity between original '
'samples and adversarial samples are: %s',
attack_evaluate.avg_ssim())
LOGGER.info(TAG, 'The average costing time is %s',
(stop_time - start_time)/(batch_num*batch_size))
def test_genetic_attack_on_mnist_cpu():
"""
Genetic-Attack test for CPU device.
"""
context.set_context(mode=context.GRAPH_MODE, device_target="CPU")
# load trained network
ckpt_name = './trained_ckpt_file/checkpoint_lenet-10_1875.ckpt'
net = LeNet5()
......@@ -134,4 +215,4 @@ def test_genetic_attack_on_mnist():
if __name__ == '__main__':
test_genetic_attack_on_mnist()
test_genetic_attack_on_mnist_cpu()
......@@ -27,10 +27,8 @@ from mindarmour.utils.logger import LogUtil
sys.path.append("..")
from data_processing import generate_mnist_dataset
context.set_context(mode=context.GRAPH_MODE)
context.set_context(device_target="Ascend")
LOGGER = LogUtil.get_instance()
LOGGER.set_level('INFO')
TAG = 'HopSkipJumpAttack'
......@@ -79,6 +77,81 @@ def test_hsja_mnist_attack():
"""
HSJA-Attack test
"""
context.set_context(mode=context.GRAPH_MODE)
context.set_context(device_target="Ascend")
# load trained network
ckpt_name = './trained_ckpt_file/checkpoint_lenet-10_1875.ckpt'
net = LeNet5()
load_dict = load_checkpoint(ckpt_name)
load_param_into_net(net, load_dict)
net.set_train(False)
# get test data
data_list = "./MNIST_unzip/test"
batch_size = 32
ds = generate_mnist_dataset(data_list, batch_size=batch_size)
# prediction accuracy before attack
model = ModelToBeAttacked(net)
batch_num = 5 # the number of batches of attacking samples
test_images = []
test_labels = []
predict_labels = []
i = 0
for data in ds.create_tuple_iterator():
i += 1
images = data[0].astype(np.float32)
labels = data[1]
test_images.append(images)
test_labels.append(labels)
pred_labels = np.argmax(model.predict(images), axis=1)
predict_labels.append(pred_labels)
if i >= batch_num:
break
predict_labels = np.concatenate(predict_labels)
true_labels = np.concatenate(test_labels)
accuracy = np.mean(np.equal(predict_labels, true_labels))
LOGGER.info(TAG, "prediction accuracy before attacking is : %s",
accuracy)
test_images = np.concatenate(test_images)
# attacking
norm = 'l2'
search = 'grid_search'
target = False
attack = HopSkipJumpAttack(model, constraint=norm, stepsize_search=search)
if target:
target_labels = random_target_labels(true_labels)
target_images = create_target_images(test_images, predict_labels,
target_labels)
attack.set_target_images(target_images)
success_list, adv_data, _ = attack.generate(test_images, target_labels)
else:
success_list, adv_data, _ = attack.generate(test_images, None)
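# HopSkipJumpAttack is a decision-based black-box attack: it relies only on the
# predicted label and estimates gradients near the decision boundary, here with
# an L2 constraint and grid-search step sizes; target=False keeps it untargeted.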
adv_datas = []
gts = []
for success, adv, gt in zip(success_list, adv_data, true_labels):
if success:
adv_datas.append(adv)
gts.append(gt)
if gts:
adv_datas = np.concatenate(np.asarray(adv_datas), axis=0)
gts = np.asarray(gts)
pred_logits_adv = model.predict(adv_datas)
pred_labels_adv = np.argmax(pred_logits_adv, axis=1)
accuracy_adv = np.mean(np.equal(pred_labels_adv, gts))
mis_rate = (1 - accuracy_adv)*(len(adv_datas) / len(success_list))
LOGGER.info(TAG, 'mis-classification rate of adversaries is : %s',
mis_rate)
def test_hsja_mnist_attack_cpu():
"""
HSJA-Attack test for CPU device.
"""
context.set_context(mode=context.GRAPH_MODE)
context.set_context(device_target="CPU")
# load trained network
ckpt_name = './trained_ckpt_file/checkpoint_lenet-10_1875.ckpt'
net = LeNet5()
......@@ -141,9 +214,10 @@ def test_hsja_mnist_attack():
pred_logits_adv = model.predict(adv_datas)
pred_labels_adv = np.argmax(pred_logits_adv, axis=1)
accuracy_adv = np.mean(np.equal(pred_labels_adv, gts))
mis_rate = (1 - accuracy_adv)*(len(adv_datas) / len(success_list))
LOGGER.info(TAG, 'mis-classification rate of adversaries is : %s',
mis_rate)
if __name__ == '__main__':
test_hsja_mnist_attack()
test_hsja_mnist_attack_cpu()
......@@ -27,13 +27,14 @@ from mindarmour.attacks.jsma import JSMAAttack
from mindarmour.evaluations.attack_evaluation import AttackEvaluate
from mindarmour.utils.logger import LogUtil
context.set_context(mode=context.GRAPH_MODE, device_target="Ascend")
sys.path.append("..")
from data_processing import generate_mnist_dataset
LOGGER = LogUtil.get_instance()
LOGGER.set_level('INFO')
TAG = 'JSMA_Test'
......@@ -46,6 +47,85 @@ def test_jsma_attack():
"""
JSMA-Attack test
"""
context.set_context(mode=context.GRAPH_MODE, device_target="Ascend")
# load trained network
ckpt_name = './trained_ckpt_file/checkpoint_lenet-10_1875.ckpt'
net = LeNet5()
load_dict = load_checkpoint(ckpt_name)
load_param_into_net(net, load_dict)
# get test data
data_list = "./MNIST_unzip/test"
batch_size = 32
ds = generate_mnist_dataset(data_list, batch_size=batch_size)
# prediction accuracy before attack
model = Model(net)
batch_num = 3 # the number of batches of attacking samples
test_images = []
test_labels = []
predict_labels = []
i = 0
for data in ds.create_tuple_iterator():
i += 1
images = data[0].astype(np.float32)
labels = data[1]
test_images.append(images)
test_labels.append(labels)
pred_labels = np.argmax(model.predict(Tensor(images)).asnumpy(),
axis=1)
predict_labels.append(pred_labels)
if i >= batch_num:
break
predict_labels = np.concatenate(predict_labels)
true_labels = np.concatenate(test_labels)
targeted_labels = np.random.randint(0, 10, size=len(true_labels))
for i, true_l in enumerate(true_labels):
if targeted_labels[i] == true_l:
targeted_labels[i] = (targeted_labels[i] + 1) % 10
accuracy = np.mean(np.equal(predict_labels, true_labels))
LOGGER.info(TAG, "prediction accuracy before attacking is : %g", accuracy)
# attacking
classes = 10
attack = JSMAAttack(net, classes)
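# JSMA computes a Jacobian-based saliency map and perturbs the most influential
# pixels one at a time until each sample is classified as its targeted label.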
start_time = time.clock()
adv_data = attack.batch_generate(np.concatenate(test_images),
targeted_labels, batch_size=32)
stop_time = time.clock()
pred_logits_adv = model.predict(Tensor(adv_data)).asnumpy()
# rescale predict confidences into (0, 1).
pred_logits_adv = softmax(pred_logits_adv, axis=1)
pred_labels_adv = np.argmax(pred_logits_adv, axis=1)
accuracy_adv = np.mean(np.equal(pred_labels_adv, true_labels))
LOGGER.info(TAG, "prediction accuracy after attacking is : %g",
accuracy_adv)
test_labels = np.eye(10)[np.concatenate(test_labels)]
attack_evaluate = AttackEvaluate(
np.concatenate(test_images).transpose(0, 2, 3, 1),
test_labels, adv_data.transpose(0, 2, 3, 1),
pred_logits_adv, targeted=True, target_label=targeted_labels)
LOGGER.info(TAG, 'mis-classification rate of adversaries is : %s',
attack_evaluate.mis_classification_rate())
LOGGER.info(TAG, 'The average confidence of adversarial class is : %s',
attack_evaluate.avg_conf_adv_class())
LOGGER.info(TAG, 'The average confidence of true class is : %s',
attack_evaluate.avg_conf_true_class())
LOGGER.info(TAG, 'The average distance (l0, l2, linf) between original '
'samples and adversarial samples are: %s',
attack_evaluate.avg_lp_distance())
LOGGER.info(TAG, 'The average structural similarity between original '
'samples and adversarial samples are: %s',
attack_evaluate.avg_ssim())
LOGGER.info(TAG, 'The average costing time is %s',
(stop_time - start_time) / (batch_num*batch_size))
def test_jsma_attack_cpu():
"""
JSMA-Attack test for CPU device.
"""
context.set_context(mode=context.GRAPH_MODE, device_target="CPU")
# load trained network
ckpt_name = './trained_ckpt_file/checkpoint_lenet-10_1875.ckpt'
net = LeNet5()
......@@ -120,4 +200,4 @@ def test_jsma_attack():
if __name__ == '__main__':
test_jsma_attack()
test_jsma_attack_cpu()
......@@ -20,6 +20,7 @@ from mindspore import Model
from mindspore import Tensor
from mindspore import context
from mindspore.train.serialization import load_checkpoint, load_param_into_net
from mindspore.nn import SoftmaxCrossEntropyWithLogits
from scipy.special import softmax
from lenet5_net import LeNet5
......@@ -27,13 +28,12 @@ from mindarmour.attacks.lbfgs import LBFGS
from mindarmour.evaluations.attack_evaluation import AttackEvaluate
from mindarmour.utils.logger import LogUtil
context.set_context(mode=context.GRAPH_MODE, device_target="Ascend")
sys.path.append("..")
from data_processing import generate_mnist_dataset
LOGGER = LogUtil.get_instance()
LOGGER.set_level('INFO')
TAG = 'LBFGS_Test'
......@@ -46,6 +46,7 @@ def test_lbfgs_attack():
"""
LBFGS-Attack test
"""
context.set_context(mode=context.GRAPH_MODE, device_target="Ascend")
# load trained network
ckpt_name = './trained_ckpt_file/checkpoint_lenet-10_1875.ckpt'
net = LeNet5()
......@@ -127,5 +128,90 @@ def test_lbfgs_attack():
(stop_time - start_time)/(batch_num*batch_size))
def test_lbfgs_attack_cpu():
"""
LBFGS-Attack test for CPU device.
"""
context.set_context(mode=context.GRAPH_MODE, device_target="CPU")
# load trained network
ckpt_name = './trained_ckpt_file/checkpoint_lenet-10_1875.ckpt'
net = LeNet5()
load_dict = load_checkpoint(ckpt_name)
load_param_into_net(net, load_dict)
# get test data
data_list = "./MNIST_unzip/test"
batch_size = 32
ds = generate_mnist_dataset(data_list, batch_size=batch_size)
# prediction accuracy before attack
model = Model(net)
batch_num = 3 # the number of batches of attacking samples
test_images = []
test_labels = []
predict_labels = []
i = 0
for data in ds.create_tuple_iterator():
i += 1
images = data[0].astype(np.float32)
labels = data[1]
test_images.append(images)
test_labels.append(labels)
pred_labels = np.argmax(model.predict(Tensor(images)).asnumpy(),
axis=1)
predict_labels.append(pred_labels)
if i >= batch_num:
break
predict_labels = np.concatenate(predict_labels)
true_labels = np.concatenate(test_labels)
accuracy = np.mean(np.equal(predict_labels, true_labels))
LOGGER.info(TAG, "prediction accuracy before attacking is : %s", accuracy)
# attacking
is_targeted = True
if is_targeted:
targeted_labels = np.random.randint(0, 10, size=len(true_labels)).astype(np.int32)
for i, true_l in enumerate(true_labels):
if targeted_labels[i] == true_l:
targeted_labels[i] = (targeted_labels[i] + 1) % 10
else:
targeted_labels = true_labels.astype(np.int32)
loss = SoftmaxCrossEntropyWithLogits(is_grad=False, sparse=True)
attack = LBFGS(net, is_targeted=is_targeted, loss_fn=loss)
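# LBFGS casts adversarial generation as box-constrained optimization of the
# cross-entropy loss; with is_targeted=True it drives each sample toward the
# randomly drawn target label prepared above.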
start_time = time.clock()
adv_data = attack.batch_generate(np.concatenate(test_images),
targeted_labels,
batch_size=batch_size)
stop_time = time.clock()
pred_logits_adv = model.predict(Tensor(adv_data)).asnumpy()
# rescale predict confidences into (0, 1).
pred_logits_adv = softmax(pred_logits_adv, axis=1)
pred_labels_adv = np.argmax(pred_logits_adv, axis=1)
accuracy_adv = np.mean(np.equal(pred_labels_adv, true_labels))
LOGGER.info(TAG, "prediction accuracy after attacking is : %s",
accuracy_adv)
attack_evaluate = AttackEvaluate(np.concatenate(test_images).transpose(0, 2, 3, 1),
np.eye(10)[true_labels],
adv_data.transpose(0, 2, 3, 1),
pred_logits_adv,
targeted=is_targeted,
target_label=targeted_labels)
LOGGER.info(TAG, 'mis-classification rate of adversaries is : %s',
attack_evaluate.mis_classification_rate())
LOGGER.info(TAG, 'The average confidence of adversarial class is : %s',
attack_evaluate.avg_conf_adv_class())
LOGGER.info(TAG, 'The average confidence of true class is : %s',
attack_evaluate.avg_conf_true_class())
LOGGER.info(TAG, 'The average distance (l0, l2, linf) between original '
'samples and adversarial samples are: %s',
attack_evaluate.avg_lp_distance())
LOGGER.info(TAG, 'The average structural similarity between original '
'samples and adversarial samples are: %s',
attack_evaluate.avg_ssim())
LOGGER.info(TAG, 'The average costing time is %s',
(stop_time - start_time)/(batch_num*batch_size))
if __name__ == '__main__':
test_lbfgs_attack()
test_lbfgs_attack_cpu()
......@@ -20,6 +20,7 @@ from mindspore import Model
from mindspore import Tensor
from mindspore import context
from mindspore.train.serialization import load_checkpoint, load_param_into_net
from mindspore.nn import SoftmaxCrossEntropyWithLogits
from scipy.special import softmax
from lenet5_net import LeNet5
......@@ -28,8 +29,6 @@ from mindarmour.attacks.iterative_gradient_method import \
from mindarmour.evaluations.attack_evaluation import AttackEvaluate
from mindarmour.utils.logger import LogUtil
context.set_context(mode=context.GRAPH_MODE, device_target="Ascend")
sys.path.append("..")
from data_processing import generate_mnist_dataset
......@@ -47,6 +46,7 @@ def test_momentum_diverse_input_iterative_method():
"""
M-DI2-FGSM Attack Test
"""
context.set_context(mode=context.GRAPH_MODE, device_target="Ascend")
# load trained network
ckpt_name = './trained_ckpt_file/checkpoint_lenet-10_1875.ckpt'
net = LeNet5()
......@@ -113,5 +113,77 @@ def test_momentum_diverse_input_iterative_method():
(stop_time - start_time)/(batch_num*batch_size))
def test_momentum_diverse_input_iterative_method_cpu():
"""
M-DI2-FGSM Attack Test for CPU device.
"""
context.set_context(mode=context.GRAPH_MODE, device_target="CPU")
# load trained network
ckpt_name = './trained_ckpt_file/checkpoint_lenet-10_1875.ckpt'
net = LeNet5()
load_dict = load_checkpoint(ckpt_name)
load_param_into_net(net, load_dict)
# get test data
data_list = "./MNIST_unzip/test"
batch_size = 32
ds = generate_mnist_dataset(data_list, batch_size)
# prediction accuracy before attack
model = Model(net)
batch_num = 32 # the number of batches of attacking samples
test_images = []
test_labels = []
predict_labels = []
i = 0
for data in ds.create_tuple_iterator():
i += 1
images = data[0].astype(np.float32)
labels = data[1]
test_images.append(images)
test_labels.append(labels)
pred_labels = np.argmax(model.predict(Tensor(images)).asnumpy(),
axis=1)
predict_labels.append(pred_labels)
if i >= batch_num:
break
predict_labels = np.concatenate(predict_labels)
true_labels = np.concatenate(test_labels)
accuracy = np.mean(np.equal(predict_labels, true_labels))
LOGGER.info(TAG, "prediction accuracy before attacking is : %s", accuracy)
# attacking
loss = SoftmaxCrossEntropyWithLogits(is_grad=False, sparse=True)
attack = MomentumDiverseInputIterativeMethod(net, loss_fn=loss)
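# M-DI2-FGSM combines momentum-based iterative FGSM with diverse input
# transformations (random resizing/padding at each step) to improve the
# transferability of the generated adversarial examples.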
start_time = time.clock()
adv_data = attack.batch_generate(np.concatenate(test_images),
true_labels, batch_size=32)
stop_time = time.clock()
pred_logits_adv = model.predict(Tensor(adv_data)).asnumpy()
# rescale predict confidences into (0, 1).
pred_logits_adv = softmax(pred_logits_adv, axis=1)
pred_labels_adv = np.argmax(pred_logits_adv, axis=1)
accuracy_adv = np.mean(np.equal(pred_labels_adv, true_labels))
LOGGER.info(TAG, "prediction accuracy after attacking is : %s", accuracy_adv)
attack_evaluate = AttackEvaluate(np.concatenate(test_images).transpose(0, 2, 3, 1),
np.eye(10)[true_labels],
adv_data.transpose(0, 2, 3, 1),
pred_logits_adv)
LOGGER.info(TAG, 'mis-classification rate of adversaries is : %s',
attack_evaluate.mis_classification_rate())
LOGGER.info(TAG, 'The average confidence of adversarial class is : %s',
attack_evaluate.avg_conf_adv_class())
LOGGER.info(TAG, 'The average confidence of true class is : %s',
attack_evaluate.avg_conf_true_class())
LOGGER.info(TAG, 'The average distance (l0, l2, linf) between original '
'samples and adversarial samples are: %s',
attack_evaluate.avg_lp_distance())
LOGGER.info(TAG, 'The average structural similarity between original '
'samples and adversarial samples are: %s',
attack_evaluate.avg_ssim())
LOGGER.info(TAG, 'The average costing time is %s',
(stop_time - start_time)/(batch_num*batch_size))
if __name__ == '__main__':
test_momentum_diverse_input_iterative_method()
test_momentum_diverse_input_iterative_method_cpu()
......@@ -27,10 +27,9 @@ from mindarmour.utils.logger import LogUtil
sys.path.append("..")
from data_processing import generate_mnist_dataset
context.set_context(mode=context.GRAPH_MODE)
context.set_context(device_target="Ascend")
LOGGER = LogUtil.get_instance()
LOGGER.set_level('INFO')
TAG = 'HopSkipJumpAttack'
......@@ -88,6 +87,89 @@ def test_nes_mnist_attack():
"""
NES-Attack test
"""
context.set_context(mode=context.GRAPH_MODE)
context.set_context(device_target="Ascend")
# load trained network
ckpt_name = './trained_ckpt_file/checkpoint_lenet-10_1875.ckpt'
net = LeNet5()
load_dict = load_checkpoint(ckpt_name)
load_param_into_net(net, load_dict)
net.set_train(False)
# get test data
data_list = "./MNIST_unzip/test"
batch_size = 32
ds = generate_mnist_dataset(data_list, batch_size=batch_size)
# prediction accuracy before attack
model = ModelToBeAttacked(net)
# the number of batches of attacking samples
batch_num = 5
test_images = []
test_labels = []
predict_labels = []
i = 0
for data in ds.create_tuple_iterator():
i += 1
images = data[0].astype(np.float32)
labels = data[1]
test_images.append(images)
test_labels.append(labels)
pred_labels = np.argmax(model.predict(images), axis=1)
predict_labels.append(pred_labels)
if i >= batch_num:
break
predict_labels = np.concatenate(predict_labels)
true_labels = np.concatenate(test_labels)
accuracy = np.mean(np.equal(predict_labels, true_labels))
LOGGER.info(TAG, "prediction accuracy before attacking is : %s",
accuracy)
test_images = np.concatenate(test_images)
# attacking
scene = 'Query_Limit'
if scene == 'Query_Limit':
top_k = -1
elif scene == 'Partial_Info':
top_k = 5
elif scene == 'Label_Only':
top_k = 5
success = 0
queries_num = 0
nes_instance = NES(model, scene, top_k=top_k)
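# NES estimates gradients purely from model queries (natural evolution
# strategies). In the 'Query_Limit' scene top_k=-1 exposes the full score
# vector, while 'Partial_Info' and 'Label_Only' limit the feedback available
# to the attacker (top_k=5 here).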
test_length = 32
advs = []
for img_index in range(test_length):
# Initial image and class selection
initial_img = test_images[img_index]
orig_class = true_labels[img_index]
initial_img = [initial_img]
target_class = random_target_labels([orig_class], true_labels)
target_image = create_target_images(test_images, true_labels,
target_class)
nes_instance.set_target_images(target_image)
tag, adv, queries = nes_instance.generate(initial_img, target_class)
if tag[0]:
success += 1
queries_num += queries[0]
advs.append(adv)
advs = np.reshape(advs, (len(advs), 1, 32, 32))
adv_pred = np.argmax(model.predict(advs), axis=1)
adv_accuracy = np.mean(np.equal(adv_pred, true_labels[:test_length]))
LOGGER.info(TAG, "prediction accuracy after attacking is : %s",
adv_accuracy)
def test_nes_mnist_attack_cpu():
"""
NES-Attack test for CPU device.
"""
context.set_context(mode=context.GRAPH_MODE)
context.set_context(device_target="CPU")
# load trained network
ckpt_name = './trained_ckpt_file/checkpoint_lenet-10_1875.ckpt'
net = LeNet5()
......@@ -164,4 +246,4 @@ def test_nes_mnist_attack():
if __name__ == '__main__':
test_nes_mnist_attack()
test_nes_mnist_attack_cpu()
......@@ -20,6 +20,7 @@ from mindspore import Model
from mindspore import Tensor
from mindspore import context
from mindspore.train.serialization import load_checkpoint, load_param_into_net
from mindspore.nn import SoftmaxCrossEntropyWithLogits
from scipy.special import softmax
from lenet5_net import LeNet5
......@@ -27,13 +28,12 @@ from mindarmour.attacks.iterative_gradient_method import ProjectedGradientDescen
from mindarmour.evaluations.attack_evaluation import AttackEvaluate
from mindarmour.utils.logger import LogUtil
context.set_context(mode=context.GRAPH_MODE, device_target="Ascend")
sys.path.append("..")
from data_processing import generate_mnist_dataset
LOGGER = LogUtil.get_instance()
LOGGER.set_level('INFO')
TAG = 'PGD_Test'
......@@ -46,6 +46,7 @@ def test_projected_gradient_descent_method():
"""
PGD-Attack test
"""
context.set_context(mode=context.GRAPH_MODE, device_target="Ascend")
# load trained network
ckpt_name = './trained_ckpt_file/checkpoint_lenet-10_1875.ckpt'
net = LeNet5()
......@@ -113,5 +114,78 @@ def test_projected_gradient_descent_method():
(stop_time - start_time)/(batch_num*batch_size))
def test_projected_gradient_descent_method_cpu():
"""
PGD-Attack test for CPU device.
"""
context.set_context(mode=context.GRAPH_MODE, device_target="CPU")
# load trained network
ckpt_name = './trained_ckpt_file/checkpoint_lenet-10_1875.ckpt'
net = LeNet5()
load_dict = load_checkpoint(ckpt_name)
load_param_into_net(net, load_dict)
# get test data
data_list = "./MNIST_unzip/test"
batch_size = 32
ds = generate_mnist_dataset(data_list, batch_size)
# prediction accuracy before attack
model = Model(net)
batch_num = 32 # the number of batches of attacking samples
test_images = []
test_labels = []
predict_labels = []
i = 0
for data in ds.create_tuple_iterator():
i += 1
images = data[0].astype(np.float32)
labels = data[1]
test_images.append(images)
test_labels.append(labels)
pred_labels = np.argmax(model.predict(Tensor(images)).asnumpy(),
axis=1)
predict_labels.append(pred_labels)
if i >= batch_num:
break
predict_labels = np.concatenate(predict_labels)
true_labels = np.concatenate(test_labels)
accuracy = np.mean(np.equal(predict_labels, true_labels))
LOGGER.info(TAG, "prediction accuracy before attacking is : %s", accuracy)
# attacking
loss = SoftmaxCrossEntropyWithLogits(is_grad=False, sparse=True)
attack = ProjectedGradientDescent(net, eps=0.3, loss_fn=loss)
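# PGD repeats small signed gradient steps and projects the result back into the
# eps=0.3 L-infinity ball around each original image, i.e. a stronger iterative
# variant of the single-step FGSM example above.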
start_time = time.clock()
adv_data = attack.batch_generate(np.concatenate(test_images),
true_labels, batch_size=32)
stop_time = time.clock()
np.save('./adv_data', adv_data)
pred_logits_adv = model.predict(Tensor(adv_data)).asnumpy()
# rescale predict confidences into (0, 1).
pred_logits_adv = softmax(pred_logits_adv, axis=1)
pred_labels_adv = np.argmax(pred_logits_adv, axis=1)
accuracy_adv = np.mean(np.equal(pred_labels_adv, true_labels))
LOGGER.info(TAG, "prediction accuracy after attacking is : %s", accuracy_adv)
attack_evaluate = AttackEvaluate(np.concatenate(test_images).transpose(0, 2, 3, 1),
np.eye(10)[true_labels],
adv_data.transpose(0, 2, 3, 1),
pred_logits_adv)
LOGGER.info(TAG, 'mis-classification rate of adversaries is : %s',
attack_evaluate.mis_classification_rate())
LOGGER.info(TAG, 'The average confidence of adversarial class is : %s',
attack_evaluate.avg_conf_adv_class())
LOGGER.info(TAG, 'The average confidence of true class is : %s',
attack_evaluate.avg_conf_true_class())
LOGGER.info(TAG, 'The average distance (l0, l2, linf) between original '
'samples and adversarial samples are: %s',
attack_evaluate.avg_lp_distance())
LOGGER.info(TAG, 'The average structural similarity between original '
'samples and adversarial samples are: %s',
attack_evaluate.avg_ssim())
LOGGER.info(TAG, 'The average costing time is %s',
(stop_time - start_time)/(batch_num*batch_size))
if __name__ == '__main__':
test_projected_gradient_descent_method()
test_projected_gradient_descent_method_cpu()
......@@ -26,8 +26,6 @@ from mindarmour.attacks.black.pointwise_attack import PointWiseAttack
from mindarmour.evaluations.attack_evaluation import AttackEvaluate
from mindarmour.utils.logger import LogUtil
context.set_context(mode=context.GRAPH_MODE, device_target="Ascend")
sys.path.append("..")
from data_processing import generate_mnist_dataset
......@@ -60,6 +58,85 @@ def test_pointwise_attack_on_mnist():
"""
Pointwise-Attack test
"""
context.set_context(mode=context.GRAPH_MODE, device_target="Ascend")
# load trained network
ckpt_name = './trained_ckpt_file/checkpoint_lenet-10_1875.ckpt'
net = LeNet5()
load_dict = load_checkpoint(ckpt_name)
load_param_into_net(net, load_dict)
# get test data
data_list = "./MNIST_unzip/test"
batch_size = 32
ds = generate_mnist_dataset(data_list, batch_size=batch_size)
# prediction accuracy before attack
model = ModelToBeAttacked(net)
batch_num = 3 # the number of batches of attacking samples
test_images = []
test_labels = []
predict_labels = []
i = 0
for data in ds.create_tuple_iterator():
i += 1
images = data[0].astype(np.float32)
labels = data[1]
test_images.append(images)
test_labels.append(labels)
pred_labels = np.argmax(model.predict(images), axis=1)
predict_labels.append(pred_labels)
if i >= batch_num:
break
predict_labels = np.concatenate(predict_labels)
true_labels = np.concatenate(test_labels)
accuracy = np.mean(np.equal(predict_labels, true_labels))
LOGGER.info(TAG, "prediction accuracy before attacking is : %g", accuracy)
# attacking
is_target = False
attack = PointWiseAttack(model=model, is_targeted=is_target)
if is_target:
targeted_labels = np.random.randint(0, 10, size=len(true_labels))
for i, true_l in enumerate(true_labels):
if targeted_labels[i] == true_l:
targeted_labels[i] = (targeted_labels[i] + 1) % 10
else:
targeted_labels = true_labels
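# PointWiseAttack is a decision-based black-box attack: starting from a noisy
# adversarial seed, it resets pixels to their original values one by one while
# the (here untargeted) misclassification persists, minimizing the L0 change.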
success_list, adv_data, query_list = attack.generate(
np.concatenate(test_images), targeted_labels)
success_list = np.arange(success_list.shape[0])[success_list]
LOGGER.info(TAG, 'success_list: %s', success_list)
LOGGER.info(TAG, 'average of query times is : %s', np.mean(query_list))
adv_preds = []
for ite_data in adv_data:
pred_logits_adv = model.predict(ite_data)
# rescale predict confidences into (0, 1).
pred_logits_adv = softmax(pred_logits_adv, axis=1)
adv_preds.extend(pred_logits_adv)
accuracy_adv = np.mean(np.equal(np.argmax(adv_preds, axis=1), true_labels))
LOGGER.info(TAG, "prediction accuracy after attacking is : %g",
accuracy_adv)
test_labels_onehot = np.eye(10)[true_labels]
attack_evaluate = AttackEvaluate(np.concatenate(test_images),
test_labels_onehot, adv_data,
adv_preds, targeted=is_target,
target_label=targeted_labels)
LOGGER.info(TAG, 'mis-classification rate of adversaries is : %s',
attack_evaluate.mis_classification_rate())
LOGGER.info(TAG, 'The average confidence of adversarial class is : %s',
attack_evaluate.avg_conf_adv_class())
LOGGER.info(TAG, 'The average confidence of true class is : %s',
attack_evaluate.avg_conf_true_class())
LOGGER.info(TAG, 'The average distance (l0, l2, linf) between original '
'samples and adversarial samples are: %s',
attack_evaluate.avg_lp_distance())
def test_pointwise_attack_on_mnist_cpu():
"""
Pointwise-Attack test for CPU device.
"""
context.set_context(mode=context.GRAPH_MODE, device_target="CPU")
# load trained network
ckpt_name = './trained_ckpt_file/checkpoint_lenet-10_1875.ckpt'
net = LeNet5()
......@@ -134,4 +211,4 @@ def test_pointwise_attack_on_mnist():
if __name__ == '__main__':
test_pointwise_attack_on_mnist()
test_pointwise_attack_on_mnist_cpu()
......@@ -27,12 +27,12 @@ from mindarmour.attacks.black.pso_attack import PSOAttack
from mindarmour.evaluations.attack_evaluation import AttackEvaluate
from mindarmour.utils.logger import LogUtil
context.set_context(mode=context.GRAPH_MODE, device_target="Ascend")
sys.path.append("..")
from data_processing import generate_mnist_dataset
LOGGER = LogUtil.get_instance()
LOGGER.set_level('INFO')
TAG = 'PSO_Attack'
......@@ -58,6 +58,80 @@ def test_pso_attack_on_mnist():
"""
PSO-Attack test
"""
context.set_context(mode=context.GRAPH_MODE, device_target="Ascend")
# load trained network
ckpt_name = './trained_ckpt_file/checkpoint_lenet-10_1875.ckpt'
net = LeNet5()
load_dict = load_checkpoint(ckpt_name)
load_param_into_net(net, load_dict)
# get test data
data_list = "./MNIST_unzip/test"
batch_size = 32
ds = generate_mnist_dataset(data_list, batch_size=batch_size)
# prediction accuracy before attack
model = ModelToBeAttacked(net)
batch_num = 3 # the number of batches of attacking samples
test_images = []
test_labels = []
predict_labels = []
i = 0
for data in ds.create_tuple_iterator():
i += 1
images = data[0].astype(np.float32)
labels = data[1]
test_images.append(images)
test_labels.append(labels)
pred_labels = np.argmax(model.predict(images), axis=1)
predict_labels.append(pred_labels)
if i >= batch_num:
break
predict_labels = np.concatenate(predict_labels)
true_labels = np.concatenate(test_labels)
accuracy = np.mean(np.equal(predict_labels, true_labels))
LOGGER.info(TAG, "prediction accuracy before attacking is : %s", accuracy)
# attacking
attack = PSOAttack(model, bounds=(0.0, 1.0), pm=0.5, sparse=True)
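# PSOAttack runs particle swarm optimization over candidate perturbations using
# only model predictions; bounds=(0.0, 1.0) clips candidates to the valid pixel
# range and sparse=True passes integer labels.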
start_time = time.clock()
success_list, adv_data, query_list = attack.generate(
np.concatenate(test_images), np.concatenate(test_labels))
stop_time = time.clock()
LOGGER.info(TAG, 'success_list: %s', success_list)
LOGGER.info(TAG, 'average of query times is : %s', np.mean(query_list))
pred_logits_adv = model.predict(adv_data)
# rescale predict confidences into (0, 1).
pred_logits_adv = softmax(pred_logits_adv, axis=1)
pred_labels_adv = np.argmax(pred_logits_adv, axis=1)
accuracy_adv = np.mean(np.equal(pred_labels_adv, true_labels))
LOGGER.info(TAG, "prediction accuracy after attacking is : %s",
accuracy_adv)
test_labels_onehot = np.eye(10)[np.concatenate(test_labels)]
attack_evaluate = AttackEvaluate(np.concatenate(test_images),
test_labels_onehot, adv_data,
pred_logits_adv)
LOGGER.info(TAG, 'mis-classification rate of adversaries is : %s',
attack_evaluate.mis_classification_rate())
LOGGER.info(TAG, 'The average confidence of adversarial class is : %s',
attack_evaluate.avg_conf_adv_class())
LOGGER.info(TAG, 'The average confidence of true class is : %s',
attack_evaluate.avg_conf_true_class())
LOGGER.info(TAG, 'The average distance (l0, l2, linf) between original '
'samples and adversarial samples are: %s',
attack_evaluate.avg_lp_distance())
LOGGER.info(TAG, 'The average structural similarity between original '
'samples and adversarial samples are: %s',
attack_evaluate.avg_ssim())
LOGGER.info(TAG, 'The average costing time is %s',
(stop_time - start_time)/(batch_num*batch_size))
def test_pso_attack_on_mnist_cpu():
"""
PSO-Attack test for CPU device.
"""
context.set_context(mode=context.GRAPH_MODE, device_target="CPU")
# load trained network
ckpt_name = './trained_ckpt_file/checkpoint_lenet-10_1875.ckpt'
net = LeNet5()
......@@ -127,4 +201,4 @@ def test_pso_attack_on_mnist():
if __name__ == '__main__':
test_pso_attack_on_mnist()
test_pso_attack_on_mnist_cpu()
......@@ -26,8 +26,6 @@ from mindarmour.attacks.black.salt_and_pepper_attack import SaltAndPepperNoiseAt
from mindarmour.evaluations.attack_evaluation import AttackEvaluate
from mindarmour.utils.logger import LogUtil
context.set_context(mode=context.GRAPH_MODE, device_target="Ascend")
sys.path.append("..")
from data_processing import generate_mnist_dataset
......@@ -60,6 +58,89 @@ def test_salt_and_pepper_attack_on_mnist():
"""
Salt-and-Pepper-Attack test
"""
context.set_context(mode=context.GRAPH_MODE, device_target="Ascend")
# load trained network
ckpt_name = './trained_ckpt_file/checkpoint_lenet-10_1875.ckpt'
net = LeNet5()
load_dict = load_checkpoint(ckpt_name)
load_param_into_net(net, load_dict)
# get test data
data_list = "./MNIST_unzip/test"
batch_size = 32
ds = generate_mnist_dataset(data_list, batch_size=batch_size)
# prediction accuracy before attack
model = ModelToBeAttacked(net)
batch_num = 3 # the number of batches of attacking samples
test_images = []
test_labels = []
predict_labels = []
i = 0
for data in ds.create_tuple_iterator():
i += 1
images = data[0].astype(np.float32)
labels = data[1]
test_images.append(images)
test_labels.append(labels)
pred_labels = np.argmax(model.predict(images), axis=1)
predict_labels.append(pred_labels)
if i >= batch_num:
break
LOGGER.debug(TAG, 'model input image shape is: {}'.format(np.array(test_images).shape))
predict_labels = np.concatenate(predict_labels)
true_labels = np.concatenate(test_labels)
accuracy = np.mean(np.equal(predict_labels, true_labels))
LOGGER.info(TAG, "prediction accuracy before attacking is : %g", accuracy)
# attacking
is_target = False
attack = SaltAndPepperNoiseAttack(model=model,
is_targeted=is_target,
sparse=True)
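# SaltAndPepperNoiseAttack is a black-box attack that increases the amount of
# salt-and-pepper noise on each image until the model's prediction changes;
# with is_targeted=False any label flip away from the truth counts as success.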
if is_target:
targeted_labels = np.random.randint(0, 10, size=len(true_labels))
for i, true_l in enumerate(true_labels):
if targeted_labels[i] == true_l:
targeted_labels[i] = (targeted_labels[i] + 1) % 10
else:
targeted_labels = true_labels
LOGGER.debug(TAG, 'input shape is: {}'.format(np.concatenate(test_images).shape))
success_list, adv_data, query_list = attack.generate(
np.concatenate(test_images), targeted_labels)
success_list = np.arange(success_list.shape[0])[success_list]
LOGGER.info(TAG, 'success_list: %s', success_list)
LOGGER.info(TAG, 'average of query times is : %s', np.mean(query_list))
adv_preds = []
for ite_data in adv_data:
pred_logits_adv = model.predict(ite_data)
# rescale predict confidences into (0, 1).
pred_logits_adv = softmax(pred_logits_adv, axis=1)
adv_preds.extend(pred_logits_adv)
accuracy_adv = np.mean(np.equal(np.argmax(adv_preds, axis=1), true_labels))
LOGGER.info(TAG, "prediction accuracy after attacking is : %g",
accuracy_adv)
test_labels_onehot = np.eye(10)[true_labels]
attack_evaluate = AttackEvaluate(np.concatenate(test_images),
test_labels_onehot, adv_data,
adv_preds, targeted=is_target,
target_label=targeted_labels)
LOGGER.info(TAG, 'mis-classification rate of adversaries is : %s',
attack_evaluate.mis_classification_rate())
LOGGER.info(TAG, 'The average confidence of adversarial class is : %s',
attack_evaluate.avg_conf_adv_class())
LOGGER.info(TAG, 'The average confidence of true class is : %s',
attack_evaluate.avg_conf_true_class())
LOGGER.info(TAG, 'The average distance (l0, l2, linf) between original '
'samples and adversarial samples are: %s',
attack_evaluate.avg_lp_distance())
def test_salt_and_pepper_attack_on_mnist_cpu():
"""
Salt-and-Pepper-Attack test for CPU device.
"""
context.set_context(mode=context.GRAPH_MODE, device_target="CPU")
# load trained network
ckpt_name = './trained_ckpt_file/checkpoint_lenet-10_1875.ckpt'
net = LeNet5()
......@@ -138,4 +219,4 @@ def test_salt_and_pepper_attack_on_mnist():
if __name__ == '__main__':
test_salt_and_pepper_attack_on_mnist()
test_salt_and_pepper_attack_on_mnist_cpu()
......@@ -31,7 +31,6 @@ from mindarmour.utils.logger import LogUtil
sys.path.append("..")
from data_processing import generate_mnist_dataset
context.set_context(mode=context.GRAPH_MODE, device_target="Ascend")
LOGGER = LogUtil.get_instance()
TAG = 'Nad_Example'
......@@ -46,6 +45,7 @@ def test_nad_method():
"""
NAD-Defense test.
"""
context.set_context(mode=context.GRAPH_MODE, device_target="Ascend")
# 1. load trained network
ckpt_name = './trained_ckpt_file/checkpoint_lenet-10_1875.ckpt'
net = LeNet5()
......@@ -136,6 +136,100 @@ def test_nad_method():
np.mean(acc_list))
def test_nad_method_cpu():
"""
NAD-Defense test for CPU device.
"""
context.set_context(mode=context.GRAPH_MODE, device_target="CPU")
# 1. load trained network
ckpt_name = './trained_ckpt_file/checkpoint_lenet-10_1875.ckpt'
net = LeNet5()
load_dict = load_checkpoint(ckpt_name)
load_param_into_net(net, load_dict)
loss = SoftmaxCrossEntropyWithLogits(is_grad=False, sparse=True)
opt = nn.Momentum(net.trainable_params(), 0.01, 0.09)
nad = NaturalAdversarialDefense(net, loss_fn=loss, optimizer=opt,
bounds=(0.0, 1.0), eps=0.3)
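# NaturalAdversarialDefense implements adversarial training: batch_defense
# generates FGSM examples (eps=0.3) on the fly and fine-tunes the network on
# them with the given Momentum optimizer and cross-entropy loss.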
# 2. get test data
data_list = "./MNIST_unzip/test"
batch_size = 32
ds_test = generate_mnist_dataset(data_list, batch_size=batch_size)
inputs = []
labels = []
for data in ds_test.create_tuple_iterator():
inputs.append(data[0].astype(np.float32))
labels.append(data[1])
inputs = np.concatenate(inputs)
labels = np.concatenate(labels)
# 3. get accuracy of test data on original model
net.set_train(False)
acc_list = []
batchs = inputs.shape[0] // batch_size
for i in range(batchs):
batch_inputs = inputs[i*batch_size : (i + 1)*batch_size]
batch_labels = labels[i*batch_size : (i + 1)*batch_size]
logits = net(Tensor(batch_inputs)).asnumpy()
label_pred = np.argmax(logits, axis=1)
acc_list.append(np.mean(batch_labels == label_pred))
LOGGER.debug(TAG, 'accuracy of TEST data on original model is : %s',
np.mean(acc_list))
# 4. get adv of test data
attack = FastGradientSignMethod(net, eps=0.3, loss_fn=loss)
adv_data = attack.batch_generate(inputs, labels)
LOGGER.debug(TAG, 'adv_data.shape is : %s', adv_data.shape)
# 5. get accuracy of adv data on original model
net.set_train(False)
acc_list = []
batchs = adv_data.shape[0] // batch_size
for i in range(batchs):
batch_inputs = adv_data[i*batch_size : (i + 1)*batch_size]
batch_labels = labels[i*batch_size : (i + 1)*batch_size]
logits = net(Tensor(batch_inputs)).asnumpy()
label_pred = np.argmax(logits, axis=1)
acc_list.append(np.mean(batch_labels == label_pred))
LOGGER.debug(TAG, 'accuracy of adv data on original model is : %s',
np.mean(acc_list))
# 6. defense
net.set_train()
nad.batch_defense(inputs, labels, batch_size=32, epochs=10)
# 7. get accuracy of test data on defensed model
net.set_train(False)
acc_list = []
batchs = inputs.shape[0] // batch_size
for i in range(batchs):
batch_inputs = inputs[i*batch_size : (i + 1)*batch_size]
batch_labels = labels[i*batch_size : (i + 1)*batch_size]
logits = net(Tensor(batch_inputs)).asnumpy()
label_pred = np.argmax(logits, axis=1)
acc_list.append(np.mean(batch_labels == label_pred))
LOGGER.debug(TAG, 'accuracy of TEST data on defensed model is : %s',
np.mean(acc_list))
# 8. get accuracy of adv data on defensed model
acc_list = []
batchs = adv_data.shape[0] // batch_size
for i in range(batchs):
batch_inputs = adv_data[i*batch_size : (i + 1)*batch_size]
batch_labels = labels[i*batch_size : (i + 1)*batch_size]
logits = net(Tensor(batch_inputs)).asnumpy()
label_pred = np.argmax(logits, axis=1)
acc_list.append(np.mean(batch_labels == label_pred))
LOGGER.debug(TAG, 'accuracy of adv data on defensed model is : %s',
np.mean(acc_list))
if __name__ == '__main__':
LOGGER.set_level(logging.DEBUG)
test_nad_method()
test_nad_method_cpu()
......@@ -46,7 +46,8 @@ class NaturalAdversarialDefense(AdversarialDefenseWithAttacks):
attack = FastGradientSignMethod(network,
eps=eps,
alpha=None,
bounds=bounds,
loss_fn=loss_fn)
super(NaturalAdversarialDefense, self).__init__(
network,
[attack],
......