diff --git a/README.md b/README.md index 294dfbf..d1c2df8 100644 --- a/README.md +++ b/README.md @@ -124,7 +124,7 @@ Without `id` — returns a paginated list of events. With `id` — returns a sin ```json [ - { "id": 1, "title": "...", "created_at": "2025-01-01T12:35:10.000Z" } + { "id": 1, "title": "...", "pub_date": "2025-01-01T12:34:56.000Z" } ] ``` @@ -134,7 +134,7 @@ Without `id` — returns a paginated list of events. With `id` — returns a sin { "id": 1, "title": "...", - "created_at": "2025-01-01T12:35:10.000Z", + "pub_date": "2025-01-01T12:34:56.000Z", "articles": [ { "id": 123, diff --git a/config.json b/config.json index 559ae0d..2737cec 100644 --- a/config.json +++ b/config.json @@ -1,4 +1,17 @@ { + "duriin_db": "/data/archive.sqlite", + "intelligence_db": "/data/intelligence.sqlite", + "llm": { + "baseUrl": "https://openrouter.ai/api/v1", + "model": "qwen/qwen3-235b-a22b-2507", + "apiKey": "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" + }, + "workers": { + "relevanceBatchSize": 50, + "relevanceLoopDelayMs": 2000, + "extractionLoopDelayMs": 1000, + "consolidationLoopDelayMs": 60000 + }, "server": { "port": 3001, "host": "0.0.0.0" @@ -61,6 +74,9 @@ "browser": { "maxConcurrentPages": 8 }, + "dev": { + "enabled": true + }, "googleNews": { "queries": [ "technology" diff --git a/docker-compose.yml b/docker-compose.yml index 2b6fe7b..fdb4add 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -12,6 +12,20 @@ services: networks: - nginx_proxy_manager_default + intelligence: + build: + context: . 
/**
 * Long-running worker loop that turns queued articles into per-company
 * knowledge and predictions.
 *
 * For every pending row in `article_queue` the worker:
 *   1. loads the article and its event from the archive DB (rows without
 *      content, embedding or event are marked `skipped`),
 *   2. loads up to 25 content-bearing articles of the event,
 *   3. asks `findMatchedCompaniesByEmbedding` which tracked companies match,
 *   4. replaces the event's rows in `event_knowledge` / `event_predictions`
 *      with what the LLM returns for each matched company,
 *   5. marks every pending queue row belonging to the event as `processed`.
 *
 * @param {object} archiveDb       database handle exposing `prepare()`; only read from here
 * @param {object} intelligenceDb  database handle exposing `prepare()` and `transaction()`
 * @param {object} config          reads `config.workers.augorLoopDelayMs` (default 1500) and `config.llm`
 * @returns {Promise<never>} loops forever; only settles if an error escapes the
 *   per-iteration error handler (e.g. `sleep` itself rejects)
 */
async function runAugorWorker(archiveDb, intelligenceDb, config) {
  const loopDelay = config.workers?.augorLoopDelayMs ?? 1500;
  const llmConfig = config.llm || {};

  // --- intelligence DB statements -------------------------------------------
  const getPending = intelligenceDb.prepare(`
    SELECT * FROM article_queue WHERE status = 'pending' LIMIT 1
  `);

  const setStatus = intelligenceDb.prepare(`
    UPDATE article_queue SET status = ?, updated_at = CURRENT_TIMESTAMP WHERE article_id = ?
  `);

  const setStatusByArticleId = intelligenceDb.prepare(`
    UPDATE article_queue SET status = 'processed', updated_at = CURRENT_TIMESTAMP
    WHERE article_id = ? AND status = 'pending'
  `);

  const deleteKnowledge = intelligenceDb.prepare(
    "DELETE FROM event_knowledge WHERE event_id = ?"
  );
  const deletePredictions = intelligenceDb.prepare(
    "DELETE FROM event_predictions WHERE event_id = ?"
  );

  const insertKnowledge = intelligenceDb.prepare(`
    INSERT INTO event_knowledge (event_id, company_id, type, data)
    VALUES (?, ?, ?, ?)
  `);
  const insertPrediction = intelligenceDb.prepare(`
    INSERT INTO event_predictions (event_id, company_id, type, direction, magnitude, timeframe, rationale)
    VALUES (?, ?, ?, ?, ?, ?, ?)
  `);

  // --- archive DB statements (prepared once instead of on every iteration) ---
  const getArticle = archiveDb.prepare(`
    SELECT id, event_id, content, has_embedding
    FROM articles WHERE id = ?
  `);
  const getEvent = archiveDb.prepare("SELECT * FROM events WHERE id = ?");
  const getEventArticles = archiveDb.prepare(`
    SELECT id, title, description, content
    FROM articles
    WHERE event_id = ? AND content IS NOT NULL AND content != ''
    ORDER BY id ASC
    LIMIT 25
  `);
  const getEventArticleIds = archiveDb.prepare(
    "SELECT id FROM articles WHERE event_id = ?"
  );

  // Flags every still-pending queue row of the event as processed. Without
  // this the same event would be picked up again on the next iteration.
  const markEventProcessed = (eventId) => {
    for (const row of getEventArticleIds.all(eventId)) {
      setStatusByArticleId.run(row.id);
    }
  };

  // All rows produced by one LLM answer are written atomically.
  const writeResult = intelligenceDb.transaction((eventId, companyId, result) => {
    for (const r of (result.knowledge?.relationships || [])) {
      insertKnowledge.run(eventId, companyId, "relationship", JSON.stringify(r));
    }
    for (const t of (result.knowledge?.themes || [])) {
      insertKnowledge.run(eventId, companyId, "theme", JSON.stringify(t));
    }
    for (const f of (result.knowledge?.factors || [])) {
      insertKnowledge.run(eventId, companyId, "factor", JSON.stringify(f));
    }
    for (const p of (result.predictions || [])) {
      insertPrediction.run(eventId, companyId, p.type, p.direction, p.magnitude, p.timeframe, p.rationale);
    }
  });

  while (true) {
    try {
      const queueRow = getPending.get();

      if (!queueRow) {
        await sleep(loopDelay);
        continue;
      }

      const article = getArticle.get(queueRow.article_id);

      if (!article || !article.content || !article.has_embedding || !article.event_id) {
        setStatus.run("skipped", queueRow.article_id);
        continue;
      }

      const eventId = article.event_id;

      const event = getEvent.get(eventId);
      if (!event) {
        setStatus.run("skipped", queueRow.article_id);
        continue;
      }

      const eventArticles = getEventArticles.all(eventId);
      const eventArticleIds = eventArticles.map(a => a.id);

      const matchedCompanies = findMatchedCompaniesByEmbedding(
        eventArticleIds, archiveDb, intelligenceDb, config
      );

      if (matchedCompanies.length === 0) {
        markEventProcessed(eventId);
        console.log(`[augor] event ${eventId} — no company match, skipped`);
        continue;
      }

      // The event is re-analysed from scratch, so drop whatever an earlier
      // pass stored for it.
      deleteKnowledge.run(eventId);
      deletePredictions.run(eventId);

      const articleText = eventArticles.map((a, i) => {
        const body = (a.content || a.description || "").slice(0, 2000);
        return `[Article ${i + 1}] ${a.title}\n${body}`;
      }).join("\n\n---\n\n");

      for (const company of matchedCompanies) {
        try {
          const result = await callLlm(llmConfig, buildPrompt(company.name, event.title, articleText));
          if (result) writeResult(eventId, company.id, result);
        } catch (llmErr) {
          // A failure for one company must not block the remaining companies.
          console.error(`[augor] LLM error for ${company.name} on event ${eventId}:`, llmErr.message);
        }
      }

      // Previously this called an undefined `setStatusByEventArticles`
      // statement; the ReferenceError left the queue row pending and the
      // event was re-sent to the LLM on every iteration.
      markEventProcessed(eventId);
      console.log(`[augor] processed event ${eventId} (${matchedCompanies.length} companies, ${eventArticles.length} articles)`);

    } catch (err) {
      console.error("[augor] error:", err.message);
      await sleep(loopDelay);
    }
  }
}
/**
 * Resolves the chat-completions endpoint for a configured base URL.
 *
 * - A bare origin (`https://api.openai.com`, with or without a trailing slash)
 *   resolves to `<origin>/v1/chat/completions`.
 * - A base URL that already carries a path (`https://openrouter.ai/api/v1`) is
 *   treated as the API root and gets `/chat/completions` appended, so the
 *   configured path is no longer discarded.
 *
 * @param {string} [baseUrl] configured base URL; falls back to the OpenAI origin
 * @returns {URL} absolute endpoint URL (any query string or hash on the base is dropped)
 */
function resolveChatCompletionsUrl(baseUrl) {
  const base = new URL(baseUrl || "https://api.openai.com");
  const apiRoot = base.pathname.replace(/\/+$/, "");
  const endpointPath = apiRoot === ""
    ? "/v1/chat/completions"
    : `${apiRoot}/chat/completions`;
  return new URL(endpointPath, base.origin);
}

/**
 * Sends a single-message chat completion request and parses the JSON document
 * the model returns as its message content.
 *
 * @param {{model?: string, baseUrl?: string, apiKey?: string}} llmConfig
 *   `model` defaults to "gpt-4o-mini"; `apiKey` is sent as a Bearer token.
 * @param {string} prompt user message content
 * @returns {Promise<object|null>} parsed message content, or `null` when the
 *   response carries no message content
 * @throws {SyntaxError} when the response body or the message content is not valid JSON
 * @throws whatever `httpPost` rejects with
 */
async function callLlm(llmConfig, prompt) {
  const body = JSON.stringify({
    model: llmConfig.model || "gpt-4o-mini",
    messages: [{ role: "user", content: prompt }],
    temperature: 0.1,
    response_format: { type: "json_object" },
  });

  // `new URL("/v1/chat/completions", baseUrl)` would throw away the base path
  // (e.g. "/api/v1"), so the endpoint is derived from the base path instead.
  const url = resolveChatCompletionsUrl(llmConfig.baseUrl);

  const responseText = await httpPost(url, body, {
    "Content-Type": "application/json",
    "Authorization": `Bearer ${llmConfig.apiKey || ""}`,
  });

  const parsed = JSON.parse(responseText);
  const content = parsed.choices?.[0]?.message?.content;
  if (!content) return null;

  return JSON.parse(content);
}
/**
 * Creates the intelligence schema if it does not exist yet.
 *
 * Every statement uses `IF NOT EXISTS`, so the function can be run on every
 * start-up. In addition to the tables it creates indexes on the columns the
 * worker filters by: the pending lookup on `article_queue.status` and the
 * per-event deletes on `event_knowledge.event_id` /
 * `event_predictions.event_id`.
 *
 * @param {object} db database handle exposing `exec(sql)`
 * @returns {void}
 */
function runMigrations(db) {
  db.exec(`
    CREATE TABLE IF NOT EXISTS cursors (
      key TEXT PRIMARY KEY,
      value INTEGER
    );

    CREATE TABLE IF NOT EXISTS tracked_companies (
      id INTEGER PRIMARY KEY,
      name TEXT,
      ticker TEXT,
      aliases TEXT
    );

    CREATE TABLE IF NOT EXISTS article_queue (
      id INTEGER PRIMARY KEY,
      article_id INTEGER UNIQUE,
      status TEXT DEFAULT 'pending',
      created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
      updated_at DATETIME
    );

    CREATE TABLE IF NOT EXISTS event_knowledge (
      id INTEGER PRIMARY KEY,
      event_id INTEGER,
      company_id INTEGER,
      type TEXT,
      data TEXT,
      created_at DATETIME DEFAULT CURRENT_TIMESTAMP
    );

    CREATE TABLE IF NOT EXISTS company_embeddings (
      company_id INTEGER PRIMARY KEY,
      embedding BLOB NOT NULL,
      model TEXT NOT NULL,
      generated_at DATETIME DEFAULT CURRENT_TIMESTAMP
    );

    CREATE TABLE IF NOT EXISTS event_predictions (
      id INTEGER PRIMARY KEY,
      event_id INTEGER,
      company_id INTEGER,
      type TEXT,
      direction TEXT,
      magnitude TEXT,
      timeframe TEXT,
      rationale TEXT,
      created_at DATETIME DEFAULT CURRENT_TIMESTAMP
    );

    CREATE INDEX IF NOT EXISTS idx_article_queue_status
      ON article_queue (status);

    CREATE INDEX IF NOT EXISTS idx_event_knowledge_event_id
      ON event_knowledge (event_id);

    CREATE INDEX IF NOT EXISTS idx_event_predictions_event_id
      ON event_predictions (event_id);
  `);
}
/**
 * Reinterprets a Float32 BLOB as a Float32Array.
 *
 * A Float32Array view requires a byte offset that is a multiple of 4. When the
 * incoming buffer is a view at an unaligned offset, the bytes are copied into a
 * fresh, aligned buffer instead of throwing a RangeError. Trailing bytes that
 * do not form a complete float are ignored.
 *
 * @param {Buffer|Uint8Array} buf raw bytes of consecutive 32-bit floats
 * @returns {Float32Array}
 */
function blobToFloat32(buf) {
  const width = Float32Array.BYTES_PER_ELEMENT;
  const count = Math.floor(buf.byteLength / width);

  if (buf.byteOffset % width === 0) {
    // aligned: zero-copy view over the same memory
    return new Float32Array(buf.buffer, buf.byteOffset, count);
  }

  const aligned = new Uint8Array(count * width);
  aligned.set(new Uint8Array(buf.buffer, buf.byteOffset, count * width));
  return new Float32Array(aligned.buffer);
}

/**
 * Cosine similarity of two numeric vectors.
 *
 * If the dimensions differ, only the leading `min(a.length, b.length)`
 * components are compared. Works for typed arrays and plain arrays alike.
 *
 * @param {ArrayLike<number>} a
 * @param {ArrayLike<number>} b
 * @returns {number} similarity in [-1, 1]; 0 when either vector has zero norm
 */
function cosineSimilarity(a, b) {
  const len = Math.min(a.length, b.length);

  let dot = 0;
  let normA = 0;
  let normB = 0;
  for (let i = 0; i < len; i++) {
    dot += a[i] * b[i];
    normA += a[i] * a[i];
    normB += b[i] * b[i];
  }

  const denom = Math.sqrt(normA) * Math.sqrt(normB);
  return denom === 0 ? 0 : dot / denom;
}

/**
 * Returns the tracked companies whose embedding is similar enough to at least
 * one article of the event.
 *
 * @param {Array<number>} eventArticleIds article ids belonging to the event
 * @param {object} archiveDb       handle exposing `prepare()`; read for `article_embedding_store`
 * @param {object} intelligenceDb  handle exposing `prepare()`; read for `company_embeddings` / `tracked_companies`
 * @param {object} config          reads `config.intelligence.similarityThreshold` (default 0.35)
 *                                 and `config.openRouter.embeddingModel`
 * @returns {Array<{id: number, name: string, ticker: string}>} matched companies;
 *   empty when no company or no article embedding is available
 */
function findMatchedCompaniesByEmbedding(eventArticleIds, archiveDb, intelligenceDb, config) {
  const threshold = config.intelligence?.similarityThreshold ?? 0.35;
  const model = config.openRouter?.embeddingModel;

  // The join already touches company_embeddings, so fetch the vector here
  // instead of issuing one extra query per company.
  const companies = intelligenceDb.prepare(
    "SELECT tc.id, tc.name, tc.ticker, ce.embedding FROM company_embeddings ce JOIN tracked_companies tc ON tc.id = ce.company_id"
  ).all();

  if (companies.length === 0) return [];

  // load article embeddings from archive — only articles that have one
  const getArticleEmbedding = archiveDb.prepare(
    "SELECT embedding FROM article_embedding_store WHERE article_id = ? AND model = ?"
  );
  const articleEmbeddings = [];
  for (const articleId of eventArticleIds) {
    const row = getArticleEmbedding.get(articleId, model);
    if (row) articleEmbeddings.push(blobToFloat32(row.embedding));
  }

  if (articleEmbeddings.length === 0) return [];

  const matched = [];
  for (const { embedding, ...company } of companies) {
    const companyVec = blobToFloat32(embedding);
    const hit = articleEmbeddings.some(
      articleVec => cosineSimilarity(companyVec, articleVec) >= threshold
    );
    // callers only get id/name/ticker, not the raw vector
    if (hit) matched.push(company);
  }

  return matched;
}
/**
 * Continuously copies usable article ids from the archive DB into
 * `article_queue`. Usable = has content, has embedding, has event assignment.
 *
 * Progress is kept in the `cursors` table under the key `queue_feeder`: each
 * pass selects qualifying articles with `id > cursor`, queues them and moves
 * the cursor to the highest id seen.
 *
 * NOTE(review): because the cursor only moves forward, an article with an id
 * below the cursor that gains its embedding or event assignment later is not
 * selected by this query — confirm that the archive always fills those
 * columns before higher ids become eligible.
 *
 * @param {object} archiveDb       handle exposing `prepare()`; read for `articles`
 * @param {object} intelligenceDb  handle exposing `prepare()` and `transaction()`
 * @param {object} config          reads `config.workers.queueFeederBatchSize` (default 100)
 *                                 and `config.workers.queueFeederLoopDelayMs` (default 3000)
 * @returns {Promise<never>} loops forever; only settles if an error escapes the
 *   per-iteration error handler (e.g. `sleep` itself rejects)
 */
async function runQueueFeeder(archiveDb, intelligenceDb, config) {
  const batchSize = config.workers?.queueFeederBatchSize ?? 100;
  const loopDelay = config.workers?.queueFeederLoopDelayMs ?? 3000;

  const getCursor = intelligenceDb.prepare(
    "SELECT value FROM cursors WHERE key = 'queue_feeder'"
  );
  const setCursor = intelligenceDb.prepare(
    "INSERT OR REPLACE INTO cursors (key, value) VALUES ('queue_feeder', ?)"
  );

  const insertQueued = intelligenceDb.prepare(`
    INSERT OR IGNORE INTO article_queue (article_id, status, created_at)
    VALUES (?, 'pending', CURRENT_TIMESTAMP)
  `);

  // Prepared once; the statement text never changes between iterations.
  const selectBatch = archiveDb.prepare(`
    SELECT id FROM articles
    WHERE id > ?
      AND content IS NOT NULL
      AND content != ''
      AND has_embedding = 1
      AND event_id IS NOT NULL
    ORDER BY id ASC
    LIMIT ?
  `);

  // One transaction per batch: the inserts and the cursor update are committed
  // together instead of as one implicit transaction per row.
  const enqueueBatch = intelligenceDb.transaction((articles, startCursor) => {
    let newCursor = startCursor;
    let inserted = 0;

    for (const a of articles) {
      const info = insertQueued.run(a.id);
      if (info.changes > 0) inserted++;
      if (a.id > newCursor) newCursor = a.id;
    }

    setCursor.run(newCursor);
    return { inserted, newCursor };
  });

  while (true) {
    try {
      // A missing row or a NULL value both mean "start from the beginning".
      const cursor = getCursor.get()?.value ?? 0;

      const articles = selectBatch.all(cursor, batchSize);

      if (articles.length === 0) {
        await sleep(loopDelay);
        continue;
      }

      const { inserted, newCursor } = enqueueBatch(articles, cursor);

      if (inserted > 0) {
        console.log(`[feeder] queued ${inserted} articles, cursor now ${newCursor}`);
      }

    } catch (err) {
      console.error("[feeder] error:", err.message);
      await sleep(loopDelay);
    }
  }
}
/**
 * Fastify plugin with development-only helpers.
 *
 * Registers nothing unless `config.dev.enabled` is truthy. When enabled it
 * exposes `GET /dev/db/download`, which streams the file at `config.duriin_db`
 * (or `./archive.sqlite` when unset) as an attachment, or answers 404 when the
 * file does not exist.
 *
 * NOTE(review): the route has no authentication of its own — confirm that
 * `dev.enabled` is never switched on for a publicly reachable deployment.
 *
 * @param {object} fastify Fastify instance
 * @returns {Promise<void>}
 */
async function devRoutes(fastify) {
  const devEnabled = Boolean(config.dev && config.dev.enabled);
  if (!devEnabled) return;

  const downloadArchive = async (req, reply) => {
    const archivePath = path.resolve(config.duriin_db || './archive.sqlite');

    if (!fs.existsSync(archivePath)) {
      return reply.code(404).send({ error: 'database file not found' });
    }

    const { size } = fs.statSync(archivePath);
    const attachmentName = path.basename(archivePath);

    reply.header('Content-Type', 'application/octet-stream');
    reply.header('Content-Disposition', `attachment; filename="${attachmentName}"`);
    reply.header('Content-Length', size);

    return reply.send(fs.createReadStream(archivePath));
  };

  fastify.get('/dev/db/download', downloadArchive);
}
/**
 * Loads the given events together with their articles.
 *
 * The result follows the order of `ids` (distance order for semantic search,
 * sort order for keyword search); ids without a matching event row are left out.
 *
 * @param {Array<number>} ids event ids in the order the caller wants them back
 * @returns {Array<object>} event rows (`id`, `title`, `pub_date`) each extended
 *   with an `articles` array from `getArticlesForEvent`
 */
function fetchEventsByIds(ids) {
  if (ids.length === 0) return [];

  const marks = new Array(ids.length).fill('?').join(', ');
  const statement = db.prepare(`
    SELECT e.id, e.title,
      (SELECT MIN(a.pub_date_effective) FROM articles a WHERE a.event_id = e.id AND a.content IS NOT NULL AND a.content != '' AND a.is_index_page = 0) AS pub_date
    FROM events e
    WHERE e.id IN (${marks})
  `);

  // index the rows by id so the caller-supplied order can be restored below
  const eventsById = new Map();
  for (const row of statement.all(...ids)) {
    eventsById.set(row.id, row);
  }

  const ordered = [];
  for (const id of ids) {
    const event = eventsById.get(id);
    if (!event) continue;
    ordered.push({ ...event, articles: getArticlesForEvent.all(event.id) });
  }
  return ordered;
}
} + + const articleRows = db.prepare( + `SELECT id, event_id FROM articles WHERE ${conditions.join(' AND ')}` + ).all(...params); + + const articleEventMap = new Map(articleRows.map(r => [r.id, r.event_id])); + + // walk neighbors in distance order and collect unique event IDs + const seen = new Set(); + const orderedEventIds = []; + for (const id of neighborIds) { + const eventId = articleEventMap.get(id); + if (eventId != null && !seen.has(eventId)) { + seen.add(eventId); + orderedEventIds.push(eventId); + } + } + + return fetchEventsByIds(orderedEventIds.slice(offset, offset + limit)); + } + + + // keyword / date filter path const conditions = []; const params = []; @@ -27,17 +106,46 @@ async function eventRoutes(fastify) { params.push(id); } - const limit = parseLimit(query.limit); - const offset = parseOffset(query.offset); + if (query.keyword) { + const keywords = [].concat(query.keyword).map(k => k.trim()).filter(Boolean); + const mode = String(query.keyword_mode || '').toLowerCase() === 'or' ? 'OR' : 'AND'; + const clauses = keywords.map(() => '(a.title LIKE ? OR a.description LIKE ? OR a.content LIKE ?)'); + + conditions.push(`EXISTS ( + SELECT 1 FROM articles a + WHERE a.event_id = e.id + AND a.content IS NOT NULL AND a.content != '' + AND a.is_index_page = 0 + AND (${clauses.join(` ${mode} `)}) + )`); + + for (const kw of keywords) { + const like = `%${kw}%`; + params.push(like, like, like); + } + } + + if (query.from) { + conditions.push(`EXISTS ( + SELECT 1 FROM articles a WHERE a.event_id = e.id AND a.pub_date_effective >= ? + )`); + params.push(query.from); + } + + if (query.to) { + conditions.push(`EXISTS ( + SELECT 1 FROM articles a WHERE a.event_id = e.id AND a.pub_date_effective <= ? 
+ )`); + params.push(query.to); + } const SORT_COLUMNS = { - pub_date: '(SELECT MIN(a.pub_date_effective) FROM articles a WHERE a.event_id = e.id AND a.content IS NOT NULL AND a.content != \'\' AND a.is_index_page = 0)', + pub_date: "(SELECT MIN(a.pub_date_effective) FROM articles a WHERE a.event_id = e.id AND a.content IS NOT NULL AND a.content != '' AND a.is_index_page = 0)", id: 'e.id', }; const sortBy = SORT_COLUMNS[query.sort_by] || SORT_COLUMNS.pub_date; const order = String(query.order || '').toLowerCase() === 'asc' ? 'ASC' : 'DESC'; - const where = conditions.length ? `WHERE ${conditions.join(' AND ')}` : ''; const events = db.prepare(` @@ -49,16 +157,7 @@ async function eventRoutes(fastify) { LIMIT ? OFFSET ? `).all(...params, limit, offset); - const getArticles = db.prepare(` - SELECT id, title, description, content, url, normalized_title, source, pub_date, ingested_at - FROM articles - WHERE event_id = ? - AND content IS NOT NULL AND content != '' - AND is_index_page = 0 - ORDER BY pub_date_effective DESC, id DESC - `); - - return events.map(e => ({ ...e, articles: getArticles.all(e.id) })); + return events.map(e => ({ ...e, articles: getArticlesForEvent.all(e.id) })); }); }