refactor content fetching and embedding processes for improved concurrency and error handling

This commit is contained in:
ImBenji 2026-04-19 18:51:42 +01:00
parent b4b2fe2ac7
commit 6bf3a9282f
8 changed files with 348 additions and 157 deletions

View file

@ -43,11 +43,18 @@
"googleNews": "0 * * * *" "googleNews": "0 * * * *"
}, },
"contentBackfill": { "contentBackfill": {
"concurrency": 100, "workers": 3,
"perSource": 50 "perSource": 50,
"batchSize": 25,
"plainConcurrency": 50,
"browserConcurrency": 8
},
"embeddingBackfill": {
"perRound": 256,
"batchSize": 16
}, },
"browser": { "browser": {
"maxConcurrentPages": 25 "maxConcurrentPages": 8
}, },
"googleNews": { "googleNews": {
"queries": [ "queries": [

View file

@ -19,7 +19,6 @@
"node-cron": "^4.2.1", "node-cron": "^4.2.1",
"playwright": "^1.59.1", "playwright": "^1.59.1",
"rss-parser": "^3.13.0", "rss-parser": "^3.13.0",
"sharp": "^0.34.5",
"sqlite-vec": "^0.1.9" "sqlite-vec": "^0.1.9"
} }
} }

View file

@ -1,8 +1,6 @@
const { extractFromHtml } = require("@extractus/article-extractor"); const { extractFromHtml } = require("@extractus/article-extractor");
const sharp = require("sharp");
const db = require("./db"); const db = require("./db");
const config = require("./config"); const config = require("./config");
const { generateAndStoreEmbedding } = require("./embeddings");
const { fetchWithPolicy } = require("./http"); const { fetchWithPolicy } = require("./http");
const { getSharedBrowserSession } = require("./sources/browserCrawler"); const { getSharedBrowserSession } = require("./sources/browserCrawler");
const { validateExtractedArticle } = require("./contentValidation"); const { validateExtractedArticle } = require("./contentValidation");
@ -18,18 +16,21 @@ const {
const MAX_PLAIN_HTML_LENGTH = 1_500_000; const MAX_PLAIN_HTML_LENGTH = 1_500_000;
const PLAIN_FETCH_TIMEOUT = 12000; const PLAIN_FETCH_TIMEOUT = 12000;
const BROWSER_FETCH_TIMEOUT = 20000; const BROWSER_FETCH_TIMEOUT = 20000;
const HEAD_PRECHECK_TIMEOUT = 6000;
// retry windows for failures that look transient (validation rejected the
// page, fetch timed out). genuinely terminal failures (404, dead url) get
// a hard cap on attempt count instead
const VALIDATION_RETRY_AFTER_MS = 24 * 60 * 60 * 1000; const VALIDATION_RETRY_AFTER_MS = 24 * 60 * 60 * 1000;
const TRANSIENT_RETRY_AFTER_MS = 6 * 60 * 60 * 1000; const TRANSIENT_RETRY_AFTER_MS = 6 * 60 * 60 * 1000;
const MAX_TERMINAL_ATTEMPTS = 3; const MAX_TERMINAL_ATTEMPTS = 3;
// flaky domains get a HEAD precheck before we waste a body fetch. only kicks
// in once a domain has accumulated some failure history; pristine domains
// skip the round trip
const HEAD_PRECHECK_FAILURE_THRESHOLD = 2;
const updateArticleAssets = db.prepare(` const updateArticleAssets = db.prepare(`
UPDATE articles UPDATE articles
SET content = ?, image = ?, content_status = 'ready', content_error = NULL, SET content = ?, content_status = 'ready', content_error = NULL,
content_attempted_at = ?, content_attempt_count = content_attempt_count + 1, content_attempted_at = ?, content_attempt_count = content_attempt_count + 1,
content_retry_after = NULL content_retry_after = NULL
WHERE id = ? WHERE id = ?
@ -58,9 +59,10 @@ const markContentPending = db.prepare(`
WHERE id = ? WHERE id = ?
`); `);
// round-robin pull of articles needing content. respects content_retry_after so // pulls a partition of pending articles. workerIndex/workerCount partitions
// a freshly-rejected article doesnt get retried in the next loop iteration // by article id (deterministic) so multiple workers never see the same row.
const selectRoundRobinArticlesMissingContent = db.prepare(` // also round-robins by source so no single domain dominates the queue
const selectPartitionedArticlesMissingContent = db.prepare(`
SELECT id, url, title, description SELECT id, url, title, description
FROM ( FROM (
SELECT id, url, title, description, source, SELECT id, url, title, description, source,
@ -69,6 +71,7 @@ const selectRoundRobinArticlesMissingContent = db.prepare(`
WHERE (content IS NULL OR TRIM(content) = '') WHERE (content IS NULL OR TRIM(content) = '')
AND (content_status IS NULL OR content_status = 'pending') AND (content_status IS NULL OR content_status = 'pending')
AND (content_retry_after IS NULL OR content_retry_after <= datetime('now')) AND (content_retry_after IS NULL OR content_retry_after <= datetime('now'))
AND (id % ?) = ?
) )
WHERE rn <= ? WHERE rn <= ?
ORDER BY rn, source ORDER BY rn, source
@ -79,7 +82,37 @@ const selectAttemptCount = db.prepare(`
`); `);
let contentBackfillRunning = false; // shared semaphore — tracks both plain and browser pool occupancy across all
// workers. defining at module scope so the limits are global, not per-worker
function makeSemaphore(limit) {
let active = 0;
const waiters = [];
return {
async acquire() {
if (active < limit) {
active += 1;
return;
}
await new Promise((resolve) => waiters.push(resolve));
active += 1;
},
release() {
active = Math.max(0, active - 1);
const next = waiters.shift();
if (next) next();
},
inFlight() {
return active;
},
};
}
const PLAIN_CONCURRENCY = Number(config.contentBackfill?.plainConcurrency) || 50;
const BROWSER_CONCURRENCY = Number(config.contentBackfill?.browserConcurrency) || 8;
const plainSemaphore = makeSemaphore(PLAIN_CONCURRENCY);
const browserSemaphore = makeSemaphore(BROWSER_CONCURRENCY);
function getErrorStatus(error) { function getErrorStatus(error) {
@ -91,11 +124,6 @@ function getErrorStatus(error) {
return match ? Number(match[1]) : null; return match ? Number(match[1]) : null;
} }
function getErrorMessage(error, fallback) {
const message = String((error && error.message) || fallback || "").trim();
return message ? message.slice(0, 500) : null;
}
function nowIso() { function nowIso() {
return new Date().toISOString(); return new Date().toISOString();
} }
@ -105,74 +133,66 @@ function futureIso(ms) {
} }
async function fetchCompressedImage(url) { // cheap HEAD check before pulling the body. only used on domains we already
const response = await fetchWithPolicy(url, { // know are unreliable. if HEAD says 404/410/451, skip the body fetch entirely
retries: 1, async function headPrecheck(url) {
headers: { Accept: "image/*" }, try {
}); const response = await fetchWithPolicy(url, {
method: "HEAD",
if (!response.ok) { timeout: HEAD_PRECHECK_TIMEOUT,
const error = new Error(`image request failed with ${response.status}`); retries: 0,
error.status = response.status; });
throw error; return { status: response.status, finalUrl: response.url || url };
} catch (error) {
return { status: getErrorStatus(error), error };
} }
const contentType = String(response.headers.get("content-type") || "").toLowerCase();
if (!contentType.startsWith("image/")) {
throw new Error(`image request returned ${contentType || "unknown content-type"}`);
}
const input = Buffer.from(await response.arrayBuffer());
if (input.length === 0) {
throw new Error("image request returned an empty body");
}
const output = await sharp(input)
.rotate()
.resize({ width: 320, height: 320, fit: "inside", withoutEnlargement: true })
.webp({ quality: 25 })
.toBuffer();
return output.toString("base64");
} }
// plain http fetch — no js execution. fast, low memory, but fails on
// js-rendered sites and gets blocked by cloudflare more often
async function fetchPlainHtml(url) { async function fetchPlainHtml(url) {
const response = await fetchWithPolicy(url, { await plainSemaphore.acquire();
timeout: PLAIN_FETCH_TIMEOUT, try {
retries: 1, const response = await fetchWithPolicy(url, {
}); timeout: PLAIN_FETCH_TIMEOUT,
retries: 1,
});
if (!response.ok) { if (!response.ok) {
const error = new Error(`plain fetch returned ${response.status}`); const error = new Error(`plain fetch returned ${response.status}`);
error.status = response.status; error.status = response.status;
throw error; throw error;
}
const contentType = String(response.headers.get("content-type") || "").toLowerCase();
if (contentType && !contentType.includes("html") && !contentType.includes("xml")) {
throw new Error(`plain fetch returned non-html content-type: ${contentType}`);
}
const text = await response.text();
return {
html: text.slice(0, MAX_PLAIN_HTML_LENGTH),
finalUrl: response.url || url,
};
} finally {
plainSemaphore.release();
} }
const contentType = String(response.headers.get("content-type") || "").toLowerCase();
if (contentType && !contentType.includes("html") && !contentType.includes("xml")) {
throw new Error(`plain fetch returned non-html content-type: ${contentType}`);
}
const text = await response.text();
return {
html: text.slice(0, MAX_PLAIN_HTML_LENGTH),
finalUrl: response.url || url,
};
} }
async function fetchBrowserHtml(url) { async function fetchBrowserHtml(url) {
const maxConcurrentPages = Number(config.browser?.maxConcurrentPages) || 25; await browserSemaphore.acquire();
const session = await getSharedBrowserSession({ try {
requestTimeout: BROWSER_FETCH_TIMEOUT, const maxConcurrentPages = Number(config.browser?.maxConcurrentPages) || 8;
maxConcurrentPages, const session = await getSharedBrowserSession({
}); requestTimeout: BROWSER_FETCH_TIMEOUT,
maxConcurrentPages,
});
const html = await session.fetchRenderedHtml(url, { timeout: BROWSER_FETCH_TIMEOUT }); const html = await session.fetchRenderedHtml(url, { timeout: BROWSER_FETCH_TIMEOUT });
return { html, finalUrl: url }; return { html, finalUrl: url };
} finally {
browserSemaphore.release();
}
} }
@ -183,10 +203,6 @@ function stripHtmlContent(value) {
} }
// runs fetch → extract → validate. returns { ok, article, html, finalUrl, reason }
// where article has been post-processed (content stripped of html). on failure,
// reason explains what tripped — used both for logging and for the per-domain
// policy update
async function attemptFetch(url, fetcher) { async function attemptFetch(url, fetcher) {
let html; let html;
let finalUrl; let finalUrl;
@ -231,11 +247,29 @@ function getAttemptCount(id) {
} }
// shouldPrecheck: domains with at least N consecutive plain or browser failures
// in their policy entry get a HEAD check first. we read the row directly here
// rather than threading through the policy module
const selectFailureCounts = db.prepare(`
SELECT consecutive_plain_failures, consecutive_browser_failures
FROM domain_fetch_policy WHERE domain = ?
`);
function shouldPrecheck(url) {
try {
const domain = new URL(url).hostname.toLowerCase();
const row = selectFailureCounts.get(domain);
if (!row) return false;
return (row.consecutive_plain_failures + row.consecutive_browser_failures) >= HEAD_PRECHECK_FAILURE_THRESHOLD;
} catch {
return false;
}
}
async function fetchAndStoreContent(id, url, storedTitle, storedDescription) { async function fetchAndStoreContent(id, url, storedTitle, storedDescription) {
const policy = getEffectivePolicy(url); const policy = getEffectivePolicy(url);
// domains we know are blocked — skip the fetch entirely until ttl expires.
// the row stays pending so it'll get picked up after the policy resets
if (policy.policy === "blocked") { if (policy.policy === "blocked") {
markContentPending.run( markContentPending.run(
`domain blocked by policy`, `domain blocked by policy`,
@ -246,6 +280,16 @@ async function fetchAndStoreContent(id, url, storedTitle, storedDescription) {
return; return;
} }
// HEAD precheck for known-flaky domains. if it returns 404/410/451 we mark
// terminal-failed without burning a body fetch. transient errors fall through
if (shouldPrecheck(url)) {
const head = await headPrecheck(url);
if (head.status === 404 || head.status === 410 || head.status === 451) {
markContentFailed.run(`head ${head.status}`, nowIso(), id);
return;
}
}
const tryPlainFirst = policy.policy === "auto" || policy.policy === "plain_only"; const tryPlainFirst = policy.policy === "auto" || policy.policy === "plain_only";
let plainResult = null; let plainResult = null;
let browserResult = null; let browserResult = null;
@ -256,15 +300,12 @@ async function fetchAndStoreContent(id, url, storedTitle, storedDescription) {
if (plainResult.ok) { if (plainResult.ok) {
recordPlainSuccess(url); recordPlainSuccess(url);
await commitArticle(id, url, plainResult, storedTitle, storedDescription); commitArticle(id, url, plainResult, storedTitle, storedDescription);
return; return;
} }
recordPlainFailure(url); recordPlainFailure(url);
// hard 4xx (other than 408/429) on plain — domain might serve the same to
// browser, but try anyway since it's cheap once the policy hasnt flipped yet.
// 408/429/5xx defer for retry
const status = plainResult.error && getErrorStatus(plainResult.error); const status = plainResult.error && getErrorStatus(plainResult.error);
if (status === 408 || status === 429 || (status && status >= 500)) { if (status === 408 || status === 429 || (status && status >= 500)) {
markContentPending.run( markContentPending.run(
@ -277,8 +318,6 @@ async function fetchAndStoreContent(id, url, storedTitle, storedDescription) {
} }
} }
// policy.policy === "plain_only" means we just tried plain and failed —
// dont escalate to browser, the operator (or earlier domain memory) said no
if (policy.policy === "plain_only") { if (policy.policy === "plain_only") {
recordValidationFailure(id, plainResult); recordValidationFailure(id, plainResult);
return; return;
@ -289,7 +328,7 @@ async function fetchAndStoreContent(id, url, storedTitle, storedDescription) {
if (browserResult.ok) { if (browserResult.ok) {
recordBrowserSuccess(url); recordBrowserSuccess(url);
await commitArticle(id, url, browserResult, storedTitle, storedDescription); commitArticle(id, url, browserResult, storedTitle, storedDescription);
return; return;
} }
@ -306,9 +345,6 @@ async function fetchAndStoreContent(id, url, storedTitle, storedDescription) {
return; return;
} }
// both paths exhausted (or browser-only path failed). decide between
// pending-with-retry and terminal failed based on attempt count and
// whether the validator thought it was retryable
recordValidationFailure(id, browserResult); recordValidationFailure(id, browserResult);
} }
@ -318,7 +354,6 @@ function recordValidationFailure(id, result) {
const retryable = result?.retryable !== false; const retryable = result?.retryable !== false;
const attempts = getAttemptCount(id); const attempts = getAttemptCount(id);
// hard fetch errors with no retryable signal — terminal after a few tries
if (!retryable || attempts + 1 >= MAX_TERMINAL_ATTEMPTS) { if (!retryable || attempts + 1 >= MAX_TERMINAL_ATTEMPTS) {
markContentFailed.run(reason, nowIso(), id); markContentFailed.run(reason, nowIso(), id);
return; return;
@ -328,11 +363,10 @@ function recordValidationFailure(id, result) {
} }
async function commitArticle(id, url, result, storedTitle, storedDescription) { function commitArticle(id, url, result, storedTitle, storedDescription) {
const { article, finalUrl } = result; const { article } = result;
const content = article.content || null; const content = article.content || null;
// if stored title looks like a raw url, replace with extracted one
const titleLooksLikeUrl = storedTitle && /^https?:\/\//i.test(storedTitle.trim()); const titleLooksLikeUrl = storedTitle && /^https?:\/\//i.test(storedTitle.trim());
if (titleLooksLikeUrl) { if (titleLooksLikeUrl) {
const scrapedTitle = typeof article.title === "string" ? article.title.trim() : null; const scrapedTitle = typeof article.title === "string" ? article.title.trim() : null;
@ -342,47 +376,33 @@ async function commitArticle(id, url, result, storedTitle, storedDescription) {
} }
} }
let image = null; updateArticleAssets.run(content, nowIso(), id);
if (article.image) {
try {
image = await fetchCompressedImage(article.image);
} catch (error) {
const status = getErrorStatus(error);
if (status === 401 || status === 403 || status === 404 || status === 429) {
console.warn(`image fetch skipped for ${url}: upstream returned ${status}`);
} else {
console.error(`image fetch failed for ${url}:`, error.message || error);
}
}
}
updateArticleAssets.run(content, image, nowIso(), id); // embedding generation is no longer kicked off here — runEmbeddingLoop
// in scheduler.js batches them in its own pipeline. that decouples slow
try { // openrouter calls from content fetch throughput
await generateAndStoreEmbedding(id);
} catch (error) {
console.error(`embedding failed for article ${id}:`, error.message || error);
}
} }
async function backfillMissingContent(perSource = 50, concurrency = 5) { // runs one worker pass — pulls its partition slice, fires N concurrent fetches
if (contentBackfillRunning) { // from a single backfill loop. multiple workers share the plain/browser
return; // semaphores so total concurrency stays bounded regardless of worker count
async function runBackfillWorker({ workerIndex, workerCount, perSource, batchSize }) {
const rows = selectPartitionedArticlesMissingContent.all(workerCount, workerIndex, perSource);
if (rows.length === 0) return 0;
// dispatch in chunks of batchSize so we don't allocate thousands of unawaited
// promises at once. the semaphores throttle inside fetchAndStoreContent
for (let i = 0; i < rows.length; i += batchSize) {
const batch = rows.slice(i, i + batchSize);
await Promise.all(batch.map((row) =>
fetchAndStoreContent(row.id, row.url, row.title, row.description).catch((error) => {
console.error(`backfill worker ${workerIndex} failed on ${row.url}:`, error.message || error);
})
));
} }
contentBackfillRunning = true; return rows.length;
try {
const rows = selectRoundRobinArticlesMissingContent.all(perSource);
for (let i = 0; i < rows.length; i += concurrency) {
const batch = rows.slice(i, i + concurrency);
await Promise.all(batch.map((row) => fetchAndStoreContent(row.id, row.url, row.title, row.description)));
}
} finally {
contentBackfillRunning = false;
}
} }
@ -397,8 +417,20 @@ function hasPendingContent() {
} }
// kept for backwards compat with scheduler/runAllIngestions one-shot runs
async function backfillMissingContent(perSource = 50, concurrency = 50) {
await runBackfillWorker({
workerIndex: 0,
workerCount: 1,
perSource,
batchSize: concurrency,
});
}
module.exports = { module.exports = {
fetchAndStoreContent, fetchAndStoreContent,
backfillMissingContent, backfillMissingContent,
runBackfillWorker,
hasPendingContent, hasPendingContent,
}; };

View file

@ -9,6 +9,8 @@ sqliteVec.load(db);
db.pragma('journal_mode = WAL'); db.pragma('journal_mode = WAL');
// the image column is retained as a no-op for backwards compat with old rows.
// new code never writes to it; drop in a future migration if you really want
db.exec(` db.exec(`
CREATE TABLE IF NOT EXISTS articles ( CREATE TABLE IF NOT EXISTS articles (
id INTEGER PRIMARY KEY AUTOINCREMENT, id INTEGER PRIMARY KEY AUTOINCREMENT,

View file

@ -57,7 +57,7 @@ const nearestNeighbors = db.prepare(`
`); `);
const selectArticlesMissingEmbeddings = db.prepare(` const selectArticlesMissingEmbeddings = db.prepare(`
SELECT a.id SELECT a.id, a.title, a.description, a.content
FROM articles a FROM articles a
WHERE a.title IS NOT NULL WHERE a.title IS NOT NULL
AND TRIM(a.title) != '' AND TRIM(a.title) != ''
@ -254,6 +254,9 @@ function normalizeQuery(input) {
.replace(/\s+/g, ' '); .replace(/\s+/g, ' ');
} }
// supports both single string and array input. openrouter follows the openai
// embeddings contract — when input is an array, payload.data is an array of
// {index, embedding} objects in the same order
async function requestEmbedding(input) { async function requestEmbedding(input) {
const response = await fetch('https://openrouter.ai/api/v1/embeddings', { const response = await fetch('https://openrouter.ai/api/v1/embeddings', {
method: 'POST', method: 'POST',
@ -286,7 +289,24 @@ async function requestEmbedding(input) {
} }
const payload = await response.json(); const payload = await response.json();
const embedding = payload && payload.data && payload.data[0] && payload.data[0].embedding; const data = payload && payload.data;
if (!Array.isArray(data) || data.length === 0) {
throw new Error('invalid embedding response: missing data');
}
if (Array.isArray(input)) {
// sort by index to be safe — some providers return out-of-order
const sorted = [...data].sort((a, b) => (a.index ?? 0) - (b.index ?? 0));
return sorted.map((row) => {
if (!Array.isArray(row.embedding) || row.embedding.length === 0) {
throw new Error(`invalid embedding at index ${row.index}`);
}
return row.embedding;
});
}
const embedding = data[0] && data[0].embedding;
if (!Array.isArray(embedding) || embedding.length === 0) { if (!Array.isArray(embedding) || embedding.length === 0) {
throw new Error(`invalid embedding in response: ${Array.isArray(embedding) ? 'empty' : 'missing'}`); throw new Error(`invalid embedding in response: ${Array.isArray(embedding) ? 'empty' : 'missing'}`);
} }
@ -357,25 +377,117 @@ async function generateAndStoreEmbedding(id) {
} }
} }
async function backfillMissingEmbeddings(limit = 100) { // writes one batch of {id, embedding} pairs in a single transaction. the
// vec0 insert pads to 8192 dims so models with fewer dims still index
function commitEmbeddingBatch(rows) {
const tx = db.transaction((entries) => {
for (const entry of entries) {
const buffer = serializeEmbedding(entry.embedding);
upsertEmbeddingStore.run(entry.id, EMBEDDING_MODEL, buffer);
deleteEmbedding.run(BigInt(entry.id));
insertEmbedding.run(BigInt(entry.id), padEmbeddingForVec0(entry.embedding));
upsertEmbeddingMeta.run(entry.id, EMBEDDING_MODEL);
}
});
tx(rows);
}
// batched backfill — pulls `limit` candidates, sends them as one openrouter
// request (up to batchSize per call), writes the results in one tx. cuts
// per-article overhead from ~1 round-trip to ~1/batchSize round trips
async function backfillMissingEmbeddings(limit = 256, batchSize = 16) {
if (embeddingBackfillRunning) { if (embeddingBackfillRunning) {
return; return { processed: 0, paused: false };
}
const apiKey = config.openRouter && config.openRouter.apiKey
? String(config.openRouter.apiKey).trim()
: '';
if (!apiKey) {
return { processed: 0, paused: false };
} }
embeddingBackfillRunning = true; embeddingBackfillRunning = true;
let processed = 0;
let paused = false;
try { try {
const rows = selectArticlesMissingEmbeddings.all(EMBEDDING_MODEL, limit); const candidates = selectArticlesMissingEmbeddings.all(EMBEDDING_MODEL, limit);
if (candidates.length === 0) {
return { processed: 0, paused: false };
}
for (const row of rows) { // pre-build inputs and drop any candidates with empty input (shouldnt
const result = await generateAndStoreEmbedding(row.id); // happen since the query filters, but be defensive)
if (result.shouldPauseBatch) { const eligible = candidates
break; .map((row) => ({ id: row.id, input: buildEmbeddingInput(row) }))
.filter((row) => row.input);
for (let i = 0; i < eligible.length; i += batchSize) {
const batch = eligible.slice(i, i + batchSize);
const inputs = batch.map((row) => row.input);
try {
const embeddings = await requestEmbedding(inputs);
if (embeddings.length !== batch.length) {
console.error(`embedding batch length mismatch: expected ${batch.length}, got ${embeddings.length}`);
continue;
}
const toCommit = batch.map((row, idx) => ({ id: row.id, embedding: embeddings[idx] }));
commitEmbeddingBatch(toCommit);
processed += toCommit.length;
} catch (error) {
console.error(`embedding batch failed (size ${batch.length}):`, error.message || error);
if (error && error.status === 402) {
paused = true;
break;
}
// on other errors, fall back to per-article so a single bad input
// doesnt poison the whole batch. slow but correct
for (const row of batch) {
try {
const single = await requestEmbedding(row.input);
commitEmbeddingBatch([{ id: row.id, embedding: single }]);
processed += 1;
} catch (singleError) {
console.error(`single embedding fallback failed for article ${row.id}:`, singleError.message || singleError);
if (singleError && singleError.status === 402) {
paused = true;
break;
}
}
}
if (paused) break;
} }
} }
} finally { } finally {
embeddingBackfillRunning = false; embeddingBackfillRunning = false;
} }
return { processed, paused };
}
function hasPendingEmbeddings() {
return Boolean(db.prepare(`
SELECT 1 FROM articles a
WHERE a.title IS NOT NULL AND TRIM(a.title) != ''
AND a.description IS NOT NULL AND TRIM(a.description) != ''
AND a.content IS NOT NULL AND TRIM(a.content) != ''
AND NOT EXISTS (
SELECT 1 FROM article_embedding_store s
WHERE s.article_id = a.id AND s.model = ?
)
LIMIT 1
`).get(EMBEDDING_MODEL));
} }
function getEmbeddingBuffer(articleId) { function getEmbeddingBuffer(articleId) {
@ -428,4 +540,5 @@ module.exports = {
findSimilarArticles, findSimilarArticles,
getEmbeddingBuffer, getEmbeddingBuffer,
getOrCreateQueryEmbedding, getOrCreateQueryEmbedding,
hasPendingEmbeddings,
}; };

View file

@ -7,14 +7,13 @@ const insertArticle = db.prepare(`
title, title,
description, description,
content, content,
image,
is_index_page, is_index_page,
url, url,
normalized_title, normalized_title,
source, source,
pub_date, pub_date,
ingested_at ingested_at
) VALUES (?, ?, NULL, NULL, ?, ?, ?, ?, ?, ?) ) VALUES (?, ?, NULL, ?, ?, ?, ?, ?, ?)
`); `);
const findByUrl = db.prepare('SELECT id FROM articles WHERE url = ?'); const findByUrl = db.prepare('SELECT id FROM articles WHERE url = ?');
const INDEX_PAGE_URL_HINT = /\/(category|categories|tag|tags|topic|topics|section|sections|archive|archives|authors|search)(?:\/|$)/i; const INDEX_PAGE_URL_HINT = /\/(category|categories|tag|tags|topic|topics|section|sections|archive|archives|authors|search)(?:\/|$)/i;

View file

@ -45,7 +45,7 @@ function buildArticlesQuery(query) {
return { return {
sql: ` sql: `
SELECT id, title, description, content, image, ${includeEmbedding ? 'embedding,' : ''} url, normalized_title, source, pub_date, ingested_at SELECT id, title, description, content, ${includeEmbedding ? 'embedding,' : ''} url, normalized_title, source, pub_date, ingested_at
FROM articles FROM articles
${whereClause} ${whereClause}
ORDER BY COALESCE(pub_date, ingested_at) DESC, id DESC ORDER BY COALESCE(pub_date, ingested_at) DESC, id DESC
@ -72,7 +72,7 @@ function mapNeighborsToArticles(neighbors, excludeIndexPages, limit) {
const placeholders = ids.map(() => '?').join(', '); const placeholders = ids.map(() => '?').join(', ');
const articles = db.prepare(` const articles = db.prepare(`
SELECT id, title, description, content, image, url, normalized_title, source, pub_date, ingested_at SELECT id, title, description, content, url, normalized_title, source, pub_date, ingested_at
FROM articles FROM articles
WHERE id IN (${placeholders}) WHERE id IN (${placeholders})
AND content IS NOT NULL AND content != '' AND content IS NOT NULL AND content != ''
@ -144,7 +144,7 @@ async function articleRoutes(fastify) {
} }
const article = db.prepare(` const article = db.prepare(`
SELECT id, title, description, content, image, url, normalized_title, source, pub_date, ingested_at SELECT id, title, description, content, url, normalized_title, source, pub_date, ingested_at
FROM articles FROM articles
WHERE id = ? WHERE id = ?
AND content IS NOT NULL AND content != '' AND content IS NOT NULL AND content != ''

View file

@ -7,8 +7,8 @@ const { fetchEdgarArticles } = require('./sources/edgar');
const { fetchAlphaVantageArticles } = require('./sources/alphavantage'); const { fetchAlphaVantageArticles } = require('./sources/alphavantage');
const { fetchFinnhubArticles } = require('./sources/finnhub'); const { fetchFinnhubArticles } = require('./sources/finnhub');
const { fetchGoogleNewsArticles } = require('./sources/googleNews'); const { fetchGoogleNewsArticles } = require('./sources/googleNews');
const { backfillMissingContent, hasPendingContent } = require('./content'); const { backfillMissingContent, runBackfillWorker, hasPendingContent } = require('./content');
const { backfillMissingEmbeddings } = require('./embeddings'); const { backfillMissingEmbeddings, hasPendingEmbeddings } = require('./embeddings');
function sleep(ms) { function sleep(ms) {
return new Promise((resolve) => setTimeout(resolve, ms)); return new Promise((resolve) => setTimeout(resolve, ms));
@ -105,36 +105,75 @@ function startScheduler() {
await runSource('googlenews', fetchGoogleNewsArticles); await runSource('googlenews', fetchGoogleNewsArticles);
}; };
const runContentLoop = async () => { // each content worker pulls a disjoint partition of pending articles by
// source. they share the global plain/browser semaphores in content.js so
// bumping worker count doesnt blow past the per-pool concurrency caps
const runContentWorker = async (workerIndex, workerCount) => {
while (true) { while (true) {
if (!hasPendingContent()) { if (!hasPendingContent()) {
await sleep(60 * 1000); await sleep(30 * 1000);
continue; continue;
} }
try { try {
const concurrency = Number(config.contentBackfill?.concurrency) || 5;
const perSource = Number(config.contentBackfill?.perSource) || 50; const perSource = Number(config.contentBackfill?.perSource) || 50;
await backfillMissingContent(perSource, concurrency); const batchSize = Number(config.contentBackfill?.batchSize) || 25;
} catch (error) { const processed = await runBackfillWorker({ workerIndex, workerCount, perSource, batchSize });
console.error('content backfill failed:', error);
}
try { // if a worker found nothing in its partition, brief sleep so we dont
await backfillMissingEmbeddings(); // hammer the db with empty selects
if (processed === 0) {
await sleep(5000);
}
} catch (error) { } catch (error) {
console.error('embedding backfill failed:', error); console.error(`content worker ${workerIndex} failed:`, error);
await sleep(5000);
} }
} }
}; };
// dedicated embedding loop — runs independently of content fetch so slow
// openrouter calls dont block the content pipeline. batches per request
const runEmbeddingLoop = async () => {
while (true) {
if (!hasPendingEmbeddings()) {
await sleep(30 * 1000);
continue;
}
try {
const limit = Number(config.embeddingBackfill?.perRound) || 256;
const batchSize = Number(config.embeddingBackfill?.batchSize) || 16;
const result = await backfillMissingEmbeddings(limit, batchSize);
if (result.paused) {
// 402 means out of credits — wait longer before retrying
await sleep(5 * 60 * 1000);
} else if (result.processed === 0) {
await sleep(5000);
}
} catch (error) {
console.error('embedding backfill failed:', error);
await sleep(10000);
}
}
};
runRss(); runRss();
runGdeltLoop(); runGdeltLoop();
runEdgar(); runEdgar();
runAlphaVantage(); runAlphaVantage();
runFinnhub(); runFinnhub();
// runGoogleNews(); // runGoogleNews();
runContentLoop();
const workerCount = Math.max(1, Number(config.contentBackfill?.workers) || 3);
for (let i = 0; i < workerCount; i += 1) {
runContentWorker(i, workerCount);
}
runEmbeddingLoop();
cron.schedule(config.scheduler.rss, runRss); cron.schedule(config.scheduler.rss, runRss);
cron.schedule(config.scheduler.edgar, runEdgar); cron.schedule(config.scheduler.edgar, runEdgar);