From f00b1a96408e94c5cfdded2a2ea521918f9fa99b Mon Sep 17 00:00:00 2001 From: ImBenji Date: Mon, 27 Apr 2026 17:30:52 +0100 Subject: [PATCH] refactor: update worker commands and add new scripts for API rebuilding and queue feeding --- workers/signalWorker.js | 139 +++++++++++++++++++--------------------- 1 file changed, 66 insertions(+), 73 deletions(-) diff --git a/workers/signalWorker.js b/workers/signalWorker.js index c6d23ef..df86a71 100644 --- a/workers/signalWorker.js +++ b/workers/signalWorker.js @@ -6,45 +6,60 @@ async function runSignalWorker(archiveDb, intelligenceDb, config) { const loopDelay = config.workers?.signalLoopDelayMs ?? 120000; const llmConfig = config.openRouter || {}; - const getCompanies = intelligenceDb.prepare("SELECT * FROM tracked_companies ORDER BY id"); + // add as_of column if it doesnt exist yet + try { + intelligenceDb.prepare("ALTER TABLE trade_signals ADD COLUMN as_of TEXT").run(); + console.log("[signal] added as_of column to trade_signals"); + } catch (_) { + // already exists + } - const getLastSignal = intelligenceDb.prepare(` - SELECT generated_at FROM trade_signals WHERE company_id = ? ORDER BY generated_at DESC LIMIT 1 - `); - - const getFacts = intelligenceDb.prepare(` - SELECT claim, type, confidence, confirmation_count - FROM company_facts - WHERE company_id = ? - ORDER BY confirmation_count DESC - LIMIT 40 + // all distinct event dates (by day) per company, newest first, that dont already have a signal + const getNextCheckpoint = intelligenceDb.prepare(` + SELECT company_id, substr(event_date, 1, 10) as checkpoint_date + FROM event_predictions + WHERE substr(event_date, 1, 10) NOT IN ( + SELECT as_of FROM trade_signals WHERE as_of IS NOT NULL AND company_id = event_predictions.company_id + ) + GROUP BY company_id, substr(event_date, 1, 10) + HAVING COUNT(*) >= 3 + ORDER BY checkpoint_date DESC + LIMIT 1 `); const getPredictions = intelligenceDb.prepare(` SELECT type, direction, magnitude, timeframe, rationale, event_date, id FROM event_predictions WHERE company_id = ? - AND created_at >= datetime('now', '-90 days') - ORDER BY created_at DESC + AND substr(event_date, 1, 10) <= ? + ORDER BY event_date DESC LIMIT 50 `); + const getFacts = intelligenceDb.prepare(` + SELECT claim, type, confidence, confirmation_count + FROM company_facts + WHERE company_id = ? + AND first_seen_at <= ? + ORDER BY confirmation_count DESC + LIMIT 40 + `); + const getRelationships = intelligenceDb.prepare(` SELECT relationship_type, to_entity, confidence, confirmation_count FROM company_relationships WHERE from_company_id = ? + AND first_seen_at <= ? ORDER BY confirmation_count DESC LIMIT 20 `); - const deleteSignal = intelligenceDb.prepare( - "DELETE FROM trade_signals WHERE company_id = ?" - ); + const getCompanyById = intelligenceDb.prepare("SELECT * FROM tracked_companies WHERE id = ?"); const insertSignal = intelligenceDb.prepare(` INSERT INTO trade_signals - (company_id, signal, confidence, timeframe, risk_level, risk_factors, summary, key_drivers, supporting_prediction_ids, window_days) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 90) + (company_id, signal, confidence, timeframe, risk_level, risk_factors, summary, key_drivers, supporting_prediction_ids, window_days, as_of) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) `); const recordEvent = intelligenceDb.prepare( @@ -59,85 +74,63 @@ async function runSignalWorker(archiveDb, intelligenceDb, config) { while (true) { try { - const companies = getCompanies.all(); + const next = getNextCheckpoint.get(); - // pick the company with the oldest (or missing) signal that has enough predictions - let target = null; - let oldestTs = null; - - const companiesByAge = []; - for (const company of companies) { - const row = getLastSignal.get(company.id); - companiesByAge.push({ company, ts: row ? row.generated_at : null }); - } - - // null ts (never generated) first, then oldest - companiesByAge.sort((a, b) => { - if (a.ts === null && b.ts === null) return 0; - if (a.ts === null) return -1; - if (b.ts === null) return 1; - return a.ts < b.ts ? -1 : 1; - }); - - let predictions = null; - for (const entry of companiesByAge) { - const preds = getPredictions.all(entry.company.id); - if (preds.length >= 3) { - target = entry.company; - predictions = preds; - break; - } - } - - if (!target) { + if (!next) { await sleep(loopDelay); continue; } - const facts = getFacts.all(target.id); - const relationships = getRelationships.all(target.id); + const { company_id, checkpoint_date } = next; + const company = getCompanyById.get(company_id); - const prompt = buildPrompt(target.name, facts, relationships, predictions); + if (!company) { + await sleep(loopDelay); + continue; + } + + const predictions = getPredictions.all(company_id, checkpoint_date); + const facts = getFacts.all(company_id, checkpoint_date); + const relationships = getRelationships.all(company_id, checkpoint_date); + + const prompt = buildPrompt(company.name, facts, relationships, predictions, checkpoint_date); let result; try { result = await callLlm(llmConfig, prompt); } catch (err) { - console.error(`[signal] LLM error for ${target.name}:`, err.message); + console.error(`[signal] LLM error for ${company.name} @ ${checkpoint_date}:`, err.message); await sleep(loopDelay); continue; } if (!result) { - console.log(`[signal] ${target.name} — LLM returned null, skipping`); + console.log(`[signal] ${company.name} @ ${checkpoint_date} — LLM returned null, skipping`); await sleep(loopDelay); continue; } const predictionIds = predictions.map(p => p.id); - const write = intelligenceDb.transaction(() => { - deleteSignal.run(target.id); - insertSignal.run( - target.id, - result.signal, - result.confidence, - result.timeframe, - result.risk_level, - JSON.stringify(result.risk_factors || []), - result.summary, - JSON.stringify(result.key_drivers || []), - JSON.stringify(predictionIds) - ); - }); - - write(); + insertSignal.run( + company_id, + result.signal, + result.confidence, + result.timeframe, + result.risk_level, + JSON.stringify(result.risk_factors || []), + result.summary, + JSON.stringify(result.key_drivers || []), + JSON.stringify(predictionIds), + null, + checkpoint_date + ); recordEvent.run(); pruneCounter++; if (pruneCounter >= 20) { pruneEvents.run(); pruneCounter = 0; } - console.log(`[signal] ${target.name} — ${result.signal} (${result.confidence} confidence, ${result.risk_level} risk)`); + console.log(`[signal] ${company.name} @ ${checkpoint_date} — ${result.signal} (${result.confidence} confidence, ${result.risk_level} risk)`); } catch (err) { console.error("[signal] cycle error:", err.message); @@ -148,7 +141,7 @@ async function runSignalWorker(archiveDb, intelligenceDb, config) { } -function buildPrompt(companyName, facts, relationships, predictions) { +function buildPrompt(companyName, facts, relationships, predictions, asOf) { const factsBlock = facts.length > 0 ? facts.map(f => `- ${f.claim} (confirmed ${f.confirmation_count}x)`).join("\n") : "No known facts yet."; @@ -161,7 +154,7 @@ function buildPrompt(companyName, facts, relationships, predictions) { `${i + 1}. [${p.type}] ${p.direction} / ${p.magnitude} / ${p.timeframe} — ${p.rationale || "no rationale"}` ).join("\n"); - return `You are a financial intelligence analyst generating a trade signal for ${companyName}. + return `You are a financial intelligence analyst generating a trade signal for ${companyName} as of ${asOf}. Known facts about ${companyName} (most confirmed first): ${factsBlock} @@ -169,7 +162,7 @@ ${factsBlock} Known relationships: ${relBlock} -Recent event predictions (last 90 days): +Event predictions up to ${asOf}: ${predBlock} Generate a trade signal as JSON with this exact shape: