Files
SweepStore/dart/lib/concurrency.dart

192 lines
5.5 KiB
Dart

import 'dart:io';
import 'dart:isolate';
import 'dart:math';
import 'package:sweepstore/header.dart';
import 'package:sweepstore/helpers.dart';
import 'package:sweepstore/structures.dart';
/// Generates a pseudo-random, strictly positive 31-bit worker identifier.
///
/// The current timestamp is mixed with a random value for better uniqueness,
/// and the result is masked to 31 bits so it stays positive (avoids signed
/// int issues when the value is stored).
///
/// Never returns 0: ticket slots use an identifier of 0 to mean "unassigned",
/// so a worker must not be able to claim a slot with that value.
int _randomId() {
  final int time = DateTime.now().millisecondsSinceEpoch32();
  final int random = Random().nextInt(0x80000000);
  final int id = (time ^ random) & 0x7FFFFFFF;
  // Remap the (rare) 0 result so it cannot collide with the unassigned marker.
  return id == 0 ? 1 : id;
}
/// Spawns a ticket for a worker to perform an operation, waits for the ticket
/// to be approved, then runs [onApproved].
///
/// The function is synchronous and does not return until [onApproved] has
/// run. It proceeds in three phases:
///
/// 1. **Acquire** - scans the ticket slots of the concurrency header read from
///    [file] and claims the first writable slot whose identifier is 0, whose
///    heartbeat is more than 2000 ms old and whose state is `FREE`. After each
///    scan it backs off, then re-reads the slot to confirm the identifier is
///    still its own; if not, it scans again.
/// 2. **Set up** - writes the heartbeat, [operation], [keyHash] and
///    [writeSize] into the claimed slot and marks it `WAITING`.
/// 3. **Approval** - polls the slot until its state is `APPROVED`, refreshing
///    the heartbeat between polls. It then marks the slot `EXECUTING`, calls
///    [onApproved] and finally marks the slot `COMPLETED`.
///
/// [debugLabel] is only used to tag the (currently disabled) debug output.
///
/// Throws an [Exception] if the slot's identifier changes while waiting for
/// approval. Anything thrown by [onApproved] propagates to the caller after
/// the slot has been marked `COMPLETED`.
void spawnTicket(RandomAccessFile file, {
  required SweepstoreTicketOperation operation,
  required int keyHash,
  required int writeSize,
  required void Function() onApproved,
  String? debugLabel,
}) {
  // Debug logger; the print call is intentionally commented out.
  void log(String message) {
    String prefix = debugLabel != null ? "\x1B[38;5;208m[Ticket Spawner - $debugLabel]:\x1B[0m " : "\x1B[38;5;208m[Ticket Spawner]:\x1B[0m ";
    // print("$prefix$message");
  }

  // One generator shared by every jittered sleep in this call.
  final Random rng = Random();

  // Sleeps for 100 µs plus a random jitter below [microsecondVariance] µs.
  void tickSleep([int microsecondVariance = 10]) {
    preciseSleep(Duration(microseconds: 100 + rng.nextInt(microsecondVariance)));
  }

  // Number of times expSleep has been called, per label.
  Map<String, int> expSleepTracker = {};

  // Exponential backoff: sleeps 5 ms * 2^n, where n is the number of earlier
  // calls made with the same label.
  // NOTE(review): the backoff is unbounded because the clamp below is
  // disabled - confirm whether a ceiling is wanted.
  void expSleep(String label) {
    int count = expSleepTracker[label] ?? 0;
    int sleepTime = (1 << count);
    // sleepTime = max(1, min(sleepTime, 1000)); // Clamp between 1ms and 1000ms
    preciseSleep(Duration(microseconds: sleepTime * 5000));
    expSleepTracker[label] = count + 1;
  }

  // Reduce the chance of race conditions by adding a small random delay
  tickSleep(100);

  SweepstoreHeader header = SweepstoreHeader(file);
  SweepstoreConcurrencyHeader concurrencyHeader = SweepstoreConcurrencyHeader(header);

  int? ticketIndex;
  int myIdentifier = _randomId();

  // Try to acquire a ticket - (Acquire loop)
  while (ticketIndex == null) {
    for (int i = 0; i < concurrencyHeader.numberOfWorkers; i++) {
      SweepstoreWorkerTicket ticket = concurrencyHeader[i];
      if (!ticket.writable()) {
        continue;
      }

      int identifier = ticket.identifier;
      bool identifierUnassigned = identifier == 0;
      bool staleHeartbeat = (DateTime.now().millisecondsSinceEpoch32() - ticket.workerHeartbeat) > 2000;
      bool isFree = ticket.ticketState == SweepstoreTicketState.FREE;

      if (identifierUnassigned && staleHeartbeat && isFree) {
        ticket.write(
          identifier: myIdentifier,
          ticketState: SweepstoreTicketState.WAITING,
        );
        ticketIndex = i;
        log("Acquired ticket $ticketIndex with identifier $myIdentifier.");
        break;
      }
    }

    // Back off before re-checking, so that a competing writer's claim on the
    // same slot has time to become visible.
    expSleep("acquire_loop");

    // Ensure we still own the ticket - if not, reset and try again
    if (ticketIndex != null) {
      SweepstoreWorkerTicket ticket = concurrencyHeader[ticketIndex];
      if (ticket.identifier != myIdentifier) {
        log("Lost ticket $ticketIndex, retrying...");
        ticketIndex = null;
      }
    }
  }

  // We have a ticket, set it up
  SweepstoreWorkerTicket myTicket = concurrencyHeader[ticketIndex];
  myTicket.write(
    workerHeartbeat: DateTime.now().millisecondsSinceEpoch32(),
    ticketState: SweepstoreTicketState.WAITING,
    ticketOperation: operation,
    keyHash: keyHash,
    writeSize: writeSize,
  );

  // Wait for approval - (Approval loop)
  while (true) {
    // Check we still own the ticket
    if (myTicket.identifier != myIdentifier) {
      String exceptionMessage = "CRITICAL: Lost ownership of ticket $ticketIndex, was expecting identifier $myIdentifier but found ${myTicket.identifier}.";
      throw Exception(exceptionMessage);
    }

    if (myTicket.ticketState == SweepstoreTicketState.APPROVED) {
      myTicket.write(
        ticketState: SweepstoreTicketState.EXECUTING,
      );
      try {
        onApproved();
      } finally {
        // Always hand the slot back, even when the callback throws; a slot
        // left in EXECUTING is never recycled.
        myTicket.write(
          ticketState: SweepstoreTicketState.COMPLETED,
        );
      }
      break;
    }

    tickSleep();

    // Update heartbeat with the current time (not the stored value) so the
    // ticket is visibly alive while it waits.
    final int now = DateTime.now().millisecondsSinceEpoch32();
    if (now != myTicket.workerHeartbeat) {
      myTicket.write(
        workerHeartbeat: now,
      );
    }
  }
}
/// Master side: starts a background isolate that services worker tickets
/// stored in [file].
///
/// Only the file's path is handed to the isolate, which opens its own handle.
/// The spawn is fire-and-forget; the returned future is not awaited.
void initialiseMasterListener(RandomAccessFile file) async {
  final String path = file.path;
  Isolate.spawn(_serviceTickets, path);
}

