const cron = require('node-cron'); const config = require('./config'); const { ingestBatch } = require('./ingest'); const { fetchRssArticles } = require('./sources/rss'); const { fetchGdeltArticles, hasPendingWindows } = require('./sources/gdelt'); const { fetchEdgarArticles } = require('./sources/edgar'); const { fetchAlphaVantageArticles } = require('./sources/alphavantage'); const { fetchFinnhubArticles } = require('./sources/finnhub'); const { fetchGoogleNewsArticles } = require('./sources/googleNews'); const { backfillMissingContent, hasPendingContent } = require('./content'); const { backfillMissingEmbeddings } = require('./embeddings'); function sleep(ms) { return new Promise((resolve) => setTimeout(resolve, ms)); } async function runSource(source, fetcher) { try { const articles = await fetcher(); return await ingestBatch(source, articles); } catch (error) { console.error(`${source} ingestion failed:`, error); return { source, inserted: 0, total: 0, error: error.message }; } } async function runAllIngestions() { const results = []; results.push(await runSource('rss', fetchRssArticles)); // gdelt streams via callback so we dont accumulate every article in memory let gdeltInserted = 0; let gdeltTotal = 0; try { await fetchGdeltArticles(async (articles) => { const result = await ingestBatch("gdelt", articles); gdeltInserted += result.inserted; gdeltTotal += result.total; }); results.push({ source: "gdelt", inserted: gdeltInserted, total: gdeltTotal }); } catch (error) { console.error("gdelt ingestion failed:", error); results.push({ source: "gdelt", inserted: gdeltInserted, total: gdeltTotal, error: error.message }); } results.push(await runSource('edgar', fetchEdgarArticles)); results.push(await runSource('alphavantage', fetchAlphaVantageArticles)); results.push(await runSource('finnhub', fetchFinnhubArticles)); results.push(await runSource('googlenews', fetchGoogleNewsArticles)); try { await backfillMissingContent(); } catch (error) { console.error('content backfill failed:', error); } try { await backfillMissingEmbeddings(); } catch (error) { console.error('embedding backfill failed:', error); } return results; } function startScheduler() { const runRss = async () => { await runSource('rss', fetchRssArticles); }; const runGdeltLoop = async () => { while (true) { if (!hasPendingWindows()) { await sleep(60 * 1000); continue; } // both api and bigquery paths now stream per-window so we never hold the full // result set across all sources × 52 weeks in memory at once try { await fetchGdeltArticles(async (articles) => { await ingestBatch("gdelt", articles); }); } catch (error) { console.error("gdelt ingestion failed:", error); } } }; const runEdgar = async () => { await runSource('edgar', fetchEdgarArticles); }; const runAlphaVantage = async () => { await runSource('alphavantage', fetchAlphaVantageArticles); }; const runFinnhub = async () => { await runSource('finnhub', fetchFinnhubArticles); }; const runGoogleNews = async () => { await runSource('googlenews', fetchGoogleNewsArticles); }; const runContentLoop = async () => { while (true) { if (!hasPendingContent()) { await sleep(60 * 1000); continue; } try { const concurrency = Number(config.contentBackfill?.concurrency) || 5; const perSource = Number(config.contentBackfill?.perSource) || 50; await backfillMissingContent(perSource, concurrency); } catch (error) { console.error('content backfill failed:', error); } try { await backfillMissingEmbeddings(); } catch (error) { console.error('embedding backfill failed:', error); } } }; runRss(); runGdeltLoop(); runEdgar(); runAlphaVantage(); runFinnhub(); // runGoogleNews(); runContentLoop(); cron.schedule(config.scheduler.rss, runRss); cron.schedule(config.scheduler.edgar, runEdgar); cron.schedule(config.scheduler.alphaVantage, runAlphaVantage); cron.schedule(config.scheduler.finnhub, runFinnhub); } module.exports = { startScheduler, runAllIngestions, };