398 lines
13 KiB
TypeScript
398 lines
13 KiB
TypeScript
// @ts-ignore
|
|
import { serve } from "https://deno.land/std@0.168.0/http/server.ts";
|
|
// @ts-ignore
|
|
import { load } from "https://deno.land/std@0.224.0/dotenv/mod.ts";
|
|
// @ts-ignore
|
|
import { DOMParser } from "https://deno.land/x/deno_dom@v0.1.45/deno-dom-wasm.ts";
|
|
import { User } from "https://esm.sh/@supabase/auth-js@2.76.1/dist/module/lib/types.d.ts";
|
|
|
|
import { createClient } from "https://esm.sh/@supabase/supabase-js@2";
|
|
|
|
//
|
|
|
|
import OpenAI from "npm:openai@4";
|
|
|
|
// Load environment variables from the local .env file.
await load({ export: true, envPath: ".env" });

// Supabase client. It is (re)assigned by the request handler once the caller's
// bearer token is known; until then it is null.
let supabaseClient = null;

// Load and parse prompts.xml, which lives next to this module.
// The URL object is handed to Deno.readTextFile directly rather than its
// `.pathname`: `.pathname` stays percent-encoded (a space becomes "%20") and is
// not a valid filesystem path on Windows ("/C:/...").
const xmlContent = await Deno.readTextFile(new URL('./prompts.xml', import.meta.url));

// The document is parsed with the 'text/html' MIME type; the individual prompts
// are later looked up by element name via querySelector.
const doc = new DOMParser().parseFromString(xmlContent, 'text/html');

// CORS headers attached to every response produced by this function.
const corsHeaders = {
  'Access-Control-Allow-Origin': '*',
  'Access-Control-Allow-Headers': 'authorization, x-client-info, apikey, content-type'
};
|
|
|
|
/*
|
|
Stage 2: Process Input (Extract Memories)
|
|
*/
|
|
/**
 * Returns the tag rows owned by `userId` for the given names, inserting any
 * tag the user does not have yet.
 *
 * The lookup is scoped to the user so that a same-named tag owned by somebody
 * else is never linked, and names are de-duplicated so a repeated name is
 * neither inserted nor linked twice.
 */
async function findOrCreateTags(tagNames, userId) {
  const tagRows = [];

  for (const tagName of new Set(tagNames)) {
    const existing = await supabaseClient
      .schema("mori")
      .from("tags")
      .select("*")
      .eq("name", tagName)
      .eq("user_id", userId)
      .limit(1)
      .maybeSingle();

    if (existing.data) {
      tagRows.push(existing.data);
      continue;
    }

    const inserted = await supabaseClient
      .schema("mori")
      .from("tags")
      .insert([{
        name: tagName,
        user_id: userId
      }])
      .select()
      .single();

    if (inserted.data) {
      tagRows.push(inserted.data);
    } else {
      console.error("Failed to insert tag:", tagName, inserted.error);
    }
  }

  return tagRows;
}

/**
 * Stage 2: Process Input (Extract Memories).
 *
 * Sends the conversation, the names of the user's tags and the memories found
 * in stage 1 to the model, parses the JSON it returns, and applies every entry
 * of its `changes` list to the `mori` schema:
 *  - ADD    inserts a memory and links it to its tags (creating missing tags),
 *  - UPDATE rewrites content/context of an existing memory of this user,
 *  - DELETE removes an existing memory of this user.
 *
 * A failing database write for one change is logged and the remaining changes
 * are still processed.
 *
 * @param controller - Stream controller of the HTTP response; not used by this stage.
 * @param messages - Conversation messages, spread into the model request as-is.
 * @param doc - Parsed prompts document; the `memory_extraction` element holds the system prompt.
 * @param relevantMemories - Memories returned by stage 1, given to the model as reference data.
 * @throws Error when no user is authenticated or the model output is not valid JSON.
 */
