By source
@@ -887,6 +900,34 @@ async function loadStats() {
document.getElementById('statusTable').innerHTML = data.byStatus
.map(r => `
| ${badgeHtml(r.status === 'null' ? null : r.status)} | ${r.n.toLocaleString()} |
`).join('');
+
+ renderWorkerRates();
+}
+
+function renderWorkerRates() {
+ const rates = window._workerRates || [];
+ const el = document.getElementById('worker-rates-row');
+ if (!el) return;
+
+ const workerLabels = { augor: 'Augor (events)', consolidation: 'Consolidation (companies)', graph: 'Graph (edges)' };
+
+ if (rates.length === 0) {
+ el.innerHTML = '
No data yet — worker_events table may not exist until workers restart.';
+ return;
+ }
+
+ el.innerHTML = rates.map(r => {
+ const label = workerLabels[r.worker] || r.worker;
+ const avg5m = (r.n5m / 5).toFixed(1);
+ const last1m = r.n1m;
+
+ return `
+
+ ${label}
+ ${avg5m}/min
+ ${last1m} in last 1m
+
`;
+ }).join('');
}
// ── source dropdown ────────────────────────────────────────────────────────
@@ -1109,9 +1150,6 @@ async function loadIntelligenceStats() {
const queueMap = {};
(data.queue || []).forEach(r => queueMap[r.status] = r.n);
- const rate5m = (data.processed5m / 5).toFixed(1);
- const rate1m = data.processed1m;
-
document.getElementById('intel-stats-row').innerHTML = [
['Queue pending', (queueMap.pending || 0).toLocaleString()],
['Processed', (queueMap.processed || 0).toLocaleString()],
@@ -1119,8 +1157,6 @@ async function loadIntelligenceStats() {
['Knowledge rows', data.knowledge.toLocaleString()],
['Predictions', data.predictions.toLocaleString()],
['Companies', `${data.embeddings}/${data.companies} embedded`],
- ['Rate (5m avg)', `${rate5m}/min`],
- ['Rate (last 1m)', `${rate1m}/min`],
].map(([label, value]) => `
${label}
@@ -1128,6 +1164,9 @@ async function loadIntelligenceStats() {
`).join('');
+ // stash worker rates for the stats tab
+ window._workerRates = data.workerRates || [];
+
return true;
}
@@ -1324,6 +1363,10 @@ document.getElementById('graph-show-untracked').addEventListener('change', () =>
if (graphAllNodes.length) renderIntelGraph();
});
+document.getElementById('graph-min-count').addEventListener('change', () => {
+ if (graphAllNodes.length) renderIntelGraph();
+});
+
function toggleGraphExpand() {
const wrap = document.getElementById('intel-graph-svg-wrap');
@@ -1337,13 +1380,25 @@ function toggleGraphExpand() {
function renderIntelGraph() {
const showUntracked = document.getElementById('graph-show-untracked').checked;
+ const minCount = parseInt(document.getElementById('graph-min-count').value, 10) || 1;
- const nodes = showUntracked
- ? graphAllNodes
- : graphAllNodes.filter(n => n.tracked);
+ const filteredLinks = graphAllLinks.filter(l => (l.count || 1) >= minCount);
+
+ // only show nodes that still have at least one edge after the count filter
+ const activeKeys = new Set();
+ for (const l of filteredLinks) {
+ const src = typeof l.source === 'object' ? l.source.key : l.source;
+ const tgt = typeof l.target === 'object' ? l.target.key : l.target;
+ activeKeys.add(src);
+ activeKeys.add(tgt);
+ }
+
+ const nodes = graphAllNodes.filter(n =>
+ activeKeys.has(n.key) && (showUntracked || n.tracked)
+ );
const visibleKeys = new Set(nodes.map(n => n.key));
- const links = graphAllLinks.filter(l => visibleKeys.has(
+ const links = filteredLinks.filter(l => visibleKeys.has(
typeof l.source === 'object' ? l.source.key : l.source
) && visibleKeys.has(
typeof l.target === 'object' ? l.target.key : l.target
@@ -1456,6 +1511,7 @@ function renderIntelGraph() {
// links — use
so textPath can align labels to the line
const linkG = g.append('g');
+ let linkLabels;
// stroke-width min 10 so text (7px) always fits inside the line
const strokeWidth = d => 10 + Math.pow(d.count / maxCount, 0.55) * 8;
@@ -1471,14 +1527,16 @@ function renderIntelGraph() {
.attr('stroke-opacity', d => 0.55 + (d.count / maxCount) * 0.35)
.on('mouseenter', function(ev, d) {
d3.select(this).attr('stroke', '#60a5fa').attr('stroke-opacity', 1);
+ linkLabels.filter(l => l === d).attr('fill', '#f8fafc');
})
.on('mouseleave', function(ev, d) {
d3.select(this).attr('stroke', '#334155')
.attr('stroke-opacity', 0.2 + (d.count / maxCount) * 0.7);
+ linkLabels.filter(l => l === d).attr('fill', '#94a3b8');
});
// labels along each edge via textPath — font-size slightly smaller than stroke-width
- const linkLabels = linkG.selectAll('text.ig-elabel')
+ linkLabels = linkG.selectAll('text.ig-elabel')
.data(linksCopy)
.join('text')
.attr('class', 'ig-elabel')
@@ -1769,7 +1827,7 @@ function switchTab(tab) {
location.hash = tab;
if (tab === 'events') loadEvents();
- if (tab === 'stats') loadStats();
+ if (tab === 'stats') { loadStats(); loadIntelligenceStats(); }
if (tab === 'intelligence') { intelOffset = 0; loadIntelligenceStats().then(ok => { if (ok) { loadIntelligenceCompanies(); loadIntelligence(); } }); }
}
diff --git a/intelligence/augorWorker.js b/intelligence/augorWorker.js
index bde8745..eeff7d0 100644
--- a/intelligence/augorWorker.js
+++ b/intelligence/augorWorker.js
@@ -11,6 +11,15 @@ async function runAugorWorker(archiveDb, intelligenceDb, config) {
SELECT * FROM article_queue WHERE status = 'pending' LIMIT 1
`);
+ const recordEvent = intelligenceDb.prepare(
+ `INSERT INTO worker_events (worker) VALUES ('augor')`
+ );
+
+ const pruneEvents = intelligenceDb.prepare(
+ `DELETE FROM worker_events WHERE worker = 'augor' AND completed_at < datetime('now', '-1 hour')`
+ );
+ let pruneCounter = 0;
+
const setStatus = intelligenceDb.prepare(`
UPDATE article_queue SET status = ?, updated_at = CURRENT_TIMESTAMP WHERE article_id = ?
`);
@@ -149,6 +158,9 @@ async function runAugorWorker(archiveDb, intelligenceDb, config) {
}
for (const r of getEventArticleIds.all(eventId)) setStatusByArticleId.run(r.id);
+ recordEvent.run();
+ pruneCounter++;
+ if (pruneCounter >= 100) { pruneEvents.run(); pruneCounter = 0; }
console.log(`[augor] processed event ${eventId} (${matchedCompanies.length} companies, ${eventArticles.length} articles)`);
} catch (err) {
diff --git a/intelligence/consolidationWorker.js b/intelligence/consolidationWorker.js
index 4510d27..6e863eb 100644
--- a/intelligence/consolidationWorker.js
+++ b/intelligence/consolidationWorker.js
@@ -12,6 +12,13 @@ async function runConsolidationWorker(archiveDb, intelligenceDb, config) {
const getCompanies = intelligenceDb.prepare("SELECT * FROM tracked_companies");
+ const recordEvent = intelligenceDb.prepare(
+ `INSERT INTO worker_events (worker) VALUES ('consolidation')`
+ );
+ const pruneEvents = intelligenceDb.prepare(
+ `DELETE FROM worker_events WHERE worker = 'consolidation' AND completed_at < datetime('now', '-1 hour')`
+ );
+
const getKnowledge = intelligenceDb.prepare(`
SELECT * FROM event_knowledge WHERE company_id = ? ORDER BY event_date ASC
`);
@@ -41,12 +48,16 @@ async function runConsolidationWorker(archiveDb, intelligenceDb, config) {
try {
const companies = getCompanies.all();
+ let pruneCounter = 0;
for (const company of companies) {
try {
await processCompany(
company, intelligenceDb, llmConfig,
getKnowledge, getExistingFirstSeen, deleteCompanyFacts, insertFact
);
+ recordEvent.run();
+ pruneCounter++;
+ if (pruneCounter >= 50) { pruneEvents.run(); pruneCounter = 0; }
} catch (err) {
console.error(`[consolidation] error on ${company.name}:`, err.message);
}
diff --git a/intelligence/db.js b/intelligence/db.js
index 5e8e440..f6c5e91 100644
--- a/intelligence/db.js
+++ b/intelligence/db.js
@@ -110,6 +110,18 @@ function runColumnMigrations(db) {
try { db.exec("ALTER TABLE event_predictions ADD COLUMN event_date TEXT"); } catch (_) {}
try { db.exec("ALTER TABLE event_knowledge ADD COLUMN event_date TEXT"); } catch (_) {}
+ db.exec(`
+ CREATE TABLE IF NOT EXISTS worker_events (
+ id INTEGER PRIMARY KEY,
+ worker TEXT NOT NULL,
+ completed_at DATETIME DEFAULT CURRENT_TIMESTAMP
+ );
+
+ CREATE INDEX IF NOT EXISTS idx_worker_events_lookup ON worker_events (worker, completed_at);
+ `);
+
+ // prune rows older than 1 hour so the table doesnt grow unbounded
+ db.exec(`DELETE FROM worker_events WHERE completed_at < datetime('now', '-1 hour')`);
}
function seedCompanies(db) {
diff --git a/intelligence/graphWorker.js b/intelligence/graphWorker.js
index 2845f7b..dd2ead8 100644
--- a/intelligence/graphWorker.js
+++ b/intelligence/graphWorker.js
@@ -105,6 +105,13 @@ async function runGraphWorker(archiveDb, intelligenceDb, config) {
SELECT * FROM company_facts WHERE type = 'relationship'
`);
+ const recordEvents = intelligenceDb.prepare(
+ `INSERT INTO worker_events (worker) VALUES ('graph')`
+ );
+ const pruneEvents = intelligenceDb.prepare(
+ `DELETE FROM worker_events WHERE worker = 'graph' AND completed_at < datetime('now', '-1 hour')`
+ );
+
const getCompanies = intelligenceDb.prepare("SELECT * FROM tracked_companies");
const getCompanyById = intelligenceDb.prepare("SELECT * FROM tracked_companies WHERE id = ?");
@@ -236,6 +243,15 @@ async function runGraphWorker(archiveDb, intelligenceDb, config) {
processAll();
+ // record one event per edge upserted so the rate tracks actual work
+ if (upserted > 0) {
+ const insertMany = intelligenceDb.transaction((n) => {
+ for (let i = 0; i < n; i++) recordEvents.run();
+ });
+ insertMany(upserted);
+ pruneEvents.run();
+ }
+
if (upserted > 0 || reciprocals > 0) {
console.log(`[graph] cycle complete — ${upserted} edges upserted, ${reciprocals} reciprocals added`);
} else {
diff --git a/src/routes/admin.js b/src/routes/admin.js
index f9fbe5d..fd75323 100644
--- a/src/routes/admin.js
+++ b/src/routes/admin.js
@@ -234,17 +234,19 @@ async function adminRoutes(fastify) {
const companies = db.prepare(`SELECT COUNT(*) as n FROM tracked_companies`).get().n;
const embeddings = db.prepare(`SELECT COUNT(*) as n FROM company_embeddings`).get().n;
- const processed5m = db.prepare(`
- SELECT COUNT(*) as n FROM article_queue
- WHERE status = 'processed' AND updated_at >= datetime('now', '-5 minutes')
- `).get().n;
+ let workerRates = [];
+ try {
+ workerRates = db.prepare(`
+ SELECT
+ worker,
+ SUM(CASE WHEN completed_at >= datetime('now', '-5 minutes') THEN 1 ELSE 0 END) as n5m,
+ SUM(CASE WHEN completed_at >= datetime('now', '-1 minute') THEN 1 ELSE 0 END) as n1m
+ FROM worker_events
+ GROUP BY worker
+ `).all();
+ } catch (_) {}
- const processed1m = db.prepare(`
- SELECT COUNT(*) as n FROM article_queue
- WHERE status = 'processed' AND updated_at >= datetime('now', '-1 minute')
- `).get().n;
-
- return { available: true, queue, knowledge, predictions, companies, embeddings, processed5m, processed1m };
+ return { available: true, queue, knowledge, predictions, companies, embeddings, workerRates };
});
fastify.get('/admin/api/intelligence/companies', async (request, reply) => {
@@ -333,10 +335,21 @@ async function adminRoutes(fastify) {
const allTracked = idb.prepare(`SELECT id, name, ticker FROM tracked_companies`).all();
- // untracked entities — appear only as to_entity with no to_company_id
+ // build a lowercase name+alias set so we can exclude untracked entities
+ // that are just unresolved references to tracked companies
+ const trackedNameSet = new Set();
+ for (const c of allTracked) {
+ trackedNameSet.add(c.name.toLowerCase());
+ trackedNameSet.add(c.ticker.toLowerCase());
+ }
+
const untrackedSeen = new Set();
for (const e of edges) {
- if (!e.to_company_id && e.to_entity) untrackedSeen.add(e.to_entity);
+ if (!e.to_company_id && e.to_entity) {
+ if (!trackedNameSet.has(e.to_entity.toLowerCase())) {
+ untrackedSeen.add(e.to_entity);
+ }
+ }
}
const nodes = [