Add concurrency handling implementation with ticket management and file locking

This commit is contained in:
ImBenji
2025-12-02 14:20:33 +00:00
parent eae4d0e24e
commit b6237a32d4

View File

@@ -0,0 +1,116 @@
//
// Created by Benjamin Watt on 02/12/2025.
//
#include "sweepstore/sweepstore.h"
#include <string>
#include <filesystem>
#include <iostream>
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <vector>
#include "sweepstore/utils/helpers.h"
#include "sweepstore/utils/file_handle.h"
int main() {
namespace fs = std::filesystem;
std::string filePath = "./example.bin";
Sweepstore sweepstore(filePath);
sweepstore.initialise(32);
preciseSleep(std::chrono::milliseconds(1000));
std::vector<uint8_t> fileData = loadFile(filePath);
std::cout << binaryDump(fileData) << std::endl;
std::cout << "Concurrent Workers: " << sweepstore.getConcurrencyHeader()->readNumberOfWorkers() << std::endl;
std::cout << "Stale Ticket Threshold: " << STALE_HEARTBEAT_THRESHOLD_MS << std::endl;
SweepstoreConcurrency::initialiseMasterAsync(filePath);
int iterations = 1;
int currentIteration = 0;
while (true) {
if (++currentIteration > iterations) {
break;
}
int concurrencyTest = 256;
std::atomic<int> completedJobs = 0;
// Worker pool infrastructure
std::queue<std::function<void()>> taskQueue;
std::mutex queueMutex;
std::condition_variable queueCV;
std::condition_variable completionCV;
std::atomic<bool> shutdown{false};
auto start = std::chrono::high_resolution_clock::now();
// Create 32 persistent worker threads
std::vector<std::thread> workers;
for (int i = 0; i < 32; i++) {
workers.emplace_back([&]() {
while (!shutdown) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(queueMutex);
queueCV.wait(lock, [&]{ return !taskQueue.empty() || shutdown; });
if (shutdown && taskQueue.empty()) return;
if (!taskQueue.empty()) {
task = std::move(taskQueue.front());
taskQueue.pop();
}
}
if (task) {
task();
int completed = ++completedJobs;
if (completed == concurrencyTest) {
completionCV.notify_one();
}
}
}
});
}
// Queue 256 tasks
{
std::unique_lock<std::mutex> lock(queueMutex);
for (int i = 0; i < concurrencyTest; i++) {
taskQueue.push([&sweepstore, i]() {
sweepstore["key_" + std::to_string(i)] = "value_" + std::to_string(i);
});
}
}
queueCV.notify_all();
// Wait for completion
{
std::unique_lock<std::mutex> lock(queueMutex);
completionCV.wait(lock, [&]{ return completedJobs >= concurrencyTest; });
}
auto end = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
std::cout << "Completed " << concurrencyTest << " operations in " << duration << " ms." << std::endl;
// Shutdown workers
shutdown = true;
queueCV.notify_all();
for (auto& worker : workers) {
worker.join();
}
}
return 0;
}