diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index b660dab..536bbb1 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -20,6 +20,7 @@ add_executable(main src/Public/sweepstore/concurrency.h src/Private/sweepstore/concurrency.cpp src/Public/sweepstore/utils/file_lock.h + src/Private/sweepstore/utils/file_lock.cpp src/Private/sweepstore/utils/fd_pool.cpp src/Public/sweepstore/utils/file_handle.h src/Private/sweepstore/utils/file_handle.cpp diff --git a/cpp/src/Private/sweepstore/header.cpp b/cpp/src/Private/sweepstore/header.cpp index 654e31c..4595b7b 100644 --- a/cpp/src/Private/sweepstore/header.cpp +++ b/cpp/src/Private/sweepstore/header.cpp @@ -165,7 +165,8 @@ void SweepstoreConcurrencyHeader::initialise(int concurrentWorkers) { void SweepstoreWorkerTicket::write(SweepstoreWorkerTicketSnapshot &snapshot) { RandomAccessMemory buffer; - uint64_t offset = getOffset(); + SweepstoreFileLock lock(file.getPath(), 0, 0, SweepstoreFileLock::Mode::Exclusive); + SweepstoreFileLock::Scoped scopedLock(lock); buffer.setPositionSync(0); buffer.writeIntSync(snapshot.identifier, 4); @@ -187,20 +188,23 @@ void SweepstoreWorkerTicket::write(SweepstoreWorkerTicketSnapshot &snapshot) { char* dataPtr = reinterpret_cast(data.data()); // Write to file - file.writeSeek(offset); + file.writeSeek(getOffset()); file.writeBytes(dataPtr, data.size()); file.flush(); } bool SweepstoreWorkerTicket::writable() { - return true; + SweepstoreFileLock lock(file.getPath(), 0, 0, SweepstoreFileLock::Mode::Exclusive); + return lock.isLocked() == false; } SweepstoreWorkerTicketSnapshot SweepstoreWorkerTicket::snapshot() { - uint64_t offset = getOffset(); - file.readSeek(offset); + SweepstoreFileLock lock(file.getPath(), 0, 0, SweepstoreFileLock::Mode::Shared); + lock.lock(); + file.readSeek(getOffset()); std::unique_ptr buffer(new char[TICKET_SIZE]); file.readBytes(buffer.get(), TICKET_SIZE); + lock.unlock(); RandomAccessMemory ram(reinterpret_cast(buffer.get()), TICKET_SIZE); SweepstoreWorkerTicketSnapshot snapshot; diff --git a/cpp/src/Private/sweepstore/utils/file_handle.cpp b/cpp/src/Private/sweepstore/utils/file_handle.cpp index 20d9840..89b3d18 100644 --- a/cpp/src/Private/sweepstore/utils/file_handle.cpp +++ b/cpp/src/Private/sweepstore/utils/file_handle.cpp @@ -86,10 +86,27 @@ void SweepstoreFileHandle::flush() { unrealHandle->Flush(); } #else + // Windows-specific implementation for guaranteed flush to disk auto& stream = getThreadStream(); stream.flush(); - // On Windows, also sync to ensure data hits disk - stream.sync(); + + // On Windows, also call sync to push to OS buffers + // Then open a Windows HANDLE to the same file and call FlushFileBuffers + // This is more reliable than trying to extract the HANDLE from fstream + HANDLE h = CreateFileA( + path.c_str(), + GENERIC_WRITE, + FILE_SHARE_READ | FILE_SHARE_WRITE, + NULL, + OPEN_EXISTING, + FILE_ATTRIBUTE_NORMAL, + NULL + ); + + if (h != INVALID_HANDLE_VALUE) { + FlushFileBuffers(h); + CloseHandle(h); + } #endif } @@ -106,15 +123,12 @@ void SweepstoreFileHandle::readSeek(std::streampos pos, std::ios::seekdir dir) { unrealHandle->SeekFromEnd(unrealPos); } #else - // Windows + // Windows - simplified to only seek read pointer auto& stream = getThreadStream(); - // On Windows, flush and sync to disk, then invalidate buffers - stream.flush(); - stream.sync(); - stream.clear(); - // Sync both pointers to same position - stream.seekp(pos, dir); stream.seekg(pos, dir); + if (stream.fail()) { + stream.clear(); + } #endif } @@ -124,11 +138,12 @@ void SweepstoreFileHandle::writeSeek(std::streampos pos, std::ios::seekdir dir) // Same as readSeek for Unreal readSeek(pos, dir); #else - // Windows + // Windows - simplified to only seek write pointer auto& stream = getThreadStream(); - stream.flush(); - stream.sync(); stream.seekp(pos, dir); + if (stream.fail()) { + stream.clear(); + } #endif } @@ -140,10 +155,6 @@ void SweepstoreFileHandle::readBytes(char* buffer, std::streamsize size) { // Windows auto& stream = getThreadStream(); stream.read(buffer, size); - // Check for read errors on Windows - if (stream.fail() && !stream.eof()) { - stream.clear(); - } #endif } @@ -156,10 +167,6 @@ void SweepstoreFileHandle::writeBytes(const char* buffer, std::streamsize size) // Windows auto& stream = getThreadStream(); stream.write(buffer, size); - // Check for write errors on Windows - if (stream.fail()) { - stream.clear(); - } #endif } #endif // _WIN32 || WITH_UNREAL \ No newline at end of file diff --git a/cpp/src/Public/sweepstore/utils/file_lock.h b/cpp/src/Public/sweepstore/utils/file_lock.h index 3e1a52e..615eb2f 100644 --- a/cpp/src/Public/sweepstore/utils/file_lock.h +++ b/cpp/src/Public/sweepstore/utils/file_lock.h @@ -22,6 +22,8 @@ public: private: std::string filePath; + uint64_t offset; + uint64_t length; Mode mode; bool locked = false; @@ -55,11 +57,15 @@ private: void acquire() { HANDLE handle = getOrOpenHandle(filePath); - OVERLAPPED overlapped = {0}; - DWORD flags = (mode == Mode::Exclusive) ? LOCKFILE_EXCLUSIVE_LOCK : 0; - flags |= LOCKFILE_FAIL_IMMEDIATELY; + OVERLAPPED overlapped = {}; // Proper zero-initialization + overlapped.Offset = static_cast(offset & 0xFFFFFFFF); + overlapped.OffsetHigh = static_cast(offset >> 32); - if (!LockFileEx(handle, flags, 0, MAXDWORD, MAXDWORD, &overlapped)) { + DWORD length_low = static_cast(length & 0xFFFFFFFF); + DWORD length_high = static_cast(length >> 32); + DWORD flags = (mode == Mode::Exclusive) ? LOCKFILE_EXCLUSIVE_LOCK : 0; + + if (!LockFileEx(handle, flags, 0, length_low, length_high, &overlapped)) { throw std::runtime_error("Failed to acquire file lock"); } locked = true; @@ -68,8 +74,14 @@ private: void release() { if (locked) { HANDLE handle = getOrOpenHandle(filePath); - OVERLAPPED overlapped = {0}; - UnlockFileEx(handle, 0, MAXDWORD, MAXDWORD, &overlapped); + OVERLAPPED overlapped = {}; + overlapped.Offset = static_cast(offset & 0xFFFFFFFF); + overlapped.OffsetHigh = static_cast(offset >> 32); + + DWORD length_low = static_cast(length & 0xFFFFFFFF); + DWORD length_high = static_cast(length >> 32); + + UnlockFileEx(handle, 0, length_low, length_high, &overlapped); locked = false; } } @@ -94,9 +106,15 @@ private: void acquire() { int fd = getOrOpenFD(filePath); - int operation = (mode == Mode::Exclusive) ? LOCK_EX : LOCK_SH; - if (flock(fd, operation) == -1) { + struct flock lock_info; + lock_info.l_type = (mode == Mode::Exclusive) ? F_WRLCK : F_RDLCK; + lock_info.l_whence = SEEK_SET; + lock_info.l_start = offset; + lock_info.l_len = length; + lock_info.l_pid = 0; + + if (fcntl(fd, F_SETLKW, &lock_info) == -1) { throw std::runtime_error("Failed to acquire file lock"); } locked = true; @@ -105,16 +123,24 @@ private: void release() { if (locked) { int fd = getOrOpenFD(filePath); - flock(fd, LOCK_UN); + + struct flock lock_info; + lock_info.l_type = F_UNLCK; + lock_info.l_whence = SEEK_SET; + lock_info.l_start = offset; + lock_info.l_len = length; + lock_info.l_pid = 0; + + fcntl(fd, F_SETLK, &lock_info); locked = false; } } #endif public: - // Constructor accepts offset/length for API compatibility (unused with flock) - SweepstoreFileLock(const std::string& path, uint64_t, uint64_t, Mode m) - : filePath(path), mode(m) {} + // Constructor accepts offset/length for byte-range locking + SweepstoreFileLock(const std::string& path, uint64_t off, uint64_t len, Mode m) + : filePath(path), offset(off), length(len), mode(m) {} ~SweepstoreFileLock() { release(); } @@ -131,32 +157,44 @@ public: } // Check if file is currently locked (non-blocking test) - bool isLocked() const { + bool isLocked() { #ifdef _WIN32 HANDLE handle = getOrOpenHandle(filePath); - OVERLAPPED overlapped = {0}; + OVERLAPPED overlapped = {}; + overlapped.Offset = static_cast(offset & 0xFFFFFFFF); + overlapped.OffsetHigh = static_cast(offset >> 32); + + DWORD length_low = static_cast(length & 0xFFFFFFFF); + DWORD length_high = static_cast(length >> 32); DWORD flags = (mode == Mode::Exclusive) ? LOCKFILE_EXCLUSIVE_LOCK : 0; flags |= LOCKFILE_FAIL_IMMEDIATELY; // Try non-blocking lock - if (!LockFileEx(handle, flags, 0, MAXDWORD, MAXDWORD, &overlapped)) { + if (!LockFileEx(handle, flags, 0, length_low, length_high, &overlapped)) { return true; // Already locked } // Got the lock, release immediately - UnlockFileEx(handle, 0, MAXDWORD, MAXDWORD, &overlapped); + UnlockFileEx(handle, 0, length_low, length_high, &overlapped); return false; #else int fd = getOrOpenFD(filePath); - int operation = (mode == Mode::Exclusive) ? LOCK_EX : LOCK_SH; + + struct flock lock_info; + lock_info.l_type = (mode == Mode::Exclusive) ? F_WRLCK : F_RDLCK; + lock_info.l_whence = SEEK_SET; + lock_info.l_start = offset; + lock_info.l_len = length; + lock_info.l_pid = 0; // Try non-blocking lock - if (flock(fd, operation | LOCK_NB) == -1) { + if (fcntl(fd, F_SETLK, &lock_info) == -1) { return true; // Already locked } // Got the lock, release immediately - flock(fd, LOCK_UN); + lock_info.l_type = F_UNLCK; + fcntl(fd, F_SETLK, &lock_info); return false; #endif }