From f8e8636677dfd234426a94cdc32f0aaecaa5c05f Mon Sep 17 00:00:00 2001 From: ImBenji Date: Sun, 23 Nov 2025 20:27:08 +0000 Subject: [PATCH] Refactor concurrency handling to introduce STALE_HEARTBEAT_THRESHOLD_MS constant and utilize SweepstoreWorkerTicketSnapshot for improved state management --- dart/lib/concurrency.dart | 24 +++-- dart/lib/dev_tools/watch_tickets.dart | 19 ++-- dart/lib/header.dart | 128 +++++++++++++------------- dart/lib/sweepstore.dart | 12 ++- 4 files changed, 98 insertions(+), 85 deletions(-) diff --git a/dart/lib/concurrency.dart b/dart/lib/concurrency.dart index 70815bd..c258fab 100644 --- a/dart/lib/concurrency.dart +++ b/dart/lib/concurrency.dart @@ -8,6 +8,9 @@ import 'package:sweepstore/header.dart'; import 'package:sweepstore/helpers.dart'; import 'package:sweepstore/structures.dart'; +// Stale Heartbeat threshold in milliseconds +const int STALE_HEARTBEAT_THRESHOLD_MS = 5000; // 5 seconds + int _randomId() { // mix timestamp with random for better uniquness // keep it positive to avoid signed int issues when storing @@ -77,7 +80,7 @@ void spawnTicket(RandomAccessFile file, { int identifier = ticketSnapshot.identifier; bool identifier_unassigned = identifier == 0; - bool stale_heartbeat = (DateTime.now().millisecondsSinceEpoch32() - ticketSnapshot.workerHeartbeat) > 2000; + bool stale_heartbeat = (DateTime.now().millisecondsSinceEpoch32() - ticketSnapshot.workerHeartbeat) > STALE_HEARTBEAT_THRESHOLD_MS; bool is_free = ticketSnapshot.ticketState == SweepstoreTicketState.FREE; if (identifier_unassigned && stale_heartbeat && is_free) { @@ -132,13 +135,15 @@ void spawnTicket(RandomAccessFile file, { // Wait for approval - (Approval loop) while (true) { + SweepstoreWorkerTicketSnapshot snapshot = myTicket.snapshot(); + // Check we still own the ticket - if (myTicket.identifier != myIdentifier) { - String exceptionMessage = "CRITICAL: Lost ownership of ticket ${myTicket.ticketIndex}, was expecting identifier $myIdentifier but found ${myTicket.identifier}."; + if (snapshot.identifier != myIdentifier) { + String exceptionMessage = "CRITICAL: Lost ownership of ticket ${myTicket.ticketIndex}, was expecting identifier $myIdentifier but found ${snapshot.identifier}."; throw Exception(exceptionMessage); } - if (myTicket.ticketState == SweepstoreTicketState.APPROVED) { + if (snapshot.ticketState == SweepstoreTicketState.APPROVED) { myTicket.write( ticketState: SweepstoreTicketState.EXECUTING, ); @@ -155,9 +160,9 @@ void spawnTicket(RandomAccessFile file, { // Update heartbeat int now = DateTime.now().millisecondsSinceEpoch32(); - if (now - myTicket.workerHeartbeat > 700) { + if (now - snapshot.workerHeartbeat > 700) { myTicket.write( - workerHeartbeat: DateTime.now().millisecondsSinceEpoch32() + workerHeartbeat: now ); } } @@ -182,16 +187,17 @@ void initialiseMasterListener(RandomAccessFile file) async { for (int i = 0; i < concurrencyHeader.numberOfWorkers; i++) { SweepstoreWorkerTicket ticket = concurrencyHeader[i]; + SweepstoreWorkerTicketSnapshot snapshot = ticket.snapshot(); - if (ticket.ticketState == SweepstoreTicketState.WAITING) { - log("Found waiting ticket $i (Key Hash: ${ticket.keyHash})..."); + if (snapshot.ticketState == SweepstoreTicketState.WAITING) { + log("Found waiting ticket $i (Key Hash: ${snapshot.keyHash})..."); // Approve the ticket ticket.write( ticketState: SweepstoreTicketState.APPROVED, ); log("Approved ticket $i."); - } else if (ticket.ticketState == SweepstoreTicketState.COMPLETED) { + } else if (snapshot.ticketState == SweepstoreTicketState.COMPLETED) { log("Ticket $i completed. Resetting ticket..."); // Reset the ticket ticket.write( diff --git a/dart/lib/dev_tools/watch_tickets.dart b/dart/lib/dev_tools/watch_tickets.dart index 98ce7c4..64513ec 100644 --- a/dart/lib/dev_tools/watch_tickets.dart +++ b/dart/lib/dev_tools/watch_tickets.dart @@ -54,24 +54,25 @@ void main() async { // display each ticket for (int i = 0; i < concurrency.numberOfWorkers; i++) { final ticket = concurrency[i]; + final snapshot = ticket.snapshot(); print('--- Ticket #$i ---'); - print(' Identifier: ${ticket.identifier}'); + print(' Identifier: ${snapshot.identifier}'); - int workerAge = now32 - ticket.workerHeartbeat; + int workerAge = now32 - snapshot.workerHeartbeat; String workerStatus = workerAge > 5 ? "(stale)" : "(active)"; String workerPrevious = previousWorkerHeartbeats.containsKey(i) ? "(previously ${previousWorkerHeartbeats[i]})" : ""; - print(' Heartbeat: ${ticket.workerHeartbeat} $workerStatus $workerPrevious'); + print(' Heartbeat: ${snapshot.workerHeartbeat} $workerStatus $workerPrevious'); - print(' State: ${ticket.ticketState.name}'); - print(' Operation: ${ticket.ticketOperation.name}'); - print(' Key Hash: ${ticket.keyHash}'); - print(' Write Ptr: ${ticket.writePointer}'); - print(' Write Size: ${ticket.writeSize} bytes'); + print(' State: ${snapshot.ticketState.name}'); + print(' Operation: ${snapshot.ticketOperation.name}'); + print(' Key Hash: ${snapshot.keyHash}'); + print(' Write Ptr: ${snapshot.writePointer}'); + print(' Write Size: ${snapshot.writeSize} bytes'); print(''); // update previous heartbeat - previousWorkerHeartbeats[i] = ticket.workerHeartbeat; + previousWorkerHeartbeats[i] = snapshot.workerHeartbeat; } diff --git a/dart/lib/header.dart b/dart/lib/header.dart index 122b2a0..85b8e46 100644 --- a/dart/lib/header.dart +++ b/dart/lib/header.dart @@ -218,68 +218,68 @@ class SweepstoreWorkerTicket { // All offsets are relative to the start of the workers ticket int get _baseOffset => endOfStaticHeaderOffset + (ticketIndex * ticketSize); - // Offset 0 - 4 bytes - int get identifier { - _concurrencyHeader._header._file.lockSync(FileLock.blockingShared, _baseOffset, _baseOffset + 4); - _concurrencyHeader._header._file.setPositionSync(_baseOffset); - int id = _concurrencyHeader._header._file.readIntSync(4); - _concurrencyHeader._header._file.unlockSync(_baseOffset, _baseOffset + 4); - return id; - } - - // Offset 4 - 4 bytes - int get workerHeartbeat { - _concurrencyHeader._header._file.lockSync(FileLock.blockingShared, _baseOffset + 4, _baseOffset + 8); - _concurrencyHeader._header._file.setPositionSync(_baseOffset + 4); - int heartbeat = _concurrencyHeader._header._file.readIntSync(4); - _concurrencyHeader._header._file.unlockSync(_baseOffset + 4, _baseOffset + 8); - return heartbeat; - } - - // Offset 8 - 1 byte - SweepstoreTicketState get ticketState { - _concurrencyHeader._header._file.lockSync(FileLock.blockingShared, _baseOffset + 8, _baseOffset + 9); - _concurrencyHeader._header._file.setPositionSync(_baseOffset + 8); - int stateValue = _concurrencyHeader._header._file.readIntSync(1); - _concurrencyHeader._header._file.unlockSync(_baseOffset + 8, _baseOffset + 9); - return SweepstoreTicketState.values[stateValue]; - } - - // Offset 9 - 1 byte - SweepstoreTicketOperation get ticketOperation { - _concurrencyHeader._header._file.lockSync(FileLock.blockingShared, _baseOffset + 9, _baseOffset + 10); - _concurrencyHeader._header._file.setPositionSync(_baseOffset + 9); - int operationValue = _concurrencyHeader._header._file.readIntSync(1); - _concurrencyHeader._header._file.unlockSync(_baseOffset + 9, _baseOffset + 10); - return SweepstoreTicketOperation.values[operationValue]; - } - - // Offset 10 - 8 bytes - int get keyHash { - _concurrencyHeader._header._file.lockSync(FileLock.blockingShared, _baseOffset + 10, _baseOffset + 18); - _concurrencyHeader._header._file.setPositionSync(_baseOffset + 10); - int hash = _concurrencyHeader._header._file.readIntSync(8); - _concurrencyHeader._header._file.unlockSync(_baseOffset + 10, _baseOffset + 18); - return hash; - } - - // Offset 18 - 8 bytes - SweepstorePointer get writePointer { - _concurrencyHeader._header._file.lockSync(FileLock.blockingShared, _baseOffset + 18, _baseOffset + 26); - _concurrencyHeader._header._file.setPositionSync(_baseOffset + 18); - int address = _concurrencyHeader._header._file.readIntSync(8); - _concurrencyHeader._header._file.unlockSync(_baseOffset + 18, _baseOffset + 26); - return SweepstorePointer(address); - } - - // Offset 26 - 4 bytes - int get writeSize { - _concurrencyHeader._header._file.lockSync(FileLock.blockingShared, _baseOffset + 26, _baseOffset + 30); - _concurrencyHeader._header._file.setPositionSync(_baseOffset + 26); - int size = _concurrencyHeader._header._file.readIntSync(4); - _concurrencyHeader._header._file.unlockSync(_baseOffset + 26, _baseOffset + 30); - return size; - } + // // Offset 0 - 4 bytes + // int get identifier { + // _concurrencyHeader._header._file.lockSync(FileLock.blockingShared, _baseOffset, _baseOffset + 4); + // _concurrencyHeader._header._file.setPositionSync(_baseOffset); + // int id = _concurrencyHeader._header._file.readIntSync(4); + // _concurrencyHeader._header._file.unlockSync(_baseOffset, _baseOffset + 4); + // return id; + // } + // + // // Offset 4 - 4 bytes + // int get workerHeartbeat { + // _concurrencyHeader._header._file.lockSync(FileLock.blockingShared, _baseOffset + 4, _baseOffset + 8); + // _concurrencyHeader._header._file.setPositionSync(_baseOffset + 4); + // int heartbeat = _concurrencyHeader._header._file.readIntSync(4); + // _concurrencyHeader._header._file.unlockSync(_baseOffset + 4, _baseOffset + 8); + // return heartbeat; + // } + // + // // Offset 8 - 1 byte + // SweepstoreTicketState get ticketState { + // _concurrencyHeader._header._file.lockSync(FileLock.blockingShared, _baseOffset + 8, _baseOffset + 9); + // _concurrencyHeader._header._file.setPositionSync(_baseOffset + 8); + // int stateValue = _concurrencyHeader._header._file.readIntSync(1); + // _concurrencyHeader._header._file.unlockSync(_baseOffset + 8, _baseOffset + 9); + // return SweepstoreTicketState.values[stateValue]; + // } + // + // // Offset 9 - 1 byte + // SweepstoreTicketOperation get ticketOperation { + // _concurrencyHeader._header._file.lockSync(FileLock.blockingShared, _baseOffset + 9, _baseOffset + 10); + // _concurrencyHeader._header._file.setPositionSync(_baseOffset + 9); + // int operationValue = _concurrencyHeader._header._file.readIntSync(1); + // _concurrencyHeader._header._file.unlockSync(_baseOffset + 9, _baseOffset + 10); + // return SweepstoreTicketOperation.values[operationValue]; + // } + // + // // Offset 10 - 8 bytes + // int get keyHash { + // _concurrencyHeader._header._file.lockSync(FileLock.blockingShared, _baseOffset + 10, _baseOffset + 18); + // _concurrencyHeader._header._file.setPositionSync(_baseOffset + 10); + // int hash = _concurrencyHeader._header._file.readIntSync(8); + // _concurrencyHeader._header._file.unlockSync(_baseOffset + 10, _baseOffset + 18); + // return hash; + // } + // + // // Offset 18 - 8 bytes + // SweepstorePointer get writePointer { + // _concurrencyHeader._header._file.lockSync(FileLock.blockingShared, _baseOffset + 18, _baseOffset + 26); + // _concurrencyHeader._header._file.setPositionSync(_baseOffset + 18); + // int address = _concurrencyHeader._header._file.readIntSync(8); + // _concurrencyHeader._header._file.unlockSync(_baseOffset + 18, _baseOffset + 26); + // return SweepstorePointer(address); + // } + // + // // Offset 26 - 4 bytes + // int get writeSize { + // _concurrencyHeader._header._file.lockSync(FileLock.blockingShared, _baseOffset + 26, _baseOffset + 30); + // _concurrencyHeader._header._file.setPositionSync(_baseOffset + 26); + // int size = _concurrencyHeader._header._file.readIntSync(4); + // _concurrencyHeader._header._file.unlockSync(_baseOffset + 26, _baseOffset + 30); + // return size; + // } // Writer void write({ @@ -362,12 +362,12 @@ class SweepstoreWorkerTicket { SweepstoreWorkerTicketSnapshot snapshot() { - _concurrencyHeader._header._file.lockSync(FileLock.blockingShared, _baseOffset, _baseOffset + ticketSize); + // _concurrencyHeader._header._file.lockSync(FileLock.blockingShared, _baseOffset, _baseOffset + ticketSize); _concurrencyHeader._header._file.setPositionSync(_baseOffset); List existingBuffer = _concurrencyHeader._header._file.readSync(ticketSize); RandomAccessMemory buffer = RandomAccessMemory(existingBuffer); - _concurrencyHeader._header._file.unlockSync(_baseOffset, _baseOffset + ticketSize); + // _concurrencyHeader._header._file.unlockSync(_baseOffset, _baseOffset + ticketSize); buffer.setPositionSync(0); int identifier = buffer.readIntSync(4); diff --git a/dart/lib/sweepstore.dart b/dart/lib/sweepstore.dart index 03fd021..e245899 100644 --- a/dart/lib/sweepstore.dart +++ b/dart/lib/sweepstore.dart @@ -60,21 +60,25 @@ Future main() async { Sweepstore store = Sweepstore(filePath); store.initialise( - concurrentWorkers: 18 + concurrentWorkers: 32 ); initialiseMasterListener(file.openSync(mode: FileMode.append)); print(binaryDump(file.readAsBytesSync())); - int iteration = 100; + int iteration = 0; - for (int j = 0; j < iteration; j++) { + print("Concurrent Workers: ${store._concurrencyHeader.numberOfWorkers}"); + print("Stale Ticket Threshold: ${STALE_HEARTBEAT_THRESHOLD_MS}ms"); + + while (true) { int concurrencyTest = 128; final receivePort = ReceivePort(); int completedJobs = 0; final stopwatch = Stopwatch()..start(); + print('\x1B[95mStarting iteration #$iteration with $concurrencyTest concurrent jobs...\x1B[0m'); for (int i = 0; i < concurrencyTest; i++) { await Isolate.spawn((message) { final index = message['index'] as int; @@ -101,7 +105,9 @@ Future main() async { print('\x1B[95mAll jobs completed!\x1B[0m'); print('\x1B[95mTime taken: ${stopwatch.elapsedMilliseconds}ms (${stopwatch.elapsed.inSeconds}s)\x1B[0m'); + print(" "); // sleep(Duration(seconds: 2)); + iteration++; } } \ No newline at end of file