data.py

import collections
import string
import json
import os
import re
import nltk

# The maximum number of tokens allowed in a sentence.
MAX_NUM_TOKENS = int(os.getenv('MAX_NUM_TOKENS', 20))
# The maximum number of utterances to load.
MAX_NUM_UTTERANCES = int(os.getenv('MAX_NUM_UTTERANCES', 10000))
# The maximum vocabulary size: the number of most common words
# which get included in the embedding.
MAX_VOCABULARY_SIZE = int(os.getenv('MAX_VOCABULARY_SIZE', 3000))
# If specified, only messages from this user name will be used as replies.
TARGET_USER = os.getenv('TARGET_USER', None)
# If set, we filter away self-replies.
REMOVE_SELF_REPLIES = os.getenv('REMOVE_SELF_REPLIES', 'True').lower() in ('true', '1')
# Whether or not we should use the Cornell English corpus instead of the Norwegian data.
USE_CORNELL_CORPUS = os.getenv('USE_CORNELL_CORPUS', 'False').lower() in ('true', '1')

# Special tokens.
START_UTTERANCE = '<u>'
END_UTTERANCE = '</u>'
UNKNOWN_TOKEN = '<unk>'
PAD_TOKEN = '<pad>'

def clean_content(content, decode_content=True):
    ''' Clean the text content of a message from the Facebook data export. '''
    if decode_content:
        # Facebook encodes the data in their exports incorrectly. We can work around
        # this error by encoding the data as Latin-1 and decoding it again as UTF-8.
        content = content.encode('latin1').decode('utf8')
    # Convert all text to lowercase.
    content = content.lower()
    # Remove all (ASCII) punctuation from the text.
    content = re.sub('[{}]'.format(re.escape(string.punctuation)), '', content)
    # Replace newlines with spaces.
    content = re.sub('\n', ' ', content)
    # Return the cleaned content.
    return content
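
# A rough illustration of the cleaning above, using a made-up example string:
#   clean_content('Hei!\nHvordan går det?', decode_content=False)
#   -> 'hei hvordan går det'
# Note that string.punctuation only covers ASCII punctuation, so any other
# punctuation is left untouched.
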
def load_facebook_utterances():
    ''' Load a list of utterances from Facebook data. '''
    utterances = []
    # Recursively traverse all directories in the corpus folder.
    for root, subdirs, files in os.walk('corpus'):
        # Traverse all files found in this subdirectory.
        for filename in files:
            # Check if we found a JSON file.
            if filename.endswith('json'):
                # Find the complete file path.
                file_path = os.path.join(root, filename)
                with open(file_path, 'r', encoding='utf-8') as f:
                    # Load the data file.
                    data = json.load(f)
                    for message in data.get('messages', []):
                        if 'content' in message:
                            utterances.append({
                                'sender_name': clean_content(message['sender_name']),
                                'content': clean_content(message['content']),
                            })
    return utterances

def get_cornell_utterance_pairs():
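    ''' Load utterance pairs from the Cornell movie-dialog corpus, pairing
    each cleaned line with the line immediately before it. '''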
    input_utterances, target_utterances = [], []
    with open('corpus/cornell/movie_lines.txt', encoding='latin-1') as f:
        # Read utterances from Cornell corpus file.
        lines = [
            clean_content(line.split('+++$+++')[-1].strip(), decode_content=False)
            for line in f.readlines()
        ][:MAX_NUM_UTTERANCES]
    for i, utterance in enumerate(lines[1:], 1):
        # Tokenize input and target utterances.
        input_tokens, target_tokens = map(tokenize, (lines[i-1], utterance))
        # Add input utterance to list.
        input_utterances.append(wrap_utterance(input_tokens))
        # Add corresponding output utterance.
        target_utterances.append(wrap_utterance(target_tokens))
    # Return utterance pairs.
    return input_utterances, target_utterances
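
# For reference, records in movie_lines.txt look roughly like
#   L1045 +++$+++ u0 +++$+++ m0 +++$+++ BIANCA +++$+++ They do not!
# which is why only the last '+++$+++'-separated field is kept above.
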
def wrap_utterance(utterance):
    ''' Wrap an utterance in start and end tags, to detect when
    an entire utterance has been generated by the chatbot. '''
    return [START_UTTERANCE] + utterance + [END_UTTERANCE]
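
# For example, wrap_utterance(['hei', 'der']) gives ['<u>', 'hei', 'der', '</u>'].
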
def tokenize(utterance):
    ''' Tokenize and clean an utterance. '''
    # Tokenize the utterance using NLTK.
    tokens = [
        token for sentence in nltk.sent_tokenize(utterance, language='norwegian')
        for token in nltk.word_tokenize(sentence, language='norwegian')[:MAX_NUM_TOKENS]
    ]
    # Return tokenized utterance.
    return tokens
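
# The tokenizers above need the NLTK 'punkt' models to be available; if they are
# missing, something along the lines of nltk.download('punkt') is required first.
# On an already cleaned string, tokenize('hei der hvordan går det') should give
# ['hei', 'der', 'hvordan', 'går', 'det'].
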
def get_utterance_pairs():
    ''' Load utterances and split them into questions and answers. '''
    # Load Norwegian data from file.
    utterances = load_facebook_utterances()
    # Lists for input utterances with corresponding output utterances.
    input_utterances, target_utterances = [], []
    # Loop through all utterances, starting at the second one.
    for i, utterance in enumerate(utterances[1:], 1):
        # Stop when the max number of utterances is reached.
        if len(input_utterances) == MAX_NUM_UTTERANCES:
            break
        # Tokenize input and target utterances.
        input_tokens, target_tokens = map(tokenize, (utterances[i-1]['content'], utterance['content']))
        if len(input_tokens) == 0 or len(target_tokens) == 0:
            continue
        # Check that the sender of the target message is the target user, if set.
        if TARGET_USER is not None and utterance['sender_name'] != TARGET_USER.lower():
            continue
        # If set, we remove self-replies from the dataset.
        if REMOVE_SELF_REPLIES and utterances[i-1]['sender_name'] == utterance['sender_name']:
            continue
        # Add input utterance to list.
        input_utterances.append(wrap_utterance(input_tokens))
        # Add corresponding output utterance.
        target_utterances.append(wrap_utterance(target_tokens))
    return input_utterances, target_utterances

def pad_tokens(tokens, max_length):
    ''' Add padding tokens to the given list of tokens until the max length is reached. '''
    return tokens + [PAD_TOKEN] * (max_length - len(tokens))
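
# For example, pad_tokens(['<u>', 'hei', '</u>'], 5) gives
# ['<u>', 'hei', '</u>', '<pad>', '<pad>'].
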
def get_unknown_token():
    ''' Function which returns the unknown token code. We cannot use a lambda, as
    these cannot be pickled by Python automatically. '''
    return 1

def get_word_map(corpus):
    ''' Create a mapping between tokens and a unique number for each
    token, and vice versa. '''
    # Count occurrences of tokens in the corpus.
    token_counts = collections.Counter(token for utterance in corpus for token in utterance)
    # Only consider the most commonly used tokens.
    tokens = [entry[0] for entry in token_counts.most_common(MAX_VOCABULARY_SIZE)]
    # Map each token to a unique number. Assign all unknown
    # tokens to the same value, 1.
    token_to_num = collections.defaultdict(get_unknown_token)
    # Use 0 for padding to match empty vectors.
    token_to_num[PAD_TOKEN] = 0
    # Add the unknown token for good measure.
    token_to_num[UNKNOWN_TOKEN] = 1
    # Start numbering at 2, so known tokens do not collide with the
    # reserved padding (0) and unknown (1) values.
    for i, token in enumerate(tokens, start=2):
        # Add the token to the dictionary.
        token_to_num[token] = i
    # Inverse mapping which takes numbers back to tokens.
    num_to_token = {i: token for token, i in token_to_num.items()}
    # Map 0 back to the padding token.
    num_to_token[0] = PAD_TOKEN
    # Map 1 back to the unknown token.
    num_to_token[1] = UNKNOWN_TOKEN
    # Return both mappings.
    return token_to_num, num_to_token

def filter_unknown(input_utterances, target_utterances, input_mapper, target_mapper):
    ''' Remove unknown tokens from the provided utterance pairs. '''
    updated_input_utterances, updated_target_utterances = [], []
    for input_utterance, target_utterance in zip(input_utterances, target_utterances):
        # Filter out unknown tokens from both utterances.
        input_utterance = [token for token in input_utterance if token in input_mapper.tok2num]
        target_utterance = [token for token in target_utterance if token in target_mapper.tok2num]
        if len(input_utterance) > 2 and len(target_utterance) > 2:
            # Add both input and output if they still contain tokens.
            updated_input_utterances.append(input_utterance)
            updated_target_utterances.append(target_utterance)
    # Return updated tokens.
    return updated_input_utterances, updated_target_utterances

class TokenMapper():
    def __init__(self, utterances):
        ''' Create a word map for the utterances and add special tokens. '''
        tok2num, num2tok = get_word_map(utterances)
        self.tok2num = tok2num
        self.num2tok = num2tok
        # Add the remaining special tokens to the set of available tokens.
        for token in [START_UTTERANCE, END_UTTERANCE]:
            self.add_token(token)

    def add_token(self, token):
        ''' Adds a new token to the end of the mapper dictionary. '''
        if token not in self.tok2num:
            self.tok2num[token] = len(self.num2tok)
            self.num2tok[len(self.num2tok)] = token
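
# A minimal usage sketch (not part of the original pipeline): build a mapper from
# wrapped utterances and vectorize one of them. Out-of-vocabulary tokens fall back
# to the unknown token id, since tok2num is a defaultdict.
#
#   utterances = [wrap_utterance(tokenize('hei der')), wrap_utterance(tokenize('hei på deg'))]
#   mapper = TokenMapper(utterances)
#   vector = [mapper.tok2num[token] for token in utterances[0]]
#   tokens = [mapper.num2tok[number] for number in vector]
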
def analyze_facebook_corpus():
    ''' Print summary statistics about the preprocessed Facebook corpus. '''
    # Load input and target utterances.
    input_utterances, target_utterances = get_utterance_pairs()
    # Merge the first input utterance with the time-shifted target
    # utterances, to get a list with all utterances.
    merged_utterances = [input_utterances[0]] + target_utterances
    token_mapper = TokenMapper(merged_utterances)
    # Vectorize utterances.
    mapped_utterances = [
        [token_mapper.tok2num[token] for token in utterance]
        for utterance in merged_utterances
    ]
    # Count occurrences of each token in the dataset.
    token_counts = collections.Counter(token for tokens in mapped_utterances for token in tokens)
    print('Fraction of unknown tokens:',
          token_counts[token_mapper.tok2num[UNKNOWN_TOKEN]] / sum(token_counts.values()))
    print('Average utterance length:',
          sum(len(utterance) for utterance in mapped_utterances) / len(mapped_utterances))
    print('Fraction of utterances over MAX_NUM_TOKENS:',
          sum(1 for utterance in mapped_utterances if len(utterance) > MAX_NUM_TOKENS) / len(mapped_utterances))
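

# A possible entry point, assuming the module may also be run directly to inspect
# the data; the analysis routine above is otherwise never called from this file.
if __name__ == '__main__':
    analyze_facebook_corpus()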