# Distributed Render Node Architecture
## Goal
Allow lightweight render nodes (like personal PC with GPU) to handle video rendering for the main server, with automatic fallback and minimal resource usage on nodes.
## Design Philosophy
- Single Codebase: Same code runs as main server or render node (determined by env vars)
- Proxy-Like: Nodes act almost like a rendering proxy with minimal state
- Stateful but Lightweight: Nodes use local SQLite for temporary config caching (1hr TTL)
- Bandwidth Aware: Minimize data transfer, handle slow upload speeds
- No Replication: Nodes only cache render configs, never replicate main server's session DB
- Low Memory Usage: Configs stored in SQLite instead of memory (important for large base64 data)
## Environment Variables

### Main Server Mode (default)

```bash
# No special vars needed - runs as normal API server
```

### Render Node Mode

```bash
MAIN_SERVER_URL=http://192.168.1.100:3000  # or ws://... for WebSocket URL
NODE_KEY=shared-secret-key
NODE_NAME=my-gaming-pc                     # optional, defaults to hostname
HAS_GPU=true                               # optional, for prioritization
```
Note: No port configuration needed - node connects outbound to main server via WebSocket
## Architecture

```
┌──────────────────────────────────────────────────────┐
│ Main Server                                          │
│ - Full SQLite DB (sessions, snapshots)               │
│ - Long-term cache (24hr TTL)                         │
│ - Node registry (in-memory)                          │
│ - Job dispatcher                                     │
└──────────────────────────────────────────────────────┘
                         │
            ┌────────────┴────────────┐
            │                         │
   ┌────────▼────────┐      ┌────────▼────────┐
   │  Render Node 1  │      │  Render Node 2  │
   │   (Your PC)     │      │ (Server Local)  │
   │                 │      │                 │
   │ - Local SQLite  │      │ - Local SQLite  │
   │   (cache only)  │      │   (cache only)  │
   │ - 1hr TTL       │      │ - 1hr TTL       │
   │ - Auto-cleanup  │      │ - Auto-cleanup  │
   │ - GPU: NVENC    │      │ - CPU: libx264  │
   │ - Priority: 1   │      │ - Priority: 99  │
   └─────────────────┘      └─────────────────┘
```
## Data Flow
### Initial Render (No Cache)

1. Client → Main Server: POST /v2/quote (with base64 image)
2. Main Server: Store session in DB
3. Main Server: Pick best available node
4. Main Server → Node: `render` message over WebSocket
   Payload: `{ sessionId: "abc123", config: { ...full config with base64... } }`
5. Node: Cache config temporarily (key: sessionId)
6. Node: Render video
7. Node → Main Server: Stream MP4 back in chunks over WebSocket
8. Main Server: Cache MP4 (24hr)
9. Main Server: Update session renderStatus = 'completed'
### Edit Same Session (Cache Hit)

1. Client → Main Server: PATCH /v2/quote/abc123 (change text)
2. Main Server: Update session in DB
3. Main Server: Check which node has sessionId cached
4. Main Server → Same Node: `render` message over WebSocket
   Payload: `{ sessionId: "abc123", config: { ...full config with base64... } }` (sent again, but the node may already have it cached)
5. Node: Check cache for sessionId → HIT! Reuse cached base64
6. Node: Render with updated config
7. Node → Main Server: Stream MP4 back in chunks over WebSocket
8. Main Server: Cache new MP4
### Node Unavailable (Fallback)

1. Main Server → Node: `render` message over WebSocket (5s timeout)
2. Node: No response / timeout
3. Main Server: Mark node as offline
4. Main Server: Render locally (fallback)
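The timeout-and-fallback step could be sketched as follows; `renderOnNode` and `renderLocally` are illustrative stand-ins for the real render paths, not existing functions:

```javascript
// Sketch of dispatch with a timeout and local fallback.
// renderOnNode/renderLocally are assumed stand-ins for the real render paths.
function withTimeout(promise, ms) {
  let timer;
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(() => reject(new Error('node timeout')), ms);
  });
  return Promise.race([promise, timeout]).finally(() => clearTimeout(timer));
}

async function dispatchRender(node, config, renderOnNode, renderLocally, timeoutMs = 5000) {
  if (!node) return renderLocally(config); // no nodes online
  try {
    return await withTimeout(renderOnNode(node, config), timeoutMs);
  } catch (err) {
    console.log(`[Dispatcher] Node ${node.name} failed (${err.message}), rendering locally`);
    return renderLocally(config);
  }
}
```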
## WebSocket Communication
### Why WebSocket?
- No Port Forwarding: Node initiates outbound connection to main server
- Persistent Connection: Single connection for all communication
- Bidirectional: Server can push jobs, node can push results
- Built-in Heartbeat: WebSocket connection state = node status (no explicit heartbeat needed)
- Auto Ping/Pong: WebSocket library handles keep-alive automatically
### Node Connection (on startup)

```javascript
let ws = null;

function connectToMainServer() {
  ws = new WebSocket('ws://main-server:3000/nodes');

  ws.on('open', () => {
    console.log('[Node] Connected to main server');
    // Send registration
    ws.send(JSON.stringify({
      type: 'register',
      key: NODE_KEY,
      name: NODE_NAME,
      hasGpu: HAS_GPU
    }));
  });

  ws.on('close', () => {
    console.log('[Node] Disconnected from main server, reconnecting in 5s...');
    setTimeout(connectToMainServer, 5000);
  });

  ws.on('error', (error) => {
    console.error('[Node] WebSocket error:', error.message);
  });
}

connectToMainServer();
```
No explicit heartbeat needed - WebSocket connection state indicates node availability
### Server Side

```javascript
// Main server accepts WebSocket connections
const wss = new WebSocketServer({ noServer: true });

server.on('upgrade', (request, socket, head) => {
  if (request.url === '/nodes') {
    wss.handleUpgrade(request, socket, head, (ws) => {
      wss.emit('connection', ws, request);
    });
  }
});
```
### Node Registry (Main Server, in-memory)

```javascript
const nodeRegistry = new Map(); // nodeId -> { ws, name, hasGpu, ... }

wss.on('connection', (ws) => {
  let nodeId = null;

  ws.on('message', (data) => {
    const msg = JSON.parse(data);

    if (msg.type === 'register') {
      // Verify key
      if (msg.key !== NODE_KEY) {
        ws.close(1008, 'Invalid key');
        return;
      }

      nodeId = generateId();
      nodeRegistry.set(nodeId, {
        id: nodeId,
        ws: ws,
        name: msg.name,
        hasGpu: msg.hasGpu,
        currentJobs: 0,
        cachedSessions: new Set(),
        connectedAt: Date.now()
      });

      ws.send(JSON.stringify({
        type: 'registered',
        nodeId: nodeId
      }));
      console.log(`[NodeRegistry] ${msg.name} registered (GPU: ${msg.hasGpu})`);
    }

    // Handle other message types (progress, result_start, etc.)
    // ...
  });

  // Automatic cleanup on disconnect
  ws.on('close', () => {
    if (nodeId && nodeRegistry.has(nodeId)) {
      const node = nodeRegistry.get(nodeId);
      console.log(`[NodeRegistry] ${node.name} disconnected`);

      // Cancel any pending jobs on this node
      if (node.currentJobs > 0) {
        console.log(`[NodeRegistry] ${node.name} had ${node.currentJobs} active jobs, will fallback`);
      }

      // Remove from registry
      nodeRegistry.delete(nodeId);
    }
  });

  ws.on('error', (error) => {
    console.error(`[NodeRegistry] WebSocket error for node ${nodeId}:`, error.message);
  });
});
```
Connection-based availability:
- Node in registry = online and available
- WebSocket closes = automatically removed from registry
- No need for timeout checks or heartbeat monitoring
## Render Job Dispatch

### Node Selection Algorithm

```javascript
function selectNode(sessionId) {
  // Every entry in nodeRegistry is online by definition:
  // nodes are removed from the Map when their WebSocket closes.
  const onlineNodes = [...nodeRegistry.values()]
    .sort((a, b) => {
      // 1. Prefer node with session cached
      const aHasCache = a.cachedSessions.has(sessionId);
      const bHasCache = b.cachedSessions.has(sessionId);
      if (aHasCache && !bHasCache) return -1;
      if (!aHasCache && bHasCache) return 1;

      // 2. Prefer GPU nodes
      if (a.hasGpu && !b.hasGpu) return -1;
      if (!a.hasGpu && b.hasGpu) return 1;

      // 3. Prefer nodes with fewer jobs
      return a.currentJobs - b.currentJobs;
    });

  return onlineNodes[0] || null; // null = render locally
}
```
## WebSocket Message Protocol

### Server → Node: Render Job

```json
{
  "type": "render",
  "jobId": "uuid-job-123",
  "sessionId": "abc123",
  "config": {
    "displayName": "User",
    "username": "@user",
    "text": "Hello",
    "avatarUrl": "data:image/png;base64,...",
    "imageUrl": "data:video/mp4;base64,...",
    "timestamp": 1234567890,
    "verified": true,
    "engagement": { ... }
  }
}
```
### Node → Server: Progress Update

```json
{
  "type": "progress",
  "jobId": "uuid-job-123",
  "sessionId": "abc123",
  "stage": "rendering",
  "progress": 45
}
```

`stage` is either `"rendering"` or `"encoding"`.
### Node → Server: Render Complete (Chunked Transfer)

Step 1: Send metadata

```json
{
  "type": "result_start",
  "jobId": "uuid-job-123",
  "sessionId": "abc123",
  "success": true,
  "format": "mp4",
  "totalSize": 15728640,
  "chunkSize": 1048576,
  "totalChunks": 15
}
```

Step 2: Send chunks (1MB each)

```json
{
  "type": "result_chunk",
  "jobId": "uuid-job-123",
  "chunkIndex": 0,
  "data": "<base64-encoded chunk>"
}
```

```json
{
  "type": "result_chunk",
  "jobId": "uuid-job-123",
  "chunkIndex": 1,
  "data": "<base64-encoded chunk>"
}
```

... (continue for all chunks)

Step 3: Send completion

```json
{
  "type": "result_end",
  "jobId": "uuid-job-123",
  "sessionId": "abc123"
}
```
### Node → Server: Render Failed

```json
{
  "type": "result",
  "jobId": "uuid-job-123",
  "sessionId": "abc123",
  "success": false,
  "error": "FFmpeg encoding failed"
}
```
## Node Implementation

### WebSocket Client (on node)

```javascript
// Handles incoming jobs on the `ws` connection opened in connectToMainServer()
ws.on('message', async (data) => {
  const msg = JSON.parse(data);

  if (msg.type === 'render') {
    // Check cache first (SQLite lookup)
    let config = getCachedConfig(msg.sessionId);
    if (!config) {
      // Cache miss - use provided config and cache it
      config = msg.config;
      cacheConfig(msg.sessionId, config);
      console.log(`[Node] Cache miss for session ${msg.sessionId}, cached new config`);
    } else {
      console.log(`[Node] Cache hit for session ${msg.sessionId}, reusing cached config`);
    }

    try {
      // Render video/image
      const buffer = await renderVideo(config, (progress, stage) => {
        // Send progress updates during rendering
        ws.send(JSON.stringify({
          type: 'progress',
          jobId: msg.jobId,
          sessionId: msg.sessionId,
          stage: stage,
          progress: progress
        }));
      });

      // Send result in chunks
      const CHUNK_SIZE = 1048576; // 1MB chunks
      const totalChunks = Math.ceil(buffer.length / CHUNK_SIZE);
      const isVideo = (config.imageUrl || '').startsWith('data:video');

      // Step 1: Send metadata
      ws.send(JSON.stringify({
        type: 'result_start',
        jobId: msg.jobId,
        sessionId: msg.sessionId,
        success: true,
        format: isVideo ? 'mp4' : 'png',
        totalSize: buffer.length,
        chunkSize: CHUNK_SIZE,
        totalChunks: totalChunks
      }));

      // Step 2: Send chunks
      for (let i = 0; i < totalChunks; i++) {
        const start = i * CHUNK_SIZE;
        const end = Math.min(start + CHUNK_SIZE, buffer.length);
        const chunk = buffer.slice(start, end);
        ws.send(JSON.stringify({
          type: 'result_chunk',
          jobId: msg.jobId,
          chunkIndex: i,
          data: chunk.toString('base64')
        }));
        // Small delay to avoid overwhelming the connection
        await new Promise(resolve => setTimeout(resolve, 10));
      }

      // Step 3: Send completion
      ws.send(JSON.stringify({
        type: 'result_end',
        jobId: msg.jobId,
        sessionId: msg.sessionId
      }));
    } catch (error) {
      // Send error
      ws.send(JSON.stringify({
        type: 'result',
        jobId: msg.jobId,
        sessionId: msg.sessionId,
        success: false,
        error: error.message
      }));
    }
  }
});
```
### Server-Side: Reassembling Chunks

```javascript
const pendingJobs = new Map(); // jobId -> { chunks, metadata }

wss.on('connection', (ws) => {
  // ... registration code ...

  ws.on('message', (data) => {
    const msg = JSON.parse(data);

    if (msg.type === 'result_start') {
      // Initialize job
      pendingJobs.set(msg.jobId, {
        sessionId: msg.sessionId,
        format: msg.format,
        totalSize: msg.totalSize,
        totalChunks: msg.totalChunks,
        chunks: new Array(msg.totalChunks),
        receivedChunks: 0
      });
    } else if (msg.type === 'result_chunk') {
      const job = pendingJobs.get(msg.jobId);
      if (!job) return;

      // Store chunk
      job.chunks[msg.chunkIndex] = Buffer.from(msg.data, 'base64');
      job.receivedChunks++;

      // Log progress
      const uploadProgress = Math.floor((job.receivedChunks / job.totalChunks) * 100);
      console.log(`[Job ${msg.jobId}] Upload progress: ${uploadProgress}%`);
    } else if (msg.type === 'result_end') {
      const job = pendingJobs.get(msg.jobId);
      if (!job) return;

      // Reassemble buffer
      const completeBuffer = Buffer.concat(job.chunks);

      // Cache the result
      cacheVideo(job.sessionId, completeBuffer);

      // Update session status
      updateSessionRenderStatus(job.sessionId, 'completed', null, null, 100, 'completed');

      // Cleanup
      pendingJobs.delete(msg.jobId);
      console.log(`[Job ${msg.jobId}] Render complete, cached ${completeBuffer.length} bytes`);
    } else if (msg.type === 'progress') {
      // Forward progress to SSE clients
      updateSessionRenderStatus(msg.sessionId, 'rendering', null, null, msg.progress, msg.stage);
    }
  });
});
```
## Node Cache (SQLite)

```javascript
// Lightweight SQLite DB on each node (data/node_cache.db)
const Database = require('better-sqlite3');
const db = new Database('data/node_cache.db');

// Create cache table
db.exec(`
  CREATE TABLE IF NOT EXISTS config_cache (
    session_id TEXT PRIMARY KEY,
    config_json TEXT NOT NULL,
    cached_at INTEGER NOT NULL
  )
`);

// Cache config
function cacheConfig(sessionId, config) {
  const stmt = db.prepare(`
    INSERT OR REPLACE INTO config_cache (session_id, config_json, cached_at)
    VALUES (?, ?, ?)
  `);
  stmt.run(sessionId, JSON.stringify(config), Date.now());
}

// Get cached config
function getCachedConfig(sessionId) {
  const stmt = db.prepare(`
    SELECT config_json FROM config_cache
    WHERE session_id = ? AND cached_at > ?
  `);
  const oneHourAgo = Date.now() - 3600000;
  const row = stmt.get(sessionId, oneHourAgo);
  return row ? JSON.parse(row.config_json) : null;
}

// Aggressive cleanup every 10 minutes
setInterval(() => {
  const oneHourAgo = Date.now() - 3600000;
  const stmt = db.prepare('DELETE FROM config_cache WHERE cached_at < ?');
  const result = stmt.run(oneHourAgo);
  if (result.changes > 0) {
    console.log(`[Node Cache] Cleaned up ${result.changes} expired config(s)`);
  }
}, 600000); // 10 minutes

// Cleanup on exit
process.on('exit', () => {
  db.close();
});
```
Benefits:
- Low memory usage (configs stored on disk)
- Fast lookups (indexed by session_id)
- Automatic persistence (survives restarts if < 1hr old)
- Small disk footprint (configs expire after 1 hour)
## Bandwidth Optimizations
### 1. Chunked Transfer
- Videos sent in 1MB chunks over WebSocket
- Prevents memory issues with large files
- Allows progress tracking during upload
- Small delay between chunks prevents connection overwhelming
- Server can timeout slow uploads and fallback to local
### 2. Cache Reuse
- Node keeps config cached for 1 hour
- If main server sends same sessionId, node reuses cached base64
- Only updated fields need to be applied
- Massive bandwidth savings on session edits
### 3. Timeout & Fallback
- Monitor upload progress per chunk
- If upload stalls for >30s, cancel and fallback
- Track node performance metrics (avg upload speed)
- Prefer faster nodes for future jobs
### 4. Compression (Optional)
- Could gzip chunks if bandwidth is critical
- Probably not needed - MP4 is already compressed
- Base64 encoding adds ~33% overhead (unavoidable for JSON)
## Security

### Authentication

- Shared `NODE_KEY` in environment on both sides
- Nodes must present the key in their `register` message
- Connections that register with an invalid key are closed (code 1008)
### Rate Limiting (Optional)
- Limit render jobs per node to prevent abuse
- Max 5 concurrent jobs per node
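A minimal sketch of that cap, checked before dispatch (`canAcceptJob` and `pickDispatchTarget` are illustrative helpers, not existing code):

```javascript
// Sketch of a per-node concurrency cap. The registry entry shape
// (currentJobs) follows the plan above; the helpers are illustrative.
const MAX_CONCURRENT_JOBS = 5;

function canAcceptJob(node) {
  return node.currentJobs < MAX_CONCURRENT_JOBS;
}

function pickDispatchTarget(nodes) {
  // Skip saturated nodes; least-loaded first. null = render locally.
  const eligible = nodes.filter(canAcceptJob)
    .sort((a, b) => a.currentJobs - b.currentJobs);
  return eligible[0] || null;
}
```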
## Monitoring & Debugging

### Main Server Logs

```
[NodeRegistry] my-gaming-pc registered (GPU: true)
[Dispatcher] Assigned session abc123 to my-gaming-pc (cache hit)
[Dispatcher] Node my-gaming-pc offline, rendering locally
```
### Node Logs

```
[Node] Registered with main server: http://192.168.1.100:3000
[Node] Render request received: session abc123 (cache miss)
[Node] Render completed in 3.2s
```
## Error Handling

### Node Failures

- Node offline: Fall back to local render immediately
- Node timeout: Wait 5s max, then fall back
- Render failed on node: Node sends a result with `success: false`, main server falls back
- Upload timeout: If MP4 upload takes >30s, cancel and fall back
### Node Recovery
- Node auto-reconnects 5s after WebSocket disconnection
- Node clears cache on restart
- Main server automatically removes node from registry on WebSocket close
- No timeout monitoring needed - connection state is the source of truth
## File Changes

### New Files

- `renderNode.js` - Node WebSocket client, cache management, render worker
- `nodeRegistry.js` - Main server's WebSocket server, node management, job dispatcher
- `DISTRIBUTED_RENDERING_PLAN.md` - This file

### Modified Files

- `api.js` - Initialize WebSocket server and node mode detection
- `videoRenderer.js` - Check for available nodes before local render, dispatch jobs via WebSocket
- `v2Routes.js` - Forward progress updates from nodes to SSE clients
### Dependencies to Add

```json
{
  "ws": "^8.14.0"
}
```
## Deployment

### Main Server

```bash
# .env
# No special config needed

npm start
```

### Render Node (Your PC)

```bash
# .env
MAIN_SERVER_URL=http://your-server-ip:3000
NODE_KEY=your-shared-secret
HAS_GPU=true

npm start
```
Node Resource Usage:

- Memory: Low (~50-100MB idle, spikes during render)
- Disk: `data/node_cache.db` (typically < 10MB, auto-cleaned every 10 min)
- CPU/GPU: Only used during active renders
- Network: Minimal (WebSocket connection + render jobs)
### Docker Compose Example

```yaml
version: '3.8'
services:
  main-server:
    build: .
    ports:
      - "3000:3000"
    environment:
      - NODE_ENV=production

  render-node-local:
    build: .
    environment:
      - MAIN_SERVER_URL=http://main-server:3000
      - NODE_KEY=shared-secret
      - NODE_NAME=local-fallback
```

No port mapping or `NODE_PORT` is needed for the node service - it connects outbound to the main server.
## Testing Plan

### Unit Tests

- Node registration with valid/invalid key
- Node removed from registry when its WebSocket closes
- selectNode() prioritizes GPU nodes
- selectNode() prioritizes cached sessions
- Cache cleanup removes old entries
### Integration Tests

- Node registers and receives the `registered` acknowledgment
- Main server dispatches job to online node
- Main server falls back when node is offline
- Session edit reuses cached config on same node
- Node cache expires after 1 hour
- Multiple nodes balance load
### Manual Tests
- Start main server
- Start node on PC with GPU
- Create session with video → verify node renders
- Stop node → verify fallback to local
- Edit session → verify same node reused (if online)
- Wait 1 hour → verify node cache cleared
## Future Enhancements (Out of Scope)
- Load balancing across multiple nodes
- Node performance metrics (render time tracking)
- Node priority override (manual priority setting)
- Webhook notifications when node comes online/offline
- Web UI for node management
- Automatic node discovery (mDNS/Bonjour)