diff --git a/README.md b/README.md index d1c2df8..a652621 100644 --- a/README.md +++ b/README.md @@ -186,6 +186,56 @@ Use `domains[].policy` to diagnose why a source has high `skipped` or `failed` c - `normalized_title` is stored for deduplication and indexing. - `source` format is `: + @@ -781,13 +787,17 @@ async function loadIntelligence() { if (view === 'knowledge') { document.getElementById('i-type').parentElement.style.display = ''; + document.getElementById('i-sort-wrap').style.display = ''; + + const sort = document.getElementById('i-sort').value; + if (sort) params.set('sort', sort); if (type) params.set('type', type); const data = await api(`/admin/api/intelligence/knowledge?${params}`); intelRows = data.rows; document.getElementById('intel-thead').innerHTML = ` - IDCompanyEventTypeDataCreated`; + IDCompanyEventTypeDataEvent date`; document.getElementById('intel-tbody').innerHTML = data.rows.map(r => { let parsed = {}; @@ -799,7 +809,7 @@ async function loadIntelligence() { ${r.event_id} ${r.type} ${summary} - ${r.created_at ? r.created_at.slice(0,16) : '—'} + ${r.event_date ? r.event_date.slice(0,10) : '—'} `; }).join(''); @@ -811,12 +821,16 @@ async function loadIntelligence() { } else { document.getElementById('i-type').parentElement.style.display = 'none'; + document.getElementById('i-sort-wrap').style.display = ''; + + const sort = document.getElementById('i-sort').value; + if (sort) params.set('sort', sort); const data = await api(`/admin/api/intelligence/predictions?${params}`); intelRows = data.rows; document.getElementById('intel-thead').innerHTML = ` - IDCompanyEventTypeDirectionMagnitudeTimeframeRationaleCreated`; + IDCompanyEventTypeDirectionMagnitudeTimeframeRationaleEvent date`; document.getElementById('intel-tbody').innerHTML = data.rows.map(r => ` @@ -828,7 +842,7 @@ async function loadIntelligence() { ${r.magnitude || '—'} ${r.timeframe || '—'} ${r.rationale || '—'} - ${r.created_at ? r.created_at.slice(0,16) : '—'} + ${r.event_date ? r.event_date.slice(0,10) : '—'} `).join(''); diff --git a/config.json b/config.json index 5f3d777..8f7de25 100644 --- a/config.json +++ b/config.json @@ -70,6 +70,9 @@ "browser": { "maxConcurrentPages": 8 }, + "intelligence": { + "similarityThreshold": 0.5 + }, "dev": { "enabled": true }, diff --git a/intelligence/augorWorker.js b/intelligence/augorWorker.js index a1fe6b8..92eaea4 100644 --- a/intelligence/augorWorker.js +++ b/intelligence/augorWorker.js @@ -32,12 +32,18 @@ async function runAugorWorker(archiveDb, intelligenceDb, config) { ); const insertKnowledge = intelligenceDb.prepare(` - INSERT INTO event_knowledge (event_id, company_id, type, data) - VALUES (?, ?, ?, ?) + INSERT INTO event_knowledge (event_id, company_id, type, data, event_date) + VALUES (?, ?, ?, ?, ?) `); const insertPrediction = intelligenceDb.prepare(` - INSERT INTO event_predictions (event_id, company_id, type, direction, magnitude, timeframe, rationale) - VALUES (?, ?, ?, ?, ?, ?, ?) + INSERT INTO event_predictions (event_id, company_id, type, direction, magnitude, timeframe, rationale, event_date) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + `); + + const getEventDate = archiveDb.prepare(` + SELECT pub_date_effective FROM articles + WHERE event_id = ? AND pub_date_effective IS NOT NULL + ORDER BY pub_date_effective ASC LIMIT 1 `); while (true) { @@ -91,6 +97,9 @@ async function runAugorWorker(archiveDb, intelligenceDb, config) { deleteKnowledge.run(eventId); deletePredictions.run(eventId); + const eventDateRow = getEventDate.get(eventId); + const eventDate = eventDateRow ? eventDateRow.pub_date_effective : null; + const articleText = eventArticles.map((a, i) => { const body = (a.content || a.description || "").slice(0, 2000); return `[Article ${i + 1}] ${a.title}\n${body}`; @@ -103,17 +112,17 @@ async function runAugorWorker(archiveDb, intelligenceDb, config) { if (result) { const writeAll = intelligenceDb.transaction(() => { for (const r of (result.knowledge?.relationships || [])) { - insertKnowledge.run(eventId, company.id, "relationship", JSON.stringify(r)); + insertKnowledge.run(eventId, company.id, "relationship", JSON.stringify(r), eventDate); } for (const t of (result.knowledge?.themes || [])) { - insertKnowledge.run(eventId, company.id, "theme", JSON.stringify(t)); + insertKnowledge.run(eventId, company.id, "theme", JSON.stringify(t), eventDate); } for (const f of (result.knowledge?.factors || [])) { - insertKnowledge.run(eventId, company.id, "factor", JSON.stringify(f)); + insertKnowledge.run(eventId, company.id, "factor", JSON.stringify(f), eventDate); } for (const p of (result.predictions || [])) { - insertPrediction.run(eventId, company.id, p.type, p.direction, p.magnitude, p.timeframe, p.rationale); + insertPrediction.run(eventId, company.id, p.type, p.direction, p.magnitude, p.timeframe, p.rationale, eventDate); } }); diff --git a/intelligence/consolidationWorker.js b/intelligence/consolidationWorker.js new file mode 100644 index 0000000..e8fa2e0 --- /dev/null +++ b/intelligence/consolidationWorker.js @@ -0,0 +1,273 @@ +// consolidationWorker.js +// reads from event_knowledge (never writes to it) and maintains company_facts +// runs on a slow loop — one full pass over all tracked companies per cycle + +const https = require("https"); +const http = require("http"); + + +async function runConsolidationWorker(archiveDb, intelligenceDb, config) { + const loopDelay = config.workers?.consolidationLoopDelayMs ?? 60000; + const llmConfig = config.openRouter || {}; + + const getCompanies = intelligenceDb.prepare("SELECT * FROM tracked_companies"); + + const getKnowledge = intelligenceDb.prepare(` + SELECT * FROM event_knowledge WHERE company_id = ? ORDER BY event_date ASC + `); + + const getExistingFirstSeen = intelligenceDb.prepare(` + SELECT claim, first_seen_at FROM company_facts WHERE company_id = ? + `); + + const deleteCompanyFacts = intelligenceDb.prepare( + "DELETE FROM company_facts WHERE company_id = ?" + ); + + const insertFact = intelligenceDb.prepare(` + INSERT INTO company_facts + (company_id, type, claim, confidence, confirmation_count, first_seen_at, last_seen_at, last_event_id, supporting_event_ids) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + `); + + // stale = seen only once and not reinforced in 90 days + const deleteStaleFacts = intelligenceDb.prepare(` + DELETE FROM company_facts + WHERE confirmation_count = 1 + AND last_seen_at < datetime('now', '-90 days') + `); + + while (true) { + try { + const companies = getCompanies.all(); + + for (const company of companies) { + try { + await processCompany( + company, intelligenceDb, llmConfig, + getKnowledge, getExistingFirstSeen, deleteCompanyFacts, insertFact + ); + } catch (err) { + console.error(`[consolidation] error on ${company.name}:`, err.message); + } + } + + const staleResult = deleteStaleFacts.run(); + if (staleResult.changes > 0) { + console.log(`[consolidation] pruned ${staleResult.changes} stale facts`); + } + + console.log("[consolidation] cycle complete"); + + } catch (err) { + console.error("[consolidation] cycle error:", err.message); + } + + await sleep(loopDelay); + } +} + + +async function processCompany(company, intelligenceDb, llmConfig, getKnowledge, getExistingFirstSeen, deleteCompanyFacts, insertFact) { + const rows = getKnowledge.all(company.id); + if (rows.length === 0) return; + + const rawClaims = []; + + for (const row of rows) { + let data; + try { data = JSON.parse(row.data); } catch (_) { continue; } + + const text = extractClaimText(row.type, data); + if (!text) continue; + + rawClaims.push({ + text, + type: row.type, + eventId: row.event_id, + eventDate: row.event_date || row.created_at, + }); + } + + if (rawClaims.length === 0) return; + + let groups; + try { + groups = await normalizeClaims(company.name, rawClaims, llmConfig); + } catch (err) { + console.error(`[consolidation] LLM failed for ${company.name}:`, err.message); + return; + } + + if (!groups || groups.length === 0) return; + + // save first_seen_at before we wipe the rows + const priorFirstSeen = {}; + for (const f of getExistingFirstSeen.all(company.id)) { + priorFirstSeen[f.claim] = f.first_seen_at; + } + + const writeAll = intelligenceDb.transaction(() => { + deleteCompanyFacts.run(company.id); + + for (const group of groups) { + const { canonical, type, eventIds, dates } = group; + + const distinctEventIds = [...new Set(eventIds)]; + const count = distinctEventIds.size || distinctEventIds.length; + + const sortedDates = dates.filter(Boolean).sort(); + const firstSeen = priorFirstSeen[canonical] || sortedDates[0] || new Date().toISOString(); + const lastSeen = sortedDates[sortedDates.length - 1] || firstSeen; + + const lastEventId = eventIds[eventIds.length - 1]; + const confidence = countToConfidence(count); + + insertFact.run( + company.id, + type, + canonical, + confidence, + count, + firstSeen, + lastSeen, + lastEventId, + JSON.stringify(distinctEventIds) + ); + } + }); + + writeAll(); + console.log(`[consolidation] ${company.name}: ${groups.length} distinct facts`); +} + + +function extractClaimText(type, data) { + if (type === "relationship") { + if (!data.entity) return null; + return `${data.entity} is a ${data.type || "partner"}`; + } + + if (type === "theme") { + if (!data.theme) return null; + return `${data.theme} is ${data.direction || "relevant"}`; + } + + if (type === "factor") { + if (!data.factor) return null; + return `${data.factor}: ${data.relationship || "relevant"}`; + } + + return null; +} + + +function countToConfidence(count) { + if (count >= 8) return "very_high"; + if (count >= 4) return "high"; + if (count >= 2) return "medium"; + return "low"; +} + + +async function normalizeClaims(companyName, rawClaims, llmConfig) { + // cap to keep the prompt manageable + const capped = rawClaims.slice(0, 120); + + const lines = capped.map((c, i) => `${i + 1}. [${c.type}] ${c.text}`).join("\n"); + + const prompt = `You are normalizing knowledge claims about ${companyName}. + +Group semantically equivalent claims and provide a canonical short form for each group (lowercase, no punctuation). + +Claims: +${lines} + +Return JSON only — no explanation. Shape: +{ + "groups": [ + { "canonical": "canonical claim text", "type": "relationship|theme|factor", "indices": [1, 2] } + ] +} + +Rules: +- canonical must be lowercase with no punctuation marks +- merge claims that mean the same thing even if worded differently +- each index appears in exactly one group +- pick the type that best fits the group`; + + const body = JSON.stringify({ + model: llmConfig.llmModel || llmConfig.model, + messages: [{ role: "user", content: prompt }], + temperature: 0.1, + }); + + const url = new URL("https://openrouter.ai/api/v1/chat/completions"); + const responseText = await httpPost(url, body, { + "Content-Type": "application/json", + "Authorization": `Bearer ${llmConfig.apiKey || ""}`, + }); + + let parsed; + try { + parsed = JSON.parse(responseText); + } catch (_) { + throw new Error(`LLM response not JSON: ${responseText.slice(0, 200)}`); + } + + const content = parsed.choices?.[0]?.message?.content; + if (!content) return []; + + const stripped = content.replace(/^```(?:json)?\s*/i, "").replace(/\s*```$/, "").trim(); + const result = JSON.parse(stripped); + + const groups = []; + + for (const g of (result.groups || [])) { + const indices = (g.indices || []) + .map(i => i - 1) + .filter(i => i >= 0 && i < capped.length); + + if (indices.length === 0) continue; + + const eventIds = indices.map(i => capped[i].eventId); + const dates = indices.map(i => capped[i].eventDate); + const type = g.type || capped[indices[0]].type; + + groups.push({ canonical: g.canonical, type, eventIds, dates }); + } + + return groups; +} + + +function httpPost(url, body, headers) { + return new Promise((resolve, reject) => { + const lib = url.protocol === "https:" ? https : http; + + const req = lib.request({ + hostname: url.hostname, + port: url.port || (url.protocol === "https:" ? 443 : 80), + path: url.pathname + url.search, + method: "POST", + headers: { ...headers, "Content-Length": Buffer.byteLength(body) }, + }, (res) => { + let data = ""; + res.on("data", chunk => data += chunk); + res.on("end", () => { + if (res.statusCode >= 200 && res.statusCode < 300) resolve(data); + else reject(new Error(`LLM ${res.statusCode}: ${data.slice(0, 300)}`)); + }); + }); + + req.on("error", reject); + req.write(body); + req.end(); + }); +} + +function sleep(ms) { + return new Promise(r => setTimeout(r, ms)); +} + +module.exports = { runConsolidationWorker }; diff --git a/intelligence/db.js b/intelligence/db.js index 2e63ebd..2efc998 100644 --- a/intelligence/db.js +++ b/intelligence/db.js @@ -68,11 +68,34 @@ function runMigrations(db) { magnitude TEXT, timeframe TEXT, rationale TEXT, + event_date TEXT, created_at DATETIME DEFAULT CURRENT_TIMESTAMP ); + + CREATE TABLE IF NOT EXISTS company_facts ( + id INTEGER PRIMARY KEY, + company_id INTEGER, + type TEXT, + claim TEXT, + confidence TEXT, + confirmation_count INTEGER DEFAULT 1, + first_seen_at DATETIME, + last_seen_at DATETIME, + last_event_id INTEGER, + supporting_event_ids TEXT + ); + + CREATE UNIQUE INDEX IF NOT EXISTS idx_company_facts_unique ON company_facts (company_id, claim); + `); } +function runColumnMigrations(db) { + try { db.exec("ALTER TABLE event_predictions ADD COLUMN event_date TEXT"); } catch (_) {} + try { db.exec("ALTER TABLE event_knowledge ADD COLUMN event_date TEXT"); } catch (_) {} + +} + function seedCompanies(db) { const count = db.prepare("SELECT COUNT(*) as c FROM tracked_companies").get().c; if (count > 0) return; @@ -138,4 +161,4 @@ function seedCompanies(db) { console.log(`[db] seeded ${companies.length} tracked companies`); } -module.exports = { getArchiveDb, getIntelligenceDb, runMigrations, seedCompanies }; +module.exports = { getArchiveDb, getIntelligenceDb, runMigrations, runColumnMigrations, seedCompanies }; diff --git a/intelligence/index.js b/intelligence/index.js index d53055c..ab50754 100644 --- a/intelligence/index.js +++ b/intelligence/index.js @@ -1,10 +1,11 @@ const fs = require("fs"); const path = require("path"); -const { getArchiveDb, getIntelligenceDb, runMigrations, seedCompanies } = require("./db"); +const { getArchiveDb, getIntelligenceDb, runMigrations, runColumnMigrations, seedCompanies } = require("./db"); const { runQueueFeeder } = require("./queueFeeder"); const { runAugorWorker } = require("./augorWorker"); const { ensureCompanyEmbeddings } = require("./embeddings"); +const { runConsolidationWorker } = require("./consolidationWorker"); const configPath = path.resolve(__dirname, "../config.json"); @@ -33,6 +34,7 @@ const archiveDb = getArchiveDb(config.duriin_db); const intelligenceDb = getIntelligenceDb(config.intelligence_db); runMigrations(intelligenceDb); +runColumnMigrations(intelligenceDb); seedCompanies(intelligenceDb); ensureCompanyEmbeddings(intelligenceDb, config.openRouter).then(() => { @@ -51,6 +53,11 @@ runAugorWorker(archiveDb, intelligenceDb, config).catch(err => { process.exit(1); }); +runConsolidationWorker(archiveDb, intelligenceDb, config).catch(err => { + console.error("[consolidation] fatal:", err); + process.exit(1); +}); + process.on("SIGINT", () => { console.log("[intelligence] shutting down"); process.exit(0); diff --git a/src/routes/admin.js b/src/routes/admin.js index fb0bba7..c5f324f 100644 --- a/src/routes/admin.js +++ b/src/routes/admin.js @@ -261,14 +261,16 @@ async function adminRoutes(fastify) { if (type) { conditions.push('ek.type = ?'); params.push(type); } const where = conditions.length ? `WHERE ${conditions.join(' AND ')}` : ''; + const sortCol = q.sort === 'event_date' ? 'ek.event_date' : 'ek.id'; + const total = db.prepare(`SELECT COUNT(*) as n FROM event_knowledge ek ${where}`).get(...params).n; const rows = db.prepare(` - SELECT ek.id, ek.event_id, ek.type, ek.data, ek.created_at, + SELECT ek.id, ek.event_id, ek.type, ek.data, ek.event_date, ek.created_at, tc.name as company_name FROM event_knowledge ek JOIN tracked_companies tc ON tc.id = ek.company_id ${where} - ORDER BY ek.id DESC + ORDER BY ${sortCol} DESC LIMIT ? OFFSET ? `).all(...params, limit, offset); @@ -290,13 +292,15 @@ async function adminRoutes(fastify) { if (companyId) { conditions.push('ep.company_id = ?'); params.push(companyId); } const where = conditions.length ? `WHERE ${conditions.join(' AND ')}` : ''; + const sortCol = q.sort === 'event_date' ? 'ep.event_date' : 'ep.id'; + const total = db.prepare(`SELECT COUNT(*) as n FROM event_predictions ep ${where}`).get(...params).n; const rows = db.prepare(` SELECT ep.*, tc.name as company_name FROM event_predictions ep JOIN tracked_companies tc ON tc.id = ep.company_id ${where} - ORDER BY ep.id DESC + ORDER BY ${sortCol} DESC LIMIT ? OFFSET ? `).all(...params, limit, offset);