initialize project structure with core modules and configuration

This commit is contained in:
ImBenji 2026-04-16 22:47:34 +01:00
parent 82d856a21d
commit 7724fafbdc
20 changed files with 3644 additions and 0 deletions

14
.gitignore vendored Normal file
View file

@ -0,0 +1,14 @@
node_modules/
.env
.env.*
config.json
*.sqlite
*.sqlite-shm
*.sqlite-wal
.idea/
.DS_Store
.the_agency/

2133
package-lock.json generated Normal file

File diff suppressed because it is too large Load diff

22
package.json Normal file
View file

@ -0,0 +1,22 @@
{
"name": "duriin_api",
"version": "1.0.0",
"description": "News ingestion archive server",
"main": "server.js",
"scripts": {
"start": "node server.js"
},
"keywords": [],
"author": "",
"license": "ISC",
"type": "commonjs",
"dependencies": {
"@extractus/article-extractor": "^8.0.18",
"better-sqlite3": "^12.4.1",
"fastify": "^5.6.1",
"node-cron": "^4.2.1",
"rss-parser": "^3.13.0",
"sharp": "^0.34.5",
"sqlite-vec": "^0.1.9"
}
}

27
server.js Normal file
View file

@ -0,0 +1,27 @@
const Fastify = require('fastify');
const articleRoutes = require('./src/routes/articles');
const statusRoutes = require('./src/routes/status');
const config = require('./src/config');
const { startScheduler, runAllIngestions } = require('./src/scheduler');
const app = Fastify({ logger: true });
app.register(articleRoutes);
app.register(statusRoutes);
app.get('/', async () => ({ ok: true }));
async function start() {
await app.listen({ port: config.server.port, host: config.server.host });
runAllIngestions().catch((error) => {
app.log.error(error);
});
startScheduler();
}
start().catch((error) => {
app.log.error(error);
process.exit(1);
});

7
src/config.js Normal file
View file

@ -0,0 +1,7 @@
const fs = require('fs');
const path = require('path');
const configPath = path.join(__dirname, '..', 'config.json');
const config = JSON.parse(fs.readFileSync(configPath, 'utf8'));
module.exports = config;

203
src/content.js Normal file
View file

@ -0,0 +1,203 @@
const { extract } = require('@extractus/article-extractor');
const sharp = require('sharp');
const db = require('./db');
const { generateAndStoreEmbedding } = require('./embeddings');
const { fetchWithPolicy } = require('./http');
const updateArticleAssets = db.prepare(`
UPDATE articles
SET content = ?, image = ?, content_status = 'ready', content_error = NULL, content_attempted_at = ?
WHERE id = ?
`);
const markContentSkipped = db.prepare(`
UPDATE articles
SET content_status = 'skipped', content_error = ?, content_attempted_at = ?
WHERE id = ?
`);
const markContentFailed = db.prepare(`
UPDATE articles
SET content_status = 'failed', content_error = ?, content_attempted_at = ?
WHERE id = ?
`);
const markContentPending = db.prepare(`
UPDATE articles
SET content_status = NULL, content_error = NULL, content_attempted_at = ?
WHERE id = ?
`);
const selectArticlesMissingContent = db.prepare(`
SELECT id, url
FROM articles
WHERE (content IS NULL OR TRIM(content) = '')
AND (content_status IS NULL OR content_status = 'pending')
ORDER BY ingested_at DESC, id DESC
LIMIT ?
`);
const blockedContentDomains = [
'axios.com',
'bizjournals.com',
'fastcompany.com',
'gurufocus.com',
'investing.com',
'rbc.ru',
'stocktitan.net',
];
const loggedBlockedDomains = new Set();
const articleFetchHeaders = {
Accept: 'text/html,application/xhtml+xml',
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36',
};
let contentBackfillRunning = false;
function getHostname(url) {
try {
return new URL(url).hostname.toLowerCase();
} catch {
return '';
}
}
function isBlockedContentUrl(url) {
const hostname = getHostname(url);
return blockedContentDomains.some((domain) => hostname === domain || hostname.endsWith(`.${domain}`));
}
function getErrorStatus(error) {
if (error && Number.isInteger(error.status)) {
return error.status;
}
const match = String(error && error.message || '').match(/\b(401|403|404|408|429|5\d\d)\b/);
return match ? Number(match[1]) : null;
}
function getErrorMessage(error, fallback) {
const message = String(error && error.message || fallback || '').trim();
return message ? message.slice(0, 500) : null;
}
function markArticleStatus(statement, id, message) {
statement.run(message, new Date().toISOString(), id);
}
async function fetchCompressedImage(url) {
const response = await fetchWithPolicy(url, {
retries: 1,
headers: {
Accept: 'image/*',
},
});
if (!response.ok) {
const error = new Error(`image request failed with ${response.status}`);
error.status = response.status;
throw error;
}
const contentType = String(response.headers.get('content-type') || '').toLowerCase();
if (!contentType.startsWith('image/')) {
throw new Error(`image request returned ${contentType || 'unknown content-type'}`);
}
const input = Buffer.from(await response.arrayBuffer());
if (input.length === 0) {
throw new Error('image request returned an empty body');
}
const output = await sharp(input)
.rotate()
.resize({ width: 320, height: 320, fit: 'inside', withoutEnlargement: true })
.webp({ quality: 25 })
.toBuffer();
return output.toString('base64');
}
async function fetchAndStoreContent(id, url) {
try {
if (isBlockedContentUrl(url)) {
const hostname = getHostname(url);
if (hostname && !loggedBlockedDomains.has(hostname)) {
loggedBlockedDomains.add(hostname);
console.warn(`content extraction skipped for blocked domain ${hostname}`);
}
markArticleStatus(markContentSkipped, id, `blocked domain: ${hostname || 'unknown'}`);
return;
}
const article = await extract(url, {}, {
headers: articleFetchHeaders,
signal: AbortSignal.timeout(20000),
});
if (!article) {
markArticleStatus(markContentSkipped, id, 'extractor returned no article');
return;
}
const content = typeof article.content === 'string'
? article.content.replace(/<[^>]+>/g, ' ').replace(/\s+/g, ' ').trim() || null
: null;
let image = null;
if (article.image) {
try {
image = await fetchCompressedImage(article.image);
} catch (error) {
const status = getErrorStatus(error);
if (status === 401 || status === 403 || status === 404 || status === 429) {
console.warn(`image fetch skipped for ${url}: upstream returned ${status}`);
} else {
console.error(`image fetch failed for ${url}:`, error);
}
}
}
if (!content && !image) {
markArticleStatus(markContentSkipped, id, 'article had no extractable content or image');
return;
}
updateArticleAssets.run(content, image, new Date().toISOString(), id);
await generateAndStoreEmbedding(id);
} catch (error) {
const status = getErrorStatus(error);
if (status === 401 || status === 403 || status === 404) {
console.warn(`content fetch skipped for ${url}: upstream returned ${status}`);
markArticleStatus(markContentSkipped, id, `upstream returned ${status}`);
return;
}
if (status === 408 || status === 429 || (status && status >= 500)) {
console.warn(`content fetch deferred for ${url}: upstream returned ${status}`);
markArticleStatus(markContentPending, id, null);
return;
}
markArticleStatus(markContentFailed, id, getErrorMessage(error, 'content fetch failed'));
console.error(`content fetch failed for ${url}:`, error);
}
}
async function backfillMissingContent(limit = 10) {
if (contentBackfillRunning) {
return;
}
contentBackfillRunning = true;
try {
const rows = selectArticlesMissingContent.all(limit);
for (const row of rows) {
await fetchAndStoreContent(row.id, row.url);
}
} finally {
contentBackfillRunning = false;
}
}
module.exports = {
fetchAndStoreContent,
backfillMissingContent,
};

141
src/db.js Normal file
View file

@ -0,0 +1,141 @@
const path = require('path');
const Database = require('better-sqlite3');
const sqliteVec = require('sqlite-vec');
const config = require('./config');
const dbPath = path.resolve(__dirname, '..', config.database.path || './archive.sqlite');
const db = new Database(dbPath);
sqliteVec.load(db);
db.pragma('journal_mode = WAL');
db.exec(`
CREATE TABLE IF NOT EXISTS articles (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
description TEXT,
content TEXT,
image TEXT,
content_status TEXT,
content_error TEXT,
content_attempted_at TEXT,
url TEXT NOT NULL UNIQUE,
normalized_title TEXT NOT NULL,
source TEXT NOT NULL,
pub_date TEXT,
ingested_at TEXT NOT NULL DEFAULT (datetime('now'))
);
`);
function rebuildArticlesTableIfNeeded() {
const indexes = db.prepare(`PRAGMA index_list('articles')`).all();
const hasUniqueNormalizedTitleIndex = indexes.some((index) => {
if (index.origin !== 'u' || !index.name) {
return false;
}
const columns = db.prepare(`PRAGMA index_info('${index.name.replace(/'/g, "''")}')`).all();
return columns.length === 1 && columns[0].name === 'normalized_title';
});
if (!hasUniqueNormalizedTitleIndex) {
return;
}
db.exec(`
BEGIN;
CREATE TABLE articles_rebuild (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
description TEXT,
content TEXT,
image TEXT,
content_status TEXT,
content_error TEXT,
content_attempted_at TEXT,
url TEXT NOT NULL UNIQUE,
normalized_title TEXT NOT NULL,
source TEXT NOT NULL,
pub_date TEXT,
ingested_at TEXT NOT NULL DEFAULT (datetime('now'))
);
INSERT INTO articles_rebuild (
id,
title,
description,
content,
image,
content_status,
content_error,
content_attempted_at,
url,
normalized_title,
source,
pub_date,
ingested_at
)
SELECT
id,
title,
description,
content,
image,
content_status,
content_error,
content_attempted_at,
url,
normalized_title,
source,
pub_date,
ingested_at
FROM articles;
DROP TABLE articles;
ALTER TABLE articles_rebuild RENAME TO articles;
COMMIT;
`);
}
rebuildArticlesTableIfNeeded();
db.exec(`
CREATE INDEX IF NOT EXISTS idx_articles_source ON articles(source);
CREATE INDEX IF NOT EXISTS idx_articles_pub_date ON articles(pub_date);
CREATE INDEX IF NOT EXISTS idx_articles_ingested_at ON articles(ingested_at);
CREATE INDEX IF NOT EXISTS idx_articles_normalized_title ON articles(normalized_title);
`);
db.exec(`
CREATE VIRTUAL TABLE IF NOT EXISTS article_embeddings USING vec0(
article_id INTEGER PRIMARY KEY,
embedding FLOAT[1024]
);
`);
db.exec(`
CREATE TABLE IF NOT EXISTS query_embeddings (
query TEXT PRIMARY KEY,
embedding BLOB NOT NULL,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
`);
for (const statement of [
'ALTER TABLE articles ADD COLUMN image TEXT',
'ALTER TABLE articles ADD COLUMN content_status TEXT',
'ALTER TABLE articles ADD COLUMN content_error TEXT',
'ALTER TABLE articles ADD COLUMN content_attempted_at TEXT'
]) {
try {
db.exec(statement);
} catch (error) {
if (!String(error.message).includes('duplicate column name')) {
throw error;
}
}
}
module.exports = db;

