Files
SweepStore/dart/lib/sweepstore.dart

119 lines
2.8 KiB
Dart

import 'dart:io';
import 'dart:isolate';
import 'dart:math';
import 'package:sweepstore/debug.dart';
import 'package:sweepstore/header.dart';
import 'package:sweepstore/structures.dart';
import 'package:sweepstore/concurrency.dart';
/// Handle onto a single Sweepstore file.
///
/// Construction opens the file at the given path with [FileMode.append] and
/// attaches a [SweepstoreHeaderWriter] to the resulting handle.
///
/// NOTE(review): this class never closes [_file]; presumably the handle is
/// meant to live as long as the owning isolate — confirm.
class Sweepstore {
  /// Open handle on the backing file; passed to the header writer,
  /// [initialiseSweepstoreHeader] and [spawnTicket].
  final RandomAccessFile _file;

  /// Header writer bound to [_file].
  final SweepstoreHeaderWriter _header;

  /// Concurrency header writer layered on [_header].
  ///
  /// Declared `late` with an initializer, so it is only constructed on first
  /// access.
  late final SweepstoreConcurrencyHeaderWriter _concurrencyHeader =
      SweepstoreConcurrencyHeaderWriter(_header);

  /// Opens [filePath] synchronously in append mode and wraps it.
  Sweepstore(String filePath)
      : this._(File(filePath).openSync(mode: FileMode.append));

  /// Wires both final fields from an already-open file handle.
  ///
  /// Exists so that [_header] can be a plain `final` field set in the
  /// initializer list instead of a `late final` assigned in a constructor body.
  Sweepstore._(this._file) : _header = SweepstoreHeaderWriter(_file);

  /// Initialises the store header on the underlying file.
  ///
  /// Calls [initialiseSweepstoreHeader] with [concurrentWorkers] (default 4),
  /// then sets the header version to `1.1.0.1` and prints the value read back
  /// from the header.
  void initialise({
    int concurrentWorkers = 4,
  }) {
    initialiseSweepstoreHeader(
      _file,
      concurrentWorkers: concurrentWorkers,
    );
    _header.version = "1.1.0.1";
    print("Version: ${_header.version}");
  }

  /// Requests a WRITE ticket for [key] through [spawnTicket].
  ///
  /// The write itself is not implemented yet: [value] is currently unused,
  /// `writeSize` is a placeholder `0`, and the `onApproved` callback does
  /// nothing.
  ///
  /// NOTE(review): the key hash is `String.hashCode`; if this hash is ever
  /// persisted to the file, confirm that its stability across Dart versions
  /// and platforms is sufficient.
  void operator []=(String key, dynamic value) {
    spawnTicket(
      _file,
      operation: SweepstoreTicketOperation.WRITE,
      keyHash: key.hashCode,
      writeSize: 0, // Placeholder
      onApproved: () {
        // print("Writing key: $key with hash ${key.hashCode} and value: $value");
      },
      debugLabel: key,
    );
  }
}
/// Entry point executed inside each worker isolate spawned by [main].
///
/// [message] must carry `filePath` (String), `index` (int) and `sendPort`
/// (SendPort). The worker opens its own [Sweepstore] on the file, issues one
/// keyed write for `key_<index>`, and then reports `'done'` on the port.
///
/// This is a top-level function rather than a closure so that nothing from
/// the spawning scope (for example the `ReceivePort`) can be captured and
/// copied into the new isolate by accident.
void _runWriteJob(Map<String, Object> message) {
  final filePath = message['filePath'] as String;
  final index = message['index'] as int;
  final sendPort = message['sendPort'] as SendPort;

  final store = Sweepstore(filePath);
  store['key_$index'] = 'value_$index';
  sendPort.send('done');
}

/// Concurrency benchmark for [Sweepstore].
///
/// Recreates `../example.bin`, initialises it for 32 concurrent workers,
/// starts the master listener on a second handle to the same file, and dumps
/// the file bytes. It then runs iterations `0..maxIterations` inclusive; each
/// iteration spawns `concurrencyTest` isolates (doubling every iteration,
/// starting at 1), waits for one `'done'` message per isolate, and prints the
/// elapsed wall-clock time.
Future<void> main() async {
  String filePath = '../example.bin';
  File file = File(filePath);

  // Always start from an empty file.
  if (file.existsSync()) {
    file.deleteSync();
  }
  file.createSync();

  Sweepstore store = Sweepstore(filePath);
  store.initialise(
    concurrentWorkers: 32,
  );
  initialiseMasterListener(file.openSync(mode: FileMode.append));
  print(binaryDump(file.readAsBytesSync()));

  int maxIterations = 16;
  print("Concurrent Workers: ${store._concurrencyHeader.numberOfWorkers}");
  print("Stale Ticket Threshold: ${STALE_HEARTBEAT_THRESHOLD_MS}ms");

  int concurrencyTest = 1;

  // A port is created only for iterations that actually run. Creating it
  // before the termination check would leave one ReceivePort open forever on
  // the final pass, which keeps the main isolate alive.
  for (int iteration = 0; iteration <= maxIterations; iteration++) {
    final receivePort = ReceivePort();
    final stopwatch = Stopwatch()..start();
    // print('\x1B[95mStarting iteration #$iteration with $concurrencyTest concurrent jobs...\x1B[0m');

    for (int i = 0; i < concurrencyTest; i++) {
      await Isolate.spawn(_runWriteJob, {
        'filePath': filePath,
        'index': i,
        'sendPort': receivePort.sendPort,
      });
    }

    // Wait for all jobs to finish: one message per spawned isolate.
    int completedJobs = 0;
    await for (final _ in receivePort) {
      completedJobs++;
      if (completedJobs >= concurrencyTest) {
        receivePort.close();
        break;
      }
    }

    stopwatch.stop();
    print("[$iteration/$maxIterations] Completed $concurrencyTest operation in ${stopwatch.elapsedMilliseconds} ms");
    concurrencyTest *= 2;
  }
}