// Source file: waylume_server/lib/services/protocol_blocking_service.dart
// (Listing metadata at time of capture: 612 lines, 20 KiB, Dart.)

import 'dart:async';
import 'dart:io';
import 'package:waylume_server/services/vpn_session_service.dart';
/// A single network flow described by its protocol and both endpoints.
///
/// Instances are plain immutable data holders; [id] provides a compact string
/// key and [toString] a human-readable rendering for log output.
class Connection {
  /// Creates a connection description; every field is required.
  Connection({
    required this.protocol,
    required this.localIP,
    required this.localPort,
    required this.remoteIP,
    required this.remotePort,
  });

  /// Protocol label of the flow.
  final String protocol;

  /// Address of the local endpoint.
  final String localIP;

  /// Port of the local endpoint.
  final int localPort;

  /// Address of the remote endpoint.
  final String remoteIP;

  /// Port of the remote endpoint.
  final int remotePort;

  /// Key of the form `protocol:localIP:localPort->remoteIP:remotePort`.
  String get id {
    final localEndpoint = '$localIP:$localPort';
    final remoteEndpoint = '$remoteIP:$remotePort';
    return '$protocol:$localEndpoint->$remoteEndpoint';
  }

  /// Renders the flow as `protocol localIP:localPort -> remoteIP:remotePort`.
  @override
  String toString() =>
      '$protocol $localIP:$localPort -> $remoteIP:$remotePort';
}
/// Watches traffic of active VPN peers with `tcpdump` and classifies the
/// protocol of captured packets, logging a block/allow verdict.
///
/// All state is static. Call [initialize] once to start the periodic timers
/// and [dispose] to stop them. Actual blocking is not implemented yet; the
/// service only logs what it would block.
class ProtocolBlockingService {
  /// Drives [_scanForNewConnections] every 100 ms.
  static Timer? _monitorTimer;

  /// Drives [_updateActivePeerIPs] every 30 seconds.
  static Timer? _peerUpdateTimer;

  /// Keys (connection ids or packet hashes) that were already analysed.
  static final Set<String> _processedConnections = {};

  /// IP addresses of the peers currently being monitored.
  static Set<String> _activePeerIPs = {};

  /// Number of scans started since the service was loaded.
  static int _scanCount = 0;

  /// Whether a scan is currently running; timer ticks are skipped meanwhile.
  static bool _scanInProgress = false;

  /// Whether a failure to launch the capture command was already logged.
  static bool _captureLaunchFailureLogged = false;

  /// Size above which [_processedConnections] is trimmed.
  static const int _maxProcessedConnections = 1000;

  /// Number of most recent keys kept when [_processedConnections] is trimmed.
  static const int _processedConnectionsAfterTrim = 800;

  /// Protocol names that are reported as blocked.
  static const Set<String> _blockedProtocols = {
    'BitTorrent', 'uTorrent', 'Transmission', 'qBittorrent',
    'eMule', 'KaZaA', 'Gnutella', 'DirectConnect',
    'Skype_Call', // Block Skype calls but allow chat
  };

  /// Protocol categories that are reported as blocked.
  static const Set<String> _blockedCategories = {
    'Download', 'P2P', 'FileSharing'
  };

  /// Matches one `tcpdump -x` hex line (`0x0010:  4500 003c ...`).
  ///
  /// The capture group deliberately excludes newlines so a match can never
  /// run into the offset marker of the following line.
  static final RegExp _hexDumpLine = RegExp(
    r'^[ \t]*0x[0-9a-f]+:[ \t]*((?:[0-9a-f]{2,4}[ \t]?)+)',
    caseSensitive: false,
    multiLine: true,
  );

  /// Starts peer tracking and traffic monitoring.
  static void initialize() {
    print('Initializing Protocol Blocking Service...');
    _startConnectionMonitoring();
  }

  /// (Re)creates the peer-refresh and scan timers.
  static void _startConnectionMonitoring() {
    print('📡 Starting connection monitoring timer (100ms intervals)...');

    // A repeated initialize() must not stack a second pair of timers.
    _peerUpdateTimer?.cancel();
    _monitorTimer?.cancel();

    // Update peer IPs every 30 seconds. The timer is kept in a field so that
    // dispose() can cancel it.
    _peerUpdateTimer = Timer.periodic(
      const Duration(seconds: 30),
      (_) => _updateActivePeerIPs(),
    );

    // Initial peer IP update. Intentionally not awaited: the method handles
    // its own errors and this function is synchronous.
    _updateActivePeerIPs();

    _monitorTimer = Timer.periodic(
      const Duration(milliseconds: 100),
      (_) => _scanForNewConnections(),
    );
  }

  /// Refreshes [_activePeerIPs] from [VpnSessionService.getAllLocalPeers].
  ///
  /// Peers without a string `ip_address` entry are skipped instead of
  /// aborting the whole refresh. Errors are logged, never rethrown.
  static Future<void> _updateActivePeerIPs() async {
    try {
      final peers = await VpnSessionService.getAllLocalPeers();
      final newPeerIPs =
          peers.map((peer) => peer['ip_address']).whereType<String>().toSet();
      final changed = newPeerIPs.length != _activePeerIPs.length ||
          !_activePeerIPs.containsAll(newPeerIPs);
      if (changed) {
        _activePeerIPs = newPeerIPs;
        print('🔄 Updated active peer IPs: $_activePeerIPs');
      }
    } catch (e) {
      print('❌ Error updating peer IPs: $e');
    }
  }

  /// Runs one monitoring pass; invoked by [_monitorTimer].
  ///
  /// Ticks that arrive while a previous pass is still running are dropped, so
  /// at most one capture process chain is active at a time. Scan 100 and scan
  /// 200 additionally trigger one-off diagnostics.
  static Future<void> _scanForNewConnections() async {
    if (_scanInProgress) {
      return; // Previous scan still running
    }
    _scanInProgress = true;
    try {
      // Captured locally so the milestone checks below are not affected by
      // anything that happens while awaiting.
      final scanNumber = ++_scanCount;
      if (_activePeerIPs.isEmpty) {
        return; // No peers to monitor
      }

      await _monitorPeerTraffic();

      if (scanNumber % 100 == 0) {
        print('🔍 Monitoring ${_activePeerIPs.length} active peers: $_activePeerIPs');
        // Test if we can see ANY traffic from peers
        if (scanNumber == 100) {
          await _testPeerConnectivity();
        }
        if (scanNumber == 200) {
          await _debugNetworking();
        }
      }
    } catch (e) {
      print('❌ Error monitoring peer traffic: $e');
    } finally {
      _scanInProgress = false;
    }
  }

  /// Runs `timeout <timeoutSeconds> tcpdump <tcpdumpArgs...>` to completion.
  ///
  /// Uses [Process.run] so that both stdout and stderr are fully consumed.
  /// Output is collected as raw bytes; use [_capturedText] to read stdout.
  static Future<ProcessResult> _runTcpdump(
    String timeoutSeconds,
    List<String> tcpdumpArgs,
  ) {
    return Process.run(
      'timeout',
      [timeoutSeconds, 'tcpdump', ...tcpdumpArgs],
      stdoutEncoding: null,
      stderrEncoding: null,
    );
  }

  /// Returns the stdout of [result] as text.
  ///
  /// Raw byte output is converted one code unit per byte, which cannot fail
  /// on unexpected byte sequences.
  static String _capturedText(ProcessResult result) {
    final raw = result.stdout;
    return raw is List<int> ? String.fromCharCodes(raw) : raw.toString();
  }

  /// Whether [packetData] contains [peerIP] as a complete address.
  ///
  /// A plain substring test would let `10.0.0.1` match `10.0.0.10` or
  /// `110.0.0.1`; the address must not be preceded by a digit or dot nor be
  /// followed by a digit. A trailing `.port` suffix is accepted.
  static bool _packetMentionsPeer(String packetData, String peerIP) {
    final pattern = RegExp('(?<![0-9.])${RegExp.escape(peerIP)}(?![0-9])');
    return pattern.hasMatch(packetData);
  }

  /// Captures a single packet on any interface and, if it involves a
  /// monitored peer, hands it to [_analyzeNewPacket].
  static Future<void> _monitorPeerTraffic() async {
    try {
      final result = await _runTcpdump('0.1', [
        '-i', 'any', // Monitor ALL interfaces
        '-n', // Numeric addresses, so peer IPs can be matched in the output
        '-c', '1',
        '-v', // Verbose
      ]);
      _captureLaunchFailureLogged = false;

      // A non-zero exit code (124 from `timeout`) is normal when no packet
      // arrived within the window.
      final packetData = _capturedText(result).trim();
      if (result.exitCode != 0 || packetData.isEmpty) {
        return;
      }

      print('📦 PACKET DETECTED ON ANY INTERFACE: $packetData');
      // Check if it's from our monitored peers
      for (final peerIP in _activePeerIPs) {
        if (_packetMentionsPeer(packetData, peerIP)) {
          print('🎯 PEER TRAFFIC FROM $peerIP: $packetData');
          await _analyzeNewPacket(packetData, peerIP);
          break;
        }
      }
    } on ProcessException catch (e) {
      // The command could not be started at all. Report it once rather than
      // ten times a second, but do not hide it.
      if (!_captureLaunchFailureLogged) {
        _captureLaunchFailureLogged = true;
        print('❌ Error monitoring traffic: $e');
      }
    } catch (e) {
      print('❌ Error monitoring traffic: $e');
    }
  }

  /// Records [key] as processed and reports whether it was new.
  ///
  /// Keeps [_processedConnections] bounded: once it exceeds
  /// [_maxProcessedConnections] the oldest keys are dropped until
  /// [_processedConnectionsAfterTrim] remain.
  static bool _markProcessed(String key) {
    if (!_processedConnections.add(key)) {
      return false;
    }
    if (_processedConnections.length > _maxProcessedConnections) {
      final excess =
          _processedConnections.length - _processedConnectionsAfterTrim;
      _processedConnections
          .removeAll(_processedConnections.take(excess).toList());
    }
    return true;
  }

  /// Triggers a detailed capture for [peerIP] unless this exact
  /// [packetData] text was seen before.
  static Future<void> _analyzeNewPacket(String packetData, String peerIP) async {
    // NOTE(review): the key is the hash of the full tcpdump line, which
    // presumably contains a timestamp, so repeats are likely rare - confirm
    // whether a coarser key is intended.
    if (!_markProcessed(packetData.hashCode.toString())) {
      return; // Already processed this packet pattern
    }
    print('🔍 New peer traffic detected from $peerIP');
    print('📡 Capturing detailed handshake...');
    // Now capture the full handshake with more detail
    await _captureDetailedHandshake(peerIP, packetData);
  }

  /// Captures up to three packets sent by [peerIP] on `wg0` within two
  /// seconds and passes the hex dump to [_analyzeHandshake].
  ///
  /// [initialPacket] is accepted for context but not used.
  static Future<void> _captureDetailedHandshake(String peerIP, String initialPacket) async {
    try {
      final result = await _runTcpdump('2', [
        '-i', 'wg0',
        '-c', '3', // Capture a few packets to get handshake
        '-s', '200',
        '-x',
        'src $peerIP',
      ]);
      final data = _capturedText(result);
      if (result.exitCode == 0 && data.isNotEmpty) {
        print('✅ Handshake captured for peer $peerIP');
        await _analyzeHandshake(
          data,
          Connection(
            protocol: 'unknown',
            localIP: peerIP,
            localPort: 0,
            remoteIP: 'unknown',
            remotePort: 0,
          ),
        );
      } else {
        print('⏱️ No additional handshake data captured for peer $peerIP');
      }
    } catch (e) {
      print('❌ Error capturing detailed handshake: $e');
    }
  }

  /// Diagnostic: for each active peer, checks that `wg0` exists and tries to
  /// capture live traffic for that peer (10 s), falling back to any traffic
  /// on `wg0` (5 s). Results are only logged.
  static Future<void> _testPeerConnectivity() async {
    print('🧪 Testing peer connectivity on wg0 interface...');
    for (final peerIP in _activePeerIPs) {
      try {
        // Check if wg0 interface exists and peer can be seen
        final wgResult = await Process.run('ip', ['addr', 'show', 'wg0']);
        if (wgResult.exitCode == 0) {
          print('✅ wg0 interface exists');
        } else {
          print('❌ wg0 interface not found');
          return;
        }

        print('🔍 Testing live traffic capture for peer $peerIP...');
        // Test very basic traffic capture from peer - longer timeout
        final peerCapture = await _runTcpdump('10', [
          '-i', 'wg0',
          '-c', '5', // Capture multiple packets
          '-v', // Verbose output
          'host $peerIP',
        ]);
        final peerOutput = _capturedText(peerCapture);
        if (peerCapture.exitCode == 0 && peerOutput.isNotEmpty) {
          print('✅ LIVE TRAFFIC DETECTED from peer $peerIP:');
          print(peerOutput.trim());
          print('───────────────────────────────────────');
          continue;
        }

        print('❌ No traffic detected from peer $peerIP in 10 seconds');
        print(' Try browsing a website now...');
        // Also test ANY traffic on wg0
        print('🔍 Testing if wg0 interface has ANY traffic...');
        final anyCapture = await _runTcpdump('5', [
          '-i', 'wg0',
          '-c', '3',
        ]);
        final anyOutput = _capturedText(anyCapture);
        if (anyCapture.exitCode == 0 && anyOutput.isNotEmpty) {
          print('✅ wg0 has traffic, but not from expected peer IP:');
          print(anyOutput.trim());
        } else {
          print('❌ No traffic at all on wg0 interface');
        }
      } catch (e) {
        print('❌ Error testing peer $peerIP connectivity: $e');
      }
    }
  }

  /// Diagnostic: logs interface list, `wg0` details, `wg show` output and
  /// whether packets can be captured on the `any` and `eth0` interfaces.
  static Future<void> _debugNetworking() async {
    print('🔧 NETWORK DEBUGGING...');
    try {
      // Check all network interfaces
      print('📡 Available network interfaces:');
      final ifaceResult = await Process.run('ip', ['link', 'show']);
      if (ifaceResult.exitCode == 0) {
        print(ifaceResult.stdout.toString().trim());
      }

      // Check specifically for wg0
      print('\n🔍 WireGuard wg0 interface details:');
      final wg0Result = await Process.run('ip', ['addr', 'show', 'wg0']);
      if (wg0Result.exitCode == 0) {
        print(wg0Result.stdout.toString().trim());
      } else {
        print('❌ wg0 interface not found: ${wg0Result.stderr}');
      }

      // Check WireGuard status
      print('\n📋 WireGuard peer status:');
      final wgResult = await Process.run('wg', ['show']);
      if (wgResult.exitCode == 0) {
        print(wgResult.stdout.toString().trim());
      }

      // Try tcpdump on different interfaces
      print('\n🕵️ Testing packet capture capabilities:');
      // Test Docker internal interface
      final dockerTest = await Process.run(
          'timeout', ['2', 'tcpdump', '-i', 'any', '-c', '1']);
      final dockerSample = dockerTest.stdout.toString();
      if (dockerTest.exitCode == 0 && dockerSample.isNotEmpty) {
        print('✅ Can capture on "any" interface');
        print(' Sample: ${dockerSample.trim()}');
      } else {
        print('❌ Cannot capture on "any" interface');
        print(' Error: ${dockerTest.stderr}');
      }

      // Test if we need different interface name
      final ethTest = await Process.run(
          'timeout', ['2', 'tcpdump', '-i', 'eth0', '-c', '1']);
      final ethSample = ethTest.stdout.toString();
      if (ethTest.exitCode == 0 && ethSample.isNotEmpty) {
        print('✅ Traffic detected on eth0');
        print(' Sample: ${ethSample.trim()}');
      }
    } catch (e) {
      print('❌ Network debugging error: $e');
    }
    print('🔧 Network debugging complete\n');
  }

  /// Parses every line of a socket listing in [output] into a [Connection]
  /// tagged with [protocol]; lines that cannot be parsed are skipped.
  static List<Connection> _parseConnections(String output, String protocol) {
    final connections = <Connection>[];
    for (final line in output.split('\n')) {
      try {
        final conn = _parseConnectionLine(line.trim(), protocol);
        if (conn != null) {
          connections.add(conn);
        }
      } catch (e) {
        // Skip malformed lines
      }
    }
    return connections;
  }

  /// Parses one whitespace-separated listing line.
  ///
  /// Expects the local `address:port` in the fourth column and the remote one
  /// in the fifth. Returns `null` for header lines, listening sockets
  /// (`*:*`), and anything whose ports are not integers.
  static Connection? _parseConnectionLine(String line, String protocol) {
    if (line.isEmpty || line.startsWith('Netid') || line.startsWith('State')) {
      return null;
    }
    // Split by whitespace and filter empty strings
    final parts =
        line.split(RegExp(r'\s+')).where((s) => s.isNotEmpty).toList();
    if (parts.length < 4) {
      return null;
    }

    // For TCP: State Recv-Q Send-Q Local_Address:Port Peer_Address:Port
    // For UDP: State Recv-Q Send-Q Local_Address:Port Peer_Address:Port
    final localAddr = parts[3];
    final remoteAddr = parts.length > 4 ? parts[4] : '';
    if (remoteAddr.isEmpty || remoteAddr == '*:*') {
      return null; // Skip listening sockets
    }

    final localParts = localAddr.split(':');
    final remoteParts = remoteAddr.split(':');
    if (localParts.length < 2 || remoteParts.length < 2) {
      return null;
    }

    // The port is the last ':'-separated piece; everything before it is the
    // address (which may itself contain ':').
    final localPort = int.tryParse(localParts.last);
    final remotePort = int.tryParse(remoteParts.last);
    if (localPort == null || remotePort == null) {
      return null;
    }
    return Connection(
      protocol: protocol,
      localIP: localParts.sublist(0, localParts.length - 1).join(':'),
      localPort: localPort,
      remoteIP: remoteParts.sublist(0, remoteParts.length - 1).join(':'),
      remotePort: remotePort,
    );
  }

  /// Whether [conn] has not been seen before; records it as seen.
  static bool _isNewConnection(Connection conn) => _markProcessed(conn.id);

  /// Whether the local endpoint of [conn] is one of the active peer IPs.
  static bool _isPeerConnection(Connection conn) {
    // Check if the connection is FROM a VPN peer (local IP is peer IP)
    return _activePeerIPs.contains(conn.localIP);
  }

  /// Logs [conn] and attempts a handshake capture for it.
  static Future<void> _handleNewConnection(Connection conn) async {
    print('🔍 New connection detected: $conn');
    try {
      // Attempt to capture handshake data
      await _captureHandshake(conn);
    } catch (e) {
      print('⚠️ Error capturing handshake for $conn: $e');
    }
  }

