Enhance concurrency handling with improved ticket acquisition logic and logging

This commit is contained in:
ImBenji
2025-11-23 19:21:13 +00:00
parent f5a23fa5c4
commit 809d79cfc8
3 changed files with 76 additions and 46 deletions

View File

@@ -25,15 +25,24 @@ void spawnTicket(RandomAccessFile file, {
String? debugLabel, String? debugLabel,
}) { }) {
/*
Useful Functions
*/
/// Logging function
void log(String message) { void log(String message) {
String prefix = debugLabel != null ? "\x1B[38;5;208m[Ticket Spawner - $debugLabel]:\x1B[0m " : "\x1B[38;5;208m[Ticket Spawner]:\x1B[0m "; String prefix = debugLabel != null ? "\x1B[38;5;208m[Ticket Spawner - $debugLabel]:\x1B[0m " : "\x1B[38;5;208m[Ticket Spawner]:\x1B[0m ";
// print("$prefix$message"); // print("$prefix$message");
} }
/// Sleep a bit - with variance - mainly used for heartbeats
void tickSleep([int microsecondVariance = 10]) { void tickSleep([int microsecondVariance = 10]) {
preciseSleep(Duration(microseconds: 100 + Random().nextInt(microsecondVariance))); preciseSleep(Duration(microseconds: 500 + Random().nextInt(microsecondVariance)));
} }
/// Exponential sleep function
Map<String, int> expSleepTracker = {}; Map<String, int> expSleepTracker = {};
void expSleep(String label) { void expSleep(String label) {
int count = expSleepTracker[label] ?? 0; int count = expSleepTracker[label] ?? 0;
@@ -43,60 +52,72 @@ void spawnTicket(RandomAccessFile file, {
expSleepTracker[label] = count + 1; expSleepTracker[label] = count + 1;
} }
// Reduce the chance of race conditions by adding a small random delay // Get the header
tickSleep(100);
SweepstoreHeader header = SweepstoreHeader(file); SweepstoreHeader header = SweepstoreHeader(file);
SweepstoreConcurrencyHeader concurrencyHeader = SweepstoreConcurrencyHeader(header); SweepstoreConcurrencyHeader concurrencyHeader = SweepstoreConcurrencyHeader(header);
int? ticketIndex; /*
int myIdentifier = _randomId(); Ticket Acquisition
*/
SweepstoreWorkerTicket acquireTicket(int newIdentifier) {
int? ticketIndex;
// Try to acquire a ticket - (Acquire loop) while (true) {
while (ticketIndex == null) {
for (int i = 0; i < concurrencyHeader.numberOfWorkers; i++) { for (int i = 0; i < concurrencyHeader.numberOfWorkers; i++) {
SweepstoreWorkerTicket ticket = concurrencyHeader[i]; SweepstoreWorkerTicket ticket = concurrencyHeader[i];
if (!ticket.writable()) { if (!ticket.writable()) {
continue; continue;
} }
int identifier = ticket.identifier; int identifier = ticket.identifier;
bool identifier_unassigned = identifier == 0; bool identifier_unassigned = identifier == 0;
bool stale_heartbeat = (DateTime.now().millisecondsSinceEpoch32() - ticket.workerHeartbeat) > 2000; bool stale_heartbeat = (DateTime.now().millisecondsSinceEpoch32() - ticket.workerHeartbeat) > 2000;
bool is_free = ticket.ticketState == SweepstoreTicketState.FREE; bool is_free = ticket.ticketState == SweepstoreTicketState.FREE;
if (identifier_unassigned && stale_heartbeat && is_free) { if (identifier_unassigned && stale_heartbeat && is_free) {
ticket.write( ticket.write(
identifier: myIdentifier, identifier: newIdentifier,
ticketState: SweepstoreTicketState.WAITING, workerHeartbeat: DateTime.now().millisecondsSinceEpoch32(),
); ticketState: SweepstoreTicketState.WAITING,
ticketIndex = i; );
log("Acquired ticket $ticketIndex with identifier $myIdentifier."); ticketIndex = i;
break; log("Acquired ticket $ticketIndex with identifier $newIdentifier.");
break;
}
} }
preciseSleep(Duration(milliseconds: 1));
// Ensure we still own the ticket - if not, reset and try again
if (ticketIndex != null) {
SweepstoreWorkerTicket ticket = concurrencyHeader[ticketIndex];
if (ticket.identifier != newIdentifier) {
log("Lost ticket $ticketIndex, retrying...");
ticketIndex = null;
} else {
return ticket;
}
}
expSleep("acquire_loop");
} }
expSleep("acquire_loop"); throw Exception("Failed to acquire ticket.");
// Ensure we still own the ticket - if not, reset and try again
if (ticketIndex != null) {
SweepstoreWorkerTicket ticket = concurrencyHeader[ticketIndex];
if (ticket.identifier != myIdentifier) {
log("Lost ticket $ticketIndex, retrying...");
ticketIndex = null;
}
}
} }
// Reduce the chance of race conditions by adding a small random delay
tickSleep(500);
int myIdentifier = _randomId();
// We have a ticket, set it up // We have a ticket, set it up
SweepstoreWorkerTicket myTicket = concurrencyHeader[ticketIndex]; SweepstoreWorkerTicket myTicket = acquireTicket(myIdentifier);
myTicket.write( myTicket.write(
workerHeartbeat: DateTime.now().millisecondsSinceEpoch32(), workerHeartbeat: DateTime.now().millisecondsSinceEpoch32(),
ticketState: SweepstoreTicketState.WAITING, ticketState: SweepstoreTicketState.WAITING,
@@ -110,7 +131,7 @@ void spawnTicket(RandomAccessFile file, {
// Check we still own the ticket // Check we still own the ticket
if (myTicket.identifier != myIdentifier) { if (myTicket.identifier != myIdentifier) {
String exceptionMessage = "CRITICAL: Lost ownership of ticket $ticketIndex, was expecting identifier $myIdentifier but found ${myTicket.identifier}."; String exceptionMessage = "CRITICAL: Lost ownership of ticket ${myTicket.ticketIndex}, was expecting identifier $myIdentifier but found ${myTicket.identifier}.";
throw Exception(exceptionMessage); throw Exception(exceptionMessage);
} }
@@ -130,15 +151,13 @@ void spawnTicket(RandomAccessFile file, {
tickSleep(); tickSleep();
// Update heartbeat // Update heartbeat
if (DateTime.now().millisecondsSinceEpoch32() != myTicket.workerHeartbeat) { int now = DateTime.now().millisecondsSinceEpoch32();
if (now - myTicket.workerHeartbeat > 700) {
myTicket.write( myTicket.write(
workerHeartbeat: myTicket.workerHeartbeat, workerHeartbeat: DateTime.now().millisecondsSinceEpoch32()
); );
} }
} }
} }
// Master side // Master side

View File

@@ -22,8 +22,15 @@ void main() async {
final file = File('example.bin'); final file = File('example.bin');
if (await file.exists()) { if (await file.exists()) {
final raf = await file.open(mode: FileMode.read); // Check file size first
int fileSize = await file.length();
if (fileSize < 48) {
print('Error: example.bin too small ($fileSize bytes) - Refresh #$refreshCount');
await Future.delayed(Duration(seconds: 1));
continue;
}
final raf = await file.open(mode: FileMode.read);
try { try {
final header = SweepstoreHeader(raf); final header = SweepstoreHeader(raf);
@@ -71,7 +78,10 @@ void main() async {
// updat previous master heartbeat // updat previous master heartbeat
previousMasterHeartbeat = concurrency.masterHeartbeat; previousMasterHeartbeat = concurrency.masterHeartbeat;
print('--- Refreshing in 1 seconds ---'); print('--- Refreshing in 1 second ---');
} catch (e) {
print('Error reading file: $e');
print('File may be in inconsistent state, retrying...');
} finally { } finally {
await raf.close(); await raf.close();
} }
@@ -79,6 +89,6 @@ void main() async {
print('Error: example.bin not found - Refresh #$refreshCount'); print('Error: example.bin not found - Refresh #$refreshCount');
} }
await Future.delayed(Duration(milliseconds: 1)); await Future.delayed(Duration(seconds: 1));
} }
} }

View File

@@ -337,6 +337,7 @@ class SweepstoreWorkerTicket {
_concurrencyHeader._header._file.setPositionSync(_baseOffset); _concurrencyHeader._header._file.setPositionSync(_baseOffset);
_concurrencyHeader._header._file.writeFromSync(buffer.toUint8List()); _concurrencyHeader._header._file.writeFromSync(buffer.toUint8List());
_concurrencyHeader._header._file.flushSync();
} finally { } finally {
_concurrencyHeader._header._file.unlockSync(_baseOffset, _baseOffset + ticketSize); _concurrencyHeader._header._file.unlockSync(_baseOffset, _baseOffset + ticketSize);