Fixed windows deadlocks, performance is shit tho

This commit is contained in:
ImBenji
2025-12-05 10:19:32 +00:00
parent 4227b1db75
commit bec317745d
4 changed files with 94 additions and 44 deletions

View File

@@ -20,6 +20,7 @@ add_executable(main
src/Public/sweepstore/concurrency.h src/Public/sweepstore/concurrency.h
src/Private/sweepstore/concurrency.cpp src/Private/sweepstore/concurrency.cpp
src/Public/sweepstore/utils/file_lock.h src/Public/sweepstore/utils/file_lock.h
src/Private/sweepstore/utils/file_lock.cpp
src/Private/sweepstore/utils/fd_pool.cpp src/Private/sweepstore/utils/fd_pool.cpp
src/Public/sweepstore/utils/file_handle.h src/Public/sweepstore/utils/file_handle.h
src/Private/sweepstore/utils/file_handle.cpp src/Private/sweepstore/utils/file_handle.cpp

View File

@@ -165,7 +165,8 @@ void SweepstoreConcurrencyHeader::initialise(int concurrentWorkers) {
void SweepstoreWorkerTicket::write(SweepstoreWorkerTicketSnapshot &snapshot) { void SweepstoreWorkerTicket::write(SweepstoreWorkerTicketSnapshot &snapshot) {
RandomAccessMemory buffer; RandomAccessMemory buffer;
uint64_t offset = getOffset(); SweepstoreFileLock lock(file.getPath(), 0, 0, SweepstoreFileLock::Mode::Exclusive);
SweepstoreFileLock::Scoped scopedLock(lock);
buffer.setPositionSync(0); buffer.setPositionSync(0);
buffer.writeIntSync(snapshot.identifier, 4); buffer.writeIntSync(snapshot.identifier, 4);
@@ -187,20 +188,23 @@ void SweepstoreWorkerTicket::write(SweepstoreWorkerTicketSnapshot &snapshot) {
char* dataPtr = reinterpret_cast<char*>(data.data()); char* dataPtr = reinterpret_cast<char*>(data.data());
// Write to file // Write to file
file.writeSeek(offset); file.writeSeek(getOffset());
file.writeBytes(dataPtr, data.size()); file.writeBytes(dataPtr, data.size());
file.flush(); file.flush();
} }
bool SweepstoreWorkerTicket::writable() { bool SweepstoreWorkerTicket::writable() {
return true; SweepstoreFileLock lock(file.getPath(), 0, 0, SweepstoreFileLock::Mode::Exclusive);
return lock.isLocked() == false;
} }
SweepstoreWorkerTicketSnapshot SweepstoreWorkerTicket::snapshot() { SweepstoreWorkerTicketSnapshot SweepstoreWorkerTicket::snapshot() {
uint64_t offset = getOffset(); SweepstoreFileLock lock(file.getPath(), 0, 0, SweepstoreFileLock::Mode::Shared);
file.readSeek(offset); lock.lock();
file.readSeek(getOffset());
std::unique_ptr<char[]> buffer(new char[TICKET_SIZE]); std::unique_ptr<char[]> buffer(new char[TICKET_SIZE]);
file.readBytes(buffer.get(), TICKET_SIZE); file.readBytes(buffer.get(), TICKET_SIZE);
lock.unlock();
RandomAccessMemory ram(reinterpret_cast<uint8_t*>(buffer.get()), TICKET_SIZE); RandomAccessMemory ram(reinterpret_cast<uint8_t*>(buffer.get()), TICKET_SIZE);
SweepstoreWorkerTicketSnapshot snapshot; SweepstoreWorkerTicketSnapshot snapshot;

View File

@@ -86,10 +86,27 @@ void SweepstoreFileHandle::flush() {
unrealHandle->Flush(); unrealHandle->Flush();
} }
#else #else
// Windows-specific implementation for guaranteed flush to disk
auto& stream = getThreadStream(); auto& stream = getThreadStream();
stream.flush(); stream.flush();
// On Windows, also sync to ensure data hits disk
stream.sync(); // On Windows, also call sync to push to OS buffers
// Then open a Windows HANDLE to the same file and call FlushFileBuffers
// This is more reliable than trying to extract the HANDLE from fstream
HANDLE h = CreateFileA(
path.c_str(),
GENERIC_WRITE,
FILE_SHARE_READ | FILE_SHARE_WRITE,
NULL,
OPEN_EXISTING,
FILE_ATTRIBUTE_NORMAL,
NULL
);
if (h != INVALID_HANDLE_VALUE) {
FlushFileBuffers(h);
CloseHandle(h);
}
#endif #endif
} }
@@ -106,15 +123,12 @@ void SweepstoreFileHandle::readSeek(std::streampos pos, std::ios::seekdir dir) {
unrealHandle->SeekFromEnd(unrealPos); unrealHandle->SeekFromEnd(unrealPos);
} }
#else #else
// Windows // Windows - simplified to only seek read pointer
auto& stream = getThreadStream(); auto& stream = getThreadStream();
// On Windows, flush and sync to disk, then invalidate buffers
stream.flush();
stream.sync();
stream.clear();
// Sync both pointers to same position
stream.seekp(pos, dir);
stream.seekg(pos, dir); stream.seekg(pos, dir);
if (stream.fail()) {
stream.clear();
}
#endif #endif
} }
@@ -124,11 +138,12 @@ void SweepstoreFileHandle::writeSeek(std::streampos pos, std::ios::seekdir dir)
// Same as readSeek for Unreal // Same as readSeek for Unreal
readSeek(pos, dir); readSeek(pos, dir);
#else #else
// Windows // Windows - simplified to only seek write pointer
auto& stream = getThreadStream(); auto& stream = getThreadStream();
stream.flush();
stream.sync();
stream.seekp(pos, dir); stream.seekp(pos, dir);
if (stream.fail()) {
stream.clear();
}
#endif #endif
} }
@@ -140,10 +155,6 @@ void SweepstoreFileHandle::readBytes(char* buffer, std::streamsize size) {
// Windows // Windows
auto& stream = getThreadStream(); auto& stream = getThreadStream();
stream.read(buffer, size); stream.read(buffer, size);
// Check for read errors on Windows
if (stream.fail() && !stream.eof()) {
stream.clear();
}
#endif #endif
} }
@@ -156,10 +167,6 @@ void SweepstoreFileHandle::writeBytes(const char* buffer, std::streamsize size)
// Windows // Windows
auto& stream = getThreadStream(); auto& stream = getThreadStream();
stream.write(buffer, size); stream.write(buffer, size);
// Check for write errors on Windows
if (stream.fail()) {
stream.clear();
}
#endif #endif
} }
#endif // _WIN32 || WITH_UNREAL #endif // _WIN32 || WITH_UNREAL

