From 82abe0bcb325fa190da82db1048e9f3d5a912262 Mon Sep 17 00:00:00 2001 From: ImBenji Date: Mon, 27 Apr 2026 18:02:20 +0100 Subject: [PATCH] refactor: load environment variables from .env file and update openRouter configuration --- workers/consolidationWorker.js | 11 ++- workers/graphWorker.js | 172 ++++++++++++++++++++++++++++++--- 2 files changed, 166 insertions(+), 17 deletions(-) diff --git a/workers/consolidationWorker.js b/workers/consolidationWorker.js index 6e863eb..5941f40 100644 --- a/workers/consolidationWorker.js +++ b/workers/consolidationWorker.js @@ -153,10 +153,19 @@ async function processCompany(company, intelligenceDb, llmConfig, getKnowledge, } +function normalizeEntityName(name) { + return name + .toLowerCase() + .replace(/\./g, "") + .replace(/\s+/g, " ") + .trim(); +} + + function extractClaimText(type, data) { if (type === "relationship") { if (!data.entity) return null; - return `${data.entity} is a ${data.type || "partner"}`; + return `${normalizeEntityName(data.entity)} is a ${data.type || "partner"}`; } if (type === "theme") { diff --git a/workers/graphWorker.js b/workers/graphWorker.js index dd2ead8..a89f964 100644 --- a/workers/graphWorker.js +++ b/workers/graphWorker.js @@ -1,5 +1,8 @@ // builds cross-company relationship graph from company_facts rows where type = 'relationship' +const https = require("https"); +const http = require("http"); + const VALID_TYPES = ["supplier", "customer", "competitor", "partner", "investor", "dependency"]; const KEYWORD_MAP = [ @@ -59,25 +62,137 @@ function parseClaim(claim) { } -function resolveCompany(toEntity, companies) { - const needle = toEntity.toLowerCase(); +function normalizeName(s) { + return (s || "").toLowerCase().replace(/\./g, "").replace(/\s+/g, " ").trim(); +} - for (const co of companies) { - if (co.name.toLowerCase() === needle) return co; - if (co.ticker && co.ticker.toLowerCase() === needle) return co; - let aliases = []; - try { aliases = JSON.parse(co.aliases || "[]"); } catch (_) {} +// stopwords to skip when building LIKE terms +const SKIP_WORDS = new Set(["the", "and", "of", "a", "an", "inc", "corp", "ltd", "llc", "co", "plus"]); - for (const alias of aliases) { - if (alias.toLowerCase() === needle) return co; - } + +// ask LLM to match entity against a list of candidate company names +// returns the matched company name or null +async function llmResolveEntity(entity, candidates, llmConfig) { + if (!candidates.length) return null; + + const list = candidates.map((c, i) => `${i + 1}. ${c.name}`).join("\n"); + + const prompt = `Which of the following companies, if any, does "${entity}" refer to? + +${list} + +Reply with just the number of the match, or "none" if none apply. No explanation.`; + + const body = JSON.stringify({ + model: llmConfig.cheapModel || llmConfig.llmModel || llmConfig.model, + messages: [{ role: "user", content: prompt }], + temperature: 0, + }); + + const url = new URL("https://openrouter.ai/api/v1/chat/completions"); + let responseText; + + try { + responseText = await httpPost(url, body, { + "Content-Type": "application/json", + "Authorization": `Bearer ${llmConfig.apiKey || ""}`, + }); + } catch (err) { + console.warn("[graph] LLM resolve failed:", err.message); + return null; + } + + let parsed; + try { parsed = JSON.parse(responseText); } catch (_) { return null; } + + const content = (parsed.choices?.[0]?.message?.content || "").trim().toLowerCase(); + + if (content === "none" || content === "0") return null; + + const idx = parseInt(content, 10); + if (!isNaN(idx) && idx >= 1 && idx <= candidates.length) { + return candidates[idx - 1]; } return null; } +// pre-resolves all unique unmatched entities via LIKE + LLM before the transaction +async function buildResolutionCache(facts, intelligenceDb, llmConfig) { + + const exactCache = new Map(); // entity -> company row (from exact match) + const llmCache = new Map(); // entity -> company row or null (from LLM) + + const allCompanies = intelligenceDb.prepare("SELECT * FROM tracked_companies").all(); + + // build a quick exact-match lookup + const byNormalizedName = new Map(); + const byTicker = new Map(); + const byAlias = new Map(); + + for (const co of allCompanies) { + byNormalizedName.set(normalizeName(co.name), co); + if (co.ticker) byTicker.set(co.ticker.toLowerCase(), co); + + let aliases = []; + try { aliases = JSON.parse(co.aliases || "[]"); } catch (_) {} + for (const alias of aliases) byAlias.set(normalizeName(alias), co); + } + + const uniqueEntities = [...new Set( + facts.map(f => parseClaim(f.claim)?.toEntity).filter(Boolean) + )]; + + const getCandidates = intelligenceDb.prepare(` + SELECT * FROM tracked_companies WHERE name LIKE ? LIMIT 20 + `); + + for (const entity of uniqueEntities) { + const needle = normalizeName(entity); + + // try exact match first + const exact = byNormalizedName.get(needle) || byTicker.get(needle) || byAlias.get(needle); + if (exact) { + exactCache.set(entity, exact); + continue; + } + + // build candidate list via LIKE on each meaningful word + const words = needle.split(" ").filter(w => w.length >= 3 && !SKIP_WORDS.has(w)); + if (!words.length) { + llmCache.set(entity, null); + continue; + } + + const seen = new Map(); + for (const word of words) { + const rows = getCandidates.all(`%${word}%`); + for (const row of rows) { + if (!seen.has(row.id)) seen.set(row.id, row); + } + } + + const candidates = [...seen.values()].slice(0, 20); + + const resolved = await llmResolveEntity(entity, candidates, llmConfig); + llmCache.set(entity, resolved || null); + + if (resolved) { + console.log(`[graph] LLM resolved "${entity}" -> ${resolved.name}`); + } + } + + return { exactCache, llmCache }; +} + + +function resolveFromCache(entity, exactCache, llmCache) { + return exactCache.get(entity) ?? llmCache.get(entity) ?? null; +} + + function confidenceFromCount(count) { if (count >= 8) return "very_high"; if (count >= 4) return "high"; @@ -100,6 +215,7 @@ function mergeEventIds(existingJson, newJson) { async function runGraphWorker(archiveDb, intelligenceDb, config) { const loopDelay = config.workers?.graphWorkerLoopDelayMs ?? 90000; + const llmConfig = config.openRouter || {}; const getRelationshipFacts = intelligenceDb.prepare(` SELECT * FROM company_facts WHERE type = 'relationship' @@ -112,8 +228,6 @@ async function runGraphWorker(archiveDb, intelligenceDb, config) { `DELETE FROM worker_events WHERE worker = 'graph' AND completed_at < datetime('now', '-1 hour')` ); - const getCompanies = intelligenceDb.prepare("SELECT * FROM tracked_companies"); - const getCompanyById = intelligenceDb.prepare("SELECT * FROM tracked_companies WHERE id = ?"); const getExistingRel = intelligenceDb.prepare(` @@ -151,7 +265,9 @@ async function runGraphWorker(archiveDb, intelligenceDb, config) { while (true) { try { const facts = getRelationshipFacts.all(); - const companies = getCompanies.all(); + + // resolve all entities before entering the transaction + const { exactCache, llmCache } = await buildResolutionCache(facts, intelligenceDb, llmConfig); let upserted = 0; let reciprocals = 0; @@ -163,7 +279,7 @@ async function runGraphWorker(archiveDb, intelligenceDb, config) { const { toEntity, relationshipType } = parsed; - const resolved = resolveCompany(toEntity, companies); + const resolved = resolveFromCache(toEntity, exactCache, llmCache); const toCompanyId = resolved ? resolved.id : null; const existing = getExistingRel.get(fact.company_id, relationshipType, toEntity); @@ -175,7 +291,6 @@ async function runGraphWorker(archiveDb, intelligenceDb, config) { let finalEventIds; if (existing) { - // derive count from the fact itself, not accumulate — avoids compounding on re-runs finalCount = fact.confirmation_count; finalFirst = existing.first_seen_at; finalLast = fact.last_seen_at || now; @@ -243,7 +358,6 @@ async function runGraphWorker(archiveDb, intelligenceDb, config) { processAll(); - // record one event per edge upserted so the rate tracks actual work if (upserted > 0) { const insertMany = intelligenceDb.transaction((n) => { for (let i = 0; i < n; i++) recordEvents.run(); @@ -267,6 +381,32 @@ async function runGraphWorker(archiveDb, intelligenceDb, config) { } +function httpPost(url, body, headers) { + return new Promise((resolve, reject) => { + const lib = url.protocol === "https:" ? https : http; + + const req = lib.request({ + hostname: url.hostname, + port: url.port || (url.protocol === "https:" ? 443 : 80), + path: url.pathname + url.search, + method: "POST", + headers: { ...headers, "Content-Length": Buffer.byteLength(body) }, + }, (res) => { + let data = ""; + res.on("data", chunk => data += chunk); + res.on("end", () => { + if (res.statusCode >= 200 && res.statusCode < 300) resolve(data); + else reject(new Error(`LLM ${res.statusCode}: ${data.slice(0, 300)}`)); + }); + }); + + req.on("error", reject); + req.write(body); + req.end(); + }); +} + + function sleep(ms) { return new Promise(r => setTimeout(r, ms)); }