284 lines
7.9 KiB
JavaScript
284 lines
7.9 KiB
JavaScript
#!/usr/bin/env node
|
|
// Migrates an existing SQLite archive.sqlite database into postgres.
// Run once after setting database.type = "postgres" in config.json.
//
// Usage: node scripts/migrate-to-postgres.js [--dry-run]
//
// What it does:
//   1. connects to postgres (using config.database.postgres)
//   2. expects all target tables to exist already (db-pg.js creates them on
//      require; this script does not create tables itself)
//   3. reads every row from each sqlite table and inserts it into postgres
//      with ON CONFLICT ... DO NOTHING (existing rows are left untouched)
//   4. migrates article_embeddings blobs from article_embedding_store
//      into the postgres vector column (requires pgvector extension)
//   5. resets the id sequences of articles and events (skipped on --dry-run)
//
// Skips rows that already exist (safe to re-run).
// With --dry-run, only row counts are reported and nothing is written.
|
|
|
|
const path = require('path');
|
|
const Database = require('better-sqlite3');
|
|
const sqliteVec = require('sqlite-vec');
|
|
const { Pool } = require('pg');
|
|
const config = require('../src/config');
|
|
|
|
|
|
// `--dry-run` makes the script report row counts without writing to postgres.
const DRY_RUN = process.argv.includes('--dry-run');

// Refuse to run unless the configuration explicitly targets postgres.
if (config.database?.type !== 'postgres') {
  console.error('[migrate] config.database.type must be "postgres" to run this script');
  process.exit(1);
}

// The sqlite file is resolved relative to the parent of this script's directory.
const sqlitePath = path.resolve(__dirname, '..', config.database.path || './archive.sqlite');

// Open the source database read-only and load the sqlite-vec extension into it.
let sqlite;
try {
  sqlite = new Database(sqlitePath, { readonly: true });
  sqliteVec.load(sqlite);
} catch (err) {
  console.error(`[migrate] could not open sqlite at ${sqlitePath}:`, err.message);
  process.exit(1);
}

// Connection pool for the postgres target.
const pool = new Pool(config.database.postgres);
|
|
|
|
/**
 * Runs a single statement on a client checked out from the shared pool.
 *
 * @param {string} sql - statement text
 * @param {Array} [params=[]] - positional parameter values
 * @returns {Promise<object>} whatever `client.query` resolves to
 */
async function query(sql, params = []) {
  const conn = await pool.connect();
  try {
    const result = await conn.query(sql, params);
    return result;
  } finally {
    // Hand the client back to the pool whether the statement succeeded or not.
    conn.release();
  }
}
|
|
|
|
// Tables to migrate, in order (respects FK: events before articles).
//
// Per entry:
//   name        - table name, used for both the sqlite SELECT and the postgres INSERT
//   columns     - columns copied into postgres, in placeholder order
//   conflict    - conflict target as a BARE comma-separated column list. The insert
//                 statement wraps it as `ON CONFLICT (<conflict>)`, so it must not carry
//                 its own parentheses: `ON CONFLICT ((a, b))` is a postgres syntax error.
//   blobColumns - columns that hold binary blobs (optional)
const TABLES = [
  {
    name: 'events',
    columns: ['id', 'title', 'created_at'],
    conflict: 'id',
  },
  {
    name: 'articles',
    columns: [
      'id', 'title', 'description', 'content', 'image',
      'content_status', 'content_error', 'content_attempted_at',
      'content_attempt_count', 'content_retry_after',
      'is_index_page', 'has_embedding', 'url', 'normalized_title',
      'source', 'pub_date', 'pub_date_effective', 'language',
      'event_id', 'ingested_at',
    ],
    conflict: 'url',
  },
  {
    name: 'article_embedding_store',
    columns: ['article_id', 'model', 'embedding', 'embedded_at'],
    conflict: 'article_id, model',
    blobColumns: ['embedding'],
  },
  {
    name: 'article_embedding_meta',
    columns: ['article_id', 'model', 'embedded_at'],
    conflict: 'article_id',
  },
  {
    name: 'query_embeddings',
    columns: ['query', 'model', 'embedding', 'created_at'],
    conflict: 'query, model',
    blobColumns: ['embedding'],
  },
  {
    name: 'gdelt_backfill_windows',
    columns: ['source_id', 'window_start', 'window_end', 'completed_at'],
    conflict: 'source_id, window_start, window_end',
  },
  {
    name: 'crawler_page_classifications',
    columns: ['url', 'site_name', 'classification', 'pattern', 'classified_at'],
    conflict: 'url',
  },
  {
    name: 'crawler_url_patterns',
    columns: ['site_name', 'pattern', 'classification', 'hit_count', 'updated_at'],
    conflict: 'site_name, pattern',
  },
  {
    name: 'crawler_site_rules',
    columns: ['site_name', 'rule_type', 'rule_value', 'classification', 'hit_count', 'updated_at'],
    conflict: 'site_name, rule_type, rule_value',
  },
  {
    name: 'domain_fetch_policy',
    columns: [
      'domain', 'policy', 'consecutive_plain_failures', 'consecutive_browser_failures',
      'plain_success_count', 'browser_success_count', 'expires_at', 'updated_at',
    ],
    conflict: 'domain',
  },
];
|
|
|
|
/**
 * Copies one sqlite table into the postgres table of the same name.
 *
 * All rows are read from sqlite up front, then inserted in batches, one
 * transaction per batch, with `ON CONFLICT (...) DO NOTHING`.
 *
 * @param {{name: string, columns: string[], conflict: string, blobColumns?: string[]}} table
 *   Table descriptor. `conflict` may be a bare column list (`'a, b'`) or a
 *   parenthesised one (`'(a, b)'`); both render as `ON CONFLICT (a, b)`.
 * @returns {Promise<number>} number of rows newly inserted; in dry-run mode the
 *   number of rows found in sqlite; 0 if the table is empty or cannot be read.
 * @throws rethrows the error that made a batch fail (after attempting a rollback).
 */
