// Supabase Edge Function (Deno / TypeScript): Mori LLM pipeline.
import { serve } from "https://deno.land/std@0.168.0/http/server.ts";
import { createClient } from "https://esm.sh/@supabase/supabase-js@2";
import { load } from "https://deno.land/std@0.224.0/dotenv/mod.ts";
import SupabaseClient from "https://esm.sh/@supabase/supabase-js@2.76.1/dist/module/SupabaseClient.d.ts";
import { User } from "https://esm.sh/@supabase/auth-js@2.76.1/dist/module/lib/types.d.ts";
import { DOMParser } from "https://deno.land/x/deno_dom@v0.1.45/deno-dom-wasm.ts";
import OpenAI from "npm:openai@4";
// Load environment variables from .env (keys already in the environment win).
await load({ export: true, envPath: ".env" });

// CORS headers attached to every response this function emits.
const corsHeaders = {
  'Access-Control-Allow-Origin': '*',
  'Access-Control-Allow-Headers': 'authorization, x-client-info, apikey, content-type'
};

// Initialize the OpenAI client; the key comes from the environment loaded above.
const openai = new OpenAI({
  apiKey: Deno.env.get("OPENAI_API_KEY") || "",
});

// Load prompts.xml: deployed functions have no local file next to them, so fetch
// it from the repo; in local dev read it straight from disk.
const isProduction = Deno.env.get('DENO_DEPLOYMENT_ID') !== undefined;
let xmlContent: string;
if (isProduction) {
  const response = await fetch('https://git.imbenji.dev/ImBenji/Mori/raw/branch/main/supabase/functions/llm-pipeline/prompts.xml');
  if (!response.ok) {
    // Fail fast: parsing an HTTP error page would silently yield empty prompts.
    throw new Error(`Failed to fetch prompts.xml: ${response.status} ${response.statusText}`);
  }
  xmlContent = await response.text();
} else {
  xmlContent = await Deno.readTextFile('./prompts.xml');
}
// deno_dom only ships an HTML parser; custom elements such as <mori_personality>
// are still reachable via querySelector on the resulting document.
const prompt_xml = new DOMParser().parseFromString(xmlContent, 'text/html');
const mori_personality = prompt_xml?.querySelector('mori_personality')?.textContent || "";
/**
 * Core pipeline for one chat request. Streams newline-delimited JSON events
 * (status updates, gist updates, response chunks) back to the client through
 * `controller` while it: summarises the conversation, retrieves/updates the
 * user's memories, plans a response, and finally streams the chat reply.
 *
 * @param req            Incoming request; body must contain `messages` (and optionally `gist`).
 * @param controller     Stream controller used to push events to the client.
 * @param user           Authenticated Supabase user.
 * @param supabaseClient Per-request Supabase client scoped to the user's JWT.
 */
