From 5a9a2e4c6dfd0fa6fd904ca69ce2cda3af17f473 Mon Sep 17 00:00:00 2001 From: ImBenji Date: Mon, 27 Apr 2026 19:08:54 +0100 Subject: [PATCH] refactor: load environment variables from .env file and update openRouter configuration --- workers/signalWorker.js | 157 +++++++++++++++++++++++++--------------- 1 file changed, 100 insertions(+), 57 deletions(-) diff --git a/workers/signalWorker.js b/workers/signalWorker.js index 1417b7d..ecb41a0 100644 --- a/workers/signalWorker.js +++ b/workers/signalWorker.js @@ -1,9 +1,10 @@ const https = require("https"); const http = require("http"); +const CONCURRENCY = 4; + async function runSignalWorker(archiveDb, intelligenceDb, config) { - const loopDelay = config.workers?.signalLoopDelayMs ?? 120000; const llmConfig = config.openRouter || {}; // add as_of column if it doesnt exist yet @@ -70,69 +71,111 @@ async function runSignalWorker(archiveDb, intelligenceDb, config) { `DELETE FROM worker_events WHERE worker = 'signal' AND completed_at < datetime('now', '-1 hour')` ); + // in-process claim set — prevents concurrent workers from grabbing same checkpoint + const inFlight = new Set(); let pruneCounter = 0; - while (true) { - try { - const next = getNextCheckpoint.get(); - - if (!next) { - await sleep(5000); - continue; - } - - const { company_id, checkpoint_date } = next; - const company = getCompanyById.get(company_id); - - if (!company) { - continue; - } - - const predictions = getPredictions.all(company_id, checkpoint_date); - const facts = getFacts.all(company_id, checkpoint_date); - const relationships = getRelationships.all(company_id, checkpoint_date); - - const prompt = buildPrompt(company.name, facts, relationships, predictions, checkpoint_date); - - let result; + async function workerLoop(id) { + while (true) { try { - result = await callLlm(llmConfig, prompt); + // find next checkpoint not already claimed or done + let next = null; + + // keep scanning until we find one not in-flight + const candidates = intelligenceDb.prepare(` + SELECT company_id, substr(event_date, 1, 10) as checkpoint_date + FROM event_predictions + WHERE substr(event_date, 1, 10) NOT IN ( + SELECT as_of FROM trade_signals WHERE as_of IS NOT NULL AND company_id = event_predictions.company_id + ) + GROUP BY company_id, substr(event_date, 1, 10) + HAVING COUNT(*) >= 3 + ORDER BY checkpoint_date DESC + LIMIT 20 + `).all(); + + for (const c of candidates) { + const key = `${c.company_id}|${c.checkpoint_date}`; + if (!inFlight.has(key)) { + next = c; + inFlight.add(key); + break; + } + } + + if (!next) { + await sleep(5000); + continue; + } + + const { company_id, checkpoint_date } = next; + const key = `${company_id}|${checkpoint_date}`; + + const company = getCompanyById.get(company_id); + + if (!company) { + inFlight.delete(key); + continue; + } + + const predictions = getPredictions.all(company_id, checkpoint_date); + const facts = getFacts.all(company_id, checkpoint_date); + const relationships = getRelationships.all(company_id, checkpoint_date); + + const prompt = buildPrompt(company.name, facts, relationships, predictions, checkpoint_date); + + let result; + try { + result = await callLlm(llmConfig, prompt); + } catch (err) { + console.error(`[signal:${id}] LLM error for ${company.name} @ ${checkpoint_date}:`, err.message); + inFlight.delete(key); + continue; + } + + if (!result) { + console.log(`[signal:${id}] ${company.name} @ ${checkpoint_date} — LLM returned null, skipping`); + inFlight.delete(key); + continue; + } + + const predictionIds = predictions.map(p => p.id); + + insertSignal.run( + company_id, + result.signal, + result.confidence, + result.timeframe, + result.risk_level, + JSON.stringify(result.risk_factors || []), + result.summary, + JSON.stringify(result.key_drivers || []), + JSON.stringify(predictionIds), + null, + checkpoint_date + ); + + inFlight.delete(key); + + recordEvent.run(); + pruneCounter++; + if (pruneCounter >= 20) { pruneEvents.run(); pruneCounter = 0; } + + console.log(`[signal:${id}] ${company.name} @ ${checkpoint_date} — ${result.signal} (${result.confidence} confidence, ${result.risk_level} risk)`); + } catch (err) { - console.error(`[signal] LLM error for ${company.name} @ ${checkpoint_date}:`, err.message); - continue; + console.error(`[signal:${id}] cycle error:`, err.message); } - - if (!result) { - console.log(`[signal] ${company.name} @ ${checkpoint_date} — LLM returned null, skipping`); - continue; - } - - const predictionIds = predictions.map(p => p.id); - - insertSignal.run( - company_id, - result.signal, - result.confidence, - result.timeframe, - result.risk_level, - JSON.stringify(result.risk_factors || []), - result.summary, - JSON.stringify(result.key_drivers || []), - JSON.stringify(predictionIds), - null, - checkpoint_date - ); - - recordEvent.run(); - pruneCounter++; - if (pruneCounter >= 20) { pruneEvents.run(); pruneCounter = 0; } - - console.log(`[signal] ${company.name} @ ${checkpoint_date} — ${result.signal} (${result.confidence} confidence, ${result.risk_level} risk)`); - - } catch (err) { - console.error("[signal] cycle error:", err.message); } } + + // spin up CONCURRENCY workers + const workers = []; + for (let i = 0; i < CONCURRENCY; i++) { + workers.push(workerLoop(i + 1)); + } + + await Promise.all(workers); }