From eae4d0e24e1916f142216f83adc613bd37746d6c Mon Sep 17 00:00:00 2001 From: ImBenji Date: Tue, 2 Dec 2025 14:11:45 +0000 Subject: [PATCH] Add concurrency handling implementation with ticket management and file locking --- cpp/CMakeLists.txt | 22 +- cpp/binary_table.cpp | 1243 ----------------- cpp/binary_table.h | 305 ---- cpp/src/Private/sweepstore/concurrency.cpp | 248 ++++ cpp/src/Private/sweepstore/header.cpp | 220 +++ cpp/src/Private/sweepstore/structures.cpp | 13 + cpp/src/Private/sweepstore/sweepstore.cpp | 21 + cpp/src/Public/sweepstore/concurrency.h | 30 + cpp/src/Public/sweepstore/header.h | 143 ++ cpp/src/Public/sweepstore/structures.h | 39 + cpp/src/Public/sweepstore/sweepstore.h | 67 + cpp/src/Public/sweepstore/utils/file_handle.h | 49 + cpp/src/Public/sweepstore/utils/file_lock.h | 169 +++ cpp/src/Public/sweepstore/utils/helpers.h | 381 +++++ dart/lib/sweepstore.dart | 6 +- example.bin | Bin 646 -> 1072 bytes 16 files changed, 1405 insertions(+), 1551 deletions(-) delete mode 100644 cpp/binary_table.cpp delete mode 100644 cpp/binary_table.h create mode 100644 cpp/src/Private/sweepstore/concurrency.cpp create mode 100644 cpp/src/Private/sweepstore/header.cpp create mode 100644 cpp/src/Private/sweepstore/structures.cpp create mode 100644 cpp/src/Private/sweepstore/sweepstore.cpp create mode 100644 cpp/src/Public/sweepstore/concurrency.h create mode 100644 cpp/src/Public/sweepstore/header.h create mode 100644 cpp/src/Public/sweepstore/structures.h create mode 100644 cpp/src/Public/sweepstore/sweepstore.h create mode 100644 cpp/src/Public/sweepstore/utils/file_handle.h create mode 100644 cpp/src/Public/sweepstore/utils/file_lock.h create mode 100644 cpp/src/Public/sweepstore/utils/helpers.h diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index a162dba..90d854f 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -3,13 +3,31 @@ project(BinaryTable) set(CMAKE_CXX_STANDARD 17) set(CMAKE_CXX_STANDARD_REQUIRED ON) +set(CMAKE_EXPORT_COMPILE_COMMANDS ON) + +# Add include directories globally +include_directories(${CMAKE_SOURCE_DIR}/src/Public) +include_directories(${CMAKE_SOURCE_DIR}/src/Private) # Main executable with integrated binary table implementation add_executable(main - binary_table.h - binary_table.cpp + src/Private/sweepstore/header.cpp + src/Public/sweepstore/utils/helpers.h + src/Private/sweepstore/structures.cpp + src/Public/sweepstore/structures.h + src/Private/sweepstore/sweepstore.cpp + src/Public/sweepstore/sweepstore.h + src/Public/sweepstore/concurrency.h + src/Private/sweepstore/concurrency.cpp + src/Public/sweepstore/utils/file_lock.h + src/Public/sweepstore/utils/file_handle.h + src/Public/sweepstore/header.h + src/Private/sweepstore/benchmark.cpp ) +# Add include directories +target_include_directories(main PRIVATE ${CMAKE_SOURCE_DIR}/src/Public) + # Compiler Settings if(MSVC) target_compile_options(main PRIVATE /W4) diff --git a/cpp/binary_table.cpp b/cpp/binary_table.cpp deleted file mode 100644 index c33bf00..0000000 --- a/cpp/binary_table.cpp +++ /dev/null @@ -1,1243 +0,0 @@ -#include "binary_table.h" -#include -#include -#include -#include -#include -#include -#include -#include - -namespace bt { - -// FNV-1a hash implementation -int64_t BinaryTable::hashString(const std::string& str) const { - uint64_t hash = 0xcbf29ce484222325ULL; // FNV offset basis - for (char c : str) { - hash ^= static_cast(c); - hash *= 0x100000001b3ULL; // FNV prime - } - return static_cast(hash); -} - -// Value encoding implementations -std::vector encodeValue(const int32_t& value) { - std::vector buffer; - buffer.push_back(static_cast(BT_Type::INTEGER)); - - // Little endian encoding - buffer.push_back(value & 0xFF); - buffer.push_back((value >> 8) & 0xFF); - buffer.push_back((value >> 16) & 0xFF); - buffer.push_back((value >> 24) & 0xFF); - - return buffer; -} - -std::vector encodeValue(const float& value) { - std::vector buffer; - buffer.push_back(static_cast(BT_Type::FLOAT)); - - // Convert float to bytes (little endian) - uint32_t floatBits; - std::memcpy(&floatBits, &value, sizeof(float)); - - buffer.push_back(floatBits & 0xFF); - buffer.push_back((floatBits >> 8) & 0xFF); - buffer.push_back((floatBits >> 16) & 0xFF); - buffer.push_back((floatBits >> 24) & 0xFF); - - return buffer; -} - -std::vector encodeValue(const std::string& value) { - std::vector buffer; - buffer.push_back(static_cast(BT_Type::STRING)); - - // String length (little endian) - int32_t length = static_cast(value.length()); - buffer.push_back(length & 0xFF); - buffer.push_back((length >> 8) & 0xFF); - buffer.push_back((length >> 16) & 0xFF); - buffer.push_back((length >> 24) & 0xFF); - - // String bytes - for (char c : value) { - buffer.push_back(static_cast(c)); - } - - return buffer; -} - -std::vector encodeValue(const std::vector& value) { - std::vector buffer; - buffer.push_back(static_cast(BT_Type::INTEGER_ARRAY)); - - // Array length (little endian) - int32_t length = static_cast(value.size()); - buffer.push_back(length & 0xFF); - buffer.push_back((length >> 8) & 0xFF); - buffer.push_back((length >> 16) & 0xFF); - buffer.push_back((length >> 24) & 0xFF); - - // Array elements - for (const auto& item : value) { - auto itemBuffer = encodeValue(item); - buffer.insert(buffer.end(), itemBuffer.begin(), itemBuffer.end()); - } - - return buffer; -} - -std::vector encodeValue(const std::vector& value) { - std::vector buffer; - buffer.push_back(static_cast(BT_Type::FLOAT_ARRAY)); - - // Array length (little endian) - int32_t length = static_cast(value.size()); - buffer.push_back(length & 0xFF); - buffer.push_back((length >> 8) & 0xFF); - buffer.push_back((length >> 16) & 0xFF); - buffer.push_back((length >> 24) & 0xFF); - - // Array elements - for (const auto& item : value) { - auto itemBuffer = encodeValue(item); - buffer.insert(buffer.end(), itemBuffer.begin(), itemBuffer.end()); - } - - return buffer; -} - -// BT_Reference implementation -BT_Reference::BT_Reference(BinaryTable* table, BT_Pointer pointer) - : table_(table), pointer_(pointer) {} - -template<> -int32_t BT_Reference::decodeValue() { - if (pointer_.isNull()) { - throw std::runtime_error("Null pointer"); - } - - table_->setFilePosition(pointer_.address()); - uint8_t typeId = table_->readByte(pointer_.address()); - - if (static_cast(typeId) != BT_Type::INTEGER) { - throw std::runtime_error("Type mismatch"); - } - - return table_->readInt32(pointer_.address() + 1); -} - -template<> -float BT_Reference::decodeValue() { - if (pointer_.isNull()) { - throw std::runtime_error("Null pointer"); - } - - table_->setFilePosition(pointer_.address()); - uint8_t typeId = table_->readByte(pointer_.address()); - - if (static_cast(typeId) != BT_Type::FLOAT) { - throw std::runtime_error("Type mismatch"); - } - - return table_->readFloat32(pointer_.address() + 1); -} - -template<> -std::string BT_Reference::decodeValue() { - if (pointer_.isNull()) { - throw std::runtime_error("Null pointer"); - } - - table_->setFilePosition(pointer_.address()); - uint8_t typeId = table_->readByte(pointer_.address()); - - if (static_cast(typeId) != BT_Type::STRING) { - throw std::runtime_error("Type mismatch"); - } - - int32_t length = table_->readInt32(pointer_.address() + 1); - auto bytes = table_->readBytes(pointer_.address() + 5, length); - - return std::string(bytes.begin(), bytes.end()); -} - -template<> -BT_UniformArray BT_Reference::decodeValue>() { - return BT_UniformArray(table_, pointer_); -} - -template<> -BT_UniformArray BT_Reference::decodeValue>() { - return BT_UniformArray(table_, pointer_); -} - -template<> -std::vector BT_Reference::decodeValue>() { - if (pointer_.isNull()) { - return {}; - } - - uint8_t typeId = table_->readByte(pointer_.address()); - BT_Type type = static_cast(typeId); - - if (type != BT_Type::INTEGER_ARRAY) { - throw std::runtime_error("Type mismatch - expected integer array"); - } - - int32_t length = table_->readInt32(pointer_.address() + 1); - std::vector result; - result.reserve(length); - - // Each element is: type byte (1) + int32 data (4) = 5 bytes - int64_t elementPos = pointer_.address() + 1 + 4; // Skip type and length - - for (int32_t i = 0; i < length; i++) { - // Skip the type byte, read the int32 value - int32_t value = table_->readInt32(elementPos + 1); - result.push_back(value); - elementPos += 5; // Move to next element - } - - return result; -} - -template<> -std::vector BT_Reference::decodeValue>() { - if (pointer_.isNull()) { - return {}; - } - - uint8_t typeId = table_->readByte(pointer_.address()); - BT_Type type = static_cast(typeId); - - if (type != BT_Type::FLOAT_ARRAY) { - throw std::runtime_error("Type mismatch - expected float array"); - } - - int32_t length = table_->readInt32(pointer_.address() + 1); - std::vector result; - result.reserve(length); - - // Each element is: type byte (1) + float data (4) = 5 bytes - int64_t elementPos = pointer_.address() + 1 + 4; // Skip type and length - - for (int32_t i = 0; i < length; i++) { - // Skip the type byte, read the float value - float value = table_->readFloat32(elementPos + 1); - result.push_back(value); - elementPos += 5; // Move to next element - } - - return result; -} - -int32_t BT_Reference::size() const { - if (pointer_.isNull()) { - return 0; - } - - uint8_t typeId = table_->readByte(pointer_.address()); - BT_Type type = static_cast(typeId); - - switch (type) { - case BT_Type::POINTER: - return 1 + 8; // Type byte + pointer - case BT_Type::INTEGER: - case BT_Type::FLOAT: - return 1 + 4; // Type byte + data - case BT_Type::STRING: { - int32_t length = table_->readInt32(pointer_.address() + 1); - return 1 + 4 + length; // Type + length + string bytes - } - case BT_Type::ADDRESS_TABLE: { - int32_t count = table_->readInt32(pointer_.address() + 1); - return 1 + 4 + count * (8 + 8); // Type + count + entries - } - case BT_Type::INTEGER_ARRAY: - case BT_Type::FLOAT_ARRAY: { - int32_t length = table_->readInt32(pointer_.address() + 1); - int32_t elementSize = (type == BT_Type::INTEGER_ARRAY) ? (1 + 4) : (1 + 4); - return 1 + 4 + length * elementSize; - } - } - return 0; -} - -BT_Type BT_Reference::getType() const { - if (pointer_.isNull()) { - throw std::runtime_error("Null pointer"); - } - - uint8_t typeId = table_->readByte(pointer_.address()); - return static_cast(typeId); -} - -// BT_UniformArray template implementations -template -int32_t BT_UniformArray::length() const { - if (this->pointer_.isNull()) { - return 0; - } - - try { - uint8_t typeId = this->table_->readByte(this->pointer_.address()); - BT_Type type = static_cast(typeId); - - if (!isArrayType(type)) { - return 0; // Treat non-array as empty array instead of throwing - } - - return this->table_->readInt32(this->pointer_.address() + 1); - } catch (...) { - return 0; // If we can't read, treat as empty - } -} - -template -T BT_UniformArray::operator[](int32_t index) const { - if (this->pointer_.isNull()) { - throw std::runtime_error("Null pointer"); - } - - int32_t len = length(); - if (index < 0 || index >= len) { - throw std::out_of_range("Index out of range"); - } - - // Determine element type and size - uint8_t elementTypeId = this->table_->readByte(this->pointer_.address() + 1 + 4); - BT_Type elementType = static_cast(elementTypeId); - int32_t elementSize = 1 + getTypeSize(elementType); - - int64_t itemAddress = this->pointer_.address() + 1 + 4 + index * elementSize; - BT_Reference itemRef(this->table_, BT_Pointer(itemAddress)); - - return itemRef.decodeValue(); -} - -template -void BT_UniformArray::set(int32_t index, const T& value) { - if (this->pointer_.isNull()) { - throw std::runtime_error("Null pointer"); - } - - int32_t len = length(); - if (index < 0 || index >= len) { - throw std::out_of_range("Index out of range"); - } - - // Validate type compatibility - BT_Type expectedType = getTypeFromValue(); - uint8_t elementTypeId = this->table_->readByte(this->pointer_.address() + 1 + 4); - BT_Type elementType = static_cast(elementTypeId); - - if (expectedType != elementType) { - throw std::runtime_error("Type mismatch"); - } - - // Encode and write value - auto valueBuffer = encodeValue(value); - int32_t elementSize = 1 + getTypeSize(elementType); - int64_t itemAddress = this->pointer_.address() + 1 + 4 + index * elementSize; - - this->table_->writeBytes(itemAddress, valueBuffer); -} - -template -void BT_UniformArray::add(const T& value) { - addAll({value}); -} - -template -void BT_UniformArray::addAll(const std::vector& values) { - this->table_->antiFreeListScope([&]() { - // Get current element type or determine from new values - BT_Type elementType = getTypeFromValue(); - - if (length() > 0) { - uint8_t existingTypeId = this->table_->readByte(this->pointer_.address() + 1 + 4); - BT_Type existingType = static_cast(existingTypeId); - if (existingType != elementType) { - throw std::runtime_error("Type mismatch"); - } - } - - // Validate all values are compatible - for (const auto& value : values) { - (void)value; // Suppress unused variable warning - BT_Type valueType = getTypeFromValue(); - if (valueType != elementType) { - throw std::runtime_error("Type mismatch in values"); - } - if (getTypeSize(elementType) == -1) { - throw std::runtime_error("Variable size types not supported in uniform arrays"); - } - } - - // Read current array buffer - int32_t currentLength = length(); - int32_t elementSize = 1 + getTypeSize(elementType); - int32_t currentBufferSize = 1 + 4 + currentLength * elementSize; - - std::vector fullBuffer; - if (currentLength > 0) { - fullBuffer = this->table_->readBytes(this->pointer_.address(), currentBufferSize); - } else { - // Empty array, create initial buffer - fullBuffer.push_back(static_cast(elementType == BT_Type::INTEGER ? BT_Type::INTEGER_ARRAY : BT_Type::FLOAT_ARRAY)); - fullBuffer.push_back(0); // Length will be updated - fullBuffer.push_back(0); - fullBuffer.push_back(0); - fullBuffer.push_back(0); - } - - // Add new values to buffer - for (const auto& value : values) { - auto valueBuffer = encodeValue(value); - fullBuffer.insert(fullBuffer.end(), valueBuffer.begin(), valueBuffer.end()); - } - - // Update length in buffer - int32_t newLength = currentLength + static_cast(values.size()); - fullBuffer[1] = newLength & 0xFF; - fullBuffer[2] = (newLength >> 8) & 0xFF; - fullBuffer[3] = (newLength >> 16) & 0xFF; - fullBuffer[4] = (newLength >> 24) & 0xFF; - - // Free old array if it exists - if (!this->pointer_.isNull()) { - this->table_->free(this->pointer_, currentBufferSize); - } - - // Allocate new space - BT_Pointer newPointer = this->table_->alloc(static_cast(fullBuffer.size())); - - // Update any references in address table - auto addressTable = this->table_->getAddressTable(); - for (auto& [key, value] : addressTable) { - if (value == this->pointer_) { - value = newPointer; - } - } - this->table_->setAddressTable(addressTable); - this->pointer_ = newPointer; - - // Write updated buffer - this->table_->writeBytes(newPointer.address(), fullBuffer); - }); -} - -template -std::vector BT_UniformArray::fetchSublist(int32_t start, int32_t end) { - int32_t len = length(); - if (len == 0) { - return {}; - } - - if (end == -1) { - end = len; - } - - if (start < 0 || start >= len || end < start || end > len) { - throw std::out_of_range("Invalid range"); - } - - uint8_t elementTypeId = this->table_->readByte(this->pointer_.address() + 1 + 4); - BT_Type elementType = static_cast(elementTypeId); - int32_t elementSize = 1 + getTypeSize(elementType); - - if (getTypeSize(elementType) == -1) { - throw std::runtime_error("Variable size types not supported in uniform arrays"); - } - - std::vector result; - for (int32_t i = start; i < end; i++) { - int64_t itemAddress = this->pointer_.address() + 1 + 4 + i * elementSize; - BT_Reference itemRef(this->table_, BT_Pointer(itemAddress)); - result.push_back(itemRef.decodeValue()); - } - - return result; -} - -// Explicit template instantiations -template class BT_UniformArray; -template class BT_UniformArray; - -// BinaryTable implementation -BinaryTable::BinaryTable(const std::string& path) - : filePath_(path), freeListLifted_(false) { - file_ = fopen(path.c_str(), "r+b"); - if (!file_) { - // File doesn't exist, create it - file_ = fopen(path.c_str(), "w+b"); - } - -} - -BinaryTable::~BinaryTable() { - if (file_) { - fclose(file_); - } -} - -void BinaryTable::initialize() { - fseek(file_, 0, SEEK_SET); - - // Magic number "SWPS" (0/4 bytes) - const char magic[] = "SWPS"; - fwrite(magic, 1, 4, file_); - - // Version (1.0 float16) (4/2 bytes) - uint16_t version = 0x3C00; // 1.0 in float16 format - uint8_t versionBytes[2] = { - static_cast(version & 0xFF), - static_cast((version >> 8) & 0xFF) - }; - fwrite(versionBytes, 1, 2, file_); - - // Address table pointer (null) (6/8 bytes) - writeInt64(6, BT_Null.address()); - - // Free list count (0) (14/4 bytes) - writeInt32(14, 0); - - fflush(file_); -} - -// File I/O helper implementations -int32_t BinaryTable::readInt32(int64_t position) { - fseek(file_, position, SEEK_SET); - uint8_t bytes[4]; - fread(bytes, 1, 4, file_); - - return static_cast(bytes[0]) | - (static_cast(bytes[1]) << 8) | - (static_cast(bytes[2]) << 16) | - (static_cast(bytes[3]) << 24); -} - -float BinaryTable::readFloat32(int64_t position) { - fseek(file_, position, SEEK_SET); - uint8_t bytes[4]; - fread(bytes, 1, 4, file_); - - uint32_t floatBits = static_cast(bytes[0]) | - (static_cast(bytes[1]) << 8) | - (static_cast(bytes[2]) << 16) | - (static_cast(bytes[3]) << 24); - - float result; - std::memcpy(&result, &floatBits, sizeof(float)); - return result; -} - -int64_t BinaryTable::readInt64(int64_t position) { - fseek(file_, position, SEEK_SET); - uint8_t bytes[8]; - fread(bytes, 1, 8, file_); - - int64_t result = 0; - for (int i = 0; i < 8; i++) { - result |= static_cast(bytes[i]) << (i * 8); - } - - return result; -} - -uint8_t BinaryTable::readByte(int64_t position) { - fseek(file_, position, SEEK_SET); - uint8_t byte; - fread(&byte, 1, 1, file_); - return byte; -} - -std::vector BinaryTable::readBytes(int64_t position, int32_t count) { - fseek(file_, position, SEEK_SET); - std::vector bytes(count); - fread(bytes.data(), 1, count, file_); - return bytes; -} - -void BinaryTable::writeInt32(int64_t position, int32_t value) { - fseek(file_, position, SEEK_SET); - uint8_t bytes[4] = { - static_cast(value & 0xFF), - static_cast((value >> 8) & 0xFF), - static_cast((value >> 16) & 0xFF), - static_cast((value >> 24) & 0xFF) - }; - fwrite(bytes, 1, 4, file_); -} - -void BinaryTable::writeFloat32(int64_t position, float value) { - fseek(file_, position, SEEK_SET); - uint32_t floatBits; - std::memcpy(&floatBits, &value, sizeof(float)); - - uint8_t bytes[4] = { - static_cast(floatBits & 0xFF), - static_cast((floatBits >> 8) & 0xFF), - static_cast((floatBits >> 16) & 0xFF), - static_cast((floatBits >> 24) & 0xFF) - }; - fwrite(bytes, 1, 4, file_); -} - -void BinaryTable::writeInt64(int64_t position, int64_t value) { - fseek(file_, position, SEEK_SET); - uint8_t bytes[8]; - for (int i = 0; i < 8; i++) { - bytes[i] = static_cast((value >> (i * 8)) & 0xFF); - } - fwrite(bytes, 1, 8, file_); -} - -void BinaryTable::writeByte(int64_t position, uint8_t value) { - fseek(file_, position, SEEK_SET); - fwrite(&value, 1, 1, file_); -} - -void BinaryTable::writeBytes(int64_t position, const std::vector& data) { - fseek(file_, position, SEEK_SET); - fwrite(data.data(), 1, data.size(), file_); -} - -int64_t BinaryTable::getFileLength() { - long current = ftell(file_); - fseek(file_, 0, SEEK_END); - long length = ftell(file_); - fseek(file_, current, SEEK_SET); // Restore position - return length; -} - -void BinaryTable::setFilePosition(int64_t position) { - fseek(file_, position, SEEK_SET); -} - -// Address table management -std::unordered_map BinaryTable::getAddressTable() { - int64_t tableAddress = readInt64(6); - DEBUG_PRINTLN("DEBUG: getAddressTable reading from address " << tableAddress); - - if (tableAddress == -1) { // Null pointer - return {}; - } - - // Validate table address is within file bounds - int64_t fileLength = getFileLength(); - if (tableAddress < 0 || tableAddress >= fileLength) { - DEBUG_PRINTLN("DEBUG: Address table pointer is out of bounds: " << tableAddress << " (file length: " << fileLength << ")"); - throw std::runtime_error("Address table pointer is corrupted - out of bounds"); - } - - try { - uint8_t typeId = readByte(tableAddress); - - if (static_cast(typeId) != BT_Type::ADDRESS_TABLE) { - DEBUG_PRINTLN("DEBUG: Invalid type ID at address table location: " << (int)typeId); - // Address table might not be valid yet, return empty - return {}; - } - - int32_t tableCount = readInt32(tableAddress + 1); - - // Validate table count is reasonable - if (tableCount < 0 || tableCount > 1000000) { // Arbitrary but reasonable limit - DEBUG_PRINTLN("DEBUG: Suspicious address table count: " << tableCount); - throw std::runtime_error("Address table appears corrupted - invalid entry count"); - } - - // Validate the entire table fits within file bounds - int64_t requiredSize = 1 + 4 + tableCount * (8 + 8); // Type + count + entries - if (tableAddress + requiredSize > fileLength) { - DEBUG_PRINTLN("DEBUG: Address table extends beyond file bounds"); - throw std::runtime_error("Address table appears corrupted - extends beyond file"); - } - - std::unordered_map addressTable; - - for (int32_t i = 0; i < tableCount; i++) { - int64_t offset = tableAddress + 1 + 4 + i * (8 + 8); - int64_t keyHash = readInt64(offset); - int64_t valueAddress = readInt64(offset + 8); - - // Validate each value address is within bounds (or null) - if (valueAddress != -1 && (valueAddress < 0 || valueAddress >= fileLength)) { - DEBUG_PRINTLN("DEBUG: Invalid value address in entry " << i << ": " << valueAddress); - throw std::runtime_error("Address table entry contains invalid pointer"); - } - - DEBUG_PRINTLN(" Reading entry " << i << ": hash " << keyHash << " -> address " << valueAddress); - addressTable[keyHash] = BT_Pointer(valueAddress); - } - - return addressTable; - } catch (const std::runtime_error&) { - // Re-throw runtime errors (our validation failures) - throw; - } catch (...) { - // If we can't read the address table for other reasons, return empty - DEBUG_PRINTLN("DEBUG: Failed to read address table due to I/O error"); - return {}; - } -} - -void BinaryTable::setAddressTable(const std::unordered_map& table) { - DEBUG_PRINTLN("DEBUG: setAddressTable called! This should NOT happen during get operations!"); - DEBUG_PRINTLN("DEBUG: setAddressTable writing " << table.size() << " entries"); - for (const auto& [key, value] : table) { - DEBUG_PRINTLN(" Writing hash " << key << " -> address " << value.address()); - } - - // Read old table pointer FIRST to ensure we can clean it up later - int64_t oldTablePointerAddress = readInt64(6); - BT_Pointer oldTablePtr(oldTablePointerAddress); - int32_t oldTableSize = 0; - - // Calculate old table size if it exists - if (!oldTablePtr.isNull()) { - try { - BT_Reference oldTableRef(this, oldTablePtr); - oldTableSize = oldTableRef.size(); - } catch (...) { - // If we can't read the old table, we can't free it safely - DEBUG_PRINTLN("DEBUG: WARNING - Cannot read old table for cleanup"); - oldTablePtr = BT_Null; - } - } - - // Build buffer manually (matching Dart implementation exactly) - std::vector buffer; - - // Type byte - buffer.push_back(static_cast(BT_Type::ADDRESS_TABLE)); - - // Table count (little endian, 4 bytes) - int32_t count = static_cast(table.size()); - for (int i = 0; i < 4; i++) { - buffer.push_back(static_cast((count >> (i * 8)) & 0xFF)); - } - - // Table entries - for (const auto& [key, value] : table) { - // Key hash (little endian, 8 bytes) - for (int i = 0; i < 8; i++) { - buffer.push_back(static_cast((key >> (i * 8)) & 0xFF)); - } - // Value address (little endian, 8 bytes) - int64_t addr = value.address(); - for (int i = 0; i < 8; i++) { - buffer.push_back(static_cast((addr >> (i * 8)) & 0xFF)); - } - } - - // Allocate and write new address table - BT_Pointer newTableAddress = alloc(static_cast(buffer.size())); - setFilePosition(newTableAddress.address()); - size_t written = fwrite(buffer.data(), 1, buffer.size(), file_); - (void)written; // Suppress unused variable warning in release builds - - if (written != buffer.size()) { - throw std::runtime_error("Failed to write complete address table"); - } - - // Ensure new table is written to disk before updating header - fflush(file_); - - // Atomically update header to point to new table - writeInt64(6, newTableAddress.address()); - fflush(file_); - - // Only free old table after new one is successfully committed - DEBUG_PRINTLN("DEBUG: oldTablePtr.isNull()=" << oldTablePtr.isNull() << ", oldTablePtr.address()=" << oldTablePtr.address() << ", newTableAddress=" << newTableAddress.address()); - if (!oldTablePtr.isNull() && oldTablePtr != newTableAddress) { - DEBUG_PRINTLN("DEBUG: Calling free() for old table"); - free(oldTablePtr, oldTableSize); - } else { - DEBUG_PRINTLN("DEBUG: NOT calling free() - condition not met"); - } -} - -// Free list management -std::vector BinaryTable::getFreeList() { - if (freeListLifted_) { - return freeListCache_; - } - - int32_t entryCount = readInt32(14); - if (entryCount == 0) { - return {}; - } - - int32_t entrySize = 8 + 4; // Pointer + Size - int32_t freeListSize = entryCount * entrySize; - int64_t fileLength = getFileLength(); - int64_t freeListStart = fileLength - freeListSize; - - std::vector freeList; - for (int32_t i = 0; i < entryCount; i++) { - int64_t offset = freeListStart + i * entrySize; - int64_t pointerAddress = readInt64(offset); - int32_t size = readInt32(offset + 8); - freeList.emplace_back(BT_Pointer(pointerAddress), size); - } - - return freeList; -} - -void BinaryTable::setFreeList(const std::vector& list) { - DEBUG_PRINTLN("DEBUG: setFreeList called with freeListLifted_=" << freeListLifted_ << ", list.size()=" << list.size()); - if (freeListLifted_) { - freeListCache_ = list; - DEBUG_PRINTLN("DEBUG: setFreeList early return - just updating cache"); - return; - } - - // Read OLD count from header (position 14) - int32_t oldEntryCount = readInt32(14); - - // Calculate old free list size (entries only, not count) - int32_t oldListSize = oldEntryCount * (8 + 4); - - // Remove old free list entries from EOF - if (oldEntryCount > 0) { - int64_t currentLength = getFileLength(); - truncateFile(currentLength - oldListSize); - } - - // Write NEW count to header (position 14) - writeInt32(14, static_cast(list.size())); - - // Write NEW entries to EOF (if any) - if (!list.empty()) { - // Encode new free list entries - std::vector buffer; - - for (const auto& entry : list) { - // Pointer (8 bytes, little endian) - int64_t addr = entry.pointer.address(); - for (int i = 0; i < 8; i++) { - buffer.push_back(static_cast((addr >> (i * 8)) & 0xFF)); - } - // Size (4 bytes, little endian) - int32_t size = entry.size; - for (int i = 0; i < 4; i++) { - buffer.push_back(static_cast((size >> (i * 8)) & 0xFF)); - } - } - - fseek(file_, 0, SEEK_END); - fwrite(buffer.data(), 1, buffer.size(), file_); - } - - fflush(file_); -} - -void BinaryTable::truncateFile(int64_t newSize) { - // Actually truncate the file (matching Dart behavior) - DEBUG_PRINTLN("DEBUG: truncateFile - truncating to " << newSize); - fclose(file_); - - try { - std::filesystem::resize_file(filePath_, newSize); - DEBUG_PRINTLN("DEBUG: truncateFile - resize successful"); - } catch (const std::exception& e) { - DEBUG_PRINTLN("DEBUG: truncateFile - resize failed: " << e.what()); - (void)e; // Suppress unused variable warning in release builds - } - - file_ = fopen(filePath_.c_str(), "r+b"); - DEBUG_PRINTLN("DEBUG: truncateFile - reopen: success=" << (file_ != nullptr)); -} - -void BinaryTable::liftFreeList() { - DEBUG_PRINTLN("DEBUG: liftFreeList() called - this truncates the file!"); - if (freeListLifted_) { - throw std::runtime_error("Free list is already lifted"); - } - - // Cache the free list - freeListCache_ = getFreeList(); - - // Read count from header (position 14) - int32_t oldEntryCount = readInt32(14); - - if (oldEntryCount > 0) { - int32_t oldEntrySize = 8 + 4; - int32_t oldFreeListSize = oldEntryCount * oldEntrySize; // Just entries, no count - - // Remove free list entries from EOF - int64_t fileLength = getFileLength(); - truncateFile(fileLength - oldFreeListSize); - } - - // Clear count in header (position 14) - writeInt32(14, 0); - - freeListLifted_ = true; -} - -void BinaryTable::dropFreeList() { - DEBUG_PRINTLN("DEBUG: dropFreeList() called - this writes data back to file!"); - if (!freeListLifted_) { - throw std::runtime_error("Free list is not lifted"); - } - - freeListLifted_ = false; - setFreeList(freeListCache_); // This now writes count to header and entries to EOF - freeListCache_.clear(); -} - -void BinaryTable::antiFreeListScope(std::function fn) { - liftFreeList(); - try { - fn(); - } catch (...) { - dropFreeList(); - throw; - } - dropFreeList(); -} - -// Memory management -void BinaryTable::free(BT_Pointer pointer, int32_t size) { - DEBUG_PRINTLN("DEBUG: free() called with freeListLifted_=" << freeListLifted_); - if (!freeListLifted_) { - DEBUG_PRINTLN("DEBUG: free() THROWING EXCEPTION - free list not lifted!"); - throw std::runtime_error("Free list must be lifted before freeing memory"); - } - - if (pointer.isNull() || size <= 0) { - throw std::invalid_argument("Cannot free null pointer or zero size"); - } - - // Fetch current free list (matching Dart exactly) - std::vector freeList = freeListCache_; - - // Add new free entry - freeList.emplace_back(pointer, size); - - // Merge contiguous free entries (matching Dart logic exactly) - auto mergeContiguousFreeBlocks = [](std::vector freeList) -> std::vector { - if (freeList.empty()) return {}; - - // Create a copy and sort by address to check for contiguous blocks - std::vector sorted = freeList; - std::sort(sorted.begin(), sorted.end(), - [](const BT_FreeListEntry& a, const BT_FreeListEntry& b) { - return a.pointer.address() < b.pointer.address(); - }); - - std::vector merged; - - for (const auto& entry : sorted) { - if (merged.empty()) { - // First entry, just add it - merged.emplace_back(entry.pointer, entry.size); - } else { - auto& last = merged.back(); - - // Check if current entry is contiguous with the last merged entry - if (last.pointer.address() + last.size == entry.pointer.address()) { - // Merge: extend the size of the last entry - last.size += entry.size; - } else { - // Not contiguous, add as separate entry - merged.emplace_back(entry.pointer, entry.size); - } - } - } - - return merged; - }; - - freeList = mergeContiguousFreeBlocks(freeList); - - // Update free list - freeListCache_ = freeList; -} - -BT_Pointer BinaryTable::alloc(int32_t size) { - if (!freeListLifted_) { - throw std::runtime_error("Free list must be lifted before allocation"); - } - - // Find suitable free block - auto it = std::find_if(freeListCache_.begin(), freeListCache_.end(), - [size](const BT_FreeListEntry& entry) { - return entry.size >= size; - }); - - if (it == freeListCache_.end()) { - // No suitable block, allocate at end of file - int64_t allocPos = getFileLength(); - return BT_Pointer(allocPos); - } - - BT_Pointer result = it->pointer; - - if (it->size == size) { - // Exact fit, remove block - freeListCache_.erase(it); - } else { - // Split block - it->pointer = BT_Pointer(it->pointer.address() + size); - it->size -= size; - } - - return result; -} - -// Data operations -BT_Reference BinaryTable::getReference(const std::string& key) { - auto addressTable = getAddressTable(); - int64_t keyHash = hashString(key); - - auto it = addressTable.find(keyHash); - if (it == addressTable.end()) { - throw std::runtime_error("Key does not exist"); - } - - return BT_Reference(this, it->second); -} - -void BinaryTable::remove(const std::string& key) { - antiFreeListScope([&]() { - auto addressTable = getAddressTable(); - int64_t keyHash = hashString(key); - - auto it = addressTable.find(keyHash); - if (it == addressTable.end()) { - throw std::runtime_error("Key does not exist"); - } - - BT_Reference valueRef(this, it->second); - free(it->second, valueRef.size()); - - addressTable.erase(it); - setAddressTable(addressTable); - }); -} - -void BinaryTable::truncate() { - antiFreeListScope([&]() { - // Relocate address table - setAddressTable(getAddressTable()); - - // Check if last free block is at end of file - auto freeList = getFreeList(); - if (freeList.empty()) { - return; - } - - std::sort(freeList.begin(), freeList.end(), - [](const BT_FreeListEntry& a, const BT_FreeListEntry& b) { - return a.pointer.address() < b.pointer.address(); - }); - - const auto& lastEntry = freeList.back(); - int64_t fileEnd = getFileLength(); - int64_t expectedEnd = lastEntry.pointer.address() + lastEntry.size; - - if (expectedEnd == fileEnd) { - freeList.pop_back(); - setFreeList(freeList); - - // Actually truncate file (matching Dart behavior) - truncateFile(lastEntry.pointer.address()); - } - }); -} - -// Debug methods -void BinaryTable::debugAddressTable(const std::string& context) { - DEBUG_PRINT("\n=== DEBUG ADDRESS TABLE"); - if (!context.empty()) { - DEBUG_PRINT(" (" << context << ")"); - } - DEBUG_PRINTLN(" ==="); - - auto addressTable = getAddressTable(); - DEBUG_PRINTLN("Address table has " << addressTable.size() << " entries"); - - for (const auto& [hash, pointer] : addressTable) { - DEBUG_PRINTLN(" Hash " << hash << " -> Address " << pointer.address()); - - if (!pointer.isNull()) { - try { - uint8_t typeByte = readByte(pointer.address()); - DEBUG_PRINTLN(" Type byte: " << (int)typeByte); - - if (typeByte == 2) { // INTEGER - int32_t value = readInt32(pointer.address() + 1); - DEBUG_PRINTLN(" Value: " << value); - (void)value; // Suppress unused variable warning in release builds - } else { - DEBUG_PRINT(" Raw bytes: "); - for (int i = 0; i < 8; i++) { - uint8_t byte = readByte(pointer.address() + i); - DEBUG_PRINT(std::hex << (int)byte << " "); - (void)byte; // Suppress unused variable warning in release builds - } - DEBUG_PRINTLN(std::dec); - } - } catch (const std::exception& e) { - DEBUG_PRINTLN(" Error reading data: " << e.what()); - (void)e; // Suppress unused variable warning in release builds - } - } - } - DEBUG_PRINTLN("========================="); -} - -std::string binaryDump(const std::vector& data) { - std::ostringstream buffer; - - for (size_t i = 0; i < data.size(); i += 16) { - // Address - buffer << "0x" << std::hex << std::setfill('0') << std::setw(4) << std::uppercase << i - << " (" << std::dec << std::setfill(' ') << std::setw(4) << i << ") | "; - - // Hex bytes - for (int j = 0; j < 16; j++) { - if (i + j < data.size()) { - buffer << std::hex << std::setfill('0') << std::setw(2) << std::uppercase - << static_cast(data[i + j]) << " "; - } else { - buffer << " "; - } - } - - buffer << " | "; - - // Integer representation - for (int j = 0; j < 16; j++) { - if (i + j < data.size()) { - buffer << std::dec << std::setfill(' ') << std::setw(3) - << static_cast(data[i + j]) << " "; - } else { - buffer << " "; - } - } - - buffer << " | "; - - // ASCII representation - for (int j = 0; j < 16; j++) { - if (i + j < data.size()) { - uint8_t byte = data[i + j]; - if (byte >= 32 && byte <= 126) { - buffer << static_cast(byte); - } else { - buffer << '.'; - } - } - } - - buffer << " | "; - if (i + 16 < data.size()) { - buffer << "\n"; - } - } - - return buffer.str(); -} - -} // namespace bt - -int main() { - // Delete existing file if it exists - if (std::filesystem::exists("example.bin")) { - std::filesystem::remove("example.bin"); - } - - bt::BinaryTable table("example.bin"); - table.initialize(); - - // Read file for dump - std::ifstream file("example.bin", std::ios::binary); - std::vector fileData((std::istreambuf_iterator(file)), - std::istreambuf_iterator()); - file.close(); - - std::cout << "File dump:\n"; - std::cout << bt::binaryDump(fileData) << "\n"; - std::cout << "File size: " << fileData.size() << " bytes\n"; - std::cout << " \n"; - - // Set values - table.set("int_array", std::vector{6, 3, 9, 2, 5}); - table.set("float_array", std::vector{1.5f, 2.5f, 3.5f}); - table.set("empty", std::vector{}); - - // Get arrays and modify elements - auto intArray = table.getArray("int_array"); - auto floatArray = table.getArray("float_array"); - - intArray.set(0, 1); - floatArray.set(1, 4.5f); - - std::cout << "int_array pointer: " << intArray.getPointer().address() << "\n"; - std::cout << "float_array pointer: " << floatArray.getPointer().address() << "\n"; - - intArray.add(10); - floatArray.add(5.5f); - - intArray.addAll({420, 69, 1337, 1738}); - floatArray.addAll({6.5f, 7.5f, 8.5f}); - - // Test the fetchSublist method - auto intArraySublist = intArray.fetchSublist(0, 3); - auto floatArraySublist = floatArray.fetchSublist(0, 2); - std::cout << "Sublist int_array (0-3): "; - for (auto val : intArraySublist) { - std::cout << val << " "; - } - std::cout << "\n"; - std::cout << "Sublist float_array (0-2): "; - for (auto val : floatArraySublist) { - std::cout << val << " "; - } - std::cout << "\n"; - - // Read back values - auto readback1 = table.get>("int_array"); - auto readback2 = table.get>("float_array"); - auto readback3 = table.get>("empty"); - - std::cout << "Readback1: "; - for (auto val : readback1) { - std::cout << val << " "; - } - std::cout << "\n"; - - std::cout << "Readback2: "; - for (auto val : readback2) { - std::cout << val << " "; - } - std::cout << "\n"; - - std::cout << "Readback3: "; - for (auto val : readback3) { - std::cout << val << " "; - } - std::cout << "\n"; - - std::cout << " \n"; - - // Final file dump - std::ifstream finalFile("example.bin", std::ios::binary); - std::vector finalFileData((std::istreambuf_iterator(finalFile)), - std::istreambuf_iterator()); - finalFile.close(); - - std::cout << "File dump:\n"; - std::cout << bt::binaryDump(finalFileData) << "\n"; - std::cout << "File size: " << finalFileData.size() << " bytes\n"; - - return 0; -} \ No newline at end of file diff --git a/cpp/binary_table.h b/cpp/binary_table.h deleted file mode 100644 index c306cf6..0000000 --- a/cpp/binary_table.h +++ /dev/null @@ -1,305 +0,0 @@ -/* - -/$$$$$$ /$$ /$$ /$$$$$$$ /$$$$$$$$ /$$ /$$ /$$$$$ /$$$$$$ /$$ /$$ /$$$$$$$$ /$$$$$$$$ -|_ $$_/| $$$ /$$$| $$__ $$| $$_____/| $$$ | $$ |__ $$|_ $$_/ | $$$ | $$| $$_____/|__ $$__/ - | $$ | $$$$ /$$$$| $$ \ $$| $$ | $$$$| $$ | $$ | $$ | $$$$| $$| $$ | $$ - | $$ | $$ $$/$$ $$| $$$$$$$ | $$$$$ | $$ $$ $$ | $$ | $$ | $$ $$ $$| $$$$$ | $$ - | $$ | $$ $$$| $$| $$__ $$| $$__/ | $$ $$$$ /$$ | $$ | $$ | $$ $$$$| $$__/ | $$ - | $$ | $$\ $ | $$| $$ \ $$| $$ | $$\ $$$| $$ | $$ | $$ | $$\ $$$| $$ | $$ - /$$$$$$| $$ \/ | $$| $$$$$$$/| $$$$$$$$| $$ \ $$| $$$$$$/ /$$$$$$ /$$| $$ \ $$| $$$$$$$$ | $$ -|______/|__/ |__/|_______/ |________/|__/ \__/ \______/ |______/|__/|__/ \__/|________/ |__/ - -� 2025-26 by Benjamin Watt of IMBENJI.NET LIMITED - All rights reserved. - -Use of this source code is governed by the Business Source License 1.1 that can be found in the LICENSE file. - -This file is part of the SweepStore (formerly Binary Table) package for C++. - - */ - -#pragma once - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -// Debug control - comment out this line to disable all debug output -// #define ENABLE_DEBUG 1 - -#ifdef ENABLE_DEBUG - #define DEBUG_PRINT(x) std::cout << x - #define DEBUG_PRINTLN(x) std::cout << x << std::endl -#else - #define DEBUG_PRINT(x) - #define DEBUG_PRINTLN(x) -#endif - -namespace bt { - -// Forward declarations -class BinaryTable; -class BT_Reference; -template class BT_UniformArray; - -// Type enumeration matching Dart version -enum class BT_Type : uint8_t { - POINTER = 0, - ADDRESS_TABLE = 1, - INTEGER = 2, - FLOAT = 3, - STRING = 4, - INTEGER_ARRAY = 5, - FLOAT_ARRAY = 6 -}; - -// Size mapping for types -constexpr int getTypeSize(BT_Type type) { - switch (type) { - case BT_Type::POINTER: return 8; - case BT_Type::ADDRESS_TABLE: return -1; - case BT_Type::INTEGER: return 4; - case BT_Type::FLOAT: return 4; - case BT_Type::STRING: return -1; - case BT_Type::INTEGER_ARRAY: return -1; - case BT_Type::FLOAT_ARRAY: return -1; - } - return -1; -} - -// Check if type is array type -constexpr bool isArrayType(BT_Type type) { - return type == BT_Type::INTEGER_ARRAY || type == BT_Type::FLOAT_ARRAY; -} - -// Type deduction helpers -template -constexpr BT_Type getTypeFromValue() { - if constexpr (std::is_same_v || std::is_same_v) { - return BT_Type::INTEGER; - } else if constexpr (std::is_same_v) { - return BT_Type::FLOAT; - } else if constexpr (std::is_same_v) { - return BT_Type::STRING; - } else if constexpr (std::is_same_v> || std::is_same_v>) { - return BT_Type::INTEGER_ARRAY; - } else if constexpr (std::is_same_v>) { - return BT_Type::FLOAT_ARRAY; - } else { - static_assert(sizeof(T) == 0, "Unsupported type"); - } -} - -// Pointer class -class BT_Pointer { -private: - int64_t address_; - -public: - explicit BT_Pointer(int64_t address = -1) : address_(address) {} - - bool isNull() const { return address_ == -1; } - int64_t address() const { return address_; } - - bool operator==(const BT_Pointer& other) const { - return address_ == other.address_; - } - - bool operator!=(const BT_Pointer& other) const { - return !(*this == other); - } -}; - -// Null pointer constant -const BT_Pointer BT_Null{-1}; - -// Free list entry -struct BT_FreeListEntry { - BT_Pointer pointer; - int32_t size; - - BT_FreeListEntry(BT_Pointer ptr, int32_t sz) : pointer(ptr), size(sz) {} -}; - -// Value encoding functions -std::vector encodeValue(const int32_t& value); -std::vector encodeValue(const float& value); -std::vector encodeValue(const std::string& value); -std::vector encodeValue(const std::vector& value); -std::vector encodeValue(const std::vector& value); - -// Template wrapper for encoding -template -std::vector encodeValue(const T& value) { - return encodeValue(value); -} - -// Reference class for handling stored values -class BT_Reference { -protected: - BinaryTable* table_; - BT_Pointer pointer_; - -public: - BT_Reference(BinaryTable* table, BT_Pointer pointer); - - template - T decodeValue(); - - int32_t size() const; - BT_Type getType() const; - bool isNull() const { return pointer_.isNull(); } - BT_Pointer getPointer() const { return pointer_; } -}; - -// Uniform array class template -template -class BT_UniformArray : public BT_Reference { -public: - BT_UniformArray(BinaryTable* table, BT_Pointer pointer) : BT_Reference(table, pointer) {} - - int32_t length() const; - T operator[](int32_t index) const; - void set(int32_t index, const T& value); - void add(const T& value); - void addAll(const std::vector& values); - std::vector fetchSublist(int32_t start = 0, int32_t end = -1); -}; - -// Main BinaryTable class -class BinaryTable { -private: - FILE* file_; - std::string filePath_; - - // Free list management - bool freeListLifted_; - std::vector freeListCache_; - - - // Internal methods - std::unordered_map getAddressTable(); - void setAddressTable(const std::unordered_map& table); - std::vector getFreeList(); - void setFreeList(const std::vector& list); - int64_t hashString(const std::string& str) const; - - void truncateFile(int64_t newSize); - void antiFreeListScope(std::function fn); - void free(BT_Pointer pointer, int32_t size); - BT_Pointer alloc(int32_t size); - - // File I/O helpers - int32_t readInt32(int64_t position); - float readFloat32(int64_t position); - int64_t readInt64(int64_t position); - uint8_t readByte(int64_t position); - std::vector readBytes(int64_t position, int32_t count); - - void writeInt32(int64_t position, int32_t value); - void writeFloat32(int64_t position, float value); - void writeInt64(int64_t position, int64_t value); - void writeByte(int64_t position, uint8_t value); - void writeBytes(int64_t position, const std::vector& data); - -public: - explicit BinaryTable(const std::string& path); - ~BinaryTable(); - - void initialize(); - - // Memory management - void liftFreeList(); - void dropFreeList(); - - // Data operations - template - void set(const std::string& key, const T& value); - - template - T get(const std::string& key); - - BT_Reference getReference(const std::string& key); - - template - BT_UniformArray getArray(const std::string& key); - - void remove(const std::string& key); - void truncate(); - - // Debug methods - void debugAddressTable(const std::string& context = ""); - - // File access for reference classes - friend class BT_Reference; - template friend class BT_UniformArray; - - int64_t getFileLength(); - void setFilePosition(int64_t position); -}; - -// Template specializations for decodeValue -template<> int32_t BT_Reference::decodeValue(); -template<> float BT_Reference::decodeValue(); -template<> std::string BT_Reference::decodeValue(); -template<> std::vector BT_Reference::decodeValue>(); -template<> std::vector BT_Reference::decodeValue>(); -template<> BT_UniformArray BT_Reference::decodeValue>(); -template<> BT_UniformArray BT_Reference::decodeValue>(); - -// Template method implementations for BinaryTable -template -void BinaryTable::set(const std::string& key, const T& value) { - antiFreeListScope([&]() { - auto addressTable = getAddressTable(); - int64_t keyHash = hashString(key); - - if (addressTable.find(keyHash) != addressTable.end()) { - throw std::runtime_error("Key already exists"); - } - - auto valueBuffer = encodeValue(value); - BT_Pointer valueAddress = alloc(static_cast(valueBuffer.size())); - - writeBytes(valueAddress.address(), valueBuffer); - - addressTable[keyHash] = valueAddress; - setAddressTable(addressTable); - }); -} - -template -T BinaryTable::get(const std::string& key) { - auto addressTable = getAddressTable(); - int64_t keyHash = hashString(key); - - auto it = addressTable.find(keyHash); - if (it == addressTable.end()) { - throw std::runtime_error("Key does not exist"); - } - - BT_Reference valueRef(this, it->second); - return valueRef.decodeValue(); -} - -template -BT_UniformArray BinaryTable::getArray(const std::string& key) { - auto addressTable = getAddressTable(); - int64_t keyHash = hashString(key); - - auto it = addressTable.find(keyHash); - if (it == addressTable.end()) { - throw std::runtime_error("Key does not exist"); - } - - return BT_UniformArray(this, it->second); -} - -} // namespace bt \ No newline at end of file diff --git a/cpp/src/Private/sweepstore/concurrency.cpp b/cpp/src/Private/sweepstore/concurrency.cpp new file mode 100644 index 0000000..37cd2d6 --- /dev/null +++ b/cpp/src/Private/sweepstore/concurrency.cpp @@ -0,0 +1,248 @@ + + +#include +#include + +#include "sweepstore/concurrency.h" + +#include +#include + +#include "sweepstore/header.h" +#include "sweepstore/utils/helpers.h" +#include "sweepstore/utils/file_handle.h" + + +uint64_t getRandomOffset(uint64_t maxValue) { + static std::random_device rd; + static std::mt19937_64 gen(rd()); + std::uniform_int_distribution dist(0, maxValue); + return dist(gen); +} + +int randomId() { + // mix timestamp with random for better uniqueness + // keep it positive to avoid signed int issues when storing + auto now = std::chrono::system_clock::now(); + auto millis = std::chrono::duration_cast(now.time_since_epoch()).count(); + int32_t time = static_cast(millis & 0xFFFFFFFF); // Get lower 32 bits + int32_t random = static_cast(getRandomOffset(0x7FFFFFFF)); // 0 to 0x7FFFFFFF + return (time ^ random) & 0x7FFFFFFF; +} + +void SweepstoreConcurrency::spawnTicket(std::string filePath, + const SweepstoreTicketOperation& operation, + const uint32_t keyHash, + const uint32_t targetSize, + const std::function onApproved, + std::string debugLabel +) { + + SweepstoreFileHandle file(filePath, std::ios::binary | std::ios::in | std::ios::out); + + /* + Useful Functions + */ + + /// Logging function + auto log = [&](const std::string &message) { + std::string prefix = !debugLabel.empty() ? "\033[38;5;208m[Ticket Spawner - " + debugLabel + "]:\033[0m " : "\033[38;5;208m[Ticket Spawner]:\033[0m "; + debugPrint(prefix + message); + }; + + // Sleep with variance (additive only) + auto varySleep = [&](std::chrono::nanoseconds minSleepDuration, std::chrono::nanoseconds variance) { + if (variance.count() <= 0) { + preciseSleep(minSleepDuration); + } else { + // Generate random duration within variance + uint64_t randomOffset = getRandomOffset(variance.count()); + preciseSleep(minSleepDuration + std::chrono::nanoseconds(randomOffset)); + } + }; + + // Exponential sleep + std::unordered_map expSleepTracker = {}; + auto expSleep = [&expSleepTracker](const std::string& label) { + int count = expSleepTracker[label]; // defaults to 0 if not found + int sleepTime = (1 << count); // Exponential backoff + sleepTime = std::max(1, std::min(sleepTime, 1000)); // Clamp between 1ms and 1000ms + preciseSleep(std::chrono::microseconds(sleepTime * 5000)); + expSleepTracker[label] = count + 1; + }; + + // Get the header(s) + SweepstoreHeader header(file); + SweepstoreConcurrencyHeader concurrencyHeader(file); + + /* + Ticket Acquisition + */ + auto acquireTicket = [&](uint32_t newIdentifier) -> SweepstoreWorkerTicket { + + // Reduce the chance of race condition + varySleep(std::chrono::microseconds(500), std::chrono::microseconds(200)); + + uint32_t ticketIndex = -1u; + + while (true) { + + uint32_t concurrentWorkers = concurrencyHeader.readNumberOfWorkers(); + + for (uint32_t i = 0; i < concurrentWorkers; i++) { + + SweepstoreWorkerTicket ticket = SweepstoreWorkerTicket(i, file); + + if (!ticket.writable()) { + continue; + } + + SweepstoreWorkerTicketSnapshot snapshot = ticket.snapshot(); + + int identifier = snapshot.identifier; + + bool identifier_unassigned = identifier == 0; + bool stale_heartbeat = millisecondsSinceEpoch32() - snapshot.workerHeartbeat > STALE_HEARTBEAT_THRESHOLD_MS; + bool is_free = snapshot.state == SweepstoreTicketState::FREE; + + if (identifier_unassigned && stale_heartbeat && is_free) { + snapshot.identifier = newIdentifier; + snapshot.workerHeartbeat = millisecondsSinceEpoch32(); + snapshot.state = SweepstoreTicketState::WAITING; + ticket.write(snapshot); + ticketIndex = i; + break; + } + } + preciseSleep(std::chrono::milliseconds(2)); + + // Ensure we still own the ticket - if not, reset and try again + if (ticketIndex != -1u) { + SweepstoreWorkerTicketSnapshot verifySnapshot = concurrencyHeader[ticketIndex].snapshot(); + + if (verifySnapshot.identifier != newIdentifier) { + ticketIndex = -1; // Lost the ticket, try again + } else { + log("Acquired ticket " + std::to_string(ticketIndex) + " with identifier " + std::to_string(newIdentifier) + "."); + return concurrencyHeader[ticketIndex]; + } + } + + expSleep("acquireTicket"); + } + + }; + + + uint32_t myIdentifier = randomId(); + + SweepstoreWorkerTicket myTicket = acquireTicket(myIdentifier); + SweepstoreWorkerTicketSnapshot mySnapshot = myTicket.snapshot(); + mySnapshot.workerHeartbeat = millisecondsSinceEpoch32(); + mySnapshot.state = SweepstoreTicketState::WAITING; + mySnapshot.operation = operation; + mySnapshot.keyHash = keyHash; + mySnapshot.targetSize = targetSize; + myTicket.write(mySnapshot); + + // Wait for approval + while (true) { + + SweepstoreWorkerTicketSnapshot snapshot = myTicket.snapshot(); + + // Update heartbeat + uint32_t currentTime = millisecondsSinceEpoch32(); + if (currentTime - snapshot.workerHeartbeat > 700) { + snapshot.workerHeartbeat = currentTime; + myTicket.write(snapshot); + } + + // Check if we still own the ticket + if (snapshot.identifier != myIdentifier) { + + preciseSleep(std::chrono::milliseconds(10)); + + // Re-verify we lost the ticket + SweepstoreWorkerTicketSnapshot recheckSnapshot = myTicket.snapshot(); + if (recheckSnapshot.identifier != myIdentifier) { + log("Lost ownership of ticket " + std::to_string(myTicket.getTicketIndex()) + ", was expecting identifier " + std::to_string(myIdentifier) + " but found " + std::to_string(recheckSnapshot.identifier) + "."); + + // ReSharper disable once CppDFAInfiniteRecursion + return spawnTicket( + filePath, + operation, + keyHash, + targetSize, + onApproved, + debugLabel + ); + } + + // False alarm, continue waiting + log("False alarm, still own ticket " + std::to_string(myTicket.getTicketIndex()) + "."); + snapshot = recheckSnapshot; + } + + if (snapshot.state == SweepstoreTicketState::APPROVED) { + snapshot.state = SweepstoreTicketState::EXECUTING; + myTicket.write(snapshot); + + onApproved(); + + snapshot.state = SweepstoreTicketState::COMPLETED; + myTicket.write(snapshot); + + break; + } + + varySleep(std::chrono::microseconds(500), std::chrono::microseconds(200)); + } + + // std::cout << "\033[38;5;82m[Ticket Spawner - " << debugLabel << "]:\033[0m Completed ticket " << myTicket.getTicketIndex() << "." << std::endl; +} + +void SweepstoreConcurrency::initialiseMaster(std::string filePath) { + + auto log = [&](const std::string &message) { + debugPrint("\033[38;5;33m[Concurrency Master]:\033[0m " + message); + }; + + SweepstoreFileHandle file(filePath, std::ios::binary | std::ios::in | std::ios::out); + + SweepstoreHeader header(file); + SweepstoreConcurrencyHeader concurrencyHeader(file); + + while (true) { + + int concurrentWorkers = concurrencyHeader.readNumberOfWorkers(); + + for (uint32_t i = 0; i < concurrentWorkers; i++) { + + SweepstoreWorkerTicket ticket(i, file); + SweepstoreWorkerTicketSnapshot snapshot = ticket.snapshot(); + + if (snapshot.state == WAITING) { + log("Found waiting ticket " + std::to_string(i) + "(Key Hash: " + std::to_string(snapshot.keyHash) + ")..."); + + // Approve the ticket + snapshot.state = APPROVED; + ticket.write(snapshot); + log("Approved ticket " + std::to_string(i) + "."); + } else if (snapshot.state == SweepstoreTicketState::COMPLETED) { + log("Ticket " + std::to_string(i) + " has completed. Resetting..."); + + // Reset the ticket + SweepstoreWorkerTicketSnapshot cleanSnapshot = SweepstoreWorkerTicketSnapshot(); + ticket.write(cleanSnapshot); + log("Reset ticket " + std::to_string(i) + "."); + } + + // Handle stale tickets + uint32_t currentTime = millisecondsSinceEpoch32(); + } + + preciseSleep(std::chrono::milliseconds(1)); + } + +} + diff --git a/cpp/src/Private/sweepstore/header.cpp b/cpp/src/Private/sweepstore/header.cpp new file mode 100644 index 0000000..f9a3652 --- /dev/null +++ b/cpp/src/Private/sweepstore/header.cpp @@ -0,0 +1,220 @@ + +#include "sweepstore/header.h" + +#include "sweepstore/utils/file_lock.h" +#include "sweepstore/utils/helpers.h" + +std::string SweepstoreHeader::readMagicNumber() { + file->seekg(0, std::ios::beg); + char buffer[4]; + file->read(buffer, 4); + return std::string(buffer, 4); +} + +void SweepstoreHeader::writeMagicNumber(const std::string& magicNumber) { + if (magicNumber.size() != 4) { + throw std::invalid_argument("Magic number must be exactly 4 characters long."); + } + file->seekp(0, std::ios::beg); + file->write(magicNumber.c_str(), 4); +} + +std::string SweepstoreHeader::readVersion() { + file->seekg(4, std::ios::beg); + char buffer[12]; + file->read(buffer, 12); + + // Trim leading and trailing spaces + std::string version(buffer, 12); + version = trim(version); + + return version; +} + +void SweepstoreHeader::writeVersion(const std::string& version) { + if (version.size() > 11) { + throw std::invalid_argument("Version string must be at most 11 characters long."); + } + // Pad 1 space to the beginning + // And pad to end to make it 12 bytes + std::string paddedVersion = " " + version; + paddedVersion.resize(12, ' '); + + file->seekp(4, std::ios::beg); + file->write(paddedVersion.c_str(), 12); +} + +SweepstorePointer SweepstoreHeader::readAddressTablePointer() { + file->seekg(16, std::ios::beg); + int64_t address; + file->read(reinterpret_cast(&address), sizeof(address)); + return address; // Implicit conversion to SweepstorePointer +} + +void SweepstoreHeader::writeAddressTablePointer(const SweepstorePointer& ptr) { + file->seekp(16, std::ios::beg); + int64_t address = ptr; + file->write(reinterpret_cast(&address), sizeof(address)); +} + +uint32_t SweepstoreHeader::readFreeListCount() { + file->seekg(24, std::ios::beg); + uint32_t count; + file->read(reinterpret_cast(&count), sizeof(count)); + return count; +} + +void SweepstoreHeader::writeFreeListCount(uint32_t count) { + file->seekp(24, std::ios::beg); + file->write(reinterpret_cast(&count), sizeof(count)); +} + +bool SweepstoreHeader::readIsFreeListLifted() { + file->seekg(28, std::ios::beg); + char flag; + file->read(&flag, sizeof(flag)); + return flag != 0; +} + +void SweepstoreHeader::writeIsFreeListLifted(bool isLifted) { + file->seekp(28, std::ios::beg); + char flag = isLifted ? 1 : 0; + file->write(&flag, sizeof(flag)); +} + +void SweepstoreHeader::initialise() { + writeMagicNumber("SWPT"); + writeVersion("undefined"); + writeAddressTablePointer(SweepstorePointer::NULL_PTR); + writeFreeListCount(0); + writeIsFreeListLifted(false); + file->flush(); +} + +uint64_t SweepstoreConcurrencyHeader::readMasterIdentifier() { + file->seekg(29, std::ios::beg); + uint64_t identifier; + file->read(reinterpret_cast(&identifier), sizeof(identifier)); + return identifier; +} + +void SweepstoreConcurrencyHeader::writeMasterIdentifier(uint64_t identifier) { + file->seekp(29, std::ios::beg); + file->write(reinterpret_cast(&identifier), sizeof(identifier)); +} + +uint32_t SweepstoreConcurrencyHeader::readMasterHeartbeat() { + file->seekg(37, std::ios::beg); + uint32_t heartbeat; + file->read(reinterpret_cast(&heartbeat), sizeof(heartbeat)); + return heartbeat; +} + +void SweepstoreConcurrencyHeader::writeMasterHeartbeat(uint32_t heartbeat) { + file->seekp(37, std::ios::beg); + file->write(reinterpret_cast(&heartbeat), sizeof(heartbeat)); +} + +uint32_t SweepstoreConcurrencyHeader::readNumberOfWorkers() { + file->seekg(41, std::ios::beg); + uint32_t numWorkers; + file->read(reinterpret_cast(&numWorkers), sizeof(numWorkers)); + return numWorkers; +} + +void SweepstoreConcurrencyHeader::writeNumberOfWorkers(uint32_t numWorkers) { + file->seekp(41, std::ios::beg); + file->write(reinterpret_cast(&numWorkers), sizeof(numWorkers)); +} + +bool SweepstoreConcurrencyHeader::readIsReadAllowed() { + file->seekg(45, std::ios::beg); + char flag; + file->read(&flag, sizeof(flag)); + return flag != 0; +} + +void SweepstoreConcurrencyHeader::writeIsReadAllowed(bool isAllowed) { + file->seekp(45, std::ios::beg); + char flag = isAllowed ? 1 : 0; + file->write(&flag, sizeof(flag)); +} + +void SweepstoreConcurrencyHeader::initialise(int concurrentWorkers) { + writeMasterIdentifier(0); + writeMasterHeartbeat(0); + writeNumberOfWorkers(concurrentWorkers); + writeIsReadAllowed(true); + for (uint32_t i = 0; i < readNumberOfWorkers(); i++) { + SweepstoreWorkerTicketSnapshot ticket = SweepstoreWorkerTicketSnapshot(); + ticket.identifier = 0; + ticket.workerHeartbeat = 0; + ticket.state = SweepstoreTicketState::FREE; + ticket.operation = SweepstoreTicketOperation::NONE; + ticket.keyHash = 0; + ticket.targetAddress = SweepstorePointer::NULL_PTR; + ticket.targetSize = 0; + + SweepstoreWorkerTicket ticketWriter = SweepstoreWorkerTicket(i, file); + ticketWriter.write(ticket); + } + file->flush(); +} + +void SweepstoreWorkerTicket::write(SweepstoreWorkerTicketSnapshot &snapshot) { + RandomAccessMemory buffer; + + SweepstoreFileLock lock(file.getPath(), SweepstoreFileLock::Mode::Exclusive); + SweepstoreFileLock::Scoped scopedLock(lock); + + buffer.setPositionSync(0); + buffer.writeIntSync(snapshot.identifier, 4); + buffer.writeIntSync(snapshot.workerHeartbeat, 4); + buffer.writeIntSync(static_cast(snapshot.state), 1); + buffer.writeIntSync(static_cast(snapshot.operation), 1); + buffer.writeUIntSync(snapshot.keyHash, 8); + buffer.writePointerSync(snapshot.targetAddress, 8); + buffer.writeUIntSync(snapshot.targetSize, 4); + + // Pad the rest with zeros if necessary + while (buffer.length() < TICKET_SIZE) { + buffer.writeIntSync(0, 1); + } + + // Prepare data + buffer.setPositionSync(0); + std::vector data = buffer.readSync(buffer.length()); + char* dataPtr = reinterpret_cast(data.data()); + + // Write to file + file->seekp(getOffset()); + file->write(dataPtr, data.size()); + file->flush(); +} + +bool SweepstoreWorkerTicket::writable() { + SweepstoreFileLock lock(file.getPath(), SweepstoreFileLock::Mode::Exclusive); + return lock.isLocked() == false; +} + +SweepstoreWorkerTicketSnapshot SweepstoreWorkerTicket::snapshot() { + SweepstoreFileLock lock(file.getPath(), SweepstoreFileLock::Mode::Shared); + lock.lock(); + file->seekg(getOffset()); + std::unique_ptr buffer(new char[TICKET_SIZE]); + file->read(buffer.get(), TICKET_SIZE); + lock.unlock(); + RandomAccessMemory ram(reinterpret_cast(buffer.get()), TICKET_SIZE); + + SweepstoreWorkerTicketSnapshot snapshot; + ram.setPositionSync(0); + snapshot.identifier = ram.readUIntSync(4); + snapshot.workerHeartbeat = ram.readUIntSync(4); + snapshot.state = static_cast(ram.readUIntSync(1)); + snapshot.operation = static_cast(ram.readUIntSync(1)); + snapshot.keyHash = ram.readUIntSync(8); + snapshot.targetAddress = ram.readPointerSync(8); + snapshot.targetSize = ram.readUIntSync(4); + + return snapshot; +} diff --git a/cpp/src/Private/sweepstore/structures.cpp b/cpp/src/Private/sweepstore/structures.cpp new file mode 100644 index 0000000..764742e --- /dev/null +++ b/cpp/src/Private/sweepstore/structures.cpp @@ -0,0 +1,13 @@ + + + +#include "sweepstore/structures.h" +const SweepstorePointer SweepstorePointer::NULL_PTR = SweepstorePointer(UINT64_MAX); + +bool SweepstorePointer::operator==(const SweepstorePointer &p) { + if (this->address == p.address) { + return true; + } else { + return false; + } +} diff --git a/cpp/src/Private/sweepstore/sweepstore.cpp b/cpp/src/Private/sweepstore/sweepstore.cpp new file mode 100644 index 0000000..2c45f39 --- /dev/null +++ b/cpp/src/Private/sweepstore/sweepstore.cpp @@ -0,0 +1,21 @@ +// +// Created by Benjamin Watt on 24/11/2025. +// + +#include "sweepstore/sweepstore.h" + +#include +#include +#include +#include + +#include "sweepstore/utils/helpers.h" +#include "sweepstore/utils/file_handle.h" + +void Sweepstore::initialise(int concurrentWorkers) { + header->initialise(); + header->writeVersion("1.1.0.2"); + concurrencyHeader->initialise(concurrentWorkers); + + debugPrint("Version: " + header->readVersion()); +} \ No newline at end of file diff --git a/cpp/src/Public/sweepstore/concurrency.h b/cpp/src/Public/sweepstore/concurrency.h new file mode 100644 index 0000000..8305818 --- /dev/null +++ b/cpp/src/Public/sweepstore/concurrency.h @@ -0,0 +1,30 @@ +#pragma once + +#include +#include +#include +#include + +#define STALE_HEARTBEAT_THRESHOLD_MS 5000 + +enum SweepstoreTicketOperation : int; + +namespace SweepstoreConcurrency { + + void spawnTicket(std::string filePath, + const SweepstoreTicketOperation& operation, + const uint32_t keyHash, + const uint32_t targetSize, + const std::function onApproved, + std::string debugLabel = "" + ); + + void initialiseMaster(std::string filePath); + + inline void initialiseMasterAsync(std::string filePath) { + std::thread([&filePath]() { + initialiseMaster(filePath); + }).detach(); + } + +} \ No newline at end of file diff --git a/cpp/src/Public/sweepstore/header.h b/cpp/src/Public/sweepstore/header.h new file mode 100644 index 0000000..b042f10 --- /dev/null +++ b/cpp/src/Public/sweepstore/header.h @@ -0,0 +1,143 @@ +#pragma once + +#include + +#include "structures.h" +#include "utils/file_handle.h" + +constexpr int roundToNearest16(int number) { + return (number + 15) & ~15; +} + +class SweepstoreHeader { + +private: + SweepstoreFileHandle& file; + +public: + explicit SweepstoreHeader(SweepstoreFileHandle &fileStream) : file(fileStream) {} + + // Offset 0 - 4 bytes + std::string readMagicNumber(); + void writeMagicNumber(const std::string& magicNumber); + + // Offset 4 - 12 bytes + std::string readVersion(); + void writeVersion(const std::string& version); + + // Offset 16 - 8 bytes + SweepstorePointer readAddressTablePointer(); + void writeAddressTablePointer(const SweepstorePointer& ptr); + + // Offset 24 - 4 bytes + uint32_t readFreeListCount(); + void writeFreeListCount(uint32_t count); + + // Offset 28 - 1 byte + bool readIsFreeListLifted(); + void writeIsFreeListLifted(bool isLifted); + + /** + * Initialises the header with default values. + */ + void initialise(); + +}; + +constexpr int SWEEPSTORE_COMBINED_STATIC_HEADER_SIZE = roundToNearest16(46); + +struct SweepstoreWorkerTicketSnapshot { + + SweepstoreWorkerTicketSnapshot() : + identifier(0), + workerHeartbeat(0), + state(SweepstoreTicketState::FREE), + operation(SweepstoreTicketOperation::NONE), + keyHash(0), + targetAddress(SweepstorePointer::NULL_PTR), + targetSize(0) {} + + // Offset 0 - 4 bytes + uint32_t identifier; + + // Offset 4 - 4 bytes + uint32_t workerHeartbeat; + + // Offset 8 - 1 byte + SweepstoreTicketState state; + + // Offset 9 - 1 byte + SweepstoreTicketOperation operation; + + // Offset 10 - 8 bytes + uint64_t keyHash; + + // Offset 18 - 8 bytes + SweepstorePointer targetAddress; + + // Offset 26 - 4 bytes + uint32_t targetSize; +}; + +class SweepstoreWorkerTicket { + + SweepstoreFileHandle& file; + uint32_t ticketIndex; + + uint64_t getOffset() const { + return SWEEPSTORE_COMBINED_STATIC_HEADER_SIZE + (ticketIndex * TICKET_SIZE); + } + +public: + static constexpr int TICKET_SIZE = roundToNearest16(29); + + SweepstoreWorkerTicket(const uint32_t index, SweepstoreFileHandle& fileStream) : + file(fileStream), + ticketIndex(index) {} + + int getTicketIndex() const { + return ticketIndex; + } + + void write(SweepstoreWorkerTicketSnapshot &snapshot); + + bool writable(); + + SweepstoreWorkerTicketSnapshot snapshot(); + +}; + +class SweepstoreConcurrencyHeader { + +private: + SweepstoreFileHandle& file; + +public: + explicit SweepstoreConcurrencyHeader(SweepstoreFileHandle &fileStream) : file(fileStream) {} + + // Offset 29 - 8 bytes + uint64_t readMasterIdentifier(); + void writeMasterIdentifier(uint64_t identifier); + + // Offset 37 - 4 bytes + uint32_t readMasterHeartbeat(); + void writeMasterHeartbeat(uint32_t heartbeat); + + // Offset 41 - 4 bytes + uint32_t readNumberOfWorkers(); + void writeNumberOfWorkers(uint32_t numWorkers); + + // Offset 45 - 1 byte + bool readIsReadAllowed(); + void writeIsReadAllowed(bool isAllowed); + + /** + * Initialises the concurrency header with default values. + */ + void initialise(int concurrentWorkers); + + SweepstoreWorkerTicket operator[](const uint32_t index) const { + return SweepstoreWorkerTicket(index, file); + } + +}; \ No newline at end of file diff --git a/cpp/src/Public/sweepstore/structures.h b/cpp/src/Public/sweepstore/structures.h new file mode 100644 index 0000000..1a1d475 --- /dev/null +++ b/cpp/src/Public/sweepstore/structures.h @@ -0,0 +1,39 @@ +#pragma once +#include + +class SweepstorePointer { + +private: + uint64_t address; + +public: + static const SweepstorePointer NULL_PTR; + + SweepstorePointer(int64_t addr) : address(addr) {} + + bool isNull() { + return this->address == UINT64_MAX; + } + + bool operator==(const SweepstorePointer &p); + + // Implicit conversion to uint64_t + operator uint64_t() const { + return address; + } +}; + +enum SweepstoreTicketState { + FREE, + WAITING, + APPROVED, + EXECUTING, + COMPLETED, +}; + +enum SweepstoreTicketOperation : int { + NONE, + READ, + MODIFY, + WRITE +}; \ No newline at end of file diff --git a/cpp/src/Public/sweepstore/sweepstore.h b/cpp/src/Public/sweepstore/sweepstore.h new file mode 100644 index 0000000..e542a2f --- /dev/null +++ b/cpp/src/Public/sweepstore/sweepstore.h @@ -0,0 +1,67 @@ +#pragma once + +#include + +#include "concurrency.h" +#include "header.h" +#include "utils/helpers.h" +#include "utils/file_handle.h" + +class Sweepstore { + +private: + std::string filePath; + SweepstoreFileHandle file; + SweepstoreHeader* header; + SweepstoreConcurrencyHeader* concurrencyHeader; + +public: + + Sweepstore(const std::string& filePath) : filePath(filePath), file(filePath, std::ios::binary | std::ios::in | std::ios::out | std::ios::trunc) { + header = new SweepstoreHeader(file); + concurrencyHeader = new SweepstoreConcurrencyHeader(file); + } + + ~Sweepstore() { + delete header; + delete concurrencyHeader; + file.close(); + } + + void initialise(int concurrentWorkers = 4); + + SweepstoreConcurrencyHeader* getConcurrencyHeader() { + return concurrencyHeader; + } + + class Proxy { + Sweepstore* sweepstore; + std::string key; + public: + Proxy(Sweepstore* sweepstoreIn, const std::string& keyIn) + : sweepstore(sweepstoreIn), key(keyIn) {} + + template + operator T() { + // Get value from sweepstore + throw std::runtime_error("Not implemented: Reading values from Sweepstore by key is not implemented."); + } + + template + void operator=(const T& value) { + SweepstoreConcurrency::spawnTicket(sweepstore->filePath, + SweepstoreTicketOperation::WRITE, + bt_hash(key), + sizeof(T), + [this, key = this->key, &value]() { + + } + ); + }; + }; + + Proxy operator[](const std::string& key) { + return Proxy(this, key); + } + +}; diff --git a/cpp/src/Public/sweepstore/utils/file_handle.h b/cpp/src/Public/sweepstore/utils/file_handle.h new file mode 100644 index 0000000..bc98188 --- /dev/null +++ b/cpp/src/Public/sweepstore/utils/file_handle.h @@ -0,0 +1,49 @@ +#pragma once + +#include +#include +#include + +class SweepstoreFileHandle { +private: + std::string path; + std::unique_ptr stream; + +public: + SweepstoreFileHandle(const std::string& p, std::ios::openmode mode = std::ios::in | std::ios::out | std::ios::binary) + : path(p), stream(std::make_unique(p, mode)) { + if (!stream->is_open()) { + throw std::runtime_error("Failed to open file: " + path); + } + } + + const std::string& getPath() const { return path; } + + std::fstream& getStream() { return *stream; } + const std::fstream& getStream() const { return *stream; } + + // Smart pointer-like interface + std::fstream* operator->() { return stream.get(); } + const std::fstream* operator->() const { return stream.get(); } + + std::fstream& operator*() { return *stream; } + const std::fstream& operator*() const { return *stream; } + + bool isOpen() const { return stream && stream->is_open(); } + + void close() { + if (stream) { + stream->close(); + } + } + + SweepstoreFileHandle(SweepstoreFileHandle&&) noexcept = default; + SweepstoreFileHandle& operator=(SweepstoreFileHandle&&) noexcept = default; + + SweepstoreFileHandle(const SweepstoreFileHandle&) = delete; + SweepstoreFileHandle& operator=(const SweepstoreFileHandle&) = delete; + + ~SweepstoreFileHandle() { + close(); + } +}; \ No newline at end of file diff --git a/cpp/src/Public/sweepstore/utils/file_lock.h b/cpp/src/Public/sweepstore/utils/file_lock.h new file mode 100644 index 0000000..c08809a --- /dev/null +++ b/cpp/src/Public/sweepstore/utils/file_lock.h @@ -0,0 +1,169 @@ +#pragma once + +#include + +#ifdef _WIN32 + #include +#else + #include + #include + #include +#endif + +class SweepstoreFileLock { +public: + enum class Mode { + Shared, + Exclusive + }; + +private: +#ifdef _WIN32 + HANDLE handle; + OVERLAPPED overlapped; +#else + int fd; +#endif + std::string path; + Mode mode; + bool locked; + + void acquire() { +#ifdef _WIN32 + handle = CreateFileA(path.c_str(), GENERIC_READ | GENERIC_WRITE, + FILE_SHARE_READ | FILE_SHARE_WRITE, NULL, OPEN_EXISTING, + FILE_ATTRIBUTE_NORMAL, NULL); + + if (handle == INVALID_HANDLE_VALUE) { + throw std::runtime_error("Failed to open file"); + } + + memset(&overlapped, 0, sizeof(overlapped)); + DWORD flags = (mode == Mode::Exclusive) ? LOCKFILE_EXCLUSIVE_LOCK : 0; + + if (!LockFileEx(handle, flags, 0, MAXDWORD, MAXDWORD, &overlapped)) { + CloseHandle(handle); + throw std::runtime_error("Failed to lock"); + } +#else + fd = open(path.c_str(), O_RDWR); + if (fd == -1) throw std::runtime_error("Failed to open file"); + + int op = (mode == Mode::Exclusive) ? LOCK_EX : LOCK_SH; + if (flock(fd, op) != 0) { + close(fd); + throw std::runtime_error("Failed to lock"); + } +#endif + locked = true; + } + + void release() { + if (locked) { +#ifdef _WIN32 + UnlockFileEx(handle, 0, MAXDWORD, MAXDWORD, &overlapped); + CloseHandle(handle); +#else + flock(fd, LOCK_UN); + close(fd); +#endif + locked = false; + } + } + +public: + SweepstoreFileLock(const std::string& p, Mode m) + : path(p), mode(m), locked(false) { +#ifdef _WIN32 + handle = INVALID_HANDLE_VALUE; + memset(&overlapped, 0, sizeof(overlapped)); +#else + fd = -1; +#endif + } + + ~SweepstoreFileLock() { + release(); + } + + void lock() { + if (!locked) { + acquire(); + } + } + + void unlock() { + release(); + } + + // Check if THIS instance holds the lock + bool holdsLock() const { return locked; } + + // Check if the file is locked by ANYONE (including this instance) + bool isLocked() const { +#ifdef _WIN32 + HANDLE testHandle = CreateFileA(path.c_str(), GENERIC_READ | GENERIC_WRITE, + FILE_SHARE_READ | FILE_SHARE_WRITE, NULL, OPEN_EXISTING, + FILE_ATTRIBUTE_NORMAL, NULL); + + if (testHandle == INVALID_HANDLE_VALUE) { + return false; // Can't even open file + } + + OVERLAPPED testOverlapped; + memset(&testOverlapped, 0, sizeof(testOverlapped)); + DWORD flags = (mode == Mode::Exclusive) ? LOCKFILE_EXCLUSIVE_LOCK : 0; + flags |= LOCKFILE_FAIL_IMMEDIATELY; // Non-blocking + + bool isLocked = !LockFileEx(testHandle, flags, 0, MAXDWORD, MAXDWORD, &testOverlapped); + + if (!isLocked) { + // We got the lock, release it + UnlockFileEx(testHandle, 0, MAXDWORD, MAXDWORD, &testOverlapped); + } + + CloseHandle(testHandle); + return isLocked; +#else + int testFd = open(path.c_str(), O_RDWR); + if (testFd == -1) { + return false; // Can't open file + } + + int op = (mode == Mode::Exclusive) ? LOCK_EX : LOCK_SH; + op |= LOCK_NB; // Non-blocking + + bool isLocked = (flock(testFd, op) != 0); + + if (!isLocked) { + // We got the lock, release it + flock(testFd, LOCK_UN); + } + + close(testFd); + return isLocked; +#endif + } + + SweepstoreFileLock(const SweepstoreFileLock&) = delete; + SweepstoreFileLock& operator=(const SweepstoreFileLock&) = delete; + SweepstoreFileLock(SweepstoreFileLock&&) = default; + SweepstoreFileLock& operator=(SweepstoreFileLock&&) = default; + + class Scoped { + private: + SweepstoreFileLock& lock; + + public: + Scoped(SweepstoreFileLock& l) : lock(l) { + lock.lock(); + } + + ~Scoped() { + lock.unlock(); + } + + Scoped(const Scoped&) = delete; + Scoped& operator=(const Scoped&) = delete; + }; +}; \ No newline at end of file diff --git a/cpp/src/Public/sweepstore/utils/helpers.h b/cpp/src/Public/sweepstore/utils/helpers.h new file mode 100644 index 0000000..851b4ee --- /dev/null +++ b/cpp/src/Public/sweepstore/utils/helpers.h @@ -0,0 +1,381 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +// Toggleable debug printing via preprocessor +#ifndef SWEEPSTORE_DEBUG +#define SWEEPSTORE_DEBUG 0 +#endif + +#if SWEEPSTORE_DEBUG + #define debugPrint(msg) std::cout << msg << std::endl +#else + #define debugPrint(msg) ((void)0) +#endif + + +inline void print(const char* message) { + // Print the message to the console + std::cout << message << std::endl; +} + +inline std::string trim(const std::string& str) { + size_t start = str.find_first_not_of(" \t\n\r"); + if (start == std::string::npos) return ""; // all whitespace + + size_t end = str.find_last_not_of(" \t\n\r"); + return str.substr(start, end - start + 1); +} + +inline std::string binaryDump(const std::vector& data) { + std::ostringstream buffer; + + for (size_t i = 0; i < data.size(); i += 16) { + // Address + buffer << "0x" + << std::setfill('0') << std::setw(4) << std::uppercase << std::hex << i + << " (" << std::dec << std::setw(4) << i << ") | "; + + // Hex bytes + for (size_t j = 0; j < 16; j++) { + if (i + j < data.size()) { + buffer << std::setfill('0') << std::setw(2) << std::uppercase << std::hex + << static_cast(data[i + j]) << " "; + } else { + buffer << " "; + } + } + + buffer << " | "; + + // Integer representation + for (size_t j = 0; j < 16; j++) { + if (i + j < data.size()) { + buffer << std::dec << std::setw(3) << static_cast(data[i + j]) << " "; + } else { + buffer << " "; + } + } + + buffer << " | "; + + // ASCII representation + for (size_t j = 0; j < 16; j++) { + if (i + j < data.size()) { + uint8_t byte = data[i + j]; + if (byte >= 32 && byte <= 126) { + buffer << static_cast(byte); + } else { + buffer << '.'; + } + } + } + + buffer << " | "; + if (i + 16 < data.size()) buffer << '\n'; + } + + return buffer.str(); +} + +inline std::vector loadFile(const std::string& filename) { + std::ifstream file(filename, std::ios::binary | std::ios::ate); + + if (!file) { + throw std::runtime_error("Failed to open file: " + filename); + } + + // Get file size + std::streamsize size = file.tellg(); + file.seekg(0, std::ios::beg); + + // Pre-allocate vector and read + std::vector buffer(size); + if (!file.read(reinterpret_cast(buffer.data()), size)) { + throw std::runtime_error("Failed to read file: " + filename); + } + + return buffer; +} + +enum class Endian { + Little, + Big +}; + +class RandomAccessMemory { +private: + std::vector _buffer; + size_t _position; + +public: + // Constructors + RandomAccessMemory() : _position(0) {} + + explicit RandomAccessMemory(const std::vector& initialData) + : _buffer(initialData), _position(0) {} + + explicit RandomAccessMemory(const uint8_t* data, size_t size) + : _buffer(data, data + size), _position(0) {} + + // Position management + size_t positionSync() const { + return _position; + } + + void setPositionSync(size_t position) { + _position = position; + } + + size_t length() const { + return _buffer.size(); + } + + // Read bytes + std::vector readSync(size_t count) { + if (_position + count > _buffer.size()) { + throw std::range_error("Not enough bytes to read"); + } + + std::vector result(_buffer.begin() + _position, + _buffer.begin() + _position + count); + _position += count; + return result; + } + + // Write bytes + void writeFromSync(const std::vector& bytes) { + for (size_t i = 0; i < bytes.size(); i++) { + if (_position + i >= _buffer.size()) { + _buffer.push_back(bytes[i]); + } else { + _buffer[_position + i] = bytes[i]; + } + } + _position += bytes.size(); + } + + // Read/Write Int Dynamic + int64_t readIntSync(int size = 4, Endian endianness = Endian::Little) { + if (size < 1 || size > 8) { + throw std::invalid_argument("Size must be between 1 and 8 bytes"); + } + + std::vector bytes = readSync(size); + + // Build integer from bytes with proper endianness + int64_t result = 0; + if (endianness == Endian::Little) { + for (int i = size - 1; i >= 0; i--) { + result = (result << 8) | bytes[i]; + } + } else { + for (int i = 0; i < size; i++) { + result = (result << 8) | bytes[i]; + } + } + + // Sign extend if MSB is set + int64_t signBit = 1LL << (size * 8 - 1); + if (result & signBit) { + result -= 1LL << (size * 8); + } + + return result; + } + uint64_t readUIntSync(int size = 4, Endian endianness = Endian::Little) { + if (size < 1 || size > 8) { + throw std::invalid_argument("Size must be between 1 and 8 bytes"); + } + + std::vector bytes = readSync(size); + + // Build integer from bytes with proper endianness + uint64_t result = 0; + if (endianness == Endian::Little) { + for (int i = size - 1; i >= 0; i--) { + result = (result << 8) | bytes[i]; + } + } else { + for (int i = 0; i < size; i++) { + result = (result << 8) | bytes[i]; + } + } + + return result; + } + + void writeIntSync(int64_t value, int size = 4, Endian endianness = Endian::Little) { + if (size < 1 || size > 8) { + throw std::invalid_argument("Size must be between 1 and 8 bytes"); + } + + std::vector bytes(size, 0); + + // Extract bytes with proper endianness + if (endianness == Endian::Little) { + for (int i = 0; i < size; i++) { + bytes[i] = (value >> (i * 8)) & 0xFF; + } + } else { + for (int i = 0; i < size; i++) { + bytes[size - 1 - i] = (value >> (i * 8)) & 0xFF; + } + } + + writeFromSync(bytes); + } + void writeUIntSync(uint64_t value, int size = 4, Endian endianness = Endian::Little) { + if (size < 1 || size > 8) { + throw std::invalid_argument("Size must be between 1 and 8 bytes"); + } + + std::vector bytes(size, 0); + + // Extract bytes with proper endianness + if (endianness == Endian::Little) { + for (int i = 0; i < size; i++) { + bytes[i] = (value >> (i * 8)) & 0xFF; + } + } else { + for (int i = 0; i < size; i++) { + bytes[size - 1 - i] = (value >> (i * 8)) & 0xFF; + } + } + + writeFromSync(bytes); + } + + // Read/Write Pointers (assuming POINTER size is 8 bytes) + SweepstorePointer readPointerSync(int pointerSize = 8) { + int64_t offset = readUIntSync(pointerSize); + return SweepstorePointer(offset); + } + + void writePointerSync(const SweepstorePointer& pointer, int pointerSize = 8) { + writeUIntSync(pointer, pointerSize); + } + + // Read/Write Float32 + float readFloat32Sync(Endian endianness = Endian::Little) { + std::vector bytes = readSync(4); + float value; + + if (endianness == Endian::Little) { + std::memcpy(&value, bytes.data(), 4); + } else { + std::vector reversed(bytes.rbegin(), bytes.rend()); + std::memcpy(&value, reversed.data(), 4); + } + + return value; + } + + void writeFloat32Sync(float value, Endian endianness = Endian::Little) { + std::vector bytes(4); + std::memcpy(bytes.data(), &value, 4); + + if (endianness == Endian::Big) { + std::reverse(bytes.begin(), bytes.end()); + } + + writeFromSync(bytes); + } + + // Read/Write Float64 (Double) + double readFloat64Sync(Endian endianness = Endian::Little) { + std::vector bytes = readSync(8); + double value; + + if (endianness == Endian::Little) { + std::memcpy(&value, bytes.data(), 8); + } else { + std::vector reversed(bytes.rbegin(), bytes.rend()); + std::memcpy(&value, reversed.data(), 8); + } + + return value; + } + + void writeFloat64Sync(double value, Endian endianness = Endian::Little) { + std::vector bytes(8); + std::memcpy(bytes.data(), &value, 8); + + if (endianness == Endian::Big) { + std::reverse(bytes.begin(), bytes.end()); + } + + writeFromSync(bytes); + } + + // Conversion methods + std::vector toVector() const { + return _buffer; + } + + const uint8_t* data() const { + return _buffer.data(); + } + + uint8_t* data() { + return _buffer.data(); + } +}; + + +#ifdef _WIN32 +#include +#endif + +inline void preciseSleep(std::chrono::nanoseconds duration) { + auto start = std::chrono::high_resolution_clock::now(); + +#ifdef _WIN32 + const auto windowsMinSleepTime = std::chrono::milliseconds(1); + + if (duration < windowsMinSleepTime) { + // Pure busy-wait with high-res timer + while (std::chrono::high_resolution_clock::now() - start < duration) { + // Optionally use _mm_pause() or YieldProcessor() to be nicer to hyperthreading + } + } else { + // Hybrid: sleep most of it, busy-wait the remainder + auto sleepDuration = duration - windowsMinSleepTime; + std::this_thread::sleep_for(sleepDuration); + while (std::chrono::high_resolution_clock::now() - start < duration) {} + } +#else + std::this_thread::sleep_for(duration); +#endif +} + +inline int32_t millisecondsSinceEpoch32() { + auto now = std::chrono::system_clock::now(); + auto millis = std::chrono::duration_cast(now.time_since_epoch()).count(); + return static_cast((millis / 1000) & 0xFFFFFFFF); +} + +inline int64_t millisecondsSinceEpoch64() { + auto now = std::chrono::system_clock::now(); + auto millis = std::chrono::duration_cast(now.time_since_epoch()).count(); + return millis; +} + +inline uint64_t bt_hash(const std::string& str) { + uint64_t hash = 0xcbf29ce484222325ULL; // FNV offset basis + + for (unsigned char byte : str) { + hash ^= byte; + hash *= 0x100000001b3ULL; // FNV prime + } + + return hash; +} \ No newline at end of file diff --git a/dart/lib/sweepstore.dart b/dart/lib/sweepstore.dart index e245899..b8aa30e 100644 --- a/dart/lib/sweepstore.dart +++ b/dart/lib/sweepstore.dart @@ -72,10 +72,14 @@ Future main() async { print("Stale Ticket Threshold: ${STALE_HEARTBEAT_THRESHOLD_MS}ms"); while (true) { - int concurrencyTest = 128; + int concurrencyTest = 256; final receivePort = ReceivePort(); int completedJobs = 0; + if (iteration > 0) { + break; + } + final stopwatch = Stopwatch()..start(); print('\x1B[95mStarting iteration #$iteration with $concurrencyTest concurrent jobs...\x1B[0m'); diff --git a/example.bin b/example.bin index 5f89823a174e95967925dc5aea22d04ea24517c4..df65193036c212ce83ac835a85aae0738f742dcb 100644 GIT binary patch literal 1072 zcmWFz4+v2()HBpG&@)s3g8xv!fDRPU1z>!fI-m+hocP^ZvdEz}8XCAl0|1I#s;B?}