From 022c9b6339801aefa91bcd79f6fccfae29762cdd Mon Sep 17 00:00:00 2001 From: ImBenji Date: Thu, 28 Aug 2025 17:01:48 +0100 Subject: [PATCH] Add Protocol Blocking Service to monitor and capture new connections --- lib/main.dart | 4 + lib/services/protocol_blocking_service.dart | 229 ++++++++++++++++++++ 2 files changed, 233 insertions(+) create mode 100644 lib/services/protocol_blocking_service.dart diff --git a/lib/main.dart b/lib/main.dart index dcaa623..b9376c4 100644 --- a/lib/main.dart +++ b/lib/main.dart @@ -10,6 +10,7 @@ import 'package:waylume_server/services/server_service.dart'; import 'package:waylume_server/services/wireguard_service.dart'; import 'package:waylume_server/services/vpn_session_service.dart'; import 'package:waylume_server/services/vpn_session_monitor.dart'; +import 'package:waylume_server/services/protocol_blocking_service.dart'; import 'package:waylume_server/core/utils.dart'; import 'package:waylume_server/web/peer_routes.dart'; import 'package:waylume_server/config/supabase_config.dart'; @@ -54,6 +55,9 @@ void main() async { await WireGuardService.initializeServer(); initHeartbeat(); initVpnSessionMonitor(); + + // Initialize protocol blocking service + ProtocolBlockingService.initialize(); if (!Platform.isMacOS) { // await ServerService.isolatePeers(); diff --git a/lib/services/protocol_blocking_service.dart b/lib/services/protocol_blocking_service.dart new file mode 100644 index 0000000..643bc6e --- /dev/null +++ b/lib/services/protocol_blocking_service.dart @@ -0,0 +1,229 @@ +import 'dart:async'; +import 'dart:io'; + +class Connection { + final String protocol; + final String localIP; + final int localPort; + final String remoteIP; + final int remotePort; + + Connection({ + required this.protocol, + required this.localIP, + required this.localPort, + required this.remoteIP, + required this.remotePort, + }); + + String get id => '$protocol:$localIP:$localPort->$remoteIP:$remotePort'; + + @override + String toString() { + return '$protocol $localIP:$localPort -> $remoteIP:$remotePort'; + } +} + +class ProtocolBlockingService { + static Timer? _monitorTimer; + static final Set _processedConnections = {}; + + static void initialize() { + print('Initializing Protocol Blocking Service...'); + _startConnectionMonitoring(); + } + + static void _startConnectionMonitoring() { + _monitorTimer = Timer.periodic(Duration(milliseconds: 100), (_) async { + await _scanForNewConnections(); + }); + } + + static Future _scanForNewConnections() async { + try { + // Monitor both TCP and UDP connections + final tcpResult = await Process.run('ss', ['-tnp', 'state', 'syn-sent,established']); + final udpResult = await Process.run('ss', ['-unp']); + + final tcpConnections = _parseConnections(tcpResult.stdout.toString(), 'tcp'); + final udpConnections = _parseConnections(udpResult.stdout.toString(), 'udp'); + + for (final conn in [...tcpConnections, ...udpConnections]) { + if (_isNewConnection(conn)) { + await _handleNewConnection(conn); + } + } + } catch (e) { + print('Error scanning for connections: $e'); + } + } + + static List _parseConnections(String output, String protocol) { + final connections = []; + final lines = output.split('\n'); + + for (final line in lines) { + try { + final conn = _parseConnectionLine(line.trim(), protocol); + if (conn != null) { + connections.add(conn); + } + } catch (e) { + // Skip malformed lines + } + } + + return connections; + } + + static Connection? _parseConnectionLine(String line, String protocol) { + if (line.isEmpty || line.startsWith('Netid') || line.startsWith('State')) { + return null; + } + + // Split by whitespace and filter empty strings + final parts = line.split(RegExp(r'\s+')).where((s) => s.isNotEmpty).toList(); + + if (parts.length < 4) { + return null; + } + + try { + // For TCP: State Recv-Q Send-Q Local_Address:Port Peer_Address:Port + // For UDP: State Recv-Q Send-Q Local_Address:Port Peer_Address:Port + final localAddr = parts[3]; + final remoteAddr = parts.length > 4 ? parts[4] : ''; + + if (remoteAddr.isEmpty || remoteAddr == '*:*') { + return null; // Skip listening sockets + } + + final localParts = localAddr.split(':'); + final remoteParts = remoteAddr.split(':'); + + if (localParts.length < 2 || remoteParts.length < 2) { + return null; + } + + final localIP = localParts.sublist(0, localParts.length - 1).join(':'); + final localPort = int.parse(localParts.last); + final remoteIP = remoteParts.sublist(0, remoteParts.length - 1).join(':'); + final remotePort = int.parse(remoteParts.last); + + return Connection( + protocol: protocol, + localIP: localIP, + localPort: localPort, + remoteIP: remoteIP, + remotePort: remotePort, + ); + } catch (e) { + return null; + } + } + + static bool _isNewConnection(Connection conn) { + final id = conn.id; + if (_processedConnections.contains(id)) { + return false; + } + + _processedConnections.add(id); + + // Clean up old connections periodically (keep last 1000) + if (_processedConnections.length > 1000) { + final toRemove = _processedConnections.length - 800; + final oldConnections = _processedConnections.take(toRemove).toList(); + for (final old in oldConnections) { + _processedConnections.remove(old); + } + } + + return true; + } + + static Future _handleNewConnection(Connection conn) async { + print('🔍 New connection detected: $conn'); + + try { + // Attempt to capture handshake data + await _captureHandshake(conn); + } catch (e) { + print('⚠️ Error capturing handshake for $conn: $e'); + } + } + + static Future _captureHandshake(Connection conn) async { + try { + print('📡 Capturing handshake for: $conn'); + + // Use tcpdump to capture only the first data packet (handshake) + final protocol = conn.protocol; + final process = await Process.start('timeout', [ + '2', // 2 second timeout + 'tcpdump', + '-i', 'any', + '-c', '1', // Capture only 1 packet + '-s', '200', // Capture first 200 bytes only + '-x', // Output in hex + '$protocol and host ${conn.remoteIP} and port ${conn.remotePort}', + ]); + + final handshakeData = []; + await for (final data in process.stdout) { + handshakeData.add(String.fromCharCodes(data)); + } + + final exitCode = await process.exitCode; + + if (exitCode == 0 && handshakeData.isNotEmpty) { + final data = handshakeData.join(); + print('✅ Handshake captured for $conn:'); + print(' Data (first 200 chars): ${data.length > 200 ? data.substring(0, 200) + "..." : data}'); + + // Analyze the handshake + _analyzeHandshake(data, conn); + } else if (exitCode == 124) { + print('⏱️ Handshake capture timeout for $conn (no packets in 2s)'); + } else { + print('❌ Failed to capture handshake for $conn (exit code: $exitCode)'); + } + + process.kill(); + } catch (e) { + print('💥 Error in handshake capture: $e'); + } + } + + static void _analyzeHandshake(String handshakeData, Connection conn) { + // Simple pattern detection for now + final data = handshakeData.toLowerCase(); + + String? detectedProtocol; + + if (data.contains('bittorrent protocol') || data.contains('13426974546f7272656e742070726f746f636f6c')) { + detectedProtocol = 'BitTorrent'; + } else if (data.contains('ssh-2.0') || data.contains('ssh-1.')) { + detectedProtocol = 'SSH'; + } else if (data.contains('get ') || data.contains('post ') || data.contains('http/')) { + detectedProtocol = 'HTTP'; + } else if (data.contains('220 ') && conn.remotePort == 25) { + detectedProtocol = 'SMTP'; + } else if (data.contains('220 ') && conn.remotePort == 21) { + detectedProtocol = 'FTP'; + } + + if (detectedProtocol != null) { + print('🎯 PROTOCOL DETECTED: $detectedProtocol for $conn'); + } else { + print('❓ Unknown protocol for $conn'); + } + } + + static void dispose() { + _monitorTimer?.cancel(); + _monitorTimer = null; + _processedConnections.clear(); + print('Protocol Blocking Service disposed'); + } +} \ No newline at end of file