From b6237a32d413589c3f01f5da1fceb8fa19ee6834 Mon Sep 17 00:00:00 2001 From: ImBenji Date: Tue, 2 Dec 2025 14:20:33 +0000 Subject: [PATCH] Add concurrency handling implementation with ticket management and file locking --- cpp/src/Private/sweepstore/benchmark.cpp | 116 +++++++++++++++++++++++ 1 file changed, 116 insertions(+) create mode 100644 cpp/src/Private/sweepstore/benchmark.cpp diff --git a/cpp/src/Private/sweepstore/benchmark.cpp b/cpp/src/Private/sweepstore/benchmark.cpp new file mode 100644 index 0000000..c94f864 --- /dev/null +++ b/cpp/src/Private/sweepstore/benchmark.cpp @@ -0,0 +1,116 @@ +// +// Created by Benjamin Watt on 02/12/2025. +// + +#include "sweepstore/sweepstore.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "sweepstore/utils/helpers.h" +#include "sweepstore/utils/file_handle.h" + +int main() { + namespace fs = std::filesystem; + + std::string filePath = "./example.bin"; + + Sweepstore sweepstore(filePath); + sweepstore.initialise(32); + + preciseSleep(std::chrono::milliseconds(1000)); + + std::vector fileData = loadFile(filePath); + std::cout << binaryDump(fileData) << std::endl; + + std::cout << "Concurrent Workers: " << sweepstore.getConcurrencyHeader()->readNumberOfWorkers() << std::endl; + std::cout << "Stale Ticket Threshold: " << STALE_HEARTBEAT_THRESHOLD_MS << std::endl; + + SweepstoreConcurrency::initialiseMasterAsync(filePath); + + int iterations = 1; + int currentIteration = 0; + + while (true) { + + if (++currentIteration > iterations) { + break; + } + + int concurrencyTest = 256; + std::atomic completedJobs = 0; + + // Worker pool infrastructure + std::queue> taskQueue; + std::mutex queueMutex; + std::condition_variable queueCV; + std::condition_variable completionCV; + std::atomic shutdown{false}; + + auto start = std::chrono::high_resolution_clock::now(); + + // Create 32 persistent worker threads + std::vector workers; + for (int i = 0; i < 32; i++) { + workers.emplace_back([&]() { + while (!shutdown) { + std::function task; + { + std::unique_lock lock(queueMutex); + queueCV.wait(lock, [&]{ return !taskQueue.empty() || shutdown; }); + if (shutdown && taskQueue.empty()) return; + if (!taskQueue.empty()) { + task = std::move(taskQueue.front()); + taskQueue.pop(); + } + } + if (task) { + task(); + int completed = ++completedJobs; + if (completed == concurrencyTest) { + completionCV.notify_one(); + } + } + } + }); + } + + // Queue 256 tasks + { + std::unique_lock lock(queueMutex); + for (int i = 0; i < concurrencyTest; i++) { + taskQueue.push([&sweepstore, i]() { + sweepstore["key_" + std::to_string(i)] = "value_" + std::to_string(i); + }); + } + } + queueCV.notify_all(); + + // Wait for completion + { + std::unique_lock lock(queueMutex); + completionCV.wait(lock, [&]{ return completedJobs >= concurrencyTest; }); + } + + auto end = std::chrono::high_resolution_clock::now(); + auto duration = std::chrono::duration_cast(end - start).count(); + + std::cout << "Completed " << concurrencyTest << " operations in " << duration << " ms." << std::endl; + + // Shutdown workers + shutdown = true; + queueCV.notify_all(); + for (auto& worker : workers) { + worker.join(); + } + } + + return 0; +} \ No newline at end of file