11
src/dedup.js Normal file
View file

@ -0,0 +1,11 @@
function normalizeTitle(title) {
return String(title || '')
.toLowerCase()
.replace(/[^a-z0-9\s]/g, ' ')
.replace(/\s+/g, ' ')
.trim();
}
module.exports = {
normalizeTitle,
};

242
src/embeddings.js Normal file
View file

@ -0,0 +1,242 @@
const db = require('./db');
const config = require('./config');
const selectArticleForEmbedding = db.prepare(`
SELECT id, title, description, content
FROM articles
WHERE id = ?
`);
const insertEmbedding = db.prepare(`
INSERT INTO article_embeddings (article_id, embedding)
VALUES (?, ?)
`);
const deleteEmbedding = db.prepare(`
DELETE FROM article_embeddings
WHERE article_id = ?
`);
const selectEmbeddingBuffer = db.prepare(`
SELECT embedding
FROM article_embeddings
WHERE article_id = ?
`);
const nearestNeighbors = db.prepare(`
SELECT article_id, distance
FROM article_embeddings
WHERE embedding MATCH ?
AND k = ?
ORDER BY distance
`);
const selectArticlesMissingEmbeddings = db.prepare(`
SELECT a.id
FROM articles a
WHERE a.title IS NOT NULL
AND TRIM(a.title) != ''
AND a.description IS NOT NULL
AND TRIM(a.description) != ''
AND a.content IS NOT NULL
AND TRIM(a.content) != ''
AND NOT EXISTS (
SELECT 1
FROM article_embeddings e
WHERE e.article_id = a.id
)
ORDER BY a.ingested_at ASC, a.id ASC
LIMIT ?
`);
const selectQueryEmbedding = db.prepare(`
SELECT embedding
FROM query_embeddings
WHERE query = ?
`);
const upsertQueryEmbedding = db.prepare(`
INSERT INTO query_embeddings (query, embedding)
VALUES (?, ?)
ON CONFLICT(query) DO UPDATE SET
embedding = excluded.embedding,
created_at = datetime('now')
`);
let embeddingBackfillRunning = false;
const embeddingJobsRunning = new Set();
function buildEmbeddingInput(article) {
const title = typeof article.title === 'string' ? article.title.trim() : '';
const description = typeof article.description === 'string' ? article.description.trim() : '';
const content = typeof article.content === 'string' ? article.content.trim() : '';
if (!title || !description || !content) {
return '';
}
return [title, description, content].join('\n\n');
}
function serializeEmbedding(values) {
return Buffer.from(new Float32Array(values).buffer);
}
function normalizeQuery(input) {
return String(input || '')
.trim()
.toLowerCase()
.replace(/\s+/g, ' ');
}
async function requestEmbedding(input) {
const response = await fetch('https://openrouter.ai/api/v1/embeddings', {
method: 'POST',
headers: {
Authorization: `Bearer ${String(config.openRouter.apiKey).trim()}`,
'Content-Type': 'application/json',
},
body: JSON.stringify({
model: 'perplexity/pplx-embed-v1-0.6b',
input,
}),
});
if (!response.ok) {
let message = `embedding request failed with ${response.status}`;
try {
const payload = await response.json();
const errorMessage = payload && payload.error && payload.error.message;
if (errorMessage) {
message = errorMessage;
}
} catch (error) {
console.error('failed to parse embedding error response:', error);
}
const requestError = new Error(message);
requestError.status = response.status;
throw requestError;
}
const payload = await response.json();
const embedding = payload && payload.data && payload.data[0] && payload.data[0].embedding;
if (!Array.isArray(embedding) || embedding.length !== 1024) {
throw new Error(`unexpected embedding length: ${Array.isArray(embedding) ? embedding.length : 'missing'}`);
}
return embedding;
}
async function generateAndStoreEmbedding(id) {
const apiKey = config.openRouter && config.openRouter.apiKey
? String(config.openRouter.apiKey).trim()
: '';
if (!apiKey) {
return { stored: false, shouldPauseBatch: false };
}
if (embeddingJobsRunning.has(id)) {
return { stored: false, shouldPauseBatch: false };
}
if (selectEmbeddingBuffer.get(id)) {
return { stored: false, shouldPauseBatch: false };
}
const article = selectArticleForEmbedding.get(id);
if (!article) {
return { stored: false, shouldPauseBatch: false };
}
const input = buildEmbeddingInput(article);
if (!input) {
return { stored: false, shouldPauseBatch: false };
}
embeddingJobsRunning.add(id);
try {
if (selectEmbeddingBuffer.get(id)) {
return { stored: false, shouldPauseBatch: false };
}
const embedding = await requestEmbedding(input);
deleteEmbedding.run(BigInt(id));
insertEmbedding.run(BigInt(id), serializeEmbedding(embedding));
return { stored: true, shouldPauseBatch: false };
} catch (error) {
console.error(`embedding generation failed for article ${id}:`, error);
return {
stored: false,
shouldPauseBatch: error && error.status === 402,
};
} finally {
embeddingJobsRunning.delete(id);
}
}
async function backfillMissingEmbeddings(limit = 100) {
if (embeddingBackfillRunning) {
return;
}
embeddingBackfillRunning = true;
try {
const rows = selectArticlesMissingEmbeddings.all(limit);
for (const row of rows) {
const result = await generateAndStoreEmbedding(row.id);
if (result.shouldPauseBatch) {
break;
}
}
} finally {
embeddingBackfillRunning = false;
}
}
function getEmbeddingBuffer(articleId) {
const row = selectEmbeddingBuffer.get(articleId);
return row ? row.embedding : null;
}
function findArticlesByEmbedding(embedding, limit) {
if (!embedding) {
return [];
}
return nearestNeighbors.all(embedding, limit)
.map((row) => ({
articleId: Number(row.article_id),
distance: row.distance,
}));
}
async function getOrCreateQueryEmbedding(query) {
const normalizedQuery = normalizeQuery(query);
if (!normalizedQuery) {
return null;
}
const cached = selectQueryEmbedding.get(normalizedQuery);
if (cached) {
return cached.embedding;
}
const embedding = await requestEmbedding(normalizedQuery);
const buffer = serializeEmbedding(embedding);
upsertQueryEmbedding.run(normalizedQuery, buffer);
return buffer;
}
function findSimilarArticles(articleId, limit) {
return findArticlesByEmbedding(getEmbeddingBuffer(articleId), limit + 1)
.filter((row) => row.articleId !== Number(articleId))
.slice(0, limit);
}
module.exports = {
backfillMissingEmbeddings,
findArticlesByEmbedding,
generateAndStoreEmbedding,
findSimilarArticles,
getEmbeddingBuffer,
getOrCreateQueryEmbedding,
};

