import 'dart:async';
import 'dart:io';

import 'package:waylume_server/services/vpn_session_service.dart';

/// A single network flow described by its protocol and both endpoints.
class Connection {
  final String protocol;
  final String localIP;
  final int localPort;
  final String remoteIP;
  final int remotePort;

  const Connection({
    required this.protocol,
    required this.localIP,
    required this.localPort,
    required this.remoteIP,
    required this.remotePort,
  });

  /// Key built from the protocol and both endpoints; used for de-duplication.
  String get id => '$protocol:$localIP:$localPort->$remoteIP:$remotePort';

  @override
  String toString() =>
      '$protocol $localIP:$localPort -> $remoteIP:$remotePort';
}

/// Exit code and collected stdout of one `timeout ... tcpdump ...` run.
class _CaptureResult {
  final int exitCode;
  final String output;

  const _CaptureResult(this.exitCode, this.output);
}

/// Watches the `wg0` interface for traffic from known VPN peers, captures the
/// first packets of a flow with `tcpdump`, and classifies them with the
/// external `./protocol_analyzer` binary or a built-in pattern matcher.
///
/// Blocking itself is not implemented: decisions are only printed.
class ProtocolBlockingService {
  /// Upper bound on remembered de-duplication keys before trimming.
  static const int _maxProcessedEntries = 1000;

  /// Number of keys kept after a trim.
  static const int _retainedProcessedEntries = 800;

  static const String _analysisFooter =
      'โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•';

  static const Set<String> _blockedProtocols = {
    'BitTorrent',
    'uTorrent',
    'Transmission',
    'qBittorrent',
    'eMule',
    'KaZaA',
    'Gnutella',
    'DirectConnect',
    'Skype_Call', // Block Skype calls but allow chat
  };

  static const Set<String> _blockedCategories = {
    'Download',
    'P2P',
    'FileSharing',
  };

  static Timer? _monitorTimer;
  static Timer? _peerRefreshTimer;

  /// Insertion-ordered set of keys already analysed (oldest first).
  static final Set<String> _processedConnections = <String>{};
  static Set<String> _activePeerIPs = <String>{};

  static int _scanCount = 0;

  /// True while a scan is running, so timer ticks never overlap.
  static bool _scanInProgress = false;

  /// Starts peer-list refreshing and traffic monitoring.
  static void initialize() {
    print('Initializing Protocol Blocking Service...');
    _startConnectionMonitoring();
  }

  /// Starts the 30 s peer refresh timer and the 100 ms scan timer.
  ///
  /// Any timers from a previous call are cancelled first so repeated
  /// initialization does not leak timers.
  static void _startConnectionMonitoring() {
    print('๐Ÿ“ก Starting connection monitoring timer (100ms intervals)...');

    _peerRefreshTimer?.cancel();
    _monitorTimer?.cancel();

    // Update peer IPs every 30 seconds.
    _peerRefreshTimer = Timer.periodic(const Duration(seconds: 30), (_) {
      _updateActivePeerIPs();
    });

    // Initial peer IP update; the method handles its own errors.
    _updateActivePeerIPs();

    _monitorTimer = Timer.periodic(const Duration(milliseconds: 100), (_) {
      _scanForNewConnections();
    });
  }

  /// Reloads the set of peer IPs from [VpnSessionService] and logs changes.
  static Future<void> _updateActivePeerIPs() async {
    try {
      final peers = await VpnSessionService.getAllLocalPeers();
      final newPeerIPs = <String>{
        for (final peer in peers) peer['ip_address'] as String,
      };

      final changed = newPeerIPs.length != _activePeerIPs.length ||
          !_activePeerIPs.containsAll(newPeerIPs);
      if (changed) {
        _activePeerIPs = newPeerIPs;
        print('๐Ÿ”„ Updated active peer IPs: $_activePeerIPs');
      }
    } catch (e) {
      print('โŒ Error updating peer IPs: $e');
    }
  }

  /// Runs one monitoring pass; invoked by the 100 ms timer.
  ///
  /// A tick that arrives while the previous pass is still running is dropped.
  /// Each pass spawns at least one process, so overlapping passes would pile
  /// up `tcpdump` instances. Only executed passes count towards [_scanCount].
  static Future<void> _scanForNewConnections() async {
    if (_scanInProgress) {
      return;
    }
    _scanInProgress = true;

    try {
      _scanCount++;

      if (_activePeerIPs.isEmpty) {
        return; // No peers to monitor
      }

      // Monitor peer traffic directly using tcpdump on wg0 interface
      await _monitorPeerTraffic();

      if (_scanCount % 100 == 0) {
        print('๐Ÿ” Monitoring ${_activePeerIPs.length} active peers: $_activePeerIPs');

        // Test if we can see ANY traffic from peers (once, on pass 100).
        if (_scanCount == 100) {
          await _testPeerConnectivity();
        }
      }
    } catch (e) {
      print('โŒ Error monitoring peer traffic: $e');
    } finally {
      _scanInProgress = false;
    }
  }

  /// Runs `timeout <timeoutSeconds> tcpdump <tcpdumpArgs>` and collects stdout.
  ///
  /// stderr is drained as well: `Process.start` requires both streams to be
  /// consumed, otherwise the underlying OS resources are not released.
  static Future<_CaptureResult> _runTcpdump(
    String timeoutSeconds,
    List<String> tcpdumpArgs,
  ) async {
    final process = await Process.start(
      'timeout',
      [timeoutSeconds, 'tcpdump', ...tcpdumpArgs],
    );

    final stderrDrained = process.stderr.drain<void>();

    final chunks = <String>[];
    await for (final data in process.stdout) {
      chunks.add(String.fromCharCodes(data));
    }
    await stderrDrained;

    final exitCode = await process.exitCode;
    return _CaptureResult(exitCode, chunks.join());
  }

  /// Whether [packetData] contains [peerIP] as a complete address.
  ///
  /// A plain substring test would let `10.0.0.1` match `10.0.0.10` or
  /// `110.0.0.1`. The address must therefore not be preceded by a digit or
  /// dot, nor followed by a digit. A following `.port` suffix is allowed.
  static bool _mentionsPeer(String packetData, String peerIP) {
    final pattern = RegExp('(?<![0-9.])${RegExp.escape(peerIP)}(?![0-9])');
    return pattern.hasMatch(packetData);
  }

  /// Captures one packet on `wg0` and, if it mentions an active peer, hands
  /// it to [_analyzeNewPacket].
  static Future<void> _monitorPeerTraffic() async {
    // Capture ANY packet on wg0 interface and print it
    try {
      final capture = await _runTcpdump('0.1', [
        '-i', 'wg0',
        '-c', '1',
        '-v', // Verbose
      ]);

      if (capture.exitCode != 0 || capture.output.isEmpty) {
        return;
      }

      final packetData = capture.output.trim();
      print('๐Ÿ“ฆ PACKET DETECTED: $packetData');

      // Check if it's from our monitored peers
      for (final peerIP in _activePeerIPs) {
        if (_mentionsPeer(packetData, peerIP)) {
          print('๐ŸŽฏ PEER TRAFFIC FROM $peerIP: $packetData');
          await _analyzeNewPacket(packetData, peerIP);
          break;
        }
      }
    } catch (e) {
      // Ignore timeout errors - normal when no packets
      final message = e.toString();
      if (!message.contains('timeout') &&
          !message.contains('No such device')) {
        print('โŒ Error monitoring traffic: $e');
      }
    }
  }

  /// Records [key] as processed and returns whether it was new.
  ///
  /// When more than [_maxProcessedEntries] keys are stored, the oldest ones
  /// are dropped until [_retainedProcessedEntries] remain.
  static bool _rememberProcessed(String key) {
    if (!_processedConnections.add(key)) {
      return false;
    }

    if (_processedConnections.length > _maxProcessedEntries) {
      final excess = _processedConnections.length - _retainedProcessedEntries;
      final oldest = _processedConnections.take(excess).toList();
      _processedConnections.removeAll(oldest);
    }
    return true;
  }

  /// Triggers a detailed capture for [peerIP] unless this exact packet text
  /// has been seen before.
  static Future<void> _analyzeNewPacket(String packetData, String peerIP) async {
    // NOTE(review): the key is a hash of the whole tcpdump text, which
    // presumably includes a timestamp, so packets of the same flow likely get
    // different keys — confirm whether a flow-based key was intended.
    if (!_rememberProcessed(packetData.hashCode.toString())) {
      return; // Already processed this packet pattern
    }

    print('๐Ÿ” New peer traffic detected from $peerIP');
    print('๐Ÿ“ก Capturing detailed handshake...');

    // Now capture the full handshake with more detail
    await _captureDetailedHandshake(peerIP, packetData);
  }

  /// Captures up to three packets sent by [peerIP] as hex and analyses them.
  ///
  /// [initialPacket] is accepted for context but not used.
  static Future<void> _captureDetailedHandshake(
    String peerIP,
    String initialPacket,
  ) async {
    try {
      final capture = await _runTcpdump('2', [
        '-i', 'wg0',
        '-c', '3', // Capture a few packets to get handshake
        '-s', '200',
        '-x',
        'src $peerIP',
      ]);

      if (capture.exitCode == 0 && capture.output.isNotEmpty) {
        print('โœ… Handshake captured for peer $peerIP');
        await _analyzeHandshake(
          capture.output,
          Connection(
            protocol: 'unknown',
            localIP: peerIP,
            localPort: 0,
            remoteIP: 'unknown',
            remotePort: 0,
          ),
        );
      } else {
        print('โฑ๏ธ No additional handshake data captured for peer $peerIP');
      }
    } catch (e) {
      print('โŒ Error capturing detailed handshake: $e');
    }
  }

  /// One-off diagnostic: checks that `wg0` exists and tries to capture live
  /// traffic for each active peer, printing what it finds.
  static Future<void> _testPeerConnectivity() async {
    print('๐Ÿงช Testing peer connectivity on wg0 interface...');

    for (final peerIP in _activePeerIPs) {
      try {
        // Check if wg0 interface exists and peer can be seen
        final wgResult = await Process.run('ip', ['addr', 'show', 'wg0']);
        if (wgResult.exitCode != 0) {
          print('โŒ wg0 interface not found');
          return;
        }
        print('โœ… wg0 interface exists');

        print('๐Ÿ” Testing live traffic capture for peer $peerIP...');

        // Test very basic traffic capture from peer - longer timeout
        final peerCapture = await _runTcpdump('10', [
          '-i', 'wg0',
          '-c', '5', // Capture multiple packets
          '-v', // Verbose output
          'host $peerIP',
        ]);

        if (peerCapture.exitCode == 0 && peerCapture.output.isNotEmpty) {
          print('โœ… LIVE TRAFFIC DETECTED from peer $peerIP:');
          print(peerCapture.output.trim());
          print('โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€');
          continue;
        }

        print('โŒ No traffic detected from peer $peerIP in 10 seconds');
        print(' Try browsing a website now...');

        // Also test ANY traffic on wg0
        print('๐Ÿ” Testing if wg0 interface has ANY traffic...');
        final anyCapture = await _runTcpdump('5', [
          '-i', 'wg0',
          '-c', '3',
        ]);

        if (anyCapture.exitCode == 0 && anyCapture.output.isNotEmpty) {
          print('โœ… wg0 has traffic, but not from expected peer IP:');
          print(anyCapture.output.trim());
        } else {
          print('โŒ No traffic at all on wg0 interface');
        }
      } catch (e) {
        print('โŒ Error testing peer $peerIP connectivity: $e');
      }
    }
  }

