forked from jswope00/AI-MicroApps
-
Notifications
You must be signed in to change notification settings - Fork 0
/
handlers.py
257 lines (223 loc) · 10.1 KB
/
handlers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
import openai
import anthropic
import google.generativeai as genai
import rag_pipeline
import requests
import os
from dotenv import load_dotenv
import re
load_dotenv()
# fetching api key for LLM interactions
def get_api_key(service_name):
"""Retrieve API key from environment variables based on service name."""
env_var_name = f"{service_name.upper()}_API_KEY"
api_key = os.getenv(env_var_name)
if not api_key:
raise ValueError(f"API key for {service_name} not found in environment variables.")
return api_key
# chat history formatting for different LLMs
def format_chat_history(chat_history, family):
"""Format chat history based on LLM family."""
formatted_history = []
if len(chat_history) > 0:
for history in chat_history:
user_content = history["user"]
assistant_content = history["assistant"]
if family == "gemini":
formatted_history.extend([
{"role": "user", "parts": [user_content]},
{"role": "model", "parts": [assistant_content]}
])
else:
formatted_history.extend([
{"role": "user", "content": user_content},
{"role": "assistant", "content": assistant_content}
])
return formatted_history
# openai llm handler
def handle_openai(context):
"""Handle requests for OpenAI models."""
if not context["supports_image"] and context.get("image_urls"):
return "Images are not supported by selected model."
try:
openai.api_key = get_api_key("openai")
messages = format_chat_history(context["chat_history"], "openai") + [
{"role": "system", "content": context["SYSTEM_PROMPT"]},
{"role": "assistant", "content": context["phase_instructions"]},
{"role": "user", "content": context["user_prompt"]}
]
if context["supports_image"] and context["image_urls"]:
messages.insert(2, {"role": "user", "content": [{"type": "image_url", "image_url": {"url": url}} for url in
context["image_urls"]]})
response = openai.chat.completions.create(
model=context["model"],
messages=messages,
temperature=context["temperature"],
max_tokens=context["max_tokens"],
top_p=context["top_p"],
frequency_penalty=context["frequency_penalty"],
presence_penalty=context["presence_penalty"]
)
input_price = int(response.usage.prompt_tokens) * context["price_input_token_1M"] / 1000000
output_price = int(response.usage.completion_tokens) * context["price_output_token_1M"] / 1000000
total_price = input_price + output_price
context['TOTAL_PRICE'] += total_price
return response.choices[0].message.content
except Exception as e:
return f"Unexpected error while handling OpenAI request: {e}"
# claude llm handler
def handle_claude(context):
"""Handle requests for Claude models."""
if not context["supports_image"] and context.get("image_urls"):
return "Images are not supported by selected model."
try:
client = anthropic.Anthropic(api_key=get_api_key("claude"))
messages = format_chat_history(context["chat_history"], "claude") + [
{"role": "user", "content": [{"type": "text", "text": context["user_prompt"]}]},
{"role": "assistant", "content": [{"type": "text", "text": context["phase_instructions"]}]}
]
if context["supports_image"] and context["image_urls"]:
for image_url in context["image_urls"]:
# Extract base64 data from the image URL
base64_data = image_url.split(",")[1]
mime_type = re.search(r"data:(.*?);base64,", image_url).group(1) if re.search(r"data:(.*?);base64,",
image_url) else None
# Add image to the messages
messages.append({
"role": "user",
"content": [{
"type": "image",
"source": {
"type": "base64",
"media_type": mime_type,
"data": base64_data
}
}]
})
response = client.messages.create(
model=context["model"],
max_tokens=context["max_tokens"],
temperature=context["temperature"],
system=f"{context['SYSTEM_PROMPT']}",
messages=messages
)
input_price = int(response.usage.input_tokens) * context["price_input_token_1M"] / 1000000
output_price = int(response.usage.output_tokens) * context["price_output_token_1M"] / 1000000
total_price = input_price + output_price
context['TOTAL_PRICE'] += total_price
return '\n'.join([block.text for block in response.content if block.type == 'text'])
except Exception as e:
return f"Unexpected error while handling Claude request: {e}"
# gemini llm handler
def handle_gemini(context):
"""Handle requests for Gemini models."""
if not context["supports_image"] and context.get("image_urls"):
return "Images are not supported by selected model."
try:
genai.configure(api_key=get_api_key("google"))
messages = format_chat_history(context["chat_history"], "gemini") + [
{"role": "user", "parts": [context["user_prompt"]]},
{"role": "model", "parts": [context["phase_instructions"]]}
]
if context["supports_image"] and context["image_urls"]:
for image_url in context["image_urls"]:
# Add image to the messages
messages.append({
"role": "user",
"parts": [image_url]
})
chat_session = genai.GenerativeModel(
model_name=context["model"],
generation_config= {"temperature": context["temperature"],"top_p": context["top_p"],"max_output_tokens": context["max_tokens"],"response_mime_type":"text/plain"},
system_instruction=f"{context['SYSTEM_PROMPT']}"
).start_chat(history=messages)
response = chat_session.send_message(context["user_prompt"])
return response.text
except Exception as e:
return f"Unexpected error while handling Gemini request: {e}"
# perplexity handler
def handle_perplexity(context):
"""Handle requests for Perplexity models."""
if not context["supports_image"] and context.get("image_urls"):
return "Images are not supported by selected model."
api_key = get_api_key("perplexity")
url = "https://api.perplexity.ai/chat/completions"
# Prepare messages
messages = [
{"role": "system", "content": context["SYSTEM_PROMPT"] + context["phase_instructions"]}
] + format_chat_history(context["chat_history"], "perplexity") + [
{"role": "user", "content": context["user_prompt"]}
]
# Add image URLs if supported
if context["supports_image"] and context["image_urls"]:
for image_url in context["image_urls"]:
messages.append({
"role": "user",
"content": {
"type": "image_url",
"image_url": {"url": image_url}
}
})
# Prepare payload
payload = {
"model": context["model"],
"messages": messages
}
# Prepare headers
headers = {
"accept": "application/json",
"content-type": "application/json",
"authorization": f"Bearer {api_key}"
}
# Make the API request
try:
response = requests.post(url, json=payload, headers=headers)
response.raise_for_status() # Raise an error for bad status codes
response_json = response.json()
if "choices" in response_json and len(response_json["choices"]) > 0:
return response_json["choices"][0]["message"]["content"]
else:
return "Unexpected response format from Perplexity API."
except requests.exceptions.HTTPError as http_err:
return f"HTTP error occurred while handling Perplexity request: {http_err}"
except requests.exceptions.RequestException as req_err:
return f"Error occurred while making the Perplexity request: {req_err}"
def rag_handler(context):
"""
RAG Handler that processes the document, retrieves relevant information,
and generates a response using the OpenAI language model.
Args:
- context: A dictionary containing the file path, user prompt, and LLM configuration.
Returns:
- Generated response and cost.
"""
# Step 1: Extract necessary information from the context
file_path = context.get("file_path", None)
user_prompt = context.get("user_prompt", "")
if not file_path:
raise ValueError("File path is required for RAG-based generation.")
if not user_prompt:
raise ValueError("User prompt is required.")
# Step 2: Check and store metadata and embeddings if not already present
rag_pipeline.check_and_store_metadata_and_embeddings(file_path)
# Step 4: Retrieve relevant documents based on the user's query and generate a response
try:
# Call the retrieval and response generation pipeline
rag_response, cost = rag_pipeline.retrieve_and_generate_response(
question= user_prompt,
template_text= str(context["phase_instructions"])+" User answer is "+ user_prompt
)
print(rag_response,cost)
# Step 5: Update the context with the cost (if applicable)
context["TOTAL_PRICE"] = context.get("TOTAL_PRICE", 0) + (cost if cost else 0)
return rag_response
except Exception as e:
return f"Error during RAG processing: {e}"
# Mapping of model families to handler functions
HANDLERS = {
"openai": handle_openai,
"claude": handle_claude,
"gemini": handle_gemini,
"perplexity": handle_perplexity,
"rag":rag_handler
}