Add Protocol Blocking Service to monitor and capture new connections

This commit is contained in:
ImBenji
2025-08-28 17:01:48 +01:00
parent c294bc5c08
commit 022c9b6339
2 changed files with 233 additions and 0 deletions

View File

@@ -10,6 +10,7 @@ import 'package:waylume_server/services/server_service.dart';
import 'package:waylume_server/services/wireguard_service.dart'; import 'package:waylume_server/services/wireguard_service.dart';
import 'package:waylume_server/services/vpn_session_service.dart'; import 'package:waylume_server/services/vpn_session_service.dart';
import 'package:waylume_server/services/vpn_session_monitor.dart'; import 'package:waylume_server/services/vpn_session_monitor.dart';
import 'package:waylume_server/services/protocol_blocking_service.dart';
import 'package:waylume_server/core/utils.dart'; import 'package:waylume_server/core/utils.dart';
import 'package:waylume_server/web/peer_routes.dart'; import 'package:waylume_server/web/peer_routes.dart';
import 'package:waylume_server/config/supabase_config.dart'; import 'package:waylume_server/config/supabase_config.dart';
@@ -54,6 +55,9 @@ void main() async {
await WireGuardService.initializeServer(); await WireGuardService.initializeServer();
initHeartbeat(); initHeartbeat();
initVpnSessionMonitor(); initVpnSessionMonitor();
// Initialize protocol blocking service
ProtocolBlockingService.initialize();
if (!Platform.isMacOS) { if (!Platform.isMacOS) {
// await ServerService.isolatePeers(); // await ServerService.isolatePeers();

View File

@@ -0,0 +1,229 @@
import 'dart:async';
import 'dart:io';
class Connection {
final String protocol;
final String localIP;
final int localPort;
final String remoteIP;
final int remotePort;
Connection({
required this.protocol,
required this.localIP,
required this.localPort,
required this.remoteIP,
required this.remotePort,
});
String get id => '$protocol:$localIP:$localPort->$remoteIP:$remotePort';
@override
String toString() {
return '$protocol $localIP:$localPort -> $remoteIP:$remotePort';
}
}
class ProtocolBlockingService {
static Timer? _monitorTimer;
static final Set<String> _processedConnections = {};
static void initialize() {
print('Initializing Protocol Blocking Service...');
_startConnectionMonitoring();
}
static void _startConnectionMonitoring() {
_monitorTimer = Timer.periodic(Duration(milliseconds: 100), (_) async {
await _scanForNewConnections();
});
}
static Future<void> _scanForNewConnections() async {
try {
// Monitor both TCP and UDP connections
final tcpResult = await Process.run('ss', ['-tnp', 'state', 'syn-sent,established']);
final udpResult = await Process.run('ss', ['-unp']);
final tcpConnections = _parseConnections(tcpResult.stdout.toString(), 'tcp');
final udpConnections = _parseConnections(udpResult.stdout.toString(), 'udp');
for (final conn in [...tcpConnections, ...udpConnections]) {
if (_isNewConnection(conn)) {
await _handleNewConnection(conn);
}
}
} catch (e) {
print('Error scanning for connections: $e');
}
}
static List<Connection> _parseConnections(String output, String protocol) {
final connections = <Connection>[];
final lines = output.split('\n');
for (final line in lines) {
try {
final conn = _parseConnectionLine(line.trim(), protocol);
if (conn != null) {
connections.add(conn);
}
} catch (e) {
// Skip malformed lines
}
}
return connections;
}
static Connection? _parseConnectionLine(String line, String protocol) {
if (line.isEmpty || line.startsWith('Netid') || line.startsWith('State')) {
return null;
}
// Split by whitespace and filter empty strings
final parts = line.split(RegExp(r'\s+')).where((s) => s.isNotEmpty).toList();
if (parts.length < 4) {
return null;
}
try {
// For TCP: State Recv-Q Send-Q Local_Address:Port Peer_Address:Port
// For UDP: State Recv-Q Send-Q Local_Address:Port Peer_Address:Port
final localAddr = parts[3];
final remoteAddr = parts.length > 4 ? parts[4] : '';
if (remoteAddr.isEmpty || remoteAddr == '*:*') {
return null; // Skip listening sockets
}
final localParts = localAddr.split(':');
final remoteParts = remoteAddr.split(':');
if (localParts.length < 2 || remoteParts.length < 2) {
return null;
}
final localIP = localParts.sublist(0, localParts.length - 1).join(':');
final localPort = int.parse(localParts.last);
final remoteIP = remoteParts.sublist(0, remoteParts.length - 1).join(':');
final remotePort = int.parse(remoteParts.last);
return Connection(
protocol: protocol,
localIP: localIP,
localPort: localPort,
remoteIP: remoteIP,
remotePort: remotePort,
);
} catch (e) {
return null;
}
}
static bool _isNewConnection(Connection conn) {
final id = conn.id;
if (_processedConnections.contains(id)) {
return false;
}
_processedConnections.add(id);
// Clean up old connections periodically (keep last 1000)
if (_processedConnections.length > 1000) {
final toRemove = _processedConnections.length - 800;
final oldConnections = _processedConnections.take(toRemove).toList();
for (final old in oldConnections) {
_processedConnections.remove(old);
}
}
return true;
}
static Future<void> _handleNewConnection(Connection conn) async {
print('🔍 New connection detected: $conn');
try {
// Attempt to capture handshake data
await _captureHandshake(conn);
} catch (e) {
print('⚠️ Error capturing handshake for $conn: $e');
}
}
static Future<void> _captureHandshake(Connection conn) async {
try {
print('📡 Capturing handshake for: $conn');
// Use tcpdump to capture only the first data packet (handshake)
final protocol = conn.protocol;
final process = await Process.start('timeout', [
'2', // 2 second timeout
'tcpdump',
'-i', 'any',
'-c', '1', // Capture only 1 packet
'-s', '200', // Capture first 200 bytes only
'-x', // Output in hex
'$protocol and host ${conn.remoteIP} and port ${conn.remotePort}',
]);
final handshakeData = <String>[];
await for (final data in process.stdout) {
handshakeData.add(String.fromCharCodes(data));
}
final exitCode = await process.exitCode;
if (exitCode == 0 && handshakeData.isNotEmpty) {
final data = handshakeData.join();
print('✅ Handshake captured for $conn:');
print(' Data (first 200 chars): ${data.length > 200 ? data.substring(0, 200) + "..." : data}');
// Analyze the handshake
_analyzeHandshake(data, conn);
} else if (exitCode == 124) {
print('⏱️ Handshake capture timeout for $conn (no packets in 2s)');
} else {
print('❌ Failed to capture handshake for $conn (exit code: $exitCode)');
}
process.kill();
} catch (e) {
print('💥 Error in handshake capture: $e');
}
}
static void _analyzeHandshake(String handshakeData, Connection conn) {
// Simple pattern detection for now
final data = handshakeData.toLowerCase();
String? detectedProtocol;
if (data.contains('bittorrent protocol') || data.contains('13426974546f7272656e742070726f746f636f6c')) {
detectedProtocol = 'BitTorrent';
} else if (data.contains('ssh-2.0') || data.contains('ssh-1.')) {
detectedProtocol = 'SSH';
} else if (data.contains('get ') || data.contains('post ') || data.contains('http/')) {
detectedProtocol = 'HTTP';
} else if (data.contains('220 ') && conn.remotePort == 25) {
detectedProtocol = 'SMTP';
} else if (data.contains('220 ') && conn.remotePort == 21) {
detectedProtocol = 'FTP';
}
if (detectedProtocol != null) {
print('🎯 PROTOCOL DETECTED: $detectedProtocol for $conn');
} else {
print('❓ Unknown protocol for $conn');
}
}
static void dispose() {
_monitorTimer?.cancel();
_monitorTimer = null;
_processedConnections.clear();
print('Protocol Blocking Service disposed');
}
}