  /// Parses socket-listing [output] (one connection per line) into
  /// [Connection]s tagged with [protocol]; unparseable lines are skipped.
  static List<Connection> _parseConnections(String output, String protocol) {
    final connections = <Connection>[];

    for (final line in output.split('\n')) {
      final conn = _parseConnectionLine(line.trim(), protocol);
      if (conn != null) {
        connections.add(conn);
      }
    }

    return connections;
  }

  /// Parses one line of the form
  /// `State Recv-Q Send-Q Local_Address:Port Peer_Address:Port`.
  ///
  /// Returns `null` for headers, listening sockets and malformed lines.
  static Connection? _parseConnectionLine(String line, String protocol) {
    if (line.isEmpty || line.startsWith('Netid') || line.startsWith('State')) {
      return null;
    }

    // Split by whitespace and filter empty strings
    final parts =
        line.split(RegExp(r'\s+')).where((s) => s.isNotEmpty).toList();
    if (parts.length < 5) {
      return null; // No peer address column
    }

    final localAddr = parts[3];
    final remoteAddr = parts[4];
    if (remoteAddr.isEmpty || remoteAddr == '*:*') {
      return null; // Skip listening sockets
    }

    final localParts = localAddr.split(':');
    final remoteParts = remoteAddr.split(':');
    if (localParts.length < 2 || remoteParts.length < 2) {
      return null;
    }

    final localPort = int.tryParse(localParts.last);
    final remotePort = int.tryParse(remoteParts.last);
    if (localPort == null || remotePort == null) {
      return null;
    }

    // Everything before the last ':' is the address (it may contain ':').
    return Connection(
      protocol: protocol,
      localIP: localParts.sublist(0, localParts.length - 1).join(':'),
      localPort: localPort,
      remoteIP: remoteParts.sublist(0, remoteParts.length - 1).join(':'),
      remotePort: remotePort,
    );
  }

  /// Whether [conn] has not been seen before; records it as seen.
  static bool _isNewConnection(Connection conn) => _rememberProcessed(conn.id);

  /// Whether [conn] originates from a VPN peer (its local IP is a peer IP).
  static bool _isPeerConnection(Connection conn) {
    return _activePeerIPs.contains(conn.localIP);
  }

  /// Logs [conn] and attempts to capture and analyse its first packet.
  static Future<void> _handleNewConnection(Connection conn) async {
    print('๐Ÿ” New connection detected: $conn');

    try {
      // Attempt to capture handshake data
      await _captureHandshake(conn);
    } catch (e) {
      print('โš ๏ธ Error capturing handshake for $conn: $e');
    }
  }

  /// Captures a single packet from `conn.localIP` to `conn.remoteIP` on
  /// `wg0` (2 s timeout, first 200 bytes, hex) and analyses it.
  static Future<void> _captureHandshake(Connection conn) async {
    try {
      print('๐Ÿ“ก Capturing handshake for: $conn');

      // Use tcpdump to capture packets on WireGuard interface specifically
      final capture = await _runTcpdump('2', [
        '-i', 'wg0', // Monitor WireGuard interface specifically
        '-c', '1', // Capture only 1 packet
        '-s', '200', // Capture first 200 bytes only
        '-x', // Output in hex
        'src ${conn.localIP} and dst ${conn.remoteIP}',
      ]);

      if (capture.exitCode == 0 && capture.output.isNotEmpty) {
        final data = capture.output;
        final preview =
            data.length > 200 ? '${data.substring(0, 200)}...' : data;
        print('โœ… Handshake captured for $conn:');
        print(' Data (first 200 chars): $preview');

        // Analyze the handshake
        await _analyzeHandshake(data, conn);
      } else if (capture.exitCode == 124) {
        // 124 is the exit status `timeout` uses when the time limit is hit.
        print('โฑ๏ธ Handshake capture timeout for $conn (no packets in 2s)');
      } else {
        print('โŒ Failed to capture handshake for $conn (exit code: ${capture.exitCode})');
      }
    } catch (e) {
      print('๐Ÿ’ฅ Error in handshake capture: $e');
    }
  }

