67 lines
1.8 KiB
JavaScript
67 lines
1.8 KiB
JavaScript
// Pulls usable articles from duriin into article_queue continuously
|
|
// Usable = has content, has embedding, has event assignment
|
|
|
|
async function runQueueFeeder(archiveDb, intelligenceDb, config) {
|
|
const batchSize = config.workers?.queueFeederBatchSize ?? 100;
|
|
const loopDelay = config.workers?.queueFeederLoopDelayMs ?? 3000;
|
|
|
|
const getCursor = intelligenceDb.prepare(
|
|
"SELECT value FROM cursors WHERE key = 'queue_feeder'"
|
|
);
|
|
const setCursor = intelligenceDb.prepare(
|
|
"INSERT OR REPLACE INTO cursors (key, value) VALUES ('queue_feeder', ?)"
|
|
);
|
|
|
|
const insertQueued = intelligenceDb.prepare(`
|
|
INSERT OR IGNORE INTO article_queue (article_id, status, created_at)
|
|
VALUES (?, 'pending', CURRENT_TIMESTAMP)
|
|
`);
|
|
|
|
while (true) {
|
|
try {
|
|
const cursorRow = getCursor.get();
|
|
const cursor = cursorRow ? cursorRow.value : 0;
|
|
|
|
const articles = archiveDb.prepare(`
|
|
SELECT id FROM articles
|
|
WHERE id > ?
|
|
AND content IS NOT NULL
|
|
AND content != ''
|
|
AND has_embedding = 1
|
|
AND event_id IS NOT NULL
|
|
ORDER BY id ASC
|
|
LIMIT ?
|
|
`).all(cursor, batchSize);
|
|
|
|
if (articles.length === 0) {
|
|
await sleep(loopDelay);
|
|
continue;
|
|
}
|
|
|
|
let newCursor = cursor;
|
|
let inserted = 0;
|
|
|
|
for (const a of articles) {
|
|
const info = insertQueued.run(a.id);
|
|
if (info.changes > 0) inserted++;
|
|
if (a.id > newCursor) newCursor = a.id;
|
|
}
|
|
|
|
setCursor.run(newCursor);
|
|
|
|
if (inserted > 0) {
|
|
console.log(`[feeder] queued ${inserted} articles, cursor now ${newCursor}`);
|
|
}
|
|
|
|
} catch (err) {
|
|
console.error("[feeder] error:", err.message);
|
|
await sleep(loopDelay);
|
|
}
|
|
}
|
|
}
|
|
|
|
function sleep(ms) {
|
|
return new Promise(r => setTimeout(r, ms));
|
|
}
|
|
|
|
module.exports = { runQueueFeeder };
|