// Supabase Edge Function — Mori LLM pipeline (Deno / TypeScript).
import { serve } from "https://deno.land/std@0.168.0/http/server.ts";
|
|
import { createClient } from "https://esm.sh/@supabase/supabase-js@2";
|
|
import { load } from "https://deno.land/std@0.224.0/dotenv/mod.ts";
|
|
import SupabaseClient from "https://esm.sh/@supabase/supabase-js@2.76.1/dist/module/SupabaseClient.d.ts";
|
|
import { User } from "https://esm.sh/@supabase/auth-js@2.76.1/dist/module/lib/types.d.ts";
|
|
import { DOMParser } from "https://deno.land/x/deno_dom@v0.1.45/deno-dom-wasm.ts";
|
|
import OpenAI from "npm:openai@4";
|
|
|
|
// Load environment variables
|
|
await load({ export: true, envPath: ".env" });
|
|
|
|
const corsHeaders = {
|
|
'Access-Control-Allow-Origin': '*',
|
|
'Access-Control-Allow-Headers': 'authorization, x-client-info, apikey, content-type'
|
|
};
|
|
|
|
// Initialize OpenAI client
|
|
const openai = new OpenAI({
|
|
apiKey: Deno.env.get("OPENAI_API_KEY") || "",
|
|
});
|
|
|
|
// Load the xml doc - prompts.xml
|
|
const isProduction = Deno.env.get('DENO_DEPLOYMENT_ID') !== undefined;
|
|
let xmlContent: string;
|
|
if (isProduction) {
|
|
const response = await fetch('https://git.imbenji.dev/ImBenji/Mori/raw/branch/main/supabase/functions/llm-pipeline/prompts.xml');
|
|
xmlContent = await response.text();
|
|
} else {
|
|
xmlContent = await Deno.readTextFile('./prompts.xml');
|
|
}
|
|
const prompt_xml = new DOMParser().parseFromString(xmlContent, 'text/html');
|
|
|
|
const mori_personality = prompt_xml?.querySelector('mori_personality')?.textContent || "";
|
|
|
|
async function handleRequest(req: Resquest, controller : ReadableStreamDefaultController, user : User, supabaseClient : SupabaseClient) {
|
|
|
|
function enqueueJson(data: any) {
|
|
controller.enqueue(new TextEncoder().encode(`${JSON.stringify(data)}\n\n`));
|
|
}
|
|
|
|
const showExamples = false;
|
|
if (showExamples) {
|
|
// Example 1: Send current status to the client
|
|
let status = {
|
|
command: "update_status",
|
|
status_type: "info",
|
|
message: "Discombobulating the flux capacitor...",
|
|
};
|
|
enqueueJson(status);
|
|
|
|
// Simulate some processing delay
|
|
await new Promise((resolve) => setTimeout(resolve, 2000));
|
|
|
|
// Example 2: Send progress update to the client
|
|
status = {
|
|
command: "update_status",
|
|
status_type: "progress",
|
|
message: "Halfway through the discombobulation!",
|
|
};
|
|
enqueueJson(status);
|
|
|
|
// Simulate some processing delay
|
|
await new Promise((resolve) => setTimeout(resolve, 2000));
|
|
|
|
status = {
|
|
command: "update_status",
|
|
status_type: "progress",
|
|
message: "Lost track of what we're doing...",
|
|
};
|
|
enqueueJson(status);
|
|
|
|
// Simulate some processing delay
|
|
await new Promise((resolve) => setTimeout(resolve, 2000));
|
|
}
|
|
|
|
const requestBody = await req.json();
|
|
|
|
enqueueJson({
|
|
command: "update_status",
|
|
status_type: "info",
|
|
message: "Contemplating conversation...",
|
|
});
|
|
|
|
/*
|
|
Summarise the conversation
|
|
*/
|
|
|
|
const summarySystemWork = new Promise<any>(async (resolve, reject) => {
|
|
|
|
const summarySystemPrompt = prompt_xml.querySelector('conversation_summariser')?.textContent
|
|
.replaceAll("{{PERSONALITY_INJECTION}}", mori_personality);
|
|
const summaryCompletion = await openai.chat.completions.create({
|
|
model: "gpt-4.1-mini",
|
|
messages: [
|
|
{
|
|
role: "system",
|
|
content: summarySystemPrompt || "",
|
|
// Static: gets cached
|
|
},
|
|
...requestBody.messages,
|
|
{
|
|
role: "assistant",
|
|
content: `CURRENT_TIME: ${new Date().toISOString()}\n\nCONVERSATION_GIST:\n${requestBody.gist || "No existing context."}`,
|
|
// Dynamic context injection
|
|
},
|
|
],
|
|
});
|
|
let summaryJson = JSON.parse(summaryCompletion.choices[0]?.message?.content || "{}");
|
|
enqueueJson({
|
|
command: "update_gist",
|
|
content: summaryJson.context || "",
|
|
});
|
|
resolve(summaryJson);
|
|
|
|
});
|
|
|
|
/*
|
|
Fetch relevant memories from the DB
|
|
*/
|
|
|
|
const memoryFetchWork = new Promise<any>(async (resolve, reject) => {
|
|
|
|
// Fetch all the existing tags for the user
|
|
const { data: tagsData, error: tagsError } = await supabaseClient
|
|
.schema("mori")
|
|
.from('tags')
|
|
.select('id, name')
|
|
.eq('user_id', user.id);
|
|
|
|
let userTags: string[] = [];
|
|
|
|
if (tagsError) {
|
|
console.error("Error fetching tags:", tagsError);
|
|
reject(tagsError);
|
|
} else if (tagsData) {
|
|
userTags = tagsData.map(tag => tag.name);
|
|
}
|
|
|
|
// Now formulate the memory fetch prompt
|
|
const memoryFetchSystemPrompt = prompt_xml.querySelector('memory_retriever')?.textContent
|
|
.replaceAll("{{PERSONALITY_INJECTION}}", mori_personality);
|
|
const memoryFetchCompletion = await openai.chat.completions.create({
|
|
model: "gpt-4.1-mini",
|
|
messages: [
|
|
{
|
|
role: "system",
|
|
content: memoryFetchSystemPrompt || "",
|
|
},
|
|
...requestBody.messages,
|
|
{
|
|
role: "assistant",
|
|
content: `CURRENT_TIME: ${new Date().toISOString()}\n\nAVAILABLE_TAGS:\n${userTags.join(", ")}`
|
|
}
|
|
],
|
|
});
|
|
let memoryFetchJson = JSON.parse(memoryFetchCompletion.choices[0]?.message?.content || "{}");
|
|
|
|
// Fetch memories based on the suggested tags
|
|
let fetchedMemories: any[] = [];
|
|
|
|
if (memoryFetchJson.tags && memoryFetchJson.tags.length > 0) {
|
|
const { data: memoriesData, error: memoriesError } = await supabaseClient
|
|
.rpc('get_memories_by_tags', {
|
|
tag_names: memoryFetchJson.tags,
|
|
p_user_id: user.id
|
|
});
|
|
|
|
if (memoriesError) {
|
|
console.error("Error fetching memories:", memoriesError);
|
|
} else if (memoriesData) {
|
|
fetchedMemories = memoriesData;
|
|
}
|
|
}
|
|
|
|
// Format fetched memories for the extraction prompt
|
|
const formattedMemories = fetchedMemories.map(mem =>
|
|
`[ID: ${mem.id}] ${mem.content} (${mem.context})`
|
|
).join('\n');
|
|
|
|
const memoryExtractionSystemPrompt = prompt_xml.querySelector('memory_extractor')?.textContent
|
|
.replaceAll("{{PERSONALITY_INJECTION}}", mori_personality);
|
|
|
|
const memoryExtractionCompletion = await openai.chat.completions.create({
|
|
model: "gpt-4.1",
|
|
messages: [
|
|
{
|
|
role: "system",
|
|
content: memoryExtractionSystemPrompt || "",
|
|
// Static: gets cached
|
|
},
|
|
{
|
|
role: "user",
|
|
content: requestBody.messages[requestBody.messages.length - 1].content
|
|
// Only the most recent user message
|
|
},
|
|
{
|
|
role: "assistant",
|
|
content: `CURRENT_TIME: ${new Date().toISOString()}\n\nAVAILABLE_TAGS:\n${userTags.join(", ")}\n\nEXISTING_MEMORIES:\n${formattedMemories || "No existing memories."}`
|
|
// Dynamic context injection
|
|
}
|
|
],
|
|
});
|
|
|
|
let memoryExtractionJson = JSON.parse(memoryExtractionCompletion.choices[0]?.message?.content || '{"changes": []}');
|
|
|
|
// Process the memory changes (ADD/UPDATE/DELETE)
|
|
const memoryPromises: Promise<any>[] = [];
|
|
|
|
for (const change of memoryExtractionJson.changes) {
|
|
if (change.action === "ADD") {
|
|
memoryPromises.push((async () => {
|
|
// First, ensure all tags exist (create if needed)
|
|
const tagIds: number[] = [];
|
|
|
|
for (const tagName of change.tags) {
|
|
// Check if tag already exists
|
|
const { data: existingTag } = await supabaseClient
|
|
.schema("mori")
|
|
.from("tags")
|
|
.select("id")
|
|
.eq("name", tagName)
|
|
.eq("user_id", user.id)
|
|
.single();
|
|
|
|
if (existingTag) {
|
|
tagIds.push(existingTag.id);
|
|
} else {
|
|
// Create new tag
|
|
const { data: newTag } = await supabaseClient
|
|
.schema("mori")
|
|
.from("tags")
|
|
.insert({
|
|
name: tagName,
|
|
user_id: user.id
|
|
})
|
|
.select("id")
|
|
.single();
|
|
|
|
if (newTag) {
|
|
tagIds.push(newTag.id);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Insert the memory
|
|
const { data: newMemory } = await supabaseClient
|
|
.schema("mori")
|
|
.from("memories")
|
|
.insert({
|
|
content: change.content,
|
|
context: change.context,
|
|
user_id: user.id
|
|
})
|
|
.select("id")
|
|
.single();
|
|
|
|
// Link tags to memory in junction table
|
|
if (newMemory) {
|
|
const junctionInserts = tagIds.map(tagId => ({
|
|
memory_id: newMemory.id,
|
|
tag_id: tagId
|
|
}));
|
|
|
|
await supabaseClient
|
|
.schema("mori")
|
|
.from("memory_tags")
|
|
.insert(junctionInserts);
|
|
}
|
|
})());
|
|
|
|
} else if (change.action === "UPDATE") {
|
|
memoryPromises.push((async () => {
|
|
// Update the memory content
|
|
await supabaseClient
|
|
.schema("mori")
|
|
.from("memories")
|
|
.update({
|
|
content: change.content,
|
|
context: change.context
|
|
})
|
|
.eq("id", change.memory_id)
|
|
.eq("user_id", user.id);
|
|
|
|
// Delete old tag associations
|
|
await supabaseClient
|
|
.schema("mori")
|
|
.from("memory_tags")
|
|
.delete()
|
|
.eq("memory_id", change.memory_id);
|
|
|
|
// Re-create tag associations with new tags
|
|
const tagIds: number[] = [];
|
|
|
|
for (const tagName of change.tags) {
|
|
const { data: existingTag } = await supabaseClient
|
|
.schema("mori")
|
|
.from("tags")
|
|
.select("id")
|
|
.eq("name", tagName)
|
|
.eq("user_id", user.id)
|
|
.single();
|
|
|
|
if (existingTag) {
|
|
tagIds.push(existingTag.id);
|
|
} else {
|
|
const { data: newTag } = await supabaseClient
|
|
.schema("mori")
|
|
.from("tags")
|
|
.insert({
|
|
name: tagName,
|
|
user_id: user.id
|
|
})
|
|
.select("id")
|
|
.single();
|
|
|
|
if (newTag) {
|
|
tagIds.push(newTag.id);
|
|
}
|
|
}
|
|
}
|
|
|
|
const junctionInserts = tagIds.map(tagId => ({
|
|
memory_id: change.memory_id,
|
|
tag_id: tagId
|
|
}));
|
|
|
|
await supabaseClient
|
|
.schema("mori")
|
|
.from("memory_tags")
|
|
.insert(junctionInserts);
|
|
})());
|
|
|
|
} else if (change.action === "DELETE") {
|
|
memoryPromises.push(
|
|
supabaseClient
|
|
.schema("mori")
|
|
.from("memories")
|
|
.delete()
|
|
.eq("id", change.memory_id)
|
|
.eq("user_id", user.id)
|
|
);
|
|
}
|
|
}
|
|
|
|
// Wait for all memory operations to complete
|
|
await Promise.all(memoryPromises);
|
|
|
|
// Return the fetched memories for further use
|
|
resolve(fetchedMemories);
|
|
});
|
|
|
|
// Wait for both the summary and memory fetch to complete
|
|
await Promise.all([summarySystemWork, memoryFetchWork]);
|
|
|
|
const formattedMemories = (await memoryFetchWork).map((mem: any) =>
|
|
`[ID: ${mem.id}] ${mem.content} (${mem.context})`
|
|
).join('\n');
|
|
|
|
const summaryJson = await summarySystemWork;
|
|
|
|
|
|
|
|
/*
|
|
Formulate a response plan
|
|
*/
|
|
enqueueJson({
|
|
command: "update_status",
|
|
status_type: "info",
|
|
message: "Devising response plan...",
|
|
});
|
|
|
|
const responsePlanSystemPrompt = prompt_xml.querySelector('response_planner')?.textContent
|
|
.replaceAll("{{PERSONALITY_INJECTION}}", mori_personality);
|
|
const responsePlanCompletion = await openai.chat.completions.create({
|
|
model: "gpt-4.1-mini",
|
|
messages: [
|
|
{
|
|
role: "system",
|
|
content: responsePlanSystemPrompt
|
|
// Static: gets cached
|
|
},
|
|
...requestBody.messages,
|
|
// Recent conversation messages
|
|
{
|
|
role: "assistant",
|
|
content: `CURRENT_TIME: ${new Date().toISOString()}\n\nCONVERSATION_GIST:\n${summaryJson.context}\n\nRELEVANT_MEMORIES:\n${formattedMemories}`
|
|
}
|
|
]
|
|
});
|
|
console.log("Response Plan:", responsePlanCompletion.choices[0]?.message?.content);
|
|
let responsePlanJson = JSON.parse(responsePlanCompletion.choices[0]?.message?.content || "{}");
|
|
// enqueueJson({
|
|
// command: "append_response",
|
|
// content: responsePlanCompletion.choices[0]?.message?.content || "",
|
|
// });
|
|
|
|
/*
|
|
Generate the final response
|
|
*/
|
|
enqueueJson({
|
|
command: "update_status",
|
|
status_type: "info",
|
|
message: "**Mori** is typing...",
|
|
});
|
|
const chatSystemPrompt = prompt_xml.querySelector('chat_responder')?.textContent
|
|
.replaceAll("{{PERSONALITY_INJECTION}}", mori_personality);
|
|
const chatCompletion = await openai.chat.completions.create({
|
|
model: "gpt-4.1-mini",
|
|
messages: [
|
|
{
|
|
role: "system",
|
|
content: chatSystemPrompt || "",
|
|
// Static: gets cached
|
|
},
|
|
...requestBody.messages,
|
|
{
|
|
role: "assistant",
|
|
content: `CURRENT_TIME: ${new Date().toISOString()}\n\nCONVERSATION_GIST:\n${summaryJson.context || "No existing context."}\n\nRELEVANT_MEMORIES:\n${formattedMemories || "No memories retrieved."}\n\nRESPONSE_PLAN:\n${responsePlanJson.plan || "No specific plan."}`
|
|
}
|
|
],
|
|
stream: true,
|
|
});
|
|
|
|
for await (const chunk of chatCompletion) {
|
|
if (chunk.choices[0]?.delta?.content) {
|
|
enqueueJson({
|
|
command: "append_response",
|
|
content: chunk.choices[0].delta.content,
|
|
});
|
|
}
|
|
}
|
|
|
|
enqueueJson({
|
|
command: "update_status",
|
|
status_type: "success",
|
|
message: "",
|
|
});
|
|
|
|
}
|
|
|
|
serve(async (req) => {
|
|
// Handle CORS preflight requests
|
|
if (req.method === 'OPTIONS') {
|
|
return new Response('ok', { headers: corsHeaders });
|
|
}
|
|
|
|
// Get the token from the Authorization header
|
|
const authHeader = req.headers.get('Authorization');
|
|
const token = authHeader?.replace('Bearer ', '');
|
|
|
|
// Initialize the Supabase client
|
|
const supabaseClient = createClient(
|
|
Deno.env.get('SUPABASE_URL') || '',
|
|
Deno.env.get('SUPABASE_ANON_KEY') || '',
|
|
{
|
|
global: {
|
|
headers: { Authorization: `Bearer ${token}` },
|
|
},
|
|
}
|
|
);
|
|
|
|
// Authenticate the user
|
|
const { data: { user }, error } = await supabaseClient.auth.getUser(token);
|
|
|
|
if (error || !user) {
|
|
return new Response(
|
|
JSON.stringify({ error: 'Unauthorized' }),
|
|
{
|
|
status: 401,
|
|
headers: {
|
|
...corsHeaders,
|
|
'Content-Type': 'application/json'
|
|
}
|
|
}
|
|
);
|
|
}
|
|
|
|
// User is authenticated, handle the request
|
|
const readable = new ReadableStream({
|
|
async start(controller) {
|
|
// Wrap controller to log on enqueue
|
|
const originalEnqueue = controller.enqueue.bind(controller);
|
|
controller.enqueue = (chunk) => {
|
|
const decoded = new TextDecoder().decode(chunk);
|
|
console.log('Stream output:', decoded);
|
|
originalEnqueue(chunk);
|
|
};
|
|
|
|
try {
|
|
await handleRequest(req, controller, user, supabaseClient);
|
|
controller.close();
|
|
} catch (error) {
|
|
console.error("Error in stream:", error);
|
|
controller.error(error);
|
|
}
|
|
}
|
|
});
|
|
|
|
return new Response(readable, {
|
|
headers: {
|
|
...corsHeaders,
|
|
'Content-Type': 'text/event-stream',
|
|
'Cache-Control': 'no-cache',
|
|
'Connection': 'keep-alive'
|
|
}
|
|
});
|
|
});
|