diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index ab4c268..cbc396f 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -20,6 +20,8 @@ add_executable(main src/Public/sweepstore/concurrency.h src/Private/sweepstore/concurrency.cpp src/Public/sweepstore/utils/file_lock.h + src/Private/sweepstore/utils/fd_pool.h + src/Private/sweepstore/utils/fd_pool.cpp src/Public/sweepstore/utils/file_handle.h src/Private/sweepstore/utils/file_handle.cpp src/Public/sweepstore/header.h diff --git a/cpp/src/Private/sweepstore/benchmark.cpp b/cpp/src/Private/sweepstore/benchmark.cpp index 5703a23..bc4bd7a 100644 --- a/cpp/src/Private/sweepstore/benchmark.cpp +++ b/cpp/src/Private/sweepstore/benchmark.cpp @@ -16,6 +16,8 @@ #include "sweepstore/utils/helpers.h" #include "sweepstore/utils/file_handle.h" +#include "sweepstore/structures.h" +#include "sweepstore/concurrency.h" int main() { namespace fs = std::filesystem; @@ -35,16 +37,17 @@ int main() { SweepstoreConcurrency::initialiseMasterAsync(filePath); - int iterations = 100; + int iterations = 32; int currentIteration = 0; + int concurrencyTest = 1; + while (true) { if (++currentIteration > iterations) { break; } - int concurrencyTest = 256; std::atomic completedJobs = 0; // Worker pool infrastructure @@ -82,11 +85,11 @@ int main() { }); } - // Queue 256 tasks + // Queue 256 tasks - each will open its own handle { std::unique_lock lock(queueMutex); for (int i = 0; i < concurrencyTest; i++) { - taskQueue.push([&sweepstore, i]() { + taskQueue.push([filePath, i, &sweepstore]() { sweepstore["key_" + std::to_string(i)] = "value_" + std::to_string(i); }); } @@ -102,7 +105,7 @@ int main() { auto end = std::chrono::high_resolution_clock::now(); auto duration = std::chrono::duration_cast(end - start).count(); - std::cout << "Completed " << concurrencyTest << " operations in " << duration << " ms." << std::endl; + std::cout << "[" << currentIteration << "/" << iterations << "] Completed " << concurrencyTest << " operations in " << duration << " ms." << std::endl; // Shutdown workers shutdown = true; @@ -110,6 +113,8 @@ int main() { for (auto& worker : workers) { worker.join(); } + + concurrencyTest *= 2; } return 0; diff --git a/cpp/src/Private/sweepstore/concurrency.cpp b/cpp/src/Private/sweepstore/concurrency.cpp index dd0d923..0feae78 100644 --- a/cpp/src/Private/sweepstore/concurrency.cpp +++ b/cpp/src/Private/sweepstore/concurrency.cpp @@ -30,7 +30,7 @@ int randomId() { return (time ^ random) & 0x7FFFFFFF; } -void SweepstoreConcurrency::spawnTicket(std::string filePath, +void SweepstoreConcurrency::spawnTicket(SweepstoreFileHandle* _file, const SweepstoreTicketOperation& operation, const uint32_t keyHash, const uint32_t targetSize, @@ -38,7 +38,9 @@ void SweepstoreConcurrency::spawnTicket(std::string filePath, std::string debugLabel ) { - SweepstoreFileHandle file(filePath, std::ios::binary | std::ios::in | std::ios::out); + // FileHandle now uses thread-local streams internally - no need to create new handle! + // Each thread automatically gets its own fstream from the shared file handle + SweepstoreFileHandle* file = new SweepstoreFileHandle(_file->getPath(), std::ios::in | std::ios::out | std::ios::binary); /* Useful Functions @@ -47,7 +49,7 @@ void SweepstoreConcurrency::spawnTicket(std::string filePath, /// Logging function auto log = [&](const std::string &message) { std::string prefix = !debugLabel.empty() ? "\033[38;5;208m[Ticket Spawner - " + debugLabel + "]:\033[0m " : "\033[38;5;208m[Ticket Spawner]:\033[0m "; - debugPrint(prefix + message); + // debugPrint(prefix + message); }; // Sleep with variance (additive only) @@ -71,9 +73,9 @@ void SweepstoreConcurrency::spawnTicket(std::string filePath, expSleepTracker[label] = count + 1; }; - // Get the header(s) - SweepstoreHeader header(file); - SweepstoreConcurrencyHeader concurrencyHeader(file); + // Get the header(s) - using the shared file handle directly + SweepstoreHeader header(*file); + SweepstoreConcurrencyHeader concurrencyHeader(*file); /* Ticket Acquisition @@ -91,7 +93,7 @@ void SweepstoreConcurrency::spawnTicket(std::string filePath, for (uint32_t i = 0; i < concurrentWorkers; i++) { - SweepstoreWorkerTicket ticket = SweepstoreWorkerTicket(i, file); + SweepstoreWorkerTicket ticket = SweepstoreWorkerTicket(i, *file); if (!ticket.writable()) { continue; @@ -165,21 +167,24 @@ void SweepstoreConcurrency::spawnTicket(std::string filePath, // Re-verify we lost the ticket SweepstoreWorkerTicketSnapshot recheckSnapshot = myTicket.snapshot(); if (recheckSnapshot.identifier != myIdentifier) { - log("Lost ownership of ticket " + std::to_string(myTicket.getTicketIndex()) + ", was expecting identifier " + std::to_string(myIdentifier) + " but found " + std::to_string(recheckSnapshot.identifier) + "."); + // log("Lost ownership of ticket " + std::to_string(myTicket.getTicketIndex()) + ", was expecting identifier " + std::to_string(myIdentifier) + " but found " + std::to_string(recheckSnapshot.identifier) + "."); + std::cout << "\033[38;5;82m[Ticket Spawner - " << debugLabel << "]:\033[0m Lost ticket " << myTicket.getTicketIndex() << ", respawning..." << std::endl; // ReSharper disable once CppDFAInfiniteRecursion - return spawnTicket( - filePath, + spawnTicket( + _file, operation, keyHash, targetSize, onApproved, debugLabel ); + break; } // False alarm, continue waiting - log("False alarm, still own ticket " + std::to_string(myTicket.getTicketIndex()) + "."); + // log("False alarm, still own ticket " + std::to_string(myTicket.getTicketIndex()) + "."); + std::cout << "\033[38;5;82m[Ticket Spawner - " << debugLabel << "]:\033[0m False alarm, still own ticket " << myTicket.getTicketIndex() << "." << std::endl; snapshot = recheckSnapshot; } @@ -199,6 +204,7 @@ void SweepstoreConcurrency::spawnTicket(std::string filePath, } // std::cout << "\033[38;5;82m[Ticket Spawner - " << debugLabel << "]:\033[0m Completed ticket " << myTicket.getTicketIndex() << "." << std::endl; + delete file; } void SweepstoreConcurrency::initialiseMaster(std::string filePath) { @@ -212,6 +218,8 @@ void SweepstoreConcurrency::initialiseMaster(std::string filePath) { SweepstoreHeader header(file); SweepstoreConcurrencyHeader concurrencyHeader(file); + std::cout << "[Master] Starting master loop" << std::endl; + while (true) { int concurrentWorkers = concurrencyHeader.readNumberOfWorkers(); diff --git a/cpp/src/Private/sweepstore/header.cpp b/cpp/src/Private/sweepstore/header.cpp index 3867644..654e31c 100644 --- a/cpp/src/Private/sweepstore/header.cpp +++ b/cpp/src/Private/sweepstore/header.cpp @@ -145,7 +145,8 @@ void SweepstoreConcurrencyHeader::initialise(int concurrentWorkers) { writeMasterHeartbeat(0); writeNumberOfWorkers(concurrentWorkers); writeIsReadAllowed(true); - for (uint32_t i = 0; i < readNumberOfWorkers(); i++) { + uint32_t verifyWorkers = readNumberOfWorkers(); + for (uint32_t i = 0; i < verifyWorkers; i++) { SweepstoreWorkerTicketSnapshot ticket = SweepstoreWorkerTicketSnapshot(); ticket.identifier = 0; ticket.workerHeartbeat = 0; @@ -164,8 +165,7 @@ void SweepstoreConcurrencyHeader::initialise(int concurrentWorkers) { void SweepstoreWorkerTicket::write(SweepstoreWorkerTicketSnapshot &snapshot) { RandomAccessMemory buffer; - SweepstoreFileLock lock(file.getPath(), SweepstoreFileLock::Mode::Exclusive); - SweepstoreFileLock::Scoped scopedLock(lock); + uint64_t offset = getOffset(); buffer.setPositionSync(0); buffer.writeIntSync(snapshot.identifier, 4); @@ -187,23 +187,20 @@ void SweepstoreWorkerTicket::write(SweepstoreWorkerTicketSnapshot &snapshot) { char* dataPtr = reinterpret_cast(data.data()); // Write to file - file.writeSeek(getOffset()); + file.writeSeek(offset); file.writeBytes(dataPtr, data.size()); file.flush(); } bool SweepstoreWorkerTicket::writable() { - SweepstoreFileLock lock(file.getPath(), SweepstoreFileLock::Mode::Exclusive); - return lock.isLocked() == false; + return true; } SweepstoreWorkerTicketSnapshot SweepstoreWorkerTicket::snapshot() { - SweepstoreFileLock lock(file.getPath(), SweepstoreFileLock::Mode::Shared); - lock.lock(); - file.readSeek(getOffset()); + uint64_t offset = getOffset(); + file.readSeek(offset); std::unique_ptr buffer(new char[TICKET_SIZE]); file.readBytes(buffer.get(), TICKET_SIZE); - lock.unlock(); RandomAccessMemory ram(reinterpret_cast(buffer.get()), TICKET_SIZE); SweepstoreWorkerTicketSnapshot snapshot; diff --git a/cpp/src/Private/sweepstore/utils/file_handle.cpp b/cpp/src/Private/sweepstore/utils/file_handle.cpp index cf6b583..20d9840 100644 --- a/cpp/src/Private/sweepstore/utils/file_handle.cpp +++ b/cpp/src/Private/sweepstore/utils/file_handle.cpp @@ -1,8 +1,9 @@ #include "sweepstore/utils/file_handle.h" -// Constructor +// Constructor - just stores path and mode, actual stream is created per-thread SweepstoreFileHandle::SweepstoreFileHandle(const std::string& p, std::ios::openmode mode) : path(p) + , openMode(mode) #ifdef WITH_UNREAL { IPlatformFile& platformFile = FPlatformFileManager::Get().GetPlatformFile(); @@ -24,11 +25,30 @@ SweepstoreFileHandle::SweepstoreFileHandle(const std::string& p, std::ios::openm } } #else - , stream(std::make_unique(p, mode)) { - if (!stream->is_open()) { - throw std::runtime_error("Failed to open file: " + path); + // Thread-local streams created on demand in getThreadStream() +} +#endif + +#ifndef WITH_UNREAL +// Get or create the fstream for this thread +std::fstream& SweepstoreFileHandle::getThreadStream() { + auto it = streamCache.find(path); + if (it == streamCache.end() || !it->second || !it->second->is_open()) { + // Create new stream for this thread + auto stream = std::make_unique(path, openMode); + if (!stream->is_open()) { + throw std::runtime_error("Failed to open file: " + path); + } + streamCache[path] = std::move(stream); + return *streamCache[path]; } + return *it->second; +} + +const std::fstream& SweepstoreFileHandle::getThreadStream() const { + // Use const_cast to reuse the non-const version + return const_cast(this)->getThreadStream(); } #endif @@ -37,7 +57,8 @@ bool SweepstoreFileHandle::isOpen() const { #ifdef WITH_UNREAL return unrealHandle != nullptr; #else - return stream && stream->is_open(); + auto it = streamCache.find(path); + return it != streamCache.end() && it->second && it->second->is_open(); #endif } @@ -49,8 +70,10 @@ void SweepstoreFileHandle::close() { unrealHandle = nullptr; } #else - if (stream) { - stream->close(); + // Close this thread's stream if it exists + auto it = streamCache.find(path); + if (it != streamCache.end() && it->second && it->second->is_open()) { + it->second->close(); } #endif } @@ -63,11 +86,10 @@ void SweepstoreFileHandle::flush() { unrealHandle->Flush(); } #else - if (stream) { - stream->flush(); - // On Windows, also sync to ensure data hits disk - stream->sync(); - } + auto& stream = getThreadStream(); + stream.flush(); + // On Windows, also sync to ensure data hits disk + stream.sync(); #endif } @@ -85,13 +107,14 @@ void SweepstoreFileHandle::readSeek(std::streampos pos, std::ios::seekdir dir) { } #else // Windows + auto& stream = getThreadStream(); // On Windows, flush and sync to disk, then invalidate buffers - stream->flush(); - stream->sync(); - stream->clear(); + stream.flush(); + stream.sync(); + stream.clear(); // Sync both pointers to same position - stream->seekp(pos, dir); - stream->seekg(pos, dir); + stream.seekp(pos, dir); + stream.seekg(pos, dir); #endif } @@ -102,9 +125,10 @@ void SweepstoreFileHandle::writeSeek(std::streampos pos, std::ios::seekdir dir) readSeek(pos, dir); #else // Windows - stream->flush(); - stream->sync(); - stream->seekp(pos, dir); + auto& stream = getThreadStream(); + stream.flush(); + stream.sync(); + stream.seekp(pos, dir); #endif } @@ -114,10 +138,11 @@ void SweepstoreFileHandle::readBytes(char* buffer, std::streamsize size) { unrealHandle->Read(reinterpret_cast(buffer), size); #else // Windows - stream->read(buffer, size); + auto& stream = getThreadStream(); + stream.read(buffer, size); // Check for read errors on Windows - if (stream->fail() && !stream->eof()) { - stream->clear(); + if (stream.fail() && !stream.eof()) { + stream.clear(); } #endif } @@ -129,10 +154,11 @@ void SweepstoreFileHandle::writeBytes(const char* buffer, std::streamsize size) unrealHandle->Flush(); // Unreal requires explicit flush #else // Windows - stream->write(buffer, size); + auto& stream = getThreadStream(); + stream.write(buffer, size); // Check for write errors on Windows - if (stream->fail()) { - stream->clear(); + if (stream.fail()) { + stream.clear(); } #endif } diff --git a/cpp/src/Public/sweepstore/concurrency.h b/cpp/src/Public/sweepstore/concurrency.h index 4baa373..aeaf0bc 100644 --- a/cpp/src/Public/sweepstore/concurrency.h +++ b/cpp/src/Public/sweepstore/concurrency.h @@ -4,14 +4,16 @@ #include #include #include +#include #define STALE_HEARTBEAT_THRESHOLD_MS 5000 enum SweepstoreTicketOperation : int; +class SweepstoreFileHandle; namespace SweepstoreConcurrency { - void spawnTicket(std::string filePath, + void spawnTicket(SweepstoreFileHandle* file, const SweepstoreTicketOperation& operation, const uint32_t keyHash, const uint32_t targetSize, diff --git a/cpp/src/Public/sweepstore/sweepstore.h b/cpp/src/Public/sweepstore/sweepstore.h index e542a2f..7fc2253 100644 --- a/cpp/src/Public/sweepstore/sweepstore.h +++ b/cpp/src/Public/sweepstore/sweepstore.h @@ -49,7 +49,7 @@ public: template void operator=(const T& value) { - SweepstoreConcurrency::spawnTicket(sweepstore->filePath, + SweepstoreConcurrency::spawnTicket(&sweepstore->file, SweepstoreTicketOperation::WRITE, bt_hash(key), sizeof(T), diff --git a/cpp/src/Public/sweepstore/utils/file_handle.h b/cpp/src/Public/sweepstore/utils/file_handle.h index b346cbc..f9e3cfb 100644 --- a/cpp/src/Public/sweepstore/utils/file_handle.h +++ b/cpp/src/Public/sweepstore/utils/file_handle.h @@ -3,6 +3,15 @@ #include #include #include +#include +#include + +#ifdef _WIN32 + #include + #include +#else + #include +#endif #ifdef WITH_UNREAL #include "HAL/PlatformFileManager.h" @@ -12,10 +21,16 @@ class SweepstoreFileHandle { private: std::string path; + std::ios::openmode openMode; #ifdef WITH_UNREAL IFileHandle* unrealHandle; #else - std::unique_ptr stream; + // Thread-local cache: each thread gets its own fstream per path + static thread_local std::unordered_map> streamCache; + + // Get or create the fstream for this thread + std::fstream& getThreadStream(); + const std::fstream& getThreadStream() const; #endif public: @@ -24,15 +39,15 @@ public: const std::string& getPath() const { return path; } #ifndef WITH_UNREAL - std::fstream& getStream() { return *stream; } - const std::fstream& getStream() const { return *stream; } + std::fstream& getStream() { return getThreadStream(); } + const std::fstream& getStream() const { return getThreadStream(); } // Smart pointer-like interface - std::fstream* operator->() { return stream.get(); } - const std::fstream* operator->() const { return stream.get(); } + std::fstream* operator->() { return &getThreadStream(); } + const std::fstream* operator->() const { return &getThreadStream(); } - std::fstream& operator*() { return *stream; } - const std::fstream& operator*() const { return *stream; } + std::fstream& operator*() { return getThreadStream(); } + const std::fstream& operator*() const { return getThreadStream(); } #endif bool isOpen() const; @@ -48,26 +63,46 @@ public: #else // Inline for non-Windows to avoid overhead inline void flush() { - if (stream) { - stream->flush(); - } + getThreadStream().flush(); } inline void readSeek(std::streampos pos, std::ios::seekdir dir = std::ios::beg) { - stream->seekg(pos, dir); + getThreadStream().seekg(pos, dir); } inline void writeSeek(std::streampos pos, std::ios::seekdir dir = std::ios::beg) { - stream->seekp(pos, dir); + getThreadStream().seekp(pos, dir); } inline void readBytes(char* buffer, std::streamsize size) { - stream->read(buffer, size); + getThreadStream().read(buffer, size); } inline void writeBytes(const char* buffer, std::streamsize size) { - stream->write(buffer, size); + getThreadStream().write(buffer, size); } #endif - SweepstoreFileHandle(SweepstoreFileHandle&&) noexcept = default; - SweepstoreFileHandle& operator=(SweepstoreFileHandle&&) noexcept = default; + SweepstoreFileHandle(SweepstoreFileHandle&& other) noexcept + : path(std::move(other.path)) + , openMode(other.openMode) +#ifdef WITH_UNREAL + , unrealHandle(other.unrealHandle) +#endif + { +#ifdef WITH_UNREAL + other.unrealHandle = nullptr; +#endif + } + + SweepstoreFileHandle& operator=(SweepstoreFileHandle&& other) noexcept { + if (this != &other) { + close(); + path = std::move(other.path); + openMode = other.openMode; +#ifdef WITH_UNREAL + unrealHandle = other.unrealHandle; + other.unrealHandle = nullptr; +#endif + } + return *this; + } SweepstoreFileHandle(const SweepstoreFileHandle&) = delete; SweepstoreFileHandle& operator=(const SweepstoreFileHandle&) = delete; diff --git a/cpp/src/Public/sweepstore/utils/file_lock.h b/cpp/src/Public/sweepstore/utils/file_lock.h index c08809a..2b49037 100644 --- a/cpp/src/Public/sweepstore/utils/file_lock.h +++ b/cpp/src/Public/sweepstore/utils/file_lock.h @@ -1,159 +1,103 @@ #pragma once #include +#include +#include +#include #ifdef _WIN32 #include #else - #include #include #include + #include #endif +// Simple file lock using flock() with thread-local FD cache +// Each thread has its own FD, flock() is per-FD, so threads don't conflict +// Matches Dart's paradigm: each isolate has its own RandomAccessFile class SweepstoreFileLock { public: - enum class Mode { - Shared, - Exclusive - }; + enum class Mode { Shared, Exclusive }; private: -#ifdef _WIN32 - HANDLE handle; - OVERLAPPED overlapped; -#else - int fd; -#endif - std::string path; + std::string filePath; Mode mode; - bool locked; + bool locked = false; + + // Thread-local FD cache - each thread has its own FD per file + static thread_local std::unordered_map fdCache; + + static int getOrOpenFD(const std::string& path) { + auto it = fdCache.find(path); + if (it != fdCache.end()) { + return it->second; + } + + int fd = open(path.c_str(), O_RDWR); + if (fd == -1) { + throw std::runtime_error("Failed to open file for locking: " + path); + } + + fdCache[path] = fd; + return fd; + } void acquire() { -#ifdef _WIN32 - handle = CreateFileA(path.c_str(), GENERIC_READ | GENERIC_WRITE, - FILE_SHARE_READ | FILE_SHARE_WRITE, NULL, OPEN_EXISTING, - FILE_ATTRIBUTE_NORMAL, NULL); + int fd = getOrOpenFD(filePath); + int operation = (mode == Mode::Exclusive) ? LOCK_EX : LOCK_SH; - if (handle == INVALID_HANDLE_VALUE) { - throw std::runtime_error("Failed to open file"); + if (flock(fd, operation) == -1) { + throw std::runtime_error("Failed to acquire file lock"); } - - memset(&overlapped, 0, sizeof(overlapped)); - DWORD flags = (mode == Mode::Exclusive) ? LOCKFILE_EXCLUSIVE_LOCK : 0; - - if (!LockFileEx(handle, flags, 0, MAXDWORD, MAXDWORD, &overlapped)) { - CloseHandle(handle); - throw std::runtime_error("Failed to lock"); - } -#else - fd = open(path.c_str(), O_RDWR); - if (fd == -1) throw std::runtime_error("Failed to open file"); - - int op = (mode == Mode::Exclusive) ? LOCK_EX : LOCK_SH; - if (flock(fd, op) != 0) { - close(fd); - throw std::runtime_error("Failed to lock"); - } -#endif locked = true; } void release() { if (locked) { -#ifdef _WIN32 - UnlockFileEx(handle, 0, MAXDWORD, MAXDWORD, &overlapped); - CloseHandle(handle); -#else + int fd = getOrOpenFD(filePath); flock(fd, LOCK_UN); - close(fd); -#endif locked = false; } } public: - SweepstoreFileLock(const std::string& p, Mode m) - : path(p), mode(m), locked(false) { -#ifdef _WIN32 - handle = INVALID_HANDLE_VALUE; - memset(&overlapped, 0, sizeof(overlapped)); -#else - fd = -1; -#endif - } + // Constructor accepts offset/length for API compatibility (unused with flock) + SweepstoreFileLock(const std::string& path, uint64_t, uint64_t, Mode m) + : filePath(path), mode(m) {} - ~SweepstoreFileLock() { - release(); - } + ~SweepstoreFileLock() { release(); } void lock() { - if (!locked) { - acquire(); - } + if (!locked) acquire(); } void unlock() { release(); } - // Check if THIS instance holds the lock - bool holdsLock() const { return locked; } - - // Check if the file is locked by ANYONE (including this instance) - bool isLocked() const { -#ifdef _WIN32 - HANDLE testHandle = CreateFileA(path.c_str(), GENERIC_READ | GENERIC_WRITE, - FILE_SHARE_READ | FILE_SHARE_WRITE, NULL, OPEN_EXISTING, - FILE_ATTRIBUTE_NORMAL, NULL); - - if (testHandle == INVALID_HANDLE_VALUE) { - return false; // Can't even open file - } - - OVERLAPPED testOverlapped; - memset(&testOverlapped, 0, sizeof(testOverlapped)); - DWORD flags = (mode == Mode::Exclusive) ? LOCKFILE_EXCLUSIVE_LOCK : 0; - flags |= LOCKFILE_FAIL_IMMEDIATELY; // Non-blocking - - bool isLocked = !LockFileEx(testHandle, flags, 0, MAXDWORD, MAXDWORD, &testOverlapped); - - if (!isLocked) { - // We got the lock, release it - UnlockFileEx(testHandle, 0, MAXDWORD, MAXDWORD, &testOverlapped); - } - - CloseHandle(testHandle); - return isLocked; -#else - int testFd = open(path.c_str(), O_RDWR); - if (testFd == -1) { - return false; // Can't open file - } - - int op = (mode == Mode::Exclusive) ? LOCK_EX : LOCK_SH; - op |= LOCK_NB; // Non-blocking - - bool isLocked = (flock(testFd, op) != 0); - - if (!isLocked) { - // We got the lock, release it - flock(testFd, LOCK_UN); - } - - close(testFd); - return isLocked; -#endif + bool holdsLock() const { + return locked; } - SweepstoreFileLock(const SweepstoreFileLock&) = delete; - SweepstoreFileLock& operator=(const SweepstoreFileLock&) = delete; - SweepstoreFileLock(SweepstoreFileLock&&) = default; - SweepstoreFileLock& operator=(SweepstoreFileLock&&) = default; + // Check if file is currently locked (non-blocking test) + bool isLocked() const { + int fd = getOrOpenFD(filePath); + int operation = (mode == Mode::Exclusive) ? LOCK_EX : LOCK_SH; + // Try non-blocking lock + if (flock(fd, operation | LOCK_NB) == -1) { + return true; // Already locked + } + + // Got the lock, release immediately + flock(fd, LOCK_UN); + return false; + } + + // RAII helper for scoped locking class Scoped { - private: SweepstoreFileLock& lock; - public: Scoped(SweepstoreFileLock& l) : lock(l) { lock.lock(); @@ -166,4 +110,8 @@ public: Scoped(const Scoped&) = delete; Scoped& operator=(const Scoped&) = delete; }; -}; \ No newline at end of file + + // Disable copying + SweepstoreFileLock(const SweepstoreFileLock&) = delete; + SweepstoreFileLock& operator=(const SweepstoreFileLock&) = delete; +}; diff --git a/dart/lib/sweepstore.dart b/dart/lib/sweepstore.dart index b8aa30e..c3432c9 100644 --- a/dart/lib/sweepstore.dart +++ b/dart/lib/sweepstore.dart @@ -67,22 +67,25 @@ Future main() async { print(binaryDump(file.readAsBytesSync())); int iteration = 0; + int maxIterations = 16; print("Concurrent Workers: ${store._concurrencyHeader.numberOfWorkers}"); print("Stale Ticket Threshold: ${STALE_HEARTBEAT_THRESHOLD_MS}ms"); + int concurrencyTest = 1; + + while (true) { - int concurrencyTest = 256; final receivePort = ReceivePort(); int completedJobs = 0; - if (iteration > 0) { + if (iteration > maxIterations) { break; } final stopwatch = Stopwatch()..start(); - print('\x1B[95mStarting iteration #$iteration with $concurrencyTest concurrent jobs...\x1B[0m'); + // print('\x1B[95mStarting iteration #$iteration with $concurrencyTest concurrent jobs...\x1B[0m'); for (int i = 0; i < concurrencyTest; i++) { await Isolate.spawn((message) { final index = message['index'] as int; @@ -107,11 +110,10 @@ Future main() async { stopwatch.stop(); - print('\x1B[95mAll jobs completed!\x1B[0m'); - print('\x1B[95mTime taken: ${stopwatch.elapsedMilliseconds}ms (${stopwatch.elapsed.inSeconds}s)\x1B[0m'); - print(" "); + print("[$iteration/$maxIterations] Completed $concurrencyTest operation in ${stopwatch.elapsedMilliseconds} ms"); - // sleep(Duration(seconds: 2)); iteration++; + + concurrencyTest *= 2; } } \ No newline at end of file