diff --git a/examples/coverage/measure_coverage.py b/examples/coverage/measure_coverage.py new file mode 100644 index 00000000..e2b3171c --- /dev/null +++ b/examples/coverage/measure_coverage.py @@ -0,0 +1,239 @@ +""" + +python measure_coverage.py --split test --ratio 1.0 --coverage attention --length 128 --prespecify-limits --attack textfooler --test-ratio 1000 --save-dir ./coverage_wp/ --seed 1 --dataset sst2 + + +""" + +import argparse +from math import floor +import os +import pickle +import random + +import numpy as np +import torch +from transformers import AutoModelForSequenceClassification + +import textattack +from textattack.attack_recipes import ( + BAEGarg2019, + DeepWordBugGao2018, + FasterGeneticAlgorithmJia2019, + HotFlipEbrahimi2017, + TextFoolerJin2019, +) +from textattack.attack_results import SuccessfulAttackResult +from textattack.coverage import neuronMultiSectionCoverage +from textattack.datasets import HuggingFaceDataset +from textattack.models.tokenizers import AutoTokenizer +from textattack.models.wrappers import HuggingFaceModelWrapper, ModelWrapper + + +def random_seed(seed): + torch.manual_seed(seed) + np.random.seed(seed) + random.seed(seed) + return + + +random_seed(1) +parser = argparse.ArgumentParser( + description="Measure Coverage of pretrained NLP Models" +) +parser.add_argument("--seed", type=int, default=1, help="set random seed") +parser.add_argument("--length", type=int, default=128, help="set max seq length") +parser.add_argument("--bins", type=int, default=10, help="set number of bins/sections") +parser.add_argument( + "--ratio", + type=float, + default=1, + help="proportion of train set used for dataset sampling", +) +parser.add_argument( + "--test-ratio", + type=int, + default=1.0, + help="proportion of train set used for dataset sampling", +) +parser.add_argument( + "--dataset", type=str, default="imdb", help="dataset to use for measuring coverage" +) +parser.add_argument( + "--save-dir", + type=str, + default="./coverage/", + help="dataset to use for measuring coverage", +) +parser.add_argument( + "--model", + type=str, + default="bert-base-uncased", + help="model f whose weights to use", +) +parser.add_argument("--coverage", type=str, default="attention", help="coverage type") +parser.add_argument("--attack", type=str, default="none", help="attack type") +parser.add_argument( + "--split", type=str, default="test", help="split to use for measuring coverage" +) +parser.add_argument("--base-only", action="store_true", help="loading only base model") +parser.add_argument("--prespecify-limits", action="store_true", help="prespecify") +args = parser.parse_args() +random_seed(args.seed) + + +if not args.base_only: + if args.dataset == "sst2": + test_model = "textattack/" + str(args.model) + "-" + "SST-2" + elif args.dataset == "rotten-tomatoes": + test_model = "textattack/" + str(args.model) + "-" + "rotten_tomatoes" + else: + test_model = "textattack/" + str(args.model) + "-" + str(args.dataset) +else: + test_model = args.model +text_key = "text" +# test_model="textattack/bert-base-uncased-ag-news", +if args.dataset == "sst2": + text_key = "sentence" + trainset = HuggingFaceDataset("glue", "sst2", "train", shuffle=True) + testset = HuggingFaceDataset("glue", "sst2", args.split, shuffle=True) +elif args.dataset == "rotten-tomatoes": + trainset = HuggingFaceDataset("rotten_tomatoes", None, "train", shuffle=True) + testset = HuggingFaceDataset("rotten_tomatoes", None, args.split, shuffle=True) +else: + trainset = HuggingFaceDataset(args.dataset, None, 
"train", shuffle=True) + testset = HuggingFaceDataset(args.dataset, None, args.split, shuffle=True) + + +if args.ratio <= 1.0: + trainset = trainset[0 : floor(args.ratio * len(trainset))] +else: + trainset = trainset[0 : floor(args.ratio)] + + +trainset_str = [] +for example in trainset: + trainset_str.append(example[0][text_key]) + +if args.test_ratio <= 1.0: + testset = testset[0 : floor(args.test_ratio * len(testset))] +else: + testset = testset[0 : floor(args.test_ratio)] + +testset_str = [] +for example in testset: + testset_str.append(example[0][text_key]) + + +args.save_dir += "COVER_" + args.coverage + "/" +os.makedirs(args.save_dir, exist_ok=True) +args.save_dir += "SEED_" + str(args.seed) + "_BINS_" + str(args.bins) + "/" +os.makedirs(args.save_dir, exist_ok=True) +args.save_dir += ( + "data_" + + str(args.dataset) + + "_model_" + + str(args.model) + + "_ratio_" + + str(args.ratio) + + "_test_ratio_" + + str(args.test_ratio) + + "_L_" + + str(args.length) + + "_B_" + + str(args.base_only) + + "/" +) + + +os.makedirs(args.save_dir, exist_ok=True) +args.save_dir += "Attack_" + args.attack + "_limits_" + str(args.prespecify_limits) +os.makedirs(args.save_dir, exist_ok=True) + +# make coverage object +coverage = neuronMultiSectionCoverage( + test_model=test_model, + max_seq_len=args.length, + k_m=args.bins, + coverage=(args.coverage), + pre_limits=(not (args.coverage == "word") and args.prespecify_limits), +) +print("initializing from training data") +coverage.initialize_from_training_dataset(trainset_str) + +print("--" * 50) +print("generating test set!") +print("--" * 50) +num_successes = 0.0 +total = 1.0 +if args.attack != "none": + original_model = AutoModelForSequenceClassification.from_pretrained(test_model) + original_tokenizer = AutoTokenizer(test_model) + model = HuggingFaceModelWrapper(original_model, original_tokenizer) + if args.attack == "textfooler": + attack = TextFoolerJin2019.build(model) + elif args.attack == "alzantot": + attack = FasterGeneticAlgorithmJia2019.build(model) + elif args.attack == "bae": + attack = BAEGarg2019.build(model) + elif args.attack == "deepwordbug": + attack = DeepWordBugGao2018.build(model) + elif args.attack == "hotflip": + attack = HotFlipEbrahimi2017.build(model) + else: + print("This Attack has not been added!") + raise NotImplementedError + results_iterable = attack.attack_dataset(testset, indices=None) + # save the results too + results_iterable = [result for result in results_iterable] + total = len(results_iterable) + pickle.dump( + results_iterable, open(os.path.join(args.save_dir, "attack_results"), "wb") + ) + for n, result in enumerate(results_iterable): + print("---original: \n", result.original_text()) + print("---perturbed: \n", result.perturbed_text()) + testset_str.append(result.perturbed_text()) + if isinstance(result, SuccessfulAttackResult): + num_successes += 1 + + +print("=+" * 20) +print("successes: ", num_successes, "total: ", total) +print("rate: ", num_successes / total) +print("--" * 50) +print("length of generated test set: ", len(testset_str)) +print("--" * 50) + + +word_coverage = coverage(testset_str) + + +print("the coverage: ", word_coverage) + +results_file = open(os.path.join(args.save_dir, "stats.txt"), "w") +results_file.write( + "dataset, model, ratio, length, attack, limits, coverage, num_examples, num_test_examples, seed, split, coverage, num_successes, total\n" +) +results_file.write( + ",".join( + [ + args.dataset, + test_model, + str(args.ratio), + str(args.test_ratio), + str(args.length), + 
args.attack, + str(args.prespecify_limits), + str(args.coverage), + str(len(trainset_str)), + str(len(testset_str)), + str(args.seed), + args.split, + str(word_coverage), + str(num_successes), + str(total) + "\n", + ] + ) +) +results_file.close() diff --git a/textattack/__init__.py b/textattack/__init__.py index 30629406..b0e3f3c0 100644 --- a/textattack/__init__.py +++ b/textattack/__init__.py @@ -18,6 +18,7 @@ from .attacker import Attacker from .trainer import Trainer from .metrics import Metric +from .coverage import Coverage from . import ( attack_recipes, @@ -30,6 +31,7 @@ goal_functions, loggers, metrics, + coverage, models, search_methods, shared, diff --git a/textattack/coverage/__init__.py b/textattack/coverage/__init__.py new file mode 100644 index 00000000..63736137 --- /dev/null +++ b/textattack/coverage/__init__.py @@ -0,0 +1,4 @@ +from .coverage import Coverage, IntrinsicCoverage, ExtrinsicCoverage +from .perplexity_coverage import PerplexityCoverage +from .neuron_coverage import neuronCoverage +from .kmn_coverage import neuronMultiSectionCoverage diff --git a/textattack/coverage/coverage.py b/textattack/coverage/coverage.py new file mode 100644 index 00000000..125f2f94 --- /dev/null +++ b/textattack/coverage/coverage.py @@ -0,0 +1,30 @@ +from abc import ABC, abstractmethod + + +class Coverage(ABC): + """``Coverage`` class measures how well a given test dataset tests the + given model. + + This is an abstract base class for other ``Coverage`` classes. + """ + + +class ExtrinsicCoverage(Coverage): + """Represents coverage methods that do not access the model that is subject + of testing to measure the quality of test set.""" + + @abstractmethod + def __call__(self, testset): + raise NotImplementedError() + + +class IntrinsicCoverage(Coverage): + """Represents coverage methods that do access the model that is subject of + testing to measure the quality of test set.""" + + def __init__(self, model): + self.model = model + + @abstractmethod + def __call__(self, testset): + raise NotImplementedError() diff --git a/textattack/coverage/kmn_coverage.py b/textattack/coverage/kmn_coverage.py new file mode 100644 index 00000000..d3b6fdc8 --- /dev/null +++ b/textattack/coverage/kmn_coverage.py @@ -0,0 +1,612 @@ +from collections import defaultdict +import copy +import itertools +import logging +import time + +import numpy as np +import torch +import torch.nn.functional as F +from tqdm import tqdm +import transformers + +import textattack + +from .coverage import ExtrinsicCoverage + +logging.getLogger("transformers.tokenization_utils_base").setLevel(logging.ERROR) + + +COVERAGE_MODEL_TYPES = ["bert", "albert", "distilbert", "roberta"] + + +class neuronMultiSectionCoverage(ExtrinsicCoverage): + """ + ``neuronMultiSectionCoverage`` measures the neuron coverage acheived by a testset + Args: + test_model(Union[str, torch.nn.Module]): name of the pretrained language model from `transformers` + or the actual test model as a `torch.nn.Module` class. Default is "bert base uncased" from `transformers`. + tokenizer (:obj:``, optional): If `test_model` is not a pretrained model from `transformers, need to provide + the tokenizer here. + max_seq_len (int): Maximum sequence length accepted by the model to be tested. However, if you are using a pretrained model from `transformers`, this is handled + automatically using information from `model.config`. 
+ threshold(float): threshold for marking a neuron as activated + coverage(str): measure type of neuron coverage at the level of layer outputs + """ + + def __init__( + self, + test_model="textattack/bert-base-uncased-ag-news", + tokenizer=None, + max_seq_len=-1, + threshold=0.0, + num_labels=2, + coverage="multisection", + pre_limits=False, + bins_attention=4, + bins_word=4, + min_value=np.inf, + max_value=-np.inf, + bz=128, + word_mask=False, + ): + self.coverage = coverage + + self.word_mask = word_mask + self.pre_limits = pre_limits + self.bins_attention = bins_attention + self.bins_word = bins_word # number of sections for each neuron + self.max_seq_len = 128 + self.model_type = "bert" + + config = transformers.AutoConfig.from_pretrained( + test_model, output_hidden_states=True, num_labels=num_labels + ) + if config.model_type in COVERAGE_MODEL_TYPES: + self.test_model = ( + transformers.AutoModelForSequenceClassification.from_pretrained( + test_model, config=config + ) + ) + self.test_model.tokenizer = transformers.AutoTokenizer.from_pretrained( + test_model + ) + self.model_type = self.test_model.config.model_type + self.max_seq_len = ( + max_seq_len + if max_seq_len != -1 + else self.test_model.config.max_position_embeddings + ) + else: + raise ValueError( + "`neuronCoverage` only accepts models in " + + ",".join(COVERAGE_MODEL_TYPES) + ) + + self.test_model.to(textattack.shared.utils.device) + self.threshold = threshold + self.test_model.eval() + + # initialize min and max for coverage + min_attention_value = min_value + max_attention_value = max_value + if pre_limits: + min_attention_value = 0.0 + max_attention_value = 1.0 + + self.coverage_word_dicts = torch.zeros( + (self.bins_word + 3, 13, self.max_seq_len, 768) + ) + self.coverage_attention_dicts = torch.zeros( + (self.bins_attention + 3, 12, 12, self.max_seq_len, self.max_seq_len) + ) + self.min_word_coverage_tracker = torch.zeros((13, self.max_seq_len, 768)).fill_( + min_value + ) + self.min_attention_coverage_tracker = torch.zeros( + (12, 12, self.max_seq_len, self.max_seq_len) + ).fill_(min_attention_value) + + self.max_word_coverage_tracker = torch.zeros((13, self.max_seq_len, 768)).fill_( + max_value + ) + self.max_attention_coverage_tracker = torch.zeros( + (12, 12, self.max_seq_len, self.max_seq_len) + ).fill_(max_attention_value) + + if "snac" in self.coverage: + self.k_m = 2 + if "nbc" in self.coverage: + self.k_m = 1 + """ + for i in range(self.bins_word): + word_tracker = self._init_word_coverage(fill_value=0.0) + self.coverage_word_dicts.append(word_tracker) + for i in range(self.bins_attention): + attention_tracker = self._init_attention_coverage(fill_value=0.0) + self.coverage_attention_dicts.append(attention_tracker) + """ + + def _init_word_coverage(self, fill_value): + """Initialize `coverage_tracker` dictionary. 
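+        (Note: despite the name, the tracker built here is a single tensor, a
+        zero tensor shaped like ``self.coverage_word_dicts``; ``fill_value`` is
+        currently unused by this implementation.)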
+ + Returns: + `coverage_tracker`(dict): a dictionary with key: neuron and value: (bool) intialized False + """ + coverage_word_tracker = torch.zeros_like(self.coverage_word_dicts) + + """ + coverage_tracker["classifier"] = ( + torch.zeros((len(self.test_model.config.label2id)), requires_grad=False) + .fill_(fill_value) + .to(textattack.shared.utils.device) + .detach() + ) + """ + # embedding is L X H + + """ + coverage_tracker["classifier"] = ( + torch.zeros((len(self.test_model.config.label2id)), requires_grad=False) + .fill_(fill_value) + .to(textattack.shared.utils.device) + .detach() + ) + """ + + return coverage_word_tracker + + def _init_attention_coverage(self, fill_value): + """Initialize `coverage_tracker` dictionary. + + Returns: + `coverage_tracker`(dict): a dictionary with key: neuron and value: (bool) intialized False + """ + # attention neurons + coverage_attention_tracker = torch.zeros_like(self.coverage_attention_dicts) + return coverage_attention_tracker + + def _update_initial_word_coverage( + self, embeddings, word_mask=None, interaction_mask=None + ): + """Update `coverage_tracker` for input `text` for coarse coverage + Args: + `text`(str): text to update neuron coverage of. + + """ + + """ + encodings = self.test_model.tokenizer(text, return_tensors="pt") + if self.max_seq_len > 0: + input_ids = encodings.input_ids[:, : self.max_seq_len] + attention_mask = encodings.attention_mask[:, : self.max_seq_len] + + input_ids = input_ids.to(textattack.shared.utils.device) + attention_mask = attention_mask.to(textattack.shared.utils.device) + outputs = self.test_model(input_ids, attention_mask=attention_mask) + outputs[1][0] + """ + + sentence_length = embeddings[0][0, ...].size(0) + + embeddings = [e.unsqueeze(1) for e in embeddings] + + embeddings = torch.cat(embeddings, dim=1).cpu() + + if self.word_mask: + indices_to_fill = [int(index) for index in range(sentence_length)] + else: + indices_to_fill = [index for index in range(sentence_length)] + # print(embeddings,, self.max_word_coverage_tracker.device) + self.max_word_coverage_tracker[:, indices_to_fill, :] = torch.where( + torch.max(embeddings, dim=0).values.detach() + > self.max_word_coverage_tracker[:, indices_to_fill, :], + torch.max(embeddings, dim=0).values.detach(), + self.max_word_coverage_tracker[:, indices_to_fill, :], + ) + self.min_word_coverage_tracker[:, indices_to_fill, :] = torch.where( + torch.min(embeddings, dim=0).values.detach() + < self.min_word_coverage_tracker[:, indices_to_fill, :], + torch.min(embeddings, dim=0).values.detach(), + self.min_word_coverage_tracker[:, indices_to_fill, :], + ) + + """ + self.max_coverage_tracker["classifier"] = torch.where( + (outputs[0][0, ...].detach()) > self.max_coverage_tracker["classifier"], + outputs[0][0, ...].detach(), + self.max_coverage_tracker["classifier"], + ) + """ + + def _update_initial_attention_coverage(self, all_attentions): + """Update `coverage_tracker` for input `text` for coarse coverage + Args: + `text`(str): text to update neuron coverage of. 
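+        (In practice this method receives ``all_attentions``, a tuple of
+        per-layer attention maps of shape (batch, heads, seq_len, seq_len);
+        their element-wise min and max are folded into the attention range
+        trackers that are later used for binning.)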
+ + """ + + # all_attentions = list of attentions of size B X H X L X L + + sentence_length = all_attentions[0][0, 0, ...].size(-1) + all_attentions = torch.cat( + [a.unsqueeze(1) for a in all_attentions], dim=1 + ) # B X LA X HD X L X L + all_attentions_max = torch.max(all_attentions, dim=0).values.cpu() + all_attentions_min = torch.min(all_attentions, dim=0).values.cpu() + self.max_attention_coverage_tracker = torch.where( + all_attentions_max > self.max_attention_coverage_tracker, + all_attentions_max, + self.max_attention_coverage_tracker, + ) + self.min_attention_coverage_tracker = torch.where( + all_attentions_min < self.min_attention_coverage_tracker, + all_attentions_min, + self.min_attention_coverage_tracker, + ) + + def _update_initial_coverage( + self, all_hidden_states, all_attentions, word_mask=None + ): + """Update `coverage_tracker` for input `text` + Args: + `text`(str): text to update neuron coverage of. + + """ + + self._update_initial_word_coverage(all_hidden_states, word_mask) + + self._update_initial_attention_coverage(all_attentions) + + def initialize_from_training_dataset(self, trainset, masks=None, bz=1): + """Update coverage from training dataset + `trainset`(list[str]): training dataset coverage statistics + + + """ + mask_no = 0 + + start = 0 + with torch.no_grad(): + for t in tqdm(trainset): + if mask_no + bz >= len(trainset): + end = len(trainset) + else: + end = start + bz + if start >= end or start > len(trainset): + break + # print('current indices : ', trainset[start:end], start, end, len(trainset)) + encodings = self.test_model.tokenizer( + trainset[start:end], + padding="max_length", + truncation=True, + return_tensors="pt", + max_length=self.max_seq_len, + ) + + if self.max_seq_len > 0: + input_ids = encodings.input_ids[:, : self.max_seq_len] + attention_mask = encodings.attention_mask[:, : self.max_seq_len] + + input_ids = input_ids.to(textattack.shared.utils.device) + attention_mask = attention_mask.to(textattack.shared.utils.device) + + outputs = self.test_model( + input_ids, + attention_mask=attention_mask, + output_attentions=True, + output_hidden_states=True, + ) + all_hidden_states, all_attentions = outputs[-2:] + self._update_initial_coverage( + all_hidden_states, all_attentions, masks[start:end] + ) + start = end + + self.training_word_coverage_dicts = copy.deepcopy(self.coverage_word_dicts) + self.training_attention_coverage_dicts = copy.deepcopy( + self.coverage_attention_dicts + ) + + def _eval(self, text): + """Update `coverage_tracker` for input `text` for coarse coverage + Args: + `text`(str): text to update neuron coverage of. + + """ + encodings = self.test_model.tokenizer(text, return_tensors="pt") + if self.max_seq_len > 0: + input_ids = encodings.input_ids[:, : self.max_seq_len] + attention_mask = encodings.attention_mask[:, : self.max_seq_len] + + input_ids = input_ids.to(textattack.shared.utils.device) + attention_mask = attention_mask.to(textattack.shared.utils.device) + outputs = self.test_model(input_ids, attention_mask=attention_mask) + return outputs + + def _update_word_coverage(self, all_hidden_states, word_mask=None): + """Update `coverage_tracker` for input `text` for coarse coverage + Args: + `text`(str): text to update neuron coverage of. 
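+        (In practice this method receives ``all_hidden_states``, a tuple of
+        per-layer hidden states of shape (batch, seq_len, hidden). Each
+        activation is bucketed into one of ``bins_word`` sections (plus
+        under/overflow bins) between the recorded min and max, and the
+        resulting one-hot section hits are accumulated into
+        ``coverage_word_dicts``. The tokenizer/model calls that follow inside
+        this string are leftover code and are never executed.)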
+ + + + a = time.time() + encodings = self.test_model.tokenizer(text, padding='max_length', truncation=True, return_tensors="pt", max_length = self.max_seq_len) + if self.max_seq_len > 0: + input_ids = encodings.input_ids[:, : self.max_seq_len] + attention_mask = encodings.attention_mask[:, : self.max_seq_len] + + input_ids = input_ids.to(textattack.shared.utils.device) + attention_mask = attention_mask.to(textattack.shared.utils.device) + outputs = self.test_model(input_ids, attention_mask=attention_mask) + b = time.time() + + sentence_length = outputs[1][0][0, ...].size(0) + """ + hidden_vectors = torch.cat([o.unsqueeze(1) for o in all_hidden_states], dim=1) + sentence_length = hidden_vectors.size(2) + # print('size of output hidden bectors: ', hidden_vectors.size()) + if self.word_mask: + indices_to_fill = [index for index in range(sentence_length)] + else: + indices_to_fill = [index for index in range(sentence_length)] + current_coverage_tracker = self._init_word_coverage(fill_value=0) + a = time.time() + section_length = ( + self.max_word_coverage_tracker[:, indices_to_fill, :] + - self.min_word_coverage_tracker[:, indices_to_fill, :] + ) / self.bins_word + section_length = section_length.unsqueeze(0).repeat( + hidden_vectors.size(0), 1, 1, 1 + ) + # print('section length: ', section_length.size()) + section_index = torch.where( + section_length > 0, + ( + torch.floor( + ( + hidden_vectors.cpu().detach() + - self.min_word_coverage_tracker[:, indices_to_fill, :] + ) + / section_length + ) + ), + torch.zeros_like(hidden_vectors.cpu().detach(), requires_grad=False) - 1, + ).long() + # print('section index: ', section_index.size()) + + # section_index = torch.where(section_index, section_index, self.bins_word + 1) + # section_index = torch.where(section_index>0, section_index, torch.zeros_like(section_index) + self.bins_word + 1) + section_index = torch.where( + section_index < self.bins_word, + section_index, + torch.zeros_like(section_index) + self.bins_word + 2, + ) + section_index = torch.where( + section_index > 0, + section_index, + torch.zeros_like(section_index) + self.bins_word + 1, + ) + + # print('section index: ', section_index.size()) + + temp_store_activations = torch.max( + (F.one_hot(section_index, num_classes=self.bins_word + 3)).permute( + 0, 4, 1, 2, 3 + ), + dim=0, + ).values + + # print('Temp Store Activations: ', temp_store_activations.size()) + self.coverage_word_dicts += temp_store_activations + del temp_store_activations + del current_coverage_tracker + + def _update_attention_coverage(self, all_attentions, masks): + """Update `coverage_tracker` for input `text` for coarse coverage + Args: + `text`(str): text to update neuron coverage of. 
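+        (In practice this method receives ``all_attentions``, per-layer
+        attention maps of shape (batch, heads, seq_len, seq_len). Each
+        attention weight is bucketed into one of ``bins_attention`` sections
+        between the recorded min and max and accumulated into
+        ``coverage_attention_dicts``; the model calls that follow inside this
+        string are leftover code and are never executed.)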
+ + + encodings = self.test_model.tokenizer(text, padding='max_length', truncation=True, return_tensors="pt", max_length = self.max_seq_len) + if self.max_seq_len > 0: + input_ids = encodings.input_ids[:, : self.max_seq_len] + attention_mask = encodings.attention_mask[:, : self.max_seq_len] + + input_ids = input_ids.to(textattack.shared.utils.device) + attention_mask = attention_mask.to(textattack.shared.utils.device) + outputs = self.test_model(input_ids, attention_mask=attention_mask, output_attentions=True, output_hidden_states = True) + + all_hidden_states, all_attentions = outputs[-2:] + # all_attentions = list of attentions of size B X H X L X L + + """ + sentence_length = all_attentions[0][0, 0, ...].size(-1) + + all_attentions = torch.cat( + [a.unsqueeze(1) for a in all_attentions], dim=1 + ).cpu()[:, :, 0:sentence_length, 0:sentence_length] + # B X layers X heads X l X l + # print('attentions size: ', all_attentions.size()) + current_coverage_tracker = self._init_attention_coverage(fill_value=0) + + section_length = ( + self.max_attention_coverage_tracker[ + :, :, 0:sentence_length, 0:sentence_length + ] + - self.min_attention_coverage_tracker[ + :, :, 0:sentence_length, 0:sentence_length + ] + ) / self.bins_attention + section_length = section_length.unsqueeze(0).repeat( + all_attentions.size(0), 1, 1, 1, 1 + ) + # print(' section length: ', section_length.size()) + section_index = torch.where( + section_length > 0, + ( + torch.floor( + ( + all_attentions.cpu().detach() + - self.min_attention_coverage_tracker + ) + / section_length + ) + ), + torch.zeros_like(all_attentions.cpu().detach(), requires_grad=False) - 1, + ).long() + + # print('section index: ', section_index.size()) + section_index = torch.where( + section_index < self.bins_attention, + section_index, + torch.zeros_like(section_index) + self.bins_attention + 2, + ) + section_index = torch.where( + section_index > 0, + section_index, + torch.zeros_like(section_index) + self.bins_word + 1, + ) + temp_storage_activations = torch.max( + (F.one_hot(section_index, num_classes=self.bins_attention + 3)).permute( + 0, 5, 1, 2, 3, 4 + ), + dim=0, + ).values + # print(' temp storage activations: ', temp_storage_activations.size()) + self.coverage_attention_dicts += temp_storage_activations + del temp_storage_activations + del current_coverage_tracker + + def _compute_coverage(self): + """Calculate `neuron_coverage` for current model.""" + neuron_word_coverage, neuron_word_coverage_total = 0.0, 0.0 + neuron_attention_coverage, neuron_attention_coverage_total = 0.0, 0.0 + neuron_word_coverage += np.count_nonzero(self.coverage_word_dicts.numpy()) + neuron_word_coverage_total += self.coverage_word_dicts.numel() + + neuron_attention_coverage += np.count_nonzero( + self.coverage_attention_dicts.numpy() + ) + neuron_attention_coverage_total += self.coverage_attention_dicts.numel() + + neuron_coverage = neuron_word_coverage + neuron_attention_coverage + # print('Word and Attention Only: ', neuron_word_coverage , neuron_attention_coverage) + neuron_coverage_total = ( + neuron_word_coverage_total + neuron_attention_coverage_total + ) + # print('Total Word and Attention Only: ', neuron_word_coverage_total , neuron_attention_coverage_total) + return neuron_coverage / neuron_coverage_total + + def _compute_vector(self): + """Calculate `neuron_coverage` for current model.""" + neuron_coverage_vector = [] + for section in self.coverage_word_dicts: + for entry in section.values(): + neuron_coverage_vector += [ + entry_val.item() for entry_val 
in entry.flatten() + ] + for section in self.coverage_attention_dicts: + for entry in section.values(): + neuron_coverage_vector += [ + entry_val.item() for entry_val in entry.flatten() + ] + + return neuron_coverage_vector + + def _update_coverage(self, text, word_mask=None): + """Update `coverage_tracker` for input `text` + Args: + `text`(str): text to update neuron coverage of. + + """ + + self._update_word_coverage(text, word_mask) + self._update_attention_coverage(text) + + def __call__(self, testset, masks=None, bz=1): + """ + Returns neuron of `testset` + Args: + testset: Iterable of strings + Returns: + neuron coverage (float) + """ + # # # print('*'*50) + # # # print('Updating Coverage using test set: ') + mask_no, start = 0, 0 + with torch.no_grad(): + for t in tqdm(testset): + if mask_no + bz >= len(testset): + end = len(testset) + else: + end = start + bz + if start >= end or start > len(testset): + break + + encodings = self.test_model.tokenizer( + testset[start:end], + padding="max_length", + truncation=True, + return_tensors="pt", + max_length=self.max_seq_len, + ) + + if self.max_seq_len > 0: + input_ids = encodings.input_ids[:, : self.max_seq_len] + attention_mask = encodings.attention_mask[:, : self.max_seq_len] + + input_ids = input_ids.to(textattack.shared.utils.device) + attention_mask = attention_mask.to(textattack.shared.utils.device) + + outputs = self.test_model( + input_ids, + attention_mask=attention_mask, + output_attentions=True, + output_hidden_states=True, + ) + all_hidden_states, all_attentions = outputs[-2:] + self._update_word_coverage(all_hidden_states, masks[start:end]) + self._update_attention_coverage(all_attentions, masks[start:end]) + + start = end + + # # # print('*'*50) + # # # print() + # # # print('*'*50) + # # # print('Computing Coverage: ') + neuron_coverage = self._compute_coverage() + # # # print('*'*50) + return neuron_coverage + + def vector(self, testset, start=False): + """ + Returns neuron of `testset` + Args: + testset: Iterable of strings + Returns: + neuron coverage (float) + """ + # # # print('*'*50) + if start: + self.coverage_word_dicts = copy.deepcopy(self.training_word_coverage_dicts) + self.coverage_attention_dicts = copy.deepcopy( + self.training_attention_coverage_dicts + ) + # # # print('Updating Coverage using test set: ') + # # # print('#'*100) + # # # print(len(testset)) + # # # print(testset) + # # # print('#'*100) + for t in tqdm(testset): + # # # print(t) + self._update_coverage(t) + + # # # print('*'*50) + # # # print() + # # # print('*'*50) + # # # print('Computing Coverage: ') + neuron_coverage = self._compute_vector() + # # print('*'*50) + return neuron_coverage diff --git a/textattack/coverage/neuron_coverage.py b/textattack/coverage/neuron_coverage.py new file mode 100644 index 00000000..f2c42d4e --- /dev/null +++ b/textattack/coverage/neuron_coverage.py @@ -0,0 +1,202 @@ +from collections import defaultdict +import copy +import itertools +import logging + +import torch +from tqdm import tqdm +import transformers + +import textattack + +from .coverage import ExtrinsicCoverage + +logging.getLogger("transformers.tokenization_utils_base").setLevel(logging.ERROR) + + +COVERAGE_MODEL_TYPES = ["bert", "albert", "distilbert", "roberta"] + + +class neuronCoverage(ExtrinsicCoverage): + """ + ``neuronCoverage`` measures the neuron coverage acheived by a testset + Args: + test_model(Union[str, torch.nn.Module]): name of the pretrained language model from `transformers` + or the actual test model as a `torch.nn.Module` class. 
Default is "bert base uncased" from `transformers`. + tokenizer (:obj:``, optional): If `test_model` is not a pretrained model from `transformers, need to provide + the tokenizer here. + max_seq_len (int): Maximum sequence length accepted by the model to be tested. However, if you are using a pretrained model from `transformers`, this is handled + automatically using information from `model.config`. + threshold(float): threshold for marking a neuron as activated + coarse_coverage(bool): measure neuron coverage at the level of layer outputs + """ + + def __init__( + self, + test_model="textattack/bert-base-uncased-ag-news", + tokenizer=None, + num_labels=2, + max_seq_len=-1, + threshold=0.0, + coarse_coverage=True, + ): + self.coarse_coverage = coarse_coverage + + config = transformers.AutoConfig.from_pretrained( + test_model, output_hidden_states=True, num_labels=num_labels + ) + if config.model_type in COVERAGE_MODEL_TYPES: + self.test_model = ( + transformers.AutoModelForSequenceClassification.from_pretrained( + test_model, config=config + ) + ) + self.tokenizer = transformers.AutoTokenizer.from_pretrained( + test_model, use_fast=True + ) + self.model_type = self.test_model.config.model_type + self.max_seq_len = ( + max_seq_len + if max_seq_len != -1 + else self.test_model.config.max_position_embeddings + ) + else: + raise ValueError( + "`neuronCoverage` only accepts models in " + + ",".join(COVERAGE_MODEL_TYPES) + ) + + self.test_model.to(textattack.shared.utils.device) + self.threshold = threshold + self.test_model.eval() + self.coverage_tracker = self._init_coverage() + + def _init_coverage(self): + """Initialize `coverage_tracker` dictionary. + + Returns: + `coverage_tracker`(dict): a dictionary with key: neuron and value: (bool) intialized False + """ + coverage_tracker = defaultdict(bool) + + for bert_layer_index in range(self.test_model.config.num_hidden_layers): + coverage_tracker[(bert_layer_index, "output")] = torch.zeros( + (self.max_seq_len, self.test_model.config.hidden_size), dtype=bool + ).to(textattack.shared.utils.device) + coverage_tracker["classifier"] = torch.zeros( + (len(self.test_model.config.label2id)), dtype=bool + ).to(textattack.shared.utils.device) + coverage_tracker["embedding"] = torch.zeros( + (self.max_seq_len, self.test_model.config.hidden_size), dtype=bool + ).to(textattack.shared.utils.device) + + return coverage_tracker + + def _eval(self, text): + """Update `coverage_tracker` for input `text` for coarse coverage + Args: + `text`(str): text to update neuron coverage of. + + """ + encodings = self.tokenizer(text, return_tensors="pt") + if self.max_seq_len > 0: + input_ids = encodings.input_ids[:, : self.max_seq_len] + attention_mask = encodings.attention_mask[:, : self.max_seq_len] + + input_ids = input_ids.to(textattack.shared.utils.device) + attention_mask = attention_mask.to(textattack.shared.utils.device) + outputs = self.test_model(input_ids, attention_mask=attention_mask) + return outputs + + def _update_coarse_coverage(self, text): + """Update `coverage_tracker` for input `text` for coarse coverage + Args: + `text`(str): text to update neuron coverage of. 
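+        (The text is tokenized and run through the test model; the embedding
+        output, every hidden-layer output, and the classifier logits are
+        min-max scaled to [0, 1], and any neuron whose scaled activation
+        exceeds ``self.threshold`` is marked as covered in
+        ``coverage_tracker``.)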
+ + """ + encodings = self.tokenizer(text, return_tensors="pt") + if self.max_seq_len > 0: + input_ids = encodings.input_ids[:, : self.max_seq_len] + attention_mask = encodings.attention_mask[:, : self.max_seq_len] + + input_ids = input_ids.to(textattack.shared.utils.device) + attention_mask = attention_mask.to(textattack.shared.utils.device) + outputs = self.test_model(input_ids, attention_mask=attention_mask) + sentence_length = outputs[1][0][0, ...].size(0) + + def scale(layer_outputs, rmax=1, rmin=0): + divider = layer_outputs.max() - layer_outputs.min() + + if divider == 0: + return torch.zeros_like(layer_outputs) + + X_std = (layer_outputs - layer_outputs.min()) / divider + + X_scaled = X_std * (rmax - rmin) + rmin + return X_scaled + + self.coverage_tracker[("embedding")][0:sentence_length, ...] = torch.where( + scale(outputs[1][0][0, ...]) > self.threshold, + torch.ones( + (sentence_length, self.test_model.config.hidden_size), dtype=bool + ).to(textattack.shared.utils.device), + self.coverage_tracker[("embedding")][0:sentence_length, ...], + ) + for h_index, hidden_vector in enumerate(outputs[1][1:]): + self.coverage_tracker[(h_index, "output")][ + 0:sentence_length, ... + ] = torch.where( + scale(hidden_vector[0, ...]) > self.threshold, + torch.ones( + (sentence_length, self.test_model.config.hidden_size), dtype=bool + ).to(textattack.shared.utils.device), + self.coverage_tracker[(h_index, "output")][0:sentence_length, ...], + ) + + self.coverage_tracker["classifier"] = torch.where( + scale(outputs[0][0, ...]) > self.threshold, + torch.ones((len(self.test_model.config.label2id)), dtype=bool).to( + textattack.shared.utils.device + ), + self.coverage_tracker["classifier"], + ) + + def _update_refined_coverage(self, text): + """Update `coverage_tracker` for input `text` for refined coverage + Args: + `text`(str): text to update neuron coverage of. + + """ + + def _compute_coverage(self): + """Calculate `neuron_coverage` for current model.""" + + neuron_coverage = sum( + [entry.sum().item() for entry in self.coverage_tracker.values()] + ) / sum([entry.numel() for entry in self.coverage_tracker.values()]) + + return neuron_coverage + + def _update_coverage(self, text): + """Update `coverage_tracker` for input `text` + Args: + `text`(str): text to update neuron coverage of. + + """ + if self.coarse_coverage: + self._update_coarse_coverage(text) + else: + pass + + def __call__(self, testset): + """ + Returns neuron of `testset` + Args: + testset: Iterable of strings + Returns: + neuron coverage (float) + """ + for t in tqdm(testset): + self._update_coverage(t[0]["text"]) + neuron_coverage = self._compute_coverage() + return neuron_coverage diff --git a/textattack/coverage/perplexity_coverage.py b/textattack/coverage/perplexity_coverage.py new file mode 100644 index 00000000..84b15672 --- /dev/null +++ b/textattack/coverage/perplexity_coverage.py @@ -0,0 +1,97 @@ +import logging + +import torch +from tqdm import tqdm +import transformers + +import textattack + +from .coverage import ExtrinsicCoverage + +logging.getLogger("transformers.tokenization_utils_base").setLevel(logging.ERROR) + + +class PerplexityCoverage(ExtrinsicCoverage): + """ + ``PerplexityCoverage`` meausures the average perplexity of a given test datsaet using a language model + Args: + language_model(Union[str, torch.nn.Module]): name of the pretrained language model from `transformers` + or the actual language model as a `torch.nn.Module` class. Default is "gpt2" from `transformers`. 
+ tokenizer (:obj:``, optional): If `language_model` is not a pretrained model from `transformers, need to provide + the tokenizer here. + max_seq_len(:obj:`int`, optional): Max sequence length to consider. If not set and if the language model is a fixed-length model, + defaults to the max sequence of length of the model. + batch_size (int): Batch size when calculating perplexity. + """ + + def __init__( + self, language_model="gpt2", tokenizer=None, max_seq_len=None, stride_size=512 + ): + if isinstance(language_model, str): + self.language_model = transformers.AutoModelForCausalLM.from_pretrained( + language_model + ) + self.tokenizer = transformers.AutoTokenizer.from_pretrained( + language_model, use_fast=True + ) + self.max_seq_len = ( + max_seq_len if max_seq_len else self.language_model.config.n_positions + ) + if stride_size > self.max_seq_len: + raise ValueError( + f"Stride size cannot be greater than max sequence length ({stride_size} > {max_seq_len})." + ) + self.stride_size = stride_size + else: + raise ValueError('`PerplexityCoverage` only currently supports "gpt2"') + + self.language_model.to(textattack.shared.utils.device) + self.language_model.eval() + + def _gpt2_calc_perplexity(self, text): + encodings = self.tokenizer(text, return_tensors="pt") + if self.max_seq_len > 0: + input_ids = encodings.input_ids[:, : self.max_seq_len] + attention_mask = encodings.attention_mask[:, : self.max_seq_len] + + input_ids = input_ids.to(textattack.shared.utils.device) + attention_mask = attention_mask.to(textattack.shared.utils.device) + + lls = [] + for i in range(0, input_ids.size(1), self.stride_size): + begin_loc = max(i + self.stride_size - self.max_seq_len, 0) + end_loc = min(i + self.stride_size, input_ids.size(1)) + trg_len = end_loc - i # may be different from stride on last loop + input_ids = input_ids[:, begin_loc:end_loc].to( + textattack.shared.utils.device + ) + attention_mask = attention_mask[:, begin_loc:end_loc].to( + textattack.shared.utils.device + ) + target_ids = input_ids.clone() + target_ids[:, :-trg_len] = -100 + + with torch.no_grad(): + outputs = self.language_model( + input_ids, attention_mask=attention_mask, labels=target_ids + ) + log_likelihood = outputs[0] * trg_len + + lls.append(log_likelihood) + + ppl = torch.exp(torch.stack(lls).sum() / end_loc) + return ppl.item() + + def __call__(self, testset): + """ + Returns average perplexity of `testset` + Args: + testset: Iterable of strings + Returns: + average perplexity (float) + """ + ppls = [] + for text in tqdm(testset): + pp = self._gpt2_calc_perplexity(text) + ppls.append(pp) + return sum(ppls) / len(testset), ppls
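
For reference, a minimal usage sketch of two of the coverage classes added in this patch. This sketch is not part of the diff: the "rotten_tomatoes" dataset, the "textattack/bert-base-uncased-rotten-tomatoes" checkpoint, and the example sentences are illustrative assumptions; swap in whatever model and test set you are actually evaluating.

from textattack.coverage import PerplexityCoverage, neuronCoverage
from textattack.datasets import HuggingFaceDataset

# PerplexityCoverage works directly on plain strings and needs no test model;
# it returns (average perplexity, per-example perplexities).
ppl_cov = PerplexityCoverage(language_model="gpt2")
avg_ppl, per_example_ppl = ppl_cov(["the movie was great", "the plot made no sense"])
print("average perplexity:", avg_ppl)

# neuronCoverage expects dataset-style (example, label) rows whose text column
# is named "text", e.g. rows from a HuggingFaceDataset; it returns the fraction
# of neurons activated above the threshold.
testset = HuggingFaceDataset("rotten_tomatoes", None, "test", shuffle=True)[0:50]
neuron_cov = neuronCoverage(test_model="textattack/bert-base-uncased-rotten-tomatoes")
print("coarse neuron coverage:", neuron_cov(testset))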