// @ts-ignore
import { serve } from "https://deno.land/std@0.168.0/http/server.ts";
// @ts-ignore
import { load } from "https://deno.land/std@0.224.0/dotenv/mod.ts";
// @ts-ignore
import { DOMParser } from "https://deno.land/x/deno_dom@v0.1.45/deno-dom-wasm.ts";
import type { User } from "https://esm.sh/@supabase/auth-js@2.76.1/dist/module/lib/types.d.ts";
import { createClient } from "https://esm.sh/@supabase/supabase-js@2";
// OpenAI client is used by all three pipeline stages below
import OpenAI from "npm:openai@4";

// Load environment variables
await load({ export: true, envPath: ".env" });

// Supabase client (created per-request in the handler below)
let supabaseClient = null;

// Load and parse prompts.xml
// In production (Supabase Edge), load from Git; locally, load from the file system
const isProduction = Deno.env.get('DENO_DEPLOYMENT_ID') !== undefined;

let xmlContent: string;
if (isProduction) {
  const response = await fetch('https://git.imbenji.dev/ImBenji/Mori/raw/branch/main/supabase/functions/llm-pipeline/prompts.xml');
  xmlContent = await response.text();
} else {
  xmlContent = await Deno.readTextFile('./prompts.xml');
}

const doc = new DOMParser().parseFromString(xmlContent, 'text/html');

const corsHeaders = {
  'Access-Control-Allow-Origin': '*',
  'Access-Control-Allow-Headers': 'authorization, x-client-info, apikey, content-type'
};

/* Stage 2: Process Input (Extract Memories) */
async function extractMemories(controller, messages, doc, user: User, allTags, relevantMemories?) {
  const startTime = Date.now();
  let addedCount = 0;
  let updatedCount = 0;
  let deletedCount = 0;
  const extractedMemories = [];

  console.log("Using cached tags for user:", allTags?.length || 0);

  // Create and call OpenAI to process the input messages
  console.log("Creating OpenAI client for processing input");
  const openai = new OpenAI({ apiKey: Deno.env.get('OPENAI_API_KEY') });

  const system_prompt = doc?.querySelector('memory_extraction')?.textContent?.trim() || '';

  console.log("Calling OpenAI API for processing...");
  const response = await openai.chat.completions.create({
    model: 'gpt-4.1',
    temperature: 0.1,
    max_completion_tokens: 50000,
    messages: [
      { role: 'system', content: system_prompt },
      ...messages,
      {
        role: "assistant",
        content: `I have access to the following reference data:
Available tags: ${JSON.stringify(allTags?.map(t => t.name) || [])}
Existing memories: ${JSON.stringify(relevantMemories || [])}
Now I will analyze the conversation above and extract memories. I will extract EVERY SINGLE atomic fact from the user's messages. For detailed reports, I expect to extract 100-200+ separate memories. I will NOT summarize or limit myself.
I will break down every detail into individual atomic facts.`
      }
    ]
  });

  const processedContent = response.choices[0]?.message?.content || '';
  console.log("Processing complete, sending processed content to client");

  // Decode the JSON content
  let processedData;
  try {
    processedData = JSON.parse(processedContent);
  } catch (error) {
    console.error("Error parsing processed content:", error);
    throw new Error("Failed to parse processed content");
  }

  // Iterate over the changes and process them
  for (const change of processedData.changes || []) {
    if (change.action === "ADD") {
      addedCount++;
      extractedMemories.push({
        action: 'ADD',
        content: change.content,
        context: change.context,
        tags: change.tags
      });

      // Fetch all existing tags in a single query using .in()
      const { data: existingTags } = await supabaseClient
        .schema("mori")
        .from("tags")
        .select("*")
        .in("name", change.tags)
        .eq("user_id", user.id);

      let tags = existingTags || [];

      // Find tags that need to be created
      const existingTagNames = new Set(tags.map(t => t.name));
      const newTagNames = change.tags.filter(tagName => !existingTagNames.has(tagName));

      // Batch insert all new tags in a single query
      if (newTagNames.length > 0) {
        const { data: insertedTags } = await supabaseClient
          .schema("mori")
          .from("tags")
          .insert(newTagNames.map(name => ({ name: name, user_id: user.id })))
          .select();

        if (insertedTags) {
          tags.push(...insertedTags);
        }
      }

      // Now, insert the memory itself
      const insertMemory = await supabaseClient
        .schema("mori")
        .from("memories")
        .insert([{
          content: change.content,
          context: change.context,
          user_id: user.id,
        }])
        .select()
        .single();

      // Batch insert all memory_tags links in a single query
      if (tags.length > 0 && insertMemory.data) {
        await supabaseClient
          .schema("mori")
          .from("memory_tags")
          .insert(tags.map(tag => ({
            memory_id: insertMemory.data.id,
            tag_id: tag.id
          })));
      }
    } else if (change.action === "UPDATE") {
      updatedCount++;
      extractedMemories.push({
        action: 'UPDATE',
        content: change.content,
        context: change.context,
        memory_id: change.memory_id
      });

      // Update existing memory
      await supabaseClient
        .schema("mori")
        .from("memories")
        .update({
          content: change.content,
          context: change.context,
          updated_at: new Date().toISOString()
        })
        .eq("id", change.memory_id)
        .eq("user_id", user.id);

      // TODO: Handle tag updates if needed
      // (delete old memory_tags links and create new ones)
    } else if (change.action === "DELETE") {
      deletedCount++;

      // First, fetch the memory content before deleting
      const memoryToDelete = relevantMemories?.find(m => m.id === change.memory_id);
      extractedMemories.push({
        action: 'DELETE',
        memory_id: change.memory_id,
        content: memoryToDelete?.content || change.content,
        reason: change.reason
      });

      // Delete memory (cascade should handle memory_tags)
      await supabaseClient
        .schema("mori")
        .from("memories")
        .delete()
        .eq("id", change.memory_id)
        .eq("user_id", user.id);
    }
  }

  const processTime = Date.now() - startTime;
  return { extractedMemories, addedCount, updatedCount, deletedCount, processTime };
}

/* Stage 1: Fetch Relevant Memories and Tags. */
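// Asks the model (memory_query prompt) to select which of the user's existing
// tags are relevant to the conversation, then loads the matching memories via
// the get_memories_by_tags RPC. The result feeds both the extraction stage and
// the final response stage.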
async function fetchRelevantMemories(controller, messages, doc, user: User) {
  const startTime = Date.now();

  const tags = await supabaseClient
    .schema("mori")
    .from("tags")
    .select("*")
    .eq("user_id", user.id);

  console.log("Fetched existing tags for user:", tags.data?.length || 0);

  // Create and call OpenAI to process the input messages
  console.log("Creating OpenAI client for generating a response");
  const openai = new OpenAI({ apiKey: Deno.env.get('OPENAI_API_KEY') });

  let system_prompt = doc?.querySelector('memory_query')?.textContent?.trim() || '';

  console.log("Calling OpenAI API for fetching relevant memories...");
  const response = await openai.chat.completions.create({
    model: 'gpt-4.1',
    messages: [
      { role: 'system', content: system_prompt },
      ...messages,
      { role: "user", content: "Existing tags: " + JSON.stringify(tags.data || []) }
    ],
  });

  const relevantMemoryTags = response.choices[0]?.message?.content || '';

  let relevantMemoryTagsParsed;
  try {
    relevantMemoryTagsParsed = JSON.parse(relevantMemoryTags);
  } catch (error) {
    console.error("Error parsing relevant memories content:", error);
    throw new Error("Failed to parse relevant memories content");
  }

  const { data: relevantMemories } = await supabaseClient
    .rpc("get_memories_by_tags", {
      tag_names: relevantMemoryTagsParsed.selected_tags,
      p_user_id: user.id
    });

  const fetchTime = Date.now() - startTime;
  return {
    relevantMemories,
    allTags: tags.data,
    selectedTags: relevantMemoryTagsParsed.selected_tags || [],
    fetchTime
  };
}

/* Stage 3: Generate Response */
async function generateResponse(controller, messages, doc, user: User, pipelineContext) {
  console.log("Creating OpenAI client for generating a response");
  const openai = new OpenAI({ apiKey: Deno.env.get('OPENAI_API_KEY') });

  let system_prompt = doc?.querySelector('system_response')?.textContent?.trim() || '';
  system_prompt = system_prompt.replaceAll("{{username}}", user.user_metadata.username || 'User');

  console.log("Calling OpenAI API for streaming response...");
  const responseMessages = [
    { role: 'system', content: system_prompt },
  ];

  // Build pipeline awareness context
  const { relevantMemories, selectedTags, extractedMemories, addedCount, updatedCount, deletedCount } = pipelineContext;

  let pipelineAwareness = `[Internal System Awareness - Not Part of Conversation]\n\n`;
  pipelineAwareness += `You are Mori, and you have a memory system that automatically remembers important information about ${user.user_metadata.username || 'the user'} across conversations.\n\n`;

  // Info about retrieved memories
  if (relevantMemories && relevantMemories.length > 0) {
    pipelineAwareness += `RETRIEVED MEMORIES (what you already knew):\n`;
    pipelineAwareness += `You searched through memories using topics: ${selectedTags.join(', ')}\n`;
    pipelineAwareness += `Found ${relevantMemories.length} relevant memories:\n`;
    relevantMemories.forEach(m => {
      pipelineAwareness += `• ${m.content}\n`;
    });
    pipelineAwareness += `\n`;
  } else {
    pipelineAwareness += `No previous memories were retrieved for this conversation.\n\n`;
  }

  // Info about newly extracted memories
  if (extractedMemories && extractedMemories.length > 0) {
    pipelineAwareness += `NEW MEMORIES (what you just learned and saved):\n`;
    extractedMemories.forEach(mem => {
      if (mem.action === 'ADD') {
        pipelineAwareness += `• Learned: ${mem.content}\n`;
      } else if (mem.action === 'UPDATE') {
        pipelineAwareness += `• Updated: ${mem.content}\n`;
      } else if (mem.action === 'DELETE') {
        pipelineAwareness += `• Forgot: ${mem.content || 'a previous memory'}\n`;
      }
    });
    pipelineAwareness += `\n`;
  }

  pipelineAwareness += `HOW TO USE THIS:\n`;
  pipelineAwareness += `- This awareness is internal. Don't report it.\n`;
  pipelineAwareness += `- Let it naturally inform your response\n`;
  pipelineAwareness += `- If the user explicitly asks you to remember something, you can acknowledge it naturally (e.g., "got it" or "I'll remember that")\n`;
  pipelineAwareness += `- If the user asks you to forget something and memories were deleted, acknowledge it naturally (e.g., "forgot it" or "done")\n`;
  pipelineAwareness += `- Reference past memories naturally without saying "I retrieved" or "according to my memory"\n`;
  pipelineAwareness += `- You're a companion who pays attention, not a system reporting operations\n`;

  // Inject pipeline awareness as assistant message
  responseMessages.push({ role: 'assistant', content: pipelineAwareness });
  responseMessages.push(...messages);

  const stream = await openai.chat.completions.create({
    model: 'gpt-4.1',
    messages: responseMessages,
    stream: true
  });

  console.log("Stream created, starting to read chunks...");
  for await (const chunk of stream) {
    const content = chunk.choices[0]?.delta?.content || '';
    if (content) {
      const data = `data: ${JSON.stringify({ type: 'content', content })}\n\n`;
      controller.enqueue(new TextEncoder().encode(data));
    }
  }
}

serve(async (req) => {
  /* Handle CORS preflight requests */
  if (req.method === 'OPTIONS') {
    return new Response('ok', { headers: corsHeaders });
  }

  /* Authenticate with Supabase API key */
  // Get the token from the Authorization header
  const authHeader = req.headers.get('Authorization');
  const token = authHeader?.replace('Bearer ', '');

  // Initialise the Supabase client
  supabaseClient = createClient(
    Deno.env.get('SUPABASE_URL') || '',
    Deno.env.get('SUPABASE_ANON_KEY') || '',
    {
      global: {
        headers: { Authorization: `Bearer ${token}` },
      },
    }
  );

  const user = await supabaseClient.auth.getUser(token);
  if (user.error) {
    return new Response(JSON.stringify({ error: 'Unauthorized' }), {
      status: 401,
      headers: { ...corsHeaders, 'Content-Type': 'application/json' }
    });
  }

  const username = user.data.user?.user_metadata.username || 'User';

  /* Gearing up to process the request */
  const body = await req.json();
  const { messages } = body;

  // Create the stream that will be used throughout the pipeline
  const readable = new ReadableStream({
    async start(controller) {
      try {
        /* Stage 1: Fetch Relevant Memories */
        const stageFetchingData = `data: ${JSON.stringify({ type: 'stage', stage: 'fetching' })}\n\n`;
        controller.enqueue(new TextEncoder().encode(stageFetchingData));

        const { relevantMemories, allTags, selectedTags, fetchTime } = await fetchRelevantMemories(controller, messages, doc, user.data.user);

        /* Stage 2: Extract Relevant Memories */
        const stageProcessingData = `data: ${JSON.stringify({ type: 'stage', stage: 'processing' })}\n\n`;
        controller.enqueue(new TextEncoder().encode(stageProcessingData));

        const { extractedMemories, addedCount, updatedCount, deletedCount, processTime } = await extractMemories(controller, messages, doc, user.data.user, allTags, relevantMemories);

        /* Stage 3: Stream the response back to the client */
        const stageRespondingData = `data: ${JSON.stringify({ type: 'stage', stage: 'responding' })}\n\n`;
        controller.enqueue(new TextEncoder().encode(stageRespondingData));

        // Build complete pipeline context for Mori's awareness
        const pipelineContext = {
          relevantMemories,
          selectedTags,
          fetchTime,
          extractedMemories,
          addedCount,
          updatedCount,
          deletedCount,
          processTime
        };

        await generateResponse(controller, messages, doc, user.data.user, pipelineContext);
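        // generateResponse streams the model output itself: each delta chunk is
        // forwarded to the client as a `data: { type: 'content', ... }` SSE event.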
        // Send stage update: complete
        const completeData = `data: ${JSON.stringify({ type: 'stage', stage: 'complete' })}\n\n`;
        controller.enqueue(new TextEncoder().encode(completeData));

        console.log("Stream completed, closing controller");
        controller.close();
      } catch (error) {
        console.error("Error in pipeline:", error);
        controller.error(error);
      }
    }
  });

  return new Response(readable, {
    headers: {
      ...corsHeaders,
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive'
    }
  });
});
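/*
  Example client usage (a sketch, not part of the deployed function): the handler
  above expects a POST with an Authorization bearer token and a JSON body that
  contains `messages`, and it replies with an SSE stream of `stage` and `content`
  events. The URL assumes the function is deployed under the slug "llm-pipeline";
  `<project-ref>` is a placeholder for the Supabase project, and `accessToken` is
  assumed to hold the user's Supabase session JWT.

  const res = await fetch('https://<project-ref>.supabase.co/functions/v1/llm-pipeline', {
    method: 'POST',
    headers: {
      'Authorization': `Bearer ${accessToken}`,
      'Content-Type': 'application/json'
    },
    body: JSON.stringify({ messages: [{ role: 'user', content: 'Hi Mori!' }] })
  });

  // Naive SSE reader: assumes each chunk contains whole `data: ...` lines.
  const reader = res.body!.pipeThrough(new TextDecoderStream()).getReader();
  while (true) {
    const { value, done } = await reader.read();
    if (done) break;
    for (const line of value.split('\n')) {
      if (!line.startsWith('data: ')) continue;
      const event = JSON.parse(line.slice(6));
      if (event.type === 'stage') console.log('stage:', event.stage);   // fetching → processing → responding → complete
      if (event.type === 'content') console.log(event.content);         // streamed reply text
    }
  }
*/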