add worker event tracking; implement worker rates display in admin panel

This commit is contained in:
ImBenji 2026-04-23 17:13:57 +01:00
parent 16d95f5392
commit 30104c6e66
6 changed files with 145 additions and 23 deletions

View file

@ -715,6 +715,11 @@
<span>Show untracked entities</span> <span>Show untracked entities</span>
</label> </label>
<label style="display:flex; align-items:center; gap:6px; margin-left:16px; user-select:none">
<span>Min confirmations</span>
<input type="number" id="graph-min-count" value="2" min="1" style="width:52px; min-width:unset; padding:3px 6px; font-size:12px" />
</label>
<span style="margin-left:auto; color:var(--muted-dark)">Scroll to zoom · drag nodes · click for facts</span> <span style="margin-left:auto; color:var(--muted-dark)">Scroll to zoom · drag nodes · click for facts</span>
</div> </div>
</div> </div>
@ -739,6 +744,14 @@
<!-- Stats tab --> <!-- Stats tab -->
<div id="tab-stats" style="display:none"> <div id="tab-stats" style="display:none">
<div style="margin-bottom:28px">
<div class="section-heading">Worker throughput</div>
<div id="worker-rates-row" style="display:flex; gap:12px; flex-wrap:wrap; margin-top:10px">
<span style="color:var(--muted); font-size:13px">Loading...</span>
</div>
</div>
<div style="display:flex; gap:32px; flex-wrap:wrap; padding-top:4px"> <div style="display:flex; gap:32px; flex-wrap:wrap; padding-top:4px">
<div> <div>
<div class="section-heading">By source</div> <div class="section-heading">By source</div>
@ -887,6 +900,34 @@ async function loadStats() {
document.getElementById('statusTable').innerHTML = data.byStatus document.getElementById('statusTable').innerHTML = data.byStatus
.map(r => `<tr><td>${badgeHtml(r.status === 'null' ? null : r.status)}</td><td style="text-align:right; padding-left:24px">${r.n.toLocaleString()}</td></tr>`).join(''); .map(r => `<tr><td>${badgeHtml(r.status === 'null' ? null : r.status)}</td><td style="text-align:right; padding-left:24px">${r.n.toLocaleString()}</td></tr>`).join('');
renderWorkerRates();
}
/**
 * Render one throughput card per worker into the `#worker-rates-row` element.
 *
 * Reads the rows stashed on `window._workerRates` (each row is read as
 * `{ worker, n5m, n1m }`) and replaces the element's contents with either a
 * placeholder message (no rows) or one stat card per row. Each card shows the
 * five-minute count averaged per minute plus the raw one-minute count.
 *
 * Does nothing when the target element is not present in the document.
 */
