Refactor benchmark configuration and improve file handling with shared streams

This commit is contained in:
ImBenji
2025-12-13 16:45:06 +00:00
parent c97f36cfb6
commit bdd1fab997
9 changed files with 249 additions and 465 deletions

View File

@@ -28,7 +28,6 @@ add_executable(main
src/Private/sweepstore/concurrency.cpp src/Private/sweepstore/concurrency.cpp
src/Public/sweepstore/utils/file_lock.h src/Public/sweepstore/utils/file_lock.h
src/Private/sweepstore/utils/file_lock.cpp src/Private/sweepstore/utils/file_lock.cpp
src/Private/sweepstore/utils/fd_pool.cpp
src/Public/sweepstore/utils/file_handle.h src/Public/sweepstore/utils/file_handle.h
src/Private/sweepstore/utils/file_handle.cpp src/Private/sweepstore/utils/file_handle.cpp
src/Public/sweepstore/header.h src/Public/sweepstore/header.h

View File

@@ -22,6 +22,17 @@
#include "sweepstore/structures.h" #include "sweepstore/structures.h"
#include "sweepstore/concurrency.h" #include "sweepstore/concurrency.h"
// ============================================================================
// BENCHMARK CONFIGURATION
// ============================================================================
// Worker slot count handed to Sweepstore::initialise().
constexpr int SWEEPSTORE_CONCURRENT_WORKERS = 32;

// Size of the persistent thread pool the benchmark creates before timing.
constexpr int WORKER_THREAD_COUNT = 32;

// How many benchmark rounds are run before the loop exits.
constexpr int BENCHMARK_ITERATIONS = 16;

// Operation count for the first round; the benchmark doubles it after every round.
constexpr int INITIAL_CONCURRENT_WORKERS = 1;

// Pause between rounds, in milliseconds.
constexpr int ITERATION_DELAY_MS = 200;

// Pause after initialising the store and before the benchmark begins, in milliseconds.
constexpr int INITIAL_SLEEP_MS = 1000;
// ============================================================================
int main() { int main() {
namespace fs = std::filesystem; namespace fs = std::filesystem;
@@ -31,9 +42,9 @@ int main() {
std::string filePath = "./example.bin"; std::string filePath = "./example.bin";
Sweepstore sweepstore(filePath); Sweepstore sweepstore(filePath);
sweepstore.initialise(32); sweepstore.initialise(SWEEPSTORE_CONCURRENT_WORKERS);
preciseSleep(std::chrono::milliseconds(1000)); preciseSleep(std::chrono::milliseconds(INITIAL_SLEEP_MS));
std::vector<uint8_t> fileData = loadFile(filePath); std::vector<uint8_t> fileData = loadFile(filePath);
std::cout << binaryDump(fileData) << std::endl; std::cout << binaryDump(fileData) << std::endl;
@@ -43,10 +54,8 @@ int main() {
SweepstoreConcurrency::initialiseMasterAsync(filePath); SweepstoreConcurrency::initialiseMasterAsync(filePath);
int iterations = 16;
int currentIteration = 0; int currentIteration = 0;
int concurrencyTest = INITIAL_CONCURRENT_WORKERS;
int concurrencyTest = 1;
// Worker pool infrastructure - created once and reused // Worker pool infrastructure - created once and reused
std::queue<std::function<void()>> taskQueue; std::queue<std::function<void()>> taskQueue;
@@ -56,9 +65,9 @@ int main() {
std::atomic<bool> shutdown{false}; std::atomic<bool> shutdown{false};
std::atomic<int> completedJobs{0}; std::atomic<int> completedJobs{0};
// Create 32 persistent worker threads BEFORE timing // Create persistent worker threads BEFORE timing
std::vector<std::thread> workers; std::vector<std::thread> workers;
for (int i = 0; i < 32; i++) { for (int i = 0; i < WORKER_THREAD_COUNT; i++) {
workers.emplace_back([&]() { workers.emplace_back([&]() {
while (!shutdown) { while (!shutdown) {
std::function<void()> task; std::function<void()> task;
@@ -87,7 +96,7 @@ int main() {
while (true) { while (true) {
if (++currentIteration > iterations) { if (++currentIteration > BENCHMARK_ITERATIONS) {
break; break;
} }
@@ -117,12 +126,12 @@ int main() {
auto end = std::chrono::high_resolution_clock::now(); auto end = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count(); auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
std::cout << "[" << currentIteration << "/" << iterations << "] Completed " << concurrencyTest << " operations in " << duration << " ms." << std::endl; std::cout << "[" << currentIteration << "/" << BENCHMARK_ITERATIONS << "] Completed " << concurrencyTest << " operations in " << duration << " ms." << std::endl;
concurrencyTest *= 2; concurrencyTest *= 2;
// Wait between iterations // Wait between iterations
preciseSleep(std::chrono::milliseconds(200)); preciseSleep(std::chrono::milliseconds(ITERATION_DELAY_MS));
} }
// Shutdown workers after all iterations // Shutdown workers after all iterations

View File

@@ -31,7 +31,7 @@ int randomId() {
return (time ^ random) & 0x7FFFFFFF; return (time ^ random) & 0x7FFFFFFF;
} }
void SweepstoreConcurrency::spawnTicket(SweepstoreFileHandle* _file, void SweepstoreConcurrency::spawnTicket(SweepstoreFileHandle* file,
const SweepstoreTicketOperation& operation, const SweepstoreTicketOperation& operation,
const uint32_t keyHash, const uint32_t keyHash,
const uint32_t targetSize, const uint32_t targetSize,
@@ -42,7 +42,7 @@ void SweepstoreConcurrency::spawnTicket(SweepstoreFileHandle* _file,
// FileHandle now uses thread-local streams internally - no need to create new handle! // FileHandle now uses thread-local streams internally - no need to create new handle!
// Each thread automatically gets its own fstream from the shared file handle // Each thread automatically gets its own fstream from the shared file handle
SweepstoreFileHandle* file = new SweepstoreFileHandle(_file->getPath(), std::ios::in | std::ios::out | std::ios::binary); // SweepstoreFileHandle* file = new SweepstoreFileHandle(_file->getPath(), std::ios::in | std::ios::out | std::ios::binary);
/* /*
Useful Functions Useful Functions
@@ -52,8 +52,10 @@ void SweepstoreConcurrency::spawnTicket(SweepstoreFileHandle* _file,
auto log = [&](const std::string &message) { auto log = [&](const std::string &message) {
std::string prefix = !debugLabel.empty() ? "\033[38;5;208m[Ticket Spawner - " + debugLabel + "]:\033[0m " : "\033[38;5;208m[Ticket Spawner]:\033[0m "; std::string prefix = !debugLabel.empty() ? "\033[38;5;208m[Ticket Spawner - " + debugLabel + "]:\033[0m " : "\033[38;5;208m[Ticket Spawner]:\033[0m ";
// debugPrint(prefix + message); // debugPrint(prefix + message);
// std::cout << prefix << message << std::endl;
}; };
// Sleep with variance (additive only) // Sleep with variance (additive only)
auto varySleep = [&](std::chrono::nanoseconds minSleepDuration, std::chrono::nanoseconds variance) { auto varySleep = [&](std::chrono::nanoseconds minSleepDuration, std::chrono::nanoseconds variance) {
// SWEEPSTORE_TIME_SCOPE("Varying Sleep"); // SWEEPSTORE_TIME_SCOPE("Varying Sleep");
@@ -93,9 +95,9 @@ void SweepstoreConcurrency::spawnTicket(SweepstoreFileHandle* _file,
uint32_t ticketIndex = -1u; uint32_t ticketIndex = -1u;
while (true) { uint32_t concurrentWorkers = concurrencyHeader.readNumberOfWorkers();
uint32_t concurrentWorkers = concurrencyHeader.readNumberOfWorkers(); while (true) {
for (uint32_t i = 0; i < concurrentWorkers; i++) { for (uint32_t i = 0; i < concurrentWorkers; i++) {
@@ -114,6 +116,11 @@ void SweepstoreConcurrency::spawnTicket(SweepstoreFileHandle* _file,
bool is_free = snapshot.state == SweepstoreTicketState::FREE; bool is_free = snapshot.state == SweepstoreTicketState::FREE;
if ((identifier_unassigned && is_free) || stale_heartbeat) { if ((identifier_unassigned && is_free) || stale_heartbeat) {
if (i >= 32) {
std::cout << "What the actual fuck" << std::endl;
}
SWEEPSTORE_TIME_SCOPE("Claim Ticket"); SWEEPSTORE_TIME_SCOPE("Claim Ticket");
snapshot.identifier = newIdentifier; snapshot.identifier = newIdentifier;
snapshot.workerHeartbeat = millisecondsSinceEpoch32(); snapshot.workerHeartbeat = millisecondsSinceEpoch32();
@@ -156,20 +163,14 @@ void SweepstoreConcurrency::spawnTicket(SweepstoreFileHandle* _file,
// Wait for approval // Wait for approval
while (true) { while (true) {
auto start = std::chrono::high_resolution_clock::now();
SweepstoreWorkerTicketSnapshot snapshot = myTicket.snapshot(); SweepstoreWorkerTicketSnapshot snapshot = myTicket.snapshot();
// Update heartbeat
uint32_t currentTime = millisecondsSinceEpoch32();
if (currentTime - snapshot.workerHeartbeat > STALE_HEARTBEAT_THRESHOLD_MS / 5) {
snapshot.workerHeartbeat = currentTime;
myTicket.write(snapshot);
}
// Check if we still own the ticket // Check if we still own the ticket
if (snapshot.identifier != myIdentifier) { if (snapshot.identifier != myIdentifier) {
preciseSleep(std::chrono::milliseconds(10)); preciseSleep(std::chrono::milliseconds(2));
// Re-verify we lost the ticket // Re-verify we lost the ticket
SweepstoreWorkerTicketSnapshot recheckSnapshot = myTicket.snapshot(); SweepstoreWorkerTicketSnapshot recheckSnapshot = myTicket.snapshot();
@@ -179,7 +180,7 @@ void SweepstoreConcurrency::spawnTicket(SweepstoreFileHandle* _file,
// ReSharper disable once CppDFAInfiniteRecursion // ReSharper disable once CppDFAInfiniteRecursion
spawnTicket( spawnTicket(
_file, file,
operation, operation,
keyHash, keyHash,
targetSize, targetSize,
@@ -195,6 +196,16 @@ void SweepstoreConcurrency::spawnTicket(SweepstoreFileHandle* _file,
snapshot = recheckSnapshot; snapshot = recheckSnapshot;
} }
// Update heartbeat
uint32_t currentTime = millisecondsSinceEpoch32();
uint32_t sinceLastHeartbeat = currentTime - snapshot.workerHeartbeat;
snapshot.workerHeartbeat = currentTime;
myTicket.write(snapshot);
if (sinceLastHeartbeat > STALE_HEARTBEAT_THRESHOLD_MS / 5) {
std::cout << "\033[39;5;82m[Ticket Spawner - " << debugLabel << "]:\033[0m Heartbeat updated for ticket " << myTicket.getTicketIndex() << "." << std::endl;
}
if (snapshot.state == SweepstoreTicketState::APPROVED) { if (snapshot.state == SweepstoreTicketState::APPROVED) {
snapshot.state = SweepstoreTicketState::EXECUTING; snapshot.state = SweepstoreTicketState::EXECUTING;
myTicket.write(snapshot); myTicket.write(snapshot);
@@ -208,10 +219,15 @@ void SweepstoreConcurrency::spawnTicket(SweepstoreFileHandle* _file,
} }
varySleep(std::chrono::microseconds(500), std::chrono::microseconds(200)); varySleep(std::chrono::microseconds(500), std::chrono::microseconds(200));
auto end = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
// std::cout << "Loop duration: " << duration << " ms." << std::endl;
} }
// std::cout << "\033[38;5;82m[Ticket Spawner - " << debugLabel << "]:\033[0m Completed ticket " << myTicket.getTicketIndex() << "." << std::endl; // std::cout << "\033[38;5;82m[Ticket Spawner - " << debugLabel << "]:\033[0m Completed ticket " << myTicket.getTicketIndex() << "." << std::endl;
delete file; // delete file;
} }
void SweepstoreConcurrency::initialiseMaster(std::string filePath) { void SweepstoreConcurrency::initialiseMaster(std::string filePath) {

View File

@@ -6,9 +6,8 @@
#include "sweepstore/utils/timing.h" #include "sweepstore/utils/timing.h"
std::string SweepstoreHeader::readMagicNumber() { std::string SweepstoreHeader::readMagicNumber() {
file.readSeek(0, std::ios::beg);
char buffer[4]; char buffer[4];
file.readBytes(buffer, 4); file.seekAndRead(0, buffer, 4);
return std::string(buffer, 4); return std::string(buffer, 4);
} }
@@ -16,14 +15,12 @@ void SweepstoreHeader::writeMagicNumber(const std::string& magicNumber) {
if (magicNumber.size() != 4) { if (magicNumber.size() != 4) {
throw std::invalid_argument("Magic number must be exactly 4 characters long."); throw std::invalid_argument("Magic number must be exactly 4 characters long.");
} }
file.writeSeek(0, std::ios::beg); file.seekAndWrite(0, magicNumber.c_str(), 4);
file.writeBytes(magicNumber.c_str(), 4);
} }
std::string SweepstoreHeader::readVersion() { std::string SweepstoreHeader::readVersion() {
file.readSeek(4, std::ios::beg);
char buffer[12]; char buffer[12];
file.readBytes(buffer, 12); file.seekAndRead(4, buffer, 12);
// Trim leading and trailing spaces // Trim leading and trailing spaces
std::string version(buffer, 12); std::string version(buffer, 12);
@@ -41,46 +38,39 @@ void SweepstoreHeader::writeVersion(const std::string& version) {
std::string paddedVersion = " " + version; std::string paddedVersion = " " + version;
paddedVersion.resize(12, ' '); paddedVersion.resize(12, ' ');
file.writeSeek(4, std::ios::beg); file.seekAndWrite(4, paddedVersion.c_str(), 12);
file.writeBytes(paddedVersion.c_str(), 12);
} }
SweepstorePointer SweepstoreHeader::readAddressTablePointer() { SweepstorePointer SweepstoreHeader::readAddressTablePointer() {
file.readSeek(16, std::ios::beg);
int64_t address; int64_t address;
file.readBytes(reinterpret_cast<char*>(&address), sizeof(address)); file.seekAndRead(16, reinterpret_cast<char*>(&address), sizeof(address));
return address; // Implicit conversion to SweepstorePointer return address; // Implicit conversion to SweepstorePointer
} }
void SweepstoreHeader::writeAddressTablePointer(const SweepstorePointer& ptr) { void SweepstoreHeader::writeAddressTablePointer(const SweepstorePointer& ptr) {
file.writeSeek(16, std::ios::beg);
int64_t address = ptr; int64_t address = ptr;
file.writeBytes(reinterpret_cast<const char*>(&address), sizeof(address)); file.seekAndWrite(16, reinterpret_cast<const char*>(&address), sizeof(address));
} }
uint32_t SweepstoreHeader::readFreeListCount() { uint32_t SweepstoreHeader::readFreeListCount() {
file.readSeek(24, std::ios::beg);
uint32_t count; uint32_t count;
file.readBytes(reinterpret_cast<char*>(&count), sizeof(count)); file.seekAndRead(24, reinterpret_cast<char*>(&count), sizeof(count));
return count; return count;
} }
void SweepstoreHeader::writeFreeListCount(uint32_t count) { void SweepstoreHeader::writeFreeListCount(uint32_t count) {
file.writeSeek(24, std::ios::beg); file.seekAndWrite(24, reinterpret_cast<const char*>(&count), sizeof(count));
file.writeBytes(reinterpret_cast<const char*>(&count), sizeof(count));
} }
bool SweepstoreHeader::readIsFreeListLifted() { bool SweepstoreHeader::readIsFreeListLifted() {
file.readSeek(28, std::ios::beg);
char flag; char flag;
file.readBytes(&flag, sizeof(flag)); file.seekAndRead(28, &flag, sizeof(flag));
return flag != 0; return flag != 0;
} }
void SweepstoreHeader::writeIsFreeListLifted(bool isLifted) { void SweepstoreHeader::writeIsFreeListLifted(bool isLifted) {
file.writeSeek(28, std::ios::beg);
char flag = isLifted ? 1 : 0; char flag = isLifted ? 1 : 0;
file.writeBytes(&flag, sizeof(flag)); file.seekAndWrite(28, &flag, sizeof(flag));
} }
void SweepstoreHeader::initialise() { void SweepstoreHeader::initialise() {
@@ -93,53 +83,44 @@ void SweepstoreHeader::initialise() {
} }
uint64_t SweepstoreConcurrencyHeader::readMasterIdentifier() { uint64_t SweepstoreConcurrencyHeader::readMasterIdentifier() {
file.readSeek(29, std::ios::beg);
uint64_t identifier; uint64_t identifier;
file.readBytes(reinterpret_cast<char*>(&identifier), sizeof(identifier)); file.seekAndRead(29, reinterpret_cast<char*>(&identifier), sizeof(identifier));
return identifier; return identifier;
} }
void SweepstoreConcurrencyHeader::writeMasterIdentifier(uint64_t identifier) { void SweepstoreConcurrencyHeader::writeMasterIdentifier(uint64_t identifier) {
file.writeSeek(29, std::ios::beg); file.seekAndWrite(29, reinterpret_cast<const char*>(&identifier), sizeof(identifier));
file.writeBytes(reinterpret_cast<const char*>(&identifier), sizeof(identifier));
} }
uint32_t SweepstoreConcurrencyHeader::readMasterHeartbeat() { uint32_t SweepstoreConcurrencyHeader::readMasterHeartbeat() {
file.readSeek(37, std::ios::beg);
uint32_t heartbeat; uint32_t heartbeat;
file.readBytes(reinterpret_cast<char*>(&heartbeat), sizeof(heartbeat)); file.seekAndRead(37, reinterpret_cast<char*>(&heartbeat), sizeof(heartbeat));
return heartbeat; return heartbeat;
} }
void SweepstoreConcurrencyHeader::writeMasterHeartbeat(uint32_t heartbeat) { void SweepstoreConcurrencyHeader::writeMasterHeartbeat(uint32_t heartbeat) {
file.writeSeek(37, std::ios::beg); file.seekAndWrite(37, reinterpret_cast<const char*>(&heartbeat), sizeof(heartbeat));
file.writeBytes(reinterpret_cast<const char*>(&heartbeat), sizeof(heartbeat));
} }
uint32_t SweepstoreConcurrencyHeader::readNumberOfWorkers() { uint32_t SweepstoreConcurrencyHeader::readNumberOfWorkers() {
SWEEPSTORE_TIME_FUNCTION();
file.readSeek(41, std::ios::beg);
uint32_t numWorkers; uint32_t numWorkers;
file.readBytes(reinterpret_cast<char*>(&numWorkers), sizeof(numWorkers)); file.seekAndRead(41, reinterpret_cast<char*>(&numWorkers), sizeof(numWorkers));
return numWorkers; return numWorkers;
} }
void SweepstoreConcurrencyHeader::writeNumberOfWorkers(uint32_t numWorkers) { void SweepstoreConcurrencyHeader::writeNumberOfWorkers(uint32_t numWorkers) {
file.writeSeek(41, std::ios::beg); file.seekAndWrite(41, reinterpret_cast<const char*>(&numWorkers), sizeof(numWorkers));
file.writeBytes(reinterpret_cast<const char*>(&numWorkers), sizeof(numWorkers));
} }
bool SweepstoreConcurrencyHeader::readIsReadAllowed() { bool SweepstoreConcurrencyHeader::readIsReadAllowed() {
file.readSeek(45, std::ios::beg);
char flag; char flag;
file.readBytes(&flag, sizeof(flag)); file.seekAndRead(45, &flag, sizeof(flag));
return flag != 0; return flag != 0;
} }
void SweepstoreConcurrencyHeader::writeIsReadAllowed(bool isAllowed) { void SweepstoreConcurrencyHeader::writeIsReadAllowed(bool isAllowed) {
file.writeSeek(45, std::ios::beg);
char flag = isAllowed ? 1 : 0; char flag = isAllowed ? 1 : 0;
file.writeBytes(&flag, sizeof(flag)); file.seekAndWrite(45, &flag, sizeof(flag));
} }
void SweepstoreConcurrencyHeader::initialise(int concurrentWorkers) { void SweepstoreConcurrencyHeader::initialise(int concurrentWorkers) {
@@ -149,6 +130,11 @@ void SweepstoreConcurrencyHeader::initialise(int concurrentWorkers) {
writeNumberOfWorkers(concurrentWorkers); writeNumberOfWorkers(concurrentWorkers);
writeIsReadAllowed(true); writeIsReadAllowed(true);
uint32_t verifyWorkers = readNumberOfWorkers(); uint32_t verifyWorkers = readNumberOfWorkers();
if (verifyWorkers != static_cast<uint32_t>(concurrentWorkers)) {
throw std::runtime_error("Failed to verify number of concurrent workers in concurrency header. Expected " + std::to_string(concurrentWorkers) + ", got " + std::to_string(verifyWorkers) + ".");
}
for (uint32_t i = 0; i < verifyWorkers; i++) { for (uint32_t i = 0; i < verifyWorkers; i++) {
SweepstoreWorkerTicketSnapshot ticket = SweepstoreWorkerTicketSnapshot(); SweepstoreWorkerTicketSnapshot ticket = SweepstoreWorkerTicketSnapshot();
ticket.identifier = 0; ticket.identifier = 0;
@@ -191,29 +177,28 @@ void SweepstoreWorkerTicket::write(SweepstoreWorkerTicketSnapshot &snapshot) {
std::vector<uint8_t> data = buffer.readSync(buffer.length()); std::vector<uint8_t> data = buffer.readSync(buffer.length());
char* dataPtr = reinterpret_cast<char*>(data.data()); char* dataPtr = reinterpret_cast<char*>(data.data());
// Write to file // Write to file with byte-range lock (allows parallel access to different tickets)
SweepstoreFileLock lock(file.getPath(), getOffset(), TICKET_SIZE, SweepstoreFileLock::Mode::Exclusive); SweepstoreFileLock lock(file.getPath(), getOffset(), TICKET_SIZE, SweepstoreFileLock::Mode::Exclusive);
lock.lock(); SweepstoreFileLock::Scoped scopedLock(lock);
file.writeSeek(getOffset()); file.seekAndWrite(getOffset(), dataPtr, data.size());
file.writeBytes(dataPtr, data.size());
lock.unlock();
file.flush(); file.flush();
} }
bool SweepstoreWorkerTicket::writable() { bool SweepstoreWorkerTicket::writable() {
SWEEPSTORE_TIME_FUNCTION(); SWEEPSTORE_TIME_FUNCTION();
SweepstoreFileLock lock(file.getPath(), getOffset(), TICKET_SIZE, SweepstoreFileLock::Mode::Exclusive); // SweepstoreFileLock lock(file.getPath(), getOffset(), TICKET_SIZE, SweepstoreFileLock::Mode::Exclusive);
return true; return true;
} }
SweepstoreWorkerTicketSnapshot SweepstoreWorkerTicket::snapshot() { SweepstoreWorkerTicketSnapshot SweepstoreWorkerTicket::snapshot() {
SWEEPSTORE_TIME_FUNCTION(); SWEEPSTORE_TIME_FUNCTION();
// Byte-range lock for this ticket (allows parallel access to different tickets)
SweepstoreFileLock lock(file.getPath(), getOffset(), TICKET_SIZE, SweepstoreFileLock::Mode::Shared); SweepstoreFileLock lock(file.getPath(), getOffset(), TICKET_SIZE, SweepstoreFileLock::Mode::Shared);
lock.lock(); SweepstoreFileLock::Scoped scopedLock(lock);
file.readSeek(getOffset());
std::unique_ptr<char[]> buffer(new char[TICKET_SIZE]); std::unique_ptr<char[]> buffer(new char[TICKET_SIZE]);
file.readBytes(buffer.get(), TICKET_SIZE); file.seekAndRead(getOffset(), buffer.get(), TICKET_SIZE);
lock.unlock();
RandomAccessMemory ram(reinterpret_cast<uint8_t*>(buffer.get()), TICKET_SIZE); RandomAccessMemory ram(reinterpret_cast<uint8_t*>(buffer.get()), TICKET_SIZE);
SweepstoreWorkerTicketSnapshot snapshot; SweepstoreWorkerTicketSnapshot snapshot;

View File

@@ -1,6 +0,0 @@
#include "sweepstore/utils/file_handle.h"
// Thread-local stream cache definition for file handles
#ifndef WITH_UNREAL
thread_local std::unordered_map<std::string, std::unique_ptr<std::fstream>> SweepstoreFileHandle::streamCache;
#endif

View File

@@ -1,7 +1,8 @@
#include "sweepstore/utils/file_handle.h" #include "sweepstore/utils/file_handle.h"
#include "sweepstore/utils/timing.h" #include "sweepstore/utils/timing.h"
#include <iostream>
// Constructor - just stores path and mode, actual stream is created per-thread // Constructor
SweepstoreFileHandle::SweepstoreFileHandle(const std::string& p, std::ios::openmode mode) SweepstoreFileHandle::SweepstoreFileHandle(const std::string& p, std::ios::openmode mode)
: path(p) : path(p)
, openMode(mode) , openMode(mode)
@@ -27,29 +28,14 @@ SweepstoreFileHandle::SweepstoreFileHandle(const std::string& p, std::ios::openm
} }
#else #else
{ {
// Thread-local streams created on demand in getThreadStream() // Open the single shared stream
} stream.open(path, openMode);
#endif if (!stream.is_open()) {
throw std::runtime_error("Failed to open file: " + path);
#ifndef WITH_UNREAL
// Get or create the fstream for this thread
std::fstream& SweepstoreFileHandle::getThreadStream() {
auto it = streamCache.find(path);
if (it == streamCache.end() || !it->second || !it->second->is_open()) {
// Create new stream for this thread
auto stream = std::make_unique<std::fstream>(path, openMode);
if (!stream->is_open()) {
throw std::runtime_error("Failed to open file: " + path);
}
streamCache[path] = std::move(stream);
return *streamCache[path];
} }
return *it->second;
}
const std::fstream& SweepstoreFileHandle::getThreadStream() const { // Disable stream buffering for cache coherency across threads
// Use const_cast to reuse the non-const version stream.rdbuf()->pubsetbuf(0, 0);
return const_cast<SweepstoreFileHandle*>(this)->getThreadStream();
} }
#endif #endif
@@ -58,8 +44,7 @@ bool SweepstoreFileHandle::isOpen() const {
#ifdef WITH_UNREAL #ifdef WITH_UNREAL
return unrealHandle != nullptr; return unrealHandle != nullptr;
#else #else
auto it = streamCache.find(path); return stream.is_open();
return it != streamCache.end() && it->second && it->second->is_open();
#endif #endif
} }
@@ -71,15 +56,43 @@ void SweepstoreFileHandle::close() {
unrealHandle = nullptr; unrealHandle = nullptr;
} }
#else #else
// Close this thread's stream if it exists std::lock_guard<std::mutex> lock(streamMutex);
auto it = streamCache.find(path); if (stream.is_open()) {
if (it != streamCache.end() && it->second && it->second->is_open()) { stream.close();
it->second->close();
} }
#endif #endif
} }
#if defined(_WIN32) || defined(WITH_UNREAL) // seekAndRead
/**
 * Seeks to an absolute byte offset and reads `size` bytes into `buffer`.
 *
 * In the non-Unreal build the seek and the read are performed while holding
 * `streamMutex`, so the pair cannot be interleaved with another call that
 * takes the same mutex. The Unreal branch takes no lock here.
 *
 * @param offset Absolute byte offset from the beginning of the file.
 * @param buffer Destination buffer; the caller must provide at least `size` bytes.
 * @param size   Number of bytes to read.
 *
 * Errors are not reported to the caller (same contract as before): a failed
 * seek or a short read is not signalled by this function.
 */
void SweepstoreFileHandle::seekAndRead(uint64_t offset, char* buffer, size_t size) {
    SWEEPSTORE_TIME_FUNCTION();
#ifdef WITH_UNREAL
    unrealHandle->Seek(offset);
    unrealHandle->Read(reinterpret_cast<uint8*>(buffer), size);
#else
    std::lock_guard<std::mutex> lock(streamMutex);

    // Clear sticky error bits BEFORE seeking. seekg() does not reposition
    // the stream while failbit is set, so a failure left over from an earlier
    // operation on this shared stream (e.g. a short read) would otherwise
    // make the read below start from a stale offset.
    stream.clear();
    stream.seekg(static_cast<std::streamoff>(offset), std::ios::beg);

    // Keep the original best-effort behaviour if the seek itself fails.
    if (stream.fail()) stream.clear();

    stream.read(buffer, static_cast<std::streamsize>(size));
#endif
}
// seekAndWrite
/**
 * Seeks to an absolute byte offset, writes `size` bytes from `buffer`, and
 * flushes.
 *
 * In the non-Unreal build the seek, write and flush are performed while
 * holding `streamMutex`, so they cannot be interleaved with another call that
 * takes the same mutex. The Unreal branch takes no lock here.
 *
 * @param offset Absolute byte offset from the beginning of the file.
 * @param buffer Source bytes; must point to at least `size` readable bytes.
 * @param size   Number of bytes to write.
 *
 * Errors are not reported to the caller (same contract as before).
 */
void SweepstoreFileHandle::seekAndWrite(uint64_t offset, const char* buffer, size_t size) {
    SWEEPSTORE_TIME_FUNCTION();
#ifdef WITH_UNREAL
    unrealHandle->Seek(offset);
    unrealHandle->Write(reinterpret_cast<const uint8*>(buffer), size);
    unrealHandle->Flush();
#else
    std::lock_guard<std::mutex> lock(streamMutex);

    // Clear sticky error bits BEFORE seeking. seekp() does not reposition
    // the stream while failbit is set; since reads and writes share this one
    // stream, a short read elsewhere could otherwise cause the bytes below to
    // be written at a stale offset and corrupt unrelated data.
    stream.clear();
    stream.seekp(static_cast<std::streamoff>(offset), std::ios::beg);

    // Keep the original best-effort behaviour if the seek itself fails.
    if (stream.fail()) stream.clear();

    stream.write(buffer, static_cast<std::streamsize>(size));
    stream.flush();
#endif
}
// flush // flush
void SweepstoreFileHandle::flush() { void SweepstoreFileHandle::flush() {
#ifdef WITH_UNREAL #ifdef WITH_UNREAL
@@ -87,13 +100,11 @@ void SweepstoreFileHandle::flush() {
unrealHandle->Flush(); unrealHandle->Flush();
} }
#else #else
// Windows-specific implementation for guaranteed flush to disk std::lock_guard<std::mutex> lock(streamMutex);
auto& stream = getThreadStream();
stream.flush(); stream.flush();
// On Windows, also call sync to push to OS buffers // On Windows, also sync to disk
// Then open a Windows HANDLE to the same file and call FlushFileBuffers #ifdef _WIN32
// This is more reliable than trying to extract the HANDLE from fstream
HANDLE h = CreateFileA( HANDLE h = CreateFileA(
path.c_str(), path.c_str(),
GENERIC_WRITE, GENERIC_WRITE,
@@ -109,66 +120,41 @@ void SweepstoreFileHandle::flush() {
CloseHandle(h); CloseHandle(h);
} }
#endif #endif
#endif
} }
// readSeek // Move constructor
void SweepstoreFileHandle::readSeek(std::streampos pos, std::ios::seekdir dir) { SweepstoreFileHandle::SweepstoreFileHandle(SweepstoreFileHandle&& other) noexcept
: path(std::move(other.path))
, openMode(other.openMode)
#ifdef WITH_UNREAL #ifdef WITH_UNREAL
// Unreal doesn't have separate read/write pointers, so just seek , unrealHandle(other.unrealHandle)
int64 unrealPos = static_cast<int64>(pos); {
if (dir == std::ios::beg) { other.unrealHandle = nullptr;
unrealHandle->Seek(unrealPos); }
} else if (dir == std::ios::cur) { #else
unrealHandle->Seek(unrealHandle->Tell() + unrealPos); , stream(std::move(other.stream))
} else if (dir == std::ios::end) { {
unrealHandle->SeekFromEnd(unrealPos); }
#endif
// Move assignment
SweepstoreFileHandle& SweepstoreFileHandle::operator=(SweepstoreFileHandle&& other) noexcept {
if (this != &other) {
close();
path = std::move(other.path);
openMode = other.openMode;
#ifdef WITH_UNREAL
unrealHandle = other.unrealHandle;
other.unrealHandle = nullptr;
#else
stream = std::move(other.stream);
#endif
} }
#else return *this;
// Windows - simplified to only seek read pointer
auto& stream = getThreadStream();
stream.seekg(pos, dir);
if (stream.fail()) {
stream.clear();
}
#endif
} }
// writeSeek // Destructor
void SweepstoreFileHandle::writeSeek(std::streampos pos, std::ios::seekdir dir) { SweepstoreFileHandle::~SweepstoreFileHandle() {
#ifdef WITH_UNREAL close();
// Same as readSeek for Unreal
readSeek(pos, dir);
#else
// Windows - simplified to only seek write pointer
auto& stream = getThreadStream();
stream.seekp(pos, dir);
if (stream.fail()) {
stream.clear();
}
#endif
} }
// readBytes
void SweepstoreFileHandle::readBytes(char* buffer, std::streamsize size) {
#ifdef WITH_UNREAL
unrealHandle->Read(reinterpret_cast<uint8*>(buffer), size);
#else
// Windows
auto& stream = getThreadStream();
stream.read(buffer, size);
#endif
}
// writeBytes
void SweepstoreFileHandle::writeBytes(const char* buffer, std::streamsize size) {
SWEEPSTORE_TIME_FUNCTION();
#ifdef WITH_UNREAL
unrealHandle->Write(reinterpret_cast<const uint8*>(buffer), size);
unrealHandle->Flush(); // Unreal requires explicit flush
#else
// Windows
auto& stream = getThreadStream();
stream.write(buffer, size);
#endif
}
#endif // _WIN32 || WITH_UNREAL

View File

@@ -1,10 +1,4 @@
#include "sweepstore/utils/file_lock.h" #include "sweepstore/utils/file_lock.h"
// Define thread-local static members // Implementation is entirely in the header (inline and static members)
thread_local std::map<SweepstoreFileLock::LockKey, SweepstoreFileLock::Mode> SweepstoreFileLock::activeLocks; // This file exists to satisfy CMake build requirements
#ifdef _WIN32
thread_local std::unordered_map<std::string, HANDLE> SweepstoreFileLock::handleCache;
#else
thread_local std::unordered_map<std::string, int> SweepstoreFileLock::fdCache;
#endif

View File

@@ -2,9 +2,7 @@
#include <fstream> #include <fstream>
#include <string> #include <string>
#include <memory> #include <mutex>
#include <iostream>
#include <unordered_map>
#ifdef _WIN32 #ifdef _WIN32
#include <windows.h> #include <windows.h>
@@ -22,15 +20,15 @@ class SweepstoreFileHandle {
private: private:
std::string path; std::string path;
std::ios::openmode openMode; std::ios::openmode openMode;
#ifdef WITH_UNREAL #ifdef WITH_UNREAL
IFileHandle* unrealHandle; IFileHandle* unrealHandle;
#else #else
// Thread-local cache: each thread gets its own fstream per path // Single shared stream for all threads
static thread_local std::unordered_map<std::string, std::unique_ptr<std::fstream>> streamCache; std::fstream stream;
// Get or create the fstream for this thread // Mutex protecting the stream
std::fstream& getThreadStream(); std::mutex streamMutex;
const std::fstream& getThreadStream() const;
#endif #endif
public: public:
@@ -38,76 +36,23 @@ public:
const std::string& getPath() const { return path; } const std::string& getPath() const { return path; }
#ifndef WITH_UNREAL
std::fstream& getStream() { return getThreadStream(); }
const std::fstream& getStream() const { return getThreadStream(); }
// Smart pointer-like interface
std::fstream* operator->() { return &getThreadStream(); }
const std::fstream* operator->() const { return &getThreadStream(); }
std::fstream& operator*() { return getThreadStream(); }
const std::fstream& operator*() const { return getThreadStream(); }
#endif
bool isOpen() const; bool isOpen() const;
void close(); void close();
// Windows-compatible I/O wrappers // Main I/O API - atomic seek+read/write operations
#if defined(_WIN32) || defined(WITH_UNREAL) void seekAndRead(uint64_t offset, char* buffer, size_t size);
void seekAndWrite(uint64_t offset, const char* buffer, size_t size);
// Explicit flush
void flush(); void flush();
void readSeek(std::streampos pos, std::ios::seekdir dir = std::ios::beg);
void writeSeek(std::streampos pos, std::ios::seekdir dir = std::ios::beg);
void readBytes(char* buffer, std::streamsize size);
void writeBytes(const char* buffer, std::streamsize size);
#else
// Inline for non-Windows to avoid overhead
inline void flush() {
getThreadStream().flush();
}
inline void readSeek(std::streampos pos, std::ios::seekdir dir = std::ios::beg) {
getThreadStream().seekg(pos, dir);
}
inline void writeSeek(std::streampos pos, std::ios::seekdir dir = std::ios::beg) {
getThreadStream().seekp(pos, dir);
}
inline void readBytes(char* buffer, std::streamsize size) {
getThreadStream().read(buffer, size);
}
inline void writeBytes(const char* buffer, std::streamsize size) {
getThreadStream().write(buffer, size);
}
#endif
SweepstoreFileHandle(SweepstoreFileHandle&& other) noexcept // Move semantics
: path(std::move(other.path)) SweepstoreFileHandle(SweepstoreFileHandle&& other) noexcept;
, openMode(other.openMode) SweepstoreFileHandle& operator=(SweepstoreFileHandle&& other) noexcept;
#ifdef WITH_UNREAL
, unrealHandle(other.unrealHandle)
#endif
{
#ifdef WITH_UNREAL
other.unrealHandle = nullptr;
#endif
}
SweepstoreFileHandle& operator=(SweepstoreFileHandle&& other) noexcept {
if (this != &other) {
close();
path = std::move(other.path);
openMode = other.openMode;
#ifdef WITH_UNREAL
unrealHandle = other.unrealHandle;
other.unrealHandle = nullptr;
#endif
}
return *this;
}
// Delete copy semantics
SweepstoreFileHandle(const SweepstoreFileHandle&) = delete; SweepstoreFileHandle(const SweepstoreFileHandle&) = delete;
SweepstoreFileHandle& operator=(const SweepstoreFileHandle&) = delete; SweepstoreFileHandle& operator=(const SweepstoreFileHandle&) = delete;
~SweepstoreFileHandle() { ~SweepstoreFileHandle();
close(); };
}
};

View File

@@ -2,43 +2,46 @@
#include <string> #include <string>
#include <cstdint> #include <cstdint>
#include <unordered_map> #include <mutex>
#include <stdexcept> #include <condition_variable>
#include <map> #include <map>
#include <vector>
#include <algorithm>
#include <memory>
#include "sweepstore/utils/timing.h" // C++ level byte-range locking
// Allows Thread A (ticket 5) and Thread B (ticket 10) to work in parallel on different byte ranges
#ifdef _WIN32 // Uses static shared state to coordinate locks across all SweepstoreFileLock instances
#include <windows.h>
#else
#include <fcntl.h>
#include <unistd.h>
#include <sys/file.h>
#endif
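// Simple byte-range file lock using fcntl() (LockFileEx on Windows) with a thread-local FD/HANDLE cache
// Each thread has its own FD; NOTE: unlike flock(), fcntl() record locks are owned by the process rather than the FD, so they do not by themselves keep threads of one process apart
// Matches Dart's paradigm: each isolate has its own RandomAccessFile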
class SweepstoreFileLock { class SweepstoreFileLock {
public: public:
enum class Mode { Shared, Exclusive }; enum class Mode { Shared, Exclusive };
private: private:
// NOTE: this span appears to be a side-by-side diff rendering; on fused lines the removed text comes first and the added text follows it.
// Removed (left): LockKey {path, offset, length}, ordered by path, then offset, then length.
// Added (right):  LockRange {offset, length, mode}. end() is offset + length, and overlaps()
//                 compares half-open ranges, so two ranges that only touch do not overlap.
// NOTE(review): offset + length is unsigned 64-bit arithmetic and wraps around when the sum
//               exceeds UINT64_MAX; confirm callers never pass such a range.
// Key: file path + offset, Value: Mode struct LockRange {
struct LockKey {
std::string path;
uint64_t offset; uint64_t offset;
uint64_t length; uint64_t length;
Mode mode;
bool operator<(const LockKey& other) const { uint64_t end() const { return offset + length; }
if (path != other.path) return path < other.path;
if (offset != other.offset) return offset < other.offset; bool overlaps(uint64_t otherOffset, uint64_t otherLength) const {
return length < other.length; uint64_t otherEnd = otherOffset + otherLength;
return offset < otherEnd && otherOffset < end();
} }
}; };
// NOTE: this span appears to be a side-by-side diff rendering; on fused lines the removed text comes first and the added text follows it.
// Removed (left): a thread_local map from LockKey to Mode, i.e. one table per thread.
// Added (right):  SharedLockState groups a mutex, a condition variable and a map from file path
//                 to the vector of LockRange entries recorded for that path. getSharedState()
//                 returns a function-local static, so every caller receives the same instance.
// Track active locks per thread to prevent self-deadlock // Static shared state for all locks across all instances
static thread_local std::map<LockKey, Mode> activeLocks; struct SharedLockState {
std::mutex mutex;
std::condition_variable cv;
// Map: file path -> list of active lock ranges
std::map<std::string, std::vector<LockRange>> activeLocks;
};
static SharedLockState& getSharedState() {
static SharedLockState state;
return state;
}
std::string filePath; std::string filePath;
uint64_t offset; uint64_t offset;
@@ -46,183 +49,79 @@ private:
Mode mode; Mode mode;
bool locked = false; bool locked = false;
#ifdef _WIN32 // Check if acquiring this lock would conflict with existing locks
// NOTE: this span appears to be a side-by-side diff rendering; on fused lines the removed text comes first and the added text follows it.
// Removed (left): getOrOpenHandle(path) returns the HANDLE cached for `path` in the thread_local
//                 handleCache; otherwise it opens the existing file with CreateFileA for read/write
//                 with shared read/write access, stores the handle in the cache and returns it.
//                 Throws std::runtime_error when CreateFileA yields INVALID_HANDLE_VALUE.
// Added (right):  wouldConflict(existingLocks) returns true as soon as one existing range overlaps
//                 this object's [offset, offset + length) and either side is Exclusive; overlapping
//                 Shared/Shared pairs are ignored. Returns false when nothing conflicts.
// Thread-local HANDLE cache for Windows bool wouldConflict(const std::vector<LockRange>& existingLocks) const {
static thread_local std::unordered_map<std::string, HANDLE> handleCache; for (const auto& existing : existingLocks) {
if (existing.overlaps(offset, length)) {
static HANDLE getOrOpenHandle(const std::string& path) { // Conflict if either lock is exclusive
auto it = handleCache.find(path); if (mode == Mode::Exclusive || existing.mode == Mode::Exclusive) {
if (it != handleCache.end()) { return true;
return it->second; }
// Shared locks don't conflict with each other
}
} }
return false;
HANDLE handle = CreateFileA(
path.c_str(),
GENERIC_READ | GENERIC_WRITE,
FILE_SHARE_READ | FILE_SHARE_WRITE,
NULL,
OPEN_EXISTING,
FILE_ATTRIBUTE_NORMAL,
NULL
);
if (handle == INVALID_HANDLE_VALUE) {
throw std::runtime_error("Failed to open file for locking: " + path);
}
handleCache[path] = handle;
return handle;
} }
// NOTE: this span appears to be a side-by-side diff rendering; on fused lines the removed text comes first and the added text follows it.
// Removed (left): Windows acquire() followed by releaseInternal().
//   - acquire() looks the {filePath, offset, length} key up in activeLocks. A Shared entry that is
//     being requested as Exclusive is released first (then Sleep(1)); any other existing entry just
//     sets `locked` and returns. Otherwise it calls LockFileEx without LOCKFILE_FAIL_IMMEDIATELY,
//     throws std::runtime_error on failure, and records the mode in activeLocks.
//   - releaseInternal() calls UnlockFileEx for the same range when `locked` is set and clears the
//     flag; the result of UnlockFileEx is not checked.
// Added (right): acquire() takes the shared mutex, waits on the condition variable until
//   wouldConflict() is false for the ranges recorded under filePath (or none are recorded), then
//   appends {offset, length, mode} to that list and sets `locked`.
// NOTE(review): the wait predicate inspects every recorded range, including ranges the calling
//   thread added through another instance, and the added code has no shortcut for that case. A
//   nested conflicting request from the same thread would wait on its own entry; confirm callers
//   never nest overlapping locks.
void acquire() { void acquire() {
LockKey key{filePath, offset, length}; auto& state = getSharedState();
std::unique_lock<std::mutex> lock(state.mutex);
// Check if we already hold a lock on this region // Wait until no conflicts
auto it = activeLocks.find(key); state.cv.wait(lock, [&]() {
if (it != activeLocks.end()) { auto it = state.activeLocks.find(filePath);
// If we're trying to upgrade from shared to exclusive, release first if (it == state.activeLocks.end()) {
if (it->second == Mode::Shared && mode == Mode::Exclusive) { return true; // No locks on this file yet
releaseInternal(); // Release the old shared lock
activeLocks.erase(it);
// Small delay to allow OS to process the unlock before re-locking
// This prevents deadlock when multiple threads upgrade simultaneously
Sleep(1);
} else {
// Already hold compatible or same lock
locked = true;
return;
} }
} return !wouldConflict(it->second);
});
HANDLE handle = getOrOpenHandle(filePath); // Add our lock
OVERLAPPED overlapped = {}; // Proper zero-initialization state.activeLocks[filePath].push_back({offset, length, mode});
overlapped.Offset = static_cast<DWORD>(offset & 0xFFFFFFFF);
overlapped.OffsetHigh = static_cast<DWORD>(offset >> 32);
DWORD length_low = static_cast<DWORD>(length & 0xFFFFFFFF);
DWORD length_high = static_cast<DWORD>(length >> 32);
DWORD flags = (mode == Mode::Exclusive) ? LOCKFILE_EXCLUSIVE_LOCK : 0;
if (!LockFileEx(handle, flags, 0, length_low, length_high, &overlapped)) {
throw std::runtime_error("Failed to acquire file lock");
}
locked = true; locked = true;
activeLocks[key] = mode;
}
void releaseInternal() {
if (locked) {
HANDLE handle = getOrOpenHandle(filePath);
OVERLAPPED overlapped = {};
overlapped.Offset = static_cast<DWORD>(offset & 0xFFFFFFFF);
overlapped.OffsetHigh = static_cast<DWORD>(offset >> 32);
DWORD length_low = static_cast<DWORD>(length & 0xFFFFFFFF);
DWORD length_high = static_cast<DWORD>(length >> 32);
UnlockFileEx(handle, 0, length_low, length_high, &overlapped);
locked = false;
}
} }
// NOTE: this span appears to be a side-by-side diff rendering; on fused lines the removed text comes first and the added text follows it.
// Removed (left): Windows release(), then the POSIX branch (#else): the thread_local fdCache,
//   getOrOpenFD(path) (returns the cached descriptor or open()s the file O_RDWR, throwing
//   std::runtime_error on failure), and POSIX acquire(), which blocks in fcntl(F_SETLKW) and
//   throws std::runtime_error when that call fails.
// Added (right): release() returns immediately when `locked` is false. Otherwise it takes the
//   shared mutex, erases the matching range(s) from the list recorded under filePath, drops the
//   map entry when that list becomes empty, clears `locked` and calls notify_all() on the
//   condition variable.
// NOTE(review): the erase/remove_if pair removes EVERY entry whose offset, length and mode match,
//   not just one. If two instances hold identical Shared ranges, the first release() also removes
//   the other holder's entry, after which a conflicting Exclusive request would no longer wait.
//   Erasing only the first match would avoid that; confirm the intended semantics.
void release() { void release() {
if (locked) { if (!locked) return;
LockKey key{filePath, offset, length};
releaseInternal();
activeLocks.erase(key);
}
}
#else
// Thread-local FD cache - each thread has its own FD per file
static thread_local std::unordered_map<std::string, int> fdCache;
static int getOrOpenFD(const std::string& path) { auto& state = getSharedState();
auto it = fdCache.find(path); std::unique_lock<std::mutex> lock(state.mutex);
if (it != fdCache.end()) {
return it->second;
}
int fd = open(path.c_str(), O_RDWR); auto it = state.activeLocks.find(filePath);
if (fd == -1) { if (it != state.activeLocks.end()) {
throw std::runtime_error("Failed to open file for locking: " + path); auto& locks = it->second;
}
fdCache[path] = fd; // Remove our lock (find by offset/length/mode match)
return fd; locks.erase(
} std::remove_if(locks.begin(), locks.end(), [&](const LockRange& r) {
return r.offset == offset && r.length == length && r.mode == mode;
}),
locks.end()
);
void acquire() { // Clean up if no more locks on this file
LockKey key{filePath, offset, length}; if (locks.empty()) {
state.activeLocks.erase(it);
// Check if we already hold a lock on this region
auto it = activeLocks.find(key);
if (it != activeLocks.end()) {
// If we're trying to upgrade from shared to exclusive, release first
if (it->second == Mode::Shared && mode == Mode::Exclusive) {
releaseInternal(); // Release the old shared lock
activeLocks.erase(it);
} else {
// Already hold compatible or same lock
locked = true;
return;
} }
} }
int fd = getOrOpenFD(filePath); locked = false;
struct flock lock_info; // Notify waiting threads
lock_info.l_type = (mode == Mode::Exclusive) ? F_WRLCK : F_RDLCK; state.cv.notify_all();
lock_info.l_whence = SEEK_SET;
lock_info.l_start = offset;
lock_info.l_len = length;
lock_info.l_pid = 0;
if (fcntl(fd, F_SETLKW, &lock_info) == -1) {
throw std::runtime_error("Failed to acquire file lock");
}
locked = true;
activeLocks[key] = mode;
} }
// Issues an F_UNLCK request for the byte range [offset, offset + length) on the
// descriptor returned by getOrOpenFD(filePath) and clears the `locked` flag.
// Does nothing when this object is not marked as locked.
void releaseInternal() {
    if (!locked) {
        return;
    }

    const int fd = getOrOpenFD(filePath);

    // Value-initialise the request so that any members of struct flock beyond
    // the five assigned below start out as zero instead of indeterminate.
    struct flock lock_info = {};
    lock_info.l_type = F_UNLCK;
    lock_info.l_whence = SEEK_SET;
    lock_info.l_start = static_cast<off_t>(offset);
    lock_info.l_len = static_cast<off_t>(length);
    lock_info.l_pid = 0;

    // Best effort: the result of the unlock request is deliberately not checked,
    // and the flag is cleared either way.
    fcntl(fd, F_SETLK, &lock_info);
    locked = false;
}
void release() {
if (locked) {
LockKey key{filePath, offset, length};
releaseInternal();
activeLocks.erase(key);
}
}
#endif
public: public:
// NOTE: this span appears to be a side-by-side diff rendering; both columns carry the same constructor and destructor text.
// The constructor only stores the path, byte offset, byte length and mode; it does not acquire anything.
// The destructor calls release(), so a lock still held when the object dies is given up.
// Constructor accepts offset/length for byte-range locking
SweepstoreFileLock(const std::string& path, uint64_t off, uint64_t len, Mode m) SweepstoreFileLock(const std::string& path, uint64_t off, uint64_t len, Mode m)
: filePath(path), offset(off), length(len), mode(m) {} : filePath(path), offset(off), length(len), mode(m) {}
~SweepstoreFileLock() { release(); } ~SweepstoreFileLock() { release(); }
// NOTE: this span appears to be a side-by-side diff rendering; the SWEEPSTORE_TIME_FUNCTION() line exists only in the removed column.
// Calls acquire() unless this object is already marked as locked, so repeated lock() calls on one instance acquire only once.
void lock() { void lock() {
SWEEPSTORE_TIME_FUNCTION();
if (!locked) acquire(); if (!locked) acquire();
} }
// NOTE: this span appears to be a side-by-side diff rendering; the SWEEPSTORE_TIME_FUNCTION() line exists only in the removed column.
// Forwards to release() unconditionally.
void unlock() { void unlock() {
SWEEPSTORE_TIME_FUNCTION();
release(); release();
} }
@@ -230,59 +129,16 @@ public:
return locked; return locked;
} }
// Non-blocking probe: reports whether a lock of this object's mode on the byte
// range [offset, offset + length) is currently refused.
// Returns true when the range cannot be locked right now (or the probe call
// itself fails), false when it could be locked.
bool isLocked() {
#ifdef _WIN32
    HANDLE handle = getOrOpenHandle(filePath);

    OVERLAPPED overlapped = {};
    overlapped.Offset = static_cast<DWORD>(offset & 0xFFFFFFFF);
    overlapped.OffsetHigh = static_cast<DWORD>(offset >> 32);

    const DWORD length_low = static_cast<DWORD>(length & 0xFFFFFFFF);
    const DWORD length_high = static_cast<DWORD>(length >> 32);

    DWORD flags = (mode == Mode::Exclusive) ? LOCKFILE_EXCLUSIVE_LOCK : 0;
    flags |= LOCKFILE_FAIL_IMMEDIATELY;

    // Probe by attempting a fail-immediately lock; a refusal means "locked".
    if (!LockFileEx(handle, flags, 0, length_low, length_high, &overlapped)) {
        return true;
    }

    // The probe succeeded, so give the range back straight away.
    UnlockFileEx(handle, 0, length_low, length_high, &overlapped);
    return false;
#else
    const int fd = getOrOpenFD(filePath);

    struct flock lock_info = {};
    lock_info.l_type = (mode == Mode::Exclusive) ? F_WRLCK : F_RDLCK;
    lock_info.l_whence = SEEK_SET;
    lock_info.l_start = static_cast<off_t>(offset);
    lock_info.l_len = static_cast<off_t>(length);
    lock_info.l_pid = 0;

    // Use the query-only F_GETLK instead of taking the lock with F_SETLK and
    // unlocking again: the unlock step would also drop a lock this process
    // already holds on the range, because fcntl() record locks belong to the
    // process. F_GETLK leaves existing locks untouched.
    if (fcntl(fd, F_GETLK, &lock_info) == -1) {
        return true; // A failed probe is reported as "locked", as before.
    }

    // F_GETLK rewrites l_type to F_UNLCK when nothing would block the request.
    return lock_info.l_type != F_UNLCK;
#endif
}
// RAII helper for scoped locking // RAII helper for scoped locking
class Scoped { class Scoped {
SweepstoreFileLock& lock; SweepstoreFileLock& lockRef;
public: public:
Scoped(SweepstoreFileLock& l) : lock(l) { Scoped(SweepstoreFileLock& l) : lockRef(l) {
lock.lock(); lockRef.lock();
} }
~Scoped() { ~Scoped() {
lock.unlock(); lockRef.unlock();
} }
Scoped(const Scoped&) = delete; Scoped(const Scoped&) = delete;
@@ -292,4 +148,4 @@ public:
// Disable copying // Disable copying
SweepstoreFileLock(const SweepstoreFileLock&) = delete; SweepstoreFileLock(const SweepstoreFileLock&) = delete;
SweepstoreFileLock& operator=(const SweepstoreFileLock&) = delete; SweepstoreFileLock& operator=(const SweepstoreFileLock&) = delete;
}; };