Duriin-API/workers/graphWorker.js

414 lines
12 KiB
JavaScript

// builds cross-company relationship graph from company_facts rows where type = 'relationship'
const https = require("https");
const http = require("http");
// Canonical relationship types accepted verbatim by parseRelationshipType.
const VALID_TYPES = ["supplier", "customer", "competitor", "partner", "investor", "dependency"];

// [substring, canonical type] pairs consulted in order when a raw type is not
// already canonical; the FIRST substring contained in the raw type wins, so the
// group order and keyword order below are significant.
const KEYWORD_MAP = [
  ["supplier", ["manufactur", "suppli", "distribut"]],
  ["competitor", ["compet", "rival"]],
  ["investor", ["invest", "fund", "backer"]],
  ["customer", ["customer", "client"]],
  ["dependency", ["depend"]],
  ["partner", ["partner", "collaborat", "joint"]],
].flatMap(([mapped, keywords]) => keywords.map((keyword) => [keyword, mapped]));

// Relationship type used for the mirrored edge written from the target company
// back to the source company. Built from pairs, each of which maps both ways.
const RECIPROCAL = Object.fromEntries(
  [
    ["supplier", "customer"],
    ["competitor", "competitor"],
    ["partner", "partner"],
    ["investor", "dependency"],
  ].flatMap(([left, right]) => [[left, right], [right, left]])
);
// Map a free-text relationship description onto one of VALID_TYPES.
// Exact canonical names (case/whitespace-insensitive) are returned as-is;
// otherwise the first KEYWORD_MAP substring found decides the type, and
// anything unrecognized (including empty/falsy input) defaults to "partner".
function parseRelationshipType(rawType) {
  const normalized = (rawType || "").toLowerCase().trim();
  if (VALID_TYPES.includes(normalized)) {
    return normalized;
  }
  const hit = KEYWORD_MAP.find(([keyword]) => normalized.includes(keyword));
  return hit ? hit[1] : "partner";
}
// Parse a relationship claim of the form "<entity> is a <type>" into
// { toEntity, relationshipType }.
// The claim is split at the FIRST " is a "; everything after it is the raw
// type (later " is a " occurrences stay part of the type text). Only when
// " is a " is absent is " is an " tried, so grammatical claims such as
// "Sequoia is an investor" also parse without changing results for claims
// that already parsed.
// Returns null for non-string input (e.g. a NULL claim column, which would
// otherwise throw and abort the whole graph cycle), when no separator is
// present, or when either side is empty after trimming.
function parseClaim(claim) {
  if (typeof claim !== "string") return null;
  for (const separator of [" is a ", " is an "]) {
    const at = claim.indexOf(separator);
    if (at === -1) continue;
    const toEntity = claim.slice(0, at).trim();
    const rawType = claim.slice(at + separator.length).trim();
    if (!toEntity || !rawType) return null;
    return {
      toEntity,
      relationshipType: parseRelationshipType(rawType),
    };
  }
  return null;
}
// Canonical form used for name / ticker / alias matching: lower-cased, every
// period removed, whitespace runs collapsed to a single space, ends trimmed.
// Falsy input yields "".
function normalizeName(s) {
  const lowered = (s || "").toLowerCase();
  const withoutDots = lowered.split(".").join("");
  return withoutDots
    .split(/\s+/)
    .filter((part) => part !== "")
    .join(" ");
}
// Tokens ignored when turning an entity name into LIKE search terms:
// articles, conjunctions and common corporate suffixes.
const SKIP_WORDS = new Set(
  "the and of a an inc corp ltd llc co plus".split(" ")
);
// Ask the LLM (OpenRouter chat completions) which of `candidates` the
// free-text `entity` refers to.
// Resolves to the chosen candidate row, or null when: there are no
// candidates, the HTTP call fails (logged as a warning), the reply is not
// JSON, the model answers "none"/"0", or the answer is not an in-range index.
// Model preference: llmConfig.cheapModel, then llmModel, then model.
async function llmResolveEntity(entity, candidates, llmConfig) {
  if (!candidates.length) return null;

  const numberedList = candidates
    .map((candidate, position) => `${position + 1}. ${candidate.name}`)
    .join("\n");
  const prompt = [
    `Which of the following companies, if any, does "${entity}" refer to?`,
    numberedList,
    `Reply with just the number of the match, or "none" if none apply. No explanation.`,
  ].join("\n");

  const payload = JSON.stringify({
    model: llmConfig.cheapModel || llmConfig.llmModel || llmConfig.model,
    messages: [{ role: "user", content: prompt }],
    temperature: 0,
  });
  const endpoint = new URL("https://openrouter.ai/api/v1/chat/completions");
  const requestHeaders = {
    "Content-Type": "application/json",
    "Authorization": `Bearer ${llmConfig.apiKey || ""}`,
  };

  let rawReply;
  try {
    rawReply = await httpPost(endpoint, payload, requestHeaders);
  } catch (err) {
    console.warn("[graph] LLM resolve failed:", err.message);
    return null;
  }

  let reply;
  try {
    reply = JSON.parse(rawReply);
  } catch (_) {
    // unparseable body: treat as "no match"
    return null;
  }

  const answer = (reply.choices?.[0]?.message?.content || "").trim().toLowerCase();
  if (answer === "none" || answer === "0") return null;

  const pick = parseInt(answer, 10);
  const inRange = !isNaN(pick) && pick >= 1 && pick <= candidates.length;
  return inRange ? candidates[pick - 1] : null;
}
// Pre-resolves every distinct target entity named in `facts` to a
// tracked_companies row BEFORE the write transaction starts (the LLM calls
// are async and cannot run inside the synchronous transaction).
//
// Per entity:
//   1. exact match on the normalized company name, normalized ticker, or a
//      normalized alias (first hit in that order);
//   2. otherwise collect up to 20 candidates with a LIKE query per meaningful
//      word of the entity (length >= 3, not in SKIP_WORDS) and let the LLM
//      pick one. Entities with no meaningful words are cached as unresolved.
//
// Returns { exactCache, llmCache }:
//   exactCache: Map<entity, companyRow> for exact hits
//   llmCache:   Map<entity, companyRow | null> for the rest (null = unresolved)
async function buildResolutionCache(facts, intelligenceDb, llmConfig) {
  const exactCache = new Map();
  const llmCache = new Map();

  const allCompanies = intelligenceDb.prepare("SELECT * FROM tracked_companies").all();

  const byNormalizedName = new Map();
  const byTicker = new Map();
  const byAlias = new Map();
  for (const co of allCompanies) {
    byNormalizedName.set(normalizeName(co.name), co);
    // Normalize tickers exactly like the lookup needle (which has its dots
    // stripped); a bare toLowerCase() would leave "brk.b" unmatchable.
    if (co.ticker) byTicker.set(normalizeName(co.ticker), co);

    let aliases = [];
    try {
      aliases = JSON.parse(co.aliases || "[]");
    } catch (_) {
      // malformed aliases JSON: best-effort, treat as no aliases
    }
    // A JSON object/number/string here would throw or iterate characters.
    if (!Array.isArray(aliases)) aliases = [];
    for (const alias of aliases) {
      if (typeof alias === "string") byAlias.set(normalizeName(alias), co);
    }
  }

  const uniqueEntities = [...new Set(
    facts.map((f) => parseClaim(f.claim)?.toEntity).filter(Boolean)
  )];

  // ESCAPE '\' makes '%' and '_' inside a word match literally instead of
  // acting as LIKE wildcards.
  const getCandidates = intelligenceDb.prepare(
    "SELECT * FROM tracked_companies WHERE name LIKE ? ESCAPE '\\' LIMIT 20"
  );
  const likeTerm = (word) => `%${word.replace(/[\\%_]/g, "\\$&")}%`;

  for (const entity of uniqueEntities) {
    const needle = normalizeName(entity);

    const exact = byNormalizedName.get(needle) || byTicker.get(needle) || byAlias.get(needle);
    if (exact) {
      exactCache.set(entity, exact);
      continue;
    }

    const words = needle.split(" ").filter((w) => w.length >= 3 && !SKIP_WORDS.has(w));
    if (!words.length) {
      llmCache.set(entity, null);
      continue;
    }

    // De-duplicate candidates across words by id, keeping first-seen order.
    const seen = new Map();
    for (const word of words) {
      for (const row of getCandidates.all(likeTerm(word))) {
        if (!seen.has(row.id)) seen.set(row.id, row);
      }
    }
    const candidates = [...seen.values()].slice(0, 20);

    const resolved = await llmResolveEntity(entity, candidates, llmConfig);
    llmCache.set(entity, resolved || null);
    if (resolved) {
      console.log(`[graph] LLM resolved "${entity}" -> ${resolved.name}`);
    }
  }

  return { exactCache, llmCache };
}
// Look up a pre-resolved company row for `entity`: the exact-match cache is
// consulted first, then the LLM cache. Returns null when neither holds a
// non-null/undefined value.
function resolveFromCache(entity, exactCache, llmCache) {
  for (const cache of [exactCache, llmCache]) {
    const hit = cache.get(entity);
    if (hit != null) return hit;
  }
  return null;
}
// Bucket a confirmation count into a confidence label:
// >= 8 "very_high", >= 4 "high", >= 2 "medium", anything else "low".
function confidenceFromCount(count) {
  const tiers = [
    [8, "very_high"],
    [4, "high"],
    [2, "medium"],
  ];
  const tier = tiers.find(([minimum]) => count >= minimum);
  return tier ? tier[1] : "low";
}
// Union two JSON-encoded event-id arrays, preserving first-seen order and
// dropping duplicates; returns the result as a JSON string.
// Missing, malformed, or non-array JSON (object, number, string) on either
// side is treated as an empty list. Previously a JSON object/number made the
// spread throw, and a JSON string was exploded into single characters.
function mergeEventIds(existingJson, newJson) {
  const asIdList = (json) => {
    try {
      const value = JSON.parse(json || "[]");
      return Array.isArray(value) ? value : [];
    } catch (_) {
      return [];
    }
  };
  const merged = new Set([...asIdList(existingJson), ...asIdList(newJson)]);
  return JSON.stringify([...merged]);
}
/**
 * Long-running loop that mirrors company_facts relationship claims into the
 * company_relationships graph table.
 *
 * Each cycle:
 *   1. reads every company_facts row with type = 'relationship';
 *   2. resolves each claim's target entity to a tracked company (exact match,
 *      then LIKE + LLM) before the write transaction, because resolution is async;
 *   3. in one transaction, inserts or updates the forward edge per fact and,
 *      when the target is a different tracked company, inserts the reciprocal
 *      edge from target back to source if it does not exist yet;
 *   4. records one worker_events row per upserted edge and prunes 'graph'
 *      events older than one hour.
 * Cycle errors are logged and the loop continues after the delay. The returned
 * promise never resolves.
 *
 * @param {*} archiveDb - accepted for signature compatibility; not used here.
 * @param {object} intelligenceDb - DB handle exposing prepare()/transaction()
 *   (presumably better-sqlite3 — confirm against the caller).
 * @param {object} config - reads config.workers.graphWorkerLoopDelayMs
 *   (default 90000 ms) and config.openRouter (LLM settings).
 */
