diff --git a/cpp/src/Private/sweepstore/benchmark.cpp b/cpp/src/Private/sweepstore/benchmark.cpp index bc4bd7a..3b104e3 100644 --- a/cpp/src/Private/sweepstore/benchmark.cpp +++ b/cpp/src/Private/sweepstore/benchmark.cpp @@ -37,63 +37,63 @@ int main() { SweepstoreConcurrency::initialiseMasterAsync(filePath); - int iterations = 32; + int iterations = 16; int currentIteration = 0; int concurrencyTest = 1; + // Worker pool infrastructure - created once and reused + std::queue> taskQueue; + std::mutex queueMutex; + std::condition_variable queueCV; + std::condition_variable completionCV; + std::atomic shutdown{false}; + std::atomic completedJobs{0}; + + // Create 32 persistent worker threads BEFORE timing + std::vector workers; + for (int i = 0; i < 32; i++) { + workers.emplace_back([&]() { + while (!shutdown) { + std::function task; + { + std::unique_lock lock(queueMutex); + queueCV.wait(lock, [&]{ return !taskQueue.empty() || shutdown; }); + if (shutdown && taskQueue.empty()) return; + if (!taskQueue.empty()) { + task = std::move(taskQueue.front()); + taskQueue.pop(); + } + } + if (task) { + task(); + completionCV.notify_one(); + } + } + }); + } + while (true) { if (++currentIteration > iterations) { break; } - std::atomic completedJobs = 0; + completedJobs = 0; - // Worker pool infrastructure - std::queue> taskQueue; - std::mutex queueMutex; - std::condition_variable queueCV; - std::condition_variable completionCV; - std::atomic shutdown{false}; - - auto start = std::chrono::high_resolution_clock::now(); - - // Create 32 persistent worker threads - std::vector workers; - for (int i = 0; i < 32; i++) { - workers.emplace_back([&]() { - while (!shutdown) { - std::function task; - { - std::unique_lock lock(queueMutex); - queueCV.wait(lock, [&]{ return !taskQueue.empty() || shutdown; }); - if (shutdown && taskQueue.empty()) return; - if (!taskQueue.empty()) { - task = std::move(taskQueue.front()); - taskQueue.pop(); - } - } - if (task) { - task(); - int completed = ++completedJobs; - if (completed == concurrencyTest) { - completionCV.notify_one(); - } - } - } - }); - } - - // Queue 256 tasks - each will open its own handle + // Queue tasks { std::unique_lock lock(queueMutex); for (int i = 0; i < concurrencyTest; i++) { - taskQueue.push([filePath, i, &sweepstore]() { + taskQueue.push([i, &sweepstore, &completedJobs]() { sweepstore["key_" + std::to_string(i)] = "value_" + std::to_string(i); + ++completedJobs; }); } } + + // Start timing JUST before notifying workers + auto start = std::chrono::high_resolution_clock::now(); queueCV.notify_all(); // Wait for completion @@ -107,15 +107,15 @@ int main() { std::cout << "[" << currentIteration << "/" << iterations << "] Completed " << concurrencyTest << " operations in " << duration << " ms." << std::endl; - // Shutdown workers - shutdown = true; - queueCV.notify_all(); - for (auto& worker : workers) { - worker.join(); - } - concurrencyTest *= 2; } + // Shutdown workers after all iterations + shutdown = true; + queueCV.notify_all(); + for (auto& worker : workers) { + worker.join(); + } + return 0; } \ No newline at end of file