Duriin-API/src/embeddings.js

415 lines
12 KiB
JavaScript

const db = require('./db');
const config = require('./config');
// embedding model sent with every openrouter request and used as the `model`
// key in the store / query-cache tables. config.openRouter.embeddingModel
// overrides the default when it is set to a non-empty value
const EMBEDDING_MODEL = config.openRouter?.embeddingModel || 'perplexity/pplx-embed-v1-0.6b';
// prepared statements, compiled once at module load and reused below.
//
// loads the three text fields buildEmbeddingInput() joins into the model input
const selectArticleForEmbedding = db.prepare(`
SELECT id, title, description, content
FROM articles
WHERE id = ?
`);
// writes one (padded) vector into the vec0 index that nearestNeighbors
// queries. callers bind article_id as a BigInt — presumably so the driver
// binds an INTEGER key, which vec0 expects (NOTE(review): confirm)
const insertEmbedding = db.prepare(`
INSERT INTO article_embeddings (article_id, embedding)
VALUES (?, ?)
`);
// removes an article's vec0 row. every caller in this file runs it right
// before insertEmbedding, presumably because a plain INSERT would collide
// with an existing row for the same article
const deleteEmbedding = db.prepare(`
DELETE FROM article_embeddings
WHERE article_id = ?
`);
// in this file only used to test whether the vec0 index already has a row
// for an article
const selectEmbeddingBuffer = db.prepare(`
SELECT embedding
FROM article_embeddings
WHERE article_id = ?
`);
// records which model produced the article's vec0 row and when; one row per
// article, overwritten on conflict
const upsertEmbeddingMeta = db.prepare(`
INSERT INTO article_embedding_meta (article_id, model, embedded_at)
VALUES (?, ?, datetime('now'))
ON CONFLICT(article_id) DO UPDATE SET
model = excluded.model,
embedded_at = excluded.embedded_at
`);
// flags the article as embedded. the `has_embedding = 0` guard makes repeat
// runs a no-op. NOTE(review): rows where has_embedding IS NULL are not
// updated by this predicate — confirm the column is NOT NULL DEFAULT 0
const markArticleHasEmbedding = db.prepare(`
UPDATE articles SET has_embedding = 1 WHERE id = ? AND has_embedding = 0
`);
// per-(article, model) copy of the raw, unpadded float32 embedding. this is
// what the repair path rebuilds vec0 rows from, and what
// selectArticlesMissingEmbeddings checks to find work
const upsertEmbeddingStore = db.prepare(`
INSERT INTO article_embedding_store (article_id, model, embedding, embedded_at)
VALUES (?, ?, ?, datetime('now'))
ON CONFLICT(article_id, model) DO UPDATE SET
embedding = excluded.embedding,
embedded_at = excluded.embedded_at
`);
// params: (article_id, model)
const selectEmbeddingFromStore = db.prepare(`
SELECT embedding FROM article_embedding_store
WHERE article_id = ? AND model = ?
`);
// KNN lookup against the vec0 index: the first param is the (padded) query
// vector, the second is k, the number of neighbours; nearest first
const nearestNeighbors = db.prepare(`
SELECT article_id, distance
FROM article_embeddings
WHERE embedding MATCH ?
AND k = ?
ORDER BY distance
`);
// backfill candidates: articles whose title, description and content are all
// non-blank (the same fields buildEmbeddingInput requires) and that have no
// store row for the given model, newest first. params: (model, limit)
const selectArticlesMissingEmbeddings = db.prepare(`
SELECT a.id, a.title, a.description, a.content
FROM articles a
WHERE a.title IS NOT NULL
AND TRIM(a.title) != ''
AND a.description IS NOT NULL
AND TRIM(a.description) != ''
AND a.content IS NOT NULL
AND TRIM(a.content) != ''
AND NOT EXISTS (
SELECT 1 FROM article_embedding_store s
WHERE s.article_id = a.id AND s.model = ?
)
ORDER BY a.pub_date_effective DESC, a.id DESC
LIMIT ?
`);
// cache of search-query embeddings keyed by (normalized query, model)
const selectQueryEmbedding = db.prepare(`
SELECT embedding FROM query_embeddings WHERE query = ? AND model = ?
`);
// inserts or refreshes a cached query embedding. NOTE(review): created_at is
// only set here on conflict; first inserts presumably rely on a column
// default — confirm against the schema
const upsertQueryEmbedding = db.prepare(`
INSERT INTO query_embeddings (query, model, embedding)
VALUES (?, ?, ?)
ON CONFLICT(query, model) DO UPDATE SET
embedding = excluded.embedding,
created_at = datetime('now')
`);
// fixed width of the vec0 embedding column. within this file every vector
// written to or queried against article_embeddings is zero-padded to this
// many float32 dims (padEmbeddingForVec0), so models with fewer dims share
// one index. NOTE(review): presumably must equal the float[N] declared on the
// vec0 table — confirm against the schema
const VEC0_DIM = 8192;
// openrouter calls are allowed only when config.openRouter exists, has not
// been switched off with `enabled: false`, and carries a non-blank apiKey
function isOpenRouterEnabled() {
  const settings = config.openRouter;
  if (!settings || settings.enabled === false) {
    return false;
  }
  const key = settings.apiKey;
  return Boolean(key && String(key).trim());
}
// encodes a list of numbers as raw float32 bytes (host byte order) in a
// freshly allocated Buffer, the blob format stored in the embedding tables
function serializeEmbedding(values) {
  const floats = new Float32Array(values);
  const bytes = floats.buffer;
  return Buffer.from(bytes);
}
// copies `values` into a zero-filled float32 buffer exactly VEC0_DIM wide, so
// embeddings from models with fewer dims still fit the fixed-width vec0 column.
// throws a RangeError when the embedding has more dims than the column holds
// (previously that surfaced as an opaque "offset is out of bounds" from set())
function padEmbeddingForVec0(values) {
  if (values.length > VEC0_DIM) {
    throw new RangeError(`embedding has ${values.length} dims; vec0 column holds at most ${VEC0_DIM}`);
  }
  // one code path for every length: an exact-width input just fills the
  // whole array, producing the same bytes a direct serialization would
  const padded = new Float32Array(VEC0_DIM);
  padded.set(values);
  return Buffer.from(padded.buffer);
}
// true while backfillMissingEmbeddings is running; a second call made in the
// meantime returns immediately instead of overlapping
let embeddingBackfillRunning = false;
// article ids with an in-flight generateAndStoreEmbedding request; a repeat
// call for the same id returns early. NOTE(review): Set membership uses
// SameValueZero, so '5' and 5 are distinct — callers should pass one type
const embeddingJobsRunning = new Set();
// joins an article's trimmed title, description and content with blank
// lines. returns '' when any of the three is missing, not a string, or blank
function buildEmbeddingInput(article) {
  const clean = (value) => (typeof value === 'string' ? value.trim() : '');
  const sections = [clean(article.title), clean(article.description), clean(article.content)];
  if (sections.some((section) => section === '')) {
    return '';
  }
  return sections.join('\n\n');
}
// canonical form of a search query used as the cache key and as the text that
// gets embedded: trimmed, lower-cased, whitespace runs collapsed to one space.
// falsy input (null, undefined, '', 0) becomes ''
function normalizeQuery(input) {
  const text = String(input || '');
  const lowered = text.trim().toLowerCase();
  return lowered.replace(/\s+/g, ' ');
}
// requests embeddings from openrouter for a single string or an array of
// strings. openrouter follows the openai embeddings contract — when input is
// an array, payload.data is an array of {index, embedding} objects, one per
// input. returns number[] for a string input and number[][] (ordered by
// `index`) for an array input.
// throws an Error carrying `.status` on a non-2xx response (callers treat 402
// as "pause"), and a plain Error when the payload has no usable embedding.
// options.timeoutMs (default 60s) bounds the whole exchange; when it elapses
// the request rejects with a TimeoutError
async function requestEmbedding(input, { timeoutMs = 60000 } = {}) {
  const response = await fetch('https://openrouter.ai/api/v1/embeddings', {
    method: 'POST',
    headers: {
      Authorization: `Bearer ${String(config.openRouter.apiKey).trim()}`,
      'Content-Type': 'application/json',
    },
    body: JSON.stringify({
      model: EMBEDDING_MODEL,
      input,
    }),
    // fetch has no overall deadline of its own; without one a stalled
    // provider holds the caller (and, for the backfill, its running guard)
    // for as long as the connection stays open. the signal also covers the
    // body reads below
    signal: AbortSignal.timeout(timeoutMs),
  });
  if (!response.ok) {
    let message = `embedding request failed with ${response.status}`;
    try {
      const payload = await response.json();
      const errorMessage = payload && payload.error && payload.error.message;
      if (errorMessage) {
        message = errorMessage;
      }
    } catch (error) {
      console.error('failed to parse embedding error response:', error);
    }
    const requestError = new Error(message);
    requestError.status = response.status;
    throw requestError;
  }
  const payload = await response.json();
  const data = payload && payload.data;
  if (!Array.isArray(data) || data.length === 0) {
    throw new Error('invalid embedding response: missing data');
  }
  if (Array.isArray(input)) {
    // sort by index to be safe — some providers return out-of-order
    const sorted = [...data].sort((a, b) => (a.index ?? 0) - (b.index ?? 0));
    return sorted.map((row) => {
      if (!Array.isArray(row.embedding) || row.embedding.length === 0) {
        throw new Error(`invalid embedding at index ${row.index}`);
      }
      return row.embedding;
    });
  }
  const embedding = data[0] && data[0].embedding;
  if (!Array.isArray(embedding) || embedding.length === 0) {
    throw new Error(`invalid embedding in response: ${Array.isArray(embedding) ? 'empty' : 'missing'}`);
  }
  return embedding;
}
// rebuilds the vec0 row, the meta row and the has_embedding flag for an
// article whose embedding is already in article_embedding_store. runs in one
// transaction so the delete + insert pair cannot be left half-applied
function restoreVec0Row(id, storedBlob) {
  const values = new Float32Array(storedBlob.buffer, storedBlob.byteOffset, storedBlob.byteLength / 4);
  db.transaction(() => {
    deleteEmbedding.run(BigInt(id));
    insertEmbedding.run(BigInt(id), padEmbeddingForVec0(values));
    upsertEmbeddingMeta.run(id, EMBEDDING_MODEL);
    markArticleHasEmbedding.run(id);
  })();
}

