Refactor concurrency handling to utilize SweepstoreWorkerTicketSnapshot for improved state management and add snapshot method in header.dart

This commit is contained in:
ImBenji
2025-11-23 20:00:50 +00:00
parent 809d79cfc8
commit 580f07c483
2 changed files with 64 additions and 7 deletions

View File

@@ -72,11 +72,13 @@ void spawnTicket(RandomAccessFile file, {
continue;
}
int identifier = ticket.identifier;
SweepstoreWorkerTicketSnapshot ticketSnapshot = ticket.snapshot();
int identifier = ticketSnapshot.identifier;
bool identifier_unassigned = identifier == 0;
bool stale_heartbeat = (DateTime.now().millisecondsSinceEpoch32() - ticket.workerHeartbeat) > 2000;
bool is_free = ticket.ticketState == SweepstoreTicketState.FREE;
bool stale_heartbeat = (DateTime.now().millisecondsSinceEpoch32() - ticketSnapshot.workerHeartbeat) > 2000;
bool is_free = ticketSnapshot.ticketState == SweepstoreTicketState.FREE;
if (identifier_unassigned && stale_heartbeat && is_free) {
ticket.write(
@@ -92,16 +94,17 @@ void spawnTicket(RandomAccessFile file, {
}
preciseSleep(Duration(milliseconds: 1));
preciseSleep(Duration(milliseconds: 2));
// Ensure we still own the ticket - if not, reset and try again
if (ticketIndex != null) {
SweepstoreWorkerTicket ticket = concurrencyHeader[ticketIndex];
if (ticket.identifier != newIdentifier) {
SweepstoreWorkerTicketSnapshot verifySnapshot = concurrencyHeader[ticketIndex].snapshot();
if (verifySnapshot.identifier != newIdentifier) {
log("Lost ticket $ticketIndex, retrying...");
ticketIndex = null;
} else {
return ticket;
return concurrencyHeader[ticketIndex];
}
}

View File

@@ -359,4 +359,58 @@ class SweepstoreWorkerTicket {
return false;
}
}
SweepstoreWorkerTicketSnapshot snapshot() {
_concurrencyHeader._header._file.lockSync(FileLock.blockingShared, _baseOffset, _baseOffset + ticketSize);
_concurrencyHeader._header._file.setPositionSync(_baseOffset);
List<int> existingBuffer = _concurrencyHeader._header._file.readSync(ticketSize);
RandomAccessMemory buffer = RandomAccessMemory(existingBuffer);
_concurrencyHeader._header._file.unlockSync(_baseOffset, _baseOffset + ticketSize);
buffer.setPositionSync(0);
int identifier = buffer.readIntSync(4);
int workerHeartbeat = buffer.readIntSync(4);
SweepstoreTicketState ticketState = SweepstoreTicketState.values[buffer.readIntSync(1)];
SweepstoreTicketOperation ticketOperation = SweepstoreTicketOperation.values[buffer.readIntSync(1)];
int keyHash = buffer.readIntSync(8);
SweepstorePointer writePointer = SweepstorePointer(buffer.readIntSync(8));
int writeSize = buffer.readIntSync(4);
return SweepstoreWorkerTicketSnapshot._(
ticketIndex: ticketIndex,
identifier: identifier,
workerHeartbeat: workerHeartbeat,
ticketState: ticketState,
ticketOperation: ticketOperation,
keyHash: keyHash,
writePointer: writePointer,
writeSize: writeSize,
);
}
}
class SweepstoreWorkerTicketSnapshot {
final int ticketIndex;
final int identifier;
final int workerHeartbeat;
final SweepstoreTicketState ticketState;
final SweepstoreTicketOperation ticketOperation;
final int keyHash;
final SweepstorePointer writePointer;
final int writeSize;
SweepstoreWorkerTicketSnapshot._({
required this.ticketIndex,
required this.identifier,
required this.workerHeartbeat,
required this.ticketState,
required this.ticketOperation,
required this.keyHash,
required this.writePointer,
required this.writeSize,
});
}