Add RandomAccessMemory class for in-memory binary operations

This commit is contained in:
ImBenji
2025-11-23 05:29:08 +00:00
parent 9216cd1638
commit 4295d119d7
26 changed files with 2124 additions and 949 deletions

2
dart/CHANGELOG.md Normal file
View File

@@ -0,0 +1,2 @@
1.0.0 Initial Release

190
dart/lib/concurrency.dart Normal file
View File

@@ -0,0 +1,190 @@
import 'dart:io';
import 'dart:isolate';
import 'dart:math';
import 'package:sweepstore/header.dart';
import 'package:sweepstore/helper_extensions.dart';
import 'package:sweepstore/structures.dart';
int _randomId() {
// mix timestamp with random for better uniquness
// keep it positive to avoid signed int issues when storing
int time = DateTime.now().millisecondsSinceEpoch32();
int random = Random().nextInt(0x80000000);
return (time ^ random) & 0x7FFFFFFF;
}
// Spawn a ticket for a worker to perform an operation
void spawnTicket(RandomAccessFile file, {
required SweepstoreTicketOperation operation,
required int keyHash,
required int writeSize,
required void Function() onApproved,
String? debugLabel,
}) {
void log(String message) {
String prefix = debugLabel != null ? "\x1B[38;5;208m[Ticket Spawner - $debugLabel]:\x1B[0m " : "\x1B[38;5;208m[Ticket Spawner]:\x1B[0m ";
print("$prefix$message");
}
void tickSleep([int microsecondVariance = 10]) {
sleep(Duration(microseconds: 100 + Random().nextInt(microsecondVariance)));
}
Map<String, int> expSleepTracker = {};
void expSleep(String label) {
int count = expSleepTracker[label] ?? 0;
int sleepTime = (1 << count); // Exponential backoff
sleep(Duration(milliseconds: sleepTime));
expSleepTracker[label] = count + 1;
}
// Reduce the chance of race conditions by adding a small random delay
tickSleep(100);
SweepstoreHeader header = SweepstoreHeader(file);
SweepstoreConcurrencyHeader concurrencyHeader = SweepstoreConcurrencyHeader(header);
int? ticketIndex;
int myIdentifier = _randomId();
// Try to acquire a ticket - (Acquire loop)
while (ticketIndex == null) {
for (int i = 0; i < concurrencyHeader.numberOfWorkers; i++) {
SweepstoreWorkerTicket ticket = concurrencyHeader[i];
if (!ticket.writable()) {
continue;
}
int identifier = ticket.identifier;
bool identifier_unassigned = identifier == 0;
bool stale_heartbeat = (DateTime.now().millisecondsSinceEpoch32() - ticket.workerHeartbeat) > 2000;
bool is_free = ticket.ticketState == SweepstoreTicketState.FREE;
if (identifier_unassigned && stale_heartbeat && is_free) {
ticket.write(
identifier: myIdentifier,
ticketState: SweepstoreTicketState.WAITING,
);
ticketIndex = i;
log("Acquired ticket $ticketIndex with identifier $myIdentifier.");
break;
}
}
expSleep("acquire_loop");
// Ensure we still own the ticket - if not, reset and try again
if (ticketIndex != null) {
SweepstoreWorkerTicket ticket = concurrencyHeader[ticketIndex];
if (ticket.identifier != myIdentifier) {
log("Lost ticket $ticketIndex, retrying...");
ticketIndex = null;
}
}
}
// We have a ticket, set it up
SweepstoreWorkerTicket myTicket = concurrencyHeader[ticketIndex];
myTicket.write(
workerHeartbeat: DateTime.now().millisecondsSinceEpoch32(),
ticketState: SweepstoreTicketState.WAITING,
ticketOperation: operation,
keyHash: keyHash,
writeSize: writeSize,
);
// Wait for approval - (Approval loop)
while (true) {
// Check we still own the ticket
if (myTicket.identifier != myIdentifier) {
String exceptionMessage = "CRITICAL: Lost ownership of ticket $ticketIndex, was expecting identifier $myIdentifier but found ${myTicket.identifier}.";
throw Exception(exceptionMessage);
}
if (myTicket.ticketState == SweepstoreTicketState.APPROVED) {
myTicket.write(
ticketState: SweepstoreTicketState.EXECUTING,
);
onApproved();
myTicket.write(
ticketState: SweepstoreTicketState.COMPLETED,
);
break;
}
// randomSleep(10);
tickSleep();
// Update heartbeat
if (DateTime.now().millisecondsSinceEpoch32() != myTicket.workerHeartbeat) {
myTicket.write(
workerHeartbeat: myTicket.workerHeartbeat,
);
}
}
}
// Master side
void initialiseMasterListener(RandomAccessFile file) async {
String filePath = file.path;
Isolate.spawn((_) {
void log(String message) {
print("\x1B[38;5;82m[Master Listener]:\x1B[0m $message");
}
RandomAccessFile file = File(filePath).openSync(mode: FileMode.append);
SweepstoreHeader header = SweepstoreHeader(file);
SweepstoreConcurrencyHeader concurrencyHeader = SweepstoreConcurrencyHeader(header);
while (true) {
for (int i = 0; i < concurrencyHeader.numberOfWorkers; i++) {
SweepstoreWorkerTicket ticket = concurrencyHeader[i];
if (ticket.ticketState == SweepstoreTicketState.WAITING) {
log("Found waiting ticket $i (Key Hash: ${ticket.keyHash})...");
// Approve the ticket
ticket.write(
ticketState: SweepstoreTicketState.APPROVED,
);
log("Approved ticket $i.");
} else if (ticket.ticketState == SweepstoreTicketState.COMPLETED) {
log("Ticket $i completed. Resetting ticket...");
// Reset the ticket
ticket.write(
identifier: 0,
workerHeartbeat: 0,
ticketState: SweepstoreTicketState.FREE,
ticketOperation: SweepstoreTicketOperation.NONE,
keyHash: 0,
writeSize: 0,
);
log("Reset ticket $i.");
}
}
sleep(Duration(milliseconds: 1));
}
}, null);
}

50
dart/lib/debug.dart Normal file
View File

@@ -0,0 +1,50 @@
import 'dart:typed_data';
String binaryDump(Uint8List data) {
StringBuffer buffer = StringBuffer();
for (int i = 0; i < data.length; i += 16) {
// Address
buffer.write('0x${i.toRadixString(16).padLeft(4, '0').toUpperCase()} (${i.toString().padLeft(4)}) | ');
// Hex bytes
for (int j = 0; j < 16; j++) {
if (i + j < data.length) {
buffer.write('${data[i + j].toRadixString(16).padLeft(2, '0').toUpperCase()} ');
} else {
buffer.write(' ');
}
}
buffer.write(' | ');
// Integer representation
for (int j = 0; j < 16; j++) {
if (i + j < data.length) {
buffer.write('${data[i + j].toString().padLeft(3)} ');
} else {
buffer.write(' ');
}
}
buffer.write(' | ');
// ASCII representation
for (int j = 0; j < 16; j++) {
if (i + j < data.length) {
int byte = data[i + j];
if (byte >= 32 && byte <= 126) {
buffer.write(String.fromCharCode(byte));
} else {
buffer.write('.');
}
}
}
buffer.write(' | ');
if (i + 16 < data.length) buffer.writeln();
}
return buffer.toString();
}

View File

