Refactor logging and ticket ownership verification in concurrency handling for improved clarity and reliability
This commit is contained in:
@@ -36,7 +36,7 @@ void spawnTicket(RandomAccessFile file, {
|
|||||||
void log(String message) {
|
void log(String message) {
|
||||||
|
|
||||||
String prefix = debugLabel != null ? "\x1B[38;5;208m[Ticket Spawner - $debugLabel]:\x1B[0m " : "\x1B[38;5;208m[Ticket Spawner]:\x1B[0m ";
|
String prefix = debugLabel != null ? "\x1B[38;5;208m[Ticket Spawner - $debugLabel]:\x1B[0m " : "\x1B[38;5;208m[Ticket Spawner]:\x1B[0m ";
|
||||||
// print("$prefix$message");
|
print("$prefix$message");
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -90,7 +90,7 @@ void spawnTicket(RandomAccessFile file, {
|
|||||||
ticketState: SweepstoreTicketState.WAITING,
|
ticketState: SweepstoreTicketState.WAITING,
|
||||||
);
|
);
|
||||||
ticketIndex = i;
|
ticketIndex = i;
|
||||||
log("Acquired ticket $ticketIndex with identifier $newIdentifier.");
|
// log("Acquired ticket $ticketIndex with identifier $newIdentifier.");
|
||||||
break;
|
break;
|
||||||
|
|
||||||
}
|
}
|
||||||
@@ -104,7 +104,7 @@ void spawnTicket(RandomAccessFile file, {
|
|||||||
SweepstoreWorkerTicketSnapshot verifySnapshot = concurrencyHeader[ticketIndex].snapshot();
|
SweepstoreWorkerTicketSnapshot verifySnapshot = concurrencyHeader[ticketIndex].snapshot();
|
||||||
|
|
||||||
if (verifySnapshot.identifier != newIdentifier) {
|
if (verifySnapshot.identifier != newIdentifier) {
|
||||||
log("Lost ticket $ticketIndex, retrying...");
|
// log("Lost ticket $ticketIndex, retrying...");
|
||||||
ticketIndex = null;
|
ticketIndex = null;
|
||||||
} else {
|
} else {
|
||||||
return concurrencyHeader[ticketIndex];
|
return concurrencyHeader[ticketIndex];
|
||||||
@@ -139,8 +139,20 @@ void spawnTicket(RandomAccessFile file, {
|
|||||||
|
|
||||||
// Check we still own the ticket
|
// Check we still own the ticket
|
||||||
if (snapshot.identifier != myIdentifier) {
|
if (snapshot.identifier != myIdentifier) {
|
||||||
String exceptionMessage = "CRITICAL: Lost ownership of ticket ${myTicket.ticketIndex}, was expecting identifier $myIdentifier but found ${snapshot.identifier}.";
|
|
||||||
throw Exception(exceptionMessage);
|
preciseSleep(Duration(milliseconds: 10));
|
||||||
|
|
||||||
|
// Re-verify we lost the ticket
|
||||||
|
SweepstoreWorkerTicketSnapshot recheckSnapshot = myTicket.snapshot();
|
||||||
|
if (recheckSnapshot.identifier != myIdentifier) {
|
||||||
|
String exceptionMessage = "CRITICAL: Lost ownership of ticket ${myTicket.ticketIndex}, was expecting identifier $myIdentifier but found ${snapshot.identifier}.";
|
||||||
|
// throw Exception(exceptionMessage);
|
||||||
|
log(exceptionMessage);
|
||||||
|
return spawnTicket(file, operation: operation, keyHash: keyHash, writeSize: writeSize, onApproved: onApproved);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Nvm, false alarm
|
||||||
|
log("False alarm, still own ticket ${myTicket.ticketIndex}.");
|
||||||
}
|
}
|
||||||
|
|
||||||
if (snapshot.ticketState == SweepstoreTicketState.APPROVED) {
|
if (snapshot.ticketState == SweepstoreTicketState.APPROVED) {
|
||||||
|
|||||||
@@ -362,12 +362,12 @@ class SweepstoreWorkerTicket {
|
|||||||
|
|
||||||
SweepstoreWorkerTicketSnapshot snapshot() {
|
SweepstoreWorkerTicketSnapshot snapshot() {
|
||||||
|
|
||||||
// _concurrencyHeader._header._file.lockSync(FileLock.blockingShared, _baseOffset, _baseOffset + ticketSize);
|
_concurrencyHeader._header._file.lockSync(FileLock.blockingShared, _baseOffset, _baseOffset + ticketSize);
|
||||||
|
|
||||||
_concurrencyHeader._header._file.setPositionSync(_baseOffset);
|
_concurrencyHeader._header._file.setPositionSync(_baseOffset);
|
||||||
List<int> existingBuffer = _concurrencyHeader._header._file.readSync(ticketSize);
|
List<int> existingBuffer = _concurrencyHeader._header._file.readSync(ticketSize);
|
||||||
RandomAccessMemory buffer = RandomAccessMemory(existingBuffer);
|
RandomAccessMemory buffer = RandomAccessMemory(existingBuffer);
|
||||||
// _concurrencyHeader._header._file.unlockSync(_baseOffset, _baseOffset + ticketSize);
|
_concurrencyHeader._header._file.unlockSync(_baseOffset, _baseOffset + ticketSize);
|
||||||
|
|
||||||
buffer.setPositionSync(0);
|
buffer.setPositionSync(0);
|
||||||
int identifier = buffer.readIntSync(4);
|
int identifier = buffer.readIntSync(4);
|
||||||
|
|||||||
Reference in New Issue
Block a user