// @ts-ignore import { serve } from "https://deno.land/std@0.168.0/http/server.ts"; // @ts-ignore import { load } from "https://deno.land/std@0.224.0/dotenv/mod.ts"; // @ts-ignore import { DOMParser } from "https://deno.land/x/deno_dom@v0.1.45/deno-dom-wasm.ts"; import { User } from "https://esm.sh/@supabase/auth-js@2.76.1/dist/module/lib/types.d.ts"; import { createClient } from "https://esm.sh/@supabase/supabase-js@2"; // import OpenAI from "npm:openai@4"; // Load environment variables await load({ export: true, envPath: ".env" }); // Initialize Supabase client let supabaseClient = null; // Load and parse prompts.xml const xmlContent = await Deno.readTextFile(new URL('./prompts.xml', import.meta.url).pathname); const doc = new DOMParser().parseFromString(xmlContent, 'text/html'); const corsHeaders = { 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Headers': 'authorization, x-client-info, apikey, content-type' }; /* Stage 2: Process Input (Extract Memories) */ async function extractMemories(controller, messages, doc, relevantMemories?) { // Fetch existing memory tags from the database, that belong to the user const user : User = (await supabaseClient.auth.getUser()).data.user; const tags = await supabaseClient .schema("mori") .from("tags") .select("*") .eq("user_id", user.id); console.log("Fetched existing tags for user:", tags.data?.length || 0); // Create and call OpenAI to process the input messages console.log("Creating OpenAI client for processing input"); const openai = new OpenAI({ apiKey: Deno.env.get('OPENAI_API_KEY') }); const system_prompt = doc?.querySelector('memory_extraction')?.textContent?.trim() || ''; console.log("Calling OpenAI API for processing..."); const response = await openai.chat.completions.create({ model: 'gpt-4.1-mini', temperature: 0.1, max_completion_tokens: 20000, messages: [ { role: 'system', content: system_prompt }, ...messages, { role: "assistant", content: `I have access to the following reference data: Available tags: ${JSON.stringify(tags.data?.map(t => t.name) || [])} Existing memories: ${JSON.stringify(relevantMemories || [])} Now I will analyze the conversation above and extract memories.` } ] }); const processedContent = response.choices[0]?.message?.content || ''; console.log("Processing complete, sending processed content to client"); // Decode the json content let processedData; try { processedData = JSON.parse(processedContent); } catch (error) { console.error("Error parsing processed content:", error); throw new Error("Failed to parse processed content"); } // Iterate over the changes and process them for (const change of processedData.changes || []) { if (change.action === "ADD") { // First, fetch the tag rows that already exist let tags = []; for (const tagName of change.tags) { const tagRow = await supabaseClient .schema("mori") .from("tags") .select("*") .eq("name", tagName) .single(); if (tagRow.data) { tags.push(tagRow.data); } } // Insert any tags that do not already exist into the database for (const tagName of change.tags) { // Ensure we don't duplicate tags let tagExists = false; for (const tag of tags) { if (tag.name === tagName) { tagExists = true; break; } } if (tagExists) { continue; } const insertTag = await supabaseClient .schema("mori") .from("tags") .insert([{ name: tagName, user_id: user.id }]) .select() .single(); if (insertTag.data) { tags.push(insertTag.data); } } // Now, insert the memory itself const insertMemory = await supabaseClient .schema("mori") .from("memories") .insert([{ content: change.content, context: change.context, user_id: user.id, }]) .select() .single(); // Now, link the tags to the memory in the memory_tags table for (const tag of tags) { await supabaseClient .schema("mori") .from("memory_tags") .insert([{ memory_id: insertMemory.data.id, tag_id: tag.id }]); } } else if (change.action === "UPDATE") { // Update existing memory await supabaseClient .schema("mori") .from("memories") .update({ content: change.content, context: change.context, updated_at: new Date().toISOString() }) .eq("id", change.memory_id) .eq("user_id", user.id); // TODO: Handle tag updates if needed // (delete old memory_tags links and create new ones) } else if (change.action === "DELETE") { // Delete memory (cascade should handle memory_tags) await supabaseClient .schema("mori") .from("memories") .delete() .eq("id", change.memory_id) .eq("user_id", user.id); } } } /* Stage 1: Fetch Relevant Memories. */ async function fetchRelevantMemories(controller, messages, doc) { // Fetch existing memory tags from the database, that belong to the user const user : User = (await supabaseClient.auth.getUser()).data.user; const tags = await supabaseClient .schema("mori") .from("tags") .select("*") .eq("user_id", user.id); console.log("Fetched existing tags for user:", tags.data?.length || 0); // Create and call OpenAI to process the input messages console.log("Creating OpenAI client for generating a response"); const openai = new OpenAI({ apiKey: Deno.env.get('OPENAI_API_KEY') }); let system_prompt = doc?.querySelector('memory_query')?.textContent?.trim() || ''; console.log("Calling OpenAI API for fetching relevant memories..."); const response = await openai.chat.completions.create({ model: 'gpt-4.1-mini', messages: [ { role: 'system', content: system_prompt }, ...messages, { role: "user", content: "Existing tags: " + JSON.stringify(tags.data || []) } ], }); const relevantMemoryTags = response.choices[0]?.message?.content || ''; let relevantMemoryTagsParsed; try { relevantMemoryTagsParsed = JSON.parse(relevantMemoryTags); } catch (error) { console.error("Error parsing relevant memories content:", error); throw new Error("Failed to parse relevant memories content"); } const { data: relevantMemories } = await supabaseClient .rpc("get_memories_by_tags", { tag_names: relevantMemoryTagsParsed.selected_tags, p_user_id: user.id }); return relevantMemories; } /* Stage 3: Generate Response */ async function generateResponse(controller, messages, doc, relevantMemories) { // Fetch existing memory tags from the database, that belong to the user const user : User = (await supabaseClient.auth.getUser()).data.user; console.log("Creating OpenAI client for generating a response"); const openai = new OpenAI({ apiKey: Deno.env.get('OPENAI_API_KEY') }); let system_prompt = doc?.querySelector('system_response')?.textContent?.trim() || ''; system_prompt = system_prompt.replaceAll("{{username}}", user.user_metadata.username || 'User'); console.log("Calling OpenAI API for streaming response..."); const responseMessages = [ { role: 'system', content: system_prompt }, ]; // Add relevant memories as context if available if (relevantMemories && relevantMemories.length > 0) { responseMessages.push({ role: 'assistant', content: `Context from previous conversations:\n${relevantMemories.map(m => `- ${m.content}`).join('\n')}\n\nI'll use this context naturally in our conversation.` }); } responseMessages.push(...messages); const stream = await openai.chat.completions.create({ model: 'gpt-4.1-mini', messages: responseMessages, stream: true }); console.log("Stream created, starting to read chunks..."); for await (const chunk of stream) { const content = chunk.choices[0]?.delta?.content || ''; if (content) { const data = `data: ${JSON.stringify({ type: 'content', content })}\n\n`; controller.enqueue(new TextEncoder().encode(data)); } } } serve(async (req)=>{ /* Handle CORS preflight requests */ if (req.method === 'OPTIONS') { return new Response('ok', { headers: corsHeaders }); } /* Authenticate with supabase API key */ // Get the token from the Authorization header const authHeader = req.headers.get('Authorization') const token = authHeader?.replace('Bearer ', '') // Initialise the Supabase client supabaseClient = createClient( Deno.env.get('SUPABASE_URL') || '', Deno.env.get('SUPABASE_ANON_KEY') || '', { global: { headers: { Authorization: `Bearer ${token}` }, }, } ); const user = await supabaseClient.auth.getUser(token); if (user.error) { return new Response(JSON.stringify({ error: 'Unauthorized' }), { status: 401, headers: { ...corsHeaders, 'Content-Type': 'application/json' } }); } const username = user.data.user?.user_metadata.username || 'User'; /* Gearing up to process the request */ const body = await req.json(); const { messages } = body; // Create the stream that will be used throughout the pipeline const readable = new ReadableStream({ async start(controller) { try { /* Stage 1: Fetch Relevant Memories */ const stageFetchingData = `data: ${JSON.stringify({ type: 'stage', stage: 'fetching' })}\n\n`; controller.enqueue(new TextEncoder().encode(stageFetchingData)); const relevantMemories = await fetchRelevantMemories(controller, messages, doc); /* Stage 2: Extract Relevant Memories */ const stageProcessingData = `data: ${JSON.stringify({ type: 'stage', stage: 'processing' })}\n\n`; controller.enqueue(new TextEncoder().encode(stageProcessingData)); await extractMemories(controller, messages, doc, relevantMemories); /* Stage 3: Stream the response back to the client */ const stageRespondingData = `data: ${JSON.stringify({ type: 'stage', stage: 'responding' })}\n\n`; controller.enqueue(new TextEncoder().encode(stageRespondingData)); await generateResponse(controller, messages, doc, relevantMemories); // Send stage update: complete const completeData = `data: ${JSON.stringify({ type: 'stage', stage: 'complete' })}\n\n`; controller.enqueue(new TextEncoder().encode(completeData)); console.log("Stream completed, closing controller"); controller.close(); } catch (error) { console.error("Error in pipeline:", error); controller.error(error); } } }); return new Response(readable, { headers: { ...corsHeaders, 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', 'Connection': 'keep-alive' } }); });