import 'dart:io';
import 'dart:isolate';
import 'dart:math';

import 'package:sweepstore/header.dart';
import 'package:sweepstore/helpers.dart';
import 'package:sweepstore/structures.dart';

/// Age, in milliseconds, beyond which a ticket heartbeat is considered stale.
const int STALE_HEARTBEAT_THRESHOLD_MS = 5000; // 5 seconds

/// Generates a strictly positive 31-bit identifier for a worker ticket.
///
/// The current 32-bit millisecond timestamp is XOR-ed with a random value for
/// better uniqueness, and the sign bit is masked off to avoid signed-int
/// issues when the value is stored.
///
/// The result is never `0`: ticket acquisition treats an identifier of `0` as
/// "unassigned", so a zero id would make an owned ticket look free.
int _randomId() {
  final int time = DateTime.now().millisecondsSinceEpoch32();
  final int random = Random().nextInt(0x80000000);
  final int id = (time ^ random) & 0x7FFFFFFF;
  return id == 0 ? 1 : id;
}

/// Spawns a ticket for a worker to perform an operation.
///
/// Claims a free ticket slot in the concurrency header of [file], publishes
/// the requested [operation], [keyHash] and [writeSize] on it, then polls the
/// ticket until its state becomes `APPROVED`. Once approved, the ticket is
/// marked `EXECUTING`, [onApproved] is invoked, and the ticket is marked
/// `COMPLETED`.
///
/// While waiting, the ticket's heartbeat is refreshed whenever the last
/// observed heartbeat is more than 700 ms old.
///
/// [debugLabel] is only used by the (currently disabled) logger.
///
/// Throws an [Exception] if the ticket's identifier changes while waiting for
/// approval. Anything thrown by [onApproved] propagates to the caller after
/// the ticket has been marked `COMPLETED`.
void spawnTicket(RandomAccessFile file, {
  required SweepstoreTicketOperation operation,
  required int keyHash,
  required int writeSize,
  required void Function() onApproved,
  String? debugLabel,
}) {
  /* Useful Functions */

  // Upper bound for a single exponential back-off sleep.
  const int maxBackoffMs = 1000;
  // 1 << 10 == 1024 already exceeds maxBackoffMs; bounding the exponent also
  // keeps the shift from overflowing after many retries.
  const int maxBackoffExponent = 10;

  // One generator reused for all sleep jitter in this call.
  final Random random = Random();

  /// Logging function (output currently disabled).
  void log(String message) {
    // To re-enable logging, uncomment the lines below.
    // String prefix = debugLabel != null ? "\x1B[38;5;208m[Ticket Spawner - $debugLabel]:\x1B[0m " : "\x1B[38;5;208m[Ticket Spawner]:\x1B[0m ";
    // print("$prefix$message");
  }

  /// Sleeps for 500 microseconds plus a random jitter below
  /// [microsecondVariance] - mainly used for heartbeats.
  void tickSleep([int microsecondVariance = 10]) {
    final int jitter = random.nextInt(microsecondVariance);
    preciseSleep(Duration(microseconds: 500 + jitter));
  }

  /// Exponential back-off sleep, tracked independently per [label].
  ///
  /// The delay doubles on every call for the same label (1 ms, 2 ms, 4 ms, ...)
  /// and is clamped to [maxBackoffMs].
  final Map<String, int> expSleepTracker = {};
  void expSleep(String label) {
    final int attempts = expSleepTracker[label] ?? 0;
    final int sleepMs = min(1 << min(attempts, maxBackoffExponent), maxBackoffMs);
    preciseSleep(Duration(milliseconds: sleepMs));
    expSleepTracker[label] = attempts + 1;
  }

  // Get the header
  final SweepstoreHeader header = SweepstoreHeader(file);
  final SweepstoreConcurrencyHeader concurrencyHeader = SweepstoreConcurrencyHeader(header);

  /* Ticket Acquisition */

  /// Claims a free ticket slot for [newIdentifier] and returns it.
  ///
  /// Scans every slot and claims the first writable one that is unassigned
  /// (identifier `0`), has a stale heartbeat and is in the `FREE` state. After
  /// a short pause the slot is re-read to confirm the identifier is still
  /// ours; if it is not, or no slot was available, the scan is retried after
  /// an exponential back-off. Loops until a ticket is acquired.
  SweepstoreWorkerTicket acquireTicket(int newIdentifier) {
    while (true) {
      int? ticketIndex;

      for (int i = 0; i < concurrencyHeader.numberOfWorkers; i++) {
        final SweepstoreWorkerTicket ticket = concurrencyHeader[i];
        if (!ticket.writable()) {
          continue;
        }

        final SweepstoreWorkerTicketSnapshot ticketSnapshot = ticket.snapshot();
        final int heartbeatAge =
            DateTime.now().millisecondsSinceEpoch32() - ticketSnapshot.workerHeartbeat;

        final bool identifierUnassigned = ticketSnapshot.identifier == 0;
        final bool staleHeartbeat = heartbeatAge > STALE_HEARTBEAT_THRESHOLD_MS;
        final bool isFree = ticketSnapshot.ticketState == SweepstoreTicketState.FREE;

        if (identifierUnassigned && staleHeartbeat && isFree) {
          ticket.write(
            identifier: newIdentifier,
            workerHeartbeat: DateTime.now().millisecondsSinceEpoch32(),
            ticketState: SweepstoreTicketState.WAITING,
          );
          ticketIndex = i;
          log("Acquired ticket $ticketIndex with identifier $newIdentifier.");
          break;
        }
      }

      preciseSleep(Duration(milliseconds: 2));

      // Ensure we still own the ticket - if not, fall through and try again.
      if (ticketIndex != null) {
        final SweepstoreWorkerTicketSnapshot verifySnapshot =
            concurrencyHeader[ticketIndex].snapshot();
        if (verifySnapshot.identifier == newIdentifier) {
          return concurrencyHeader[ticketIndex];
        }
        log("Lost ticket $ticketIndex, retrying...");
      }

      expSleep("acquire_loop");
    }
  }

  // Reduce the chance of race conditions by adding a small random delay
  tickSleep(500);

  final int myIdentifier = _randomId();

  // We have a ticket, set it up
  final SweepstoreWorkerTicket myTicket = acquireTicket(myIdentifier);
  myTicket.write(
    workerHeartbeat: DateTime.now().millisecondsSinceEpoch32(),
    ticketState: SweepstoreTicketState.WAITING,
    ticketOperation: operation,
    keyHash: keyHash,
    writeSize: writeSize,
  );

  // Wait for approval - (Approval loop)
  while (true) {
    final SweepstoreWorkerTicketSnapshot snapshot = myTicket.snapshot();

    // Check we still own the ticket
    if (snapshot.identifier != myIdentifier) {
      String exceptionMessage = "CRITICAL: Lost ownership of ticket ${myTicket.ticketIndex}, was expecting identifier $myIdentifier but found ${snapshot.identifier}.";
      throw Exception(exceptionMessage);
    }

    if (snapshot.ticketState == SweepstoreTicketState.APPROVED) {
      myTicket.write(
        ticketState: SweepstoreTicketState.EXECUTING,
      );
      try {
        onApproved();
      } finally {
        // Always mark the ticket COMPLETED, even when the callback throws:
        // the master listener only resets COMPLETED tickets, so a ticket left
        // in EXECUTING would never be freed.
        myTicket.write(
          ticketState: SweepstoreTicketState.COMPLETED,
        );
      }
      break;
    }

    // randomSleep(10);
    tickSleep();

    // Update heartbeat
    final int now = DateTime.now().millisecondsSinceEpoch32();
    if (now - snapshot.workerHeartbeat > 700) {
      myTicket.write(
        workerHeartbeat: now,
      );
    }
  }
}

