293 lines
8.3 KiB
JavaScript
293 lines
8.3 KiB
JavaScript
// consolidationWorker.js
// Reads from event_knowledge (never writes to it) and maintains company_facts.
// Runs on a slow loop — one full pass over all tracked companies per cycle.
|
|
|
|
const https = require("https");
|
|
const http = require("http");
|
|
|
|
|
|
/**
 * Runs the consolidation loop: one pass over every tracked company per cycle,
 * followed by a stale-fact cleanup and a pause of `loopDelay` milliseconds.
 *
 * The loop has no exit condition; the returned promise only settles if
 * something outside the per-cycle try/catch (statement preparation or the
 * sleep call) throws.
 *
 * @param {object} archiveDb - Not referenced by this function; kept so the
 *   call signature stays unchanged.
 * @param {object} intelligenceDb - Database handle exposing `prepare()` (and
 *   `transaction()`, used by processCompany).
 * @param {object} config - Reads `workers.consolidationLoopDelayMs`
 *   (default 60000) and `openRouter` (LLM settings, default `{}`).
 * @returns {Promise<void>}
 */
async function runConsolidationWorker(archiveDb, intelligenceDb, config) {
  const loopDelay = config.workers?.consolidationLoopDelayMs ?? 60000;
  const llmConfig = config.openRouter || {};

  const getCompanies = intelligenceDb.prepare("SELECT * FROM tracked_companies");

  const recordEvent = intelligenceDb.prepare(
    `INSERT INTO worker_events (worker) VALUES ('consolidation')`
  );
  const pruneEvents = intelligenceDb.prepare(
    `DELETE FROM worker_events WHERE worker = 'consolidation' AND completed_at < datetime('now', '-1 hour')`
  );

  const getKnowledge = intelligenceDb.prepare(`
    SELECT * FROM event_knowledge WHERE company_id = ? ORDER BY event_date ASC
  `);

  const getExistingFirstSeen = intelligenceDb.prepare(`
    SELECT claim, first_seen_at FROM company_facts WHERE company_id = ?
  `);

  const deleteCompanyFacts = intelligenceDb.prepare(
    "DELETE FROM company_facts WHERE company_id = ?"
  );

  const insertFact = intelligenceDb.prepare(`
    INSERT INTO company_facts
    (company_id, type, claim, confidence, confirmation_count, first_seen_at, last_seen_at, last_event_id, supporting_event_ids)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
  `);

  // stale = seen only once and not reinforced in 90 days
  const deleteStaleFacts = intelligenceDb.prepare(`
    DELETE FROM company_facts
    WHERE confirmation_count = 1
    AND last_seen_at < datetime('now', '-90 days')
  `);

  // Counts recorded events since the last prune. It lives outside the cycle
  // loop on purpose: if it were reset every cycle, a deployment with fewer
  // than 50 tracked companies would never reach the threshold and
  // worker_events would grow without bound.
  let pruneCounter = 0;

  while (true) {
    try {
      const companies = getCompanies.all();

      for (const company of companies) {
        // A failure on one company is logged and must not stop the pass.
        try {
          await processCompany(
            company, intelligenceDb, llmConfig,
            getKnowledge, getExistingFirstSeen, deleteCompanyFacts, insertFact
          );
          recordEvent.run();
          pruneCounter++;
          if (pruneCounter >= 50) {
            pruneEvents.run();
            pruneCounter = 0;
          }
        } catch (err) {
          console.error(`[consolidation] error on ${company.name}:`, err.message);
        }
      }

      const staleResult = deleteStaleFacts.run();
      if (staleResult.changes > 0) {
        console.log(`[consolidation] pruned ${staleResult.changes} stale facts`);
      }

      console.log("[consolidation] cycle complete");
    } catch (err) {
      console.error("[consolidation] cycle error:", err.message);
    }

    await sleep(loopDelay);
  }
}
|
|
|
|
|
|
/**
 * Rebuilds the company_facts rows of a single company.
 *
 * Steps: load the company's knowledge rows, turn each row into a claim text,
 * let normalizeClaims() group equivalent claims, then (inside one
 * `intelligenceDb.transaction`) delete the company's existing facts and insert
 * one fact per group. Returns without writing anything when there are no rows,
 * no usable claims, no groups, or the LLM call fails.
 *
 * @param {object} company - Row with at least `id` and `name`.
 * @param {object} intelligenceDb - Database handle; only `transaction()` is used here.
 * @param {object} llmConfig - Passed through to normalizeClaims().
 * @param {object} getKnowledge - Prepared statement; `.all(companyId)` returns knowledge rows.
 * @param {object} getExistingFirstSeen - Prepared statement; `.all(companyId)` returns `{ claim, first_seen_at }` rows.
 * @param {object} deleteCompanyFacts - Prepared statement; `.run(companyId)`.
 * @param {object} insertFact - Prepared statement; `.run(...)` with nine positional values.
 * @returns {Promise<void>}
 */
async function processCompany(company, intelligenceDb, llmConfig, getKnowledge, getExistingFirstSeen, deleteCompanyFacts, insertFact) {
  const rows = getKnowledge.all(company.id);
  if (rows.length === 0) return;

  const rawClaims = [];

  for (const row of rows) {
    let data;
    try {
      data = JSON.parse(row.data);
    } catch (_) {
      // Best effort: a row with an unparseable payload is skipped.
      continue;
    }

    // JSON such as `null`, `42` or `"text"` parses fine but carries no fields;
    // skip it here so one odd row cannot abort the whole company.
    if (data === null || typeof data !== "object") continue;

    const text = extractClaimText(row.type, data);
    if (!text) continue;

    rawClaims.push({
      text,
      type: row.type,
      eventId: row.event_id,
      eventDate: row.event_date || row.created_at,
    });
  }

  if (rawClaims.length === 0) return;

  let groups;
  try {
    groups = await normalizeClaims(company.name, rawClaims, llmConfig);
  } catch (err) {
    console.error(`[consolidation] LLM failed for ${company.name}:`, err.message);
    return;
  }

  if (!groups || groups.length === 0) return;

  // save first_seen_at before we wipe the rows.
  // A Map is used instead of a plain object because the keys are free text
  // from the LLM: with an object, a claim such as "constructor" would resolve
  // to an inherited member instead of "not seen before".
  const priorFirstSeen = new Map();
  for (const fact of getExistingFirstSeen.all(company.id)) {
    priorFirstSeen.set(fact.claim, fact.first_seen_at);
  }

  const writeAll = intelligenceDb.transaction(() => {
    deleteCompanyFacts.run(company.id);

    for (const { canonical, type, eventIds, dates } of groups) {
      const distinctEventIds = [...new Set(eventIds)];
      const count = distinctEventIds.length;

      const sortedDates = dates.filter(Boolean).sort();
      const firstSeen = priorFirstSeen.get(canonical) || sortedDates[0] || new Date().toISOString();
      const lastSeen = sortedDates[sortedDates.length - 1] || firstSeen;

      const lastEventId = pickLatestEventId(eventIds, dates);
      const confidence = countToConfidence(count);

      insertFact.run(
        company.id,
        type,
        canonical,
        confidence,
        count,
        firstSeen,
        lastSeen,
        lastEventId,
        JSON.stringify(distinctEventIds)
      );
    }
  });

  writeAll();
  console.log(`[consolidation] ${company.name}: ${groups.length} distinct facts`);
}

/**
 * Returns the event id paired with the latest date in two parallel arrays.
 * Dates are compared as strings, matching the default sort used for lastSeen.
 * Ties go to the later position, and when no date is present the last event id
 * is returned, so chronologically ordered input yields the last element.
 */
function pickLatestEventId(eventIds, dates) {
  let bestIndex = eventIds.length - 1;
  let bestDate = null;

  for (let i = 0; i < eventIds.length; i++) {
    if (!dates[i]) continue;
    const current = String(dates[i]);
    if (bestDate === null || current >= bestDate) {
      bestDate = current;
      bestIndex = i;
    }
  }

  return eventIds[bestIndex];
}
|
|
|
|
|
|
/**
 * Produces a comparison-friendly form of an entity name: lower-cased, with
 * every "." removed, whitespace runs collapsed to one space, and the ends
 * trimmed.
 *
 * @param {string} name - Entity name as found in the source data.
 * @returns {string} The normalized name.
 */
