Duriin-API/intelligence/augorWorker.js

220 lines
7.1 KiB
JavaScript

const https = require("https");
const http = require("http");
const { findMatchedCompaniesByEmbedding } = require("./embeddings");
/**
 * Queue worker for the "augor" intelligence pass. Loops forever:
 * 1. Take one pending row from `article_queue`.
 * 2. Load its article and event from the archive DB.
 * 3. Find matching companies via embeddings.
 * 4. Ask the LLM for structured knowledge/predictions for each company.
 * 5. Replace the event's rows in `event_knowledge` / `event_predictions`.
 *
 * All queue rows of the event are then marked processed, so an event is
 * analysed once rather than once per article. The returned promise never
 * resolves.
 *
 * @param {object} archiveDb - database handle (assumed better-sqlite3: prepare/get/all)
 *   holding the `articles` and `events` tables.
 * @param {object} intelligenceDb - database handle (assumed better-sqlite3, including
 *   `transaction()`) holding `article_queue`, `event_knowledge` and `event_predictions`.
 * @param {object} config - reads these keys:
 *   - `workers.augorLoopDelayMs`: idle/error back-off, default 1500 ms.
 *   - `workers.augorMaxAttempts`: failed attempts allowed per queue row before it
 *     is marked 'skipped', default 5.
 *   - `llm`: forwarded to callLlm.
 *   The whole object is also passed to findMatchedCompaniesByEmbedding.
 * @returns {Promise<never>}
 */
