Duriin-API/intelligence/queueFeeder.js

67 lines
1.8 KiB
JavaScript

/**
 * Continuously copies "usable" article ids from the archive database into the
 * intelligence database's `article_queue`, as `'pending'` rows.
 *
 * Usable = non-empty `content`, `has_embedding = 1`, and a non-null `event_id`.
 * Articles are scanned in ascending `id` order after the cursor persisted in
 * `cursors` under key 'queue_feeder' (0 when no row exists yet), and the
 * cursor is advanced to the highest id of each batch. Ids already present in
 * `article_queue` are skipped via INSERT OR IGNORE.
 *
 * NOTE(review): the cursor only moves forward, so an article that becomes
 * usable (e.g. receives its embedding or event) after a higher id has already
 * been fed is never queued. Confirm upstream workers finish articles in id
 * order, or add a periodic re-scan.
 *
 * Errors inside the loop are logged and retried after `loopDelay` ms; the
 * loop never exits on its own.
 *
 * @param {object} archiveDb - Handle whose `prepare()` returns statements with
 *   a synchronous `all()`; read from the `articles` table.
 *   (presumably better-sqlite3 — confirm)
 * @param {object} intelligenceDb - Handle owning the `cursors` and
 *   `article_queue` tables; must also support `exec()` (used for
 *   BEGIN/COMMIT/ROLLBACK).
 * @param {object} config - Reads `workers.queueFeederBatchSize` (default 100)
 *   and `workers.queueFeederLoopDelayMs` (default 3000).
 * @returns {Promise<never>} Never resolves; rejects only if the
 *   intelligence-DB statements cannot be prepared before the loop starts.
 */
async function runQueueFeeder(archiveDb, intelligenceDb, config) {
  const batchSize = config.workers?.queueFeederBatchSize ?? 100;
  const loopDelay = config.workers?.queueFeederLoopDelayMs ?? 3000;

  const getCursor = intelligenceDb.prepare(
    "SELECT value FROM cursors WHERE key = 'queue_feeder'"
  );
  const setCursor = intelligenceDb.prepare(
    "INSERT OR REPLACE INTO cursors (key, value) VALUES ('queue_feeder', ?)"
  );
  const insertQueued = intelligenceDb.prepare(`
    INSERT OR IGNORE INTO article_queue (article_id, status, created_at)
    VALUES (?, 'pending', CURRENT_TIMESTAMP)
  `);

  // Prepared on first use inside the try (rather than up front) so a failure
  // to prepare is logged and retried like any other loop error; cached after.
  let selectUsable = null;

  // Inserts one batch and advances the cursor in a single transaction: one
  // commit per batch instead of one per row, and the stored cursor never
  // moves past inserts that were rolled back.
  const enqueueBatch = (articles, cursor) => {
    let newCursor = cursor;
    let inserted = 0;
    intelligenceDb.exec("BEGIN");
    try {
      for (const { id } of articles) {
        if (insertQueued.run(id).changes > 0) inserted++;
        if (id > newCursor) newCursor = id;
      }
      setCursor.run(newCursor);
      intelligenceDb.exec("COMMIT");
    } catch (err) {
      try {
        intelligenceDb.exec("ROLLBACK");
      } catch {
        // SQLite may already have rolled back on its own; the original error
        // rethrown below is the one worth reporting.
      }
      throw err;
    }
    return { inserted, newCursor };
  };

  while (true) {
    try {
      if (selectUsable === null) {
        selectUsable = archiveDb.prepare(`
          SELECT id FROM articles
          WHERE id > ?
            AND content IS NOT NULL
            AND content != ''
            AND has_embedding = 1
            AND event_id IS NOT NULL
          ORDER BY id ASC
          LIMIT ?
        `);
      }
      const cursorRow = getCursor.get();
      const cursor = cursorRow ? cursorRow.value : 0;
      const articles = selectUsable.all(cursor, batchSize);
      if (articles.length === 0) {
        await sleep(loopDelay);
        continue;
      }

      const { inserted, newCursor } = enqueueBatch(articles, cursor);
      if (inserted > 0) {
        console.log(`[feeder] queued ${inserted} articles, cursor now ${newCursor}`);
      }

      if (articles.length < batchSize) {
        // Short batch: caught up with the archive, so wait for new rows.
        await sleep(loopDelay);
      } else {
        // Full batch: more backlog is likely. The DB calls above are
        // synchronous, so yield a macrotask turn before continuing; otherwise
        // draining a large backlog would starve everything else on the loop.
        await new Promise((resolve) => setImmediate(resolve));
      }
    } catch (err) {
      // Log the whole error (stack included), not just its message.
      console.error("[feeder] error:", err);
      await sleep(loopDelay);
    }
  }
}
/**
 * Returns a promise that resolves (with no value) once a `ms`-millisecond
 * timer fires; used to pace the feeder loop.
 * @param {number} ms - Delay in milliseconds.
 * @returns {Promise<void>}
 */
function sleep(ms) {
  return new Promise((resolve) => {
    setTimeout(resolve, ms);
  });
}
// CommonJS export: runQueueFeeder is the module's only public entry point; sleep stays private.
module.exports = { runQueueFeeder };