diff --git a/config.json b/config.json index c9e50bb..5e76049 100644 --- a/config.json +++ b/config.json @@ -19,7 +19,8 @@ "tickers": [] }, "openRouter": { - "apiKey": "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" + "apiKey": "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", + "embeddingModel": "qwen/qwen3-embedding-8b" }, "gdelt": { "source": "api", diff --git a/src/db.js b/src/db.js index 7680bc5..04b1595 100644 --- a/src/db.js +++ b/src/db.js @@ -120,10 +120,12 @@ db.exec(` `); db.exec(` - CREATE TABLE IF NOT EXISTS query_embeddings ( - query TEXT PRIMARY KEY, + CREATE TABLE IF NOT EXISTS article_embedding_store ( + article_id INTEGER NOT NULL, + model TEXT NOT NULL, embedding BLOB NOT NULL, - created_at TEXT NOT NULL DEFAULT (datetime('now')) + embedded_at TEXT NOT NULL DEFAULT (datetime('now')), + PRIMARY KEY (article_id, model) ); `); @@ -135,6 +137,41 @@ db.exec(` ); `); +// migrate query_embeddings to include model in primary key +{ + const cols = db.prepare(`PRAGMA table_info(query_embeddings)`).all(); + const hasModel = cols.some(c => c.name === 'model'); + + if (!hasModel) { + db.exec(` + BEGIN; + + CREATE TABLE query_embeddings_new ( + query TEXT NOT NULL, + model TEXT NOT NULL, + embedding BLOB NOT NULL, + created_at TEXT NOT NULL DEFAULT (datetime('now')), + PRIMARY KEY (query, model) + ); + + DROP TABLE IF EXISTS query_embeddings; + ALTER TABLE query_embeddings_new RENAME TO query_embeddings; + + COMMIT; + `); + } else { + db.exec(` + CREATE TABLE IF NOT EXISTS query_embeddings ( + query TEXT NOT NULL, + model TEXT NOT NULL, + embedding BLOB NOT NULL, + created_at TEXT NOT NULL DEFAULT (datetime('now')), + PRIMARY KEY (query, model) + ); + `); + } +} + db.exec(` CREATE TABLE IF NOT EXISTS gdelt_backfill_windows ( source_id TEXT NOT NULL, diff --git a/src/embeddings.js b/src/embeddings.js index cd6ebd6..16bacb7 100644 --- a/src/embeddings.js +++ b/src/embeddings.js @@ -1,24 +1,32 @@ const db = require('./db'); const config = require('./config'); +const EMBEDDING_MODEL = (config.openRouter && config.openRouter.embeddingModel) + ? config.openRouter.embeddingModel + : 'perplexity/pplx-embed-v1-0.6b'; + const selectArticleForEmbedding = db.prepare(` SELECT id, title, description, content FROM articles WHERE id = ? `); + const insertEmbedding = db.prepare(` INSERT INTO article_embeddings (article_id, embedding) VALUES (?, ?) `); + const deleteEmbedding = db.prepare(` DELETE FROM article_embeddings WHERE article_id = ? `); + const selectEmbeddingBuffer = db.prepare(` SELECT embedding FROM article_embeddings WHERE article_id = ? `); + const upsertEmbeddingMeta = db.prepare(` INSERT INTO article_embedding_meta (article_id, model, embedded_at) VALUES (?, ?, datetime('now')) @@ -26,9 +34,20 @@ const upsertEmbeddingMeta = db.prepare(` model = excluded.model, embedded_at = excluded.embedded_at `); -const selectEmbeddingModel = db.prepare(` - SELECT model FROM article_embedding_meta WHERE article_id = ? + +const upsertEmbeddingStore = db.prepare(` + INSERT INTO article_embedding_store (article_id, model, embedding, embedded_at) + VALUES (?, ?, ?, datetime('now')) + ON CONFLICT(article_id, model) DO UPDATE SET + embedding = excluded.embedding, + embedded_at = excluded.embedded_at `); + +const selectEmbeddingFromStore = db.prepare(` + SELECT embedding FROM article_embedding_store + WHERE article_id = ? AND model = ? +`); + const nearestNeighbors = db.prepare(` SELECT article_id, distance FROM article_embeddings @@ -36,6 +55,7 @@ const nearestNeighbors = db.prepare(` AND k = ? ORDER BY distance `); + const selectArticlesMissingEmbeddings = db.prepare(` SELECT a.id FROM articles a @@ -45,40 +65,80 @@ const selectArticlesMissingEmbeddings = db.prepare(` AND TRIM(a.description) != '' AND a.content IS NOT NULL AND TRIM(a.content) != '' - AND ( - NOT EXISTS ( - SELECT 1 FROM article_embeddings e WHERE e.article_id = a.id - ) - OR NOT EXISTS ( - SELECT 1 FROM article_embedding_meta m - WHERE m.article_id = a.id AND m.model = ? - ) + AND NOT EXISTS ( + SELECT 1 FROM article_embedding_store s + WHERE s.article_id = a.id AND s.model = ? ) ORDER BY a.ingested_at ASC, a.id ASC LIMIT ? `); + const selectQueryEmbedding = db.prepare(` - SELECT embedding - FROM query_embeddings - WHERE query = ? + SELECT embedding FROM query_embeddings WHERE query = ? AND model = ? `); + const upsertQueryEmbedding = db.prepare(` - INSERT INTO query_embeddings (query, embedding) - VALUES (?, ?) - ON CONFLICT(query) DO UPDATE SET + INSERT INTO query_embeddings (query, model, embedding) + VALUES (?, ?, ?) + ON CONFLICT(query, model) DO UPDATE SET embedding = excluded.embedding, created_at = datetime('now') `); -const EMBEDDING_MODEL = 'perplexity/pplx-embed-v1-0.6b'; +// backfill store from any embeddings that predate multi-model support +try { + db.prepare(` + INSERT OR IGNORE INTO article_embedding_store (article_id, model, embedding) + SELECT e.article_id, ?, e.embedding FROM article_embeddings e + `).run(EMBEDDING_MODEL); + + db.prepare(` + INSERT OR IGNORE INTO article_embedding_meta (article_id, model) + SELECT article_id, ? FROM article_embedding_store WHERE model = ? + `).run(EMBEDDING_MODEL, EMBEDDING_MODEL); +} catch (err) { + console.error('embedding store backfill failed:', err); +} + +// if the config model changed, rebuild the vec0 search index from store +function rebuildVec0IfModelChanged() { + const stale = db.prepare(` + SELECT 1 FROM article_embedding_meta WHERE model != ? LIMIT 1 + `).get(EMBEDDING_MODEL); + + if (!stale) return; + + console.log(`embedding model changed to ${EMBEDDING_MODEL}, rebuilding search index...`); + + db.exec('BEGIN'); + + try { + db.exec('DELETE FROM article_embeddings'); + db.exec('DELETE FROM article_embedding_meta'); + + const rows = db.prepare(` + SELECT article_id, embedding FROM article_embedding_store WHERE model = ? + `).all(EMBEDDING_MODEL); + + const insertVec = db.prepare(`INSERT INTO article_embeddings (article_id, embedding) VALUES (?, ?)`); + const insertMeta = db.prepare(`INSERT INTO article_embedding_meta (article_id, model) VALUES (?, ?)`); + + for (const row of rows) { + insertVec.run(BigInt(row.article_id), row.embedding); + insertMeta.run(row.article_id, EMBEDDING_MODEL); + } + + db.exec('COMMIT'); + console.log(`rebuilt search index with ${rows.length} embeddings`); + } catch (err) { + db.exec('ROLLBACK'); + throw err; + } +} + +rebuildVec0IfModelChanged(); let embeddingBackfillRunning = false; - -// backfill meta for any embeddings that existed before model tracking -db.prepare(` - INSERT OR IGNORE INTO article_embedding_meta (article_id, model) - SELECT article_id, ? FROM article_embeddings -`).run(EMBEDDING_MODEL); const embeddingJobsRunning = new Set(); function buildEmbeddingInput(article) { @@ -157,7 +217,15 @@ async function generateAndStoreEmbedding(id) { return { stored: false, shouldPauseBatch: false }; } - if (selectEmbeddingBuffer.get(id)) { + if (selectEmbeddingFromStore.get(id, EMBEDDING_MODEL)) { + // already in store — make sure vec0 is also up to date + if (!selectEmbeddingBuffer.get(id)) { + const row = selectEmbeddingFromStore.get(id, EMBEDDING_MODEL); + deleteEmbedding.run(BigInt(id)); + insertEmbedding.run(BigInt(id), row.embedding); + upsertEmbeddingMeta.run(id, EMBEDDING_MODEL); + } + return { stored: false, shouldPauseBatch: false }; } @@ -174,14 +242,18 @@ async function generateAndStoreEmbedding(id) { embeddingJobsRunning.add(id); try { - if (selectEmbeddingBuffer.get(id)) { + if (selectEmbeddingFromStore.get(id, EMBEDDING_MODEL)) { return { stored: false, shouldPauseBatch: false }; } const embedding = await requestEmbedding(input); + const buffer = serializeEmbedding(embedding); + + upsertEmbeddingStore.run(id, EMBEDDING_MODEL, buffer); deleteEmbedding.run(BigInt(id)); - insertEmbedding.run(BigInt(id), serializeEmbedding(embedding)); + insertEmbedding.run(BigInt(id), buffer); upsertEmbeddingMeta.run(id, EMBEDDING_MODEL); + return { stored: true, shouldPauseBatch: false }; } catch (error) { console.error(`embedding generation failed for article ${id}:`, error); @@ -216,7 +288,7 @@ async function backfillMissingEmbeddings(limit = 100) { } function getEmbeddingBuffer(articleId) { - const row = selectEmbeddingBuffer.get(articleId); + const row = selectEmbeddingFromStore.get(articleId, EMBEDDING_MODEL); return row ? row.embedding : null; } @@ -238,14 +310,14 @@ async function getOrCreateQueryEmbedding(query) { return null; } - const cached = selectQueryEmbedding.get(normalizedQuery); + const cached = selectQueryEmbedding.get(normalizedQuery, EMBEDDING_MODEL); if (cached) { return cached.embedding; } const embedding = await requestEmbedding(normalizedQuery); const buffer = serializeEmbedding(embedding); - upsertQueryEmbedding.run(normalizedQuery, buffer); + upsertQueryEmbedding.run(normalizedQuery, EMBEDDING_MODEL, buffer); return buffer; } diff --git a/src/routes/status.js b/src/routes/status.js index 1f323a3..dce57b5 100644 --- a/src/routes/status.js +++ b/src/routes/status.js @@ -28,6 +28,13 @@ async function statusRoutes(fastify) { { total: 0, usable: 0 } ); + const embeddingModelRows = db.prepare(` + SELECT model, COUNT(*) AS article_count + FROM article_embedding_store + GROUP BY model + ORDER BY article_count DESC + `).all(); + return { total: totals.total, usable: totals.usable, @@ -35,6 +42,10 @@ async function statusRoutes(fastify) { bySource: Object.fromEntries( bySourceRows.map((row) => [row.source, { total: row.total, usable: row.usable }]) ), + embeddingModels: embeddingModelRows.map((row) => ({ + model: row.model, + articles: row.article_count, + })), }; }); }