async function runAugorWorker(archiveDb, intelligenceDb, config) {
  const loopDelay = config.workers?.augorLoopDelayMs ?? 1500;
  const maxAttempts = config.workers?.augorMaxAttempts ?? 5;
  const llmConfig = config.llm || {};

  // Archive-side reads, prepared once instead of on every loop iteration.
  const getArticle = archiveDb.prepare(`
    SELECT id, event_id, content, has_embedding
    FROM articles WHERE id = ?
  `);
  const getEvent = archiveDb.prepare("SELECT * FROM events WHERE id = ?");
  const getEventArticles = archiveDb.prepare(`
    SELECT id, title, description, content
    FROM articles
    WHERE event_id = ? AND content IS NOT NULL AND content != ''
    ORDER BY id ASC
    LIMIT 25
  `);
  const getEventArticleIds = archiveDb.prepare(
    "SELECT id FROM articles WHERE event_id = ?"
  );

  // Intelligence-side statements.
  const getPending = intelligenceDb.prepare(`
    SELECT * FROM article_queue WHERE status = 'pending' LIMIT 1
  `);
  const setStatus = intelligenceDb.prepare(`
    UPDATE article_queue SET status = ?, updated_at = CURRENT_TIMESTAMP WHERE article_id = ?
  `);
  const setStatusByArticleId = intelligenceDb.prepare(`
    UPDATE article_queue SET status = 'processed', updated_at = CURRENT_TIMESTAMP
    WHERE article_id = ? AND status = 'pending'
  `);
  const deleteKnowledge = intelligenceDb.prepare(
    "DELETE FROM event_knowledge WHERE event_id = ?"
  );
  const deletePredictions = intelligenceDb.prepare(
    "DELETE FROM event_predictions WHERE event_id = ?"
  );
  const insertKnowledge = intelligenceDb.prepare(`
    INSERT INTO event_knowledge (event_id, company_id, type, data)
    VALUES (?, ?, ?, ?)
  `);
  const insertPrediction = intelligenceDb.prepare(`
    INSERT INTO event_predictions (event_id, company_id, type, direction, magnitude, timeframe, rationale)
    VALUES (?, ?, ?, ?, ?, ?, ?)
  `);

  // Marks every still-pending queue row that belongs to the event as processed.
  const markEventProcessed = (eventId) => {
    for (const { id } of getEventArticleIds.all(eventId)) setStatusByArticleId.run(id);
  };

  // Inserts one company's LLM result. It is only called from inside
  // replaceEventResults. Under better-sqlite3 semantics a nested transaction
  // runs as a savepoint, so a malformed result rolls back just this company's rows.
  const writeCompanyResult = intelligenceDb.transaction((eventId, companyId, result) => {
    for (const r of result.knowledge?.relationships || []) {
      insertKnowledge.run(eventId, companyId, "relationship", JSON.stringify(r));
    }
    for (const t of result.knowledge?.themes || []) {
      insertKnowledge.run(eventId, companyId, "theme", JSON.stringify(t));
    }
    for (const f of result.knowledge?.factors || []) {
      insertKnowledge.run(eventId, companyId, "factor", JSON.stringify(f));
    }
    for (const p of result.predictions || []) {
      // Fields the model omitted are stored as NULL instead of binding `undefined`.
      insertPrediction.run(
        eventId,
        companyId,
        p.type ?? null,
        p.direction ?? null,
        p.magnitude ?? null,
        p.timeframe ?? null,
        p.rationale ?? null,
      );
    }
  });

  // Swaps the event's old knowledge/predictions for the new results and marks
  // its queue rows processed, all in one transaction. Readers therefore never
  // see the event emptied or half-written while LLM calls are in flight.
  const replaceEventResults = intelligenceDb.transaction((eventId, results) => {
    deleteKnowledge.run(eventId);
    deletePredictions.run(eventId);
    for (const { company, result } of results) {
      try {
        writeCompanyResult(eventId, company.id, result);
      } catch (writeErr) {
        console.error(`[augor] write error for ${company.name} on event ${eventId}:`, writeErr.message);
      }
    }
    markEventProcessed(eventId);
  });

  // Handles one queue row end to end. Throws on database/matcher errors;
  // per-company LLM failures are logged and left out of the results.
  const processQueueRow = async (queueRow) => {
    const article = getArticle.get(queueRow.article_id);
    if (!article || !article.content || !article.has_embedding || !article.event_id) {
      setStatus.run("skipped", queueRow.article_id);
      return;
    }
    const eventId = article.event_id;
    const event = getEvent.get(eventId);
    if (!event) {
      setStatus.run("skipped", queueRow.article_id);
      return;
    }
    const eventArticles = getEventArticles.all(eventId);
    const matchedCompanies = findMatchedCompaniesByEmbedding(
      eventArticles.map((a) => a.id), archiveDb, intelligenceDb, config
    );
    if (matchedCompanies.length === 0) {
      markEventProcessed(eventId);
      console.log(`[augor] event ${eventId} — no company match, skipped`);
      return;
    }
    const articleText = eventArticles.map((a, i) => {
      const body = (a.content || a.description || "").slice(0, 2000);
      return `[Article ${i + 1}] ${a.title}\n${body}`;
    }).join("\n\n---\n\n");

    // Query the LLM for every company first. The database is only touched
    // once all answers are in.
    const results = [];
    for (const company of matchedCompanies) {
      try {
        const result = await callLlm(llmConfig, buildPrompt(company.name, event.title, articleText));
        if (result) results.push({ company, result });
      } catch (llmErr) {
        console.error(`[augor] LLM error for ${company.name} on event ${eventId}:`, llmErr.message);
      }
    }
    replaceEventResults(eventId, results);
    console.log(`[augor] processed event ${eventId} (${matchedCompanies.length} companies, ${eventArticles.length} articles)`);
  };

  // getPending has no ORDER BY, so a row that keeps throwing would typically be
  // selected again on every iteration and stall the whole queue. Count
  // failures per article and give up (status 'skipped') after maxAttempts.
  const failures = new Map();
  const recordFailure = (articleId) => {
    const attempts = (failures.get(articleId) ?? 0) + 1;
    if (attempts < maxAttempts) {
      failures.set(articleId, attempts);
      return;
    }
    failures.delete(articleId);
    try {
      setStatus.run("skipped", articleId);
      console.error(`[augor] article ${articleId} failed ${attempts} times, skipped`);
    } catch (statusErr) {
      console.error(`[augor] could not skip article ${articleId}:`, statusErr.message);
    }
  };

  while (true) {
    let queueRow;
    try {
      queueRow = getPending.get();
      if (!queueRow) {
        await sleep(loopDelay);
        continue;
      }
      await processQueueRow(queueRow);
      failures.delete(queueRow.article_id);
    } catch (err) {
      console.error("[augor] error:", err.message);
      if (queueRow) recordFailure(queueRow.article_id);
      await sleep(loopDelay);
    }
  }
}
/**
 * Assembles the single-turn analyst prompt for one company/event pair.
 *
 * @param {string} companyName - company the extraction should focus on.
 * @param {string} eventTitle - title of the event the articles belong to.
 * @param {string} articleText - pre-formatted article excerpts, inserted verbatim.
 * @returns {string} prompt text asking for a JSON object with `knowledge`
 *   (relationships, themes, factors) and `predictions`.
 */