// Master side

/// Starts the master listener in a separate isolate.
///
/// The isolate reopens the store by the path of [file] (in
/// [FileMode.append]) and loops forever over every ticket slot in the
/// concurrency header:
///
/// * a `WAITING` ticket is set to `APPROVED`;
/// * a `COMPLETED` ticket is reset to `FREE` with its identifier, heartbeat,
///   operation, key hash and write size cleared.
///
/// Each full pass is followed by a 1 ms sleep. The spawn is not awaited, so
/// this function returns without waiting for the isolate to start.
void initialiseMasterListener(RandomAccessFile file) async {
  String filePath = file.path;

  Isolate.spawn((_) {
    /// Logging function (output currently disabled).
    void log(String message) {
      // print("\x1B[38;5;82m[Master Listener]:\x1B[0m $message");
    }

    // The isolate opens its own handle from the path instead of using the
    // caller's RandomAccessFile.
    final RandomAccessFile masterFile = File(filePath).openSync(mode: FileMode.append);

    final SweepstoreHeader header = SweepstoreHeader(masterFile);
    final SweepstoreConcurrencyHeader concurrencyHeader = SweepstoreConcurrencyHeader(header);

    while (true) {
      for (int i = 0; i < concurrencyHeader.numberOfWorkers; i++) {
        final SweepstoreWorkerTicket ticket = concurrencyHeader[i];
        final SweepstoreWorkerTicketSnapshot snapshot = ticket.snapshot();

        if (snapshot.ticketState == SweepstoreTicketState.WAITING) {
          log("Found waiting ticket $i (Key Hash: ${snapshot.keyHash})...");

          // Approve the ticket
          ticket.write(
            ticketState: SweepstoreTicketState.APPROVED,
          );
          log("Approved ticket $i.");
        } else if (snapshot.ticketState == SweepstoreTicketState.COMPLETED) {
          log("Ticket $i completed. Resetting ticket...");

          // Reset the ticket
          ticket.write(
            identifier: 0,
            workerHeartbeat: 0,
            ticketState: SweepstoreTicketState.FREE,
            ticketOperation: SweepstoreTicketOperation.NONE,
            keyHash: 0,
            writeSize: 0,
          );
          log("Reset ticket $i.");
        }
      }

      preciseSleep(Duration(milliseconds: 1));
    }
  }, null);
}