# Distributed Render Node Architecture

Goal

Allow lightweight render nodes (e.g. a personal PC with a GPU) to handle video rendering for the main server, with automatic fallback and minimal resource usage on the nodes.

Design Philosophy

  • Single Codebase: Same code runs as main server or render node (determined by env vars)
  • Proxy-Like: Nodes act almost like a rendering proxy with minimal state
  • Stateful but Lightweight: Nodes use local SQLite for temporary config caching (1hr TTL)
  • Bandwidth Aware: Minimize data transfer, handle slow upload speeds
  • No Replication: Nodes only cache render configs, never replicate main server's session DB
  • Low Memory Usage: Configs stored in SQLite instead of memory (important for large base64 data)

Environment Variables

Main Server Mode (default)

# No special vars needed - runs as normal API server

Render Node Mode

MAIN_SERVER_URL=http://192.168.1.100:3000   # or ws://... for WebSocket URL
NODE_KEY=shared-secret-key
NODE_NAME=my-gaming-pc              # optional, defaults to hostname
HAS_GPU=true                        # optional, for prioritization

Note: No port configuration needed - node connects outbound to main server via WebSocket

Architecture

┌──────────────────────────────────────────────────────┐
│ Main Server                                           │
│ - Full SQLite DB (sessions, snapshots)               │
│ - Long-term cache (24hr TTL)                         │
│ - Node registry (in-memory)                          │
│ - Job dispatcher                                     │
└──────────────────────────────────────────────────────┘
                     │
        ┌────────────┴────────────┐
        │                         │
┌───────▼─────────┐      ┌────────▼────────┐
│ Render Node 1   │      │ Render Node 2   │
│ (Your PC)       │      │ (Server Local)  │
│                 │      │                 │
│ - Local SQLite  │      │ - Local SQLite  │
│   (cache only)  │      │   (cache only)  │
│ - 1hr TTL       │      │ - 1hr TTL       │
│ - Auto-cleanup  │      │ - Auto-cleanup  │
│ - GPU: NVENC    │      │ - CPU: libx264  │
│ - Priority: 1   │      │ - Priority: 99  │
└─────────────────┘      └─────────────────┘

Data Flow

Initial Render (No Cache)

1. Client → Main Server: POST /v2/quote (with base64 image)
2. Main Server: Store session in DB
3. Main Server: Pick best available node
4. Main Server → Node: dispatch render job over WebSocket
   Payload: {
     sessionId: "abc123",
     config: { ...full config with base64... }
   }
5. Node: Cache config temporarily (key: sessionId)
6. Node: Render video
7. Node → Main Server: Return MP4 buffer
8. Main Server: Cache MP4 (24hr)
9. Main Server: Update session renderStatus = 'completed'

Edit Same Session (Cache Hit)

1. Client → Main Server: PATCH /v2/quote/abc123 (change text)
2. Main Server: Update session in DB
3. Main Server: Check which node has sessionId cached
4. Main Server → Same Node: dispatch render job over WebSocket
   Payload: {
     sessionId: "abc123",
     config: { ...updated config... }  // node merges cached base64 for unchanged fields
   }
5. Node: Check cache for sessionId → HIT! Reuse base64
6. Node: Render with updated config
7. Node → Main Server: Return MP4 buffer
8. Main Server: Cache new MP4

Node Unavailable (Fallback)

1. Main Server → Node: dispatch render job over WebSocket (5s timeout)
2. Node: No response / connection closed
3. Main Server: Remove node from registry (mark offline)
4. Main Server: Render locally (fallback)
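
The fallback path above can be sketched as one dispatcher function (`selectNode`, `dispatchToNode`, and `renderLocally` are assumed helpers from this plan):

```javascript
// Try the selected node first; any error or timeout falls back to a local render.
async function renderWithFallback(sessionId, config) {
  const node = selectNode(sessionId);
  if (!node) return renderLocally(config); // no nodes connected

  try {
    // dispatchToNode is assumed to reject on timeout or node failure
    return await dispatchToNode(node, sessionId, config);
  } catch (err) {
    console.warn(`[Dispatcher] Node ${node.name} failed (${err.message}), rendering locally`);
    return renderLocally(config);
  }
}
```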

WebSocket Communication

Why WebSocket?

  • No Port Forwarding: Node initiates outbound connection to main server
  • Persistent Connection: Single connection for all communication
  • Bidirectional: Server can push jobs, node can push results
  • Built-in Heartbeat: WebSocket connection state = node status (no explicit heartbeat needed)
  • Auto Ping/Pong: WebSocket library handles keep-alive automatically

Node Connection (on startup)

let ws = null;

function connectToMainServer() {
  ws = new WebSocket(MAIN_SERVER_URL.replace(/^http/, 'ws') + '/nodes');

  ws.on('open', () => {
    console.log('[Node] Connected to main server');

    // Send registration
    ws.send(JSON.stringify({
      type: 'register',
      key: NODE_KEY,
      name: NODE_NAME,
      hasGpu: HAS_GPU
    }));
  });

  ws.on('close', () => {
    console.log('[Node] Disconnected from main server, reconnecting in 5s...');
    setTimeout(connectToMainServer, 5000);
  });

  ws.on('error', (error) => {
    console.error('[Node] WebSocket error:', error.message);
  });
}

connectToMainServer();

No explicit heartbeat needed - WebSocket connection state indicates node availability

Server Side

// Main server accepts WebSocket connections
const wss = new WebSocketServer({ noServer: true });

server.on('upgrade', (request, socket, head) => {
  if (request.url === '/nodes') {
    wss.handleUpgrade(request, socket, head, (ws) => {
      wss.emit('connection', ws, request);
    });
  }
});

Node Registry (Main Server, in-memory)

const nodeRegistry = new Map(); // nodeId -> { ws, name, hasGpu, ... }

wss.on('connection', (ws) => {
  let nodeId = null;

  ws.on('message', (data) => {
    const msg = JSON.parse(data);

    if (msg.type === 'register') {
      // Verify key
      if (msg.key !== NODE_KEY) {
        ws.close(1008, 'Invalid key');
        return;
      }

      nodeId = generateId();
      nodeRegistry.set(nodeId, {
        id: nodeId,
        ws: ws,
        name: msg.name,
        hasGpu: msg.hasGpu,
        currentJobs: 0,
        cachedSessions: new Set(),
        connectedAt: Date.now()
      });

      ws.send(JSON.stringify({
        type: 'registered',
        nodeId: nodeId
      }));

      console.log(`[NodeRegistry] ${msg.name} registered (GPU: ${msg.hasGpu})`);
    }

    // Handle other message types (progress, result_start, etc.)
    // ...
  });

  // Automatic cleanup on disconnect
  ws.on('close', () => {
    if (nodeId && nodeRegistry.has(nodeId)) {
      const node = nodeRegistry.get(nodeId);
      console.log(`[NodeRegistry] ${node.name} disconnected`);

      // Cancel any pending jobs on this node
      if (node.currentJobs > 0) {
        console.log(`[NodeRegistry] ${node.name} had ${node.currentJobs} active jobs, will fallback`);
      }

      // Remove from registry
      nodeRegistry.delete(nodeId);
    }
  });

  ws.on('error', (error) => {
    console.error(`[NodeRegistry] WebSocket error for node ${nodeId}:`, error.message);
  });
});

Connection-based availability:

  • Node in registry = online and available
  • WebSocket closes = automatically removed from registry
  • No need for timeout checks or heartbeat monitoring

Render Job Dispatch

Node Selection Algorithm

function selectNode(sessionId) {
  const onlineNodes = [...nodeRegistry.values()] // registry only contains connected nodes
    .sort((a, b) => {
      // 1. Prefer node with session cached
      const aHasCache = a.cachedSessions.has(sessionId);
      const bHasCache = b.cachedSessions.has(sessionId);
      if (aHasCache && !bHasCache) return -1;
      if (!aHasCache && bHasCache) return 1;

      // 2. Prefer GPU nodes
      if (a.hasGpu && !b.hasGpu) return -1;
      if (!a.hasGpu && b.hasGpu) return 1;

      // 3. Prefer nodes with fewer jobs
      return a.currentJobs - b.currentJobs;
    });

  return onlineNodes[0] || null; // null = render locally
}

WebSocket Message Protocol

Server → Node: Render Job

{
  "type": "render",
  "jobId": "uuid-job-123",
  "sessionId": "abc123",
  "config": {
    "displayName": "User",
    "username": "@user",
    "text": "Hello",
    "avatarUrl": "data:image/png;base64,...",
    "imageUrl": "data:video/mp4;base64,...",
    "timestamp": 1234567890,
    "verified": true,
    "engagement": { ... }
  }
}

Node → Server: Progress Update

{
  "type": "progress",
  "jobId": "uuid-job-123",
  "sessionId": "abc123",
  "stage": "rendering", // or "encoding"
  "progress": 45
}

Node → Server: Render Complete (Chunked Transfer)

Step 1: Send metadata

{
  "type": "result_start",
  "jobId": "uuid-job-123",
  "sessionId": "abc123",
  "success": true,
  "format": "mp4",
  "totalSize": 15728640,
  "chunkSize": 1048576,
  "totalChunks": 15
}

Step 2: Send chunks (1MB each)

{
  "type": "result_chunk",
  "jobId": "uuid-job-123",
  "chunkIndex": 0,
  "data": "<base64-encoded chunk>"
}
{
  "type": "result_chunk",
  "jobId": "uuid-job-123",
  "chunkIndex": 1,
  "data": "<base64-encoded chunk>"
}

... (continue for all chunks)

Step 3: Send completion

{
  "type": "result_end",
  "jobId": "uuid-job-123",
  "sessionId": "abc123"
}

Node → Server: Render Failed

{
  "type": "result",
  "jobId": "uuid-job-123",
  "sessionId": "abc123",
  "success": false,
  "error": "FFmpeg encoding failed"
}

Node Implementation

WebSocket Client (on node)

// MAIN_SERVER_URL may be http(s)://...; swap the scheme for the WebSocket connection
const ws = new WebSocket(MAIN_SERVER_URL.replace(/^http/, 'ws') + '/nodes');

ws.on('message', async (data) => {
  const msg = JSON.parse(data);

  if (msg.type === 'render') {
    // Merge any cached config (SQLite lookup) under the incoming one, so
    // heavy base64 fields survive edits that omit them
    const cached = getCachedConfig(msg.sessionId);
    const config = cached ? { ...cached, ...msg.config } : msg.config;
    cacheConfig(msg.sessionId, config);
    console.log(`[Node] Cache ${cached ? 'hit' : 'miss'} for session ${msg.sessionId}`);

    try {
      // Render video/image
      const buffer = await renderVideo(config, (progress, stage) => {
        // Send progress updates during rendering
        ws.send(JSON.stringify({
          type: 'progress',
          jobId: msg.jobId,
          sessionId: msg.sessionId,
          stage: stage,
          progress: progress
        }));
      });

      // Send result in chunks
      const CHUNK_SIZE = 1048576; // 1MB chunks
      const totalChunks = Math.ceil(buffer.length / CHUNK_SIZE);

      // Step 1: Send metadata
      ws.send(JSON.stringify({
        type: 'result_start',
        jobId: msg.jobId,
        sessionId: msg.sessionId,
        success: true,
        format: config.imageUrl && config.imageUrl.startsWith('data:video') ? 'mp4' : 'png',
        totalSize: buffer.length,
        chunkSize: CHUNK_SIZE,
        totalChunks: totalChunks
      }));

      // Step 2: Send chunks
      for (let i = 0; i < totalChunks; i++) {
        const start = i * CHUNK_SIZE;
        const end = Math.min(start + CHUNK_SIZE, buffer.length);
        const chunk = buffer.slice(start, end);

        ws.send(JSON.stringify({
          type: 'result_chunk',
          jobId: msg.jobId,
          chunkIndex: i,
          data: chunk.toString('base64')
        }));

        // Small delay to avoid overwhelming the connection
        await new Promise(resolve => setTimeout(resolve, 10));
      }

      // Step 3: Send completion
      ws.send(JSON.stringify({
        type: 'result_end',
        jobId: msg.jobId,
        sessionId: msg.sessionId
      }));

    } catch (error) {
      // Send error
      ws.send(JSON.stringify({
        type: 'result',
        jobId: msg.jobId,
        sessionId: msg.sessionId,
        success: false,
        error: error.message
      }));
    }
  }
});

Server-Side: Reassembling Chunks

const pendingJobs = new Map(); // jobId -> { chunks, metadata }

wss.on('connection', (ws) => {
  // ... registration code ...

  ws.on('message', (data) => {
    const msg = JSON.parse(data);

    if (msg.type === 'result_start') {
      // Initialize job
      pendingJobs.set(msg.jobId, {
        sessionId: msg.sessionId,
        format: msg.format,
        totalSize: msg.totalSize,
        totalChunks: msg.totalChunks,
        chunks: new Array(msg.totalChunks),
        receivedChunks: 0
      });
    }

    else if (msg.type === 'result_chunk') {
      const job = pendingJobs.get(msg.jobId);
      if (!job) return;

      // Store chunk
      job.chunks[msg.chunkIndex] = Buffer.from(msg.data, 'base64');
      job.receivedChunks++;

      // Log progress
      const uploadProgress = Math.floor((job.receivedChunks / job.totalChunks) * 100);
      console.log(`[Job ${msg.jobId}] Upload progress: ${uploadProgress}%`);
    }

    else if (msg.type === 'result_end') {
      const job = pendingJobs.get(msg.jobId);
      if (!job) return;

      // Reassemble buffer (guard against missing chunks)
      if (job.receivedChunks !== job.totalChunks) {
        console.error(`[Job ${msg.jobId}] Incomplete upload, discarding result`);
        pendingJobs.delete(msg.jobId);
        return;
      }
      const completeBuffer = Buffer.concat(job.chunks);

      // Cache the result
      cacheVideo(job.sessionId, completeBuffer);

      // Update session status
      updateSessionRenderStatus(job.sessionId, 'completed', null, null, 100, 'completed');

      // Cleanup
      pendingJobs.delete(msg.jobId);

      console.log(`[Job ${msg.jobId}] Render complete, cached ${completeBuffer.length} bytes`);
    }

    else if (msg.type === 'progress') {
      // Forward progress to SSE clients
      updateSessionRenderStatus(msg.sessionId, 'rendering', null, null, msg.progress, msg.stage);
    }
  });
});

Node Cache (SQLite)

// Lightweight SQLite DB on each node (data/node_cache.db)
const Database = require('better-sqlite3');
const db = new Database('data/node_cache.db');

// Create cache table
db.exec(`
  CREATE TABLE IF NOT EXISTS config_cache (
    session_id TEXT PRIMARY KEY,
    config_json TEXT NOT NULL,
    cached_at INTEGER NOT NULL
  )
`);

// Cache config
function cacheConfig(sessionId, config) {
  const stmt = db.prepare(`
    INSERT OR REPLACE INTO config_cache (session_id, config_json, cached_at)
    VALUES (?, ?, ?)
  `);
  stmt.run(sessionId, JSON.stringify(config), Date.now());
}

// Get cached config
function getCachedConfig(sessionId) {
  const stmt = db.prepare(`
    SELECT config_json FROM config_cache
    WHERE session_id = ? AND cached_at > ?
  `);
  const oneHourAgo = Date.now() - 3600000;
  const row = stmt.get(sessionId, oneHourAgo);
  return row ? JSON.parse(row.config_json) : null;
}

// Aggressive cleanup every 10 minutes
setInterval(() => {
  const oneHourAgo = Date.now() - 3600000;
  const stmt = db.prepare('DELETE FROM config_cache WHERE cached_at < ?');
  const result = stmt.run(oneHourAgo);
  if (result.changes > 0) {
    console.log(`[Node Cache] Cleaned up ${result.changes} expired config(s)`);
  }
}, 600000); // 10 minutes

// Cleanup on exit
process.on('exit', () => {
  db.close();
});

Benefits:

  • Low memory usage (configs stored on disk)
  • Fast lookups (indexed by session_id)
  • Automatic persistence (survives restarts if < 1hr old)
  • Small disk footprint (configs expire after 1 hour)

Bandwidth Optimizations

1. Chunked Transfer

  • Videos sent in 1MB chunks over WebSocket
  • Prevents memory issues with large files
  • Allows progress tracking during upload
  • Small delay between chunks avoids overwhelming the connection
  • Server can timeout slow uploads and fallback to local

2. Cache Reuse

  • Node keeps config cached for 1 hour
  • If main server sends same sessionId, node reuses cached base64
  • Only updated fields need to be applied
  • Massive bandwidth savings on session edits
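
A minimal sketch of the merge, assuming a spread-merge where incoming fields win and the cached copy fills the gaps:

```javascript
// Incoming (possibly slim) config wins field-by-field; the cached copy supplies
// anything the edit omitted, e.g. large base64 avatar/media data.
function mergeWithCache(cached, incoming) {
  return { ...(cached || {}), ...incoming };
}
```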

3. Timeout & Fallback

  • Monitor upload progress per chunk
  • If upload stalls for >30s, cancel and fallback
  • Track node performance metrics (avg upload speed)
  • Prefer faster nodes for future jobs
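
Stall detection could be a per-job watchdog that resets on each chunk (30s threshold as above; the callback wiring is an assumption):

```javascript
// Watchdog: fires onStall if no chunk arrives within timeoutMs; each received
// chunk resets the timer, and done() disarms it when the upload completes.
function createStallWatchdog(onStall, timeoutMs = 30000) {
  let timer = setTimeout(onStall, timeoutMs);
  return {
    chunkReceived() {
      clearTimeout(timer);
      timer = setTimeout(onStall, timeoutMs);
    },
    done() {
      clearTimeout(timer);
    },
  };
}
```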

4. Compression (Optional)

  • Could gzip chunks if bandwidth is critical
  • Probably not needed - MP4 is already compressed
  • Base64 encoding adds ~33% overhead (the cost of shipping chunks inside JSON text frames; binary frames would avoid it)

Security

Authentication

  • Shared NODE_KEY set in the environment on both sides
  • Key verified in the WebSocket "register" message
  • Invalid key → connection closed with code 1008 (policy violation)

Rate Limiting (Optional)

  • Limit render jobs per node to prevent abuse
  • Max 5 concurrent jobs per node

Monitoring & Debugging

Main Server Logs

[NodeRegistry] my-gaming-pc registered (GPU: true)
[Dispatcher] Assigned session abc123 to my-gaming-pc (cache hit)
[Dispatcher] Node my-gaming-pc offline, rendering locally

Node Logs

[Node] Registered with main server: http://192.168.1.100:3000
[Node] Render request received: session abc123 (cache miss)
[Node] Render completed in 3.2s

Error Handling

Node Failures

  1. Node offline: Fallback to local render immediately
  2. Node timeout: Wait 5s max, then fallback
  3. Render failed on node: Node sends a failure result ("success": false), main server falls back
  4. Upload timeout: If MP4 upload takes >30s, cancel and fallback

Node Recovery

  • Node auto-reconnects 5s after WebSocket disconnection
  • Node clears cache on restart
  • Main server automatically removes node from registry on WebSocket close
  • No timeout monitoring needed - connection state is the source of truth

File Changes

New Files

  • renderNode.js - Node WebSocket client, cache management, render worker
  • nodeRegistry.js - Main server's WebSocket server, node management, job dispatcher
  • DISTRIBUTED_RENDERING_PLAN.md - This file

Modified Files

  • api.js - Initialize WebSocket server and node mode detection
  • videoRenderer.js - Check for available nodes before local render, dispatch jobs via WebSocket
  • v2Routes.js - Forward progress updates from nodes to SSE clients

Dependencies to Add

{
  "ws": "^8.14.0"
}

Deployment

Main Server

# .env
# No special config needed
npm start

Render Node (Your PC)

# .env
MAIN_SERVER_URL=http://your-server-ip:3000
NODE_KEY=your-shared-secret
HAS_GPU=true

npm start

Node Resource Usage:

  • Memory: Low (~50-100MB idle, spikes during render)
  • Disk: data/node_cache.db (typically < 10MB, auto-cleaned every 10 min)
  • CPU/GPU: Only used during active renders
  • Network: Minimal (WebSocket connection + render jobs)

Docker Compose Example

version: '3.8'
services:
  main-server:
    build: .
    ports:
      - "3000:3000"
    environment:
      - NODE_ENV=production

  render-node-local:
    build: .
    environment:
      - MAIN_SERVER_URL=http://main-server:3000
      - NODE_KEY=shared-secret
      - NODE_NAME=local-fallback

Testing Plan

Unit Tests

  • Node registration with valid/invalid key
  • Registry removes a node when its WebSocket closes
  • selectNode() prioritizes GPU nodes
  • selectNode() prioritizes cached sessions
  • Cache cleanup removes old entries

Integration Tests

  • Node registers and receives a "registered" acknowledgment
  • Main server dispatches job to online node
  • Main server falls back when node is offline
  • Session edit reuses cached config on same node
  • Node cache expires after 1 hour
  • Multiple nodes balance load

Manual Tests

  1. Start main server
  2. Start node on PC with GPU
  3. Create session with video → verify node renders
  4. Stop node → verify fallback to local
  5. Edit session → verify same node reused (if online)
  6. Wait 1 hour → verify node cache cleared

Future Enhancements (Out of Scope)

  • Load balancing across multiple nodes
  • Node performance metrics (render time tracking)
  • Node priority override (manual priority setting)
  • Webhook notifications when node comes online/offline
  • Web UI for node management
  • Automatic node discovery (mDNS/Bonjour)