diff --git a/packages/core/src/interfaces/ai/agent.ts b/packages/core/src/interfaces/ai/agent.ts
index 882604a..eb1bd7f 100644
--- a/packages/core/src/interfaces/ai/agent.ts
+++ b/packages/core/src/interfaces/ai/agent.ts
@@ -1,15 +1,20 @@
 import { AppContext } from "src/interfaces/app";
-import { Tool } from "./tool";
 
 export type AgentMessage = {
+  imageUrl?: string;
+  role: "assistant" | "user";
+  text: string;
+  threadId: string;
+};
+
+export type UserMessage = {
   imageUrl?: string;
   text: string;
 };
 
 type RespondInput = {
   ctx: AppContext;
-  message: AgentMessage;
-  tools: Tool[];
+  message: UserMessage;
 };
 
 export type Agent = {
diff --git a/packages/core/src/interfaces/app.ts b/packages/core/src/interfaces/app.ts
index 282aa28..b87cb09 100644
--- a/packages/core/src/interfaces/app.ts
+++ b/packages/core/src/interfaces/app.ts
@@ -1,9 +1,15 @@
+import { AgentMessage, Tool } from "./ai";
 import { Chat } from "./chat";
 import { KV } from "./storage/kv";
 import { User } from "./user";
 
-export type AppContext<T extends Chat = Chat> = {
-  chat: T;
-  kv: KV;
-  user: User;
-};
+export interface AppContext<T extends Chat = Chat> {
+  readonly chat: T;
+  readonly kv: KV;
+  readonly messages: Iterable<AgentMessage>;
+  readonly tools: Tool[];
+  readonly user: User;
+
+  pushMessage(message: AgentMessage): Promise<void>;
+  onNewMessage(handler: (message: AgentMessage) => Promise<void>): void;
+}
diff --git a/packages/functions/src/handlers/telegram-webhook-handler.ts b/packages/functions/src/handlers/telegram-webhook-handler.ts
index 0dfe245..c04d400 100644
--- a/packages/functions/src/handlers/telegram-webhook-handler.ts
+++ b/packages/functions/src/handlers/telegram-webhook-handler.ts
@@ -2,10 +2,46 @@ import { Config } from "sst/node/config";
 
 import { kv } from "@bubby/aws";
 import { AppContext } from "@bubby/core/interfaces/app";
-import { ChatPhoto, ChatText } from "@bubby/core/interfaces/chat";
+import { Chat, ChatPhoto, ChatText } from "@bubby/core/interfaces/chat";
 import { agent, speech } from "@bubby/openai";
-import { handleWebhook } from "@bubby/telegram";
+import { handleWebhook, OnXxxInput } from "@bubby/telegram";
 import { tools } from "src/tools";
+import { User } from "@bubby/core/interfaces/user";
+import { AgentMessage } from "@bubby/core/interfaces/ai";
+
+class AppContextImpt<T extends Chat> implements AppContext<T> {
+  public readonly chat: T;
+  public readonly messages: AgentMessage[] = [];
+  public readonly user: User;
+
+  private onNewMessageHandlers: Array<
+    Parameters<AppContext["onNewMessage"]>[0]
+  > = [];
+
+  constructor(input: OnXxxInput<T>) {
+    this.chat = input.chat;
+    this.user = input.user;
+  }
+
+  get kv() {
+    return kv;
+  }
+
+  get tools() {
+    return tools;
+  }
+
+  async pushMessage(message: AgentMessage): Promise<void> {
+    this.messages.push(message);
+    for (const handler of this.onNewMessageHandlers) {
+      await handler(message);
+    }
+  }
+
+  onNewMessage(handler: (message: AgentMessage) => Promise<void>): void {
+    this.onNewMessageHandlers.push(handler);
+  }
+}
 
 export async function handleTelegramWebhook(secretToken: string, update: any) {
   const expectedSecretToken = Config.TELEGRAM_WEBHOOK_SECRET_TOKEN;
@@ -14,10 +50,9 @@ export async function handleTelegramWebhook(secretToken: string, update: any) {
     return;
   }
 
-  console.log(JSON.stringify(update, null, 2));
   await handleWebhook({
-    onPhoto: (input) => replyToPhoto({ ...input, kv }),
-    onText: (input) => replyToText({ ...input, kv }),
+    onPhoto: (input) => replyToPhoto(new AppContextImpt(input)),
+    onText: (input) => replyToText(new AppContextImpt(input)),
     speech,
     update,
   });
@@ -29,11 +64,11 @@ async function replyToPhoto(ctx: AppContext<ChatPhoto>): Promise<void> {
     imageUrl: await chat.getPhotoUrl(),
     text: chat.getPhotoCaption() ?? "",
   };
-  return agent.respond({ ctx, message, tools });
+  return agent.respond({ ctx, message });
 }
 
 async function replyToText(ctx: AppContext<ChatText>): Promise<void> {
   const text = await ctx.chat.getTextMessage();
   const message = { text };
-  return agent.respond({ ctx, message, tools });
+  return agent.respond({ ctx, message });
 }
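The pushMessage/onNewMessage pair above is a small observer pattern: every message is buffered in order, and subscribers are notified as each one arrives. A standalone TypeScript sketch of that contract follows; only the AgentMessage shape matches the real interface, while MessageLog and the handler names are illustrative.

type AgentMessage = {
  role: "assistant" | "user";
  text: string;
  threadId: string;
};

type MessageHandler = (message: AgentMessage) => Promise<void>;

class MessageLog {
  readonly messages: AgentMessage[] = [];
  private handlers: MessageHandler[] = [];

  // Buffer the message, then notify subscribers sequentially so that side
  // effects (such as mirroring into another thread) happen in order.
  async pushMessage(message: AgentMessage): Promise<void> {
    this.messages.push(message);
    for (const handler of this.handlers) {
      await handler(message);
    }
  }

  onNewMessage(handler: MessageHandler): void {
    this.handlers.push(handler);
  }
}

// Usage: a subscriber registered before the push sees the message.
const log = new MessageLog();
log.onNewMessage(async (m) => console.log(`${m.role}: ${m.text}`));
void log.pushMessage({ role: "user", text: "hello", threadId: "t1" });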
diff --git a/packages/openai/src/agent.ts b/packages/openai/src/agent.ts
index 16017eb..dc5226f 100644
--- a/packages/openai/src/agent.ts
+++ b/packages/openai/src/agent.ts
@@ -1,20 +1,20 @@
 import { AssistantStream } from "openai/lib/AssistantStream";
 import { FunctionToolCall } from "openai/resources/beta/threads/runs/steps";
 
-import { Agent, Tool } from "@bubby/core/interfaces/ai";
+import { Agent } from "@bubby/core/interfaces/ai";
 import { AppContext } from "@bubby/core/interfaces/app";
-import { assistantSendMessage } from "./internal/assistant_message";
+import {
+  assistantSendMessage,
+  endOfThinking,
+} from "./internal/assistant_message";
 import { assistantThreadIdUpsert } from "./internal/assistant_thread";
 import { assistantSubmitToolOutputs } from "./internal/assistant_tool_outputs";
 
 class AgentStreamer {
-  constructor(
-    private ctx: AppContext,
-    private threadId: string,
-    private tools: Tool[]
-  ) {}
+  constructor(private ctx: AppContext, private threadId: string) {}
 
   async consume(stream: AssistantStream): Promise<AssistantStream | undefined> {
+    const { ctx, threadId } = this;
     let state:
       | { type: "code_interpreter" | "file_search" }
       | { type: "function"; toolCall: FunctionToolCall }
@@ -28,9 +28,17 @@
           functionToolCalls.push(state.toolCall);
           break;
         case "text":
-          const markdown = state.text;
+          let markdown = state.text;
+
+          const thinkingIndex = markdown.indexOf(endOfThinking);
+          if (thinkingIndex > -1) {
+            markdown = markdown
+              .slice(thinkingIndex + endOfThinking.length)
+              .trim();
+          }
+
           if (markdown.length > 0) {
-            void this.ctx.chat.reply({ type: "markdown", markdown });
+            void ctx.chat.reply({ type: "markdown", markdown });
           }
           break;
       }
@@ -40,7 +48,7 @@
     stream
      .on("textCreated", () => {
        resetState({ type: "text", text: "" });
-        void this.ctx.chat.typing();
+        void ctx.chat.typing();
      })
      .on("textDelta", (textDelta) => {
        if (state?.type === "text") {
@@ -65,15 +73,26 @@
          }
          break;
      }
+      })
+      .on("messageDone", (message) => {
+        for (const content of message.content) {
+          if (content.type === "text") {
+            ctx.pushMessage({
+              role: "assistant",
+              threadId: message.thread_id,
+              text: content.text.value,
+            });
+          }
+        }
+        console.log(JSON.stringify(message, null, 2));
      });
    const run = await stream.finalRun(); // wait for OpenAI
    resetState(); // flush the last state
 
-    const { ctx, threadId, tools } = this;
    if (functionToolCalls.length === 0) {
      if (run.status === "failed" || run.status === "incomplete") {
-        for (const tool of tools) {
+        for (const tool of ctx.tools) {
          if (tool.name === "new_thread") {
            // force new thread in case of run failure
            const parameters = tool.parametersSchema.parse({});
          }
@@ -87,16 +106,16 @@
    }
 
    const runId = run.id;
-    const input = { ctx, runId, threadId, tools };
+    const input = { ctx, runId, threadId };
    return assistantSubmitToolOutputs(input, functionToolCalls);
  }
 }
 
 export const agent: Agent = {
-  respond: async ({ ctx, message, tools }) => {
+  respond: async ({ ctx, message }) => {
    const threadId = await assistantThreadIdUpsert(ctx);
-    const streamer = new AgentStreamer(ctx, threadId, tools);
-    const firstStream = await assistantSendMessage(threadId, message, tools);
+    const streamer = new AgentStreamer(ctx, threadId);
+    const firstStream = await assistantSendMessage(ctx, threadId, message);
 
    let stream: typeof firstStream | undefined = firstStream;
    while (typeof stream !== "undefined") {
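The "text" branch above hides the model's THINKING preamble from the user. Here is the same logic isolated as a runnable sketch; stripThinking is an illustrative name, while the marker value matches the endOfThinking constant exported by assistant_message.ts.

const endOfThinking = "---- END OF THINKING ---";

function stripThinking(markdown: string): string {
  const thinkingIndex = markdown.indexOf(endOfThinking);
  if (thinkingIndex > -1) {
    // Keep only what follows the marker; the THINKING section is for the
    // model's own bookkeeping and is never shown to the user.
    return markdown.slice(thinkingIndex + endOfThinking.length).trim();
  }
  return markdown;
}

// Prints "It's sunny and 24°C today." with the recap removed.
console.log(
  stripThinking(`Topic recap: weather.\n${endOfThinking}\nIt's sunny and 24°C today.`)
);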
diff --git a/packages/openai/src/internal/assistant_message.ts b/packages/openai/src/internal/assistant_message.ts
index cb63a9e..63bad80 100644
--- a/packages/openai/src/internal/assistant_message.ts
+++ b/packages/openai/src/internal/assistant_message.ts
@@ -3,16 +3,26 @@ import { AssistantStream } from "openai/lib/AssistantStream";
 import { FunctionParameters } from "openai/resources";
 import { AssistantTool } from "openai/resources/beta/assistants";
 import { MessageContentPartParam } from "openai/resources/beta/threads/messages";
+import { RunCreateParams } from "openai/resources/beta/threads/runs/runs";
 
-import { AgentMessage, Tool } from "@bubby/core/interfaces/ai";
+import { UserMessage } from "@bubby/core/interfaces/ai";
+import { AppContext } from "@bubby/core/interfaces/app";
 
 import { assistantId, threads } from "./openai";
 
+export const endOfThinking = "---- END OF THINKING ---";
+
 export async function assistantSendMessage(
+  ctx: AppContext,
   threadId: string,
-  { imageUrl, text }: AgentMessage,
-  tools: Tool[]
+  { imageUrl, text: textWithoutMetadata }: UserMessage
 ): Promise<AssistantStream> {
-  await threads.messages.create(threadId, {
+  const text = `---- METADATA ----
+The current time is ${new Date().toISOString()}.
+---- END OF METADATA ---
+
+${textWithoutMetadata}
+`;
+  const message: RunCreateParams.AdditionalMessage = {
     content: [
       {
         type: "text",
@@ -28,20 +38,49 @@
       : []),
     ],
     role: "user",
-  });
+  };
+  await ctx.pushMessage({ role: "user", threadId, text });
   const instructions = `Your name is Bubby. You are a personal assistant bot.
 Ensure efficient and user-friendly interaction, focusing on simplicity and clarity in communication.
 You provide concise and direct answers.
 Maintain a straightforward and easy-going conversation tone. Keep responses brief, typically in short sentences.
-You can only reply to text or photo messages.`;
+
+User message format:
+1. Each user message starts with a METADATA section, including the current time.
+2. The actual user message follows the METADATA section.
+
+Bot message format:
+1. Each bot message starts with a THINKING section.
+2. The THINKING section recounts the different topics in the thread.
+3. The section ends with '${endOfThinking}'.
+4. The actual bot message follows the THINKING section.
+
+Thread management:
+1. Before EVERY response, you MUST evaluate if a new thread is needed.
+2. Use the new_thread tool when there is more than one distinct question or task.
+3. For ambiguous cases, ask the user to confirm whether they want to start another thread or continue the current one.
+
+Example:
+User: What's the weather like today?
+Bubby: It's sunny and 24°C today.
+User: How about tomorrow?
+Bubby: Tomorrow will be cloudy with a high of 21°C.
+User: Can you help me with a recipe?
+Bubby uses the new_thread tool
+Bubby: Certainly! What kind of recipe are you looking for?
+
+Remember, your context is limited, so managing threads efficiently is crucial.
+ALWAYS evaluate if a new thread is needed before responding. If in doubt, ask the user.
+`;
   return threads.runs.stream(threadId, {
     assistant_id: assistantId,
     instructions,
+    additional_messages: [message],
     model: "gpt-4o-mini",
     tools: [
       { type: "code_interpreter" },
       { type: "file_search" },
-      ...tools.map((tool) => {
+      ...ctx.tools.map((tool) => {
         return {
           function: {
             description: tool.description,
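Every outgoing user message is wrapped in the METADATA envelope built above. A self-contained sketch of the same template; wrapWithMetadata is an illustrative name, and the delimiters, including their uneven trailing dash counts, are copied from the diff.

function wrapWithMetadata(textWithoutMetadata: string): string {
  // The assistant's instructions tell it to expect this exact layout:
  // a METADATA section first, then the actual user message.
  return `---- METADATA ----
The current time is ${new Date().toISOString()}.
---- END OF METADATA ---

${textWithoutMetadata}
`;
}

console.log(wrapWithMetadata("What's the weather like today?"));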
diff --git a/packages/openai/src/internal/assistant_thread.ts b/packages/openai/src/internal/assistant_thread.ts
index 266990a..c671754 100644
--- a/packages/openai/src/internal/assistant_thread.ts
+++ b/packages/openai/src/internal/assistant_thread.ts
@@ -1,25 +1,37 @@
+import { ThreadCreateParams } from "openai/resources/beta/threads/threads";
+
 import { AppContext } from "@bubby/core/interfaces/app";
 
 import { threads } from "./openai";
 
-export async function assistantThreadIdInsert({
-  chat,
-  kv,
-  user,
-}: AppContext): Promise<string> {
-  const memory =
-    (await kv.get(chat.getChannelId(), "memory")) ??
-    `User's name: ${user.getUserName()}\nUser's date of birth: Unknown\nUser's relationship status: Unknown`;
-  const threadId = (
-    await threads.create({
-      messages: [
-        {
-          content: `---- START OF MEMORY ----\n${memory}\n---- END OF MEMORY ----`,
-          role: "user",
-        },
-      ],
-    })
-  ).id;
+export async function assistantThreadIdInsert(
+  ctx: AppContext
+): Promise<string> {
+  const { chat, kv } = ctx;
+
+  const messages: ThreadCreateParams.Message[] = [];
+  for (const { role, text } of ctx.messages) {
+    // populate new thread with recent messages
+    messages.push({ role, content: text });
+  }
+
+  const memory = await kv.get(chat.getChannelId(), "memory");
+  if (typeof memory === "string") {
+    messages.push({
+      content: `---- START OF MEMORY ----\n${memory}\n---- END OF MEMORY ----`,
+      role: "user",
+    });
+  }
+
+  const threadId = (await threads.create({ messages })).id;
   await kv.set(chat.getChannelId(), "assistant-thread-id", threadId);
+
+  ctx.onNewMessage(async ({ role, text, threadId: newMessageThreadId }) => {
+    if (newMessageThreadId !== threadId) {
+      // sync new messages to our thread
+      await threads.messages.create(threadId, { content: text, role });
+    }
+  });
+
   return threadId;
 }
diff --git a/packages/openai/src/internal/assistant_tool_outputs.ts b/packages/openai/src/internal/assistant_tool_outputs.ts
index 951883b..f3dd5a2 100644
--- a/packages/openai/src/internal/assistant_tool_outputs.ts
+++ b/packages/openai/src/internal/assistant_tool_outputs.ts
@@ -15,14 +15,14 @@ type AssistantSubmitToolOutputsInput = {
   ctx: AppContext;
   runId: string;
   threadId: string;
-  tools: Tool[];
 };
 
 export async function assistantSubmitToolOutputs(
   input: AssistantSubmitToolOutputsInput,
   functionToolCalls: FunctionToolCall[]
 ): Promise<AssistantStream | undefined> {
-  const { runId, threadId, tools } = input;
+  const { ctx, runId, threadId } = input;
+  const { tools } = ctx;
 
   const tool_outputs: RunSubmitToolOutputsParams.ToolOutput[] = [];
   for (const toolCall of functionToolCalls) {
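The onNewMessage subscriber registered in assistantThreadIdInsert above is what keeps an existing thread complete: anything pushed while a different thread was current gets mirrored in. The same handler as a standalone sketch, where createMessage is a stand-in for threads.messages.create from the OpenAI SDK.

type AgentMessage = {
  role: "assistant" | "user";
  text: string;
  threadId: string;
};

// Stand-in for threads.messages.create; just records the call.
async function createMessage(
  threadId: string,
  content: string,
  role: AgentMessage["role"]
): Promise<void> {
  console.log(`thread ${threadId} <- [${role}] ${content}`);
}

function makeSyncHandler(ownThreadId: string) {
  return async ({ role, text, threadId }: AgentMessage): Promise<void> => {
    // Only mirror messages that landed on a different thread; messages
    // created on ownThreadId are already there.
    if (threadId !== ownThreadId) {
      await createMessage(ownThreadId, text, role);
    }
  };
}

const sync = makeSyncHandler("thread_A");
void sync({ role: "user", text: "hi", threadId: "thread_B" }); // mirrored
void sync({ role: "user", text: "yo", threadId: "thread_A" }); // skipped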
diff --git a/packages/telegram/src/internal/chat.ts b/packages/telegram/src/internal/chat.ts
index 5484586..6d72a98 100644
--- a/packages/telegram/src/internal/chat.ts
+++ b/packages/telegram/src/internal/chat.ts
@@ -44,7 +44,6 @@ export abstract class Chat<
     switch (reply.type) {
       case "markdown":
         const safeHtml = convertMarkdownToSafeHtml(reply.markdown);
-        console.log({ channelId, reply, safeHtml });
         await this.replyWrapper(reply.type, ctx.replyWithHTML(safeHtml));
         break;
       case "photo":
diff --git a/packages/telegram/src/webhook.ts b/packages/telegram/src/webhook.ts
index 38a58fb..f01d803 100644
--- a/packages/telegram/src/webhook.ts
+++ b/packages/telegram/src/webhook.ts
@@ -8,7 +8,7 @@ import { bot } from "./internal/telegram";
 import { User } from "./internal/user";
 import { commands } from "./internal/commands";
 
-type OnXxxInput<T extends Chat> = {
+export type OnXxxInput<T extends Chat> = {
   chat: T;
   user: User;
 };
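Exporting OnXxxInput is what lets the functions package build its AppContext directly from the webhook callback input instead of spreading it into an ad-hoc object. A shape-level sketch follows; Chat and User are reduced to minimal stand-ins so the snippet compiles on its own.

interface Chat {
  getChannelId(): string;
}
interface User {
  getUserName(): string;
}

// Mirrors the exported type: the telegram package produces this shape and
// AppContextImpt's constructor consumes it as-is.
type OnXxxInput<T extends Chat> = {
  chat: T;
  user: User;
};

// The constructor only needs chat and user; kv and tools come from
// module-level singletons, as in telegram-webhook-handler.ts.
function buildContext<T extends Chat>({ chat, user }: OnXxxInput<T>) {
  return { chat, user };
}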