Implement automatic thread management logic #18

Draft · wants to merge 6 commits into base: main
11 changes: 8 additions & 3 deletions packages/core/src/interfaces/ai/agent.ts
@@ -1,15 +1,20 @@
import { AppContext } from "src/interfaces/app";
import { Tool } from "./tool";

export type AgentMessage = {
imageUrl?: string;
role: "assistant" | "user";
text: string;
threadId: string;
};

export type UserMessage = {
imageUrl?: string;
text: string;
};

type RespondInput = {
ctx: AppContext;
message: AgentMessage;
tools: Tool<any>[];
message: UserMessage;
};

export type Agent = {
…
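Note: this hunk splits the old message type in two. `AgentMessage` is now a stored, thread-scoped record carrying `role` and `threadId`, while `UserMessage` is just the raw incoming payload. A minimal sketch of a consumer, hypothetical and not part of this PR (it assumes `Agent.respond` returns `Promise<void>`):

```ts
import { Agent, AgentMessage } from "@bubby/core/interfaces/ai";

// Hypothetical echo agent: turns the incoming UserMessage into a
// thread-scoped AgentMessage and pushes it through the context.
const echoAgent: Agent = {
  respond: async ({ ctx, message }) => {
    const reply: AgentMessage = {
      role: "assistant",
      text: `You said: ${message.text}`,
      threadId: "thread-0", // a real agent resolves the thread per chat
    };
    await ctx.pushMessage(reply);
  },
};
```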
16 changes: 11 additions & 5 deletions packages/core/src/interfaces/app.ts
@@ -1,9 +1,15 @@
import { AgentMessage, Tool } from "./ai";
import { Chat } from "./chat";
import { KV } from "./storage/kv";
import { User } from "./user";

export type AppContext<T extends Chat = Chat> = {
chat: T;
kv: KV;
user: User;
};
export interface AppContext<T extends Chat = Chat> {
readonly chat: T;
readonly kv: KV;
readonly messages: Iterable<AgentMessage>;
readonly tools: Tool<any>[];
readonly user: User;

pushMessage(message: AgentMessage): Promise<void>;
onNewMessage(handler: (message: AgentMessage) => Promise<void>): void;
}
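The reworked `AppContext` turns the context into a small message bus: `messages` exposes the backlog, `pushMessage` appends and fans out, and `onNewMessage` subscribes. A hypothetical consumer, for illustration only:

```ts
import { AgentMessage } from "@bubby/core/interfaces/ai";
import { AppContext } from "@bubby/core/interfaces/app";

// Collects the existing backlog, then keeps the transcript current
// as pushMessage delivers new messages to every subscriber.
function collectTranscript(ctx: AppContext): AgentMessage[] {
  const transcript: AgentMessage[] = [...ctx.messages];
  ctx.onNewMessage(async (message) => {
    transcript.push(message);
  });
  return transcript;
}
```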
49 changes: 42 additions & 7 deletions packages/functions/src/handlers/telegram-webhook-handler.ts
@@ -2,10 +2,46 @@ import { Config } from "sst/node/config";

import { kv } from "@bubby/aws";
import { AppContext } from "@bubby/core/interfaces/app";
import { ChatPhoto, ChatText } from "@bubby/core/interfaces/chat";
import { Chat, ChatPhoto, ChatText } from "@bubby/core/interfaces/chat";
import { agent, speech } from "@bubby/openai";
import { handleWebhook } from "@bubby/telegram";
import { handleWebhook, OnXxxInput } from "@bubby/telegram";
import { tools } from "src/tools";
import { User } from "@bubby/core/interfaces/user";
import { AgentMessage } from "@bubby/core/interfaces/ai";

class AppContextImpt<T extends Chat> implements AppContext<T> {
public readonly chat: T;
public readonly messages: AgentMessage[] = [];
public readonly user: User;

private onNewMessageHandlers: Array<
Parameters<AppContext["onNewMessage"]>[0]
> = [];

constructor(input: OnXxxInput<T>) {
this.chat = input.chat;
this.user = input.user;
}

get kv() {
return kv;
}

get tools() {
return tools;
}

async pushMessage(message: AgentMessage): Promise<void> {
this.messages.push(message);
for (const handler of this.onNewMessageHandlers) {
await handler(message);
}
}

onNewMessage(handler: (message: AgentMessage) => Promise<void>): void {
this.onNewMessageHandlers.push(handler);
}
}

export async function handleTelegramWebhook(secretToken: string, update: any) {
const expectedSecretToken = Config.TELEGRAM_WEBHOOK_SECRET_TOKEN;
@@ -14,10 +50,9 @@ export async function handleTelegramWebhook(secretToken: string, update: any) {
return;
}

console.log(JSON.stringify(update, null, 2));
await handleWebhook({
onPhoto: (input) => replyToPhoto({ ...input, kv }),
onText: (input) => replyToText({ ...input, kv }),
onPhoto: (input) => replyToPhoto(new AppContextImpt(input)),
onText: (input) => replyToText(new AppContextImpt(input)),
speech,
update,
});
@@ -29,11 +64,11 @@ async function replyToPhoto(ctx: AppContext<ChatPhoto>): Promise<void> {
imageUrl: await chat.getPhotoUrl(),
text: chat.getPhotoCaption() ?? "",
};
return agent.respond({ ctx, message, tools });
return agent.respond({ ctx, message });
}

async function replyToText(ctx: AppContext<ChatText>): Promise<void> {
const text = await ctx.chat.getTextMessage();
const message = { text };
return agent.respond({ ctx, message, tools });
return agent.respond({ ctx, message });
}
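`AppContextImpt.pushMessage` awaits each subscriber in registration order, so one slow or rejecting handler delays (or fails) the whole push. A standalone sketch of the same fan-out pattern, not part of this PR:

```ts
type Handler<T> = (event: T) => Promise<void>;

// Minimal emitter mirroring the pushMessage/onNewMessage pair:
// handlers run sequentially, and a rejection propagates to the emitter.
class SequentialEmitter<T> {
  private handlers: Handler<T>[] = [];

  on(handler: Handler<T>): void {
    this.handlers.push(handler);
  }

  async emit(event: T): Promise<void> {
    for (const handler of this.handlers) {
      await handler(event);
    }
  }
}
```

One consequence of this design: a subscriber registered after a push never sees earlier messages, which is why `assistantThreadIdInsert` below also reads the `ctx.messages` backlog directly.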
51 changes: 35 additions & 16 deletions packages/openai/src/agent.ts
@@ -1,20 +1,20 @@
import { AssistantStream } from "openai/lib/AssistantStream";
import { FunctionToolCall } from "openai/resources/beta/threads/runs/steps";

import { Agent, Tool } from "@bubby/core/interfaces/ai";
import { Agent } from "@bubby/core/interfaces/ai";
import { AppContext } from "@bubby/core/interfaces/app";
import { assistantSendMessage } from "./internal/assistant_message";
import {
assistantSendMessage,
endOfThinking,
} from "./internal/assistant_message";
import { assistantThreadIdUpsert } from "./internal/assistant_thread";
import { assistantSubmitToolOutputs } from "./internal/assistant_tool_outputs";

class AgentStreamer {
constructor(
private ctx: AppContext,
private threadId: string,
private tools: Tool<any>[]
) {}
constructor(private ctx: AppContext, private threadId: string) {}

async consume(stream: AssistantStream): Promise<AssistantStream | undefined> {
const { ctx, threadId } = this;
let state:
| { type: "code_interpreter" | "file_search" }
| { type: "function"; toolCall: FunctionToolCall }
@@ -28,9 +28,17 @@ class AgentStreamer {
functionToolCalls.push(state.toolCall);
break;
case "text":
const markdown = state.text;
let markdown = state.text;

const thinkingIndex = markdown.indexOf(endOfThinking);
if (thinkingIndex > -1) {
markdown = markdown
.slice(thinkingIndex + endOfThinking.length)
.trim();
}

if (markdown.length > 0) {
void this.ctx.chat.reply({ type: "markdown", markdown });
void ctx.chat.reply({ type: "markdown", markdown });
}
break;
}
@@ -40,7 +48,7 @@
stream
.on("textCreated", () => {
resetState({ type: "text", text: "" });
void this.ctx.chat.typing();
void ctx.chat.typing();
})
.on("textDelta", (textDelta) => {
if (state?.type === "text") {
@@ -65,15 +73,26 @@ }
}
break;
}
})
.on("messageDone", (message) => {
for (const content of message.content) {
if (content.type === "text") {
ctx.pushMessage({
role: "assistant",
threadId: message.thread_id,
text: content.text.value,
});
}
}
console.log(JSON.stringify(message, null, 2));
});

const run = await stream.finalRun(); // wait for OpenAI
resetState(); // flush the last state

const { ctx, threadId, tools } = this;
if (functionToolCalls.length === 0) {
if (run.status === "failed" || run.status === "incomplete") {
for (const tool of tools) {
for (const tool of ctx.tools) {
if (tool.name === "new_thread") {
// force new thread in case of run failure
const parameters = tool.parametersSchema.parse({});
@@ -87,16 +106,16 @@ class AgentStreamer {
}

const runId = run.id;
const input = { ctx, runId, threadId, tools };
const input = { ctx, runId, threadId };
return assistantSubmitToolOutputs(input, functionToolCalls);
}
}

export const agent: Agent = {
respond: async ({ ctx, message, tools }) => {
respond: async ({ ctx, message }) => {
const threadId = await assistantThreadIdUpsert(ctx);
const streamer = new AgentStreamer(ctx, threadId, tools);
const firstStream = await assistantSendMessage(threadId, message, tools);
const streamer = new AgentStreamer(ctx, threadId);
const firstStream = await assistantSendMessage(ctx, threadId, message);

let stream: typeof firstStream | undefined = firstStream;
while (typeof stream !== "undefined") {
…
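The new `messageDone` handler records assistant output into the context, and the text branch now strips everything up to `endOfThinking` before replying. The stripping logic, extracted into a pure function for illustration (the PR inlines it; `endOfThinking` is redeclared here to keep the sketch self-contained):

```ts
const endOfThinking = "---- END OF THINKING ---";

// Returns only the user-facing part of an assistant message,
// dropping the leading THINKING section when the marker is present.
function stripThinking(markdown: string): string {
  const thinkingIndex = markdown.indexOf(endOfThinking);
  if (thinkingIndex === -1) {
    return markdown;
  }
  return markdown.slice(thinkingIndex + endOfThinking.length).trim();
}

// stripThinking("topics so far\n---- END OF THINKING ---\nIt's sunny.")
//   === "It's sunny."
```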
53 changes: 46 additions & 7 deletions packages/openai/src/internal/assistant_message.ts
@@ -3,16 +3,26 @@ import { AssistantStream } from "openai/lib/AssistantStream";
import { FunctionParameters } from "openai/resources";
import { AssistantTool } from "openai/resources/beta/assistants";
import { MessageContentPartParam } from "openai/resources/beta/threads/messages";
import { RunCreateParams } from "openai/resources/beta/threads/runs/runs";

import { AgentMessage, Tool } from "@bubby/core/interfaces/ai";
import { UserMessage } from "@bubby/core/interfaces/ai";
import { AppContext } from "@bubby/core/interfaces/app";
import { assistantId, threads } from "./openai";

export const endOfThinking = "---- END OF THINKING ---";

export async function assistantSendMessage(
ctx: AppContext,
threadId: string,
{ imageUrl, text }: AgentMessage,
tools: Tool<any>[]
{ imageUrl, text: textWithoutMetadata }: UserMessage
): Promise<AssistantStream> {
await threads.messages.create(threadId, {
const text = `---- METADATA ----
The current time is ${new Date().toISOString()}.
---- END OF METADATA ---

${textWithoutMetadata}
`;
const message: RunCreateParams.AdditionalMessage = {
content: [
{
type: "text",
@@ -28,20 +38,49 @@ export async function assistantSendMessage(
: []),
],
role: "user",
});
};
await ctx.pushMessage({ role: "user", threadId, text });

const instructions = `Your name is Bubby.
You are a personal assistant bot. Ensure efficient and user-friendly interaction, focusing on simplicity and clarity in communication.
You provide concise and direct answers. Maintain a straightforward and easy-going conversation tone. Keep responses brief, typically in short sentences.
You can only reply to text or photo messages.`;

User message format:
1. Each user message starts with a METADATA section, including the current time
2. The actual user message follows the METADATA section.

Bot message format:
1. Each bot message starts with a THINKING section
2. The THINKING section re-count the different topics in the thread
3. The section ends with '${endOfThinking}'
4. The actual bot message follows the THINKING section

Thread management:
1. Before EVERY response, you MUST evaluate if a new thread is needed.
2. Use the new_thread tool when there are more than one distinct questions or tasks.
3. For ambiguous cases, ask the user to confirm whether they want to start another thread or continue the current one.

Example:
User: What's the weather like today?
Bubby: It's sunny and 24°C today.
User: How about tomorrow?
Bubby: Tomorrow will be cloudy with a high of 21°C.
User: Can you help me with a recipe?
Bubby uses the new_thread tool
Bubby: Certainly! What kind of recipe are you looking for?

Remember, your context is limited, so managing threads efficiently is crucial.
ALWAYS evaluate if a new thread is needed before responding. If in doubt, ask the user.
`;
return threads.runs.stream(threadId, {
assistant_id: assistantId,
instructions,
additional_messages: [message],
model: "gpt-4o-mini",
tools: [
{ type: "code_interpreter" },
{ type: "file_search" },
...tools.map<AssistantTool>((tool) => {
...ctx.tools.map<AssistantTool>((tool) => {
return {
function: {
description: tool.description,
…
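Every outgoing user message is now framed with a METADATA section so the model sees the current time, and the new instructions tell it to expect that framing. The framing as a standalone helper, a sketch only (the PR builds the string inline in `assistantSendMessage`):

```ts
// Mirrors the template literal in assistantSendMessage; the marker
// strings must match what the instructions describe to the model.
function withMetadata(textWithoutMetadata: string, now = new Date()): string {
  return `---- METADATA ----
The current time is ${now.toISOString()}.
---- END OF METADATA ---

${textWithoutMetadata}
`;
}
```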
48 changes: 30 additions & 18 deletions packages/openai/src/internal/assistant_thread.ts
@@ -1,25 +1,37 @@
import { ThreadCreateParams } from "openai/resources/beta/threads/threads";

import { AppContext } from "@bubby/core/interfaces/app";
import { threads } from "./openai";

export async function assistantThreadIdInsert({
chat,
kv,
user,
}: AppContext): Promise<string> {
const memory =
(await kv.get(chat.getChannelId(), "memory")) ??
`User's name: ${user.getUserName()}\nUser's date of birth: Unknown\nUser's relationship status: Unknown`;
const threadId = (
await threads.create({
messages: [
{
content: `---- START OF MEMORY ----\n${memory}\n---- END OF MEMORY ----`,
role: "user",
},
],
})
).id;
export async function assistantThreadIdInsert(
ctx: AppContext
): Promise<string> {
const { chat, kv } = ctx;

const messages: ThreadCreateParams.Message[] = [];
for (const { role, text } of ctx.messages) {
// populate new thread with recent messages
messages.push({ role, content: text });
}

const memory = await kv.get(chat.getChannelId(), "memory");
if (typeof memory === "string") {
messages.push({
content: `---- START OF MEMORY ----\n${memory}\n---- END OF MEMORY ----`,
role: "user",
});
}

const threadId = (await threads.create({ messages })).id;
await kv.set(chat.getChannelId(), "assistant-thread-id", threadId);

ctx.onNewMessage(async ({ role, text, threadId: newMessageThreadId }) => {
if (newMessageThreadId !== threadId) {
// sync new messages to our thread
await threads.messages.create(threadId, { content: text, role });
}
});

return threadId;
}

…
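`assistantThreadIdInsert` now seeds a new OpenAI thread from the `ctx.messages` backlog and subscribes via `onNewMessage` so that messages still arriving under other thread ids get copied in. The guard, isolated as a sketch (`createMessage` is a hypothetical stand-in for `threads.messages.create`):

```ts
import { AgentMessage } from "@bubby/core/interfaces/ai";

type CreateMessage = (
  threadId: string,
  message: { content: string; role: "assistant" | "user" }
) => Promise<void>;

// Copies messages from other threads into ours; messages created in
// our own thread are skipped so they are not duplicated.
function makeSyncHandler(ourThreadId: string, createMessage: CreateMessage) {
  return async ({ role, text, threadId }: AgentMessage): Promise<void> => {
    if (threadId !== ourThreadId) {
      await createMessage(ourThreadId, { content: text, role });
    }
  };
}
```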
4 changes: 2 additions & 2 deletions packages/openai/src/internal/assistant_tool_outputs.ts
@@ -15,14 +15,14 @@ type AssistantSubmitToolOutputsInput = {
ctx: AppContext;
runId: string;
threadId: string;
tools: Tool<any>[];
};

export async function assistantSubmitToolOutputs(
input: AssistantSubmitToolOutputsInput,
functionToolCalls: FunctionToolCall[]
): Promise<AssistantStream> {
const { runId, threadId, tools } = input;
const { ctx, runId, threadId } = input;
const { tools } = ctx;
const tool_outputs: RunSubmitToolOutputsParams.ToolOutput[] = [];

for (const toolCall of functionToolCalls) {
…
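With `tools` moved onto the context, `assistantSubmitToolOutputs` resolves each `FunctionToolCall` against `ctx.tools`. A hedged sketch of that resolution step: only `name`, `description`, and `parametersSchema` are visible in this diff, so the `handler` property and its signature are assumptions.

```ts
import { Tool } from "@bubby/core/interfaces/ai";

// Hypothetical resolution of one function tool call. The handler
// call is assumed; the diff only shows name/description/parametersSchema.
async function runToolCall(
  tools: Tool<any>[],
  name: string,
  rawArguments: string
): Promise<string> {
  const tool = tools.find((t) => t.name === name);
  if (tool === undefined) {
    return JSON.stringify({ error: `unknown tool: ${name}` });
  }
  const parameters = tool.parametersSchema.parse(JSON.parse(rawArguments));
  return JSON.stringify(await tool.handler(parameters)); // assumed API
}
```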
1 change: 0 additions & 1 deletion packages/telegram/src/internal/chat.ts
@@ -44,7 +44,6 @@ export abstract class Chat<
switch (reply.type) {
case "markdown":
const safeHtml = convertMarkdownToSafeHtml(reply.markdown);
console.log({ channelId, reply, safeHtml });
await this.replyWrapper(reply.type, ctx.replyWithHTML(safeHtml));
break;
case "photo":
…