// consolidationWorker.js
// reads from event_knowledge (never writes to it) and maintains company_facts
// runs on a slow loop — one full pass over all tracked companies per cycle

const https = require("https");
const http = require("http");

// Default upper bound on claims sent to the LLM for one company in one call
// (override with config.openRouter.maxClaimsPerPrompt).
const MAX_CLAIMS_PER_PROMPT = 120;
// worker_events is pruned once this many events have been recorded (counted across cycles).
const PRUNE_EVERY_N_EVENTS = 50;
// Default socket-inactivity timeout for the LLM request (override with config.openRouter.timeoutMs).
const DEFAULT_LLM_TIMEOUT_MS = 120000;
// Fact types the LLM may assign; anything else falls back to the source claim's type.
const FACT_TYPES = new Set(["relationship", "theme", "factor"]);
const OPENROUTER_CHAT_URL = "https://openrouter.ai/api/v1/chat/completions";

/**
 * Runs the consolidation loop forever. Each cycle rebuilds company_facts for every
 * row of tracked_companies from that company's event_knowledge rows, deletes stale
 * facts, then sleeps.
 *
 * @param {*} archiveDb - accepted for interface compatibility; not used by this worker
 * @param {*} intelligenceDb - handle exposing prepare() and transaction()
 *   (presumably a better-sqlite3 Database — confirm against the caller)
 * @param {object} config - reads config.workers.consolidationLoopDelayMs (default 60000)
 *   and config.openRouter (LLM settings)
 * @returns {Promise<void>} never resolves; rejects only if statement preparation fails
 */
async function runConsolidationWorker(archiveDb, intelligenceDb, config) {
  const loopDelay = config.workers?.consolidationLoopDelayMs ?? 60000;
  const llmConfig = config.openRouter || {};
  const statements = prepareStatements(intelligenceDb);

  // Lives outside the cycle loop: resetting it every cycle meant worker_events was
  // never pruned when fewer than PRUNE_EVERY_N_EVENTS companies are tracked.
  let eventsSincePrune = 0;

  while (true) {
    try {
      const companies = statements.getCompanies.all();
      for (const company of companies) {
        try {
          await processCompany(company, intelligenceDb, llmConfig, statements);
          statements.recordEvent.run();
          eventsSincePrune++;
          if (eventsSincePrune >= PRUNE_EVERY_N_EVENTS) {
            statements.pruneEvents.run();
            eventsSincePrune = 0;
          }
        } catch (err) {
          console.error(`[consolidation] error on ${company.name}:`, err.message);
        }
      }

      // Rebuilt companies already drop their stale facts inside processCompany; this
      // global pass covers facts that were not rebuilt (e.g. LLM failure, untracked company).
      const staleResult = statements.deleteStaleFacts.run();
      if (staleResult.changes > 0) {
        console.log(`[consolidation] pruned ${staleResult.changes} stale facts`);
      }
      console.log("[consolidation] cycle complete");
    } catch (err) {
      console.error("[consolidation] cycle error:", err.message);
    }
    await sleep(loopDelay);
  }
}

/**
 * Prepares every SQL statement the worker uses, once, against intelligenceDb.
 * @param {*} intelligenceDb - handle exposing prepare()
 * @returns {object} prepared statements keyed by purpose
 */
function prepareStatements(intelligenceDb) {
  return {
    getCompanies: intelligenceDb.prepare("SELECT * FROM tracked_companies"),
    recordEvent: intelligenceDb.prepare(
      `INSERT INTO worker_events (worker) VALUES ('consolidation')`
    ),
    pruneEvents: intelligenceDb.prepare(
      `DELETE FROM worker_events WHERE worker = 'consolidation' AND completed_at < datetime('now', '-1 hour')`
    ),
    getKnowledge: intelligenceDb.prepare(`
      SELECT * FROM event_knowledge
      WHERE company_id = ?
      ORDER BY event_date ASC
    `),
    getExistingFirstSeen: intelligenceDb.prepare(`
      SELECT claim, first_seen_at FROM company_facts WHERE company_id = ?
    `),
    deleteCompanyFacts: intelligenceDb.prepare(
      "DELETE FROM company_facts WHERE company_id = ?"
    ),
    insertFact: intelligenceDb.prepare(`
      INSERT INTO company_facts
        (company_id, type, claim, confidence, confirmation_count,
         first_seen_at, last_seen_at, last_event_id, supporting_event_ids)
      VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
    `),
    // stale = seen only once and not reinforced in 90 days
    deleteStaleFacts: intelligenceDb.prepare(`
      DELETE FROM company_facts
      WHERE confirmation_count = 1
        AND last_seen_at < datetime('now', '-90 days')
    `),
    // Same staleness rule scoped to one company, run inside the rebuild transaction so
    // a stale fact is not re-inserted on every cycle (event_knowledge still holds its source).
    deleteStaleCompanyFacts: intelligenceDb.prepare(`
      DELETE FROM company_facts
      WHERE company_id = ?
        AND confirmation_count = 1
        AND last_seen_at < datetime('now', '-90 days')
    `),
  };
}

