add intelligence and SQL tabs to admin interface with corresponding API endpoints

This commit is contained in:
ImBenji
2026-04-22 20:50:08 +01:00
parent ac7c87c6cf
commit 18d062fd2d
9 changed files with 428 additions and 228 deletions
+25 -211
View File
@@ -9,8 +9,6 @@ sqliteVec.load(db);
db.pragma('journal_mode = WAL');
// the image column is retained as a no-op for backwards compat with old rows.
// new code never writes to it; drop in a future migration if you really want
db.exec(`
CREATE TABLE IF NOT EXISTS articles (
id INTEGER PRIMARY KEY AUTOINCREMENT,
@@ -21,97 +19,29 @@ db.exec(`
content_status TEXT,
content_error TEXT,
content_attempted_at TEXT,
content_attempt_count INTEGER NOT NULL DEFAULT 0,
content_retry_after TEXT,
is_index_page INTEGER NOT NULL DEFAULT 0,
has_embedding INTEGER NOT NULL DEFAULT 0,
url TEXT NOT NULL UNIQUE,
normalized_title TEXT NOT NULL,
source TEXT NOT NULL,
pub_date TEXT,
pub_date_effective TEXT,
language TEXT,
event_id INTEGER REFERENCES events(id),
ingested_at TEXT NOT NULL DEFAULT (datetime('now'))
);
`);
/**
 * Rebuilds the `articles` table when it still carries a constraint-created
 * unique index (origin 'u') covering exactly the `normalized_title` column.
 *
 * The replacement table declares `normalized_title` as plain NOT NULL. Only the
 * columns named in the INSERT below are copied across, and `is_index_page` is
 * written as 0 for every copied row. Does nothing when no such index exists.
 */
function rebuildArticlesTableIfNeeded() {
  // true when the named index is built over normalized_title and nothing else
  const coversOnlyNormalizedTitle = (indexName) => {
    const quoted = indexName.replace(/'/g, "''");
    const indexColumns = db.prepare(`PRAGMA index_info('${quoted}')`).all();
    return indexColumns.length === 1 && indexColumns[0].name === 'normalized_title';
  };

  const articleIndexes = db.prepare(`PRAGMA index_list('articles')`).all();

  let rebuildRequired = false;
  for (const entry of articleIndexes) {
    const fromUniqueConstraint = entry.origin === 'u' && Boolean(entry.name);
    if (fromUniqueConstraint && coversOnlyNormalizedTitle(entry.name)) {
      rebuildRequired = true;
      break;
    }
  }

  if (rebuildRequired) {
    // copy into a fresh table, then swap it in under the original name
    db.exec(`
