// Pulls usable articles from duriin into article_queue continuously.
// Usable = has content, has embedding, has event assignment.

/**
 * Polls the archive database for usable articles and enqueues them into
 * `article_queue`, remembering its position in the `cursors` table under the
 * key `queue_feeder`.
 *
 * Both database handles are used synchronously (`prepare(...)` followed by
 * `get` / `all` / `run`, with `run` returning an object that has `changes`).
 * NOTE(review): this looks like the better-sqlite3 API; confirm against the caller.
 *
 * NOTE(review): the cursor advances to the highest id returned by the filtered
 * query, so an article with a lower id that only later gains content, an
 * embedding or an event assignment will never be selected. Confirm that
 * articles always become usable in id order, or track progress differently.
 *
 * @param {object} archiveDb - Handle for the database holding `articles`.
 * @param {object} intelligenceDb - Handle for the database holding `cursors`
 *   and `article_queue`.
 * @param {object} config - Application config. Reads
 *   `config.workers.queueFeederBatchSize` (default 100) and
 *   `config.workers.queueFeederLoopDelayMs` (default 3000).
 * @param {object} [options] - Optional settings.
 * @param {AbortSignal} [options.signal] - When aborted, the loop exits at the
 *   start of its next iteration (an in-progress sleep is not interrupted).
 * @returns {Promise<void>} Resolves only after `options.signal` is aborted;
 *   without a signal the loop runs forever.
 * @throws If any of the statements cannot be prepared at start-up.
 */
async function runQueueFeeder(archiveDb, intelligenceDb, config, options = {}) {
  const batchSize = config.workers?.queueFeederBatchSize ?? 100;
  const loopDelay = config.workers?.queueFeederLoopDelayMs ?? 3000;
  const signal = options?.signal;

  const getCursor = intelligenceDb.prepare(
    "SELECT value FROM cursors WHERE key = 'queue_feeder'"
  );
  const setCursor = intelligenceDb.prepare(
    "INSERT OR REPLACE INTO cursors (key, value) VALUES ('queue_feeder', ?)"
  );
  const insertQueued = intelligenceDb.prepare(`
    INSERT OR IGNORE INTO article_queue (article_id, status, created_at)
    VALUES (?, 'pending', CURRENT_TIMESTAMP)
  `);
  // Prepared once up front: the SQL text never changes between iterations.
  const selectUsable = archiveDb.prepare(`
    SELECT id FROM articles
    WHERE id > ?
      AND content IS NOT NULL
      AND content != ''
      AND has_embedding = 1
      AND event_id IS NOT NULL
    ORDER BY id ASC
    LIMIT ?
  `);

  while (!signal?.aborted) {
    try {
      // A missing row or a NULL value both mean "start from the beginning";
      // binding NULL would make `id > ?` match nothing and stall the feeder.
      const cursor = getCursor.get()?.value ?? 0;
      const articles = selectUsable.all(cursor, batchSize);

      if (articles.length === 0) {
        await sleep(loopDelay);
        continue;
      }

      let newCursor = cursor;
      let inserted = 0;
      for (const { id } of articles) {
        if (insertQueued.run(id).changes > 0) inserted++;
        if (id > newCursor) newCursor = id;
      }

      // The cursor is written only after the inserts: if the process dies in
      // between, the batch is replayed and INSERT OR IGNORE skips duplicates.
      setCursor.run(newCursor);

      if (inserted > 0) {
        console.log(`[feeder] queued ${inserted} articles, cursor now ${newCursor}`);
      }
    } catch (err) {
      // `err?.message ?? err` keeps the handler itself from throwing when a
      // non-Error (or nullish) value was thrown.
      console.error("[feeder] error:", err?.message ?? err);
      await sleep(loopDelay);
      continue;
    }

    // Every call above is synchronous, so without this the loop would drain
    // an arbitrarily large backlog in one turn and starve the event loop.
    await new Promise((resolve) => setImmediate(resolve));
  }
}

/**
 * Resolves after roughly `ms` milliseconds.
 *
 * @param {number} ms - Delay passed to `setTimeout`.
 * @returns {Promise<void>}
 */
function sleep(ms) {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

module.exports = { runQueueFeeder };