import 'dart:async';
import 'dart:convert';
import 'dart:typed_data';

import 'package:flutter/material.dart';
import 'package:flutter/services.dart';
import 'package:flutter_webrtc/flutter_webrtc.dart';

import '../services/signalling.dart';

/// Screen that joins a room by code and shows the video received from the
/// host over a WebRTC data channel.
class ViewerScreen extends StatefulWidget {
  /// Signalling server URL handed to [SignallingService.connect].
  final String serverUrl;

  const ViewerScreen({super.key, required this.serverUrl});

  @override
  State<ViewerScreen> createState() => _ViewerScreenState();
}

class _ViewerScreenState extends State<ViewerScreen> {
  static const _videoCh = MethodChannel('com.pulsar/video');

  /// Frames whose measured latency exceeds this many milliseconds are
  /// discarded instead of being decoded.
  static const _maxFrameLatencyMs = 300;

  final _sig = SignallingService();
  RTCPeerConnection? _pc;
  RTCDataChannel? _dataChannel;
  final _roomCodeCtrl = TextEditingController();
  String _status = 'Enter a room code to connect';
  bool _streaming = false;
  bool _joining = false;
  StreamSubscription? _sigSub;
  int? _textureId;
  bool _decoderReady = false;
  bool _settingUpDecoder = false;

  // Frames that arrive while createDecoder is in-flight are queued here and
  // drained once the decoder is ready, so the initial keyframe is not missed.
  final _pendingNals = <(Uint8List, int)>[];

  // ICE candidates received before the remote description has been applied
  // are buffered here and added right after setRemoteDescription completes.
  bool _remoteDescriptionSet = false;
  final _pendingCandidates = <RTCIceCandidate>[];

  // Widget key used to normalise input coordinates.
  final _videoKey = GlobalKey();
  Size _videoSize = Size.zero;

  // Published stats (what the overlay shows).
  bool _showStats = false;
  int _statFps = 0;
  double _statBitrateMbps = 0;
  int _statDropped = 0;
  String _statResolution = '—';
  int _statLatencyMs = 0;
  double _statDecodeMs = 0; // time spent submitting a frame via feedNal

  // Per-second accumulators, reset by _flushStats.
  int _frameCount = 0;
  int _byteCount = 0;
  int _droppedCount = 0;
  double _decodeMsSum = 0;
  int _decodeMsCount = 0;
  Timer? _statsTimer;

  @override
  void dispose() {
    _statsTimer?.cancel();
    _sigSub?.cancel();
    _sig.dispose();
    _pc?.close();
    _roomCodeCtrl.dispose();
    final textureId = _textureId;
    if (textureId != null) {
      unawaited(_releaseDecoder(textureId));
    }
    super.dispose();
  }

  /// Asks the platform side to stop the decoder bound to [textureId].
  ///
  /// Failures are logged rather than thrown: this is cleanup, and callers
  /// have nothing useful to do with the error.
  Future<void> _releaseDecoder(int textureId) async {
    try {
      await _videoCh.invokeMethod('stopDecoder', {'textureId': textureId});
    } catch (e) {
      debugPrint('stopDecoder failed: $e');
    }
  }

  /// Tears down whatever a previous [_join] attempt left behind so that a
  /// new attempt does not leak a peer connection or receive every signalling
  /// message twice.
  Future<void> _resetSession() async {
    _statsTimer?.cancel();
    _statsTimer = null;
    await _sigSub?.cancel();
    _sigSub = null;
    _dataChannel = null;
    _remoteDescriptionSet = false;
    _pendingCandidates.clear();
    final previous = _pc;
    _pc = null;
    await previous?.close();
  }

  /// Connects to the signalling server and joins the room typed by the user.
  Future<void> _join() async {
    final roomId = _roomCodeCtrl.text.trim().toUpperCase();
    if (roomId.isEmpty) return;

    setState(() {
      _joining = true;
      _status = 'Connecting...';
    });

    try {
      await _resetSession();
      _sig.connect(widget.serverUrl);
      // Build the peer connection BEFORE telling the server we've joined,
      // otherwise the host's offer can race in before _pc exists and the
      // whole answer step gets silently dropped.
      await _initPeerConnection(roomId);
    } catch (e) {
      if (!mounted) return;
      setState(() {
        _status = 'Failed to connect: $e';
        _joining = false;
      });
      return;
    }
    if (!mounted) return;

    _sigSub = _sig.messages.listen(
      (msg) => _onSignal(msg, roomId),
      onError: (Object e) {
        if (!mounted) return;
        setState(() => _status = 'Signal error: $e');
      },
    );
    _sig.send({'type': 'join', 'roomId': roomId});
  }

  /// Dispatches one message from the signalling server.
  Future<void> _onSignal(Map msg, String roomId) async {
    if (!mounted) return;
    final type = msg['type'] as String?;
    if (type == 'joined') {
      setState(() => _status = 'Waiting for host offer...');
    } else if (type == 'signal') {
      final data = msg['data'];
      if (data is! Map) return;
      try {
        await _handleSignalData(data, roomId);
      } catch (e) {
        // This runs from a stream listener, so an uncaught error here would
        // surface as an unhandled async exception.
        debugPrint('Failed to handle signal: $e');
        if (!mounted) return;
        setState(() => _status = 'Signal error: $e');
      }
    } else if (type == 'host_disconnected') {
      _statsTimer?.cancel();
      setState(() {
        _status = 'Host disconnected';
        _streaming = false;
        // Re-enable the Connect button on the join view.
        _joining = false;
      });
    } else if (type == 'error') {
      setState(() {
        _status = 'Error: ${msg['message']}';
        _joining = false;
      });
    }
  }

  /// Creates the peer connection and wires its callbacks.
  Future<void> _initPeerConnection(String roomId) async {
    final config = <String, dynamic>{
      'iceServers': [
        {'urls': 'stun:stun.l.google.com:19302'},
      ],
      'sdpSemantics': 'unified-plan',
      'encodedInsertableStreams': false,
    };
    final pc = await createPeerConnection(config);
    if (!mounted) {
      // The widget was disposed while we were awaiting; dispose() could not
      // have closed this connection because it did not exist yet.
      await pc.close();
      return;
    }
    _pc = pc;

    pc.onIceCandidate = (candidate) {
      _sig.send({
        'type': 'signal',
        'roomId': roomId,
        'data': {'type': 'candidate', 'candidate': candidate.toMap()},
      });
    };

    pc.onConnectionState = (state) {
      // Ignore events from a connection that has since been replaced.
      if (!mounted || !identical(_pc, pc)) return;
      if (state == RTCPeerConnectionState.RTCPeerConnectionStateConnected) {
        _statsTimer?.cancel();
        _statsTimer = Timer.periodic(
          const Duration(seconds: 1),
          (_) => _flushStats(),
        );
        setState(() => _status = 'Connected');
      } else if (state ==
              RTCPeerConnectionState.RTCPeerConnectionStateFailed ||
          state == RTCPeerConnectionState.RTCPeerConnectionStateDisconnected) {
        _statsTimer?.cancel();
        setState(() {
          _status = 'Connection lost';
          _streaming = false;
          // Re-enable the Connect button on the join view.
          _joining = false;
        });
      }
    };

    pc.onDataChannel = (channel) {
      _dataChannel = channel;
      channel.onMessage = (msg) {
        if (msg.isBinary) _onData(msg.binary);
      };
    };
  }

  /// Publishes the per-second accumulators to the overlay, resets them, and
  /// reports the same numbers to the host over the data channel.
  void _flushStats() {
    if (!mounted) return;
    setState(() {
      _statFps = _frameCount;
      _statBitrateMbps = _byteCount * 8 / 1e6;
      _statDropped = _droppedCount;
      _statDecodeMs = _decodeMsCount > 0 ? _decodeMsSum / _decodeMsCount : 0;
      _frameCount = 0;
      _byteCount = 0;
      _droppedCount = 0;
      _decodeMsSum = 0;
      _decodeMsCount = 0;
    });
    _sendInput({
      'type': 'stats',
      'fps': _statFps,
      'bitrateMbps': _statBitrateMbps,
      'dropped': _statDropped,
      'resolution': _statResolution,
      'latencyMs': _statLatencyMs,
      'decodeMs': double.parse(_statDecodeMs.toStringAsFixed(1)),
      'paintMs': 0.0,
    });
  }

  /// Handles one binary data-channel packet.
  ///
  /// The first byte selects the packet type: `0x01` is a config packet
  /// (SPS+PPS), `0x02` is a frame packet.
  void _onData(Uint8List bytes) {
    if (bytes.isEmpty) return;
    _byteCount += bytes.length;

    final type = bytes[0];
    if (type == 0x01) {
      // Config packet: SPS+PPS — create/reconfigure the decoder.
      unawaited(_setupDecoder(Uint8List.sublistView(bytes, 1)));
      return;
    }
    if (type != 0x02) return;

    // Frame packet: [0x02][8-byte ts][Annex B NALU]
    if (bytes.length < 10) return;
    final ts = ByteData.sublistView(bytes, 1, 9).getInt64(0, Endian.big);
    final latency = DateTime.now().millisecondsSinceEpoch - ts;

    // Drop frames that are already stale — prevents burst playback after a
    // lag spike.
    if (latency > _maxFrameLatencyMs) {
      _droppedCount++;
      return;
    }

    final nal = Uint8List.sublistView(bytes, 9);
    if (_decoderReady) {
      unawaited(_feedNal(nal, latency));
    } else if (_settingUpDecoder) {
      _pendingNals.add((nal, latency));
    } else {
      // No decoder and none on the way. Anything queued now would be cleared
      // by the next _setupDecoder call anyway, so queueing would only grow
      // memory without bound.
      _droppedCount++;
    }
  }

  /// Creates (or re-creates) the platform decoder from [spsPps] and then
  /// feeds it every frame that queued up in the meantime.
  Future<void> _setupDecoder(Uint8List spsPps) async {
    // A second 0x01 packet can arrive while createDecoder is still awaiting;
    // ignore it — the first decoder will be ready soon enough.
    if (_settingUpDecoder) return;
    _settingUpDecoder = true;
    _decoderReady = false;
    _pendingNals.clear();

    try {
      final previous = _textureId;
      if (previous != null) {
        _textureId = null;
        await _releaseDecoder(previous);
      }

      final res = await _videoCh.invokeMapMethod<String, dynamic>(
        'createDecoder',
        {'spsPps': spsPps},
      );
      final id = (res?['textureId'] as num?)?.toInt();
      if (id == null) {
        debugPrint('createDecoder returned no textureId');
        return;
      }
      if (!mounted) {
        // dispose() already ran and could not know about this decoder.
        await _releaseDecoder(id);
        return;
      }

      setState(() {
        _textureId = id;
        _streaming = true;
      });
      _decoderReady = true;

      // Drain frames that queued up while createDecoder was in-flight.
      final pending = List.of(_pendingNals);
      _pendingNals.clear();
      for (final (nal, latency) in pending) {
        unawaited(_feedNal(nal, latency));
      }
    } catch (e) {
      _decoderReady = false;
      debugPrint('Decoder setup failed: $e');
    } finally {
      _settingUpDecoder = false;
      // On any failure path the queued frames can never be decoded.
      if (!_decoderReady) _pendingNals.clear();
    }
  }