function renderWorkerRates() {
  const el = document.getElementById('worker-rates-row');
  if (!el) return;

  const rates = Array.isArray(window._workerRates) ? window._workerRates : [];
  if (rates.length === 0) {
    el.innerHTML = '<span style="color:var(--muted); font-size:13px">No data yet — worker_events table may not exist until workers restart.</span>';
    return;
  }

  // A Map rather than an object literal, so a worker name such as
  // "constructor" cannot resolve to an inherited Object.prototype member.
  const workerLabels = new Map([
    ['augor', 'Augor (events)'],
    ['consolidation', 'Consolidation (companies)'],
    ['graph', 'Graph (edges)'],
  ]);

  // Everything below ends up in innerHTML, so text taken from the rows is
  // escaped before interpolation.
  const escapeHtml = (value) => String(value)
    .replace(/&/g, '&amp;')
    .replace(/</g, '&lt;')
    .replace(/>/g, '&gt;')
    .replace(/"/g, '&quot;')
    .replace(/'/g, '&#39;');

  // Missing or non-numeric counts are shown as 0 instead of "NaN"/"undefined".
  const toCount = (value) => {
    const n = Number(value);
    return Number.isFinite(n) ? n : 0;
  };

  el.innerHTML = rates.map((r) => {
    const label = escapeHtml(workerLabels.has(r.worker) ? workerLabels.get(r.worker) : r.worker);
    const avg5m = (toCount(r.n5m) / 5).toFixed(1); // n5m covers five minutes
    const last1m = toCount(r.n1m);
    return `
      <div class="intel-stat-card" style="min-width:180px">
        <span class="label">${label}</span>
        <span class="value" style="font-size:18px">${avg5m}<span style="font-size:12px; font-weight:400; color:var(--muted)">/min</span></span>
        <span style="font-size:11px; color:var(--muted-dark); margin-top:4px">${last1m} in last 1m</span>
      </div>`;
  }).join('');
}
// ── source dropdown ──────────────────────────────────────────────────────── // ── source dropdown ────────────────────────────────────────────────────────
@ -1109,9 +1150,6 @@ async function loadIntelligenceStats() {
const queueMap = {}; const queueMap = {};
(data.queue || []).forEach(r => queueMap[r.status] = r.n); (data.queue || []).forEach(r => queueMap[r.status] = r.n);
const rate5m = (data.processed5m / 5).toFixed(1);
const rate1m = data.processed1m;
document.getElementById('intel-stats-row').innerHTML = [ document.getElementById('intel-stats-row').innerHTML = [
['Queue pending', (queueMap.pending || 0).toLocaleString()], ['Queue pending', (queueMap.pending || 0).toLocaleString()],
['Processed', (queueMap.processed || 0).toLocaleString()], ['Processed', (queueMap.processed || 0).toLocaleString()],
@ -1119,8 +1157,6 @@ async function loadIntelligenceStats() {
['Knowledge rows', data.knowledge.toLocaleString()], ['Knowledge rows', data.knowledge.toLocaleString()],
['Predictions', data.predictions.toLocaleString()], ['Predictions', data.predictions.toLocaleString()],
['Companies', `${data.embeddings}/${data.companies} embedded`], ['Companies', `${data.embeddings}/${data.companies} embedded`],
['Rate (5m avg)', `${rate5m}/min`],
['Rate (last 1m)', `${rate1m}/min`],
].map(([label, value]) => ` ].map(([label, value]) => `
<div class="intel-stat-card"> <div class="intel-stat-card">
<span class="label">${label}</span> <span class="label">${label}</span>
@ -1128,6 +1164,9 @@ async function loadIntelligenceStats() {
</div> </div>
`).join(''); `).join('');
// stash worker rates for the stats tab
window._workerRates = data.workerRates || [];
return true; return true;
} }
@ -1324,6 +1363,10 @@ document.getElementById('graph-show-untracked').addEventListener('change', () =>
if (graphAllNodes.length) renderIntelGraph(); if (graphAllNodes.length) renderIntelGraph();
}); });
// Re-draw the graph when the "Min confirmations" input changes; nothing is
// drawn while no graph nodes have been loaded yet.
document
  .getElementById('graph-min-count')
  .addEventListener('change', function onGraphMinCountChange() {
    if (!graphAllNodes.length) return;
    renderIntelGraph();
  });
function toggleGraphExpand() { function toggleGraphExpand() {
const wrap = document.getElementById('intel-graph-svg-wrap'); const wrap = document.getElementById('intel-graph-svg-wrap');
@ -1337,13 +1380,25 @@ function toggleGraphExpand() {
function renderIntelGraph() { function renderIntelGraph() {
const showUntracked = document.getElementById('graph-show-untracked').checked; const showUntracked = document.getElementById('graph-show-untracked').checked;
const minCount = parseInt(document.getElementById('graph-min-count').value, 10) || 1;
const nodes = showUntracked const filteredLinks = graphAllLinks.filter(l => (l.count || 1) >= minCount);
? graphAllNodes
: graphAllNodes.filter(n => n.tracked); // only show nodes that still have at least one edge after the count filter
const activeKeys = new Set();
for (const l of filteredLinks) {
const src = typeof l.source === 'object' ? l.source.key : l.source;
const tgt = typeof l.target === 'object' ? l.target.key : l.target;
activeKeys.add(src);
activeKeys.add(tgt);
}
const nodes = graphAllNodes.filter(n =>
activeKeys.has(n.key) && (showUntracked || n.tracked)
);
const visibleKeys = new Set(nodes.map(n => n.key)); const visibleKeys = new Set(nodes.map(n => n.key));
const links = graphAllLinks.filter(l => visibleKeys.has( const links = filteredLinks.filter(l => visibleKeys.has(
typeof l.source === 'object' ? l.source.key : l.source typeof l.source === 'object' ? l.source.key : l.source
) && visibleKeys.has( ) && visibleKeys.has(
typeof l.target === 'object' ? l.target.key : l.target typeof l.target === 'object' ? l.target.key : l.target
@ -1456,6 +1511,7 @@ function renderIntelGraph() {
// links — use <path> so textPath can align labels to the line // links — use <path> so textPath can align labels to the line
const linkG = g.append('g'); const linkG = g.append('g');
let linkLabels;
// stroke-width min 10 so text (7px) always fits inside the line // stroke-width min 10 so text (7px) always fits inside the line
const strokeWidth = d => 10 + Math.pow(d.count / maxCount, 0.55) * 8; const strokeWidth = d => 10 + Math.pow(d.count / maxCount, 0.55) * 8;
@ -1471,14 +1527,16 @@ function renderIntelGraph() {
.attr('stroke-opacity', d => 0.55 + (d.count / maxCount) * 0.35) .attr('stroke-opacity', d => 0.55 + (d.count / maxCount) * 0.35)
.on('mouseenter', function(ev, d) { .on('mouseenter', function(ev, d) {
d3.select(this).attr('stroke', '#60a5fa').attr('stroke-opacity', 1); d3.select(this).attr('stroke', '#60a5fa').attr('stroke-opacity', 1);
linkLabels.filter(l => l === d).attr('fill', '#f8fafc');
}) })
.on('mouseleave', function(ev, d) { .on('mouseleave', function(ev, d) {
d3.select(this).attr('stroke', '#334155') d3.select(this).attr('stroke', '#334155')
.attr('stroke-opacity', 0.2 + (d.count / maxCount) * 0.7); .attr('stroke-opacity', 0.2 + (d.count / maxCount) * 0.7);
linkLabels.filter(l => l === d).attr('fill', '#94a3b8');
}); });
// labels along each edge via textPath — font-size slightly smaller than stroke-width // labels along each edge via textPath — font-size slightly smaller than stroke-width
const linkLabels = linkG.selectAll('text.ig-elabel') linkLabels = linkG.selectAll('text.ig-elabel')
.data(linksCopy) .data(linksCopy)
.join('text') .join('text')
.attr('class', 'ig-elabel') .attr('class', 'ig-elabel')
@ -1769,7 +1827,7 @@ function switchTab(tab) {
location.hash = tab; location.hash = tab;
if (tab === 'events') loadEvents(); if (tab === 'events') loadEvents();
if (tab === 'stats') loadStats(); if (tab === 'stats') { loadStats(); loadIntelligenceStats(); }
if (tab === 'intelligence') { intelOffset = 0; loadIntelligenceStats().then(ok => { if (ok) { loadIntelligenceCompanies(); loadIntelligence(); } }); } if (tab === 'intelligence') { intelOffset = 0; loadIntelligenceStats().then(ok => { if (ok) { loadIntelligenceCompanies(); loadIntelligence(); } }); }
} }

View file

@ -11,6 +11,15 @@ async function runAugorWorker(archiveDb, intelligenceDb, config) {
SELECT * FROM article_queue WHERE status = 'pending' LIMIT 1 SELECT * FROM article_queue WHERE status = 'pending' LIMIT 1
`); `);
const recordEvent = intelligenceDb.prepare(
`INSERT INTO worker_events (worker) VALUES ('augor')`
);
const pruneEvents = intelligenceDb.prepare(
`DELETE FROM worker_events WHERE worker = 'augor' AND completed_at < datetime('now', '-1 hour')`
);
let pruneCounter = 0;
const setStatus = intelligenceDb.prepare(` const setStatus = intelligenceDb.prepare(`
UPDATE article_queue SET status = ?, updated_at = CURRENT_TIMESTAMP WHERE article_id = ? UPDATE article_queue SET status = ?, updated_at = CURRENT_TIMESTAMP WHERE article_id = ?
`); `);
@ -149,6 +158,9 @@ async function runAugorWorker(archiveDb, intelligenceDb, config) {
} }
for (const r of getEventArticleIds.all(eventId)) setStatusByArticleId.run(r.id); for (const r of getEventArticleIds.all(eventId)) setStatusByArticleId.run(r.id);
recordEvent.run();
pruneCounter++;
if (pruneCounter >= 100) { pruneEvents.run(); pruneCounter = 0; }
console.log(`[augor] processed event ${eventId} (${matchedCompanies.length} companies, ${eventArticles.length} articles)`); console.log(`[augor] processed event ${eventId} (${matchedCompanies.length} companies, ${eventArticles.length} articles)`);
} catch (err) { } catch (err) {

View file

@ -12,6 +12,13 @@ async function runConsolidationWorker(archiveDb, intelligenceDb, config) {
const getCompanies = intelligenceDb.prepare("SELECT * FROM tracked_companies"); const getCompanies = intelligenceDb.prepare("SELECT * FROM tracked_companies");
const recordEvent = intelligenceDb.prepare(
`INSERT INTO worker_events (worker) VALUES ('consolidation')`
);
const pruneEvents = intelligenceDb.prepare(
`DELETE FROM worker_events WHERE worker = 'consolidation' AND completed_at < datetime('now', '-1 hour')`
);
const getKnowledge = intelligenceDb.prepare(` const getKnowledge = intelligenceDb.prepare(`
SELECT * FROM event_knowledge WHERE company_id = ? ORDER BY event_date ASC SELECT * FROM event_knowledge WHERE company_id = ? ORDER BY event_date ASC
`); `);
@ -41,12 +48,16 @@ async function runConsolidationWorker(archiveDb, intelligenceDb, config) {
try { try {
const companies = getCompanies.all(); const companies = getCompanies.all();
let pruneCounter = 0;
for (const company of companies) { for (const company of companies) {
try { try {
await processCompany( await processCompany(
company, intelligenceDb, llmConfig, company, intelligenceDb, llmConfig,
getKnowledge, getExistingFirstSeen, deleteCompanyFacts, insertFact getKnowledge, getExistingFirstSeen, deleteCompanyFacts, insertFact
); );
recordEvent.run();
pruneCounter++;
if (pruneCounter >= 50) { pruneEvents.run(); pruneCounter = 0; }
} catch (err) { } catch (err) {
console.error(`[consolidation] error on ${company.name}:`, err.message); console.error(`[consolidation] error on ${company.name}:`, err.message);
} }

View file

@ -110,6 +110,18 @@ function runColumnMigrations(db) {
try { db.exec("ALTER TABLE event_predictions ADD COLUMN event_date TEXT"); } catch (_) {} try { db.exec("ALTER TABLE event_predictions ADD COLUMN event_date TEXT"); } catch (_) {}
try { db.exec("ALTER TABLE event_knowledge ADD COLUMN event_date TEXT"); } catch (_) {} try { db.exec("ALTER TABLE event_knowledge ADD COLUMN event_date TEXT"); } catch (_) {}
db.exec(`
CREATE TABLE IF NOT EXISTS worker_events (
id INTEGER PRIMARY KEY,
worker TEXT NOT NULL,
completed_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_worker_events_lookup ON worker_events (worker, completed_at);
`);
// prune rows older than 1 hour so the table doesn't grow unbounded
db.exec(`DELETE FROM worker_events WHERE completed_at < datetime('now', '-1 hour')`);
} }
function seedCompanies(db) { function seedCompanies(db) {

View file

@ -105,6 +105,13 @@ async function runGraphWorker(archiveDb, intelligenceDb, config) {
SELECT * FROM company_facts WHERE type = 'relationship' SELECT * FROM company_facts WHERE type = 'relationship'
`); `);
const recordEvents = intelligenceDb.prepare(
`INSERT INTO worker_events (worker) VALUES ('graph')`
);
const pruneEvents = intelligenceDb.prepare(
`DELETE FROM worker_events WHERE worker = 'graph' AND completed_at < datetime('now', '-1 hour')`
);
const getCompanies = intelligenceDb.prepare("SELECT * FROM tracked_companies"); const getCompanies = intelligenceDb.prepare("SELECT * FROM tracked_companies");
const getCompanyById = intelligenceDb.prepare("SELECT * FROM tracked_companies WHERE id = ?"); const getCompanyById = intelligenceDb.prepare("SELECT * FROM tracked_companies WHERE id = ?");
@ -236,6 +243,15 @@ async function runGraphWorker(archiveDb, intelligenceDb, config) {
processAll(); processAll();
// record one event per edge upserted so the rate tracks actual work
if (upserted > 0) {
const insertMany = intelligenceDb.transaction((n) => {
for (let i = 0; i < n; i++) recordEvents.run();
});
insertMany(upserted);
pruneEvents.run();
}
if (upserted > 0 || reciprocals > 0) { if (upserted > 0 || reciprocals > 0) {
console.log(`[graph] cycle complete — ${upserted} edges upserted, ${reciprocals} reciprocals added`); console.log(`[graph] cycle complete — ${upserted} edges upserted, ${reciprocals} reciprocals added`);
} else { } else {

View file

@ -234,17 +234,19 @@ async function adminRoutes(fastify) {
const companies = db.prepare(`SELECT COUNT(*) as n FROM tracked_companies`).get().n; const companies = db.prepare(`SELECT COUNT(*) as n FROM tracked_companies`).get().n;
const embeddings = db.prepare(`SELECT COUNT(*) as n FROM company_embeddings`).get().n; const embeddings = db.prepare(`SELECT COUNT(*) as n FROM company_embeddings`).get().n;
const processed5m = db.prepare(` let workerRates = [];
SELECT COUNT(*) as n FROM article_queue try {
WHERE status = 'processed' AND updated_at >= datetime('now', '-5 minutes') workerRates = db.prepare(`
`).get().n; SELECT
worker,
SUM(CASE WHEN completed_at >= datetime('now', '-5 minutes') THEN 1 ELSE 0 END) as n5m,
SUM(CASE WHEN completed_at >= datetime('now', '-1 minute') THEN 1 ELSE 0 END) as n1m
FROM worker_events
GROUP BY worker
`).all();
} catch (_) {}
const processed1m = db.prepare(` return { available: true, queue, knowledge, predictions, companies, embeddings, workerRates };
SELECT COUNT(*) as n FROM article_queue
WHERE status = 'processed' AND updated_at >= datetime('now', '-1 minute')
`).get().n;
return { available: true, queue, knowledge, predictions, companies, embeddings, processed5m, processed1m };
}); });
fastify.get('/admin/api/intelligence/companies', async (request, reply) => { fastify.get('/admin/api/intelligence/companies', async (request, reply) => {
@ -333,10 +335,21 @@ async function adminRoutes(fastify) {
const allTracked = idb.prepare(`SELECT id, name, ticker FROM tracked_companies`).all(); const allTracked = idb.prepare(`SELECT id, name, ticker FROM tracked_companies`).all();
// untracked entities — appear only as to_entity with no to_company_id // build a lowercase name+alias set so we can exclude untracked entities
// that are just unresolved references to tracked companies
const trackedNameSet = new Set();
for (const c of allTracked) {
trackedNameSet.add(c.name.toLowerCase());
trackedNameSet.add(c.ticker.toLowerCase());
}
const untrackedSeen = new Set(); const untrackedSeen = new Set();
for (const e of edges) { for (const e of edges) {
if (!e.to_company_id && e.to_entity) untrackedSeen.add(e.to_entity); if (!e.to_company_id && e.to_entity) {
if (!trackedNameSet.has(e.to_entity.toLowerCase())) {
untrackedSeen.add(e.to_entity);
}
}
} }
const nodes = [ const nodes = [