diff --git a/dart/lib/concurrency.dart b/dart/lib/concurrency.dart index 1477eb4..70815bd 100644 --- a/dart/lib/concurrency.dart +++ b/dart/lib/concurrency.dart @@ -72,11 +72,13 @@ void spawnTicket(RandomAccessFile file, { continue; } - int identifier = ticket.identifier; + SweepstoreWorkerTicketSnapshot ticketSnapshot = ticket.snapshot(); + + int identifier = ticketSnapshot.identifier; bool identifier_unassigned = identifier == 0; - bool stale_heartbeat = (DateTime.now().millisecondsSinceEpoch32() - ticket.workerHeartbeat) > 2000; - bool is_free = ticket.ticketState == SweepstoreTicketState.FREE; + bool stale_heartbeat = (DateTime.now().millisecondsSinceEpoch32() - ticketSnapshot.workerHeartbeat) > 2000; + bool is_free = ticketSnapshot.ticketState == SweepstoreTicketState.FREE; if (identifier_unassigned && stale_heartbeat && is_free) { ticket.write( @@ -92,16 +94,17 @@ void spawnTicket(RandomAccessFile file, { } - preciseSleep(Duration(milliseconds: 1)); + preciseSleep(Duration(milliseconds: 2)); // Ensure we still own the ticket - if not, reset and try again if (ticketIndex != null) { - SweepstoreWorkerTicket ticket = concurrencyHeader[ticketIndex]; - if (ticket.identifier != newIdentifier) { + SweepstoreWorkerTicketSnapshot verifySnapshot = concurrencyHeader[ticketIndex].snapshot(); + + if (verifySnapshot.identifier != newIdentifier) { log("Lost ticket $ticketIndex, retrying..."); ticketIndex = null; } else { - return ticket; + return concurrencyHeader[ticketIndex]; } } diff --git a/dart/lib/header.dart b/dart/lib/header.dart index 45b8340..122b2a0 100644 --- a/dart/lib/header.dart +++ b/dart/lib/header.dart @@ -359,4 +359,58 @@ class SweepstoreWorkerTicket { return false; } } + + SweepstoreWorkerTicketSnapshot snapshot() { + + _concurrencyHeader._header._file.lockSync(FileLock.blockingShared, _baseOffset, _baseOffset + ticketSize); + + _concurrencyHeader._header._file.setPositionSync(_baseOffset); + List existingBuffer = _concurrencyHeader._header._file.readSync(ticketSize); + RandomAccessMemory buffer = RandomAccessMemory(existingBuffer); + _concurrencyHeader._header._file.unlockSync(_baseOffset, _baseOffset + ticketSize); + + buffer.setPositionSync(0); + int identifier = buffer.readIntSync(4); + int workerHeartbeat = buffer.readIntSync(4); + SweepstoreTicketState ticketState = SweepstoreTicketState.values[buffer.readIntSync(1)]; + SweepstoreTicketOperation ticketOperation = SweepstoreTicketOperation.values[buffer.readIntSync(1)]; + int keyHash = buffer.readIntSync(8); + SweepstorePointer writePointer = SweepstorePointer(buffer.readIntSync(8)); + int writeSize = buffer.readIntSync(4); + return SweepstoreWorkerTicketSnapshot._( + ticketIndex: ticketIndex, + identifier: identifier, + workerHeartbeat: workerHeartbeat, + ticketState: ticketState, + ticketOperation: ticketOperation, + keyHash: keyHash, + writePointer: writePointer, + writeSize: writeSize, + ); + } + +} + +class SweepstoreWorkerTicketSnapshot { + + final int ticketIndex; + final int identifier; + final int workerHeartbeat; + final SweepstoreTicketState ticketState; + final SweepstoreTicketOperation ticketOperation; + final int keyHash; + final SweepstorePointer writePointer; + final int writeSize; + + SweepstoreWorkerTicketSnapshot._({ + required this.ticketIndex, + required this.identifier, + required this.workerHeartbeat, + required this.ticketState, + required this.ticketOperation, + required this.keyHash, + required this.writePointer, + required this.writeSize, + }); + } \ No newline at end of file