async function extractMemories(controller, messages, doc, relevantMemories?) {
  const user: User | null = (await supabaseClient.auth.getUser()).data.user;
  if (!user) {
    throw new Error("Cannot extract memories without an authenticated user");
  }

  // Fetch existing memory tags from the database that belong to the user
  const tags = await supabaseClient
    .schema("mori")
    .from("tags")
    .select("*")
    .eq("user_id", user.id);

  console.log("Fetched existing tags for user:", tags.data?.length || 0);

  // Create and call OpenAI to process the input messages
  console.log("Creating OpenAI client for processing input");
  const openai = new OpenAI({
    apiKey: Deno.env.get('OPENAI_API_KEY')
  });

  const system_prompt = doc?.querySelector('memory_extraction')?.textContent?.trim() || '';

  // Reference data handed to the model after the conversation itself.
  const referenceData = [
    "I have access to the following reference data:",
    `Available tags: ${JSON.stringify(tags.data?.map(t => t.name) || [])}`,
    `Existing memories: ${JSON.stringify(relevantMemories || [])}`,
    "Now I will analyze the conversation above and extract memories."
  ].join("\n\n");

  console.log("Calling OpenAI API for processing...");
  const response = await openai.chat.completions.create({
    model: 'gpt-4.1-mini',
    temperature: 0.1,
    max_completion_tokens: 20000,
    messages: [
      { role: 'system', content: system_prompt },
      ...messages,
      { role: "assistant", content: referenceData }
    ]
  });

  const processedContent = response.choices[0]?.message?.content || '';
  console.log("Processing complete, sending processed content to client");

  // Decode the JSON content
  let processedData;
  try {
    processedData = JSON.parse(processedContent);
  } catch (error) {
    console.error("Error parsing processed content:", error);
    throw new Error("Failed to parse processed content");
  }

  // The payload comes from the model, so do not trust its shape.
  const changes = Array.isArray(processedData?.changes) ? processedData.changes : [];

  for (const change of changes) {
    if (change?.action === "ADD") {
      // Insert the memory first so that no tags are created for a memory that
      // could not be stored.
      const insertMemory = await supabaseClient
        .schema("mori")
        .from("memories")
        .insert([{
          content: change.content,
          context: change.context,
          user_id: user.id,
        }])
        .select()
        .single();

      if (insertMemory.error || !insertMemory.data) {
        console.error("Failed to insert memory:", insertMemory.error);
        continue;
      }

      const tagNames = Array.isArray(change.tags) ? change.tags : [];
      const tagRows = await findOrCreateTags(tagNames, user.id);

      // Link the tags to the memory in the memory_tags table
      if (tagRows.length > 0) {
        const { error: linkError } = await supabaseClient
          .schema("mori")
          .from("memory_tags")
          .insert(tagRows.map((tag) => ({
            memory_id: insertMemory.data.id,
            tag_id: tag.id
          })));

        if (linkError) {
          console.error("Failed to link tags to memory:", linkError);
        }
      }
    } else if (change?.action === "UPDATE") {
      // Update an existing memory; the user_id filter keeps the write scoped
      // to the caller's own rows.
      const { error: updateError } = await supabaseClient
        .schema("mori")
        .from("memories")
        .update({
          content: change.content,
          context: change.context,
          updated_at: new Date().toISOString()
        })
        .eq("id", change.memory_id)
        .eq("user_id", user.id);

      if (updateError) {
        console.error("Failed to update memory:", change.memory_id, updateError);
      }

      // TODO: Handle tag updates if needed
      // (delete old memory_tags links and create new ones)
    } else if (change?.action === "DELETE") {
      // Delete the memory (a cascade is expected to remove its memory_tags rows)
      const { error: deleteError } = await supabaseClient
        .schema("mori")
        .from("memories")
        .delete()
        .eq("id", change.memory_id)
        .eq("user_id", user.id);

      if (deleteError) {
        console.error("Failed to delete memory:", change.memory_id, deleteError);
      }
    }
  }
}
|
|
|
|
/*
|
|
Stage 1: Fetch Relevant Memories.
|
|
*/
|
|
/**
 * Stage 1: Fetch Relevant Memories.
 *
 * Gives the model the conversation plus the user's existing tags, expects a
 * JSON object with a `selected_tags` array back, and loads the memories for
 * those tags through the `get_memories_by_tags` RPC.
 *
 * @param controller - Stream controller of the HTTP response; not used by this stage.
 * @param messages - Conversation messages, spread into the model request as-is.
 * @param doc - Parsed prompts document; the `memory_query` element holds the system prompt.
 * @returns The rows returned by the RPC, or an empty array when the model
 *          selected no usable tag list or the RPC failed.
 * @throws Error when no user is authenticated or the model output is not valid JSON.
 */