96
src/http.js Normal file
View file

@ -0,0 +1,96 @@
const DEFAULT_HEADERS = {
'User-Agent': 'duriin_api/1.0',
Accept: 'application/json, text/plain, */*',
};
function sleep(ms) {
return new Promise((resolve) => setTimeout(resolve, ms));
}
function isRetryableStatus(status) {
return status === 429 || status >= 500;
}
function getRetryDelay(attempt, response) {
const retryAfter = response && response.headers ? response.headers.get('retry-after') : null;
if (retryAfter) {
const seconds = Number(retryAfter);
if (Number.isFinite(seconds) && seconds >= 0) {
return Math.min(seconds * 1000, 30000);
}
const retryDate = new Date(retryAfter).getTime();
if (!Number.isNaN(retryDate)) {
return Math.max(Math.min(retryDate - Date.now(), 30000), 0);
}
}
const baseDelay = Math.min(1000 * (2 ** attempt), 10000);
return baseDelay + Math.floor(Math.random() * 250);
}
async function fetchWithPolicy(url, options = {}) {
const {
timeout = 20000,
retries = 2,
headers = {},
...fetchOptions
} = options;
let lastError;
for (let attempt = 0; attempt <= retries; attempt += 1) {
let response;
try {
response = await fetch(url, {
...fetchOptions,
signal: fetchOptions.signal || AbortSignal.timeout(timeout),
headers: {
...DEFAULT_HEADERS,
...headers,
},
});
if (response.ok || !isRetryableStatus(response.status)) {
return response;
}
const error = new Error(`Request failed with ${response.status} for ${url}`);
error.status = response.status;
lastError = error;
} catch (error) {
lastError = error;
}
if (attempt < retries) {
await sleep(getRetryDelay(attempt, response));
}
}
throw lastError;
}
async function fetchJson(url, options = {}) {
const response = await fetchWithPolicy(url, {
...options,
headers: {
Accept: 'application/json',
...(options.headers || {}),
},
});
if (!response.ok) {
const error = new Error(`Request failed with ${response.status} for ${url}`);
error.status = response.status;
throw error;
}
return response.json();
}
module.exports = {
fetchJson,
fetchWithPolicy,
};

120
src/ingest.js Normal file
View file

