const https = require("https");
const http = require("http");

// Number of worker loops run side by side inside this process.
const CONCURRENCY = 4;
// How long a worker idles when it finds no unclaimed checkpoint.
const IDLE_POLL_MS = 5000;
// How long a worker holds its claim after a failed attempt before releasing it
// for retry, so a persistently failing checkpoint cannot be retried in a tight loop.
const FAILURE_BACKOFF_MS = 5000;
// Default upper bound on socket inactivity for a single LLM HTTP request.
const HTTP_TIMEOUT_MS = 120000;

/**
 * Continuously generates trade signals for (company, day) checkpoints that
 * have at least three event predictions and no stored signal yet.
 *
 * Starts CONCURRENCY worker loops that share an in-process claim set. Each
 * loop claims a checkpoint, gathers predictions/facts/relationships up to the
 * checkpoint date, asks the LLM for a signal and stores it in `trade_signals`.
 * The worker loops never terminate, so the returned promise does not resolve
 * under normal operation.
 *
 * @param {object} archiveDb - Unused here; kept for interface compatibility.
 * @param {object} intelligenceDb - Database handle exposing a synchronous
 *   `prepare(sql)` returning statements with `run`/`get`/`all`
 *   (NOTE(review): looks like better-sqlite3 — confirm).
 * @param {object} config - Configuration; `config.openRouter` is passed to callLlm.
 * @returns {Promise<void>}
 */
async function runSignalWorker(archiveDb, intelligenceDb, config) {
  const llmConfig = config.openRouter || {};

  // Add the as_of column if it does not exist yet.
  try {
    intelligenceDb.prepare("ALTER TABLE trade_signals ADD COLUMN as_of TEXT").run();
    console.log("[signal] added as_of column to trade_signals");
  } catch (err) {
    // "duplicate column" is the expected outcome on every run after the first;
    // anything else (e.g. missing table) is surfaced instead of being hidden.
    const message = String(err && err.message);
    if (!/duplicate column/i.test(message)) {
      console.warn("[signal] could not add as_of column to trade_signals:", message);
    }
  }

  // Distinct event days per company (newest first) that do not have a signal
  // yet and carry at least three predictions. Prepared once and reused.
  const getCandidates = intelligenceDb.prepare(`
    SELECT company_id, substr(event_date, 1, 10) as checkpoint_date
    FROM event_predictions
    WHERE substr(event_date, 1, 10) NOT IN (
      SELECT as_of FROM trade_signals
      WHERE as_of IS NOT NULL AND company_id = event_predictions.company_id
    )
    GROUP BY company_id, substr(event_date, 1, 10)
    HAVING COUNT(*) >= 3
    ORDER BY checkpoint_date DESC
    LIMIT 20
  `);

  const getPredictions = intelligenceDb.prepare(`
    SELECT type, direction, magnitude, timeframe, rationale, event_date, id
    FROM event_predictions
    WHERE company_id = ? AND substr(event_date, 1, 10) <= ?
    ORDER BY event_date DESC
    LIMIT 50
  `);

  const getFacts = intelligenceDb.prepare(`
    SELECT claim, type, confidence, confirmation_count
    FROM company_facts
    WHERE company_id = ? AND first_seen_at <= ?
    ORDER BY confirmation_count DESC
    LIMIT 40
  `);

  const getRelationships = intelligenceDb.prepare(`
    SELECT relationship_type, to_entity, confidence, confirmation_count
    FROM company_relationships
    WHERE from_company_id = ? AND first_seen_at <= ?
    ORDER BY confirmation_count DESC
    LIMIT 20
  `);

  const getCompanyById = intelligenceDb.prepare("SELECT * FROM tracked_companies WHERE id = ?");

  const insertSignal = intelligenceDb.prepare(`
    INSERT INTO trade_signals
      (company_id, signal, confidence, timeframe, risk_level, risk_factors, summary, key_drivers, supporting_prediction_ids, window_days, as_of)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
  `);

  const recordEvent = intelligenceDb.prepare(
    `INSERT INTO worker_events (worker) VALUES ('signal')`
  );

  const pruneEvents = intelligenceDb.prepare(
    `DELETE FROM worker_events WHERE worker = 'signal' AND completed_at < datetime('now', '-1 hour')`
  );

  // In-process claim set: stops the worker loops from grabbing the same checkpoint.
  const inFlight = new Set();
  // Checkpoints whose company row is missing. They can never be processed, so
  // they are skipped permanently instead of being re-claimed on every pass.
  const unprocessable = new Set();
  let pruneCounter = 0;

  // Claims the newest candidate that is neither in flight nor unprocessable.
  // Returns { company_id, checkpoint_date, key } or null when nothing is available.
  function claimNext() {
    for (const candidate of getCandidates.all()) {
      const key = `${candidate.company_id}|${candidate.checkpoint_date}`;
      if (!inFlight.has(key) && !unprocessable.has(key)) {
        inFlight.add(key);
        return { company_id: candidate.company_id, checkpoint_date: candidate.checkpoint_date, key };
      }
    }
    return null;
  }

  // Processes one claimed checkpoint. Resolves to true when the checkpoint is
  // settled (signal stored, or permanently skipped) and false when the attempt
  // failed and should be retried after a backoff. Database errors propagate.
  async function processCheckpoint(id, claim) {
    const { company_id, checkpoint_date, key } = claim;

    const company = getCompanyById.get(company_id);
    if (!company) {
      unprocessable.add(key);
      console.warn(`[signal:${id}] no tracked company ${company_id} for checkpoint ${checkpoint_date}, skipping`);
      return true;
    }

    const predictions = getPredictions.all(company_id, checkpoint_date);
    const facts = getFacts.all(company_id, checkpoint_date);
    const relationships = getRelationships.all(company_id, checkpoint_date);
    const prompt = buildPrompt(company.name, facts, relationships, predictions, checkpoint_date);

    let result;
    try {
      result = await callLlm(llmConfig, prompt);
    } catch (err) {
      console.error(`[signal:${id}] LLM error for ${company.name} @ ${checkpoint_date}:`, err.message);
      return false;
    }

    if (!result) {
      console.log(`[signal:${id}] ${company.name} @ ${checkpoint_date} — LLM returned null, skipping`);
      return false;
    }

    // Reject output that parsed as JSON but is not a usable signal object, so
    // no undefined values reach the insert statement.
    if (typeof result !== "object" || typeof result.signal !== "string") {
      console.error(`[signal:${id}] ${company.name} @ ${checkpoint_date} — LLM returned malformed signal, skipping`);
      return false;
    }

    const predictionIds = predictions.map((p) => p.id);
    insertSignal.run(
      company_id,
      result.signal,
      result.confidence ?? null,
      result.timeframe ?? null,
      result.risk_level ?? null,
      JSON.stringify(result.risk_factors || []),
      result.summary ?? null,
      JSON.stringify(result.key_drivers || []),
      JSON.stringify(predictionIds),
      null,
      checkpoint_date
    );

    recordEvent.run();
    pruneCounter++;
    if (pruneCounter >= 20) {
      pruneEvents.run();
      pruneCounter = 0;
    }

    console.log(`[signal:${id}] ${company.name} @ ${checkpoint_date} — ${result.signal} (${result.confidence} confidence, ${result.risk_level} risk)`);
    return true;
  }

  // One worker: claim, process, release, repeat. Every pass either awaits the
  // LLM call or a sleep, so the loop always yields to the event loop.
  async function workerLoop(id) {
    while (true) {
      let claim = null;
      let settled = false;
      try {
        claim = claimNext();
        if (claim) {
          settled = await processCheckpoint(id, claim);
        }
      } catch (err) {
        console.error(`[signal:${id}] cycle error:`, err.message);
      }

      if (!claim) {
        await sleep(IDLE_POLL_MS);
        continue;
      }

      // Keep the claim during the backoff so sibling workers do not retry the
      // same failing checkpoint straight away, then always release it.
      if (!settled) {
        await sleep(FAILURE_BACKOFF_MS);
      }
      inFlight.delete(claim.key);
    }
  }

  // Spin up CONCURRENCY workers.
  const workers = [];
  for (let i = 0; i < CONCURRENCY; i++) {
    workers.push(workerLoop(i + 1));
  }

  await Promise.all(workers);
}

