diff --git a/config.json b/config.json index 677c985..f79e684 100644 --- a/config.json +++ b/config.json @@ -43,11 +43,18 @@ "googleNews": "0 * * * *" }, "contentBackfill": { - "concurrency": 100, - "perSource": 50 + "workers": 3, + "perSource": 50, + "batchSize": 25, + "plainConcurrency": 50, + "browserConcurrency": 8 + }, + "embeddingBackfill": { + "perRound": 256, + "batchSize": 16 }, "browser": { - "maxConcurrentPages": 25 + "maxConcurrentPages": 8 }, "googleNews": { "queries": [ diff --git a/package.json b/package.json index b71edbc..9bf7332 100644 --- a/package.json +++ b/package.json @@ -19,7 +19,6 @@ "node-cron": "^4.2.1", "playwright": "^1.59.1", "rss-parser": "^3.13.0", - "sharp": "^0.34.5", "sqlite-vec": "^0.1.9" } } diff --git a/src/content.js b/src/content.js index 6f5b5bf..c1c8f46 100644 --- a/src/content.js +++ b/src/content.js @@ -1,8 +1,6 @@ const { extractFromHtml } = require("@extractus/article-extractor"); -const sharp = require("sharp"); const db = require("./db"); const config = require("./config"); -const { generateAndStoreEmbedding } = require("./embeddings"); const { fetchWithPolicy } = require("./http"); const { getSharedBrowserSession } = require("./sources/browserCrawler"); const { validateExtractedArticle } = require("./contentValidation"); @@ -18,18 +16,21 @@ const { const MAX_PLAIN_HTML_LENGTH = 1_500_000; const PLAIN_FETCH_TIMEOUT = 12000; const BROWSER_FETCH_TIMEOUT = 20000; +const HEAD_PRECHECK_TIMEOUT = 6000; -// retry windows for failures that look transient (validation rejected the -// page, fetch timed out). genuinely terminal failures (404, dead url) get -// a hard cap on attempt count instead const VALIDATION_RETRY_AFTER_MS = 24 * 60 * 60 * 1000; const TRANSIENT_RETRY_AFTER_MS = 6 * 60 * 60 * 1000; const MAX_TERMINAL_ATTEMPTS = 3; +// flaky domains get a HEAD precheck before we waste a body fetch. only kicks +// in once a domain has accumulated some failure history; pristine domains +// skip the round trip +const HEAD_PRECHECK_FAILURE_THRESHOLD = 2; + const updateArticleAssets = db.prepare(` UPDATE articles - SET content = ?, image = ?, content_status = 'ready', content_error = NULL, + SET content = ?, content_status = 'ready', content_error = NULL, content_attempted_at = ?, content_attempt_count = content_attempt_count + 1, content_retry_after = NULL WHERE id = ? @@ -58,9 +59,10 @@ const markContentPending = db.prepare(` WHERE id = ? `); -// round-robin pull of articles needing content. respects content_retry_after so -// a freshly-rejected article doesnt get retried in the next loop iteration -const selectRoundRobinArticlesMissingContent = db.prepare(` +// pulls a partition of pending articles. workerIndex/workerCount partitions +// by article id (deterministic) so multiple workers never see the same row. +// also round-robins by source so no single domain dominates the queue +const selectPartitionedArticlesMissingContent = db.prepare(` SELECT id, url, title, description FROM ( SELECT id, url, title, description, source, @@ -69,6 +71,7 @@ const selectRoundRobinArticlesMissingContent = db.prepare(` WHERE (content IS NULL OR TRIM(content) = '') AND (content_status IS NULL OR content_status = 'pending') AND (content_retry_after IS NULL OR content_retry_after <= datetime('now')) + AND (id % ?) = ? ) WHERE rn <= ? ORDER BY rn, source @@ -79,7 +82,37 @@ const selectAttemptCount = db.prepare(` `); -let contentBackfillRunning = false; +// shared semaphore — tracks both plain and browser pool occupancy across all +// workers. defining at module scope so the limits are global, not per-worker +function makeSemaphore(limit) { + let active = 0; + const waiters = []; + + return { + async acquire() { + if (active < limit) { + active += 1; + return; + } + await new Promise((resolve) => waiters.push(resolve)); + active += 1; + }, + release() { + active = Math.max(0, active - 1); + const next = waiters.shift(); + if (next) next(); + }, + inFlight() { + return active; + }, + }; +} + +const PLAIN_CONCURRENCY = Number(config.contentBackfill?.plainConcurrency) || 50; +const BROWSER_CONCURRENCY = Number(config.contentBackfill?.browserConcurrency) || 8; + +const plainSemaphore = makeSemaphore(PLAIN_CONCURRENCY); +const browserSemaphore = makeSemaphore(BROWSER_CONCURRENCY); function getErrorStatus(error) { @@ -91,11 +124,6 @@ function getErrorStatus(error) { return match ? Number(match[1]) : null; } -function getErrorMessage(error, fallback) { - const message = String((error && error.message) || fallback || "").trim(); - return message ? message.slice(0, 500) : null; -} - function nowIso() { return new Date().toISOString(); } @@ -105,74 +133,66 @@ function futureIso(ms) { } -async function fetchCompressedImage(url) { - const response = await fetchWithPolicy(url, { - retries: 1, - headers: { Accept: "image/*" }, - }); - - if (!response.ok) { - const error = new Error(`image request failed with ${response.status}`); - error.status = response.status; - throw error; +// cheap HEAD check before pulling the body. only used on domains we already +// know are unreliable. if HEAD says 404/410/451, skip the body fetch entirely +async function headPrecheck(url) { + try { + const response = await fetchWithPolicy(url, { + method: "HEAD", + timeout: HEAD_PRECHECK_TIMEOUT, + retries: 0, + }); + return { status: response.status, finalUrl: response.url || url }; + } catch (error) { + return { status: getErrorStatus(error), error }; } - - const contentType = String(response.headers.get("content-type") || "").toLowerCase(); - if (!contentType.startsWith("image/")) { - throw new Error(`image request returned ${contentType || "unknown content-type"}`); - } - - const input = Buffer.from(await response.arrayBuffer()); - if (input.length === 0) { - throw new Error("image request returned an empty body"); - } - - const output = await sharp(input) - .rotate() - .resize({ width: 320, height: 320, fit: "inside", withoutEnlargement: true }) - .webp({ quality: 25 }) - .toBuffer(); - - return output.toString("base64"); } -// plain http fetch — no js execution. fast, low memory, but fails on -// js-rendered sites and gets blocked by cloudflare more often async function fetchPlainHtml(url) { - const response = await fetchWithPolicy(url, { - timeout: PLAIN_FETCH_TIMEOUT, - retries: 1, - }); + await plainSemaphore.acquire(); + try { + const response = await fetchWithPolicy(url, { + timeout: PLAIN_FETCH_TIMEOUT, + retries: 1, + }); - if (!response.ok) { - const error = new Error(`plain fetch returned ${response.status}`); - error.status = response.status; - throw error; + if (!response.ok) { + const error = new Error(`plain fetch returned ${response.status}`); + error.status = response.status; + throw error; + } + + const contentType = String(response.headers.get("content-type") || "").toLowerCase(); + if (contentType && !contentType.includes("html") && !contentType.includes("xml")) { + throw new Error(`plain fetch returned non-html content-type: ${contentType}`); + } + + const text = await response.text(); + return { + html: text.slice(0, MAX_PLAIN_HTML_LENGTH), + finalUrl: response.url || url, + }; + } finally { + plainSemaphore.release(); } - - const contentType = String(response.headers.get("content-type") || "").toLowerCase(); - if (contentType && !contentType.includes("html") && !contentType.includes("xml")) { - throw new Error(`plain fetch returned non-html content-type: ${contentType}`); - } - - const text = await response.text(); - return { - html: text.slice(0, MAX_PLAIN_HTML_LENGTH), - finalUrl: response.url || url, - }; } async function fetchBrowserHtml(url) { - const maxConcurrentPages = Number(config.browser?.maxConcurrentPages) || 25; - const session = await getSharedBrowserSession({ - requestTimeout: BROWSER_FETCH_TIMEOUT, - maxConcurrentPages, - }); + await browserSemaphore.acquire(); + try { + const maxConcurrentPages = Number(config.browser?.maxConcurrentPages) || 8; + const session = await getSharedBrowserSession({ + requestTimeout: BROWSER_FETCH_TIMEOUT, + maxConcurrentPages, + }); - const html = await session.fetchRenderedHtml(url, { timeout: BROWSER_FETCH_TIMEOUT }); - return { html, finalUrl: url }; + const html = await session.fetchRenderedHtml(url, { timeout: BROWSER_FETCH_TIMEOUT }); + return { html, finalUrl: url }; + } finally { + browserSemaphore.release(); + } } @@ -183,10 +203,6 @@ function stripHtmlContent(value) { } -// runs fetch → extract → validate. returns { ok, article, html, finalUrl, reason } -// where article has been post-processed (content stripped of html). on failure, -// reason explains what tripped — used both for logging and for the per-domain -// policy update async function attemptFetch(url, fetcher) { let html; let finalUrl; @@ -231,11 +247,29 @@ function getAttemptCount(id) { } +// shouldPrecheck: domains with at least N consecutive plain or browser failures +// in their policy entry get a HEAD check first. we read the row directly here +// rather than threading through the policy module +const selectFailureCounts = db.prepare(` + SELECT consecutive_plain_failures, consecutive_browser_failures + FROM domain_fetch_policy WHERE domain = ? +`); + +function shouldPrecheck(url) { + try { + const domain = new URL(url).hostname.toLowerCase(); + const row = selectFailureCounts.get(domain); + if (!row) return false; + return (row.consecutive_plain_failures + row.consecutive_browser_failures) >= HEAD_PRECHECK_FAILURE_THRESHOLD; + } catch { + return false; + } +} + + async function fetchAndStoreContent(id, url, storedTitle, storedDescription) { const policy = getEffectivePolicy(url); - // domains we know are blocked — skip the fetch entirely until ttl expires. - // the row stays pending so it'll get picked up after the policy resets if (policy.policy === "blocked") { markContentPending.run( `domain blocked by policy`, @@ -246,6 +280,16 @@ async function fetchAndStoreContent(id, url, storedTitle, storedDescription) { return; } + // HEAD precheck for known-flaky domains. if it returns 404/410/451 we mark + // terminal-failed without burning a body fetch. transient errors fall through + if (shouldPrecheck(url)) { + const head = await headPrecheck(url); + if (head.status === 404 || head.status === 410 || head.status === 451) { + markContentFailed.run(`head ${head.status}`, nowIso(), id); + return; + } + } + const tryPlainFirst = policy.policy === "auto" || policy.policy === "plain_only"; let plainResult = null; let browserResult = null; @@ -256,15 +300,12 @@ async function fetchAndStoreContent(id, url, storedTitle, storedDescription) { if (plainResult.ok) { recordPlainSuccess(url); - await commitArticle(id, url, plainResult, storedTitle, storedDescription); + commitArticle(id, url, plainResult, storedTitle, storedDescription); return; } recordPlainFailure(url); - // hard 4xx (other than 408/429) on plain — domain might serve the same to - // browser, but try anyway since it's cheap once the policy hasnt flipped yet. - // 408/429/5xx defer for retry const status = plainResult.error && getErrorStatus(plainResult.error); if (status === 408 || status === 429 || (status && status >= 500)) { markContentPending.run( @@ -277,8 +318,6 @@ async function fetchAndStoreContent(id, url, storedTitle, storedDescription) { } } - // policy.policy === "plain_only" means we just tried plain and failed — - // dont escalate to browser, the operator (or earlier domain memory) said no if (policy.policy === "plain_only") { recordValidationFailure(id, plainResult); return; @@ -289,7 +328,7 @@ async function fetchAndStoreContent(id, url, storedTitle, storedDescription) { if (browserResult.ok) { recordBrowserSuccess(url); - await commitArticle(id, url, browserResult, storedTitle, storedDescription); + commitArticle(id, url, browserResult, storedTitle, storedDescription); return; } @@ -306,9 +345,6 @@ async function fetchAndStoreContent(id, url, storedTitle, storedDescription) { return; } - // both paths exhausted (or browser-only path failed). decide between - // pending-with-retry and terminal failed based on attempt count and - // whether the validator thought it was retryable recordValidationFailure(id, browserResult); } @@ -318,7 +354,6 @@ function recordValidationFailure(id, result) { const retryable = result?.retryable !== false; const attempts = getAttemptCount(id); - // hard fetch errors with no retryable signal — terminal after a few tries if (!retryable || attempts + 1 >= MAX_TERMINAL_ATTEMPTS) { markContentFailed.run(reason, nowIso(), id); return; @@ -328,11 +363,10 @@ function recordValidationFailure(id, result) { } -async function commitArticle(id, url, result, storedTitle, storedDescription) { - const { article, finalUrl } = result; +function commitArticle(id, url, result, storedTitle, storedDescription) { + const { article } = result; const content = article.content || null; - // if stored title looks like a raw url, replace with extracted one const titleLooksLikeUrl = storedTitle && /^https?:\/\//i.test(storedTitle.trim()); if (titleLooksLikeUrl) { const scrapedTitle = typeof article.title === "string" ? article.title.trim() : null; @@ -342,47 +376,33 @@ async function commitArticle(id, url, result, storedTitle, storedDescription) { } } - let image = null; - if (article.image) { - try { - image = await fetchCompressedImage(article.image); - } catch (error) { - const status = getErrorStatus(error); - if (status === 401 || status === 403 || status === 404 || status === 429) { - console.warn(`image fetch skipped for ${url}: upstream returned ${status}`); - } else { - console.error(`image fetch failed for ${url}:`, error.message || error); - } - } - } + updateArticleAssets.run(content, nowIso(), id); - updateArticleAssets.run(content, image, nowIso(), id); - - try { - await generateAndStoreEmbedding(id); - } catch (error) { - console.error(`embedding failed for article ${id}:`, error.message || error); - } + // embedding generation is no longer kicked off here — runEmbeddingLoop + // in scheduler.js batches them in its own pipeline. that decouples slow + // openrouter calls from content fetch throughput } -async function backfillMissingContent(perSource = 50, concurrency = 5) { - if (contentBackfillRunning) { - return; +// runs one worker pass — pulls its partition slice, fires N concurrent fetches +// from a single backfill loop. multiple workers share the plain/browser +// semaphores so total concurrency stays bounded regardless of worker count +async function runBackfillWorker({ workerIndex, workerCount, perSource, batchSize }) { + const rows = selectPartitionedArticlesMissingContent.all(workerCount, workerIndex, perSource); + if (rows.length === 0) return 0; + + // dispatch in chunks of batchSize so we don't allocate thousands of unawaited + // promises at once. the semaphores throttle inside fetchAndStoreContent + for (let i = 0; i < rows.length; i += batchSize) { + const batch = rows.slice(i, i + batchSize); + await Promise.all(batch.map((row) => + fetchAndStoreContent(row.id, row.url, row.title, row.description).catch((error) => { + console.error(`backfill worker ${workerIndex} failed on ${row.url}:`, error.message || error); + }) + )); } - contentBackfillRunning = true; - - try { - const rows = selectRoundRobinArticlesMissingContent.all(perSource); - - for (let i = 0; i < rows.length; i += concurrency) { - const batch = rows.slice(i, i + concurrency); - await Promise.all(batch.map((row) => fetchAndStoreContent(row.id, row.url, row.title, row.description))); - } - } finally { - contentBackfillRunning = false; - } + return rows.length; } @@ -397,8 +417,20 @@ function hasPendingContent() { } +// kept for backwards compat with scheduler/runAllIngestions one-shot runs +async function backfillMissingContent(perSource = 50, concurrency = 50) { + await runBackfillWorker({ + workerIndex: 0, + workerCount: 1, + perSource, + batchSize: concurrency, + }); +} + + module.exports = { fetchAndStoreContent, backfillMissingContent, + runBackfillWorker, hasPendingContent, }; diff --git a/src/db.js b/src/db.js index b8896b5..9dc208b 100644 --- a/src/db.js +++ b/src/db.js @@ -9,6 +9,8 @@ sqliteVec.load(db); db.pragma('journal_mode = WAL'); +// the image column is retained as a no-op for backwards compat with old rows. +// new code never writes to it; drop in a future migration if you really want db.exec(` CREATE TABLE IF NOT EXISTS articles ( id INTEGER PRIMARY KEY AUTOINCREMENT, diff --git a/src/embeddings.js b/src/embeddings.js index a2c2f50..d821378 100644 --- a/src/embeddings.js +++ b/src/embeddings.js @@ -57,7 +57,7 @@ const nearestNeighbors = db.prepare(` `); const selectArticlesMissingEmbeddings = db.prepare(` - SELECT a.id + SELECT a.id, a.title, a.description, a.content FROM articles a WHERE a.title IS NOT NULL AND TRIM(a.title) != '' @@ -254,6 +254,9 @@ function normalizeQuery(input) { .replace(/\s+/g, ' '); } +// supports both single string and array input. openrouter follows the openai +// embeddings contract — when input is an array, payload.data is an array of +// {index, embedding} objects in the same order async function requestEmbedding(input) { const response = await fetch('https://openrouter.ai/api/v1/embeddings', { method: 'POST', @@ -286,7 +289,24 @@ async function requestEmbedding(input) { } const payload = await response.json(); - const embedding = payload && payload.data && payload.data[0] && payload.data[0].embedding; + const data = payload && payload.data; + + if (!Array.isArray(data) || data.length === 0) { + throw new Error('invalid embedding response: missing data'); + } + + if (Array.isArray(input)) { + // sort by index to be safe — some providers return out-of-order + const sorted = [...data].sort((a, b) => (a.index ?? 0) - (b.index ?? 0)); + return sorted.map((row) => { + if (!Array.isArray(row.embedding) || row.embedding.length === 0) { + throw new Error(`invalid embedding at index ${row.index}`); + } + return row.embedding; + }); + } + + const embedding = data[0] && data[0].embedding; if (!Array.isArray(embedding) || embedding.length === 0) { throw new Error(`invalid embedding in response: ${Array.isArray(embedding) ? 'empty' : 'missing'}`); } @@ -357,25 +377,117 @@ async function generateAndStoreEmbedding(id) { } } -async function backfillMissingEmbeddings(limit = 100) { +// writes one batch of {id, embedding} pairs in a single transaction. the +// vec0 insert pads to 8192 dims so models with fewer dims still index +function commitEmbeddingBatch(rows) { + const tx = db.transaction((entries) => { + for (const entry of entries) { + const buffer = serializeEmbedding(entry.embedding); + upsertEmbeddingStore.run(entry.id, EMBEDDING_MODEL, buffer); + deleteEmbedding.run(BigInt(entry.id)); + insertEmbedding.run(BigInt(entry.id), padEmbeddingForVec0(entry.embedding)); + upsertEmbeddingMeta.run(entry.id, EMBEDDING_MODEL); + } + }); + + tx(rows); +} + + +// batched backfill — pulls `limit` candidates, sends them as one openrouter +// request (up to batchSize per call), writes the results in one tx. cuts +// per-article overhead from ~1 round-trip to ~1/batchSize round trips +async function backfillMissingEmbeddings(limit = 256, batchSize = 16) { if (embeddingBackfillRunning) { - return; + return { processed: 0, paused: false }; + } + + const apiKey = config.openRouter && config.openRouter.apiKey + ? String(config.openRouter.apiKey).trim() + : ''; + + if (!apiKey) { + return { processed: 0, paused: false }; } embeddingBackfillRunning = true; + let processed = 0; + let paused = false; try { - const rows = selectArticlesMissingEmbeddings.all(EMBEDDING_MODEL, limit); + const candidates = selectArticlesMissingEmbeddings.all(EMBEDDING_MODEL, limit); + if (candidates.length === 0) { + return { processed: 0, paused: false }; + } - for (const row of rows) { - const result = await generateAndStoreEmbedding(row.id); - if (result.shouldPauseBatch) { - break; + // pre-build inputs and drop any candidates with empty input (shouldnt + // happen since the query filters, but be defensive) + const eligible = candidates + .map((row) => ({ id: row.id, input: buildEmbeddingInput(row) })) + .filter((row) => row.input); + + for (let i = 0; i < eligible.length; i += batchSize) { + const batch = eligible.slice(i, i + batchSize); + const inputs = batch.map((row) => row.input); + + try { + const embeddings = await requestEmbedding(inputs); + + if (embeddings.length !== batch.length) { + console.error(`embedding batch length mismatch: expected ${batch.length}, got ${embeddings.length}`); + continue; + } + + const toCommit = batch.map((row, idx) => ({ id: row.id, embedding: embeddings[idx] })); + commitEmbeddingBatch(toCommit); + processed += toCommit.length; + } catch (error) { + console.error(`embedding batch failed (size ${batch.length}):`, error.message || error); + + if (error && error.status === 402) { + paused = true; + break; + } + + // on other errors, fall back to per-article so a single bad input + // doesnt poison the whole batch. slow but correct + for (const row of batch) { + try { + const single = await requestEmbedding(row.input); + commitEmbeddingBatch([{ id: row.id, embedding: single }]); + processed += 1; + } catch (singleError) { + console.error(`single embedding fallback failed for article ${row.id}:`, singleError.message || singleError); + if (singleError && singleError.status === 402) { + paused = true; + break; + } + } + } + + if (paused) break; } } } finally { embeddingBackfillRunning = false; } + + return { processed, paused }; +} + + +function hasPendingEmbeddings() { + return Boolean(db.prepare(` + SELECT 1 FROM articles a + WHERE a.title IS NOT NULL AND TRIM(a.title) != '' + AND a.description IS NOT NULL AND TRIM(a.description) != '' + AND a.content IS NOT NULL AND TRIM(a.content) != '' + AND NOT EXISTS ( + SELECT 1 FROM article_embedding_store s + WHERE s.article_id = a.id AND s.model = ? + ) + LIMIT 1 + `).get(EMBEDDING_MODEL)); } function getEmbeddingBuffer(articleId) { @@ -428,4 +540,5 @@ module.exports = { findSimilarArticles, getEmbeddingBuffer, getOrCreateQueryEmbedding, + hasPendingEmbeddings, }; diff --git a/src/ingest.js b/src/ingest.js index c33d43c..62e670e 100644 --- a/src/ingest.js +++ b/src/ingest.js @@ -7,14 +7,13 @@ const insertArticle = db.prepare(` title, description, content, - image, is_index_page, url, normalized_title, source, pub_date, ingested_at - ) VALUES (?, ?, NULL, NULL, ?, ?, ?, ?, ?, ?) + ) VALUES (?, ?, NULL, ?, ?, ?, ?, ?, ?) `); const findByUrl = db.prepare('SELECT id FROM articles WHERE url = ?'); const INDEX_PAGE_URL_HINT = /\/(category|categories|tag|tags|topic|topics|section|sections|archive|archives|authors|search)(?:\/|$)/i; diff --git a/src/routes/articles.js b/src/routes/articles.js index 298f43f..859ff43 100644 --- a/src/routes/articles.js +++ b/src/routes/articles.js @@ -45,7 +45,7 @@ function buildArticlesQuery(query) { return { sql: ` - SELECT id, title, description, content, image, ${includeEmbedding ? 'embedding,' : ''} url, normalized_title, source, pub_date, ingested_at + SELECT id, title, description, content, ${includeEmbedding ? 'embedding,' : ''} url, normalized_title, source, pub_date, ingested_at FROM articles ${whereClause} ORDER BY COALESCE(pub_date, ingested_at) DESC, id DESC @@ -72,7 +72,7 @@ function mapNeighborsToArticles(neighbors, excludeIndexPages, limit) { const placeholders = ids.map(() => '?').join(', '); const articles = db.prepare(` - SELECT id, title, description, content, image, url, normalized_title, source, pub_date, ingested_at + SELECT id, title, description, content, url, normalized_title, source, pub_date, ingested_at FROM articles WHERE id IN (${placeholders}) AND content IS NOT NULL AND content != '' @@ -144,7 +144,7 @@ async function articleRoutes(fastify) { } const article = db.prepare(` - SELECT id, title, description, content, image, url, normalized_title, source, pub_date, ingested_at + SELECT id, title, description, content, url, normalized_title, source, pub_date, ingested_at FROM articles WHERE id = ? AND content IS NOT NULL AND content != '' diff --git a/src/scheduler.js b/src/scheduler.js index 0763482..ef0d984 100644 --- a/src/scheduler.js +++ b/src/scheduler.js @@ -7,8 +7,8 @@ const { fetchEdgarArticles } = require('./sources/edgar'); const { fetchAlphaVantageArticles } = require('./sources/alphavantage'); const { fetchFinnhubArticles } = require('./sources/finnhub'); const { fetchGoogleNewsArticles } = require('./sources/googleNews'); -const { backfillMissingContent, hasPendingContent } = require('./content'); -const { backfillMissingEmbeddings } = require('./embeddings'); +const { backfillMissingContent, runBackfillWorker, hasPendingContent } = require('./content'); +const { backfillMissingEmbeddings, hasPendingEmbeddings } = require('./embeddings'); function sleep(ms) { return new Promise((resolve) => setTimeout(resolve, ms)); @@ -105,36 +105,75 @@ function startScheduler() { await runSource('googlenews', fetchGoogleNewsArticles); }; - const runContentLoop = async () => { + // each content worker pulls a disjoint partition of pending articles by + // source. they share the global plain/browser semaphores in content.js so + // bumping worker count doesnt blow past the per-pool concurrency caps + const runContentWorker = async (workerIndex, workerCount) => { while (true) { if (!hasPendingContent()) { - await sleep(60 * 1000); + await sleep(30 * 1000); continue; } try { - const concurrency = Number(config.contentBackfill?.concurrency) || 5; const perSource = Number(config.contentBackfill?.perSource) || 50; - await backfillMissingContent(perSource, concurrency); - } catch (error) { - console.error('content backfill failed:', error); - } + const batchSize = Number(config.contentBackfill?.batchSize) || 25; + const processed = await runBackfillWorker({ workerIndex, workerCount, perSource, batchSize }); - try { - await backfillMissingEmbeddings(); + // if a worker found nothing in its partition, brief sleep so we dont + // hammer the db with empty selects + if (processed === 0) { + await sleep(5000); + } } catch (error) { - console.error('embedding backfill failed:', error); + console.error(`content worker ${workerIndex} failed:`, error); + await sleep(5000); } } }; + + // dedicated embedding loop — runs independently of content fetch so slow + // openrouter calls dont block the content pipeline. batches per request + const runEmbeddingLoop = async () => { + while (true) { + if (!hasPendingEmbeddings()) { + await sleep(30 * 1000); + continue; + } + + try { + const limit = Number(config.embeddingBackfill?.perRound) || 256; + const batchSize = Number(config.embeddingBackfill?.batchSize) || 16; + const result = await backfillMissingEmbeddings(limit, batchSize); + + if (result.paused) { + // 402 means out of credits — wait longer before retrying + await sleep(5 * 60 * 1000); + } else if (result.processed === 0) { + await sleep(5000); + } + } catch (error) { + console.error('embedding backfill failed:', error); + await sleep(10000); + } + } + }; + + runRss(); runGdeltLoop(); runEdgar(); runAlphaVantage(); runFinnhub(); // runGoogleNews(); - runContentLoop(); + + const workerCount = Math.max(1, Number(config.contentBackfill?.workers) || 3); + for (let i = 0; i < workerCount; i += 1) { + runContentWorker(i, workerCount); + } + + runEmbeddingLoop(); cron.schedule(config.scheduler.rss, runRss); cron.schedule(config.scheduler.edgar, runEdgar);