function normalizeEntityName(name) {
  const lowered = name.toLowerCase();
  const withoutDots = lowered.replace(/\./g, "");
  const singleSpaced = withoutDots.replace(/\s+/g, " ");
  return singleSpaced.trim();
}
|
|
|
|
|
|
/**
 * Builds the one-line claim text for a knowledge row.
 *
 * - "relationship": `<normalized entity> is a <data.type | "partner">`
 * - "theme":        `<data.theme> is <data.direction | "relevant">`
 * - "factor":       `<data.factor>: <data.relationship | "relevant">`
 *
 * @param {string} type - Row type.
 * @param {*} data - Parsed row payload.
 * @returns {string|null} The claim text, or null when the type is unknown,
 *   the payload is not an object, or the required field is missing/unusable.
 */
function extractClaimText(type, data) {
  // A payload that parsed to null or to a primitive has no fields to read;
  // treat it like any other row without a usable claim instead of throwing.
  if (data === null || typeof data !== "object") return null;

  if (type === "relationship") {
    // The entity is passed to string-only normalization, so anything that is
    // not a non-empty string cannot produce a claim.
    if (typeof data.entity !== "string" || !data.entity) return null;
    const entity = normalizeEntityName(data.entity);
    // An entity made only of dots/whitespace normalizes to nothing.
    if (!entity) return null;
    return `${entity} is a ${data.type || "partner"}`;
  }

  if (type === "theme") {
    if (!data.theme) return null;
    return `${data.theme} is ${data.direction || "relevant"}`;
  }

  if (type === "factor") {
    if (!data.factor) return null;
    return `${data.factor}: ${data.relationship || "relevant"}`;
  }

  return null;
}
|
|
|
|
|
|
/**
 * Translates a confirmation count into a confidence label.
 *
 * @param {number} count - Number of distinct confirming events.
 * @returns {string} "very_high" (8 or more), "high" (4 or more),
 *   "medium" (2 or more), otherwise "low".
 */
function countToConfidence(count) {
  // Ordered from the highest threshold down; the first match wins.
  const tiers = [
    [8, "very_high"],
    [4, "high"],
    [2, "medium"],
  ];

  for (const [minimum, label] of tiers) {
    if (count >= minimum) return label;
  }

  return "low";
}
|
|
|
|
|
|
/**
 * Asks the LLM to group semantically equivalent claims and returns the groups
 * resolved back to event ids and dates.
 *
 * @param {string} companyName - Interpolated into the prompt.
 * @param {Array<{text: string, type: string, eventId: *, eventDate: *}>} rawClaims
 *   Claims to normalize; only the first 120 are sent.
 * @param {object} llmConfig - Reads `llmModel` (or `model`) and `apiKey`.
 * @returns {Promise<Array<{canonical: string, type: string, eventIds: Array, dates: Array}>>}
 *   One entry per usable group; empty when the response carries no content.
 * @throws {Error} When the HTTP call rejects, or when the response envelope or
 *   the model's message content is not valid JSON.
 */
