192 lines
5.5 KiB
Dart
192 lines
5.5 KiB
Dart
|
|
|
|
import 'dart:io';
|
|
import 'dart:isolate';
|
|
import 'dart:math';
|
|
|
|
import 'package:sweepstore/header.dart';
|
|
import 'package:sweepstore/helpers.dart';
|
|
import 'package:sweepstore/structures.dart';
|
|
|
|
/// Generates a positive, non-zero 31-bit identifier for a worker ticket.
///
/// The current 32-bit millisecond timestamp is XOR-ed with a random value to
/// improve uniqueness, and the result is masked to 31 bits so it stays
/// positive when stored as a signed integer.
///
/// Zero is never returned: `spawnTicket` treats an identifier of `0` as
/// "unassigned", so a zero id would make a claimed ticket look free to other
/// workers.
int _randomId() {
  final random = Random();
  while (true) {
    final time = DateTime.now().millisecondsSinceEpoch32();
    final id = (time ^ random.nextInt(0x80000000)) & 0x7FFFFFFF;
    // Retry on the (very unlikely) zero result, which is reserved.
    if (id != 0) {
      return id;
    }
  }
}
|
|
|
|
// Spawn a ticket for a worker to perform an operation
|
|
/// Claims a worker ticket in the concurrency header of [file], waits for the
/// ticket to be approved, runs [onApproved], and then marks the ticket as
/// completed.
///
/// The function blocks the calling isolate: it polls the ticket table and
/// sleeps between polls via `preciseSleep`.
///
/// * [operation], [keyHash] and [writeSize] are written into the claimed
///   ticket once it has been acquired.
/// * [onApproved] is invoked after the ticket state is observed as
///   `APPROVED` and has been switched to `EXECUTING`.
/// * [debugLabel] is only used by the (currently disabled) debug logger.
///
/// Throws an [Exception] if, while waiting for approval, the ticket's
/// identifier no longer matches the identifier this call wrote into it.
/// Any error thrown by [onApproved] propagates to the caller after the ticket
/// has been marked `COMPLETED`, so the slot is not left stuck in `EXECUTING`.
void spawnTicket(RandomAccessFile file, {
  required SweepstoreTicketOperation operation,
  required int keyHash,
  required int writeSize,
  required void Function() onApproved,
  String? debugLabel,
}) {
  // Debug logging hook; output is currently disabled.
  void log(String message) {
    // String prefix = debugLabel != null ? "\x1B[38;5;208m[Ticket Spawner - $debugLabel]:\x1B[0m " : "\x1B[38;5;208m[Ticket Spawner]:\x1B[0m ";
    // print("$prefix$message");
  }

  // Sleeps for 100 microseconds plus a random jitter below
  // [microsecondVariance] microseconds.
  void tickSleep([int microsecondVariance = 10]) {
    preciseSleep(Duration(microseconds: 100 + Random().nextInt(microsecondVariance)));
  }

  // Number of times expSleep has been called, per label.
  final Map<String, int> expSleepTracker = {};

  // Sleeps for 2^n milliseconds, where n is the number of previous calls made
  // with the same [label].
  // NOTE(review): the backoff is unbounded because the clamp below is
  // disabled, so the delay doubles on every call — confirm this is intended.
  void expSleep(String label) {
    final count = expSleepTracker[label] ?? 0;
    final sleepTime = 1 << count; // Exponential backoff
    // sleepTime = max(1, min(sleepTime, 1000)); // Clamp between 1ms and 1000ms
    preciseSleep(Duration(microseconds: sleepTime * 1000));
    expSleepTracker[label] = count + 1;
  }

  // Reduce the chance of race conditions by adding a small random delay
  tickSleep(100);

  final header = SweepstoreHeader(file);
  final concurrencyHeader = SweepstoreConcurrencyHeader(header);

  int? ticketIndex;
  final myIdentifier = _randomId();

  // Try to acquire a ticket - (Acquire loop)
  while (ticketIndex == null) {
    for (int i = 0; i < concurrencyHeader.numberOfWorkers; i++) {
      final SweepstoreWorkerTicket ticket = concurrencyHeader[i];

      if (!ticket.writable()) {
        continue;
      }

      final identifier = ticket.identifier;

      final identifierUnassigned = identifier == 0;
      final staleHeartbeat =
          (DateTime.now().millisecondsSinceEpoch32() - ticket.workerHeartbeat) > 2000;
      final isFree = ticket.ticketState == SweepstoreTicketState.FREE;

      if (identifierUnassigned && staleHeartbeat && isFree) {
        ticket.write(
          identifier: myIdentifier,
          ticketState: SweepstoreTicketState.WAITING,
        );
        ticketIndex = i;
        log("Acquired ticket $ticketIndex with identifier $myIdentifier.");
        break;
      }
    }

    // Back off before re-scanning; after a claim this also gives a competing
    // writer time to overwrite the slot before ownership is verified below.
    expSleep("acquire_loop");

    // Ensure we still own the ticket - if not, reset and try again
    if (ticketIndex != null) {
      final SweepstoreWorkerTicket ticket = concurrencyHeader[ticketIndex];
      if (ticket.identifier != myIdentifier) {
        log("Lost ticket $ticketIndex, retrying...");
        ticketIndex = null;
      }
    }
  }

  // We have a ticket, set it up
  final SweepstoreWorkerTicket myTicket = concurrencyHeader[ticketIndex];
  myTicket.write(
    workerHeartbeat: DateTime.now().millisecondsSinceEpoch32(),
    ticketState: SweepstoreTicketState.WAITING,
    ticketOperation: operation,
    keyHash: keyHash,
    writeSize: writeSize,
  );

  // Wait for approval - (Approval loop)
  while (true) {
    // Check we still own the ticket
    if (myTicket.identifier != myIdentifier) {
      String exceptionMessage = "CRITICAL: Lost ownership of ticket $ticketIndex, was expecting identifier $myIdentifier but found ${myTicket.identifier}.";
      throw Exception(exceptionMessage);
    }

    if (myTicket.ticketState == SweepstoreTicketState.APPROVED) {
      myTicket.write(
        ticketState: SweepstoreTicketState.EXECUTING,
      );
      try {
        onApproved();
      } finally {
        // Always hand the slot back, even if the callback throws; otherwise
        // the ticket would stay in EXECUTING and never be reset.
        myTicket.write(
          ticketState: SweepstoreTicketState.COMPLETED,
        );
      }
      break;
    }

    // randomSleep(10);
    tickSleep();

    // Update heartbeat with the current time so the ticket does not look
    // stale while we are still waiting.
    final now = DateTime.now().millisecondsSinceEpoch32();
    if (now != myTicket.workerHeartbeat) {
      myTicket.write(
        workerHeartbeat: now,
      );
    }
  }
}
|
|
|
|
// Master side
|
|
/// Starts the master side of the ticket protocol on a spawned isolate.
///
/// Only the path of [file] is passed into the isolate, which opens its own
/// handle to that path in append mode. The isolate then loops forever,
/// sweeping every worker ticket in the concurrency header:
///
/// * a ticket in state `WAITING` is switched to `APPROVED`;
/// * a ticket in state `COMPLETED` is reset to `FREE` with all of its fields
///   cleared.
///
/// Between sweeps the isolate pauses via `preciseSleep` for one millisecond.
/// The future returned by [Isolate.spawn] is not awaited.
void initialiseMasterListener(RandomAccessFile file) async {
  final String filePath = file.path;

  Isolate.spawn((_) {
    // Debug logging hook; output is currently disabled.
    void log(String message) {
      // print("\x1B[38;5;82m[Master Listener]:\x1B[0m $message");
    }

    final RandomAccessFile isolateFile =
        File(filePath).openSync(mode: FileMode.append);
    final SweepstoreHeader storeHeader = SweepstoreHeader(isolateFile);
    final SweepstoreConcurrencyHeader tickets =
        SweepstoreConcurrencyHeader(storeHeader);

    // Moves a waiting ticket into the approved state.
    void approve(SweepstoreWorkerTicket ticket, int slot) {
      log("Found waiting ticket $slot (Key Hash: ${ticket.keyHash})...");
      ticket.write(ticketState: SweepstoreTicketState.APPROVED);
      log("Approved ticket $slot.");
    }

    // Clears a completed ticket so the slot can be claimed again.
    void reset(SweepstoreWorkerTicket ticket, int slot) {
      log("Ticket $slot completed. Resetting ticket...");
      ticket.write(
        identifier: 0,
        workerHeartbeat: 0,
        ticketState: SweepstoreTicketState.FREE,
        ticketOperation: SweepstoreTicketOperation.NONE,
        keyHash: 0,
        writeSize: 0,
      );
      log("Reset ticket $slot.");
    }

    for (;;) {
      // The worker count is re-read on every iteration of the sweep.
      for (var slot = 0; slot < tickets.numberOfWorkers; slot += 1) {
        final SweepstoreWorkerTicket current = tickets[slot];

        if (current.ticketState == SweepstoreTicketState.WAITING) {
          approve(current, slot);
        } else if (current.ticketState == SweepstoreTicketState.COMPLETED) {
          reset(current, slot);
        }
      }

      preciseSleep(Duration(milliseconds: 1));
    }
  }, null);
}