const { extractFromHtml } = require("@extractus/article-extractor");
const db = require("./db");
const config = require("./config");
const { fetchWithPolicy } = require("./http");
const { getSharedBrowserSession } = require("./sources/browserCrawler");
const { validateExtractedArticle } = require("./contentValidation");
const {
getEffectivePolicy,
recordPlainSuccess,
recordPlainFailure,
recordBrowserSuccess,
recordBrowserFailure,
} = require("./domainPolicy");
const MAX_PLAIN_HTML_LENGTH = 1_500_000;
const PLAIN_FETCH_TIMEOUT = 12000;
const BROWSER_FETCH_TIMEOUT = 20000;
const HEAD_PRECHECK_TIMEOUT = 6000;
const VALIDATION_RETRY_AFTER_MS = 24 * 60 * 60 * 1000;
const TRANSIENT_RETRY_AFTER_MS = 6 * 60 * 60 * 1000;
const MAX_TERMINAL_ATTEMPTS = 3;
// flaky domains get a HEAD precheck before we waste a body fetch. only kicks
// in once a domain has accumulated some failure history; pristine domains
// skip the round trip
const HEAD_PRECHECK_FAILURE_THRESHOLD = 2;
const updateArticleAssets = db.prepare(`
UPDATE articles
SET content = ?, content_status = 'ready', content_error = NULL,
content_attempted_at = ?, content_attempt_count = content_attempt_count + 1,
content_retry_after = NULL
WHERE id = ?
`);
const updateArticleTitleDescription = db.prepare(`
UPDATE articles
SET title = ?, description = ?
WHERE id = ?
`);
const markContentSkipped = db.prepare(`
UPDATE articles
SET content_status = 'skipped', content_error = ?, content_attempted_at = ?,
content_attempt_count = content_attempt_count + 1, content_retry_after = NULL
WHERE id = ?
`);
const markContentFailed = db.prepare(`
UPDATE articles
SET content_status = 'failed', content_error = ?, content_attempted_at = ?,
content_attempt_count = content_attempt_count + 1, content_retry_after = NULL
WHERE id = ?
`);
const markContentPending = db.prepare(`
UPDATE articles
SET content_status = 'pending', content_error = ?, content_attempted_at = ?,
content_attempt_count = content_attempt_count + 1, content_retry_after = ?
WHERE id = ?
`);
// pulls a partition of pending articles. workerIndex/workerCount partitions
// by article id (deterministic) so multiple workers never see the same row.
// also round-robins by source so no single domain dominates the queue
const selectPartitionedArticlesMissingContent = db.prepare(`
SELECT id, url, title, description
FROM (
SELECT id, url, title, description, source, pub_date_effective,
ROW_NUMBER() OVER (PARTITION BY source ORDER BY pub_date_effective DESC, id DESC) AS rn
FROM articles
WHERE (content IS NULL OR TRIM(content) = '')
AND (content_status IS NULL OR content_status = 'pending')
AND (content_retry_after IS NULL OR content_retry_after <= datetime('now'))
AND (id % ?) = ?
)
WHERE rn <= ?
ORDER BY pub_date_effective DESC, rn, source
`);
const selectAttemptCount = db.prepare(`
SELECT content_attempt_count AS attempts FROM articles WHERE id = ?
`);
// shared semaphore — tracks both plain and browser pool occupancy across all
// workers. defining at module scope so the limits are global, not per-worker
function makeSemaphore(limit) {
let active = 0;
const waiters = [];
return {
async acquire() {
if (active < limit) {
active += 1;
return;
}
await new Promise((resolve) => waiters.push(resolve));
active += 1;
},
release() {
active = Math.max(0, active - 1);
const next = waiters.shift();
if (next) next();
},
inFlight() {
return active;
},
};
}
const PLAIN_CONCURRENCY = Number(config.contentBackfill?.plainConcurrency) || 50;
const BROWSER_CONCURRENCY = Number(config.contentBackfill?.browserConcurrency) || 8;
const plainSemaphore = makeSemaphore(PLAIN_CONCURRENCY);
const browserSemaphore = makeSemaphore(BROWSER_CONCURRENCY);
function getErrorStatus(error) {
if (error && Number.isInteger(error.status)) {
return error.status;
}
const match = String((error && error.message) || "").match(/\b(401|403|404|408|429|5\d\d)\b/);
return match ? Number(match[1]) : null;
}
function nowIso() {
return new Date().toISOString();
}
function futureIso(ms) {
return new Date(Date.now() + ms).toISOString();
}
// cheap HEAD check before pulling the body. only used on domains we already
// know are unreliable. if HEAD says 404/410/451, skip the body fetch entirely
async function headPrecheck(url) {
try {
const response = await fetchWithPolicy(url, {
method: "HEAD",
timeout: HEAD_PRECHECK_TIMEOUT,
retries: 0,
});
return { status: response.status, finalUrl: response.url || url };
} catch (error) {
return { status: getErrorStatus(error), error };
}
}
async function fetchPlainHtml(url) {
await plainSemaphore.acquire();
try {
const response = await fetchWithPolicy(url, {
timeout: PLAIN_FETCH_TIMEOUT,
retries: 1,
});
if (!response.ok) {
const error = new Error(`plain fetch returned ${response.status}`);
error.status = response.status;
throw error;
}
const contentType = String(response.headers.get("content-type") || "").toLowerCase();
if (contentType && !contentType.includes("html") && !contentType.includes("xml")) {
throw new Error(`plain fetch returned non-html content-type: ${contentType}`);
}
const text = await response.text();
return {
html: text.slice(0, MAX_PLAIN_HTML_LENGTH),
finalUrl: response.url || url,
};
} finally {
plainSemaphore.release();
}
}
async function fetchBrowserHtml(url) {
await browserSemaphore.acquire();
try {
const maxConcurrentPages = Number(config.browser?.maxConcurrentPages) || 8;
const session = await getSharedBrowserSession({
requestTimeout: BROWSER_FETCH_TIMEOUT,
maxConcurrentPages,
});
const html = await session.fetchRenderedHtml(url, { timeout: BROWSER_FETCH_TIMEOUT });
return { html, finalUrl: url };
} finally {
browserSemaphore.release();
}
}
function stripHtmlContent(value) {
if (typeof value !== "string") return null;
const stripped = value.replace(/<[^>]+>/g, " ").replace(/\s+/g, " ").trim();
return stripped || null;
}
async function attemptFetch(url, fetcher) {
let html;
let finalUrl;
try {
const result = await fetcher(url);
html = result.html;
finalUrl = result.finalUrl;
} catch (error) {
return { ok: false, reason: `fetch-error:${error.message || "unknown"}`, error };
}
if (!html) {
return { ok: false, reason: "empty-html" };
}
let extracted;
try {
extracted = await extractFromHtml(html, finalUrl || url);
} catch (error) {
return { ok: false, reason: `extractor-error:${error.message || "unknown"}` };
}
if (extracted) {
extracted = {
...extracted,
content: stripHtmlContent(extracted.content),
};
}
const validation = validateExtractedArticle({ article: extracted, html, finalUrl });
if (!validation.ok) {
return { ok: false, reason: validation.reason, retryable: validation.retryable, html, finalUrl };
}
return { ok: true, article: extracted, html, finalUrl };
}
function getAttemptCount(id) {
const row = selectAttemptCount.get(id);
return row ? row.attempts || 0 : 0;
}
// shouldPrecheck: domains with at least N consecutive plain or browser failures
// in their policy entry get a HEAD check first. we read the row directly here
// rather than threading through the policy module
const selectFailureCounts = db.prepare(`
SELECT consecutive_plain_failures, consecutive_browser_failures
FROM domain_fetch_policy WHERE domain = ?
`);
function shouldPrecheck(url) {
try {
const domain = new URL(url).hostname.toLowerCase();
const row = selectFailureCounts.get(domain);
if (!row) return false;
return (row.consecutive_plain_failures + row.consecutive_browser_failures) >= HEAD_PRECHECK_FAILURE_THRESHOLD;
} catch {
return false;
}
}
async function fetchAndStoreContent(id, url, storedTitle, storedDescription) {
const policy = getEffectivePolicy(url);
if (policy.policy === "blocked") {
markContentPending.run(
`domain blocked by policy`,
nowIso(),
futureIso(TRANSIENT_RETRY_AFTER_MS),
id
);
return;
}
// HEAD precheck for known-flaky domains. if it returns 404/410/451 we mark
// terminal-failed without burning a body fetch. transient errors fall through
if (shouldPrecheck(url)) {
const head = await headPrecheck(url);
if (head.status === 404 || head.status === 410 || head.status === 451) {
markContentFailed.run(`head ${head.status}`, nowIso(), id);
return;
}
}
const tryPlainFirst = policy.policy === "auto" || policy.policy === "plain_only";
let plainResult = null;
let browserResult = null;
if (tryPlainFirst) {
plainResult = await attemptFetch(url, fetchPlainHtml);
if (plainResult.ok) {
recordPlainSuccess(url);
commitArticle(id, url, plainResult, storedTitle, storedDescription);
return;
}
recordPlainFailure(url);
const status = plainResult.error && getErrorStatus(plainResult.error);
if (status === 408 || status === 429 || (status && status >= 500)) {
markContentPending.run(
`plain ${status}`,
nowIso(),
futureIso(TRANSIENT_RETRY_AFTER_MS),
id
);
return;
}
}
if (policy.policy === "plain_only") {
recordValidationFailure(id, plainResult);
return;
}
browserResult = await attemptFetch(url, fetchBrowserHtml);
if (browserResult.ok) {
recordBrowserSuccess(url);
commitArticle(id, url, browserResult, storedTitle, storedDescription);
return;
}
recordBrowserFailure(url);
const browserStatus = browserResult.error && getErrorStatus(browserResult.error);
if (browserStatus === 408 || browserStatus === 429 || (browserStatus && browserStatus >= 500)) {
markContentPending.run(
`browser ${browserStatus}`,
nowIso(),
futureIso(TRANSIENT_RETRY_AFTER_MS),
id
);
return;
}
recordValidationFailure(id, browserResult);
}
function recordValidationFailure(id, result) {
const reason = result?.reason || "unknown";
const retryable = result?.retryable !== false;
const attempts = getAttemptCount(id);
if (!retryable || attempts + 1 >= MAX_TERMINAL_ATTEMPTS) {
markContentFailed.run(reason, nowIso(), id);
return;
}
markContentPending.run(reason, nowIso(), futureIso(VALIDATION_RETRY_AFTER_MS), id);
}
function commitArticle(id, url, result, storedTitle, storedDescription) {
const { article } = result;
const content = article.content || null;
const titleLooksLikeUrl = storedTitle && /^https?:\/\//i.test(storedTitle.trim());
if (titleLooksLikeUrl) {
const scrapedTitle = typeof article.title === "string" ? article.title.trim() : null;
const scrapedDescription = typeof article.description === "string" ? article.description.trim() : null;
if (scrapedTitle) {
updateArticleTitleDescription.run(scrapedTitle, scrapedDescription || storedDescription || null, id);
}
}
updateArticleAssets.run(content, nowIso(), id);
// embedding generation is no longer kicked off here — runEmbeddingLoop
// in scheduler.js batches them in its own pipeline. that decouples slow
// openrouter calls from content fetch throughput
}
// runs one worker pass — pulls its partition slice, fires N concurrent fetches
// from a single backfill loop. multiple workers share the plain/browser
// semaphores so total concurrency stays bounded regardless of worker count
async function runBackfillWorker({ workerIndex, workerCount, perSource, batchSize }) {
const rows = selectPartitionedArticlesMissingContent.all(workerCount, workerIndex, perSource);
if (rows.length === 0) return 0;
// dispatch in chunks of batchSize so we don't allocate thousands of unawaited
// promises at once. the semaphores throttle inside fetchAndStoreContent
for (let i = 0; i < rows.length; i += batchSize) {
const batch = rows.slice(i, i + batchSize);
await Promise.all(batch.map((row) =>
fetchAndStoreContent(row.id, row.url, row.title, row.description).catch((error) => {
console.error(`backfill worker ${workerIndex} failed on ${row.url}:`, error.message || error);
})
));
}
return rows.length;
}
function hasPendingContent() {
return Boolean(db.prepare(`
SELECT 1 FROM articles
WHERE (content IS NULL OR TRIM(content) = '')
AND (content_status IS NULL OR content_status = 'pending')
AND (content_retry_after IS NULL OR content_retry_after <= datetime('now'))
LIMIT 1
`).get());
}
// kept for backwards compat with scheduler/runAllIngestions one-shot runs
async function backfillMissingContent(perSource = 50, concurrency = 50) {
await runBackfillWorker({
workerIndex: 0,
workerCount: 1,
perSource,
batchSize: concurrency,
});
}
module.exports = {
fetchAndStoreContent,
backfillMissingContent,
runBackfillWorker,
hasPendingContent,
};