// generates and stores the embedding for one article unless it already has
// one under EMBEDDING_MODEL (in which case a missing vec0 row is restored from
// the store). returns { stored, shouldPauseBatch }:
//   stored           — true only when a new embedding was fetched and written
//   shouldPauseBatch — true when openrouter answered 402, so callers can stop
//                      issuing further requests
// request and write failures for a new embedding are logged, not thrown;
// errors while restoring a missing vec0 row propagate to the caller
async function generateAndStoreEmbedding(id) {
  if (!isOpenRouterEnabled() || embeddingJobsRunning.has(id)) {
    return { stored: false, shouldPauseBatch: false };
  }
  const existing = selectEmbeddingFromStore.get(id, EMBEDDING_MODEL);
  if (existing) {
    // already in store — make sure vec0 is also up to date
    if (!selectEmbeddingBuffer.get(id)) {
      restoreVec0Row(id, existing.embedding);
    }
    return { stored: false, shouldPauseBatch: false };
  }
  const article = selectArticleForEmbedding.get(id);
  const input = article ? buildEmbeddingInput(article) : '';
  if (!input) {
    return { stored: false, shouldPauseBatch: false };
  }
  // nothing async has run since the store lookup above, so no other job can
  // have written this article in between; claim the id before the first await
  embeddingJobsRunning.add(id);
  try {
    const embedding = await requestEmbedding(input);
    // same transactional write the batch backfill uses, so a failure part-way
    // cannot leave a store row without its vec0 / meta / flag counterparts
    commitEmbeddingBatch([{ id, embedding }]);
    return { stored: true, shouldPauseBatch: false };
  } catch (error) {
    console.error(`embedding generation failed for article ${id}:`, error);
    return {
      stored: false,
      shouldPauseBatch: Boolean(error && error.status === 402),
    };
  } finally {
    embeddingJobsRunning.delete(id);
  }
}
// writes a batch of {id, embedding} pairs in one transaction. for each pair,
// the raw vector goes to the per-model store and the vec0 row is replaced with
// a copy padded to VEC0_DIM (8192) dims, so models with fewer dims still
// index. the meta row and the has_embedding flag are updated alongside
function commitEmbeddingBatch(rows) {
  const writeBatch = db.transaction((pairs) => {
    for (const { id, embedding } of pairs) {
      upsertEmbeddingStore.run(id, EMBEDDING_MODEL, serializeEmbedding(embedding));
      const vecRowId = BigInt(id);
      deleteEmbedding.run(vecRowId);
      insertEmbedding.run(vecRowId, padEmbeddingForVec0(embedding));
      upsertEmbeddingMeta.run(id, EMBEDDING_MODEL);
      markArticleHasEmbedding.run(id);
    }
  });
  writeBatch(rows);
}
// message for logging a thrown value; tolerates null/undefined and non-Error
// throws instead of failing on `.message`
function describeEmbeddingError(error) {
  return (error && error.message) || error;
}