  /// Classifies the packet bytes in [handshakeData] (tcpdump `-x` output).
  ///
  /// Uses `./protocol_analyzer` when present, otherwise falls back to
  /// [_basicProtocolAnalysis]. The closing banner is printed on every path.
  static Future<void> _analyzeHandshake(
    String handshakeData,
    Connection conn,
  ) async {
    print('โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ• nDPI PROTOCOL ANALYSIS โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•โ•');
    print('๐Ÿ“ Connection: $conn');

    // Extract hex bytes from tcpdump output
    final hexBytes = _extractHexBytes(handshakeData);

    try {
      if (hexBytes.isEmpty) {
        print('โŒ No hex data found in tcpdump output');
        return;
      }

      // Convert hex bytes to single hex string for C analyzer
      final hexString = hexBytes.join();
      print('๐Ÿ”ข Analyzing ${hexBytes.length} bytes of packet data');

      // First check if the protocol analyzer exists
      if (!await File('./protocol_analyzer').exists()) {
        print('โš ๏ธ nDPI protocol analyzer not available - using basic pattern matching');
        await _basicProtocolAnalysis(hexBytes, conn);
        return;
      }

      // Call our C nDPI analyzer
      final result = await Process.run('./protocol_analyzer', [hexString]);
      if (result.exitCode != 0) {
        print('โŒ nDPI analyzer failed:');
        print(' Exit code: ${result.exitCode}');
        print(' Error: ${result.stderr}');
        return;
      }

      final jsonStr = result.stdout.toString().trim();
      print('โœ… nDPI Analysis Results:');
      print(jsonStr);

      _reportAnalyzerVerdict(jsonStr);
    } catch (e) {
      print('โŒ Error running nDPI analyzer: $e');
    } finally {
      print(_analysisFooter);
    }
  }

  /// Extracts the `protocol` / `category` fields from the analyzer's JSON
  /// text and prints the allow/block decision.
  static void _reportAnalyzerVerdict(String jsonStr) {
    try {
      // Simple protocol extraction - look for protocol field
      final protocolMatch =
          RegExp(r'"protocol":\s*"([^"]+)"').firstMatch(jsonStr);
      if (protocolMatch == null) {
        return;
      }
      final categoryMatch =
          RegExp(r'"category":\s*"([^"]+)"').firstMatch(jsonStr);

      final protocol = protocolMatch.group(1) ?? 'Unknown';
      final category = categoryMatch?.group(1) ?? 'Unknown';

      print('๐ŸŽฏ DETECTED: $protocol (Category: $category)');

      // Check if this is a protocol we want to block
      if (_shouldBlockProtocol(protocol, category)) {
        print('๐Ÿšซ BLOCKING PROTOCOL: $protocol');
        // TODO: Implement blocking logic here
      } else {
        print('โœ… ALLOWING PROTOCOL: $protocol');
      }
    } catch (e) {
      print('โš ๏ธ Error parsing nDPI results: $e');
    }
  }

  /// Whether [protocol] or [category] is on the block list.
  static bool _shouldBlockProtocol(String protocol, String category) {
    return _blockedProtocols.contains(protocol) ||
        _blockedCategories.contains(category);
  }

