-
-
Notifications
You must be signed in to change notification settings - Fork 11
/
functions.py
420 lines (290 loc) · 12.7 KB
/
functions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
import json
import requests
import os
import asyncio
import re
import base64
import random
from PIL import Image
import io
import datetime
# Set an API struct to whatever is in a JSON file to our heart's content
async def set_api(config_file):
    """Load an API configuration from the ``configurations`` directory.

    Args:
        config_file: File name of the JSON configuration, relative to the
            ``configurations`` directory.

    Returns:
        A new dict populated from the file's contents, or an empty dict when
        ``get_json_file`` returned ``None`` (file missing or unreadable).
    """
    file = get_file_name("configurations", config_file)
    contents = await get_json_file(file)
    api = {}
    # `is not None` is the idiomatic None test (PEP 8); `!= None` can be
    # fooled by objects overriding __eq__/__ne__.
    if contents is not None:
        api.update(contents)
    return api
# Check to see if the API is running (pick any API)
async def api_status_check(link, headers):
    """Report whether the API at ``link`` answers a GET request successfully.

    Args:
        link: URL to probe.
        headers: Headers passed through to ``requests.get``.

    Returns:
        ``response.ok`` for the GET request, or ``False`` when the request
        raised a ``requests`` exception (the failure is written to the log).
    """
    try:
        # NOTE(review): no timeout is passed, so an unresponsive host can block
        # this call indefinitely - consider adding one.
        response = requests.get(link, headers=headers)
        status = response.ok
    except requests.exceptions.RequestException as e:
        # str(e) is required: concatenating the exception object itself to a
        # str raises TypeError and would mask the original failure.
        await write_to_log("Error occurred: " + str(e) + ". Language model not currently running.")
        status = False
    return status
def get_file_name(directory, file_name):
    """Join ``directory`` and ``file_name`` into a single path string."""
    return os.path.join(directory, file_name)
# Read in a JSON file and spit it out, usefully or "None" if file's not there or we have an issue
async def get_json_file(filename):
    """Read and parse a JSON file.

    Args:
        filename: Path of the file to read.

    Returns:
        The parsed JSON value, or ``None`` when the file is missing, is not
        valid JSON, or any other error occurs. Every failure is written to
        the log.
    """
    try:
        # Explicit UTF-8 so parsing does not depend on the platform's default
        # encoding (matches the other open() calls in this file).
        with open(filename, 'r', encoding="utf-8") as file:
            return json.load(file)
    # Be very sad if the file isn't there to read
    except FileNotFoundError:
        await write_to_log("File " + filename + " not found. Where did you lose it?")
        return None
    # Be also sad if the file isn't a JSON or is malformed somehow
    except json.JSONDecodeError:
        await write_to_log("Unable to parse " + filename + " as JSON.")
        return None
    # Be super sad if we have no idea what's going on here
    except Exception as e:
        # str(e): concatenating the exception object directly raises TypeError.
        await write_to_log("An unexpected error occurred: " + str(e))
        return None
# Write a line to the log file
async def write_to_log(information):
    """Append ``information`` to ``log.txt`` as one timestamped line.

    Args:
        information: Message text; it is concatenated to the timestamp, so it
            must be a ``str``.
    """
    # Drop sub-second precision so the stamp reads "YYYY-MM-DD HH:MM:SS".
    stamp = datetime.datetime.now().replace(microsecond=0)
    line = str(stamp) + " " + information + "\n"
    await append_text_file(get_file_name("", "log.txt"), line)
# Figure out if the user is looking for an image to be generated
def check_for_image_request(user_message):
    """Return True when the message looks like a request for an image.

    The message is lower-cased and searched for a request verb followed,
    anywhere later on the same line, by an image-like noun. Matching is
    substring based, so the words may also match inside longer words.
    """
    request_pattern = '(send|create|give|generate|draw|snap|show|take|message).*?(image|picture|photo|photogragh|pic|drawing|painting|screenshot)'
    return re.search(request_pattern, user_message.lower()) is not None
async def create_text_prompt(user_input, user, character, bot, history, reply, text_api, image_description=None):
    """Build the JSON request body for a text-generation call.

    This single definition replaces three same-named definitions; only the
    last one was ever effective (later ``def`` statements rebind the name),
    and its behavior is what is kept here.

    Args:
        user_input: The user's message text.
        user: Name used as the user's speaker label in the prompt.
        character: Character description text placed first in the prompt.
        bot: Name used as the bot's speaker label in the prompt.
        history: Prior conversation text.
        reply: Text inserted between the history and the user's message.
        text_api: Mapping with a ``"name"`` entry and a ``"parameters"`` dict
            of base request parameters.
        image_description: Optional description of an attached image; when
            truthy, a bracketed note containing it follows the user message.

    Returns:
        A JSON string: the base parameters plus ``"messages"`` when the API
        name is ``"openai"``, otherwise plus ``"prompt"`` and
        ``"stop_sequence"``.
    """
    # NOTE: no model-specific instruction wrapper (e.g. "[INST] ... [/INST]")
    # is applied to the prompt.
    prompt = character + history + reply + user + ": " + user_input + "\n"
    if image_description:
        image_prompt = "[NOTE TO AI - USER MESSAGE CONTAINS AN IMAGE. IMAGE RECOGNITION HAS BEEN RUN ON THE IMAGE. DESCRIPTION OF THE IMAGE: " + \
            image_description.capitalize() + "]"
        prompt += image_prompt + "\n"
    prompt += bot + ": "

    stopping_strings = ["\n" + user + ":", user + ":", bot + ":",
                        "You:", "@Ava", "User", "@" + user, "<|endoftext|>"]

    # Work on a copy: updating text_api["parameters"] in place would leak this
    # request's prompt/stop keys into the shared configuration and into every
    # later request built from it.
    data = dict(text_api["parameters"])
    if text_api["name"] == "openai":
        # Stop strings are intentionally not sent on this path.
        data["messages"] = [{"role": "user", "content": prompt}]
    else:
        data["prompt"] = prompt
        data["stop_sequence"] = stopping_strings
    return json.dumps(data)
