refactor: update worker commands and add new scripts for API rebuilding and queue feeding

This commit is contained in:
ImBenji 2026-04-27 17:30:52 +01:00
parent 04966fac55
commit f00b1a9640

View file

@ -6,45 +6,60 @@ async function runSignalWorker(archiveDb, intelligenceDb, config) {
const loopDelay = config.workers?.signalLoopDelayMs ?? 120000;
const llmConfig = config.openRouter || {};
const getCompanies = intelligenceDb.prepare("SELECT * FROM tracked_companies ORDER BY id");
// add as_of column if it doesnt exist yet
try {
intelligenceDb.prepare("ALTER TABLE trade_signals ADD COLUMN as_of TEXT").run();
console.log("[signal] added as_of column to trade_signals");
} catch (_) {
// already exists
}
const getLastSignal = intelligenceDb.prepare(`
SELECT generated_at FROM trade_signals WHERE company_id = ? ORDER BY generated_at DESC LIMIT 1
`);
const getFacts = intelligenceDb.prepare(`
SELECT claim, type, confidence, confirmation_count
FROM company_facts
WHERE company_id = ?
ORDER BY confirmation_count DESC
LIMIT 40
// all distinct event dates (by day) per company, newest first, that dont already have a signal
const getNextCheckpoint = intelligenceDb.prepare(`
SELECT company_id, substr(event_date, 1, 10) as checkpoint_date
FROM event_predictions
WHERE substr(event_date, 1, 10) NOT IN (
SELECT as_of FROM trade_signals WHERE as_of IS NOT NULL AND company_id = event_predictions.company_id
)
GROUP BY company_id, substr(event_date, 1, 10)
HAVING COUNT(*) >= 3
ORDER BY checkpoint_date DESC
LIMIT 1
`);
const getPredictions = intelligenceDb.prepare(`
SELECT type, direction, magnitude, timeframe, rationale, event_date, id
FROM event_predictions
WHERE company_id = ?
AND created_at >= datetime('now', '-90 days')
ORDER BY created_at DESC
AND substr(event_date, 1, 10) <= ?
ORDER BY event_date DESC
LIMIT 50
`);
const getFacts = intelligenceDb.prepare(`
SELECT claim, type, confidence, confirmation_count
FROM company_facts
WHERE company_id = ?
AND first_seen_at <= ?
ORDER BY confirmation_count DESC
LIMIT 40
`);
const getRelationships = intelligenceDb.prepare(`
SELECT relationship_type, to_entity, confidence, confirmation_count
FROM company_relationships
WHERE from_company_id = ?
AND first_seen_at <= ?
ORDER BY confirmation_count DESC
LIMIT 20
`);
const deleteSignal = intelligenceDb.prepare(
"DELETE FROM trade_signals WHERE company_id = ?"
);
const getCompanyById = intelligenceDb.prepare("SELECT * FROM tracked_companies WHERE id = ?");
const insertSignal = intelligenceDb.prepare(`
INSERT INTO trade_signals
(company_id, signal, confidence, timeframe, risk_level, risk_factors, summary, key_drivers, supporting_prediction_ids, window_days)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 90)
(company_id, signal, confidence, timeframe, risk_level, risk_factors, summary, key_drivers, supporting_prediction_ids, window_days, as_of)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`);
const recordEvent = intelligenceDb.prepare(
@ -59,85 +74,63 @@ async function runSignalWorker(archiveDb, intelligenceDb, config) {
while (true) {
try {
const companies = getCompanies.all();
const next = getNextCheckpoint.get();
// pick the company with the oldest (or missing) signal that has enough predictions
let target = null;
let oldestTs = null;
const companiesByAge = [];
for (const company of companies) {
const row = getLastSignal.get(company.id);
companiesByAge.push({ company, ts: row ? row.generated_at : null });
}
// null ts (never generated) first, then oldest
companiesByAge.sort((a, b) => {
if (a.ts === null && b.ts === null) return 0;
if (a.ts === null) return -1;
if (b.ts === null) return 1;
return a.ts < b.ts ? -1 : 1;
});
let predictions = null;
for (const entry of companiesByAge) {
const preds = getPredictions.all(entry.company.id);
if (preds.length >= 3) {
target = entry.company;
predictions = preds;
break;
}
}
if (!target) {
if (!next) {
await sleep(loopDelay);
continue;
}
const facts = getFacts.all(target.id);
const relationships = getRelationships.all(target.id);
const { company_id, checkpoint_date } = next;
const company = getCompanyById.get(company_id);
const prompt = buildPrompt(target.name, facts, relationships, predictions);
if (!company) {
await sleep(loopDelay);
continue;
}
const predictions = getPredictions.all(company_id, checkpoint_date);
const facts = getFacts.all(company_id, checkpoint_date);
const relationships = getRelationships.all(company_id, checkpoint_date);
const prompt = buildPrompt(company.name, facts, relationships, predictions, checkpoint_date);
let result;
try {
result = await callLlm(llmConfig, prompt);
} catch (err) {
console.error(`[signal] LLM error for ${target.name}:`, err.message);
console.error(`[signal] LLM error for ${company.name} @ ${checkpoint_date}:`, err.message);
await sleep(loopDelay);
continue;
}
if (!result) {
console.log(`[signal] ${target.name} — LLM returned null, skipping`);
console.log(`[signal] ${company.name} @ ${checkpoint_date} — LLM returned null, skipping`);
await sleep(loopDelay);
continue;
}
const predictionIds = predictions.map(p => p.id);
const write = intelligenceDb.transaction(() => {
deleteSignal.run(target.id);
insertSignal.run(
target.id,
result.signal,
result.confidence,
result.timeframe,
result.risk_level,
JSON.stringify(result.risk_factors || []),
result.summary,
JSON.stringify(result.key_drivers || []),
JSON.stringify(predictionIds)
);
});
write();
insertSignal.run(
company_id,
result.signal,
result.confidence,
result.timeframe,
result.risk_level,
JSON.stringify(result.risk_factors || []),
result.summary,
JSON.stringify(result.key_drivers || []),
JSON.stringify(predictionIds),
null,
checkpoint_date
);
recordEvent.run();
pruneCounter++;
if (pruneCounter >= 20) { pruneEvents.run(); pruneCounter = 0; }
console.log(`[signal] ${target.name}${result.signal} (${result.confidence} confidence, ${result.risk_level} risk)`);
console.log(`[signal] ${company.name} @ ${checkpoint_date}${result.signal} (${result.confidence} confidence, ${result.risk_level} risk)`);
} catch (err) {
console.error("[signal] cycle error:", err.message);
@ -148,7 +141,7 @@ async function runSignalWorker(archiveDb, intelligenceDb, config) {
}
function buildPrompt(companyName, facts, relationships, predictions) {
function buildPrompt(companyName, facts, relationships, predictions, asOf) {
const factsBlock = facts.length > 0
? facts.map(f => `- ${f.claim} (confirmed ${f.confirmation_count}x)`).join("\n")
: "No known facts yet.";
@ -161,7 +154,7 @@ function buildPrompt(companyName, facts, relationships, predictions) {
`${i + 1}. [${p.type}] ${p.direction} / ${p.magnitude} / ${p.timeframe}${p.rationale || "no rationale"}`
).join("\n");
return `You are a financial intelligence analyst generating a trade signal for ${companyName}.
return `You are a financial intelligence analyst generating a trade signal for ${companyName} as of ${asOf}.
Known facts about ${companyName} (most confirmed first):
${factsBlock}
@ -169,7 +162,7 @@ ${factsBlock}
Known relationships:
${relBlock}
Recent event predictions (last 90 days):
Event predictions up to ${asOf}:
${predBlock}
Generate a trade signal as JSON with this exact shape: