Duriin-API/intelligence/consolidationWorker.js

273 lines
7.7 KiB
JavaScript

// consolidationWorker.js
// reads from event_knowledge (never writes to it) and maintains company_facts
// runs on a slow loop — one full pass over all tracked companies per cycle
const https = require("https");
const http = require("http");
/**
 * Long-running consolidation loop; never returns.
 *
 * Each cycle walks every row of tracked_companies, rebuilds that company's
 * company_facts via processCompany, then deletes facts with
 * confirmation_count = 1 whose last_seen_at is older than 90 days.
 * Per-company failures are logged and skipped; a failure of the cycle itself
 * is logged and the loop continues after the delay.
 *
 * @param {*} archiveDb - accepted by the signature but not read here.
 * @param {*} intelligenceDb - handle exposing prepare(); presumably
 *   better-sqlite3 (TODO confirm).
 * @param {object} config - reads config.workers.consolidationLoopDelayMs
 *   (default 60000 ms) and config.openRouter (default {}).
 */
async function runConsolidationWorker(archiveDb, intelligenceDb, config) {
  const loopDelay = config.workers?.consolidationLoopDelayMs ?? 60000;
  const llmConfig = config.openRouter || {};

  const selectCompanies = intelligenceDb.prepare("SELECT * FROM tracked_companies");
  const selectKnowledge = intelligenceDb.prepare(`
SELECT * FROM event_knowledge WHERE company_id = ? ORDER BY event_date ASC
`);
  const selectFirstSeen = intelligenceDb.prepare(`
SELECT claim, first_seen_at FROM company_facts WHERE company_id = ?
`);
  const wipeCompanyFacts = intelligenceDb.prepare(
    "DELETE FROM company_facts WHERE company_id = ?"
  );
  const insertFactRow = intelligenceDb.prepare(`
INSERT INTO company_facts
(company_id, type, claim, confidence, confirmation_count, first_seen_at, last_seen_at, last_event_id, supporting_event_ids)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
`);
  // A fact is stale when a single event supports it and it has not been
  // reinforced within the last 90 days.
  const pruneStaleFacts = intelligenceDb.prepare(`
DELETE FROM company_facts
WHERE confirmation_count = 1
AND last_seen_at < datetime('now', '-90 days')
`);

  // One full pass: rebuild every company, then prune stale facts.
  const runCycle = async () => {
    for (const company of selectCompanies.all()) {
      try {
        await processCompany(
          company, intelligenceDb, llmConfig,
          selectKnowledge, selectFirstSeen, wipeCompanyFacts, insertFactRow
        );
      } catch (err) {
        console.error(`[consolidation] error on ${company.name}:`, err.message);
      }
    }
    const pruned = pruneStaleFacts.run().changes;
    if (pruned > 0) {
      console.log(`[consolidation] pruned ${pruned} stale facts`);
    }
    console.log("[consolidation] cycle complete");
  };

  for (;;) {
    try {
      await runCycle();
    } catch (err) {
      console.error("[consolidation] cycle error:", err.message);
    }
    await sleep(loopDelay);
  }
}
/**
 * Rebuilds company_facts for a single company from its event_knowledge rows.
 *
 * Steps:
 * 1. Turn each knowledge row into a claim string.
 * 2. Ask the LLM (normalizeClaims) to group equivalent claims.
 * 3. In one transaction, delete the company's existing facts and insert one
 *    row per usable group.
 *
 * first_seen_at is carried over from the previous run when a fact with the
 * same canonical claim text already existed.
 *
 * Returns early, leaving existing facts untouched, when:
 * - there are no usable claims,
 * - the LLM call fails,
 * - the LLM yields no groups, or
 * - no group can form a valid fact row.
 *
 * @param {{id: *, name: string}} company - row from tracked_companies.
 * @param {*} intelligenceDb - handle exposing transaction(fn) -> runnable fn.
 * @param {object} llmConfig - forwarded unchanged to normalizeClaims.
 * @param {*} getKnowledge - prepared SELECT of event_knowledge rows (.all(companyId)).
 * @param {*} getExistingFirstSeen - prepared SELECT of (claim, first_seen_at) (.all(companyId)).
 * @param {*} deleteCompanyFacts - prepared DELETE (.run(companyId)).
 * @param {*} insertFact - prepared INSERT taking the 9 company_facts columns (.run(...)).
 * @returns {Promise<void>}
 */
async function processCompany(company, intelligenceDb, llmConfig, getKnowledge, getExistingFirstSeen, deleteCompanyFacts, insertFact) {
  const rows = getKnowledge.all(company.id);
  if (rows.length === 0) return;

  const rawClaims = [];
  for (const row of rows) {
    let data;
    try {
      data = JSON.parse(row.data);
    } catch (_) {
      continue; // malformed payload: skip this row, not the whole company
    }
    // JSON.parse("null") (or a NULL column) yields null, and extractClaimText
    // dereferences its argument, so only hand it real objects.
    if (data === null || typeof data !== "object") continue;
    const text = extractClaimText(row.type, data);
    if (!text) continue;
    rawClaims.push({
      text,
      type: row.type,
      eventId: row.event_id,
      eventDate: row.event_date || row.created_at,
    });
  }
  if (rawClaims.length === 0) return;

  let groups;
  try {
    groups = await normalizeClaims(company.name, rawClaims, llmConfig);
  } catch (err) {
    console.error(`[consolidation] LLM failed for ${company.name}:`, err.message);
    return;
  }
  if (!groups || groups.length === 0) return;

  // Snapshot first_seen_at before the rows are wiped. A Map (not a plain
  // object) keeps LLM-chosen keys such as "constructor" from resolving to
  // inherited Object.prototype members.
  const priorFirstSeen = new Map();
  for (const f of getExistingFirstSeen.all(company.id)) {
    priorFirstSeen.set(f.claim, f.first_seen_at);
  }

  // Build all rows up front so the transaction only does writes.
  const factRows = [];
  for (const { canonical, type, eventIds = [], dates = [] } of groups) {
    // Skip groups that cannot form a meaningful fact row.
    if (typeof canonical !== "string" || canonical === "" || eventIds.length === 0) continue;
    const distinctEventIds = [...new Set(eventIds)];
    const sortedDates = dates.filter(Boolean).sort();
    const firstSeen = priorFirstSeen.get(canonical) || sortedDates[0] || new Date().toISOString();
    const lastSeen = sortedDates[sortedDates.length - 1] || firstSeen;
    factRows.push([
      company.id,
      type,
      canonical,
      countToConfidence(distinctEventIds.length),
      distinctEventIds.length,
      firstSeen,
      lastSeen,
      latestEventId(eventIds, dates),
      JSON.stringify(distinctEventIds),
    ]);
  }
  if (factRows.length === 0) return;

  const writeAll = intelligenceDb.transaction(() => {
    deleteCompanyFacts.run(company.id);
    for (const factRow of factRows) {
      insertFact.run(...factRow);
    }
  });
  writeAll();
  console.log(`[consolidation] ${company.name}: ${factRows.length} distinct facts`);
}

// Returns the event id paired with the latest date (later position wins ties),
// so last_event_id agrees with last_seen_at even though the LLM's index order
// is arbitrary. Falls back to the last id when no dates are present. Dates are
// compared as strings, matching the default .sort() used for first/last seen.
function latestEventId(eventIds, dates) {
  let bestId = eventIds[eventIds.length - 1];
  let bestDate = null;
  eventIds.forEach((id, i) => {
    if (!dates[i]) return;
    const d = String(dates[i]);
    if (bestDate === null || d >= bestDate) {
      bestDate = d;
      bestId = id;
    }
  });
  return bestId;
}
/**
 * Renders one event_knowledge payload as a human-readable claim sentence.
 *
 * @param {string} type - knowledge row type: "relationship", "theme" or "factor".
 * @param {*} data - parsed JSON payload of the row.
 * @returns {string|null} the claim text, or null when the type is unknown,
 *   the payload is not an object (e.g. JSON `null`), or the key field is missing.
 */
function extractClaimText(type, data) {
  // JSON.parse can yield null/number/string; only objects carry claim fields,
  // and dereferencing null would throw.
  if (data === null || typeof data !== "object") return null;
  switch (type) {
    case "relationship":
      return data.entity ? `${data.entity} is a ${data.type || "partner"}` : null;
    case "theme":
      return data.theme ? `${data.theme} is ${data.direction || "relevant"}` : null;
    case "factor":
      return data.factor ? `${data.factor}: ${data.relationship || "relevant"}` : null;
    default:
      return null;
  }
}
/**
 * Maps the number of distinct supporting events to a confidence label.
 * 8+ → very_high, 4–7 → high, 2–3 → medium, anything else → low.
 *
 * @param {number} count
 * @returns {"very_high"|"high"|"medium"|"low"}
 */
function countToConfidence(count) {
  // Highest threshold first; the first tier the count reaches wins.
  const tiers = [
    [8, "very_high"],
    [4, "high"],
    [2, "medium"],
  ];
  const match = tiers.find(([minimum]) => count >= minimum);
  return match ? match[1] : "low";
}
/**
 * Asks the LLM to group semantically equivalent claims. Returns one entry per
 * distinct canonical claim.
 *
 * Only the most recent `maxClaims` claims are sent. rawClaims is expected
 * oldest-first, since processCompany reads event_knowledge ORDER BY
 * event_date ASC.
 *
 * The LLM output is treated as untrusted:
 * - groups without a non-empty string `canonical` are dropped;
 * - indices must be in-range integers, and each is kept only by the first
 *   group that claims it;
 * - a `type` outside relationship/theme/factor falls back to the type of the
 *   group's first claim;
 * - groups sharing a canonical are merged.
 *
 * @param {string} companyName
 * @param {Array<{text: string, type: string, eventId: *, eventDate: *}>} rawClaims
 * @param {{llmModel?: string, model?: string, apiKey?: string}} llmConfig
 * @param {number} [maxClaims=120] - cap on claims sent, to bound prompt size.
 * @returns {Promise<Array<{canonical: string, type: string, eventIds: Array, dates: Array}>>}
 * @throws {Error} when the HTTP call fails or the response/content is not JSON.
 */