/**
 * Builds the LLM prompt asking for a trade signal as of a given date.
 *
 * @param {string} companyName - Company name interpolated into the prompt.
 * @param {Array<object>} facts - Rows with `claim` and `confirmation_count`.
 * @param {Array<object>} relationships - Rows with `relationship_type`, `to_entity`, `confidence`.
 * @param {Array<object>} predictions - Rows with `type`, `direction`, `magnitude`, `timeframe`, `rationale`.
 * @param {string} asOf - Checkpoint date shown in the prompt.
 * @returns {string} The complete prompt text.
 */
function buildPrompt(companyName, facts, relationships, predictions, asOf) {
  const factsBlock = facts.length > 0
    ? facts.map((f) => `- ${f.claim} (confirmed ${f.confirmation_count}x)`).join("\n")
    : "No known facts yet.";

  const relBlock = relationships.length > 0
    ? relationships.map((r) => `- ${r.relationship_type}: ${r.to_entity} (${r.confidence})`).join("\n")
    : "No known relationships.";

  // Same fallback treatment as the other sections, so the prompt never
  // contains an empty block.
  const predBlock = predictions.length > 0
    ? predictions.map((p, i) =>
        `${i + 1}. [${p.type}] ${p.direction} / ${p.magnitude} / ${p.timeframe} — ${p.rationale || "no rationale"}`
      ).join("\n")
    : "No predictions available.";

  return `You are a financial intelligence analyst generating a trade signal for ${companyName} as of ${asOf}.

Known facts about ${companyName} (most confirmed first):
${factsBlock}

Known relationships:
${relBlock}

Event predictions up to ${asOf}:
${predBlock}

Generate a trade signal as JSON with this exact shape:
{
  "signal": "BUY | HOLD | SELL",
  "confidence": "low | medium | high | very_high",
  "timeframe": "short | medium | long",
  "risk_level": "low | medium | high | very_high",
  "risk_factors": ["string", ...],
  "key_drivers": ["string", ...],
  "summary": "2-3 sentence plain English summary"
}

Risk factors should be derived from:
- Supply chain concentration (heavy dependence on single suppliers)
- Geopolitical exposure (relationships with entities in sensitive regions)
- Competitive threats (strong competitors gaining ground)
- Regulatory exposure (themes mentioning regulation or export controls)
- Negative prediction patterns in recent events

Only output valid JSON. Always respond in English.`;
}