/**
 * Rebuilds company_facts for one company: extracts claim text from its knowledge rows,
 * has the LLM group equivalent claims, and replaces the company's facts atomically.
 * Leaves existing facts untouched when there are no claims or the LLM call fails.
 *
 * @param {{id: *, name: string}} company - a tracked_companies row
 * @param {*} intelligenceDb - handle exposing transaction()
 * @param {object} llmConfig - forwarded to normalizeClaims
 * @param {object} statements - the object returned by prepareStatements
 */
async function processCompany(company, intelligenceDb, llmConfig, statements) {
  const rows = statements.getKnowledge.all(company.id);
  if (rows.length === 0) return;

  const rawClaims = collectRawClaims(rows);
  if (rawClaims.length === 0) return;

  let groups;
  try {
    groups = await normalizeClaims(company.name, rawClaims, llmConfig);
  } catch (err) {
    console.error(`[consolidation] LLM failed for ${company.name}:`, err.message);
    return;
  }
  if (!groups || groups.length === 0) return;

  // save first_seen_at before we wipe the rows; a Map avoids Object.prototype keys
  // such as "constructor" masquerading as a prior value
  const priorFirstSeen = new Map(
    statements.getExistingFirstSeen.all(company.id).map((f) => [f.claim, f.first_seen_at])
  );
  const facts = buildFacts(groups, priorFirstSeen);

  const writeAll = intelligenceDb.transaction(() => {
    statements.deleteCompanyFacts.run(company.id);
    for (const fact of facts) {
      statements.insertFact.run(
        company.id,
        fact.type,
        fact.claim,
        fact.confidence,
        fact.confirmationCount,
        fact.firstSeen,
        fact.lastSeen,
        fact.lastEventId,
        JSON.stringify(fact.supportingEventIds)
      );
    }
    statements.deleteStaleCompanyFacts.run(company.id);
  });
  writeAll();

  console.log(`[consolidation] ${company.name}: ${facts.length} distinct facts`);
}

/**
 * Converts event_knowledge rows into claim records, skipping rows whose `data`
 * is not valid JSON or yields no claim text. Preserves the input row order.
 * @param {Array<object>} rows - event_knowledge rows (reads data, type, event_id, event_date, created_at)
 * @returns {Array<{text: string, type: string, eventId: *, eventDate: *}>}
 */
function collectRawClaims(rows) {
  const claims = [];
  for (const row of rows) {
    let data;
    try {
      data = JSON.parse(row.data);
    } catch (_) {
      continue;
    }
    const text = extractClaimText(row.type, data);
    if (!text) continue;
    claims.push({
      text,
      type: row.type,
      eventId: row.event_id,
      eventDate: row.event_date || row.created_at,
    });
  }
  return claims;
}

/**
 * Renders a knowledge payload as a one-line claim for the LLM prompt.
 * @param {string} type - "relationship", "theme" or "factor"; other types yield null
 * @param {*} data - parsed JSON payload of the knowledge row
 * @returns {string|null} the claim text, or null when the payload has no usable field
 */
function extractClaimText(type, data) {
  // JSON.parse can yield null or a primitive; treat those as "no claim" instead of throwing
  if (data === null || typeof data !== "object") return null;
  if (type === "relationship") {
    if (!data.entity) return null;
    return `${data.entity} is a ${data.type || "partner"}`;
  }
  if (type === "theme") {
    if (!data.theme) return null;
    return `${data.theme} is ${data.direction || "relevant"}`;
  }
  if (type === "factor") {
    if (!data.factor) return null;
    return `${data.factor}: ${data.relationship || "relevant"}`;
  }
  return null;
}

/**
 * Maps the number of distinct supporting events to a confidence label.
 * @param {number} count
 * @returns {"very_high"|"high"|"medium"|"low"}
 */
function countToConfidence(count) {
  if (count >= 8) return "very_high";
  if (count >= 4) return "high";
  if (count >= 2) return "medium";
  return "low";
}

/**
 * Keeps the last `maxClaims` claims. Claims arrive in event_date ASC order, so this keeps
 * the most recent ones instead of starving new knowledge once a company exceeds the cap.
 * @param {Array} rawClaims
 * @param {number} [maxClaims=MAX_CLAIMS_PER_PROMPT]
 * @returns {Array}
 */
function selectRecentClaims(rawClaims, maxClaims = MAX_CLAIMS_PER_PROMPT) {
  return rawClaims.slice(Math.max(0, rawClaims.length - maxClaims));
}

