Refactor concurrency handling and file operations for improved performance and thread safety
This commit is contained in:
@@ -16,6 +16,8 @@
|
||||
|
||||
#include "sweepstore/utils/helpers.h"
|
||||
#include "sweepstore/utils/file_handle.h"
|
||||
#include "sweepstore/structures.h"
|
||||
#include "sweepstore/concurrency.h"
|
||||
|
||||
int main() {
|
||||
namespace fs = std::filesystem;
|
||||
@@ -35,16 +37,17 @@ int main() {
|
||||
|
||||
SweepstoreConcurrency::initialiseMasterAsync(filePath);
|
||||
|
||||
int iterations = 100;
|
||||
int iterations = 32;
|
||||
int currentIteration = 0;
|
||||
|
||||
int concurrencyTest = 1;
|
||||
|
||||
while (true) {
|
||||
|
||||
if (++currentIteration > iterations) {
|
||||
break;
|
||||
}
|
||||
|
||||
int concurrencyTest = 256;
|
||||
std::atomic<int> completedJobs = 0;
|
||||
|
||||
// Worker pool infrastructure
|
||||
@@ -82,11 +85,11 @@ int main() {
|
||||
});
|
||||
}
|
||||
|
||||
// Queue 256 tasks
|
||||
// Queue 256 tasks - each will open its own handle
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(queueMutex);
|
||||
for (int i = 0; i < concurrencyTest; i++) {
|
||||
taskQueue.push([&sweepstore, i]() {
|
||||
taskQueue.push([filePath, i, &sweepstore]() {
|
||||
sweepstore["key_" + std::to_string(i)] = "value_" + std::to_string(i);
|
||||
});
|
||||
}
|
||||
@@ -102,7 +105,7 @@ int main() {
|
||||
auto end = std::chrono::high_resolution_clock::now();
|
||||
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
|
||||
|
||||
std::cout << "Completed " << concurrencyTest << " operations in " << duration << " ms." << std::endl;
|
||||
std::cout << "[" << currentIteration << "/" << iterations << "] Completed " << concurrencyTest << " operations in " << duration << " ms." << std::endl;
|
||||
|
||||
// Shutdown workers
|
||||
shutdown = true;
|
||||
@@ -110,6 +113,8 @@ int main() {
|
||||
for (auto& worker : workers) {
|
||||
worker.join();
|
||||
}
|
||||
|
||||
concurrencyTest *= 2;
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
||||
@@ -30,7 +30,7 @@ int randomId() {
|
||||
return (time ^ random) & 0x7FFFFFFF;
|
||||
}
|
||||
|
||||
void SweepstoreConcurrency::spawnTicket(std::string filePath,
|
||||
void SweepstoreConcurrency::spawnTicket(SweepstoreFileHandle* _file,
|
||||
const SweepstoreTicketOperation& operation,
|
||||
const uint32_t keyHash,
|
||||
const uint32_t targetSize,
|
||||
@@ -38,7 +38,9 @@ void SweepstoreConcurrency::spawnTicket(std::string filePath,
|
||||
std::string debugLabel
|
||||
) {
|
||||
|
||||
SweepstoreFileHandle file(filePath, std::ios::binary | std::ios::in | std::ios::out);
|
||||
// FileHandle now uses thread-local streams internally - no need to create new handle!
|
||||
// Each thread automatically gets its own fstream from the shared file handle
|
||||
SweepstoreFileHandle* file = new SweepstoreFileHandle(_file->getPath(), std::ios::in | std::ios::out | std::ios::binary);
|
||||
|
||||
/*
|
||||
Useful Functions
|
||||
@@ -47,7 +49,7 @@ void SweepstoreConcurrency::spawnTicket(std::string filePath,
|
||||
/// Logging function
|
||||
auto log = [&](const std::string &message) {
|
||||
std::string prefix = !debugLabel.empty() ? "\033[38;5;208m[Ticket Spawner - " + debugLabel + "]:\033[0m " : "\033[38;5;208m[Ticket Spawner]:\033[0m ";
|
||||
debugPrint(prefix + message);
|
||||
// debugPrint(prefix + message);
|
||||
};
|
||||
|
||||
// Sleep with variance (additive only)
|
||||
@@ -71,9 +73,9 @@ void SweepstoreConcurrency::spawnTicket(std::string filePath,
|
||||
expSleepTracker[label] = count + 1;
|
||||
};
|
||||
|
||||
// Get the header(s)
|
||||
SweepstoreHeader header(file);
|
||||
SweepstoreConcurrencyHeader concurrencyHeader(file);
|
||||
// Get the header(s) - using the shared file handle directly
|
||||
SweepstoreHeader header(*file);
|
||||
SweepstoreConcurrencyHeader concurrencyHeader(*file);
|
||||
|
||||
/*
|
||||
Ticket Acquisition
|
||||
@@ -91,7 +93,7 @@ void SweepstoreConcurrency::spawnTicket(std::string filePath,
|
||||
|
||||
for (uint32_t i = 0; i < concurrentWorkers; i++) {
|
||||
|
||||
SweepstoreWorkerTicket ticket = SweepstoreWorkerTicket(i, file);
|
||||
SweepstoreWorkerTicket ticket = SweepstoreWorkerTicket(i, *file);
|
||||
|
||||
if (!ticket.writable()) {
|
||||
continue;
|
||||
@@ -165,21 +167,24 @@ void SweepstoreConcurrency::spawnTicket(std::string filePath,
|
||||
// Re-verify we lost the ticket
|
||||
SweepstoreWorkerTicketSnapshot recheckSnapshot = myTicket.snapshot();
|
||||
if (recheckSnapshot.identifier != myIdentifier) {
|
||||
log("Lost ownership of ticket " + std::to_string(myTicket.getTicketIndex()) + ", was expecting identifier " + std::to_string(myIdentifier) + " but found " + std::to_string(recheckSnapshot.identifier) + ".");
|
||||
// log("Lost ownership of ticket " + std::to_string(myTicket.getTicketIndex()) + ", was expecting identifier " + std::to_string(myIdentifier) + " but found " + std::to_string(recheckSnapshot.identifier) + ".");
|
||||
std::cout << "\033[38;5;82m[Ticket Spawner - " << debugLabel << "]:\033[0m Lost ticket " << myTicket.getTicketIndex() << ", respawning..." << std::endl;
|
||||
|
||||
// ReSharper disable once CppDFAInfiniteRecursion
|
||||
return spawnTicket(
|
||||
filePath,
|
||||
spawnTicket(
|
||||
_file,
|
||||
operation,
|
||||
keyHash,
|
||||
targetSize,
|
||||
onApproved,
|
||||
debugLabel
|
||||
);
|
||||
break;
|
||||
}
|
||||
|
||||
// False alarm, continue waiting
|
||||
log("False alarm, still own ticket " + std::to_string(myTicket.getTicketIndex()) + ".");
|
||||
// log("False alarm, still own ticket " + std::to_string(myTicket.getTicketIndex()) + ".");
|
||||
std::cout << "\033[38;5;82m[Ticket Spawner - " << debugLabel << "]:\033[0m False alarm, still own ticket " << myTicket.getTicketIndex() << "." << std::endl;
|
||||
snapshot = recheckSnapshot;
|
||||
}
|
||||
|
||||
@@ -199,6 +204,7 @@ void SweepstoreConcurrency::spawnTicket(std::string filePath,
|
||||
}
|
||||
|
||||
// std::cout << "\033[38;5;82m[Ticket Spawner - " << debugLabel << "]:\033[0m Completed ticket " << myTicket.getTicketIndex() << "." << std::endl;
|
||||
delete file;
|
||||
}
|
||||
|
||||
void SweepstoreConcurrency::initialiseMaster(std::string filePath) {
|
||||
@@ -212,6 +218,8 @@ void SweepstoreConcurrency::initialiseMaster(std::string filePath) {
|
||||
SweepstoreHeader header(file);
|
||||
SweepstoreConcurrencyHeader concurrencyHeader(file);
|
||||
|
||||
std::cout << "[Master] Starting master loop" << std::endl;
|
||||
|
||||
while (true) {
|
||||
|
||||
int concurrentWorkers = concurrencyHeader.readNumberOfWorkers();
|
||||
|
||||
@@ -145,7 +145,8 @@ void SweepstoreConcurrencyHeader::initialise(int concurrentWorkers) {
|
||||
writeMasterHeartbeat(0);
|
||||
writeNumberOfWorkers(concurrentWorkers);
|
||||
writeIsReadAllowed(true);
|
||||
for (uint32_t i = 0; i < readNumberOfWorkers(); i++) {
|
||||
uint32_t verifyWorkers = readNumberOfWorkers();
|
||||
for (uint32_t i = 0; i < verifyWorkers; i++) {
|
||||
SweepstoreWorkerTicketSnapshot ticket = SweepstoreWorkerTicketSnapshot();
|
||||
ticket.identifier = 0;
|
||||
ticket.workerHeartbeat = 0;
|
||||
@@ -164,8 +165,7 @@ void SweepstoreConcurrencyHeader::initialise(int concurrentWorkers) {
|
||||
void SweepstoreWorkerTicket::write(SweepstoreWorkerTicketSnapshot &snapshot) {
|
||||
RandomAccessMemory buffer;
|
||||
|
||||
SweepstoreFileLock lock(file.getPath(), SweepstoreFileLock::Mode::Exclusive);
|
||||
SweepstoreFileLock::Scoped scopedLock(lock);
|
||||
uint64_t offset = getOffset();
|
||||
|
||||
buffer.setPositionSync(0);
|
||||
buffer.writeIntSync(snapshot.identifier, 4);
|
||||
@@ -187,23 +187,20 @@ void SweepstoreWorkerTicket::write(SweepstoreWorkerTicketSnapshot &snapshot) {
|
||||
char* dataPtr = reinterpret_cast<char*>(data.data());
|
||||
|
||||
// Write to file
|
||||
file.writeSeek(getOffset());
|
||||
file.writeSeek(offset);
|
||||
file.writeBytes(dataPtr, data.size());
|
||||
file.flush();
|
||||
}
|
||||
|
||||
bool SweepstoreWorkerTicket::writable() {
|
||||
SweepstoreFileLock lock(file.getPath(), SweepstoreFileLock::Mode::Exclusive);
|
||||
return lock.isLocked() == false;
|
||||
return true;
|
||||
}
|
||||
|
||||
SweepstoreWorkerTicketSnapshot SweepstoreWorkerTicket::snapshot() {
|
||||
SweepstoreFileLock lock(file.getPath(), SweepstoreFileLock::Mode::Shared);
|
||||
lock.lock();
|
||||
file.readSeek(getOffset());
|
||||
uint64_t offset = getOffset();
|
||||
file.readSeek(offset);
|
||||
std::unique_ptr<char[]> buffer(new char[TICKET_SIZE]);
|
||||
file.readBytes(buffer.get(), TICKET_SIZE);
|
||||
lock.unlock();
|
||||
RandomAccessMemory ram(reinterpret_cast<uint8_t*>(buffer.get()), TICKET_SIZE);
|
||||
|
||||
SweepstoreWorkerTicketSnapshot snapshot;
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
#include "sweepstore/utils/file_handle.h"
|
||||
|
||||
// Constructor
|
||||
// Constructor - just stores path and mode, actual stream is created per-thread
|
||||
SweepstoreFileHandle::SweepstoreFileHandle(const std::string& p, std::ios::openmode mode)
|
||||
: path(p)
|
||||
, openMode(mode)
|
||||
#ifdef WITH_UNREAL
|
||||
{
|
||||
IPlatformFile& platformFile = FPlatformFileManager::Get().GetPlatformFile();
|
||||
@@ -24,11 +25,30 @@ SweepstoreFileHandle::SweepstoreFileHandle(const std::string& p, std::ios::openm
|
||||
}
|
||||
}
|
||||
#else
|
||||
, stream(std::make_unique<std::fstream>(p, mode))
|
||||
{
|
||||
if (!stream->is_open()) {
|
||||
throw std::runtime_error("Failed to open file: " + path);
|
||||
// Thread-local streams created on demand in getThreadStream()
|
||||
}
|
||||
#endif
|
||||
|
||||
#ifndef WITH_UNREAL
|
||||
// Get or create the fstream for this thread
|
||||
std::fstream& SweepstoreFileHandle::getThreadStream() {
|
||||
auto it = streamCache.find(path);
|
||||
if (it == streamCache.end() || !it->second || !it->second->is_open()) {
|
||||
// Create new stream for this thread
|
||||
auto stream = std::make_unique<std::fstream>(path, openMode);
|
||||
if (!stream->is_open()) {
|
||||
throw std::runtime_error("Failed to open file: " + path);
|
||||
}
|
||||
streamCache[path] = std::move(stream);
|
||||
return *streamCache[path];
|
||||
}
|
||||
return *it->second;
|
||||
}
|
||||
|
||||
const std::fstream& SweepstoreFileHandle::getThreadStream() const {
|
||||
// Use const_cast to reuse the non-const version
|
||||
return const_cast<SweepstoreFileHandle*>(this)->getThreadStream();
|
||||
}
|
||||
#endif
|
||||
|
||||
@@ -37,7 +57,8 @@ bool SweepstoreFileHandle::isOpen() const {
|
||||
#ifdef WITH_UNREAL
|
||||
return unrealHandle != nullptr;
|
||||
#else
|
||||
return stream && stream->is_open();
|
||||
auto it = streamCache.find(path);
|
||||
return it != streamCache.end() && it->second && it->second->is_open();
|
||||
#endif
|
||||
}
|
||||
|
||||
@@ -49,8 +70,10 @@ void SweepstoreFileHandle::close() {
|
||||
unrealHandle = nullptr;
|
||||
}
|
||||
#else
|
||||
if (stream) {
|
||||
stream->close();
|
||||
// Close this thread's stream if it exists
|
||||
auto it = streamCache.find(path);
|
||||
if (it != streamCache.end() && it->second && it->second->is_open()) {
|
||||
it->second->close();
|
||||
}
|
||||
#endif
|
||||
}
|
||||
@@ -63,11 +86,10 @@ void SweepstoreFileHandle::flush() {
|
||||
unrealHandle->Flush();
|
||||
}
|
||||
#else
|
||||
if (stream) {
|
||||
stream->flush();
|
||||
// On Windows, also sync to ensure data hits disk
|
||||
stream->sync();
|
||||
}
|
||||
auto& stream = getThreadStream();
|
||||
stream.flush();
|
||||
// On Windows, also sync to ensure data hits disk
|
||||
stream.sync();
|
||||
#endif
|
||||
}
|
||||
|
||||
@@ -85,13 +107,14 @@ void SweepstoreFileHandle::readSeek(std::streampos pos, std::ios::seekdir dir) {
|
||||
}
|
||||
#else
|
||||
// Windows
|
||||
auto& stream = getThreadStream();
|
||||
// On Windows, flush and sync to disk, then invalidate buffers
|
||||
stream->flush();
|
||||
stream->sync();
|
||||
stream->clear();
|
||||
stream.flush();
|
||||
stream.sync();
|
||||
stream.clear();
|
||||
// Sync both pointers to same position
|
||||
stream->seekp(pos, dir);
|
||||
stream->seekg(pos, dir);
|
||||
stream.seekp(pos, dir);
|
||||
stream.seekg(pos, dir);
|
||||
#endif
|
||||
}
|
||||
|
||||
@@ -102,9 +125,10 @@ void SweepstoreFileHandle::writeSeek(std::streampos pos, std::ios::seekdir dir)
|
||||
readSeek(pos, dir);
|
||||
#else
|
||||
// Windows
|
||||
stream->flush();
|
||||
stream->sync();
|
||||
stream->seekp(pos, dir);
|
||||
auto& stream = getThreadStream();
|
||||
stream.flush();
|
||||
stream.sync();
|
||||
stream.seekp(pos, dir);
|
||||
#endif
|
||||
}
|
||||
|
||||
@@ -114,10 +138,11 @@ void SweepstoreFileHandle::readBytes(char* buffer, std::streamsize size) {
|
||||
unrealHandle->Read(reinterpret_cast<uint8*>(buffer), size);
|
||||
#else
|
||||
// Windows
|
||||
stream->read(buffer, size);
|
||||
auto& stream = getThreadStream();
|
||||
stream.read(buffer, size);
|
||||
// Check for read errors on Windows
|
||||
if (stream->fail() && !stream->eof()) {
|
||||
stream->clear();
|
||||
if (stream.fail() && !stream.eof()) {
|
||||
stream.clear();
|
||||
}
|
||||
#endif
|
||||
}
|
||||
@@ -129,10 +154,11 @@ void SweepstoreFileHandle::writeBytes(const char* buffer, std::streamsize size)
|
||||
unrealHandle->Flush(); // Unreal requires explicit flush
|
||||
#else
|
||||
// Windows
|
||||
stream->write(buffer, size);
|
||||
auto& stream = getThreadStream();
|
||||
stream.write(buffer, size);
|
||||
// Check for write errors on Windows
|
||||
if (stream->fail()) {
|
||||
stream->clear();
|
||||
if (stream.fail()) {
|
||||
stream.clear();
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user