// retries each row of a failed batch as its own request, so a single bad
// input doesnt poison the whole batch. slow but correct. stops at the first
// 402. returns { processed, paused }
async function embedRowsIndividually(batch) {
  let processed = 0;
  for (const row of batch) {
    try {
      const single = await requestEmbedding(row.input);
      commitEmbeddingBatch([{ id: row.id, embedding: single }]);
      processed += 1;
    } catch (singleError) {
      console.error(`single embedding fallback failed for article ${row.id}:`, describeEmbeddingError(singleError));
      if (singleError && singleError.status === 402) {
        return { processed, paused: true };
      }
    }
  }
  return { processed, paused: false };
}

// batched backfill — pulls up to `limit` articles with no embedding for
// EMBEDDING_MODEL and sends their inputs to openrouter in requests of up to
// `batchSize` inputs each. each batch's results are written in one tx. this
// cuts per-article overhead from ~1 round trip to ~1/batchSize round trips.
// a batch that fails, or comes back with the wrong number of vectors, is
// retried one article at a time. an HTTP 402 stops the run.
// returns { processed, paused }: the number of embeddings written, and whether
// the run stopped on a 402. it returns { processed: 0, paused: false } when
// another backfill is still running or openrouter is disabled
async function backfillMissingEmbeddings(limit = 256, batchSize = 16) {
  if (embeddingBackfillRunning || !isOpenRouterEnabled()) {
    return { processed: 0, paused: false };
  }
  // a batch size below 1 (or NaN) would keep `i` from advancing and spin the
  // loop forever, so clamp it to a positive integer
  const flooredBatchSize = Math.floor(Number(batchSize));
  const step = flooredBatchSize >= 1 ? flooredBatchSize : 1;
  embeddingBackfillRunning = true;
  let processed = 0;
  let paused = false;
  try {
    const candidates = selectArticlesMissingEmbeddings.all(EMBEDDING_MODEL, limit);
    // drop any candidate whose input builds empty (shouldnt happen since the
    // query filters on the same fields, but be defensive)
    const eligible = candidates
      .map((row) => ({ id: row.id, input: buildEmbeddingInput(row) }))
      .filter((row) => row.input);
    for (let i = 0; i < eligible.length; i += step) {
      const batch = eligible.slice(i, i + step);
      try {
        const embeddings = await requestEmbedding(batch.map((row) => row.input));
        if (embeddings.length !== batch.length) {
          // vectors cant be matched to ids safely; treat this as a failed
          // batch so the per-article fallback still covers these rows
          throw new Error(`embedding batch length mismatch: expected ${batch.length}, got ${embeddings.length}`);
        }
        commitEmbeddingBatch(batch.map((row, idx) => ({ id: row.id, embedding: embeddings[idx] })));
        processed += batch.length;
      } catch (error) {
        console.error(`embedding batch failed (size ${batch.length}):`, describeEmbeddingError(error));
        if (error && error.status === 402) {
          paused = true;
          break;
        }
        const fallback = await embedRowsIndividually(batch);
        processed += fallback.processed;
        if (fallback.paused) {
          paused = true;
          break;
        }
      }
    }
  } finally {
    embeddingBackfillRunning = false;
  }
  return { processed, paused };
}
// statement behind hasPendingEmbeddings. it is prepared on first use and then
// reused, so repeated calls do not re-compile the SQL. preparing lazily keeps
// any prepare error at call time rather than at module load
let selectAnyPendingEmbedding = null;