async function fetchRelevantMemories(controller, messages, doc) {
  const user: User | null = (await supabaseClient.auth.getUser()).data.user;
  if (!user) {
    throw new Error("Cannot fetch memories without an authenticated user");
  }

  // Fetch existing memory tags from the database that belong to the user
  const tags = await supabaseClient
    .schema("mori")
    .from("tags")
    .select("*")
    .eq("user_id", user.id);

  console.log("Fetched existing tags for user:", tags.data?.length || 0);

  // Create and call OpenAI to process the input messages
  console.log("Creating OpenAI client for generating a response");
  const openai = new OpenAI({
    apiKey: Deno.env.get('OPENAI_API_KEY')
  });

  const system_prompt = doc?.querySelector('memory_query')?.textContent?.trim() || '';

  console.log("Calling OpenAI API for fetching relevant memories...");
  const response = await openai.chat.completions.create({
    model: 'gpt-4.1-mini',
    messages: [
      { role: 'system', content: system_prompt },
      ...messages,
      {
        role: "user",
        content: "Existing tags: " + JSON.stringify(tags.data || [])
      }
    ],
  });

  const relevantMemoryTags = response.choices[0]?.message?.content || '';
  let relevantMemoryTagsParsed;
  try {
    relevantMemoryTagsParsed = JSON.parse(relevantMemoryTags);
  } catch (error) {
    console.error("Error parsing relevant memories content:", error);
    throw new Error("Failed to parse relevant memories content");
  }

  // The tag list comes from the model; without a proper array there is
  // nothing meaningful to send to the RPC.
  const selectedTags = relevantMemoryTagsParsed?.selected_tags;
  if (!Array.isArray(selectedTags)) {
    console.warn("Model response did not contain a selected_tags array");
    return [];
  }

  const { data: relevantMemories, error: rpcError } = await supabaseClient
    .rpc("get_memories_by_tags", {
      tag_names: selectedTags,
      p_user_id: user.id
    });

  if (rpcError) {
    // Memories are optional context, so a failed lookup degrades to "none found".
    console.error("Error fetching memories by tags:", rpcError);
    return [];
  }

  return relevantMemories ?? [];
}
|
|
|
|
/*
|
|
Stage 3: Generate Response
|
|
*/
|
|
/**
 * Stage 3: Generate Response.
 *
 * Builds the chat request (system prompt, optional memory context, then the
 * conversation), streams the completion from the model, and forwards every
 * non-empty content delta to the client as a server-sent event of the form
 * `data: {"type":"content","content":"..."}`.
 *
 * @param controller - Stream controller of the HTTP response the events are enqueued on.
 * @param messages - Conversation messages, appended after the system/context messages.
 * @param doc - Parsed prompts document; the `system_response` element holds the system prompt.
 * @param relevantMemories - Memories from stage 1; each one's `content` is listed as context.
 */
async function generateResponse(controller, messages, doc, relevantMemories) {
  // Look up the current user; only the username is needed to personalise the prompt.
  const user: User | null = (await supabaseClient.auth.getUser()).data.user;
  const username = String(user?.user_metadata?.username || 'User');

  console.log("Creating OpenAI client for generating a response");
  const openai = new OpenAI({
    apiKey: Deno.env.get('OPENAI_API_KEY')
  });

  const promptTemplate = doc?.querySelector('system_response')?.textContent?.trim() || '';
  // A replacer function is used instead of a replacement string: in a string,
  // sequences such as "$&" or "$$" inside the (user-controlled) username would
  // be interpreted as replacement patterns instead of being inserted verbatim.
  const system_prompt = promptTemplate.replaceAll("{{username}}", () => username);

  console.log("Calling OpenAI API for streaming response...");

  const responseMessages = [
    { role: 'system', content: system_prompt },
  ];

  // Add relevant memories as context if available
  if (relevantMemories && relevantMemories.length > 0) {
    responseMessages.push({
      role: 'assistant',
      content: `Context from previous conversations:\n${relevantMemories.map(m => `- ${m.content}`).join('\n')}\n\nI'll use this context naturally in our conversation.`
    });
  }

  responseMessages.push(...messages);

  const stream = await openai.chat.completions.create({
    model: 'gpt-4.1-mini',
    messages: responseMessages,
    stream: true
  });

  console.log("Stream created, starting to read chunks...");
  // One encoder is enough for the whole stream.
  const encoder = new TextEncoder();
  for await (const chunk of stream) {
    const content = chunk.choices[0]?.delta?.content || '';
    if (content) {
      const data = `data: ${JSON.stringify({ type: 'content', content })}\n\n`;
      controller.enqueue(encoder.encode(data));
    }
  }
}
|
|
|
|
/**
 * HTTP entry point.
 *
 * Answers CORS preflight requests, authenticates the caller with the bearer
 * token from the Authorization header, and then runs the three-stage pipeline
 * (fetch relevant memories, extract memories, generate response), reporting
 * progress and content to the client as server-sent events.
 *
 * Responses:
 *  - 200 `text/event-stream` with `stage` and `content` events,
 *  - 400 JSON error when the body is not JSON or `messages` is not an array,
 *  - 401 JSON error when the token is missing or rejected.
 */
