From e6ccad87b42c80750c3251eb791796e199d49232 Mon Sep 17 00:00:00 2001 From: ImBenji Date: Sun, 23 Nov 2025 21:02:40 +0000 Subject: [PATCH] Refactor logging and ticket ownership verification in concurrency handling for improved clarity and reliability --- dart/lib/concurrency.dart | 22 +++++++++++++++++----- dart/lib/header.dart | 4 ++-- 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/dart/lib/concurrency.dart b/dart/lib/concurrency.dart index c258fab..1b6939b 100644 --- a/dart/lib/concurrency.dart +++ b/dart/lib/concurrency.dart @@ -36,7 +36,7 @@ void spawnTicket(RandomAccessFile file, { void log(String message) { String prefix = debugLabel != null ? "\x1B[38;5;208m[Ticket Spawner - $debugLabel]:\x1B[0m " : "\x1B[38;5;208m[Ticket Spawner]:\x1B[0m "; - // print("$prefix$message"); + print("$prefix$message"); } @@ -90,7 +90,7 @@ void spawnTicket(RandomAccessFile file, { ticketState: SweepstoreTicketState.WAITING, ); ticketIndex = i; - log("Acquired ticket $ticketIndex with identifier $newIdentifier."); + // log("Acquired ticket $ticketIndex with identifier $newIdentifier."); break; } @@ -104,7 +104,7 @@ void spawnTicket(RandomAccessFile file, { SweepstoreWorkerTicketSnapshot verifySnapshot = concurrencyHeader[ticketIndex].snapshot(); if (verifySnapshot.identifier != newIdentifier) { - log("Lost ticket $ticketIndex, retrying..."); + // log("Lost ticket $ticketIndex, retrying..."); ticketIndex = null; } else { return concurrencyHeader[ticketIndex]; @@ -139,8 +139,20 @@ void spawnTicket(RandomAccessFile file, { // Check we still own the ticket if (snapshot.identifier != myIdentifier) { - String exceptionMessage = "CRITICAL: Lost ownership of ticket ${myTicket.ticketIndex}, was expecting identifier $myIdentifier but found ${snapshot.identifier}."; - throw Exception(exceptionMessage); + + preciseSleep(Duration(milliseconds: 10)); + + // Re-verify we lost the ticket + SweepstoreWorkerTicketSnapshot recheckSnapshot = myTicket.snapshot(); + if (recheckSnapshot.identifier != myIdentifier) { + String exceptionMessage = "CRITICAL: Lost ownership of ticket ${myTicket.ticketIndex}, was expecting identifier $myIdentifier but found ${snapshot.identifier}."; + // throw Exception(exceptionMessage); + log(exceptionMessage); + return spawnTicket(file, operation: operation, keyHash: keyHash, writeSize: writeSize, onApproved: onApproved); + } + + // Nvm, false alarm + log("False alarm, still own ticket ${myTicket.ticketIndex}."); } if (snapshot.ticketState == SweepstoreTicketState.APPROVED) { diff --git a/dart/lib/header.dart b/dart/lib/header.dart index 85b8e46..ce27781 100644 --- a/dart/lib/header.dart +++ b/dart/lib/header.dart @@ -362,12 +362,12 @@ class SweepstoreWorkerTicket { SweepstoreWorkerTicketSnapshot snapshot() { - // _concurrencyHeader._header._file.lockSync(FileLock.blockingShared, _baseOffset, _baseOffset + ticketSize); + _concurrencyHeader._header._file.lockSync(FileLock.blockingShared, _baseOffset, _baseOffset + ticketSize); _concurrencyHeader._header._file.setPositionSync(_baseOffset); List existingBuffer = _concurrencyHeader._header._file.readSync(ticketSize); RandomAccessMemory buffer = RandomAccessMemory(existingBuffer); - // _concurrencyHeader._header._file.unlockSync(_baseOffset, _baseOffset + ticketSize); + _concurrencyHeader._header._file.unlockSync(_baseOffset, _baseOffset + ticketSize); buffer.setPositionSync(0); int identifier = buffer.readIntSync(4);