  /// Fallback classifier based on well-known byte/text signatures.
  ///
  /// [hexBytes] is a list of two-character hex strings, one per byte.
  static Future<void> _basicProtocolAnalysis(
    List<String> hexBytes,
    Connection conn,
  ) async {
    print('๐Ÿ” Using basic pattern matching fallback');

    // Convert hex to ASCII for pattern matching
    final asciiData = _extractAsciiFromHex(hexBytes);
    final lowerAscii = asciiData.toLowerCase();
    final hexString = hexBytes.join().toLowerCase();

    String? detectedProtocol;

    if (asciiData.contains('BitTorrent protocol') ||
        hexString.contains('13426974546f7272656e742070726f746f636f6c')) {
      // BitTorrent detection
      detectedProtocol = 'BitTorrent';
    } else if (asciiData.contains('SSH-2.0') || asciiData.contains('SSH-1.')) {
      // SSH detection
      detectedProtocol = 'SSH';
    } else if (lowerAscii.contains('get ') ||
        lowerAscii.contains('post ') ||
        lowerAscii.contains('http/')) {
      // HTTP detection
      detectedProtocol = 'HTTP';
    } else if (hexBytes.isNotEmpty && hexBytes.first.toLowerCase() == '16') {
      // TLS/SSL detection (0x16 = handshake record type)
      // NOTE(review): this inspects the first captured byte; if the capture
      // starts with an IP header this branch may never match — confirm.
      detectedProtocol = 'TLS/SSL';
    } else if (asciiData.contains('220 ') && conn.remotePort == 25) {
      // SMTP detection
      detectedProtocol = 'SMTP';
    }

    if (detectedProtocol == null) {
      // asciiData is already printable-only, so the sample needs only
      // clamping; a fixed substring(0, 50) throws on short captures.
      final printable = asciiData.replaceAll(RegExp(r'[^\x20-\x7E]'), '.');
      final sample =
          printable.length > 50 ? printable.substring(0, 50) : printable;
      print('โ“ UNKNOWN PROTOCOL (basic analysis)');
      print('๐Ÿ”ค ASCII sample: $sample...');
      return;
    }

    print('๐ŸŽฏ BASIC DETECTION: $detectedProtocol');

    // Simple blocking logic for basic patterns
    if (['BitTorrent', 'eMule'].contains(detectedProtocol)) {
      print('๐Ÿšซ BLOCKING PROTOCOL: $detectedProtocol');
    } else {
      print('โœ… ALLOWING PROTOCOL: $detectedProtocol');
    }
  }

  /// Extracts packet bytes from tcpdump `-x` output as two-character hex
  /// strings.
  ///
  /// Only the text after each `0x....:` offset on the same line is read.
  /// The capture group is limited to hex digits, spaces and tabs, so it
  /// cannot run across the newline into the next line's offset. A trailing
  /// unpaired nibble is dropped.
  static List<String> _extractHexBytes(String tcpdumpOutput) {
    final hexPattern = RegExp(
      r'0x[0-9a-f]+:[ \t]*([0-9a-f \t]+)',
      caseSensitive: false,
    );
    final whitespace = RegExp(r'\s+');

    final hexBytes = <String>[];
    for (final match in hexPattern.allMatches(tcpdumpOutput)) {
      final hexLine = (match.group(1) ?? '').replaceAll(whitespace, '');
      for (var i = 0; i + 1 < hexLine.length; i += 2) {
        hexBytes.add(hexLine.substring(i, i + 2));
      }
    }

    return hexBytes;
  }

  /// Renders [hexBytes] as text, replacing bytes outside printable ASCII
  /// (32..126) and unparseable entries with `.`.
  static String _extractAsciiFromHex(List<String> hexBytes) {
    final buffer = StringBuffer();
    for (final hex in hexBytes) {
      final byte = int.tryParse(hex, radix: 16) ?? 0;
      buffer.write(byte >= 32 && byte <= 126 ? String.fromCharCode(byte) : '.');
    }
    return buffer.toString();
  }

  /// Stops both timers and forgets all processed keys.
  static void dispose() {
    _monitorTimer?.cancel();
    _monitorTimer = null;
    _peerRefreshTimer?.cancel();
    _peerRefreshTimer = null;
    _processedConnections.clear();
    print('Protocol Blocking Service disposed');
  }
}