// true when at least one article with non-blank title, description and
// content has no stored embedding for EMBEDDING_MODEL — the same condition
// backfillMissingEmbeddings uses to pick candidates
function hasPendingEmbeddings() {
  if (selectAnyPendingEmbedding === null) {
    selectAnyPendingEmbedding = db.prepare(`
SELECT 1 FROM articles a
WHERE a.title IS NOT NULL AND TRIM(a.title) != ''
AND a.description IS NOT NULL AND TRIM(a.description) != ''
AND a.content IS NOT NULL AND TRIM(a.content) != ''
AND NOT EXISTS (
SELECT 1 FROM article_embedding_store s
WHERE s.article_id = a.id AND s.model = ?
)
LIMIT 1
`);
  }
  return Boolean(selectAnyPendingEmbedding.get(EMBEDDING_MODEL));
}
// returns the article's stored EMBEDDING_MODEL vector re-padded to VEC0_DIM
// (ready to use as a vec0 query), or null when the article has none
function getEmbeddingBuffer(articleId) {
  const stored = selectEmbeddingFromStore.get(articleId, EMBEDDING_MODEL);
  if (stored) {
    const blob = stored.embedding;
    const floats = new Float32Array(blob.buffer, blob.byteOffset, blob.byteLength / 4);
    return padEmbeddingForVec0(floats);
  }
  return null;
}
// runs the vec0 KNN query for `embedding` and returns up to `limit` results as
// { articleId: number, distance }, nearest first. a falsy embedding (e.g.
// null from getEmbeddingBuffer) yields [] without touching the database
function findArticlesByEmbedding(embedding, limit) {
  if (embedding) {
    const neighbors = nearestNeighbors.all(embedding, limit);
    return neighbors.map(({ article_id: rawId, distance }) => ({
      articleId: Number(rawId),
      distance,
    }));
  }
  return [];
}
// returns a VEC0_DIM-padded embedding for a search query, or null when the
// query normalizes to ''. embeddings are cached per (normalized query, model):
// a cache hit skips the network, and a miss requests the embedding and stores
// the unpadded vector for next time
async function getOrCreateQueryEmbedding(query) {
  const key = normalizeQuery(query);
  if (!key) return null;
  const hit = selectQueryEmbedding.get(key, EMBEDDING_MODEL);
  if (hit) {
    const { embedding: blob } = hit;
    return padEmbeddingForVec0(new Float32Array(blob.buffer, blob.byteOffset, blob.byteLength / 4));
  }
  const fresh = await requestEmbedding(key);
  upsertQueryEmbedding.run(key, EMBEDDING_MODEL, serializeEmbedding(fresh));
  return padEmbeddingForVec0(fresh);
}
// up to `limit` nearest neighbours of an article's own stored embedding,
// excluding the article itself. it asks the index for one extra result to make
// room for the self-match. returns [] when the article has no embedding
function findSimilarArticles(articleId, limit) {
  const candidates = findArticlesByEmbedding(getEmbeddingBuffer(articleId), limit + 1);
  const others = candidates.filter((candidate) => candidate.articleId !== Number(articleId));
  return others.slice(0, limit);
}
// public API of the embeddings module. the prepared statements and the
// request/serialization helpers above stay private to this file
module.exports = {
backfillMissingEmbeddings,
findArticlesByEmbedding,
generateAndStoreEmbedding,
findSimilarArticles,
getEmbeddingBuffer,
getOrCreateQueryEmbedding,
hasPendingEmbeddings,
};