Duriin-API/intelligence/augorWorker.js

263 lines
8.7 KiB
JavaScript

const https = require("https");
const http = require("http");
const { findMatchedCompaniesByEmbedding } = require("./embeddings");
/**
 * Run the "augor" analysis worker loop.
 *
 * Each iteration does the following:
 * - Take one pending row from `article_queue` and load the article and its
 *   event from the archive DB.
 * - Match companies to the event with `findMatchedCompaniesByEmbedding`.
 * - Ask the LLM for per-company knowledge and predictions, and store them in
 *   `event_knowledge` / `event_predictions`.
 * - Mark every still-pending queue row of the event 'processed', so an event
 *   is analysed once rather than once per article.
 *
 * The DB handles are used through `prepare().get/all/run` and `transaction()`.
 * NOTE(review): that API matches better-sqlite3; confirm that is the driver
 * in use.
 *
 * @param {object} archiveDb - archive DB handle (`articles`, `events`).
 * @param {object} intelligenceDb - intelligence DB handle (queue, knowledge,
 *   predictions, company facts, worker events).
 * @param {object} config - reads these keys:
 *   - `workers.augorLoopDelayMs`: idle/error delay, default 1500.
 *   - `workers.augorMaxAttempts`: consecutive failures allowed for one queue
 *     row before it is parked as 'skipped', default 5.
 *   - `openRouter`: LLM settings passed to callLlm.
 * @returns {Promise<never>} loops forever; rejects only if preparing the SQL
 *   statements fails at startup.
 */
async function runAugorWorker(archiveDb, intelligenceDb, config) {
  const loopDelay = config.workers?.augorLoopDelayMs ?? 1500;
  // Without a cap, a row whose processing always throws is re-selected by
  // getPending forever and starves every other row in the queue.
  const maxAttempts = config.workers?.augorMaxAttempts ?? 5;
  const llmConfig = config.openRouter || {};

  // --- intelligence DB statements ---
  const getPending = intelligenceDb.prepare(
    "SELECT * FROM article_queue WHERE status = 'pending' LIMIT 1"
  );
  const recordEvent = intelligenceDb.prepare(
    "INSERT INTO worker_events (worker) VALUES ('augor')"
  );
  const pruneEvents = intelligenceDb.prepare(
    "DELETE FROM worker_events WHERE worker = 'augor' AND completed_at < datetime('now', '-1 hour')"
  );
  const setStatus = intelligenceDb.prepare(
    "UPDATE article_queue SET status = ?, updated_at = CURRENT_TIMESTAMP WHERE article_id = ?"
  );
  const setStatusByArticleId = intelligenceDb.prepare(`
    UPDATE article_queue SET status = 'processed', updated_at = CURRENT_TIMESTAMP
    WHERE article_id = ? AND status = 'pending'
  `);
  const deleteKnowledge = intelligenceDb.prepare(
    "DELETE FROM event_knowledge WHERE event_id = ?"
  );
  const deletePredictions = intelligenceDb.prepare(
    "DELETE FROM event_predictions WHERE event_id = ?"
  );
  const insertKnowledge = intelligenceDb.prepare(`
    INSERT INTO event_knowledge (event_id, company_id, type, data, event_date)
    VALUES (?, ?, ?, ?, ?)
  `);
  const insertPrediction = intelligenceDb.prepare(`
    INSERT INTO event_predictions (event_id, company_id, type, direction, magnitude, timeframe, rationale, event_date)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?)
  `);
  const getCompanyFacts = intelligenceDb.prepare(`
    SELECT claim, confidence, confirmation_count FROM company_facts
    WHERE company_id = ? ORDER BY confirmation_count DESC LIMIT 30
  `);

  // --- archive DB statements (prepared once, not on every loop iteration) ---
  const getArticle = archiveDb.prepare(
    "SELECT id, event_id, content, has_embedding FROM articles WHERE id = ?"
  );
  const getEvent = archiveDb.prepare("SELECT * FROM events WHERE id = ?");
  const getEventArticles = archiveDb.prepare(`
    SELECT id, title, description, content
    FROM articles
    WHERE event_id = ? AND content IS NOT NULL AND content != ''
    ORDER BY id ASC
    LIMIT 25
  `);
  const getEventArticleIds = archiveDb.prepare(
    "SELECT id FROM articles WHERE event_id = ?"
  );
  const getEventDate = archiveDb.prepare(`
    SELECT pub_date_effective FROM articles
    WHERE event_id = ? AND pub_date_effective IS NOT NULL
    ORDER BY pub_date_effective ASC LIMIT 1
  `);

  // Write one company's LLM result. It is called from inside replaceEventResults,
  // where a nested transaction runs as a SAVEPOINT. If an insert throws (e.g.
  // the LLM returned a value the driver cannot bind), only this company's rows
  // are rolled back.
  const writeCompanyResult = intelligenceDb.transaction((eventId, companyId, eventDate, result) => {
    const knowledgeByType = [
      ["relationship", result.knowledge?.relationships],
      ["theme", result.knowledge?.themes],
      ["factor", result.knowledge?.factors],
    ];
    for (const [type, items] of knowledgeByType) {
      for (const item of items || []) {
        insertKnowledge.run(eventId, companyId, type, JSON.stringify(item), eventDate);
      }
    }
    for (const p of result.predictions || []) {
      insertPrediction.run(eventId, companyId, p.type, p.direction, p.magnitude, p.timeframe, p.rationale, eventDate);
    }
  });

  // Swap in an event's new analysis atomically, so readers never observe the
  // event with its old rows deleted and the new ones not yet written.
  const replaceEventResults = intelligenceDb.transaction((eventId, eventDate, analyses) => {
    deleteKnowledge.run(eventId);
    deletePredictions.run(eventId);
    for (const { company, result } of analyses) {
      try {
        writeCompanyResult(eventId, company.id, eventDate, result);
      } catch (writeErr) {
        console.error(`[augor] write error for ${company.name} on event ${eventId}:`, writeErr.message);
      }
    }
  });

  // Mark every still-pending queue row that belongs to the event as processed.
  const markEventProcessed = (eventId) => {
    for (const row of getEventArticleIds.all(eventId)) setStatusByArticleId.run(row.id);
  };

  // Ask the LLM about each company in turn. If one company fails (facts lookup,
  // HTTP, or an unparseable reply), log it and drop only that company.
  const analyseCompanies = async (eventId, eventTitle, companies, articleText) => {
    const analyses = [];
    for (const company of companies) {
      try {
        const factsBlock = formatFactsBlock(company.name, getCompanyFacts.all(company.id));
        const result = await callLlm(llmConfig, buildPrompt(company.name, eventTitle, articleText, factsBlock));
        if (result) analyses.push({ company, result });
      } catch (llmErr) {
        console.error(`[augor] LLM error for ${company.name} on event ${eventId}:`, llmErr.message);
      }
    }
    return analyses;
  };

  let pruneCounter = 0;

  // Handle the event that owns one queue row. Expected skips return normally;
  // unexpected DB/embedding errors propagate to the loop's failure accounting.
  const processQueueRow = async (queueRow) => {
    const article = getArticle.get(queueRow.article_id);
    if (!article || !article.content || !article.has_embedding || !article.event_id) {
      setStatus.run("skipped", queueRow.article_id);
      return;
    }
    const eventId = article.event_id;
    const event = getEvent.get(eventId);
    if (!event) {
      setStatus.run("skipped", queueRow.article_id);
      return;
    }
    const eventArticles = getEventArticles.all(eventId);
    const matchedCompanies = findMatchedCompaniesByEmbedding(
      eventArticles.map((a) => a.id), archiveDb, intelligenceDb, config
    );
    if (matchedCompanies.length === 0) {
      markEventProcessed(eventId);
      console.log(`[augor] event ${eventId} — no company match, skipped`);
      return;
    }
    const eventDate = getEventDate.get(eventId)?.pub_date_effective ?? null;
    const articleText = buildArticleText(eventArticles);
    const analyses = await analyseCompanies(eventId, event.title, matchedCompanies, articleText);
    if (analyses.length > 0) {
      replaceEventResults(eventId, eventDate, analyses);
    } else {
      // Every LLM call failed; keep the previous analysis rather than wiping it.
      console.warn(`[augor] event ${eventId} — no usable LLM results, keeping previous analysis`);
    }
    markEventProcessed(eventId);
    recordEvent.run();
    pruneCounter++;
    if (pruneCounter >= 100) {
      pruneEvents.run();
      pruneCounter = 0;
    }
    console.log(`[augor] processed event ${eventId} (${matchedCompanies.length} companies, ${eventArticles.length} articles)`);
  };

  // Park a row that keeps failing so that it stops blocking the queue.
  const parkArticle = (articleId) => {
    try {
      setStatus.run("skipped", articleId);
      console.error(`[augor] article ${articleId} failed ${maxAttempts} times in a row — marked skipped`);
    } catch (parkErr) {
      console.error(`[augor] could not mark article ${articleId} skipped:`, parkErr.message);
    }
  };

  let failingArticleId = null;
  let failureCount = 0;
  while (true) {
    let articleId = null;
    try {
      const queueRow = getPending.get();
      if (!queueRow) {
        await sleep(loopDelay);
        continue;
      }
      articleId = queueRow.article_id;
      await processQueueRow(queueRow);
      failingArticleId = null;
      failureCount = 0;
    } catch (err) {
      console.error("[augor] error:", err?.message ?? err);
      if (articleId != null) {
        failureCount = articleId === failingArticleId ? failureCount + 1 : 1;
        failingArticleId = articleId;
        if (failureCount >= maxAttempts) {
          parkArticle(articleId);
          failingArticleId = null;
          failureCount = 0;
        }
      }
      await sleep(loopDelay);
    }
  }
}

