Duriin-API/workers/signalWorker.js

244 lines
7.4 KiB
JavaScript

const https = require("https");
const http = require("http");
async function runSignalWorker(archiveDb, intelligenceDb, config) {
const loopDelay = config.workers?.signalLoopDelayMs ?? 120000;
const llmConfig = config.openRouter || {};
// add as_of column if it doesnt exist yet
try {
intelligenceDb.prepare("ALTER TABLE trade_signals ADD COLUMN as_of TEXT").run();
console.log("[signal] added as_of column to trade_signals");
} catch (_) {
// already exists
}
// all distinct event dates (by day) per company, newest first, that dont already have a signal
const getNextCheckpoint = intelligenceDb.prepare(`
SELECT company_id, substr(event_date, 1, 10) as checkpoint_date
FROM event_predictions
WHERE substr(event_date, 1, 10) NOT IN (
SELECT as_of FROM trade_signals WHERE as_of IS NOT NULL AND company_id = event_predictions.company_id
)
GROUP BY company_id, substr(event_date, 1, 10)
HAVING COUNT(*) >= 3
ORDER BY checkpoint_date DESC
LIMIT 1
`);
const getPredictions = intelligenceDb.prepare(`
SELECT type, direction, magnitude, timeframe, rationale, event_date, id
FROM event_predictions
WHERE company_id = ?
AND substr(event_date, 1, 10) <= ?
ORDER BY event_date DESC
LIMIT 50
`);
const getFacts = intelligenceDb.prepare(`
SELECT claim, type, confidence, confirmation_count
FROM company_facts
WHERE company_id = ?
AND first_seen_at <= ?
ORDER BY confirmation_count DESC
LIMIT 40
`);
const getRelationships = intelligenceDb.prepare(`
SELECT relationship_type, to_entity, confidence, confirmation_count
FROM company_relationships
WHERE from_company_id = ?
AND first_seen_at <= ?
ORDER BY confirmation_count DESC
LIMIT 20
`);
const getCompanyById = intelligenceDb.prepare("SELECT * FROM tracked_companies WHERE id = ?");
const insertSignal = intelligenceDb.prepare(`
INSERT INTO trade_signals
(company_id, signal, confidence, timeframe, risk_level, risk_factors, summary, key_drivers, supporting_prediction_ids, window_days, as_of)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`);
const recordEvent = intelligenceDb.prepare(
`INSERT INTO worker_events (worker) VALUES ('signal')`
);
const pruneEvents = intelligenceDb.prepare(
`DELETE FROM worker_events WHERE worker = 'signal' AND completed_at < datetime('now', '-1 hour')`
);
let pruneCounter = 0;
while (true) {
try {
const next = getNextCheckpoint.get();
if (!next) {
await sleep(5000);
continue;
}
const { company_id, checkpoint_date } = next;
const company = getCompanyById.get(company_id);
if (!company) {
continue;
}
const predictions = getPredictions.all(company_id, checkpoint_date);
const facts = getFacts.all(company_id, checkpoint_date);
const relationships = getRelationships.all(company_id, checkpoint_date);
const prompt = buildPrompt(company.name, facts, relationships, predictions, checkpoint_date);
let result;
try {
result = await callLlm(llmConfig, prompt);
} catch (err) {
console.error(`[signal] LLM error for ${company.name} @ ${checkpoint_date}:`, err.message);
continue;
}
if (!result) {
console.log(`[signal] ${company.name} @ ${checkpoint_date} — LLM returned null, skipping`);
continue;
}
const predictionIds = predictions.map(p => p.id);
insertSignal.run(
company_id,
result.signal,
result.confidence,
result.timeframe,
result.risk_level,
JSON.stringify(result.risk_factors || []),
result.summary,
JSON.stringify(result.key_drivers || []),
JSON.stringify(predictionIds),
null,
checkpoint_date
);
recordEvent.run();
pruneCounter++;
if (pruneCounter >= 20) { pruneEvents.run(); pruneCounter = 0; }
console.log(`[signal] ${company.name} @ ${checkpoint_date}${result.signal} (${result.confidence} confidence, ${result.risk_level} risk)`);
} catch (err) {
console.error("[signal] cycle error:", err.message);
}
}
}
function buildPrompt(companyName, facts, relationships, predictions, asOf) {
const factsBlock = facts.length > 0
? facts.map(f => `- ${f.claim} (confirmed ${f.confirmation_count}x)`).join("\n")
: "No known facts yet.";
const relBlock = relationships.length > 0
? relationships.map(r => `- ${r.relationship_type}: ${r.to_entity} (${r.confidence})`).join("\n")
: "No known relationships.";
const predBlock = predictions.map((p, i) =>
`${i + 1}. [${p.type}] ${p.direction} / ${p.magnitude} / ${p.timeframe}${p.rationale || "no rationale"}`
).join("\n");
return `You are a financial intelligence analyst generating a trade signal for ${companyName} as of ${asOf}.
Known facts about ${companyName} (most confirmed first):
${factsBlock}
Known relationships:
${relBlock}
Event predictions up to ${asOf}:
${predBlock}
Generate a trade signal as JSON with this exact shape:
{
"signal": "BUY | HOLD | SELL",
"confidence": "low | medium | high | very_high",
"timeframe": "short | medium | long",
"risk_level": "low | medium | high | very_high",
"risk_factors": ["string", ...],
"key_drivers": ["string", ...],
"summary": "2-3 sentence plain English summary"
}
Risk factors should be derived from:
- Supply chain concentration (heavy dependence on single suppliers)
- Geopolitical exposure (relationships with entities in sensitive regions)
- Competitive threats (strong competitors gaining ground)
- Regulatory exposure (themes mentioning regulation or export controls)
- Negative prediction patterns in recent events
Only output valid JSON. Always respond in English.`;
}
async function callLlm(llmConfig, prompt) {
const body = JSON.stringify({
model: llmConfig.llmModel || llmConfig.model,
messages: [{ role: "user", content: prompt }],
temperature: 0.1,
});
const url = new URL("https://openrouter.ai/api/v1/chat/completions");
const responseText = await httpPost(url, body, {
"Content-Type": "application/json",
"Authorization": `Bearer ${llmConfig.apiKey || ""}`,
});
let parsed;
try {
parsed = JSON.parse(responseText);
} catch (_) {
throw new Error(`LLM response not JSON: ${responseText.slice(0, 300)}`);
}
const content = parsed.choices?.[0]?.message?.content;
if (!content) return null;
const stripped = content.replace(/^```(?:json)?\s*/i, "").replace(/\s*```$/, "").trim();
return JSON.parse(stripped);
}
function httpPost(url, body, headers) {
return new Promise((resolve, reject) => {
const lib = url.protocol === "https:" ? https : http;
const req = lib.request({
hostname: url.hostname,
port: url.port || (url.protocol === "https:" ? 443 : 80),
path: url.pathname + url.search,
method: "POST",
headers: { ...headers, "Content-Length": Buffer.byteLength(body) },
}, (res) => {
let data = "";
res.on("data", chunk => data += chunk);
res.on("end", () => {
if (res.statusCode >= 200 && res.statusCode < 300) resolve(data);
else reject(new Error(`LLM ${res.statusCode}: ${data.slice(0, 300)}`));
});
});
req.on("error", reject);
req.write(body);
req.end();
});
}
function sleep(ms) {
return new Promise(r => setTimeout(r, ms));
}
module.exports = { runSignalWorker };