refactor content fetching and embedding processes for improved concurrency and error handling

This commit is contained in:
ImBenji 2026-04-19 23:21:29 +01:00
parent 6bf3a9282f
commit 37d9dfb083
8 changed files with 176 additions and 10 deletions

View file

@ -2,6 +2,7 @@ const Fastify = require('fastify');
const cors = require('@fastify/cors'); const cors = require('@fastify/cors');
const articleRoutes = require('./src/routes/articles'); const articleRoutes = require('./src/routes/articles');
const statusRoutes = require('./src/routes/status'); const statusRoutes = require('./src/routes/status');
const sourcesRoutes = require('./src/routes/sources');
const config = require('./src/config'); const config = require('./src/config');
const { startScheduler } = require('./src/scheduler'); const { startScheduler } = require('./src/scheduler');
@ -10,6 +11,7 @@ const app = Fastify({ logger: true });
app.register(cors, { origin: true }); app.register(cors, { origin: true });
app.register(articleRoutes); app.register(articleRoutes);
app.register(statusRoutes); app.register(statusRoutes);
app.register(sourcesRoutes);
app.get('/', async () => ({ ok: true })); app.get('/', async () => ({ ok: true }));

View file

@ -68,6 +68,10 @@ const BODY_PREFIX_BLOCKLIST = [
"checking your browser before", "checking your browser before",
"this site requires javascript", "this site requires javascript",
"please make sure your browser supports", "please make sure your browser supports",
// yahoo finance serves its global nav when the article body is js-rendered
// and the plain fetch only gets the static shell
"today's news us politics world weather",
]; ];

View file

@ -286,7 +286,9 @@ for (const statement of [
'ALTER TABLE articles ADD COLUMN content_attempted_at TEXT', 'ALTER TABLE articles ADD COLUMN content_attempted_at TEXT',
'ALTER TABLE articles ADD COLUMN content_attempt_count INTEGER NOT NULL DEFAULT 0', 'ALTER TABLE articles ADD COLUMN content_attempt_count INTEGER NOT NULL DEFAULT 0',
'ALTER TABLE articles ADD COLUMN content_retry_after TEXT', 'ALTER TABLE articles ADD COLUMN content_retry_after TEXT',
'ALTER TABLE articles ADD COLUMN is_index_page INTEGER NOT NULL DEFAULT 0' 'ALTER TABLE articles ADD COLUMN is_index_page INTEGER NOT NULL DEFAULT 0',
'ALTER TABLE articles ADD COLUMN has_embedding INTEGER NOT NULL DEFAULT 0',
'ALTER TABLE articles ADD COLUMN pub_date_effective TEXT'
]) { ]) {
try { try {
db.exec(statement); db.exec(statement);
@ -297,6 +299,24 @@ for (const statement of [
} }
} }
// backfill has_embedding for existing rows — safe to re-run, only touches rows that need it
db.exec(`
UPDATE articles SET has_embedding = 1
WHERE has_embedding = 0
AND EXISTS (SELECT 1 FROM article_embedding_meta WHERE article_id = articles.id)
`);
// backfill pub_date_effective for existing rows
db.exec(`
UPDATE articles SET pub_date_effective = COALESCE(pub_date, ingested_at)
WHERE pub_date_effective IS NULL
`);
db.exec(`
CREATE INDEX IF NOT EXISTS idx_articles_has_embedding ON articles(has_embedding);
CREATE INDEX IF NOT EXISTS idx_articles_pub_date_effective ON articles(pub_date_effective DESC);
`);
db.exec(` db.exec(`
UPDATE articles UPDATE articles
SET is_index_page = 1 SET is_index_page = 1
@ -321,4 +341,13 @@ db.exec(`
) )
`); `);
// reset articles that grabbed yahoo finance's nav shell instead of article body
db.exec(`
UPDATE articles
SET content = NULL, content_status = NULL, content_error = NULL,
content_attempted_at = NULL, content_attempt_count = 0,
content_retry_after = NULL
WHERE content LIKE 'Today''s news US Politics World Weather%'
`);
module.exports = db; module.exports = db;

View file

@ -35,6 +35,10 @@ const upsertEmbeddingMeta = db.prepare(`
embedded_at = excluded.embedded_at embedded_at = excluded.embedded_at
`); `);
const markArticleHasEmbedding = db.prepare(`
UPDATE articles SET has_embedding = 1 WHERE id = ? AND has_embedding = 0
`);
const upsertEmbeddingStore = db.prepare(` const upsertEmbeddingStore = db.prepare(`
INSERT INTO article_embedding_store (article_id, model, embedding, embedded_at) INSERT INTO article_embedding_store (article_id, model, embedding, embedded_at)
VALUES (?, ?, ?, datetime('now')) VALUES (?, ?, ?, datetime('now'))
@ -364,6 +368,7 @@ async function generateAndStoreEmbedding(id) {
deleteEmbedding.run(BigInt(id)); deleteEmbedding.run(BigInt(id));
insertEmbedding.run(BigInt(id), padEmbeddingForVec0(embedding)); insertEmbedding.run(BigInt(id), padEmbeddingForVec0(embedding));
upsertEmbeddingMeta.run(id, EMBEDDING_MODEL); upsertEmbeddingMeta.run(id, EMBEDDING_MODEL);
markArticleHasEmbedding.run(id);
return { stored: true, shouldPauseBatch: false }; return { stored: true, shouldPauseBatch: false };
} catch (error) { } catch (error) {
@ -387,6 +392,7 @@ function commitEmbeddingBatch(rows) {
deleteEmbedding.run(BigInt(entry.id)); deleteEmbedding.run(BigInt(entry.id));
insertEmbedding.run(BigInt(entry.id), padEmbeddingForVec0(entry.embedding)); insertEmbedding.run(BigInt(entry.id), padEmbeddingForVec0(entry.embedding));
upsertEmbeddingMeta.run(entry.id, EMBEDDING_MODEL); upsertEmbeddingMeta.run(entry.id, EMBEDDING_MODEL);
markArticleHasEmbedding.run(entry.id);
} }
}); });

