Add concurrency handling implementation with ticket management and file locking
This commit is contained in:
248
cpp/src/Private/sweepstore/concurrency.cpp
Normal file
248
cpp/src/Private/sweepstore/concurrency.cpp
Normal file
@@ -0,0 +1,248 @@
|
||||
|
||||
|
||||
#include <functional>
|
||||
#include <iosfwd>
|
||||
|
||||
#include "sweepstore/concurrency.h"
|
||||
|
||||
#include <iostream>
|
||||
#include <random>
|
||||
|
||||
#include "sweepstore/header.h"
|
||||
#include "sweepstore/utils/helpers.h"
|
||||
#include "sweepstore/utils/file_handle.h"
|
||||
|
||||
|
||||
uint64_t getRandomOffset(uint64_t maxValue) {
|
||||
static std::random_device rd;
|
||||
static std::mt19937_64 gen(rd());
|
||||
std::uniform_int_distribution<uint64_t> dist(0, maxValue);
|
||||
return dist(gen);
|
||||
}
|
||||
|
||||
int randomId() {
|
||||
// mix timestamp with random for better uniqueness
|
||||
// keep it positive to avoid signed int issues when storing
|
||||
auto now = std::chrono::system_clock::now();
|
||||
auto millis = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count();
|
||||
int32_t time = static_cast<int32_t>(millis & 0xFFFFFFFF); // Get lower 32 bits
|
||||
int32_t random = static_cast<int32_t>(getRandomOffset(0x7FFFFFFF)); // 0 to 0x7FFFFFFF
|
||||
return (time ^ random) & 0x7FFFFFFF;
|
||||
}
|
||||
|
||||
void SweepstoreConcurrency::spawnTicket(std::string filePath,
|
||||
const SweepstoreTicketOperation& operation,
|
||||
const uint32_t keyHash,
|
||||
const uint32_t targetSize,
|
||||
const std::function<void()> onApproved,
|
||||
std::string debugLabel
|
||||
) {
|
||||
|
||||
SweepstoreFileHandle file(filePath, std::ios::binary | std::ios::in | std::ios::out);
|
||||
|
||||
/*
|
||||
Useful Functions
|
||||
*/
|
||||
|
||||
/// Logging function
|
||||
auto log = [&](const std::string &message) {
|
||||
std::string prefix = !debugLabel.empty() ? "\033[38;5;208m[Ticket Spawner - " + debugLabel + "]:\033[0m " : "\033[38;5;208m[Ticket Spawner]:\033[0m ";
|
||||
debugPrint(prefix + message);
|
||||
};
|
||||
|
||||
// Sleep with variance (additive only)
|
||||
auto varySleep = [&](std::chrono::nanoseconds minSleepDuration, std::chrono::nanoseconds variance) {
|
||||
if (variance.count() <= 0) {
|
||||
preciseSleep(minSleepDuration);
|
||||
} else {
|
||||
// Generate random duration within variance
|
||||
uint64_t randomOffset = getRandomOffset(variance.count());
|
||||
preciseSleep(minSleepDuration + std::chrono::nanoseconds(randomOffset));
|
||||
}
|
||||
};
|
||||
|
||||
// Exponential sleep
|
||||
std::unordered_map<std::string, int> expSleepTracker = {};
|
||||
auto expSleep = [&expSleepTracker](const std::string& label) {
|
||||
int count = expSleepTracker[label]; // defaults to 0 if not found
|
||||
int sleepTime = (1 << count); // Exponential backoff
|
||||
sleepTime = std::max(1, std::min(sleepTime, 1000)); // Clamp between 1ms and 1000ms
|
||||
preciseSleep(std::chrono::microseconds(sleepTime * 5000));
|
||||
expSleepTracker[label] = count + 1;
|
||||
};
|
||||
|
||||
// Get the header(s)
|
||||
SweepstoreHeader header(file);
|
||||
SweepstoreConcurrencyHeader concurrencyHeader(file);
|
||||
|
||||
/*
|
||||
Ticket Acquisition
|
||||
*/
|
||||
auto acquireTicket = [&](uint32_t newIdentifier) -> SweepstoreWorkerTicket {
|
||||
|
||||
// Reduce the chance of race condition
|
||||
varySleep(std::chrono::microseconds(500), std::chrono::microseconds(200));
|
||||
|
||||
uint32_t ticketIndex = -1u;
|
||||
|
||||
while (true) {
|
||||
|
||||
uint32_t concurrentWorkers = concurrencyHeader.readNumberOfWorkers();
|
||||
|
||||
for (uint32_t i = 0; i < concurrentWorkers; i++) {
|
||||
|
||||
SweepstoreWorkerTicket ticket = SweepstoreWorkerTicket(i, file);
|
||||
|
||||
if (!ticket.writable()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
SweepstoreWorkerTicketSnapshot snapshot = ticket.snapshot();
|
||||
|
||||
int identifier = snapshot.identifier;
|
||||
|
||||
bool identifier_unassigned = identifier == 0;
|
||||
bool stale_heartbeat = millisecondsSinceEpoch32() - snapshot.workerHeartbeat > STALE_HEARTBEAT_THRESHOLD_MS;
|
||||
bool is_free = snapshot.state == SweepstoreTicketState::FREE;
|
||||
|
||||
if (identifier_unassigned && stale_heartbeat && is_free) {
|
||||
snapshot.identifier = newIdentifier;
|
||||
snapshot.workerHeartbeat = millisecondsSinceEpoch32();
|
||||
snapshot.state = SweepstoreTicketState::WAITING;
|
||||
ticket.write(snapshot);
|
||||
ticketIndex = i;
|
||||
break;
|
||||
}
|
||||
}
|
||||
preciseSleep(std::chrono::milliseconds(2));
|
||||
|
||||
// Ensure we still own the ticket - if not, reset and try again
|
||||
if (ticketIndex != -1u) {
|
||||
SweepstoreWorkerTicketSnapshot verifySnapshot = concurrencyHeader[ticketIndex].snapshot();
|
||||
|
||||
if (verifySnapshot.identifier != newIdentifier) {
|
||||
ticketIndex = -1; // Lost the ticket, try again
|
||||
} else {
|
||||
log("Acquired ticket " + std::to_string(ticketIndex) + " with identifier " + std::to_string(newIdentifier) + ".");
|
||||
return concurrencyHeader[ticketIndex];
|
||||
}
|
||||
}
|
||||
|
||||
expSleep("acquireTicket");
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
|
||||
uint32_t myIdentifier = randomId();
|
||||
|
||||
SweepstoreWorkerTicket myTicket = acquireTicket(myIdentifier);
|
||||
SweepstoreWorkerTicketSnapshot mySnapshot = myTicket.snapshot();
|
||||
mySnapshot.workerHeartbeat = millisecondsSinceEpoch32();
|
||||
mySnapshot.state = SweepstoreTicketState::WAITING;
|
||||
mySnapshot.operation = operation;
|
||||
mySnapshot.keyHash = keyHash;
|
||||
mySnapshot.targetSize = targetSize;
|
||||
myTicket.write(mySnapshot);
|
||||
|
||||
// Wait for approval
|
||||
while (true) {
|
||||
|
||||
SweepstoreWorkerTicketSnapshot snapshot = myTicket.snapshot();
|
||||
|
||||
// Update heartbeat
|
||||
uint32_t currentTime = millisecondsSinceEpoch32();
|
||||
if (currentTime - snapshot.workerHeartbeat > 700) {
|
||||
snapshot.workerHeartbeat = currentTime;
|
||||
myTicket.write(snapshot);
|
||||
}
|
||||
|
||||
// Check if we still own the ticket
|
||||
if (snapshot.identifier != myIdentifier) {
|
||||
|
||||
preciseSleep(std::chrono::milliseconds(10));
|
||||
|
||||
// Re-verify we lost the ticket
|
||||
SweepstoreWorkerTicketSnapshot recheckSnapshot = myTicket.snapshot();
|
||||
if (recheckSnapshot.identifier != myIdentifier) {
|
||||
log("Lost ownership of ticket " + std::to_string(myTicket.getTicketIndex()) + ", was expecting identifier " + std::to_string(myIdentifier) + " but found " + std::to_string(recheckSnapshot.identifier) + ".");
|
||||
|
||||
// ReSharper disable once CppDFAInfiniteRecursion
|
||||
return spawnTicket(
|
||||
filePath,
|
||||
operation,
|
||||
keyHash,
|
||||
targetSize,
|
||||
onApproved,
|
||||
debugLabel
|
||||
);
|
||||
}
|
||||
|
||||
// False alarm, continue waiting
|
||||
log("False alarm, still own ticket " + std::to_string(myTicket.getTicketIndex()) + ".");
|
||||
snapshot = recheckSnapshot;
|
||||
}
|
||||
|
||||
if (snapshot.state == SweepstoreTicketState::APPROVED) {
|
||||
snapshot.state = SweepstoreTicketState::EXECUTING;
|
||||
myTicket.write(snapshot);
|
||||
|
||||
onApproved();
|
||||
|
||||
snapshot.state = SweepstoreTicketState::COMPLETED;
|
||||
myTicket.write(snapshot);
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
varySleep(std::chrono::microseconds(500), std::chrono::microseconds(200));
|
||||
}
|
||||
|
||||
// std::cout << "\033[38;5;82m[Ticket Spawner - " << debugLabel << "]:\033[0m Completed ticket " << myTicket.getTicketIndex() << "." << std::endl;
|
||||
}
|
||||
|
||||
void SweepstoreConcurrency::initialiseMaster(std::string filePath) {
|
||||
|
||||
auto log = [&](const std::string &message) {
|
||||
debugPrint("\033[38;5;33m[Concurrency Master]:\033[0m " + message);
|
||||
};
|
||||
|
||||
SweepstoreFileHandle file(filePath, std::ios::binary | std::ios::in | std::ios::out);
|
||||
|
||||
SweepstoreHeader header(file);
|
||||
SweepstoreConcurrencyHeader concurrencyHeader(file);
|
||||
|
||||
while (true) {
|
||||
|
||||
int concurrentWorkers = concurrencyHeader.readNumberOfWorkers();
|
||||
|
||||
for (uint32_t i = 0; i < concurrentWorkers; i++) {
|
||||
|
||||
SweepstoreWorkerTicket ticket(i, file);
|
||||
SweepstoreWorkerTicketSnapshot snapshot = ticket.snapshot();
|
||||
|
||||
if (snapshot.state == WAITING) {
|
||||
log("Found waiting ticket " + std::to_string(i) + "(Key Hash: " + std::to_string(snapshot.keyHash) + ")...");
|
||||
|
||||
// Approve the ticket
|
||||
snapshot.state = APPROVED;
|
||||
ticket.write(snapshot);
|
||||
log("Approved ticket " + std::to_string(i) + ".");
|
||||
} else if (snapshot.state == SweepstoreTicketState::COMPLETED) {
|
||||
log("Ticket " + std::to_string(i) + " has completed. Resetting...");
|
||||
|
||||
// Reset the ticket
|
||||
SweepstoreWorkerTicketSnapshot cleanSnapshot = SweepstoreWorkerTicketSnapshot();
|
||||
ticket.write(cleanSnapshot);
|
||||
log("Reset ticket " + std::to_string(i) + ".");
|
||||
}
|
||||
|
||||
// Handle stale tickets
|
||||
uint32_t currentTime = millisecondsSinceEpoch32();
|
||||
}
|
||||
|
||||
preciseSleep(std::chrono::milliseconds(1));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
220
cpp/src/Private/sweepstore/header.cpp
Normal file
220
cpp/src/Private/sweepstore/header.cpp
Normal file
@@ -0,0 +1,220 @@
|
||||
|
||||
#include "sweepstore/header.h"
|
||||
|
||||
#include "sweepstore/utils/file_lock.h"
|
||||
#include "sweepstore/utils/helpers.h"
|
||||
|
||||
std::string SweepstoreHeader::readMagicNumber() {
|
||||
file->seekg(0, std::ios::beg);
|
||||
char buffer[4];
|
||||
file->read(buffer, 4);
|
||||
return std::string(buffer, 4);
|
||||
}
|
||||
|
||||
void SweepstoreHeader::writeMagicNumber(const std::string& magicNumber) {
|
||||
if (magicNumber.size() != 4) {
|
||||
throw std::invalid_argument("Magic number must be exactly 4 characters long.");
|
||||
}
|
||||
file->seekp(0, std::ios::beg);
|
||||
file->write(magicNumber.c_str(), 4);
|
||||
}
|
||||
|
||||
std::string SweepstoreHeader::readVersion() {
|
||||
file->seekg(4, std::ios::beg);
|
||||
char buffer[12];
|
||||
file->read(buffer, 12);
|
||||
|
||||
// Trim leading and trailing spaces
|
||||
std::string version(buffer, 12);
|
||||
version = trim(version);
|
||||
|
||||
return version;
|
||||
}
|
||||
|
||||
void SweepstoreHeader::writeVersion(const std::string& version) {
|
||||
if (version.size() > 11) {
|
||||
throw std::invalid_argument("Version string must be at most 11 characters long.");
|
||||
}
|
||||
// Pad 1 space to the beginning
|
||||
// And pad to end to make it 12 bytes
|
||||
std::string paddedVersion = " " + version;
|
||||
paddedVersion.resize(12, ' ');
|
||||
|
||||
file->seekp(4, std::ios::beg);
|
||||
file->write(paddedVersion.c_str(), 12);
|
||||
}
|
||||
|
||||
SweepstorePointer SweepstoreHeader::readAddressTablePointer() {
|
||||
file->seekg(16, std::ios::beg);
|
||||
int64_t address;
|
||||
file->read(reinterpret_cast<char*>(&address), sizeof(address));
|
||||
return address; // Implicit conversion to SweepstorePointer
|
||||
}
|
||||
|
||||
void SweepstoreHeader::writeAddressTablePointer(const SweepstorePointer& ptr) {
|
||||
file->seekp(16, std::ios::beg);
|
||||
int64_t address = ptr;
|
||||
file->write(reinterpret_cast<const char*>(&address), sizeof(address));
|
||||
}
|
||||
|
||||
uint32_t SweepstoreHeader::readFreeListCount() {
|
||||
file->seekg(24, std::ios::beg);
|
||||
uint32_t count;
|
||||
file->read(reinterpret_cast<char*>(&count), sizeof(count));
|
||||
return count;
|
||||
}
|
||||
|
||||
void SweepstoreHeader::writeFreeListCount(uint32_t count) {
|
||||
file->seekp(24, std::ios::beg);
|
||||
file->write(reinterpret_cast<const char*>(&count), sizeof(count));
|
||||
}
|
||||
|
||||
bool SweepstoreHeader::readIsFreeListLifted() {
|
||||
file->seekg(28, std::ios::beg);
|
||||
char flag;
|
||||
file->read(&flag, sizeof(flag));
|
||||
return flag != 0;
|
||||
}
|
||||
|
||||
void SweepstoreHeader::writeIsFreeListLifted(bool isLifted) {
|
||||
file->seekp(28, std::ios::beg);
|
||||
char flag = isLifted ? 1 : 0;
|
||||
file->write(&flag, sizeof(flag));
|
||||
}
|
||||
|
||||
void SweepstoreHeader::initialise() {
|
||||
writeMagicNumber("SWPT");
|
||||
writeVersion("undefined");
|
||||
writeAddressTablePointer(SweepstorePointer::NULL_PTR);
|
||||
writeFreeListCount(0);
|
||||
writeIsFreeListLifted(false);
|
||||
file->flush();
|
||||
}
|
||||
|
||||
uint64_t SweepstoreConcurrencyHeader::readMasterIdentifier() {
|
||||
file->seekg(29, std::ios::beg);
|
||||
uint64_t identifier;
|
||||
file->read(reinterpret_cast<char*>(&identifier), sizeof(identifier));
|
||||
return identifier;
|
||||
}
|
||||
|
||||
void SweepstoreConcurrencyHeader::writeMasterIdentifier(uint64_t identifier) {
|
||||
file->seekp(29, std::ios::beg);
|
||||
file->write(reinterpret_cast<const char*>(&identifier), sizeof(identifier));
|
||||
}
|
||||
|
||||
uint32_t SweepstoreConcurrencyHeader::readMasterHeartbeat() {
|
||||
file->seekg(37, std::ios::beg);
|
||||
uint32_t heartbeat;
|
||||
file->read(reinterpret_cast<char*>(&heartbeat), sizeof(heartbeat));
|
||||
return heartbeat;
|
||||
}
|
||||
|
||||
void SweepstoreConcurrencyHeader::writeMasterHeartbeat(uint32_t heartbeat) {
|
||||
file->seekp(37, std::ios::beg);
|
||||
file->write(reinterpret_cast<const char*>(&heartbeat), sizeof(heartbeat));
|
||||
}
|
||||
|
||||
uint32_t SweepstoreConcurrencyHeader::readNumberOfWorkers() {
|
||||
file->seekg(41, std::ios::beg);
|
||||
uint32_t numWorkers;
|
||||
file->read(reinterpret_cast<char*>(&numWorkers), sizeof(numWorkers));
|
||||
return numWorkers;
|
||||
}
|
||||
|
||||
void SweepstoreConcurrencyHeader::writeNumberOfWorkers(uint32_t numWorkers) {
|
||||
file->seekp(41, std::ios::beg);
|
||||
file->write(reinterpret_cast<const char*>(&numWorkers), sizeof(numWorkers));
|
||||
}
|
||||
|
||||
bool SweepstoreConcurrencyHeader::readIsReadAllowed() {
|
||||
file->seekg(45, std::ios::beg);
|
||||
char flag;
|
||||
file->read(&flag, sizeof(flag));
|
||||
return flag != 0;
|
||||
}
|
||||
|
||||
void SweepstoreConcurrencyHeader::writeIsReadAllowed(bool isAllowed) {
|
||||
file->seekp(45, std::ios::beg);
|
||||
char flag = isAllowed ? 1 : 0;
|
||||
file->write(&flag, sizeof(flag));
|
||||
}
|
||||
|
||||
void SweepstoreConcurrencyHeader::initialise(int concurrentWorkers) {
|
||||
writeMasterIdentifier(0);
|
||||
writeMasterHeartbeat(0);
|
||||
writeNumberOfWorkers(concurrentWorkers);
|
||||
writeIsReadAllowed(true);
|
||||
for (uint32_t i = 0; i < readNumberOfWorkers(); i++) {
|
||||
SweepstoreWorkerTicketSnapshot ticket = SweepstoreWorkerTicketSnapshot();
|
||||
ticket.identifier = 0;
|
||||
ticket.workerHeartbeat = 0;
|
||||
ticket.state = SweepstoreTicketState::FREE;
|
||||
ticket.operation = SweepstoreTicketOperation::NONE;
|
||||
ticket.keyHash = 0;
|
||||
ticket.targetAddress = SweepstorePointer::NULL_PTR;
|
||||
ticket.targetSize = 0;
|
||||
|
||||
SweepstoreWorkerTicket ticketWriter = SweepstoreWorkerTicket(i, file);
|
||||
ticketWriter.write(ticket);
|
||||
}
|
||||
file->flush();
|
||||
}
|
||||
|
||||
void SweepstoreWorkerTicket::write(SweepstoreWorkerTicketSnapshot &snapshot) {
|
||||
RandomAccessMemory buffer;
|
||||
|
||||
SweepstoreFileLock lock(file.getPath(), SweepstoreFileLock::Mode::Exclusive);
|
||||
SweepstoreFileLock::Scoped scopedLock(lock);
|
||||
|
||||
buffer.setPositionSync(0);
|
||||
buffer.writeIntSync(snapshot.identifier, 4);
|
||||
buffer.writeIntSync(snapshot.workerHeartbeat, 4);
|
||||
buffer.writeIntSync(static_cast<uint8_t>(snapshot.state), 1);
|
||||
buffer.writeIntSync(static_cast<uint8_t>(snapshot.operation), 1);
|
||||
buffer.writeUIntSync(snapshot.keyHash, 8);
|
||||
buffer.writePointerSync(snapshot.targetAddress, 8);
|
||||
buffer.writeUIntSync(snapshot.targetSize, 4);
|
||||
|
||||
// Pad the rest with zeros if necessary
|
||||
while (buffer.length() < TICKET_SIZE) {
|
||||
buffer.writeIntSync(0, 1);
|
||||
}
|
||||
|
||||
// Prepare data
|
||||
buffer.setPositionSync(0);
|
||||
std::vector<uint8_t> data = buffer.readSync(buffer.length());
|
||||
char* dataPtr = reinterpret_cast<char*>(data.data());
|
||||
|
||||
// Write to file
|
||||
file->seekp(getOffset());
|
||||
file->write(dataPtr, data.size());
|
||||
file->flush();
|
||||
}
|
||||
|
||||
bool SweepstoreWorkerTicket::writable() {
|
||||
SweepstoreFileLock lock(file.getPath(), SweepstoreFileLock::Mode::Exclusive);
|
||||
return lock.isLocked() == false;
|
||||
}
|
||||
|
||||
SweepstoreWorkerTicketSnapshot SweepstoreWorkerTicket::snapshot() {
|
||||
SweepstoreFileLock lock(file.getPath(), SweepstoreFileLock::Mode::Shared);
|
||||
lock.lock();
|
||||
file->seekg(getOffset());
|
||||
std::unique_ptr<char[]> buffer(new char[TICKET_SIZE]);
|
||||
file->read(buffer.get(), TICKET_SIZE);
|
||||
lock.unlock();
|
||||
RandomAccessMemory ram(reinterpret_cast<uint8_t*>(buffer.get()), TICKET_SIZE);
|
||||
|
||||
SweepstoreWorkerTicketSnapshot snapshot;
|
||||
ram.setPositionSync(0);
|
||||
snapshot.identifier = ram.readUIntSync(4);
|
||||
snapshot.workerHeartbeat = ram.readUIntSync(4);
|
||||
snapshot.state = static_cast<SweepstoreTicketState>(ram.readUIntSync(1));
|
||||
snapshot.operation = static_cast<SweepstoreTicketOperation>(ram.readUIntSync(1));
|
||||
snapshot.keyHash = ram.readUIntSync(8);
|
||||
snapshot.targetAddress = ram.readPointerSync(8);
|
||||
snapshot.targetSize = ram.readUIntSync(4);
|
||||
|
||||
return snapshot;
|
||||
}
|
||||
13
cpp/src/Private/sweepstore/structures.cpp
Normal file
13
cpp/src/Private/sweepstore/structures.cpp
Normal file
@@ -0,0 +1,13 @@
|
||||
|
||||
|
||||
|
||||
#include "sweepstore/structures.h"
|
||||
const SweepstorePointer SweepstorePointer::NULL_PTR = SweepstorePointer(UINT64_MAX);
|
||||
|
||||
bool SweepstorePointer::operator==(const SweepstorePointer &p) {
|
||||
if (this->address == p.address) {
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
21
cpp/src/Private/sweepstore/sweepstore.cpp
Normal file
21
cpp/src/Private/sweepstore/sweepstore.cpp
Normal file
@@ -0,0 +1,21 @@
|
||||
//
|
||||
// Created by Benjamin Watt on 24/11/2025.
|
||||
//
|
||||
|
||||
#include "sweepstore/sweepstore.h"
|
||||
|
||||
#include <string>
|
||||
#include <filesystem>
|
||||
#include <iostream>
|
||||
#include <thread>
|
||||
|
||||
#include "sweepstore/utils/helpers.h"
|
||||
#include "sweepstore/utils/file_handle.h"
|
||||
|
||||
void Sweepstore::initialise(int concurrentWorkers) {
|
||||
header->initialise();
|
||||
header->writeVersion("1.1.0.2");
|
||||
concurrencyHeader->initialise(concurrentWorkers);
|
||||
|
||||
debugPrint("Version: " + header->readVersion());
|
||||
}
|
||||
Reference in New Issue
Block a user