// builds cross-company relationship graph from company_facts rows where type = 'relationship'
const https = require("https");
const http = require("http");

const VALID_TYPES = ["supplier", "customer", "competitor", "partner", "investor", "dependency"];

// Substring -> relationship type fallbacks for free-text types; checked in order, first hit wins.
const KEYWORD_MAP = [
  ["manufactur", "supplier"],
  ["suppli", "supplier"],
  ["distribut", "supplier"],
  ["compet", "competitor"],
  ["rival", "competitor"],
  ["invest", "investor"],
  ["fund", "investor"],
  ["backer", "investor"],
  ["customer", "customer"],
  ["client", "customer"],
  ["depend", "dependency"],
  ["partner", "partner"],
  ["collaborat", "partner"],
  ["joint", "partner"],
];

// Type written on the opposite edge (B -> A) when both ends of an A -> B edge are tracked companies.
const RECIPROCAL = {
  supplier: "customer",
  customer: "supplier",
  competitor: "competitor",
  partner: "partner",
  investor: "dependency",
  dependency: "investor",
};

// stopwords to skip when building LIKE terms
const SKIP_WORDS = new Set(["the", "and", "of", "a", "an", "inc", "corp", "ltd", "llc", "co", "plus"]);

const OPENROUTER_URL = "https://openrouter.ai/api/v1/chat/completions";

// Upper bound on socket inactivity for an LLM request, so a stalled call cannot hang the worker loop.
const LLM_TIMEOUT_MS = 30000;

/**
 * Maps a free-text relationship type onto one of VALID_TYPES.
 * Exact (case-insensitive) matches win, then the first KEYWORD_MAP substring hit;
 * anything unrecognised falls back to "partner".
 * @param {string} rawType
 * @returns {string} one of VALID_TYPES
 */
function parseRelationshipType(rawType) {
  const t = (rawType || "").toLowerCase().trim();
  if (VALID_TYPES.includes(t)) return t;
  for (const [kw, mapped] of KEYWORD_MAP) {
    if (t.includes(kw)) return mapped;
  }
  return "partner";
}

/**
 * Parses a relationship claim in the consolidationWorker format "<entity> is a <type>".
 * Only the first " is a " separates the entity; the rest is the raw type.
 * @param {string} claim
 * @returns {{toEntity: string, relationshipType: string} | null} null when the claim is missing,
 *   not a string, or not in the expected shape.
 */
function parseClaim(claim) {
  // A null/non-string claim must not throw: it would abort the whole cycle.
  if (typeof claim !== "string") return null;
  const parts = claim.split(" is a ");
  if (parts.length < 2) return null;
  const toEntity = parts[0].trim();
  const rawType = parts.slice(1).join(" is a ").trim();
  if (!toEntity || !rawType) return null;
  return {
    toEntity,
    relationshipType: parseRelationshipType(rawType),
  };
}

/**
 * Normalizes a company name/ticker/alias for exact-match lookups:
 * lowercases, strips dots, collapses whitespace, trims.
 * @param {*} s
 * @returns {string}
 */
function normalizeName(s) {
  return String(s ?? "").toLowerCase().replace(/\./g, "").replace(/\s+/g, " ").trim();
}

/**
 * Parses a JSON array column. Missing, unparseable or non-array values yield [],
 * so callers can always iterate/spread the result.
 * @param {string|null|undefined} json
 * @returns {Array}
 */
function parseJsonArray(json) {
  if (json == null || json === "") return [];
  try {
    const value = JSON.parse(json);
    return Array.isArray(value) ? value : [];
  } catch (_) {
    return [];
  }
}

/**
 * Escapes LIKE metacharacters so a search word matches literally (pair with ESCAPE '\').
 * @param {string} s
 * @returns {string}
 */
function escapeLike(s) {
  return s.replace(/[\\%_]/g, "\\$&");
}

/**
 * Interprets the LLM's reply to the numbered-candidate prompt built by llmResolveEntity.
 * Any reply mentioning "none" is treated as no match (conservative against false links).
 * @param {string} content raw message content
 * @param {Array<object>} candidates the list the prompt was numbered from (1-based)
 * @returns {object|null} the chosen candidate, or null for none/0/out-of-range/unparseable replies
 */
