From bdd1fab997c6b44e31048006699cc5ed0de916b6 Mon Sep 17 00:00:00 2001 From: ImBenji Date: Sat, 13 Dec 2025 16:45:06 +0000 Subject: [PATCH] Refactor benchmark configuration and improve file handling with shared streams --- cpp/CMakeLists.txt | 1 - cpp/src/Private/sweepstore/benchmark.cpp | 29 +- cpp/src/Private/sweepstore/concurrency.cpp | 44 ++- cpp/src/Private/sweepstore/header.cpp | 79 ++--- cpp/src/Private/sweepstore/utils/fd_pool.cpp | 6 - .../Private/sweepstore/utils/file_handle.cpp | 166 +++++----- .../Private/sweepstore/utils/file_lock.cpp | 10 +- cpp/src/Public/sweepstore/utils/file_handle.h | 89 +----- cpp/src/Public/sweepstore/utils/file_lock.h | 290 +++++------------- 9 files changed, 249 insertions(+), 465 deletions(-) delete mode 100644 cpp/src/Private/sweepstore/utils/fd_pool.cpp diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index a742f3e..2472255 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -28,7 +28,6 @@ add_executable(main src/Private/sweepstore/concurrency.cpp src/Public/sweepstore/utils/file_lock.h src/Private/sweepstore/utils/file_lock.cpp - src/Private/sweepstore/utils/fd_pool.cpp src/Public/sweepstore/utils/file_handle.h src/Private/sweepstore/utils/file_handle.cpp src/Public/sweepstore/header.h diff --git a/cpp/src/Private/sweepstore/benchmark.cpp b/cpp/src/Private/sweepstore/benchmark.cpp index c27e66b..63e8ec1 100644 --- a/cpp/src/Private/sweepstore/benchmark.cpp +++ b/cpp/src/Private/sweepstore/benchmark.cpp @@ -22,6 +22,17 @@ #include "sweepstore/structures.h" #include "sweepstore/concurrency.h" +// ============================================================================ +// BENCHMARK CONFIGURATION +// ============================================================================ +const int SWEEPSTORE_CONCURRENT_WORKERS = 32; // Number of concurrent workers for Sweepstore +const int WORKER_THREAD_COUNT = 32; // Number of worker threads in the benchmark pool +const int BENCHMARK_ITERATIONS = 16; // Number of benchmark iterations +const int INITIAL_CONCURRENT_WORKERS = 1; // Starting number of concurrent operations (doubles each iteration) +const int ITERATION_DELAY_MS = 200; // Delay between iterations in milliseconds +const int INITIAL_SLEEP_MS = 1000; // Initial sleep before benchmark starts +// ============================================================================ + int main() { namespace fs = std::filesystem; @@ -31,9 +42,9 @@ int main() { std::string filePath = "./example.bin"; Sweepstore sweepstore(filePath); - sweepstore.initialise(32); + sweepstore.initialise(SWEEPSTORE_CONCURRENT_WORKERS); - preciseSleep(std::chrono::milliseconds(1000)); + preciseSleep(std::chrono::milliseconds(INITIAL_SLEEP_MS)); std::vector fileData = loadFile(filePath); std::cout << binaryDump(fileData) << std::endl; @@ -43,10 +54,8 @@ int main() { SweepstoreConcurrency::initialiseMasterAsync(filePath); - int iterations = 16; int currentIteration = 0; - - int concurrencyTest = 1; + int concurrencyTest = INITIAL_CONCURRENT_WORKERS; // Worker pool infrastructure - created once and reused std::queue> taskQueue; @@ -56,9 +65,9 @@ int main() { std::atomic shutdown{false}; std::atomic completedJobs{0}; - // Create 32 persistent worker threads BEFORE timing + // Create persistent worker threads BEFORE timing std::vector workers; - for (int i = 0; i < 32; i++) { + for (int i = 0; i < WORKER_THREAD_COUNT; i++) { workers.emplace_back([&]() { while (!shutdown) { std::function task; @@ -87,7 +96,7 @@ int main() { while (true) { - if (++currentIteration > iterations) { + if (++currentIteration > BENCHMARK_ITERATIONS) { break; } @@ -117,12 +126,12 @@ int main() { auto end = std::chrono::high_resolution_clock::now(); auto duration = std::chrono::duration_cast(end - start).count(); - std::cout << "[" << currentIteration << "/" << iterations << "] Completed " << concurrencyTest << " operations in " << duration << " ms." << std::endl; + std::cout << "[" << currentIteration << "/" << BENCHMARK_ITERATIONS << "] Completed " << concurrencyTest << " operations in " << duration << " ms." << std::endl; concurrencyTest *= 2; // Wait between iterations - preciseSleep(std::chrono::milliseconds(200)); + preciseSleep(std::chrono::milliseconds(ITERATION_DELAY_MS)); } // Shutdown workers after all iterations diff --git a/cpp/src/Private/sweepstore/concurrency.cpp b/cpp/src/Private/sweepstore/concurrency.cpp index 0b99479..a7d6eae 100644 --- a/cpp/src/Private/sweepstore/concurrency.cpp +++ b/cpp/src/Private/sweepstore/concurrency.cpp @@ -31,7 +31,7 @@ int randomId() { return (time ^ random) & 0x7FFFFFFF; } -void SweepstoreConcurrency::spawnTicket(SweepstoreFileHandle* _file, +void SweepstoreConcurrency::spawnTicket(SweepstoreFileHandle* file, const SweepstoreTicketOperation& operation, const uint32_t keyHash, const uint32_t targetSize, @@ -42,7 +42,7 @@ void SweepstoreConcurrency::spawnTicket(SweepstoreFileHandle* _file, // FileHandle now uses thread-local streams internally - no need to create new handle! // Each thread automatically gets its own fstream from the shared file handle - SweepstoreFileHandle* file = new SweepstoreFileHandle(_file->getPath(), std::ios::in | std::ios::out | std::ios::binary); + // SweepstoreFileHandle* file = new SweepstoreFileHandle(_file->getPath(), std::ios::in | std::ios::out | std::ios::binary); /* Useful Functions @@ -52,8 +52,10 @@ void SweepstoreConcurrency::spawnTicket(SweepstoreFileHandle* _file, auto log = [&](const std::string &message) { std::string prefix = !debugLabel.empty() ? "\033[38;5;208m[Ticket Spawner - " + debugLabel + "]:\033[0m " : "\033[38;5;208m[Ticket Spawner]:\033[0m "; // debugPrint(prefix + message); + // std::cout << prefix << message << std::endl; }; + // Sleep with variance (additive only) auto varySleep = [&](std::chrono::nanoseconds minSleepDuration, std::chrono::nanoseconds variance) { // SWEEPSTORE_TIME_SCOPE("Varying Sleep"); @@ -93,9 +95,9 @@ void SweepstoreConcurrency::spawnTicket(SweepstoreFileHandle* _file, uint32_t ticketIndex = -1u; - while (true) { + uint32_t concurrentWorkers = concurrencyHeader.readNumberOfWorkers(); - uint32_t concurrentWorkers = concurrencyHeader.readNumberOfWorkers(); + while (true) { for (uint32_t i = 0; i < concurrentWorkers; i++) { @@ -114,6 +116,11 @@ void SweepstoreConcurrency::spawnTicket(SweepstoreFileHandle* _file, bool is_free = snapshot.state == SweepstoreTicketState::FREE; if ((identifier_unassigned && is_free) || stale_heartbeat) { + + if (i >= 32) { + std::cout << "What the actual fuck" << std::endl; + } + SWEEPSTORE_TIME_SCOPE("Claim Ticket"); snapshot.identifier = newIdentifier; snapshot.workerHeartbeat = millisecondsSinceEpoch32(); @@ -156,20 +163,14 @@ void SweepstoreConcurrency::spawnTicket(SweepstoreFileHandle* _file, // Wait for approval while (true) { + auto start = std::chrono::high_resolution_clock::now(); SweepstoreWorkerTicketSnapshot snapshot = myTicket.snapshot(); - // Update heartbeat - uint32_t currentTime = millisecondsSinceEpoch32(); - if (currentTime - snapshot.workerHeartbeat > STALE_HEARTBEAT_THRESHOLD_MS / 5) { - snapshot.workerHeartbeat = currentTime; - myTicket.write(snapshot); - } - // Check if we still own the ticket if (snapshot.identifier != myIdentifier) { - preciseSleep(std::chrono::milliseconds(10)); + preciseSleep(std::chrono::milliseconds(2)); // Re-verify we lost the ticket SweepstoreWorkerTicketSnapshot recheckSnapshot = myTicket.snapshot(); @@ -179,7 +180,7 @@ void SweepstoreConcurrency::spawnTicket(SweepstoreFileHandle* _file, // ReSharper disable once CppDFAInfiniteRecursion spawnTicket( - _file, + file, operation, keyHash, targetSize, @@ -195,6 +196,16 @@ void SweepstoreConcurrency::spawnTicket(SweepstoreFileHandle* _file, snapshot = recheckSnapshot; } + // Update heartbeat + uint32_t currentTime = millisecondsSinceEpoch32(); + uint32_t sinceLastHeartbeat = currentTime - snapshot.workerHeartbeat; + snapshot.workerHeartbeat = currentTime; + myTicket.write(snapshot); + if (sinceLastHeartbeat > STALE_HEARTBEAT_THRESHOLD_MS / 5) { + + std::cout << "\033[39;5;82m[Ticket Spawner - " << debugLabel << "]:\033[0m Heartbeat updated for ticket " << myTicket.getTicketIndex() << "." << std::endl; + } + if (snapshot.state == SweepstoreTicketState::APPROVED) { snapshot.state = SweepstoreTicketState::EXECUTING; myTicket.write(snapshot); @@ -208,10 +219,15 @@ void SweepstoreConcurrency::spawnTicket(SweepstoreFileHandle* _file, } varySleep(std::chrono::microseconds(500), std::chrono::microseconds(200)); + + auto end = std::chrono::high_resolution_clock::now(); + auto duration = std::chrono::duration_cast(end - start).count(); + + // std::cout << "Loop duration: " << duration << " ms." << std::endl; } // std::cout << "\033[38;5;82m[Ticket Spawner - " << debugLabel << "]:\033[0m Completed ticket " << myTicket.getTicketIndex() << "." << std::endl; - delete file; + // delete file; } void SweepstoreConcurrency::initialiseMaster(std::string filePath) { diff --git a/cpp/src/Private/sweepstore/header.cpp b/cpp/src/Private/sweepstore/header.cpp index eb0f487..e8c97ca 100644 --- a/cpp/src/Private/sweepstore/header.cpp +++ b/cpp/src/Private/sweepstore/header.cpp @@ -6,9 +6,8 @@ #include "sweepstore/utils/timing.h" std::string SweepstoreHeader::readMagicNumber() { - file.readSeek(0, std::ios::beg); char buffer[4]; - file.readBytes(buffer, 4); + file.seekAndRead(0, buffer, 4); return std::string(buffer, 4); } @@ -16,14 +15,12 @@ void SweepstoreHeader::writeMagicNumber(const std::string& magicNumber) { if (magicNumber.size() != 4) { throw std::invalid_argument("Magic number must be exactly 4 characters long."); } - file.writeSeek(0, std::ios::beg); - file.writeBytes(magicNumber.c_str(), 4); + file.seekAndWrite(0, magicNumber.c_str(), 4); } std::string SweepstoreHeader::readVersion() { - file.readSeek(4, std::ios::beg); char buffer[12]; - file.readBytes(buffer, 12); + file.seekAndRead(4, buffer, 12); // Trim leading and trailing spaces std::string version(buffer, 12); @@ -41,46 +38,39 @@ void SweepstoreHeader::writeVersion(const std::string& version) { std::string paddedVersion = " " + version; paddedVersion.resize(12, ' '); - file.writeSeek(4, std::ios::beg); - file.writeBytes(paddedVersion.c_str(), 12); + file.seekAndWrite(4, paddedVersion.c_str(), 12); } SweepstorePointer SweepstoreHeader::readAddressTablePointer() { - file.readSeek(16, std::ios::beg); int64_t address; - file.readBytes(reinterpret_cast(&address), sizeof(address)); + file.seekAndRead(16, reinterpret_cast(&address), sizeof(address)); return address; // Implicit conversion to SweepstorePointer } void SweepstoreHeader::writeAddressTablePointer(const SweepstorePointer& ptr) { - file.writeSeek(16, std::ios::beg); int64_t address = ptr; - file.writeBytes(reinterpret_cast(&address), sizeof(address)); + file.seekAndWrite(16, reinterpret_cast(&address), sizeof(address)); } uint32_t SweepstoreHeader::readFreeListCount() { - file.readSeek(24, std::ios::beg); uint32_t count; - file.readBytes(reinterpret_cast(&count), sizeof(count)); + file.seekAndRead(24, reinterpret_cast(&count), sizeof(count)); return count; } void SweepstoreHeader::writeFreeListCount(uint32_t count) { - file.writeSeek(24, std::ios::beg); - file.writeBytes(reinterpret_cast(&count), sizeof(count)); + file.seekAndWrite(24, reinterpret_cast(&count), sizeof(count)); } bool SweepstoreHeader::readIsFreeListLifted() { - file.readSeek(28, std::ios::beg); char flag; - file.readBytes(&flag, sizeof(flag)); + file.seekAndRead(28, &flag, sizeof(flag)); return flag != 0; } void SweepstoreHeader::writeIsFreeListLifted(bool isLifted) { - file.writeSeek(28, std::ios::beg); char flag = isLifted ? 1 : 0; - file.writeBytes(&flag, sizeof(flag)); + file.seekAndWrite(28, &flag, sizeof(flag)); } void SweepstoreHeader::initialise() { @@ -93,53 +83,44 @@ void SweepstoreHeader::initialise() { } uint64_t SweepstoreConcurrencyHeader::readMasterIdentifier() { - file.readSeek(29, std::ios::beg); uint64_t identifier; - file.readBytes(reinterpret_cast(&identifier), sizeof(identifier)); + file.seekAndRead(29, reinterpret_cast(&identifier), sizeof(identifier)); return identifier; } void SweepstoreConcurrencyHeader::writeMasterIdentifier(uint64_t identifier) { - file.writeSeek(29, std::ios::beg); - file.writeBytes(reinterpret_cast(&identifier), sizeof(identifier)); + file.seekAndWrite(29, reinterpret_cast(&identifier), sizeof(identifier)); } uint32_t SweepstoreConcurrencyHeader::readMasterHeartbeat() { - file.readSeek(37, std::ios::beg); uint32_t heartbeat; - file.readBytes(reinterpret_cast(&heartbeat), sizeof(heartbeat)); + file.seekAndRead(37, reinterpret_cast(&heartbeat), sizeof(heartbeat)); return heartbeat; } void SweepstoreConcurrencyHeader::writeMasterHeartbeat(uint32_t heartbeat) { - file.writeSeek(37, std::ios::beg); - file.writeBytes(reinterpret_cast(&heartbeat), sizeof(heartbeat)); + file.seekAndWrite(37, reinterpret_cast(&heartbeat), sizeof(heartbeat)); } uint32_t SweepstoreConcurrencyHeader::readNumberOfWorkers() { - SWEEPSTORE_TIME_FUNCTION(); - file.readSeek(41, std::ios::beg); uint32_t numWorkers; - file.readBytes(reinterpret_cast(&numWorkers), sizeof(numWorkers)); + file.seekAndRead(41, reinterpret_cast(&numWorkers), sizeof(numWorkers)); return numWorkers; } void SweepstoreConcurrencyHeader::writeNumberOfWorkers(uint32_t numWorkers) { - file.writeSeek(41, std::ios::beg); - file.writeBytes(reinterpret_cast(&numWorkers), sizeof(numWorkers)); + file.seekAndWrite(41, reinterpret_cast(&numWorkers), sizeof(numWorkers)); } bool SweepstoreConcurrencyHeader::readIsReadAllowed() { - file.readSeek(45, std::ios::beg); char flag; - file.readBytes(&flag, sizeof(flag)); + file.seekAndRead(45, &flag, sizeof(flag)); return flag != 0; } void SweepstoreConcurrencyHeader::writeIsReadAllowed(bool isAllowed) { - file.writeSeek(45, std::ios::beg); char flag = isAllowed ? 1 : 0; - file.writeBytes(&flag, sizeof(flag)); + file.seekAndWrite(45, &flag, sizeof(flag)); } void SweepstoreConcurrencyHeader::initialise(int concurrentWorkers) { @@ -149,6 +130,11 @@ void SweepstoreConcurrencyHeader::initialise(int concurrentWorkers) { writeNumberOfWorkers(concurrentWorkers); writeIsReadAllowed(true); uint32_t verifyWorkers = readNumberOfWorkers(); + + if (verifyWorkers != static_cast(concurrentWorkers)) { + throw std::runtime_error("Failed to verify number of concurrent workers in concurrency header. Expected " + std::to_string(concurrentWorkers) + ", got " + std::to_string(verifyWorkers) + "."); + } + for (uint32_t i = 0; i < verifyWorkers; i++) { SweepstoreWorkerTicketSnapshot ticket = SweepstoreWorkerTicketSnapshot(); ticket.identifier = 0; @@ -191,29 +177,28 @@ void SweepstoreWorkerTicket::write(SweepstoreWorkerTicketSnapshot &snapshot) { std::vector data = buffer.readSync(buffer.length()); char* dataPtr = reinterpret_cast(data.data()); - // Write to file + // Write to file with byte-range lock (allows parallel access to different tickets) SweepstoreFileLock lock(file.getPath(), getOffset(), TICKET_SIZE, SweepstoreFileLock::Mode::Exclusive); - lock.lock(); - file.writeSeek(getOffset()); - file.writeBytes(dataPtr, data.size()); - lock.unlock(); + SweepstoreFileLock::Scoped scopedLock(lock); + file.seekAndWrite(getOffset(), dataPtr, data.size()); file.flush(); } bool SweepstoreWorkerTicket::writable() { SWEEPSTORE_TIME_FUNCTION(); - SweepstoreFileLock lock(file.getPath(), getOffset(), TICKET_SIZE, SweepstoreFileLock::Mode::Exclusive); + // SweepstoreFileLock lock(file.getPath(), getOffset(), TICKET_SIZE, SweepstoreFileLock::Mode::Exclusive); return true; } SweepstoreWorkerTicketSnapshot SweepstoreWorkerTicket::snapshot() { SWEEPSTORE_TIME_FUNCTION(); + + // Byte-range lock for this ticket (allows parallel access to different tickets) SweepstoreFileLock lock(file.getPath(), getOffset(), TICKET_SIZE, SweepstoreFileLock::Mode::Shared); - lock.lock(); - file.readSeek(getOffset()); + SweepstoreFileLock::Scoped scopedLock(lock); + std::unique_ptr buffer(new char[TICKET_SIZE]); - file.readBytes(buffer.get(), TICKET_SIZE); - lock.unlock(); + file.seekAndRead(getOffset(), buffer.get(), TICKET_SIZE); RandomAccessMemory ram(reinterpret_cast(buffer.get()), TICKET_SIZE); SweepstoreWorkerTicketSnapshot snapshot; diff --git a/cpp/src/Private/sweepstore/utils/fd_pool.cpp b/cpp/src/Private/sweepstore/utils/fd_pool.cpp deleted file mode 100644 index b80e1ee..0000000 --- a/cpp/src/Private/sweepstore/utils/fd_pool.cpp +++ /dev/null @@ -1,6 +0,0 @@ -#include "sweepstore/utils/file_handle.h" - -// Thread-local stream cache definition for file handles -#ifndef WITH_UNREAL -thread_local std::unordered_map> SweepstoreFileHandle::streamCache; -#endif diff --git a/cpp/src/Private/sweepstore/utils/file_handle.cpp b/cpp/src/Private/sweepstore/utils/file_handle.cpp index 256212f..41f8563 100644 --- a/cpp/src/Private/sweepstore/utils/file_handle.cpp +++ b/cpp/src/Private/sweepstore/utils/file_handle.cpp @@ -1,7 +1,8 @@ #include "sweepstore/utils/file_handle.h" #include "sweepstore/utils/timing.h" +#include -// Constructor - just stores path and mode, actual stream is created per-thread +// Constructor SweepstoreFileHandle::SweepstoreFileHandle(const std::string& p, std::ios::openmode mode) : path(p) , openMode(mode) @@ -27,29 +28,14 @@ SweepstoreFileHandle::SweepstoreFileHandle(const std::string& p, std::ios::openm } #else { - // Thread-local streams created on demand in getThreadStream() -} -#endif - -#ifndef WITH_UNREAL -// Get or create the fstream for this thread -std::fstream& SweepstoreFileHandle::getThreadStream() { - auto it = streamCache.find(path); - if (it == streamCache.end() || !it->second || !it->second->is_open()) { - // Create new stream for this thread - auto stream = std::make_unique(path, openMode); - if (!stream->is_open()) { - throw std::runtime_error("Failed to open file: " + path); - } - streamCache[path] = std::move(stream); - return *streamCache[path]; + // Open the single shared stream + stream.open(path, openMode); + if (!stream.is_open()) { + throw std::runtime_error("Failed to open file: " + path); } - return *it->second; -} -const std::fstream& SweepstoreFileHandle::getThreadStream() const { - // Use const_cast to reuse the non-const version - return const_cast(this)->getThreadStream(); + // Disable stream buffering for cache coherency across threads + stream.rdbuf()->pubsetbuf(0, 0); } #endif @@ -58,8 +44,7 @@ bool SweepstoreFileHandle::isOpen() const { #ifdef WITH_UNREAL return unrealHandle != nullptr; #else - auto it = streamCache.find(path); - return it != streamCache.end() && it->second && it->second->is_open(); + return stream.is_open(); #endif } @@ -71,15 +56,43 @@ void SweepstoreFileHandle::close() { unrealHandle = nullptr; } #else - // Close this thread's stream if it exists - auto it = streamCache.find(path); - if (it != streamCache.end() && it->second && it->second->is_open()) { - it->second->close(); + std::lock_guard lock(streamMutex); + if (stream.is_open()) { + stream.close(); } #endif } -#if defined(_WIN32) || defined(WITH_UNREAL) +// seekAndRead +void SweepstoreFileHandle::seekAndRead(uint64_t offset, char* buffer, size_t size) { + SWEEPSTORE_TIME_FUNCTION(); +#ifdef WITH_UNREAL + unrealHandle->Seek(offset); + unrealHandle->Read(reinterpret_cast(buffer), size); +#else + std::lock_guard lock(streamMutex); + stream.seekg(offset, std::ios::beg); + if (stream.fail()) stream.clear(); + stream.read(buffer, size); +#endif +} + +// seekAndWrite +void SweepstoreFileHandle::seekAndWrite(uint64_t offset, const char* buffer, size_t size) { + SWEEPSTORE_TIME_FUNCTION(); +#ifdef WITH_UNREAL + unrealHandle->Seek(offset); + unrealHandle->Write(reinterpret_cast(buffer), size); + unrealHandle->Flush(); +#else + std::lock_guard lock(streamMutex); + stream.seekp(offset, std::ios::beg); + if (stream.fail()) stream.clear(); + stream.write(buffer, size); + stream.flush(); +#endif +} + // flush void SweepstoreFileHandle::flush() { #ifdef WITH_UNREAL @@ -87,13 +100,11 @@ void SweepstoreFileHandle::flush() { unrealHandle->Flush(); } #else - // Windows-specific implementation for guaranteed flush to disk - auto& stream = getThreadStream(); + std::lock_guard lock(streamMutex); stream.flush(); - // On Windows, also call sync to push to OS buffers - // Then open a Windows HANDLE to the same file and call FlushFileBuffers - // This is more reliable than trying to extract the HANDLE from fstream + // On Windows, also sync to disk +#ifdef _WIN32 HANDLE h = CreateFileA( path.c_str(), GENERIC_WRITE, @@ -109,66 +120,41 @@ void SweepstoreFileHandle::flush() { CloseHandle(h); } #endif +#endif } -// readSeek -void SweepstoreFileHandle::readSeek(std::streampos pos, std::ios::seekdir dir) { +// Move constructor +SweepstoreFileHandle::SweepstoreFileHandle(SweepstoreFileHandle&& other) noexcept + : path(std::move(other.path)) + , openMode(other.openMode) #ifdef WITH_UNREAL - // Unreal doesn't have separate read/write pointers, so just seek - int64 unrealPos = static_cast(pos); - if (dir == std::ios::beg) { - unrealHandle->Seek(unrealPos); - } else if (dir == std::ios::cur) { - unrealHandle->Seek(unrealHandle->Tell() + unrealPos); - } else if (dir == std::ios::end) { - unrealHandle->SeekFromEnd(unrealPos); + , unrealHandle(other.unrealHandle) +{ + other.unrealHandle = nullptr; +} +#else + , stream(std::move(other.stream)) +{ +} +#endif + +// Move assignment +SweepstoreFileHandle& SweepstoreFileHandle::operator=(SweepstoreFileHandle&& other) noexcept { + if (this != &other) { + close(); + path = std::move(other.path); + openMode = other.openMode; +#ifdef WITH_UNREAL + unrealHandle = other.unrealHandle; + other.unrealHandle = nullptr; +#else + stream = std::move(other.stream); +#endif } -#else - // Windows - simplified to only seek read pointer - auto& stream = getThreadStream(); - stream.seekg(pos, dir); - if (stream.fail()) { - stream.clear(); - } -#endif + return *this; } -// writeSeek -void SweepstoreFileHandle::writeSeek(std::streampos pos, std::ios::seekdir dir) { -#ifdef WITH_UNREAL - // Same as readSeek for Unreal - readSeek(pos, dir); -#else - // Windows - simplified to only seek write pointer - auto& stream = getThreadStream(); - stream.seekp(pos, dir); - if (stream.fail()) { - stream.clear(); - } -#endif +// Destructor +SweepstoreFileHandle::~SweepstoreFileHandle() { + close(); } - -// readBytes -void SweepstoreFileHandle::readBytes(char* buffer, std::streamsize size) { -#ifdef WITH_UNREAL - unrealHandle->Read(reinterpret_cast(buffer), size); -#else - // Windows - auto& stream = getThreadStream(); - stream.read(buffer, size); -#endif -} - -// writeBytes -void SweepstoreFileHandle::writeBytes(const char* buffer, std::streamsize size) { - SWEEPSTORE_TIME_FUNCTION(); -#ifdef WITH_UNREAL - unrealHandle->Write(reinterpret_cast(buffer), size); - unrealHandle->Flush(); // Unreal requires explicit flush -#else - // Windows - auto& stream = getThreadStream(); - stream.write(buffer, size); -#endif -} -#endif // _WIN32 || WITH_UNREAL \ No newline at end of file diff --git a/cpp/src/Private/sweepstore/utils/file_lock.cpp b/cpp/src/Private/sweepstore/utils/file_lock.cpp index 8656820..f550979 100644 --- a/cpp/src/Private/sweepstore/utils/file_lock.cpp +++ b/cpp/src/Private/sweepstore/utils/file_lock.cpp @@ -1,10 +1,4 @@ #include "sweepstore/utils/file_lock.h" -// Define thread-local static members -thread_local std::map SweepstoreFileLock::activeLocks; - -#ifdef _WIN32 -thread_local std::unordered_map SweepstoreFileLock::handleCache; -#else -thread_local std::unordered_map SweepstoreFileLock::fdCache; -#endif +// Implementation is entirely in the header (inline and static members) +// This file exists to satisfy CMake build requirements \ No newline at end of file diff --git a/cpp/src/Public/sweepstore/utils/file_handle.h b/cpp/src/Public/sweepstore/utils/file_handle.h index f9e3cfb..23b382b 100644 --- a/cpp/src/Public/sweepstore/utils/file_handle.h +++ b/cpp/src/Public/sweepstore/utils/file_handle.h @@ -2,9 +2,7 @@ #include #include -#include -#include -#include +#include #ifdef _WIN32 #include @@ -22,15 +20,15 @@ class SweepstoreFileHandle { private: std::string path; std::ios::openmode openMode; + #ifdef WITH_UNREAL IFileHandle* unrealHandle; #else - // Thread-local cache: each thread gets its own fstream per path - static thread_local std::unordered_map> streamCache; + // Single shared stream for all threads + std::fstream stream; - // Get or create the fstream for this thread - std::fstream& getThreadStream(); - const std::fstream& getThreadStream() const; + // Mutex protecting the stream + std::mutex streamMutex; #endif public: @@ -38,76 +36,23 @@ public: const std::string& getPath() const { return path; } -#ifndef WITH_UNREAL - std::fstream& getStream() { return getThreadStream(); } - const std::fstream& getStream() const { return getThreadStream(); } - - // Smart pointer-like interface - std::fstream* operator->() { return &getThreadStream(); } - const std::fstream* operator->() const { return &getThreadStream(); } - - std::fstream& operator*() { return getThreadStream(); } - const std::fstream& operator*() const { return getThreadStream(); } -#endif - bool isOpen() const; void close(); - // Windows-compatible I/O wrappers -#if defined(_WIN32) || defined(WITH_UNREAL) + // Main I/O API - atomic seek+read/write operations + void seekAndRead(uint64_t offset, char* buffer, size_t size); + void seekAndWrite(uint64_t offset, const char* buffer, size_t size); + + // Explicit flush void flush(); - void readSeek(std::streampos pos, std::ios::seekdir dir = std::ios::beg); - void writeSeek(std::streampos pos, std::ios::seekdir dir = std::ios::beg); - void readBytes(char* buffer, std::streamsize size); - void writeBytes(const char* buffer, std::streamsize size); -#else - // Inline for non-Windows to avoid overhead - inline void flush() { - getThreadStream().flush(); - } - inline void readSeek(std::streampos pos, std::ios::seekdir dir = std::ios::beg) { - getThreadStream().seekg(pos, dir); - } - inline void writeSeek(std::streampos pos, std::ios::seekdir dir = std::ios::beg) { - getThreadStream().seekp(pos, dir); - } - inline void readBytes(char* buffer, std::streamsize size) { - getThreadStream().read(buffer, size); - } - inline void writeBytes(const char* buffer, std::streamsize size) { - getThreadStream().write(buffer, size); - } -#endif - SweepstoreFileHandle(SweepstoreFileHandle&& other) noexcept - : path(std::move(other.path)) - , openMode(other.openMode) -#ifdef WITH_UNREAL - , unrealHandle(other.unrealHandle) -#endif - { -#ifdef WITH_UNREAL - other.unrealHandle = nullptr; -#endif - } - - SweepstoreFileHandle& operator=(SweepstoreFileHandle&& other) noexcept { - if (this != &other) { - close(); - path = std::move(other.path); - openMode = other.openMode; -#ifdef WITH_UNREAL - unrealHandle = other.unrealHandle; - other.unrealHandle = nullptr; -#endif - } - return *this; - } + // Move semantics + SweepstoreFileHandle(SweepstoreFileHandle&& other) noexcept; + SweepstoreFileHandle& operator=(SweepstoreFileHandle&& other) noexcept; + // Delete copy semantics SweepstoreFileHandle(const SweepstoreFileHandle&) = delete; SweepstoreFileHandle& operator=(const SweepstoreFileHandle&) = delete; - ~SweepstoreFileHandle() { - close(); - } -}; \ No newline at end of file + ~SweepstoreFileHandle(); +}; diff --git a/cpp/src/Public/sweepstore/utils/file_lock.h b/cpp/src/Public/sweepstore/utils/file_lock.h index 4a60980..52332eb 100644 --- a/cpp/src/Public/sweepstore/utils/file_lock.h +++ b/cpp/src/Public/sweepstore/utils/file_lock.h @@ -2,43 +2,46 @@ #include #include -#include -#include +#include +#include #include +#include +#include +#include -#include "sweepstore/utils/timing.h" - -#ifdef _WIN32 - #include -#else - #include - #include - #include -#endif - -// Simple file lock using flock() with thread-local FD cache -// Each thread has its own FD, flock() is per-FD, so threads don't conflict -// Matches Dart's paradigm: each isolate has its own RandomAccessFile +// C++ level byte-range locking +// Allows Thread A (ticket 5) and Thread B (ticket 10) to work in parallel on different byte ranges +// Uses static shared state to coordinate locks across all SweepstoreFileLock instances class SweepstoreFileLock { public: enum class Mode { Shared, Exclusive }; private: - // Key: file path + offset, Value: Mode - struct LockKey { - std::string path; + struct LockRange { uint64_t offset; uint64_t length; + Mode mode; - bool operator<(const LockKey& other) const { - if (path != other.path) return path < other.path; - if (offset != other.offset) return offset < other.offset; - return length < other.length; + uint64_t end() const { return offset + length; } + + bool overlaps(uint64_t otherOffset, uint64_t otherLength) const { + uint64_t otherEnd = otherOffset + otherLength; + return offset < otherEnd && otherOffset < end(); } }; - // Track active locks per thread to prevent self-deadlock - static thread_local std::map activeLocks; + // Static shared state for all locks across all instances + struct SharedLockState { + std::mutex mutex; + std::condition_variable cv; + // Map: file path -> list of active lock ranges + std::map> activeLocks; + }; + + static SharedLockState& getSharedState() { + static SharedLockState state; + return state; + } std::string filePath; uint64_t offset; @@ -46,183 +49,79 @@ private: Mode mode; bool locked = false; -#ifdef _WIN32 - // Thread-local HANDLE cache for Windows - static thread_local std::unordered_map handleCache; - - static HANDLE getOrOpenHandle(const std::string& path) { - auto it = handleCache.find(path); - if (it != handleCache.end()) { - return it->second; + // Check if acquiring this lock would conflict with existing locks + bool wouldConflict(const std::vector& existingLocks) const { + for (const auto& existing : existingLocks) { + if (existing.overlaps(offset, length)) { + // Conflict if either lock is exclusive + if (mode == Mode::Exclusive || existing.mode == Mode::Exclusive) { + return true; + } + // Shared locks don't conflict with each other + } } - - HANDLE handle = CreateFileA( - path.c_str(), - GENERIC_READ | GENERIC_WRITE, - FILE_SHARE_READ | FILE_SHARE_WRITE, - NULL, - OPEN_EXISTING, - FILE_ATTRIBUTE_NORMAL, - NULL - ); - - if (handle == INVALID_HANDLE_VALUE) { - throw std::runtime_error("Failed to open file for locking: " + path); - } - - handleCache[path] = handle; - return handle; + return false; } void acquire() { - LockKey key{filePath, offset, length}; + auto& state = getSharedState(); + std::unique_lock lock(state.mutex); - // Check if we already hold a lock on this region - auto it = activeLocks.find(key); - if (it != activeLocks.end()) { - // If we're trying to upgrade from shared to exclusive, release first - if (it->second == Mode::Shared && mode == Mode::Exclusive) { - releaseInternal(); // Release the old shared lock - activeLocks.erase(it); - // Small delay to allow OS to process the unlock before re-locking - // This prevents deadlock when multiple threads upgrade simultaneously - Sleep(1); - } else { - // Already hold compatible or same lock - locked = true; - return; + // Wait until no conflicts + state.cv.wait(lock, [&]() { + auto it = state.activeLocks.find(filePath); + if (it == state.activeLocks.end()) { + return true; // No locks on this file yet } - } + return !wouldConflict(it->second); + }); - HANDLE handle = getOrOpenHandle(filePath); - OVERLAPPED overlapped = {}; // Proper zero-initialization - overlapped.Offset = static_cast(offset & 0xFFFFFFFF); - overlapped.OffsetHigh = static_cast(offset >> 32); - - DWORD length_low = static_cast(length & 0xFFFFFFFF); - DWORD length_high = static_cast(length >> 32); - DWORD flags = (mode == Mode::Exclusive) ? LOCKFILE_EXCLUSIVE_LOCK : 0; - - if (!LockFileEx(handle, flags, 0, length_low, length_high, &overlapped)) { - throw std::runtime_error("Failed to acquire file lock"); - } + // Add our lock + state.activeLocks[filePath].push_back({offset, length, mode}); locked = true; - activeLocks[key] = mode; - } - - void releaseInternal() { - if (locked) { - HANDLE handle = getOrOpenHandle(filePath); - OVERLAPPED overlapped = {}; - overlapped.Offset = static_cast(offset & 0xFFFFFFFF); - overlapped.OffsetHigh = static_cast(offset >> 32); - - DWORD length_low = static_cast(length & 0xFFFFFFFF); - DWORD length_high = static_cast(length >> 32); - - UnlockFileEx(handle, 0, length_low, length_high, &overlapped); - locked = false; - } } void release() { - if (locked) { - LockKey key{filePath, offset, length}; - releaseInternal(); - activeLocks.erase(key); - } - } -#else - // Thread-local FD cache - each thread has its own FD per file - static thread_local std::unordered_map fdCache; + if (!locked) return; - static int getOrOpenFD(const std::string& path) { - auto it = fdCache.find(path); - if (it != fdCache.end()) { - return it->second; - } + auto& state = getSharedState(); + std::unique_lock lock(state.mutex); - int fd = open(path.c_str(), O_RDWR); - if (fd == -1) { - throw std::runtime_error("Failed to open file for locking: " + path); - } + auto it = state.activeLocks.find(filePath); + if (it != state.activeLocks.end()) { + auto& locks = it->second; - fdCache[path] = fd; - return fd; - } + // Remove our lock (find by offset/length/mode match) + locks.erase( + std::remove_if(locks.begin(), locks.end(), [&](const LockRange& r) { + return r.offset == offset && r.length == length && r.mode == mode; + }), + locks.end() + ); - void acquire() { - LockKey key{filePath, offset, length}; - - // Check if we already hold a lock on this region - auto it = activeLocks.find(key); - if (it != activeLocks.end()) { - // If we're trying to upgrade from shared to exclusive, release first - if (it->second == Mode::Shared && mode == Mode::Exclusive) { - releaseInternal(); // Release the old shared lock - activeLocks.erase(it); - } else { - // Already hold compatible or same lock - locked = true; - return; + // Clean up if no more locks on this file + if (locks.empty()) { + state.activeLocks.erase(it); } } - int fd = getOrOpenFD(filePath); + locked = false; - struct flock lock_info; - lock_info.l_type = (mode == Mode::Exclusive) ? F_WRLCK : F_RDLCK; - lock_info.l_whence = SEEK_SET; - lock_info.l_start = offset; - lock_info.l_len = length; - lock_info.l_pid = 0; - - if (fcntl(fd, F_SETLKW, &lock_info) == -1) { - throw std::runtime_error("Failed to acquire file lock"); - } - locked = true; - activeLocks[key] = mode; + // Notify waiting threads + state.cv.notify_all(); } - void releaseInternal() { - if (locked) { - int fd = getOrOpenFD(filePath); - - struct flock lock_info; - lock_info.l_type = F_UNLCK; - lock_info.l_whence = SEEK_SET; - lock_info.l_start = offset; - lock_info.l_len = length; - lock_info.l_pid = 0; - - fcntl(fd, F_SETLK, &lock_info); - locked = false; - } - } - - void release() { - if (locked) { - LockKey key{filePath, offset, length}; - releaseInternal(); - activeLocks.erase(key); - } - } -#endif - public: - // Constructor accepts offset/length for byte-range locking SweepstoreFileLock(const std::string& path, uint64_t off, uint64_t len, Mode m) : filePath(path), offset(off), length(len), mode(m) {} ~SweepstoreFileLock() { release(); } void lock() { - SWEEPSTORE_TIME_FUNCTION(); if (!locked) acquire(); } void unlock() { - SWEEPSTORE_TIME_FUNCTION(); release(); } @@ -230,59 +129,16 @@ public: return locked; } - // Check if file is currently locked (non-blocking test) - bool isLocked() { -#ifdef _WIN32 - HANDLE handle = getOrOpenHandle(filePath); - OVERLAPPED overlapped = {}; - overlapped.Offset = static_cast(offset & 0xFFFFFFFF); - overlapped.OffsetHigh = static_cast(offset >> 32); - - DWORD length_low = static_cast(length & 0xFFFFFFFF); - DWORD length_high = static_cast(length >> 32); - DWORD flags = (mode == Mode::Exclusive) ? LOCKFILE_EXCLUSIVE_LOCK : 0; - flags |= LOCKFILE_FAIL_IMMEDIATELY; - - // Try non-blocking lock - if (!LockFileEx(handle, flags, 0, length_low, length_high, &overlapped)) { - return true; // Already locked - } - - // Got the lock, release immediately - UnlockFileEx(handle, 0, length_low, length_high, &overlapped); - return false; -#else - int fd = getOrOpenFD(filePath); - - struct flock lock_info; - lock_info.l_type = (mode == Mode::Exclusive) ? F_WRLCK : F_RDLCK; - lock_info.l_whence = SEEK_SET; - lock_info.l_start = offset; - lock_info.l_len = length; - lock_info.l_pid = 0; - - // Try non-blocking lock - if (fcntl(fd, F_SETLK, &lock_info) == -1) { - return true; // Already locked - } - - // Got the lock, release immediately - lock_info.l_type = F_UNLCK; - fcntl(fd, F_SETLK, &lock_info); - return false; -#endif - } - // RAII helper for scoped locking class Scoped { - SweepstoreFileLock& lock; + SweepstoreFileLock& lockRef; public: - Scoped(SweepstoreFileLock& l) : lock(l) { - lock.lock(); + Scoped(SweepstoreFileLock& l) : lockRef(l) { + lockRef.lock(); } ~Scoped() { - lock.unlock(); + lockRef.unlock(); } Scoped(const Scoped&) = delete; @@ -292,4 +148,4 @@ public: // Disable copying SweepstoreFileLock(const SweepstoreFileLock&) = delete; SweepstoreFileLock& operator=(const SweepstoreFileLock&) = delete; -}; +}; \ No newline at end of file