From 809d79cfc8f5f23be42421ba4d77914fd8db91e6 Mon Sep 17 00:00:00 2001 From: ImBenji Date: Sun, 23 Nov 2025 19:21:13 +0000 Subject: [PATCH] Enhance concurrency handling with improved ticket acquisition logic and logging --- dart/lib/concurrency.dart | 105 +++++++++++++++----------- dart/lib/dev_tools/watch_tickets.dart | 16 +++- dart/lib/header.dart | 1 + 3 files changed, 76 insertions(+), 46 deletions(-) diff --git a/dart/lib/concurrency.dart b/dart/lib/concurrency.dart index 9fd4810..1477eb4 100644 --- a/dart/lib/concurrency.dart +++ b/dart/lib/concurrency.dart @@ -25,15 +25,24 @@ void spawnTicket(RandomAccessFile file, { String? debugLabel, }) { + /* + Useful Functions + */ + + /// Logging function void log(String message) { String prefix = debugLabel != null ? "\x1B[38;5;208m[Ticket Spawner - $debugLabel]:\x1B[0m " : "\x1B[38;5;208m[Ticket Spawner]:\x1B[0m "; // print("$prefix$message"); } + + /// Sleep a bit - with variance - mainly used for heartbeats void tickSleep([int microsecondVariance = 10]) { - preciseSleep(Duration(microseconds: 100 + Random().nextInt(microsecondVariance))); + preciseSleep(Duration(microseconds: 500 + Random().nextInt(microsecondVariance))); } + + /// Exponential sleep function Map expSleepTracker = {}; void expSleep(String label) { int count = expSleepTracker[label] ?? 0; @@ -43,60 +52,72 @@ void spawnTicket(RandomAccessFile file, { expSleepTracker[label] = count + 1; } - // Reduce the chance of race conditions by adding a small random delay - tickSleep(100); - + // Get the header SweepstoreHeader header = SweepstoreHeader(file); SweepstoreConcurrencyHeader concurrencyHeader = SweepstoreConcurrencyHeader(header); - int? ticketIndex; - int myIdentifier = _randomId(); + /* + Ticket Acquisition + */ + SweepstoreWorkerTicket acquireTicket(int newIdentifier) { + int? ticketIndex; - // Try to acquire a ticket - (Acquire loop) - while (ticketIndex == null) { + while (true) { - for (int i = 0; i < concurrencyHeader.numberOfWorkers; i++) { + for (int i = 0; i < concurrencyHeader.numberOfWorkers; i++) { - SweepstoreWorkerTicket ticket = concurrencyHeader[i]; + SweepstoreWorkerTicket ticket = concurrencyHeader[i]; - if (!ticket.writable()) { - continue; - } + if (!ticket.writable()) { + continue; + } - int identifier = ticket.identifier; + int identifier = ticket.identifier; - bool identifier_unassigned = identifier == 0; - bool stale_heartbeat = (DateTime.now().millisecondsSinceEpoch32() - ticket.workerHeartbeat) > 2000; - bool is_free = ticket.ticketState == SweepstoreTicketState.FREE; + bool identifier_unassigned = identifier == 0; + bool stale_heartbeat = (DateTime.now().millisecondsSinceEpoch32() - ticket.workerHeartbeat) > 2000; + bool is_free = ticket.ticketState == SweepstoreTicketState.FREE; - if (identifier_unassigned && stale_heartbeat && is_free) { - ticket.write( - identifier: myIdentifier, - ticketState: SweepstoreTicketState.WAITING, - ); - ticketIndex = i; - log("Acquired ticket $ticketIndex with identifier $myIdentifier."); - break; + if (identifier_unassigned && stale_heartbeat && is_free) { + ticket.write( + identifier: newIdentifier, + workerHeartbeat: DateTime.now().millisecondsSinceEpoch32(), + ticketState: SweepstoreTicketState.WAITING, + ); + ticketIndex = i; + log("Acquired ticket $ticketIndex with identifier $newIdentifier."); + break; + + } } + preciseSleep(Duration(milliseconds: 1)); + + // Ensure we still own the ticket - if not, reset and try again + if (ticketIndex != null) { + SweepstoreWorkerTicket ticket = concurrencyHeader[ticketIndex]; + if (ticket.identifier != newIdentifier) { + log("Lost ticket $ticketIndex, retrying..."); + ticketIndex = null; + } else { + return ticket; + } + } + + expSleep("acquire_loop"); } - expSleep("acquire_loop"); - - - // Ensure we still own the ticket - if not, reset and try again - if (ticketIndex != null) { - SweepstoreWorkerTicket ticket = concurrencyHeader[ticketIndex]; - if (ticket.identifier != myIdentifier) { - log("Lost ticket $ticketIndex, retrying..."); - ticketIndex = null; - } - } + throw Exception("Failed to acquire ticket."); } + // Reduce the chance of race conditions by adding a small random delay + tickSleep(500); + + int myIdentifier = _randomId(); + // We have a ticket, set it up - SweepstoreWorkerTicket myTicket = concurrencyHeader[ticketIndex]; + SweepstoreWorkerTicket myTicket = acquireTicket(myIdentifier); myTicket.write( workerHeartbeat: DateTime.now().millisecondsSinceEpoch32(), ticketState: SweepstoreTicketState.WAITING, @@ -110,7 +131,7 @@ void spawnTicket(RandomAccessFile file, { // Check we still own the ticket if (myTicket.identifier != myIdentifier) { - String exceptionMessage = "CRITICAL: Lost ownership of ticket $ticketIndex, was expecting identifier $myIdentifier but found ${myTicket.identifier}."; + String exceptionMessage = "CRITICAL: Lost ownership of ticket ${myTicket.ticketIndex}, was expecting identifier $myIdentifier but found ${myTicket.identifier}."; throw Exception(exceptionMessage); } @@ -130,15 +151,13 @@ void spawnTicket(RandomAccessFile file, { tickSleep(); // Update heartbeat - if (DateTime.now().millisecondsSinceEpoch32() != myTicket.workerHeartbeat) { + int now = DateTime.now().millisecondsSinceEpoch32(); + if (now - myTicket.workerHeartbeat > 700) { myTicket.write( - workerHeartbeat: myTicket.workerHeartbeat, + workerHeartbeat: DateTime.now().millisecondsSinceEpoch32() ); } } - - - } // Master side diff --git a/dart/lib/dev_tools/watch_tickets.dart b/dart/lib/dev_tools/watch_tickets.dart index 42665a2..98ce7c4 100644 --- a/dart/lib/dev_tools/watch_tickets.dart +++ b/dart/lib/dev_tools/watch_tickets.dart @@ -22,8 +22,15 @@ void main() async { final file = File('example.bin'); if (await file.exists()) { - final raf = await file.open(mode: FileMode.read); + // Check file size first + int fileSize = await file.length(); + if (fileSize < 48) { + print('Error: example.bin too small ($fileSize bytes) - Refresh #$refreshCount'); + await Future.delayed(Duration(seconds: 1)); + continue; + } + final raf = await file.open(mode: FileMode.read); try { final header = SweepstoreHeader(raf); @@ -71,7 +78,10 @@ void main() async { // updat previous master heartbeat previousMasterHeartbeat = concurrency.masterHeartbeat; - print('--- Refreshing in 1 seconds ---'); + print('--- Refreshing in 1 second ---'); + } catch (e) { + print('Error reading file: $e'); + print('File may be in inconsistent state, retrying...'); } finally { await raf.close(); } @@ -79,6 +89,6 @@ void main() async { print('Error: example.bin not found - Refresh #$refreshCount'); } - await Future.delayed(Duration(milliseconds: 1)); + await Future.delayed(Duration(seconds: 1)); } } \ No newline at end of file diff --git a/dart/lib/header.dart b/dart/lib/header.dart index f0da975..45b8340 100644 --- a/dart/lib/header.dart +++ b/dart/lib/header.dart @@ -337,6 +337,7 @@ class SweepstoreWorkerTicket { _concurrencyHeader._header._file.setPositionSync(_baseOffset); _concurrencyHeader._header._file.writeFromSync(buffer.toUint8List()); + _concurrencyHeader._header._file.flushSync(); } finally { _concurrencyHeader._header._file.unlockSync(_baseOffset, _baseOffset + ticketSize);