@ -0,0 +1,120 @@
const db = require('./db');
const { normalizeTitle } = require('./dedup');
const { fetchAndStoreContent } = require('./content');
const { markSourceRun } = require('./state');
const insertArticle = db.prepare(`
INSERT INTO articles (
title,
description,
content,
image,
url,
normalized_title,
source,
pub_date,
ingested_at
) VALUES (?, ?, NULL, NULL, ?, ?, ?, ?, ?)
`);
const findByUrl = db.prepare('SELECT id FROM articles WHERE url = ?');
function normalizePubDate(value) {
if (!value) {
return null;
}
if (typeof value === 'number') {
const parsed = new Date(value);
return Number.isNaN(parsed.getTime()) ? null : parsed.toISOString();
}
const input = String(value).trim();
if (!input) {
return null;
}
if (/^\d{8}T\d{6}$/.test(input)) {
const normalized = `${input.slice(0, 4)}-${input.slice(4, 6)}-${input.slice(6, 8)}T${input.slice(9, 11)}:${input.slice(11, 13)}:${input.slice(13, 15)}Z`;
const parsed = new Date(normalized);
return Number.isNaN(parsed.getTime()) ? null : parsed.toISOString();
}
if (/^\d{8}T\d{6}Z$/.test(input)) {
const normalized = `${input.slice(0, 4)}-${input.slice(4, 6)}-${input.slice(6, 8)}T${input.slice(9, 11)}:${input.slice(11, 13)}:${input.slice(13, 15)}Z`;
const parsed = new Date(normalized);
return Number.isNaN(parsed.getTime()) ? null : parsed.toISOString();
}
if (/^\d{4}-\d{2}-\d{2}$/.test(input)) {
return `${input}T00:00:00.000Z`;
}
const parsed = new Date(input);
return Number.isNaN(parsed.getTime()) ? null : parsed.toISOString();
}
function ingestArticle(article) {
const title = String(article.title || '').trim();
const url = String(article.url || '').trim();
const source = String(article.source || '').trim();
if (!title || !url || !source) {
return { inserted: false, reason: 'missing_required_fields' };
}
const normalizedTitle = normalizeTitle(title);
if (!normalizedTitle) {
return { inserted: false, reason: 'empty_normalized_title' };
}
const description = article.description == null ? null : String(article.description).trim() || null;
const pubDate = normalizePubDate(article.pubDate);
const ingestedAt = new Date().toISOString();
try {
const result = insertArticle.run(
title,
description,
url,
normalizedTitle,
source,
pubDate,
ingestedAt
);
fetchAndStoreContent(result.lastInsertRowid, url);
return { inserted: true, id: result.lastInsertRowid };
} catch (error) {
if (error.code === 'SQLITE_CONSTRAINT_UNIQUE') {
const duplicateByUrl = findByUrl.get(url);
if (duplicateByUrl) {
return { inserted: false, reason: 'duplicate_url', id: duplicateByUrl.id };
}
return { inserted: false, reason: 'duplicate' };
}
throw error;
}
}
async function ingestBatch(source, articles) {
let inserted = 0;
for (const article of articles) {
const result = ingestArticle({ ...article, source: article.source || source });
if (result.inserted) {
inserted += 1;
}
}
markSourceRun(source);
return { source, inserted, total: articles.length };
}
module.exports = {
ingestArticle,
ingestBatch,
};

155
src/routes/articles.js Normal file
View file

