add Google News integration and enhance crawler capabilities

This commit is contained in:
ImBenji 2026-04-18 13:59:46 +01:00
parent 14727cdfe3
commit 830766bcfe
5 changed files with 77 additions and 24 deletions

View file

@ -171,7 +171,7 @@ async function fetchAndStoreContent(id, url) {
} }
} }
async function backfillMissingContent(limit = 10) { async function backfillMissingContent(limit = 100, concurrency = 5) {
if (contentBackfillRunning) { if (contentBackfillRunning) {
return; return;
} }
@ -183,15 +183,26 @@ async function backfillMissingContent(limit = 10) {
? selectAllArticlesMissingContent.all() ? selectAllArticlesMissingContent.all()
: selectArticlesMissingContent.all(limit); : selectArticlesMissingContent.all(limit);
for (const row of rows) { for (let i = 0; i < rows.length; i += concurrency) {
await fetchAndStoreContent(row.id, row.url); const batch = rows.slice(i, i + concurrency);
await Promise.all(batch.map((row) => fetchAndStoreContent(row.id, row.url)));
} }
} finally { } finally {
contentBackfillRunning = false; contentBackfillRunning = false;
} }
} }
function hasPendingContent() {
return Boolean(db.prepare(`
SELECT 1 FROM articles
WHERE (content IS NULL OR TRIM(content) = '')
AND (content_status IS NULL OR content_status = 'pending')
LIMIT 1
`).get());
}
module.exports = { module.exports = {
fetchAndStoreContent, fetchAndStoreContent,
backfillMissingContent, backfillMissingContent,
hasPendingContent,
}; };

View file