@@ -0,0 +1,31 @@
import 'dart:io';
import 'dart:async';
import 'package:sweepstore/debug.dart';
void main() async {
int refreshCount = 0;
while (true) {
// Clear console
if (Platform.isWindows) {
print(Process.runSync("cls", [], runInShell: true).stdout);
} else {
print(Process.runSync("clear", [], runInShell: true).stdout);
}
refreshCount++;
// Read example.bin
final file = File('example.bin');
if (await file.exists()) {
final data = await file.readAsBytes();
print('Binary dump of example.bin (${data.length} bytes) - Refresh #$refreshCount\n');
print(binaryDump(data));
print('\n--- Refreshing in 1 seconds ---');
} else {
print('Error: example.bin not found - Refresh #$refreshCount');
}
await Future.delayed(Duration(seconds: 1));
}
}

View File

@@ -0,0 +1,84 @@
import 'dart:io';
import 'dart:async';
import 'package:sweepstore/header.dart';
void main() async {
int refreshCount = 0;
int? previousMasterHeartbeat;
Map<int, int> previousWorkerHeartbeats = {};
while (true) {
// Clear console
if (Platform.isWindows) {
print(Process.runSync("cls", [], runInShell: true).stdout);
} else {
print(Process.runSync("clear", [], runInShell: true).stdout);
}
refreshCount++;
// Read example.bin
final file = File('example.bin');
if (await file.exists()) {
final raf = await file.open(mode: FileMode.read);
try {
final header = SweepstoreHeader(raf);
final concurrency = header.concurrency;
int now32 = (DateTime.now().millisecondsSinceEpoch ~/ 1000) & 0xFFFFFFFF;
print('Sweepstore Tickets - Refresh #$refreshCount');
print('Current Time (now32): $now32');
print('Master ID: ${concurrency.masterIdentifier}');
int masterAge = now32 - concurrency.masterHeartbeat;
String masterStatus = masterAge > 5 ? "(stale)" : "(active)";
String masterPrevious = previousMasterHeartbeat != null ? "(previously $previousMasterHeartbeat)" : "";
print('Master Heartbeat: ${concurrency.masterHeartbeat} $masterStatus $masterPrevious');
print('Workers: ${concurrency.numberOfWorkers}');
print('Read Allowed: ${concurrency.isReadAllowed}');
print('');
// display each ticket
for (int i = 0; i < concurrency.numberOfWorkers; i++) {
final ticket = concurrency[i];
print('--- Ticket #$i ---');
print(' Identifier: ${ticket.identifier}');
int workerAge = now32 - ticket.workerHeartbeat;
String workerStatus = workerAge > 5 ? "(stale)" : "(active)";
String workerPrevious = previousWorkerHeartbeats.containsKey(i) ? "(previously ${previousWorkerHeartbeats[i]})" : "";
print(' Heartbeat: ${ticket.workerHeartbeat} $workerStatus $workerPrevious');
print(' State: ${ticket.ticketState.name}');
print(' Operation: ${ticket.ticketOperation.name}');
print(' Key Hash: ${ticket.keyHash}');
print(' Write Ptr: ${ticket.writePointer}');
print(' Write Size: ${ticket.writeSize} bytes');
print('');
// update previous heartbeat
previousWorkerHeartbeats[i] = ticket.workerHeartbeat;
}
// updat previous master heartbeat
previousMasterHeartbeat = concurrency.masterHeartbeat;
print('--- Refreshing in 1 seconds ---');
} finally {
await raf.close();
}
} else {
print('Error: example.bin not found - Refresh #$refreshCount');
}
await Future.delayed(Duration(seconds: 1));
}
}

440
dart/lib/header.dart Normal file
View File