/**
 * Join an event's articles into the text block embedded in the LLM prompt.
 * Each body is the article content (falling back to its description), cut to
 * the first 2000 UTF-16 code units.
 *
 * @param {Array<{title: string, content?: string, description?: string}>} eventArticles
 * @returns {string} "[Article N] title\nbody" sections separated by "\n\n---\n\n".
 */
function buildArticleText(eventArticles) {
  return eventArticles
    .map((a, i) => `[Article ${i + 1}] ${a.title}\n${(a.content || a.description || "").slice(0, 2000)}`)
    .join("\n\n---\n\n");
}

/**
 * Render known company facts as a prompt section.
 *
 * @param {string} companyName
 * @param {Array<{claim: string, confirmation_count: number}>} facts
 * @returns {string|null} the section text, or null when there are no facts.
 */
function formatFactsBlock(companyName, facts) {
  if (facts.length === 0) return null;
  const lines = facts.map((f) => `- ${f.claim} (confirmed ${f.confirmation_count} times)`).join("\n");
  return `Known facts about ${companyName}:\n${lines}`;
}
/**
 * Compose the analyst prompt for one company and one news event.
 *
 * @param {string} companyName - company the assessment is about.
 * @param {string} eventTitle - event headline.
 * @param {string} articleText - pre-formatted article bodies for the event.
 * @param {string|null} factsBlock - optional known-facts section; left out when falsy.
 * @returns {string} a prompt asking for a JSON-only reply with `knowledge` and
 *   `predictions` keys, lines separated by "\n".
 */
function buildPrompt(companyName, eventTitle, articleText, factsBlock) {
  // The facts section sits directly before the "Assess..." sentence, with a blank line after it.
  const preamble = factsBlock ? `${factsBlock}\n\n` : "";

  // Example reply shape shown to the model, one output line per entry.
  const schemaLines = [
    "{",
    '"knowledge": {',
    '"relationships": [',
    '{ "type": "supplier|customer|competitor", "entity": "string", "confidence": "high|medium|low", "evidence": "string" }',
    "],",
    '"themes": [',
    '{ "theme": "string", "direction": "increasing|stable|decreasing", "evidence": "string" }',
    "],",
    '"factors": [',
    '{ "factor": "string", "relationship": "string", "evidence": "string" }',
    "]",
    "},",
    '"predictions": [',
    '{ "type": "market_share|stock_price|competitive_position|other", "direction": "positive|negative|neutral", "magnitude": "high|medium|low", "timeframe": "short|medium|long", "rationale": "string" }',
    "]",
    "}",
  ];

  const promptLines = [
    `You are a financial intelligence analyst focused on ${companyName}. Always respond in English regardless of the language of the input articles.`,
    `${preamble}Assess the impact of the following news event on ${companyName} given what you already know about the company.`,
    `Event: ${eventTitle}`,
    `${articleText}`,
    "Return JSON only — no explanation. Shape:",
    ...schemaLines,
    "Only include claims directly supported by the articles. Use empty arrays if nothing applies.",
  ];
  return promptLines.join("\n");
}
/**
 * Send a single-turn chat completion to OpenRouter and parse the model's reply as JSON.
 *
 * @param {object} llmConfig - reads `llmModel` (falling back to `model`) and `apiKey`.
 * @param {string} prompt - user message content.
 * @returns {Promise<any|null>} the parsed JSON value from the reply, or null
 *   when the response carries no message content.
 * @throws {Error} any of the following:
 *   - transport or non-2xx errors from httpPost;
 *   - a response envelope that is not JSON;
 *   - an `error` object in the envelope;
 *   - reply content that is not JSON.
 */