async def create_image_prompt(user_input, character, text_api):
    """Build the JSON request body asking the model to describe an image subject.

    If the lower-cased message contains the word "of", everything after its
    first occurrence is the subject to describe; otherwise the model is asked
    to describe its own appearance.

    Args:
        user_input: The user's message text.
        character: Unused by this function; kept for interface compatibility.
        text_api: Mapping with a ``"name"`` entry and a ``"parameters"`` dict
            of base request parameters.

    Returns:
        A JSON string: the base parameters plus ``"messages"`` when the API
        name is ``"openai"``, otherwise plus ``"prompt"`` and
        ``"stopping_strings"``.
    """
    user_input = user_input.lower()
    # Split on the standalone word "of" only. A plain substring test also
    # fires inside words ("profile", "often") and cuts the subject mid-word.
    parts = re.split(r'\bof\b', user_input, maxsplit=1)
    if len(parts) == 2:
        subject = parts[1]
        prompt = "Please describe the following in maximum three sentences, in vivid detail using descriptive keywords so that someone could draw that based on that description: " + subject + "\n"
    else:
        prompt = "Please describe the way you look in maximum three sentences, in vivid detail using descriptive keywords so that someone could draw you based on that description."

    stopping_strings = ["### Instruction:", "### Response:", "You:"]

    # Work on a copy so the shared configuration dict is not polluted with
    # this request's keys.
    data = dict(text_api["parameters"])
    if text_api["name"] == "openai":
        # Only "messages" is sent on this path, matching create_text_prompt.
        data["messages"] = [{"role": "user", "content": prompt}]
    else:
        data["prompt"] = prompt
        # NOTE(review): create_text_prompt sends "stop_sequence" while this
        # sends "stopping_strings" - confirm which key the backend honors.
        data["stopping_strings"] = stopping_strings
    return json.dumps(data)
# Clean username before storing a context .txt file with that username
def clean_username(username):
    """Make ``username`` safe to use as a file name.

    Each of the characters ``< > : " / \\ | ? *`` becomes an underscore, and
    trailing periods and spaces are stripped (Windows does not allow them at
    the end of a file name).
    """
    return re.sub(r'[<>:"/\\|?*]', '_', username).rstrip('. ')
# Get user's conversation history
async def get_conversation_history(user, lines):
    """Return the most recent ``lines`` lines of a user's conversation history.

    The history is read from ``context/<cleaned user>.txt``. When that file
    holds more than 50 lines it is pruned to its last 30 after reading.

    Returns:
        The history text, or ``""`` when the file could not be read.
    """
    history_path = get_file_name("context", clean_username(user) + ".txt")
    history, total_lines = await get_txt_file(history_path, lines)
    # Keep the context file from growing without bound.
    if total_lines > 50:
        await prune_text_file(history_path, 30)
    return "" if history is None else history
async def add_to_conversation_history(message, user, file):
    """Append one ``"<user>: <message>"`` line to a conversation history file.

    Args:
        message: Text of the message to record.
        user: Speaker name; cleaned with ``clean_username`` before writing.
        file: Base name (without ``.txt``) of the history file under
            ``context``; also cleaned with ``clean_username``.
    """
    target = get_file_name("context", clean_username(file) + ".txt")
    entry = clean_username(user) + ": " + message + "\n"
    await append_text_file(target, entry)
# Read in however many lines of a text file (for context or other text)
# Returns a string with the contents of the file
async def get_txt_file(filename, lines):
    """Read the last ``lines`` lines of a UTF-8 text file.

    Args:
        filename: Path of the file to read.
        lines: Number of trailing lines to keep. Because the tail is taken
            with the slice ``[-lines:]``, a value of 0 keeps the whole file.

    Returns:
        A ``(text, length)`` tuple: the kept lines joined into one string and
        the total number of lines in the file. ``(None, 0)`` when the file is
        missing or another error occurs; the failure is written to the log.
    """
    try:
        with open(filename, "r", encoding="utf-8") as file:
            contents = file.readlines()
        length = len(contents)
        # Turn the kept lines into a string for easier consumption.
        history_string = ''.join(contents[-lines:])
        return history_string, length
    # Let someone know if the file isn't where we expected to find it.
    except FileNotFoundError:
        await write_to_log("File " + filename + " not found. Where did you lose it?")
        return None, 0
    # Panic if we have no idea what's going on here
    except Exception as e:
        # str(e): concatenating the exception object directly raises TypeError.
        await write_to_log("An unexpected error occurred: " + str(e))
        return None, 0