@@ -0,0 +1,440 @@
import 'dart:convert';
import 'dart:io';
import 'package:sweepstore/structures.dart';
import 'helper_extensions.dart';
void initialiseSweepstoreHeader(RandomAccessFile file, {
int concurrentWorkers = 4,
}) {
SweepstoreHeaderWriter header = SweepstoreHeaderWriter(file);
if (header.magicNumber == 'SWPT') {
throw ArgumentError('Sweepstore file is already initialised.');
}
SweepstoreConcurrencyHeaderWriter concurrencyHeader = SweepstoreConcurrencyHeaderWriter(header);
header.magicNumber = 'SWPT';
header.version = "undefined";
header.addressTablePointer = SweepstorePointer.nullptr;
header.freeListCount = 0;
header.isFreeListLifted = false;
concurrencyHeader.masterIdentifier = 0;
concurrencyHeader.masterHeartbeat = 0;
concurrencyHeader.numberOfWorkers = concurrentWorkers;
concurrencyHeader.isReadAllowed = false;
for (int i = 0; i < concurrentWorkers; i++) {
SweepstoreWorkerTicket ticket = concurrencyHeader[i];
ticket.write(
identifier: 0,
workerHeartbeat: 0,
ticketState: SweepstoreTicketState.FREE,
ticketOperation: SweepstoreTicketOperation.NONE,
keyHash: 0,
writePointer: SweepstorePointer.nullptr,
writeSize: 0,
);
}
}
class SweepstoreHeader {
final RandomAccessFile _file;
SweepstoreHeader(this._file);
// Offset 0 - 4 bytes
String get magicNumber {
_file.setPositionSync(0);
final bytes = _file.readSync(4);
return String.fromCharCodes(bytes);
}
// Offset 4 - 12 bytes
String get version {
_file.setPositionSync(4);
String version = utf8.decode(_file.readSync(12)).trim();
return version;
}
// Offset 16 - 8 bytes
SweepstorePointer get addressTablePointer {
_file.setPositionSync(16);
final address = _file.readIntSync(8);
return SweepstorePointer(address);
}
// Offset 24 - 4 bytes
int get freeListCount {
_file.setPositionSync(24);
int count = _file.readIntSync(4);
return count;
}
// Offset 28 - 1 byte
bool get isFreeListLifted {
_file.setPositionSync(28);
int flag = _file.readIntSync(1);
return flag != 0;
}
SweepstoreConcurrencyHeader get concurrency => SweepstoreConcurrencyHeader(this);
}
class SweepstoreHeaderWriter extends SweepstoreHeader {
SweepstoreHeaderWriter(RandomAccessFile file) : super(file);
// Offset 0 - 4 bytes
void set magicNumber(String value) {
if (value.length != 4) {
throw ArgumentError('Magic number must be exactly 4 characters long');
}
_file.setPositionSync(0);
_file.writeFromSync(value.codeUnits);
}
// Offset 4 - 12 bytes
void set version(String value) {
if (value.length > 11) {
throw ArgumentError('Version string must be at most 11 characters long');
}
_file.setPositionSync(4);
_file.writeFromSync(utf8.encode(" " + value.padRight(11, ' ').substring(0, 11)));
}
// Offset 16 - 8 bytes
void set addressTablePointer(SweepstorePointer pointer) {
_file.setPositionSync(16);
_file.writeIntSync(pointer.address, 8);
}
// Offset 24 - 4 bytes
void set freeListCount(int value) {
_file.setPositionSync(24);
_file.writeIntSync(value, 4);
}
// Offset 28 - 1 byte
void set isFreeListLifted(bool lifted) {
_file.setPositionSync(28);
_file.writeIntSync(lifted ? 1 : 0, 1);
}
}
class SweepstoreConcurrencyHeader {
final SweepstoreHeader _header;
SweepstoreConcurrencyHeader(this._header);
// Offset 29 - 8 bytes
int get masterIdentifier {
_header._file.setPositionSync(29);
int id = _header._file.readIntSync(8);
return id;
}
// Offset 37 - 4 bytes
int get masterHeartbeat {
_header._file.setPositionSync(37);
int heartbeat = _header._file.readIntSync(4);
return heartbeat;
}
// Offset 41 - 4 bytes
int get numberOfWorkers {
_header._file.setPositionSync(41);
int numWorkers = _header._file.readIntSync(4);
return numWorkers;
}
// Offset 45 - 1 byte
bool get isReadAllowed {
_header._file.setPositionSync(45);
int flag = _header._file.readIntSync(1);
return flag != 0;
}
SweepstoreWorkerTicket operator [](int ticketIndex) {
if (ticketIndex < 0 || ticketIndex >= numberOfWorkers) {
throw RangeError.index(ticketIndex, this, 'ticketIndex', null, numberOfWorkers);
}
return SweepstoreWorkerTicket(ticketIndex, this);
}
}
class SweepstoreConcurrencyHeaderWriter extends SweepstoreConcurrencyHeader {
SweepstoreConcurrencyHeaderWriter(SweepstoreHeader header) : super(header);
// Offset 29 - 8 bytes
void set masterIdentifier(int id) {
_header._file.setPositionSync(29);
_header._file.writeIntSync(id, 8);
}
// Offset 37 - 4 bytes
void set masterHeartbeat(int heartbeat) {
_header._file.setPositionSync(37);
_header._file.writeIntSync(heartbeat, 4);
}
// Offset 41 - 4 bytes
void set numberOfWorkers(int numWorkers) {
_header._file.setPositionSync(41);
_header._file.writeIntSync(numWorkers, 4);
}
// Offset 45 - 1 byte
void set isReadAllowed(bool allowed) {
_header._file.setPositionSync(45);
_header._file.writeIntSync(allowed ? 1 : 0, 1);
}
}
const int endOfStaticHeaderOffset = 46;
class SweepstoreWorkerTicket {
static const int _ticketSize = 30;
final SweepstoreConcurrencyHeader _concurrencyHeader;
final int ticketIndex;
SweepstoreWorkerTicket(this.ticketIndex, this._concurrencyHeader);
// All offsets are relative to the start of the workers ticket
int get _baseOffset => endOfStaticHeaderOffset + (ticketIndex * _ticketSize);
// Offset 0 - 4 bytes
int get identifier {
_concurrencyHeader._header._file.setPositionSync(_baseOffset);
int id = _concurrencyHeader._header._file.readIntSync(4);
return id;
}
// Offset 4 - 4 bytes
int get workerHeartbeat {
_concurrencyHeader._header._file.setPositionSync(_baseOffset + 4);
int heartbeat = _concurrencyHeader._header._file.readIntSync(4);
return heartbeat;
}
// Offset 8 - 1 byte
SweepstoreTicketState get ticketState {
_concurrencyHeader._header._file.setPositionSync(_baseOffset + 8);
int stateValue = _concurrencyHeader._header._file.readIntSync(1);
return SweepstoreTicketState.values[stateValue];
}
// Offset 9 - 1 byte
SweepstoreTicketOperation get ticketOperation {
_concurrencyHeader._header._file.setPositionSync(_baseOffset + 9);
int operationValue = _concurrencyHeader._header._file.readIntSync(1);
return SweepstoreTicketOperation.values[operationValue];
}
// Offset 10 - 8 bytes
int get keyHash {
_concurrencyHeader._header._file.setPositionSync(_baseOffset + 10);
int hash = _concurrencyHeader._header._file.readIntSync(8);
return hash;
}
// Offset 18 - 8 bytes
SweepstorePointer get writePointer {
_concurrencyHeader._header._file.setPositionSync(_baseOffset + 18);
int address = _concurrencyHeader._header._file.readIntSync(8);
return SweepstorePointer(address);
}
// Offset 26 - 4 bytes
int get writeSize {
_concurrencyHeader._header._file.setPositionSync(_baseOffset + 26);
int size = _concurrencyHeader._header._file.readIntSync(4);
return size;
}
// Writer
void write({
int? identifier,
int? workerHeartbeat,
SweepstoreTicketState? ticketState,
SweepstoreTicketOperation? ticketOperation,
int? keyHash,
SweepstorePointer? writePointer,
int? writeSize,
}) {
try {
_concurrencyHeader._header._file.lockSync(FileLock.blockingExclusive, _baseOffset, _baseOffset + _ticketSize);
_concurrencyHeader._header._file.setPositionSync(_baseOffset);
List<int> existingBuffer = _concurrencyHeader._header._file.readSync(_ticketSize);
RandomAccessMemory buffer = RandomAccessMemory(existingBuffer);
if (identifier != null) {
buffer.setPositionSync(0);
buffer.writeIntSync(identifier, 4);
}
if (workerHeartbeat != null) {
buffer.setPositionSync(4);
buffer.writeIntSync(workerHeartbeat, 4);
}
if (ticketState != null) {
buffer.setPositionSync(8);
buffer.writeIntSync(ticketState.index, 1);
}
if (ticketOperation != null) {
buffer.setPositionSync(9);
buffer.writeIntSync(ticketOperation.index, 1);
}
if (keyHash != null) {
buffer.setPositionSync(10);
buffer.writeIntSync(keyHash, 8);
}
if (writePointer != null) {
buffer.setPositionSync(18);
buffer.writeIntSync(writePointer.address, 8);
}
if (writeSize != null) {
buffer.setPositionSync(26);
buffer.writeIntSync(writeSize, 4);
}
_concurrencyHeader._header._file.setPositionSync(_baseOffset);
_concurrencyHeader._header._file.writeFromSync(buffer.toUint8List());
} finally {
_concurrencyHeader._header._file.unlockSync(_baseOffset, _baseOffset + _ticketSize);
}
}
bool writable() {
try {
_concurrencyHeader._header._file.lockSync(
FileLock.blockingExclusive,
_baseOffset,
_baseOffset + _ticketSize
);
// Successfully locked - immediately unlock and return true
_concurrencyHeader._header._file.unlockSync(_baseOffset, _baseOffset + _ticketSize);
return true;
} catch (e) {
// Lock failed - already held by another process
return false;
}
}
}
@deprecated
class _SweepstoreWorkerTicket {
static const int _ticketSize = 30;
final SweepstoreConcurrencyHeader _concurrencyHeader;
final int ticketIndex;
_SweepstoreWorkerTicket(this.ticketIndex, this._concurrencyHeader);
// All offsets are relative to the start of the workers ticket
int get _baseOffset => endOfStaticHeaderOffset + (ticketIndex * _ticketSize);
// Offset 0 - 4 bytes
int get identifier {
_concurrencyHeader._header._file.setPositionSync(_baseOffset);
int id = _concurrencyHeader._header._file.readIntSync(4);
return id;
}
set identifier(int id) {
_concurrencyHeader._header._file.setPositionSync(_baseOffset);
_concurrencyHeader._header._file.writeIntSync(id, 4);
_concurrencyHeader._header._file.flushSync();
}
// Offset 4 - 4 bytes
int get workerHeartbeat {
_concurrencyHeader._header._file.setPositionSync(_baseOffset + 4);
int heartbeat = _concurrencyHeader._header._file.readIntSync(4);
return heartbeat;
}
set workerHeartbeat(int heartbeat) {
_concurrencyHeader._header._file.setPositionSync(_baseOffset + 4);
_concurrencyHeader._header._file.writeIntSync(heartbeat, 4);
}
// Offset 8 - 1 byte
SweepstoreTicketState get ticketState {
_concurrencyHeader._header._file.setPositionSync(_baseOffset + 8);
int stateValue = _concurrencyHeader._header._file.readIntSync(1);
return SweepstoreTicketState.values[stateValue];
}
set ticketState(SweepstoreTicketState state) {
_concurrencyHeader._header._file.setPositionSync(_baseOffset + 8);
_concurrencyHeader._header._file.writeIntSync(state.index, 1);
_concurrencyHeader._header._file.flushSync();
}
// Offset 9 - 1 byte
SweepstoreTicketOperation get ticketOperation {
_concurrencyHeader._header._file.setPositionSync(_baseOffset + 9);
int operationValue = _concurrencyHeader._header._file.readIntSync(1);
return SweepstoreTicketOperation.values[operationValue];
}
set ticketOperation(SweepstoreTicketOperation operation) {
_concurrencyHeader._header._file.setPositionSync(_baseOffset + 9);
_concurrencyHeader._header._file.writeIntSync(operation.index, 1);
}
// Offset 10 - 8 bytes
int get keyHash {
_concurrencyHeader._header._file.setPositionSync(_baseOffset + 10);
int hash = _concurrencyHeader._header._file.readIntSync(8);
return hash;
}
set keyHash(int hash) {
_concurrencyHeader._header._file.setPositionSync(_baseOffset + 10);
_concurrencyHeader._header._file.writeIntSync(hash, 8);
}
// Offset 18 - 8 bytes
SweepstorePointer get writePointer {
_concurrencyHeader._header._file.setPositionSync(_baseOffset + 18);
int address = _concurrencyHeader._header._file.readIntSync(8);
return SweepstorePointer(address);
}
set writePointer(SweepstorePointer pointer) {
_concurrencyHeader._header._file.setPositionSync(_baseOffset + 18);
_concurrencyHeader._header._file.writeIntSync(pointer.address, 8);
}
// Offset 26 - 4 bytes
int get writeSize {
_concurrencyHeader._header._file.setPositionSync(_baseOffset + 26);
int size = _concurrencyHeader._header._file.readIntSync(4);
return size;
}
set writeSize(int size) {
_concurrencyHeader._header._file.setPositionSync(_baseOffset + 26);
_concurrencyHeader._header._file.writeIntSync(size, 4);
}
// Helpers
void lock() {
_concurrencyHeader._header._file.lockSync(FileLock.exclusive, _baseOffset, _baseOffset + _ticketSize);
}
void unlock() {
_concurrencyHeader._header._file.unlockSync(_baseOffset, _baseOffset + _ticketSize);
}
}