  /// Submits one NAL unit to the platform decoder and records how long the
  /// submission took.
  Future<void> _feedNal(Uint8List nal, int latency) async {
    final textureId = _textureId;
    if (textureId == null) return;

    final stopwatch = Stopwatch()..start();
    try {
      await _videoCh.invokeMethod('feedNal', {
        'textureId': textureId,
        'nal': nal,
      });
    } catch (_) {
      // Count the failure instead of logging: this runs once per frame.
      _droppedCount++;
      return;
    }
    stopwatch.stop();
    if (!mounted) return;

    _decodeMsSum += stopwatch.elapsedMicroseconds / 1000.0;
    _decodeMsCount++;
    _frameCount++;
    // No setState here: the value is only shown in the stats overlay, which
    // _flushStats rebuilds once per second. Rebuilding the whole screen for
    // every frame is unnecessary.
    _statLatencyMs = latency.clamp(0, 9999).toInt();
  }

  /// Applies an offer or ICE candidate relayed by the signalling server.
  Future<void> _handleSignalData(Map data, String roomId) async {
    final pc = _pc;
    if (pc == null) return;

    final sigType = data['type'] as String?;
    if (sigType == 'offer') {
      final sdp = data['sdp'];
      if (sdp is! String) return;

      await pc.setRemoteDescription(RTCSessionDescription(sdp, 'offer'));
      if (!identical(_pc, pc)) return; // session was reset while awaiting
      _remoteDescriptionSet = true;

      // Candidates that raced ahead of the offer can be applied now.
      final buffered = List.of(_pendingCandidates);
      _pendingCandidates.clear();
      for (final candidate in buffered) {
        await pc.addCandidate(candidate);
      }

      final answer = await pc.createAnswer();
      await pc.setLocalDescription(answer);
      _sig.send({
        'type': 'signal',
        'roomId': roomId,
        'data': {'type': 'answer', 'sdp': answer.sdp},
      });
    } else if (sigType == 'candidate') {
      final raw = data['candidate'];
      if (raw is! Map) return;
      final candidateSdp = raw['candidate'];
      if (candidateSdp is! String) return;

      final candidate = RTCIceCandidate(
        candidateSdp,
        raw['sdpMid'] as String?,
        (raw['sdpMLineIndex'] as num?)?.toInt(),
      );
      if (_remoteDescriptionSet) {
        await pc.addCandidate(candidate);
      } else {
        _pendingCandidates.add(candidate);
      }
    }
  }

  /// Sends [ev] to the host as JSON text over the data channel, if one is
  /// open. Best-effort: send failures are ignored.
  void _sendInput(Map ev) {
    final channel = _dataChannel;
    if (channel == null) return;
    try {
      channel.send(RTCDataChannelMessage(jsonEncode(ev)));
    } catch (_) {
      // Deliberately ignored: input and stats events are fire-and-forget.
    }
  }

  /// Refreshes [_videoSize] from the render box of the video widget.
  void _updateVideoSize() {
    final box = _videoKey.currentContext?.findRenderObject() as RenderBox?;
    if (box != null) _videoSize = box.size;
  }

  /// Maps a local pointer position to the 0..1 range on both axes.
  Offset _normalise(Offset local) {
    _updateVideoSize();
    if (_videoSize.isEmpty) return Offset.zero;
    return Offset(
      (local.dx / _videoSize.width).clamp(0.0, 1.0),
      (local.dy / _videoSize.height).clamp(0.0, 1.0),
    );
  }

  void _onPointerMove(PointerMoveEvent e) {
    final n = _normalise(e.localPosition);
    _sendInput({'type': 'move', 'x': n.dx, 'y': n.dy});
  }

  void _onPointerDown(PointerDownEvent e) {
    final n = _normalise(e.localPosition);
    _sendInput({'type': 'click', 'x': n.dx, 'y': n.dy, 'button': 0});
  }

  void _onKeyEvent(KeyEvent e) {
    if (e is KeyDownEvent) {
      _sendInput({'type': 'keydown', 'keyCode': e.logicalKey.keyId});
    } else if (e is KeyUpEvent) {
      _sendInput({'type': 'keyup', 'keyCode': e.logicalKey.keyId});
    }
  }