@ -0,0 +1,155 @@
const db = require('../db');
const {
findArticlesByEmbedding,
findSimilarArticles,
getEmbeddingBuffer,
getOrCreateQueryEmbedding,
} = require('../embeddings');
function buildArticlesQuery(query) {
const conditions = [];
const params = [];
const includeEmbedding = String(query.include_embedding || '').toLowerCase() === 'true';
if (query.q) {
conditions.push('(title LIKE ? OR description LIKE ? OR content LIKE ?)');
const keyword = `%${query.q}%`;
params.push(keyword, keyword, keyword);
}
if (query.source) {
conditions.push('source = ?');
params.push(query.source);
}
if (query.from) {
conditions.push('pub_date >= ?');
params.push(query.from);
}
if (query.to) {
conditions.push('pub_date <= ?');
params.push(query.to);
}
const whereClause = conditions.length > 0 ? `WHERE ${conditions.join(' AND ')}` : '';
const limit = Number.parseInt(query.limit, 10);
const offset = Number.parseInt(query.offset, 10);
params.push(Number.isFinite(limit) && limit > 0 ? Math.min(limit, 100) : 20);
params.push(Number.isFinite(offset) && offset >= 0 ? offset : 0);
return {
sql: `
SELECT id, title, description, content, image, ${includeEmbedding ? 'embedding,' : ''} url, normalized_title, source, pub_date, ingested_at
FROM articles
${whereClause}
ORDER BY COALESCE(pub_date, ingested_at) DESC, id DESC
LIMIT ? OFFSET ?
`,
params,
};
}
async function articleRoutes(fastify) {
fastify.get('/articles', async (request, reply) => {
const query = request.query || {};
if (query.include_embedding) {
reply.code(400);
return { error: 'Embeddings are not returned directly. Use similar_to for vector search.' };
}
if (query.topic !== undefined) {
const limit = Number.parseInt(query.limit, 10);
const embedding = await getOrCreateQueryEmbedding(query.topic);
if (!embedding) {
reply.code(400);
return { error: 'Topic must not be empty' };
}
const neighbors = findArticlesByEmbedding(
embedding,
Number.isFinite(limit) && limit > 0 ? Math.min(limit, 100) : 20
);
const ids = neighbors.map((row) => row.articleId);
if (ids.length === 0) {
return [];
}
const placeholders = ids.map(() => '?').join(', ');
const articles = db.prepare(`
SELECT id, title, description, content, image, url, normalized_title, source, pub_date, ingested_at
FROM articles
WHERE id IN (${placeholders})
`).all(...ids);
const byId = new Map(articles.map((article) => [article.id, article]));
return neighbors
.map((row) => {
const article = byId.get(row.articleId);
return article ? { ...article, distance: row.distance } : null;
})
.filter(Boolean);
}
if (query.similar_to) {
const limit = Number.parseInt(query.limit, 10);
const articleId = Number.parseInt(query.similar_to, 10);
const neighbors = findSimilarArticles(
articleId,
Number.isFinite(limit) && limit > 0 ? Math.min(limit, 100) : 20
);
if (neighbors.length === 0 && !getEmbeddingBuffer(articleId)) {
reply.code(404);
return { error: 'Embedding not found for article' };
}
const ids = neighbors.map((row) => row.articleId);
if (ids.length === 0) {
return [];
}
const placeholders = ids.map(() => '?').join(', ');
const articles = db.prepare(`
SELECT id, title, description, content, image, url, normalized_title, source, pub_date, ingested_at
FROM articles
WHERE id IN (${placeholders})
`).all(...ids);
const byId = new Map(articles.map((article) => [article.id, article]));
return neighbors
.map((row) => {
const article = byId.get(row.articleId);
return article ? { ...article, distance: row.distance } : null;
})
.filter(Boolean);
}
const { sql, params } = buildArticlesQuery(query);
return db.prepare(sql).all(...params);
});
fastify.get('/articles/:id', async (request, reply) => {
if (String((request.query || {}).include_embedding || '').toLowerCase() === 'true') {
reply.code(400);
return { error: 'Embeddings are not returned directly. Use similar_to for vector search.' };
}
const article = db.prepare(`
SELECT id, title, description, content, image, url, normalized_title, source, pub_date, ingested_at
FROM articles
WHERE id = ?
`).get(request.params.id);
if (!article) {
reply.code(404);
return { error: 'Article not found' };
}
return article;
});
}
module.exports = articleRoutes;

34
src/routes/status.js Normal file
View file

@ -0,0 +1,34 @@
const db = require('../db');
const { getLastIngestionBySource } = require('../state');
async function statusRoutes(fastify) {
fastify.get('/status', async () => {
const total = db.prepare('SELECT COUNT(*) AS count FROM articles').get().count;
const bySourceRows = db.prepare('SELECT source, COUNT(*) AS count FROM articles GROUP BY source ORDER BY source').all();
const contentCoverage = db.prepare(`
SELECT
COUNT(*) AS total,
SUM(CASE WHEN content IS NOT NULL AND TRIM(content) <> '' THEN 1 ELSE 0 END) AS with_content,
SUM(CASE WHEN image IS NOT NULL AND TRIM(image) <> '' THEN 1 ELSE 0 END) AS with_image
FROM articles
`).get();
const embeddingCoverage = db.prepare('SELECT COUNT(*) AS count FROM article_embeddings').get();
return {
totalArticles: total,
countsBySource: Object.fromEntries(bySourceRows.map((row) => [row.source, row.count])),
lastIngestionBySource: getLastIngestionBySource(),
contentFetchCoverage: {
withContent: contentCoverage.with_content || 0,
withImage: contentCoverage.with_image || 0,
withEmbedding: embeddingCoverage.count || 0,
total: contentCoverage.total || 0,
contentRatio: contentCoverage.total ? Number((contentCoverage.with_content / contentCoverage.total).toFixed(4)) : 0,
imageRatio: contentCoverage.total ? Number((contentCoverage.with_image / contentCoverage.total).toFixed(4)) : 0,
embeddingRatio: contentCoverage.total ? Number((embeddingCoverage.count / contentCoverage.total).toFixed(4)) : 0,
},
};
});
}
module.exports = statusRoutes;

85
src/scheduler.js Normal file
View file