View File

@@ -0,0 +1,228 @@
import 'dart:io';
import 'dart:math';
import 'dart:typed_data';
import 'package:sweepstore/structures.dart';
class RandomAccessMemory {
List<int> _buffer;
int _position = 0;
RandomAccessMemory([List<int>? initialData]) : _buffer = initialData != null ? List<int>.from(initialData) : [];
// Position management
int positionSync() => _position;
void setPositionSync(int position) {
_position = position;
}
int length() => _buffer.length;
// Read bytes
List<int> readSync(int count) {
if (_position + count > _buffer.length) {
throw RangeError('Not enough bytes to read');
}
List<int> result = _buffer.sublist(_position, _position + count);
_position += count;
return result;
}
// Write bytes
void writeFromSync(List<int> bytes) {
for (int i = 0; i < bytes.length; i++) {
if (_position + i >= _buffer.length) {
_buffer.add(bytes[i]);
} else {
_buffer[_position + i] = bytes[i];
}
}
_position += bytes.length;
}
// Read/Write Int Dynamic
int readIntSync([int size = 4, Endian endianness = Endian.little]) {
if (size < 1 || size > 8) {
throw ArgumentError('Size must be between 1 and 8 bytes');
}
List<int> bytes = readSync(size);
// Build integer from bytes with proper endianness
int result = 0;
if (endianness == Endian.little) {
for (int i = size - 1; i >= 0; i--) {
result = (result << 8) | bytes[i];
}
} else {
for (int i = 0; i < size; i++) {
result = (result << 8) | bytes[i];
}
}
// Sign extend if MSB is set
int signBit = 1 << (size * 8 - 1);
if (result & signBit != 0) {
result -= 1 << (size * 8);
}
return result;
}
void writeIntSync(int value, [int size = 4, Endian endianness = Endian.little]) {
if (size < 1 || size > 8) {
throw ArgumentError('Size must be between 1 and 8 bytes');
}
List<int> bytes = List.filled(size, 0);
// Extract bytes with proper endianness
if (endianness == Endian.little) {
for (int i = 0; i < size; i++) {
bytes[i] = (value >> (i * 8)) & 0xFF;
}
} else {
for (int i = 0; i < size; i++) {
bytes[size - 1 - i] = (value >> (i * 8)) & 0xFF;
}
}
writeFromSync(bytes);
}
// Read/Write Pointers
SweepstorePointer readPointerSync() {
int offset = readIntSync(SweepstorePrimitives.POINTER.size);
return SweepstorePointer(offset);
}
void writePointerSync(SweepstorePointer pointer) {
writeIntSync(pointer.address, SweepstorePrimitives.POINTER.size);
}
// Read/Write Float32
double readFloat32Sync([Endian endianness = Endian.little]) {
List<int> bytes = readSync(4);
return ByteData.sublistView(Uint8List.fromList(bytes)).getFloat32(0, endianness);
}
void writeFloat32Sync(double value, [Endian endianness = Endian.little]) {
ByteData byteData = ByteData(4);
byteData.setFloat32(0, value, endianness);
writeFromSync(byteData.buffer.asUint8List());
}
// Read/Write Float64 (Double)
double readFloat64Sync([Endian endianness = Endian.little]) {
List<int> bytes = readSync(8);
return ByteData.sublistView(Uint8List.fromList(bytes)).getFloat64(0, endianness);
}
void writeFloat64Sync(double value, [Endian endianness = Endian.little]) {
ByteData byteData = ByteData(8);
byteData.setFloat64(0, value, endianness);
writeFromSync(byteData.buffer.asUint8List());
}
// Conversion methods
List<int> toList() => List<int>.from(_buffer);
Uint8List toUint8List() => Uint8List.fromList(_buffer);
}
extension SweepstoreRandomAccessFileHelper on RandomAccessFile {
// Read/Write Int Dynamic - Can specify size in bytes, does not have to align to 1, 2, 4, or 8 bytes. Default is 4 bytes (Int32)
int readIntSync([int size = 4, Endian endianness = Endian.little]) {
if (size < 1 || size > 8) {
throw ArgumentError('Size must be between 1 and 8 bytes');
}
List<int> bytes = readSync(size);
// Build integer from bytes with proper endianness
int result = 0;
if (endianness == Endian.little) {
for (int i = size - 1; i >= 0; i--) {
result = (result << 8) | bytes[i];
}
} else {
for (int i = 0; i < size; i++) {
result = (result << 8) | bytes[i];
}
}
// Sign extend if MSB is set
int signBit = 1 << (size * 8 - 1);
if (result & signBit != 0) {
result -= 1 << (size * 8);
}
return result;
}
void writeIntSync(int value, [int size = 4, Endian endianness = Endian.little]) {
if (size < 1 || size > 8) {
throw ArgumentError('Size must be between 1 and 8 bytes');
}
List<int> bytes = List.filled(size, 0);
// Extract bytes with proper endianness
if (endianness == Endian.little) {
for (int i = 0; i < size; i++) {
bytes[i] = (value >> (i * 8)) & 0xFF;
}
} else {
for (int i = 0; i < size; i++) {
bytes[size - 1 - i] = (value >> (i * 8)) & 0xFF;
}
}
writeFromSync(bytes);
}
// Read/Write Pointers
SweepstorePointer readPointerSync() {
int offset = readIntSync(SweepstorePrimitives.POINTER.size);
return SweepstorePointer(offset);
}
void writePointerSync(SweepstorePointer pointer) {
writeIntSync(pointer.address, SweepstorePrimitives.POINTER.size);
}
// Read/Write Float32
double readFloat32Sync([Endian endianness = Endian.little]) {
List<int> bytes = readSync(4);
return ByteData.sublistView(Uint8List.fromList(bytes)).getFloat32(0, endianness);
}
void writeFloat32Sync(double value, [Endian endianness = Endian.little]) {
ByteData byteData = ByteData(4);
byteData.setFloat32(0, value, endianness);
writeFromSync(byteData.buffer.asUint8List());
}
// Read/Write Float64 (Double)
double readFloat64Sync([Endian endianness = Endian.little]) {
List<int> bytes = readSync(8);
return ByteData.sublistView(Uint8List.fromList(bytes)).getFloat64(0, endianness);
}
void writeFloat64Sync(double value, [Endian endianness = Endian.little]) {
ByteData byteData = ByteData(8);
byteData.setFloat64(0, value, endianness);
writeFromSync(byteData.buffer.asUint8List());
}
}
extension SweepstoreDateTimeHelper on DateTime {
int millisecondsSinceEpoch32() {
return (millisecondsSinceEpoch ~/ 1000) & 0xFFFFFFFF;
}
int millisecondsSinceEpoch64() {
return millisecondsSinceEpoch;
}
}

