Reduce benchmark iterations and optimize worker thread management for improved performance

This commit is contained in:
ImBenji
2025-12-05 10:10:17 +00:00
parent 4bace6f681
commit 1084d62b9c

View File

@@ -37,63 +37,63 @@ int main() {
SweepstoreConcurrency::initialiseMasterAsync(filePath); SweepstoreConcurrency::initialiseMasterAsync(filePath);
int iterations = 32; int iterations = 16;
int currentIteration = 0; int currentIteration = 0;
int concurrencyTest = 1; int concurrencyTest = 1;
// Worker pool infrastructure - created once and reused
std::queue<std::function<void()>> taskQueue;
std::mutex queueMutex;
std::condition_variable queueCV;
std::condition_variable completionCV;
std::atomic<bool> shutdown{false};
std::atomic<int> completedJobs{0};
// Create 32 persistent worker threads BEFORE timing
std::vector<std::thread> workers;
for (int i = 0; i < 32; i++) {
workers.emplace_back([&]() {
while (!shutdown) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(queueMutex);
queueCV.wait(lock, [&]{ return !taskQueue.empty() || shutdown; });
if (shutdown && taskQueue.empty()) return;
if (!taskQueue.empty()) {
task = std::move(taskQueue.front());
taskQueue.pop();
}
}
if (task) {
task();
completionCV.notify_one();
}
}
});
}
while (true) { while (true) {
if (++currentIteration > iterations) { if (++currentIteration > iterations) {
break; break;
} }
std::atomic<int> completedJobs = 0; completedJobs = 0;
// Worker pool infrastructure // Queue tasks
std::queue<std::function<void()>> taskQueue;
std::mutex queueMutex;
std::condition_variable queueCV;
std::condition_variable completionCV;
std::atomic<bool> shutdown{false};
auto start = std::chrono::high_resolution_clock::now();
// Create 32 persistent worker threads
std::vector<std::thread> workers;
for (int i = 0; i < 32; i++) {
workers.emplace_back([&]() {
while (!shutdown) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(queueMutex);
queueCV.wait(lock, [&]{ return !taskQueue.empty() || shutdown; });
if (shutdown && taskQueue.empty()) return;
if (!taskQueue.empty()) {
task = std::move(taskQueue.front());
taskQueue.pop();
}
}
if (task) {
task();
int completed = ++completedJobs;
if (completed == concurrencyTest) {
completionCV.notify_one();
}
}
}
});
}
// Queue 256 tasks - each will open its own handle
{ {
std::unique_lock<std::mutex> lock(queueMutex); std::unique_lock<std::mutex> lock(queueMutex);
for (int i = 0; i < concurrencyTest; i++) { for (int i = 0; i < concurrencyTest; i++) {
taskQueue.push([filePath, i, &sweepstore]() { taskQueue.push([i, &sweepstore, &completedJobs]() {
sweepstore["key_" + std::to_string(i)] = "value_" + std::to_string(i); sweepstore["key_" + std::to_string(i)] = "value_" + std::to_string(i);
++completedJobs;
}); });
} }
} }
// Start timing JUST before notifying workers
auto start = std::chrono::high_resolution_clock::now();
queueCV.notify_all(); queueCV.notify_all();
// Wait for completion // Wait for completion
@@ -107,15 +107,15 @@ int main() {
std::cout << "[" << currentIteration << "/" << iterations << "] Completed " << concurrencyTest << " operations in " << duration << " ms." << std::endl; std::cout << "[" << currentIteration << "/" << iterations << "] Completed " << concurrencyTest << " operations in " << duration << " ms." << std::endl;
// Shutdown workers
shutdown = true;
queueCV.notify_all();
for (auto& worker : workers) {
worker.join();
}
concurrencyTest *= 2; concurrencyTest *= 2;
} }
// Shutdown workers after all iterations
shutdown = true;
queueCV.notify_all();
for (auto& worker : workers) {
worker.join();
}
return 0; return 0;
} }