add intelligence worker and embedding generation for article processing
This commit is contained in:
parent
715172596f
commit
ac7c87c6cf
11 changed files with 754 additions and 16 deletions
|
|
@ -124,7 +124,7 @@ Without `id` — returns a paginated list of events. With `id` — returns a sin
|
|||
|
||||
```json
|
||||
[
|
||||
{ "id": 1, "title": "...", "created_at": "2025-01-01T12:35:10.000Z" }
|
||||
{ "id": 1, "title": "...", "pub_date": "2025-01-01T12:34:56.000Z" }
|
||||
]
|
||||
```
|
||||
|
||||
|
|
@ -134,7 +134,7 @@ Without `id` — returns a paginated list of events. With `id` — returns a sin
|
|||
{
|
||||
"id": 1,
|
||||
"title": "...",
|
||||
"created_at": "2025-01-01T12:35:10.000Z",
|
||||
"pub_date": "2025-01-01T12:34:56.000Z",
|
||||
"articles": [
|
||||
{
|
||||
"id": 123,
|
||||
|
|
|
|||
16
config.json
16
config.json
|
|
@ -1,4 +1,17 @@
|
|||
{
|
||||
"duriin_db": "/data/archive.sqlite",
|
||||
"intelligence_db": "/data/intelligence.sqlite",
|
||||
"llm": {
|
||||
"baseUrl": "https://openrouter.ai/api/v1",
|
||||
"model": "qwen/qwen3-235b-a22b-2507",
|
||||
"apiKey": "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"
|
||||
},
|
||||
"workers": {
|
||||
"relevanceBatchSize": 50,
|
||||
"relevanceLoopDelayMs": 2000,
|
||||
"extractionLoopDelayMs": 1000,
|
||||
"consolidationLoopDelayMs": 60000
|
||||
},
|
||||
"server": {
|
||||
"port": 3001,
|
||||
"host": "0.0.0.0"
|
||||
|
|
@ -61,6 +74,9 @@
|
|||
"browser": {
|
||||
"maxConcurrentPages": 8
|
||||
},
|
||||
"dev": {
|
||||
"enabled": true
|
||||
},
|
||||
"googleNews": {
|
||||
"queries": [
|
||||
"technology"
|
||||
|
|
|
|||
|
|
@ -12,6 +12,20 @@ services:
|
|||
networks:
|
||||
- nginx_proxy_manager_default
|
||||
|
||||
intelligence:
|
||||
build:
|
||||
context: .
|
||||
provenance: false
|
||||
command: node intelligence/index.js
|
||||
volumes:
|
||||
- ./config.json:/app/config.json:ro
|
||||
- ./data:/data
|
||||
environment:
|
||||
NODE_ENV: production
|
||||
restart: unless-stopped
|
||||
networks:
|
||||
- nginx_proxy_manager_default
|
||||
|
||||
networks:
|
||||
nginx_proxy_manager_default:
|
||||
external: true
|
||||
|
|
|
|||
220
intelligence/augorWorker.js
Normal file
220
intelligence/augorWorker.js
Normal file
|
|
@ -0,0 +1,220 @@
|
|||
const https = require("https");
|
||||
const http = require("http");
|
||||
|
||||
const { findMatchedCompaniesByEmbedding } = require("./embeddings");
|
||||
|
||||
async function runAugorWorker(archiveDb, intelligenceDb, config) {
|
||||
const loopDelay = config.workers?.augorLoopDelayMs ?? 1500;
|
||||
const llmConfig = config.llm || {};
|
||||
|
||||
const getPending = intelligenceDb.prepare(`
|
||||
SELECT * FROM article_queue WHERE status = 'pending' LIMIT 1
|
||||
`);
|
||||
|
||||
const setStatus = intelligenceDb.prepare(`
|
||||
UPDATE article_queue SET status = ?, updated_at = CURRENT_TIMESTAMP WHERE article_id = ?
|
||||
`);
|
||||
|
||||
const getEventArticleIds = archiveDb.prepare(
|
||||
"SELECT id FROM articles WHERE event_id = ?"
|
||||
);
|
||||
|
||||
const setStatusByArticleId = intelligenceDb.prepare(`
|
||||
UPDATE article_queue SET status = 'processed', updated_at = CURRENT_TIMESTAMP
|
||||
WHERE article_id = ? AND status = 'pending'
|
||||
`);
|
||||
|
||||
const deleteKnowledge = intelligenceDb.prepare(
|
||||
"DELETE FROM event_knowledge WHERE event_id = ?"
|
||||
);
|
||||
const deletePredictions = intelligenceDb.prepare(
|
||||
"DELETE FROM event_predictions WHERE event_id = ?"
|
||||
);
|
||||
|
||||
const insertKnowledge = intelligenceDb.prepare(`
|
||||
INSERT INTO event_knowledge (event_id, company_id, type, data)
|
||||
VALUES (?, ?, ?, ?)
|
||||
`);
|
||||
const insertPrediction = intelligenceDb.prepare(`
|
||||
INSERT INTO event_predictions (event_id, company_id, type, direction, magnitude, timeframe, rationale)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?)
|
||||
`);
|
||||
|
||||
while (true) {
|
||||
try {
|
||||
const queueRow = getPending.get();
|
||||
|
||||
if (!queueRow) {
|
||||
await sleep(loopDelay);
|
||||
continue;
|
||||
}
|
||||
|
||||
const article = archiveDb.prepare(`
|
||||
SELECT id, event_id, content, has_embedding
|
||||
FROM articles WHERE id = ?
|
||||
`).get(queueRow.article_id);
|
||||
|
||||
if (!article || !article.content || !article.has_embedding || !article.event_id) {
|
||||
setStatus.run("skipped", queueRow.article_id);
|
||||
continue;
|
||||
}
|
||||
|
||||
const eventId = article.event_id;
|
||||
|
||||
const event = archiveDb.prepare("SELECT * FROM events WHERE id = ?").get(eventId);
|
||||
if (!event) {
|
||||
setStatus.run("skipped", queueRow.article_id);
|
||||
continue;
|
||||
}
|
||||
|
||||
const eventArticles = archiveDb.prepare(`
|
||||
SELECT id, title, description, content
|
||||
FROM articles
|
||||
WHERE event_id = ? AND content IS NOT NULL AND content != ''
|
||||
ORDER BY id ASC
|
||||
LIMIT 25
|
||||
`).all(eventId);
|
||||
|
||||
const eventArticleIds = eventArticles.map(a => a.id);
|
||||
|
||||
const matchedCompanies = findMatchedCompaniesByEmbedding(
|
||||
eventArticleIds, archiveDb, intelligenceDb, config
|
||||
);
|
||||
|
||||
if (matchedCompanies.length === 0) {
|
||||
for (const r of getEventArticleIds.all(eventId)) setStatusByArticleId.run(r.id);
|
||||
console.log(`[augor] event ${eventId} — no company match, skipped`);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
deleteKnowledge.run(eventId);
|
||||
deletePredictions.run(eventId);
|
||||
|
||||
const articleText = eventArticles.map((a, i) => {
|
||||
const body = (a.content || a.description || "").slice(0, 2000);
|
||||
return `[Article ${i + 1}] ${a.title}\n${body}`;
|
||||
}).join("\n\n---\n\n");
|
||||
|
||||
for (const company of matchedCompanies) {
|
||||
try {
|
||||
const result = await callLlm(llmConfig, buildPrompt(company.name, event.title, articleText));
|
||||
|
||||
if (result) {
|
||||
const writeAll = intelligenceDb.transaction(() => {
|
||||
for (const r of (result.knowledge?.relationships || [])) {
|
||||
insertKnowledge.run(eventId, company.id, "relationship", JSON.stringify(r));
|
||||
}
|
||||
for (const t of (result.knowledge?.themes || [])) {
|
||||
insertKnowledge.run(eventId, company.id, "theme", JSON.stringify(t));
|
||||
}
|
||||
for (const f of (result.knowledge?.factors || [])) {
|
||||
insertKnowledge.run(eventId, company.id, "factor", JSON.stringify(f));
|
||||
}
|
||||
|
||||
for (const p of (result.predictions || [])) {
|
||||
insertPrediction.run(eventId, company.id, p.type, p.direction, p.magnitude, p.timeframe, p.rationale);
|
||||
}
|
||||
});
|
||||
|
||||
writeAll();
|
||||
}
|
||||
|
||||
} catch (llmErr) {
|
||||
console.error(`[augor] LLM error for ${company.name} on event ${eventId}:`, llmErr.message);
|
||||
}
|
||||
}
|
||||
|
||||
setStatusByEventArticles.run(eventId);
|
||||
console.log(`[augor] processed event ${eventId} (${matchedCompanies.length} companies, ${eventArticles.length} articles)`);
|
||||
|
||||
} catch (err) {
|
||||
console.error("[augor] error:", err.message);
|
||||
await sleep(loopDelay);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function buildPrompt(companyName, eventTitle, articleText) {
|
||||
return `You are a financial intelligence analyst. Extract structured knowledge about ${companyName} from these news articles.
|
||||
|
||||
Event: ${eventTitle}
|
||||
|
||||
${articleText}
|
||||
|
||||
Return JSON only — no explanation. Shape:
|
||||
{
|
||||
"knowledge": {
|
||||
"relationships": [
|
||||
{ "type": "supplier|customer|competitor", "entity": "string", "confidence": "high|medium|low", "evidence": "string" }
|
||||
],
|
||||
"themes": [
|
||||
{ "theme": "string", "direction": "increasing|stable|decreasing", "evidence": "string" }
|
||||
],
|
||||
"factors": [
|
||||
{ "factor": "string", "relationship": "string", "evidence": "string" }
|
||||
]
|
||||
},
|
||||
"predictions": [
|
||||
{ "type": "market_share|stock_price|competitive_position|other", "direction": "positive|negative|neutral", "magnitude": "high|medium|low", "timeframe": "short|medium|long", "rationale": "string" }
|
||||
]
|
||||
}
|
||||
|
||||
Only include claims directly supported by the articles. Use empty arrays if nothing applies.`;
|
||||
}
|
||||
|
||||
async function callLlm(llmConfig, prompt) {
|
||||
const body = JSON.stringify({
|
||||
model: llmConfig.model || "gpt-4o-mini",
|
||||
messages: [{ role: "user", content: prompt }],
|
||||
temperature: 0.1,
|
||||
response_format: { type: "json_object" },
|
||||
});
|
||||
|
||||
const baseUrl = llmConfig.baseUrl || "https://api.openai.com";
|
||||
const url = new URL("/v1/chat/completions", baseUrl);
|
||||
|
||||
const responseText = await httpPost(url, body, {
|
||||
"Content-Type": "application/json",
|
||||
"Authorization": `Bearer ${llmConfig.apiKey || ""}`,
|
||||
});
|
||||
|
||||
const parsed = JSON.parse(responseText);
|
||||
const content = parsed.choices?.[0]?.message?.content;
|
||||
if (!content) return null;
|
||||
|
||||
return JSON.parse(content);
|
||||
}
|
||||
|
||||
function httpPost(url, body, headers) {
|
||||
return new Promise((resolve, reject) => {
|
||||
const lib = url.protocol === "https:" ? https : http;
|
||||
const req = lib.request({
|
||||
hostname: url.hostname,
|
||||
port: url.port || (url.protocol === "https:" ? 443 : 80),
|
||||
path: url.pathname + url.search,
|
||||
method: "POST",
|
||||
headers: { ...headers, "Content-Length": Buffer.byteLength(body) },
|
||||
}, (res) => {
|
||||
let data = "";
|
||||
res.on("data", chunk => data += chunk);
|
||||
res.on("end", () => {
|
||||
if (res.statusCode >= 200 && res.statusCode < 300) {
|
||||
resolve(data);
|
||||
} else {
|
||||
reject(new Error(`LLM ${res.statusCode}: ${data.slice(0, 300)}`));
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
req.on("error", reject);
|
||||
req.write(body);
|
||||
req.end();
|
||||
});
|
||||
}
|
||||
|
||||
function sleep(ms) {
|
||||
return new Promise(r => setTimeout(r, ms));
|
||||
}
|
||||
|
||||
module.exports = { runAugorWorker };
|
||||
97
intelligence/db.js
Normal file
97
intelligence/db.js
Normal file
|
|
@ -0,0 +1,97 @@
|
|||
const Database = require("better-sqlite3");
|
||||
|
||||
let archiveDb = null;
|
||||
let intelligenceDb = null;
|
||||
|
||||
function getArchiveDb(dbPath) {
|
||||
if (!archiveDb) {
|
||||
archiveDb = new Database(dbPath, { readonly: true });
|
||||
archiveDb.pragma("journal_mode = WAL");
|
||||
}
|
||||
return archiveDb;
|
||||
}
|
||||
|
||||
function getIntelligenceDb(dbPath) {
|
||||
if (!intelligenceDb) {
|
||||
intelligenceDb = new Database(dbPath);
|
||||
intelligenceDb.pragma("journal_mode = WAL");
|
||||
}
|
||||
return intelligenceDb;
|
||||
}
|
||||
|
||||
function runMigrations(db) {
|
||||
db.exec(`
|
||||
CREATE TABLE IF NOT EXISTS cursors (
|
||||
key TEXT PRIMARY KEY,
|
||||
value INTEGER
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS tracked_companies (
|
||||
id INTEGER PRIMARY KEY,
|
||||
name TEXT,
|
||||
ticker TEXT,
|
||||
aliases TEXT
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS article_queue (
|
||||
id INTEGER PRIMARY KEY,
|
||||
article_id INTEGER UNIQUE,
|
||||
status TEXT DEFAULT 'pending',
|
||||
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
|
||||
updated_at DATETIME
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS event_knowledge (
|
||||
id INTEGER PRIMARY KEY,
|
||||
event_id INTEGER,
|
||||
company_id INTEGER,
|
||||
type TEXT,
|
||||
data TEXT,
|
||||
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS company_embeddings (
|
||||
company_id INTEGER PRIMARY KEY,
|
||||
embedding BLOB NOT NULL,
|
||||
model TEXT NOT NULL,
|
||||
generated_at DATETIME DEFAULT CURRENT_TIMESTAMP
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS event_predictions (
|
||||
id INTEGER PRIMARY KEY,
|
||||
event_id INTEGER,
|
||||
company_id INTEGER,
|
||||
type TEXT,
|
||||
direction TEXT,
|
||||
magnitude TEXT,
|
||||
timeframe TEXT,
|
||||
rationale TEXT,
|
||||
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
|
||||
);
|
||||
`);
|
||||
}
|
||||
|
||||
function seedCompanies(db) {
|
||||
const count = db.prepare("SELECT COUNT(*) as c FROM tracked_companies").get().c;
|
||||
if (count > 0) return;
|
||||
|
||||
const insert = db.prepare(
|
||||
"INSERT INTO tracked_companies (name, ticker, aliases) VALUES (?, ?, ?)"
|
||||
);
|
||||
|
||||
const companies = [
|
||||
{ name: "NVIDIA", ticker: "NVDA", aliases: ["Nvidia Corporation", "NVDA"] },
|
||||
{ name: "TSMC", ticker: "TSM", aliases: ["Taiwan Semiconductor", "Taiwan Semiconductor Manufacturing"] },
|
||||
{ name: "ASML", ticker: "ASML", aliases: ["ASML Holding", "ASML Holdings"] },
|
||||
{ name: "Intel", ticker: "INTC", aliases: ["Intel Corporation"] },
|
||||
{ name: "Samsung", ticker: "005930.KS", aliases: ["Samsung Electronics", "Samsung Group"] },
|
||||
];
|
||||
|
||||
for (const c of companies) {
|
||||
insert.run(c.name, c.ticker, JSON.stringify(c.aliases));
|
||||
}
|
||||
|
||||
console.log(`[db] seeded ${companies.length} tracked companies`);
|
||||
}
|
||||
|
||||
module.exports = { getArchiveDb, getIntelligenceDb, runMigrations, seedCompanies };
|
||||
134
intelligence/embeddings.js
Normal file
134
intelligence/embeddings.js
Normal file
|
|
@ -0,0 +1,134 @@
|
|||
// embedding generation and cosine similarity for the intelligence layer
|
||||
|
||||
async function generateEmbedding(text, openRouterConfig) {
|
||||
const response = await fetch("https://openrouter.ai/api/v1/embeddings", {
|
||||
method: "POST",
|
||||
headers: {
|
||||
"Authorization": `Bearer ${openRouterConfig.apiKey}`,
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
body: JSON.stringify({
|
||||
model: openRouterConfig.embeddingModel,
|
||||
input: text,
|
||||
}),
|
||||
});
|
||||
|
||||
if (!response.ok) {
|
||||
let msg = `embedding request failed with ${response.status}`;
|
||||
try {
|
||||
const payload = await response.json();
|
||||
if (payload?.error?.message) msg = payload.error.message;
|
||||
} catch (_) {}
|
||||
throw new Error(msg);
|
||||
}
|
||||
|
||||
const payload = await response.json();
|
||||
const embedding = payload?.data?.[0]?.embedding;
|
||||
if (!Array.isArray(embedding) || embedding.length === 0) {
|
||||
throw new Error("invalid embedding response");
|
||||
}
|
||||
|
||||
return embedding;
|
||||
}
|
||||
|
||||
// Float32 BLOB -> Float32Array
|
||||
function blobToFloat32(buf) {
|
||||
return new Float32Array(buf.buffer, buf.byteOffset, buf.byteLength / 4);
|
||||
}
|
||||
|
||||
function cosineSimilarity(a, b) {
|
||||
if (a.length !== b.length) {
|
||||
// if dims differ just use the shorter length — handles edge cases gracefully
|
||||
const len = Math.min(a.length, b.length);
|
||||
a = a.subarray(0, len);
|
||||
b = b.subarray(0, len);
|
||||
}
|
||||
|
||||
let dot = 0, normA = 0, normB = 0;
|
||||
for (let i = 0; i < a.length; i++) {
|
||||
dot += a[i] * b[i];
|
||||
normA += a[i] * a[i];
|
||||
normB += b[i] * b[i];
|
||||
}
|
||||
|
||||
const denom = Math.sqrt(normA) * Math.sqrt(normB);
|
||||
return denom === 0 ? 0 : dot / denom;
|
||||
}
|
||||
|
||||
// generates company embeddings for any tracked company that doesnt have one yet
|
||||
async function ensureCompanyEmbeddings(intelligenceDb, openRouterConfig) {
|
||||
const companies = intelligenceDb.prepare("SELECT * FROM tracked_companies").all();
|
||||
|
||||
const getEmbed = intelligenceDb.prepare(
|
||||
"SELECT embedding FROM company_embeddings WHERE company_id = ?"
|
||||
);
|
||||
const upsertEmbed = intelligenceDb.prepare(`
|
||||
INSERT INTO company_embeddings (company_id, embedding, model, generated_at)
|
||||
VALUES (?, ?, ?, CURRENT_TIMESTAMP)
|
||||
ON CONFLICT(company_id) DO UPDATE SET
|
||||
embedding = excluded.embedding,
|
||||
model = excluded.model,
|
||||
generated_at = excluded.generated_at
|
||||
`);
|
||||
|
||||
for (const company of companies) {
|
||||
const existing = getEmbed.get(company.id);
|
||||
if (existing) continue;
|
||||
|
||||
const text = `${company.name} is a company with ticker ${company.ticker}`;
|
||||
try {
|
||||
const embedding = await generateEmbedding(text, openRouterConfig);
|
||||
const buf = Buffer.from(new Float32Array(embedding).buffer);
|
||||
upsertEmbed.run(company.id, buf, openRouterConfig.embeddingModel);
|
||||
console.log(`[embeddings] generated embedding for ${company.name}`);
|
||||
} catch (err) {
|
||||
console.error(`[embeddings] failed for ${company.name}:`, err.message);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// returns matched company objects from tracked_companies
|
||||
// checks cosine similarity between each company embedding and
|
||||
// the raw embeddings of all articles in the event
|
||||
function findMatchedCompaniesByEmbedding(eventArticleIds, archiveDb, intelligenceDb, config) {
|
||||
const threshold = config.intelligence?.similarityThreshold ?? 0.35;
|
||||
const model = config.openRouter?.embeddingModel;
|
||||
|
||||
const companies = intelligenceDb.prepare(
|
||||
"SELECT id, name, ticker FROM company_embeddings ce JOIN tracked_companies tc ON tc.id = ce.company_id"
|
||||
).all();
|
||||
|
||||
if (companies.length === 0) return [];
|
||||
|
||||
// load article embeddings from archive — only articles that have one
|
||||
const articleEmbeddings = [];
|
||||
for (const articleId of eventArticleIds) {
|
||||
const row = archiveDb.prepare(
|
||||
"SELECT embedding FROM article_embedding_store WHERE article_id = ? AND model = ?"
|
||||
).get(articleId, model);
|
||||
if (row) articleEmbeddings.push(blobToFloat32(row.embedding));
|
||||
}
|
||||
|
||||
if (articleEmbeddings.length === 0) return [];
|
||||
|
||||
const matched = [];
|
||||
for (const company of companies) {
|
||||
const companyRow = intelligenceDb.prepare(
|
||||
"SELECT embedding FROM company_embeddings WHERE company_id = ?"
|
||||
).get(company.id);
|
||||
if (!companyRow) continue;
|
||||
|
||||
const companyVec = blobToFloat32(companyRow.embedding);
|
||||
const hit = articleEmbeddings.some(articleVec => {
|
||||
const sim = cosineSimilarity(companyVec, articleVec);
|
||||
return sim >= threshold;
|
||||
});
|
||||
|
||||
if (hit) matched.push(company);
|
||||
}
|
||||
|
||||
return matched;
|
||||
}
|
||||
|
||||
module.exports = { generateEmbedding, ensureCompanyEmbeddings, findMatchedCompaniesByEmbedding };
|
||||
62
intelligence/index.js
Normal file
62
intelligence/index.js
Normal file
|
|
@ -0,0 +1,62 @@
|
|||
const fs = require("fs");
|
||||
const path = require("path");
|
||||
|
||||
const { getArchiveDb, getIntelligenceDb, runMigrations, seedCompanies } = require("./db");
|
||||
const { runQueueFeeder } = require("./queueFeeder");
|
||||
const { runAugorWorker } = require("./augorWorker");
|
||||
const { ensureCompanyEmbeddings } = require("./embeddings");
|
||||
|
||||
|
||||
const configPath = path.resolve(__dirname, "../config.json");
|
||||
const configDir = path.dirname(configPath);
|
||||
const rawConfig = JSON.parse(fs.readFileSync(configPath, "utf8"));
|
||||
|
||||
function resolvePath(p, fallback) {
|
||||
if (!p) return fallback;
|
||||
return path.isAbsolute(p) ? p : path.resolve(configDir, p);
|
||||
}
|
||||
|
||||
const config = {
|
||||
duriin_db: resolvePath(rawConfig.duriin_db, path.resolve(configDir, "archive.sqlite")),
|
||||
intelligence_db: resolvePath(rawConfig.intelligence_db, path.resolve(configDir, "intelligence.sqlite")),
|
||||
llm: rawConfig.llm || {},
|
||||
workers: rawConfig.workers || {},
|
||||
openRouter: rawConfig.openRouter || {},
|
||||
intelligence: rawConfig.intelligence || {},
|
||||
};
|
||||
|
||||
console.log("[intelligence] starting up");
|
||||
console.log(`[intelligence] archive: ${config.duriin_db}`);
|
||||
console.log(`[intelligence] intelligence: ${config.intelligence_db}`);
|
||||
|
||||
const archiveDb = getArchiveDb(config.duriin_db);
|
||||
const intelligenceDb = getIntelligenceDb(config.intelligence_db);
|
||||
|
||||
runMigrations(intelligenceDb);
|
||||
seedCompanies(intelligenceDb);
|
||||
|
||||
ensureCompanyEmbeddings(intelligenceDb, config.openRouter).then(() => {
|
||||
console.log("[intelligence] company embeddings ready");
|
||||
}).catch(err => {
|
||||
console.error("[intelligence] company embedding generation failed:", err.message);
|
||||
});
|
||||
|
||||
runQueueFeeder(archiveDb, intelligenceDb, config).catch(err => {
|
||||
console.error("[feeder] fatal:", err);
|
||||
process.exit(1);
|
||||
});
|
||||
|
||||
runAugorWorker(archiveDb, intelligenceDb, config).catch(err => {
|
||||
console.error("[augor] fatal:", err);
|
||||
process.exit(1);
|
||||
});
|
||||
|
||||
process.on("SIGINT", () => {
|
||||
console.log("[intelligence] shutting down");
|
||||
process.exit(0);
|
||||
});
|
||||
|
||||
process.on("SIGTERM", () => {
|
||||
console.log("[intelligence] shutting down");
|
||||
process.exit(0);
|
||||
});
|
||||
67
intelligence/queueFeeder.js
Normal file
67
intelligence/queueFeeder.js
Normal file
|
|
@ -0,0 +1,67 @@
|
|||
// Pulls usable articles from duriin into article_queue continuously
|
||||
// Usable = has content, has embedding, has event assignment
|
||||
|
||||
async function runQueueFeeder(archiveDb, intelligenceDb, config) {
|
||||
const batchSize = config.workers?.queueFeederBatchSize ?? 100;
|
||||
const loopDelay = config.workers?.queueFeederLoopDelayMs ?? 3000;
|
||||
|
||||
const getCursor = intelligenceDb.prepare(
|
||||
"SELECT value FROM cursors WHERE key = 'queue_feeder'"
|
||||
);
|
||||
const setCursor = intelligenceDb.prepare(
|
||||
"INSERT OR REPLACE INTO cursors (key, value) VALUES ('queue_feeder', ?)"
|
||||
);
|
||||
|
||||
const insertQueued = intelligenceDb.prepare(`
|
||||
INSERT OR IGNORE INTO article_queue (article_id, status, created_at)
|
||||
VALUES (?, 'pending', CURRENT_TIMESTAMP)
|
||||
`);
|
||||
|
||||
while (true) {
|
||||
try {
|
||||
const cursorRow = getCursor.get();
|
||||
const cursor = cursorRow ? cursorRow.value : 0;
|
||||
|
||||
const articles = archiveDb.prepare(`
|
||||
SELECT id FROM articles
|
||||
WHERE id > ?
|
||||
AND content IS NOT NULL
|
||||
AND content != ''
|
||||
AND has_embedding = 1
|
||||
AND event_id IS NOT NULL
|
||||
ORDER BY id ASC
|
||||
LIMIT ?
|
||||
`).all(cursor, batchSize);
|
||||
|
||||
if (articles.length === 0) {
|
||||
await sleep(loopDelay);
|
||||
continue;
|
||||
}
|
||||
|
||||
let newCursor = cursor;
|
||||
let inserted = 0;
|
||||
|
||||
for (const a of articles) {
|
||||
const info = insertQueued.run(a.id);
|
||||
if (info.changes > 0) inserted++;
|
||||
if (a.id > newCursor) newCursor = a.id;
|
||||
}
|
||||
|
||||
setCursor.run(newCursor);
|
||||
|
||||
if (inserted > 0) {
|
||||
console.log(`[feeder] queued ${inserted} articles, cursor now ${newCursor}`);
|
||||
}
|
||||
|
||||
} catch (err) {
|
||||
console.error("[feeder] error:", err.message);
|
||||
await sleep(loopDelay);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function sleep(ms) {
|
||||
return new Promise(r => setTimeout(r, ms));
|
||||
}
|
||||
|
||||
module.exports = { runQueueFeeder };
|
||||
|
|
@ -5,6 +5,7 @@ const statusRoutes = require('./src/routes/status');
|
|||
const sourcesRoutes = require('./src/routes/sources');
|
||||
const eventRoutes = require('./src/routes/events');
|
||||
const adminRoutes = require('./src/routes/admin');
|
||||
const devRoutes = require('./src/routes/dev');
|
||||
const config = require('./src/config');
|
||||
const { startScheduler } = require('./src/scheduler');
|
||||
|
||||
|
|
@ -16,6 +17,7 @@ app.register(statusRoutes);
|
|||
app.register(sourcesRoutes);
|
||||
app.register(eventRoutes);
|
||||
app.register(adminRoutes);
|
||||
app.register(devRoutes);
|
||||
|
||||
app.get('/', async () => ({ ok: true }));
|
||||
|
||||
|
|
|
|||
27
src/routes/dev.js
Normal file
27
src/routes/dev.js
Normal file
|
|
@ -0,0 +1,27 @@
|
|||
const fs = require('fs');
|
||||
const path = require('path');
|
||||
const config = require('../config');
|
||||
|
||||
|
||||
async function devRoutes(fastify) {
|
||||
if (!config.dev || !config.dev.enabled) return;
|
||||
|
||||
fastify.get('/dev/db/download', async (req, reply) => {
|
||||
const dbPath = path.resolve(config.duriin_db || './archive.sqlite');
|
||||
|
||||
if (!fs.existsSync(dbPath)) {
|
||||
return reply.code(404).send({ error: 'database file not found' });
|
||||
}
|
||||
|
||||
const stat = fs.statSync(dbPath);
|
||||
const filename = path.basename(dbPath);
|
||||
|
||||
reply.header('Content-Type', 'application/octet-stream');
|
||||
reply.header('Content-Disposition', `attachment; filename="${filename}"`);
|
||||
reply.header('Content-Length', stat.size);
|
||||
|
||||
return reply.send(fs.createReadStream(dbPath));
|
||||
});
|
||||
}
|
||||
|
||||
module.exports = devRoutes;
|
||||
|
|
@ -1,4 +1,5 @@
|
|||
const db = require('../db');
|
||||
const { findArticlesByEmbedding, getOrCreateQueryEmbedding } = require('../embeddings');
|
||||
|
||||
function parseLimit(value) {
|
||||
const n = Number.parseInt(value, 10);
|
||||
|
|
@ -10,10 +11,88 @@ function parseOffset(value) {
|
|||
return Number.isFinite(n) && n >= 0 ? n : 0;
|
||||
}
|
||||
|
||||
const getArticlesForEvent = db.prepare(`
|
||||
SELECT id, title, description, content, url, normalized_title, source, pub_date, ingested_at
|
||||
FROM articles
|
||||
WHERE event_id = ?
|
||||
AND content IS NOT NULL AND content != ''
|
||||
AND is_index_page = 0
|
||||
ORDER BY pub_date_effective DESC, id DESC
|
||||
`);
|
||||
|
||||
function fetchEventsByIds(ids) {
|
||||
if (ids.length === 0) return [];
|
||||
|
||||
const placeholders = ids.map(() => '?').join(', ');
|
||||
const rows = db.prepare(`
|
||||
SELECT e.id, e.title,
|
||||
(SELECT MIN(a.pub_date_effective) FROM articles a WHERE a.event_id = e.id AND a.content IS NOT NULL AND a.content != '' AND a.is_index_page = 0) AS pub_date
|
||||
FROM events e
|
||||
WHERE e.id IN (${placeholders})
|
||||
`).all(...ids);
|
||||
|
||||
const byId = new Map(rows.map(e => [e.id, e]));
|
||||
|
||||
// preserve the caller-supplied order (distance order for semantic, sort order for keyword)
|
||||
return ids.map(id => {
|
||||
const e = byId.get(id);
|
||||
return e ? { ...e, articles: getArticlesForEvent.all(e.id) } : null;
|
||||
}).filter(Boolean);
|
||||
}
|
||||
|
||||
async function eventRoutes(fastify) {
|
||||
fastify.get('/events', async (request, reply) => {
|
||||
const query = request.query || {};
|
||||
const limit = parseLimit(query.limit);
|
||||
const offset = parseOffset(query.offset);
|
||||
|
||||
// semantic path: embed query → find nearest articles → resolve to events
|
||||
if (query.semantic !== undefined) {
|
||||
const embedding = await getOrCreateQueryEmbedding(query.semantic);
|
||||
if (!embedding) {
|
||||
reply.code(400);
|
||||
return { error: 'Semantic query must not be empty' };
|
||||
}
|
||||
|
||||
// fetch more article candidates than we need so we have enough after dedup
|
||||
const neighbors = findArticlesByEmbedding(embedding, Math.min(limit * 20, 2000));
|
||||
const neighborIds = neighbors.map(n => n.articleId);
|
||||
if (neighborIds.length === 0) return [];
|
||||
|
||||
const placeholders = neighborIds.map(() => '?').join(', ');
|
||||
const conditions = [
|
||||
`id IN (${placeholders})`,
|
||||
'event_id IS NOT NULL',
|
||||
"content IS NOT NULL AND content != ''",
|
||||
'is_index_page = 0',
|
||||
];
|
||||
const params = [...neighborIds];
|
||||
|
||||
if (query.from) { conditions.push('pub_date_effective >= ?'); params.push(query.from); }
|
||||
if (query.to) { conditions.push('pub_date_effective <= ?'); params.push(query.to); }
|
||||
|
||||
const articleRows = db.prepare(
|
||||
`SELECT id, event_id FROM articles WHERE ${conditions.join(' AND ')}`
|
||||
).all(...params);
|
||||
|
||||
const articleEventMap = new Map(articleRows.map(r => [r.id, r.event_id]));
|
||||
|
||||
// walk neighbors in distance order and collect unique event IDs
|
||||
const seen = new Set();
|
||||
const orderedEventIds = [];
|
||||
for (const id of neighborIds) {
|
||||
const eventId = articleEventMap.get(id);
|
||||
if (eventId != null && !seen.has(eventId)) {
|
||||
seen.add(eventId);
|
||||
orderedEventIds.push(eventId);
|
||||
}
|
||||
}
|
||||
|
||||
return fetchEventsByIds(orderedEventIds.slice(offset, offset + limit));
|
||||
}
|
||||
|
||||
|
||||
// keyword / date filter path
|
||||
const conditions = [];
|
||||
const params = [];
|
||||
|
||||
|
|
@ -27,17 +106,46 @@ async function eventRoutes(fastify) {
|
|||
params.push(id);
|
||||
}
|
||||
|
||||
const limit = parseLimit(query.limit);
|
||||
const offset = parseOffset(query.offset);
|
||||
if (query.keyword) {
|
||||
const keywords = [].concat(query.keyword).map(k => k.trim()).filter(Boolean);
|
||||
const mode = String(query.keyword_mode || '').toLowerCase() === 'or' ? 'OR' : 'AND';
|
||||
const clauses = keywords.map(() => '(a.title LIKE ? OR a.description LIKE ? OR a.content LIKE ?)');
|
||||
|
||||
conditions.push(`EXISTS (
|
||||
SELECT 1 FROM articles a
|
||||
WHERE a.event_id = e.id
|
||||
AND a.content IS NOT NULL AND a.content != ''
|
||||
AND a.is_index_page = 0
|
||||
AND (${clauses.join(` ${mode} `)})
|
||||
)`);
|
||||
|
||||
for (const kw of keywords) {
|
||||
const like = `%${kw}%`;
|
||||
params.push(like, like, like);
|
||||
}
|
||||
}
|
||||
|
||||
if (query.from) {
|
||||
conditions.push(`EXISTS (
|
||||
SELECT 1 FROM articles a WHERE a.event_id = e.id AND a.pub_date_effective >= ?
|
||||
)`);
|
||||
params.push(query.from);
|
||||
}
|
||||
|
||||
if (query.to) {
|
||||
conditions.push(`EXISTS (
|
||||
SELECT 1 FROM articles a WHERE a.event_id = e.id AND a.pub_date_effective <= ?
|
||||
)`);
|
||||
params.push(query.to);
|
||||
}
|
||||
|
||||
const SORT_COLUMNS = {
|
||||
pub_date: '(SELECT MIN(a.pub_date_effective) FROM articles a WHERE a.event_id = e.id AND a.content IS NOT NULL AND a.content != \'\' AND a.is_index_page = 0)',
|
||||
pub_date: "(SELECT MIN(a.pub_date_effective) FROM articles a WHERE a.event_id = e.id AND a.content IS NOT NULL AND a.content != '' AND a.is_index_page = 0)",
|
||||
id: 'e.id',
|
||||
};
|
||||
|
||||
const sortBy = SORT_COLUMNS[query.sort_by] || SORT_COLUMNS.pub_date;
|
||||
const order = String(query.order || '').toLowerCase() === 'asc' ? 'ASC' : 'DESC';
|
||||
|
||||
const where = conditions.length ? `WHERE ${conditions.join(' AND ')}` : '';
|
||||
|
||||
const events = db.prepare(`
|
||||
|
|
@ -49,16 +157,7 @@ async function eventRoutes(fastify) {
|
|||
LIMIT ? OFFSET ?
|
||||
`).all(...params, limit, offset);
|
||||
|
||||
const getArticles = db.prepare(`
|
||||
SELECT id, title, description, content, url, normalized_title, source, pub_date, ingested_at
|
||||
FROM articles
|
||||
WHERE event_id = ?
|
||||
AND content IS NOT NULL AND content != ''
|
||||
AND is_index_page = 0
|
||||
ORDER BY pub_date_effective DESC, id DESC
|
||||
`);
|
||||
|
||||
return events.map(e => ({ ...e, articles: getArticles.all(e.id) }));
|
||||
return events.map(e => ({ ...e, articles: getArticlesForEvent.all(e.id) }));
|
||||
});
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue