# Copyright 2019 Huawei Technologies Co., Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """ Hop-skip-jump attack. """ import numpy as np from mindarmour.attacks.attack import Attack from mindarmour.utils.logger import LogUtil from mindarmour.attacks.black.black_model import BlackModel from mindarmour.utils._check_param import check_pair_numpy_param, check_model, \ check_numpy_param, check_int_positive, check_value_positive, \ check_value_non_negative, check_param_type LOGGER = LogUtil.get_instance() TAG = 'HopSkipJumpAttack' def _clip_image(image, clip_min, clip_max): """ Clip an image, or an image batch, with upper and lower threshold. """ return np.clip(image, clip_min, clip_max) class HopSkipJumpAttack(Attack): """ HopSkipJumpAttack proposed by Chen, Jordan and Wainwright is a decision-based attack. The attack requires access to output labels of target model. References: `Chen J, Michael I. Jordan, Martin J. Wainwright. HopSkipJumpAttack: A Query-Efficient Decision-Based Attack. 2019. arXiv:1904.02144 `_ Args: model (BlackModel): Target model. init_num_evals (int): The initial number of evaluations for gradient estimation. Default: 100. max_num_evals (int): The maximum number of evaluations for gradient estimation. Default: 1000. stepsize_search (str): Indicating how to search for stepsize; Possible values are 'geometric_progression', 'grid_search', 'geometric_progression'. num_iterations (int): The number of iterations. Default: 64. gamma (float): Used to set binary search threshold theta. Default: 1.0. For l2 attack the binary search threshold `theta` is: math:`gamma / d^{3/2}`. For linf attack is math:`gamma / d^2`. constraint (str): The norm distance to optimize. Possible values are 'l2', 'linf'. Default: l2. batch_size (int): Batch size. Default: 32. clip_min (float, optional): The minimum image component value. Default: 0. clip_max (float, optional): The maximum image component value. Default: 1. sparse (bool): If True, input labels are sparse-encoded. If False, input labels are one-hot-encoded. Default: True. Raises: ValueError: If stepsize_search not in ['geometric_progression', 'grid_search'] ValueError: If constraint not in ['l2', 'linf'] Examples: >>> x_test = np.asarray(np.random.random((sample_num, >>> sample_length)), np.float32) >>> y_test = np.random.randint(0, class_num, size=sample_num) >>> instance = HopSkipJumpAttack(user_model) >>> adv_x = instance.generate(x_test, y_test) """ def __init__(self, model, init_num_evals=100, max_num_evals=1000, stepsize_search='geometric_progression', num_iterations=20, gamma=1.0, constraint='l2', batch_size=32, clip_min=0.0, clip_max=1.0, sparse=True): super(HopSkipJumpAttack, self).__init__() self._model = check_model('model', model, BlackModel) self._init_num_evals = check_int_positive('initial_num_evals', init_num_evals) self._max_num_evals = check_int_positive('max_num_evals', max_num_evals) self._batch_size = check_int_positive('batch_size', batch_size) self._clip_min = check_value_non_negative('clip_min', clip_min) self._clip_max = check_value_non_negative('clip_max', clip_max) self._sparse = check_param_type('sparse', sparse, bool) self._np_dtype = np.dtype('float32') if stepsize_search in ['geometric_progression', 'grid_search']: self._stepsize_search = stepsize_search else: msg = "stepsize_search must be in ['geometric_progression'," \ " 'grid_search'], but got {}".format(stepsize_search) LOGGER.error(TAG, msg) raise ValueError(msg) self._num_iterations = check_int_positive('num_iterations', num_iterations) self._gamma = check_value_positive('gamma', gamma) if constraint in ['l2', 'linf']: self._constraint = constraint else: msg = "constraint must be in ['l2', 'linf'], " \ "but got {}".format(constraint) LOGGER.error(TAG, msg) raise ValueError(msg) self.queries = 0 self.is_adv = True self.y_targets = None self.image_targets = None self.y_target = None self.image_target = None def _generate_one(self, sample): """ Return a tensor that constructs adversarial examples for the given input. Args: sample (Tensor): Input samples. Returns: Tensor, generated adversarial examples. """ shape = list(np.shape(sample)) dim = int(np.prod(shape)) # Set binary search threshold. if self._constraint == 'l2': theta = self._gamma / (np.sqrt(dim)*dim) else: theta = self._gamma / (dim*dim) wrap = self._hsja(sample, self.y_target, self.image_target, dim, theta) if wrap is None: self.is_adv = False else: self.is_adv = True return self.is_adv, wrap, self.queries def set_target_images(self, target_images): """ Setting target images for target attack. Args: target_images (numpy.ndarray): Target images. """ self.image_targets = check_numpy_param('target_images', target_images) def generate(self, inputs, labels): """ Generate adversarial images in a for loop. Args: inputs (numpy.ndarray): Origin images. labels (numpy.ndarray): Target labels. Returns: - numpy.ndarray, bool values for each attack result. - numpy.ndarray, generated adversarial examples. - numpy.ndarray, query times for each sample. Examples: >>> generate([[0.1,0.2,0.2],[0.2,0.3,0.4]],[2,6]) """ if labels is not None: inputs, labels = check_pair_numpy_param('inputs', inputs, 'labels', labels) if not self._sparse: labels = np.argmax(labels, axis=1) x_adv = [] is_advs = [] queries_times = [] if labels is not None: self.y_targets = labels for i, x_single in enumerate(inputs): self.queries = 0 if self.image_targets is not None: self.image_target = self.image_targets[i] if self.y_targets is not None: self.y_target = self.y_targets[i] is_adv, adv_img, query_time = self._generate_one(x_single) x_adv.append(adv_img) is_advs.append(is_adv) queries_times.append(query_time) return np.asarray(is_advs), \ np.asarray(x_adv), \ np.asarray(queries_times) def _hsja(self, sample, target_label, target_image, dim, theta): """ The main algorithm for HopSkipJumpAttack. Args: sample (numpy.ndarray): Input image. Without the batchsize dimension. target_label (int): Integer for targeted attack, None for nontargeted attack. Without the batchsize dimension. target_image (numpy.ndarray): An array with the same size as input sample, or None. Without the batchsize dimension. Returns: numpy.ndarray, perturbed images. """ original_label = None # Original label for untargeted attack. if target_label is None: original_label = self._model.predict(sample) original_label = np.argmax(original_label) # Initialize perturbed image. # untarget attack if target_image is None: perturbed = self._initialize(sample, original_label, target_label) if perturbed is None: msg = 'Can not find an initial adversarial example' LOGGER.info(TAG, msg) return perturbed else: # Target attack perturbed = target_image # Project the initial perturbed image to the decision boundary. perturbed, dist_post_update = self._binary_search_batch(sample, np.expand_dims(perturbed, 0), original_label, target_label, theta) # Calculate the distance of perturbed image and original sample dist = self._compute_distance(perturbed, sample) for j in np.arange(self._num_iterations): current_iteration = j + 1 # Select delta. delta = self._select_delta(dist_post_update, current_iteration, dim, theta) # Choose number of evaluations. num_evals = int(min([self._init_num_evals*np.sqrt(j + 1), self._max_num_evals])) # approximate gradient. gradf = self._approximate_gradient(perturbed, num_evals, original_label, target_label, delta, theta) if self._constraint == 'linf': update = np.sign(gradf) else: update = gradf # search step size. if self._stepsize_search == 'geometric_progression': # find step size. epsilon = self._geometric_progression_for_stepsize( perturbed, update, dist, current_iteration, original_label, target_label) # Update the sample. perturbed = _clip_image(perturbed + epsilon*update, self._clip_min, self._clip_max) # Binary search to return to the boundary. perturbed, dist_post_update = self._binary_search_batch( sample, perturbed[None], original_label, target_label, theta) elif self._stepsize_search == 'grid_search': epsilons = np.logspace(-4, 0, num=20, endpoint=True)*dist epsilons_shape = [20] + len(np.shape(sample))*[1] perturbeds = perturbed + epsilons.reshape( epsilons_shape)*update perturbeds = _clip_image(perturbeds, self._clip_min, self._clip_max) idx_perturbed = self._decision_function(perturbeds, original_label, target_label) if np.sum(idx_perturbed) > 0: # Select the perturbation that yields the minimum distance # after binary search. perturbed, dist_post_update = self._binary_search_batch( sample, perturbeds[idx_perturbed], original_label, target_label, theta) # compute new distance. dist = self._compute_distance(perturbed, sample) LOGGER.debug(TAG, 'iteration: %d, %s distance %4f', j + 1, self._constraint, dist) perturbed = np.expand_dims(perturbed, 0) return perturbed def _decision_function(self, images, original_label, target_label): """ Decision function returns 1 if the input sample is on the desired side of the boundary, and 0 otherwise. """ images = _clip_image(images, self._clip_min, self._clip_max) prob = [] self.queries += len(images) for i in range(0, len(images), self._batch_size): batch = images[i:i + self._batch_size] length = len(batch) prob_i = self._model.predict(batch)[:length] prob.append(prob_i) prob = np.concatenate(prob) if target_label is None: res = np.argmax(prob, axis=1) != original_label else: res = np.argmax(prob, axis=1) == target_label return res def _compute_distance(self, original_img, perturbation_img): """ Compute the distance between original image and perturbation images. """ if self._constraint == 'l2': distance = np.linalg.norm(original_img - perturbation_img) else: distance = np.max(abs(original_img - perturbation_img)) return distance def _approximate_gradient(self, sample, num_evals, original_label, target_label, delta, theta): """ Gradient direction estimation. """ # Generate random noise based on constraint. noise_shape = [num_evals] + list(np.shape(sample)) if self._constraint == 'l2': random_noise = np.random.randn(*noise_shape) else: random_noise = np.random.uniform(low=-1, high=1, size=noise_shape) axis = tuple(range(1, 1 + len(np.shape(sample)))) random_noise = random_noise / np.sqrt( np.sum(random_noise**2, axis=axis, keepdims=True)) # perturbed images perturbed = sample + delta*random_noise perturbed = _clip_image(perturbed, self._clip_min, self._clip_max) random_noise = (perturbed - sample) / theta # Whether the perturbed images are on the desired side of the boundary. decisions = self._decision_function(perturbed, original_label, target_label) decision_shape = [len(decisions)] + [1]*len(np.shape(sample)) # transform decisions value from 1, 0 to 1, -2 re_decision = 2*np.array(decisions).astype(self._np_dtype).reshape( decision_shape) - 1.0 if np.mean(re_decision) == 1.0: grad_direction = np.mean(random_noise, axis=0) elif np.mean(re_decision) == -1.0: grad_direction = - np.mean(random_noise, axis=0) else: re_decision = re_decision - np.mean(re_decision) grad_direction = np.mean(re_decision*random_noise, axis=0) # The gradient direction. grad_direction = grad_direction / (np.linalg.norm(grad_direction) + 1e-10) return grad_direction def _project(self, original_image, perturbed_images, alphas): """ Projection input samples onto given l2 or linf balls. """ alphas_shape = [len(alphas)] + [1]*len(np.shape(original_image)) alphas = alphas.reshape(alphas_shape) if self._constraint == 'l2': projected = (1 - alphas)*original_image + alphas*perturbed_images else: projected = _clip_image(perturbed_images, original_image - alphas, original_image + alphas) return projected def _binary_search_batch(self, original_image, perturbed_images, original_label, target_label, theta): """ Binary search to approach the model decision boundary. """ # Compute distance between perturbed image and original image. dists_post_update = np.array([self._compute_distance(original_image, perturbed_image,) for perturbed_image in perturbed_images]) # Get higher thresholds if self._constraint == 'l2': highs = np.ones(len(perturbed_images)) thresholds = theta else: highs = dists_post_update thresholds = np.minimum(dists_post_update*theta, theta) # Get lower thresholds lows = np.zeros(len(perturbed_images)) # Update thresholds. while np.max((highs - lows) / thresholds) > 1: mids = (highs + lows) / 2.0 mid_images = self._project(original_image, perturbed_images, mids) decisions = self._decision_function(mid_images, original_label, target_label) lows = np.where(decisions == [0], mids, lows) highs = np.where(decisions == [1], mids, highs) out_images = self._project(original_image, perturbed_images, highs) # Select the best choice based on the distance of the output image. dists = np.array( [self._compute_distance(original_image, out_image) for out_image in out_images]) idx = np.argmin(dists) dist = dists_post_update[idx] out_image = out_images[idx] return out_image, dist def _initialize(self, sample, original_label, target_label): """ Implementation of BlendedUniformNoiseAttack """ num_evals = 0 while True: random_noise = np.random.uniform(self._clip_min, self._clip_max, size=np.shape(sample)) success = self._decision_function(random_noise[None], original_label, target_label) if success: break num_evals += 1 if num_evals > 1e3: return None # Binary search. low = 0.0 high = 1.0 while high - low > 0.001: mid = (high + low) / 2.0 blended = (1 - mid)*sample + mid*random_noise success = self._decision_function(blended[None], original_label, target_label) if success: high = mid else: low = mid initialization = (1 - high)*sample + high*random_noise return initialization def _geometric_progression_for_stepsize(self, perturbed, update, dist, current_iteration, original_label, target_label): """ Search for stepsize in the way of Geometric progression. Keep decreasing stepsize by half until reaching the desired side of the decision boundary. """ epsilon = dist / np.sqrt(current_iteration) while True: updated = perturbed + epsilon*update success = self._decision_function(updated, original_label, target_label) if success: break epsilon = epsilon / 2.0 return epsilon def _select_delta(self, dist_post_update, current_iteration, dim, theta): """ Choose the delta based on the distance between the input sample and the perturbed sample. """ if current_iteration == 1: delta = 0.1*(self._clip_max - self._clip_min) else: if self._constraint == 'l2': delta = np.sqrt(dim)*theta*dist_post_update else: delta = dim*theta*dist_post_update return delta