From a10c5eb39f48ce1cbd758d2afed2898da10b6ae1 Mon Sep 17 00:00:00 2001 From: ImBenji Date: Sat, 18 Apr 2026 23:58:32 +0100 Subject: [PATCH] migrate article embeddings to support multi-model architecture and enhance data integrity --- config.json | 2 +- src/ingest.js | 6 ++++-- src/scheduler.js | 28 ++++++++++++++++++++++------ src/sources/gdelt.js | 21 ++++++++++++++++----- 4 files changed, 43 insertions(+), 14 deletions(-) diff --git a/config.json b/config.json index 2f203d1..677c985 100644 --- a/config.json +++ b/config.json @@ -43,7 +43,7 @@ "googleNews": "0 * * * *" }, "contentBackfill": { - "concurrency": 10, + "concurrency": 100, "perSource": 50 }, "browser": { diff --git a/src/ingest.js b/src/ingest.js index 2cf32cf..c33d43c 100644 --- a/src/ingest.js +++ b/src/ingest.js @@ -1,6 +1,5 @@ const db = require('./db'); const { normalizeTitle } = require('./dedup'); -const { fetchAndStoreContent } = require('./content'); const { markSourceRun } = require('./state'); const insertArticle = db.prepare(` @@ -95,7 +94,10 @@ function ingestArticle(article) { ingestedAt ); - fetchAndStoreContent(result.lastInsertRowid, url, title, description); + // dont kick off the content fetch here — it used to be fire-and-forget which + // pinned thousands of pending render promises in memory during big gdelt + // backfills. the runContentLoop polls for pending rows and handles them + // with proper concurrency limits return { inserted: true, id: result.lastInsertRowid }; } catch (error) { diff --git a/src/scheduler.js b/src/scheduler.js index 10f49b9..0763482 100644 --- a/src/scheduler.js +++ b/src/scheduler.js @@ -28,7 +28,22 @@ async function runAllIngestions() { const results = []; results.push(await runSource('rss', fetchRssArticles)); - results.push(await runSource('gdelt', fetchGdeltArticles)); + + // gdelt streams via callback so we dont accumulate every article in memory + let gdeltInserted = 0; + let gdeltTotal = 0; + try { + await fetchGdeltArticles(async (articles) => { + const result = await ingestBatch("gdelt", articles); + gdeltInserted += result.inserted; + gdeltTotal += result.total; + }); + results.push({ source: "gdelt", inserted: gdeltInserted, total: gdeltTotal }); + } catch (error) { + console.error("gdelt ingestion failed:", error); + results.push({ source: "gdelt", inserted: gdeltInserted, total: gdeltTotal, error: error.message }); + } + results.push(await runSource('edgar', fetchEdgarArticles)); results.push(await runSource('alphavantage', fetchAlphaVantageArticles)); results.push(await runSource('finnhub', fetchFinnhubArticles)); @@ -61,14 +76,15 @@ function startScheduler() { continue; } - const isBigQuery = String(config.gdelt?.source || 'api').toLowerCase() === 'bigquery'; - if (isBigQuery) { + // both api and bigquery paths now stream per-window so we never hold the full + // result set across all sources × 52 weeks in memory at once + try { await fetchGdeltArticles(async (articles) => { - await ingestBatch('gdelt', articles); + await ingestBatch("gdelt", articles); }); - } else { - await runSource('gdelt', fetchGdeltArticles); + } catch (error) { + console.error("gdelt ingestion failed:", error); } } }; diff --git a/src/sources/gdelt.js b/src/sources/gdelt.js index 16b25fa..2578442 100644 --- a/src/sources/gdelt.js +++ b/src/sources/gdelt.js @@ -203,8 +203,12 @@ async function fetchGdeltArticlesBigQuery(onWindow) { return allArticles; } -async function fetchGdeltArticles() { - const articles = []; +async function fetchGdeltArticles(onWindow) { + // when onWindow is provided we stream per-window and never hold the full set in memory. + // the legacy non-streaming caller still gets an array back, but only for tiny one-shot + // runs — anything that loops over many sources should be using the callback path + const articles = onWindow ? null : []; + const windows = buildWeeklyWindows(); const requestDelayMs = Math.max(0, Number(config.gdelt?.requestDelayMs) || 5500); const maxWindowsPerRun = Number(config.gdelt?.maxWindowsPerRun) || 0; @@ -223,9 +227,16 @@ async function fetchGdeltArticles() { try { const windowArticles = await fetchWindow(source, window); - articles.push(...windowArticles); markWindowCompleted(source.id, window); windowsFetched += 1; + + if (onWindow) { + if (windowArticles.length > 0) { + await onWindow(windowArticles); + } + } else { + articles.push(...windowArticles); + } } catch (error) { if (error && error.status === 429) { console.warn(`GDELT window rate-limited for ${source.id} ${window.startKey}-${window.endKey}`); @@ -246,7 +257,7 @@ async function fetchGdeltArticles() { } } - return articles; + return articles || []; } function hasPendingWindows() { @@ -266,7 +277,7 @@ function fetchGdeltArticlesRouted(onWindow) { if (source === 'bigquery') { return fetchGdeltArticlesBigQuery(onWindow); } - return fetchGdeltArticles(); + return fetchGdeltArticles(onWindow); } module.exports = {