ImBenji 537fc4f750 Add v2 quote API with video support and Flutter tweet template widget
- Implement Express routes for creating, updating, retrieving, and deleting quote sessions
- Add video detection and rendering pipeline using Playwright and FFmpeg
- Add caching utilities for images and videos
- Provide helpers for formatting counts, timestamps, and normalizing usernames
- Add snapshot creation and retrieval endpoints
- Implement SSE endpoint for render progress updates
- Add Flutter widget for rendering tweet templates with image/video support
2026-02-13 21:56:26 +00:00

# Distributed Render Node Architecture
## Goal
Allow lightweight render nodes (like personal PC with GPU) to handle video rendering for the main server, with automatic fallback and minimal resource usage on nodes.
## Design Philosophy
- **Single Codebase**: Same code runs as main server or render node (determined by env vars)
- **Proxy-Like**: Nodes act almost like a rendering proxy with minimal state
- **Stateful but Lightweight**: Nodes use local SQLite for temporary config caching (1hr TTL)
- **Bandwidth Aware**: Minimize data transfer, handle slow upload speeds
- **No Replication**: Nodes only cache render configs, never replicate main server's session DB
- **Low Memory Usage**: Configs stored in SQLite instead of memory (important for large base64 data)
## Environment Variables
### Main Server Mode (default)
```env
# No special vars needed - runs as normal API server
```
### Render Node Mode
```env
MAIN_SERVER_URL=http://192.168.1.100:3000 # or ws://... for WebSocket URL
NODE_KEY=shared-secret-key
NODE_NAME=my-gaming-pc # optional, defaults to hostname
HAS_GPU=true # optional, for prioritization
```
**Note:** No port configuration needed - node connects outbound to main server via WebSocket
## Architecture
```
┌──────────────────────────────────────────────────────┐
│                     Main Server                      │
│  - Full SQLite DB (sessions, snapshots)              │
│  - Long-term cache (24hr TTL)                        │
│  - Node registry (in-memory)                         │
│  - Job dispatcher                                    │
└───────────────────────────┬──────────────────────────┘
                            │
               ┌────────────┴────────────┐
               │                         │
      ┌────────▼────────┐       ┌────────▼────────┐
      │  Render Node 1  │       │  Render Node 2  │
      │    (Your PC)    │       │ (Server Local)  │
      │                 │       │                 │
      │ - Local SQLite  │       │ - Local SQLite  │
      │   (cache only)  │       │   (cache only)  │
      │ - 1hr TTL       │       │ - 1hr TTL       │
      │ - Auto-cleanup  │       │ - Auto-cleanup  │
      │ - GPU: NVENC    │       │ - CPU: libx264  │
      │ - Priority: 1   │       │ - Priority: 99  │
      └─────────────────┘       └─────────────────┘
```
## Data Flow
### Initial Render (No Cache)
```
1. Client → Main Server: POST /v2/quote (with base64 image)
2. Main Server: Store session in DB
3. Main Server: Pick best available node
4. Main Server → Node: dispatch `render` job over WebSocket
   Payload: {
     sessionId: "abc123",
     config: { ...full config with base64... }
   }
5. Node: Cache config temporarily (key: sessionId)
6. Node: Render video
7. Node → Main Server: Stream MP4 back in chunks
8. Main Server: Cache MP4 (24hr)
9. Main Server: Update session renderStatus = 'completed'
```
### Edit Same Session (Cache Hit)
```
1. Client → Main Server: PATCH /v2/quote/abc123 (change text)
2. Main Server: Update session in DB
3. Main Server: Check which node has sessionId cached
4. Main Server → Same Node: dispatch `render` job over WebSocket
   Payload: {
     sessionId: "abc123",
     config: { ...full config with base64... } // sent again but node may have it cached
   }
5. Node: Check cache for sessionId → HIT! Reuse cached base64
6. Node: Render with updated config
7. Node → Main Server: Stream new MP4 back in chunks
8. Main Server: Cache new MP4
### Node Unavailable (Fallback)
```
1. Main Server → Node: dispatch `render` job over WebSocket (5s ack timeout)
2. Node: No response / timeout
3. Main Server: Remove node from registry
4. Main Server: Render locally (fallback)
```
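The fallback flow above could be sketched as a small dispatcher helper. `renderOnNode` and `renderLocally` are placeholders for the actual render paths, not existing functions:

```javascript
// Dispatch sketch: try the selected node with a 5s timeout, fall back to a
// local render on any failure. renderOnNode/renderLocally are placeholders.
async function dispatchRender(node, sessionId, config, renderOnNode, renderLocally) {
  if (!node) return renderLocally(config); // no node available
  let timer;
  try {
    // Give the node 5s to respond before giving up and rendering locally
    return await Promise.race([
      renderOnNode(node, sessionId, config),
      new Promise((_, reject) => {
        timer = setTimeout(() => reject(new Error('node timeout')), 5000);
      })
    ]);
  } catch (err) {
    console.log(`[Dispatcher] Node render failed (${err.message}), rendering locally`);
    return renderLocally(config);
  } finally {
    clearTimeout(timer);
  }
}
```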
## WebSocket Communication
### Why WebSocket?
- **No Port Forwarding**: Node initiates outbound connection to main server
- **Persistent Connection**: Single connection for all communication
- **Bidirectional**: Server can push jobs, node can push results
- **Built-in Heartbeat**: WebSocket connection state = node status (no explicit heartbeat needed)
- **Auto Ping/Pong**: WebSocket library handles keep-alive automatically
### Node Connection (on startup)
```javascript
const WebSocket = require('ws');

let ws = null;

function connectToMainServer() {
  // MAIN_SERVER_URL may be given as http(s)://... - convert to ws(s)://
  const wsUrl = MAIN_SERVER_URL.replace(/^http/, 'ws') + '/nodes';
  ws = new WebSocket(wsUrl);

  ws.on('open', () => {
    console.log('[Node] Connected to main server');
    // Send registration
    ws.send(JSON.stringify({
      type: 'register',
      key: NODE_KEY,
      name: NODE_NAME,
      hasGpu: HAS_GPU
    }));
  });

  ws.on('close', () => {
    console.log('[Node] Disconnected from main server, reconnecting in 5s...');
    setTimeout(connectToMainServer, 5000);
  });

  ws.on('error', (error) => {
    console.error('[Node] WebSocket error:', error.message);
  });
}

connectToMainServer();
```
**No explicit heartbeat needed** - WebSocket connection state indicates node availability
### Server Side
```javascript
// Main server accepts WebSocket connections on /nodes
const { WebSocketServer } = require('ws');
const wss = new WebSocketServer({ noServer: true });

server.on('upgrade', (request, socket, head) => {
  if (request.url === '/nodes') {
    wss.handleUpgrade(request, socket, head, (ws) => {
      wss.emit('connection', ws, request);
    });
  } else {
    socket.destroy(); // reject upgrades for unknown paths
  }
});
```
### Node Registry (Main Server, in-memory)
```javascript
const nodeRegistry = new Map(); // nodeId -> { ws, name, hasGpu, ... }

wss.on('connection', (ws) => {
  let nodeId = null;

  ws.on('message', (data) => {
    const msg = JSON.parse(data);

    if (msg.type === 'register') {
      // Verify key
      if (msg.key !== NODE_KEY) {
        ws.close(1008, 'Invalid key');
        return;
      }
      nodeId = generateId();
      nodeRegistry.set(nodeId, {
        id: nodeId,
        ws: ws,
        name: msg.name,
        hasGpu: msg.hasGpu,
        currentJobs: 0,
        cachedSessions: new Set(),
        connectedAt: Date.now()
      });
      ws.send(JSON.stringify({
        type: 'registered',
        nodeId: nodeId
      }));
      console.log(`[NodeRegistry] ${msg.name} registered (GPU: ${msg.hasGpu})`);
    }
    // Handle other message types (progress, result_start, etc.)
    // ...
  });

  // Automatic cleanup on disconnect
  ws.on('close', () => {
    if (nodeId && nodeRegistry.has(nodeId)) {
      const node = nodeRegistry.get(nodeId);
      console.log(`[NodeRegistry] ${node.name} disconnected`);
      // Cancel any pending jobs on this node
      if (node.currentJobs > 0) {
        console.log(`[NodeRegistry] ${node.name} had ${node.currentJobs} active jobs, will fallback`);
      }
      // Remove from registry
      nodeRegistry.delete(nodeId);
    }
  });

  ws.on('error', (error) => {
    console.error(`[NodeRegistry] WebSocket error for node ${nodeId}:`, error.message);
  });
});
```
**Connection-based availability:**
- Node in registry = online and available
- WebSocket closes = automatically removed from registry
- No need for timeout checks or heartbeat monitoring
## Render Job Dispatch
### Node Selection Algorithm
```javascript
function selectNode(sessionId) {
  // Registry membership == online (connection-based availability)
  const onlineNodes = [...nodeRegistry.values()]
    .sort((a, b) => {
      // 1. Prefer the node that already has this session cached
      const aHasCache = a.cachedSessions.has(sessionId);
      const bHasCache = b.cachedSessions.has(sessionId);
      if (aHasCache && !bHasCache) return -1;
      if (!aHasCache && bHasCache) return 1;
      // 2. Prefer GPU nodes
      if (a.hasGpu && !b.hasGpu) return -1;
      if (!a.hasGpu && b.hasGpu) return 1;
      // 3. Prefer nodes with fewer active jobs
      return a.currentJobs - b.currentJobs;
    });
  return onlineNodes[0] || null; // null = render locally
}
```
## WebSocket Message Protocol
### Server → Node: Render Job
```json
{
  "type": "render",
  "jobId": "uuid-job-123",
  "sessionId": "abc123",
  "config": {
    "displayName": "User",
    "username": "@user",
    "text": "Hello",
    "avatarUrl": "data:image/png;base64,...",
    "imageUrl": "data:video/mp4;base64,...",
    "timestamp": 1234567890,
    "verified": true,
    "engagement": { ... }
  }
}
```
### Node → Server: Progress Update
```json
{
  "type": "progress",
  "jobId": "uuid-job-123",
  "sessionId": "abc123",
  "stage": "rendering", // or "encoding"
  "progress": 45
}
```
### Node → Server: Render Complete (Chunked Transfer)
**Step 1: Send metadata**
```json
{
  "type": "result_start",
  "jobId": "uuid-job-123",
  "sessionId": "abc123",
  "success": true,
  "format": "mp4",
  "totalSize": 15728640,
  "chunkSize": 1048576,
  "totalChunks": 15
}
```
**Step 2: Send chunks (1MB each)**
```json
{
  "type": "result_chunk",
  "jobId": "uuid-job-123",
  "chunkIndex": 0,
  "data": "<base64-encoded chunk>"
}
```
```json
{
  "type": "result_chunk",
  "jobId": "uuid-job-123",
  "chunkIndex": 1,
  "data": "<base64-encoded chunk>"
}
```
... (continue for all chunks)
**Step 3: Send completion**
```json
{
  "type": "result_end",
  "jobId": "uuid-job-123",
  "sessionId": "abc123"
}
```
### Node → Server: Render Failed
```json
{
  "type": "result",
  "jobId": "uuid-job-123",
  "sessionId": "abc123",
  "success": false,
  "error": "FFmpeg encoding failed"
}
```
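On the server, the failure branch would drop any partial chunks and trigger the local fallback. A sketch (`triggerLocalFallback` is a placeholder for the local render path, not an existing function):

```javascript
// Sketch of the failure branch in the server's message handler.
// Returns true when the message was a failed result and was handled.
function handleFailedResult(msg, pendingJobs, triggerLocalFallback) {
  if (msg.type !== 'result' || msg.success) return false;
  console.error(`[Job ${msg.jobId}] Node render failed: ${msg.error}`);
  pendingJobs.delete(msg.jobId);       // drop any partially received chunks
  triggerLocalFallback(msg.sessionId); // re-render on the main server
  return true;
}
```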
## Node Implementation
### WebSocket Client (on node)
```javascript
// Attaches to the `ws` connection established in connectToMainServer()
ws.on('message', async (data) => {
  const msg = JSON.parse(data);

  if (msg.type === 'render') {
    // Check cache first (SQLite lookup)
    let config = getCachedConfig(msg.sessionId);
    if (!config) {
      // Cache miss - use provided config and cache it
      config = msg.config;
      cacheConfig(msg.sessionId, config);
      console.log(`[Node] Cache miss for session ${msg.sessionId}, cached new config`);
    } else {
      // Cache hit - updated fields win; cached base64 survives if omitted
      config = { ...config, ...msg.config };
      console.log(`[Node] Cache hit for session ${msg.sessionId}, reusing cached config`);
    }

    try {
      // Render video/image
      const buffer = await renderVideo(config, (progress, stage) => {
        // Send progress updates during rendering
        ws.send(JSON.stringify({
          type: 'progress',
          jobId: msg.jobId,
          sessionId: msg.sessionId,
          stage: stage,
          progress: progress
        }));
      });

      // Send result in chunks
      const isVideo = (config.imageUrl || '').startsWith('data:video');
      const CHUNK_SIZE = 1048576; // 1MB chunks
      const totalChunks = Math.ceil(buffer.length / CHUNK_SIZE);

      // Step 1: Send metadata
      ws.send(JSON.stringify({
        type: 'result_start',
        jobId: msg.jobId,
        sessionId: msg.sessionId,
        success: true,
        format: isVideo ? 'mp4' : 'png',
        totalSize: buffer.length,
        chunkSize: CHUNK_SIZE,
        totalChunks: totalChunks
      }));

      // Step 2: Send chunks
      for (let i = 0; i < totalChunks; i++) {
        const start = i * CHUNK_SIZE;
        const end = Math.min(start + CHUNK_SIZE, buffer.length);
        const chunk = buffer.subarray(start, end);
        ws.send(JSON.stringify({
          type: 'result_chunk',
          jobId: msg.jobId,
          chunkIndex: i,
          data: chunk.toString('base64')
        }));
        // Small delay to avoid overwhelming the connection
        await new Promise(resolve => setTimeout(resolve, 10));
      }

      // Step 3: Send completion
      ws.send(JSON.stringify({
        type: 'result_end',
        jobId: msg.jobId,
        sessionId: msg.sessionId
      }));
    } catch (error) {
      // Report the failure so the main server can fall back
      ws.send(JSON.stringify({
        type: 'result',
        jobId: msg.jobId,
        sessionId: msg.sessionId,
        success: false,
        error: error.message
      }));
    }
  }
});
```
### Server-Side: Reassembling Chunks
```javascript
const pendingJobs = new Map(); // jobId -> { chunks, metadata }

wss.on('connection', (ws) => {
  // ... registration code ...

  ws.on('message', (data) => {
    const msg = JSON.parse(data);

    if (msg.type === 'result_start') {
      // Initialize job
      pendingJobs.set(msg.jobId, {
        sessionId: msg.sessionId,
        format: msg.format,
        totalSize: msg.totalSize,
        totalChunks: msg.totalChunks,
        chunks: new Array(msg.totalChunks),
        receivedChunks: 0
      });
    }
    else if (msg.type === 'result_chunk') {
      const job = pendingJobs.get(msg.jobId);
      if (!job) return;
      // Store chunk
      job.chunks[msg.chunkIndex] = Buffer.from(msg.data, 'base64');
      job.receivedChunks++;
      // Log progress
      const uploadProgress = Math.floor((job.receivedChunks / job.totalChunks) * 100);
      console.log(`[Job ${msg.jobId}] Upload progress: ${uploadProgress}%`);
    }
    else if (msg.type === 'result_end') {
      const job = pendingJobs.get(msg.jobId);
      if (!job) return;
      // Reassemble buffer
      const completeBuffer = Buffer.concat(job.chunks);
      // Cache the result
      cacheVideo(job.sessionId, completeBuffer);
      // Update session status
      updateSessionRenderStatus(job.sessionId, 'completed', null, null, 100, 'completed');
      // Cleanup
      pendingJobs.delete(msg.jobId);
      console.log(`[Job ${msg.jobId}] Render complete, cached ${completeBuffer.length} bytes`);
    }
    else if (msg.type === 'progress') {
      // Forward progress to SSE clients
      updateSessionRenderStatus(msg.sessionId, 'rendering', null, null, msg.progress, msg.stage);
    }
  });
});
```
### Node Cache (SQLite)
```javascript
// Lightweight SQLite DB on each node (data/node_cache.db)
const Database = require('better-sqlite3');
const db = new Database('data/node_cache.db');

// Create cache table
db.exec(`
  CREATE TABLE IF NOT EXISTS config_cache (
    session_id TEXT PRIMARY KEY,
    config_json TEXT NOT NULL,
    cached_at INTEGER NOT NULL
  )
`);

// Cache config
function cacheConfig(sessionId, config) {
  const stmt = db.prepare(`
    INSERT OR REPLACE INTO config_cache (session_id, config_json, cached_at)
    VALUES (?, ?, ?)
  `);
  stmt.run(sessionId, JSON.stringify(config), Date.now());
}

// Get cached config (ignores entries older than 1 hour)
function getCachedConfig(sessionId) {
  const stmt = db.prepare(`
    SELECT config_json FROM config_cache
    WHERE session_id = ? AND cached_at > ?
  `);
  const oneHourAgo = Date.now() - 3600000;
  const row = stmt.get(sessionId, oneHourAgo);
  return row ? JSON.parse(row.config_json) : null;
}

// Aggressive cleanup every 10 minutes
setInterval(() => {
  const oneHourAgo = Date.now() - 3600000;
  const stmt = db.prepare('DELETE FROM config_cache WHERE cached_at < ?');
  const result = stmt.run(oneHourAgo);
  if (result.changes > 0) {
    console.log(`[Node Cache] Cleaned up ${result.changes} expired config(s)`);
  }
}, 600000); // 10 minutes

// Cleanup on exit
process.on('exit', () => {
  db.close();
});
```
**Benefits:**
- Low memory usage (configs stored on disk)
- Fast lookups (indexed by session_id)
- Automatic persistence (survives restarts if < 1hr old)
- Small disk footprint (configs expire after 1 hour)
## Bandwidth Optimizations
### 1. Chunked Transfer
- Videos sent in 1MB chunks over WebSocket
- Prevents memory issues with large files
- Allows progress tracking during upload
- Small delay between chunks prevents connection overwhelming
- Server can timeout slow uploads and fallback to local
### 2. Cache Reuse
- Node keeps config cached for 1 hour
- If main server sends same sessionId, node reuses cached base64
- Only updated fields need to be applied
- Massive bandwidth savings on session edits
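If the main server is later changed to omit unchanged base64 fields on a cache hit (the protocol above does not do this yet; this is an assumption), the node could fill them back in from its cache. A sketch, with field names taken from the job payload:

```javascript
// Merge sketch (assumed protocol extension): take the fresh config from the
// server, and fill in any large base64 fields it omitted from the node cache.
const BASE64_FIELDS = ['avatarUrl', 'imageUrl']; // large data-URL fields
function mergeWithCache(freshConfig, cachedConfig) {
  if (!cachedConfig) return freshConfig;
  const merged = { ...freshConfig };
  for (const field of BASE64_FIELDS) {
    if (!merged[field] && cachedConfig[field]) {
      merged[field] = cachedConfig[field]; // reuse cached base64
    }
  }
  return merged;
}
```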
### 3. Timeout & Fallback
- Monitor upload progress per chunk
- If upload stalls for >30s, cancel and fallback
- Track node performance metrics (avg upload speed)
- Prefer faster nodes for future jobs
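The stall detection could be a small resettable watchdog, reset on every received chunk. A sketch (the 30s threshold comes from the bullet above; `onStall` would cancel the job and trigger the local fallback):

```javascript
// Resettable watchdog: call touch() on every result_chunk; if no chunk
// arrives within timeoutMs, onStall fires once.
function createStallWatchdog(timeoutMs, onStall) {
  let timer = null;
  const arm = () => {
    clearTimeout(timer);
    timer = setTimeout(onStall, timeoutMs);
  };
  arm(); // start counting from result_start
  return {
    touch: arm,                      // reset on each result_chunk
    stop: () => clearTimeout(timer)  // on result_end or failure
  };
}
```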
### 4. Compression (Optional)
- Could gzip chunks if bandwidth is critical
- Probably not needed - MP4 is already compressed
- Base64 encoding adds ~33% overhead (unavoidable for JSON)
## Security
### Authentication
- Shared `NODE_KEY` in environment on both the main server and nodes
- Node sends the key in its `register` message over the WebSocket
- Server closes the connection (code 1008) on an invalid key; unregistered connections never receive jobs
### Rate Limiting (Optional)
- Limit render jobs per node to prevent abuse
- Max 5 concurrent jobs per node
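The concurrency cap could be enforced at dispatch time with a simple check against the registry entry (sketch; the cap of 5 is from the bullet above):

```javascript
// Sketch: refuse to dispatch when a node is at its concurrency cap.
const MAX_JOBS_PER_NODE = 5;
function canAcceptJob(node, maxJobs = MAX_JOBS_PER_NODE) {
  return node.currentJobs < maxJobs;
}
```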
## Monitoring & Debugging
### Main Server Logs
```
[NodeRegistry] my-gaming-pc registered (GPU: true)
[Dispatcher] Assigned session abc123 to my-gaming-pc (cache hit)
[Dispatcher] Node my-gaming-pc offline, rendering locally
```
### Node Logs
```
[Node] Registered with main server: http://192.168.1.100:3000
[Node] Render request received: session abc123 (cache miss)
[Node] Render completed in 3.2s
```
## Error Handling
### Node Failures
1. **Node offline**: Not in registry, render locally immediately
2. **Node timeout**: Wait 5s max, then fall back
3. **Render failed on node**: Node sends `result` with `success: false`, main server falls back
4. **Upload timeout**: If the MP4 upload stalls for >30s, cancel and fall back
### Node Recovery
- Node auto-reconnects 5s after WebSocket disconnection
- Node clears cache on restart
- Main server automatically removes node from registry on WebSocket close
- No timeout monitoring needed - connection state is the source of truth
## File Changes
### New Files
- `renderNode.js` - Node WebSocket client, cache management, render worker
- `nodeRegistry.js` - Main server's WebSocket server, node management, job dispatcher
- `DISTRIBUTED_RENDERING_PLAN.md` - This file
### Modified Files
- `api.js` - Initialize WebSocket server and node mode detection
- `videoRenderer.js` - Check for available nodes before local render, dispatch jobs via WebSocket
- `v2Routes.js` - Forward progress updates from nodes to SSE clients
### Dependencies to Add
```json
{
  "ws": "^8.14.0"
}
```
## Deployment
### Main Server
```bash
# .env
# No special config needed
npm start
```
### Render Node (Your PC)
```bash
# .env
MAIN_SERVER_URL=http://your-server-ip:3000
NODE_KEY=your-shared-secret
HAS_GPU=true
npm start
```
**Node Resource Usage:**
- **Memory**: Low (~50-100MB idle, spikes during render)
- **Disk**: `data/node_cache.db` (typically < 10MB, auto-cleaned every 10 min)
- **CPU/GPU**: Only used during active renders
- **Network**: Minimal (WebSocket connection + render jobs)
### Docker Compose Example
```yaml
version: '3.8'
services:
  main-server:
    build: .
    ports:
      - "3000:3000"
    environment:
      - NODE_ENV=production

  render-node-local:
    build: .
    environment:
      - MAIN_SERVER_URL=http://main-server:3000
      - NODE_KEY=shared-secret
      - NODE_NAME=local-fallback
```
## Testing Plan
### Unit Tests
- [ ] Node registration with valid/invalid key
- [ ] Disconnect removes node from registry
- [ ] selectNode() prioritizes GPU nodes
- [ ] selectNode() prioritizes cached sessions
- [ ] Cache cleanup removes old entries
### Integration Tests
- [ ] Node registers and receives `registered` acknowledgment
- [ ] Main server dispatches job to online node
- [ ] Main server falls back when node is offline
- [ ] Session edit reuses cached config on same node
- [ ] Node cache expires after 1 hour
- [ ] Multiple nodes balance load
### Manual Tests
1. Start main server
2. Start node on PC with GPU
3. Create session with video → verify node renders
4. Stop node → verify fallback to local
5. Edit session → verify same node reused (if online)
6. Wait 1 hour → verify node cache cleared
## Future Enhancements (Out of Scope)
- Load balancing across multiple nodes
- Node performance metrics (render time tracking)
- Node priority override (manual priority setting)
- Webhook notifications when node comes online/offline
- Web UI for node management
- Automatic node discovery (mDNS/Bonjour)