function parseLlmChoice(content, candidates) {
  const text = String(content ?? "").trim().toLowerCase();
  if (!text || /\bnone\b/.test(text)) return null;
  const digits = text.match(/\d+/);
  if (!digits) return null;
  const idx = Number.parseInt(digits[0], 10);
  return idx >= 1 && idx <= candidates.length ? candidates[idx - 1] : null;
}

/**
 * Asks the LLM which of `candidates` (if any) the free-text `entity` refers to.
 * @param {string} entity
 * @param {Array<{name: string}>} candidates
 * @param {object} llmConfig reads apiKey and cheapModel | llmModel | model
 * @returns {Promise<object|null|undefined>} the matched candidate; null when the LLM answered
 *   "no match" (or there is nothing to ask / no API key); undefined when the request or the
 *   response body failed, so callers can avoid memoizing a transient failure.
 */
async function llmResolveEntity(entity, candidates, llmConfig) {
  if (!candidates.length) return null;
  // Without a key every request is rejected; skip the round-trip instead of failing per entity.
  if (!llmConfig.apiKey) return null;

  const list = candidates.map((c, i) => `${i + 1}. ${c.name}`).join("\n");
  const prompt = `Which of the following companies, if any, does "${entity}" refer to?

${list}

Reply with just the number of the match, or "none" if none apply. No explanation.`;

  const body = JSON.stringify({
    model: llmConfig.cheapModel || llmConfig.llmModel || llmConfig.model,
    messages: [{ role: "user", content: prompt }],
    temperature: 0,
  });

  let responseText;
  try {
    responseText = await httpPost(new URL(OPENROUTER_URL), body, {
      "Content-Type": "application/json",
      "Authorization": `Bearer ${llmConfig.apiKey}`,
    }, LLM_TIMEOUT_MS);
  } catch (err) {
    console.warn("[graph] LLM resolve failed:", err.message);
    return undefined;
  }

  let parsed;
  try {
    parsed = JSON.parse(responseText);
  } catch (err) {
    console.warn("[graph] LLM resolve returned non-JSON:", err.message);
    return undefined;
  }

  return parseLlmChoice(parsed?.choices?.[0]?.message?.content, candidates);
}

/**
 * Pre-resolves every distinct entity named in `facts` before the write transaction starts
 * (the LLM call is async and cannot run inside the synchronous transaction).
 *
 * Exact matches on normalized name, ticker or alias win. Otherwise candidates are gathered with a
 * LIKE search per meaningful word (at most 20) and the LLM picks one.
 *
 * @param {Array<{claim: string}>} facts company_facts rows
 * @param {object} intelligenceDb DB handle exposing prepare().all() (assumed better-sqlite3 — confirm)
 * @param {object} llmConfig passed through to llmResolveEntity
 * @param {Map<string, *>} [llmMemo] optional memo of LLM answers that outlives this call. Keys are the
 *   normalized entity plus the candidate-id set, so a newly tracked candidate triggers a fresh
 *   question. Failed requests are never memoized; entries not used by this call are evicted.
 * @returns {Promise<{exactCache: Map, llmCache: Map}>} raw entity -> company row (llmCache may map to null)
 */
