enhance embedding model support and update database schema for multi-model compatibility

This commit is contained in:
ImBenji 2026-04-18 16:43:13 +01:00
parent 5f2aa7f591
commit c6d5624656
4 changed files with 154 additions and 33 deletions

View file

@ -19,7 +19,8 @@
"tickers": [] "tickers": []
}, },
"openRouter": { "openRouter": {
"apiKey": "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" "apiKey": "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",
"embeddingModel": "qwen/qwen3-embedding-8b"
}, },
"gdelt": { "gdelt": {
"source": "api", "source": "api",

View file

@ -120,10 +120,12 @@ db.exec(`
`); `);
db.exec(` db.exec(`
CREATE TABLE IF NOT EXISTS query_embeddings ( CREATE TABLE IF NOT EXISTS article_embedding_store (
query TEXT PRIMARY KEY, article_id INTEGER NOT NULL,
model TEXT NOT NULL,
embedding BLOB NOT NULL, embedding BLOB NOT NULL,
created_at TEXT NOT NULL DEFAULT (datetime('now')) embedded_at TEXT NOT NULL DEFAULT (datetime('now')),
PRIMARY KEY (article_id, model)
); );
`); `);
@ -135,6 +137,41 @@ db.exec(`
); );
`); `);
// migrate query_embeddings to include model in primary key
{
const cols = db.prepare(`PRAGMA table_info(query_embeddings)`).all();
const hasModel = cols.some(c => c.name === 'model');
if (!hasModel) {
db.exec(`
BEGIN;
CREATE TABLE query_embeddings_new (
query TEXT NOT NULL,
model TEXT NOT NULL,
embedding BLOB NOT NULL,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
PRIMARY KEY (query, model)
);
DROP TABLE IF EXISTS query_embeddings;
ALTER TABLE query_embeddings_new RENAME TO query_embeddings;
COMMIT;
`);
} else {
db.exec(`
CREATE TABLE IF NOT EXISTS query_embeddings (
query TEXT NOT NULL,
model TEXT NOT NULL,
embedding BLOB NOT NULL,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
PRIMARY KEY (query, model)
);
`);
}
}
db.exec(` db.exec(`
CREATE TABLE IF NOT EXISTS gdelt_backfill_windows ( CREATE TABLE IF NOT EXISTS gdelt_backfill_windows (
source_id TEXT NOT NULL, source_id TEXT NOT NULL,

View file

@ -1,24 +1,32 @@
const db = require('./db'); const db = require('./db');
const config = require('./config'); const config = require('./config');
const EMBEDDING_MODEL = (config.openRouter && config.openRouter.embeddingModel)
? config.openRouter.embeddingModel
: 'perplexity/pplx-embed-v1-0.6b';
const selectArticleForEmbedding = db.prepare(` const selectArticleForEmbedding = db.prepare(`
SELECT id, title, description, content SELECT id, title, description, content
FROM articles FROM articles
WHERE id = ? WHERE id = ?
`); `);
const insertEmbedding = db.prepare(` const insertEmbedding = db.prepare(`
INSERT INTO article_embeddings (article_id, embedding) INSERT INTO article_embeddings (article_id, embedding)
VALUES (?, ?) VALUES (?, ?)
`); `);
const deleteEmbedding = db.prepare(` const deleteEmbedding = db.prepare(`
DELETE FROM article_embeddings DELETE FROM article_embeddings
WHERE article_id = ? WHERE article_id = ?
`); `);
const selectEmbeddingBuffer = db.prepare(` const selectEmbeddingBuffer = db.prepare(`
SELECT embedding SELECT embedding
FROM article_embeddings FROM article_embeddings
WHERE article_id = ? WHERE article_id = ?
`); `);
const upsertEmbeddingMeta = db.prepare(` const upsertEmbeddingMeta = db.prepare(`
INSERT INTO article_embedding_meta (article_id, model, embedded_at) INSERT INTO article_embedding_meta (article_id, model, embedded_at)
VALUES (?, ?, datetime('now')) VALUES (?, ?, datetime('now'))
@ -26,9 +34,20 @@ const upsertEmbeddingMeta = db.prepare(`
model = excluded.model, model = excluded.model,
embedded_at = excluded.embedded_at embedded_at = excluded.embedded_at
`); `);
const selectEmbeddingModel = db.prepare(`
SELECT model FROM article_embedding_meta WHERE article_id = ? const upsertEmbeddingStore = db.prepare(`
INSERT INTO article_embedding_store (article_id, model, embedding, embedded_at)
VALUES (?, ?, ?, datetime('now'))
ON CONFLICT(article_id, model) DO UPDATE SET
embedding = excluded.embedding,
embedded_at = excluded.embedded_at
`); `);
const selectEmbeddingFromStore = db.prepare(`
SELECT embedding FROM article_embedding_store
WHERE article_id = ? AND model = ?
`);
const nearestNeighbors = db.prepare(` const nearestNeighbors = db.prepare(`
SELECT article_id, distance SELECT article_id, distance
FROM article_embeddings FROM article_embeddings
@ -36,6 +55,7 @@ const nearestNeighbors = db.prepare(`
AND k = ? AND k = ?
ORDER BY distance ORDER BY distance
`); `);
const selectArticlesMissingEmbeddings = db.prepare(` const selectArticlesMissingEmbeddings = db.prepare(`
SELECT a.id SELECT a.id
FROM articles a FROM articles a
@ -45,40 +65,80 @@ const selectArticlesMissingEmbeddings = db.prepare(`
AND TRIM(a.description) != '' AND TRIM(a.description) != ''
AND a.content IS NOT NULL AND a.content IS NOT NULL
AND TRIM(a.content) != '' AND TRIM(a.content) != ''
AND ( AND NOT EXISTS (
NOT EXISTS ( SELECT 1 FROM article_embedding_store s
SELECT 1 FROM article_embeddings e WHERE e.article_id = a.id WHERE s.article_id = a.id AND s.model = ?
)
OR NOT EXISTS (
SELECT 1 FROM article_embedding_meta m
WHERE m.article_id = a.id AND m.model = ?
)
) )
ORDER BY a.ingested_at ASC, a.id ASC ORDER BY a.ingested_at ASC, a.id ASC
LIMIT ? LIMIT ?
`); `);
const selectQueryEmbedding = db.prepare(` const selectQueryEmbedding = db.prepare(`
SELECT embedding SELECT embedding FROM query_embeddings WHERE query = ? AND model = ?
FROM query_embeddings
WHERE query = ?
`); `);
const upsertQueryEmbedding = db.prepare(` const upsertQueryEmbedding = db.prepare(`
INSERT INTO query_embeddings (query, embedding) INSERT INTO query_embeddings (query, model, embedding)
VALUES (?, ?) VALUES (?, ?, ?)
ON CONFLICT(query) DO UPDATE SET ON CONFLICT(query, model) DO UPDATE SET
embedding = excluded.embedding, embedding = excluded.embedding,
created_at = datetime('now') created_at = datetime('now')
`); `);
const EMBEDDING_MODEL = 'perplexity/pplx-embed-v1-0.6b'; // backfill store from any embeddings that predate multi-model support
try {
db.prepare(`
INSERT OR IGNORE INTO article_embedding_store (article_id, model, embedding)
SELECT e.article_id, ?, e.embedding FROM article_embeddings e
`).run(EMBEDDING_MODEL);
let embeddingBackfillRunning = false;
// backfill meta for any embeddings that existed before model tracking
db.prepare(` db.prepare(`
INSERT OR IGNORE INTO article_embedding_meta (article_id, model) INSERT OR IGNORE INTO article_embedding_meta (article_id, model)
SELECT article_id, ? FROM article_embeddings SELECT article_id, ? FROM article_embedding_store WHERE model = ?
`).run(EMBEDDING_MODEL); `).run(EMBEDDING_MODEL, EMBEDDING_MODEL);
} catch (err) {
console.error('embedding store backfill failed:', err);
}
// if the config model changed, rebuild the vec0 search index from store
function rebuildVec0IfModelChanged() {
const stale = db.prepare(`
SELECT 1 FROM article_embedding_meta WHERE model != ? LIMIT 1
`).get(EMBEDDING_MODEL);
if (!stale) return;
console.log(`embedding model changed to ${EMBEDDING_MODEL}, rebuilding search index...`);
db.exec('BEGIN');
try {
db.exec('DELETE FROM article_embeddings');
db.exec('DELETE FROM article_embedding_meta');
const rows = db.prepare(`
SELECT article_id, embedding FROM article_embedding_store WHERE model = ?
`).all(EMBEDDING_MODEL);
const insertVec = db.prepare(`INSERT INTO article_embeddings (article_id, embedding) VALUES (?, ?)`);
const insertMeta = db.prepare(`INSERT INTO article_embedding_meta (article_id, model) VALUES (?, ?)`);
for (const row of rows) {
insertVec.run(BigInt(row.article_id), row.embedding);
insertMeta.run(row.article_id, EMBEDDING_MODEL);
}
db.exec('COMMIT');
console.log(`rebuilt search index with ${rows.length} embeddings`);
} catch (err) {
db.exec('ROLLBACK');
throw err;
}
}
rebuildVec0IfModelChanged();
let embeddingBackfillRunning = false;
const embeddingJobsRunning = new Set(); const embeddingJobsRunning = new Set();
function buildEmbeddingInput(article) { function buildEmbeddingInput(article) {
@ -157,7 +217,15 @@ async function generateAndStoreEmbedding(id) {
return { stored: false, shouldPauseBatch: false }; return { stored: false, shouldPauseBatch: false };
} }
if (selectEmbeddingBuffer.get(id)) { if (selectEmbeddingFromStore.get(id, EMBEDDING_MODEL)) {
// already in store — make sure vec0 is also up to date
if (!selectEmbeddingBuffer.get(id)) {
const row = selectEmbeddingFromStore.get(id, EMBEDDING_MODEL);
deleteEmbedding.run(BigInt(id));
insertEmbedding.run(BigInt(id), row.embedding);
upsertEmbeddingMeta.run(id, EMBEDDING_MODEL);
}
return { stored: false, shouldPauseBatch: false }; return { stored: false, shouldPauseBatch: false };
} }
@ -174,14 +242,18 @@ async function generateAndStoreEmbedding(id) {
embeddingJobsRunning.add(id); embeddingJobsRunning.add(id);
try { try {
if (selectEmbeddingBuffer.get(id)) { if (selectEmbeddingFromStore.get(id, EMBEDDING_MODEL)) {
return { stored: false, shouldPauseBatch: false }; return { stored: false, shouldPauseBatch: false };
} }
const embedding = await requestEmbedding(input); const embedding = await requestEmbedding(input);
const buffer = serializeEmbedding(embedding);
upsertEmbeddingStore.run(id, EMBEDDING_MODEL, buffer);
deleteEmbedding.run(BigInt(id)); deleteEmbedding.run(BigInt(id));
insertEmbedding.run(BigInt(id), serializeEmbedding(embedding)); insertEmbedding.run(BigInt(id), buffer);
upsertEmbeddingMeta.run(id, EMBEDDING_MODEL); upsertEmbeddingMeta.run(id, EMBEDDING_MODEL);
return { stored: true, shouldPauseBatch: false }; return { stored: true, shouldPauseBatch: false };
} catch (error) { } catch (error) {
console.error(`embedding generation failed for article ${id}:`, error); console.error(`embedding generation failed for article ${id}:`, error);
@ -216,7 +288,7 @@ async function backfillMissingEmbeddings(limit = 100) {
} }
function getEmbeddingBuffer(articleId) { function getEmbeddingBuffer(articleId) {
const row = selectEmbeddingBuffer.get(articleId); const row = selectEmbeddingFromStore.get(articleId, EMBEDDING_MODEL);
return row ? row.embedding : null; return row ? row.embedding : null;
} }
@ -238,14 +310,14 @@ async function getOrCreateQueryEmbedding(query) {
return null; return null;
} }
const cached = selectQueryEmbedding.get(normalizedQuery); const cached = selectQueryEmbedding.get(normalizedQuery, EMBEDDING_MODEL);
if (cached) { if (cached) {
return cached.embedding; return cached.embedding;
} }
const embedding = await requestEmbedding(normalizedQuery); const embedding = await requestEmbedding(normalizedQuery);
const buffer = serializeEmbedding(embedding); const buffer = serializeEmbedding(embedding);
upsertQueryEmbedding.run(normalizedQuery, buffer); upsertQueryEmbedding.run(normalizedQuery, EMBEDDING_MODEL, buffer);
return buffer; return buffer;
} }

View file

@ -28,6 +28,13 @@ async function statusRoutes(fastify) {
{ total: 0, usable: 0 } { total: 0, usable: 0 }
); );
const embeddingModelRows = db.prepare(`
SELECT model, COUNT(*) AS article_count
FROM article_embedding_store
GROUP BY model
ORDER BY article_count DESC
`).all();
return { return {
total: totals.total, total: totals.total,
usable: totals.usable, usable: totals.usable,
@ -35,6 +42,10 @@ async function statusRoutes(fastify) {
bySource: Object.fromEntries( bySource: Object.fromEntries(
bySourceRows.map((row) => [row.source, { total: row.total, usable: row.usable }]) bySourceRows.map((row) => [row.source, { total: row.total, usable: row.usable }])
), ),
embeddingModels: embeddingModelRows.map((row) => ({
model: row.model,
articles: row.article_count,
})),
}; };
}); });
} }