async function migrateTable(table) {
  let rows;
  try {
    rows = sqlite.prepare(`SELECT * FROM ${table.name}`).all();
  } catch (e) {
    // Best effort: a table that cannot be read from sqlite is skipped, not fatal.
    console.log(`[migrate] skipping ${table.name}: ${e.message}`);
    return 0;
  }

  if (rows.length === 0) {
    console.log(`[migrate] ${table.name}: empty, skipping`);
    return 0;
  }

  console.log(`[migrate] ${table.name}: ${rows.length} rows`);
  if (DRY_RUN) return rows.length;

  const cols = table.columns;
  const placeholders = cols.map((_, i) => `$${i + 1}`).join(', ');
  const colList = cols.join(', ');

  // Strip one enclosing pair of parentheses so a composite target such as
  // '(article_id, model)' does not render as the invalid `ON CONFLICT ((...))`.
  const conflictTarget = table.conflict.trim().replace(/^\((.*)\)$/, '$1');

  const sql = `
    INSERT INTO ${table.name} (${colList})
    VALUES (${placeholders})
    ON CONFLICT (${conflictTarget}) DO NOTHING
  `;

  let inserted = 0;
  const BATCH = 500;

  for (let i = 0; i < rows.length; i += BATCH) {
    const batch = rows.slice(i, i + BATCH);
    const client = await pool.connect();

    try {
      await client.query('BEGIN');

      for (const row of batch) {
        // sqlite blobs come back as Buffer, which is passed through unchanged;
        // columns missing from the sqlite row become NULL.
        const vals = cols.map((col) => row[col] ?? null);
        const r = await client.query(sql, vals);
        inserted += r.rowCount;
      }

      await client.query('COMMIT');
    } catch (e) {
      try {
        await client.query('ROLLBACK');
      } catch (rollbackErr) {
        // Report the rollback failure but keep the original error as the one thrown.
        console.error(`[migrate] ${table.name}: rollback failed:`, rollbackErr.message);
      }
      throw e;
    } finally {
      client.release();
    }

    process.stdout.write(`\r[migrate] ${table.name}: ${Math.min(i + BATCH, rows.length)}/${rows.length}`);
  }

  console.log(`\r[migrate] ${table.name}: inserted ${inserted} new rows`);
  return inserted;
}
|
|
|
|
/**
 * Converts a blob of little-endian float32 values into a pgvector text literal
 * such as "[0.5,-1]".
 *
 * @param {*} blob - value read from the sqlite `embedding` column
 * @returns {string|null} the literal, or null when `blob` is not a Buffer, is
 *   empty, or its length is not a whole number of 4-byte floats.
 */
function embeddingBlobToVectorLiteral(blob) {
  if (!Buffer.isBuffer(blob) || blob.length === 0 || blob.length % 4 !== 0) {
    return null;
  }
  const floats = Array.from({ length: blob.length / 4 }, (_, i) => blob.readFloatLE(i * 4));
  return `[${floats.join(',')}]`;
}