async function buildResolutionCache(facts, intelligenceDb, llmConfig, llmMemo = new Map()) {
  const exactCache = new Map(); // entity -> company row (from exact match)
  const llmCache = new Map();   // entity -> company row or null (from LLM)

  const allCompanies = intelligenceDb.prepare("SELECT * FROM tracked_companies").all();

  // Exact-match lookups; all keys go through normalizeName so they compare against the same needle.
  const byNormalizedName = new Map();
  const byTicker = new Map();
  const byAlias = new Map();
  const addKey = (map, raw, co) => {
    const key = normalizeName(raw);
    if (key) map.set(key, co);
  };
  for (const co of allCompanies) {
    addKey(byNormalizedName, co.name, co);
    if (co.ticker) addKey(byTicker, co.ticker, co);
    for (const alias of parseJsonArray(co.aliases)) {
      if (typeof alias === "string") addKey(byAlias, alias, co);
    }
  }

  const uniqueEntities = [...new Set(
    facts.map((f) => parseClaim(f.claim)?.toEntity).filter(Boolean)
  )];

  const getCandidates = intelligenceDb.prepare(`
    SELECT * FROM tracked_companies
    WHERE name LIKE ? ESCAPE '\\'
    LIMIT 20
  `);

  const usedMemoKeys = new Set();

  for (const entity of uniqueEntities) {
    const needle = normalizeName(entity);

    // try exact match first
    const exact = byNormalizedName.get(needle) || byTicker.get(needle) || byAlias.get(needle);
    if (exact) {
      exactCache.set(entity, exact);
      continue;
    }

    // build candidate list via LIKE on each meaningful word
    const words = needle.split(" ").filter((w) => w.length >= 3 && !SKIP_WORDS.has(w));
    if (!words.length) {
      llmCache.set(entity, null);
      continue;
    }

    const seen = new Map();
    for (const word of words) {
      for (const row of getCandidates.all(`%${escapeLike(word)}%`)) {
        if (!seen.has(row.id)) seen.set(row.id, row);
      }
    }
    const candidates = [...seen.values()].slice(0, 20);
    if (!candidates.length) {
      llmCache.set(entity, null);
      continue;
    }

    const memoKey = `${needle}\u0000${candidates.map((c) => String(c.id)).sort().join(",")}`;
    usedMemoKeys.add(memoKey);

    let resolved;
    if (llmMemo.has(memoKey)) {
      const memoId = llmMemo.get(memoKey);
      resolved = memoId == null ? null : candidates.find((c) => c.id === memoId) ?? null;
    } else {
      resolved = await llmResolveEntity(entity, candidates, llmConfig);
      if (resolved !== undefined) {
        llmMemo.set(memoKey, resolved ? resolved.id : null);
        if (resolved) {
          console.log(`[graph] LLM resolved "${entity}" -> ${resolved.name}`);
        }
      }
    }
    llmCache.set(entity, resolved || null);
  }

  // Drop memo entries whose entity/candidate set no longer occurs, so the memo cannot grow unbounded.
  for (const key of llmMemo.keys()) {
    if (!usedMemoKeys.has(key)) llmMemo.delete(key);
  }

  return { exactCache, llmCache };
}

/**
 * Looks up a pre-resolved entity; exact matches take precedence over LLM matches.
 * @returns {object|null} company row or null
 */
function resolveFromCache(entity, exactCache, llmCache) {
  return exactCache.get(entity) ?? llmCache.get(entity) ?? null;
}

/**
 * Buckets a confirmation count into a confidence label.
 * @param {number} count
 * @returns {"very_high"|"high"|"medium"|"low"}
 */
function confidenceFromCount(count) {
  if (count >= 8) return "very_high";
  if (count >= 4) return "high";
  if (count >= 2) return "medium";
  return "low";
}

/**
 * Unions two JSON arrays of event ids, keeping first-seen order and dropping duplicates.
 * Invalid or non-array JSON on either side is treated as empty.
 * @param {string|null} existingJson
 * @param {string|null} newJson
 * @returns {string} JSON array string
 */
function mergeEventIds(existingJson, newJson) {
  const merged = new Set([...parseJsonArray(existingJson), ...parseJsonArray(newJson)]);
  return JSON.stringify([...merged]);
}

/**
 * Worker loop: every cycle reads all relationship facts, resolves their target entities to tracked
 * companies, upserts company_relationships edges (plus reciprocal edges between tracked companies),
 * records one worker_events row per changed/inserted edge, and prunes events older than an hour.
 * Never returns; errors in a cycle are logged and the loop continues after the delay.
 *
 * @param {*} archiveDb not used by this worker
 * @param {object} intelligenceDb DB handle with prepare/transaction (assumed better-sqlite3 — confirm)
 * @param {object} config reads workers.graphWorkerLoopDelayMs (default 90000) and openRouter
 */