@ -127,6 +127,14 @@ db.exec(`
); );
`); `);
db.exec(`
CREATE TABLE IF NOT EXISTS article_embedding_meta (
article_id INTEGER PRIMARY KEY,
model TEXT NOT NULL,
embedded_at TEXT NOT NULL DEFAULT (datetime('now'))
);
`);
db.exec(` db.exec(`
CREATE TABLE IF NOT EXISTS gdelt_backfill_windows ( CREATE TABLE IF NOT EXISTS gdelt_backfill_windows (
source_id TEXT NOT NULL, source_id TEXT NOT NULL,

View file

@ -19,6 +19,16 @@ const selectEmbeddingBuffer = db.prepare(`
FROM article_embeddings FROM article_embeddings
WHERE article_id = ? WHERE article_id = ?
`); `);
const upsertEmbeddingMeta = db.prepare(`
INSERT INTO article_embedding_meta (article_id, model, embedded_at)
VALUES (?, ?, datetime('now'))
ON CONFLICT(article_id) DO UPDATE SET
model = excluded.model,
embedded_at = excluded.embedded_at
`);
const selectEmbeddingModel = db.prepare(`
SELECT model FROM article_embedding_meta WHERE article_id = ?
`);
const nearestNeighbors = db.prepare(` const nearestNeighbors = db.prepare(`
SELECT article_id, distance SELECT article_id, distance
FROM article_embeddings FROM article_embeddings
@ -35,10 +45,14 @@ const selectArticlesMissingEmbeddings = db.prepare(`
AND TRIM(a.description) != '' AND TRIM(a.description) != ''
AND a.content IS NOT NULL AND a.content IS NOT NULL
AND TRIM(a.content) != '' AND TRIM(a.content) != ''
AND NOT EXISTS ( AND (
SELECT 1 NOT EXISTS (
FROM article_embeddings e SELECT 1 FROM article_embeddings e WHERE e.article_id = a.id
WHERE e.article_id = a.id )
OR NOT EXISTS (
SELECT 1 FROM article_embedding_meta m
WHERE m.article_id = a.id AND m.model = ?
)
) )
ORDER BY a.ingested_at ASC, a.id ASC ORDER BY a.ingested_at ASC, a.id ASC
LIMIT ? LIMIT ?
@ -56,7 +70,15 @@ const upsertQueryEmbedding = db.prepare(`
created_at = datetime('now') created_at = datetime('now')
`); `);
const EMBEDDING_MODEL = 'perplexity/pplx-embed-v1-0.6b';
let embeddingBackfillRunning = false; let embeddingBackfillRunning = false;
// backfill meta for any embeddings that existed before model tracking
db.prepare(`
INSERT OR IGNORE INTO article_embedding_meta (article_id, model)
SELECT article_id, ? FROM article_embeddings
`).run(EMBEDDING_MODEL);
const embeddingJobsRunning = new Set(); const embeddingJobsRunning = new Set();
function buildEmbeddingInput(article) { function buildEmbeddingInput(article) {
@ -90,7 +112,7 @@ async function requestEmbedding(input) {
'Content-Type': 'application/json', 'Content-Type': 'application/json',
}, },
body: JSON.stringify({ body: JSON.stringify({
model: 'perplexity/pplx-embed-v1-0.6b', model: EMBEDDING_MODEL,
input, input,
}), }),
}); });
@ -159,6 +181,7 @@ async function generateAndStoreEmbedding(id) {
const embedding = await requestEmbedding(input); const embedding = await requestEmbedding(input);
deleteEmbedding.run(BigInt(id)); deleteEmbedding.run(BigInt(id));
insertEmbedding.run(BigInt(id), serializeEmbedding(embedding)); insertEmbedding.run(BigInt(id), serializeEmbedding(embedding));
upsertEmbeddingMeta.run(id, EMBEDDING_MODEL);
return { stored: true, shouldPauseBatch: false }; return { stored: true, shouldPauseBatch: false };
} catch (error) { } catch (error) {
console.error(`embedding generation failed for article ${id}:`, error); console.error(`embedding generation failed for article ${id}:`, error);
@ -179,7 +202,7 @@ async function backfillMissingEmbeddings(limit = 100) {
embeddingBackfillRunning = true; embeddingBackfillRunning = true;
try { try {
const rows = selectArticlesMissingEmbeddings.all(limit); const rows = selectArticlesMissingEmbeddings.all(EMBEDDING_MODEL, limit);
for (const row of rows) { for (const row of rows) {
const result = await generateAndStoreEmbedding(row.id); const result = await generateAndStoreEmbedding(row.id);

View file

@ -7,7 +7,7 @@ const { fetchEdgarArticles } = require('./sources/edgar');
const { fetchAlphaVantageArticles } = require('./sources/alphavantage'); const { fetchAlphaVantageArticles } = require('./sources/alphavantage');
const { fetchFinnhubArticles } = require('./sources/finnhub'); const { fetchFinnhubArticles } = require('./sources/finnhub');
const { fetchGoogleNewsArticles } = require('./sources/googleNews'); const { fetchGoogleNewsArticles } = require('./sources/googleNews');
const { backfillMissingContent } = require('./content'); const { backfillMissingContent, hasPendingContent } = require('./content');
const { backfillMissingEmbeddings } = require('./embeddings'); const { backfillMissingEmbeddings } = require('./embeddings');
function sleep(ms) { function sleep(ms) {
@ -89,7 +89,13 @@ function startScheduler() {
await runSource('googlenews', fetchGoogleNewsArticles); await runSource('googlenews', fetchGoogleNewsArticles);
}; };
const runContentMaintenance = async () => { const runContentLoop = async () => {
while (true) {
if (!hasPendingContent()) {
await sleep(60 * 1000);
continue;
}
try { try {
await backfillMissingContent(); await backfillMissingContent();
} catch (error) { } catch (error) {
@ -101,6 +107,7 @@ function startScheduler() {
} catch (error) { } catch (error) {
console.error('embedding backfill failed:', error); console.error('embedding backfill failed:', error);
} }
}
}; };
runRss(); runRss();
@ -109,14 +116,13 @@ function startScheduler() {
runAlphaVantage(); runAlphaVantage();
runFinnhub(); runFinnhub();
// runGoogleNews(); // runGoogleNews();
runContentMaintenance(); runContentLoop();
cron.schedule(config.scheduler.rss, runRss); cron.schedule(config.scheduler.rss, runRss);
cron.schedule(config.scheduler.edgar, runEdgar); cron.schedule(config.scheduler.edgar, runEdgar);
cron.schedule(config.scheduler.alphaVantage, runAlphaVantage); cron.schedule(config.scheduler.alphaVantage, runAlphaVantage);
cron.schedule(config.scheduler.finnhub, runFinnhub); cron.schedule(config.scheduler.finnhub, runFinnhub);
cron.schedule(config.contentBackfill.cron, runContentMaintenance);
} }
module.exports = { module.exports = {

View file

@ -114,14 +114,19 @@ async function fetchWindowBigQuery(source, window, bigquery) {
const maxRecords = Math.max(1, Math.min(Number(config.gdelt?.maxRecords) || 100, 1000)); const maxRecords = Math.max(1, Math.min(Number(config.gdelt?.maxRecords) || 100, 1000));
const domainClauses = source.website.map((d) => `LOWER(DocumentIdentifier) LIKE '%${d}%'`).join(' OR '); const domainClauses = source.website.map((d) => `LOWER(DocumentIdentifier) LIKE '%${d}%'`).join(' OR ');
// use _PARTITIONTIME (not the INTEGER DATE column) so BigQuery can prune partitions
// and avoid full-table scans on the ~130TB gkg table
const startTs = window.start.toISOString();
const endTs = window.end.toISOString();
const query = ` const query = `
SELECT SELECT
DocumentIdentifier AS url, DocumentIdentifier AS url,
SourceCommonName AS domain, SourceCommonName AS domain,
CAST(DATE AS STRING) AS seendate CAST(DATE AS STRING) AS seendate
FROM \`gdelt-bq.gdeltv2.gkg\` FROM \`gdelt-bq.gdeltv2.gkg\`
WHERE DATE >= ${window.startKey} WHERE _PARTITIONTIME >= TIMESTAMP("${startTs}")
AND DATE < ${window.endKey} AND _PARTITIONTIME < TIMESTAMP("${endTs}")
AND (${domainClauses}) AND (${domainClauses})
LIMIT ${maxRecords} LIMIT ${maxRecords}
`; `;