View File

@@ -22,6 +22,8 @@ public:
private: private:
std::string filePath; std::string filePath;
uint64_t offset;
uint64_t length;
Mode mode; Mode mode;
bool locked = false; bool locked = false;
@@ -55,11 +57,15 @@ private:
void acquire() { void acquire() {
HANDLE handle = getOrOpenHandle(filePath); HANDLE handle = getOrOpenHandle(filePath);
OVERLAPPED overlapped = {0}; OVERLAPPED overlapped = {}; // Proper zero-initialization
DWORD flags = (mode == Mode::Exclusive) ? LOCKFILE_EXCLUSIVE_LOCK : 0; overlapped.Offset = static_cast<DWORD>(offset & 0xFFFFFFFF);
flags |= LOCKFILE_FAIL_IMMEDIATELY; overlapped.OffsetHigh = static_cast<DWORD>(offset >> 32);
if (!LockFileEx(handle, flags, 0, MAXDWORD, MAXDWORD, &overlapped)) { DWORD length_low = static_cast<DWORD>(length & 0xFFFFFFFF);
DWORD length_high = static_cast<DWORD>(length >> 32);
DWORD flags = (mode == Mode::Exclusive) ? LOCKFILE_EXCLUSIVE_LOCK : 0;
if (!LockFileEx(handle, flags, 0, length_low, length_high, &overlapped)) {
throw std::runtime_error("Failed to acquire file lock"); throw std::runtime_error("Failed to acquire file lock");
} }
locked = true; locked = true;
@@ -68,8 +74,14 @@ private:
void release() { void release() {
if (locked) { if (locked) {
HANDLE handle = getOrOpenHandle(filePath); HANDLE handle = getOrOpenHandle(filePath);
OVERLAPPED overlapped = {0}; OVERLAPPED overlapped = {};
UnlockFileEx(handle, 0, MAXDWORD, MAXDWORD, &overlapped); overlapped.Offset = static_cast<DWORD>(offset & 0xFFFFFFFF);
overlapped.OffsetHigh = static_cast<DWORD>(offset >> 32);
DWORD length_low = static_cast<DWORD>(length & 0xFFFFFFFF);
DWORD length_high = static_cast<DWORD>(length >> 32);
UnlockFileEx(handle, 0, length_low, length_high, &overlapped);
locked = false; locked = false;
} }
} }
@@ -94,9 +106,15 @@ private:
void acquire() { void acquire() {
int fd = getOrOpenFD(filePath); int fd = getOrOpenFD(filePath);
int operation = (mode == Mode::Exclusive) ? LOCK_EX : LOCK_SH;
if (flock(fd, operation) == -1) { struct flock lock_info;
lock_info.l_type = (mode == Mode::Exclusive) ? F_WRLCK : F_RDLCK;
lock_info.l_whence = SEEK_SET;
lock_info.l_start = offset;
lock_info.l_len = length;
lock_info.l_pid = 0;
if (fcntl(fd, F_SETLKW, &lock_info) == -1) {
throw std::runtime_error("Failed to acquire file lock"); throw std::runtime_error("Failed to acquire file lock");
} }
locked = true; locked = true;
@@ -105,16 +123,24 @@ private:
void release() { void release() {
if (locked) { if (locked) {
int fd = getOrOpenFD(filePath); int fd = getOrOpenFD(filePath);
flock(fd, LOCK_UN);
struct flock lock_info;
lock_info.l_type = F_UNLCK;
lock_info.l_whence = SEEK_SET;
lock_info.l_start = offset;
lock_info.l_len = length;
lock_info.l_pid = 0;
fcntl(fd, F_SETLK, &lock_info);
locked = false; locked = false;
} }
} }
#endif #endif
public: public:
// Constructor accepts offset/length for API compatibility (unused with flock) // Constructor accepts offset/length for byte-range locking
SweepstoreFileLock(const std::string& path, uint64_t, uint64_t, Mode m) SweepstoreFileLock(const std::string& path, uint64_t off, uint64_t len, Mode m)
: filePath(path), mode(m) {} : filePath(path), offset(off), length(len), mode(m) {}
~SweepstoreFileLock() { release(); } ~SweepstoreFileLock() { release(); }
@@ -131,32 +157,44 @@ public:
} }
// Check if file is currently locked (non-blocking test) // Check if file is currently locked (non-blocking test)
bool isLocked() const { bool isLocked() {
#ifdef _WIN32 #ifdef _WIN32
HANDLE handle = getOrOpenHandle(filePath); HANDLE handle = getOrOpenHandle(filePath);
OVERLAPPED overlapped = {0}; OVERLAPPED overlapped = {};
overlapped.Offset = static_cast<DWORD>(offset & 0xFFFFFFFF);
overlapped.OffsetHigh = static_cast<DWORD>(offset >> 32);
DWORD length_low = static_cast<DWORD>(length & 0xFFFFFFFF);
DWORD length_high = static_cast<DWORD>(length >> 32);
DWORD flags = (mode == Mode::Exclusive) ? LOCKFILE_EXCLUSIVE_LOCK : 0; DWORD flags = (mode == Mode::Exclusive) ? LOCKFILE_EXCLUSIVE_LOCK : 0;
flags |= LOCKFILE_FAIL_IMMEDIATELY; flags |= LOCKFILE_FAIL_IMMEDIATELY;
// Try non-blocking lock // Try non-blocking lock
if (!LockFileEx(handle, flags, 0, MAXDWORD, MAXDWORD, &overlapped)) { if (!LockFileEx(handle, flags, 0, length_low, length_high, &overlapped)) {
return true; // Already locked return true; // Already locked
} }
// Got the lock, release immediately // Got the lock, release immediately
UnlockFileEx(handle, 0, MAXDWORD, MAXDWORD, &overlapped); UnlockFileEx(handle, 0, length_low, length_high, &overlapped);
return false; return false;
#else #else
int fd = getOrOpenFD(filePath); int fd = getOrOpenFD(filePath);
int operation = (mode == Mode::Exclusive) ? LOCK_EX : LOCK_SH;
struct flock lock_info;
lock_info.l_type = (mode == Mode::Exclusive) ? F_WRLCK : F_RDLCK;
lock_info.l_whence = SEEK_SET;
lock_info.l_start = offset;
lock_info.l_len = length;
lock_info.l_pid = 0;
// Try non-blocking lock // Try non-blocking lock
if (flock(fd, operation | LOCK_NB) == -1) { if (fcntl(fd, F_SETLK, &lock_info) == -1) {
return true; // Already locked return true; // Already locked
} }
// Got the lock, release immediately // Got the lock, release immediately
flock(fd, LOCK_UN); lock_info.l_type = F_UNLCK;
fcntl(fd, F_SETLK, &lock_info);
return false; return false;
#endif #endif
} }