add event_date column to event_knowledge and event_predictions tables; update related logic in admin panel and augorWorker

This commit is contained in:
ImBenji
2026-04-23 13:13:28 +01:00
parent d972569e54
commit 29c6cdc6c3
8 changed files with 400 additions and 17 deletions
+17 -8
View File
@@ -32,12 +32,18 @@ async function runAugorWorker(archiveDb, intelligenceDb, config) {
);
const insertKnowledge = intelligenceDb.prepare(`
INSERT INTO event_knowledge (event_id, company_id, type, data)
VALUES (?, ?, ?, ?)
INSERT INTO event_knowledge (event_id, company_id, type, data, event_date)
VALUES (?, ?, ?, ?, ?)
`);
const insertPrediction = intelligenceDb.prepare(`
INSERT INTO event_predictions (event_id, company_id, type, direction, magnitude, timeframe, rationale)
VALUES (?, ?, ?, ?, ?, ?, ?)
INSERT INTO event_predictions (event_id, company_id, type, direction, magnitude, timeframe, rationale, event_date)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
`);
const getEventDate = archiveDb.prepare(`
SELECT pub_date_effective FROM articles
WHERE event_id = ? AND pub_date_effective IS NOT NULL
ORDER BY pub_date_effective ASC LIMIT 1
`);
while (true) {
@@ -91,6 +97,9 @@ async function runAugorWorker(archiveDb, intelligenceDb, config) {
deleteKnowledge.run(eventId);
deletePredictions.run(eventId);
const eventDateRow = getEventDate.get(eventId);
const eventDate = eventDateRow ? eventDateRow.pub_date_effective : null;
const articleText = eventArticles.map((a, i) => {
const body = (a.content || a.description || "").slice(0, 2000);
return `[Article ${i + 1}] ${a.title}\n${body}`;
@@ -103,17 +112,17 @@ async function runAugorWorker(archiveDb, intelligenceDb, config) {
if (result) {
const writeAll = intelligenceDb.transaction(() => {
for (const r of (result.knowledge?.relationships || [])) {
insertKnowledge.run(eventId, company.id, "relationship", JSON.stringify(r));
insertKnowledge.run(eventId, company.id, "relationship", JSON.stringify(r), eventDate);
}
for (const t of (result.knowledge?.themes || [])) {
insertKnowledge.run(eventId, company.id, "theme", JSON.stringify(t));
insertKnowledge.run(eventId, company.id, "theme", JSON.stringify(t), eventDate);
}
for (const f of (result.knowledge?.factors || [])) {
insertKnowledge.run(eventId, company.id, "factor", JSON.stringify(f));
insertKnowledge.run(eventId, company.id, "factor", JSON.stringify(f), eventDate);
}
for (const p of (result.predictions || [])) {
insertPrediction.run(eventId, company.id, p.type, p.direction, p.magnitude, p.timeframe, p.rationale);
insertPrediction.run(eventId, company.id, p.type, p.direction, p.magnitude, p.timeframe, p.rationale, eventDate);
}
});
+273
View File
@@ -0,0 +1,273 @@
// consolidationWorker.js
// reads from event_knowledge (never writes to it) and maintains company_facts
// runs on a slow loop — one full pass over all tracked companies per cycle
const https = require("https");
const http = require("http");
async function runConsolidationWorker(archiveDb, intelligenceDb, config) {
const loopDelay = config.workers?.consolidationLoopDelayMs ?? 60000;
const llmConfig = config.openRouter || {};
const getCompanies = intelligenceDb.prepare("SELECT * FROM tracked_companies");
const getKnowledge = intelligenceDb.prepare(`
SELECT * FROM event_knowledge WHERE company_id = ? ORDER BY event_date ASC
`);
const getExistingFirstSeen = intelligenceDb.prepare(`
SELECT claim, first_seen_at FROM company_facts WHERE company_id = ?
`);
const deleteCompanyFacts = intelligenceDb.prepare(
"DELETE FROM company_facts WHERE company_id = ?"
);
const insertFact = intelligenceDb.prepare(`
INSERT INTO company_facts
(company_id, type, claim, confidence, confirmation_count, first_seen_at, last_seen_at, last_event_id, supporting_event_ids)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
`);
// stale = seen only once and not reinforced in 90 days
const deleteStaleFacts = intelligenceDb.prepare(`
DELETE FROM company_facts
WHERE confirmation_count = 1
AND last_seen_at < datetime('now', '-90 days')
`);
while (true) {
try {
const companies = getCompanies.all();
for (const company of companies) {
try {
await processCompany(
company, intelligenceDb, llmConfig,
getKnowledge, getExistingFirstSeen, deleteCompanyFacts, insertFact
);
} catch (err) {
console.error(`[consolidation] error on ${company.name}:`, err.message);
}
}
const staleResult = deleteStaleFacts.run();
if (staleResult.changes > 0) {
console.log(`[consolidation] pruned ${staleResult.changes} stale facts`);
}
console.log("[consolidation] cycle complete");
} catch (err) {
console.error("[consolidation] cycle error:", err.message);
}
await sleep(loopDelay);
}
}
async function processCompany(company, intelligenceDb, llmConfig, getKnowledge, getExistingFirstSeen, deleteCompanyFacts, insertFact) {
const rows = getKnowledge.all(company.id);
if (rows.length === 0) return;
const rawClaims = [];
for (const row of rows) {
let data;
try { data = JSON.parse(row.data); } catch (_) { continue; }
const text = extractClaimText(row.type, data);
if (!text) continue;
rawClaims.push({
text,
type: row.type,
eventId: row.event_id,
eventDate: row.event_date || row.created_at,
});
}
if (rawClaims.length === 0) return;
let groups;
try {
groups = await normalizeClaims(company.name, rawClaims, llmConfig);
} catch (err) {
console.error(`[consolidation] LLM failed for ${company.name}:`, err.message);
return;
}
if (!groups || groups.length === 0) return;
// save first_seen_at before we wipe the rows
const priorFirstSeen = {};
for (const f of getExistingFirstSeen.all(company.id)) {
priorFirstSeen[f.claim] = f.first_seen_at;
}
const writeAll = intelligenceDb.transaction(() => {
deleteCompanyFacts.run(company.id);
for (const group of groups) {
const { canonical, type, eventIds, dates } = group;
const distinctEventIds = [...new Set(eventIds)];
const count = distinctEventIds.size || distinctEventIds.length;
const sortedDates = dates.filter(Boolean).sort();
const firstSeen = priorFirstSeen[canonical] || sortedDates[0] || new Date().toISOString();
const lastSeen = sortedDates[sortedDates.length - 1] || firstSeen;
const lastEventId = eventIds[eventIds.length - 1];
const confidence = countToConfidence(count);
insertFact.run(
company.id,
type,
canonical,
confidence,
count,
firstSeen,
lastSeen,
lastEventId,
JSON.stringify(distinctEventIds)
);
}
});
writeAll();
console.log(`[consolidation] ${company.name}: ${groups.length} distinct facts`);
}
function extractClaimText(type, data) {
if (type === "relationship") {
if (!data.entity) return null;
return `${data.entity} is a ${data.type || "partner"}`;
}
if (type === "theme") {
if (!data.theme) return null;
return `${data.theme} is ${data.direction || "relevant"}`;
}
if (type === "factor") {
if (!data.factor) return null;
return `${data.factor}: ${data.relationship || "relevant"}`;
}
return null;
}
function countToConfidence(count) {
if (count >= 8) return "very_high";
if (count >= 4) return "high";
if (count >= 2) return "medium";
return "low";
}
async function normalizeClaims(companyName, rawClaims, llmConfig) {
// cap to keep the prompt manageable
const capped = rawClaims.slice(0, 120);
const lines = capped.map((c, i) => `${i + 1}. [${c.type}] ${c.text}`).join("\n");
const prompt = `You are normalizing knowledge claims about ${companyName}.
Group semantically equivalent claims and provide a canonical short form for each group (lowercase, no punctuation).
Claims:
${lines}
Return JSON only — no explanation. Shape:
{
"groups": [
{ "canonical": "canonical claim text", "type": "relationship|theme|factor", "indices": [1, 2] }
]
}
Rules:
- canonical must be lowercase with no punctuation marks
- merge claims that mean the same thing even if worded differently
- each index appears in exactly one group
- pick the type that best fits the group`;
const body = JSON.stringify({
model: llmConfig.llmModel || llmConfig.model,
messages: [{ role: "user", content: prompt }],
temperature: 0.1,
});
const url = new URL("https://openrouter.ai/api/v1/chat/completions");
const responseText = await httpPost(url, body, {
"Content-Type": "application/json",
"Authorization": `Bearer ${llmConfig.apiKey || ""}`,
});
let parsed;
try {
parsed = JSON.parse(responseText);
} catch (_) {
throw new Error(`LLM response not JSON: ${responseText.slice(0, 200)}`);
}
const content = parsed.choices?.[0]?.message?.content;
if (!content) return [];
const stripped = content.replace(/^```(?:json)?\s*/i, "").replace(/\s*```$/, "").trim();
const result = JSON.parse(stripped);
const groups = [];
for (const g of (result.groups || [])) {
const indices = (g.indices || [])
.map(i => i - 1)
.filter(i => i >= 0 && i < capped.length);
if (indices.length === 0) continue;
const eventIds = indices.map(i => capped[i].eventId);
const dates = indices.map(i => capped[i].eventDate);
const type = g.type || capped[indices[0]].type;
groups.push({ canonical: g.canonical, type, eventIds, dates });
}
return groups;
}
function httpPost(url, body, headers) {
return new Promise((resolve, reject) => {
const lib = url.protocol === "https:" ? https : http;
const req = lib.request({
hostname: url.hostname,
port: url.port || (url.protocol === "https:" ? 443 : 80),
path: url.pathname + url.search,
method: "POST",
headers: { ...headers, "Content-Length": Buffer.byteLength(body) },
}, (res) => {
let data = "";
res.on("data", chunk => data += chunk);
res.on("end", () => {
if (res.statusCode >= 200 && res.statusCode < 300) resolve(data);
else reject(new Error(`LLM ${res.statusCode}: ${data.slice(0, 300)}`));
});
});
req.on("error", reject);
req.write(body);
req.end();
});
}
function sleep(ms) {
return new Promise(r => setTimeout(r, ms));
}
module.exports = { runConsolidationWorker };
+24 -1
View File
@@ -68,11 +68,34 @@ function runMigrations(db) {
magnitude TEXT,
timeframe TEXT,
rationale TEXT,
event_date TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS company_facts (
id INTEGER PRIMARY KEY,
company_id INTEGER,
type TEXT,
claim TEXT,
confidence TEXT,
confirmation_count INTEGER DEFAULT 1,
first_seen_at DATETIME,
last_seen_at DATETIME,
last_event_id INTEGER,
supporting_event_ids TEXT
);
CREATE UNIQUE INDEX IF NOT EXISTS idx_company_facts_unique ON company_facts (company_id, claim);
`);
}
function runColumnMigrations(db) {
try { db.exec("ALTER TABLE event_predictions ADD COLUMN event_date TEXT"); } catch (_) {}
try { db.exec("ALTER TABLE event_knowledge ADD COLUMN event_date TEXT"); } catch (_) {}
}
function seedCompanies(db) {
const count = db.prepare("SELECT COUNT(*) as c FROM tracked_companies").get().c;
if (count > 0) return;
@@ -138,4 +161,4 @@ function seedCompanies(db) {
console.log(`[db] seeded ${companies.length} tracked companies`);
}
module.exports = { getArchiveDb, getIntelligenceDb, runMigrations, seedCompanies };
module.exports = { getArchiveDb, getIntelligenceDb, runMigrations, runColumnMigrations, seedCompanies };
+8 -1
View File
@@ -1,10 +1,11 @@
const fs = require("fs");
const path = require("path");
const { getArchiveDb, getIntelligenceDb, runMigrations, seedCompanies } = require("./db");
const { getArchiveDb, getIntelligenceDb, runMigrations, runColumnMigrations, seedCompanies } = require("./db");
const { runQueueFeeder } = require("./queueFeeder");
const { runAugorWorker } = require("./augorWorker");
const { ensureCompanyEmbeddings } = require("./embeddings");
const { runConsolidationWorker } = require("./consolidationWorker");
const configPath = path.resolve(__dirname, "../config.json");
@@ -33,6 +34,7 @@ const archiveDb = getArchiveDb(config.duriin_db);
const intelligenceDb = getIntelligenceDb(config.intelligence_db);
runMigrations(intelligenceDb);
runColumnMigrations(intelligenceDb);
seedCompanies(intelligenceDb);
ensureCompanyEmbeddings(intelligenceDb, config.openRouter).then(() => {
@@ -51,6 +53,11 @@ runAugorWorker(archiveDb, intelligenceDb, config).catch(err => {
process.exit(1);
});
runConsolidationWorker(archiveDb, intelligenceDb, config).catch(err => {
console.error("[consolidation] fatal:", err);
process.exit(1);
});
process.on("SIGINT", () => {
console.log("[intelligence] shutting down");
process.exit(0);