  /// Captures one packet from `conn.localIP` to `conn.remoteIP` on `wg0`
  /// (2 s limit) and analyses it with [_analyzeHandshake].
  static Future<void> _captureHandshake(Connection conn) async {
    try {
      print('📡 Capturing handshake for: $conn');
      // Use tcpdump to capture packets on WireGuard interface specifically
      final result = await _runTcpdump('2', [
        '-i', 'wg0', // Monitor WireGuard interface specifically
        '-c', '1', // Capture only 1 packet
        '-s', '200', // Capture first 200 bytes only
        '-x', // Output in hex
        'src ${conn.localIP} and dst ${conn.remoteIP}',
      ]);
      final exitCode = result.exitCode;
      final data = _capturedText(result);

      if (exitCode == 0 && data.isNotEmpty) {
        final preview =
            data.length > 200 ? '${data.substring(0, 200)}...' : data;
        print('✅ Handshake captured for $conn:');
        print(' Data (first 200 chars): $preview');
        // Analyze the handshake
        await _analyzeHandshake(data, conn);
      } else if (exitCode == 124) {
        print('⏱️ Handshake capture timeout for $conn (no packets in 2s)');
      } else {
        print('❌ Failed to capture handshake for $conn (exit code: $exitCode)');
      }
    } catch (e) {
      print('💥 Error in handshake capture: $e');
    }
  }

  /// Classifies the packet bytes contained in the `tcpdump -x` text
  /// [handshakeData] and logs a block/allow verdict for [conn].
  ///
  /// Uses the external `./protocol_analyzer` executable when present and
  /// [_basicProtocolAnalysis] otherwise. The closing banner is printed on
  /// every path.
  static Future<void> _analyzeHandshake(String handshakeData, Connection conn) async {
    print('════════════════ nDPI PROTOCOL ANALYSIS ════════════════');
    print('📍 Connection: $conn');
    try {
      // Extract hex bytes from tcpdump output
      final hexBytes = _extractHexBytes(handshakeData);
      if (hexBytes.isEmpty) {
        print('❌ No hex data found in tcpdump output');
        return;
      }
      print('🔢 Analyzing ${hexBytes.length} bytes of packet data');

      // First check if the protocol analyzer exists
      if (!await File('./protocol_analyzer').exists()) {
        print('⚠️ nDPI protocol analyzer not available - using basic pattern matching');
        await _basicProtocolAnalysis(hexBytes, conn);
        return;
      }

      // Call our C nDPI analyzer with the bytes as one hex string
      final result = await Process.run('./protocol_analyzer', [hexBytes.join()]);
      if (result.exitCode != 0) {
        print('❌ nDPI analyzer failed:');
        print(' Exit code: ${result.exitCode}');
        print(' Error: ${result.stderr}');
        return;
      }

      final analyzerOutput = result.stdout.toString().trim();
      print('✅ nDPI Analysis Results:');
      print(analyzerOutput);

      // Simple protocol extraction - look for the "protocol" and "category"
      // string fields in the analyzer's output.
      final protocolMatch =
          RegExp(r'"protocol":\s*"([^"]+)"').firstMatch(analyzerOutput);
      if (protocolMatch == null) {
        return;
      }
      final categoryMatch =
          RegExp(r'"category":\s*"([^"]+)"').firstMatch(analyzerOutput);
      final protocol = protocolMatch.group(1) ?? 'Unknown';
      final category = categoryMatch?.group(1) ?? 'Unknown';
      print('🎯 DETECTED: $protocol (Category: $category)');

      // Check if this is a protocol we want to block
      if (_shouldBlockProtocol(protocol, category)) {
        print('🚫 BLOCKING PROTOCOL: $protocol');
        // TODO: Implement blocking logic here
      } else {
        print('✅ ALLOWING PROTOCOL: $protocol');
      }
    } catch (e) {
      print('❌ Error running nDPI analyzer: $e');
    } finally {
      print('══════════════════════════════════════════════════════════════');
    }
  }

  /// Whether [protocol] or [category] is on the block lists.
  static bool _shouldBlockProtocol(String protocol, String category) {
    return _blockedProtocols.contains(protocol) ||
        _blockedCategories.contains(category);
  }