BEGIN;
CREATE TABLE articles_rebuild (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
description TEXT,
content TEXT,
image TEXT,
content_status TEXT,
content_error TEXT,
content_attempted_at TEXT,
is_index_page INTEGER NOT NULL DEFAULT 0,
url TEXT NOT NULL UNIQUE,
normalized_title TEXT NOT NULL,
source TEXT NOT NULL,
pub_date TEXT,
ingested_at TEXT NOT NULL DEFAULT (datetime('now'))
);
INSERT INTO articles_rebuild (
id,
title,
description,
content,
image,
content_status,
content_error,
content_attempted_at,
is_index_page,
url,
normalized_title,
source,
pub_date,
ingested_at
)
SELECT
id,
title,
description,
content,
image,
content_status,
content_error,
content_attempted_at,
0,
url,
normalized_title,
source,
pub_date,
ingested_at
FROM articles;
DROP TABLE articles;
ALTER TABLE articles_rebuild RENAME TO articles;
COMMIT;
`);
  }
}
rebuildArticlesTableIfNeeded();
// Secondary indexes on the articles table. Every statement uses IF NOT EXISTS,
// so running this on each startup is a no-op once the indexes are in place.
db.exec(`
CREATE INDEX IF NOT EXISTS idx_articles_source ON articles(source);
CREATE INDEX IF NOT EXISTS idx_articles_pub_date ON articles(pub_date);
CREATE INDEX IF NOT EXISTS idx_articles_ingested_at ON articles(ingested_at);
CREATE INDEX IF NOT EXISTS idx_articles_normalized_title ON articles(normalized_title);
CREATE INDEX IF NOT EXISTS idx_articles_event_id ON articles(event_id);
CREATE INDEX IF NOT EXISTS idx_articles_has_embedding ON articles(has_embedding);
CREATE INDEX IF NOT EXISTS idx_articles_pub_date_effective ON articles(pub_date_effective DESC);
`);
db.exec(`
@@ -132,93 +62,22 @@ db.exec(`
);
`);
// vec0 table — fixed at 8192 dims to cover any model on openrouter, shorter embeddings get zero-padded
{
// Setup / migration of the article_embeddings vec0 virtual table.
// `existing` is the sqlite_master row for the table (undefined if it was never created).
// `needsMigration` is true when the table exists but its declared FLOAT[n] width
// is either unparseable or something other than 8192.
const existing = db.prepare(`SELECT sql FROM sqlite_master WHERE type = 'table' AND name = 'article_embeddings'`).get();
const currentDim = existing && existing.sql && existing.sql.match(/FLOAT\[(\d+)\]/);
const needsMigration = existing && (!currentDim || parseInt(currentDim[1], 10) !== 8192);
// NOTE(review): this unconditional CREATE ... IF NOT EXISTS runs before the migration
// branches below. When `existing` is undefined it creates the table, after which the plain
// CREATE VIRTUAL TABLE at the bottom of this block would hit an already-existing table.
// It looks like two revisions of this block are interleaved here — confirm which is intended.
db.exec(`
CREATE VIRTUAL TABLE IF NOT EXISTS article_embeddings USING vec0(
article_id INTEGER PRIMARY KEY,
embedding FLOAT[8192]
);
`);
if (needsMigration) {
// save everything in vec0 to the store before dropping it, keyed by whatever model is in meta
try {
const BATCH = 500;
let offset = 0;
// NOTE(review): pages with LIMIT/OFFSET but no ORDER BY, so row order between
// batches is whatever the engine happens to produce — confirm that is acceptable.
const fetchBatch = db.prepare(`
SELECT e.article_id, m.model, e.embedding
FROM article_embeddings e
JOIN article_embedding_meta m ON m.article_id = e.article_id
LIMIT ? OFFSET ?
`);
// OR IGNORE: rows that conflict with something already in the store are skipped
const insert = db.prepare(`
INSERT OR IGNORE INTO article_embedding_store (article_id, model, embedding)
VALUES (?, ?, ?)
`);
// each batch is written inside its own transaction
const insertMany = db.transaction((rows) => {
for (const row of rows) insert.run(row.article_id, row.model, row.embedding);
});
while (true) {
const rows = fetchBatch.all(BATCH, offset);
if (rows.length === 0) break;
insertMany(rows);
offset += rows.length;
// a short page means the last batch has been read
if (rows.length < BATCH) break;
}
} catch (err) {
// best effort: a failed rescue is logged and the drop below still happens
console.error('failed to rescue embeddings from vec0 before migration:', err);
}
// drop the old-width table and clear the per-article meta rows along with it
db.exec(`DROP TABLE article_embeddings`);
db.exec(`DELETE FROM article_embedding_meta`);
}
// (re)create the table when it was absent at startup or has just been dropped
if (!existing || needsMigration) {
db.exec(`
CREATE VIRTUAL TABLE article_embeddings USING vec0(
article_id INTEGER PRIMARY KEY,
embedding FLOAT[8192]
);
`);
}
}
// migrate query_embeddings to include model in primary key
{
  // Inspect the current query_embeddings columns; an empty list (table absent)
  // takes the same path as a table that lacks the `model` column.
  const columns = db.prepare(`PRAGMA table_info(query_embeddings)`).all();
  const modelColumnPresent = columns.some((column) => column.name === 'model');

  if (modelColumnPresent) {
    // already on the (query, model) schema — just make sure the table exists
    db.exec(`
CREATE TABLE IF NOT EXISTS query_embeddings (
query TEXT NOT NULL,
model TEXT NOT NULL,
embedding BLOB NOT NULL,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
PRIMARY KEY (query, model)
);
`);
  } else {
    // build the new-schema table, discard the old one (rows are not copied), swap names
    db.exec(`
BEGIN;
CREATE TABLE query_embeddings_new (
query TEXT NOT NULL,
model TEXT NOT NULL,
embedding BLOB NOT NULL,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
PRIMARY KEY (query, model)
);
DROP TABLE IF EXISTS query_embeddings;
ALTER TABLE query_embeddings_new RENAME TO query_embeddings;
COMMIT;
`);
  }
}
// query_embeddings holds one embedding blob per (query, model) pair;
// created_at defaults to the insertion time. No-op if the table already exists.
db.exec(`
CREATE TABLE IF NOT EXISTS query_embeddings (
query TEXT NOT NULL,
model TEXT NOT NULL,
embedding BLOB NOT NULL,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
PRIMARY KEY (query, model)
);
`);
db.exec(`
CREATE TABLE IF NOT EXISTS events (
@@ -228,22 +87,6 @@ db.exec(`
);
`);
// Add articles.event_id when it is missing. A "duplicate column name" failure
// means the column is already present and is ignored; anything else is rethrown.
for (const ddl of [
  'ALTER TABLE articles ADD COLUMN event_id INTEGER REFERENCES events(id)',
]) {
  try {
    db.exec(ddl);
  } catch (err) {
    const columnAlreadyExists = String(err.message).includes('duplicate column name');
    if (columnAlreadyExists) continue;
    throw err;
  }
}
// Index for looking articles up by event; IF NOT EXISTS keeps this idempotent.
db.exec(`
CREATE INDEX IF NOT EXISTS idx_articles_event_id ON articles(event_id);
`);
db.exec(`
CREATE TABLE IF NOT EXISTS gdelt_backfill_windows (
source_id TEXT NOT NULL,
@@ -287,9 +130,6 @@ db.exec(`
);
`);
// per-domain fetch policy — caches whether plain http or a browser is needed
// so we don't waste a round trip on every article from a known js-only site.
// expires_at lets us re-probe domains that may have recovered
db.exec(`
CREATE TABLE IF NOT EXISTS domain_fetch_policy (
domain TEXT PRIMARY KEY,
@@ -303,30 +143,4 @@ db.exec(`
);
`);
// Bring an existing articles table up to the current column set, one ALTER per column.
// A "duplicate column name" failure means that column is already present and is
// skipped; any other error aborts startup.
for (const ddl of [
  'ALTER TABLE articles ADD COLUMN image TEXT',
  'ALTER TABLE articles ADD COLUMN content_status TEXT',
  'ALTER TABLE articles ADD COLUMN content_error TEXT',
  'ALTER TABLE articles ADD COLUMN content_attempted_at TEXT',
  'ALTER TABLE articles ADD COLUMN content_attempt_count INTEGER NOT NULL DEFAULT 0',
  'ALTER TABLE articles ADD COLUMN content_retry_after TEXT',
  'ALTER TABLE articles ADD COLUMN is_index_page INTEGER NOT NULL DEFAULT 0',
  'ALTER TABLE articles ADD COLUMN has_embedding INTEGER NOT NULL DEFAULT 0',
  'ALTER TABLE articles ADD COLUMN pub_date_effective TEXT',
  'ALTER TABLE articles ADD COLUMN language TEXT'
]) {
  try {
    db.exec(ddl);
  } catch (err) {
    const columnAlreadyExists = String(err.message).includes('duplicate column name');
    if (columnAlreadyExists) continue;
    throw err;
  }
}
// Indexes on has_embedding and pub_date_effective (descending);
// IF NOT EXISTS keeps this idempotent across restarts.
db.exec(`
CREATE INDEX IF NOT EXISTS idx_articles_has_embedding ON articles(has_embedding);
CREATE INDEX IF NOT EXISTS idx_articles_pub_date_effective ON articles(pub_date_effective DESC);
`);
module.exports = db;
module.exports = db;
+134
View File
@@ -2,6 +2,24 @@ const fs = require('fs');
const path = require('path');
const db = require('../db');
const config = require('../config');
const Database = require('better-sqlite3');
// lazily-opened handle to the intelligence database, shared by every caller
let idb = null;

/**
 * Returns a read-only handle to the intelligence database, opening it on first use.
 *
 * Path lookup order: the INTELLIGENCE_DB environment variable, then
 * `config.intelligence_db` (relative values are resolved against the directory two
 * levels above this file), then `intelligence.sqlite` in that same directory.
 *
 * @returns {object|null} the cached handle, or null while the file does not exist
 *   (nothing is cached in that case, so a later call checks again).
 */
function getIntelligenceDb() {
  if (!idb) {
    let dbFile = process.env.INTELLIGENCE_DB;
    if (!dbFile) {
      const baseDir = path.resolve(__dirname, '..', '..');
      const configured = config.intelligence_db;
      if (!configured) {
        dbFile = path.resolve(baseDir, 'intelligence.sqlite');
      } else if (path.isAbsolute(configured)) {
        dbFile = configured;
      } else {
        dbFile = path.resolve(baseDir, configured);
      }
    }
    if (fs.existsSync(dbFile)) {
      idb = new Database(dbFile, { readonly: true });
    }
  }
  return idb;
}
// Admin credentials come from config.admin; each one falls back to a built-in
// default when config.admin is absent or the field is empty.
// NOTE(review): the fallback password is the literal 'changeme' — confirm that
// every deployment overrides it.
const adminUser = (config.admin && config.admin.username) || 'admin';
const adminPass = (config.admin && config.admin.password) || 'changeme';
@@ -204,6 +222,122 @@ async function adminRoutes(fastify) {
return { ok: true };
});
// intelligence endpoints
// GET /admin/api/intelligence/stats — queue breakdown by status plus row counts
// for the intelligence tables. Responds { available: false } when the
// intelligence database cannot be opened.
fastify.get('/admin/api/intelligence/stats', async (request, reply) => {
  if (!checkAuth(request, reply)) return;

  const intel = getIntelligenceDb();
  if (!intel) {
    return { available: false };
  }

  // total number of rows in one table
  const rowCount = (table) => intel.prepare(`SELECT COUNT(*) as n FROM ${table}`).get().n;

  const queue = intel.prepare(`SELECT status, COUNT(*) as n FROM article_queue GROUP BY status`).all();
  return {
    available: true,
    queue,
    knowledge: rowCount('event_knowledge'),
    predictions: rowCount('event_predictions'),
    companies: rowCount('tracked_companies'),
    embeddings: rowCount('company_embeddings'),
  };
});
// GET /admin/api/intelligence/companies — every tracked company ordered by name,
// or an empty list when the intelligence database is unavailable.
fastify.get('/admin/api/intelligence/companies', async (request, reply) => {
  const authorized = checkAuth(request, reply);
  if (!authorized) return;

  const intel = getIntelligenceDb();
  return intel
    ? intel.prepare(`SELECT * FROM tracked_companies ORDER BY name`).all()
    : [];
});
// GET /admin/api/intelligence/knowledge — paginated event_knowledge rows, newest id first.
// Query params: limit (default 50, clamped to 1..200), offset (default 0, never negative),
// company_id and type (both optional filters).
// Responds { total, rows }; both are empty when the intelligence database is unavailable.
fastify.get('/admin/api/intelligence/knowledge', async (request, reply) => {
  if (!checkAuth(request, reply)) return;
  const db = getIntelligenceDb();
  if (!db) return { total: 0, rows: [] };

  const q = request.query || {};
  // SQLite reads a negative LIMIT as "no upper bound", so a bare Math.min would let
  // ?limit=-1 return the whole table; clamp from below as well.
  const limit = Math.min(Math.max(parseInt(q.limit, 10) || 50, 1), 200);
  const offset = Math.max(parseInt(q.offset, 10) || 0, 0);
  const companyId = q.company_id ? parseInt(q.company_id, 10) : null;
  const type = q.type || null;

  // build the WHERE clause from whichever filters were supplied; values are always bound
  const conditions = [];
  const params = [];
  if (companyId) { conditions.push('ek.company_id = ?'); params.push(companyId); }
  if (type) { conditions.push('ek.type = ?'); params.push(type); }
  const where = conditions.length ? `WHERE ${conditions.join(' AND ')}` : '';

  // NOTE(review): the count does not join tracked_companies while the row query does,
  // so `total` can exceed the reachable rows if a company_id has no matching company.
  const total = db.prepare(`SELECT COUNT(*) as n FROM event_knowledge ek ${where}`).get(...params).n;
  const rows = db.prepare(`
SELECT ek.id, ek.event_id, ek.type, ek.data, ek.created_at,
tc.name as company_name
FROM event_knowledge ek
JOIN tracked_companies tc ON tc.id = ek.company_id
${where}
ORDER BY ek.id DESC
LIMIT ? OFFSET ?
`).all(...params, limit, offset);
  return { total, rows };
});
// GET /admin/api/intelligence/predictions — paginated event_predictions rows, newest id first.
// Query params: limit (default 50, clamped to 1..200), offset (default 0, never negative),
// company_id (optional filter).
// Responds { total, rows }; both are empty when the intelligence database is unavailable.
fastify.get('/admin/api/intelligence/predictions', async (request, reply) => {
  if (!checkAuth(request, reply)) return;
  const db = getIntelligenceDb();
  if (!db) return { total: 0, rows: [] };

  const q = request.query || {};
  // SQLite reads a negative LIMIT as "no upper bound", so a bare Math.min would let
  // ?limit=-1 return the whole table; clamp from below as well.
  const limit = Math.min(Math.max(parseInt(q.limit, 10) || 50, 1), 200);
  const offset = Math.max(parseInt(q.offset, 10) || 0, 0);
  const companyId = q.company_id ? parseInt(q.company_id, 10) : null;

  // optional company filter; the value is always bound, never interpolated
  const conditions = [];
  const params = [];
  if (companyId) { conditions.push('ep.company_id = ?'); params.push(companyId); }
  const where = conditions.length ? `WHERE ${conditions.join(' AND ')}` : '';

  // NOTE(review): the count does not join tracked_companies while the row query does,
  // so `total` can exceed the reachable rows if a company_id has no matching company.
  const total = db.prepare(`SELECT COUNT(*) as n FROM event_predictions ep ${where}`).get(...params).n;
  const rows = db.prepare(`
SELECT ep.*, tc.name as company_name
FROM event_predictions ep
JOIN tracked_companies tc ON tc.id = ep.company_id
${where}
ORDER BY ep.id DESC
LIMIT ? OFFSET ?
`).all(...params, limit, offset);
  return { total, rows };
});
// raw sql console
// POST /admin/api/sql — runs one caller-supplied SQL statement for the admin console.
// Body: { sql: string, database?: 'intelligence' }. Any other `database` value targets
// the main `db` handle; 'intelligence' targets the handle from getIntelligenceDb(),
// which is opened read-only.
// Responds { rows, changes, lastInsertRowid, elapsed } on success and
// 400 { error } for missing/invalid input or any statement failure.
// NOTE(review): this executes arbitrary SQL against the main database; it relies
// entirely on checkAuth for protection.
fastify.post('/admin/api/sql', async (request, reply) => {
  if (!checkAuth(request, reply)) return;

  const { sql, database } = request.body || {};
  // a non-string `sql` (number, object, array) would make .trim() throw before the
  // try block below and surface as a 500, so reject it here as a bad request
  if (typeof sql !== 'string' || !sql.trim()) { reply.code(400); return { error: 'no sql provided' }; }

  const target = database === 'intelligence' ? getIntelligenceDb() : db;
  if (!target) { reply.code(400); return { error: 'database not available' }; }

  try {
    const stmt = target.prepare(sql);
    const start = Date.now();
    let rows, changes, lastInsertRowid;
    if (stmt.reader) {
      // statement returns data
      rows = stmt.all();
    } else {
      // statement returns no data; report what it changed instead
      const info = stmt.run();
      changes = info.changes;
      lastInsertRowid = info.lastInsertRowid;
    }
    return {
      rows: rows || null,
      changes: changes ?? null,
      lastInsertRowid: lastInsertRowid ?? null,
      elapsed: Date.now() - start,
    };
  } catch (err) {
    // syntax errors, constraint failures, writes against the read-only handle, etc.
    reply.code(400);
    return { error: err.message };
  }
});
// stats for dashboard header
fastify.get('/admin/api/stats', async (request, reply) => {
if (!checkAuth(request, reply)) return;
+20 -12
View File
@@ -1,26 +1,34 @@
const fs = require('fs');
const path = require('path');
const os = require('os');
const config = require('../config');
const db = require('../db');
/**
 * Registers development-only routes. Nothing is registered unless
 * `config.dev.enabled` is truthy.
 *
 * GET /dev/db/download streams a snapshot of the main database as
 * `archive.sqlite`. The snapshot is written to a temp file with VACUUM INTO and
 * that file is removed once the response stream closes, or immediately if
 * taking the snapshot fails.
 *
 * @param {object} fastify - Fastify instance to attach the routes to.
 */
async function devRoutes(fastify) {
  if (!config.dev || !config.dev.enabled) return;

  fastify.get('/dev/db/download', async (req, reply) => {
    const tmpPath = path.join(os.tmpdir(), `duriin_snapshot_${Date.now()}.sqlite`);

    try {
      // VACUUM INTO gives us a consistent, defragmented copy with no mid-write corruption
      db.prepare(`VACUUM INTO ?`).run(tmpPath);

      const stat = fs.statSync(tmpPath);
      reply.header('Content-Type', 'application/octet-stream');
      reply.header('Content-Disposition', 'attachment; filename="archive.sqlite"');
      reply.header('Content-Length', stat.size);

      const stream = fs.createReadStream(tmpPath);
      // best-effort cleanup once the stream is done with the file
      stream.on('close', () => fs.unlink(tmpPath, () => {}));
      return reply.send(stream);
    } catch (err) {
      // best-effort cleanup of a partial snapshot; the original error still propagates
      fs.unlink(tmpPath, () => {});
      throw err;
    }
  });
}