async def prune_text_file(file, trim_to):
    """Rewrite ``file`` so it holds only its last ``trim_to`` lines.

    A missing file is logged and otherwise ignored.
    """
    try:
        with open(file, "r", encoding="utf-8") as reader:
            every_line = list(reader)
        tail = every_line[-trim_to:]  # the most recent 'trim_to' lines
        with open(file, "w", encoding="utf-8") as writer:
            writer.write("".join(tail))
    except FileNotFoundError:
        await write_to_log("Could not prune file " + file + " because it doesn't exist.")
# Append text to the end of a text file
async def append_text_file(file, text):
    """Append ``text`` to the end of ``file`` (UTF-8), creating it if needed.

    Args:
        file: Path of the file to append to.
        text: Text to write; no newline is added.
    """
    # Plain append mode is enough (nothing is read back), and the `with`
    # block already closes the file, so no explicit close() is needed.
    with open(file, 'a', encoding="utf-8") as context:
        context.write(text)
# Clean the input provided by the user to the bot!
def clean_user_message(user_input):
    """Strip the bot tag, end-of-text markers and outer whitespace from a message."""
    # Neither the bot's "@Kobold" tag nor the end-of-text token belongs in the prompt.
    for unwanted in ("@Kobold", "<|endoftext|>"):
        user_input = user_input.replace(unwanted, "")
    return user_input.strip()
# Mistral-medium hallucinates some stuff in parentheses on newlines and then it hallucinates more
def truncate_from_newline_parenthesis(text):
    """Cut ``text`` at the first line that begins with an open parenthesis.

    Returns ``text`` unchanged when no line starts with ``(``.
    """
    # MULTILINE makes ^ match at the start of every line, not just the first.
    hit = re.search(r'^\(', text, re.MULTILINE)
    return text if hit is None else text[:hit.start()]
async def clean_llm_reply(message, user, bot):
    """Tidy a raw model reply before it is posted.

    Removes the ``"<bot>:"`` and ``"<user>:"`` speaker labels, strips outer
    whitespace, drops everything from the first line starting with ``(``,
    then keeps only the text before the first ``#``, ``User1`` or ``@``.
    """
    text = message.replace(bot + ":", "").replace(user + ":", "").strip()
    text = truncate_from_newline_parenthesis(text)
    # "User1" and "@" are cut points for Mistral-medium hallucinations.
    for marker in ("#", "User1", "@"):
        text = text.partition(marker)[0]
    return text
# Get the current bot character in a prompt-friendly format
def get_character(character_card):
    """Render a character card as prompt text.

    Args:
        character_card: Mapping with ``"name"``, ``"persona"`` and
            ``"instructions"`` string entries.

    Returns:
        ``"Your name is <name>. You are <persona>. <instructions>"`` followed
        by the examples heading and the (currently empty) example list.
    """
    examples = []  # put example responses here
    pieces = [
        "Your name is ", character_card["name"], ". ",
        "You are ", character_card["persona"], ". ",
        # Instructions on what the bot should do.
        character_card["instructions"],
        " Here are examples of how you speak: ",
        "\n", '\n'.join(examples), "\n",
    ]
    return "".join(pieces)
# Get the contents of a character file (which should contain everything about the character)
async def get_character_card(name):
    """Load a character card from the ``characters`` directory.

    Args:
        name: File name of the character's JSON file, relative to the
            ``characters`` directory.

    Returns:
        A new dict populated from the file's contents, or an empty dict when
        ``get_json_file`` returned ``None`` (file missing or unreadable).
    """
    file = get_file_name("characters", name)
    contents = await get_json_file(file)
    character = {}
    # `is not None` is the idiomatic None test (PEP 8).
    if contents is not None:
        character.update(contents)
    return character
# Get the list of all available characters (files in the character directory, hopefully)
def get_file_list(directory):
    """List the entries of ``directory``.

    Args:
        directory: Path of the directory to list.

    Returns:
        The entry names as returned by ``os.listdir``, or an empty list when
        the directory is missing or cannot be read.
    """
    try:
        # Pass the directory as-is: appending a hard-coded "\\" only forms a
        # valid path on Windows and makes the lookup fail everywhere else.
        return os.listdir(directory)
    except OSError:
        # FileNotFoundError is a subclass of OSError, so one clause covers both.
        return []
def image_from_string(image_string):
    """Decode a base64 image and save it as a timestamped PNG file.

    The file is written to the current working directory as
    ``image_<YYYYmmdd_HHMMSS>.png``.

    Returns:
        The name of the file that was written.
    """
    raw_bytes = base64.b64decode(image_string)
    stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    name = "".join(("image_", stamp, ".png"))
    with open(name, 'wb') as out:
        out.write(raw_bytes)
    return name