48
dart/lib/structures.dart Normal file
View File

@@ -0,0 +1,48 @@
enum SweepstorePrimitives {
POINTER (8),
ADDRESS_TABLE (-1);
final int size;
final bool arrayType;
const SweepstorePrimitives(this.size, {
this.arrayType = false
});
}
class SweepstorePointer {
static const SweepstorePointer nullptr = SweepstorePointer(-1);
final int address;
const SweepstorePointer(this.address);
bool get isNull => address == -1;
operator ==(Object other) {
if (identical(this, other)) return true;
if (other is! SweepstorePointer) return false;
return address == other.address;
}
@override
String toString() => '0x${address.toRadixString(16)} ($address)';
}
enum SweepstoreTicketState {
FREE,
WAITING,
APPROVED,
EXECUTING,
COMPLETED,
}
enum SweepstoreTicketOperation {
NONE,
READ,
MODIFY,
WRITE,
}

105
dart/lib/sweepstore.dart Normal file
View File

@@ -0,0 +1,105 @@
import 'dart:io';
import 'dart:isolate';
import 'dart:math';
import 'package:sweepstore/debug.dart';
import 'package:sweepstore/header.dart';
import 'package:sweepstore/structures.dart';
import 'package:sweepstore/concurrency.dart';
class Sweepstore {
final RandomAccessFile _file;
Sweepstore(String filePath)
: _file = File(filePath).openSync(mode: FileMode.append)
{
_header = SweepstoreHeaderWriter(_file);
}
late final SweepstoreHeaderWriter _header;
late final SweepstoreConcurrencyHeaderWriter _concurrencyHeader = SweepstoreConcurrencyHeaderWriter(_header);
void initialise({
int concurrentWorkers = 4,
}) {
initialiseSweepstoreHeader(_file,
concurrentWorkers: concurrentWorkers,
);
_header.version = "1.1.0.1";
print("Version: ${_header.version}");
}
void operator []=(String key, dynamic value) {
spawnTicket(_file,
operation: SweepstoreTicketOperation.WRITE,
keyHash: key.hashCode,
writeSize: 0, // Placeholder
onApproved: () {
print("Writing key: $key with hash ${key.hashCode} and value: $value");
},
debugLabel: key
);
}
}
Future<void> main() async {
String filePath = '../example.bin';
File file = File(filePath);
if (file.existsSync()) {
file.deleteSync();
}
file.createSync();
Sweepstore store = Sweepstore(filePath);
store.initialise(
concurrentWorkers: 20
);
initialiseMasterListener(file.openSync(mode: FileMode.append));
print(binaryDump(file.readAsBytesSync()));
int iteration = 1;
for (int j = 0; j < iteration; j++) {
int concurrencyTest = 256;
final receivePort = ReceivePort();
int completedJobs = 0;
final stopwatch = Stopwatch()..start();
for (int i = 0; i < concurrencyTest; i++) {
await Isolate.spawn((message) {
final index = message['index'] as int;
final sendPort = message['sendPort'] as SendPort;
Sweepstore store = Sweepstore(filePath);
store['key_$index'] = 'value_$index';
sendPort.send('done');
}, {'index': i, 'sendPort': receivePort.sendPort});
}
// wait for all jobs to finish
await for (var msg in receivePort) {
completedJobs++;
if (completedJobs >= concurrencyTest) {
receivePort.close();
break;
}
}
stopwatch.stop();
print('\x1B[95mAll jobs completed!\x1B[0m');
print('\x1B[95mTime taken: ${stopwatch.elapsedMilliseconds}ms (${stopwatch.elapsed.inSeconds}s)\x1B[0m');
}
}

View File

@@ -1,7 +1,7 @@
name: file_formats
description: A sample command-line application with basic argument parsing.
version: 0.0.1
# repository: https://github.com/my_org/my_repo
name: sweepstore
description: SweepStore (formerly BinaryTable) A high-performance binary storage format for Dart applications with efficient memory management and random access capabilities.
version: 1.0.0
repository: https://github.com/ImBenji03/SweepStore
environment:
sdk: ^3.0.0

View File