function buildPrompt(companyName, eventTitle, articleText) {
  // JSON skeleton the model must fill in; one entry per prompt line.
  const responseShape = [
    '{',
    '"knowledge": {',
    '"relationships": [',
    '{ "type": "supplier|customer|competitor", "entity": "string", "confidence": "high|medium|low", "evidence": "string" }',
    '],',
    '"themes": [',
    '{ "theme": "string", "direction": "increasing|stable|decreasing", "evidence": "string" }',
    '],',
    '"factors": [',
    '{ "factor": "string", "relationship": "string", "evidence": "string" }',
    ']',
    '},',
    '"predictions": [',
    '{ "type": "market_share|stock_price|competitive_position|other", "direction": "positive|negative|neutral", "magnitude": "high|medium|low", "timeframe": "short|medium|long", "rationale": "string" }',
    ']',
    '}',
  ];
  const promptLines = [
    `You are a financial intelligence analyst. Extract structured knowledge about ${companyName} from these news articles.`,
    `Event: ${eventTitle}`,
    // Template interpolation (not raw array element) so null/undefined stringify exactly as before.
    `${articleText}`,
    "Return JSON only — no explanation. Shape:",
    ...responseShape,
    "Only include claims directly supported by the articles. Use empty arrays if nothing applies.",
  ];
  return promptLines.join("\n");
}
/**
 * Sends `prompt` as a single user message to an OpenAI-compatible
 * chat-completions endpoint and parses the model's reply as JSON.
 *
 * @param {object} llmConfig - supported keys:
 *   - `model`: default "gpt-4o-mini".
 *   - `baseUrl`: default "https://api.openai.com"; may already end in `/v1` and
 *     may carry a path prefix for proxies.
 *   - `apiKey`: sent as a Bearer token.
 *   - `timeoutMs`: optional; passed through to httpPost as its fourth argument.
 * @param {string} prompt - full prompt text.
 * @returns {Promise<any|null>} the parsed JSON value, or null when the response
 *   carries no message content.
 * @throws {SyntaxError} when the response body or the message content is not valid JSON;
 *   transport errors from httpPost propagate unchanged.
 */
async function callLlm(llmConfig, prompt) {
  const body = JSON.stringify({
    model: llmConfig.model || "gpt-4o-mini",
    messages: [{ role: "user", content: prompt }],
    temperature: 0.1,
  });
  const url = chatCompletionsUrl(llmConfig.baseUrl || "https://api.openai.com");
  const responseText = await httpPost(url, body, {
    "Content-Type": "application/json",
    "Authorization": `Bearer ${llmConfig.apiKey || ""}`,
  }, llmConfig.timeoutMs);
  const parsed = JSON.parse(responseText);
  const content = parsed.choices?.[0]?.message?.content;
  if (!content) return null;
  return parseJsonContent(content);
}

