# Distributed Render Node Architecture

## Goal

Allow lightweight render nodes (e.g. a personal PC with a GPU) to handle video rendering for the main server, with automatic fallback and minimal resource usage on the nodes.

## Design Philosophy

- **Single Codebase**: The same code runs as main server or render node (determined by env vars)
- **Proxy-Like**: Nodes act almost like a rendering proxy with minimal state
- **Stateful but Lightweight**: Nodes use local SQLite for temporary config caching (1hr TTL)
- **Bandwidth Aware**: Minimize data transfer, handle slow upload speeds
- **No Replication**: Nodes only cache render configs, never replicate the main server's session DB
- **Low Memory Usage**: Configs stored in SQLite instead of memory (important for large base64 data)

## Environment Variables

### Main Server Mode (default)

```env
# No special vars needed - runs as a normal API server
# (set NODE_KEY if render nodes will connect, so registrations can be verified)
```

### Render Node Mode

```env
MAIN_SERVER_URL=http://192.168.1.100:3000  # or ws://... for a WebSocket URL
NODE_KEY=shared-secret-key
NODE_NAME=my-gaming-pc                     # optional, defaults to hostname
HAS_GPU=true                               # optional, for prioritization
```

**Note:** No port configuration is needed - the node connects outbound to the main server via WebSocket.

## Architecture

```
┌──────────────────────────────────────────────────────┐
│                     Main Server                      │
│  - Full SQLite DB (sessions, snapshots)              │
│  - Long-term cache (24hr TTL)                        │
│  - Node registry (in-memory)                         │
│  - Job dispatcher                                    │
└──────────────────────────────────────────────────────┘
                          │
             ┌────────────┴────────────┐
             │                         │
     ┌───────▼─────────┐     ┌────────▼────────┐
     │  Render Node 1  │     │  Render Node 2  │
     │    (Your PC)    │     │ (Server Local)  │
     │                 │     │                 │
     │ - Local SQLite  │     │ - Local SQLite  │
     │   (cache only)  │     │   (cache only)  │
     │ - 1hr TTL       │     │ - 1hr TTL       │
     │ - Auto-cleanup  │     │ - Auto-cleanup  │
     │ - GPU: NVENC    │     │ - CPU: libx264  │
     │ - Priority: 1   │     │ - Priority: 99  │
     └─────────────────┘     └─────────────────┘
```

## Data Flow

### Initial Render (No Cache)

```
1. Client → Main Server: POST /v2/quote (with base64 image)
2. Main Server: Store session in DB
3. Main Server: Pick best available node
4. Main Server → Node: `render` job over WebSocket
   Payload: {
     sessionId: "abc123",
     config: { ...full config with base64... }
   }
5. Node: Cache config temporarily (key: sessionId)
6. Node: Render video
7. Node → Main Server: Return MP4 buffer (chunked over WebSocket)
8. Main Server: Cache MP4 (24hr)
9. Main Server: Update session renderStatus = 'completed'
```

### Edit Same Session (Cache Hit)

```
1. Client → Main Server: PATCH /v2/quote/abc123 (change text)
2. Main Server: Update session in DB
3. Main Server: Check which node has sessionId cached
4. Main Server → Same Node: `render` job over WebSocket
   Payload: {
     sessionId: "abc123",
     config: { ...full config with base64... }  // sent again, but the node may have it cached
   }
5. Node: Check cache for sessionId → HIT! Reuse base64
6. Node: Render with updated config
7. Node → Main Server: Return MP4 buffer
8. Main Server: Cache new MP4
```

### Node Unavailable (Fallback)

```
1. Main Server → Node: `render` job (5s timeout)
2. Node: No response / timeout
3. Main Server: Mark node as offline
4. Main Server: Render locally (fallback)
```

## WebSocket Communication

### Why WebSocket?

- **No Port Forwarding**: The node initiates an outbound connection to the main server
- **Persistent Connection**: A single connection for all communication
- **Bidirectional**: The server can push jobs, the node can push results
- **Built-in Heartbeat**: WebSocket connection state = node status (no explicit heartbeat needed)
- **Auto Ping/Pong**: The WebSocket library handles keep-alive automatically

### Node Connection (on startup)

```javascript
let ws = null;

function connectToMainServer() {
  // MAIN_SERVER_URL may use an http(s) scheme - rewrite it for WebSocket
  // (http -> ws, https -> wss; a ws:// URL passes through unchanged)
  const wsUrl = MAIN_SERVER_URL.replace(/^http/, 'ws') + '/nodes';
  ws = new WebSocket(wsUrl);

  ws.on('open', () => {
    console.log('[Node] Connected to main server');
    // Send registration
    ws.send(JSON.stringify({
      type: 'register',
      key: NODE_KEY,
      name: NODE_NAME,
      hasGpu: HAS_GPU
    }));
  });

  ws.on('close', () => {
    console.log('[Node] Disconnected from main server, reconnecting in 5s...');
    setTimeout(connectToMainServer, 5000);
  });

  ws.on('error', (error) => {
    console.error('[Node] WebSocket error:', error.message);
  });
}

connectToMainServer();
```

**No explicit heartbeat needed** - the WebSocket connection state indicates node availability.

### Server Side

```javascript
// Main server accepts WebSocket connections
const wss = new WebSocketServer({ noServer: true });

server.on('upgrade', (request, socket, head) => {
  if (request.url === '/nodes') {
    wss.handleUpgrade(request, socket, head, (ws) => {
      wss.emit('connection', ws, request);
    });
  }
});
```

### Node Registry (Main Server, in-memory)

```javascript
const nodeRegistry = new Map(); // nodeId -> { ws, name, hasGpu, ... }

wss.on('connection', (ws) => {
  let nodeId = null;

  ws.on('message', (data) => {
    const msg = JSON.parse(data);

    if (msg.type === 'register') {
      // Verify key
      if (msg.key !== NODE_KEY) {
        ws.close(1008, 'Invalid key');
        return;
      }

      nodeId = generateId();
      nodeRegistry.set(nodeId, {
        id: nodeId,
        ws: ws,
        name: msg.name,
        hasGpu: msg.hasGpu,
        currentJobs: 0,
        cachedSessions: new Set(),
        connectedAt: Date.now()
      });

      ws.send(JSON.stringify({ type: 'registered', nodeId: nodeId }));
      console.log(`[NodeRegistry] ${msg.name} registered (GPU: ${msg.hasGpu})`);
    }

    // Handle other message types (progress, result_start, etc.)
    // ...
  });

  // Automatic cleanup on disconnect
  ws.on('close', () => {
    if (nodeId && nodeRegistry.has(nodeId)) {
      const node = nodeRegistry.get(nodeId);
      console.log(`[NodeRegistry] ${node.name} disconnected`);

      // Cancel any pending jobs on this node
      if (node.currentJobs > 0) {
        console.log(`[NodeRegistry] ${node.name} had ${node.currentJobs} active jobs, will fallback`);
      }

      // Remove from registry
      nodeRegistry.delete(nodeId);
    }
  });

  ws.on('error', (error) => {
    console.error(`[NodeRegistry] WebSocket error for node ${nodeId}:`, error.message);
  });
});
```

**Connection-based availability:**

- Node in registry = online and available
- WebSocket closes = automatically removed from registry
- No need for timeout checks or heartbeat monitoring

## Render Job Dispatch

### Node Selection Algorithm

```javascript
function selectNode(sessionId) {
  // Every entry in the registry is a live connection,
  // so no extra "online" status filter is needed
  const onlineNodes = [...nodeRegistry.values()]
    .sort((a, b) => {
      // 1. Prefer the node with the session cached
      const aHasCache = a.cachedSessions.has(sessionId);
      const bHasCache = b.cachedSessions.has(sessionId);
      if (aHasCache && !bHasCache) return -1;
      if (!aHasCache && bHasCache) return 1;

      // 2. Prefer GPU nodes
      if (a.hasGpu && !b.hasGpu) return -1;
      if (!a.hasGpu && b.hasGpu) return 1;

      // 3. Prefer nodes with fewer jobs
      return a.currentJobs - b.currentJobs;
    });

  return onlineNodes[0] || null; // null = render locally
}
```

## WebSocket Message Protocol

### Server → Node: Render Job

```json
{
  "type": "render",
  "jobId": "uuid-job-123",
  "sessionId": "abc123",
  "config": {
    "displayName": "User",
    "username": "@user",
    "text": "Hello",
    "avatarUrl": "data:image/png;base64,...",
    "imageUrl": "data:video/mp4;base64,...",
    "timestamp": 1234567890,
    "verified": true,
    "engagement": { ... }
  }
}
```

### Node → Server: Progress Update

```json
{
  "type": "progress",
  "jobId": "uuid-job-123",
  "sessionId": "abc123",
  "stage": "rendering",  // or "encoding"
  "progress": 45
}
```

### Node → Server: Render Complete (Chunked Transfer)

**Step 1: Send metadata**

```json
{
  "type": "result_start",
  "jobId": "uuid-job-123",
  "sessionId": "abc123",
  "success": true,
  "format": "mp4",
  "totalSize": 15728640,
  "chunkSize": 1048576,
  "totalChunks": 15
}
```

**Step 2: Send chunks (1MB each)**

```json
{
  "type": "result_chunk",
  "jobId": "uuid-job-123",
  "chunkIndex": 0,
  "data": "<base64-encoded chunk>"
}
```

```json
{
  "type": "result_chunk",
  "jobId": "uuid-job-123",
  "chunkIndex": 1,
  "data": "<base64-encoded chunk>"
}
```

... (continue for all chunks)

**Step 3: Send completion**

```json
{
  "type": "result_end",
  "jobId": "uuid-job-123",
  "sessionId": "abc123"
}
```

### Node → Server: Render Failed

```json
{
  "type": "result",
  "jobId": "uuid-job-123",
  "sessionId": "abc123",
  "success": false,
  "error": "FFmpeg encoding failed"
}
```

## Node Implementation

### WebSocket Client (on node)

```javascript
// `ws` is the connection established in connectToMainServer()
ws.on('message', async (data) => {
  const msg = JSON.parse(data);

  if (msg.type === 'render') {
    // Check cache first (SQLite lookup)
    let config = getCachedConfig(msg.sessionId);

    if (!config) {
      // Cache miss - use provided config and cache it
      config = msg.config;
      cacheConfig(msg.sessionId, config);
      console.log(`[Node] Cache miss for session ${msg.sessionId}, cached new config`);
    } else {
      console.log(`[Node] Cache hit for session ${msg.sessionId}, reusing cached config`);
    }

    try {
      // Render video/image
      const buffer = await renderVideo(config, (progress, stage) => {
        // Send progress updates during rendering
        ws.send(JSON.stringify({
          type: 'progress',
          jobId: msg.jobId,
          sessionId: msg.sessionId,
          stage: stage,
          progress: progress
        }));
      });

      // Send result in chunks
      const CHUNK_SIZE = 1048576; // 1MB chunks
      const totalChunks = Math.ceil(buffer.length / CHUNK_SIZE);
      // Output format follows the source media type
      const isVideo = (config.imageUrl || '').startsWith('data:video');

      // Step 1: Send metadata
      ws.send(JSON.stringify({
        type: 'result_start',
        jobId: msg.jobId,
        sessionId: msg.sessionId,
        success: true,
        format: isVideo ? 'mp4' : 'png',
        totalSize: buffer.length,
        chunkSize: CHUNK_SIZE,
        totalChunks: totalChunks
      }));

      // Step 2: Send chunks
      for (let i = 0; i < totalChunks; i++) {
        const start = i * CHUNK_SIZE;
        const end = Math.min(start + CHUNK_SIZE, buffer.length);
        const chunk = buffer.subarray(start, end);

        ws.send(JSON.stringify({
          type: 'result_chunk',
          jobId: msg.jobId,
          chunkIndex: i,
          data: chunk.toString('base64')
        }));

        // Small delay to avoid overwhelming the connection
        await new Promise(resolve => setTimeout(resolve, 10));
      }

      // Step 3: Send completion
      ws.send(JSON.stringify({
        type: 'result_end',
        jobId: msg.jobId,
        sessionId: msg.sessionId
      }));

    } catch (error) {
      // Send error
      ws.send(JSON.stringify({
        type: 'result',
        jobId: msg.jobId,
        sessionId: msg.sessionId,
        success: false,
        error: error.message
      }));
    }
  }
});
```

### Server-Side: Reassembling Chunks

```javascript
const pendingJobs = new Map(); // jobId -> { chunks, metadata }

wss.on('connection', (ws) => {
  // ... registration code ...

  ws.on('message', (data) => {
    const msg = JSON.parse(data);

    if (msg.type === 'result_start') {
      // Initialize job
      pendingJobs.set(msg.jobId, {
        sessionId: msg.sessionId,
        format: msg.format,
        totalSize: msg.totalSize,
        totalChunks: msg.totalChunks,
        chunks: new Array(msg.totalChunks),
        receivedChunks: 0
      });

    } else if (msg.type === 'result_chunk') {
      const job = pendingJobs.get(msg.jobId);
      if (!job) return;

      // Store chunk
      job.chunks[msg.chunkIndex] = Buffer.from(msg.data, 'base64');
      job.receivedChunks++;

      // Log progress
      const uploadProgress = Math.floor((job.receivedChunks / job.totalChunks) * 100);
      console.log(`[Job ${msg.jobId}] Upload progress: ${uploadProgress}%`);

    } else if (msg.type === 'result_end') {
      const job = pendingJobs.get(msg.jobId);
      if (!job) return;

      // Reassemble buffer
      const completeBuffer = Buffer.concat(job.chunks);

      // Cache the result
      cacheVideo(job.sessionId, completeBuffer);

      // Update session status
      updateSessionRenderStatus(job.sessionId, 'completed', null, null, 100, 'completed');

      // Cleanup
      pendingJobs.delete(msg.jobId);
      console.log(`[Job ${msg.jobId}] Render complete, cached ${completeBuffer.length} bytes`);

    } else if (msg.type === 'progress') {
      // Forward progress to SSE clients
      updateSessionRenderStatus(msg.sessionId, 'rendering', null, null, msg.progress, msg.stage);
    }
  });
});
```

### Node Cache (SQLite)

```javascript
// Lightweight SQLite DB on each node (data/node_cache.db)
const Database = require('better-sqlite3');
const db = new Database('data/node_cache.db');

// Create cache table
db.exec(`
  CREATE TABLE IF NOT EXISTS config_cache (
    session_id TEXT PRIMARY KEY,
    config_json TEXT NOT NULL,
    cached_at INTEGER NOT NULL
  )
`);

// Cache config
function cacheConfig(sessionId, config) {
  const stmt = db.prepare(`
    INSERT OR REPLACE INTO config_cache (session_id, config_json, cached_at)
    VALUES (?, ?, ?)
  `);
  stmt.run(sessionId, JSON.stringify(config), Date.now());
}

// Get cached config
function getCachedConfig(sessionId) {
  const stmt = db.prepare(`
    SELECT config_json FROM config_cache
    WHERE session_id = ? AND cached_at > ?
  `);
  const oneHourAgo = Date.now() - 3600000;
  const row = stmt.get(sessionId, oneHourAgo);
  return row ? JSON.parse(row.config_json) : null;
}

// Aggressive cleanup every 10 minutes
setInterval(() => {
  const oneHourAgo = Date.now() - 3600000;
  const stmt = db.prepare('DELETE FROM config_cache WHERE cached_at < ?');
  const result = stmt.run(oneHourAgo);
  if (result.changes > 0) {
    console.log(`[Node Cache] Cleaned up ${result.changes} expired config(s)`);
  }
}, 600000); // 10 minutes

// Cleanup on exit
process.on('exit', () => {
  db.close();
});
```

**Benefits:**

- Low memory usage (configs stored on disk)
- Fast lookups (indexed by session_id)
- Automatic persistence (survives restarts if < 1hr old)
- Small disk footprint (configs expire after 1 hour)

## Bandwidth Optimizations

### 1. Chunked Transfer

- Videos sent in 1MB chunks over WebSocket
- Prevents memory issues with large files
- Allows progress tracking during upload
- A small delay between chunks keeps the connection from being saturated
- The server can time out slow uploads and fall back to local rendering

### 2. Cache Reuse

- The node keeps each config cached for 1 hour
- If the main server sends the same sessionId, the node reuses the cached base64
- Only updated fields need to be applied
- Massive bandwidth savings on session edits

### 3. Timeout & Fallback

- Monitor upload progress per chunk
- If an upload stalls for >30s, cancel and fall back
- Track node performance metrics (avg upload speed)
- Prefer faster nodes for future jobs

### 4. Compression (Optional)

- Could gzip chunks if bandwidth is critical
- Probably not needed - MP4 is already compressed
- Base64 encoding adds ~33% overhead (unavoidable for JSON)

## Security

### Authentication

- Shared `NODE_KEY` in the environment on both server and nodes
- The node's `register` message must carry the key; the server closes the socket (code 1008) otherwise
- All further communication flows over that authenticated WebSocket

### Rate Limiting (Optional)

- Limit render jobs per node to prevent abuse
- Max 5 concurrent jobs per node

## Monitoring & Debugging

### Main Server Logs

```
[NodeRegistry] my-gaming-pc registered (GPU: true)
[Dispatcher] Assigned session abc123 to my-gaming-pc (cache hit)
[Dispatcher] Node my-gaming-pc offline, rendering locally
```

### Node Logs

```
[Node] Registered with main server: http://192.168.1.100:3000
[Node] Render request received: session abc123 (cache miss)
[Node] Render completed in 3.2s
```

## Error Handling

### Node Failures

1. **Node offline**: Fall back to local render immediately
2. **Node timeout**: Wait 5s max, then fall back
3. **Render failed on node**: The node sends a failure `result` message; the main server falls back
4. **Upload timeout**: If the MP4 upload takes >30s, cancel and fall back

### Node Recovery

- The node auto-reconnects 5s after a WebSocket disconnection
- The node clears its cache on restart
- The main server automatically removes the node from the registry on WebSocket close
- No timeout monitoring needed - connection state is the source of truth

## File Changes

### New Files

- `renderNode.js` - Node WebSocket client, cache management, render worker
- `nodeRegistry.js` - Main server's WebSocket server, node management, job dispatcher
- `DISTRIBUTED_RENDERING_PLAN.md` - This file

### Modified Files

- `api.js` - Initialize WebSocket server and node mode detection
- `videoRenderer.js` - Check for available nodes before local render, dispatch jobs via WebSocket
- `v2Routes.js` - Forward progress updates from nodes to SSE clients

### Dependencies to Add

```json
{
  "ws": "^8.14.0"
}
```

## Deployment

### Main Server

```bash
# .env
NODE_KEY=your-shared-secret   # used to verify incoming node registrations

npm start
```

### Render Node (Your PC)

```bash
# .env
MAIN_SERVER_URL=http://your-server-ip:3000
NODE_KEY=your-shared-secret
HAS_GPU=true

npm start
```

**Node Resource Usage:**

- **Memory**: Low (~50-100MB idle, spikes during render)
- **Disk**: `data/node_cache.db` (typically < 10MB, auto-cleaned every 10 min)
- **CPU/GPU**: Only used during active renders
- **Network**: Minimal (WebSocket connection + render jobs)

### Docker Compose Example

```yaml
version: '3.8'

services:
  main-server:
    build: .
    ports:
      - "3000:3000"
    environment:
      - NODE_ENV=production
      - NODE_KEY=shared-secret

  render-node-local:
    build: .
    environment:
      - MAIN_SERVER_URL=http://main-server:3000
      - NODE_KEY=shared-secret
      - NODE_NAME=local-fallback
```

## Testing Plan

### Unit Tests

- [ ] Node registration with valid/invalid key
- [ ] Node is removed from the registry when its WebSocket closes
- [ ] selectNode() prioritizes GPU nodes
- [ ] selectNode() prioritizes cached sessions
- [ ] Cache cleanup removes old entries

### Integration Tests

- [ ] Node registers and receives a `registered` acknowledgment
- [ ] Main server dispatches a job to an online node
- [ ] Main server falls back when the node is offline
- [ ] Session edit reuses the cached config on the same node
- [ ] Node cache expires after 1 hour
- [ ] Multiple nodes balance load

### Manual Tests

1. Start the main server
2. Start a node on a PC with a GPU
3. Create a session with video → verify the node renders it
4. Stop the node → verify fallback to local rendering
5. Edit the session → verify the same node is reused (if online)
6. Wait 1 hour → verify the node cache is cleared

## Future Enhancements (Out of Scope)

- Load balancing across multiple nodes
- Node performance metrics (render time tracking)
- Node priority override (manual priority setting)
- Webhook notifications when a node comes online/offline
- Web UI for node management
- Automatic node discovery (mDNS/Bonjour)
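## Appendix: Dispatch Fallback Sketch

The fallback rules in the Error Handling section (no node → local render; timeout → local render; node-reported failure → local render) can be sketched as a small dispatcher. This is a minimal sketch, not part of the current codebase: `sendToNode` (assumed to resolve with the reassembled buffer or reject on a failure `result`) and `renderLocally` are hypothetical stand-ins for the WebSocket round-trip and the local render pipeline.

```javascript
// Sketch: try the selected node first, fall back to a local render on
// timeout or failure. `sendToNode` and `renderLocally` are hypothetical
// injected helpers, which also makes the logic easy to test in isolation.
async function dispatchRender({ node, sessionId, config, sendToNode, renderLocally, timeoutMs = 5000 }) {
  if (!node) {
    // No node online -> render on the main server immediately
    return renderLocally(config);
  }
  let timer;
  try {
    return await Promise.race([
      sendToNode(node, { sessionId, config }),
      new Promise((_, reject) => {
        timer = setTimeout(() => reject(new Error('node timeout')), timeoutMs);
      }),
    ]);
  } catch (err) {
    // Timeout or node-side failure -> fall back to the local renderer
    console.log(`[Dispatcher] ${node.name} failed (${err.message}), rendering locally`);
    return renderLocally(config);
  } finally {
    clearTimeout(timer); // don't leave the timeout pending after a fast result
  }
}
```

A caller would wire this up as `dispatchRender({ node: selectNode(sessionId), sessionId, config, sendToNode, renderLocally })`; injecting the helpers keeps the fallback policy separate from the transport details.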