async function runGraphWorker(archiveDb, intelligenceDb, config) {
  const loopDelay = config.workers?.graphWorkerLoopDelayMs ?? 90000;
  const llmConfig = config.openRouter || {};

  const getRelationshipFacts = intelligenceDb.prepare(`
    SELECT * FROM company_facts WHERE type = 'relationship'
  `);
  const recordEvents = intelligenceDb.prepare(
    `INSERT INTO worker_events (worker) VALUES ('graph')`
  );
  const pruneEvents = intelligenceDb.prepare(
    `DELETE FROM worker_events WHERE worker = 'graph' AND completed_at < datetime('now', '-1 hour')`
  );
  const getCompanyById = intelligenceDb.prepare("SELECT * FROM tracked_companies WHERE id = ?");
  const getExistingRel = intelligenceDb.prepare(`
    SELECT * FROM company_relationships
    WHERE from_company_id = ? AND relationship_type = ? AND to_entity = ?
  `);
  const insertRel = intelligenceDb.prepare(`
    INSERT INTO company_relationships
      (from_company_id, relationship_type, to_entity, to_company_id, confidence, confirmation_count, first_seen_at, last_seen_at, supporting_event_ids)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
  `);
  // to_company_id uses COALESCE so an edge first written while its target was
  // unresolved gets linked once resolution succeeds, while a later failed
  // resolution (null) never unlinks an existing link.
  const updateRel = intelligenceDb.prepare(`
    UPDATE company_relationships
    SET confirmation_count = ?,
        confidence = ?,
        last_seen_at = ?,
        supporting_event_ids = ?,
        to_company_id = COALESCE(?, to_company_id)
    WHERE from_company_id = ? AND relationship_type = ? AND to_entity = ?
  `);
  const getExistingReciprocal = intelligenceDb.prepare(`
    SELECT id FROM company_relationships
    WHERE from_company_id = ? AND relationship_type = ? AND to_entity = ?
  `);
  const insertReciprocal = intelligenceDb.prepare(`
    INSERT OR IGNORE INTO company_relationships
      (from_company_id, relationship_type, to_entity, to_company_id, confidence, confirmation_count, first_seen_at, last_seen_at, supporting_event_ids)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
  `);

  while (true) {
    try {
      const facts = getRelationshipFacts.all();

      // Resolution may call the LLM (async), so it must complete before the
      // synchronous transaction below.
      const { exactCache, llmCache } = await buildResolutionCache(facts, intelligenceDb, llmConfig);

      let upserted = 0;
      let reciprocals = 0;

      const processAll = intelligenceDb.transaction(() => {
        for (const fact of facts) {
          const parsed = parseClaim(fact.claim);
          if (!parsed) continue;
          const { toEntity, relationshipType } = parsed;
          const resolved = resolveFromCache(toEntity, exactCache, llmCache);
          const toCompanyId = resolved ? resolved.id : null;
          const existing = getExistingRel.get(fact.company_id, relationshipType, toEntity);
          const now = new Date().toISOString();

          let finalCount;
          let finalFirst;
          let finalLast;
          let finalEventIds;
          if (existing) {
            // The count is taken from the fact, not incremented, so
            // re-processing the same fact every cycle does not inflate it.
            // Same `|| 1` fallback as the insert path so a missing count never
            // overwrites the stored one with NULL.
            finalCount = fact.confirmation_count || 1;
            finalFirst = existing.first_seen_at;
            finalLast = fact.last_seen_at || now;
            finalEventIds = mergeEventIds(existing.supporting_event_ids, fact.supporting_event_ids);
            updateRel.run(
              finalCount,
              confidenceFromCount(finalCount),
              finalLast,
              finalEventIds,
              toCompanyId,
              fact.company_id,
              relationshipType,
              toEntity
            );
          } else {
            finalCount = fact.confirmation_count || 1;
            finalFirst = fact.first_seen_at || now;
            finalLast = fact.last_seen_at || now;
            finalEventIds = fact.supporting_event_ids || "[]";
            insertRel.run(
              fact.company_id,
              relationshipType,
              toEntity,
              toCompanyId,
              confidenceFromCount(finalCount),
              finalCount,
              finalFirst,
              finalLast,
              finalEventIds
            );
          }
          upserted++;

          // Reciprocal edge — only where the target is a tracked company other
          // than the source itself; a fact resolving to its own company would
          // otherwise produce a self-loop mirror edge.
          if (!toCompanyId || toCompanyId === fact.company_id) continue;
          const fromCompany = getCompanyById.get(fact.company_id);
          if (!fromCompany) continue;
          const recipType = RECIPROCAL[relationshipType] || "partner";
          const recipToEntity = fromCompany.name;
          const recipExists = getExistingReciprocal.get(toCompanyId, recipType, recipToEntity);
          if (!recipExists) {
            insertReciprocal.run(
              toCompanyId,
              recipType,
              recipToEntity,
              fact.company_id,
              confidenceFromCount(finalCount),
              finalCount,
              finalFirst,
              finalLast,
              finalEventIds
            );
            reciprocals++;
          }
        }
      });
      processAll();

      if (upserted > 0) {
        // One worker_events row per upserted edge, written in one transaction.
        const insertMany = intelligenceDb.transaction((n) => {
          for (let i = 0; i < n; i++) recordEvents.run();
        });
        insertMany(upserted);
        pruneEvents.run();
      }

      if (upserted > 0 || reciprocals > 0) {
        console.log(`[graph] cycle complete — ${upserted} edges upserted, ${reciprocals} reciprocals added`);
      } else {
        console.log("[graph] cycle complete — no new data");
      }
    } catch (err) {
      console.error("[graph] cycle error:", err.message);
    }
    await sleep(loopDelay);
  }
}
/**
 * POST `body` to `url` and resolve with the response body decoded as UTF-8.
 *
 * @param {URL} url - request target; protocol "https:" selects https, anything else http.
 * @param {string} body - request payload; Content-Length is set from its byte length.
 * @param {object} headers - extra request headers.
 * @param {number} [timeoutMs=60000] - socket inactivity timeout. If the socket
 *   stays idle this long the request is destroyed and the promise rejects, so a
 *   stalled connection cannot hang the caller forever. Pass 0 to disable.
 * @returns {Promise<string>} the response body for 2xx statuses.
 * @throws {Error} "LLM <status>: <first 300 chars>" for non-2xx responses,
 *   the timeout error, or any transport/stream error.
 */