@@ -11,7 +11,7 @@
© 2025-26 by Benjamin Watt of IMBENJI.NET LIMITED - All rights reserved.
Use of this source code is governed by a MIT license that can be found in the LICENSE file.
Use of this source code is governed by the Business Source License 1.1 that can be found in the LICENSE file.
This file is part of the SweepStore (formerly Binary Table) package for Dart.
@@ -19,6 +19,8 @@ This file is part of the SweepStore (formerly Binary Table) package for Dart.
import 'dart:convert';
import 'dart:io';
import 'dart:isolate';
import 'dart:math';
import 'dart:typed_data';
enum BT_Type {
@@ -313,7 +315,7 @@ class BT_UniformArray extends BT_Reference {
void addAll(Iterable<dynamic> values) {
_table.antiFreeListScope(() {
_table._antiFreeListScope(() {
// Determine the type of the array by reading the first items type
BT_Type type = elementType ?? BT_Type.fromDynamic(values.first);
@@ -344,16 +346,15 @@ class BT_UniformArray extends BT_Reference {
fullBuffer.replaceRange(1, 5, lengthBytes);
// Free the old array
_table.free(_pointer, size);
_table._free(_pointer, size);
// Allocate new space for the updated array
BT_Pointer newPointer = _table.alloc(fullBuffer.length);
BT_Pointer newPointer = _table._alloc(fullBuffer.length);
// Replace any references to the old pointer with the new one
Map<int, BT_Pointer> addressTable = _table._addressTable;
addressTable.updateAll((key, value) {
if (value == _pointer) {
print('Updating address table entry for key $key from $value to $newPointer');
return newPointer;
}
return value;
@@ -364,8 +365,6 @@ class BT_UniformArray extends BT_Reference {
// Write the updated buffer to the new location
_table._file.setPositionSync(newPointer.address);
_table._file.writeFromSync(fullBuffer);
print('Array resized to new length $newLength at $newPointer');
});
}
@@ -468,7 +467,6 @@ extension FreeList on List<BT_FreeListEntry> {
New encoding should reflect a "read from the end" approach.
So it should look like:
- Entries (variable)
- Entry count (4 bytes)
Entry:
- Pointer (8 bytes)
@@ -485,9 +483,6 @@ extension FreeList on List<BT_FreeListEntry> {
buffer.addAll((ByteData(4)..setInt32(0, entry.size, Endian.little)).buffer.asUint8List());
}
// Entry count
buffer.addAll((ByteData(4)..setInt32(0, length, Endian.little)).buffer.asUint8List());
return buffer;
}
@@ -509,21 +504,239 @@ extension fnv1a on String {
}
// Convert double to float16
extension on double {
List<int> _toFloat16Bytes() {
ByteData float32Data = ByteData(4);
float32Data.setFloat32(0, this, Endian.little);
int f = float32Data.getUint32(0);
int sign = (f >> 31) & 0x1;
int exponent = (f >> 23) & 0xff;
int mantissa = f & 0x7fffff;
if (exponent == 0xff) { // Inf or NaN
if (mantissa != 0) {
return [(sign << 7) | 0x7e, 0x01]; // NaN
} else {
return [(sign << 7) | 0x7c, 0x00]; // Inf
}
}
if (exponent == 0) { // Zero or denormalized
return [sign << 7, 0x00]; // Zero
}
// Adjust exponent for float16 (bias: 127 for float32, 15 for float16)
exponent = exponent - 127 + 15;
if (exponent >= 31) { // Overflow to infinity
return [(sign << 7) | 0x7c, 0x00]; // Inf
}
if (exponent <= 0) { // Underflow to zero
return [sign << 7, 0x00]; // Zero
}
// Extract the top 10 bits of mantissa for float16
mantissa >>= 13;
int float16 = (sign << 15) | (exponent << 10) | mantissa;
return [float16 & 0xff, (float16 >> 8) & 0xff]; // Little endian
}
}
enum BT_TicketState {
IDLE,
WAITING,
APPROVED,
EXECUTING,
COMPLETED,
}
enum BT_TicketOpcode {
NONE,
READ,
MODIFY,
WRITE,
}
class BT_Ticket {
final int heartbeat;
final BT_TicketState state;
final BT_TicketOpcode opcode;
final int keyHash;
// Defined by the slave
final int writeSize;
// Defined by the master
final BT_Pointer writePointer;
BT_Ticket({
required this.heartbeat,
required this.state,
required this.opcode,
required this.keyHash,
this.writeSize = 0,
this.writePointer = BT_Null,
});
BT_Ticket.fromIntList(List<int> data)
: heartbeat = ByteData.sublistView(Uint8List.fromList(data.sublist(0, 4))).getInt32(0, Endian.little),
state = BT_TicketState.values[data[4]],
opcode = BT_TicketOpcode.values[data[5]],
keyHash = ByteData.sublistView(Uint8List.fromList(data.sublist(6, 14))).getInt64(0, Endian.little),
writePointer = BT_Pointer(ByteData.sublistView(Uint8List.fromList(data.sublist(14, 22))).getInt64(0, Endian.little)),
writeSize = ByteData.sublistView(Uint8List.fromList(data.sublist(22, 26))).getInt32(0, Endian.little);
List<int> toIntList() {
List<int> data = [];
// Heartbeat (4 bytes)
data.addAll((ByteData(4)..setInt32(0, heartbeat, Endian.little)).buffer.asUint8List());
// State (1 byte)
data.add(state.index);
// Opcode (1 byte)
data.add(opcode.index);
// Key Hash (8 bytes)
data.addAll((ByteData(8)..setInt64(0, keyHash, Endian.little)).buffer.asUint8List());
// Write Pointer (8 bytes)
data.addAll((ByteData(8)..setInt64(0, writePointer.address, Endian.little)).buffer.asUint8List());
// Write Size (4 bytes)
data.addAll((ByteData(4)..setInt32(0, writeSize, Endian.little)).buffer.asUint8List());
return data;
}
String toString() {
return 'BT_Ticket(heartbeat: $heartbeat, state: $state, opcode: $opcode, keyHash: $keyHash, writePointer: $writePointer, writeSize: $writeSize)';
}
}
class BinaryTable {
late final int sessionId;
RandomAccessFile _file;
BinaryTable(String path) : _file = File(path).openSync(mode: FileMode.append);
BinaryTable(String path) : _file = File(path).openSync(mode: FileMode.append) {
void initialise() {
var nextSessionId = Random.secure();
int high = nextSessionId.nextInt(1 << 32);
int low = nextSessionId.nextInt(1 << 32);
sessionId = (high << 32) | low;
// Check if the file is initialised
_file.setPositionSync(0);
_file.writePointerSync(BT_Null); // Address table pointer
_file.writeIntSync(0, 4); // Free list entry count
List<int> magicNumber = _file.readSync(4);
bool isInitialised = magicNumber.length == 4 &&
magicNumber[0] == 'S'.codeUnitAt(0) &&
magicNumber[1] == 'W'.codeUnitAt(0) &&
magicNumber[2] == 'P'.codeUnitAt(0) &&
magicNumber[3] == 'S'.codeUnitAt(0);
if (isInitialised && !isMasterAlive) {
_initialiseMaster();
}
}
void initialise({
int concurrentReaders = 4,
})
{
_file.setPositionSync(0);
// Ensure the file hasnt already been initialised
List<int> magicNumber = _file.readSync(4);
bool isInitialised = magicNumber.length == 4 &&
magicNumber[0] == 'S'.codeUnitAt(0) &&
magicNumber[1] == 'W'.codeUnitAt(0) &&
magicNumber[2] == 'P'.codeUnitAt(0) &&
magicNumber[3] == 'S'.codeUnitAt(0);
if (isInitialised) {
throw Exception('Binary Table file is already initialised.');
}
// Magic number "SWPS" (0/4 bytes)
_file.writeFromSync("SWPS".codeUnits);
// Version (1.0 float16) (4/2 bytes)
_file.writeFromSync(1.0._toFloat16Bytes());
// Address table pointer (null) (6/8 bytes)
_file.writePointerSync(BT_Null);
// Free list count (0) (14/4 bytes)
_file.writeIntSync(0, 4); // Free list entry count
/*
The values below are for concurrency.
*/
// Master Identifier (18/8 bytes)
_file.writeIntSync(sessionId & 0xFFFFFFFF, 8);
// Master Heartbeat (26/4 bytes)
int now = DateTime.now().millisecondsSinceEpoch;
_file.writeIntSync(now, 4);
// Number of concurrent readers (30/4 bytes) // Cannot be changed after initialisation
_file.writeIntSync(concurrentReaders, 4);
// Allow reads (34/1 bytes)
_file.writeByteSync(1);
// Everything else is operator slots starting at 35 bytes.
for (int i = 0; i < concurrentReaders; i++) {
// Slave Heartbeat (4 bytes)
_file.writeIntSync(0, 4);
// Ticket state (1 byte)
_file.writeByteSync(0);
// Ticket Opcode (1 byte)
_file.writeByteSync(0);
// Key Hash (8 bytes)
_file.writeIntSync(0, 8);
// Write Pointer (8 bytes)
_file.writeIntSync(-1, 8);
// Write Size (4 bytes)
_file.writeIntSync(0, 4);
}
// Run the master initialisation
_initialiseMaster();
}
/*
Address Table
*/
Map<int, BT_Pointer> get _addressTable {
_file.setPositionSync(0);
_file.setPositionSync(6);
BT_Reference tableRef = BT_Reference(this, _file.readPointerSync());
if (tableRef._pointer.isNull) {
@@ -570,34 +783,37 @@ class BinaryTable {
});
// Write new address table at end of file
BT_Pointer tableAddress = alloc(buffer.length);
BT_Pointer tableAddress = _alloc(buffer.length);
_file.setPositionSync(tableAddress.address);
_file.writeFromSync(buffer);
// Read old table pointer before updating
_file.setPositionSync(0);
_file.setPositionSync(6);
BT_Reference oldTableRef = BT_Reference(this, _file.readPointerSync());
// Update header to point to new table
_file.setPositionSync(0);
_file.setPositionSync(6);
_file.writePointerSync(tableAddress);
// Now free the old table if it exists and is not the same as the new one
if (!oldTableRef._pointer.isNull && oldTableRef._pointer != tableAddress) {
free(oldTableRef._pointer, oldTableRef.size);
_free(oldTableRef._pointer, oldTableRef.size);
}
}
/*
Free List
*/
bool freeListLifted = false;
List<BT_FreeListEntry>? _freeListCache;
List<BT_FreeListEntry> get _freeList {
if (freeListLifted) {
return _freeListCache ?? [];
}
_file.setPositionSync(_file.lengthSync() - 4);
_file.setPositionSync(14);
int entryCount = _file.readIntSync(4);
if (entryCount == 0) {
return [];
@@ -605,7 +821,7 @@ class BinaryTable {
int entrySize = BT_Type.POINTER.size + 4; // Pointer + Size
int freeListSize = entryCount * entrySize;
_file.setPositionSync(_file.lengthSync() - 4 - freeListSize);
_file.setPositionSync(_file.lengthSync() - freeListSize);
List<int> buffer = _file.readSync(freeListSize);
List<BT_FreeListEntry> freeList = [];
@@ -627,54 +843,77 @@ class BinaryTable {
return freeList;
}
set _freeList(List<BT_FreeListEntry> list) {
if (freeListLifted) {
_freeListCache = list;
return;
}
_file.setPositionSync(_file.lengthSync() - 4);
// Read OLD count from header
_file.setPositionSync(14);
int oldEntryCount = _file.readIntSync(4);
int oldListSize = (oldEntryCount * (BT_Type.POINTER.size + 4)) + 4; // Entries + Count
_file.truncateSync(_file.lengthSync() - oldListSize);
List<int> buffer = list.bt_encode();
_file.setPositionSync(_file.lengthSync());
_file.writeFromSync(buffer);
// Calculate old free list size (entries only, not count)
int oldListSize = oldEntryCount * (BT_Type.POINTER.size + 4);
// Remove old free list entries from EOF
if (oldEntryCount > 0) {
int currentLength = _file.lengthSync();
_file.truncateSync(currentLength - oldListSize);
}
// Write NEW count to header
_file.setPositionSync(14);
_file.writeIntSync(list.length, 4);
// Write NEW entries to EOF (if any)
if (list.isNotEmpty) {
List<int> buffer = list.bt_encode();
_file.setPositionSync(_file.lengthSync());
_file.writeFromSync(buffer);
}
}
/// Caches the free list in memory, and removed it from the file.
void liftFreeList() {
void _liftFreeList() {
if (freeListLifted) {
throw StateError('Free list is already lifted');
}
// Cache the free list
_freeListCache = _freeList;
_file.setPositionSync(_file.lengthSync() - 4);
// Read count from header
_file.setPositionSync(14);
int oldEntryCount = _file.readIntSync(4);
int oldEntrySize = BT_Type.POINTER.size + 4; // Pointer + Size
int oldFreeListSize = oldEntryCount * oldEntrySize + 4; // +4 for entry count
_file.truncateSync(_file.lengthSync() - oldFreeListSize);
if (oldEntryCount > 0) {
int oldEntrySize = BT_Type.POINTER.size + 4;
int oldFreeListSize = oldEntryCount * oldEntrySize; // Just entries, no count
// Remove free list entries from EOF
_file.truncateSync(_file.lengthSync() - oldFreeListSize);
}
// Clear count in header
_file.setPositionSync(14);
_file.writeIntSync(0, 4);
freeListLifted = true;
}
/// Appends the cached free list back to the file, and clears the cache.
void dropFreeList() {
if (!freeListLifted) {
throw StateError('Free list is not lifted');
}
_file.setPositionSync(_file.lengthSync());
_file.writeIntSync(0, 4); // Placeholder for entry count
freeListLifted = false;
_freeList = _freeListCache!;
_freeList = _freeListCache!; // This now writes count to header and entries to EOF
_freeListCache = null;
}
void antiFreeListScope(void Function() fn) {
liftFreeList();
void _antiFreeListScope(void Function() fn) {
_liftFreeList();
try {
fn();
} finally {
@@ -682,7 +921,7 @@ class BinaryTable {
}
}
void free(BT_Pointer pointer, int size) {
void _free(BT_Pointer pointer, int size) {
if (!freeListLifted) {
throw StateError('Free list must be lifted before freeing memory');
@@ -734,7 +973,7 @@ class BinaryTable {
// Update free list
_freeList = freeList;
}
BT_Pointer alloc(int size) {
BT_Pointer _alloc(int size) {
if (!freeListLifted) {
throw StateError('Free list must be lifted before allocation');
@@ -783,34 +1022,259 @@ class BinaryTable {
}
}
operator []=(String key, dynamic value) {
/*
Concurrency
*/
antiFreeListScope(() {
Map<int, BT_Pointer> addressTable = _addressTable;
void _initialiseMaster() {
Isolate.spawn((String filePath) {
RandomAccessFile file = File(filePath).openSync(mode: FileMode.append);
int keyHash = key.bt_hash;
if (addressTable.containsKey(keyHash)) {
throw Exception('Key already exists');
// Print with yellow [MASTER] prefix - use ansi
void _mstPrint(String message) {
print('\x1B[33m[MASTER]\x1B[0m $message');
}
late final int concurrentReaders;
file.setPositionSync(30);
concurrentReaders = file.readIntSync(4);
while (true) {
int now = DateTime.now().millisecondsSinceEpoch32();
file.setPositionSync(26);
file.writeIntSync(now, 4);
_mstPrint('Master heartbeat updated at $now');
file.setPositionSync(35);
for (int i = 0; i < concurrentReaders; i++) {
// Read ticket
List<int> ticketBuffer = file.readSync(26);
BT_Ticket ticket = BT_Ticket.fromIntList(ticketBuffer);
// Check if ticket is alive and waiting
int ticketHeartbeat = ticket.heartbeat;
bool isTicketAlive = (now - ticket.heartbeat) <= 5000;
BT_TicketState state = ticket.state;
if (isTicketAlive && state == BT_TicketState.WAITING) {
_mstPrint("Ticket ${i} is alive and waiting...");
// If reading is still allowed, we need to disallow it
file.setPositionSync(34);
if (file.readByteSync() == 1) {
_mstPrint("Disallowing reads for ticket processing...");
file.setPositionSync(34);
file.writeByteSync(0);
// _liftFreeList(); // Broken/Incorrect Logic. the isolate cant access the main instance's free list
}
// We need to give it a write pointer and approve it
// BT_Pointer allocation = _alloc(ticket.writeSize); // Broken/Incorrect Logic. the isolate cant access the main instance's free list
BT_Ticket approvedTicket = BT_Ticket(
heartbeat: ticket.heartbeat,
state: BT_TicketState.APPROVED,
opcode: ticket.opcode,
keyHash: ticket.keyHash,
writeSize: ticket.writeSize,
// writePointer: allocation, // Broken/Incorrect Logic. the isolate cant access the main instance's free list
);
// Write approved ticket back to file
file.setPositionSync(35 + i * 26);
file.writeFromSync(approvedTicket.toIntList());
}
}
// If reads are disallowed, we need to allow them again, and drop the free list
file.setPositionSync(34);
if (file.readByteSync() == 0) {
_mstPrint("Allowing reads after ticket processing...");
file.setPositionSync(34);
file.writeByteSync(1);
dropFreeList();
}
sleep(Duration(seconds: 1));
}
}, _file.path);
}
bool get isMasterAlive {
_file.setPositionSync(18);
int masterId = _file.readIntSync(8);
_file.setPositionSync(26);
int masterHeartbeat = _file.readIntSync(4);
int now = DateTime.now().millisecondsSinceEpoch32();
return (now - masterHeartbeat) <= 5000;
}
int get concurrentReaders {
_file.setPositionSync(30);
return _file.readIntSync(4);
}
bool get allowReads {
_file.setPositionSync(34);
int flag = _file.readByteSync();
return flag == 1;
}
// Add ticket
void _ticket({
required BT_TicketOpcode operation,
required int keyHash,
required int writeSize,
required void Function(BT_Ticket ticket)? onApproved,
}) {
// Reduce the chance of a race condition/deadlock by adding a small random delay
sleep(Duration(milliseconds: Random().nextInt(10)));
int? ticketIndex;
// Were gonna iterate through all the tickets and find an empty slot
while (ticketIndex == null) {
_file.setPositionSync(35);
for (int i = 0; i < concurrentReaders; i++) {
List<int> ticketBuffer = _file.readSync(26); // Size of a ticket entry
BT_Ticket ticket = BT_Ticket.fromIntList(ticketBuffer);
// Check if the heartbeat is stale (older than 5 seconds)
int now = DateTime.now().millisecondsSinceEpoch64();
if (now - ticket.heartbeat > 5000) {
// We found a stale ticket, we can take this slot
ticketIndex = i;
break;
}
}
sleep(Duration(milliseconds: 500));
}
if (ticketIndex == null) {
throw Exception('Failed to acquire ticket');
}
BT_Ticket newTicket = BT_Ticket(
heartbeat: DateTime.now().millisecondsSinceEpoch32(),
state: BT_TicketState.WAITING,
opcode: operation,
keyHash: keyHash,
writeSize: writeSize,
);
// Write the new ticket to the file
_file.setPositionSync(35 + ticketIndex * 38);
_file.writeFromSync(newTicket.toIntList());
// Wait for approval
while (true) {
print('Waiting for ticket approval...');
_file.setPositionSync(35 + ticketIndex * 38);
List<int> ticketBuffer = _file.readSync(38);
BT_Ticket ticket = BT_Ticket.fromIntList(ticketBuffer);
if (ticket.state == BT_TicketState.APPROVED) {
// Ticket approved
onApproved?.call(ticket);
break;
}
if (!isMasterAlive) {
print('Master is not alive, cannot proceed with ticket');
}
sleep(Duration(milliseconds: 1000));
// Update heartbeat
_file.setPositionSync(35 + ticketIndex * 38);
int now = DateTime.now().millisecondsSinceEpoch32();
_file.writeIntSync(now, 4);
}
List<int> valueBuffer = encodeValue(value);
// Write value to file
BT_Pointer valueAddress = alloc(valueBuffer.length);
_file.setPositionSync(valueAddress.address);
_file.writeFromSync(valueBuffer);
// Update address table
addressTable[keyHash] = valueAddress;
_addressTable = addressTable;
});
}
/*
Key-Value Operations
*/
operator []=(String key, dynamic value) {
Map<int, BT_Pointer> addressTable = _addressTable;
// Check if key already exists
int keyHash = key.bt_hash;
if (addressTable.containsKey(keyHash)) {
throw Exception('Key already exists'); // The pair should be deleted first
}
List<int> valueBuffer = encodeValue(value);
_ticket(
operation: BT_TicketOpcode.WRITE, // Modification operations will come later,
keyHash: keyHash,
writeSize: valueBuffer.length,
onApproved: (BT_Ticket ticket) {
// Write value to file
_file.setPositionSync(ticket.writePointer.address);
_file.writeFromSync(valueBuffer);
}
);
// v1.0.0 implementation
// _antiFreeListScope(() {
// Map<int, BT_Pointer> addressTable = _addressTable;
//
// int keyHash = key.bt_hash;
//
// // 1.1 Note: I cant remember why i did this check here.
// if (addressTable.containsKey(keyHash)) {
// throw Exception('Key already exists');
// }
//
//
//
// List<int> valueBuffer = encodeValue(value);
//
// // Write value to file
// BT_Pointer valueAddress = _alloc(valueBuffer.length);
//
// _file.setPositionSync(valueAddress.address);
// _file.writeFromSync(valueBuffer);
//
// // Update address table
// addressTable[keyHash] = valueAddress;
// _addressTable = addressTable;
// });
}
operator [](String key) {
while (!allowReads) {
// Wait until reads are allowed
sleep(Duration(milliseconds: 1));
}
Map<int, BT_Pointer> addressTable = _addressTable;
int keyHash = key.bt_hash;
@@ -827,7 +1291,7 @@ class BinaryTable {
void delete(String key) {
antiFreeListScope(() {
_antiFreeListScope(() {
Map<int, BT_Pointer> addressTable = _addressTable;
int keyHash = key.bt_hash;
@@ -840,7 +1304,7 @@ class BinaryTable {
BT_Reference valueRef = BT_Reference(this, valuePointer);
// Free the value
free(valuePointer, valueRef.size);
_free(valuePointer, valueRef.size);
// Remove from address table
addressTable.remove(keyHash);
@@ -851,7 +1315,7 @@ class BinaryTable {
void truncate() {
antiFreeListScope(() {
_antiFreeListScope(() {
// Relocate the address table if possible
_addressTable = _addressTable;
@@ -1065,3 +1529,13 @@ String binaryDump(Uint8List data) {
return buffer.toString();
}
// DateTime to 32-bit Unix timestamp (seconds since epoch)
extension on DateTime {
int millisecondsSinceEpoch32() {
return (millisecondsSinceEpoch ~/ 1000) & 0xFFFFFFFF;
}
int millisecondsSinceEpoch64() {
return millisecondsSinceEpoch;
}
}