async function runGraphWorker(archiveDb, intelligenceDb, config) {
  const loopDelay = config.workers?.graphWorkerLoopDelayMs ?? 90000;
  const llmConfig = config.openRouter || {};
  // LLM answers survive across cycles so unresolved entities are not re-asked every loop.
  const llmMemo = new Map();

  const getRelationshipFacts = intelligenceDb.prepare(`
    SELECT * FROM company_facts WHERE type = 'relationship'
  `);
  const recordEvents = intelligenceDb.prepare(
    `INSERT INTO worker_events (worker) VALUES ('graph')`
  );
  const pruneEvents = intelligenceDb.prepare(
    `DELETE FROM worker_events WHERE worker = 'graph' AND completed_at < datetime('now', '-1 hour')`
  );
  const getCompanyById = intelligenceDb.prepare("SELECT * FROM tracked_companies WHERE id = ?");
  const getExistingRel = intelligenceDb.prepare(`
    SELECT * FROM company_relationships
    WHERE from_company_id = ? AND relationship_type = ? AND to_entity = ?
  `);
  const insertRel = intelligenceDb.prepare(`
    INSERT INTO company_relationships
      (from_company_id, relationship_type, to_entity, to_company_id,
       confidence, confirmation_count, first_seen_at, last_seen_at, supporting_event_ids)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
  `);
  const updateRel = intelligenceDb.prepare(`
    UPDATE company_relationships
    SET confirmation_count = ?, confidence = ?, last_seen_at = ?, supporting_event_ids = ?,
        to_company_id = ?
    WHERE from_company_id = ? AND relationship_type = ? AND to_entity = ?
  `);
  const insertReciprocal = intelligenceDb.prepare(`
    INSERT OR IGNORE INTO company_relationships
      (from_company_id, relationship_type, to_entity, to_company_id,
       confidence, confirmation_count, first_seen_at, last_seen_at, supporting_event_ids)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
  `);

  // Applies one fact: upserts its A -> B edge and, when B is tracked, the reciprocal B -> A edge.
  // Only inserts and real changes count as "upserted", so an unchanged graph reports no new data.
  const applyFact = (fact, exactCache, llmCache, stats) => {
    const parsed = parseClaim(fact.claim);
    if (!parsed) return;
    const { toEntity, relationshipType } = parsed;

    const resolved = resolveFromCache(toEntity, exactCache, llmCache);
    const existing = getExistingRel.get(fact.company_id, relationshipType, toEntity);
    const now = new Date().toISOString();
    // Keep a previously resolved link when this cycle could not resolve the entity (e.g. LLM down).
    const toCompanyId = resolved ? resolved.id : (existing?.to_company_id ?? null);

    let finalCount;
    let finalFirst;
    let finalLast;
    let finalEventIds;

    if (existing) {
      // The fact carries the consolidated total, so take it as-is instead of adding to the stored count.
      finalCount = fact.confirmation_count || existing.confirmation_count || 1;
      finalFirst = existing.first_seen_at;
      finalLast = fact.last_seen_at || existing.last_seen_at || now;
      finalEventIds = mergeEventIds(existing.supporting_event_ids, fact.supporting_event_ids);
      const confidence = confidenceFromCount(finalCount);

      const changed =
        existing.confirmation_count !== finalCount ||
        existing.confidence !== confidence ||
        existing.last_seen_at !== finalLast ||
        existing.supporting_event_ids !== finalEventIds ||
        existing.to_company_id !== toCompanyId;

      if (changed) {
        updateRel.run(
          finalCount, confidence, finalLast, finalEventIds, toCompanyId,
          fact.company_id, relationshipType, toEntity
        );
        stats.upserted++;
      }
    } else {
      finalCount = fact.confirmation_count || 1;
      finalFirst = fact.first_seen_at || now;
      finalLast = fact.last_seen_at || now;
      finalEventIds = mergeEventIds(null, fact.supporting_event_ids);
      insertRel.run(
        fact.company_id, relationshipType, toEntity, toCompanyId,
        confidenceFromCount(finalCount), finalCount, finalFirst, finalLast, finalEventIds
      );
      stats.upserted++;
    }

    // reciprocal edge — only where both companies are tracked, and never a company pointing at itself
    if (toCompanyId == null || toCompanyId === fact.company_id) return;
    const fromCompany = getCompanyById.get(fact.company_id);
    if (!fromCompany) return;

    const recipType = RECIPROCAL[relationshipType] || "partner";
    const recipToEntity = fromCompany.name;
    if (getExistingRel.get(toCompanyId, recipType, recipToEntity)) return;
    // A link kept from a previous cycle may point at a company that has since been removed.
    if (!getCompanyById.get(toCompanyId)) return;

    insertReciprocal.run(
      toCompanyId, recipType, recipToEntity, fact.company_id,
      confidenceFromCount(finalCount), finalCount, finalFirst, finalLast, finalEventIds
    );
    stats.reciprocals++;
  };

  // Created once; better-sqlite3 transaction wrappers forward their arguments.
  const processAll = intelligenceDb.transaction((facts, exactCache, llmCache, stats) => {
    for (const fact of facts) applyFact(fact, exactCache, llmCache, stats);
  });
  const recordEventBatch = intelligenceDb.transaction((n) => {
    for (let i = 0; i < n; i++) recordEvents.run();
  });

  while (true) {
    try {
      const facts = getRelationshipFacts.all();

      // resolve all entities before entering the transaction
      const { exactCache, llmCache } = await buildResolutionCache(facts, intelligenceDb, llmConfig, llmMemo);

      const stats = { upserted: 0, reciprocals: 0 };
      processAll(facts, exactCache, llmCache, stats);

      if (stats.upserted > 0) recordEventBatch(stats.upserted);
      // Prune every cycle so old events expire even when nothing changed.
      pruneEvents.run();

      if (stats.upserted > 0 || stats.reciprocals > 0) {
        console.log(`[graph] cycle complete — ${stats.upserted} edges upserted, ${stats.reciprocals} reciprocals added`);
      } else {
        console.log("[graph] cycle complete — no new data");
      }
    } catch (err) {
      console.error("[graph] cycle error:", err?.message ?? err);
    }

    await sleep(loopDelay);
  }
}