serve(async (req) => {
  // Handle CORS preflight requests
  if (req.method === 'OPTIONS') {
    return new Response('ok', {
      headers: corsHeaders
    });
  }

  // Builds a JSON response carrying the CORS headers.
  const jsonResponse = (payload, status) =>
    new Response(JSON.stringify(payload), {
      status,
      headers: {
        ...corsHeaders,
        'Content-Type': 'application/json'
      }
    });

  // Get the token from the Authorization header. A missing header is rejected
  // up front instead of building a client with "Bearer undefined".
  const authHeader = req.headers.get('Authorization');
  const token = authHeader?.replace(/^Bearer\s+/i, '').trim();
  if (!token) {
    return jsonResponse({ error: 'Unauthorized' }, 401);
  }

  // Initialise the Supabase client with the caller's token.
  // NOTE(review): `supabaseClient` is a module-level variable that the stage
  // functions read after awaiting; a second request arriving in the meantime
  // reassigns it, so concurrent requests can end up using each other's client.
  // Passing the client to the stages explicitly would remove that race.
  supabaseClient = createClient(
    Deno.env.get('SUPABASE_URL') || '',
    Deno.env.get('SUPABASE_ANON_KEY') || '',
    {
      global: {
        headers: { Authorization: `Bearer ${token}` },
      },
    }
  );

  const authResult = await supabaseClient.auth.getUser(token);
  if (authResult.error || !authResult.data?.user) {
    return jsonResponse({ error: 'Unauthorized' }, 401);
  }

  // Read and validate the request body. Every stage spreads `messages` into
  // the model request, so it has to be an array.
  let messages;
  try {
    ({ messages } = await req.json());
  } catch (error) {
    console.error("Error parsing request body:", error);
    return jsonResponse({ error: 'Invalid JSON body' }, 400);
  }
  if (!Array.isArray(messages)) {
    return jsonResponse({ error: '`messages` must be an array' }, 400);
  }

  const encoder = new TextEncoder();

  // Create the stream that will be used throughout the pipeline
  const readable = new ReadableStream({
    async start(controller) {
      // Emits one server-sent event announcing the pipeline stage.
      const sendStage = (stage) => {
        const data = `data: ${JSON.stringify({ type: 'stage', stage })}\n\n`;
        controller.enqueue(encoder.encode(data));
      };

      try {
        // Stage 1: Fetch Relevant Memories
        sendStage('fetching');
        const relevantMemories = await fetchRelevantMemories(controller, messages, doc);

        // Stage 2: Extract Memories from the conversation
        sendStage('processing');
        await extractMemories(controller, messages, doc, relevantMemories);

        // Stage 3: Stream the response back to the client
        sendStage('responding');
        await generateResponse(controller, messages, doc, relevantMemories);

        // Send stage update: complete
        sendStage('complete');

        console.log("Stream completed, closing controller");
        controller.close();
      } catch (error) {
        console.error("Error in pipeline:", error);
        controller.error(error);
      }
    }
  });

  return new Response(readable, {
    headers: {
      ...corsHeaders,
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive'
    }
  });
});
|