From b9b78d6020f4033fde4f97bbb959c726c2bd23d7 Mon Sep 17 00:00:00 2001 From: Yanjun Qi Date: Sun, 17 Sep 2023 15:31:01 -0400 Subject: [PATCH 1/7] adding in the coverage package / --- textattack/coverage/__init__.py | 4 + textattack/coverage/coverage.py | 30 ++ textattack/coverage/kmn_coverage.py | 516 +++++++++++++++++++++ textattack/coverage/neuron_coverage.py | 204 ++++++++ textattack/coverage/perplexity_coverage.py | 97 ++++ textattack/metrics/recipe.py | 20 + 6 files changed, 871 insertions(+) create mode 100644 textattack/coverage/__init__.py create mode 100644 textattack/coverage/coverage.py create mode 100644 textattack/coverage/kmn_coverage.py create mode 100644 textattack/coverage/neuron_coverage.py create mode 100644 textattack/coverage/perplexity_coverage.py create mode 100644 textattack/metrics/recipe.py diff --git a/textattack/coverage/__init__.py b/textattack/coverage/__init__.py new file mode 100644 index 000000000..63736137b --- /dev/null +++ b/textattack/coverage/__init__.py @@ -0,0 +1,4 @@ +from .coverage import Coverage, IntrinsicCoverage, ExtrinsicCoverage +from .perplexity_coverage import PerplexityCoverage +from .neuron_coverage import neuronCoverage +from .kmn_coverage import neuronMultiSectionCoverage diff --git a/textattack/coverage/coverage.py b/textattack/coverage/coverage.py new file mode 100644 index 000000000..125f2f947 --- /dev/null +++ b/textattack/coverage/coverage.py @@ -0,0 +1,30 @@ +from abc import ABC, abstractmethod + + +class Coverage(ABC): + """``Coverage`` class measures how well a given test dataset tests the + given model. + + This is an abstract base class for other ``Coverage`` classes. + """ + + +class ExtrinsicCoverage(Coverage): + """Represents coverage methods that do not access the model that is subject + of testing to measure the quality of test set.""" + + @abstractmethod + def __call__(self, testset): + raise NotImplementedError() + + +class IntrinsicCoverage(Coverage): + """Represents coverage methods that do access the model that is subject of + testing to measure the quality of test set.""" + + def __init__(self, model): + self.model = model + + @abstractmethod + def __call__(self, testset): + raise NotImplementedError() diff --git a/textattack/coverage/kmn_coverage.py b/textattack/coverage/kmn_coverage.py new file mode 100644 index 000000000..0a836208e --- /dev/null +++ b/textattack/coverage/kmn_coverage.py @@ -0,0 +1,516 @@ +import logging + +import torch +import transformers +from tqdm import tqdm +import itertools +import copy +import numpy as np +import textattack +from collections import defaultdict +from .coverage import ExtrinsicCoverage +import torch.nn.functional as F +import time +logging.getLogger("transformers.tokenization_utils_base").setLevel(logging.ERROR) + + +COVERAGE_MODEL_TYPES = ["bert", "albert", "distilbert", "roberta"] + + +class neuronMultiSectionCoverage(ExtrinsicCoverage): + """ + ``neuronMultiSectionCoverage`` measures the neuron coverage acheived by a testset + Args: + test_model(Union[str, torch.nn.Module]): name of the pretrained language model from `transformers` + or the actual test model as a `torch.nn.Module` class. Default is "bert base uncased" from `transformers`. + tokenizer (:obj:``, optional): If `test_model` is not a pretrained model from `transformers, need to provide + the tokenizer here. + max_seq_len (int): Maximum sequence length accepted by the model to be tested. 
However, if you are using a pretrained model from `transformers`, this is handled + automatically using information from `model.config`. + threshold(float): threshold for marking a neuron as activated + coverage(str): measure type of neuron coverage at the level of layer outputs + """ + + def __init__( + self, + test_model="textattack/bert-base-uncased-ag-news", + tokenizer=None, + max_seq_len=-1, + threshold=0.0, + num_labels = 2, + coverage = 'multisection', + pre_limits = False, + bins_attention =4, + bins_word = 4, + min_value=np.inf, + max_value=-np.inf, + bz = 128, + word_mask = False, + ): + + self.coverage = coverage + + self.word_mask = word_mask + self.pre_limits = pre_limits + self.bins_attention = bins_attention + self.bins_word = bins_word # number of sections for each neuron + self.max_seq_len = 128 + self.model_type = 'bert' + + config = transformers.AutoConfig.from_pretrained( + test_model, output_hidden_states=True, num_labels = num_labels + ) + if config.model_type in COVERAGE_MODEL_TYPES: + self.test_model = ( + transformers.AutoModelForSequenceClassification.from_pretrained( + test_model, config=config + ) + ) + self.test_model.tokenizer = transformers.AutoTokenizer.from_pretrained( + test_model + ) + self.model_type = self.test_model.config.model_type + self.max_seq_len = ( + max_seq_len + if max_seq_len != -1 + else self.test_model.config.max_position_embeddings + ) + else: + raise ValueError( + "`neuronCoverage` only accepts models in " + + ",".join(COVERAGE_MODEL_TYPES) + ) + + + self.test_model.to(textattack.shared.utils.device) + self.threshold = threshold + self.test_model.eval() + + # initialize min and max for coverage + min_attention_value = min_value + max_attention_value = max_value + if pre_limits: + min_attention_value = 0.0 + max_attention_value = 1.0 + + self.coverage_word_dicts = torch.zeros((self.bins_word+3, 13, self.max_seq_len, 768)) + self.coverage_attention_dicts = torch.zeros((self.bins_attention + 3, 12, 12, self.max_seq_len, self.max_seq_len)) + self.min_word_coverage_tracker = torch.zeros((13, self.max_seq_len, 768)).fill_(min_value) + self.min_attention_coverage_tracker = torch.zeros((12, 12, self.max_seq_len, self.max_seq_len)).fill_(min_attention_value) + + self.max_word_coverage_tracker = torch.zeros(( 13, self.max_seq_len, 768)).fill_(max_value) + self.max_attention_coverage_tracker = torch.zeros(( 12, 12, self.max_seq_len, self.max_seq_len)).fill_(max_attention_value) + + + + if 'snac' in self.coverage: + self.k_m = 2 + if 'nbc' in self.coverage: + self.k_m = 1 + ''' + for i in range(self.bins_word): + word_tracker = self._init_word_coverage(fill_value=0.0) + self.coverage_word_dicts.append(word_tracker) + for i in range(self.bins_attention): + attention_tracker = self._init_attention_coverage(fill_value=0.0) + self.coverage_attention_dicts.append(attention_tracker) + ''' + def _init_word_coverage(self, fill_value): + """Initialize `coverage_tracker` dictionary + + Returns: + `coverage_tracker`(dict): a dictionary with key: neuron and value: (bool) intialized False + """ + coverage_word_tracker = torch.zeros_like(self.coverage_word_dicts) + + ''' + coverage_tracker["classifier"] = ( + torch.zeros((len(self.test_model.config.label2id)), requires_grad=False) + .fill_(fill_value) + .to(textattack.shared.utils.device) + .detach() + ) + ''' + # embedding is L X H + + ''' + coverage_tracker["classifier"] = ( + torch.zeros((len(self.test_model.config.label2id)), requires_grad=False) + .fill_(fill_value) + .to(textattack.shared.utils.device) + 
.detach() + ) + ''' + + return coverage_word_tracker + def _init_attention_coverage(self, fill_value): + """Initialize `coverage_tracker` dictionary + + Returns: + `coverage_tracker`(dict): a dictionary with key: neuron and value: (bool) intialized False + """ + # attention neurons + coverage_attention_tracker = torch.zeros_like(self.coverage_attention_dicts) + return coverage_attention_tracker + + def _update_initial_word_coverage(self, embeddings, word_mask = None, interaction_mask = None): + """Update `coverage_tracker` for input `text` for coarse coverage + Args: + `text`(str): text to update neuron coverage of. + + """ + + ''' + encodings = self.test_model.tokenizer(text, return_tensors="pt") + if self.max_seq_len > 0: + input_ids = encodings.input_ids[:, : self.max_seq_len] + attention_mask = encodings.attention_mask[:, : self.max_seq_len] + + input_ids = input_ids.to(textattack.shared.utils.device) + attention_mask = attention_mask.to(textattack.shared.utils.device) + outputs = self.test_model(input_ids, attention_mask=attention_mask) + outputs[1][0] + ''' + + sentence_length = embeddings[0][0, ...].size(0) + + embeddings = [e.unsqueeze(1) for e in embeddings] + + embeddings = torch.cat(embeddings, dim = 1).cpu() + + if self.word_mask: + indices_to_fill = [int( index) for index in range(sentence_length)] + else: + indices_to_fill = [index for index in range(sentence_length)] + #print(embeddings,, self.max_word_coverage_tracker.device) + self.max_word_coverage_tracker[:,indices_to_fill, :] = torch.where(torch.max(embeddings, dim = 0).values.detach() > self.max_word_coverage_tracker[:,indices_to_fill, :] , torch.max(embeddings, dim = 0).values.detach(), self.max_word_coverage_tracker[:,indices_to_fill, :]) + self.min_word_coverage_tracker[:,indices_to_fill, :] = torch.where(torch.min(embeddings, dim = 0).values.detach() \ + < self.min_word_coverage_tracker[:,indices_to_fill, :] , torch.min(embeddings, dim = 0).values.detach(), self.min_word_coverage_tracker[:,indices_to_fill, :]) + + + ''' + self.max_coverage_tracker["classifier"] = torch.where( + (outputs[0][0, ...].detach()) > self.max_coverage_tracker["classifier"], + outputs[0][0, ...].detach(), + self.max_coverage_tracker["classifier"], + ) + ''' + + + def _update_initial_attention_coverage(self, all_attentions): + """Update `coverage_tracker` for input `text` for coarse coverage + Args: + `text`(str): text to update neuron coverage of. + + """ + + + # all_attentions = list of attentions of size B X H X L X L + + sentence_length = all_attentions[0][0,0, ...].size(-1) + all_attentions = torch.cat([a.unsqueeze(1) for a in all_attentions], dim = 1) # B X LA X HD X L X L + all_attentions_max = torch.max( all_attentions, dim = 0).values.cpu() + all_attentions_min = torch.min( all_attentions, dim = 0).values.cpu() + self.max_attention_coverage_tracker = torch.where(all_attentions_max > self.max_attention_coverage_tracker, all_attentions_max, self.max_attention_coverage_tracker) + self.min_attention_coverage_tracker = torch.where(all_attentions_min < self.min_attention_coverage_tracker, all_attentions_min, self.min_attention_coverage_tracker) + + + + def _update_initial_coverage(self, all_hidden_states, all_attentions, word_mask = None): + """Update `coverage_tracker` for input `text` + Args: + `text`(str): text to update neuron coverage of. 
+ + """ + + + self._update_initial_word_coverage(all_hidden_states, word_mask) + + self._update_initial_attention_coverage(all_attentions) + + def initialize_from_training_dataset(self, trainset, masks = None, bz=1): + """Update coverage from training dataset + `trainset`(list[str]): training dataset coverage statistics + + + """ + mask_no = 0 + + + start = 0 + with torch.no_grad(): + for t in tqdm(trainset): + + if mask_no + bz >= len(trainset): + end = len(trainset) + else: + end = start + bz + if start >= end or start > len(trainset) : break + #print('current indices : ', trainset[start:end], start, end, len(trainset)) + encodings = self.test_model.tokenizer(trainset[start:end], padding='max_length', truncation=True, return_tensors="pt", max_length = self.max_seq_len) + + if self.max_seq_len > 0: + input_ids = encodings.input_ids[:, : self.max_seq_len] + attention_mask = encodings.attention_mask[:, : self.max_seq_len] + + input_ids = input_ids.to(textattack.shared.utils.device) + attention_mask = attention_mask.to(textattack.shared.utils.device) + + outputs = self.test_model(input_ids, attention_mask=attention_mask, output_attentions=True, output_hidden_states = True) + all_hidden_states, all_attentions = outputs[-2:] + self._update_initial_coverage(all_hidden_states, all_attentions, masks[start :end]) + start = end + + + self.training_word_coverage_dicts = copy.deepcopy(self.coverage_word_dicts) + self.training_attention_coverage_dicts = copy.deepcopy(self.coverage_attention_dicts) + + def _eval(self, text): + """Update `coverage_tracker` for input `text` for coarse coverage + Args: + `text`(str): text to update neuron coverage of. + + """ + encodings = self.test_model.tokenizer(text, return_tensors="pt") + if self.max_seq_len > 0: + input_ids = encodings.input_ids[:, : self.max_seq_len] + attention_mask = encodings.attention_mask[:, : self.max_seq_len] + + input_ids = input_ids.to(textattack.shared.utils.device) + attention_mask = attention_mask.to(textattack.shared.utils.device) + outputs = self.test_model(input_ids, attention_mask=attention_mask) + return outputs + + + def _update_word_coverage(self, all_hidden_states, word_mask = None): + """Update `coverage_tracker` for input `text` for coarse coverage + Args: + `text`(str): text to update neuron coverage of. 
+ + + + a = time.time() + encodings = self.test_model.tokenizer(text, padding='max_length', truncation=True, return_tensors="pt", max_length = self.max_seq_len) + if self.max_seq_len > 0: + input_ids = encodings.input_ids[:, : self.max_seq_len] + attention_mask = encodings.attention_mask[:, : self.max_seq_len] + + input_ids = input_ids.to(textattack.shared.utils.device) + attention_mask = attention_mask.to(textattack.shared.utils.device) + outputs = self.test_model(input_ids, attention_mask=attention_mask) + b = time.time() + + sentence_length = outputs[1][0][0, ...].size(0) + """ + hidden_vectors = torch.cat([o.unsqueeze(1) for o in all_hidden_states], dim = 1) + sentence_length = hidden_vectors.size(2) + #print('size of output hidden bectors: ', hidden_vectors.size()) + if self.word_mask: + indices_to_fill = [index for index in range(sentence_length)] + else: + indices_to_fill = [index for index in range(sentence_length)] + current_coverage_tracker = self._init_word_coverage(fill_value=0) + a = time.time() + section_length = (self.max_word_coverage_tracker[:, indices_to_fill , :] - self.min_word_coverage_tracker[:, indices_to_fill , :] ) / self.bins_word + section_length = section_length.unsqueeze(0).repeat(hidden_vectors.size(0), 1, 1, 1) + #print('section length: ', section_length.size()) + section_index = torch.where( + section_length > 0, + ( + torch.floor( + ( + hidden_vectors.cpu().detach() + - self.min_word_coverage_tracker[:, + indices_to_fill , : + ] + ) + / section_length + ) + ), + torch.zeros_like(hidden_vectors.cpu().detach(), requires_grad=False) -1, + ).long() + # print('section index: ', section_index.size()) + + + #section_index = torch.where(section_index, section_index, self.bins_word + 1) + #section_index = torch.where(section_index>0, section_index, torch.zeros_like(section_index) + self.bins_word + 1) + section_index = torch.where(section_index0, section_index, torch.zeros_like(section_index) + self.bins_word + 1) + + # print('section index: ', section_index.size()) + + temp_store_activations = torch.max( (F.one_hot(section_index, num_classes = self.bins_word + 3)).permute(0,4,1,2,3), dim = 0).values + + # print('Temp Store Activations: ', temp_store_activations.size()) + self.coverage_word_dicts += temp_store_activations + del temp_store_activations + del current_coverage_tracker + + def _update_attention_coverage(self, all_attentions, masks): + """Update `coverage_tracker` for input `text` for coarse coverage + Args: + `text`(str): text to update neuron coverage of. 
+ + + encodings = self.test_model.tokenizer(text, padding='max_length', truncation=True, return_tensors="pt", max_length = self.max_seq_len) + if self.max_seq_len > 0: + input_ids = encodings.input_ids[:, : self.max_seq_len] + attention_mask = encodings.attention_mask[:, : self.max_seq_len] + + input_ids = input_ids.to(textattack.shared.utils.device) + attention_mask = attention_mask.to(textattack.shared.utils.device) + outputs = self.test_model(input_ids, attention_mask=attention_mask, output_attentions=True, output_hidden_states = True) + + all_hidden_states, all_attentions = outputs[-2:] + # all_attentions = list of attentions of size B X H X L X L + + """ + sentence_length = all_attentions[0][0,0, ...].size(-1) + + + all_attentions = torch.cat( [a.unsqueeze(1) for a in all_attentions] , dim = 1).cpu()[:,:, 0:sentence_length, 0:sentence_length] + # B X layers X heads X l X l + # print('attentions size: ', all_attentions.size()) + current_coverage_tracker = self._init_attention_coverage(fill_value=0) + + + section_length = (self.max_attention_coverage_tracker[:,:, 0:sentence_length, 0:sentence_length] - \ + self.min_attention_coverage_tracker[:,:, 0:sentence_length, 0:sentence_length] ) / self.bins_attention + section_length = section_length.unsqueeze(0).repeat(all_attentions.size(0), 1, 1, 1, 1) + # print(' section length: ', section_length.size()) + section_index = torch.where( + section_length > 0, + ( + torch.floor( + ( + all_attentions.cpu().detach() + - self.min_attention_coverage_tracker + ) + / section_length + ) + ), + torch.zeros_like(all_attentions.cpu().detach(), requires_grad=False) - 1 + ).long() + + # print('section index: ', section_index.size()) + section_index = torch.where(section_index0, section_index, torch.zeros_like(section_index) + self.bins_word + 1) + temp_storage_activations = torch.max ((F.one_hot(section_index, num_classes = self.bins_attention + 3)).permute(0,5,1,2,3,4), dim = 0).values + # print(' temp storage activations: ', temp_storage_activations.size()) + self.coverage_attention_dicts += temp_storage_activations + del temp_storage_activations + del current_coverage_tracker + + def _compute_coverage(self): + """Calculate `neuron_coverage` for current model""" + neuron_word_coverage, neuron_word_coverage_total = 0.0, 0.0 + neuron_attention_coverage, neuron_attention_coverage_total = 0.0, 0.0 + neuron_word_coverage += np.count_nonzero(self.coverage_word_dicts.numpy()) + neuron_word_coverage_total += self.coverage_word_dicts.numel() + + neuron_attention_coverage += np.count_nonzero(self.coverage_attention_dicts.numpy()) + neuron_attention_coverage_total += self.coverage_attention_dicts.numel() + + + neuron_coverage = neuron_word_coverage + neuron_attention_coverage + # print('Word and Attention Only: ', neuron_word_coverage , neuron_attention_coverage) + neuron_coverage_total = neuron_word_coverage_total + neuron_attention_coverage_total + # print('Total Word and Attention Only: ', neuron_word_coverage_total , neuron_attention_coverage_total) + return neuron_coverage / neuron_coverage_total + + def _compute_vector(self): + """Calculate `neuron_coverage` for current model""" + neuron_coverage_vector = [] + for section in self.coverage_word_dicts: + for entry in section.values(): + neuron_coverage_vector += ([entry_val.item() for entry_val in entry.flatten()]) + for section in self.coverage_attention_dicts: + for entry in section.values(): + neuron_coverage_vector += ([entry_val.item() for entry_val in entry.flatten()]) + + return neuron_coverage_vector + 
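# Illustrative end-to-end usage of this class (a sketch only; `train_texts`,
+    # `test_texts` and the corresponding mask lists are placeholder names, not
+    # variables defined in this file):
+    #
+    #     coverage = neuronMultiSectionCoverage(
+    #         test_model="textattack/bert-base-uncased-ag-news", max_seq_len=128
+    #     )
+    #     coverage.initialize_from_training_dataset(train_texts, train_masks, bz=128)
+    #     multi_section_score = coverage(test_texts, test_masks, bz=128)  # float ratio of activated sections
+
+    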
def _update_coverage(self, text, word_mask = None): + """Update `coverage_tracker` for input `text` + Args: + `text`(str): text to update neuron coverage of. + + """ + + self._update_word_coverage(text, word_mask) + self._update_attention_coverage(text) + + def __call__(self, testset, masks = None, bz = 1): + """ + Returns neuron of `testset` + Args: + testset: Iterable of strings + Returns: + neuron coverage (float) + """ + # # # print('*'*50) + # # # print('Updating Coverage using test set: ') + mask_no, start = 0, 0 + with torch.no_grad(): + for t in tqdm(testset): + + if mask_no + bz >= len(testset): + end = len(testset) + else: + end = start + bz + if start >= end or start > len(testset) : break + + encodings = self.test_model.tokenizer(testset[start:end], padding='max_length', truncation=True, return_tensors="pt", max_length = self.max_seq_len) + + if self.max_seq_len > 0: + input_ids = encodings.input_ids[:, : self.max_seq_len] + attention_mask = encodings.attention_mask[:, : self.max_seq_len] + + input_ids = input_ids.to(textattack.shared.utils.device) + attention_mask = attention_mask.to(textattack.shared.utils.device) + + outputs = self.test_model(input_ids, attention_mask=attention_mask, output_attentions=True, output_hidden_states = True) + all_hidden_states, all_attentions = outputs[-2:] + self._update_word_coverage(all_hidden_states, masks[start:end]) + self._update_attention_coverage(all_attentions , masks[start:end]) + + + start = end + + + + + # # # print('*'*50) + # # # print() + # # # print('*'*50) + # # # print('Computing Coverage: ') + neuron_coverage = self._compute_coverage() + # # # print('*'*50) + return neuron_coverage + def vector(self, testset, start = False): + """ + Returns neuron of `testset` + Args: + testset: Iterable of strings + Returns: + neuron coverage (float) + """ + # # # print('*'*50) + if start: + self.coverage_word_dicts = copy.deepcopy(self.training_word_coverage_dicts) + self.coverage_attention_dicts = copy.deepcopy(self.training_attention_coverage_dicts) + # # # print('Updating Coverage using test set: ') + # # # print('#'*100) + # # # print(len(testset)) + # # # print(testset) + # # # print('#'*100) + for t in tqdm(testset): + # # # print(t) + self._update_coverage(t) + + # # # print('*'*50) + # # # print() + # # # print('*'*50) + # # # print('Computing Coverage: ') + neuron_coverage = self._compute_vector() + # # print('*'*50) + return neuron_coverage diff --git a/textattack/coverage/neuron_coverage.py b/textattack/coverage/neuron_coverage.py new file mode 100644 index 000000000..eef9d6f55 --- /dev/null +++ b/textattack/coverage/neuron_coverage.py @@ -0,0 +1,204 @@ +import logging + +import torch +import transformers +from tqdm import tqdm +import itertools +import copy + +import textattack +from collections import defaultdict +from .coverage import ExtrinsicCoverage + +logging.getLogger("transformers.tokenization_utils_base").setLevel(logging.ERROR) + + +COVERAGE_MODEL_TYPES = ["bert", "albert", "distilbert", "roberta"] + + +class neuronCoverage(ExtrinsicCoverage): + """ + ``neuronCoverage`` measures the neuron coverage acheived by a testset + Args: + test_model(Union[str, torch.nn.Module]): name of the pretrained language model from `transformers` + or the actual test model as a `torch.nn.Module` class. Default is "bert base uncased" from `transformers`. + tokenizer (:obj:``, optional): If `test_model` is not a pretrained model from `transformers, need to provide + the tokenizer here. 
+ max_seq_len (int): Maximum sequence length accepted by the model to be tested. However, if you are using a pretrained model from `transformers`, this is handled + automatically using information from `model.config`. + threshold(float): threshold for marking a neuron as activated + coarse_coverage(bool): measure neuron coverage at the level of layer outputs + """ + + def __init__( + self, + test_model="textattack/bert-base-uncased-ag-news", + tokenizer=None, + num_labels = 2, + max_seq_len=-1, + threshold=0.0, + coarse_coverage=True, + ): + + self.coarse_coverage = coarse_coverage + + config = transformers.AutoConfig.from_pretrained( + test_model, output_hidden_states=True,num_labels=num_labels + ) + if config.model_type in COVERAGE_MODEL_TYPES: + self.test_model = ( + transformers.AutoModelForSequenceClassification.from_pretrained( + test_model, config=config + ) + ) + self.tokenizer = transformers.AutoTokenizer.from_pretrained( + test_model, use_fast=True + ) + self.model_type = self.test_model.config.model_type + self.max_seq_len = ( + max_seq_len + if max_seq_len != -1 + else self.test_model.config.max_position_embeddings + ) + else: + raise ValueError( + "`neuronCoverage` only accepts models in " + + ",".join(COVERAGE_MODEL_TYPES) + ) + + self.test_model.to(textattack.shared.utils.device) + self.threshold = threshold + self.test_model.eval() + self.coverage_tracker = self._init_coverage() + + def _init_coverage(self): + """Initialize `coverage_tracker` dictionary + + Returns: + `coverage_tracker`(dict): a dictionary with key: neuron and value: (bool) intialized False + """ + coverage_tracker = defaultdict(bool) + + for bert_layer_index in range(self.test_model.config.num_hidden_layers): + coverage_tracker[(bert_layer_index, "output")] = torch.zeros( + (self.max_seq_len, self.test_model.config.hidden_size), dtype=bool + ).to(textattack.shared.utils.device) + coverage_tracker["classifier"] = torch.zeros( + (len(self.test_model.config.label2id)), dtype=bool + ).to(textattack.shared.utils.device) + coverage_tracker["embedding"] = torch.zeros( + (self.max_seq_len, self.test_model.config.hidden_size), dtype=bool + ).to(textattack.shared.utils.device) + + return coverage_tracker + + def _eval(self, text): + """Update `coverage_tracker` for input `text` for coarse coverage + Args: + `text`(str): text to update neuron coverage of. + + """ + encodings = self.tokenizer(text, return_tensors="pt") + if self.max_seq_len > 0: + input_ids = encodings.input_ids[:, : self.max_seq_len] + attention_mask = encodings.attention_mask[:, : self.max_seq_len] + + input_ids = input_ids.to(textattack.shared.utils.device) + attention_mask = attention_mask.to(textattack.shared.utils.device) + outputs = self.test_model(input_ids, attention_mask=attention_mask) + return outputs + + def _update_coarse_coverage(self, text): + """Update `coverage_tracker` for input `text` for coarse coverage + Args: + `text`(str): text to update neuron coverage of. 
+ + """ + encodings = self.tokenizer(text, return_tensors="pt") + if self.max_seq_len > 0: + input_ids = encodings.input_ids[:, : self.max_seq_len] + attention_mask = encodings.attention_mask[:, : self.max_seq_len] + + input_ids = input_ids.to(textattack.shared.utils.device) + attention_mask = attention_mask.to(textattack.shared.utils.device) + outputs = self.test_model(input_ids, attention_mask=attention_mask) + sentence_length = outputs[1][0][0, ...].size(0) + + def scale(layer_outputs, rmax=1, rmin=0): + divider = layer_outputs.max() - layer_outputs.min() + + if divider == 0: + return torch.zeros_like(layer_outputs) + + X_std = (layer_outputs - layer_outputs.min()) / divider + + X_scaled = X_std * (rmax - rmin) + rmin + return X_scaled + + self.coverage_tracker[("embedding")][0:sentence_length, ...] = torch.where( + scale(outputs[1][0][0, ...]) > self.threshold, + torch.ones( + (sentence_length, self.test_model.config.hidden_size), dtype=bool + ).to(textattack.shared.utils.device), + self.coverage_tracker[("embedding")][0:sentence_length, ...], + ) + for h_index, hidden_vector in enumerate(outputs[1][1:]): + + self.coverage_tracker[(h_index, "output")][ + 0:sentence_length, ... + ] = torch.where( + scale(hidden_vector[0, ...]) > self.threshold, + torch.ones( + (sentence_length, self.test_model.config.hidden_size), dtype=bool + ).to(textattack.shared.utils.device), + self.coverage_tracker[(h_index, "output")][0:sentence_length, ...], + ) + + self.coverage_tracker["classifier"] = torch.where( + scale(outputs[0][0, ...]) > self.threshold, + torch.ones((len(self.test_model.config.label2id)), dtype=bool).to( + textattack.shared.utils.device + ), + self.coverage_tracker["classifier"], + ) + + def _update_refined_coverage(self, text): + """Update `coverage_tracker` for input `text` for refined coverage + Args: + `text`(str): text to update neuron coverage of. + + """ + + def _compute_coverage(self): + """Calculate `neuron_coverage` for current model""" + + neuron_coverage = sum( + [entry.sum().item() for entry in self.coverage_tracker.values()] + ) / sum([entry.numel() for entry in self.coverage_tracker.values()]) + + return neuron_coverage + + def _update_coverage(self, text): + """Update `coverage_tracker` for input `text` + Args: + `text`(str): text to update neuron coverage of. + + """ + if self.coarse_coverage: + self._update_coarse_coverage(text) + else: + pass + + def __call__(self, testset): + """ + Returns neuron of `testset` + Args: + testset: Iterable of strings + Returns: + neuron coverage (float) + """ + for t in tqdm(testset): + + self._update_coverage(t[0]["text"]) + neuron_coverage = self._compute_coverage() + return neuron_coverage diff --git a/textattack/coverage/perplexity_coverage.py b/textattack/coverage/perplexity_coverage.py new file mode 100644 index 000000000..84b156722 --- /dev/null +++ b/textattack/coverage/perplexity_coverage.py @@ -0,0 +1,97 @@ +import logging + +import torch +from tqdm import tqdm +import transformers + +import textattack + +from .coverage import ExtrinsicCoverage + +logging.getLogger("transformers.tokenization_utils_base").setLevel(logging.ERROR) + + +class PerplexityCoverage(ExtrinsicCoverage): + """ + ``PerplexityCoverage`` meausures the average perplexity of a given test datsaet using a language model + Args: + language_model(Union[str, torch.nn.Module]): name of the pretrained language model from `transformers` + or the actual language model as a `torch.nn.Module` class. Default is "gpt2" from `transformers`. 
+ tokenizer (:obj:``, optional): If `language_model` is not a pretrained model from `transformers, need to provide + the tokenizer here. + max_seq_len(:obj:`int`, optional): Max sequence length to consider. If not set and if the language model is a fixed-length model, + defaults to the max sequence of length of the model. + batch_size (int): Batch size when calculating perplexity. + """ + + def __init__( + self, language_model="gpt2", tokenizer=None, max_seq_len=None, stride_size=512 + ): + if isinstance(language_model, str): + self.language_model = transformers.AutoModelForCausalLM.from_pretrained( + language_model + ) + self.tokenizer = transformers.AutoTokenizer.from_pretrained( + language_model, use_fast=True + ) + self.max_seq_len = ( + max_seq_len if max_seq_len else self.language_model.config.n_positions + ) + if stride_size > self.max_seq_len: + raise ValueError( + f"Stride size cannot be greater than max sequence length ({stride_size} > {max_seq_len})." + ) + self.stride_size = stride_size + else: + raise ValueError('`PerplexityCoverage` only currently supports "gpt2"') + + self.language_model.to(textattack.shared.utils.device) + self.language_model.eval() + + def _gpt2_calc_perplexity(self, text): + encodings = self.tokenizer(text, return_tensors="pt") + if self.max_seq_len > 0: + input_ids = encodings.input_ids[:, : self.max_seq_len] + attention_mask = encodings.attention_mask[:, : self.max_seq_len] + + input_ids = input_ids.to(textattack.shared.utils.device) + attention_mask = attention_mask.to(textattack.shared.utils.device) + + lls = [] + for i in range(0, input_ids.size(1), self.stride_size): + begin_loc = max(i + self.stride_size - self.max_seq_len, 0) + end_loc = min(i + self.stride_size, input_ids.size(1)) + trg_len = end_loc - i # may be different from stride on last loop + input_ids = input_ids[:, begin_loc:end_loc].to( + textattack.shared.utils.device + ) + attention_mask = attention_mask[:, begin_loc:end_loc].to( + textattack.shared.utils.device + ) + target_ids = input_ids.clone() + target_ids[:, :-trg_len] = -100 + + with torch.no_grad(): + outputs = self.language_model( + input_ids, attention_mask=attention_mask, labels=target_ids + ) + log_likelihood = outputs[0] * trg_len + + lls.append(log_likelihood) + + ppl = torch.exp(torch.stack(lls).sum() / end_loc) + return ppl.item() + + def __call__(self, testset): + """ + Returns average perplexity of `testset` + Args: + testset: Iterable of strings + Returns: + average perplexity (float) + """ + ppls = [] + for text in tqdm(testset): + pp = self._gpt2_calc_perplexity(text) + ppls.append(pp) + return sum(ppls) / len(testset), ppls diff --git a/textattack/metrics/recipe.py b/textattack/metrics/recipe.py new file mode 100644 index 000000000..0ff7c18c2 --- /dev/null +++ b/textattack/metrics/recipe.py @@ -0,0 +1,20 @@ +""" +Attack Metric Quality Recipes: +============================== + +""" +import random + +from . 
import metric + +class AdvancedAttackMetric(results): + """Calculate a suite of advanced metrics to evaluate attackResults' quality + """ + + def __init__(self, results, **kwargs): + perplexity_stats = Perplexity().calculate(results) + use_stats = USEMetric().calculate(results) + bert_score = BERTScoreMetric().calculate(results) + meteor_score = MeteorMetric().calculate(results) + sbert_score = SBERTMetric().calculate(results) + return perplexity_stats, use_stats, bert_score, meteor_score, sbert_score \ No newline at end of file From 00894ae6bf986f47f6f5858f188e5802ed932a09 Mon Sep 17 00:00:00 2001 From: Yanjun Qi Date: Sun, 17 Sep 2023 15:36:59 -0400 Subject: [PATCH 2/7] adding examples of using coverage package --- examples/coverage/checklist_airline_tweets.py | 225 ++++++++++++++++++ examples/coverage/measure_coverage.py | 170 +++++++++++++ examples/coverage/measure_perp.py | 178 ++++++++++++++ examples/coverage/sanity_coverage.py | 203 ++++++++++++++++ 4 files changed, 776 insertions(+) create mode 100644 examples/coverage/checklist_airline_tweets.py create mode 100644 examples/coverage/measure_coverage.py create mode 100644 examples/coverage/measure_perp.py create mode 100644 examples/coverage/sanity_coverage.py diff --git a/examples/coverage/checklist_airline_tweets.py b/examples/coverage/checklist_airline_tweets.py new file mode 100644 index 000000000..9da8788b4 --- /dev/null +++ b/examples/coverage/checklist_airline_tweets.py @@ -0,0 +1,225 @@ + + + +import checklist +import copy +import torch +import random +import numpy as np +import pickle +import wandb + + +from checklist.test_types import MFT, INV, DIR +from checklist.test_suite import TestSuite +from sst_model import * + +from textattack.coverage import neuronMultiSectionCoverage +from textattack.datasets import HuggingFaceDataset +from coverage_args import * +from coverage_utils import * +TYPE_MAP = { + MFT: 'MFT', + INV: 'INV', + DIR: 'DIR', + } + +args = get_args() + +set_seed(args.seed) +wandb.init() +wandb.config.update(args) +wandb.init( + project="coverage", + notes="vanilla coverage only", + tags=["coverage", "bert"], + config=wandb.config, + ) +suite_path_dicts = { + 'sentiment' : 'sentiment/sentiment_suite.pkl', + 'qqp' : 'qqp/qqp_suite.pkl', + 'mc' : 'squad/squad_suite.pkl' +} +suite_path = './CHECKLIST_DATA/release_data/'+ suite_path_dicts[args.suite] + +suite = TestSuite.from_file(suite_path) + +if args.suite == 'sentiment': + # pretrained BERT model on SST-2 + model_name_or_path = 'textattack/'+args.base_model+'-SST-2' + model = SSTModel(model_name_or_path) +elif args.suite == 'qqp': + # pretrained BERT model on QQP + model_name_or_path = 'textattack/'+args.base_model+'-qqp' + model = QQPModel(model_name_or_path) +else: + quit() +threshold = args.threshold + + +coverage = neuronMultiSectionCoverage(test_model = model_name_or_path, max_seq_len = args.max_seq_len, + bins_word = args.bins_word, bins_attention = args.bins_attention, bz = 48, + pre_limits = False, word_mask = True) +print('initializing from training data') +if args.mask: + vocab = [] + vocab_file = open("selected_words.txt", "r") + content_list = vocab_file.readlines() + for a in content_list: + vocab.append(a.strip('\n')) +trainset_masks = [] +if args.suite == 'sentiment': + text_key = 'sentence' + + trainset = HuggingFaceDataset('glue', 'sst2', 'train', shuffle = True) + validset = HuggingFaceDataset('glue', 'sst2', 'validation', shuffle = True) +trainset_str = [] +validset_str = [] + + + +for example in trainset: + current_example = 
example[0][text_key] + if args.mask: + current_example = [word for word in current_example if word in selected_vocab] + + trainset_str.append(current_example) +for example in validset: + current_example = example[0][text_key] + if args.mask: + current_example = [word for word in current_example if word in selected_vocab] + + validset_str.append(current_example) +#testset = HuggingFaceDataset('glue', 'sst2', args.split, shuffle = False) +if args.debug == 1: + trainset_str = trainset_str[0:1000] + +for example in trainset_str: + + trainset_masks.append([1 for i in range(128)] ) +# initialize coverage from training set + + +save_coverage_init_file = os.path.join( './coverage_results/', args.base_model +'_'+args.suite+'_BW_'+ str(args.bins_word) + \ + '_BA_' + str(args.bins_attention) + '_INIT_' + str(len(trainset_str))+'.pkl') + +if not os.path.exists(save_coverage_init_file): + print('can\'t find!: ', save_coverage_init_file) + coverage.initialize_from_training_dataset(trainset_str, trainset_masks, bz = 128) + initial_coverage = coverage(trainset_str, trainset_masks, bz = 128) + + pickle.dump(coverage, open(save_coverage_init_file, 'wb')) +else: + print('*'*100) + print('exists!' , save_coverage_init_file) + print('*'*100) + coverage = pickle.load(open(save_coverage_init_file, 'rb')) + initial_coverage = coverage._compute_coverage() +for test in suite.tests: + + if not args.specify_test: + args.test_name = suite.tests[test].name + if TYPE_MAP[type(suite.tests[test])] == args.type.upper() and suite.tests[test].name == args.test_name: + if args.query_tests_only: + print(suite.tests[test].name) + continue + + if args.type.upper() == 'MFT' and (suite.tests[test].labels) is None: + continue + input_examples_orig = suite.tests[test].data + input_examples = [] + + test_examples = (suite.tests[test].to_raw_examples()) + #print(test_examples) + shuffled_indices = list(range(len(test_examples))) + if args.query_subset_only: + shuffled_indices = shuffled_indices[0:args.subset] + + #random.shuffle(shuffled_indices) + test_examples = [test_examples[t] for t in shuffled_indices] + test_indices = [suite.tests[test].example_list_and_indices()[1][t] for t in shuffled_indices] + + for initial, final in zip(test_indices, test_examples): + if type(input_examples_orig[initial]) is not list: + input_examples_orig[initial] = [input_examples_orig[initial]] + input_examples.append(input_examples_orig[initial][0]) + + #print('*'*100) + #print([a for a in input_examples]) + #print('*'*100) + labels = suite.tests[test].labels + if args.type.upper() == 'MFT' and type(suite.tests[test].labels) is not list: + labels = [labels]*len(test_examples) + + # coverage filtering + original_number_test_examples = len(test_examples) + if not args.baseline: + relevant_idxs, test_examples_list, skipped_examples_list = filter_using_coverage(coverage, initial_coverage, test_examples, threshold) + else: + relevant_idxs, test_examples_list, skipped_examples_list = [i for i in range(len(test_examples))], test_examples, [] + + with open(args.save_str + suite.tests[test].name +"_skipped_examples_"+str(len(trainset_str))+".txt", "wb") as fp: + pickle.dump(skipped_examples_list, fp) + with open(args.save_str + suite.tests[test].name +"_selected_examples_"+str(len(trainset_str))+".txt", "wb") as fp: + pickle.dump(test_examples_list, fp) + + test_examples = [test_examples[i] for i in relevant_idxs] + if args.type.upper() == 'MFT': labels = [labels[i] for i in relevant_idxs] + input_examples = [input_examples[i] for i in relevant_idxs] + + 
predictions_before_tx, predictions_prob_before_tx, predictions_after_tx, predictions_prob_after_tx \ + = get_predictions_after_tx(model, input_examples, test_examples) + if args.type.upper() == 'INV': + # to be fixed + failures = 0 + for p,t, pp, tp in zip(predictions_after_tx, predictions_before_tx, predictions_prob_after_tx, predictions_prob_before_tx): + if pp> (1.0/3) and pp<(2.0/3) and p == 1: + labelx = 1 + elif pp> (1.0/3) and pp<(2.0/3) and p == 0: + labelx = 0 + elif p == 1: + labelx = 2 + else: + labelx = 0 + if tp> (1.0/3) and tp<(2.0/3) and t == 1: + labelxt = 1 + elif tp> (1.0/3) and tp<(2.0/3) and t == 0: + labelxt = 0 + elif t == 1: + labelxt = 2 + else: + labelxt = 0 + if labelx != labelxt: + failures += 1 + else: + if abs(pp - tp) > 0.1: + failures += 1 + + failure_rate = failures/len(predictions_before_tx) + if args.type.upper() == 'MFT': + hard_predictions = [] + for p,l in zip(predictions_prob_after_tx, predictions_after_tx): + if p> (1.0/3) and p<(2.0/3) and l == 1: + hard_predictions.append(1) + elif p> (1.0/3) and p<(2.0/3) and l == 0: + hard_predictions.append(1) + elif l == 1: + hard_predictions.append(2) + else: + hard_predictions.append(0) + if len(labels) !=0 : + failure_rate = sum([p!=t for p,t in zip(hard_predictions, labels)])/len(labels) + else: + failure_rate = -1.0 + + + print(f'{suite.tests[test].name}, {len(predictions_before_tx)} ,{failure_rate*100.0} %') + with open(args.save_str + suite.tests[test].name +"_failure_rate_"+str(len(trainset_str))+".txt", "w") as f: + f.write(suite.tests[test].name+','+str(len(predictions_before_tx)) +','+str((original_number_test_examples)) + ','+ str(failure_rate*100.0)+'\n') + + + + + + + \ No newline at end of file diff --git a/examples/coverage/measure_coverage.py b/examples/coverage/measure_coverage.py new file mode 100644 index 000000000..e682098a2 --- /dev/null +++ b/examples/coverage/measure_coverage.py @@ -0,0 +1,170 @@ +""" + +python measure_coverage.py --split test --ratio 1.0 --coverage attention --length 128 --prespecify-limits --attack textfooler --test-ratio 1000 --save-dir ./coverage_workshop/ --seed 1 --dataset sst2 + + +""" + +import torch +import os +import textattack +import pickle +from textattack.models.tokenizers import AutoTokenizer +from textattack.models.wrappers import HuggingFaceModelWrapper +from textattack.models.wrappers import ModelWrapper +from transformers import AutoModelForSequenceClassification +from textattack.coverage import neuronMultiSectionCoverage +from textattack.attack_results import SuccessfulAttackResult +from textattack.datasets import HuggingFaceDataset +from textattack.attack_recipes import TextFoolerJin2019, HotFlipEbrahimi2017, DeepWordBugGao2018, FasterGeneticAlgorithmJia2019, BAEGarg2019 +from math import floor +import random +import numpy as np +import argparse + +def random_seed(seed): + torch.manual_seed(seed) + np.random.seed(seed) + random.seed(seed) + return + + + +random_seed(1) +parser = argparse.ArgumentParser(description='Measure Coverage of pretrained NLP Models') +parser.add_argument('--seed', type=int, default=1, help='set random seed') +parser.add_argument('--length', type=int, default=128, help='set max seq length') +parser.add_argument('--bins', type=int, default=10, help='set number of bins/sections') +parser.add_argument('--ratio', type=float, default=1, help='proportion of train set used for dataset sampling') +parser.add_argument('--test-ratio', type=int, default=1.0, help='proportion of train set used for dataset sampling') 
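+# NOTE: despite the help string above, --test-ratio controls the *test* split: values
+# <= 1.0 are treated as a fraction of the test set, larger values as an absolute
+# number of test examples (see the slicing of `testset` below).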
+parser.add_argument('--dataset', type=str, default='imdb', help='dataset to use for measuring coverage') +parser.add_argument('--save-dir', type=str, default='./coverage/', help='dataset to use for measuring coverage') +parser.add_argument('--model', type=str, default='bert-base-uncased', help='model f whose weights to use') +parser.add_argument('--coverage', type=str, default='attention', help='coverage type') +parser.add_argument('--attack', type=str, default='none', help='attack type') +parser.add_argument('--split', type=str, default='test', help='split to use for measuring coverage') +parser.add_argument('--base-only', action='store_true', help='loading only base model') +parser.add_argument('--prespecify-limits', action='store_true', help='prespecify') +args = parser.parse_args() +random_seed(args.seed) + + + +if not args.base_only: + if args.dataset == 'sst2': + test_model = 'textattack/' + str(args.model) + '-' + 'SST-2' + elif args.dataset == 'rotten-tomatoes': + test_model = 'textattack/' + str(args.model) + '-' + 'rotten_tomatoes' + else: + test_model = 'textattack/' + str(args.model) + '-' + str(args.dataset) +else: + test_model = args.model +text_key = 'text' +# test_model="textattack/bert-base-uncased-ag-news", +if args.dataset == 'sst2': + text_key = 'sentence' + trainset = HuggingFaceDataset('glue', 'sst2', 'train', shuffle = True) + testset = HuggingFaceDataset('glue', 'sst2', args.split, shuffle = True) +elif args.dataset == 'rotten-tomatoes': + trainset = HuggingFaceDataset('rotten_tomatoes', None, 'train', shuffle = True) + testset = HuggingFaceDataset('rotten_tomatoes', None, args.split, shuffle = True) +else: + trainset = HuggingFaceDataset(args.dataset, None, 'train', shuffle = True) + testset = HuggingFaceDataset(args.dataset, None, args.split, shuffle = True) + + + +if args.ratio <= 1.0: + trainset = trainset[0:floor(args.ratio*len(trainset))] +else: + trainset = trainset[0:floor(args.ratio)] + + +trainset_str = [] +for example in trainset: + + trainset_str.append(example[0][text_key]) + +if args.test_ratio <= 1.0: + testset = testset[0:floor(args.test_ratio*len(testset))] +else: + testset = testset[0:floor(args.test_ratio)] + +testset_str = [] +for example in testset: + testset_str.append(example[0][text_key]) + + +args.save_dir += 'COVER_' + args.coverage + '/' +os.makedirs(args.save_dir, exist_ok = True) +args.save_dir += 'SEED_'+str(args.seed) + '_BINS_' + str(args.bins) + '/' +os.makedirs(args.save_dir, exist_ok = True) +args.save_dir += 'data_' + str(args.dataset) + '_model_' + str(args.model) + '_ratio_' + str(args.ratio) + '_test_ratio_' + str(args.test_ratio) +'_L_'+ str(args.length) + '_B_' + str(args.base_only) + '/' + + +os.makedirs(args.save_dir, exist_ok = True) +args.save_dir += 'Attack_' + args.attack + '_limits_' + str(args.prespecify_limits) +os.makedirs(args.save_dir, exist_ok = True) + +# make coverage object +coverage = neuronMultiSectionCoverage(test_model = test_model, max_seq_len = args.length, k_m = args.bins, coverage = (args.coverage), pre_limits = (not (args.coverage == 'word') and args.prespecify_limits)) +print('initializing from training data') +coverage.initialize_from_training_dataset(trainset_str) + +print('--'*50) +print('generating test set!') +print('--'*50) +num_successes = 0.0 +total = 1.0 +if args.attack != 'none': + original_model = AutoModelForSequenceClassification.from_pretrained(test_model) + original_tokenizer = AutoTokenizer(test_model) + model = HuggingFaceModelWrapper(original_model,original_tokenizer) + if 
args.attack == 'textfooler': + attack = TextFoolerJin2019.build(model) + elif args.attack == 'alzantot': + attack = FasterGeneticAlgorithmJia2019.build(model) + elif args.attack == 'bae': + attack = BAEGarg2019.build(model) + elif args.attack == 'deepwordbug': + attack = DeepWordBugGao2018.build(model) + elif args.attack == 'hotflip': + attack = HotFlipEbrahimi2017.build(model) + else: + print('This Attack has not been added!') + raise NotImplementedError + results_iterable = attack.attack_dataset(testset, indices=None) + # save the results too + results_iterable = [result for result in results_iterable] + total = len(results_iterable) + pickle.dump( results_iterable , open( os.path.join(args.save_dir, "attack_results"), "wb" ) ) + for n,result in enumerate(results_iterable): + print('---original: \n', result.original_text()) + print('---perturbed: \n', result.perturbed_text()) + testset_str.append(result.perturbed_text()) + if isinstance(result, SuccessfulAttackResult): + num_successes += 1 + + +print('=+'*20) +print('successes: ', num_successes, 'total: ', total) +print('rate: ', num_successes / total) +print('--'*50) +print('length of generated test set: ', len(testset_str)) +print('--'*50) + + + +word_coverage = coverage(testset_str) + + +print('the coverage: ', word_coverage) + +results_file = open(os.path.join(args.save_dir, 'stats.txt'), 'w') +results_file.write('dataset, model, ratio, length, attack, limits, coverage, num_examples, num_test_examples, seed, split, coverage, num_successes, total\n') +results_file.write(','.join([args.dataset, test_model, str(args.ratio), str(args.test_ratio), str(args.length), args.attack, str(args.prespecify_limits), str(args.coverage), str(len(trainset_str)),str(len(testset_str)), str(args.seed), args.split, str(word_coverage) , str(num_successes), str(total)+'\n'])) +results_file.close() + + + + diff --git a/examples/coverage/measure_perp.py b/examples/coverage/measure_perp.py new file mode 100644 index 000000000..b5f82f6a9 --- /dev/null +++ b/examples/coverage/measure_perp.py @@ -0,0 +1,178 @@ + + + +import checklist +import copy +import torch +import random +import numpy as np +import pickle +import wandb + + +from checklist.test_types import MFT, INV, DIR +from checklist.test_suite import TestSuite +from sst_model import * + +from textattack.coverage import neuronMultiSectionCoverage +from textattack.datasets import HuggingFaceDataset +from textattack.metrics import * +from coverage_args import * +from repr_utils import * +from coverage_utils import * +TYPE_MAP = { + MFT: 'MFT', + INV: 'INV', + DIR: 'DIR', + } + +args = get_args() + +set_seed(args.seed) +wandb.init() +wandb.config.update(args) +wandb.init( + project="coverage", + notes="vanilla coverage only", + tags=["coverage", "bert"], + config=wandb.config, + ) +suite_path_dicts = { + 'sentiment' : 'sentiment/sentiment_suite.pkl', + 'qqp' : 'qqp/qqp_suite.pkl', + 'mc' : 'squad/squad_suite.pkl' +} +suite_path = './CHECKLIST_DATA/release_data/'+ suite_path_dicts[args.suite] + +suite = TestSuite.from_file(suite_path) + +if args.suite == 'sentiment': + # pretrained BERT model on SST-2 + model_name_or_path = 'textattack/'+args.base_model+'-SST-2' + model = SSTModel(model_name_or_path) +elif args.suite == 'qqp': + # pretrained BERT model on QQP + model_name_or_path = 'textattack/'+args.base_model+'-qqp' + model = QQPModel(model_name_or_path) +else: + quit() +threshold = args.threshold +ppl = Perplexity() + +coverage = neuronMultiSectionCoverage(test_model = model_name_or_path, max_seq_len = 
args.max_seq_len, + bins_word = args.bins_word, bins_attention = args.bins_attention, bz = 48, + pre_limits = False, word_mask = True) +print('initializing from training data') +if args.mask: + vocab = [] + vocab_file = open("selected_words.txt", "r") + content_list = vocab_file.readlines() + for a in content_list: + vocab.append(a.strip('\n')) +trainset_masks = [] +if args.suite == 'sentiment': + text_key = 'sentence' + + trainset = HuggingFaceDataset('glue', 'sst2', 'train', shuffle = True) + validset = HuggingFaceDataset('glue', 'sst2', 'validation', shuffle = True) +trainset_str = [] +validset_str = [] + + + +for example in trainset: + current_example = example[0][text_key] + if args.mask: + current_example = [word for word in current_example if word in selected_vocab] + + trainset_str.append(current_example) +for example in validset: + current_example = example[0][text_key] + if args.mask: + current_example = [word for word in current_example if word in selected_vocab] + + validset_str.append(current_example) +#testset = HuggingFaceDataset('glue', 'sst2', args.split, shuffle = False) +if args.debug == 1: + trainset_str = trainset_str[0:1000] + +for example in trainset_str: + + trainset_masks.append([1 for i in range(128)] ) +# initialize coverage from training set + + +save_coverage_init_file = os.path.join( './coverage_results/', args.base_model +'_'+args.suite+'_BW_'+ str(args.bins_word) + \ + '_BA_' + str(args.bins_attention) + '_INIT_' + str(len(trainset_str))+'.pkl') + +if not os.path.exists(save_coverage_init_file): + print('can\'t find!: ', save_coverage_init_file) + coverage.initialize_from_training_dataset(trainset_str, trainset_masks, bz = 128) + initial_coverage = coverage(trainset_str, trainset_masks, bz = 128) + + pickle.dump(coverage, open(save_coverage_init_file, 'wb')) +else: + print('*'*100) + print('exists!' 
, save_coverage_init_file) + print('*'*100) + coverage = pickle.load(open(save_coverage_init_file, 'rb')) + initial_coverage = coverage._compute_coverage() +for test in suite.tests: + + if not args.specify_test: + args.test_name = suite.tests[test].name + if TYPE_MAP[type(suite.tests[test])] == args.type.upper() and suite.tests[test].name == args.test_name: + if args.query_tests_only: + print(suite.tests[test].name) + continue + + if args.type.upper() == 'MFT' and (suite.tests[test].labels) is None: + continue + input_examples_orig = suite.tests[test].data + input_examples = [] + + test_examples = (suite.tests[test].to_raw_examples()) + #print(test_examples) + shuffled_indices = list(range(len(test_examples))) + if args.query_subset_only: + shuffled_indices = shuffled_indices[0:args.subset] + + #random.shuffle(shuffled_indices) + test_examples = [test_examples[t] for t in shuffled_indices] + test_indices = [suite.tests[test].example_list_and_indices()[1][t] for t in shuffled_indices] + + for initial, final in zip(test_indices, test_examples): + if type(input_examples_orig[initial]) is not list: + input_examples_orig[initial] = [input_examples_orig[initial]] + input_examples.append(input_examples_orig[initial][0]) + + #print('*'*100) + #print([a for a in input_examples]) + #print('*'*100) + labels = suite.tests[test].labels + if args.type.upper() == 'MFT' and type(suite.tests[test].labels) is not list: + labels = [labels]*len(test_examples) + + # coverage filtering + original_number_test_examples = len(test_examples) + with open(args.save_str + suite.tests[test].name +"_skipped_examples_"+str(len(trainset_str))+".txt", "rb") as fp: + skipped_examples_list = pickle.load(fp) + with open(args.save_str + suite.tests[test].name +"_selected_examples_"+str(len(trainset_str))+".txt", "rb") as fp: + test_examples_list = pickle.load(fp) + print([example[1] for example in test_examples_list]) + new_ppl = ppl.calc_ppl([example[1] for example in test_examples_list])[0] + orig_ppl = ppl.calc_ppl([example[1] for example in (test_examples_list + skipped_examples_list)] )[0] + #test_examples = [test_examples[i] for i in relevant_idxs] + #if args.type.upper() == 'MFT': labels = [labels[i] for i in relevant_idxs] + #input_examples = [input_examples[i] for i in relevant_idxs] + + + + print(f'{suite.tests[test].name}, {orig_ppl} ,{new_ppl} %') + + + + + + + \ No newline at end of file diff --git a/examples/coverage/sanity_coverage.py b/examples/coverage/sanity_coverage.py new file mode 100644 index 000000000..aa7df251e --- /dev/null +++ b/examples/coverage/sanity_coverage.py @@ -0,0 +1,203 @@ +""" + +python measure_coverage.py --split test --ratio 0.001 --coverage attention --length 3 --prespecify-limits --attack textwordbug --test-ratio 5 + + + +""" + +import torch +import os +import textattack +import copy +import pickle +from textattack.models.tokenizers import AutoTokenizer +from textattack.models.wrappers import HuggingFaceModelWrapper +from textattack.models.wrappers import ModelWrapper +from transformers import AutoModelForSequenceClassification +from textattack.coverage import neuronMultiSectionCoverage +from textattack.augmentation import Augmenter +from textattack.attack_results import SuccessfulAttackResult +from textattack.datasets import HuggingFaceDataset +from textattack.constraints.semantics.sentence_encoders import UniversalSentenceEncoder +from textattack.attack_recipes import TextFoolerJin2019, HotFlipEbrahimi2017, DeepWordBugGao2018, FasterGeneticAlgorithmJia2019, BAEGarg2019 +from math 
import floor +import random +import numpy as np +import argparse + +def random_seed(seed): + torch.manual_seed(seed) + np.random.seed(seed) + random.seed(seed) + return + +DEFAULT_CONSTRAINTS = [textattack.constraints.pre_transformation.RepeatModification(), textattack.constraints.pre_transformation.StopwordModification()] +available_transformations = [ + textattack.transformations.WordDeletion, + textattack.transformations.RandomSynonymInsertion, + textattack.transformations.WordSwapEmbedding, + textattack.transformations.WordSwapChangeLocation, + textattack.transformations.WordSwapChangeName, + textattack.transformations.WordSwapChangeNumber, + textattack.transformations.WordSwapContract, + textattack.transformations.WordSwapExtend, + textattack.transformations.WordSwapHomoglyphSwap, + textattack.transformations.WordSwapMaskedLM, + textattack.transformations.WordSwapQWERTY, + textattack.transformations.WordSwapNeighboringCharacterSwap, + textattack.transformations.WordSwapRandomCharacterDeletion, + textattack.transformations.WordSwapRandomCharacterInsertion, + textattack.transformations.WordSwapRandomCharacterSubstitution, + textattack.transformations.RandomSwap, + textattack.transformations.WordSwapWordNet + ] +random_seed(1) +parser = argparse.ArgumentParser(description='Measure Coverage of pretrained NLP Models') +parser.add_argument('--seed', type=int, default=1, help='set random seed') +parser.add_argument('--length', type=int, default=128, help='set max seq length') +parser.add_argument('--bins', type=int, default=10, help='set number of bins/sections') +parser.add_argument('--ratio', type=float, default=1, help='proportion of train set used for dataset sampling') +parser.add_argument('--use-threshold', type=float, default=0.6, help='proportion of train set used for dataset sampling') +parser.add_argument('--test-ratio', type=int, default=1.0, help='proportion of train set used for dataset sampling') +parser.add_argument('--pct-words-to-swap', type=int, default=0.1, help='proportion of train set used for dataset sampling') +parser.add_argument('--dataset', type=str, default='imdb', help='dataset to use for measuring coverage') +parser.add_argument('--save-dir', type=str, default='./coverage/', help='dataset to use for measuring coverage') +parser.add_argument('--model', type=str, default='bert-base-uncased', help='model f whose weights to use') +parser.add_argument('--coverage', type=str, default='attention', help='coverage type') +# takes as input a transformation and a constraint +parser.add_argument('--transformation', type=int, default=0, help='transformation type') +parser.add_argument('--constraint', type=str, default='none', help='constraint type') + +parser.add_argument('--split', type=str, default='test', help='split to use for measuring coverage') +parser.add_argument('--base-only', action='store_true', help='loading only base model') +parser.add_argument('--prespecify-limits', action='store_true', help='prespecify') +args = parser.parse_args() +random_seed(args.seed) + + + +if not args.base_only: + if args.dataset == 'sst2': + test_model = 'textattack/' + str(args.model) + '-' + 'SST-2' + elif args.dataset == 'rotten-tomatoes': + test_model = 'textattack/' + str(args.model) + '-' + 'rotten_tomatoes' + else: + test_model = 'textattack/' + str(args.model) + '-' + str(args.dataset) +else: + test_model = args.model +text_key = 'text' +# test_model="textattack/bert-base-uncased-ag-news", +if args.dataset == 'sst2': + text_key = 'sentence' + trainset = 
HuggingFaceDataset('glue', 'sst2', 'train', shuffle = True) + testset = HuggingFaceDataset('glue', 'sst2', args.split, shuffle = True) +elif args.dataset == 'rotten-tomatoes': + trainset = HuggingFaceDataset('rotten_tomatoes', None, 'train', shuffle = True) + testset = HuggingFaceDataset('rotten_tomatoes', None, args.split, shuffle = True) +else: + trainset = HuggingFaceDataset(args.dataset, None, 'train', shuffle = True) + testset = HuggingFaceDataset(args.dataset, None, args.split, shuffle = True) + + + +if args.ratio <= 1.0: + trainset = trainset[0:floor(args.ratio*len(trainset))] +else: + trainset = trainset[0:floor(args.ratio)] + + +trainset_str = [] +for example in trainset: + + trainset_str.append(example[0][text_key]) + +if args.test_ratio <= 1.0: + testset = testset[0:floor(args.test_ratio*len(testset))] +else: + testset = testset[0:floor(args.test_ratio)] + +testset_str = [] +for example in testset: + testset_str.append(example[0][text_key]) + + +args.save_dir += 'Sanity_COVER_' + args.coverage + '/' +os.makedirs(args.save_dir, exist_ok = True) +args.save_dir += 'SEED_'+str(args.seed) + '_BINS_' + str(args.bins) + '/' +os.makedirs(args.save_dir, exist_ok = True) +args.save_dir += 'data_' + str(args.dataset) + '_model_' + str(args.model) + '_ratio_' + str(args.ratio) + '_test_ratio_' + str(args.test_ratio) +'_L_'+ str(args.length) + '_B_' + str(args.base_only) + '/' + + +os.makedirs(args.save_dir, exist_ok = True) +args.save_dir += 'transformation_' + str(args.transformation) + '_limits_' + str(args.prespecify_limits) +os.makedirs(args.save_dir, exist_ok = True) + +# make coverage object +coverage = neuronMultiSectionCoverage(test_model = test_model, max_seq_len = args.length, k_m = args.bins, coverage = (args.coverage), pre_limits = (not (args.coverage == 'word') and args.prespecify_limits)) + +print('initializing from training data') +coverage.initialize_from_training_dataset(trainset_str) + +print('--'*50) +print('generating test set!') +print('--'*50) +num_successes = 0.0 +total = 1.0 +if args.transformation != -1: + if args.constraint != 'use': + constraints = DEFAULT_CONSTRAINTS + [(UniversalSentenceEncoder(threshold=args.use_threshold))] + else: + constraints = DEFAULT_CONSTRAINTS + augment_using_tf = Augmenter(transformation=available_transformations[args.transformation](),constraints=constraints,pct_words_to_swap=args.pct_words_to_swap,transformations_per_example=1,) + #augment_using_tf.augment()[0] + new_text = [] + for text in new_text: + new_text += augment_using_tf.augment(text )[0] + + + + +augmented_text_file = open(os.path.join(args.save_dir, 'examples.txt'), 'w') +pattern_text_file = open(os.path.join(args.save_dir, 'pattern.txt'), 'w') +for test in testset_str: + augmented_text_file.write(test+'\n') +augmented_text_file.write('\n') +for test in new_text: + augmented_text_file.write(test+'\n') +augmented_text_file.close() + + + +# get pattern independent of the other examples in the test set +# pattern for each example + +for test_example in testset_str: + print(test_example) + temp_coverage = copy.deepcopy(coverage) + coverage_vector = temp_coverage.vector(test_example) + # coverage_vector is a list + del temp_coverage + pattern_text_file.write(' '.join([str(i) for i in coverage_vector])+'\n') + del coverage_vector +pattern_text_file.write('\n') + + +# also get the same for each augmented example +for test_example in new_text: + temp_coverage = copy.deepcopy(coverage) + coverage_vector = temp_coverage.vector(test_example) + # coverage_vector is a list + 
pattern_text_file.write(' '.join([str(i) for i in coverage_vector])+'\n') + + + del temp_coverage +# save the results too + +pattern_text_file.close() + + + + + + From 00e9bae264dd4304de8efdc07eb314e779401185 Mon Sep 17 00:00:00 2001 From: Yanjun Qi Date: Mon, 18 Sep 2023 10:42:33 -0400 Subject: [PATCH 3/7] add - `back_trans' in the readme and the augumentor recipe documentation --- README.md | 1 + docs/3recipes/augmenter_recipes_cmd.md | 1 + examples/coverage/checklist_airline_tweets.py | 225 ------------------ examples/coverage/measure_coverage.py | 2 +- examples/coverage/measure_perp.py | 178 -------------- examples/coverage/sanity_coverage.py | 203 ---------------- textattack/metrics/metric.py | 2 +- 7 files changed, 4 insertions(+), 608 deletions(-) delete mode 100644 examples/coverage/checklist_airline_tweets.py delete mode 100644 examples/coverage/measure_perp.py delete mode 100644 examples/coverage/sanity_coverage.py diff --git a/README.md b/README.md index 4ef696cdb..c05e20263 100644 --- a/README.md +++ b/README.md @@ -319,6 +319,7 @@ for data augmentation: - `eda` augments text with a combination of word insertions, substitutions and deletions. - `checklist` augments text by contraction/extension and by substituting names, locations, numbers. - `clare` augments text by replacing, inserting, and merging with a pre-trained masked language model. +- `back_trans` augments text by backtranslation approach. #### Augmentation Command-Line Interface diff --git a/docs/3recipes/augmenter_recipes_cmd.md b/docs/3recipes/augmenter_recipes_cmd.md index c1d496143..bde5b3116 100644 --- a/docs/3recipes/augmenter_recipes_cmd.md +++ b/docs/3recipes/augmenter_recipes_cmd.md @@ -18,6 +18,7 @@ for data augmentation: - `eda` augments text with a combination of word insertions, substitutions and deletions. - `checklist` augments text by contraction/extension and by substituting names, locations, numbers. - `clare` augments text by replacing, inserting, and merging with a pre-trained masked language model. +- `back_trans` augments text by backtranslation method. 
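For readers of this doc, a minimal Python sketch of what the new `back_trans` recipe does (illustrative only, not part of this patch; `BackTranslationAugmenter` is assumed to be the augmenter class that the `back_trans` recipe name resolves to, and the sample sentence is made up):

```python
# Hedged sketch: back-translation augmentation via the Python API.
# `BackTranslationAugmenter` is assumed to be the class behind the
# `back_trans` CLI recipe name; adjust the import if it differs.
from textattack.augmentation import BackTranslationAugmenter

augmenter = BackTranslationAugmenter()
# `augment` returns a list of augmented strings for the input sentence.
print(augmenter.augment("TextAttack makes data augmentation simple."))
```

The equivalent command-line form is `textattack augment --recipe back_trans ...`, as described in the section below.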
### Augmentation Command-Line Interface diff --git a/examples/coverage/checklist_airline_tweets.py b/examples/coverage/checklist_airline_tweets.py deleted file mode 100644 index 9da8788b4..000000000 --- a/examples/coverage/checklist_airline_tweets.py +++ /dev/null @@ -1,225 +0,0 @@ - - - -import checklist -import copy -import torch -import random -import numpy as np -import pickle -import wandb - - -from checklist.test_types import MFT, INV, DIR -from checklist.test_suite import TestSuite -from sst_model import * - -from textattack.coverage import neuronMultiSectionCoverage -from textattack.datasets import HuggingFaceDataset -from coverage_args import * -from coverage_utils import * -TYPE_MAP = { - MFT: 'MFT', - INV: 'INV', - DIR: 'DIR', - } - -args = get_args() - -set_seed(args.seed) -wandb.init() -wandb.config.update(args) -wandb.init( - project="coverage", - notes="vanilla coverage only", - tags=["coverage", "bert"], - config=wandb.config, - ) -suite_path_dicts = { - 'sentiment' : 'sentiment/sentiment_suite.pkl', - 'qqp' : 'qqp/qqp_suite.pkl', - 'mc' : 'squad/squad_suite.pkl' -} -suite_path = './CHECKLIST_DATA/release_data/'+ suite_path_dicts[args.suite] - -suite = TestSuite.from_file(suite_path) - -if args.suite == 'sentiment': - # pretrained BERT model on SST-2 - model_name_or_path = 'textattack/'+args.base_model+'-SST-2' - model = SSTModel(model_name_or_path) -elif args.suite == 'qqp': - # pretrained BERT model on QQP - model_name_or_path = 'textattack/'+args.base_model+'-qqp' - model = QQPModel(model_name_or_path) -else: - quit() -threshold = args.threshold - - -coverage = neuronMultiSectionCoverage(test_model = model_name_or_path, max_seq_len = args.max_seq_len, - bins_word = args.bins_word, bins_attention = args.bins_attention, bz = 48, - pre_limits = False, word_mask = True) -print('initializing from training data') -if args.mask: - vocab = [] - vocab_file = open("selected_words.txt", "r") - content_list = vocab_file.readlines() - for a in content_list: - vocab.append(a.strip('\n')) -trainset_masks = [] -if args.suite == 'sentiment': - text_key = 'sentence' - - trainset = HuggingFaceDataset('glue', 'sst2', 'train', shuffle = True) - validset = HuggingFaceDataset('glue', 'sst2', 'validation', shuffle = True) -trainset_str = [] -validset_str = [] - - - -for example in trainset: - current_example = example[0][text_key] - if args.mask: - current_example = [word for word in current_example if word in selected_vocab] - - trainset_str.append(current_example) -for example in validset: - current_example = example[0][text_key] - if args.mask: - current_example = [word for word in current_example if word in selected_vocab] - - validset_str.append(current_example) -#testset = HuggingFaceDataset('glue', 'sst2', args.split, shuffle = False) -if args.debug == 1: - trainset_str = trainset_str[0:1000] - -for example in trainset_str: - - trainset_masks.append([1 for i in range(128)] ) -# initialize coverage from training set - - -save_coverage_init_file = os.path.join( './coverage_results/', args.base_model +'_'+args.suite+'_BW_'+ str(args.bins_word) + \ - '_BA_' + str(args.bins_attention) + '_INIT_' + str(len(trainset_str))+'.pkl') - -if not os.path.exists(save_coverage_init_file): - print('can\'t find!: ', save_coverage_init_file) - coverage.initialize_from_training_dataset(trainset_str, trainset_masks, bz = 128) - initial_coverage = coverage(trainset_str, trainset_masks, bz = 128) - - pickle.dump(coverage, open(save_coverage_init_file, 'wb')) -else: - print('*'*100) - print('exists!' 
, save_coverage_init_file) - print('*'*100) - coverage = pickle.load(open(save_coverage_init_file, 'rb')) - initial_coverage = coverage._compute_coverage() -for test in suite.tests: - - if not args.specify_test: - args.test_name = suite.tests[test].name - if TYPE_MAP[type(suite.tests[test])] == args.type.upper() and suite.tests[test].name == args.test_name: - if args.query_tests_only: - print(suite.tests[test].name) - continue - - if args.type.upper() == 'MFT' and (suite.tests[test].labels) is None: - continue - input_examples_orig = suite.tests[test].data - input_examples = [] - - test_examples = (suite.tests[test].to_raw_examples()) - #print(test_examples) - shuffled_indices = list(range(len(test_examples))) - if args.query_subset_only: - shuffled_indices = shuffled_indices[0:args.subset] - - #random.shuffle(shuffled_indices) - test_examples = [test_examples[t] for t in shuffled_indices] - test_indices = [suite.tests[test].example_list_and_indices()[1][t] for t in shuffled_indices] - - for initial, final in zip(test_indices, test_examples): - if type(input_examples_orig[initial]) is not list: - input_examples_orig[initial] = [input_examples_orig[initial]] - input_examples.append(input_examples_orig[initial][0]) - - #print('*'*100) - #print([a for a in input_examples]) - #print('*'*100) - labels = suite.tests[test].labels - if args.type.upper() == 'MFT' and type(suite.tests[test].labels) is not list: - labels = [labels]*len(test_examples) - - # coverage filtering - original_number_test_examples = len(test_examples) - if not args.baseline: - relevant_idxs, test_examples_list, skipped_examples_list = filter_using_coverage(coverage, initial_coverage, test_examples, threshold) - else: - relevant_idxs, test_examples_list, skipped_examples_list = [i for i in range(len(test_examples))], test_examples, [] - - with open(args.save_str + suite.tests[test].name +"_skipped_examples_"+str(len(trainset_str))+".txt", "wb") as fp: - pickle.dump(skipped_examples_list, fp) - with open(args.save_str + suite.tests[test].name +"_selected_examples_"+str(len(trainset_str))+".txt", "wb") as fp: - pickle.dump(test_examples_list, fp) - - test_examples = [test_examples[i] for i in relevant_idxs] - if args.type.upper() == 'MFT': labels = [labels[i] for i in relevant_idxs] - input_examples = [input_examples[i] for i in relevant_idxs] - - predictions_before_tx, predictions_prob_before_tx, predictions_after_tx, predictions_prob_after_tx \ - = get_predictions_after_tx(model, input_examples, test_examples) - if args.type.upper() == 'INV': - # to be fixed - failures = 0 - for p,t, pp, tp in zip(predictions_after_tx, predictions_before_tx, predictions_prob_after_tx, predictions_prob_before_tx): - if pp> (1.0/3) and pp<(2.0/3) and p == 1: - labelx = 1 - elif pp> (1.0/3) and pp<(2.0/3) and p == 0: - labelx = 0 - elif p == 1: - labelx = 2 - else: - labelx = 0 - if tp> (1.0/3) and tp<(2.0/3) and t == 1: - labelxt = 1 - elif tp> (1.0/3) and tp<(2.0/3) and t == 0: - labelxt = 0 - elif t == 1: - labelxt = 2 - else: - labelxt = 0 - if labelx != labelxt: - failures += 1 - else: - if abs(pp - tp) > 0.1: - failures += 1 - - failure_rate = failures/len(predictions_before_tx) - if args.type.upper() == 'MFT': - hard_predictions = [] - for p,l in zip(predictions_prob_after_tx, predictions_after_tx): - if p> (1.0/3) and p<(2.0/3) and l == 1: - hard_predictions.append(1) - elif p> (1.0/3) and p<(2.0/3) and l == 0: - hard_predictions.append(1) - elif l == 1: - hard_predictions.append(2) - else: - hard_predictions.append(0) - if len(labels) 
!=0 : - failure_rate = sum([p!=t for p,t in zip(hard_predictions, labels)])/len(labels) - else: - failure_rate = -1.0 - - - print(f'{suite.tests[test].name}, {len(predictions_before_tx)} ,{failure_rate*100.0} %') - with open(args.save_str + suite.tests[test].name +"_failure_rate_"+str(len(trainset_str))+".txt", "w") as f: - f.write(suite.tests[test].name+','+str(len(predictions_before_tx)) +','+str((original_number_test_examples)) + ','+ str(failure_rate*100.0)+'\n') - - - - - - - \ No newline at end of file diff --git a/examples/coverage/measure_coverage.py b/examples/coverage/measure_coverage.py index e682098a2..39813f083 100644 --- a/examples/coverage/measure_coverage.py +++ b/examples/coverage/measure_coverage.py @@ -1,6 +1,6 @@ """ -python measure_coverage.py --split test --ratio 1.0 --coverage attention --length 128 --prespecify-limits --attack textfooler --test-ratio 1000 --save-dir ./coverage_workshop/ --seed 1 --dataset sst2 +python measure_coverage.py --split test --ratio 1.0 --coverage attention --length 128 --prespecify-limits --attack textfooler --test-ratio 1000 --save-dir ./coverage_wp/ --seed 1 --dataset sst2 """ diff --git a/examples/coverage/measure_perp.py b/examples/coverage/measure_perp.py deleted file mode 100644 index b5f82f6a9..000000000 --- a/examples/coverage/measure_perp.py +++ /dev/null @@ -1,178 +0,0 @@ - - - -import checklist -import copy -import torch -import random -import numpy as np -import pickle -import wandb - - -from checklist.test_types import MFT, INV, DIR -from checklist.test_suite import TestSuite -from sst_model import * - -from textattack.coverage import neuronMultiSectionCoverage -from textattack.datasets import HuggingFaceDataset -from textattack.metrics import * -from coverage_args import * -from repr_utils import * -from coverage_utils import * -TYPE_MAP = { - MFT: 'MFT', - INV: 'INV', - DIR: 'DIR', - } - -args = get_args() - -set_seed(args.seed) -wandb.init() -wandb.config.update(args) -wandb.init( - project="coverage", - notes="vanilla coverage only", - tags=["coverage", "bert"], - config=wandb.config, - ) -suite_path_dicts = { - 'sentiment' : 'sentiment/sentiment_suite.pkl', - 'qqp' : 'qqp/qqp_suite.pkl', - 'mc' : 'squad/squad_suite.pkl' -} -suite_path = './CHECKLIST_DATA/release_data/'+ suite_path_dicts[args.suite] - -suite = TestSuite.from_file(suite_path) - -if args.suite == 'sentiment': - # pretrained BERT model on SST-2 - model_name_or_path = 'textattack/'+args.base_model+'-SST-2' - model = SSTModel(model_name_or_path) -elif args.suite == 'qqp': - # pretrained BERT model on QQP - model_name_or_path = 'textattack/'+args.base_model+'-qqp' - model = QQPModel(model_name_or_path) -else: - quit() -threshold = args.threshold -ppl = Perplexity() - -coverage = neuronMultiSectionCoverage(test_model = model_name_or_path, max_seq_len = args.max_seq_len, - bins_word = args.bins_word, bins_attention = args.bins_attention, bz = 48, - pre_limits = False, word_mask = True) -print('initializing from training data') -if args.mask: - vocab = [] - vocab_file = open("selected_words.txt", "r") - content_list = vocab_file.readlines() - for a in content_list: - vocab.append(a.strip('\n')) -trainset_masks = [] -if args.suite == 'sentiment': - text_key = 'sentence' - - trainset = HuggingFaceDataset('glue', 'sst2', 'train', shuffle = True) - validset = HuggingFaceDataset('glue', 'sst2', 'validation', shuffle = True) -trainset_str = [] -validset_str = [] - - - -for example in trainset: - current_example = example[0][text_key] - if args.mask: - current_example = 
[word for word in current_example if word in selected_vocab] - - trainset_str.append(current_example) -for example in validset: - current_example = example[0][text_key] - if args.mask: - current_example = [word for word in current_example if word in selected_vocab] - - validset_str.append(current_example) -#testset = HuggingFaceDataset('glue', 'sst2', args.split, shuffle = False) -if args.debug == 1: - trainset_str = trainset_str[0:1000] - -for example in trainset_str: - - trainset_masks.append([1 for i in range(128)] ) -# initialize coverage from training set - - -save_coverage_init_file = os.path.join( './coverage_results/', args.base_model +'_'+args.suite+'_BW_'+ str(args.bins_word) + \ - '_BA_' + str(args.bins_attention) + '_INIT_' + str(len(trainset_str))+'.pkl') - -if not os.path.exists(save_coverage_init_file): - print('can\'t find!: ', save_coverage_init_file) - coverage.initialize_from_training_dataset(trainset_str, trainset_masks, bz = 128) - initial_coverage = coverage(trainset_str, trainset_masks, bz = 128) - - pickle.dump(coverage, open(save_coverage_init_file, 'wb')) -else: - print('*'*100) - print('exists!' , save_coverage_init_file) - print('*'*100) - coverage = pickle.load(open(save_coverage_init_file, 'rb')) - initial_coverage = coverage._compute_coverage() -for test in suite.tests: - - if not args.specify_test: - args.test_name = suite.tests[test].name - if TYPE_MAP[type(suite.tests[test])] == args.type.upper() and suite.tests[test].name == args.test_name: - if args.query_tests_only: - print(suite.tests[test].name) - continue - - if args.type.upper() == 'MFT' and (suite.tests[test].labels) is None: - continue - input_examples_orig = suite.tests[test].data - input_examples = [] - - test_examples = (suite.tests[test].to_raw_examples()) - #print(test_examples) - shuffled_indices = list(range(len(test_examples))) - if args.query_subset_only: - shuffled_indices = shuffled_indices[0:args.subset] - - #random.shuffle(shuffled_indices) - test_examples = [test_examples[t] for t in shuffled_indices] - test_indices = [suite.tests[test].example_list_and_indices()[1][t] for t in shuffled_indices] - - for initial, final in zip(test_indices, test_examples): - if type(input_examples_orig[initial]) is not list: - input_examples_orig[initial] = [input_examples_orig[initial]] - input_examples.append(input_examples_orig[initial][0]) - - #print('*'*100) - #print([a for a in input_examples]) - #print('*'*100) - labels = suite.tests[test].labels - if args.type.upper() == 'MFT' and type(suite.tests[test].labels) is not list: - labels = [labels]*len(test_examples) - - # coverage filtering - original_number_test_examples = len(test_examples) - with open(args.save_str + suite.tests[test].name +"_skipped_examples_"+str(len(trainset_str))+".txt", "rb") as fp: - skipped_examples_list = pickle.load(fp) - with open(args.save_str + suite.tests[test].name +"_selected_examples_"+str(len(trainset_str))+".txt", "rb") as fp: - test_examples_list = pickle.load(fp) - print([example[1] for example in test_examples_list]) - new_ppl = ppl.calc_ppl([example[1] for example in test_examples_list])[0] - orig_ppl = ppl.calc_ppl([example[1] for example in (test_examples_list + skipped_examples_list)] )[0] - #test_examples = [test_examples[i] for i in relevant_idxs] - #if args.type.upper() == 'MFT': labels = [labels[i] for i in relevant_idxs] - #input_examples = [input_examples[i] for i in relevant_idxs] - - - - print(f'{suite.tests[test].name}, {orig_ppl} ,{new_ppl} %') - - - - - - - \ No newline at end of file diff 
--git a/examples/coverage/sanity_coverage.py b/examples/coverage/sanity_coverage.py deleted file mode 100644 index aa7df251e..000000000 --- a/examples/coverage/sanity_coverage.py +++ /dev/null @@ -1,203 +0,0 @@ -""" - -python measure_coverage.py --split test --ratio 0.001 --coverage attention --length 3 --prespecify-limits --attack textwordbug --test-ratio 5 - - - -""" - -import torch -import os -import textattack -import copy -import pickle -from textattack.models.tokenizers import AutoTokenizer -from textattack.models.wrappers import HuggingFaceModelWrapper -from textattack.models.wrappers import ModelWrapper -from transformers import AutoModelForSequenceClassification -from textattack.coverage import neuronMultiSectionCoverage -from textattack.augmentation import Augmenter -from textattack.attack_results import SuccessfulAttackResult -from textattack.datasets import HuggingFaceDataset -from textattack.constraints.semantics.sentence_encoders import UniversalSentenceEncoder -from textattack.attack_recipes import TextFoolerJin2019, HotFlipEbrahimi2017, DeepWordBugGao2018, FasterGeneticAlgorithmJia2019, BAEGarg2019 -from math import floor -import random -import numpy as np -import argparse - -def random_seed(seed): - torch.manual_seed(seed) - np.random.seed(seed) - random.seed(seed) - return - -DEFAULT_CONSTRAINTS = [textattack.constraints.pre_transformation.RepeatModification(), textattack.constraints.pre_transformation.StopwordModification()] -available_transformations = [ - textattack.transformations.WordDeletion, - textattack.transformations.RandomSynonymInsertion, - textattack.transformations.WordSwapEmbedding, - textattack.transformations.WordSwapChangeLocation, - textattack.transformations.WordSwapChangeName, - textattack.transformations.WordSwapChangeNumber, - textattack.transformations.WordSwapContract, - textattack.transformations.WordSwapExtend, - textattack.transformations.WordSwapHomoglyphSwap, - textattack.transformations.WordSwapMaskedLM, - textattack.transformations.WordSwapQWERTY, - textattack.transformations.WordSwapNeighboringCharacterSwap, - textattack.transformations.WordSwapRandomCharacterDeletion, - textattack.transformations.WordSwapRandomCharacterInsertion, - textattack.transformations.WordSwapRandomCharacterSubstitution, - textattack.transformations.RandomSwap, - textattack.transformations.WordSwapWordNet - ] -random_seed(1) -parser = argparse.ArgumentParser(description='Measure Coverage of pretrained NLP Models') -parser.add_argument('--seed', type=int, default=1, help='set random seed') -parser.add_argument('--length', type=int, default=128, help='set max seq length') -parser.add_argument('--bins', type=int, default=10, help='set number of bins/sections') -parser.add_argument('--ratio', type=float, default=1, help='proportion of train set used for dataset sampling') -parser.add_argument('--use-threshold', type=float, default=0.6, help='proportion of train set used for dataset sampling') -parser.add_argument('--test-ratio', type=int, default=1.0, help='proportion of train set used for dataset sampling') -parser.add_argument('--pct-words-to-swap', type=int, default=0.1, help='proportion of train set used for dataset sampling') -parser.add_argument('--dataset', type=str, default='imdb', help='dataset to use for measuring coverage') -parser.add_argument('--save-dir', type=str, default='./coverage/', help='dataset to use for measuring coverage') -parser.add_argument('--model', type=str, default='bert-base-uncased', help='model f whose weights to use') 
-parser.add_argument('--coverage', type=str, default='attention', help='coverage type') -# takes as input a transformation and a constraint -parser.add_argument('--transformation', type=int, default=0, help='transformation type') -parser.add_argument('--constraint', type=str, default='none', help='constraint type') - -parser.add_argument('--split', type=str, default='test', help='split to use for measuring coverage') -parser.add_argument('--base-only', action='store_true', help='loading only base model') -parser.add_argument('--prespecify-limits', action='store_true', help='prespecify') -args = parser.parse_args() -random_seed(args.seed) - - - -if not args.base_only: - if args.dataset == 'sst2': - test_model = 'textattack/' + str(args.model) + '-' + 'SST-2' - elif args.dataset == 'rotten-tomatoes': - test_model = 'textattack/' + str(args.model) + '-' + 'rotten_tomatoes' - else: - test_model = 'textattack/' + str(args.model) + '-' + str(args.dataset) -else: - test_model = args.model -text_key = 'text' -# test_model="textattack/bert-base-uncased-ag-news", -if args.dataset == 'sst2': - text_key = 'sentence' - trainset = HuggingFaceDataset('glue', 'sst2', 'train', shuffle = True) - testset = HuggingFaceDataset('glue', 'sst2', args.split, shuffle = True) -elif args.dataset == 'rotten-tomatoes': - trainset = HuggingFaceDataset('rotten_tomatoes', None, 'train', shuffle = True) - testset = HuggingFaceDataset('rotten_tomatoes', None, args.split, shuffle = True) -else: - trainset = HuggingFaceDataset(args.dataset, None, 'train', shuffle = True) - testset = HuggingFaceDataset(args.dataset, None, args.split, shuffle = True) - - - -if args.ratio <= 1.0: - trainset = trainset[0:floor(args.ratio*len(trainset))] -else: - trainset = trainset[0:floor(args.ratio)] - - -trainset_str = [] -for example in trainset: - - trainset_str.append(example[0][text_key]) - -if args.test_ratio <= 1.0: - testset = testset[0:floor(args.test_ratio*len(testset))] -else: - testset = testset[0:floor(args.test_ratio)] - -testset_str = [] -for example in testset: - testset_str.append(example[0][text_key]) - - -args.save_dir += 'Sanity_COVER_' + args.coverage + '/' -os.makedirs(args.save_dir, exist_ok = True) -args.save_dir += 'SEED_'+str(args.seed) + '_BINS_' + str(args.bins) + '/' -os.makedirs(args.save_dir, exist_ok = True) -args.save_dir += 'data_' + str(args.dataset) + '_model_' + str(args.model) + '_ratio_' + str(args.ratio) + '_test_ratio_' + str(args.test_ratio) +'_L_'+ str(args.length) + '_B_' + str(args.base_only) + '/' - - -os.makedirs(args.save_dir, exist_ok = True) -args.save_dir += 'transformation_' + str(args.transformation) + '_limits_' + str(args.prespecify_limits) -os.makedirs(args.save_dir, exist_ok = True) - -# make coverage object -coverage = neuronMultiSectionCoverage(test_model = test_model, max_seq_len = args.length, k_m = args.bins, coverage = (args.coverage), pre_limits = (not (args.coverage == 'word') and args.prespecify_limits)) - -print('initializing from training data') -coverage.initialize_from_training_dataset(trainset_str) - -print('--'*50) -print('generating test set!') -print('--'*50) -num_successes = 0.0 -total = 1.0 -if args.transformation != -1: - if args.constraint != 'use': - constraints = DEFAULT_CONSTRAINTS + [(UniversalSentenceEncoder(threshold=args.use_threshold))] - else: - constraints = DEFAULT_CONSTRAINTS - augment_using_tf = 
Augmenter(transformation=available_transformations[args.transformation](),constraints=constraints,pct_words_to_swap=args.pct_words_to_swap,transformations_per_example=1,) - #augment_using_tf.augment()[0] - new_text = [] - for text in new_text: - new_text += augment_using_tf.augment(text )[0] - - - - -augmented_text_file = open(os.path.join(args.save_dir, 'examples.txt'), 'w') -pattern_text_file = open(os.path.join(args.save_dir, 'pattern.txt'), 'w') -for test in testset_str: - augmented_text_file.write(test+'\n') -augmented_text_file.write('\n') -for test in new_text: - augmented_text_file.write(test+'\n') -augmented_text_file.close() - - - -# get pattern independent of the other examples in the test set -# pattern for each example - -for test_example in testset_str: - print(test_example) - temp_coverage = copy.deepcopy(coverage) - coverage_vector = temp_coverage.vector(test_example) - # coverage_vector is a list - del temp_coverage - pattern_text_file.write(' '.join([str(i) for i in coverage_vector])+'\n') - del coverage_vector -pattern_text_file.write('\n') - - -# also get the same for each augmented example -for test_example in new_text: - temp_coverage = copy.deepcopy(coverage) - coverage_vector = temp_coverage.vector(test_example) - # coverage_vector is a list - pattern_text_file.write(' '.join([str(i) for i in coverage_vector])+'\n') - - - del temp_coverage -# save the results too - -pattern_text_file.close() - - - - - - diff --git a/textattack/metrics/metric.py b/textattack/metrics/metric.py index 015046c62..3e221dc2f 100644 --- a/textattack/metrics/metric.py +++ b/textattack/metrics/metric.py @@ -8,7 +8,7 @@ class Metric(ABC): - """A metric for evaluating Adversarial Attack candidates.""" + """A metric for evaluating results and data quality.""" @abstractmethod def __init__(self, **kwargs): From 884144ff791d3b6d766c5200862334e025c9643a Mon Sep 17 00:00:00 2001 From: Yanjun Qi Date: Mon, 18 Sep 2023 11:14:49 -0400 Subject: [PATCH 4/7] adding advancedMetric recipe --- textattack/__init__.py | 2 ++ textattack/metrics/recipe.py | 25 +++++++++++++++++-------- 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/textattack/__init__.py b/textattack/__init__.py index a169173eb..2223ba626 100644 --- a/textattack/__init__.py +++ b/textattack/__init__.py @@ -17,6 +17,7 @@ from .attacker import Attacker from .trainer import Trainer from .metrics import Metric +from .coverage import Coverage from . import ( attack_recipes, @@ -29,6 +30,7 @@ goal_functions, loggers, metrics, + coverage, models, search_methods, shared, diff --git a/textattack/metrics/recipe.py b/textattack/metrics/recipe.py index 0ff7c18c2..39df568b3 100644 --- a/textattack/metrics/recipe.py +++ b/textattack/metrics/recipe.py @@ -7,14 +7,23 @@ from . 
import metric -class AdvancedAttackMetric(results): +class AdvancedAttackMetric(Metric): """Calculate a suite of advanced metrics to evaluate attackResults' quality """ - def __init__(self, results, **kwargs): - perplexity_stats = Perplexity().calculate(results) - use_stats = USEMetric().calculate(results) - bert_score = BERTScoreMetric().calculate(results) - meteor_score = MeteorMetric().calculate(results) - sbert_score = SBERTMetric().calculate(results) - return perplexity_stats, use_stats, bert_score, meteor_score, sbert_score \ No newline at end of file + def __init__(self, choices=['use']): + self.achoices = choices + + def calculate(self, results): + advanced_metrics = {} + if 'use' in self.achoices: + advanced_metrics['use_stats'] = USEMetric().calculate(results) + if 'perplexity' in self.achoices: + advanced_metrics['perplexity_stats'] = Perplexity().calculate(results) + if 'bert_score' in self.achoices: + advanced_metrics['bert_score'] = BERTScoreMetric().calculate(results) + if 'meteor_score' in self.achoices: + advanced_metrics['meteor_score'] = MeteorMetric().calculate(results) + if 'sbert_score' in self.achoices: + advanced_metrics['sbert_score'] = SBERTMetric().calculate(results) + return advanced_metrics \ No newline at end of file From 2d1f8d80b6b4fe13c652baf2d432149f6da2610d Mon Sep 17 00:00:00 2001 From: Yanjun Qi Date: Mon, 18 Sep 2023 11:20:40 -0400 Subject: [PATCH 5/7] format change with black --- examples/coverage/measure_coverage.py | 288 ++++--- textattack/coverage/kmn_coverage.py | 1022 +++++++++++++----------- textattack/coverage/neuron_coverage.py | 20 +- textattack/metrics/recipe.py | 29 +- 4 files changed, 761 insertions(+), 598 deletions(-) diff --git a/examples/coverage/measure_coverage.py b/examples/coverage/measure_coverage.py index 39813f083..a7991c6de 100644 --- a/examples/coverage/measure_coverage.py +++ b/examples/coverage/measure_coverage.py @@ -16,155 +16,223 @@ from textattack.coverage import neuronMultiSectionCoverage from textattack.attack_results import SuccessfulAttackResult from textattack.datasets import HuggingFaceDataset -from textattack.attack_recipes import TextFoolerJin2019, HotFlipEbrahimi2017, DeepWordBugGao2018, FasterGeneticAlgorithmJia2019, BAEGarg2019 +from textattack.attack_recipes import ( + TextFoolerJin2019, + HotFlipEbrahimi2017, + DeepWordBugGao2018, + FasterGeneticAlgorithmJia2019, + BAEGarg2019, +) from math import floor import random import numpy as np import argparse -def random_seed(seed): - torch.manual_seed(seed) - np.random.seed(seed) - random.seed(seed) - return +def random_seed(seed): + torch.manual_seed(seed) + np.random.seed(seed) + random.seed(seed) + return random_seed(1) -parser = argparse.ArgumentParser(description='Measure Coverage of pretrained NLP Models') -parser.add_argument('--seed', type=int, default=1, help='set random seed') -parser.add_argument('--length', type=int, default=128, help='set max seq length') -parser.add_argument('--bins', type=int, default=10, help='set number of bins/sections') -parser.add_argument('--ratio', type=float, default=1, help='proportion of train set used for dataset sampling') -parser.add_argument('--test-ratio', type=int, default=1.0, help='proportion of train set used for dataset sampling') -parser.add_argument('--dataset', type=str, default='imdb', help='dataset to use for measuring coverage') -parser.add_argument('--save-dir', type=str, default='./coverage/', help='dataset to use for measuring coverage') -parser.add_argument('--model', type=str, 
default='bert-base-uncased', help='model f whose weights to use') -parser.add_argument('--coverage', type=str, default='attention', help='coverage type') -parser.add_argument('--attack', type=str, default='none', help='attack type') -parser.add_argument('--split', type=str, default='test', help='split to use for measuring coverage') -parser.add_argument('--base-only', action='store_true', help='loading only base model') -parser.add_argument('--prespecify-limits', action='store_true', help='prespecify') +parser = argparse.ArgumentParser( + description="Measure Coverage of pretrained NLP Models" +) +parser.add_argument("--seed", type=int, default=1, help="set random seed") +parser.add_argument("--length", type=int, default=128, help="set max seq length") +parser.add_argument("--bins", type=int, default=10, help="set number of bins/sections") +parser.add_argument( + "--ratio", + type=float, + default=1, + help="proportion of train set used for dataset sampling", +) +parser.add_argument( + "--test-ratio", + type=int, + default=1.0, + help="proportion of train set used for dataset sampling", +) +parser.add_argument( + "--dataset", type=str, default="imdb", help="dataset to use for measuring coverage" +) +parser.add_argument( + "--save-dir", + type=str, + default="./coverage/", + help="dataset to use for measuring coverage", +) +parser.add_argument( + "--model", + type=str, + default="bert-base-uncased", + help="model f whose weights to use", +) +parser.add_argument("--coverage", type=str, default="attention", help="coverage type") +parser.add_argument("--attack", type=str, default="none", help="attack type") +parser.add_argument( + "--split", type=str, default="test", help="split to use for measuring coverage" +) +parser.add_argument("--base-only", action="store_true", help="loading only base model") +parser.add_argument("--prespecify-limits", action="store_true", help="prespecify") args = parser.parse_args() random_seed(args.seed) - if not args.base_only: - if args.dataset == 'sst2': - test_model = 'textattack/' + str(args.model) + '-' + 'SST-2' - elif args.dataset == 'rotten-tomatoes': - test_model = 'textattack/' + str(args.model) + '-' + 'rotten_tomatoes' - else: - test_model = 'textattack/' + str(args.model) + '-' + str(args.dataset) + if args.dataset == "sst2": + test_model = "textattack/" + str(args.model) + "-" + "SST-2" + elif args.dataset == "rotten-tomatoes": + test_model = "textattack/" + str(args.model) + "-" + "rotten_tomatoes" + else: + test_model = "textattack/" + str(args.model) + "-" + str(args.dataset) else: - test_model = args.model -text_key = 'text' -# test_model="textattack/bert-base-uncased-ag-news", -if args.dataset == 'sst2': - text_key = 'sentence' - trainset = HuggingFaceDataset('glue', 'sst2', 'train', shuffle = True) - testset = HuggingFaceDataset('glue', 'sst2', args.split, shuffle = True) -elif args.dataset == 'rotten-tomatoes': - trainset = HuggingFaceDataset('rotten_tomatoes', None, 'train', shuffle = True) - testset = HuggingFaceDataset('rotten_tomatoes', None, args.split, shuffle = True) + test_model = args.model +text_key = "text" +# test_model="textattack/bert-base-uncased-ag-news", +if args.dataset == "sst2": + text_key = "sentence" + trainset = HuggingFaceDataset("glue", "sst2", "train", shuffle=True) + testset = HuggingFaceDataset("glue", "sst2", args.split, shuffle=True) +elif args.dataset == "rotten-tomatoes": + trainset = HuggingFaceDataset("rotten_tomatoes", None, "train", shuffle=True) + testset = HuggingFaceDataset("rotten_tomatoes", None, 
args.split, shuffle=True) else: - trainset = HuggingFaceDataset(args.dataset, None, 'train', shuffle = True) - testset = HuggingFaceDataset(args.dataset, None, args.split, shuffle = True) - + trainset = HuggingFaceDataset(args.dataset, None, "train", shuffle=True) + testset = HuggingFaceDataset(args.dataset, None, args.split, shuffle=True) if args.ratio <= 1.0: - trainset = trainset[0:floor(args.ratio*len(trainset))] + trainset = trainset[0 : floor(args.ratio * len(trainset))] else: - trainset = trainset[0:floor(args.ratio)] + trainset = trainset[0 : floor(args.ratio)] trainset_str = [] for example in trainset: - - trainset_str.append(example[0][text_key]) + trainset_str.append(example[0][text_key]) if args.test_ratio <= 1.0: - testset = testset[0:floor(args.test_ratio*len(testset))] + testset = testset[0 : floor(args.test_ratio * len(testset))] else: - testset = testset[0:floor(args.test_ratio)] + testset = testset[0 : floor(args.test_ratio)] testset_str = [] for example in testset: - testset_str.append(example[0][text_key]) - - -args.save_dir += 'COVER_' + args.coverage + '/' -os.makedirs(args.save_dir, exist_ok = True) -args.save_dir += 'SEED_'+str(args.seed) + '_BINS_' + str(args.bins) + '/' -os.makedirs(args.save_dir, exist_ok = True) -args.save_dir += 'data_' + str(args.dataset) + '_model_' + str(args.model) + '_ratio_' + str(args.ratio) + '_test_ratio_' + str(args.test_ratio) +'_L_'+ str(args.length) + '_B_' + str(args.base_only) + '/' - - -os.makedirs(args.save_dir, exist_ok = True) -args.save_dir += 'Attack_' + args.attack + '_limits_' + str(args.prespecify_limits) -os.makedirs(args.save_dir, exist_ok = True) + testset_str.append(example[0][text_key]) + + +args.save_dir += "COVER_" + args.coverage + "/" +os.makedirs(args.save_dir, exist_ok=True) +args.save_dir += "SEED_" + str(args.seed) + "_BINS_" + str(args.bins) + "/" +os.makedirs(args.save_dir, exist_ok=True) +args.save_dir += ( + "data_" + + str(args.dataset) + + "_model_" + + str(args.model) + + "_ratio_" + + str(args.ratio) + + "_test_ratio_" + + str(args.test_ratio) + + "_L_" + + str(args.length) + + "_B_" + + str(args.base_only) + + "/" +) + + +os.makedirs(args.save_dir, exist_ok=True) +args.save_dir += "Attack_" + args.attack + "_limits_" + str(args.prespecify_limits) +os.makedirs(args.save_dir, exist_ok=True) # make coverage object -coverage = neuronMultiSectionCoverage(test_model = test_model, max_seq_len = args.length, k_m = args.bins, coverage = (args.coverage), pre_limits = (not (args.coverage == 'word') and args.prespecify_limits)) -print('initializing from training data') +coverage = neuronMultiSectionCoverage( + test_model=test_model, + max_seq_len=args.length, + k_m=args.bins, + coverage=(args.coverage), + pre_limits=(not (args.coverage == "word") and args.prespecify_limits), +) +print("initializing from training data") coverage.initialize_from_training_dataset(trainset_str) -print('--'*50) -print('generating test set!') -print('--'*50) +print("--" * 50) +print("generating test set!") +print("--" * 50) num_successes = 0.0 total = 1.0 -if args.attack != 'none': - original_model = AutoModelForSequenceClassification.from_pretrained(test_model) - original_tokenizer = AutoTokenizer(test_model) - model = HuggingFaceModelWrapper(original_model,original_tokenizer) - if args.attack == 'textfooler': - attack = TextFoolerJin2019.build(model) - elif args.attack == 'alzantot': - attack = FasterGeneticAlgorithmJia2019.build(model) - elif args.attack == 'bae': - attack = BAEGarg2019.build(model) - elif args.attack == 
'deepwordbug': - attack = DeepWordBugGao2018.build(model) - elif args.attack == 'hotflip': - attack = HotFlipEbrahimi2017.build(model) - else: - print('This Attack has not been added!') - raise NotImplementedError - results_iterable = attack.attack_dataset(testset, indices=None) - # save the results too - results_iterable = [result for result in results_iterable] - total = len(results_iterable) - pickle.dump( results_iterable , open( os.path.join(args.save_dir, "attack_results"), "wb" ) ) - for n,result in enumerate(results_iterable): - print('---original: \n', result.original_text()) - print('---perturbed: \n', result.perturbed_text()) - testset_str.append(result.perturbed_text()) - if isinstance(result, SuccessfulAttackResult): - num_successes += 1 - - -print('=+'*20) -print('successes: ', num_successes, 'total: ', total) -print('rate: ', num_successes / total) -print('--'*50) -print('length of generated test set: ', len(testset_str)) -print('--'*50) - +if args.attack != "none": + original_model = AutoModelForSequenceClassification.from_pretrained(test_model) + original_tokenizer = AutoTokenizer(test_model) + model = HuggingFaceModelWrapper(original_model, original_tokenizer) + if args.attack == "textfooler": + attack = TextFoolerJin2019.build(model) + elif args.attack == "alzantot": + attack = FasterGeneticAlgorithmJia2019.build(model) + elif args.attack == "bae": + attack = BAEGarg2019.build(model) + elif args.attack == "deepwordbug": + attack = DeepWordBugGao2018.build(model) + elif args.attack == "hotflip": + attack = HotFlipEbrahimi2017.build(model) + else: + print("This Attack has not been added!") + raise NotImplementedError + results_iterable = attack.attack_dataset(testset, indices=None) + # save the results too + results_iterable = [result for result in results_iterable] + total = len(results_iterable) + pickle.dump( + results_iterable, open(os.path.join(args.save_dir, "attack_results"), "wb") + ) + for n, result in enumerate(results_iterable): + print("---original: \n", result.original_text()) + print("---perturbed: \n", result.perturbed_text()) + testset_str.append(result.perturbed_text()) + if isinstance(result, SuccessfulAttackResult): + num_successes += 1 + + +print("=+" * 20) +print("successes: ", num_successes, "total: ", total) +print("rate: ", num_successes / total) +print("--" * 50) +print("length of generated test set: ", len(testset_str)) +print("--" * 50) word_coverage = coverage(testset_str) -print('the coverage: ', word_coverage) - -results_file = open(os.path.join(args.save_dir, 'stats.txt'), 'w') -results_file.write('dataset, model, ratio, length, attack, limits, coverage, num_examples, num_test_examples, seed, split, coverage, num_successes, total\n') -results_file.write(','.join([args.dataset, test_model, str(args.ratio), str(args.test_ratio), str(args.length), args.attack, str(args.prespecify_limits), str(args.coverage), str(len(trainset_str)),str(len(testset_str)), str(args.seed), args.split, str(word_coverage) , str(num_successes), str(total)+'\n'])) +print("the coverage: ", word_coverage) + +results_file = open(os.path.join(args.save_dir, "stats.txt"), "w") +results_file.write( + "dataset, model, ratio, length, attack, limits, coverage, num_examples, num_test_examples, seed, split, coverage, num_successes, total\n" +) +results_file.write( + ",".join( + [ + args.dataset, + test_model, + str(args.ratio), + str(args.test_ratio), + str(args.length), + args.attack, + str(args.prespecify_limits), + str(args.coverage), + str(len(trainset_str)), + 
str(len(testset_str)), + str(args.seed), + args.split, + str(word_coverage), + str(num_successes), + str(total) + "\n", + ] + ) +) results_file.close() - - - - diff --git a/textattack/coverage/kmn_coverage.py b/textattack/coverage/kmn_coverage.py index 0a836208e..d3b6fdc80 100644 --- a/textattack/coverage/kmn_coverage.py +++ b/textattack/coverage/kmn_coverage.py @@ -1,16 +1,19 @@ +from collections import defaultdict +import copy +import itertools import logging +import time +import numpy as np import torch -import transformers +import torch.nn.functional as F from tqdm import tqdm -import itertools -import copy -import numpy as np +import transformers + import textattack -from collections import defaultdict + from .coverage import ExtrinsicCoverage -import torch.nn.functional as F -import time + logging.getLogger("transformers.tokenization_utils_base").setLevel(logging.ERROR) @@ -18,149 +21,161 @@ class neuronMultiSectionCoverage(ExtrinsicCoverage): - """ - ``neuronMultiSectionCoverage`` measures the neuron coverage acheived by a testset - Args: - test_model(Union[str, torch.nn.Module]): name of the pretrained language model from `transformers` - or the actual test model as a `torch.nn.Module` class. Default is "bert base uncased" from `transformers`. - tokenizer (:obj:``, optional): If `test_model` is not a pretrained model from `transformers, need to provide - the tokenizer here. - max_seq_len (int): Maximum sequence length accepted by the model to be tested. However, if you are using a pretrained model from `transformers`, this is handled - automatically using information from `model.config`. - threshold(float): threshold for marking a neuron as activated - coverage(str): measure type of neuron coverage at the level of layer outputs - """ - - def __init__( - self, - test_model="textattack/bert-base-uncased-ag-news", - tokenizer=None, - max_seq_len=-1, - threshold=0.0, - num_labels = 2, - coverage = 'multisection', - pre_limits = False, - bins_attention =4, - bins_word = 4, - min_value=np.inf, - max_value=-np.inf, - bz = 128, - word_mask = False, - ): - - self.coverage = coverage - - self.word_mask = word_mask - self.pre_limits = pre_limits - self.bins_attention = bins_attention - self.bins_word = bins_word # number of sections for each neuron - self.max_seq_len = 128 - self.model_type = 'bert' - - config = transformers.AutoConfig.from_pretrained( - test_model, output_hidden_states=True, num_labels = num_labels - ) - if config.model_type in COVERAGE_MODEL_TYPES: - self.test_model = ( - transformers.AutoModelForSequenceClassification.from_pretrained( - test_model, config=config - ) - ) - self.test_model.tokenizer = transformers.AutoTokenizer.from_pretrained( - test_model - ) - self.model_type = self.test_model.config.model_type - self.max_seq_len = ( - max_seq_len - if max_seq_len != -1 - else self.test_model.config.max_position_embeddings - ) - else: - raise ValueError( - "`neuronCoverage` only accepts models in " - + ",".join(COVERAGE_MODEL_TYPES) - ) - - - self.test_model.to(textattack.shared.utils.device) - self.threshold = threshold - self.test_model.eval() - - # initialize min and max for coverage - min_attention_value = min_value - max_attention_value = max_value - if pre_limits: - min_attention_value = 0.0 - max_attention_value = 1.0 - - self.coverage_word_dicts = torch.zeros((self.bins_word+3, 13, self.max_seq_len, 768)) - self.coverage_attention_dicts = torch.zeros((self.bins_attention + 3, 12, 12, self.max_seq_len, self.max_seq_len)) - self.min_word_coverage_tracker = 
torch.zeros((13, self.max_seq_len, 768)).fill_(min_value) - self.min_attention_coverage_tracker = torch.zeros((12, 12, self.max_seq_len, self.max_seq_len)).fill_(min_attention_value) - - self.max_word_coverage_tracker = torch.zeros(( 13, self.max_seq_len, 768)).fill_(max_value) - self.max_attention_coverage_tracker = torch.zeros(( 12, 12, self.max_seq_len, self.max_seq_len)).fill_(max_attention_value) - - - - if 'snac' in self.coverage: - self.k_m = 2 - if 'nbc' in self.coverage: - self.k_m = 1 - ''' + """ + ``neuronMultiSectionCoverage`` measures the neuron coverage acheived by a testset + Args: + test_model(Union[str, torch.nn.Module]): name of the pretrained language model from `transformers` + or the actual test model as a `torch.nn.Module` class. Default is "bert base uncased" from `transformers`. + tokenizer (:obj:``, optional): If `test_model` is not a pretrained model from `transformers, need to provide + the tokenizer here. + max_seq_len (int): Maximum sequence length accepted by the model to be tested. However, if you are using a pretrained model from `transformers`, this is handled + automatically using information from `model.config`. + threshold(float): threshold for marking a neuron as activated + coverage(str): measure type of neuron coverage at the level of layer outputs + """ + + def __init__( + self, + test_model="textattack/bert-base-uncased-ag-news", + tokenizer=None, + max_seq_len=-1, + threshold=0.0, + num_labels=2, + coverage="multisection", + pre_limits=False, + bins_attention=4, + bins_word=4, + min_value=np.inf, + max_value=-np.inf, + bz=128, + word_mask=False, + ): + self.coverage = coverage + + self.word_mask = word_mask + self.pre_limits = pre_limits + self.bins_attention = bins_attention + self.bins_word = bins_word # number of sections for each neuron + self.max_seq_len = 128 + self.model_type = "bert" + + config = transformers.AutoConfig.from_pretrained( + test_model, output_hidden_states=True, num_labels=num_labels + ) + if config.model_type in COVERAGE_MODEL_TYPES: + self.test_model = ( + transformers.AutoModelForSequenceClassification.from_pretrained( + test_model, config=config + ) + ) + self.test_model.tokenizer = transformers.AutoTokenizer.from_pretrained( + test_model + ) + self.model_type = self.test_model.config.model_type + self.max_seq_len = ( + max_seq_len + if max_seq_len != -1 + else self.test_model.config.max_position_embeddings + ) + else: + raise ValueError( + "`neuronCoverage` only accepts models in " + + ",".join(COVERAGE_MODEL_TYPES) + ) + + self.test_model.to(textattack.shared.utils.device) + self.threshold = threshold + self.test_model.eval() + + # initialize min and max for coverage + min_attention_value = min_value + max_attention_value = max_value + if pre_limits: + min_attention_value = 0.0 + max_attention_value = 1.0 + + self.coverage_word_dicts = torch.zeros( + (self.bins_word + 3, 13, self.max_seq_len, 768) + ) + self.coverage_attention_dicts = torch.zeros( + (self.bins_attention + 3, 12, 12, self.max_seq_len, self.max_seq_len) + ) + self.min_word_coverage_tracker = torch.zeros((13, self.max_seq_len, 768)).fill_( + min_value + ) + self.min_attention_coverage_tracker = torch.zeros( + (12, 12, self.max_seq_len, self.max_seq_len) + ).fill_(min_attention_value) + + self.max_word_coverage_tracker = torch.zeros((13, self.max_seq_len, 768)).fill_( + max_value + ) + self.max_attention_coverage_tracker = torch.zeros( + (12, 12, self.max_seq_len, self.max_seq_len) + ).fill_(max_attention_value) + + if "snac" in self.coverage: + self.k_m = 
2 + if "nbc" in self.coverage: + self.k_m = 1 + """ for i in range(self.bins_word): word_tracker = self._init_word_coverage(fill_value=0.0) self.coverage_word_dicts.append(word_tracker) for i in range(self.bins_attention): attention_tracker = self._init_attention_coverage(fill_value=0.0) self.coverage_attention_dicts.append(attention_tracker) - ''' - def _init_word_coverage(self, fill_value): - """Initialize `coverage_tracker` dictionary - - Returns: - `coverage_tracker`(dict): a dictionary with key: neuron and value: (bool) intialized False """ - coverage_word_tracker = torch.zeros_like(self.coverage_word_dicts) - - ''' + + def _init_word_coverage(self, fill_value): + """Initialize `coverage_tracker` dictionary. + + Returns: + `coverage_tracker`(dict): a dictionary with key: neuron and value: (bool) intialized False + """ + coverage_word_tracker = torch.zeros_like(self.coverage_word_dicts) + + """ coverage_tracker["classifier"] = ( torch.zeros((len(self.test_model.config.label2id)), requires_grad=False) .fill_(fill_value) .to(textattack.shared.utils.device) .detach() ) - ''' - # embedding is L X H - - ''' + """ + # embedding is L X H + + """ coverage_tracker["classifier"] = ( torch.zeros((len(self.test_model.config.label2id)), requires_grad=False) .fill_(fill_value) .to(textattack.shared.utils.device) .detach() ) - ''' + """ - return coverage_word_tracker - def _init_attention_coverage(self, fill_value): - """Initialize `coverage_tracker` dictionary + return coverage_word_tracker - Returns: - `coverage_tracker`(dict): a dictionary with key: neuron and value: (bool) intialized False - """ - # attention neurons - coverage_attention_tracker = torch.zeros_like(self.coverage_attention_dicts) - return coverage_attention_tracker + def _init_attention_coverage(self, fill_value): + """Initialize `coverage_tracker` dictionary. - def _update_initial_word_coverage(self, embeddings, word_mask = None, interaction_mask = None): - """Update `coverage_tracker` for input `text` for coarse coverage - Args: - `text`(str): text to update neuron coverage of. + Returns: + `coverage_tracker`(dict): a dictionary with key: neuron and value: (bool) intialized False + """ + # attention neurons + coverage_attention_tracker = torch.zeros_like(self.coverage_attention_dicts) + return coverage_attention_tracker - """ + def _update_initial_word_coverage( + self, embeddings, word_mask=None, interaction_mask=None + ): + """Update `coverage_tracker` for input `text` for coarse coverage + Args: + `text`(str): text to update neuron coverage of. 
- ''' + """ + + """ encodings = self.test_model.tokenizer(text, return_tensors="pt") if self.max_seq_len > 0: input_ids = encodings.input_ids[:, : self.max_seq_len] @@ -170,347 +185,428 @@ def _update_initial_word_coverage(self, embeddings, word_mask = None, interactio attention_mask = attention_mask.to(textattack.shared.utils.device) outputs = self.test_model(input_ids, attention_mask=attention_mask) outputs[1][0] - ''' - - sentence_length = embeddings[0][0, ...].size(0) - - embeddings = [e.unsqueeze(1) for e in embeddings] - - embeddings = torch.cat(embeddings, dim = 1).cpu() - - if self.word_mask: - indices_to_fill = [int( index) for index in range(sentence_length)] - else: - indices_to_fill = [index for index in range(sentence_length)] - #print(embeddings,, self.max_word_coverage_tracker.device) - self.max_word_coverage_tracker[:,indices_to_fill, :] = torch.where(torch.max(embeddings, dim = 0).values.detach() > self.max_word_coverage_tracker[:,indices_to_fill, :] , torch.max(embeddings, dim = 0).values.detach(), self.max_word_coverage_tracker[:,indices_to_fill, :]) - self.min_word_coverage_tracker[:,indices_to_fill, :] = torch.where(torch.min(embeddings, dim = 0).values.detach() \ - < self.min_word_coverage_tracker[:,indices_to_fill, :] , torch.min(embeddings, dim = 0).values.detach(), self.min_word_coverage_tracker[:,indices_to_fill, :]) - - - ''' + """ + + sentence_length = embeddings[0][0, ...].size(0) + + embeddings = [e.unsqueeze(1) for e in embeddings] + + embeddings = torch.cat(embeddings, dim=1).cpu() + + if self.word_mask: + indices_to_fill = [int(index) for index in range(sentence_length)] + else: + indices_to_fill = [index for index in range(sentence_length)] + # print(embeddings,, self.max_word_coverage_tracker.device) + self.max_word_coverage_tracker[:, indices_to_fill, :] = torch.where( + torch.max(embeddings, dim=0).values.detach() + > self.max_word_coverage_tracker[:, indices_to_fill, :], + torch.max(embeddings, dim=0).values.detach(), + self.max_word_coverage_tracker[:, indices_to_fill, :], + ) + self.min_word_coverage_tracker[:, indices_to_fill, :] = torch.where( + torch.min(embeddings, dim=0).values.detach() + < self.min_word_coverage_tracker[:, indices_to_fill, :], + torch.min(embeddings, dim=0).values.detach(), + self.min_word_coverage_tracker[:, indices_to_fill, :], + ) + + """ self.max_coverage_tracker["classifier"] = torch.where( (outputs[0][0, ...].detach()) > self.max_coverage_tracker["classifier"], outputs[0][0, ...].detach(), self.max_coverage_tracker["classifier"], ) - ''' - - - def _update_initial_attention_coverage(self, all_attentions): - """Update `coverage_tracker` for input `text` for coarse coverage - Args: - `text`(str): text to update neuron coverage of. 
- - """ - - - # all_attentions = list of attentions of size B X H X L X L - - sentence_length = all_attentions[0][0,0, ...].size(-1) - all_attentions = torch.cat([a.unsqueeze(1) for a in all_attentions], dim = 1) # B X LA X HD X L X L - all_attentions_max = torch.max( all_attentions, dim = 0).values.cpu() - all_attentions_min = torch.min( all_attentions, dim = 0).values.cpu() - self.max_attention_coverage_tracker = torch.where(all_attentions_max > self.max_attention_coverage_tracker, all_attentions_max, self.max_attention_coverage_tracker) - self.min_attention_coverage_tracker = torch.where(all_attentions_min < self.min_attention_coverage_tracker, all_attentions_min, self.min_attention_coverage_tracker) - - - - def _update_initial_coverage(self, all_hidden_states, all_attentions, word_mask = None): - """Update `coverage_tracker` for input `text` - Args: - `text`(str): text to update neuron coverage of. - """ - - - self._update_initial_word_coverage(all_hidden_states, word_mask) - - self._update_initial_attention_coverage(all_attentions) - - def initialize_from_training_dataset(self, trainset, masks = None, bz=1): - """Update coverage from training dataset - `trainset`(list[str]): training dataset coverage statistics - - """ - mask_no = 0 - - - start = 0 - with torch.no_grad(): - for t in tqdm(trainset): - - if mask_no + bz >= len(trainset): - end = len(trainset) - else: - end = start + bz - if start >= end or start > len(trainset) : break - #print('current indices : ', trainset[start:end], start, end, len(trainset)) - encodings = self.test_model.tokenizer(trainset[start:end], padding='max_length', truncation=True, return_tensors="pt", max_length = self.max_seq_len) - - if self.max_seq_len > 0: - input_ids = encodings.input_ids[:, : self.max_seq_len] - attention_mask = encodings.attention_mask[:, : self.max_seq_len] - - input_ids = input_ids.to(textattack.shared.utils.device) - attention_mask = attention_mask.to(textattack.shared.utils.device) - - outputs = self.test_model(input_ids, attention_mask=attention_mask, output_attentions=True, output_hidden_states = True) - all_hidden_states, all_attentions = outputs[-2:] - self._update_initial_coverage(all_hidden_states, all_attentions, masks[start :end]) - start = end - - - self.training_word_coverage_dicts = copy.deepcopy(self.coverage_word_dicts) - self.training_attention_coverage_dicts = copy.deepcopy(self.coverage_attention_dicts) - - def _eval(self, text): - """Update `coverage_tracker` for input `text` for coarse coverage - Args: - `text`(str): text to update neuron coverage of. - - """ - encodings = self.test_model.tokenizer(text, return_tensors="pt") - if self.max_seq_len > 0: - input_ids = encodings.input_ids[:, : self.max_seq_len] - attention_mask = encodings.attention_mask[:, : self.max_seq_len] - - input_ids = input_ids.to(textattack.shared.utils.device) - attention_mask = attention_mask.to(textattack.shared.utils.device) - outputs = self.test_model(input_ids, attention_mask=attention_mask) - return outputs - - - def _update_word_coverage(self, all_hidden_states, word_mask = None): - """Update `coverage_tracker` for input `text` for coarse coverage - Args: - `text`(str): text to update neuron coverage of. 
- - - - a = time.time() - encodings = self.test_model.tokenizer(text, padding='max_length', truncation=True, return_tensors="pt", max_length = self.max_seq_len) - if self.max_seq_len > 0: - input_ids = encodings.input_ids[:, : self.max_seq_len] - attention_mask = encodings.attention_mask[:, : self.max_seq_len] - - input_ids = input_ids.to(textattack.shared.utils.device) - attention_mask = attention_mask.to(textattack.shared.utils.device) - outputs = self.test_model(input_ids, attention_mask=attention_mask) - b = time.time() - - sentence_length = outputs[1][0][0, ...].size(0) - """ - hidden_vectors = torch.cat([o.unsqueeze(1) for o in all_hidden_states], dim = 1) - sentence_length = hidden_vectors.size(2) - #print('size of output hidden bectors: ', hidden_vectors.size()) - if self.word_mask: - indices_to_fill = [index for index in range(sentence_length)] - else: - indices_to_fill = [index for index in range(sentence_length)] - current_coverage_tracker = self._init_word_coverage(fill_value=0) - a = time.time() - section_length = (self.max_word_coverage_tracker[:, indices_to_fill , :] - self.min_word_coverage_tracker[:, indices_to_fill , :] ) / self.bins_word - section_length = section_length.unsqueeze(0).repeat(hidden_vectors.size(0), 1, 1, 1) - #print('section length: ', section_length.size()) - section_index = torch.where( - section_length > 0, - ( - torch.floor( - ( - hidden_vectors.cpu().detach() - - self.min_word_coverage_tracker[:, - indices_to_fill , : - ] - ) - / section_length - ) - ), - torch.zeros_like(hidden_vectors.cpu().detach(), requires_grad=False) -1, - ).long() - # print('section index: ', section_index.size()) - - - #section_index = torch.where(section_index, section_index, self.bins_word + 1) - #section_index = torch.where(section_index>0, section_index, torch.zeros_like(section_index) + self.bins_word + 1) - section_index = torch.where(section_index0, section_index, torch.zeros_like(section_index) + self.bins_word + 1) - - # print('section index: ', section_index.size()) - - temp_store_activations = torch.max( (F.one_hot(section_index, num_classes = self.bins_word + 3)).permute(0,4,1,2,3), dim = 0).values - - # print('Temp Store Activations: ', temp_store_activations.size()) - self.coverage_word_dicts += temp_store_activations - del temp_store_activations - del current_coverage_tracker - - def _update_attention_coverage(self, all_attentions, masks): - """Update `coverage_tracker` for input `text` for coarse coverage - Args: - `text`(str): text to update neuron coverage of. 
- - - encodings = self.test_model.tokenizer(text, padding='max_length', truncation=True, return_tensors="pt", max_length = self.max_seq_len) - if self.max_seq_len > 0: - input_ids = encodings.input_ids[:, : self.max_seq_len] - attention_mask = encodings.attention_mask[:, : self.max_seq_len] - - input_ids = input_ids.to(textattack.shared.utils.device) - attention_mask = attention_mask.to(textattack.shared.utils.device) - outputs = self.test_model(input_ids, attention_mask=attention_mask, output_attentions=True, output_hidden_states = True) - - all_hidden_states, all_attentions = outputs[-2:] - # all_attentions = list of attentions of size B X H X L X L - - """ - sentence_length = all_attentions[0][0,0, ...].size(-1) - - - all_attentions = torch.cat( [a.unsqueeze(1) for a in all_attentions] , dim = 1).cpu()[:,:, 0:sentence_length, 0:sentence_length] - # B X layers X heads X l X l - # print('attentions size: ', all_attentions.size()) - current_coverage_tracker = self._init_attention_coverage(fill_value=0) - - - section_length = (self.max_attention_coverage_tracker[:,:, 0:sentence_length, 0:sentence_length] - \ - self.min_attention_coverage_tracker[:,:, 0:sentence_length, 0:sentence_length] ) / self.bins_attention - section_length = section_length.unsqueeze(0).repeat(all_attentions.size(0), 1, 1, 1, 1) - # print(' section length: ', section_length.size()) - section_index = torch.where( - section_length > 0, - ( - torch.floor( - ( - all_attentions.cpu().detach() - - self.min_attention_coverage_tracker - ) - / section_length - ) - ), - torch.zeros_like(all_attentions.cpu().detach(), requires_grad=False) - 1 - ).long() - - # print('section index: ', section_index.size()) - section_index = torch.where(section_index0, section_index, torch.zeros_like(section_index) + self.bins_word + 1) - temp_storage_activations = torch.max ((F.one_hot(section_index, num_classes = self.bins_attention + 3)).permute(0,5,1,2,3,4), dim = 0).values - # print(' temp storage activations: ', temp_storage_activations.size()) - self.coverage_attention_dicts += temp_storage_activations - del temp_storage_activations - del current_coverage_tracker - - def _compute_coverage(self): - """Calculate `neuron_coverage` for current model""" - neuron_word_coverage, neuron_word_coverage_total = 0.0, 0.0 - neuron_attention_coverage, neuron_attention_coverage_total = 0.0, 0.0 - neuron_word_coverage += np.count_nonzero(self.coverage_word_dicts.numpy()) - neuron_word_coverage_total += self.coverage_word_dicts.numel() - - neuron_attention_coverage += np.count_nonzero(self.coverage_attention_dicts.numpy()) - neuron_attention_coverage_total += self.coverage_attention_dicts.numel() - - - neuron_coverage = neuron_word_coverage + neuron_attention_coverage - # print('Word and Attention Only: ', neuron_word_coverage , neuron_attention_coverage) - neuron_coverage_total = neuron_word_coverage_total + neuron_attention_coverage_total - # print('Total Word and Attention Only: ', neuron_word_coverage_total , neuron_attention_coverage_total) - return neuron_coverage / neuron_coverage_total - - def _compute_vector(self): - """Calculate `neuron_coverage` for current model""" - neuron_coverage_vector = [] - for section in self.coverage_word_dicts: - for entry in section.values(): - neuron_coverage_vector += ([entry_val.item() for entry_val in entry.flatten()]) - for section in self.coverage_attention_dicts: - for entry in section.values(): - neuron_coverage_vector += ([entry_val.item() for entry_val in entry.flatten()]) - - return neuron_coverage_vector - 
def _update_coverage(self, text, word_mask = None): - """Update `coverage_tracker` for input `text` - Args: - `text`(str): text to update neuron coverage of. - - """ - - self._update_word_coverage(text, word_mask) - self._update_attention_coverage(text) - - def __call__(self, testset, masks = None, bz = 1): - """ - Returns neuron of `testset` - Args: - testset: Iterable of strings - Returns: - neuron coverage (float) - """ - # # # print('*'*50) - # # # print('Updating Coverage using test set: ') - mask_no, start = 0, 0 - with torch.no_grad(): - for t in tqdm(testset): - - if mask_no + bz >= len(testset): - end = len(testset) - else: - end = start + bz - if start >= end or start > len(testset) : break - - encodings = self.test_model.tokenizer(testset[start:end], padding='max_length', truncation=True, return_tensors="pt", max_length = self.max_seq_len) - - if self.max_seq_len > 0: - input_ids = encodings.input_ids[:, : self.max_seq_len] - attention_mask = encodings.attention_mask[:, : self.max_seq_len] - - input_ids = input_ids.to(textattack.shared.utils.device) - attention_mask = attention_mask.to(textattack.shared.utils.device) - - outputs = self.test_model(input_ids, attention_mask=attention_mask, output_attentions=True, output_hidden_states = True) - all_hidden_states, all_attentions = outputs[-2:] - self._update_word_coverage(all_hidden_states, masks[start:end]) - self._update_attention_coverage(all_attentions , masks[start:end]) - - - start = end - - - - - # # # print('*'*50) - # # # print() - # # # print('*'*50) - # # # print('Computing Coverage: ') - neuron_coverage = self._compute_coverage() - # # # print('*'*50) - return neuron_coverage - def vector(self, testset, start = False): - """ - Returns neuron of `testset` - Args: - testset: Iterable of strings - Returns: - neuron coverage (float) - """ - # # # print('*'*50) - if start: - self.coverage_word_dicts = copy.deepcopy(self.training_word_coverage_dicts) - self.coverage_attention_dicts = copy.deepcopy(self.training_attention_coverage_dicts) - # # # print('Updating Coverage using test set: ') - # # # print('#'*100) - # # # print(len(testset)) - # # # print(testset) - # # # print('#'*100) - for t in tqdm(testset): - # # # print(t) - self._update_coverage(t) - - # # # print('*'*50) - # # # print() - # # # print('*'*50) - # # # print('Computing Coverage: ') - neuron_coverage = self._compute_vector() - # # print('*'*50) - return neuron_coverage + def _update_initial_attention_coverage(self, all_attentions): + """Update `coverage_tracker` for input `text` for coarse coverage + Args: + `text`(str): text to update neuron coverage of. 
+ + """ + + # all_attentions = list of attentions of size B X H X L X L + + sentence_length = all_attentions[0][0, 0, ...].size(-1) + all_attentions = torch.cat( + [a.unsqueeze(1) for a in all_attentions], dim=1 + ) # B X LA X HD X L X L + all_attentions_max = torch.max(all_attentions, dim=0).values.cpu() + all_attentions_min = torch.min(all_attentions, dim=0).values.cpu() + self.max_attention_coverage_tracker = torch.where( + all_attentions_max > self.max_attention_coverage_tracker, + all_attentions_max, + self.max_attention_coverage_tracker, + ) + self.min_attention_coverage_tracker = torch.where( + all_attentions_min < self.min_attention_coverage_tracker, + all_attentions_min, + self.min_attention_coverage_tracker, + ) + + def _update_initial_coverage( + self, all_hidden_states, all_attentions, word_mask=None + ): + """Update `coverage_tracker` for input `text` + Args: + `text`(str): text to update neuron coverage of. + + """ + + self._update_initial_word_coverage(all_hidden_states, word_mask) + + self._update_initial_attention_coverage(all_attentions) + + def initialize_from_training_dataset(self, trainset, masks=None, bz=1): + """Update coverage from training dataset + `trainset`(list[str]): training dataset coverage statistics + + + """ + mask_no = 0 + + start = 0 + with torch.no_grad(): + for t in tqdm(trainset): + if mask_no + bz >= len(trainset): + end = len(trainset) + else: + end = start + bz + if start >= end or start > len(trainset): + break + # print('current indices : ', trainset[start:end], start, end, len(trainset)) + encodings = self.test_model.tokenizer( + trainset[start:end], + padding="max_length", + truncation=True, + return_tensors="pt", + max_length=self.max_seq_len, + ) + + if self.max_seq_len > 0: + input_ids = encodings.input_ids[:, : self.max_seq_len] + attention_mask = encodings.attention_mask[:, : self.max_seq_len] + + input_ids = input_ids.to(textattack.shared.utils.device) + attention_mask = attention_mask.to(textattack.shared.utils.device) + + outputs = self.test_model( + input_ids, + attention_mask=attention_mask, + output_attentions=True, + output_hidden_states=True, + ) + all_hidden_states, all_attentions = outputs[-2:] + self._update_initial_coverage( + all_hidden_states, all_attentions, masks[start:end] + ) + start = end + + self.training_word_coverage_dicts = copy.deepcopy(self.coverage_word_dicts) + self.training_attention_coverage_dicts = copy.deepcopy( + self.coverage_attention_dicts + ) + + def _eval(self, text): + """Update `coverage_tracker` for input `text` for coarse coverage + Args: + `text`(str): text to update neuron coverage of. + + """ + encodings = self.test_model.tokenizer(text, return_tensors="pt") + if self.max_seq_len > 0: + input_ids = encodings.input_ids[:, : self.max_seq_len] + attention_mask = encodings.attention_mask[:, : self.max_seq_len] + + input_ids = input_ids.to(textattack.shared.utils.device) + attention_mask = attention_mask.to(textattack.shared.utils.device) + outputs = self.test_model(input_ids, attention_mask=attention_mask) + return outputs + + def _update_word_coverage(self, all_hidden_states, word_mask=None): + """Update `coverage_tracker` for input `text` for coarse coverage + Args: + `text`(str): text to update neuron coverage of. 
+ + + + a = time.time() + encodings = self.test_model.tokenizer(text, padding='max_length', truncation=True, return_tensors="pt", max_length = self.max_seq_len) + if self.max_seq_len > 0: + input_ids = encodings.input_ids[:, : self.max_seq_len] + attention_mask = encodings.attention_mask[:, : self.max_seq_len] + + input_ids = input_ids.to(textattack.shared.utils.device) + attention_mask = attention_mask.to(textattack.shared.utils.device) + outputs = self.test_model(input_ids, attention_mask=attention_mask) + b = time.time() + + sentence_length = outputs[1][0][0, ...].size(0) + """ + hidden_vectors = torch.cat([o.unsqueeze(1) for o in all_hidden_states], dim=1) + sentence_length = hidden_vectors.size(2) + # print('size of output hidden bectors: ', hidden_vectors.size()) + if self.word_mask: + indices_to_fill = [index for index in range(sentence_length)] + else: + indices_to_fill = [index for index in range(sentence_length)] + current_coverage_tracker = self._init_word_coverage(fill_value=0) + a = time.time() + section_length = ( + self.max_word_coverage_tracker[:, indices_to_fill, :] + - self.min_word_coverage_tracker[:, indices_to_fill, :] + ) / self.bins_word + section_length = section_length.unsqueeze(0).repeat( + hidden_vectors.size(0), 1, 1, 1 + ) + # print('section length: ', section_length.size()) + section_index = torch.where( + section_length > 0, + ( + torch.floor( + ( + hidden_vectors.cpu().detach() + - self.min_word_coverage_tracker[:, indices_to_fill, :] + ) + / section_length + ) + ), + torch.zeros_like(hidden_vectors.cpu().detach(), requires_grad=False) - 1, + ).long() + # print('section index: ', section_index.size()) + + # section_index = torch.where(section_index, section_index, self.bins_word + 1) + # section_index = torch.where(section_index>0, section_index, torch.zeros_like(section_index) + self.bins_word + 1) + section_index = torch.where( + section_index < self.bins_word, + section_index, + torch.zeros_like(section_index) + self.bins_word + 2, + ) + section_index = torch.where( + section_index > 0, + section_index, + torch.zeros_like(section_index) + self.bins_word + 1, + ) + + # print('section index: ', section_index.size()) + + temp_store_activations = torch.max( + (F.one_hot(section_index, num_classes=self.bins_word + 3)).permute( + 0, 4, 1, 2, 3 + ), + dim=0, + ).values + + # print('Temp Store Activations: ', temp_store_activations.size()) + self.coverage_word_dicts += temp_store_activations + del temp_store_activations + del current_coverage_tracker + + def _update_attention_coverage(self, all_attentions, masks): + """Update `coverage_tracker` for input `text` for coarse coverage + Args: + `text`(str): text to update neuron coverage of. 
+ + + encodings = self.test_model.tokenizer(text, padding='max_length', truncation=True, return_tensors="pt", max_length = self.max_seq_len) + if self.max_seq_len > 0: + input_ids = encodings.input_ids[:, : self.max_seq_len] + attention_mask = encodings.attention_mask[:, : self.max_seq_len] + + input_ids = input_ids.to(textattack.shared.utils.device) + attention_mask = attention_mask.to(textattack.shared.utils.device) + outputs = self.test_model(input_ids, attention_mask=attention_mask, output_attentions=True, output_hidden_states = True) + + all_hidden_states, all_attentions = outputs[-2:] + # all_attentions = list of attentions of size B X H X L X L + + """ + sentence_length = all_attentions[0][0, 0, ...].size(-1) + + all_attentions = torch.cat( + [a.unsqueeze(1) for a in all_attentions], dim=1 + ).cpu()[:, :, 0:sentence_length, 0:sentence_length] + # B X layers X heads X l X l + # print('attentions size: ', all_attentions.size()) + current_coverage_tracker = self._init_attention_coverage(fill_value=0) + + section_length = ( + self.max_attention_coverage_tracker[ + :, :, 0:sentence_length, 0:sentence_length + ] + - self.min_attention_coverage_tracker[ + :, :, 0:sentence_length, 0:sentence_length + ] + ) / self.bins_attention + section_length = section_length.unsqueeze(0).repeat( + all_attentions.size(0), 1, 1, 1, 1 + ) + # print(' section length: ', section_length.size()) + section_index = torch.where( + section_length > 0, + ( + torch.floor( + ( + all_attentions.cpu().detach() + - self.min_attention_coverage_tracker + ) + / section_length + ) + ), + torch.zeros_like(all_attentions.cpu().detach(), requires_grad=False) - 1, + ).long() + + # print('section index: ', section_index.size()) + section_index = torch.where( + section_index < self.bins_attention, + section_index, + torch.zeros_like(section_index) + self.bins_attention + 2, + ) + section_index = torch.where( + section_index > 0, + section_index, + torch.zeros_like(section_index) + self.bins_word + 1, + ) + temp_storage_activations = torch.max( + (F.one_hot(section_index, num_classes=self.bins_attention + 3)).permute( + 0, 5, 1, 2, 3, 4 + ), + dim=0, + ).values + # print(' temp storage activations: ', temp_storage_activations.size()) + self.coverage_attention_dicts += temp_storage_activations + del temp_storage_activations + del current_coverage_tracker + + def _compute_coverage(self): + """Calculate `neuron_coverage` for current model.""" + neuron_word_coverage, neuron_word_coverage_total = 0.0, 0.0 + neuron_attention_coverage, neuron_attention_coverage_total = 0.0, 0.0 + neuron_word_coverage += np.count_nonzero(self.coverage_word_dicts.numpy()) + neuron_word_coverage_total += self.coverage_word_dicts.numel() + + neuron_attention_coverage += np.count_nonzero( + self.coverage_attention_dicts.numpy() + ) + neuron_attention_coverage_total += self.coverage_attention_dicts.numel() + + neuron_coverage = neuron_word_coverage + neuron_attention_coverage + # print('Word and Attention Only: ', neuron_word_coverage , neuron_attention_coverage) + neuron_coverage_total = ( + neuron_word_coverage_total + neuron_attention_coverage_total + ) + # print('Total Word and Attention Only: ', neuron_word_coverage_total , neuron_attention_coverage_total) + return neuron_coverage / neuron_coverage_total + + def _compute_vector(self): + """Calculate `neuron_coverage` for current model.""" + neuron_coverage_vector = [] + for section in self.coverage_word_dicts: + for entry in section.values(): + neuron_coverage_vector += [ + entry_val.item() for entry_val 
in entry.flatten() + ] + for section in self.coverage_attention_dicts: + for entry in section.values(): + neuron_coverage_vector += [ + entry_val.item() for entry_val in entry.flatten() + ] + + return neuron_coverage_vector + + def _update_coverage(self, text, word_mask=None): + """Update `coverage_tracker` for input `text` + Args: + `text`(str): text to update neuron coverage of. + + """ + + self._update_word_coverage(text, word_mask) + self._update_attention_coverage(text) + + def __call__(self, testset, masks=None, bz=1): + """ + Returns neuron of `testset` + Args: + testset: Iterable of strings + Returns: + neuron coverage (float) + """ + # # # print('*'*50) + # # # print('Updating Coverage using test set: ') + mask_no, start = 0, 0 + with torch.no_grad(): + for t in tqdm(testset): + if mask_no + bz >= len(testset): + end = len(testset) + else: + end = start + bz + if start >= end or start > len(testset): + break + + encodings = self.test_model.tokenizer( + testset[start:end], + padding="max_length", + truncation=True, + return_tensors="pt", + max_length=self.max_seq_len, + ) + + if self.max_seq_len > 0: + input_ids = encodings.input_ids[:, : self.max_seq_len] + attention_mask = encodings.attention_mask[:, : self.max_seq_len] + + input_ids = input_ids.to(textattack.shared.utils.device) + attention_mask = attention_mask.to(textattack.shared.utils.device) + + outputs = self.test_model( + input_ids, + attention_mask=attention_mask, + output_attentions=True, + output_hidden_states=True, + ) + all_hidden_states, all_attentions = outputs[-2:] + self._update_word_coverage(all_hidden_states, masks[start:end]) + self._update_attention_coverage(all_attentions, masks[start:end]) + + start = end + + # # # print('*'*50) + # # # print() + # # # print('*'*50) + # # # print('Computing Coverage: ') + neuron_coverage = self._compute_coverage() + # # # print('*'*50) + return neuron_coverage + + def vector(self, testset, start=False): + """ + Returns neuron of `testset` + Args: + testset: Iterable of strings + Returns: + neuron coverage (float) + """ + # # # print('*'*50) + if start: + self.coverage_word_dicts = copy.deepcopy(self.training_word_coverage_dicts) + self.coverage_attention_dicts = copy.deepcopy( + self.training_attention_coverage_dicts + ) + # # # print('Updating Coverage using test set: ') + # # # print('#'*100) + # # # print(len(testset)) + # # # print(testset) + # # # print('#'*100) + for t in tqdm(testset): + # # # print(t) + self._update_coverage(t) + + # # # print('*'*50) + # # # print() + # # # print('*'*50) + # # # print('Computing Coverage: ') + neuron_coverage = self._compute_vector() + # # print('*'*50) + return neuron_coverage diff --git a/textattack/coverage/neuron_coverage.py b/textattack/coverage/neuron_coverage.py index eef9d6f55..f2c42d4e1 100644 --- a/textattack/coverage/neuron_coverage.py +++ b/textattack/coverage/neuron_coverage.py @@ -1,13 +1,14 @@ +from collections import defaultdict +import copy +import itertools import logging import torch -import transformers from tqdm import tqdm -import itertools -import copy +import transformers import textattack -from collections import defaultdict + from .coverage import ExtrinsicCoverage logging.getLogger("transformers.tokenization_utils_base").setLevel(logging.ERROR) @@ -34,16 +35,15 @@ def __init__( self, test_model="textattack/bert-base-uncased-ag-news", tokenizer=None, - num_labels = 2, + num_labels=2, max_seq_len=-1, threshold=0.0, coarse_coverage=True, ): - self.coarse_coverage = coarse_coverage config = 
transformers.AutoConfig.from_pretrained( - test_model, output_hidden_states=True,num_labels=num_labels + test_model, output_hidden_states=True, num_labels=num_labels ) if config.model_type in COVERAGE_MODEL_TYPES: self.test_model = ( @@ -72,7 +72,7 @@ def __init__( self.coverage_tracker = self._init_coverage() def _init_coverage(self): - """Initialize `coverage_tracker` dictionary + """Initialize `coverage_tracker` dictionary. Returns: `coverage_tracker`(dict): a dictionary with key: neuron and value: (bool) intialized False @@ -143,7 +143,6 @@ def scale(layer_outputs, rmax=1, rmin=0): self.coverage_tracker[("embedding")][0:sentence_length, ...], ) for h_index, hidden_vector in enumerate(outputs[1][1:]): - self.coverage_tracker[(h_index, "output")][ 0:sentence_length, ... ] = torch.where( @@ -170,7 +169,7 @@ def _update_refined_coverage(self, text): """ def _compute_coverage(self): - """Calculate `neuron_coverage` for current model""" + """Calculate `neuron_coverage` for current model.""" neuron_coverage = sum( [entry.sum().item() for entry in self.coverage_tracker.values()] @@ -198,7 +197,6 @@ def __call__(self, testset): neuron coverage (float) """ for t in tqdm(testset): - self._update_coverage(t[0]["text"]) neuron_coverage = self._compute_coverage() return neuron_coverage diff --git a/textattack/metrics/recipe.py b/textattack/metrics/recipe.py index 39df568b3..f7e7f4ca6 100644 --- a/textattack/metrics/recipe.py +++ b/textattack/metrics/recipe.py @@ -7,23 +7,24 @@ from . import metric + class AdvancedAttackMetric(Metric): - """Calculate a suite of advanced metrics to evaluate attackResults' quality - """ + """Calculate a suite of advanced metrics to evaluate attackResults' + quality.""" - def __init__(self, choices=['use']): + def __init__(self, choices=["use"]): self.achoices = choices def calculate(self, results): advanced_metrics = {} - if 'use' in self.achoices: - advanced_metrics['use_stats'] = USEMetric().calculate(results) - if 'perplexity' in self.achoices: - advanced_metrics['perplexity_stats'] = Perplexity().calculate(results) - if 'bert_score' in self.achoices: - advanced_metrics['bert_score'] = BERTScoreMetric().calculate(results) - if 'meteor_score' in self.achoices: - advanced_metrics['meteor_score'] = MeteorMetric().calculate(results) - if 'sbert_score' in self.achoices: - advanced_metrics['sbert_score'] = SBERTMetric().calculate(results) - return advanced_metrics \ No newline at end of file + if "use" in self.achoices: + advanced_metrics["use_stats"] = USEMetric().calculate(results) + if "perplexity" in self.achoices: + advanced_metrics["perplexity_stats"] = Perplexity().calculate(results) + if "bert_score" in self.achoices: + advanced_metrics["bert_score"] = BERTScoreMetric().calculate(results) + if "meteor_score" in self.achoices: + advanced_metrics["meteor_score"] = MeteorMetric().calculate(results) + if "sbert_score" in self.achoices: + advanced_metrics["sbert_score"] = SBERTMetric().calculate(results) + return advanced_metrics From d76075255ff8f60a634eb9a70c0d61f8ffe9e71b Mon Sep 17 00:00:00 2001 From: Yanjun Qi Date: Mon, 18 Sep 2023 11:22:19 -0400 Subject: [PATCH 6/7] format update --- examples/coverage/measure_coverage.py | 31 ++++++++++++++------------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/examples/coverage/measure_coverage.py b/examples/coverage/measure_coverage.py index a7991c6de..e2b3171cd 100644 --- a/examples/coverage/measure_coverage.py +++ b/examples/coverage/measure_coverage.py @@ -5,28 +5,29 @@ """ -import torch +import 
argparse +from math import floor import os -import textattack import pickle -from textattack.models.tokenizers import AutoTokenizer -from textattack.models.wrappers import HuggingFaceModelWrapper -from textattack.models.wrappers import ModelWrapper +import random + +import numpy as np +import torch from transformers import AutoModelForSequenceClassification -from textattack.coverage import neuronMultiSectionCoverage -from textattack.attack_results import SuccessfulAttackResult -from textattack.datasets import HuggingFaceDataset + +import textattack from textattack.attack_recipes import ( - TextFoolerJin2019, - HotFlipEbrahimi2017, + BAEGarg2019, DeepWordBugGao2018, FasterGeneticAlgorithmJia2019, - BAEGarg2019, + HotFlipEbrahimi2017, + TextFoolerJin2019, ) -from math import floor -import random -import numpy as np -import argparse +from textattack.attack_results import SuccessfulAttackResult +from textattack.coverage import neuronMultiSectionCoverage +from textattack.datasets import HuggingFaceDataset +from textattack.models.tokenizers import AutoTokenizer +from textattack.models.wrappers import HuggingFaceModelWrapper, ModelWrapper def random_seed(seed): From 7b7b45f8803a4d5381327212ae0b07074c86281c Mon Sep 17 00:00:00 2001 From: Yanjun Qi Date: Mon, 18 Sep 2023 11:29:49 -0400 Subject: [PATCH 7/7] add testing code for metric recipes --- tests/test_metric_api.py | 5 +++++ textattack/metrics/recipe.py | 4 ++-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/tests/test_metric_api.py b/tests/test_metric_api.py index 1e75e815e..f2ba20542 100644 --- a/tests/test_metric_api.py +++ b/tests/test_metric_api.py @@ -32,6 +32,7 @@ def test_use(): from textattack.datasets import HuggingFaceDataset from textattack.metrics.quality_metrics import USEMetric from textattack.models.wrappers import HuggingFaceModelWrapper + from textattack.metrics.recipe import AdvancedAttackMetric model = transformers.AutoModelForSequenceClassification.from_pretrained( "distilbert-base-uncased-finetuned-sst-2-english" @@ -56,3 +57,7 @@ def test_use(): usem = USEMetric().calculate(results) assert usem["avg_attack_use_score"] == 0.76 + + + adv_score = AdvancedAttackMetric(['use','perplexity']).calculate(results) + assert adv_score['use']["avg_attack_use_score"] == 0.76 diff --git a/textattack/metrics/recipe.py b/textattack/metrics/recipe.py index f7e7f4ca6..81490f540 100644 --- a/textattack/metrics/recipe.py +++ b/textattack/metrics/recipe.py @@ -18,9 +18,9 @@ def __init__(self, choices=["use"]): def calculate(self, results): advanced_metrics = {} if "use" in self.achoices: - advanced_metrics["use_stats"] = USEMetric().calculate(results) + advanced_metrics["use"] = USEMetric().calculate(results) if "perplexity" in self.achoices: - advanced_metrics["perplexity_stats"] = Perplexity().calculate(results) + advanced_metrics["perplexity"] = Perplexity().calculate(results) if "bert_score" in self.achoices: advanced_metrics["bert_score"] = BERTScoreMetric().calculate(results) if "meteor_score" in self.achoices:
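As a rough illustration of the k-multisection bookkeeping implemented by `_update_word_coverage` and `_update_attention_coverage` above: each neuron's activation range observed on the training set is split into k equal sections, a test activation marks the section it falls into, and values outside the training range land in dedicated lower/upper buckets (the NBC/SNAC corner cases). The helper below is a minimal sketch of that bucketing, not part of the patch; `multisection_bucket` and the toy tensors are hypothetical names and values.

import torch

def multisection_bucket(activations, train_min, train_max, k=4):
    # Bucket meaning:
    #   0 .. k-1 : the k equal-width sections between the training min and max
    #   k        : below the training minimum (lower "corner" bucket)
    #   k + 1    : above the training maximum (upper "corner" bucket)
    width = (train_max - train_min) / k
    # guard neurons whose activation never varied during training
    safe_width = torch.where(width > 0, width, torch.ones_like(width))
    idx = torch.floor((activations - train_min) / safe_width).long().clamp(0, k - 1)
    idx = torch.where(activations < train_min, torch.full_like(idx, k), idx)
    idx = torch.where(activations > train_max, torch.full_like(idx, k + 1), idx)
    return idx

# Toy check with three "neurons" whose ranges were learned from training:
train_min = torch.tensor([0.0, -1.0, 2.0])
train_max = torch.tensor([1.0, 1.0, 2.0])  # the third neuron never varied in training
acts = torch.tensor([0.30, 1.70, 2.00])
print(multisection_bucket(acts, train_min, train_max, k=4))  # tensor([1, 5, 0])

Accumulating a one-hot of these bucket indices over a test set and counting the non-zero entries gives the ratio that `_compute_coverage` reports.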
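For completeness, a rough end-to-end usage sketch of the new coverage class, assuming the 4-label `textattack/bert-base-uncased-ag-news` checkpoint that the constructor defaults to; the sentences, batch sizes, `max_seq_len`, and the `[None]` mask entries below are placeholder choices, not values taken from the patch.

from textattack.coverage import neuronMultiSectionCoverage

coverage = neuronMultiSectionCoverage(
    test_model="textattack/bert-base-uncased-ag-news",  # AG News checkpoint with 4 labels
    num_labels=4,
    max_seq_len=64,   # kept small: the attention trackers are sized per head as L x L
    bins_word=4,
    bins_attention=4,
)

train_texts = ["placeholder training sentence one.", "placeholder training sentence two."]
test_texts = ["placeholder test sentence to score."]

# word masks are not consulted while word_mask=False, so plain None entries suffice here
coverage.initialize_from_training_dataset(train_texts, masks=[None] * len(train_texts), bz=2)
score = coverage(test_texts, masks=[None] * len(test_texts), bz=1)
print(f"multi-section neuron coverage: {score:.4f}")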