/**
 * POSTs `body` to `url` and resolves with the response text for 2xx responses.
 * @param {URL} url
 * @param {string} body
 * @param {object} headers Content-Length is added automatically
 * @param {number} [timeoutMs] socket inactivity limit; the request is destroyed when exceeded
 * @returns {Promise<string>}
 * @throws rejects on network errors, timeouts, and non-2xx status ("LLM <status>: <first 300 chars>")
 */
function httpPost(url, body, headers, timeoutMs = LLM_TIMEOUT_MS) {
  return new Promise((resolve, reject) => {
    const isHttps = url.protocol === "https:";
    const lib = isHttps ? https : http;
    const req = lib.request({
      hostname: url.hostname,
      port: url.port || (isHttps ? 443 : 80),
      path: url.pathname + url.search,
      method: "POST",
      headers: { ...headers, "Content-Length": Buffer.byteLength(body) },
    }, (res) => {
      // Decode as a stream so multi-byte UTF-8 characters split across chunks stay intact.
      res.setEncoding("utf8");
      let data = "";
      res.on("data", (chunk) => { data += chunk; });
      res.on("error", reject);
      res.on("end", () => {
        if (res.statusCode >= 200 && res.statusCode < 300) resolve(data);
        else reject(new Error(`LLM ${res.statusCode}: ${data.slice(0, 300)}`));
      });
    });
    req.setTimeout(timeoutMs, () => {
      req.destroy(new Error(`LLM request timed out after ${timeoutMs}ms`));
    });
    req.on("error", reject);
    req.end(body);
  });
}

function sleep(ms) {
  return new Promise((r) => setTimeout(r, ms));
}

module.exports = { runGraphWorker };