Fixed windows deadlocks, performance is shit tho
This commit is contained in:
@@ -11,6 +11,7 @@
|
||||
#include "sweepstore/header.h"
|
||||
#include "sweepstore/utils/helpers.h"
|
||||
#include "sweepstore/utils/file_handle.h"
|
||||
#include "sweepstore/utils/timing.h"
|
||||
|
||||
|
||||
uint64_t getRandomOffset(uint64_t maxValue) {
|
||||
@@ -37,6 +38,7 @@ void SweepstoreConcurrency::spawnTicket(SweepstoreFileHandle* _file,
|
||||
const std::function<void()> onApproved,
|
||||
std::string debugLabel
|
||||
) {
|
||||
SWEEPSTORE_TIME_FUNCTION();
|
||||
|
||||
// FileHandle now uses thread-local streams internally - no need to create new handle!
|
||||
// Each thread automatically gets its own fstream from the shared file handle
|
||||
@@ -54,6 +56,7 @@ void SweepstoreConcurrency::spawnTicket(SweepstoreFileHandle* _file,
|
||||
|
||||
// Sleep with variance (additive only)
|
||||
auto varySleep = [&](std::chrono::nanoseconds minSleepDuration, std::chrono::nanoseconds variance) {
|
||||
// SWEEPSTORE_TIME_SCOPE("Varying Sleep");
|
||||
if (variance.count() <= 0) {
|
||||
preciseSleep(minSleepDuration);
|
||||
} else {
|
||||
@@ -66,10 +69,12 @@ void SweepstoreConcurrency::spawnTicket(SweepstoreFileHandle* _file,
|
||||
// Exponential sleep
|
||||
std::unordered_map<std::string, int> expSleepTracker = {};
|
||||
auto expSleep = [&expSleepTracker](const std::string& label) {
|
||||
SWEEPSTORE_TIME_SCOPE("Exponential Sleep");
|
||||
int count = expSleepTracker[label]; // defaults to 0 if not found
|
||||
int sleepTime = (1 << count); // Exponential backoff
|
||||
sleepTime = std::max(1, std::min(sleepTime, 1000)); // Clamp between 1ms and 1000ms
|
||||
preciseSleep(std::chrono::milliseconds(sleepTime));
|
||||
// sleepTime = 1000;
|
||||
preciseSleep(std::chrono::microseconds(sleepTime * 500));
|
||||
expSleepTracker[label] = count + 1;
|
||||
};
|
||||
|
||||
@@ -81,6 +86,7 @@ void SweepstoreConcurrency::spawnTicket(SweepstoreFileHandle* _file,
|
||||
Ticket Acquisition
|
||||
*/
|
||||
auto acquireTicket = [&](uint32_t newIdentifier) -> SweepstoreWorkerTicket {
|
||||
SWEEPSTORE_TIME_SCOPE("acquireTicket");
|
||||
|
||||
// Reduce the chance of race condition
|
||||
varySleep(std::chrono::microseconds(500), std::chrono::microseconds(200));
|
||||
@@ -108,6 +114,7 @@ void SweepstoreConcurrency::spawnTicket(SweepstoreFileHandle* _file,
|
||||
bool is_free = snapshot.state == SweepstoreTicketState::FREE;
|
||||
|
||||
if (identifier_unassigned && stale_heartbeat && is_free) {
|
||||
SWEEPSTORE_TIME_SCOPE("Claim Ticket");
|
||||
snapshot.identifier = newIdentifier;
|
||||
snapshot.workerHeartbeat = millisecondsSinceEpoch32();
|
||||
snapshot.state = SweepstoreTicketState::WAITING;
|
||||
@@ -208,6 +215,7 @@ void SweepstoreConcurrency::spawnTicket(SweepstoreFileHandle* _file,
|
||||
}
|
||||
|
||||
void SweepstoreConcurrency::initialiseMaster(std::string filePath) {
|
||||
SWEEPSTORE_TIME_FUNCTION();
|
||||
|
||||
auto log = [&](const std::string &message) {
|
||||
debugPrint("\033[38;5;33m[Concurrency Master]:\033[0m " + message);
|
||||
|
||||
Reference in New Issue
Block a user