View file

@ -12,8 +12,9 @@ const insertArticle = db.prepare(`
normalized_title, normalized_title,
source, source,
pub_date, pub_date,
ingested_at ingested_at,
) VALUES (?, ?, NULL, ?, ?, ?, ?, ?, ?) pub_date_effective
) VALUES (?, ?, NULL, ?, ?, ?, ?, ?, ?, ?)
`); `);
const findByUrl = db.prepare('SELECT id FROM articles WHERE url = ?'); const findByUrl = db.prepare('SELECT id FROM articles WHERE url = ?');
const INDEX_PAGE_URL_HINT = /\/(category|categories|tag|tags|topic|topics|section|sections|archive|archives|authors|search)(?:\/|$)/i; const INDEX_PAGE_URL_HINT = /\/(category|categories|tag|tags|topic|topics|section|sections|archive|archives|authors|search)(?:\/|$)/i;
@ -90,7 +91,8 @@ function ingestArticle(article) {
normalizedTitle, normalizedTitle,
source, source,
pubDate, pubDate,
ingestedAt ingestedAt,
pubDate || ingestedAt
); );
// dont kick off the content fetch here — it used to be fire-and-forget which // dont kick off the content fetch here — it used to be fire-and-forget which

View file

@ -34,7 +34,7 @@ function buildArticlesQuery(query) {
conditions.push('content IS NOT NULL AND content != \'\''); conditions.push('content IS NOT NULL AND content != \'\'');
conditions.push('is_index_page = 0'); conditions.push('is_index_page = 0');
conditions.push('EXISTS (SELECT 1 FROM article_embeddings WHERE article_id = articles.id)'); conditions.push('has_embedding = 1');
const whereClause = `WHERE ${conditions.join(' AND ')}`; const whereClause = `WHERE ${conditions.join(' AND ')}`;
const limit = Number.parseInt(query.limit, 10); const limit = Number.parseInt(query.limit, 10);
@ -48,7 +48,7 @@ function buildArticlesQuery(query) {
SELECT id, title, description, content, ${includeEmbedding ? 'embedding,' : ''} url, normalized_title, source, pub_date, ingested_at SELECT id, title, description, content, ${includeEmbedding ? 'embedding,' : ''} url, normalized_title, source, pub_date, ingested_at
FROM articles FROM articles
${whereClause} ${whereClause}
ORDER BY COALESCE(pub_date, ingested_at) DESC, id DESC ORDER BY pub_date_effective DESC, id DESC
LIMIT ? OFFSET ? LIMIT ? OFFSET ?
`, `,
params, params,
@ -76,7 +76,7 @@ function mapNeighborsToArticles(neighbors, excludeIndexPages, limit) {
FROM articles FROM articles
WHERE id IN (${placeholders}) WHERE id IN (${placeholders})
AND content IS NOT NULL AND content != '' AND content IS NOT NULL AND content != ''
AND EXISTS (SELECT 1 FROM article_embeddings WHERE article_id = articles.id) AND has_embedding = 1
${excludeIndexPages ? 'AND is_index_page = 0' : ''} ${excludeIndexPages ? 'AND is_index_page = 0' : ''}
`).all(...ids); `).all(...ids);
const byId = new Map(articles.map((article) => [article.id, article])); const byId = new Map(articles.map((article) => [article.id, article]));
@ -149,7 +149,7 @@ async function articleRoutes(fastify) {
WHERE id = ? WHERE id = ?
AND content IS NOT NULL AND content != '' AND content IS NOT NULL AND content != ''
AND is_index_page = 0 AND is_index_page = 0
AND EXISTS (SELECT 1 FROM article_embeddings WHERE article_id = articles.id) AND has_embedding = 1
`).get(request.params.id); `).get(request.params.id);
if (!article) { if (!article) {

110
src/routes/sources.js Normal file
View file

@ -0,0 +1,110 @@
const db = require('../db');
const { getSourceCatalog } = require('../sources/sourceCatalog');
async function sourcesRoutes(fastify) {
fastify.get('/sources', async () => {
const catalog = getSourceCatalog();
// bucket every article by source string and content_status so we can
// answer "why is this source unusable" without N queries
const statusRows = db.prepare(`
SELECT
source,
COUNT(*) AS total,
SUM(CASE WHEN content_status = 'ready' THEN 1 ELSE 0 END) AS ready,
SUM(CASE WHEN content_status = 'skipped' THEN 1 ELSE 0 END) AS skipped,
SUM(CASE WHEN content_status = 'failed' THEN 1 ELSE 0 END) AS failed,
SUM(CASE WHEN content_status = 'pending' THEN 1 ELSE 0 END) AS pending,
SUM(CASE WHEN content_status IS NULL THEN 1 ELSE 0 END) AS untried,
SUM(CASE
WHEN content IS NOT NULL AND content != ''
AND is_index_page = 0
AND has_embedding = 1
THEN 1 ELSE 0
END) AS usable
FROM articles
GROUP BY source
`).all();
const statusByLabel = new Map();
for (const row of statusRows) {
// source can be "rss:Al Jazeera", "gdelt:Al Jazeera", or just "alphavantage"
const idx = row.source.indexOf(':');
const label = idx >= 0 ? row.source.slice(idx + 1) : row.source;
const feed = idx >= 0 ? row.source.slice(0, idx) : row.source;
if (!statusByLabel.has(label)) statusByLabel.set(label, []);
statusByLabel.get(label).push({ feed, ...row });
}
const policyRows = db.prepare(`
SELECT domain, policy, consecutive_plain_failures, consecutive_browser_failures,
plain_success_count, browser_success_count, expires_at, updated_at
FROM domain_fetch_policy
`).all();
const policyByDomain = new Map(policyRows.map((r) => [r.domain, r]));
return catalog.map((s) => {
const buckets = statusByLabel.get(s.label) || [];
const counts = buckets.reduce(
(acc, b) => {
acc.total += b.total;
acc.ready += b.ready;
acc.skipped += b.skipped;
acc.failed += b.failed;
acc.pending += b.pending;
acc.untried += b.untried;
acc.usable += b.usable;
return acc;
},
{ total: 0, ready: 0, skipped: 0, failed: 0, pending: 0, untried: 0, usable: 0 }
);
const domains = s.website.map((d) => {
const row = policyByDomain.get(d);
if (!row) {
return { domain: d, policy: 'auto' };
}
return {
domain: d,
policy: row.policy,
plainFailures: row.consecutive_plain_failures,
browserFailures: row.consecutive_browser_failures,
plainSuccesses: row.plain_success_count,
browserSuccesses: row.browser_success_count,
expiresAt: row.expires_at,
updatedAt: row.updated_at,
};
});
return {
id: s.id,
label: s.label,
websites: s.website,
backfill: s.backfill,
feeds: s.feedUrls,
counts,
byFeed: buckets.map((b) => ({
feed: b.feed,
total: b.total,
ready: b.ready,
skipped: b.skipped,
failed: b.failed,
pending: b.pending,
untried: b.untried,
usable: b.usable,
})),
domains,
};
});
});
}
module.exports = sourcesRoutes;

View file

@ -1,8 +1,17 @@
const db = require('../db'); const db = require('../db');
const { getLastIngestionBySource } = require('../state'); const { getLastIngestionBySource } = require('../state');
let statusCache = null;
let statusCacheAt = 0;
const STATUS_CACHE_TTL_MS = 30 * 1000;
async function statusRoutes(fastify) { async function statusRoutes(fastify) {
fastify.get('/status', async () => { fastify.get('/status', async () => {
const now = Date.now();
if (statusCache && now - statusCacheAt < STATUS_CACHE_TTL_MS) {
return statusCache;
}
const bySourceRows = db.prepare(` const bySourceRows = db.prepare(`
SELECT SELECT
source, source,
@ -11,7 +20,7 @@ async function statusRoutes(fastify) {
SUM(CASE SUM(CASE
WHEN content IS NOT NULL AND content != '' WHEN content IS NOT NULL AND content != ''
AND is_index_page = 0 AND is_index_page = 0
AND EXISTS (SELECT 1 FROM article_embedding_meta WHERE article_id = articles.id) AND has_embedding = 1
THEN 1 ELSE 0 THEN 1 ELSE 0
END) AS usable END) AS usable
FROM articles FROM articles
@ -38,7 +47,7 @@ async function statusRoutes(fastify) {
ORDER BY article_count DESC ORDER BY article_count DESC
`).all(); `).all();
return { const result = {
total: totals.total, total: totals.total,
usable: totals.usable, usable: totals.usable,
lastIngestionBySource: getLastIngestionBySource(), lastIngestionBySource: getLastIngestionBySource(),
@ -51,6 +60,10 @@ async function statusRoutes(fastify) {
dimensions: row.sample_bytes ? row.sample_bytes / 4 : null, dimensions: row.sample_bytes ? row.sample_bytes / 4 : null,
})), })),
}; };
statusCache = result;
statusCacheAt = now;
return result;
}); });
} }