/**
 * Resolves the chat-completions endpoint for `baseUrl`. Any path prefix is kept
 * and a trailing `/v1` is not doubled, so ".../openai" and ".../openai/v1" both
 * map to ".../openai/v1/chat/completions". Query string and fragment are dropped.
 */
function chatCompletionsUrl(baseUrl) {
  const url = new URL(baseUrl);
  const prefix = url.pathname.replace(/\/+$/, "");
  url.pathname = prefix.endsWith("/v1") ? `${prefix}/chat/completions` : `${prefix}/v1/chat/completions`;
  url.search = "";
  url.hash = "";
  return url;
}

/**
 * Parses model output as JSON. Tolerates surrounding whitespace, a Markdown
 * code fence, and prose before or after a single top-level object.
 */
function parseJsonContent(content) {
  const stripped = content.trim()
    .replace(/^```(?:json)?\s*/i, "")
    .replace(/\s*```$/, "")
    .trim();
  try {
    return JSON.parse(stripped);
  } catch (err) {
    // Fall back to the outermost {...} span when the model wrapped the JSON in prose.
    const start = stripped.indexOf("{");
    const end = stripped.lastIndexOf("}");
    if (start === -1 || end <= start) throw err;
    return JSON.parse(stripped.slice(start, end + 1));
  }
}
/**
 * POSTs `body` to `url` and resolves with the response body as a UTF-8 string.
 *
 * @param {URL} url - absolute http: or https: URL; the protocol selects the client module.
 * @param {string} body - request payload; its UTF-8 byte length is sent as Content-Length.
 * @param {Object<string, string>} headers - extra request headers.
 * @param {number} [timeoutMs=120000] - socket inactivity timeout. The request is
 *   aborted if the socket is idle this long. A non-positive or null value disables it.
 * @returns {Promise<string>} response body for 2xx statuses.
 * @throws {Error} `LLM <status>: <first 300 chars of body>` for non-2xx statuses.
 *   Network, timeout and response-stream errors reject with the underlying error.
 */
function httpPost(url, body, headers, timeoutMs = 120000) {
  return new Promise((resolve, reject) => {
    const lib = url.protocol === "https:" ? https : http;
    // Passing the URL object lets Node derive host/port/path (incl. IPv6 literals).
    const req = lib.request(url, {
      method: "POST",
      headers: { ...headers, "Content-Length": Buffer.byteLength(body) },
    }, (res) => {
      // Stream decoding keeps multi-byte UTF-8 characters intact across chunk boundaries.
      res.setEncoding("utf8");
      let data = "";
      res.on("data", (chunk) => {
        data += chunk;
      });
      res.on("error", reject);
      res.on("end", () => {
        if (res.statusCode >= 200 && res.statusCode < 300) {
          resolve(data);
        } else {
          reject(new Error(`LLM ${res.statusCode}: ${data.slice(0, 300)}`));
        }
      });
    });
    if (timeoutMs > 0) {
      // Without a timeout a stalled connection would leave the caller awaiting forever.
      req.setTimeout(timeoutMs, () => {
        req.destroy(new Error(`LLM request timed out after ${timeoutMs}ms`));
      });
    }
    req.on("error", reject);
    req.write(body);
    req.end();
  });
}
/**
 * Returns a promise that settles with `undefined` after about `ms` milliseconds.
 * The worker loop uses it for idle polling and error back-off.
 *
 * @param {number} ms - delay in milliseconds, handed straight to setTimeout.
 * @returns {Promise<void>}
 */
function sleep(ms) {
  return new Promise((resolve) => {
    setTimeout(() => resolve(), ms);
  });
}
// Public surface of this module: only the worker loop is exported; the prompt
// builder, LLM client, HTTP helper and sleep stay module-private.
module.exports = { runAugorWorker };