Refactor request handling and integrate OpenAI response generation with enhanced personality context
@@ -1,383 +1,197 @@
// @ts-ignore
import { serve } from "https://deno.land/std@0.168.0/http/server.ts";
// @ts-ignore
import { load } from "https://deno.land/std@0.224.0/dotenv/mod.ts";
// @ts-ignore
import { DOMParser } from "https://deno.land/x/deno_dom@v0.1.45/deno-dom-wasm.ts";
import { User } from "https://esm.sh/@supabase/auth-js@2.76.1/dist/module/lib/types.d.ts";

import { createClient } from "https://esm.sh/@supabase/supabase-js@2";

//

import { load } from "https://deno.land/std@0.224.0/dotenv/mod.ts";
import SupabaseClient from "https://esm.sh/@supabase/supabase-js@2.76.1/dist/module/SupabaseClient.d.ts";
import { User } from "https://esm.sh/@supabase/auth-js@2.76.1/dist/module/lib/types.d.ts";
import { DOMParser } from "https://deno.land/x/deno_dom@v0.1.45/deno-dom-wasm.ts";
import OpenAI from "npm:openai@4";

// Load environment variables
await load({ export: true, envPath: ".env" });

// Initialize Supabase client
let supabaseClient = null;

// Load and parse prompts.xml
// In production (Supabase Edge), load from Git; locally, load from file system
const isProduction = Deno.env.get('DENO_DEPLOYMENT_ID') !== undefined;
let xmlContent: string;

if (isProduction) {
    const response = await fetch('https://git.imbenji.dev/ImBenji/Mori/raw/branch/main/supabase/functions/llm-pipeline/prompts.xml');
    xmlContent = await response.text();
} else {
    xmlContent = await Deno.readTextFile('./prompts.xml');
}

const doc = new DOMParser().parseFromString(xmlContent, 'text/html');
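// prompts.xml is parsed with the HTML parser, so prompt sections such as
// <memory_extraction> or <mori_personality> can be pulled out with querySelector below.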

const corsHeaders = {
    'Access-Control-Allow-Origin': '*',
    'Access-Control-Allow-Headers': 'authorization, x-client-info, apikey, content-type'
};

/*
    Stage 2: Process Input (Extract Memories)
*/
async function extractMemories(controller, messages, doc, user: User, allTags, relevantMemories?) {
    const startTime = Date.now();
    let addedCount = 0;
    let updatedCount = 0;
    let deletedCount = 0;
    const extractedMemories = [];
    // Initialize OpenAI client
    const openai = new OpenAI({
        apiKey: Deno.env.get("OPENAI_API_KEY") || "",
    });

    console.log("Using cached tags for user:", allTags?.length || 0);
    // Load the xml doc - prompts.xml
    const isProduction = Deno.env.get('DENO_DEPLOYMENT_ID') !== undefined;
    let xmlContent: string;
    if (isProduction) {
        const response = await fetch('https://git.imbenji.dev/ImBenji/Mori/raw/branch/main/supabase/functions/llm-pipeline/prompts.xml');
        xmlContent = await response.text();
    } else {
        xmlContent = await Deno.readTextFile('./prompts.xml');
    }
    const prompt_xml = new DOMParser().parseFromString(xmlContent, 'text/html');

    // Create and call OpenAI to process the input messages
    console.log("Creating OpenAI client for processing input");
    const openai = new OpenAI({
        apiKey: Deno.env.get('OPENAI_API_KEY')
const mori_personality = prompt_xml?.querySelector('mori_personality')?.textContent || "";

async function handleRequest(req: Request, controller : ReadableStreamDefaultController, user : User, supabaseClient : SupabaseClient) {

    function enqueueJson(data: any) {
        controller.enqueue(new TextEncoder().encode(`${JSON.stringify(data)}`));
    }
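    // Note: objects are enqueued back-to-back with no delimiter, so the client is
    // presumably expected to split the stream into individual JSON payloads itself.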

    const showExamples = false;
    if (showExamples) {
        // Example 1: Send current status to the client
        let status = {
            command: "update_status",
            status_type: "info",
            message: "Discombobulating the flux capacitor...",
        };
        enqueueJson(status);

        // Simulate some processing delay
        await new Promise((resolve) => setTimeout(resolve, 2000));

        // Example 2: Send progress update to the client
        status = {
            command: "update_status",
            status_type: "progress",
            message: "Halfway through the discombobulation!",
        };
        enqueueJson(status);

        // Simulate some processing delay
        await new Promise((resolve) => setTimeout(resolve, 2000));

        status = {
            command: "update_status",
            status_type: "progress",
            message: "Lost track of what we're doing...",
        };
        enqueueJson(status);

        // Simulate some processing delay
        await new Promise((resolve) => setTimeout(resolve, 2000));
    }

    const requestBody = await req.json();

    /*
        Summarise the conversation
    */
    enqueueJson({
        command: "update_status",
        status_type: "info",
        message: "Contemplating conversation...",
    });

    const system_prompt = doc?.querySelector('memory_extraction')?.textContent?.trim() || '';

    console.log("Calling OpenAI API for processing...");
    const response = await openai.chat.completions.create({
        model: 'gpt-4.1',
        temperature: 0.1,
        max_completion_tokens: 32000,
    const summarySystemPrompt = prompt_xml.querySelector('conversation_summariser')?.textContent
        .replaceAll("{{PERSONALITY_INJECTION}}", mori_personality);
    const summaryCompletion = await openai.chat.completions.create({
        model: "gpt-4.1-mini",
        messages: [
            { role: 'system', content: system_prompt },
            ...messages,
            {
                role: "system",
                content: summarySystemPrompt || "",
                // Static: gets cached
            },
            ...requestBody.messages,
            {
                role: "assistant",
                content: `I have access to the following reference data:

Available tags: ${JSON.stringify(allTags?.map(t => t.name) || [])}

Existing memories: ${JSON.stringify(relevantMemories || [])}

Now I will analyze the conversation above and extract memories. I will extract EVERY SINGLE atomic fact from the user's messages. For detailed reports, I expect to extract 100-200+ separate memories. I will NOT summarize or limit myself. I will break down every detail into individual atomic facts.`
            }
        ]
    });
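    // The summariser is expected to return JSON; its `context` field is what gets
    // parsed into summaryJson further down and reused as the conversation gist.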

    const processedContent = response.choices[0]?.message?.content || '';
    console.log("Processing complete, sending processed content to client");

    // Decode the json content
    let processedData;
    try {
        processedData = JSON.parse(processedContent);
    } catch (error) {
        console.error("Error parsing processed content:", error);
        throw new Error("Failed to parse processed content");
    }

    // Iterate over the changes and process them
    for (const change of processedData.changes || []) {
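        // Each change is one of ADD / UPDATE / DELETE; the counters and the
        // extractedMemories list feed the pipeline summary returned at the end.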
        if (change.action === "ADD") {
            addedCount++;
            extractedMemories.push({
                action: 'ADD',
                content: change.content,
                context: change.context,
                tags: change.tags
            });
            // Fetch all existing tags in a single query using .in()
            const { data: existingTags } = await supabaseClient
                .schema("mori")
                .from("tags")
                .select("*")
                .in("name", change.tags)
                .eq("user_id", user.id);

            let tags = existingTags || [];

            // Find tags that need to be created
            const existingTagNames = new Set(tags.map(t => t.name));
            const newTagNames = change.tags.filter(tagName => !existingTagNames.has(tagName));

            // Batch insert all new tags in a single query
            if (newTagNames.length > 0) {
                const { data: insertedTags } = await supabaseClient
                    .schema("mori")
                    .from("tags")
                    .insert(newTagNames.map(name => ({
                        name: name,
                        user_id: user.id
                    })))
                    .select();

                if (insertedTags) {
                    tags.push(...insertedTags);
                }
            }

            // Now, insert the memory itself
            const insertMemory = await supabaseClient
                .schema("mori")
                .from("memories")
                .insert([{
                    content: change.content,
                    context: change.context,
                    user_id: user.id,
                }])
                .select()
                .single();

            // Batch insert all memory_tags links in a single query
            if (tags.length > 0 && insertMemory.data) {
                await supabaseClient
                    .schema("mori")
                    .from("memory_tags")
                    .insert(tags.map(tag => ({
                        memory_id: insertMemory.data.id,
                        tag_id: tag.id
                    })));
            }

        } else if (change.action === "UPDATE") {
            updatedCount++;
            extractedMemories.push({
                action: 'UPDATE',
                content: change.content,
                context: change.context,
                memory_id: change.memory_id
            });
            // Update existing memory
            await supabaseClient
                .schema("mori")
                .from("memories")
                .update({
                    content: change.content,
                    context: change.context,
                    updated_at: new Date().toISOString()
                })
                .eq("id", change.memory_id)
                .eq("user_id", user.id);

            // TODO: Handle tag updates if needed
            // (delete old memory_tags links and create new ones)

        } else if (change.action === "DELETE") {
            deletedCount++;

            // First, fetch the memory content before deleting
            const memoryToDelete = relevantMemories?.find(m => m.id === change.memory_id);

            extractedMemories.push({
                action: 'DELETE',
                memory_id: change.memory_id,
                content: memoryToDelete?.content || change.content,
                reason: change.reason
            });

            // Delete memory (cascade should handle memory_tags)
            await supabaseClient
                .schema("mori")
                .from("memories")
                .delete()
                .eq("id", change.memory_id)
                .eq("user_id", user.id);
        }
    }

    const processTime = Date.now() - startTime;

    return {
        extractedMemories,
        addedCount,
        updatedCount,
        deletedCount,
        processTime
    };
}

/*
    Stage 1: Fetch Relevant Memories and Tags.
*/
async function fetchRelevantMemories(controller, messages, doc, user: User) {
    const startTime = Date.now();

    const tags = await supabaseClient
        .schema("mori")
        .from("tags")
        .select("*")
        .eq("user_id", user.id);

    console.log("Fetched existing tags for user:", tags.data?.length || 0);

    // Create and call OpenAI to process the input messages
    console.log("Creating OpenAI client for generating a response");
    const openai = new OpenAI({
        apiKey: Deno.env.get('OPENAI_API_KEY')
    });

    let system_prompt = doc?.querySelector('memory_query')?.textContent?.trim() || '';

    console.log("Calling OpenAI API for fetching relevant memories...");
    const response = await openai.chat.completions.create({
        model: 'gpt-4.1',
        messages: [
            { role: 'system', content: system_prompt },
            ...messages,
            {
                role: "user",
                content: "Existing tags: " + JSON.stringify(tags.data || [])
            }
                content: `CURRENT_TIME: ${new Date().toISOString()}\n\nCONVERSATION_GIST:\n${requestBody.gist || "No existing context."}`,
                // Dynamic context injection
            },
        ],
    });

    const relevantMemoryTags = response.choices[0]?.message?.content || '';
    let relevantMemoryTagsParsed;
    try {
        relevantMemoryTagsParsed = JSON.parse(relevantMemoryTags);
    } catch (error) {
        console.error("Error parsing relevant memories content:", error);
        throw new Error("Failed to parse relevant memories content");
    }

    const { data: relevantMemories } = await supabaseClient
        .rpc("get_memories_by_tags", {
            tag_names: relevantMemoryTagsParsed.selected_tags,
            p_user_id: user.id
        });
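    // get_memories_by_tags is presumably a Postgres function exposed via RPC that
    // returns the user's memories linked to any of the selected tag names.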

    const fetchTime = Date.now() - startTime;

    return {
        relevantMemories,
        allTags: tags.data,
        selectedTags: relevantMemoryTagsParsed.selected_tags || [],
        fetchTime
    };
}

/*
    Stage 3: Generate Response
*/
async function generateResponse(controller, messages, doc, user: User, pipelineContext) {

    console.log("Creating OpenAI client for generating a response");
    const openai = new OpenAI({
        apiKey: Deno.env.get('OPENAI_API_KEY')
    let summaryJson = JSON.parse(summaryCompletion.choices[0]?.message?.content || "{}");
    enqueueJson({
        command: "update_gist",
        content: summaryJson.context || "",
    });
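    // The parsed gist is pushed to the client as an update_gist command and is also
    // injected into the planner and chat prompts below as CONVERSATION_GIST.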

    let system_prompt = doc?.querySelector('system_response')?.textContent?.trim() || '';
    system_prompt = system_prompt.replaceAll("{{username}}", user.user_metadata.username || 'User');
    /*
        Formulate a response plan
    */
    enqueueJson({
        command: "update_status",
        status_type: "info",
        message: "Devising response plan...",
    });
    const responsePlanSystemPrompt = prompt_xml.querySelector('response_planner')?.textContent
        .replaceAll("{{PERSONALITY_INJECTION}}", mori_personality);
    const responsePlanCompletion = await openai.chat.completions.create({
        model: "gpt-4.1-mini",
        messages: [
            {
                role: "system",
                content: responsePlanSystemPrompt
                // Static: gets cached
            },
            ...requestBody.messages,
            // Recent conversation messages
            {
                role: "assistant",
                content: `CURRENT_TIME: ${new Date().toISOString()}\n\nCONVERSATION_GIST:\n${summaryJson.context || "No existing context."}`
                // Dynamic context injection
            },
        ]
    });
    console.log("Response Plan:", responsePlanCompletion.choices[0]?.message?.content);
    let responsePlanJson = JSON.parse(responsePlanCompletion.choices[0]?.message?.content || "{}");
    // enqueueJson({
    //     command: "append_response",
    //     content: responsePlanCompletion.choices[0]?.message?.content || "",
    // });
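    // The planner's reply is parsed as JSON; its `plan` field is threaded into the
    // final chat prompt as RESPONSE_PLAN further down.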

    console.log("Calling OpenAI API for streaming response...");

    const responseMessages = [
        { role: 'system', content: system_prompt },
    ];

    // Build pipeline awareness context
    const { relevantMemories, selectedTags, extractedMemories, addedCount, updatedCount, deletedCount } = pipelineContext;

    let pipelineAwareness = `[Internal System Awareness - Not Part of Conversation]\n\n`;
    pipelineAwareness += `You are Mori, and you have a memory system that automatically remembers important information about ${user.user_metadata.username || 'the user'} across conversations.\n\n`;

    // Info about retrieved memories
    if (relevantMemories && relevantMemories.length > 0) {
        pipelineAwareness += `RETRIEVED MEMORIES (what you already knew):\n`;
        pipelineAwareness += `You searched through memories using topics: ${selectedTags.join(', ')}\n`;
        pipelineAwareness += `Found ${relevantMemories.length} relevant memories:\n`;
        relevantMemories.forEach(m => {
            pipelineAwareness += `• ${m.content}\n`;
        });
        pipelineAwareness += `\n`;
    } else {
        pipelineAwareness += `No previous memories were retrieved for this conversation.\n\n`;
    }

    // Info about newly extracted memories
    if (extractedMemories && extractedMemories.length > 0) {
        pipelineAwareness += `NEW MEMORIES (what you just learned and saved):\n`;
        extractedMemories.forEach(mem => {
            if (mem.action === 'ADD') {
                pipelineAwareness += `• Learned: ${mem.content}\n`;
            } else if (mem.action === 'UPDATE') {
                pipelineAwareness += `• Updated: ${mem.content}\n`;
            } else if (mem.action === 'DELETE') {
                pipelineAwareness += `• Forgot: ${mem.content || 'a previous memory'}\n`;
            }
        });
        pipelineAwareness += `\n`;
    }

    pipelineAwareness += `HOW TO USE THIS:\n`;
    pipelineAwareness += `- This awareness is internal. Don't report it.\n`;
    pipelineAwareness += `- Let it naturally inform your response\n`;
    pipelineAwareness += `- If the user explicitly asks you to remember something, you can acknowledge it naturally (e.g., "got it" or "I'll remember that")\n`;
    pipelineAwareness += `- If the user asks you to forget something and memories were deleted, acknowledge it naturally (e.g., "forgot it" or "done")\n`;
    pipelineAwareness += `- Reference past memories naturally without saying "I retrieved" or "according to my memory"\n`;
    pipelineAwareness += `- You're a companion who pays attention, not a system reporting operations\n`;

    // Inject pipeline awareness as assistant message
    responseMessages.push({
        role: 'assistant',
        content: pipelineAwareness
    /*
        Generate the final response
    */
    enqueueJson({
        command: "update_status",
        status_type: "info",
        message: "",
    });
    const chatSystemPrompt = prompt_xml.querySelector('chat_responder')?.textContent
        .replaceAll("{{PERSONALITY_INJECTION}}", mori_personality);
    const chatCompletion = await openai.chat.completions.create({
        model: "gpt-4.1-mini",
        messages: [
            {
                role: "system",
                content: chatSystemPrompt || "",
                // Static: gets cached
            },
            ...requestBody.messages,
            {
                role: "assistant",
                content: `CURRENT_TIME: ${new Date().toISOString()}\n\nCONVERSATION_GIST:\n${summaryJson.context || "No existing context."}\n\nRESPONSE_PLAN:\n${responsePlanJson.plan || "No specific plan."}`
                // Dynamic context injection
            },
        ],
        stream: true,
    });
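    // With stream: true the SDK returns an async iterable of completion chunks;
    // each delta is forwarded to the client below as an append_response command.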

    responseMessages.push(...messages);

    const stream = await openai.chat.completions.create({
        model: 'gpt-4.1',
        messages: responseMessages,
        stream: true
    });

    console.log("Stream created, starting to read chunks...");
    for await (const chunk of stream) {
        const content = chunk.choices[0]?.delta?.content || '';
        if (content) {
            const data = `data: ${JSON.stringify({ type: 'content', content })}\n\n`;
            controller.enqueue(new TextEncoder().encode(data));
    for await (const chunk of chatCompletion) {
        if (chunk.choices[0]?.delta?.content) {
            enqueueJson({
                command: "append_response",
                content: chunk.choices[0].delta.content,
            });
        }
    }

}

serve(async (req)=>{

    /*
        Handle CORS preflight requests
    */
serve(async (req) => {
    // Handle CORS preflight requests
    if (req.method === 'OPTIONS') {
        return new Response('ok', {
            headers: corsHeaders
        });
        return new Response('ok', { headers: corsHeaders });
    }

    /*
        Authenticate with supabase API key
    */

    // Get the token from the Authorization header
    const authHeader = req.headers.get('Authorization')
    const token = authHeader?.replace('Bearer ', '')
    const authHeader = req.headers.get('Authorization');
    const token = authHeader?.replace('Bearer ', '');

    // Initialise the Supabase client
    supabaseClient = createClient(
    // Initialize the Supabase client
    const supabaseClient = createClient(
        Deno.env.get('SUPABASE_URL') || '',
        Deno.env.get('SUPABASE_ANON_KEY') || '',
        {
@@ -387,78 +201,38 @@ serve(async (req)=>{
        }
    );

    const user = await supabaseClient.auth.getUser(token);
    // Authenticate the user
    const { data: { user }, error } = await supabaseClient.auth.getUser(token);

    if (user.error) {
        return new Response(JSON.stringify({
            error: 'Unauthorized'
        }), {
            status: 401,
            headers: {
                ...corsHeaders,
                'Content-Type': 'application/json'
    if (error || !user) {
        return new Response(
            JSON.stringify({ error: 'Unauthorized' }),
            {
                status: 401,
                headers: {
                    ...corsHeaders,
                    'Content-Type': 'application/json'
                }
            }
        });
        );
    }

    const username = user.data.user?.user_metadata.username || 'User';

    /*
        Gearing up to process the request
    */
    const body = await req.json();
    const { messages } = body;

    // Create the stream that will be used throughout the pipeline
    // User is authenticated, handle the request
    const readable = new ReadableStream({
        async start(controller) {
            // Wrap controller to log on enqueue
            const originalEnqueue = controller.enqueue.bind(controller);
            controller.enqueue = (chunk) => {
                const decoded = new TextDecoder().decode(chunk);
                console.log('Stream output:', decoded);
                originalEnqueue(chunk);
            };
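            // Every chunk written to the stream is decoded and logged before being passed
            // to the original enqueue, which helps when debugging the pipeline output.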

            try {

                /*
                    Stage 1: Fetch Relevant Memories
                */
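                // Stage updates are sent as SSE-style `data: {...}` frames; the stages
                // emitted are 'fetching', 'processing', 'responding', then 'complete'.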
                const stageFetchingData = `data: ${JSON.stringify({ type: 'stage', stage: 'fetching' })}\n\n`;
                controller.enqueue(new TextEncoder().encode(stageFetchingData));

                const { relevantMemories, allTags, selectedTags, fetchTime } = await fetchRelevantMemories(controller, messages, doc, user.data.user);

                /*
                    Stage 2: Extract Relevant Memories
                */
                const stageProcessingData = `data: ${JSON.stringify({ type: 'stage', stage: 'processing' })}\n\n`;
                controller.enqueue(new TextEncoder().encode(stageProcessingData));

                const { extractedMemories, addedCount, updatedCount, deletedCount, processTime } = await extractMemories(controller, messages, doc, user.data.user, allTags, relevantMemories);

                /*
                    Stage 3: Stream the response back to the client
                */
                const stageRespondingData = `data: ${JSON.stringify({ type: 'stage', stage: 'responding' })}\n\n`;
                controller.enqueue(new TextEncoder().encode(stageRespondingData));

                // Build complete pipeline context for Mori's awareness
                const pipelineContext = {
                    relevantMemories,
                    selectedTags,
                    fetchTime,
                    extractedMemories,
                    addedCount,
                    updatedCount,
                    deletedCount,
                    processTime
                };

                await generateResponse(controller, messages, doc, user.data.user, pipelineContext);

                // Send stage update: complete
                const completeData = `data: ${JSON.stringify({ type: 'stage', stage: 'complete' })}\n\n`;
                controller.enqueue(new TextEncoder().encode(completeData));

                console.log("Stream completed, closing controller");
                await handleRequest(req, controller, user, supabaseClient);
                controller.close();
            } catch (error) {
                console.error("Error in pipeline:", error);
                console.error("Error in stream:", error);
                controller.error(error);
            }
        }