/**
 * Parses the assistant message content into a JavaScript value, tolerating a
 * surrounding Markdown code fence (with or without a "json" tag).
 *
 * @param {string} content - Raw message content returned by the model.
 * @returns {*} The parsed JSON value.
 * @throws {Error} When the content is not valid JSON after stripping fences.
 */
function parseSignalJson(content) {
  const stripped = content.replace(/^```(?:json)?\s*/i, "").replace(/\s*```$/, "").trim();
  try {
    return JSON.parse(stripped);
  } catch (_) {
    // Include a snippet so the worker log shows what the model actually sent.
    throw new Error(`LLM content not JSON: ${stripped.slice(0, 300)}`);
  }
}

/**
 * Sends the prompt to the OpenRouter chat-completions endpoint and returns
 * the parsed JSON found in the first choice's message content.
 *
 * @param {object} llmConfig - Reads `llmModel` (or `model`) and `apiKey`.
 * @param {string} prompt - User message content.
 * @returns {Promise<*>} Parsed signal JSON, or null when the response carries no content.
 * @throws {Error} On HTTP failure, a non-JSON response body, or non-JSON message content.
 */
async function callLlm(llmConfig, prompt) {
  const body = JSON.stringify({
    model: llmConfig.llmModel || llmConfig.model,
    messages: [{ role: "user", content: prompt }],
    temperature: 0.1,
  });

  const url = new URL("https://openrouter.ai/api/v1/chat/completions");
  const responseText = await httpPost(url, body, {
    "Content-Type": "application/json",
    "Authorization": `Bearer ${llmConfig.apiKey || ""}`,
  });

  let parsed;
  try {
    parsed = JSON.parse(responseText);
  } catch (_) {
    throw new Error(`LLM response not JSON: ${responseText.slice(0, 300)}`);
  }

  const content = parsed.choices?.[0]?.message?.content;
  if (!content) return null;

  return parseSignalJson(content);
}

/**
 * POSTs a body to a URL and resolves with the response text.
 *
 * @param {URL} url - Target URL; its protocol selects the http or https module.
 * @param {string} body - Request body; Content-Length is computed from it.
 * @param {object} headers - Additional request headers.
 * @param {number} [timeoutMs=HTTP_TIMEOUT_MS] - Socket inactivity limit in milliseconds.
 * @returns {Promise<string>} Response body for 2xx status codes.
 * @throws {Error} Rejects on non-2xx status, socket/stream errors, or timeout.
 */
function httpPost(url, body, headers, timeoutMs = HTTP_TIMEOUT_MS) {
  return new Promise((resolve, reject) => {
    const lib = url.protocol === "https:" ? https : http;
    const req = lib.request({
      hostname: url.hostname,
      port: url.port || (url.protocol === "https:" ? 443 : 80),
      path: url.pathname + url.search,
      method: "POST",
      headers: { ...headers, "Content-Length": Buffer.byteLength(body) },
    }, (res) => {
      // Decode as a stream so multi-byte UTF-8 characters split across chunks
      // are reassembled instead of being corrupted by per-chunk conversion.
      res.setEncoding("utf8");
      let data = "";
      res.on("data", (chunk) => { data += chunk; });
      res.on("error", reject);
      res.on("end", () => {
        if (res.statusCode >= 200 && res.statusCode < 300) resolve(data);
        else reject(new Error(`LLM ${res.statusCode}: ${data.slice(0, 300)}`));
      });
    });
    // Without a timeout a stalled connection would block its worker forever.
    req.setTimeout(timeoutMs, () => {
      req.destroy(new Error(`LLM request timed out after ${timeoutMs}ms`));
    });
    req.on("error", reject);
    req.write(body);
    req.end();
  });
}

/**
 * Resolves after the given number of milliseconds.
 *
 * @param {number} ms - Delay in milliseconds.
 * @returns {Promise<void>}
 */
function sleep(ms) {
  return new Promise((r) => setTimeout(r, ms));
}

module.exports = { runSignalWorker };