Duriin-API/src/scheduler.js

211 lines
6.3 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

const cron = require('node-cron');
const config = require('./config');
const { ingestBatch } = require('./ingest');
const { fetchRssArticles } = require('./sources/rss');
const { fetchGdeltArticles, hasPendingWindows } = require('./sources/gdelt');
const { fetchEdgarArticles } = require('./sources/edgar');
const { fetchAlphaVantageArticles } = require('./sources/alphavantage');
const { fetchFinnhubArticles } = require('./sources/finnhub');
const { fetchGoogleNewsArticles } = require('./sources/googleNews');
const { backfillMissingContent, runBackfillWorker, hasPendingContent } = require('./content');
const { backfillMissingEmbeddings, hasPendingEmbeddings } = require('./embeddings');
const { backfillMissingClusters, hasPendingClusters } = require('./clustering');
function sleep(ms) {
return new Promise((resolve) => setTimeout(resolve, ms));
}
async function runSource(source, fetcher) {
try {
const articles = await fetcher();
return await ingestBatch(source, articles);
} catch (error) {
console.error(`${source} ingestion failed:`, error);
return { source, inserted: 0, total: 0, error: error.message };
}
}
async function runAllIngestions() {
const results = [];
results.push(await runSource('rss', fetchRssArticles));
// gdelt streams via callback so we dont accumulate every article in memory
let gdeltInserted = 0;
let gdeltTotal = 0;
try {
await fetchGdeltArticles(async (articles) => {
const result = await ingestBatch("gdelt", articles);
gdeltInserted += result.inserted;
gdeltTotal += result.total;
});
results.push({ source: "gdelt", inserted: gdeltInserted, total: gdeltTotal });
} catch (error) {
console.error("gdelt ingestion failed:", error);
results.push({ source: "gdelt", inserted: gdeltInserted, total: gdeltTotal, error: error.message });
}
results.push(await runSource('edgar', fetchEdgarArticles));
results.push(await runSource('alphavantage', fetchAlphaVantageArticles));
results.push(await runSource('finnhub', fetchFinnhubArticles));
results.push(await runSource('googlenews', fetchGoogleNewsArticles));
try {
await backfillMissingContent();
} catch (error) {
console.error('content backfill failed:', error);
}
try {
await backfillMissingEmbeddings();
} catch (error) {
console.error('embedding backfill failed:', error);
}
return results;
}
function startScheduler() {
const runRss = async () => {
await runSource('rss', fetchRssArticles);
};
const runGdeltLoop = async () => {
while (true) {
if (!hasPendingWindows()) {
await sleep(60 * 1000);
continue;
}
// both api and bigquery paths now stream per-window so we never hold the full
// result set across all sources × 52 weeks in memory at once
try {
await fetchGdeltArticles(async (articles) => {
await ingestBatch("gdelt", articles);
});
} catch (error) {
console.error("gdelt ingestion failed:", error);
}
}
};
const runEdgar = async () => {
await runSource('edgar', fetchEdgarArticles);
};
const runAlphaVantage = async () => {
await runSource('alphavantage', fetchAlphaVantageArticles);
};
const runFinnhub = async () => {
await runSource('finnhub', fetchFinnhubArticles);
};
const runGoogleNews = async () => {
await runSource('googlenews', fetchGoogleNewsArticles);
};
// each content worker pulls a disjoint partition of pending articles by
// source. they share the global plain/browser semaphores in content.js so
// bumping worker count doesnt blow past the per-pool concurrency caps
const runContentWorker = async (workerIndex, workerCount) => {
while (true) {
if (!hasPendingContent()) {
await sleep(30 * 1000);
continue;
}
try {
const perSource = Number(config.contentBackfill?.perSource) || 50;
const batchSize = Number(config.contentBackfill?.batchSize) || 25;
const processed = await runBackfillWorker({ workerIndex, workerCount, perSource, batchSize });
// if a worker found nothing in its partition, brief sleep so we dont
// hammer the db with empty selects
if (processed === 0) {
await sleep(5000);
}
} catch (error) {
console.error(`content worker ${workerIndex} failed:`, error);
await sleep(5000);
}
}
};
// dedicated embedding loop — runs independently of content fetch so slow
// openrouter calls dont block the content pipeline. batches per request
const runEmbeddingLoop = async () => {
while (true) {
if (!hasPendingEmbeddings()) {
await sleep(30 * 1000);
continue;
}
try {
const limit = Number(config.embeddingBackfill?.perRound) || 256;
const batchSize = Number(config.embeddingBackfill?.batchSize) || 16;
const result = await backfillMissingEmbeddings(limit, batchSize);
if (result.paused) {
// 402 means out of credits — wait longer before retrying
await sleep(5 * 60 * 1000);
} else if (result.processed === 0) {
await sleep(5000);
}
} catch (error) {
console.error('embedding backfill failed:', error);
await sleep(10000);
}
}
};
runRss();
runGdeltLoop();
runEdgar();
runAlphaVantage();
runFinnhub();
// runGoogleNews();
const workerCount = Math.max(1, Number(config.contentBackfill?.workers) || 3);
for (let i = 0; i < workerCount; i += 1) {
runContentWorker(i, workerCount);
}
const runClusteringLoop = async () => {
while (true) {
if (!hasPendingClusters()) {
await sleep(30 * 1000);
continue;
}
try {
const limit = Number(config.clustering && config.clustering.perRound) || 128;
const result = await backfillMissingClusters(limit);
if (result.processed === 0) {
await sleep(5000);
}
} catch (error) {
console.error('clustering backfill failed:', error);
await sleep(10000);
}
}
};
runEmbeddingLoop();
runClusteringLoop();
cron.schedule(config.scheduler.rss, runRss);
cron.schedule(config.scheduler.edgar, runEdgar);
cron.schedule(config.scheduler.alphaVantage, runAlphaVantage);
cron.schedule(config.scheduler.finnhub, runFinnhub);
}
module.exports = {
startScheduler,
runAllIngestions,
};