
Commit: streaming progress
lassejaco committed Sep 6, 2024
1 parent dc8c2c6 commit b486288
Showing 3 changed files with 214 additions and 25 deletions.
24 changes: 21 additions & 3 deletions devcon/src/pages/api/ai/index.ts
@@ -46,13 +46,31 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse)
  if (req.method === 'GET') {
    return res.send('hello from server')
  } else if (req.method === 'POST') {
    console.log(req.body, 'req.body')

    const { message, threadID } = req.body

    console.log(message, threadID, 'msg thread id')

    // Set headers for streaming
    res.writeHead(200, {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache, no-transform',
      Connection: 'keep-alive',
    })

    // Create a stream for the AI response
    const stream = await api.createMessageStream('asst_nirZMEbcECQHLSduSq73vmEB', message, threadID)

    // Stream the response to the client, separating serialized chunks with a custom delimiter
    for await (const chunk of stream) {
      res.write(JSON.stringify(chunk) + 'chunk_split')
      // res.flush() // Ensure the data is sent immediately
    }

    // End the response
    res.end()
    return
  }

  return res.send('hello')
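A note on the wire format: the handler advertises Content-Type: text/event-stream but writes bare JSON fragments joined by the custom 'chunk_split' marker, so the stream is not consumable by a standard EventSource client. If spec-compliant SSE framing were ever wanted, a minimal sketch (same chunk payloads, just reframed; the [DONE] sentinel is an assumption, not part of this commit) could look like:

    // Hypothetical SSE framing for the same stream -- not what this commit ships.
    for await (const chunk of stream) {
      // Each SSE event is a single `data:` line terminated by a blank line;
      // JSON.stringify never emits raw newlines, so one line per event is safe.
      res.write(`data: ${JSON.stringify(chunk)}\n\n`)
    }
    res.write('data: [DONE]\n\n') // assumed end-of-stream sentinel
    res.end()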
141 changes: 137 additions & 4 deletions devcon/src/pages/api/ai/open-ai.ts
@@ -350,10 +350,6 @@ export const api = (() => {
})

const formattedMessages = await Promise.all(formattedMessagesPromises)

return {
runID: run.id,
@@ -371,6 +367,143 @@
}
}
},
createMessageStream: async (assistantID: string, userMessage: string, threadID: string) => {
  if (!threadID) {
    const thread = await _interface.createThread()
    threadID = thread.id
  }

  await openai.beta.threads.messages.create(threadID, {
    role: 'user',
    content: `${userMessage}\nSystem: The current date is: ${new Date().toLocaleDateString()}.`,
  })

  const run = openai.beta.threads.runs.stream(threadID, {
    assistant_id: assistantID,
  })

  let fullMessage = ''
  let runID = ''

  return {
    [Symbol.asyncIterator]: async function* () {
      for await (const event of run) {
        const eventType = event.event
        const eventData = event.data

        console.log(event, 'event data')

        switch (eventType) {
          // Forward run/step/message lifecycle events to the client as-is
          case 'thread.created':
          case 'thread.run.created':
          case 'thread.run.queued':
          case 'thread.run.in_progress':
          case 'thread.run.requires_action':
          case 'thread.run.completed':
          case 'thread.run.failed':
          case 'thread.run.cancelling':
          case 'thread.run.cancelled':
          case 'thread.run.expired':
          case 'thread.run.step.created':
          case 'thread.run.step.in_progress':
          case 'thread.run.step.completed':
          case 'thread.run.step.failed':
          case 'thread.run.step.cancelled':
          case 'thread.run.step.expired':
          case 'thread.message.created':
          case 'thread.message.in_progress':
          case 'thread.message.completed':
          case 'thread.message.incomplete':
            yield { type: eventType, data: eventData }
            break
          case 'thread.message.delta':
            // Incremental message text
            if (eventData.delta.content && eventData.delta.content[0].type === 'text') {
              const text = eventData.delta.content[0].text.value
              fullMessage += text
              yield { type: eventType, content: text }
            }
            break
          case 'thread.run.step.delta':
            if (eventData.delta.step_details && eventData.delta.step_details.type === 'message_creation') {
              const content = eventData.delta.step_details.message_creation.content
              if (content && content[0].type === 'text') {
                const text = content[0].text.value
                fullMessage += text
                yield { type: eventType, content: text }
              }
            }
            break
          case 'error':
            yield { type: 'error', error: eventData }
            break
        }

        if (eventType === 'thread.run.completed') {
          runID = eventData.id
        }
      }

      // After the stream is complete, fetch the final run and process annotations
      if (runID) {
        const completedRun = await openai.beta.threads.runs.retrieve(threadID, runID)
        const messages = await openai.beta.threads.messages.list(threadID)

        const formattedMessagesPromises = messages.data.reverse().map(async (message: any) => {
          const content = message.content[0]

          let references

          if (content.text.annotations) {
            const fileAnnotationsPromises = content.text.annotations.map(async (annotation: any) => {
              if (annotation.type === 'file_citation') {
                const file = await openai.files.retrieve(annotation.file_citation.file_id)

                // @ts-ignore
                const fileUrl = filenameToUrl[file.filename.split('.txt')[0]]

                return {
                  file,
                  fileUrl,
                  textReplace: annotation.text,
                }
              }
            })

            // Annotations that are not file citations resolve to undefined; drop them
            references = (await Promise.all(fileAnnotationsPromises)).filter(Boolean)
          }

          let text = content.text.value

          if (references) {
            for (const reference of references) {
              text = text.replace(reference.textReplace, ``)
            }
          }

          return {
            id: message.id,
            role: message.role,
            text,
            files: references || [],
          }
        })

        const formattedMessages = await Promise.all(formattedMessagesPromises)

        yield {
          type: 'done',
          content: formattedMessages[0].text,
          references: formattedMessages[0].files,
          threadID,
          runID,
          status: completedRun.status,
          rawMessages: messages.data,
          messages: formattedMessages,
        }
      }
    },
  }
},
}

return _interface
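For reference, a minimal consumer of createMessageStream, derived only from the event shapes yielded above. This is a sketch, assuming it runs somewhere `api` from open-ai.ts is imported; the assistant ID is the one hard-coded in index.ts, the prompt is made up, and passing an empty threadID lets the method create a fresh thread:

    // Sketch: iterate the async-iterable returned by createMessageStream.
    const stream = await api.createMessageStream('asst_nirZMEbcECQHLSduSq73vmEB', 'What is Devcon?', '')

    for await (const event of stream) {
      if (event.type === 'thread.message.delta') {
        process.stdout.write(event.content) // incremental assistant text
      } else if (event.type === 'done') {
        console.log('\nthread:', event.threadID, 'status:', event.status)
      } else if (event.type === 'error') {
        console.error('stream error:', event.error)
      }
      // Lifecycle events ({ type, data }) are ignored in this sketch.
    }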
74 changes: 56 additions & 18 deletions lib/components/ai/overlay.tsx
@@ -45,38 +45,66 @@ const DevaBot = () => {
resetMessages();
};

const [streamingMessage, setStreamingMessage] = React.useState("");

const onSend = async () => {
if (executingQuery) return;

setExecutingQuery(true);
setStreamingMessage("");

try {
let url = "/api/ai";

const result = await fetch(url, {
const response = await fetch("/api/ai", {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({ message: query, threadID }),
});

if (result.status === 429) {
setError(
"You've hit the rate limit (15 requests per hour). Please try again later."
);
setExecutingQuery(false);

return;
const reader = response.body
.pipeThrough(new TextDecoderStream())
.getReader();

while (true) {
const { value, done } = await reader.read();
if (done) break;

console.log(value, "value");

const chunks = value.split("chunk_split");

for (const chunk of chunks) {
if (chunk.length === 0) continue;

const response = JSON.parse(chunk);

if (response.type === "thread.message.delta") {
setStreamingMessage((prev) => prev + response.content);
}

if (response.type === "done") {
setThreadID(response.threadID);
// setMessages((prevMessages) => [
// ...prevMessages,
// { role: "user", text: query },
// {
// role: "assistant",
// text: response.content,
// files: response.references || [],
// },
// ]);
setMessages(response.messages);
setStreamingMessage("");
setExecutingQuery(false);
}
}
}

const data = await result.json();

setThreadID(data.threadID);
setMessages(data.messages);
} catch (e: any) {
console.error(e, "error");
setError(e.message);
setError("Testing streaming responses, errors will occur.." + e.message);
setExecutingQuery(false);
}

setExecutingQuery(false);
};
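One caveat with the parsing above: reader.read() yields arbitrary network chunks, so a JSON payload (or the 'chunk_split' marker itself) can straddle two reads, in which case JSON.parse throws. A defensive variant, assuming the same wire format, carries a buffer between reads and only parses completed segments (handleEvent is a hypothetical stand-in for the delta/done handling above):

    // Sketch: buffer partial data across reads so split chunks still parse.
    let buffer = "";
    while (true) {
      const { value, done } = await reader.read();
      if (done) break;
      buffer += value;

      const segments = buffer.split("chunk_split");
      buffer = segments.pop() ?? ""; // last segment may be incomplete; keep it for the next read

      for (const segment of segments) {
        if (segment.length === 0) continue;
        handleEvent(JSON.parse(segment)); // hypothetical handler for a parsed event
      }
    }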

React.useEffect(() => {
Expand Down Expand Up @@ -198,6 +226,16 @@ const DevaBot = () => {
</div>
);
})}
{streamingMessage && (
<div className="shrink-0 flex flex-col">
<span className="text-sm opacity-50">
DevAI is responding
</span>
<Markdown className="markdown">
{streamingMessage}
</Markdown>
</div>
)}
</div>
<div className="absolute bottom-0 left-0 right-0 h-10 bg-gradient-to-t from-slate-900 to-transparent pointer-events-none"></div>
</div>