async function handleRequest(
  req: Request,
  controller: ReadableStreamDefaultController,
  user: User,
  supabaseClient: SupabaseClient,
) {
  /** Serialise `data` as one newline-delimited JSON event and push it to the client. */
  function enqueueJson(data: unknown) {
    controller.enqueue(new TextEncoder().encode(`${JSON.stringify(data)}\n\n`));
  }

  /**
   * Fetch a prompt template from prompts.xml by element selector and inject the
   * shared personality text. Returns "" when the document or element is missing
   * (the original chained `.replaceAll` on a possibly-undefined textContent).
   */
  function getPrompt(selector: string): string {
    return (prompt_xml?.querySelector(selector)?.textContent || "")
      .replaceAll("{{PERSONALITY_INJECTION}}", mori_personality);
  }

  /** Parse model output as JSON, returning `fallback` instead of crashing the stream. */
  function parseJsonOr<T>(raw: string | null | undefined, fallback: T): T {
    try {
      return JSON.parse(raw || "") as T;
    } catch {
      return fallback;
    }
  }

  /**
   * Resolve tag names to tag ids for this user, creating any tags that do not
   * exist yet. Shared by the ADD and UPDATE memory-change paths (previously
   * duplicated inline in both).
   */
  async function ensureTagIds(tagNames: string[]): Promise<number[]> {
    const tagIds: number[] = [];
    for (const tagName of tagNames) {
      // Check if the tag already exists for this user.
      const { data: existingTag } = await supabaseClient
        .schema("mori")
        .from("tags")
        .select("id")
        .eq("name", tagName)
        .eq("user_id", user.id)
        .single();
      if (existingTag) {
        tagIds.push(existingTag.id);
        continue;
      }
      // Create the tag.
      const { data: newTag } = await supabaseClient
        .schema("mori")
        .from("tags")
        .insert({ name: tagName, user_id: user.id })
        .select("id")
        .single();
      if (newTag) {
        tagIds.push(newTag.id);
      }
    }
    return tagIds;
  }

  // Dev-only demo of the status event protocol; disabled in normal operation.
  const showExamples = false;
  if (showExamples) {
    // Example 1: Send current status to the client
    let status = {
      command: "update_status",
      status_type: "info",
      message: "Discombobulating the flux capacitor...",
    };
    enqueueJson(status);
    await new Promise((resolve) => setTimeout(resolve, 2000));
    // Example 2: Send progress update to the client
    status = {
      command: "update_status",
      status_type: "progress",
      message: "Halfway through the discombobulation!",
    };
    enqueueJson(status);
    await new Promise((resolve) => setTimeout(resolve, 2000));
    status = {
      command: "update_status",
      status_type: "progress",
      message: "Lost track of what we're doing...",
    };
    enqueueJson(status);
    await new Promise((resolve) => setTimeout(resolve, 2000));
  }

  const requestBody = await req.json();
  enqueueJson({
    command: "update_status",
    status_type: "info",
    message: "Contemplating conversation...",
  });

  /*
   * Summarise the conversation.
   * Async IIFE instead of `new Promise(async ...)`: a thrown error now rejects
   * the returned promise instead of being swallowed by the executor.
   */
  const summarySystemWork = (async (): Promise<any> => {
    const summaryCompletion = await openai.chat.completions.create({
      model: "gpt-4.1-mini",
      messages: [
        {
          role: "system",
          content: getPrompt('conversation_summariser'),
          // Static: gets cached
        },
        ...requestBody.messages,
        {
          role: "assistant",
          content: `CURRENT_TIME: ${new Date().toISOString()}\n\nCONVERSATION_GIST:\n${requestBody.gist || "No existing context."}`,
          // Dynamic context injection
        },
      ],
    });
    const summaryJson = parseJsonOr<any>(summaryCompletion.choices[0]?.message?.content, {});
    enqueueJson({
      command: "update_gist",
      content: summaryJson.context || "",
    });
    return summaryJson;
  })();

  /*
   * Fetch relevant memories from the DB, then apply any memory changes the
   * extractor model proposes (ADD / UPDATE / DELETE). Runs concurrently with
   * the summariser above.
   */
  const memoryFetchWork = (async (): Promise<any[]> => {
    // Fetch all the existing tags for the user.
    const { data: tagsData, error: tagsError } = await supabaseClient
      .schema("mori")
      .from('tags')
      .select('id, name')
      .eq('user_id', user.id);
    if (tagsError) {
      console.error("Error fetching tags:", tagsError);
      // Rejects the promise AND stops execution (the old reject() fell through).
      throw tagsError;
    }
    const userTags: string[] = (tagsData ?? []).map((tag: { name: string }) => tag.name);

    // Ask the model which tags are relevant to the current conversation.
    const memoryFetchCompletion = await openai.chat.completions.create({
      model: "gpt-4.1-mini",
      messages: [
        {
          role: "system",
          content: getPrompt('memory_retriever'),
        },
        ...requestBody.messages,
        {
          role: "assistant",
          content: `CURRENT_TIME: ${new Date().toISOString()}\n\nAVAILABLE_TAGS:\n${userTags.join(", ")}`
        }
      ],
    });
    const memoryFetchJson = parseJsonOr<any>(memoryFetchCompletion.choices[0]?.message?.content, {});

    // Fetch memories matching the suggested tags.
    let fetchedMemories: any[] = [];
    if (memoryFetchJson.tags && memoryFetchJson.tags.length > 0) {
      const { data: memoriesData, error: memoriesError } = await supabaseClient
        .rpc('get_memories_by_tags', {
          tag_names: memoryFetchJson.tags,
          p_user_id: user.id
        });
      if (memoriesError) {
        // Best effort: continue with no memories rather than aborting.
        console.error("Error fetching memories:", memoriesError);
      } else if (memoriesData) {
        fetchedMemories = memoriesData;
      }
    }

    // Format fetched memories for the extraction prompt.
    const formattedMemories = fetchedMemories.map(mem =>
      `[ID: ${mem.id}] ${mem.content} (${mem.context})`
    ).join('\n');

    // Ask the model which memory changes the latest user message implies.
    const memoryExtractionCompletion = await openai.chat.completions.create({
      model: "gpt-4.1",
      messages: [
        {
          role: "system",
          content: getPrompt('memory_extractor'),
          // Static: gets cached
        },
        {
          role: "user",
          content: requestBody.messages[requestBody.messages.length - 1].content
          // Only the most recent user message
        },
        {
          role: "assistant",
          content: `CURRENT_TIME: ${new Date().toISOString()}\n\nAVAILABLE_TAGS:\n${userTags.join(", ")}\n\nEXISTING_MEMORIES:\n${formattedMemories || "No existing memories."}`
          // Dynamic context injection
        }
      ],
    });
    const memoryExtractionJson = parseJsonOr<any>(
      memoryExtractionCompletion.choices[0]?.message?.content,
      { changes: [] },
    );

    // Apply the proposed changes concurrently.
    const memoryPromises: Promise<any>[] = [];
    for (const change of memoryExtractionJson.changes) {
      if (change.action === "ADD") {
        memoryPromises.push((async () => {
          // Ensure all tags exist first, then insert the memory.
          const tagIds = await ensureTagIds(change.tags);
          const { data: newMemory } = await supabaseClient
            .schema("mori")
            .from("memories")
            .insert({
              content: change.content,
              context: change.context,
              user_id: user.id
            })
            .select("id")
            .single();
          // Link tags to the memory in the junction table.
          if (newMemory) {
            const junctionInserts = tagIds.map(tagId => ({
              memory_id: newMemory.id,
              tag_id: tagId
            }));
            await supabaseClient
              .schema("mori")
              .from("memory_tags")
              .insert(junctionInserts);
          }
        })());
      } else if (change.action === "UPDATE") {
        memoryPromises.push((async () => {
          // Update the memory content.
          await supabaseClient
            .schema("mori")
            .from("memories")
            .update({
              content: change.content,
              context: change.context
            })
            .eq("id", change.memory_id)
            .eq("user_id", user.id);
          // Replace the tag associations wholesale.
          await supabaseClient
            .schema("mori")
            .from("memory_tags")
            .delete()
            .eq("memory_id", change.memory_id);
          const tagIds = await ensureTagIds(change.tags);
          const junctionInserts = tagIds.map(tagId => ({
            memory_id: change.memory_id,
            tag_id: tagId
          }));
          await supabaseClient
            .schema("mori")
            .from("memory_tags")
            .insert(junctionInserts);
        })());
      } else if (change.action === "DELETE") {
        memoryPromises.push(
          supabaseClient
            .schema("mori")
            .from("memories")
            .delete()
            .eq("id", change.memory_id)
            .eq("user_id", user.id)
        );
      }
    }
    await Promise.all(memoryPromises);
    // Return the fetched memories for the planner/responder stages.
    return fetchedMemories;
  })();

  // Wait for both concurrent stages; await once instead of re-awaiting later.
  const [summaryJson, relevantMemories] = await Promise.all([summarySystemWork, memoryFetchWork]);
  const formattedMemories = relevantMemories.map((mem: any) =>
    `[ID: ${mem.id}] ${mem.content} (${mem.context})`
  ).join('\n');

  /*
   * Formulate a response plan.
   */
  enqueueJson({
    command: "update_status",
    status_type: "info",
    message: "Devising response plan...",
  });
  const responsePlanCompletion = await openai.chat.completions.create({
    model: "gpt-4.1-mini",
    messages: [
      {
        role: "system",
        content: getPrompt('response_planner')
        // Static: gets cached
      },
      ...requestBody.messages,
      // Recent conversation messages
      {
        role: "assistant",
        content: `CURRENT_TIME: ${new Date().toISOString()}\n\nCONVERSATION_GIST:\n${summaryJson.context}\n\nRELEVANT_MEMORIES:\n${formattedMemories}`
      }
    ]
  });
  console.log("Response Plan:", responsePlanCompletion.choices[0]?.message?.content);
  const responsePlanJson = parseJsonOr<any>(responsePlanCompletion.choices[0]?.message?.content, {});

  /*
   * Generate the final response and stream it chunk-by-chunk to the client.
   */
  enqueueJson({
    command: "update_status",
    status_type: "info",
    message: "**Mori** is typing...",
  });
  const chatCompletion = await openai.chat.completions.create({
    model: "gpt-4.1-mini",
    messages: [
      {
        role: "system",
        content: getPrompt('chat_responder'),
        // Static: gets cached
      },
      ...requestBody.messages,
      {
        role: "assistant",
        content: `CURRENT_TIME: ${new Date().toISOString()}\n\nCONVERSATION_GIST:\n${summaryJson.context || "No existing context."}\n\nRELEVANT_MEMORIES:\n${formattedMemories || "No memories retrieved."}\n\nRESPONSE_PLAN:\n${responsePlanJson.plan || "No specific plan."}`
      }
    ],
    stream: true,
  });
  for await (const chunk of chatCompletion) {
    if (chunk.choices[0]?.delta?.content) {
      enqueueJson({
        command: "append_response",
        content: chunk.choices[0].delta.content,
      });
    }
  }
  enqueueJson({
    command: "update_status",
    status_type: "success",
    message: "",
  });
}
// HTTP entry point: authenticates the caller, then streams pipeline events back
// as a server-sent-event-style response.
serve(async (req) => {
  // Handle CORS preflight requests.
  if (req.method === 'OPTIONS') {
    return new Response('ok', { headers: corsHeaders });
  }
  // Extract the caller's JWT from the Authorization header.
  const authHeader = req.headers.get('Authorization');
  const token = authHeader?.replace('Bearer ', '');
  if (!token) {
    // No credentials supplied: reject before creating a client or touching auth.
    return new Response(
      JSON.stringify({ error: 'Unauthorized' }),
      {
        status: 401,
        headers: {
          ...corsHeaders,
          'Content-Type': 'application/json'
        }
      }
    );
  }
  // Per-request Supabase client that forwards the user's JWT so row-level
  // security applies to every query made on their behalf.
  const supabaseClient = createClient(
    Deno.env.get('SUPABASE_URL') || '',
    Deno.env.get('SUPABASE_ANON_KEY') || '',
    {
      global: {
        headers: { Authorization: `Bearer ${token}` },
      },
    }
  );
  // Authenticate the user.
  const { data: { user }, error } = await supabaseClient.auth.getUser(token);
  if (error || !user) {
    return new Response(
      JSON.stringify({ error: 'Unauthorized' }),
      {
        status: 401,
        headers: {
          ...corsHeaders,
          'Content-Type': 'application/json'
        }
      }
    );
  }
  // User is authenticated: run the pipeline and stream its output.
  const readable = new ReadableStream({
    async start(controller) {
      // Wrap enqueue so every event pushed to the client is also logged.
      const originalEnqueue = controller.enqueue.bind(controller);
      controller.enqueue = (chunk: Uint8Array) => {
        const decoded = new TextDecoder().decode(chunk);
        console.log('Stream output:', decoded);
        originalEnqueue(chunk);
      };
      try {
        await handleRequest(req, controller, user, supabaseClient);
        controller.close();
      } catch (error) {
        // Surface the failure to the client by erroring the stream.
        console.error("Error in stream:", error);
        controller.error(error);
      }
    }
  });
  return new Response(readable, {
    headers: {
      ...corsHeaders,
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive'
    }
  });
});