From 08369358888182d75b1bd591cc19e9fe028ee849 Mon Sep 17 00:00:00 2001 From: ImBenji Date: Thu, 11 Dec 2025 17:40:35 +0000 Subject: [PATCH] Fixed windows deadlocks, performance is shit tho --- cpp/src/Private/sweepstore/concurrency.cpp | 28 +++++++++++++++++---- cpp/src/Public/sweepstore/utils/file_lock.h | 3 +++ 2 files changed, 26 insertions(+), 5 deletions(-) diff --git a/cpp/src/Private/sweepstore/concurrency.cpp b/cpp/src/Private/sweepstore/concurrency.cpp index 64807d4..05bef90 100644 --- a/cpp/src/Private/sweepstore/concurrency.cpp +++ b/cpp/src/Private/sweepstore/concurrency.cpp @@ -107,13 +107,13 @@ void SweepstoreConcurrency::spawnTicket(SweepstoreFileHandle* _file, SweepstoreWorkerTicketSnapshot snapshot = ticket.snapshot(); - int identifier = snapshot.identifier; + uint32_t identifier = snapshot.identifier; bool identifier_unassigned = identifier == 0; bool stale_heartbeat = millisecondsSinceEpoch32() - snapshot.workerHeartbeat > STALE_HEARTBEAT_THRESHOLD_MS; bool is_free = snapshot.state == SweepstoreTicketState::FREE; - if (identifier_unassigned && stale_heartbeat && is_free) { + if ((identifier_unassigned && is_free) || stale_heartbeat) { SWEEPSTORE_TIME_SCOPE("Claim Ticket"); snapshot.identifier = newIdentifier; snapshot.workerHeartbeat = millisecondsSinceEpoch32(); @@ -218,7 +218,7 @@ void SweepstoreConcurrency::initialiseMaster(std::string filePath) { SWEEPSTORE_TIME_FUNCTION(); auto log = [&](const std::string &message) { - debugPrint("\033[38;5;33m[Concurrency Master]:\033[0m " + message); + debugPrint(("\033[38;5;33m[Concurrency Master]:\033[0m " + message).c_str()); }; SweepstoreFileHandle file(filePath, std::ios::binary | std::ios::in | std::ios::out); @@ -252,9 +252,27 @@ void SweepstoreConcurrency::initialiseMaster(std::string filePath) { ticket.write(cleanSnapshot); log("Reset ticket " + std::to_string(i) + "."); } + } - // Handle stale tickets - uint32_t currentTime = millisecondsSinceEpoch32(); + continue; + + // Cleanup any stale tickets + uint32_t currentTime = millisecondsSinceEpoch32(); + for (uint32_t i = 0; i < concurrentWorkers; i++) { + SweepstoreWorkerTicket ticket(i, file); + SweepstoreWorkerTicketSnapshot snapshot = ticket.snapshot(); + + if (snapshot.state != SweepstoreTicketState::FREE) { + continue; + } + + bool stale_heartbeat = currentTime - snapshot.workerHeartbeat > STALE_HEARTBEAT_THRESHOLD_MS; + + if (stale_heartbeat) { + SweepstoreWorkerTicketSnapshot cleanSnapshot = SweepstoreWorkerTicketSnapshot(); + ticket.write(cleanSnapshot); + log("Stale heartbeat " + std::to_string(i) + "."); + } } preciseSleep(std::chrono::milliseconds(1)); diff --git a/cpp/src/Public/sweepstore/utils/file_lock.h b/cpp/src/Public/sweepstore/utils/file_lock.h index 12b7f31..4a60980 100644 --- a/cpp/src/Public/sweepstore/utils/file_lock.h +++ b/cpp/src/Public/sweepstore/utils/file_lock.h @@ -84,6 +84,9 @@ private: if (it->second == Mode::Shared && mode == Mode::Exclusive) { releaseInternal(); // Release the old shared lock activeLocks.erase(it); + // Small delay to allow OS to process the unlock before re-locking + // This prevents deadlock when multiple threads upgrade simultaneously + Sleep(1); } else { // Already hold compatible or same lock locked = true;