/**
 * Asks the LLM to cluster semantically equivalent claims. Each returned group carries
 * the event ids and dates of the claims it covers.
 *
 * @param {string} companyName - interpolated into the prompt
 * @param {Array<{text: string, type: string, eventId: *, eventDate: *}>} rawClaims - oldest first
 * @param {object} llmConfig - reads llmModel|model, apiKey, timeoutMs, maxClaimsPerPrompt
 * @returns {Promise<Array<{canonical: string, type: string, eventIds: Array, dates: Array}>>}
 * @throws {Error} on HTTP failure or timeout, a non-JSON envelope, an API-level error,
 *   or message content that is not parseable JSON
 */
async function normalizeClaims(companyName, rawClaims, llmConfig) {
  const capped = selectRecentClaims(
    rawClaims,
    llmConfig.maxClaimsPerPrompt ?? MAX_CLAIMS_PER_PROMPT
  );
  if (capped.length === 0) return [];

  const lines = capped.map((c, i) => `${i + 1}. [${c.type}] ${c.text}`).join("\n");

  const prompt = `You are normalizing knowledge claims about ${companyName}.
Always respond in English regardless of the language of the input articles.
Group semantically equivalent claims and provide a canonical short form for each group (lowercase, no punctuation).

Claims:
${lines}

Return JSON only — no explanation.
Shape:
{ "groups": [ { "canonical": "canonical claim text", "type": "relationship|theme|factor", "indices": [1, 2] } ] }

Rules:
- canonical must be lowercase with no punctuation marks
- merge claims that mean the same thing even if worded differently
- each index appears in exactly one group
- pick the type that best fits the group`;

  const body = JSON.stringify({
    model: llmConfig.llmModel || llmConfig.model,
    messages: [{ role: "user", content: prompt }],
    temperature: 0.1,
  });

  const responseText = await httpPost(
    new URL(OPENROUTER_CHAT_URL),
    body,
    {
      "Content-Type": "application/json",
      "Authorization": `Bearer ${llmConfig.apiKey || ""}`,
    },
    llmConfig.timeoutMs ?? DEFAULT_LLM_TIMEOUT_MS
  );

  let parsed;
  try {
    parsed = JSON.parse(responseText);
  } catch (_) {
    throw new Error(`LLM response not JSON: ${responseText.slice(0, 200)}`);
  }

  // surface API-level errors instead of silently treating them as "no groups"
  if (parsed?.error) {
    const detail = parsed.error.message ?? JSON.stringify(parsed.error);
    throw new Error(`LLM error: ${String(detail).slice(0, 300)}`);
  }

  const content = parsed?.choices?.[0]?.message?.content;
  if (!content) return [];
  return parseGroups(content, capped);
}

/**
 * Parses a JSON object from LLM message content, tolerating a ``` / ```json fence
 * and, as a fallback, surrounding prose (uses the outermost {...} span).
 * @param {string} content
 * @returns {*} the parsed value
 * @throws {SyntaxError} when no parseable JSON is found
 */
function parseJsonObject(content) {
  const stripped = content.replace(/^```(?:json)?\s*/i, "").replace(/\s*```$/, "").trim();
  try {
    return JSON.parse(stripped);
  } catch (err) {
    const start = stripped.indexOf("{");
    const end = stripped.lastIndexOf("}");
    if (start !== -1 && end > start) return JSON.parse(stripped.slice(start, end + 1));
    throw err;
  }
}

/**
 * Validates the LLM's grouping and maps 1-based indices back onto claims.
 * - Groups without a non-empty string `canonical` are dropped; canonicals are trimmed
 *   and lowercased (the prompt's own contract) so first_seen matching stays stable.
 * - Out-of-range, non-integer, or already-used indices are ignored, enforcing the
 *   "each index appears in exactly one group" rule so events are not double counted.
 * - A type outside FACT_TYPES falls back to the first covered claim's type.
 *
 * @param {string} content - raw message content from the LLM
 * @param {Array<{type: string, eventId: *, eventDate: *}>} capped - claims as numbered in the prompt
 * @returns {Array<{canonical: string, type: string, eventIds: Array, dates: Array}>}
 */
function parseGroups(content, capped) {
  const result = parseJsonObject(content);
  const rawGroups = Array.isArray(result?.groups) ? result.groups : [];
  const used = new Set();
  const groups = [];

  for (const g of rawGroups) {
    if (!g || typeof g.canonical !== "string") continue;
    const canonical = g.canonical.trim().toLowerCase();
    if (!canonical) continue;

    const indices = [];
    for (const raw of Array.isArray(g.indices) ? g.indices : []) {
      const i = Number(raw) - 1;
      if (!Number.isInteger(i) || i < 0 || i >= capped.length || used.has(i)) continue;
      used.add(i);
      indices.push(i);
    }
    if (indices.length === 0) continue;

    groups.push({
      canonical,
      type: FACT_TYPES.has(g.type) ? g.type : capped[indices[0]].type,
      eventIds: indices.map((i) => capped[i].eventId),
      dates: indices.map((i) => capped[i].eventDate),
    });
  }
  return groups;
}