async function normalizeClaims(companyName, rawClaims, llmConfig, maxClaims = 120) {
  // Keep the most recent claims. slice(0, N) on oldest-first input would
  // freeze the fact set once a company accumulated more than N claims.
  const capped = rawClaims.slice(Math.max(0, rawClaims.length - maxClaims));
  if (capped.length === 0) return [];
  const lines = capped.map((c, i) => `${i + 1}. [${c.type}] ${c.text}`).join("\n");
  const prompt = `You are normalizing knowledge claims about ${companyName}. Always respond in English regardless of the language of the input articles.
Group semantically equivalent claims and provide a canonical short form for each group (lowercase, no punctuation).
Claims:
${lines}
Return JSON only — no explanation. Shape:
{
"groups": [
{ "canonical": "canonical claim text", "type": "relationship|theme|factor", "indices": [1, 2] }
]
}
Rules:
- canonical must be lowercase with no punctuation marks
- merge claims that mean the same thing even if worded differently
- each index appears in exactly one group
- pick the type that best fits the group`;
  const body = JSON.stringify({
    model: llmConfig.llmModel || llmConfig.model,
    messages: [{ role: "user", content: prompt }],
    temperature: 0.1,
  });
  const url = new URL("https://openrouter.ai/api/v1/chat/completions");
  const responseText = await httpPost(url, body, {
    "Content-Type": "application/json",
    "Authorization": `Bearer ${llmConfig.apiKey || ""}`,
  });

  let parsed;
  try {
    parsed = JSON.parse(responseText);
  } catch (_) {
    throw new Error(`LLM response not JSON: ${responseText.slice(0, 200)}`);
  }
  const content = parsed?.choices?.[0]?.message?.content;
  if (typeof content !== "string" || content === "") return [];
  // Models often wrap JSON in a ```json fence despite instructions.
  const stripped = content.replace(/^```(?:json)?\s*/i, "").replace(/\s*```$/, "").trim();
  let result;
  try {
    result = JSON.parse(stripped);
  } catch (err) {
    throw new Error(`LLM content not JSON: ${stripped.slice(0, 200)}`, { cause: err });
  }

  // extractClaimText only produces these three types.
  const knownTypes = new Set(["relationship", "theme", "factor"]);
  const claimedIndices = new Set();
  const byCanonical = new Map();
  const rawGroups = Array.isArray(result?.groups) ? result.groups : [];
  for (const g of rawGroups) {
    if (!g || typeof g.canonical !== "string") continue;
    const canonical = g.canonical.trim();
    if (canonical === "") continue;

    const indices = [];
    for (const rawIndex of Array.isArray(g.indices) ? g.indices : []) {
      const i = Number(rawIndex) - 1; // prompt numbering is 1-based
      if (!Number.isInteger(i) || i < 0 || i >= capped.length) continue;
      // Enforce "each index appears in exactly one group" so no event is
      // counted twice towards confirmation_count.
      if (claimedIndices.has(i)) continue;
      claimedIndices.add(i);
      indices.push(i);
    }
    if (indices.length === 0) continue;

    const type = knownTypes.has(g.type) ? g.type : capped[indices[0]].type;
    let group = byCanonical.get(canonical);
    if (!group) {
      group = { canonical, type, eventIds: [], dates: [] };
      byCanonical.set(canonical, group);
    }
    for (const i of indices) {
      group.eventIds.push(capped[i].eventId);
      group.dates.push(capped[i].eventDate);
    }
  }
  return [...byCanonical.values()];
}
/**
 * POSTs `body` to `url` and resolves with the response body decoded as UTF-8.
 *
 * Rejects on:
 * - network or stream errors;
 * - a non-2xx status (the message includes up to 300 chars of the body);
 * - the socket being idle for longer than `timeoutMs`.
 *
 * @param {URL} url - http: or https: URL.
 * @param {string} body - request payload; Content-Length is computed from it.
 * @param {Object<string, string>} headers - extra request headers.
 * @param {number} [timeoutMs=120000] - socket idle timeout; 0 disables it.
 * @returns {Promise<string>}
 */
function httpPost(url, body, headers, timeoutMs = 120000) {
  return new Promise((resolve, reject) => {
    const isHttps = url.protocol === "https:";
    const lib = isHttps ? https : http;
    const req = lib.request({
      hostname: url.hostname,
      port: url.port || (isHttps ? 443 : 80),
      path: url.pathname + url.search,
      method: "POST",
      headers: { ...headers, "Content-Length": Buffer.byteLength(body) },
    }, (res) => {
      // Decode through the stream's StringDecoder so a multi-byte UTF-8
      // character split across two chunks is not turned into U+FFFD.
      res.setEncoding("utf8");
      let data = "";
      res.on("data", (chunk) => { data += chunk; });
      res.on("error", reject);
      res.on("end", () => {
        if (res.statusCode >= 200 && res.statusCode < 300) resolve(data);
        else reject(new Error(`LLM ${res.statusCode}: ${data.slice(0, 300)}`));
      });
    });
    req.on("error", reject);
    if (timeoutMs > 0) {
      // Without a timeout a stalled connection would block the worker loop forever.
      req.setTimeout(timeoutMs, () => {
        req.destroy(new Error(`LLM request timed out after ${timeoutMs} ms`));
      });
    }
    req.write(body);
    req.end();
  });
}
/**
 * Resolves with `undefined` after roughly `ms` milliseconds; paces the worker loop.
 * @param {number} ms
 * @returns {Promise<void>}
 */
function sleep(ms) {
  return new Promise((resolve) => {
    setTimeout(resolve, ms);
  });
}
// Only the worker loop is exported; every other function in this file is an internal helper.
module.exports = { runConsolidationWorker };