From 747f0bd1ed2bc38ee0cedcc17f02f0f7d1fba1c3 Mon Sep 17 00:00:00 2001 From: ImBenji Date: Sun, 23 Nov 2025 17:57:30 +0000 Subject: [PATCH] Refactor concurrency handling and improve sleep precision in concurrency.dart and header.dart --- dart/lib/concurrency.dart | 14 +- dart/lib/dev_tools/watch_tickets.dart | 2 +- dart/lib/header.dart | 133 +++--------------- .../{helper_extensions.dart => helpers.dart} | 13 ++ dart/lib/sweepstore.dart | 10 +- 5 files changed, 48 insertions(+), 124 deletions(-) rename dart/lib/{helper_extensions.dart => helpers.dart} (94%) diff --git a/dart/lib/concurrency.dart b/dart/lib/concurrency.dart index bf75286..060ef62 100644 --- a/dart/lib/concurrency.dart +++ b/dart/lib/concurrency.dart @@ -5,7 +5,7 @@ import 'dart:isolate'; import 'dart:math'; import 'package:sweepstore/header.dart'; -import 'package:sweepstore/helper_extensions.dart'; +import 'package:sweepstore/helpers.dart'; import 'package:sweepstore/structures.dart'; int _randomId() { @@ -28,17 +28,18 @@ void spawnTicket(RandomAccessFile file, { void log(String message) { String prefix = debugLabel != null ? "\x1B[38;5;208m[Ticket Spawner - $debugLabel]:\x1B[0m " : "\x1B[38;5;208m[Ticket Spawner]:\x1B[0m "; - print("$prefix$message"); + // print("$prefix$message"); } void tickSleep([int microsecondVariance = 10]) { - sleep(Duration(microseconds: 100 + Random().nextInt(microsecondVariance))); + preciseSleep(Duration(microseconds: 100 + Random().nextInt(microsecondVariance))); } Map expSleepTracker = {}; void expSleep(String label) { int count = expSleepTracker[label] ?? 0; int sleepTime = (1 << count); // Exponential backoff - sleep(Duration(milliseconds: sleepTime)); + // sleepTime = max(1, min(sleepTime, 1000)); // Clamp between 1ms and 1000ms + preciseSleep(Duration(microseconds: sleepTime * 5000)); expSleepTracker[label] = count + 1; } @@ -83,6 +84,7 @@ void spawnTicket(RandomAccessFile file, { expSleep("acquire_loop"); + // Ensure we still own the ticket - if not, reset and try again if (ticketIndex != null) { SweepstoreWorkerTicket ticket = concurrencyHeader[ticketIndex]; @@ -145,7 +147,7 @@ void initialiseMasterListener(RandomAccessFile file) async { Isolate.spawn((_) { void log(String message) { - print("\x1B[38;5;82m[Master Listener]:\x1B[0m $message"); + // print("\x1B[38;5;82m[Master Listener]:\x1B[0m $message"); } RandomAccessFile file = File(filePath).openSync(mode: FileMode.append); @@ -183,7 +185,7 @@ void initialiseMasterListener(RandomAccessFile file) async { } - sleep(Duration(milliseconds: 1)); + preciseSleep(Duration(milliseconds: 1)); } }, null); diff --git a/dart/lib/dev_tools/watch_tickets.dart b/dart/lib/dev_tools/watch_tickets.dart index 9fa0221..42665a2 100644 --- a/dart/lib/dev_tools/watch_tickets.dart +++ b/dart/lib/dev_tools/watch_tickets.dart @@ -79,6 +79,6 @@ void main() async { print('Error: example.bin not found - Refresh #$refreshCount'); } - await Future.delayed(Duration(seconds: 1)); + await Future.delayed(Duration(milliseconds: 1)); } } \ No newline at end of file diff --git a/dart/lib/header.dart b/dart/lib/header.dart index 3b65cfb..f0da975 100644 --- a/dart/lib/header.dart +++ b/dart/lib/header.dart @@ -3,7 +3,12 @@ import 'dart:convert'; import 'dart:io'; import 'package:sweepstore/structures.dart'; -import 'helper_extensions.dart'; +import 'helpers.dart'; + +int roundToNearest16(int value) { + int rounded = (value + 15) & ~15; + return rounded; +} void initialiseSweepstoreHeader(RandomAccessFile file, { int concurrentWorkers = 4, @@ -200,18 +205,18 @@ class SweepstoreConcurrencyHeaderWriter extends SweepstoreConcurrencyHeader { } -const int endOfStaticHeaderOffset = 46; +final int endOfStaticHeaderOffset = roundToNearest16(46); class SweepstoreWorkerTicket { - static const int _ticketSize = 30; + static final int ticketSize = roundToNearest16(29); final SweepstoreConcurrencyHeader _concurrencyHeader; final int ticketIndex; SweepstoreWorkerTicket(this.ticketIndex, this._concurrencyHeader); // All offsets are relative to the start of the workers ticket - int get _baseOffset => endOfStaticHeaderOffset + (ticketIndex * _ticketSize); + int get _baseOffset => endOfStaticHeaderOffset + (ticketIndex * ticketSize); // Offset 0 - 4 bytes int get identifier { @@ -289,10 +294,10 @@ class SweepstoreWorkerTicket { try { - _concurrencyHeader._header._file.lockSync(FileLock.blockingExclusive, _baseOffset, _baseOffset + _ticketSize); + _concurrencyHeader._header._file.lockSync(FileLock.blockingExclusive, _baseOffset, _baseOffset + ticketSize); _concurrencyHeader._header._file.setPositionSync(_baseOffset); - List existingBuffer = _concurrencyHeader._header._file.readSync(_ticketSize); + List existingBuffer = _concurrencyHeader._header._file.readSync(ticketSize); RandomAccessMemory buffer = RandomAccessMemory(existingBuffer); if (identifier != null) { @@ -324,11 +329,17 @@ class SweepstoreWorkerTicket { buffer.writeIntSync(writeSize, 4); } + // Pad the rest of the ticket with zeros if necessary + buffer.setPositionSync(30); + while (buffer.positionSync() < ticketSize) { + buffer.writeIntSync(0, 1); + } + _concurrencyHeader._header._file.setPositionSync(_baseOffset); _concurrencyHeader._header._file.writeFromSync(buffer.toUint8List()); } finally { - _concurrencyHeader._header._file.unlockSync(_baseOffset, _baseOffset + _ticketSize); + _concurrencyHeader._header._file.unlockSync(_baseOffset, _baseOffset + ticketSize); } } @@ -337,118 +348,14 @@ class SweepstoreWorkerTicket { _concurrencyHeader._header._file.lockSync( FileLock.blockingExclusive, _baseOffset, - _baseOffset + _ticketSize + _baseOffset + ticketSize ); // Successfully locked - immediately unlock and return true - _concurrencyHeader._header._file.unlockSync(_baseOffset, _baseOffset + _ticketSize); + _concurrencyHeader._header._file.unlockSync(_baseOffset, _baseOffset + ticketSize); return true; } catch (e) { // Lock failed - already held by another process return false; } } -} - -@deprecated -class _SweepstoreWorkerTicket { - - static const int _ticketSize = 30; - - final SweepstoreConcurrencyHeader _concurrencyHeader; - final int ticketIndex; - - _SweepstoreWorkerTicket(this.ticketIndex, this._concurrencyHeader); - - // All offsets are relative to the start of the workers ticket - int get _baseOffset => endOfStaticHeaderOffset + (ticketIndex * _ticketSize); - - // Offset 0 - 4 bytes - int get identifier { - _concurrencyHeader._header._file.setPositionSync(_baseOffset); - int id = _concurrencyHeader._header._file.readIntSync(4); - return id; - } - set identifier(int id) { - _concurrencyHeader._header._file.setPositionSync(_baseOffset); - _concurrencyHeader._header._file.writeIntSync(id, 4); - _concurrencyHeader._header._file.flushSync(); - } - - // Offset 4 - 4 bytes - int get workerHeartbeat { - _concurrencyHeader._header._file.setPositionSync(_baseOffset + 4); - int heartbeat = _concurrencyHeader._header._file.readIntSync(4); - return heartbeat; - } - set workerHeartbeat(int heartbeat) { - _concurrencyHeader._header._file.setPositionSync(_baseOffset + 4); - _concurrencyHeader._header._file.writeIntSync(heartbeat, 4); - } - - // Offset 8 - 1 byte - SweepstoreTicketState get ticketState { - _concurrencyHeader._header._file.setPositionSync(_baseOffset + 8); - int stateValue = _concurrencyHeader._header._file.readIntSync(1); - return SweepstoreTicketState.values[stateValue]; - } - set ticketState(SweepstoreTicketState state) { - _concurrencyHeader._header._file.setPositionSync(_baseOffset + 8); - _concurrencyHeader._header._file.writeIntSync(state.index, 1); - _concurrencyHeader._header._file.flushSync(); - } - - // Offset 9 - 1 byte - SweepstoreTicketOperation get ticketOperation { - _concurrencyHeader._header._file.setPositionSync(_baseOffset + 9); - int operationValue = _concurrencyHeader._header._file.readIntSync(1); - return SweepstoreTicketOperation.values[operationValue]; - } - set ticketOperation(SweepstoreTicketOperation operation) { - _concurrencyHeader._header._file.setPositionSync(_baseOffset + 9); - _concurrencyHeader._header._file.writeIntSync(operation.index, 1); - } - - // Offset 10 - 8 bytes - int get keyHash { - _concurrencyHeader._header._file.setPositionSync(_baseOffset + 10); - int hash = _concurrencyHeader._header._file.readIntSync(8); - return hash; - } - set keyHash(int hash) { - _concurrencyHeader._header._file.setPositionSync(_baseOffset + 10); - _concurrencyHeader._header._file.writeIntSync(hash, 8); - } - - // Offset 18 - 8 bytes - SweepstorePointer get writePointer { - _concurrencyHeader._header._file.setPositionSync(_baseOffset + 18); - int address = _concurrencyHeader._header._file.readIntSync(8); - return SweepstorePointer(address); - } - set writePointer(SweepstorePointer pointer) { - _concurrencyHeader._header._file.setPositionSync(_baseOffset + 18); - _concurrencyHeader._header._file.writeIntSync(pointer.address, 8); - } - - // Offset 26 - 4 bytes - int get writeSize { - _concurrencyHeader._header._file.setPositionSync(_baseOffset + 26); - int size = _concurrencyHeader._header._file.readIntSync(4); - return size; - } - set writeSize(int size) { - _concurrencyHeader._header._file.setPositionSync(_baseOffset + 26); - _concurrencyHeader._header._file.writeIntSync(size, 4); - } - - // Helpers - - void lock() { - _concurrencyHeader._header._file.lockSync(FileLock.exclusive, _baseOffset, _baseOffset + _ticketSize); - } - - void unlock() { - _concurrencyHeader._header._file.unlockSync(_baseOffset, _baseOffset + _ticketSize); - } - } \ No newline at end of file diff --git a/dart/lib/helper_extensions.dart b/dart/lib/helpers.dart similarity index 94% rename from dart/lib/helper_extensions.dart rename to dart/lib/helpers.dart index 5b02f9f..7cdbfe4 100644 --- a/dart/lib/helper_extensions.dart +++ b/dart/lib/helpers.dart @@ -225,4 +225,17 @@ extension SweepstoreDateTimeHelper on DateTime { int millisecondsSinceEpoch64() { return millisecondsSinceEpoch; } +} + +const Duration _windowsMinSleepTime = Duration(milliseconds: 16); + +void preciseSleep(Duration duration) { + if (Platform.isWindows && duration < _windowsMinSleepTime) { + // Busy-wait for short durations on Windows + final end = DateTime.now().add(duration); + while (DateTime.now().isBefore(end)) {} + } else { + // Use normal sleep for longer durations or on Unix + sleep(duration); + } } \ No newline at end of file diff --git a/dart/lib/sweepstore.dart b/dart/lib/sweepstore.dart index 8cda4ec..03fd021 100644 --- a/dart/lib/sweepstore.dart +++ b/dart/lib/sweepstore.dart @@ -40,7 +40,7 @@ class Sweepstore { keyHash: key.hashCode, writeSize: 0, // Placeholder onApproved: () { - print("Writing key: $key with hash ${key.hashCode} and value: $value"); + // print("Writing key: $key with hash ${key.hashCode} and value: $value"); }, debugLabel: key ); @@ -60,16 +60,16 @@ Future main() async { Sweepstore store = Sweepstore(filePath); store.initialise( - concurrentWorkers: 20 + concurrentWorkers: 18 ); initialiseMasterListener(file.openSync(mode: FileMode.append)); print(binaryDump(file.readAsBytesSync())); - int iteration = 1; + int iteration = 100; for (int j = 0; j < iteration; j++) { - int concurrencyTest = 256; + int concurrencyTest = 128; final receivePort = ReceivePort(); int completedJobs = 0; @@ -101,5 +101,7 @@ Future main() async { print('\x1B[95mAll jobs completed!\x1B[0m'); print('\x1B[95mTime taken: ${stopwatch.elapsedMilliseconds}ms (${stopwatch.elapsed.inSeconds}s)\x1B[0m'); + + // sleep(Duration(seconds: 2)); } } \ No newline at end of file