async function callLlm(llmConfig, prompt) {
  const body = JSON.stringify({
    model: llmConfig.llmModel || llmConfig.model,
    messages: [{ role: "user", content: prompt }],
    temperature: 0.1,
  });
  const url = new URL("https://openrouter.ai/api/v1/chat/completions");
  const responseText = await httpPost(url, body, {
    "Content-Type": "application/json",
    "Authorization": `Bearer ${llmConfig.apiKey || ""}`,
  });
  let parsed;
  try {
    parsed = JSON.parse(responseText);
  } catch (e) {
    throw new Error(`LLM response not JSON: ${responseText.slice(0, 300)}`, { cause: e });
  }
  // OpenRouter reports failures as an `error` object in the body. Surface it
  // instead of treating the call as "no content".
  if (parsed?.error) {
    const detail = parsed.error.message ?? JSON.stringify(parsed.error);
    throw new Error(`LLM error: ${String(detail).slice(0, 300)}`);
  }
  const content = parsed?.choices?.[0]?.message?.content;
  if (!content) return null;
  return parseLlmJson(content);
}

/**
 * Parse the JSON value in an LLM reply. It tolerates surrounding whitespace,
 * a Markdown code fence (optionally tagged `json`), or prose around a single
 * top-level object.
 *
 * @param {string} content - raw message content from the model.
 * @returns {any} the parsed value.
 * @throws {Error} "LLM content not JSON: …" when no parseable JSON is found.
 */
function parseLlmJson(content) {
  // Trim first: `^`/`$` anchor to the string ends, so a leading/trailing
  // newline would otherwise stop the fence from being stripped.
  const unfenced = content
    .trim()
    .replace(/^```(?:json)?\s*/i, "")
    .replace(/\s*```$/, "")
    .trim();
  try {
    return JSON.parse(unfenced);
  } catch (firstErr) {
    // Fall back to the outermost {...} span, in case the model wrapped the object in prose.
    const start = unfenced.indexOf("{");
    const end = unfenced.lastIndexOf("}");
    if (start !== -1 && end > start) {
      try {
        return JSON.parse(unfenced.slice(start, end + 1));
      } catch {
        // Fall through to the descriptive error below.
      }
    }
    throw new Error(`LLM content not JSON: ${unfenced.slice(0, 300)}`, { cause: firstErr });
  }
}
/**
 * POST `body` to `url` and resolve with the response body as a UTF-8 string.
 *
 * @param {URL} url - target URL; `https:` uses the https module, anything else uses http.
 * @param {string} body - request payload; Content-Length is computed from it.
 * @param {object} headers - extra request headers.
 * @param {number} [timeoutMs=120000] - socket inactivity timeout. When no data
 *   moves for this long, the request is destroyed with an error (0 disables it).
 * @returns {Promise<string>} the response body for 2xx status codes.
 * @throws {Error} any of the following:
 *   - `LLM <status>: …` for non-2xx responses;
 *   - a timeout error;
 *   - any network or stream error.
 */
function httpPost(url, body, headers, timeoutMs = 120000) {
  return new Promise((resolve, reject) => {
    const lib = url.protocol === "https:" ? https : http;
    const req = lib.request({
      hostname: url.hostname,
      port: url.port || (url.protocol === "https:" ? 443 : 80),
      path: url.pathname + url.search,
      method: "POST",
      headers: { ...headers, "Content-Length": Buffer.byteLength(body) },
    }, (res) => {
      // Decode via the stream's string decoder. Calling toString() on each
      // Buffer chunk would mangle multi-byte UTF-8 characters split across chunks.
      res.setEncoding("utf8");
      let data = "";
      res.on("data", (chunk) => {
        data += chunk;
      });
      res.on("error", reject);
      res.on("end", () => {
        if (res.statusCode >= 200 && res.statusCode < 300) {
          resolve(data);
        } else {
          reject(new Error(`LLM ${res.statusCode}: ${data.slice(0, 300)}`));
        }
      });
    });
    // Without a timeout, a stalled connection would leave this promise (and the
    // worker awaiting it) pending forever.
    req.setTimeout(timeoutMs, () => {
      req.destroy(new Error(`LLM request timed out after ${timeoutMs}ms`));
    });
    req.on("error", reject);
    req.write(body);
    req.end();
  });
}
/**
 * Wait for roughly `ms` milliseconds.
 *
 * @param {number} ms - delay passed to setTimeout.
 * @returns {Promise<void>} resolves (with undefined) once the timer fires.
 */
function sleep(ms) {
  return new Promise((resolve) => {
    setTimeout(resolve, ms);
  });
}
// Only the worker entry point is exported; the prompt, LLM and HTTP helpers above stay module-private.
module.exports = { runAugorWorker };