/// Isolate entry point: polls every ticket slot of the file at [path] forever.
///
/// Tickets in the `WAITING` state are approved; tickets in the `COMPLETED`
/// state are reset to a free, zeroed slot. The loop pauses for 1 ms between
/// passes and never returns.
void _serviceTickets(String path) {
  // Debug logger; the print call is intentionally commented out.
  void log(String message) {
    // print("\x1B[38;5;82m[Master Listener]:\x1B[0m $message");
  }

  final RandomAccessFile handle = File(path).openSync(mode: FileMode.append);
  final SweepstoreConcurrencyHeader tickets =
      SweepstoreConcurrencyHeader(SweepstoreHeader(handle));

  for (;;) {
    for (int slot = 0; slot < tickets.numberOfWorkers; slot++) {
      final SweepstoreWorkerTicket entry = tickets[slot];

      if (entry.ticketState == SweepstoreTicketState.WAITING) {
        log("Found waiting ticket $slot (Key Hash: ${entry.keyHash})...");
        // Approve the ticket
        entry.write(ticketState: SweepstoreTicketState.APPROVED);
        log("Approved ticket $slot.");
        continue;
      }

      if (entry.ticketState == SweepstoreTicketState.COMPLETED) {
        log("Ticket $slot completed. Resetting ticket...");
        // Reset the ticket so it can be acquired again
        entry.write(
          identifier: 0,
          workerHeartbeat: 0,
          ticketState: SweepstoreTicketState.FREE,
          ticketOperation: SweepstoreTicketOperation.NONE,
          keyHash: 0,
          writeSize: 0,
        );
        log("Reset ticket $slot.");
      }
    }

    preciseSleep(Duration(milliseconds: 1));
  }
}