async function normalizeClaims(companyName, rawClaims, llmConfig) {
  // cap to keep the prompt manageable
  // NOTE(review): this keeps the FIRST 120 claims. The caller in this file
  // builds the list from rows ordered by event_date ASC, so the newest claims
  // are the ones dropped — confirm that is intended.
  const capped = rawClaims.slice(0, 120);

  const lines = capped.map((c, i) => `${i + 1}. [${c.type}] ${c.text}`).join("\n");

  const prompt = `You are normalizing knowledge claims about ${companyName}. Always respond in English regardless of the language of the input articles.

Group semantically equivalent claims and provide a canonical short form for each group (lowercase, no punctuation).

Claims:
${lines}

Return JSON only — no explanation. Shape:
{
"groups": [
{ "canonical": "canonical claim text", "type": "relationship|theme|factor", "indices": [1, 2] }
]
}

Rules:
- canonical must be lowercase with no punctuation marks
- merge claims that mean the same thing even if worded differently
- each index appears in exactly one group
- pick the type that best fits the group`;

  const body = JSON.stringify({
    model: llmConfig.llmModel || llmConfig.model,
    messages: [{ role: "user", content: prompt }],
    temperature: 0.1,
  });

  const url = new URL("https://openrouter.ai/api/v1/chat/completions");
  const responseText = await httpPost(url, body, {
    "Content-Type": "application/json",
    "Authorization": `Bearer ${llmConfig.apiKey || ""}`,
  });

  let parsed;
  try {
    parsed = JSON.parse(responseText);
  } catch (_) {
    throw new Error(`LLM response not JSON: ${responseText.slice(0, 200)}`);
  }

  const content = parsed?.choices?.[0]?.message?.content;
  // Only a non-empty string can be parsed below; anything else means
  // "no usable answer".
  if (typeof content !== "string" || content === "") return [];

  // Models often wrap JSON in a Markdown code fence; trim first so leading
  // whitespace cannot hide the opening fence from the anchored pattern.
  const stripped = content.trim().replace(/^```(?:json)?\s*/i, "").replace(/\s*```$/, "").trim();

  let result;
  try {
    result = JSON.parse(stripped);
  } catch (err) {
    throw new Error(`LLM content not JSON: ${stripped.slice(0, 200)}`, { cause: err });
  }

  const allowedTypes = new Set(["relationship", "theme", "factor"]);
  const rawGroups = Array.isArray(result?.groups) ? result.groups : [];
  const groups = [];

  for (const g of rawGroups) {
    if (g === null || typeof g !== "object") continue;

    // A group without canonical text cannot be stored as a fact.
    if (typeof g.canonical !== "string" || g.canonical.trim() === "") continue;

    // Convert the model's 1-based indices to 0-based, drop duplicates and
    // anything that is not a valid position in `capped`, and order them so
    // eventIds/dates follow the order of the input claims.
    const rawIndices = Array.isArray(g.indices) ? g.indices : [];
    const indices = [...new Set(rawIndices.map((i) => Number(i) - 1))]
      .filter((i) => Number.isInteger(i) && i >= 0 && i < capped.length)
      .sort((a, b) => a - b);

    if (indices.length === 0) continue;

    const eventIds = indices.map((i) => capped[i].eventId);
    const dates = indices.map((i) => capped[i].eventDate);
    // Fall back to the first member's type when the model returns none or one
    // outside the three types named in the prompt.
    const type = allowedTypes.has(g.type) ? g.type : capped[indices[0]].type;

    groups.push({ canonical: g.canonical, type, eventIds, dates });
  }

  return groups;
}
|
|
|
|
|
|
/**
 * Sends `body` as an HTTP(S) POST and resolves with the response text.
 *
 * @param {URL} url - Target; the protocol selects the `https` or `http` module.
 * @param {string} body - Request payload; Content-Length is its byte length.
 * @param {object} headers - Request headers (Content-Length is added here).
 * @param {number} [timeoutMs=120000] - Socket idle timeout in milliseconds.
 * @returns {Promise<string>} Response body for a 2xx status.
 * @throws {Error} Rejects on a non-2xx status (message carries the status and
 *   the first 300 characters of the body), on a socket/stream error, or when
 *   the socket stays idle longer than `timeoutMs`.
 */
function httpPost(url, body, headers, timeoutMs = 120000) {
  return new Promise((resolve, reject) => {
    const lib = url.protocol === "https:" ? https : http;

    const req = lib.request({
      hostname: url.hostname,
      port: url.port || (url.protocol === "https:" ? 443 : 80),
      path: url.pathname + url.search,
      method: "POST",
      headers: { ...headers, "Content-Length": Buffer.byteLength(body) },
      timeout: timeoutMs,
    }, (res) => {
      // Decode as a stream: concatenating raw chunks would corrupt a
      // multi-byte UTF-8 character that happens to straddle two chunks.
      res.setEncoding("utf8");

      let data = "";
      res.on("data", (chunk) => {
        data += chunk;
      });
      res.on("error", reject);
      res.on("end", () => {
        if (res.statusCode >= 200 && res.statusCode < 300) resolve(data);
        else reject(new Error(`LLM ${res.statusCode}: ${data.slice(0, 300)}`));
      });
    });

    // The "timeout" event only signals idleness; the request has to be torn
    // down explicitly, which surfaces the error through the handler below.
    // Without this a stalled connection would leave the caller awaiting forever.
    req.on("timeout", () => {
      req.destroy(new Error(`LLM request timed out after ${timeoutMs}ms`));
    });

    req.on("error", reject);
    req.write(body);
    req.end();
  });
}
|
|
|
|
/**
 * Promise-based delay.
 *
 * @param {number} ms - Delay handed to setTimeout, in milliseconds.
 * @returns {Promise<void>} Resolves once the timer fires.
 */
function sleep(ms) {
  return new Promise((resolve) => {
    setTimeout(resolve, ms);
  });
}
|
|
|
|
// Public surface of this module: the worker entry point.
module.exports = { runConsolidationWorker };
|