Duriin-API/scripts/migrate-to-postgres.js

284 lines
7.9 KiB
JavaScript

#!/usr/bin/env node
// Migrates an existing SQLite archive.sqlite database into postgres.
// Run once after setting database.type = "postgres" in config.json.
//
// Usage: node scripts/migrate-to-postgres.js [--dry-run]
//
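// What it does:
// 1. connects to postgres (using config.database.postgres)
// 2. assumes all postgres tables already exist — this script does not create
//    them and does not require db-pg.js directly (db-pg.js is expected to
//    create them when the app loads it, so start the app against postgres first)
// 3. reads every row of each sqlite table into memory and inserts the rows
//    into postgres in batched transactions (ON CONFLICT ... DO NOTHING)
// 4. converts the float32 embedding blobs in article_embedding_store into
//    the postgres article_embeddings vector column (requires pgvector extension)
// 5. resets the articles/events id sequences to match the copied ids
//
// Skips rows that already exist (safe to re-run).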
const path = require('path');
const Database = require('better-sqlite3');
const sqliteVec = require('sqlite-vec');
const { Pool } = require('pg');
const config = require('../src/config');
// --dry-run: only report row counts; nothing is written to postgres.
const DRY_RUN = process.argv.includes('--dry-run');

// Refuse to run unless the loaded config already selects postgres.
if (config.database?.type !== 'postgres') {
  console.error('[migrate] config.database.type must be "postgres" to run this script');
  process.exit(1);
}

// A relative config.database.path is resolved against the directory above this script.
const sqlitePath = path.resolve(__dirname, '..', config.database.path || './archive.sqlite');

/**
 * Opens the sqlite archive read-only and loads the sqlite-vec extension into it.
 * Exits the process with status 1 if either step fails.
 */
function openSqliteReadOnly(file) {
  try {
    const db = new Database(file, { readonly: true });
    sqliteVec.load(db);
    return db;
  } catch (e) {
    console.error(`[migrate] could not open sqlite at ${file}:`, e.message);
    process.exit(1);
  }
}

const sqlite = openSqliteReadOnly(sqlitePath);

// Shared postgres connection pool, configured from config.database.postgres.
const pool = new Pool(config.database.postgres);
/**
 * Runs a single statement on a client checked out from the shared pool.
 *
 * @param {string} sql - statement text, using $1, $2, ... placeholders
 * @param {Array} [params=[]] - values bound to the placeholders
 * @returns {Promise<object>} whatever client.query resolves with
 */
async function query(sql, params = []) {
  const conn = await pool.connect();
  let result;
  try {
    result = await conn.query(sql, params);
  } finally {
    // Hand the client back to the pool whether the statement succeeded or threw.
    conn.release();
  }
  return result;
}
// Tables to migrate, in order (respects FK: events before articles).
//
// Each entry describes one table that exists under the same name on both sides:
//   name        - table name
//   columns     - columns copied, in INSERT order
//   conflict    - comma-separated column list used as the ON CONFLICT target.
//                 It must NOT be wrapped in parentheses: migrateTable adds them,
//                 and a doubled "((a, b))" is rejected by postgres.
//   blobColumns - columns holding binary blobs (optional)
const TABLES = [
  {
    name: 'events',
    columns: ['id', 'title', 'created_at'],
    conflict: 'id',
  },
  {
    name: 'articles',
    columns: [
      'id', 'title', 'description', 'content', 'image',
      'content_status', 'content_error', 'content_attempted_at',
      'content_attempt_count', 'content_retry_after',
      'is_index_page', 'has_embedding', 'url', 'normalized_title',
      'source', 'pub_date', 'pub_date_effective', 'language',
      'event_id', 'ingested_at',
    ],
    conflict: 'url',
  },
  {
    name: 'article_embedding_store',
    columns: ['article_id', 'model', 'embedding', 'embedded_at'],
    conflict: 'article_id, model',
    blobColumns: ['embedding'],
  },
  {
    name: 'article_embedding_meta',
    columns: ['article_id', 'model', 'embedded_at'],
    // NOTE(review): keyed on article_id alone while article_embedding_store uses
    // (article_id, model) — confirm against the postgres schema.
    conflict: 'article_id',
  },
  {
    name: 'query_embeddings',
    columns: ['query', 'model', 'embedding', 'created_at'],
    conflict: 'query, model',
    blobColumns: ['embedding'],
  },
  {
    name: 'gdelt_backfill_windows',
    columns: ['source_id', 'window_start', 'window_end', 'completed_at'],
    conflict: 'source_id, window_start, window_end',
  },
  {
    name: 'crawler_page_classifications',
    columns: ['url', 'site_name', 'classification', 'pattern', 'classified_at'],
    conflict: 'url',
  },
  {
    name: 'crawler_url_patterns',
    columns: ['site_name', 'pattern', 'classification', 'hit_count', 'updated_at'],
    conflict: 'site_name, pattern',
  },
  {
    name: 'crawler_site_rules',
    columns: ['site_name', 'rule_type', 'rule_value', 'classification', 'hit_count', 'updated_at'],
    conflict: 'site_name, rule_type, rule_value',
  },
  {
    name: 'domain_fetch_policy',
    columns: [
      'domain', 'policy', 'consecutive_plain_failures', 'consecutive_browser_failures',
      'plain_success_count', 'browser_success_count', 'expires_at', 'updated_at',
    ],
    conflict: 'domain',
  },
];
/**
 * Returns a conflict target suitable for interpolation inside
 * `ON CONFLICT (...)`. A target written as "(a, b)" has its single wrapping
 * pair of parentheses removed so the generated SQL does not end up with
 * "((a, b))", which postgres rejects. Targets containing nested parentheses
 * (index expressions) are left untouched.
 */
function normalizeConflictTarget(conflict) {
  const target = String(conflict).trim();
  const wrapped = /^\(([^()]*)\)$/.exec(target);
  return wrapped ? wrapped[1].trim() : target;
}

/**
 * Copies one sqlite table into the postgres table of the same name.
 *
 * Rows are read with a single `SELECT *` (the whole table is held in memory)
 * and inserted one by one inside transactions of up to 500 rows, using
 * `ON CONFLICT (...) DO NOTHING` so rows already present are left alone.
 *
 * @param {{name: string, columns: string[], conflict: string, blobColumns?: string[]}} table
 *   table descriptor; `conflict` is a column list, with or without wrapping parentheses
 * @returns {Promise<number>} rows newly inserted; in dry-run mode the number of
 *   sqlite rows found; 0 when the sqlite table is empty or cannot be read
 * @throws rethrows the first postgres error after rolling back the current batch
 */
async function migrateTable(table) {
  let rows;
  try {
    rows = sqlite.prepare(`SELECT * FROM ${table.name}`).all();
  } catch (e) {
    // Typically the table does not exist in this sqlite archive; treat it as nothing to do.
    console.log(`[migrate] skipping ${table.name}: ${e.message}`);
    return 0;
  }
  if (rows.length === 0) {
    console.log(`[migrate] ${table.name}: empty, skipping`);
    return 0;
  }
  console.log(`[migrate] ${table.name}: ${rows.length} rows`);
  if (DRY_RUN) return rows.length;

  const cols = table.columns;
  const placeholders = cols.map((_, i) => `$${i + 1}`).join(', ');
  const sql = `
    INSERT INTO ${table.name} (${cols.join(', ')})
    VALUES (${placeholders})
    ON CONFLICT (${normalizeConflictTarget(table.conflict)}) DO NOTHING
  `;

  let inserted = 0;
  const BATCH = 500;
  for (let i = 0; i < rows.length; i += BATCH) {
    const batch = rows.slice(i, i + BATCH);
    const client = await pool.connect();
    try {
      await client.query('BEGIN');
      for (const row of batch) {
        // Columns missing from the sqlite row become NULL. Blob columns arrive as
        // Buffers and are passed through unchanged.
        const vals = cols.map((col) => row[col] ?? null);
        const r = await client.query(sql, vals);
        inserted += r.rowCount;
      }
      await client.query('COMMIT');
    } catch (e) {
      try {
        await client.query('ROLLBACK');
      } catch (rollbackErr) {
        // Report but do not let a failed rollback hide the error that caused it.
        console.error(`[migrate] ${table.name}: rollback failed: ${rollbackErr.message}`);
      }
      throw e;
    } finally {
      client.release();
    }
    process.stdout.write(`\r[migrate] ${table.name}: ${Math.min(i + BATCH, rows.length)}/${rows.length}`);
  }
  console.log(`\r[migrate] ${table.name}: inserted ${inserted} new rows`);
  return inserted;
}
/**
 * Converts a little-endian float32 blob into a pgvector text literal such as
 * "[0.1,0.2]". Returns null when the value is not a Buffer, is empty, or its
 * length is not a multiple of 4 bytes.
 */
function embeddingToVectorLiteral(buf) {
  if (!Buffer.isBuffer(buf) || buf.length === 0 || buf.length % 4 !== 0) return null;
  const floats = [];
  for (let i = 0; i < buf.length; i += 4) {
    floats.push(buf.readFloatLE(i));
  }
  return `[${floats.join(',')}]`;
}

/**
 * Fills the postgres article_embeddings vector column from the raw float32
 * blobs stored in sqlite's article_embedding_store.
 *
 * article_embeddings in sqlite is a vec0 virtual table, so the vectors are
 * read from article_embedding_store instead. Requires the pgvector extension
 * (CREATE EXTENSION IF NOT EXISTS vector;).
 *
 * Best-effort: if pgvector is unavailable, the sqlite table cannot be read, or
 * an insert fails, the problem is logged and the function returns without
 * throwing. All inserts run in one transaction, so a failure leaves no partial
 * vector data behind. Rows with a missing or malformed blob are skipped.
 *
 * NOTE(review): article_embedding_store appears to be keyed by
 * (article_id, model) while this insert conflicts on article_id only, so with
 * several models per article only the first row read is kept — confirm.
 *
 * @returns {Promise<void>}
 */
async function migrateVectors() {
  let hasPgvector = false;
  try {
    await query(`SELECT NULL::vector`);
    hasPgvector = true;
  } catch (e) {
    // Keep the reason visible: a connection problem would otherwise look like a missing extension.
    console.log(`[migrate] pgvector probe failed: ${e.message}`);
  }
  if (!hasPgvector) {
    console.log('[migrate] pgvector not available — skipping article_embeddings vector migration');
    console.log('[migrate] run: CREATE EXTENSION IF NOT EXISTS vector; then re-run this script');
    return;
  }

  let rows;
  try {
    rows = sqlite.prepare(`
      SELECT article_id, embedding FROM article_embedding_store
    `).all();
  } catch (e) {
    // Same policy as migrateTable: an unreadable source table is skipped, not fatal.
    console.log(`[migrate] skipping article_embeddings: ${e.message}`);
    return;
  }
  if (rows.length === 0) return;
  console.log(`[migrate] article_embeddings: ${rows.length} vectors`);
  if (DRY_RUN) return;

  const client = await pool.connect();
  let inserted = 0;
  let skipped = 0;
  try {
    await client.query('BEGIN');
    for (const row of rows) {
      const vec = embeddingToVectorLiteral(row.embedding);
      if (vec === null) {
        skipped += 1;
        continue;
      }
      const r = await client.query(`
        INSERT INTO article_embeddings (article_id, embedding)
        VALUES ($1, $2::vector)
        ON CONFLICT (article_id) DO NOTHING
      `, [row.article_id, vec]);
      inserted += r.rowCount;
    }
    await client.query('COMMIT');
  } catch (e) {
    try {
      await client.query('ROLLBACK');
    } catch (rollbackErr) {
      console.error('[migrate] vector rollback failed:', rollbackErr.message);
    }
    console.error('[migrate] vector migration failed:', e.message);
    // Nothing was committed, so do not report a count.
    return;
  } finally {
    client.release();
  }
  if (skipped > 0) {
    console.log(`[migrate] article_embeddings: skipped ${skipped} rows with missing or malformed embedding`);
  }
  console.log(`[migrate] article_embeddings: inserted ${inserted} vectors`);
}
/**
 * Re-aligns the id sequences of `articles` and `events` after rows were
 * inserted with explicit ids, so the next generated id does not collide.
 *
 * Uses the three-argument setval: when the table has rows the sequence is set
 * to MAX(id) with is_called = true (next id is MAX(id) + 1); when the table is
 * empty it is set to 1 with is_called = false (next id is 1, not 2).
 *
 * @returns {Promise<void>}
 */
async function resetSequences() {
  for (const table of ['articles', 'events']) {
    await query(`
      SELECT setval(
        pg_get_serial_sequence('${table}', 'id'),
        COALESCE((SELECT MAX(id) FROM ${table}), 1),
        (SELECT MAX(id) FROM ${table}) IS NOT NULL
      )
    `);
  }
  console.log('[migrate] sequences reset');
}
/**
 * Runs the whole migration: every table in TABLES in order, then the vector
 * column, then (unless --dry-run) the id sequence reset.
 *
 * The postgres pool and the sqlite handle are closed whether or not a step
 * throws, so a failed run does not leave connections open.
 *
 * @returns {Promise<void>}
 * @throws rethrows whatever a migration step throws
 */
async function main() {
  console.log(`[migrate] sqlite → postgres (dry-run: ${DRY_RUN})`);
  console.log(`[migrate] source: ${sqlitePath}`);
  try {
    // Sequential on purpose: table order in TABLES respects foreign keys.
    for (const table of TABLES) {
      await migrateTable(table);
    }
    await migrateVectors();
    if (!DRY_RUN) await resetSequences();
    console.log('[migrate] done');
  } finally {
    try {
      await pool.end();
    } finally {
      sqlite.close();
    }
  }
}
// Entry point: any error that escapes main() is reported and ends the process with status 1.
main().catch((err) => {
  console.error('[migrate] fatal:', err);
  process.exit(1);
});