Refactor request handling to improve memory retrieval and conversation summarization processes
This commit is contained in:
@@ -35,7 +35,7 @@ const mori_personality = prompt_xml?.querySelector('mori_personality')?.textCont
|
||||
async function handleRequest(req: Resquest, controller : ReadableStreamDefaultController, user : User, supabaseClient : SupabaseClient) {
|
||||
|
||||
function enqueueJson(data: any) {
|
||||
controller.enqueue(new TextEncoder().encode(`${JSON.stringify(data)}`));
|
||||
controller.enqueue(new TextEncoder().encode(`${JSON.stringify(data)}\n\n`));
|
||||
}
|
||||
|
||||
const showExamples = false;
|
||||
@@ -75,38 +75,291 @@ async function handleRequest(req: Resquest, controller : ReadableStreamDefaultCo
|
||||
|
||||
const requestBody = await req.json();
|
||||
|
||||
/*
|
||||
Summarise the conversation
|
||||
*/
|
||||
enqueueJson({
|
||||
command: "update_status",
|
||||
status_type: "info",
|
||||
message: "Contemplating conversation...",
|
||||
});
|
||||
const summarySystemPrompt = prompt_xml.querySelector('conversation_summariser')?.textContent
|
||||
.replaceAll("{{PERSONALITY_INJECTION}}", mori_personality);
|
||||
const summaryCompletion = await openai.chat.completions.create({
|
||||
model: "gpt-4.1-mini",
|
||||
messages: [
|
||||
{
|
||||
role: "system",
|
||||
content: summarySystemPrompt || "",
|
||||
// Static: gets cached
|
||||
},
|
||||
...requestBody.messages,
|
||||
{
|
||||
role: "assistant",
|
||||
content: `CURRENT_TIME: ${new Date().toISOString()}\n\nCONVERSATION_GIST:\n${requestBody.gist || "No existing context."}`,
|
||||
// Dynamic context injection
|
||||
},
|
||||
],
|
||||
|
||||
/*
|
||||
Summarise the conversation
|
||||
*/
|
||||
|
||||
const summarySystemWork = new Promise<any>(async (resolve, reject) => {
|
||||
|
||||
const summarySystemPrompt = prompt_xml.querySelector('conversation_summariser')?.textContent
|
||||
.replaceAll("{{PERSONALITY_INJECTION}}", mori_personality);
|
||||
const summaryCompletion = await openai.chat.completions.create({
|
||||
model: "gpt-4.1-mini",
|
||||
messages: [
|
||||
{
|
||||
role: "system",
|
||||
content: summarySystemPrompt || "",
|
||||
// Static: gets cached
|
||||
},
|
||||
...requestBody.messages,
|
||||
{
|
||||
role: "assistant",
|
||||
content: `CURRENT_TIME: ${new Date().toISOString()}\n\nCONVERSATION_GIST:\n${requestBody.gist || "No existing context."}`,
|
||||
// Dynamic context injection
|
||||
},
|
||||
],
|
||||
});
|
||||
let summaryJson = JSON.parse(summaryCompletion.choices[0]?.message?.content || "{}");
|
||||
enqueueJson({
|
||||
command: "update_gist",
|
||||
content: summaryJson.context || "",
|
||||
});
|
||||
resolve(summaryJson);
|
||||
|
||||
});
|
||||
let summaryJson = JSON.parse(summaryCompletion.choices[0]?.message?.content || "{}");
|
||||
enqueueJson({
|
||||
command: "update_gist",
|
||||
content: summaryJson.context || "",
|
||||
|
||||
/*
|
||||
Fetch relevant memories from the DB
|
||||
*/
|
||||
|
||||
const memoryFetchWork = new Promise<any>(async (resolve, reject) => {
|
||||
|
||||
// Fetch all the existing tags for the user
|
||||
const { data: tagsData, error: tagsError } = await supabaseClient
|
||||
.schema("mori")
|
||||
.from('tags')
|
||||
.select('id, name')
|
||||
.eq('user_id', user.id);
|
||||
|
||||
let userTags: string[] = [];
|
||||
|
||||
if (tagsError) {
|
||||
console.error("Error fetching tags:", tagsError);
|
||||
reject(tagsError);
|
||||
} else if (tagsData) {
|
||||
userTags = tagsData.map(tag => tag.name);
|
||||
}
|
||||
|
||||
// Now formulate the memory fetch prompt
|
||||
const memoryFetchSystemPrompt = prompt_xml.querySelector('memory_retriever')?.textContent
|
||||
.replaceAll("{{PERSONALITY_INJECTION}}", mori_personality);
|
||||
const memoryFetchCompletion = await openai.chat.completions.create({
|
||||
model: "gpt-4.1-mini",
|
||||
messages: [
|
||||
{
|
||||
role: "system",
|
||||
content: memoryFetchSystemPrompt || "",
|
||||
},
|
||||
...requestBody.messages,
|
||||
{
|
||||
role: "assistant",
|
||||
content: `CURRENT_TIME: ${new Date().toISOString()}\n\nAVAILABLE_TAGS:\n${userTags.join(", ")}`
|
||||
}
|
||||
],
|
||||
});
|
||||
let memoryFetchJson = JSON.parse(memoryFetchCompletion.choices[0]?.message?.content || "{}");
|
||||
|
||||
// Fetch memories based on the suggested tags
|
||||
let fetchedMemories: any[] = [];
|
||||
|
||||
if (memoryFetchJson.tags && memoryFetchJson.tags.length > 0) {
|
||||
const { data: memoriesData, error: memoriesError } = await supabaseClient
|
||||
.rpc('get_memories_by_tags', {
|
||||
tag_names: memoryFetchJson.tags,
|
||||
p_user_id: user.id
|
||||
});
|
||||
|
||||
if (memoriesError) {
|
||||
console.error("Error fetching memories:", memoriesError);
|
||||
} else if (memoriesData) {
|
||||
fetchedMemories = memoriesData;
|
||||
}
|
||||
}
|
||||
|
||||
// Format fetched memories for the extraction prompt
|
||||
const formattedMemories = fetchedMemories.map(mem =>
|
||||
`[ID: ${mem.id}] ${mem.content} (${mem.context})`
|
||||
).join('\n');
|
||||
|
||||
const memoryExtractionSystemPrompt = prompt_xml.querySelector('memory_extractor')?.textContent
|
||||
.replaceAll("{{PERSONALITY_INJECTION}}", mori_personality);
|
||||
|
||||
const memoryExtractionCompletion = await openai.chat.completions.create({
|
||||
model: "gpt-4.1",
|
||||
messages: [
|
||||
{
|
||||
role: "system",
|
||||
content: memoryExtractionSystemPrompt || "",
|
||||
// Static: gets cached
|
||||
},
|
||||
{
|
||||
role: "user",
|
||||
content: requestBody.messages[requestBody.messages.length - 1].content
|
||||
// Only the most recent user message
|
||||
},
|
||||
{
|
||||
role: "assistant",
|
||||
content: `CURRENT_TIME: ${new Date().toISOString()}\n\nAVAILABLE_TAGS:\n${userTags.join(", ")}\n\nEXISTING_MEMORIES:\n${formattedMemories || "No existing memories."}`
|
||||
// Dynamic context injection
|
||||
}
|
||||
],
|
||||
});
|
||||
|
||||
let memoryExtractionJson = JSON.parse(memoryExtractionCompletion.choices[0]?.message?.content || '{"changes": []}');
|
||||
|
||||
// Process the memory changes (ADD/UPDATE/DELETE)
|
||||
const memoryPromises: Promise<any>[] = [];
|
||||
|
||||
for (const change of memoryExtractionJson.changes) {
|
||||
if (change.action === "ADD") {
|
||||
memoryPromises.push((async () => {
|
||||
// First, ensure all tags exist (create if needed)
|
||||
const tagIds: number[] = [];
|
||||
|
||||
for (const tagName of change.tags) {
|
||||
// Check if tag already exists
|
||||
const { data: existingTag } = await supabaseClient
|
||||
.schema("mori")
|
||||
.from("tags")
|
||||
.select("id")
|
||||
.eq("name", tagName)
|
||||
.eq("user_id", user.id)
|
||||
.single();
|
||||
|
||||
if (existingTag) {
|
||||
tagIds.push(existingTag.id);
|
||||
} else {
|
||||
// Create new tag
|
||||
const { data: newTag } = await supabaseClient
|
||||
.schema("mori")
|
||||
.from("tags")
|
||||
.insert({
|
||||
name: tagName,
|
||||
user_id: user.id
|
||||
})
|
||||
.select("id")
|
||||
.single();
|
||||
|
||||
if (newTag) {
|
||||
tagIds.push(newTag.id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Insert the memory
|
||||
const { data: newMemory } = await supabaseClient
|
||||
.schema("mori")
|
||||
.from("memories")
|
||||
.insert({
|
||||
content: change.content,
|
||||
context: change.context,
|
||||
user_id: user.id
|
||||
})
|
||||
.select("id")
|
||||
.single();
|
||||
|
||||
// Link tags to memory in junction table
|
||||
if (newMemory) {
|
||||
const junctionInserts = tagIds.map(tagId => ({
|
||||
memory_id: newMemory.id,
|
||||
tag_id: tagId
|
||||
}));
|
||||
|
||||
await supabaseClient
|
||||
.schema("mori")
|
||||
.from("memory_tags")
|
||||
.insert(junctionInserts);
|
||||
}
|
||||
})());
|
||||
|
||||
} else if (change.action === "UPDATE") {
|
||||
memoryPromises.push((async () => {
|
||||
// Update the memory content
|
||||
await supabaseClient
|
||||
.schema("mori")
|
||||
.from("memories")
|
||||
.update({
|
||||
content: change.content,
|
||||
context: change.context
|
||||
})
|
||||
.eq("id", change.memory_id)
|
||||
.eq("user_id", user.id);
|
||||
|
||||
// Delete old tag associations
|
||||
await supabaseClient
|
||||
.schema("mori")
|
||||
.from("memory_tags")
|
||||
.delete()
|
||||
.eq("memory_id", change.memory_id);
|
||||
|
||||
// Re-create tag associations with new tags
|
||||
const tagIds: number[] = [];
|
||||
|
||||
for (const tagName of change.tags) {
|
||||
const { data: existingTag } = await supabaseClient
|
||||
.schema("mori")
|
||||
.from("tags")
|
||||
.select("id")
|
||||
.eq("name", tagName)
|
||||
.eq("user_id", user.id)
|
||||
.single();
|
||||
|
||||
if (existingTag) {
|
||||
tagIds.push(existingTag.id);
|
||||
} else {
|
||||
const { data: newTag } = await supabaseClient
|
||||
.schema("mori")
|
||||
.from("tags")
|
||||
.insert({
|
||||
name: tagName,
|
||||
user_id: user.id
|
||||
})
|
||||
.select("id")
|
||||
.single();
|
||||
|
||||
if (newTag) {
|
||||
tagIds.push(newTag.id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const junctionInserts = tagIds.map(tagId => ({
|
||||
memory_id: change.memory_id,
|
||||
tag_id: tagId
|
||||
}));
|
||||
|
||||
await supabaseClient
|
||||
.schema("mori")
|
||||
.from("memory_tags")
|
||||
.insert(junctionInserts);
|
||||
})());
|
||||
|
||||
} else if (change.action === "DELETE") {
|
||||
memoryPromises.push(
|
||||
supabaseClient
|
||||
.schema("mori")
|
||||
.from("memories")
|
||||
.delete()
|
||||
.eq("id", change.memory_id)
|
||||
.eq("user_id", user.id)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for all memory operations to complete
|
||||
await Promise.all(memoryPromises);
|
||||
|
||||
// Return the fetched memories for further use
|
||||
resolve(fetchedMemories);
|
||||
});
|
||||
|
||||
// Wait for both the summary and memory fetch to complete
|
||||
await Promise.all([summarySystemWork, memoryFetchWork]);
|
||||
|
||||
const formattedMemories = (await memoryFetchWork).map((mem: any) =>
|
||||
`[ID: ${mem.id}] ${mem.content} (${mem.context})`
|
||||
).join('\n');
|
||||
|
||||
const summaryJson = await summarySystemWork;
|
||||
|
||||
|
||||
|
||||
/*
|
||||
Formulate a response plan
|
||||
*/
|
||||
@@ -115,6 +368,7 @@ async function handleRequest(req: Resquest, controller : ReadableStreamDefaultCo
|
||||
status_type: "info",
|
||||
message: "Devising response plan...",
|
||||
});
|
||||
|
||||
const responsePlanSystemPrompt = prompt_xml.querySelector('response_planner')?.textContent
|
||||
.replaceAll("{{PERSONALITY_INJECTION}}", mori_personality);
|
||||
const responsePlanCompletion = await openai.chat.completions.create({
|
||||
@@ -129,9 +383,8 @@ async function handleRequest(req: Resquest, controller : ReadableStreamDefaultCo
|
||||
// Recent conversation messages
|
||||
{
|
||||
role: "assistant",
|
||||
content: `CURRENT_TIME: ${new Date().toISOString()}\n\nCONVERSATION_GIST:\n${summaryJson.context || "No existing context."}`
|
||||
// Dynamic context injection
|
||||
},
|
||||
content: `CURRENT_TIME: ${new Date().toISOString()}\n\nCONVERSATION_GIST:\n${summaryJson.context}\n\nRELEVANT_MEMORIES:\n${formattedMemories}`
|
||||
}
|
||||
]
|
||||
});
|
||||
console.log("Response Plan:", responsePlanCompletion.choices[0]?.message?.content);
|
||||
@@ -147,7 +400,7 @@ async function handleRequest(req: Resquest, controller : ReadableStreamDefaultCo
|
||||
enqueueJson({
|
||||
command: "update_status",
|
||||
status_type: "info",
|
||||
message: "",
|
||||
message: "**Mori** is typing...",
|
||||
});
|
||||
const chatSystemPrompt = prompt_xml.querySelector('chat_responder')?.textContent
|
||||
.replaceAll("{{PERSONALITY_INJECTION}}", mori_personality);
|
||||
@@ -162,9 +415,8 @@ async function handleRequest(req: Resquest, controller : ReadableStreamDefaultCo
|
||||
...requestBody.messages,
|
||||
{
|
||||
role: "assistant",
|
||||
content: `CURRENT_TIME: ${new Date().toISOString()}\n\nCONVERSATION_GIST:\n${summaryJson.context || "No existing context."}\n\nRESPONSE_PLAN:\n${responsePlanJson.plan || "No specific plan."}`
|
||||
// Dynamic context injection
|
||||
},
|
||||
content: `CURRENT_TIME: ${new Date().toISOString()}\n\nCONVERSATION_GIST:\n${summaryJson.context || "No existing context."}\n\nRELEVANT_MEMORIES:\n${formattedMemories || "No memories retrieved."}\n\nRESPONSE_PLAN:\n${responsePlanJson.plan || "No specific plan."}`
|
||||
}
|
||||
],
|
||||
stream: true,
|
||||
});
|
||||
@@ -178,6 +430,12 @@ async function handleRequest(req: Resquest, controller : ReadableStreamDefaultCo
|
||||
}
|
||||
}
|
||||
|
||||
enqueueJson({
|
||||
command: "update_status",
|
||||
status_type: "success",
|
||||
message: "",
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
serve(async (req) => {
|
||||
|
||||
Reference in New Issue
Block a user