/**
 * Copies embeddings from sqlite `article_embedding_store` into the postgres
 * `article_embeddings` vector column.
 *
 * article_embeddings in sqlite is a vec0 virtual table, so the raw float32
 * blobs are read from article_embedding_store instead.
 *
 * Requires the pgvector extension: CREATE EXTENSION IF NOT EXISTS vector;
 *
 * This step is best effort: a missing extension, an unreadable source table or
 * a failed transaction is logged and the function returns without throwing.
 * All inserts run in one transaction; existing article_ids are left untouched.
 *
 * @returns {Promise<void>}
 */
async function migrateVectors() {
  // Probe for the `vector` type; any failure is treated as "pgvector unavailable".
  try {
    await query(`SELECT NULL::vector`);
  } catch (e) {
    console.log('[migrate] pgvector not available — skipping article_embeddings vector migration');
    console.log('[migrate] run: CREATE EXTENSION IF NOT EXISTS vector; then re-run this script');
    // Surface the underlying cause so a connection problem is not mistaken for a missing extension.
    console.log(`[migrate] pgvector probe error: ${e.message}`);
    return;
  }

  let rows;
  try {
    rows = sqlite.prepare(`
      SELECT article_id, embedding FROM article_embedding_store
    `).all();
  } catch (e) {
    // Same policy as the table migration: an unreadable source table is skipped.
    console.log(`[migrate] skipping article_embeddings: ${e.message}`);
    return;
  }

  if (rows.length === 0) return;

  console.log(`[migrate] article_embeddings: ${rows.length} vectors`);
  if (DRY_RUN) return;

  const client = await pool.connect();
  let inserted = 0;
  let skipped = 0;

  try {
    await client.query('BEGIN');

    for (const row of rows) {
      // embedding is a Float32 buffer — convert to a "[f1,f2,...]" literal for pgvector
      const vec = embeddingBlobToVectorLiteral(row.embedding);
      if (vec === null) {
        // A single unusable blob should not abort the whole transaction.
        skipped += 1;
        continue;
      }

      const r = await client.query(`
        INSERT INTO article_embeddings (article_id, embedding)
        VALUES ($1, $2::vector)
        ON CONFLICT (article_id) DO NOTHING
      `, [row.article_id, vec]);

      inserted += r.rowCount;
    }

    await client.query('COMMIT');
  } catch (e) {
    try {
      await client.query('ROLLBACK');
    } catch (rollbackErr) {
      console.error('[migrate] vector migration rollback failed:', rollbackErr.message);
    }
    console.error('[migrate] vector migration failed:', e.message);
    // Nothing was committed, so do not report an inserted count.
    return;
  } finally {
    client.release();
  }

  if (skipped > 0) {
    console.log(`[migrate] article_embeddings: skipped ${skipped} rows with missing or malformed embeddings`);
  }
  console.log(`[migrate] article_embeddings: inserted ${inserted} vectors`);
}
|
|
|
|
/**
 * Realigns the `id` sequences of `articles` and `events` with the data.
 *
 * After a bulk insert with explicit IDs the sequences still point at their old
 * position, so each one is set to the table's MAX(id).
 *
 * The third `setval` argument (is_called) is false for an empty table, so the
 * next generated id is 1. The two-argument form would mark 1 as already used
 * and hand out 2 first.
 *
 * @returns {Promise<void>}
 */
async function resetSequences() {
  for (const table of ['articles', 'events']) {
    // An aggregate without GROUP BY yields exactly one row, even for an empty table.
    await query(`
      SELECT setval(
        pg_get_serial_sequence('${table}', 'id'),
        COALESCE(MAX(id), 1),
        MAX(id) IS NOT NULL
      )
      FROM ${table}
    `);
  }
  console.log('[migrate] sequences reset');
}
|
|
|
|
/**
 * Runs the whole migration: every table in TABLES in order, then the
 * embedding vectors, then (unless this is a dry run) the sequence reset.
 *
 * The postgres pool and the sqlite handle are closed whether or not the
 * migration succeeds, so a failing step does not leave them open.
 *
 * @returns {Promise<void>}
 * @throws rethrows whatever a migration step throws.
 */
async function main() {
  console.log(`[migrate] sqlite → postgres (dry-run: ${DRY_RUN})`);
  console.log(`[migrate] source: ${sqlitePath}`);

  try {
    // Sequential on purpose: table order matters for foreign keys.
    for (const table of TABLES) {
      await migrateTable(table);
    }

    await migrateVectors();

    if (!DRY_RUN) await resetSequences();

    console.log('[migrate] done');
  } finally {
    // Close sqlite even if shutting down the pool fails.
    try {
      await pool.end();
    } finally {
      sqlite.close();
    }
  }
}
|
|
|
|
// Script entry point: any error escaping main() is logged and the process
// exits with status 1.
main().catch((err) => {
  console.error('[migrate] fatal:', err);
  process.exit(1);
});
|