  /// Fallback classifier that looks for well-known byte/ASCII signatures in
  /// [hexBytes] and logs the verdict. [conn] is used for the SMTP port check.
  static Future<void> _basicProtocolAnalysis(List<String> hexBytes, Connection conn) async {
    print('🔍 Using basic pattern matching fallback');
    // Convert hex to ASCII for pattern matching
    final asciiData = _extractAsciiFromHex(hexBytes);
    final lowerAscii = asciiData.toLowerCase();
    final hexString = hexBytes.join('').toLowerCase();

    String? detectedProtocol;
    if (asciiData.contains('BitTorrent protocol') ||
        hexString.contains('13426974546f7272656e742070726f746f636f6c')) {
      // BitTorrent detection
      detectedProtocol = 'BitTorrent';
    } else if (asciiData.contains('SSH-2.0') || asciiData.contains('SSH-1.')) {
      // SSH detection
      detectedProtocol = 'SSH';
    } else if (lowerAscii.contains('get ') ||
        lowerAscii.contains('post ') ||
        lowerAscii.contains('http/')) {
      // HTTP detection
      detectedProtocol = 'HTTP';
    } else if (hexBytes.isNotEmpty && hexBytes.first.toLowerCase() == '16') {
      // TLS/SSL detection (0x16 = handshake record type)
      // NOTE(review): this inspects the first captured byte; if the dump
      // starts at the IP header rather than the payload this branch will
      // not fire - confirm against real captures.
      detectedProtocol = 'TLS/SSL';
    } else if (asciiData.contains('220 ') && conn.remotePort == 25) {
      // SMTP detection
      detectedProtocol = 'SMTP';
    }

    if (detectedProtocol == null) {
      final printable = asciiData.replaceAll(RegExp(r'[^\x20-\x7E]'), '.');
      // Clamp the sample: short captures have fewer than 50 characters.
      final sample =
          printable.length > 50 ? printable.substring(0, 50) : printable;
      print('❓ UNKNOWN PROTOCOL (basic analysis)');
      print('🔤 ASCII sample: $sample...');
      return;
    }

    print('🎯 BASIC DETECTION: $detectedProtocol');
    // Same block list as the nDPI path; no category is known here.
    if (_shouldBlockProtocol(detectedProtocol, '')) {
      print('🚫 BLOCKING PROTOCOL: $detectedProtocol');
    } else {
      print('✅ ALLOWING PROTOCOL: $detectedProtocol');
    }
  }

  /// Extracts the packet bytes from `tcpdump -x` text as two-character hex
  /// strings, in order of appearance.
  ///
  /// Only lines of the form `0xOFFSET:  hhhh hhhh ...` contribute. A trailing
  /// unpaired hex digit on a line is ignored.
  static List<String> _extractHexBytes(String tcpdumpOutput) {
    final hexBytes = <String>[];
    for (final match in _hexDumpLine.allMatches(tcpdumpOutput)) {
      final hexLine = (match.group(1) ?? '').replaceAll(RegExp(r'\s+'), '');
      for (var i = 0; i + 1 < hexLine.length; i += 2) {
        hexBytes.add(hexLine.substring(i, i + 2));
      }
    }
    return hexBytes;
  }

  /// Renders [hexBytes] as text, substituting `.` for every byte outside the
  /// printable ASCII range 32..126 (unparseable entries count as 0).
  static String _extractAsciiFromHex(List<String> hexBytes) {
    final buffer = StringBuffer();
    for (final hex in hexBytes) {
      final byte = int.tryParse(hex, radix: 16) ?? 0;
      buffer.write(
          (byte >= 32 && byte <= 126) ? String.fromCharCode(byte) : '.');
    }
    return buffer.toString();
  }

  /// Stops both timers and forgets all processed keys.
  static void dispose() {
    _monitorTimer?.cancel();
    _monitorTimer = null;
    _peerUpdateTimer?.cancel();
    _peerUpdateTimer = null;
    _processedConnections.clear();
    print('Protocol Blocking Service disposed');
  }
}