/**
 * Returns whichever of two date strings sorts first; a falsy value loses to a truthy one.
 * Comparison is lexicographic, which orders ISO-style date strings chronologically.
 */
function earliestOf(a, b) {
  if (!a) return b;
  if (!b) return a;
  return a < b ? a : b;
}

/**
 * Returns the event id paired with the latest date (ties go to the later position);
 * when no date is present, returns the last event id.
 * @param {Array} eventIds
 * @param {Array} dates - parallel to eventIds
 */
function latestEventId(eventIds, dates) {
  let bestIndex = eventIds.length - 1;
  let bestDate = null;
  eventIds.forEach((_, i) => {
    const d = dates[i];
    if (d && (bestDate === null || d >= bestDate)) {
      bestDate = d;
      bestIndex = i;
    }
  });
  return eventIds[bestIndex];
}

/**
 * Turns LLM groups into company_facts rows. Groups sharing a canonical claim are merged
 * into one fact.
 * - The count is the number of distinct event ids.
 * - first_seen is the earliest of the prior stored value and the observed dates,
 *   falling back to now when neither exists.
 * - last_seen is the latest observed date, or first_seen when there is none.
 *
 * @param {Array<{canonical: string, type: string, eventIds: Array, dates: Array}>} groups
 * @param {Map<string, string>} [priorFirstSeen] - claim -> first_seen_at from before the rebuild
 * @returns {Array<{claim: string, type: string, confidence: string, confirmationCount: number,
 *   firstSeen: string, lastSeen: string, lastEventId: *, supportingEventIds: Array}>}
 */
function buildFacts(groups, priorFirstSeen = new Map()) {
  const byClaim = new Map();
  for (const { canonical, type, eventIds, dates } of groups) {
    const existing = byClaim.get(canonical);
    if (existing) {
      existing.eventIds.push(...eventIds);
      existing.dates.push(...dates);
    } else {
      byClaim.set(canonical, { type, eventIds: [...eventIds], dates: [...dates] });
    }
  }

  const facts = [];
  for (const [claim, { type, eventIds, dates }] of byClaim) {
    const supportingEventIds = [...new Set(eventIds)];
    const sortedDates = dates.filter(Boolean).sort();
    const firstSeen =
      earliestOf(priorFirstSeen.get(claim), sortedDates[0]) || new Date().toISOString();
    const lastSeen = sortedDates[sortedDates.length - 1] || firstSeen;
    facts.push({
      claim,
      type,
      confidence: countToConfidence(supportingEventIds.length),
      confirmationCount: supportingEventIds.length,
      firstSeen,
      lastSeen,
      lastEventId: latestEventId(eventIds, dates),
      supportingEventIds,
    });
  }
  return facts;
}

/**
 * POSTs `body` to `url` and resolves with the response text for 2xx statuses.
 * @param {URL} url
 * @param {string} body
 * @param {object} headers - Content-Length is added automatically
 * @param {number} [timeoutMs=DEFAULT_LLM_TIMEOUT_MS] - socket inactivity timeout; 0 disables it
 * @returns {Promise<string>}
 * @throws {Error} on non-2xx status, network/stream error, or timeout
 */
function httpPost(url, body, headers, timeoutMs = DEFAULT_LLM_TIMEOUT_MS) {
  return new Promise((resolve, reject) => {
    const isHttps = url.protocol === "https:";
    const lib = isHttps ? https : http;
    const req = lib.request({
      hostname: url.hostname,
      port: url.port || (isHttps ? 443 : 80),
      path: url.pathname + url.search,
      method: "POST",
      headers: { ...headers, "Content-Length": Buffer.byteLength(body) },
    }, (res) => {
      // decode as one UTF-8 stream so multi-byte characters split across chunks survive
      res.setEncoding("utf8");
      let data = "";
      res.on("data", (chunk) => { data += chunk; });
      res.on("error", reject);
      res.on("end", () => {
        if (res.statusCode >= 200 && res.statusCode < 300) resolve(data);
        else reject(new Error(`LLM ${res.statusCode}: ${data.slice(0, 300)}`));
      });
    });
    // without a timeout a stalled connection would block the worker loop forever
    req.setTimeout(timeoutMs, () => {
      req.destroy(new Error(`LLM request timed out after ${timeoutMs}ms`));
    });
    req.on("error", reject);
    req.write(body);
    req.end();
  });
}

/** Resolves after `ms` milliseconds. */
function sleep(ms) {
  return new Promise((r) => setTimeout(r, ms));
}

module.exports = { runConsolidationWorker };