function httpPost(url, body, headers, timeoutMs = 60000) {
  return new Promise((resolve, reject) => {
    const lib = url.protocol === "https:" ? https : http;
    const req = lib.request({
      hostname: url.hostname,
      port: url.port || (url.protocol === "https:" ? 443 : 80),
      path: url.pathname + url.search,
      method: "POST",
      headers: { ...headers, "Content-Length": Buffer.byteLength(body) },
    }, (res) => {
      // Collect raw Buffers and decode once at the end: decoding chunk by
      // chunk corrupts multi-byte UTF-8 characters split across chunks.
      const chunks = [];
      res.on("data", (chunk) => chunks.push(chunk));
      res.on("error", reject);
      res.on("end", () => {
        const data = Buffer.concat(chunks).toString("utf8");
        if (res.statusCode >= 200 && res.statusCode < 300) resolve(data);
        else reject(new Error(`LLM ${res.statusCode}: ${data.slice(0, 300)}`));
      });
    });
    if (timeoutMs > 0) {
      req.setTimeout(timeoutMs, () => {
        req.destroy(new Error(`LLM request timed out after ${timeoutMs}ms`));
      });
    }
    req.on("error", reject);
    req.write(body);
    req.end();
  });
}
// Promise that resolves (with undefined) after `ms` milliseconds.
function sleep(ms) {
  return new Promise((resolve) => {
    setTimeout(resolve, ms);
  });
}
// Public API of this module: only the long-running graph worker loop is exported.
module.exports = { runGraphWorker };