diff --git a/cpp/src/Private/sweepstore/benchmark.cpp b/cpp/src/Private/sweepstore/benchmark.cpp index 63e8ec1..c8ba8ae 100644 --- a/cpp/src/Private/sweepstore/benchmark.cpp +++ b/cpp/src/Private/sweepstore/benchmark.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include "sweepstore/utils/helpers.h" @@ -26,8 +27,8 @@ // BENCHMARK CONFIGURATION // ============================================================================ const int SWEEPSTORE_CONCURRENT_WORKERS = 32; // Number of concurrent workers for Sweepstore -const int WORKER_THREAD_COUNT = 32; // Number of worker threads in the benchmark pool -const int BENCHMARK_ITERATIONS = 16; // Number of benchmark iterations +const int WORKER_THREAD_COUNT = 8; // Number of worker threads in the benchmark pool +const int BENCHMARK_ITERATIONS = 10; // Number of benchmark iterations const int INITIAL_CONCURRENT_WORKERS = 1; // Starting number of concurrent operations (doubles each iteration) const int ITERATION_DELAY_MS = 200; // Delay between iterations in milliseconds const int INITIAL_SLEEP_MS = 1000; // Initial sleep before benchmark starts @@ -41,15 +42,16 @@ int main() { std::string filePath = "./example.bin"; - Sweepstore sweepstore(filePath); - sweepstore.initialise(SWEEPSTORE_CONCURRENT_WORKERS); + // Use unique_ptr to control destruction timing + auto sweepstore = std::make_unique(filePath); + sweepstore->initialise(SWEEPSTORE_CONCURRENT_WORKERS); preciseSleep(std::chrono::milliseconds(INITIAL_SLEEP_MS)); std::vector fileData = loadFile(filePath); std::cout << binaryDump(fileData) << std::endl; - std::cout << "Concurrent Workers: " << sweepstore.getConcurrencyHeader()->readNumberOfWorkers() << std::endl; + std::cout << "Concurrent Workers: " << sweepstore->getConcurrencyHeader()->readNumberOfWorkers() << std::endl; std::cout << "Stale Ticket Threshold: " << STALE_HEARTBEAT_THRESHOLD_MS << std::endl; SweepstoreConcurrency::initialiseMasterAsync(filePath); @@ -107,7 +109,7 @@ int main() { std::unique_lock lock(queueMutex); for (int i = 0; i < concurrencyTest; i++) { taskQueue.push([i, &sweepstore, &completedJobs]() { - sweepstore["key_" + std::to_string(i)] = "value_" + std::to_string(i); + (*sweepstore)["key_" + std::to_string(i)] = "value_" + std::to_string(i); ++completedJobs; }); } @@ -141,8 +143,8 @@ int main() { worker.join(); } - // Flush main thread timing data and finalize output file - std::cout << "Main thread flushing..." << std::endl; + // Write timing data NOW while everything is still valid + std::cout << "Flushing timing data..." << std::endl; std::cout.flush(); SweepstoreTiming::flushThreadData(); @@ -150,12 +152,9 @@ int main() { std::cout.flush(); SweepstoreTiming::finalizeOutputFile(); - std::cout << "Exiting..." << std::endl; + std::cout << "Benchmark complete." << std::endl; std::cout.flush(); - // Small delay to ensure all file I/O completes - preciseSleep(std::chrono::milliseconds(100)); - - // Exit immediately without running destructors to avoid crashes - std::_Exit(0); + // Exit immediately to avoid thread-local destructors conflicting with detached master thread + _Exit(0); } \ No newline at end of file diff --git a/cpp/src/Private/sweepstore/header.cpp b/cpp/src/Private/sweepstore/header.cpp index e8c97ca..86098fc 100644 --- a/cpp/src/Private/sweepstore/header.cpp +++ b/cpp/src/Private/sweepstore/header.cpp @@ -155,9 +155,6 @@ void SweepstoreWorkerTicket::write(SweepstoreWorkerTicketSnapshot &snapshot) { SWEEPSTORE_TIME_FUNCTION(); RandomAccessMemory buffer; - - - buffer.setPositionSync(0); buffer.writeIntSync(snapshot.identifier, 4); buffer.writeIntSync(snapshot.workerHeartbeat, 4); @@ -177,9 +174,7 @@ void SweepstoreWorkerTicket::write(SweepstoreWorkerTicketSnapshot &snapshot) { std::vector data = buffer.readSync(buffer.length()); char* dataPtr = reinterpret_cast(data.data()); - // Write to file with byte-range lock (allows parallel access to different tickets) - SweepstoreFileLock lock(file.getPath(), getOffset(), TICKET_SIZE, SweepstoreFileLock::Mode::Exclusive); - SweepstoreFileLock::Scoped scopedLock(lock); + // Write to file (byte-range locking handled automatically by seekAndWrite) file.seekAndWrite(getOffset(), dataPtr, data.size()); file.flush(); } @@ -193,10 +188,7 @@ bool SweepstoreWorkerTicket::writable() { SweepstoreWorkerTicketSnapshot SweepstoreWorkerTicket::snapshot() { SWEEPSTORE_TIME_FUNCTION(); - // Byte-range lock for this ticket (allows parallel access to different tickets) - SweepstoreFileLock lock(file.getPath(), getOffset(), TICKET_SIZE, SweepstoreFileLock::Mode::Shared); - SweepstoreFileLock::Scoped scopedLock(lock); - + // Byte-range locking handled automatically by seekAndRead std::unique_ptr buffer(new char[TICKET_SIZE]); file.seekAndRead(getOffset(), buffer.get(), TICKET_SIZE); RandomAccessMemory ram(reinterpret_cast(buffer.get()), TICKET_SIZE); diff --git a/cpp/src/Private/sweepstore/utils/file_handle.cpp b/cpp/src/Private/sweepstore/utils/file_handle.cpp index 41f8563..ebf4cda 100644 --- a/cpp/src/Private/sweepstore/utils/file_handle.cpp +++ b/cpp/src/Private/sweepstore/utils/file_handle.cpp @@ -1,4 +1,5 @@ #include "sweepstore/utils/file_handle.h" +#include "sweepstore/utils/file_lock.h" #include "sweepstore/utils/timing.h" #include @@ -70,10 +71,17 @@ void SweepstoreFileHandle::seekAndRead(uint64_t offset, char* buffer, size_t siz unrealHandle->Seek(offset); unrealHandle->Read(reinterpret_cast(buffer), size); #else - std::lock_guard lock(streamMutex); - stream.seekg(offset, std::ios::beg); - if (stream.fail()) stream.clear(); - stream.read(buffer, size); + // Acquire byte-range lock (allows parallel access to different byte ranges) + SweepstoreFileLock rangeLock(path, offset, size, SweepstoreFileLock::Mode::Shared); + rangeLock.lock(); + + // Brief stream mutex only during actual I/O + { + std::lock_guard lock(streamMutex); + stream.seekg(offset, std::ios::beg); + if (stream.fail()) stream.clear(); + stream.read(buffer, size); + } #endif } @@ -85,11 +93,18 @@ void SweepstoreFileHandle::seekAndWrite(uint64_t offset, const char* buffer, siz unrealHandle->Write(reinterpret_cast(buffer), size); unrealHandle->Flush(); #else - std::lock_guard lock(streamMutex); - stream.seekp(offset, std::ios::beg); - if (stream.fail()) stream.clear(); - stream.write(buffer, size); - stream.flush(); + // Acquire byte-range lock (allows parallel access to different byte ranges) + SweepstoreFileLock rangeLock(path, offset, size, SweepstoreFileLock::Mode::Exclusive); + rangeLock.lock(); + + // Brief stream mutex only during actual I/O + { + std::lock_guard lock(streamMutex); + stream.seekp(offset, std::ios::beg); + if (stream.fail()) stream.clear(); + stream.write(buffer, size); + stream.flush(); + } #endif } diff --git a/cpp/src/Public/sweepstore/concurrency.h b/cpp/src/Public/sweepstore/concurrency.h index cc40ea6..c6aa674 100644 --- a/cpp/src/Public/sweepstore/concurrency.h +++ b/cpp/src/Public/sweepstore/concurrency.h @@ -8,7 +8,7 @@ #include "sweepstore/utils/timing.h" -#define STALE_HEARTBEAT_THRESHOLD_MS 250 +#define STALE_HEARTBEAT_THRESHOLD_MS 1000 enum SweepstoreTicketOperation : int; class SweepstoreFileHandle; diff --git a/cpp/src/Public/sweepstore/sweepstore.h b/cpp/src/Public/sweepstore/sweepstore.h index 1febfc9..28e77e2 100644 --- a/cpp/src/Public/sweepstore/sweepstore.h +++ b/cpp/src/Public/sweepstore/sweepstore.h @@ -55,6 +55,8 @@ public: sizeof(T), [this, key = this->key, &value]() { + + } ); }