@ -0,0 +1,85 @@
const cron = require('node-cron');
const config = require('./config');
const { ingestBatch } = require('./ingest');
const { fetchRssArticles } = require('./sources/rss');
const { fetchGdeltArticles } = require('./sources/gdelt');
const { fetchEdgarArticles } = require('./sources/edgar');
const { fetchAlphaVantageArticles } = require('./sources/alphavantage');
const { fetchFinnhubArticles } = require('./sources/finnhub');
const { backfillMissingContent } = require('./content');
const { backfillMissingEmbeddings } = require('./embeddings');
async function runSource(source, fetcher) {
try {
const articles = await fetcher();
return await ingestBatch(source, articles);
} catch (error) {
console.error(`${source} ingestion failed:`, error);
return { source, inserted: 0, total: 0, error: error.message };
}
}
async function runAllIngestions() {
const results = [];
results.push(await runSource('rss', fetchRssArticles));
results.push(await runSource('gdelt', fetchGdeltArticles));
results.push(await runSource('edgar', fetchEdgarArticles));
results.push(await runSource('alphavantage', fetchAlphaVantageArticles));
results.push(await runSource('finnhub', fetchFinnhubArticles));
try {
await backfillMissingContent();
} catch (error) {
console.error('content backfill failed:', error);
}
try {
await backfillMissingEmbeddings();
} catch (error) {
console.error('embedding backfill failed:', error);
}
return results;
}
function startScheduler() {
cron.schedule(config.scheduler.rss, async () => {
await runSource('rss', fetchRssArticles);
});
cron.schedule(config.scheduler.gdelt, async () => {
await runSource('gdelt', fetchGdeltArticles);
});
cron.schedule(config.scheduler.edgar, async () => {
await runSource('edgar', fetchEdgarArticles);
});
cron.schedule(config.scheduler.alphaVantage, async () => {
await runSource('alphavantage', fetchAlphaVantageArticles);
});
cron.schedule(config.scheduler.finnhub, async () => {
await runSource('finnhub', fetchFinnhubArticles);
});
cron.schedule('0 * * * *', async () => {
try {
await backfillMissingContent();
} catch (error) {
console.error('content backfill failed:', error);
}
try {
await backfillMissingEmbeddings();
} catch (error) {
console.error('embedding backfill failed:', error);
}
});
}
module.exports = {
startScheduler,
runAllIngestions,
};

View file

@ -0,0 +1,32 @@
const config = require('../config');
const { fetchJson } = require('../http');
async function fetchAlphaVantageArticles() {
if (!config.alphaVantage?.apiKey) {
return [];
}
const tickers = (config.alphaVantage.tickers || []).join(',');
const params = new URLSearchParams({
function: 'NEWS_SENTIMENT',
apikey: config.alphaVantage.apiKey,
});
if (tickers) {
params.set('tickers', tickers);
}
const data = await fetchJson(`https://www.alphavantage.co/query?${params.toString()}`);
return (data.feed || []).map((item) => ({
title: item.title,
description: item.summary || null,
url: item.url,
source: 'alphavantage',
pubDate: item.time_published || null,
})).filter((item) => item.title && item.url);
}
module.exports = {
fetchAlphaVantageArticles,
};

78
src/sources/edgar.js Normal file
View file

@ -0,0 +1,78 @@
const config = require('../config');
const { fetchJson } = require('../http');
let tickerMap;
function getHeaders() {
return {
headers: {
'User-Agent': config.sec.userAgent,
Accept: 'application/json',
},
};
}
async function getTickerMap() {
if (tickerMap) {
return tickerMap;
}
const data = await fetchJson('https://www.sec.gov/files/company_tickers.json', getHeaders());
tickerMap = Object.values(data).reduce((acc, company) => {
acc[String(company.ticker || '').toUpperCase()] = String(company.cik_str || '').padStart(10, '0');
return acc;
}, {});
return tickerMap;
}
async function fetchEdgarArticles() {
const tickers = config.sec?.tickers || [];
if (tickers.length === 0) {
return [];
}
const map = await getTickerMap();
const articles = [];
for (const ticker of tickers) {
const cik = map[String(ticker).toUpperCase()];
if (!cik) {
continue;
}
const data = await fetchJson(`https://data.sec.gov/submissions/CIK${cik}.json`, getHeaders());
const recent = data.filings?.recent;
if (!recent) {
continue;
}
for (let i = 0; i < recent.form.length; i += 1) {
if (recent.form[i] !== '8-K') {
continue;
}
const accession = String(recent.accessionNumber[i] || '').replace(/-/g, '');
const primaryDocument = String(recent.primaryDocument[i] || '').trim();
const filingDate = recent.filingDate[i] || null;
if (!accession || !primaryDocument) {
continue;
}
articles.push({
title: `${String(ticker).toUpperCase()} 8-K ${filingDate || accession}`,
description: `${String(data.name || ticker).trim()} filed Form 8-K`,
url: `https://www.sec.gov/Archives/edgar/data/${Number(cik)}/${accession}/${primaryDocument}`,
source: 'edgar',
pubDate: filingDate,
});
}
}
return articles;
}
module.exports = {
fetchEdgarArticles,
};

55
src/sources/finnhub.js Normal file
View file

@ -0,0 +1,55 @@
const config = require('../config');
const { fetchJson } = require('../http');
function getDateRange() {
const to = new Date();
const from = new Date(to);
from.setDate(from.getDate() - 1);
return {
from: from.toISOString().slice(0, 10),
to: to.toISOString().slice(0, 10),
};
}
async function fetchFinnhubArticles() {
if (!config.finnhub?.apiKey) {
return [];
}
const { from, to } = getDateRange();
const articles = [];
for (const symbol of config.finnhub.tickers || []) {
const params = new URLSearchParams({
symbol,
from,
to,
token: config.finnhub.apiKey,
});
const data = await fetchJson(`https://finnhub.io/api/v1/company-news?${params.toString()}`);
for (const item of data || []) {
const title = String(item.headline || '').trim();
const url = String(item.url || '').trim();
if (!title || !url) {
continue;
}
articles.push({
title,
description: item.summary || null,
url,
source: 'finnhub',
pubDate: item.datetime ? new Date(item.datetime * 1000).toISOString() : null,
});
}
}
return articles;
}
module.exports = {
fetchFinnhubArticles,
};

