#include #include #include "sweepstore/concurrency.h" #include #include #include "sweepstore/header.h" #include "sweepstore/utils/helpers.h" #include "sweepstore/utils/file_handle.h" #include "sweepstore/utils/timing.h" uint64_t getRandomOffset(uint64_t maxValue) { static std::random_device rd; static std::mt19937_64 gen(rd()); std::uniform_int_distribution dist(0, maxValue); return dist(gen); } int randomId() { // mix timestamp with random for better uniqueness // keep it positive to avoid signed int issues when storing auto now = std::chrono::system_clock::now(); auto millis = std::chrono::duration_cast(now.time_since_epoch()).count(); int32_t time = static_cast(millis & 0xFFFFFFFF); // Get lower 32 bits int32_t random = static_cast(getRandomOffset(0x7FFFFFFF)); // 0 to 0x7FFFFFFF return (time ^ random) & 0x7FFFFFFF; } void SweepstoreConcurrency::spawnTicket(SweepstoreFileHandle* _file, const SweepstoreTicketOperation& operation, const uint32_t keyHash, const uint32_t targetSize, const std::function onApproved, std::string debugLabel ) { SWEEPSTORE_TIME_FUNCTION(); // FileHandle now uses thread-local streams internally - no need to create new handle! // Each thread automatically gets its own fstream from the shared file handle SweepstoreFileHandle* file = new SweepstoreFileHandle(_file->getPath(), std::ios::in | std::ios::out | std::ios::binary); /* Useful Functions */ /// Logging function auto log = [&](const std::string &message) { std::string prefix = !debugLabel.empty() ? "\033[38;5;208m[Ticket Spawner - " + debugLabel + "]:\033[0m " : "\033[38;5;208m[Ticket Spawner]:\033[0m "; // debugPrint(prefix + message); }; // Sleep with variance (additive only) auto varySleep = [&](std::chrono::nanoseconds minSleepDuration, std::chrono::nanoseconds variance) { // SWEEPSTORE_TIME_SCOPE("Varying Sleep"); if (variance.count() <= 0) { preciseSleep(minSleepDuration); } else { // Generate random duration within variance uint64_t randomOffset = getRandomOffset(variance.count()); preciseSleep(minSleepDuration + std::chrono::nanoseconds(randomOffset)); } }; // Exponential sleep std::unordered_map expSleepTracker = {}; auto expSleep = [&expSleepTracker](const std::string& label) { SWEEPSTORE_TIME_SCOPE("Exponential Sleep"); int count = expSleepTracker[label]; // defaults to 0 if not found int sleepTime = (1 << count); // Exponential backoff sleepTime = std::max(1, std::min(sleepTime, 1000)); // Clamp between 1ms and 1000ms // sleepTime = 1000; preciseSleep(std::chrono::microseconds(sleepTime * 500)); expSleepTracker[label] = count + 1; }; // Get the header(s) - using the shared file handle directly SweepstoreHeader header(*file); SweepstoreConcurrencyHeader concurrencyHeader(*file); /* Ticket Acquisition */ auto acquireTicket = [&](uint32_t newIdentifier) -> SweepstoreWorkerTicket { SWEEPSTORE_TIME_SCOPE("acquireTicket"); // Reduce the chance of race condition varySleep(std::chrono::microseconds(500), std::chrono::microseconds(200)); uint32_t ticketIndex = -1u; while (true) { uint32_t concurrentWorkers = concurrencyHeader.readNumberOfWorkers(); for (uint32_t i = 0; i < concurrentWorkers; i++) { SweepstoreWorkerTicket ticket = SweepstoreWorkerTicket(i, *file); if (!ticket.writable()) { continue; } SweepstoreWorkerTicketSnapshot snapshot = ticket.snapshot(); uint32_t identifier = snapshot.identifier; bool identifier_unassigned = identifier == 0; bool stale_heartbeat = millisecondsSinceEpoch32() - snapshot.workerHeartbeat > STALE_HEARTBEAT_THRESHOLD_MS; bool is_free = snapshot.state == SweepstoreTicketState::FREE; if ((identifier_unassigned && is_free) || stale_heartbeat) { SWEEPSTORE_TIME_SCOPE("Claim Ticket"); snapshot.identifier = newIdentifier; snapshot.workerHeartbeat = millisecondsSinceEpoch32(); snapshot.state = SweepstoreTicketState::WAITING; ticket.write(snapshot); ticketIndex = i; break; } } preciseSleep(std::chrono::milliseconds(2)); // Ensure we still own the ticket - if not, reset and try again if (ticketIndex != -1u) { SweepstoreWorkerTicketSnapshot verifySnapshot = concurrencyHeader[ticketIndex].snapshot(); if (verifySnapshot.identifier != newIdentifier) { ticketIndex = -1; // Lost the ticket, try again } else { log("Acquired ticket " + std::to_string(ticketIndex) + " with identifier " + std::to_string(newIdentifier) + "."); return concurrencyHeader[ticketIndex]; } } expSleep("acquireTicket"); } }; uint32_t myIdentifier = randomId(); SweepstoreWorkerTicket myTicket = acquireTicket(myIdentifier); SweepstoreWorkerTicketSnapshot mySnapshot = myTicket.snapshot(); mySnapshot.workerHeartbeat = millisecondsSinceEpoch32(); mySnapshot.state = SweepstoreTicketState::WAITING; mySnapshot.operation = operation; mySnapshot.keyHash = keyHash; mySnapshot.targetSize = targetSize; myTicket.write(mySnapshot); // Wait for approval while (true) { SweepstoreWorkerTicketSnapshot snapshot = myTicket.snapshot(); // Update heartbeat uint32_t currentTime = millisecondsSinceEpoch32(); if (currentTime - snapshot.workerHeartbeat > 700) { snapshot.workerHeartbeat = currentTime; myTicket.write(snapshot); } // Check if we still own the ticket if (snapshot.identifier != myIdentifier) { preciseSleep(std::chrono::milliseconds(10)); // Re-verify we lost the ticket SweepstoreWorkerTicketSnapshot recheckSnapshot = myTicket.snapshot(); if (recheckSnapshot.identifier != myIdentifier) { // log("Lost ownership of ticket " + std::to_string(myTicket.getTicketIndex()) + ", was expecting identifier " + std::to_string(myIdentifier) + " but found " + std::to_string(recheckSnapshot.identifier) + "."); std::cout << "\033[38;5;82m[Ticket Spawner - " << debugLabel << "]:\033[0m Lost ticket " << myTicket.getTicketIndex() << ", respawning..." << std::endl; // ReSharper disable once CppDFAInfiniteRecursion spawnTicket( _file, operation, keyHash, targetSize, onApproved, debugLabel ); break; } // False alarm, continue waiting // log("False alarm, still own ticket " + std::to_string(myTicket.getTicketIndex()) + "."); std::cout << "\033[38;5;82m[Ticket Spawner - " << debugLabel << "]:\033[0m False alarm, still own ticket " << myTicket.getTicketIndex() << "." << std::endl; snapshot = recheckSnapshot; } if (snapshot.state == SweepstoreTicketState::APPROVED) { snapshot.state = SweepstoreTicketState::EXECUTING; myTicket.write(snapshot); onApproved(); snapshot.state = SweepstoreTicketState::COMPLETED; myTicket.write(snapshot); break; } varySleep(std::chrono::microseconds(500), std::chrono::microseconds(200)); } // std::cout << "\033[38;5;82m[Ticket Spawner - " << debugLabel << "]:\033[0m Completed ticket " << myTicket.getTicketIndex() << "." << std::endl; delete file; } void SweepstoreConcurrency::initialiseMaster(std::string filePath) { SWEEPSTORE_TIME_FUNCTION(); auto log = [&](const std::string &message) { debugPrint(("\033[38;5;33m[Concurrency Master]:\033[0m " + message).c_str()); }; SweepstoreFileHandle file(filePath, std::ios::binary | std::ios::in | std::ios::out); SweepstoreHeader header(file); SweepstoreConcurrencyHeader concurrencyHeader(file); std::cout << "[Master] Starting master loop" << std::endl; while (true) { int concurrentWorkers = concurrencyHeader.readNumberOfWorkers(); for (uint32_t i = 0; i < concurrentWorkers; i++) { SweepstoreWorkerTicket ticket(i, file); SweepstoreWorkerTicketSnapshot snapshot = ticket.snapshot(); if (snapshot.state == WAITING) { log("Found waiting ticket " + std::to_string(i) + "(Key Hash: " + std::to_string(snapshot.keyHash) + ")..."); // Approve the ticket snapshot.state = APPROVED; ticket.write(snapshot); log("Approved ticket " + std::to_string(i) + "."); } else if (snapshot.state == SweepstoreTicketState::COMPLETED) { log("Ticket " + std::to_string(i) + " has completed. Resetting..."); // Reset the ticket SweepstoreWorkerTicketSnapshot cleanSnapshot = SweepstoreWorkerTicketSnapshot(); ticket.write(cleanSnapshot); log("Reset ticket " + std::to_string(i) + "."); } } continue; // Cleanup any stale tickets uint32_t currentTime = millisecondsSinceEpoch32(); for (uint32_t i = 0; i < concurrentWorkers; i++) { SweepstoreWorkerTicket ticket(i, file); SweepstoreWorkerTicketSnapshot snapshot = ticket.snapshot(); if (snapshot.state != SweepstoreTicketState::FREE) { continue; } bool stale_heartbeat = currentTime - snapshot.workerHeartbeat > STALE_HEARTBEAT_THRESHOLD_MS; if (stale_heartbeat) { SweepstoreWorkerTicketSnapshot cleanSnapshot = SweepstoreWorkerTicketSnapshot(); ticket.write(cleanSnapshot); log("Stale heartbeat " + std::to_string(i) + "."); } } preciseSleep(std::chrono::milliseconds(1)); } }