diff --git a/README.md b/README.md index 7ddcaab..49975f2 100644 --- a/README.md +++ b/README.md @@ -23,7 +23,7 @@ On startup the server: 1. Opens the SQLite database and runs any pending migrations. 2. Registers routes. 3. Starts the HTTP server. -4. Launches continuous background loops for each source, content backfill, and embedding backfill. +4. Launches continuous background loops for each source, content backfill, embedding backfill, and event clustering. When a new article is inserted: @@ -31,6 +31,7 @@ When a new article is inserted: - `content` starts as `null` - content backfill workers pick it up asynchronously — plain HTTP first, Playwright fallback for JS-heavy sites - vector embeddings are generated after title, description, and content are all available +- the clustering worker assigns the article to an event once it has an embedding - only articles with content + embedding are exposed via the API Content backfill prioritises recent articles (`pub_date_effective DESC`) so newest content surfaces first regardless of ingestion order. @@ -107,6 +108,41 @@ Semantic and similarity results also include `"distance": 0.1234`. Returns one article by numeric ID. Same usability filter as the list endpoint — returns `404` if the article exists but has no content or embedding. +### `GET /events` + +Returns a single event and its articles. + +#### Query params + +| Param | Description | +|---|---| +| `id` | Event ID (required) | + +#### Response shape + +```json +{ + "id": 1, + "title": "...", + "created_at": "2025-01-01T12:35:10.000Z", + "articles": [ + { + "id": 123, + "title": "...", + "description": "...", + "content": "...", + "url": "...", + "normalized_title": "...", + "source": "rss:BBC", + "pub_date": "2025-01-01T12:34:56.000Z", + "ingested_at": "2025-01-01T12:35:10.000Z" + } + ] +} +``` + +Returns `404` if the event ID does not exist. + ### `GET /status` Returns archive summary. Cached for 30 seconds. @@ -149,3 +185,4 @@ Use `domains[].policy` to diagnose why a source has high `skipped` or `failed` c - Embeddings use OpenRouter and are indexed in `sqlite-vec` for ANN search. - Query embeddings are cached in SQLite to avoid redundant API calls. - SEC requests use the `User-Agent` from `config.json`. +- Event clustering groups articles by embedding similarity (cosine distance ≤ `config.clustering.distanceThreshold`, default `0.25`) and time proximity (within `config.clustering.windowHours`, default `72`). Articles outside the time window are never grouped together even if embeddings are close. diff --git a/config.json b/config.json index 14cb93f..e2f51f2 100644 --- a/config.json +++ b/config.json @@ -19,7 +19,7 @@ "tickers": [] }, "openRouter": { - "apiKey": "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", + "apiKey": "[OFF]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", "embeddingModel": "qwen/qwen3-embedding-8b" }, "gdelt": { diff --git a/server.js b/server.js index be60406..c8de679 100644 --- a/server.js +++ b/server.js @@ -3,6 +3,7 @@ const cors = require('@fastify/cors'); const articleRoutes = require('./src/routes/articles'); const statusRoutes = require('./src/routes/status'); const sourcesRoutes = require('./src/routes/sources'); +const eventRoutes = require('./src/routes/events'); const config = require('./src/config'); const { startScheduler } = require('./src/scheduler'); @@ -12,6 +13,7 @@ app.register(cors, { origin: true }); app.register(articleRoutes); app.register(statusRoutes); app.register(sourcesRoutes); +app.register(eventRoutes); app.get('/', async () => ({ ok: true })); diff --git a/src/clustering.js b/src/clustering.js new file mode 100644 index 0000000..875d5f9 --- /dev/null +++ b/src/clustering.js @@ -0,0 +1,114 @@ +const db = require('./db'); +const config = require('./config'); +const { findSimilarArticles } = require('./embeddings'); + +// cosine distance threshold — articles closer than this get filed into the same event. +// tunable via config.clustering.distanceThreshold +const DEFAULT_THRESHOLD = 0.25; + +const DEFAULT_WINDOW_HOURS = 72; + +function getThreshold() { + return Number((config.clustering && config.clustering.distanceThreshold) || DEFAULT_THRESHOLD); +} + +function getWindowHours() { + return Number((config.clustering && config.clustering.windowHours) || DEFAULT_WINDOW_HOURS); +} + +const selectPending = db.prepare(` + SELECT id, title, pub_date_effective FROM articles + WHERE has_embedding = 1 + AND event_id IS NULL + AND content IS NOT NULL AND content != '' + AND is_index_page = 0 + ORDER BY pub_date_effective DESC, id DESC + LIMIT ? +`); + +const getEventNeighbor = db.prepare(` + SELECT a.event_id, a.pub_date_effective + FROM articles a + WHERE a.id = ? AND a.event_id IS NOT NULL +`); + +const assignEvent = db.prepare(` + UPDATE articles SET event_id = ? WHERE id = ? +`); + +const createEvent = db.prepare(` + INSERT INTO events (title) VALUES (?) +`); + + +let clusteringRunning = false; + +async function backfillMissingClusters(limit = 128) { + if (clusteringRunning) return { processed: 0 }; + + clusteringRunning = true; + let processed = 0; + + try { + const candidates = selectPending.all(limit); + if (candidates.length === 0) return { processed: 0 }; + + const threshold = getThreshold(); + const windowMs = getWindowHours() * 60 * 60 * 1000; + + for (const article of candidates) { + + // re-check in case a prev iteration in this batch already assigned it + const alreadyAssigned = db.prepare(`SELECT event_id FROM articles WHERE id = ?`).get(article.id); + if (alreadyAssigned && alreadyAssigned.event_id != null) { + processed++; + continue; + } + + const neighbors = findSimilarArticles(article.id, 10); + const closeEnough = neighbors.filter((n) => n.distance <= threshold); + + const articleTime = article.pub_date_effective ? new Date(article.pub_date_effective).getTime() : null; + + let eventId = null; + + for (const neighbor of closeEnough) { + const row = getEventNeighbor.get(neighbor.articleId); + if (!row || row.event_id == null) continue; + + if (articleTime && row.pub_date_effective) { + const neighborTime = new Date(row.pub_date_effective).getTime(); + if (Math.abs(articleTime - neighborTime) > windowMs) continue; + } + + eventId = row.event_id; + break; + } + + if (eventId == null) { + const result = createEvent.run(article.title); + eventId = result.lastInsertRowid; + } + + assignEvent.run(eventId, article.id); + processed++; + } + } finally { + clusteringRunning = false; + } + + return { processed }; +} + +function hasPendingClusters() { + return Boolean(db.prepare(` + SELECT 1 FROM articles + WHERE has_embedding = 1 + AND event_id IS NULL + AND content IS NOT NULL AND content != '' + AND is_index_page = 0 + LIMIT 1 + `).get()); +} + +module.exports = { backfillMissingClusters, hasPendingClusters }; diff --git a/src/db.js b/src/db.js index 95c8f11..e0e95e0 100644 --- a/src/db.js +++ b/src/db.js @@ -220,6 +220,30 @@ db.exec(` } } +db.exec(` + CREATE TABLE IF NOT EXISTS events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + title TEXT NOT NULL, + created_at TEXT NOT NULL DEFAULT (datetime('now')) + ); +`); + +for (const statement of [ + 'ALTER TABLE articles ADD COLUMN event_id INTEGER REFERENCES events(id)', +]) { + try { + db.exec(statement); + } catch (error) { + if (!String(error.message).includes('duplicate column name')) { + throw error; + } + } +} + +db.exec(` + CREATE INDEX IF NOT EXISTS idx_articles_event_id ON articles(event_id); +`); + db.exec(` CREATE TABLE IF NOT EXISTS gdelt_backfill_windows ( source_id TEXT NOT NULL, @@ -300,69 +324,9 @@ for (const statement of [ } } -// backfill has_embedding for existing rows — safe to re-run, only touches rows that need it -db.exec(` - UPDATE articles SET has_embedding = 1 - WHERE has_embedding = 0 - AND EXISTS (SELECT 1 FROM article_embedding_meta WHERE article_id = articles.id) -`); - -// backfill pub_date_effective for existing rows -db.exec(` - UPDATE articles SET pub_date_effective = COALESCE(pub_date, ingested_at) - WHERE pub_date_effective IS NULL -`); - -// backfill language from sources.json for existing rows -{ - const sources = require('../sources.json'); - const updateLang = db.prepare(`UPDATE articles SET language = ? WHERE source = ? AND language IS NULL`); - const backfillLang = db.transaction(() => { - for (const src of sources) { - if (src.language) { - updateLang.run(src.language, src.id); - } - } - }); - backfillLang(); -} - db.exec(` CREATE INDEX IF NOT EXISTS idx_articles_has_embedding ON articles(has_embedding); CREATE INDEX IF NOT EXISTS idx_articles_pub_date_effective ON articles(pub_date_effective DESC); `); -db.exec(` - UPDATE articles - SET is_index_page = 1 - WHERE is_index_page = 0 - AND ( - LOWER(url) LIKE '%/category/%' - OR LOWER(url) LIKE '%/categories/%' - OR LOWER(url) LIKE '%/tag/%' - OR LOWER(url) LIKE '%/tags/%' - OR LOWER(url) LIKE '%/topic/%' - OR LOWER(url) LIKE '%/topics/%' - OR LOWER(url) LIKE '%/section/%' - OR LOWER(url) LIKE '%/sections/%' - OR LOWER(url) LIKE '%/archive%' - OR LOWER(url) LIKE '%/archives/%' - OR LOWER(url) LIKE '%/authors/%' - OR LOWER(url) LIKE '%/search%' - OR LOWER(title) LIKE '%category%' - OR LOWER(title) LIKE '%archives%' - OR LOWER(title) LIKE '%archive%' - OR LOWER(title) LIKE '%latest news%' - ) -`); - -// reset articles that grabbed yahoo finance's nav shell instead of article body -db.exec(` - UPDATE articles - SET content = NULL, content_status = NULL, content_error = NULL, - content_attempted_at = NULL, content_attempt_count = 0, - content_retry_after = NULL - WHERE content LIKE 'Today''s news US Politics World Weather%' -`); - module.exports = db; diff --git a/src/embeddings.js b/src/embeddings.js index 2fc7fb6..238275b 100644 --- a/src/embeddings.js +++ b/src/embeddings.js @@ -121,46 +121,6 @@ try { console.error('embedding store backfill failed:', err); } -// probe the API to get the real dimension count for the current model, then purge -// any store entries that don't match — handles the case where old embeddings -// got stamped with the wrong model name during migration -async function purgeWrongSizeEmbeddings() { - const apiKey = config.openRouter && config.openRouter.apiKey - ? String(config.openRouter.apiKey).trim() - : ''; - - if (!apiKey) return; - - try { - const probe = await requestEmbedding('probe'); - const expectedBytes = probe.length * 4; - - const stale = db.prepare(` - SELECT article_id FROM article_embedding_store - WHERE model = ? AND LENGTH(embedding) != ? - `).all(EMBEDDING_MODEL, expectedBytes); - - if (stale.length === 0) return; - - const deleteStore = db.prepare(`DELETE FROM article_embedding_store WHERE article_id = ? AND model = ?`); - const deleteMeta = db.prepare(`DELETE FROM article_embedding_meta WHERE article_id = ?`); - const deleteVec = db.prepare(`DELETE FROM article_embeddings WHERE article_id = ?`); - - db.transaction(() => { - for (const row of stale) { - deleteStore.run(row.article_id, EMBEDDING_MODEL); - deleteMeta.run(row.article_id); - deleteVec.run(BigInt(row.article_id)); - } - })(); - - console.log(`purged ${stale.length} wrong-size embeddings for model ${EMBEDDING_MODEL} (expected ${probe.length} dims)`); - } catch (err) { - console.error('embedding size validation failed:', err); - } -} - -purgeWrongSizeEmbeddings(); // if the config model changed, rebuild the vec0 search index from store. // only proceeds if the store actually has embeddings for the new model. diff --git a/src/routes/events.js b/src/routes/events.js new file mode 100644 index 0000000..36e9fca --- /dev/null +++ b/src/routes/events.js @@ -0,0 +1,37 @@ +const db = require('../db'); + +async function eventRoutes(fastify) { + fastify.get('/events', async (request, reply) => { + const query = request.query || {}; + + if (!query.id) { + reply.code(400); + return { error: 'id is required' }; + } + + const id = Number.parseInt(query.id, 10); + if (!Number.isFinite(id)) { + reply.code(400); + return { error: 'id must be a number' }; + } + + const event = db.prepare(`SELECT id, title, created_at FROM events WHERE id = ?`).get(id); + if (!event) { + reply.code(404); + return { error: 'Event not found' }; + } + + const articles = db.prepare(` + SELECT id, title, description, content, url, normalized_title, source, pub_date, ingested_at + FROM articles + WHERE event_id = ? + AND content IS NOT NULL AND content != '' + AND is_index_page = 0 + ORDER BY pub_date_effective DESC, id DESC + `).all(id); + + return { ...event, articles }; + }); +} + +module.exports = eventRoutes; diff --git a/src/scheduler.js b/src/scheduler.js index ef0d984..d8a70e8 100644 --- a/src/scheduler.js +++ b/src/scheduler.js @@ -9,6 +9,7 @@ const { fetchFinnhubArticles } = require('./sources/finnhub'); const { fetchGoogleNewsArticles } = require('./sources/googleNews'); const { backfillMissingContent, runBackfillWorker, hasPendingContent } = require('./content'); const { backfillMissingEmbeddings, hasPendingEmbeddings } = require('./embeddings'); +const { backfillMissingClusters, hasPendingClusters } = require('./clustering'); function sleep(ms) { return new Promise((resolve) => setTimeout(resolve, ms)); @@ -173,7 +174,29 @@ function startScheduler() { runContentWorker(i, workerCount); } + const runClusteringLoop = async () => { + while (true) { + if (!hasPendingClusters()) { + await sleep(30 * 1000); + continue; + } + + try { + const limit = Number(config.clustering && config.clustering.perRound) || 128; + const result = await backfillMissingClusters(limit); + + if (result.processed === 0) { + await sleep(5000); + } + } catch (error) { + console.error('clustering backfill failed:', error); + await sleep(10000); + } + } + }; + runEmbeddingLoop(); + runClusteringLoop(); cron.schedule(config.scheduler.rss, runRss); cron.schedule(config.scheduler.edgar, runEdgar);