diff --git a/src/content.js b/src/content.js index 1c47720..d24b380 100644 --- a/src/content.js +++ b/src/content.js @@ -171,7 +171,7 @@ async function fetchAndStoreContent(id, url) { } } -async function backfillMissingContent(limit = 10) { +async function backfillMissingContent(limit = 100, concurrency = 5) { if (contentBackfillRunning) { return; } @@ -183,15 +183,26 @@ async function backfillMissingContent(limit = 10) { ? selectAllArticlesMissingContent.all() : selectArticlesMissingContent.all(limit); - for (const row of rows) { - await fetchAndStoreContent(row.id, row.url); + for (let i = 0; i < rows.length; i += concurrency) { + const batch = rows.slice(i, i + concurrency); + await Promise.all(batch.map((row) => fetchAndStoreContent(row.id, row.url))); } } finally { contentBackfillRunning = false; } } +function hasPendingContent() { + return Boolean(db.prepare(` + SELECT 1 FROM articles + WHERE (content IS NULL OR TRIM(content) = '') + AND (content_status IS NULL OR content_status = 'pending') + LIMIT 1 + `).get()); +} + module.exports = { fetchAndStoreContent, backfillMissingContent, + hasPendingContent, }; diff --git a/src/db.js b/src/db.js index 7f65b14..7680bc5 100644 --- a/src/db.js +++ b/src/db.js @@ -127,6 +127,14 @@ db.exec(` ); `); +db.exec(` + CREATE TABLE IF NOT EXISTS article_embedding_meta ( + article_id INTEGER PRIMARY KEY, + model TEXT NOT NULL, + embedded_at TEXT NOT NULL DEFAULT (datetime('now')) + ); +`); + db.exec(` CREATE TABLE IF NOT EXISTS gdelt_backfill_windows ( source_id TEXT NOT NULL, diff --git a/src/embeddings.js b/src/embeddings.js index 47f1054..cd6ebd6 100644 --- a/src/embeddings.js +++ b/src/embeddings.js @@ -19,6 +19,16 @@ const selectEmbeddingBuffer = db.prepare(` FROM article_embeddings WHERE article_id = ? `); +const upsertEmbeddingMeta = db.prepare(` + INSERT INTO article_embedding_meta (article_id, model, embedded_at) + VALUES (?, ?, datetime('now')) + ON CONFLICT(article_id) DO UPDATE SET + model = excluded.model, + embedded_at = excluded.embedded_at +`); +const selectEmbeddingModel = db.prepare(` + SELECT model FROM article_embedding_meta WHERE article_id = ? +`); const nearestNeighbors = db.prepare(` SELECT article_id, distance FROM article_embeddings @@ -35,10 +45,14 @@ const selectArticlesMissingEmbeddings = db.prepare(` AND TRIM(a.description) != '' AND a.content IS NOT NULL AND TRIM(a.content) != '' - AND NOT EXISTS ( - SELECT 1 - FROM article_embeddings e - WHERE e.article_id = a.id + AND ( + NOT EXISTS ( + SELECT 1 FROM article_embeddings e WHERE e.article_id = a.id + ) + OR NOT EXISTS ( + SELECT 1 FROM article_embedding_meta m + WHERE m.article_id = a.id AND m.model = ? + ) ) ORDER BY a.ingested_at ASC, a.id ASC LIMIT ? @@ -56,7 +70,15 @@ const upsertQueryEmbedding = db.prepare(` created_at = datetime('now') `); +const EMBEDDING_MODEL = 'perplexity/pplx-embed-v1-0.6b'; + let embeddingBackfillRunning = false; + +// backfill meta for any embeddings that existed before model tracking +db.prepare(` + INSERT OR IGNORE INTO article_embedding_meta (article_id, model) + SELECT article_id, ? FROM article_embeddings +`).run(EMBEDDING_MODEL); const embeddingJobsRunning = new Set(); function buildEmbeddingInput(article) { @@ -90,7 +112,7 @@ async function requestEmbedding(input) { 'Content-Type': 'application/json', }, body: JSON.stringify({ - model: 'perplexity/pplx-embed-v1-0.6b', + model: EMBEDDING_MODEL, input, }), }); @@ -159,6 +181,7 @@ async function generateAndStoreEmbedding(id) { const embedding = await requestEmbedding(input); deleteEmbedding.run(BigInt(id)); insertEmbedding.run(BigInt(id), serializeEmbedding(embedding)); + upsertEmbeddingMeta.run(id, EMBEDDING_MODEL); return { stored: true, shouldPauseBatch: false }; } catch (error) { console.error(`embedding generation failed for article ${id}:`, error); @@ -179,7 +202,7 @@ async function backfillMissingEmbeddings(limit = 100) { embeddingBackfillRunning = true; try { - const rows = selectArticlesMissingEmbeddings.all(limit); + const rows = selectArticlesMissingEmbeddings.all(EMBEDDING_MODEL, limit); for (const row of rows) { const result = await generateAndStoreEmbedding(row.id); diff --git a/src/scheduler.js b/src/scheduler.js index 73d5f30..32c3c5c 100644 --- a/src/scheduler.js +++ b/src/scheduler.js @@ -7,7 +7,7 @@ const { fetchEdgarArticles } = require('./sources/edgar'); const { fetchAlphaVantageArticles } = require('./sources/alphavantage'); const { fetchFinnhubArticles } = require('./sources/finnhub'); const { fetchGoogleNewsArticles } = require('./sources/googleNews'); -const { backfillMissingContent } = require('./content'); +const { backfillMissingContent, hasPendingContent } = require('./content'); const { backfillMissingEmbeddings } = require('./embeddings'); function sleep(ms) { @@ -89,17 +89,24 @@ function startScheduler() { await runSource('googlenews', fetchGoogleNewsArticles); }; - const runContentMaintenance = async () => { - try { - await backfillMissingContent(); - } catch (error) { - console.error('content backfill failed:', error); - } + const runContentLoop = async () => { + while (true) { + if (!hasPendingContent()) { + await sleep(60 * 1000); + continue; + } - try { - await backfillMissingEmbeddings(); - } catch (error) { - console.error('embedding backfill failed:', error); + try { + await backfillMissingContent(); + } catch (error) { + console.error('content backfill failed:', error); + } + + try { + await backfillMissingEmbeddings(); + } catch (error) { + console.error('embedding backfill failed:', error); + } } }; @@ -109,14 +116,13 @@ function startScheduler() { runAlphaVantage(); runFinnhub(); // runGoogleNews(); - runContentMaintenance(); + runContentLoop(); cron.schedule(config.scheduler.rss, runRss); cron.schedule(config.scheduler.edgar, runEdgar); cron.schedule(config.scheduler.alphaVantage, runAlphaVantage); cron.schedule(config.scheduler.finnhub, runFinnhub); -cron.schedule(config.contentBackfill.cron, runContentMaintenance); } module.exports = { diff --git a/src/sources/gdelt.js b/src/sources/gdelt.js index 4a5345e..16b25fa 100644 --- a/src/sources/gdelt.js +++ b/src/sources/gdelt.js @@ -114,14 +114,19 @@ async function fetchWindowBigQuery(source, window, bigquery) { const maxRecords = Math.max(1, Math.min(Number(config.gdelt?.maxRecords) || 100, 1000)); const domainClauses = source.website.map((d) => `LOWER(DocumentIdentifier) LIKE '%${d}%'`).join(' OR '); + // use _PARTITIONTIME (not the INTEGER DATE column) so BigQuery can prune partitions + // and avoid full-table scans on the ~130TB gkg table + const startTs = window.start.toISOString(); + const endTs = window.end.toISOString(); + const query = ` SELECT DocumentIdentifier AS url, SourceCommonName AS domain, CAST(DATE AS STRING) AS seendate FROM \`gdelt-bq.gdeltv2.gkg\` - WHERE DATE >= ${window.startKey} - AND DATE < ${window.endKey} + WHERE _PARTITIONTIME >= TIMESTAMP("${startTs}") + AND _PARTITIONTIME < TIMESTAMP("${endTs}") AND (${domainClauses}) LIMIT ${maxRecords} `;