48
src/sources/gdelt.js Normal file
View file

@ -0,0 +1,48 @@
const config = require('../config');
const { fetchJson } = require('../http');
async function fetchGdeltArticles() {
const articles = [];
for (const query of config.gdelt?.queries || []) {
try {
const params = new URLSearchParams({
query,
mode: config.gdelt.mode || 'ArtList',
maxrecords: String(Math.min(config.gdelt.maxRecords || 10, 10)),
format: config.gdelt.format || 'json',
});
const data = await fetchJson(`https://api.gdeltproject.org/api/v2/doc/doc?${params.toString()}`);
for (const item of data.articles || []) {
const title = String(item.title || '').trim();
const url = String(item.url || '').trim();
if (!title || !url) {
continue;
}
articles.push({
title,
description: item.domain || null,
url,
source: 'gdelt',
pubDate: item.seendate || null,
});
}
} catch (error) {
if (error && error.status === 429) {
console.warn(`GDELT query skipped for rate limit: ${query}`);
continue;
}
console.error(`Failed to fetch GDELT query: ${query}`, error);
}
}
return articles;
}
module.exports = {
fetchGdeltArticles,
};

127
src/sources/rss.js Normal file
View file

@ -0,0 +1,127 @@
const Parser = require('rss-parser');
const config = require('../config');
const parser = new Parser({
timeout: 10000,
headers: {
'User-Agent': 'Mozilla/5.0',
Accept: 'application/rss+xml, application/xml, text/xml;q=0.9, */*;q=0.8',
},
});
const blockedFeedDomains = [
'arabnews.com',
'arabianbusiness.com',
'business-standard.com',
'cityam.com',
'eleconomista.com.mx',
'eleconomista.es',
'moneycontrol.com',
'thisismoney.co.uk',
];
const invalidFeedLabels = new Set([
'ABC Business AU',
'Australian Fin Review',
'Business Daily Africa',
'BusinessLive SA',
'Caixin Global',
'Cinco Dias',
'El Comercio Peru',
'FD.nl',
'Gulf News Business',
'Il Sole 24 Ore',
'Infobae Economia AR',
'Japan Times Business',
'Korea JoongAng Daily',
'Les Echos',
'Live Mint',
'NZ Herald Business',
'Portafolio Colombia',
'The Star Malaysia',
'Xinhua Business',
]);
const malformedFeedLabels = new Set([
'BFM Business',
'Business Daily Africa',
]);
const loggedBlockedFeeds = new Set();
const loggedInvalidFeeds = new Set();
function getHostname(url) {
try {
return new URL(url).hostname.toLowerCase();
} catch {
return '';
}
}
function isBlockedFeed(feed) {
const hostname = getHostname(feed.url);
return blockedFeedDomains.some((domain) => hostname === domain || hostname.endsWith(`.${domain}`));
}
function isMalformedFeedError(error) {
const message = String(error && error.message || '');
return message.includes('Invalid character in entity name') || message.includes('Attribute without value');
}
async function fetchRssArticles() {
const articles = [];
for (const feed of config.rssFeeds || []) {
const label = feed.label || feed.url;
if (invalidFeedLabels.has(label)) {
if (!loggedInvalidFeeds.has(label)) {
loggedInvalidFeeds.add(label);
console.warn(`RSS feed skipped for invalid endpoint ${label}`);
}
continue;
}
if (isBlockedFeed(feed)) {
const hostname = getHostname(feed.url);
if (!loggedBlockedFeeds.has(hostname)) {
loggedBlockedFeeds.add(hostname);
console.warn(`RSS feed skipped for blocked domain ${hostname}`);
}
continue;
}
try {
const parsed = await parser.parseURL(feed.url);
for (const item of parsed.items || []) {
const title = String(item.title || '').trim();
const url = String(item.link || item.guid || '').trim();
if (!title || !url) {
continue;
}
articles.push({
title,
description: item.contentSnippet || item.content || item.summary || null,
url,
source: feed.label ? `rss:${feed.label}` : 'rss',
pubDate: item.isoDate || item.pubDate || null,
});
}
} catch (error) {
if (malformedFeedLabels.has(label) && isMalformedFeedError(error)) {
if (!loggedInvalidFeeds.has(label)) {
loggedInvalidFeeds.add(label);
console.warn(`RSS feed skipped for malformed XML ${label}`);
}
continue;
}
console.error(`Failed to fetch RSS feed: ${label}`, error);
}
}
return articles;
}
module.exports = {
fetchRssArticles,
};

14
src/state.js Normal file
View file

@ -0,0 +1,14 @@
const ingestionState = new Map();
function markSourceRun(source) {
ingestionState.set(source, new Date().toISOString());
}
function getLastIngestionBySource() {
return Object.fromEntries(ingestionState.entries());
}
module.exports = {
markSourceRun,
getLastIngestionBySource,
};