This commit is contained in:
ImBenji
2026-05-04 08:21:31 +01:00
parent 0ed740ad19
commit bfdaf4a801
9 changed files with 1081 additions and 149 deletions
+188 -59
View File
@@ -1,6 +1,7 @@
import 'dart:async';
import 'dart:convert';
import 'dart:math';
import 'dart:typed_data';
import 'package:flutter/material.dart';
import 'package:flutter/services.dart';
@@ -8,8 +9,9 @@ import 'package:flutter_webrtc/flutter_webrtc.dart';
import '../services/signalling.dart';
// platform channel for injecting input into the OS
const _inputChannel = MethodChannel('com.pulsar/input');
const _videoChannel = MethodChannel('com.pulsar/video');
const _frameEvents = EventChannel('com.pulsar/video/frames');
class HostScreen extends StatefulWidget {
final String serverUrl;
@@ -23,13 +25,31 @@ class _HostScreenState extends State<HostScreen> {
final _sig = SignallingService();
RTCPeerConnection? _pc;
RTCDataChannel? _dataChannel;
MediaStream? _localStream;
String _roomId = '';
String _status = 'Initialising...';
bool _connected = false;
StreamSubscription? _sigSub;
StreamSubscription? _frameSub;
// stats
bool _showStats = false;
int _statFps = 0;
double _statBitrateMbps = 0;
int _statDropped = 0;
int _statBufferKb = 0;
double _statSendMs = 0;
int _frameCount = 0;
int _byteCount = 0;
int _droppedCount = 0;
double _sendMsSum = 0;
int _sendMsCount = 0;
Timer? _statsTimer;
// viewer stats received over data channel
Map<String, dynamic>? _viewerStats;
@override
void initState() {
@@ -67,7 +87,6 @@ class _HostScreenState extends State<HostScreen> {
switch (type) {
case 'created':
_setState('Waiting for viewer...');
await _startCapture();
break;
case 'peer_joined':
@@ -82,6 +101,7 @@ class _HostScreenState extends State<HostScreen> {
case 'peer_disconnected':
_setState('Viewer disconnected');
_stopCapture();
setState(() => _connected = false);
break;
@@ -91,17 +111,6 @@ class _HostScreenState extends State<HostScreen> {
}
}
Future<void> _startCapture() async {
try {
_localStream = await navigator.mediaDevices.getDisplayMedia({
'video': {'cursor': 'always'},
'audio': false,
});
} catch (e) {
_setState('Screen capture failed: $e');
}
}
Future<void> _createOffer() async {
final config = {
'iceServers': [
@@ -111,31 +120,34 @@ class _HostScreenState extends State<HostScreen> {
_pc = await createPeerConnection(config);
// data channel for input events (host creates it as offerer)
_dataChannel = await _pc!.createDataChannel(
'input',
RTCDataChannelInit()..ordered = true,
);
// unordered + unreliable — drop stale frames, only the latest matters
final dcInit = RTCDataChannelInit()
..ordered = false
..maxRetransmits = 0;
_dataChannel = await _pc!.createDataChannel('frames', dcInit);
_dataChannel!.onMessage = (msg) {
_handleInputEvent(msg.text);
};
// add screen tracks
if (_localStream != null) {
for (final track in _localStream!.getTracks()) {
await _pc!.addTrack(track, _localStream!);
if (msg.isBinary) return;
try {
final parsed = jsonDecode(msg.text) as Map<String, dynamic>;
if (parsed['type'] == 'stats') {
setState(() {
_viewerStats = parsed;
});
} else {
_handleInputEvent(msg.text);
}
} catch (_) {
_handleInputEvent(msg.text);
}
}
};
_pc!.onIceCandidate = (candidate) {
_sig.send({
'type': 'signal',
'roomId': _roomId,
'data': {
'type': 'candidate',
'candidate': candidate.toMap(),
},
'data': {'type': 'candidate', 'candidate': candidate.toMap()},
});
};
@@ -143,9 +155,11 @@ class _HostScreenState extends State<HostScreen> {
if (state == RTCPeerConnectionState.RTCPeerConnectionStateConnected) {
_setState('Connected');
setState(() => _connected = true);
_startCapture();
} else if (state == RTCPeerConnectionState.RTCPeerConnectionStateFailed ||
state == RTCPeerConnectionState.RTCPeerConnectionStateDisconnected) {
_setState('Connection lost');
_stopCapture();
setState(() => _connected = false);
}
};
@@ -177,6 +191,58 @@ class _HostScreenState extends State<HostScreen> {
}
}
Future<void> _startCapture() async {
_statsTimer = Timer.periodic(const Duration(seconds: 1), (_) => _flushStats());
// subscribe BEFORE starting the encoder so the initial SPS+PPS+keyframe isn't dropped
_frameSub = _frameEvents.receiveBroadcastStream().listen((dynamic raw) {
final bytes = (raw as Uint8List);
final dc = _dataChannel;
if (dc == null) return;
final buffered = dc.bufferedAmount ?? 0;
if (buffered > 262144) {
_droppedCount++;
return;
}
try {
final sendStart = DateTime.now();
dc.send(RTCDataChannelMessage.fromBinary(bytes));
_sendMsSum += DateTime.now().difference(sendStart).inMicroseconds / 1000.0;
_sendMsCount++;
_frameCount++;
_byteCount += bytes.length;
} catch (_) {}
});
await _videoChannel.invokeMethod('start');
await _videoChannel.invokeMethod('forceKeyframe');
}
void _stopCapture() {
_frameSub?.cancel();
_frameSub = null;
_statsTimer?.cancel();
_statsTimer = null;
_videoChannel.invokeMethod('stop');
}
void _flushStats() {
final buf = _dataChannel?.bufferedAmount ?? 0;
setState(() {
_statFps = _frameCount;
_statBitrateMbps = _byteCount * 8 / 1e6;
_statDropped = _droppedCount;
_statBufferKb = (buf / 1024).round();
_statSendMs = _sendMsCount > 0 ? _sendMsSum / _sendMsCount : 0;
_frameCount = 0;
_byteCount = 0;
_droppedCount = 0;
_sendMsSum = 0; _sendMsCount = 0;
});
}
void _handleInputEvent(String json) async {
try {
final ev = jsonDecode(json) as Map<String, dynamic>;
@@ -190,10 +256,10 @@ class _HostScreenState extends State<HostScreen> {
@override
void dispose() {
_stopCapture();
_sigSub?.cancel();
_sig.dispose();
_pc?.close();
_localStream?.dispose();
super.dispose();
}
@@ -204,37 +270,100 @@ class _HostScreenState extends State<HostScreen> {
title: const Text('Pulsar — Host'),
backgroundColor: Theme.of(context).colorScheme.inversePrimary,
),
body: Center(
child: Column(
mainAxisAlignment: MainAxisAlignment.center,
children: [
const Icon(Icons.monitor, size: 80, color: Colors.indigo),
const SizedBox(height: 24),
Text('Room Code', style: Theme.of(context).textTheme.titleMedium),
const SizedBox(height: 8),
SelectableText(
_roomId,
style: Theme.of(context).textTheme.displaySmall?.copyWith(
fontFamily: 'monospace',
fontWeight: FontWeight.bold,
letterSpacing: 8,
),
),
const SizedBox(height: 32),
Row(
mainAxisSize: MainAxisSize.min,
floatingActionButton: FloatingActionButton(
mini: true,
tooltip: 'Stats',
onPressed: () => setState(() => _showStats = !_showStats),
child: const Icon(Icons.bar_chart),
),
body: Stack(
children: [
Center(
child: Column(
mainAxisAlignment: MainAxisAlignment.center,
children: [
Icon(
_connected ? Icons.circle : Icons.circle_outlined,
color: _connected ? Colors.green : Colors.orange,
size: 14,
const Icon(Icons.monitor, size: 80, color: Colors.indigo),
const SizedBox(height: 24),
Text('Room Code', style: Theme.of(context).textTheme.titleMedium),
const SizedBox(height: 8),
SelectableText(
_roomId,
style: Theme.of(context).textTheme.displaySmall?.copyWith(
fontFamily: 'monospace',
fontWeight: FontWeight.bold,
letterSpacing: 8,
),
),
const SizedBox(height: 32),
Row(
mainAxisSize: MainAxisSize.min,
children: [
Icon(
_connected ? Icons.circle : Icons.circle_outlined,
color: _connected ? Colors.green : Colors.orange,
size: 14,
),
const SizedBox(width: 8),
Text(_status),
],
),
const SizedBox(width: 8),
Text(_status),
],
),
],
),
),
if (_showStats)
Positioned(
top: 12,
left: 12,
child: _StatsOverlay(lines: [
'Host:',
' FPS: $_statFps',
' Bitrate: ${_statBitrateMbps.toStringAsFixed(2)} Mbps',
' Dropped: $_statDropped/s',
' Buffer: $_statBufferKb KB',
' Send: ${_statSendMs.toStringAsFixed(1)}ms',
'',
if (_viewerStats == null)
'Viewer: no data'
else ...[
'Viewer:',
' FPS: ${_viewerStats!['fps']}',
' Bitrate: ${(_viewerStats!['bitrateMbps'] as num).toStringAsFixed(2)} Mbps',
' Dropped: ${_viewerStats!['dropped']}/s',
' Res: ${_viewerStats!['resolution']}',
' Latency: ${_viewerStats!['latencyMs']}ms',
' Submit: ${_viewerStats!['decodeMs'] ?? ''}ms',
],
]),
),
],
),
);
}
}
class _StatsOverlay extends StatelessWidget {
final List<String> lines;
const _StatsOverlay({required this.lines});
@override
Widget build(BuildContext context) {
return Container(
padding: const EdgeInsets.symmetric(horizontal: 10, vertical: 8),
decoration: BoxDecoration(
color: Colors.black.withValues(alpha: 0.6),
borderRadius: BorderRadius.circular(6),
),
child: Column(
crossAxisAlignment: CrossAxisAlignment.start,
children: lines.map((l) => Text(
l,
style: const TextStyle(
color: Colors.white,
fontFamily: 'monospace',
fontSize: 12,
),
)).toList(),
),
);
}
+231 -45
View File
@@ -1,5 +1,6 @@
import 'dart:async';
import 'dart:convert';
import 'dart:typed_data';
import 'package:flutter/material.dart';
import 'package:flutter/services.dart';
@@ -16,10 +17,11 @@ class ViewerScreen extends StatefulWidget {
}
class _ViewerScreenState extends State<ViewerScreen> {
static const _videoCh = MethodChannel("com.pulsar/video");
final _sig = SignallingService();
RTCPeerConnection? _pc;
RTCDataChannel? _dataChannel;
final _remoteRenderer = RTCVideoRenderer();
final _roomCodeCtrl = TextEditingController();
String _status = 'Enter a room code to connect';
@@ -28,23 +30,46 @@ class _ViewerScreenState extends State<ViewerScreen> {
StreamSubscription? _sigSub;
// size of the rendered video widget, used to normalise pointer coords
Size _videoSize = Size.zero;
final _videoKey = GlobalKey();
int? _textureId;
bool _decoderReady = false;
bool _settingUpDecoder = false;
@override
void initState() {
super.initState();
_remoteRenderer.initialize();
}
// frames that arrive while createDecoder is in-flight are queued here and
// drained once the decoder is ready, so we don't miss the initial keyframe
final _pendingNals = <(Uint8List, int)>[];
// widget key for normalising input coords
final _videoKey = GlobalKey();
Size _videoSize = Size.zero;
// stats
bool _showStats = false;
int _statFps = 0;
double _statBitrateMbps = 0;
int _statDropped = 0;
String _statResolution = '';
int _statLatencyMs = 0;
double _statDecodeMs = 0; // actually submit time for feedNal
int _frameCount = 0;
int _byteCount = 0;
int _droppedCount = 0;
double _decodeMsSum = 0;
int _decodeMsCount = 0;
Timer? _statsTimer;
@override
void dispose() {
_statsTimer?.cancel();
_sigSub?.cancel();
_sig.dispose();
_pc?.close();
_remoteRenderer.dispose();
_roomCodeCtrl.dispose();
if (_textureId != null) {
_videoCh.invokeMethod('stopDecoder', {'textureId': _textureId});
}
super.dispose();
}
@@ -92,6 +117,7 @@ class _ViewerScreenState extends State<ViewerScreen> {
break;
case 'host_disconnected':
_statsTimer?.cancel();
setState(() {
_status = 'Host disconnected';
_streaming = false;
@@ -109,9 +135,9 @@ class _ViewerScreenState extends State<ViewerScreen> {
Future<void> _initPeerConnection(String roomId) async {
final config = {
'iceServers': [
{'urls': 'stun:stun.l.google.com:19302'},
],
'iceServers': [{'urls': 'stun:stun.l.google.com:19302'}],
'sdpSemantics': 'unified-plan',
'encodedInsertableStreams': false,
};
_pc = await createPeerConnection(config);
@@ -120,18 +146,17 @@ class _ViewerScreenState extends State<ViewerScreen> {
_sig.send({
'type': 'signal',
'roomId': roomId,
'data': {
'type': 'candidate',
'candidate': candidate.toMap(),
},
'data': {'type': 'candidate', 'candidate': candidate.toMap()},
});
};
_pc!.onConnectionState = (state) {
if (state == RTCPeerConnectionState.RTCPeerConnectionStateConnected) {
setState(() => _status = 'Connected');
_statsTimer = Timer.periodic(const Duration(seconds: 1), (_) => _flushStats());
} else if (state == RTCPeerConnectionState.RTCPeerConnectionStateFailed ||
state == RTCPeerConnectionState.RTCPeerConnectionStateDisconnected) {
_statsTimer?.cancel();
setState(() {
_status = 'Connection lost';
_streaming = false;
@@ -139,21 +164,133 @@ class _ViewerScreenState extends State<ViewerScreen> {
}
};
_pc!.onTrack = (event) {
if (event.track.kind == 'video') {
setState(() {
_remoteRenderer.srcObject = event.streams[0];
_streaming = true;
_status = 'Streaming';
});
}
};
_pc!.onDataChannel = (channel) {
_dataChannel = channel;
channel.onMessage = (msg) {
if (msg.isBinary) _onData(msg.binary);
};
};
}
void _flushStats() {
setState(() {
_statFps = _frameCount;
_statBitrateMbps = _byteCount * 8 / 1e6;
_statDropped = _droppedCount;
_statDecodeMs = _decodeMsCount > 0 ? _decodeMsSum / _decodeMsCount : 0;
_frameCount = 0;
_byteCount = 0;
_droppedCount = 0;
_decodeMsSum = 0; _decodeMsCount = 0;
});
_sendInput({
'type': 'stats',
'fps': _statFps,
'bitrateMbps': _statBitrateMbps,
'dropped': _statDropped,
'resolution': _statResolution,
'latencyMs': _statLatencyMs,
'decodeMs': double.parse(_statDecodeMs.toStringAsFixed(1)),
'paintMs': 0.0,
});
}
void _onData(Uint8List bytes) {
if (bytes.isEmpty) return;
_byteCount += bytes.length;
final type = bytes[0];
if (type == 0x01) {
// config packet: SPS+PPS — create/reconfigure the decoder
final spsPps = Uint8List.sublistView(bytes, 1);
_setupDecoder(spsPps);
} else if (type == 0x02) {
// frame packet: [0x02][8-byte ts][Annex B NALU]
if (bytes.length < 10) return;
final bd = ByteData.sublistView(bytes, 1, 9);
final ts = bd.getInt64(0, Endian.big);
final latency = DateTime.now().millisecondsSinceEpoch - ts;
final nal = Uint8List.sublistView(bytes, 9);
// drop frames that are already stale — prevents burst playback after a lag spike
if (latency > 300) return;
if (!_decoderReady) {
_pendingNals.add((nal, latency));
} else {
_feedNal(nal, latency);
}
}
}
Future<void> _setupDecoder(Uint8List spsPps) async {
// a second 0x01 packet can arrive while createDecoder is still awaiting;
// ignore it — the first decoder will be ready soon enough
if (_settingUpDecoder) return;
_settingUpDecoder = true;
_decoderReady = false;
_pendingNals.clear();
if (_textureId != null) {
await _videoCh.invokeMethod('stopDecoder', {'textureId': _textureId});
_textureId = null;
}
try {
final res = await _videoCh.invokeMethod<Map>('createDecoder', {'spsPps': spsPps});
final id = (res?['textureId'] as num?)?.toInt();
if (!mounted) return;
setState(() {
_textureId = id;
if (!_streaming) _streaming = true;
});
_decoderReady = true;
// drain frames that queued up while createDecoder was in-flight
final pending = List.of(_pendingNals);
_pendingNals.clear();
for (final (nal, latency) in pending) {
_feedNal(nal, latency);
}
} catch (_) {
_decoderReady = false;
} finally {
_settingUpDecoder = false;
}
}
Future<void> _feedNal(Uint8List nal, int latency) async {
if (_textureId == null) return;
try {
final t0 = DateTime.now();
await _videoCh.invokeMethod('feedNal', {
'textureId': _textureId,
'nal': nal,
});
final elapsed = DateTime.now().difference(t0).inMicroseconds / 1000.0;
if (!mounted) return;
_decodeMsSum += elapsed;
_decodeMsCount++;
_frameCount++;
setState(() {
_statLatencyMs = latency.clamp(0, 9999);
});
} catch (_) {}
}
Future<void> _handleSignalData(Map<String, dynamic> data, String roomId) async {
final sigType = data['type'] as String?;
@@ -169,7 +306,6 @@ class _ViewerScreenState extends State<ViewerScreen> {
'roomId': roomId,
'data': {'type': 'answer', 'sdp': answer.sdp},
});
} else if (sigType == 'candidate') {
final raw = data['candidate'] as Map<String, dynamic>;
final candidate = RTCIceCandidate(
@@ -188,7 +324,6 @@ class _ViewerScreenState extends State<ViewerScreen> {
} catch (_) {}
}
// get video widget size via the global key
void _updateVideoSize() {
final ctx = _videoKey.currentContext;
if (ctx != null) {
@@ -206,8 +341,6 @@ class _ViewerScreenState extends State<ViewerScreen> {
);
}
// ---- input capture
void _onPointerMove(PointerMoveEvent e) {
final n = _normalise(e.localPosition);
_sendInput({'type': 'move', 'x': n.dx, 'y': n.dy});
@@ -215,7 +348,6 @@ class _ViewerScreenState extends State<ViewerScreen> {
void _onPointerDown(PointerDownEvent e) {
final n = _normalise(e.localPosition);
// button 0 = left, 1 = right (PointerDeviceKind bit not directly available)
_sendInput({'type': 'click', 'x': n.dx, 'y': n.dy, 'button': 0});
}
@@ -234,6 +366,14 @@ class _ViewerScreenState extends State<ViewerScreen> {
title: const Text('Pulsar — Viewer'),
backgroundColor: Theme.of(context).colorScheme.inversePrimary,
),
floatingActionButton: _streaming
? FloatingActionButton(
mini: true,
tooltip: 'Stats',
onPressed: () => setState(() => _showStats = !_showStats),
child: const Icon(Icons.bar_chart),
)
: null,
body: _streaming ? _buildStream() : _buildJoinView(),
);
}
@@ -296,20 +436,66 @@ class _ViewerScreenState extends State<ViewerScreen> {
}
Widget _buildStream() {
return Focus(
autofocus: true,
onKeyEvent: (_, e) {
_onKeyEvent(e);
return KeyEventResult.handled;
},
child: Listener(
onPointerMove: _onPointerMove,
onPointerDown: _onPointerDown,
child: RTCVideoView(
_remoteRenderer,
key: _videoKey,
objectFit: RTCVideoViewObjectFit.RTCVideoViewObjectFitContain,
return Stack(
children: [
Focus(
autofocus: true,
onKeyEvent: (_, e) {
_onKeyEvent(e);
return KeyEventResult.handled;
},
child: Listener(
onPointerMove: _onPointerMove,
onPointerDown: _onPointerDown,
child: _textureId != null
? SizedBox.expand(
key: _videoKey,
child: Texture(textureId: _textureId!),
)
: const Center(child: CircularProgressIndicator()),
),
),
if (_showStats)
Positioned(
top: 12,
left: 12,
child: _StatsOverlay(lines: [
'FPS: $_statFps',
'Bitrate: ${_statBitrateMbps.toStringAsFixed(2)} Mbps',
'Dropped: $_statDropped/s',
'Res: $_statResolution',
'Latency: ${_statLatencyMs}ms',
'Submit: ${_statDecodeMs.toStringAsFixed(1)}ms',
]),
),
],
);
}
}
class _StatsOverlay extends StatelessWidget {
final List<String> lines;
const _StatsOverlay({required this.lines});
@override
Widget build(BuildContext context) {
return Container(
padding: const EdgeInsets.symmetric(horizontal: 10, vertical: 8),
decoration: BoxDecoration(
color: Colors.black.withValues(alpha: 0.6),
borderRadius: BorderRadius.circular(6),
),
child: Column(
crossAxisAlignment: CrossAxisAlignment.start,
children: lines.map((l) => Text(
l,
style: const TextStyle(
color: Colors.white,
fontFamily: 'monospace',
fontSize: 12,
),
)).toList(),
),
);
}
+2 -2
View File
@@ -20,10 +20,10 @@ class SignallingService {
} catch (_) {}
},
onError: (err) {
_controller.addError(err);
if (!_controller.isClosed) _controller.addError(err);
},
onDone: () {
_controller.addError(Exception('WebSocket connection closed'));
if (!_controller.isClosed) _controller.addError(Exception('WebSocket connection closed'));
},
);
}