import { serve } from "https://deno.land/std@0.168.0/http/server.ts"; import { createClient } from "https://esm.sh/@supabase/supabase-js@2"; import { load } from "https://deno.land/std@0.224.0/dotenv/mod.ts"; import SupabaseClient from "https://esm.sh/@supabase/supabase-js@2.76.1/dist/module/SupabaseClient.d.ts"; import { User } from "https://esm.sh/@supabase/auth-js@2.76.1/dist/module/lib/types.d.ts"; import { DOMParser } from "https://deno.land/x/deno_dom@v0.1.45/deno-dom-wasm.ts"; import OpenAI from "npm:openai@4"; // Load environment variables await load({ export: true, envPath: ".env" }); const corsHeaders = { 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Headers': 'authorization, x-client-info, apikey, content-type' }; // Initialize OpenAI client const openai = new OpenAI({ apiKey: Deno.env.get("OPENAI_API_KEY") || "", }); // Load the xml doc - prompts.xml const isProduction = Deno.env.get('DENO_DEPLOYMENT_ID') !== undefined; let xmlContent: string; if (isProduction) { const response = await fetch('https://git.imbenji.dev/ImBenji/Mori/raw/branch/main/supabase/functions/llm-pipeline/prompts.xml'); xmlContent = await response.text(); } else { xmlContent = await Deno.readTextFile('./prompts.xml'); } const prompt_xml = new DOMParser().parseFromString(xmlContent, 'text/html'); const mori_personality = prompt_xml?.querySelector('mori_personality')?.textContent || ""; async function handleRequest(req: Resquest, controller : ReadableStreamDefaultController, user : User, supabaseClient : SupabaseClient) { function enqueueJson(data: any) { controller.enqueue(new TextEncoder().encode(`${JSON.stringify(data)}\n\n`)); } const showExamples = false; if (showExamples) { // Example 1: Send current status to the client let status = { command: "update_status", status_type: "info", message: "Discombobulating the flux capacitor...", }; enqueueJson(status); // Simulate some processing delay await new Promise((resolve) => setTimeout(resolve, 2000)); // Example 2: Send progress update to the client status = { command: "update_status", status_type: "progress", message: "Halfway through the discombobulation!", }; enqueueJson(status); // Simulate some processing delay await new Promise((resolve) => setTimeout(resolve, 2000)); status = { command: "update_status", status_type: "progress", message: "Lost track of what we're doing...", }; enqueueJson(status); // Simulate some processing delay await new Promise((resolve) => setTimeout(resolve, 2000)); } const requestBody = await req.json(); enqueueJson({ command: "update_status", status_type: "info", message: "Contemplating conversation...", }); /* Summarise the conversation */ const summarySystemWork = new Promise(async (resolve, reject) => { const summarySystemPrompt = prompt_xml.querySelector('conversation_summariser')?.textContent .replaceAll("{{PERSONALITY_INJECTION}}", mori_personality); const summaryCompletion = await openai.chat.completions.create({ model: "gpt-4.1-mini", messages: [ { role: "system", content: summarySystemPrompt || "", // Static: gets cached }, ...requestBody.messages, { role: "assistant", content: `CURRENT_TIME: ${new Date().toISOString()}\n\nCONVERSATION_GIST:\n${requestBody.gist || "No existing context."}`, // Dynamic context injection }, ], }); let summaryJson = JSON.parse(summaryCompletion.choices[0]?.message?.content || "{}"); enqueueJson({ command: "update_gist", content: summaryJson.context || "", }); resolve(summaryJson); }); /* Fetch relevant memories from the DB */ const memoryFetchWork = new Promise(async (resolve, reject) => { // Fetch all the existing tags for the user const { data: tagsData, error: tagsError } = await supabaseClient .schema("mori") .from('tags') .select('id, name') .eq('user_id', user.id); let userTags: string[] = []; if (tagsError) { console.error("Error fetching tags:", tagsError); reject(tagsError); } else if (tagsData) { userTags = tagsData.map(tag => tag.name); } // Now formulate the memory fetch prompt const memoryFetchSystemPrompt = prompt_xml.querySelector('memory_retriever')?.textContent .replaceAll("{{PERSONALITY_INJECTION}}", mori_personality); const memoryFetchCompletion = await openai.chat.completions.create({ model: "gpt-4.1-mini", messages: [ { role: "system", content: memoryFetchSystemPrompt || "", }, ...requestBody.messages, { role: "assistant", content: `CURRENT_TIME: ${new Date().toISOString()}\n\nAVAILABLE_TAGS:\n${userTags.join(", ")}` } ], }); let memoryFetchJson = JSON.parse(memoryFetchCompletion.choices[0]?.message?.content || "{}"); // Fetch memories based on the suggested tags let fetchedMemories: any[] = []; if (memoryFetchJson.tags && memoryFetchJson.tags.length > 0) { const { data: memoriesData, error: memoriesError } = await supabaseClient .rpc('get_memories_by_tags', { tag_names: memoryFetchJson.tags, p_user_id: user.id }); if (memoriesError) { console.error("Error fetching memories:", memoriesError); } else if (memoriesData) { fetchedMemories = memoriesData; } } // Format fetched memories for the extraction prompt const formattedMemories = fetchedMemories.map(mem => `[ID: ${mem.id}] ${mem.content} (${mem.context})` ).join('\n'); const memoryExtractionSystemPrompt = prompt_xml.querySelector('memory_extractor')?.textContent .replaceAll("{{PERSONALITY_INJECTION}}", mori_personality); const memoryExtractionCompletion = await openai.chat.completions.create({ model: "gpt-4.1", messages: [ { role: "system", content: memoryExtractionSystemPrompt || "", // Static: gets cached }, { role: "user", content: requestBody.messages[requestBody.messages.length - 1].content // Only the most recent user message }, { role: "assistant", content: `CURRENT_TIME: ${new Date().toISOString()}\n\nAVAILABLE_TAGS:\n${userTags.join(", ")}\n\nEXISTING_MEMORIES:\n${formattedMemories || "No existing memories."}` // Dynamic context injection } ], }); let memoryExtractionJson = JSON.parse(memoryExtractionCompletion.choices[0]?.message?.content || '{"changes": []}'); // Process the memory changes (ADD/UPDATE/DELETE) const memoryPromises: Promise[] = []; for (const change of memoryExtractionJson.changes) { if (change.action === "ADD") { memoryPromises.push((async () => { // First, ensure all tags exist (create if needed) const tagIds: number[] = []; for (const tagName of change.tags) { // Check if tag already exists const { data: existingTag } = await supabaseClient .schema("mori") .from("tags") .select("id") .eq("name", tagName) .eq("user_id", user.id) .single(); if (existingTag) { tagIds.push(existingTag.id); } else { // Create new tag const { data: newTag } = await supabaseClient .schema("mori") .from("tags") .insert({ name: tagName, user_id: user.id }) .select("id") .single(); if (newTag) { tagIds.push(newTag.id); } } } // Insert the memory const { data: newMemory } = await supabaseClient .schema("mori") .from("memories") .insert({ content: change.content, context: change.context, user_id: user.id }) .select("id") .single(); // Link tags to memory in junction table if (newMemory) { const junctionInserts = tagIds.map(tagId => ({ memory_id: newMemory.id, tag_id: tagId })); await supabaseClient .schema("mori") .from("memory_tags") .insert(junctionInserts); } })()); } else if (change.action === "UPDATE") { memoryPromises.push((async () => { // Update the memory content await supabaseClient .schema("mori") .from("memories") .update({ content: change.content, context: change.context }) .eq("id", change.memory_id) .eq("user_id", user.id); // Delete old tag associations await supabaseClient .schema("mori") .from("memory_tags") .delete() .eq("memory_id", change.memory_id); // Re-create tag associations with new tags const tagIds: number[] = []; for (const tagName of change.tags) { const { data: existingTag } = await supabaseClient .schema("mori") .from("tags") .select("id") .eq("name", tagName) .eq("user_id", user.id) .single(); if (existingTag) { tagIds.push(existingTag.id); } else { const { data: newTag } = await supabaseClient .schema("mori") .from("tags") .insert({ name: tagName, user_id: user.id }) .select("id") .single(); if (newTag) { tagIds.push(newTag.id); } } } const junctionInserts = tagIds.map(tagId => ({ memory_id: change.memory_id, tag_id: tagId })); await supabaseClient .schema("mori") .from("memory_tags") .insert(junctionInserts); })()); } else if (change.action === "DELETE") { memoryPromises.push( supabaseClient .schema("mori") .from("memories") .delete() .eq("id", change.memory_id) .eq("user_id", user.id) ); } } // Wait for all memory operations to complete await Promise.all(memoryPromises); // Return the fetched memories for further use resolve(fetchedMemories); }); // Wait for both the summary and memory fetch to complete await Promise.all([summarySystemWork, memoryFetchWork]); const formattedMemories = (await memoryFetchWork).map((mem: any) => `[ID: ${mem.id}] ${mem.content} (${mem.context})` ).join('\n'); const summaryJson = await summarySystemWork; /* Formulate a response plan */ enqueueJson({ command: "update_status", status_type: "info", message: "Devising response plan...", }); const responsePlanSystemPrompt = prompt_xml.querySelector('response_planner')?.textContent .replaceAll("{{PERSONALITY_INJECTION}}", mori_personality); const responsePlanCompletion = await openai.chat.completions.create({ model: "gpt-4.1-mini", messages: [ { role: "system", content: responsePlanSystemPrompt // Static: gets cached }, ...requestBody.messages, // Recent conversation messages { role: "assistant", content: `CURRENT_TIME: ${new Date().toISOString()}\n\nCONVERSATION_GIST:\n${summaryJson.context}\n\nRELEVANT_MEMORIES:\n${formattedMemories}` } ] }); console.log("Response Plan:", responsePlanCompletion.choices[0]?.message?.content); let responsePlanJson = JSON.parse(responsePlanCompletion.choices[0]?.message?.content || "{}"); // enqueueJson({ // command: "append_response", // content: responsePlanCompletion.choices[0]?.message?.content || "", // }); /* Generate the final response */ enqueueJson({ command: "update_status", status_type: "info", message: "**Mori** is typing...", }); const chatSystemPrompt = prompt_xml.querySelector('chat_responder')?.textContent .replaceAll("{{PERSONALITY_INJECTION}}", mori_personality); const chatCompletion = await openai.chat.completions.create({ model: "gpt-4.1-mini", messages: [ { role: "system", content: chatSystemPrompt || "", // Static: gets cached }, ...requestBody.messages, { role: "assistant", content: `CURRENT_TIME: ${new Date().toISOString()}\n\nCONVERSATION_GIST:\n${summaryJson.context || "No existing context."}\n\nRELEVANT_MEMORIES:\n${formattedMemories || "No memories retrieved."}\n\nRESPONSE_PLAN:\n${responsePlanJson.plan || "No specific plan."}` } ], stream: true, }); for await (const chunk of chatCompletion) { if (chunk.choices[0]?.delta?.content) { enqueueJson({ command: "append_response", content: chunk.choices[0].delta.content, }); } } enqueueJson({ command: "update_status", status_type: "success", message: "", }); } serve(async (req) => { // Handle CORS preflight requests if (req.method === 'OPTIONS') { return new Response('ok', { headers: corsHeaders }); } // Get the token from the Authorization header const authHeader = req.headers.get('Authorization'); const token = authHeader?.replace('Bearer ', ''); // Initialize the Supabase client const supabaseClient = createClient( Deno.env.get('SUPABASE_URL') || '', Deno.env.get('SUPABASE_ANON_KEY') || '', { global: { headers: { Authorization: `Bearer ${token}` }, }, } ); // Authenticate the user const { data: { user }, error } = await supabaseClient.auth.getUser(token); if (error || !user) { return new Response( JSON.stringify({ error: 'Unauthorized' }), { status: 401, headers: { ...corsHeaders, 'Content-Type': 'application/json' } } ); } // User is authenticated, handle the request const readable = new ReadableStream({ async start(controller) { // Wrap controller to log on enqueue const originalEnqueue = controller.enqueue.bind(controller); controller.enqueue = (chunk) => { const decoded = new TextDecoder().decode(chunk); console.log('Stream output:', decoded); originalEnqueue(chunk); }; try { await handleRequest(req, controller, user, supabaseClient); controller.close(); } catch (error) { console.error("Error in stream:", error); controller.error(error); } } }); return new Response(readable, { headers: { ...corsHeaders, 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', 'Connection': 'keep-alive' } }); });