  @override
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(
        title: const Text('Pulsar — Viewer'),
        backgroundColor: Theme.of(context).colorScheme.inversePrimary,
      ),
      floatingActionButton: _streaming
          ? FloatingActionButton(
              mini: true,
              tooltip: 'Stats',
              onPressed: () => setState(() => _showStats = !_showStats),
              child: const Icon(Icons.bar_chart),
            )
          : null,
      body: _streaming ? _buildStream() : _buildJoinView(),
    );
  }

  /// Room-code form shown while no stream is active.
  Widget _buildJoinView() {
    return Center(
      child: ConstrainedBox(
        constraints: const BoxConstraints(maxWidth: 360),
        child: Padding(
          padding: const EdgeInsets.all(32),
          child: Column(
            mainAxisAlignment: MainAxisAlignment.center,
            crossAxisAlignment: CrossAxisAlignment.stretch,
            children: [
              const Icon(Icons.connected_tv, size: 64, color: Colors.indigo),
              const SizedBox(height: 24),
              TextField(
                controller: _roomCodeCtrl,
                textCapitalization: TextCapitalization.characters,
                maxLength: 6,
                textAlign: TextAlign.center,
                style: const TextStyle(
                  fontSize: 28,
                  fontFamily: 'monospace',
                  letterSpacing: 8,
                  fontWeight: FontWeight.bold,
                ),
                decoration: const InputDecoration(
                  labelText: 'Room Code',
                  border: OutlineInputBorder(),
                  counterText: '',
                ),
                onSubmitted: (_) {
                  if (!_joining) _join();
                },
              ),
              const SizedBox(height: 16),
              FilledButton(
                onPressed: _joining ? null : _join,
                child: _joining
                    ? const SizedBox(
                        height: 20,
                        width: 20,
                        child: CircularProgressIndicator(
                          strokeWidth: 2,
                          color: Colors.white,
                        ),
                      )
                    : const Text('Connect'),
              ),
              const SizedBox(height: 16),
              Text(
                _status,
                textAlign: TextAlign.center,
                style: const TextStyle(color: Colors.grey),
              ),
            ],
          ),
        ),
      ),
    );
  }

  /// Video texture with input capture and the optional stats overlay.
  Widget _buildStream() {
    final textureId = _textureId;
    return Stack(
      children: [
        Focus(
          autofocus: true,
          onKeyEvent: (_, e) {
            _onKeyEvent(e);
            return KeyEventResult.handled;
          },
          child: Listener(
            onPointerMove: _onPointerMove,
            onPointerDown: _onPointerDown,
            child: textureId != null
                ? SizedBox.expand(
                    key: _videoKey,
                    child: Texture(textureId: textureId),
                  )
                : const Center(child: CircularProgressIndicator()),
          ),
        ),
        if (_showStats)
          Positioned(
            top: 12,
            left: 12,
            child: _StatsOverlay(lines: [
              'FPS: $_statFps',
              'Bitrate: ${_statBitrateMbps.toStringAsFixed(2)} Mbps',
              'Dropped: $_statDropped/s',
              'Res: $_statResolution',
              'Latency: ${_statLatencyMs}ms',
              'Submit: ${_statDecodeMs.toStringAsFixed(1)}ms',
            ]),
          ),
      ],
    );
  }
}

/// Semi-transparent box that renders each entry of [lines] as one row of
/// monospace text.
class _StatsOverlay extends StatelessWidget {
  final List<String> lines;

  const _StatsOverlay({required this.lines});

  @override
  Widget build(BuildContext context) {
    return Container(
      padding: const EdgeInsets.symmetric(horizontal: 10, vertical: 8),
      decoration: BoxDecoration(
        color: Colors.black.withValues(alpha: 0.6),
        borderRadius: BorderRadius.circular(6),
      ),
      child: Column(
        crossAxisAlignment: CrossAxisAlignment.start,
        children: [
          for (final line in lines)
            Text(
              line,
              style: const TextStyle(
                color: Colors.white,
                fontFamily: 'monospace',
                fontSize: 12,
              ),
            ),
        ],
      ),
    );
  }
}