Mori/supabase/functions/llm-pipeline/index.ts

// @ts-ignore
import { serve } from "https://deno.land/std@0.168.0/http/server.ts";
// @ts-ignore
import { load } from "https://deno.land/std@0.224.0/dotenv/mod.ts";
// @ts-ignore
import { DOMParser } from "https://deno.land/x/deno_dom@v0.1.45/deno-dom-wasm.ts";
// Type-only import: erased at runtime, so importing from the .d.ts entry is safe.
import type { User } from "https://esm.sh/@supabase/auth-js@2.76.1/dist/module/lib/types.d.ts";
import { createClient } from "https://esm.sh/@supabase/supabase-js@2";
import OpenAI from "npm:openai@4";
// Load environment variables
await load({ export: true, envPath: ".env" });
// Module-level Supabase client, reassigned per request in the handler below so
// the caller's JWT is forwarded. Note: it is shared across concurrent
// invocations, so overlapping requests can race on it.
let supabaseClient: any = null;
// Load and parse prompts.xml. Passing the URL directly to readTextFile avoids
// .pathname portability issues; deno_dom parses in HTML mode, which works here
// because the prompt element names are lowercase.
const xmlContent = await Deno.readTextFile(new URL('./prompts.xml', import.meta.url));
const doc = new DOMParser().parseFromString(xmlContent, 'text/html');
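// For reference, the selectors used below ('memory_query', 'memory_extraction',
// 'system_response') imply a prompts.xml along these lines (a hypothetical
// minimal shape, not the actual file):
//   <prompts>
//     <memory_query>...</memory_query>
//     <memory_extraction>...</memory_extraction>
//     <system_response>...</system_response>
//   </prompts>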
const corsHeaders = {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'authorization, x-client-info, apikey, content-type'
};
/*
Stage 2: Process Input (Extract Memories)
*/
// Note: `controller` is unused in this stage; it is kept for a uniform stage
// signature (only Stage 3 writes to the stream).
async function extractMemories(controller, messages, doc, user: User, allTags, relevantMemories?) {
const startTime = Date.now();
let addedCount = 0;
let updatedCount = 0;
let deletedCount = 0;
const extractedMemories = [];
console.log("Using cached tags for user:", allTags?.length || 0);
// Create and call OpenAI to process the input messages
console.log("Creating OpenAI client for processing input");
const openai = new OpenAI({
apiKey: Deno.env.get('OPENAI_API_KEY')
});
const system_prompt = doc?.querySelector('memory_extraction')?.textContent?.trim() || '';
console.log("Calling OpenAI API for processing...");
const response = await openai.chat.completions.create({
model: 'gpt-4.1',
temperature: 0.1,
max_completion_tokens: 20000,
messages: [
{ role: 'system', content: system_prompt },
...messages,
{
role: "assistant",
content: `I have access to the following reference data:
Available tags: ${JSON.stringify(allTags?.map(t => t.name) || [])}
Existing memories: ${JSON.stringify(relevantMemories || [])}
Now I will analyze the conversation above and extract memories.`
}
]
});
const processedContent = response.choices[0]?.message?.content || '';
console.log("Processing complete, sending processed content to client");
// Decode the JSON content
let processedData: ExtractionPayload;
try {
processedData = JSON.parse(processedContent);
} catch (error) {
console.error("Error parsing processed content:", error);
throw new Error("Failed to parse processed content");
}
// Iterate over the changes and process them
for (const change of processedData.changes || []) {
if (change.action === "ADD") {
addedCount++;
extractedMemories.push({
action: 'ADD',
content: change.content,
context: change.context,
tags: change.tags
});
// Fetch all existing tags in a single query using .in()
const { data: existingTags } = await supabaseClient
.schema("mori")
.from("tags")
.select("*")
.in("name", change.tags)
.eq("user_id", user.id);
const tags = existingTags || [];
// Find tags that need to be created
const existingTagNames = new Set(tags.map(t => t.name));
const newTagNames = change.tags.filter(tagName => !existingTagNames.has(tagName));
// Batch insert all new tags in a single query
if (newTagNames.length > 0) {
const { data: insertedTags } = await supabaseClient
.schema("mori")
.from("tags")
.insert(newTagNames.map(name => ({
name: name,
user_id: user.id
})))
.select();
if (insertedTags) {
tags.push(...insertedTags);
}
}
// Now, insert the memory itself
const insertMemory = await supabaseClient
.schema("mori")
.from("memories")
.insert([{
content: change.content,
context: change.context,
user_id: user.id,
}])
.select()
.single();
// Batch insert all memory_tags links in a single query
if (tags.length > 0 && insertMemory.data) {
await supabaseClient
.schema("mori")
.from("memory_tags")
.insert(tags.map(tag => ({
memory_id: insertMemory.data.id,
tag_id: tag.id
})));
}
} else if (change.action === "UPDATE") {
updatedCount++;
extractedMemories.push({
action: 'UPDATE',
content: change.content,
context: change.context,
memory_id: change.memory_id
});
// Update existing memory
await supabaseClient
.schema("mori")
.from("memories")
.update({
content: change.content,
context: change.context,
updated_at: new Date().toISOString()
})
.eq("id", change.memory_id)
.eq("user_id", user.id);
// TODO: Handle tag updates if needed
// (delete old memory_tags links and create new ones)
} else if (change.action === "DELETE") {
deletedCount++;
extractedMemories.push({
action: 'DELETE',
memory_id: change.memory_id
});
// Delete memory (cascade should handle memory_tags)
await supabaseClient
.schema("mori")
.from("memories")
.delete()
.eq("id", change.memory_id)
.eq("user_id", user.id);
}
}
const processTime = Date.now() - startTime;
return {
extractedMemories,
addedCount,
updatedCount,
deletedCount,
processTime
};
}
/*
Stage 1: Fetch Relevant Memories and Tags.
*/
async function fetchRelevantMemories(controller, messages, doc, user: User) {
const startTime = Date.now();
const tags = await supabaseClient
.schema("mori")
.from("tags")
.select("*")
.eq("user_id", user.id);
console.log("Fetched existing tags for user:", tags.data?.length || 0);
// Create and call OpenAI to process the input messages
console.log("Creating OpenAI client for generating a response");
const openai = new OpenAI({
apiKey: Deno.env.get('OPENAI_API_KEY')
});
const system_prompt = doc?.querySelector('memory_query')?.textContent?.trim() || '';
console.log("Calling OpenAI API for fetching relevant memories...");
const response = await openai.chat.completions.create({
model: 'gpt-4.1',
messages: [
{ role: 'system', content: system_prompt },
...messages,
{
role: "user",
content: "Existing tags: " + JSON.stringify(tags.data || [])
}
],
});
const relevantMemoryTags = response.choices[0]?.message?.content || '';
let relevantMemoryTagsParsed;
try {
relevantMemoryTagsParsed = JSON.parse(relevantMemoryTags);
} catch (error) {
console.error("Error parsing relevant memories content:", error);
throw new Error("Failed to parse relevant memories content");
}
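// Expected shape, inferred from the usage below: { "selected_tags": string[] }
// (the contract is defined by the memory_query prompt in prompts.xml).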
const { data: relevantMemories } = await supabaseClient
.rpc("get_memories_by_tags", {
tag_names: relevantMemoryTagsParsed.selected_tags || [],
p_user_id: user.id
});
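// get_memories_by_tags is a Postgres function in the database. A plausible
// sketch of its body, assuming the mori.memories / memory_tags / tags schema
// used elsewhere in this file (the real definition lives in the migrations):
//   select distinct m.*
//   from mori.memories m
//   join mori.memory_tags mt on mt.memory_id = m.id
//   join mori.tags t on t.id = mt.tag_id
//   where t.name = any(tag_names) and m.user_id = p_user_id;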
const fetchTime = Date.now() - startTime;
return {
relevantMemories,
allTags: tags.data,
selectedTags: relevantMemoryTagsParsed.selected_tags || [],
fetchTime
};
}
/*
Stage 3: Generate Response
*/
async function generateResponse(controller, messages, doc, user: User, pipelineContext) {
console.log("Creating OpenAI client for generating a response");
const openai = new OpenAI({
apiKey: Deno.env.get('OPENAI_API_KEY')
});
let system_prompt = doc?.querySelector('system_response')?.textContent?.trim() || '';
system_prompt = system_prompt.replaceAll("{{username}}", user.user_metadata.username || 'User');
console.log("Calling OpenAI API for streaming response...");
const responseMessages = [
{ role: 'system', content: system_prompt },
];
// Build pipeline awareness context
const { relevantMemories, selectedTags, extractedMemories } = pipelineContext;
let pipelineAwareness = `[Internal System Awareness - Not Part of Conversation]\n\n`;
pipelineAwareness += `You are Mori, and you have a memory system that automatically remembers important information about ${user.user_metadata.username || 'the user'} across conversations.\n\n`;
// Info about retrieved memories
if (relevantMemories && relevantMemories.length > 0) {
pipelineAwareness += `RETRIEVED MEMORIES (what you already knew):\n`;
pipelineAwareness += `You searched through memories using topics: ${selectedTags.join(', ')}\n`;
pipelineAwareness += `Found ${relevantMemories.length} relevant memories:\n`;
relevantMemories.forEach(m => {
pipelineAwareness += `${m.content}\n`;
});
pipelineAwareness += `\n`;
} else {
pipelineAwareness += `No previous memories were retrieved for this conversation.\n\n`;
}
// Info about newly extracted memories
if (extractedMemories && extractedMemories.length > 0) {
pipelineAwareness += `NEW MEMORIES (what you just learned and saved):\n`;
extractedMemories.forEach(mem => {
if (mem.action === 'ADD') {
pipelineAwareness += `• Learned: ${mem.content}\n`;
} else if (mem.action === 'UPDATE') {
pipelineAwareness += `• Updated: ${mem.content}\n`;
}
});
pipelineAwareness += `\n`;
}
pipelineAwareness += `HOW TO USE THIS:\n`;
pipelineAwareness += `- This awareness is internal. Don't report it.\n`;
pipelineAwareness += `- Let it naturally inform your response\n`;
pipelineAwareness += `- If the user explicitly asks you to remember something, you can acknowledge it naturally (e.g., "got it" or "I'll remember that")\n`;
pipelineAwareness += `- Reference past memories naturally without saying "I retrieved" or "according to my memory"\n`;
pipelineAwareness += `- You're a companion who pays attention, not a system reporting operations\n`;
// Inject pipeline awareness as assistant message
responseMessages.push({
role: 'assistant',
content: pipelineAwareness
});
responseMessages.push(...messages);
const stream = await openai.chat.completions.create({
model: 'gpt-4.1-mini',
messages: responseMessages,
stream: true
});
console.log("Stream created, starting to read chunks...");
for await (const chunk of stream) {
const content = chunk.choices[0]?.delta?.content || '';
if (content) {
const data = `data: ${JSON.stringify({ type: 'content', content })}\n\n`;
controller.enqueue(new TextEncoder().encode(data));
}
}
}
serve(async (req) => {
/*
Handle CORS preflight requests
*/
if (req.method === 'OPTIONS') {
return new Response('ok', {
headers: corsHeaders
});
}
/*
Authenticate with supabase API key
*/
// Get the token from the Authorization header
const authHeader = req.headers.get('Authorization');
const token = authHeader?.replace('Bearer ', '');
// Initialise the Supabase client
supabaseClient = createClient(
Deno.env.get('SUPABASE_URL') || '',
Deno.env.get('SUPABASE_ANON_KEY') || '',
{
global: {
headers: { Authorization: `Bearer ${token}` },
},
}
);
const user = await supabaseClient.auth.getUser(token);
if (user.error) {
return new Response(JSON.stringify({
error: 'Unauthorized'
}), {
status: 401,
headers: {
...corsHeaders,
'Content-Type': 'application/json'
}
});
}
/*
Gearing up to process the request
*/
const body = await req.json();
const { messages } = body;
// Create the stream that will be used throughout the pipeline
const readable = new ReadableStream({
async start(controller) {
try {
/*
Stage 1: Fetch Relevant Memories
*/
const stageFetchingData = `data: ${JSON.stringify({ type: 'stage', stage: 'fetching' })}\n\n`;
controller.enqueue(new TextEncoder().encode(stageFetchingData));
const { relevantMemories, allTags, selectedTags, fetchTime } = await fetchRelevantMemories(controller, messages, doc, user.data.user);
/*
Stage 2: Extract Memories
*/
const stageProcessingData = `data: ${JSON.stringify({ type: 'stage', stage: 'processing' })}\n\n`;
controller.enqueue(new TextEncoder().encode(stageProcessingData));
const { extractedMemories, addedCount, updatedCount, deletedCount, processTime } = await extractMemories(controller, messages, doc, user.data.user, allTags, relevantMemories);
/*
Stage 3: Stream the response back to the client
*/
const stageRespondingData = `data: ${JSON.stringify({ type: 'stage', stage: 'responding' })}\n\n`;
controller.enqueue(new TextEncoder().encode(stageRespondingData));
// Build complete pipeline context for Mori's awareness
const pipelineContext = {
relevantMemories,
selectedTags,
fetchTime,
extractedMemories,
addedCount,
updatedCount,
deletedCount,
processTime
};
await generateResponse(controller, messages, doc, user.data.user, pipelineContext);
// Send stage update: complete
const completeData = `data: ${JSON.stringify({ type: 'stage', stage: 'complete' })}\n\n`;
controller.enqueue(new TextEncoder().encode(completeData));
console.log("Stream completed, closing controller");
controller.close();
} catch (error) {
console.error("Error in pipeline:", error);
controller.error(error);
}
}
});
return new Response(readable, {
headers: {
...corsHeaders,
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
}
});
});
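
/*
Client-side usage sketch (assumptions: the function is deployed at
.../functions/v1/llm-pipeline and called with a user JWT; SUPABASE_URL,
accessToken, and handleToken are illustrative names, not part of this file):

const res = await fetch(`${SUPABASE_URL}/functions/v1/llm-pipeline`, {
  method: 'POST',
  headers: {
    'Authorization': `Bearer ${accessToken}`,
    'Content-Type': 'application/json'
  },
  body: JSON.stringify({ messages: [{ role: 'user', content: 'Hello' }] })
});
const reader = res.body!.getReader();
const decoder = new TextDecoder();
while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  // Simplified: assumes each chunk contains whole SSE lines; a robust client
  // would buffer partial lines across reads.
  for (const line of decoder.decode(value, { stream: true }).split('\n')) {
    if (!line.startsWith('data: ')) continue;
    const evt = JSON.parse(line.slice(6));
    if (evt.type === 'stage') console.log('stage:', evt.stage); // fetching | processing | responding | complete
    if (evt.type === 'content') handleToken(evt.content);       // streamed response tokens
  }
}
*/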