From 4295d119d7fb190489045a750b0f493fa6e96b85 Mon Sep 17 00:00:00 2001 From: ImBenji Date: Sun, 23 Nov 2025 05:29:08 +0000 Subject: [PATCH] Add RandomAccessMemory class for in-memory binary operations --- CLAUDE.md | 2 + CONTRIBUTING.md | 10 + LICENSE | 74 ++- cpp/CMakeLists.txt | 38 +- cpp/binary_table.cpp | 304 ++++++--- cpp/binary_table.h | 10 +- cpp/debug/debug_address_table.cpp | 50 -- cpp/debug/debug_alloc.cpp | 61 -- cpp/debug/debug_multi_key.cpp | 69 -- cpp/debug/debug_simple.cpp | 49 -- cpp/debug/debug_step_by_step.cpp | 105 --- cpp/main.cpp | 213 ------- cpp/parity_test.cpp | 197 ------ dart/CHANGELOG.md | 2 + dart/lib/concurrency.dart | 190 ++++++ dart/lib/debug.dart | 50 ++ dart/lib/dev_tools/watch_dump.dart | 31 + dart/lib/dev_tools/watch_tickets.dart | 84 +++ dart/lib/header.dart | 440 +++++++++++++ dart/lib/helper_extensions.dart | 228 +++++++ dart/lib/structures.dart | 48 ++ dart/lib/sweepstore.dart | 105 +++ dart/pubspec.yaml | 8 +- .../binary_table.dart => sweepstore_old.dart} | 598 ++++++++++++++++-- documentation/header.md | 107 ++++ example.bin | Bin 303 -> 646 bytes 26 files changed, 2124 insertions(+), 949 deletions(-) create mode 100644 CLAUDE.md create mode 100644 CONTRIBUTING.md delete mode 100644 cpp/debug/debug_address_table.cpp delete mode 100644 cpp/debug/debug_alloc.cpp delete mode 100644 cpp/debug/debug_multi_key.cpp delete mode 100644 cpp/debug/debug_simple.cpp delete mode 100644 cpp/debug/debug_step_by_step.cpp delete mode 100644 cpp/main.cpp delete mode 100644 cpp/parity_test.cpp create mode 100644 dart/CHANGELOG.md create mode 100644 dart/lib/concurrency.dart create mode 100644 dart/lib/debug.dart create mode 100644 dart/lib/dev_tools/watch_dump.dart create mode 100644 dart/lib/dev_tools/watch_tickets.dart create mode 100644 dart/lib/header.dart create mode 100644 dart/lib/helper_extensions.dart create mode 100644 dart/lib/structures.dart create mode 100644 dart/lib/sweepstore.dart rename dart/{lib/binary_table.dart => sweepstore_old.dart} (65%) create mode 100644 documentation/header.md diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..2e27254 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,2 @@ +- Remember that when porting the dart library to other languages, we do not care about improvements, we expect the logic to be near 1:1 reflection of the dart repository for compatibility and interoperability. Do not make any logical or programatic changes. +- When told to update the library in any language other than dart. Read the dart library and compare it to the existing given language port and update the port to match parity. \ No newline at end of file diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 0000000..adcf18c --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,10 @@ +## Contributing to SweepStore + +By contributing code, documentation, or other materials to SweepStore, you agree that: + +1. Benjamin Watt/IMBENJI.NET LIMITED retains perpetual rights to use your contributions under the Business Source License 1.1 +2. You retain copyright to your contributions +3. Your contributions will be available under BSL 1.1 +4. Your contributions may be sublicensed commercially + +This allows SweepStore to maintain its licensing model. \ No newline at end of file diff --git a/LICENSE b/LICENSE index 6fad1e9..b85324c 100644 --- a/LICENSE +++ b/LICENSE @@ -8,29 +8,65 @@ /$$$$$$| $$ \/ | $$| $$$$$$$/| $$$$$$$$| $$ \ $$| $$$$$$/ /$$$$$$ /$$| $$ \ $$| $$$$$$$$ | $$ |______/|__/ |__/|_______/ |________/|__/ \__/ \______/ |______/|__/|__/ \__/|________/ |__/ -ยฉ 2025-26 by Benjamin Watt of IMBENJI.NET LIMITED - All rights reserved. +Business Source License 1.1 -Use of this source code is governed by a MIT license that can be found in the LICENSE file. +License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved. +"Business Source License" is a trademark of MariaDB Corporation Ab. -MIT License +----------------------------------------------------------------------------- -Copyright (c) 2025 Benjamin Watt of IMBENJI.NET LIMITED +Parameters -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: +Licensor: Benjamin Watt / IMBENJI.NET LIMITED +Licensed Work: SweepStore + The Licensed Work is (c) 2025-26 Benjamin Watt +Additional Use Grant: You may use the Licensed Work for any purpose other + than production use in a commercial product or service + that generates more than $100,000 USD in annual gross + revenue. +Change Date: Five years from the date of release of each version +Change License: Apache License, Version 2.0 -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. +----------------------------------------------------------------------------- -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. +Terms +The Licensor hereby grants you the right to copy, modify, create derivative +works, redistribute, and make non-production use of the Licensed Work. The +Licensor may make an Additional Use Grant, above, permitting limited production +use. + +Effective on the Change Date, or the fourth anniversary of the first publicly +available distribution of a specific version of the Licensed Work under this +License, whichever comes first, the Licensor hereby grants you rights under +the terms of the Change License, and the rights granted in the paragraph above +terminate. + +If your use of the Licensed Work does not comply with the requirements +currently in effect as described in this License, you must purchase a +commercial license from the Licensor, its affiliated entities, or authorized +resellers, or you must refrain from using the Licensed Work. + +All copies of the original and modified Licensed Work, and derivative works +of the Licensed Work, are subject to this License. This License applies +separately for each version of the Licensed Work and the Change Date may vary +for each version of the Licensed Work released by Licensor. + +You must conspicuously display this License on each original or modified copy +of the Licensed Work. If you receive the Licensed Work in original or modified +form from a third party, the terms and conditions set forth in this License +apply to your use of that work. + +Any use of the Licensed Work in violation of this License will automatically +terminate your rights under this License for the current and all other +versions of the Licensed Work. + +This License does not grant you any right in any trademark or logo of +Licensor or its affiliates (provided that you may use a trademark or logo of +Licensor as expressly required by this License). + +TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON +AN "AS IS" BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS, +EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND +TITLE. diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index eea6a1b..a162dba 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -4,43 +4,21 @@ project(BinaryTable) set(CMAKE_CXX_STANDARD 17) set(CMAKE_CXX_STANDARD_REQUIRED ON) -# Core Binary Table Library -add_library(binary_table +# Main executable with integrated binary table implementation +add_executable(main binary_table.h binary_table.cpp ) -# Main Application -add_executable(main main.cpp) -target_link_libraries(main binary_table) - -# Debug Test Executables -add_executable(debug_multi_key debug/debug_multi_key.cpp) -target_link_libraries(debug_multi_key binary_table) - -add_executable(debug_alloc debug/debug_alloc.cpp) -target_link_libraries(debug_alloc binary_table) - -add_executable(debug_address_table debug/debug_address_table.cpp) -target_link_libraries(debug_address_table binary_table) - -add_executable(debug_step_by_step debug/debug_step_by_step.cpp) -target_link_libraries(debug_step_by_step binary_table) - -add_executable(debug_simple debug/debug_simple.cpp) -target_link_libraries(debug_simple binary_table) - # Compiler Settings if(MSVC) - target_compile_options(binary_table PRIVATE /W4) target_compile_options(main PRIVATE /W4) else() - target_compile_options(binary_table PRIVATE -Wall -Wextra -pedantic) target_compile_options(main PRIVATE -Wall -Wextra -pedantic) - # Apply warnings to debug executables too - target_compile_options(debug_multi_key PRIVATE -Wall -Wextra -pedantic) - target_compile_options(debug_alloc PRIVATE -Wall -Wextra -pedantic) - target_compile_options(debug_address_table PRIVATE -Wall -Wextra -pedantic) - target_compile_options(debug_step_by_step PRIVATE -Wall -Wextra -pedantic) - target_compile_options(debug_simple PRIVATE -Wall -Wextra -pedantic) +endif() + +# Link required libraries +if(UNIX AND NOT APPLE) + # Only link stdc++fs on Linux, not macOS + target_link_libraries(main PRIVATE stdc++fs) endif() diff --git a/cpp/binary_table.cpp b/cpp/binary_table.cpp index 7863177..c33bf00 100644 --- a/cpp/binary_table.cpp +++ b/cpp/binary_table.cpp @@ -4,6 +4,9 @@ #include #include #include +#include +#include +#include namespace bt { @@ -483,8 +486,25 @@ BinaryTable::~BinaryTable() { void BinaryTable::initialize() { fseek(file_, 0, SEEK_SET); - writeInt64(0, BT_Null.address()); // Address table pointer (8 bytes) - writeInt32(8, 0); // Free list entry count (4 bytes) + + // Magic number "SWPS" (0/4 bytes) + const char magic[] = "SWPS"; + fwrite(magic, 1, 4, file_); + + // Version (1.0 float16) (4/2 bytes) + uint16_t version = 0x3C00; // 1.0 in float16 format + uint8_t versionBytes[2] = { + static_cast(version & 0xFF), + static_cast((version >> 8) & 0xFF) + }; + fwrite(versionBytes, 1, 2, file_); + + // Address table pointer (null) (6/8 bytes) + writeInt64(6, BT_Null.address()); + + // Free list count (0) (14/4 bytes) + writeInt32(14, 0); + fflush(file_); } @@ -600,7 +620,7 @@ void BinaryTable::setFilePosition(int64_t position) { // Address table management std::unordered_map BinaryTable::getAddressTable() { - int64_t tableAddress = readInt64(0); + int64_t tableAddress = readInt64(6); DEBUG_PRINTLN("DEBUG: getAddressTable reading from address " << tableAddress); if (tableAddress == -1) { // Null pointer @@ -674,7 +694,7 @@ void BinaryTable::setAddressTable(const std::unordered_map& } // Read old table pointer FIRST to ensure we can clean it up later - int64_t oldTablePointerAddress = readInt64(0); + int64_t oldTablePointerAddress = readInt64(6); BT_Pointer oldTablePtr(oldTablePointerAddress); int32_t oldTableSize = 0; @@ -729,7 +749,7 @@ void BinaryTable::setAddressTable(const std::unordered_map& fflush(file_); // Atomically update header to point to new table - writeInt64(0, newTableAddress.address()); + writeInt64(6, newTableAddress.address()); fflush(file_); // Only free old table after new one is successfully committed @@ -748,19 +768,15 @@ std::vector BinaryTable::getFreeList() { return freeListCache_; } - int64_t fileLength = getFileLength(); - if (fileLength < 4) { - return {}; - } - - int32_t entryCount = readInt32(fileLength - 4); + int32_t entryCount = readInt32(14); if (entryCount == 0) { return {}; } int32_t entrySize = 8 + 4; // Pointer + Size int32_t freeListSize = entryCount * entrySize; - int64_t freeListStart = fileLength - 4 - freeListSize; + int64_t fileLength = getFileLength(); + int64_t freeListStart = fileLength - freeListSize; std::vector freeList; for (int32_t i = 0; i < entryCount; i++) { @@ -781,65 +797,44 @@ void BinaryTable::setFreeList(const std::vector& list) { return; } - // Always remove old free list first (matching Dart behavior) - int64_t fileLength = getFileLength(); - DEBUG_PRINTLN("DEBUG: setFreeList fileLength=" << fileLength); - - // Calculate old free list size to remove - int32_t oldEntryCount = 0; - if (fileLength >= 4) { - oldEntryCount = readInt32(fileLength - 4); - } - DEBUG_PRINTLN("DEBUG: setFreeList oldEntryCount=" << oldEntryCount); - - // Remove old free list (matching Dart: always truncate first) + // Read OLD count from header (position 14) + int32_t oldEntryCount = readInt32(14); + + // Calculate old free list size (entries only, not count) + int32_t oldListSize = oldEntryCount * (8 + 4); + + // Remove old free list entries from EOF if (oldEntryCount > 0) { - int32_t oldListSize = (oldEntryCount * (8 + 4)) + 4; // Entries + Count - int64_t newFileLength = fileLength - oldListSize; - DEBUG_PRINTLN("DEBUG: setFreeList - removing old free list, oldListSize=" << oldListSize << ", truncating to: " << newFileLength); - truncateFile(newFileLength); - fileLength = newFileLength; // Update file length + int64_t currentLength = getFileLength(); + truncateFile(currentLength - oldListSize); } - - // If the new free list is empty, we're done (old list already removed) - if (list.empty()) { - DEBUG_PRINTLN("DEBUG: setFreeList - empty list, old list removed, done"); - return; - } - - // Write new free list at end of file - int64_t newLogicalEnd = fileLength; - - // Encode new free list - std::vector buffer; - - // Entries - for (const auto& entry : list) { - // Pointer (8 bytes, little endian) - int64_t addr = entry.pointer.address(); - for (int i = 0; i < 8; i++) { - buffer.push_back(static_cast((addr >> (i * 8)) & 0xFF)); - } - // Size (4 bytes, little endian) - int32_t size = entry.size; - for (int i = 0; i < 4; i++) { - buffer.push_back(static_cast((size >> (i * 8)) & 0xFF)); + + // Write NEW count to header (position 14) + writeInt32(14, static_cast(list.size())); + + // Write NEW entries to EOF (if any) + if (!list.empty()) { + // Encode new free list entries + std::vector buffer; + + for (const auto& entry : list) { + // Pointer (8 bytes, little endian) + int64_t addr = entry.pointer.address(); + for (int i = 0; i < 8; i++) { + buffer.push_back(static_cast((addr >> (i * 8)) & 0xFF)); + } + // Size (4 bytes, little endian) + int32_t size = entry.size; + for (int i = 0; i < 4; i++) { + buffer.push_back(static_cast((size >> (i * 8)) & 0xFF)); + } } + + fseek(file_, 0, SEEK_END); + fwrite(buffer.data(), 1, buffer.size(), file_); } - - // Entry count (4 bytes, little endian) - int32_t count = static_cast(list.size()); - for (int i = 0; i < 4; i++) { - buffer.push_back(static_cast((count >> (i * 8)) & 0xFF)); - } - - // Write at the logical end position - fseek(file_, newLogicalEnd, SEEK_SET); - fwrite(buffer.data(), 1, buffer.size(), file_); + fflush(file_); - - // Update logical file length - // File will be extended automatically by write operations } void BinaryTable::truncateFile(int64_t newSize) { @@ -865,29 +860,24 @@ void BinaryTable::liftFreeList() { throw std::runtime_error("Free list is already lifted"); } + // Cache the free list freeListCache_ = getFreeList(); - // Remove free list from end of file - int64_t fileLength = getFileLength(); - int32_t oldEntryCount = (fileLength >= 4) ? readInt32(fileLength - 4) : 0; + // Read count from header (position 14) + int32_t oldEntryCount = readInt32(14); if (oldEntryCount > 0) { int32_t oldEntrySize = 8 + 4; - int32_t oldFreeListSize = oldEntryCount * oldEntrySize + 4; - int64_t newFileLength = fileLength - oldFreeListSize; + int32_t oldFreeListSize = oldEntryCount * oldEntrySize; // Just entries, no count - // Store current file position to restore later if needed - long currentPos = ftell(file_); - - // Properly truncate the file - truncateFile(newFileLength); - - // Restore file position if it's still valid - if (currentPos >= 0 && currentPos < newFileLength) { - fseek(file_, currentPos, SEEK_SET); - } + // Remove free list entries from EOF + int64_t fileLength = getFileLength(); + truncateFile(fileLength - oldFreeListSize); } + // Clear count in header (position 14) + writeInt32(14, 0); + freeListLifted_ = true; } @@ -898,9 +888,7 @@ void BinaryTable::dropFreeList() { } freeListLifted_ = false; - DEBUG_PRINTLN("DEBUG: About to call setFreeList - this might corrupt the address table!"); - setFreeList(freeListCache_); - DEBUG_PRINTLN("DEBUG: setFreeList completed"); + setFreeList(freeListCache_); // This now writes count to header and entries to EOF freeListCache_.clear(); } @@ -1106,4 +1094,150 @@ void BinaryTable::debugAddressTable(const std::string& context) { DEBUG_PRINTLN("========================="); } -} // namespace bt \ No newline at end of file +std::string binaryDump(const std::vector& data) { + std::ostringstream buffer; + + for (size_t i = 0; i < data.size(); i += 16) { + // Address + buffer << "0x" << std::hex << std::setfill('0') << std::setw(4) << std::uppercase << i + << " (" << std::dec << std::setfill(' ') << std::setw(4) << i << ") | "; + + // Hex bytes + for (int j = 0; j < 16; j++) { + if (i + j < data.size()) { + buffer << std::hex << std::setfill('0') << std::setw(2) << std::uppercase + << static_cast(data[i + j]) << " "; + } else { + buffer << " "; + } + } + + buffer << " | "; + + // Integer representation + for (int j = 0; j < 16; j++) { + if (i + j < data.size()) { + buffer << std::dec << std::setfill(' ') << std::setw(3) + << static_cast(data[i + j]) << " "; + } else { + buffer << " "; + } + } + + buffer << " | "; + + // ASCII representation + for (int j = 0; j < 16; j++) { + if (i + j < data.size()) { + uint8_t byte = data[i + j]; + if (byte >= 32 && byte <= 126) { + buffer << static_cast(byte); + } else { + buffer << '.'; + } + } + } + + buffer << " | "; + if (i + 16 < data.size()) { + buffer << "\n"; + } + } + + return buffer.str(); +} + +} // namespace bt + +int main() { + // Delete existing file if it exists + if (std::filesystem::exists("example.bin")) { + std::filesystem::remove("example.bin"); + } + + bt::BinaryTable table("example.bin"); + table.initialize(); + + // Read file for dump + std::ifstream file("example.bin", std::ios::binary); + std::vector fileData((std::istreambuf_iterator(file)), + std::istreambuf_iterator()); + file.close(); + + std::cout << "File dump:\n"; + std::cout << bt::binaryDump(fileData) << "\n"; + std::cout << "File size: " << fileData.size() << " bytes\n"; + std::cout << " \n"; + + // Set values + table.set("int_array", std::vector{6, 3, 9, 2, 5}); + table.set("float_array", std::vector{1.5f, 2.5f, 3.5f}); + table.set("empty", std::vector{}); + + // Get arrays and modify elements + auto intArray = table.getArray("int_array"); + auto floatArray = table.getArray("float_array"); + + intArray.set(0, 1); + floatArray.set(1, 4.5f); + + std::cout << "int_array pointer: " << intArray.getPointer().address() << "\n"; + std::cout << "float_array pointer: " << floatArray.getPointer().address() << "\n"; + + intArray.add(10); + floatArray.add(5.5f); + + intArray.addAll({420, 69, 1337, 1738}); + floatArray.addAll({6.5f, 7.5f, 8.5f}); + + // Test the fetchSublist method + auto intArraySublist = intArray.fetchSublist(0, 3); + auto floatArraySublist = floatArray.fetchSublist(0, 2); + std::cout << "Sublist int_array (0-3): "; + for (auto val : intArraySublist) { + std::cout << val << " "; + } + std::cout << "\n"; + std::cout << "Sublist float_array (0-2): "; + for (auto val : floatArraySublist) { + std::cout << val << " "; + } + std::cout << "\n"; + + // Read back values + auto readback1 = table.get>("int_array"); + auto readback2 = table.get>("float_array"); + auto readback3 = table.get>("empty"); + + std::cout << "Readback1: "; + for (auto val : readback1) { + std::cout << val << " "; + } + std::cout << "\n"; + + std::cout << "Readback2: "; + for (auto val : readback2) { + std::cout << val << " "; + } + std::cout << "\n"; + + std::cout << "Readback3: "; + for (auto val : readback3) { + std::cout << val << " "; + } + std::cout << "\n"; + + std::cout << " \n"; + + // Final file dump + std::ifstream finalFile("example.bin", std::ios::binary); + std::vector finalFileData((std::istreambuf_iterator(finalFile)), + std::istreambuf_iterator()); + finalFile.close(); + + std::cout << "File dump:\n"; + std::cout << bt::binaryDump(finalFileData) << "\n"; + std::cout << "File size: " << finalFileData.size() << " bytes\n"; + + return 0; +} \ No newline at end of file diff --git a/cpp/binary_table.h b/cpp/binary_table.h index fcae287..c306cf6 100644 --- a/cpp/binary_table.h +++ b/cpp/binary_table.h @@ -11,7 +11,7 @@ ๏ฟฝ 2025-26 by Benjamin Watt of IMBENJI.NET LIMITED - All rights reserved. -Use of this source code is governed by a MIT license that can be found in the LICENSE file. +Use of this source code is governed by the Business Source License 1.1 that can be found in the LICENSE file. This file is part of the SweepStore (formerly Binary Table) package for C++. @@ -192,6 +192,9 @@ private: int64_t hashString(const std::string& str) const; void truncateFile(int64_t newSize); + void antiFreeListScope(std::function fn); + void free(BT_Pointer pointer, int32_t size); + BT_Pointer alloc(int32_t size); // File I/O helpers int32_t readInt32(int64_t position); @@ -212,12 +215,9 @@ public: void initialize(); - // Memory management + // Memory management void liftFreeList(); void dropFreeList(); - void antiFreeListScope(std::function fn); - void free(BT_Pointer pointer, int32_t size); - BT_Pointer alloc(int32_t size); // Data operations template diff --git a/cpp/debug/debug_address_table.cpp b/cpp/debug/debug_address_table.cpp deleted file mode 100644 index 073f8ed..0000000 --- a/cpp/debug/debug_address_table.cpp +++ /dev/null @@ -1,50 +0,0 @@ -#include -#include -#include "../binary_table.h" - -void printAddressTable(bt::BinaryTable& table) { - // We can't access getAddressTable directly, so let's use a different approach - // Try to retrieve all known keys and see what happens - std::vector keys = {"key1", "key2", "key3"}; - - for (const std::string& key : keys) { - try { - auto ref = table.getReference(key); - std::cout << " " << key << " -> address " << ref.getPointer().address() - << " (type " << static_cast(ref.getType()) << ")" << std::endl; - } catch (const std::exception& e) { - std::cout << " " << key << " -> ERROR: " << e.what() << std::endl; - } - } -} - -int main() { - using namespace bt; - - const std::string filename = "debug_addr_table.bin"; - if (std::filesystem::exists(filename)) { - std::filesystem::remove(filename); - } - - BinaryTable table(filename); - table.initialize(); - - std::cout << "=== Testing Address Table Corruption ===\n" << std::endl; - - std::cout << "Initial state (empty):" << std::endl; - printAddressTable(table); - - std::cout << "\n1. After storing key1:" << std::endl; - table.set("key1", 100); - printAddressTable(table); - - std::cout << "\n2. After storing key2:" << std::endl; - table.set("key2", 200); - printAddressTable(table); - - std::cout << "\n3. After storing key3:" << std::endl; - table.set("key3", 300); - printAddressTable(table); - - return 0; -} \ No newline at end of file diff --git a/cpp/debug/debug_alloc.cpp b/cpp/debug/debug_alloc.cpp deleted file mode 100644 index 37ead03..0000000 --- a/cpp/debug/debug_alloc.cpp +++ /dev/null @@ -1,61 +0,0 @@ -#include -#include -#include "../binary_table.h" - -int main() { - using namespace bt; - - const std::string filename = "debug_alloc.bin"; - if (std::filesystem::exists(filename)) { - std::filesystem::remove(filename); - } - - BinaryTable table(filename); - table.initialize(); - - std::cout << "=== Testing Memory Allocation Issues ===\n" << std::endl; - - // Store first key and see what address it gets - std::cout << "1. Storing first key..." << std::endl; - table.set("key1", 100); - - // Get the address where key1's value was stored - auto addressTable1 = table.getReference("key1").getPointer(); - std::cout << " key1 value stored at: " << addressTable1.address() << std::endl; - - // Store second key and see what addresses are used - std::cout << "2. Storing second key..." << std::endl; - table.set("key2", 200); - - auto addressTable2 = table.getReference("key2").getPointer(); - std::cout << " key2 value stored at: " << addressTable2.address() << std::endl; - - // Check if key1 is still accessible - std::cout << "3. Checking if key1 is still accessible..." << std::endl; - try { - int32_t val1 = table.get("key1"); - std::cout << " โœ… key1 still works: " << val1 << std::endl; - } catch (const std::exception& e) { - std::cout << " โŒ key1 broken: " << e.what() << std::endl; - - // Let's see what's actually stored at key1's address - try { - auto ref = table.getReference("key1"); - std::cout << " key1 type is: " << static_cast(ref.getType()) << std::endl; - } catch (const std::exception& e2) { - std::cout << " Can't even get type: " << e2.what() << std::endl; - } - } - - std::cout << "\n=== Address Comparison ===\n" << std::endl; - std::cout << "key1 address: " << addressTable1.address() << std::endl; - std::cout << "key2 address: " << addressTable2.address() << std::endl; - - if (addressTable1.address() == addressTable2.address()) { - std::cout << "๐Ÿ’ฅ SAME ADDRESS! This proves the bug!" << std::endl; - } else { - std::cout << "Addresses are different, issue is elsewhere" << std::endl; - } - - return 0; -} \ No newline at end of file diff --git a/cpp/debug/debug_multi_key.cpp b/cpp/debug/debug_multi_key.cpp deleted file mode 100644 index 0920bb7..0000000 --- a/cpp/debug/debug_multi_key.cpp +++ /dev/null @@ -1,69 +0,0 @@ -#include -#include -#include "../binary_table.h" - -int main() { - using namespace bt; - - const std::string filename = "debug_multi.bin"; - if (std::filesystem::exists(filename)) { - std::filesystem::remove(filename); - } - - BinaryTable table(filename); - table.initialize(); - - std::cout << "=== Testing Multi-Key Storage ===" << std::endl; - - // Store first key - std::cout << "1. Storing first key..." << std::endl; - table.set("key1", 100); - - // Try to read it back - try { - int32_t val1 = table.get("key1"); - std::cout << " โœ… First key retrieved: " << val1 << std::endl; - } catch (const std::exception& e) { - std::cout << " โŒ First key failed: " << e.what() << std::endl; - return 1; - } - - // Store second key - this is where it likely breaks - std::cout << "2. Storing second key..." << std::endl; - table.set("key2", 200); - - // Try to read second key - try { - int32_t val2 = table.get("key2"); - std::cout << " โœ… Second key retrieved: " << val2 << std::endl; - } catch (const std::exception& e) { - std::cout << " โŒ Second key failed: " << e.what() << std::endl; - } - - // Try to read first key again - this will likely fail - std::cout << "3. Re-reading first key..." << std::endl; - try { - int32_t val1_again = table.get("key1"); - std::cout << " โœ… First key still accessible: " << val1_again << std::endl; - } catch (const std::exception& e) { - std::cout << " โŒ First key now broken: " << e.what() << std::endl; - std::cout << " ๐Ÿ’ฅ CONFIRMED: Table breaks after storing 2+ keys!" << std::endl; - } - - // Store third key to see if pattern continues - std::cout << "4. Storing third key..." << std::endl; - try { - table.set("key3", 300); - int32_t val3 = table.get("key3"); - std::cout << " โœ… Third key works: " << val3 << std::endl; - } catch (const std::exception& e) { - std::cout << " โŒ Third key failed: " << e.what() << std::endl; - } - - std::cout << "\n=== Conclusion ===" << std::endl; - std::cout << "The issue is definitely in the address table management" << std::endl; - std::cout << "when storing multiple keys. Single key = perfect," << std::endl; - std::cout << "multiple keys = corruption." << std::endl; - - return 0; -} \ No newline at end of file diff --git a/cpp/debug/debug_simple.cpp b/cpp/debug/debug_simple.cpp deleted file mode 100644 index bbdf700..0000000 --- a/cpp/debug/debug_simple.cpp +++ /dev/null @@ -1,49 +0,0 @@ -#include -#include -#include "../binary_table.h" - -int main() { - using namespace bt; - - const std::string filename = "debug_simple.bin"; - if (std::filesystem::exists(filename)) { - std::filesystem::remove(filename); - } - - BinaryTable table(filename); - table.initialize(); - - std::cout << "1. Storing key1..." << std::endl; - table.set("key1", 100); - table.debugAddressTable("after key1"); - - std::cout << "2. Reading key1..." << std::endl; - try { - int32_t val = table.get("key1"); - std::cout << " โœ… key1 = " << val << std::endl; - } catch (const std::exception& e) { - std::cout << " โŒ key1 failed: " << e.what() << std::endl; - } - - std::cout << "3. Storing key2..." << std::endl; - table.set("key2", 200); - table.debugAddressTable("after key2"); - - std::cout << "4. Reading key2..." << std::endl; - try { - int32_t val = table.get("key2"); - std::cout << " โœ… key2 = " << val << std::endl; - } catch (const std::exception& e) { - std::cout << " โŒ key2 failed: " << e.what() << std::endl; - } - - std::cout << "5. Re-reading key1..." << std::endl; - try { - int32_t val = table.get("key1"); - std::cout << " โœ… key1 = " << val << std::endl; - } catch (const std::exception& e) { - std::cout << " โŒ key1 failed: " << e.what() << std::endl; - } - - return 0; -} \ No newline at end of file diff --git a/cpp/debug/debug_step_by_step.cpp b/cpp/debug/debug_step_by_step.cpp deleted file mode 100644 index 9234f13..0000000 --- a/cpp/debug/debug_step_by_step.cpp +++ /dev/null @@ -1,105 +0,0 @@ -#include -#include -#include -#include -#include "../binary_table.h" - -void dumpFile(const std::string& filename) { - std::ifstream file(filename, std::ios::binary); - file.seekg(0, std::ios::end); - size_t size = file.tellg(); - file.seekg(0, std::ios::beg); - - std::vector data(size); - file.read(reinterpret_cast(data.data()), size); - - std::cout << "File size: " << size << " bytes" << std::endl; - for (size_t i = 0; i < std::min(size, size_t(80)); i++) { - if (i % 16 == 0) std::cout << std::hex << i << ": "; - std::cout << std::hex << std::setfill('0') << std::setw(2) << (int)data[i] << " "; - if (i % 16 == 15) std::cout << std::endl; - } - if (size % 16 != 0) std::cout << std::endl; -} - -int main() { - using namespace bt; - - const std::string filename = "debug_step.bin"; - if (std::filesystem::exists(filename)) { - std::filesystem::remove(filename); - } - - BinaryTable table(filename); - table.initialize(); - - std::cout << "=== Step-by-step Address Table Debug ===\n" << std::endl; - - std::cout << "After initialize():" << std::endl; - dumpFile(filename); - - std::cout << "\n1. Before storing key1:" << std::endl; - // Try reading the address table header - { - std::ifstream file(filename, std::ios::binary); - int64_t addr; - file.read(reinterpret_cast(&addr), 8); - std::cout << "Address table pointer: " << addr << std::endl; - } - - std::cout << "\n2. Storing key1..." << std::endl; - table.set("key1", 100); - - std::cout << "After storing key1:" << std::endl; - dumpFile(filename); - - // Try reading the address table - { - std::ifstream file(filename, std::ios::binary); - int64_t addr; - file.read(reinterpret_cast(&addr), 8); - std::cout << "Address table pointer: " << addr << std::endl; - - if (addr != -1) { - file.seekg(addr); - uint8_t type; - int32_t count; - file.read(reinterpret_cast(&type), 1); - file.read(reinterpret_cast(&count), 4); - std::cout << "Address table type: " << (int)type << ", count: " << count << std::endl; - } - } - - std::cout << "\n3. Storing key2..." << std::endl; - table.set("key2", 200); - - std::cout << "After storing key2:" << std::endl; - dumpFile(filename); - - // Try reading the address table again - { - std::ifstream file(filename, std::ios::binary); - int64_t addr; - file.read(reinterpret_cast(&addr), 8); - std::cout << "Address table pointer: " << addr << std::endl; - - if (addr != -1) { - file.seekg(addr); - uint8_t type; - int32_t count; - file.read(reinterpret_cast(&type), 1); - file.read(reinterpret_cast(&count), 4); - std::cout << "Address table type: " << (int)type << ", count: " << count << std::endl; - - // Read the entries - for (int32_t i = 0; i < count && i < 5; i++) { - int64_t keyHash, valueAddr; - file.read(reinterpret_cast(&keyHash), 8); - file.read(reinterpret_cast(&valueAddr), 8); - std::cout << "Entry " << i << ": hash=" << keyHash << ", addr=" << valueAddr << std::endl; - } - } - } - - return 0; -} \ No newline at end of file diff --git a/cpp/main.cpp b/cpp/main.cpp deleted file mode 100644 index 9abc9e6..0000000 --- a/cpp/main.cpp +++ /dev/null @@ -1,213 +0,0 @@ -#include -#include -#include "binary_table.h" - -void printBinaryDump(const std::vector& data) { - for (size_t i = 0; i < data.size(); i += 16) { - // Address - printf("0x%04X (%4zu) | ", static_cast(i), i); - - // Hex bytes - for (int j = 0; j < 16; j++) { - if (i + j < data.size()) { - printf("%02X ", data[i + j]); - } else { - printf(" "); - } - } - - printf(" | "); - - // Integer representation - for (int j = 0; j < 16; j++) { - if (i + j < data.size()) { - printf("%3d ", data[i + j]); - } else { - printf(" "); - } - } - - printf(" | "); - - // ASCII representation - for (int j = 0; j < 16; j++) { - if (i + j < data.size()) { - uint8_t byte = data[i + j]; - if (byte >= 32 && byte <= 126) { - printf("%c", static_cast(byte)); - } else { - printf("."); - } - } - } - - printf(" |\n"); - } -} - -std::vector readFile(const std::string& path) { - std::ifstream file(path, std::ios::binary); - file.seekg(0, std::ios::end); - size_t size = file.tellg(); - file.seekg(0, std::ios::beg); - - std::vector data(size); - file.read(reinterpret_cast(data.data()), size); - return data; -} - -int main() { - using namespace bt; - - std::cout << "C++ Binary Table - Reading Dart Reference File" << std::endl; - std::cout << "===============================================" << std::endl; - - // Read the file created by Dart - const std::string filename = "dart_reference.bin"; - if (!std::filesystem::exists(filename)) { - std::cout << "โŒ Reference file not found: " << filename << std::endl; - return 1; - } - - std::cout << "๐Ÿ“ Reading reference file created by Dart..." << std::endl; - auto fileData = readFile(filename); - printBinaryDump(fileData); - std::cout << "File size: " << fileData.size() << " bytes\n" << std::endl; - - // Try to read the file with C++ implementation - try { - BinaryTable table(filename); - - std::cout << "๐Ÿ” Testing C++ reading of Dart-created file..." << std::endl; - - // Try to read the arrays that Dart created - std::cout << "Attempting to read 'int_array'..." << std::endl; - try { - auto intArray = table.getArray("int_array"); - std::cout << "โœ… int_array found, length: " << intArray.length() << std::endl; - - if (intArray.length() > 0) { - std::cout << "First few elements: "; - int count = std::min(5, static_cast(intArray.length())); - for (int i = 0; i < count; i++) { - std::cout << intArray[i] << " "; - } - std::cout << std::endl; - } - } catch (const std::exception& e) { - std::cout << "โŒ Failed to read int_array: " << e.what() << std::endl; - } - - std::cout << "\nAttempting to read 'float_array'..." << std::endl; - try { - auto floatArray = table.getArray("float_array"); - std::cout << "โœ… float_array found, length: " << floatArray.length() << std::endl; - - if (floatArray.length() > 0) { - std::cout << "First few elements: "; - int count = std::min(5, static_cast(floatArray.length())); - for (int i = 0; i < count; i++) { - std::cout << floatArray[i] << " "; - } - std::cout << std::endl; - } - } catch (const std::exception& e) { - std::cout << "โŒ Failed to read float_array: " << e.what() << std::endl; - } - - std::cout << "\nAttempting to read 'empty' array..." << std::endl; - try { - auto emptyArray = table.getArray("empty"); - std::cout << "โœ… empty array found, length: " << emptyArray.length() << std::endl; - } catch (const std::exception& e) { - std::cout << "โŒ Failed to read empty array: " << e.what() << std::endl; - } - - } catch (const std::exception& e) { - std::cout << "โŒ Failed to read file: " << e.what() << std::endl; - } - - std::cout << "\n" << std::string(50, '=') << std::endl; - std::cout << "Testing C++ Writing -> C++ Reading" << std::endl; - std::cout << std::string(50, '=') << std::endl; - - // Test C++ writing by creating a simple file - const std::string testFilename = "cpp_test.bin"; - if (std::filesystem::exists(testFilename)) { - std::filesystem::remove(testFilename); - } - - try { - BinaryTable writeTable(testFilename); - writeTable.initialize(); - - std::cout << "๐Ÿ“ Writing simple data with C++..." << std::endl; - - // Write very simple data first - writeTable.set("test_int", 42); - std::cout << "โœ… Wrote integer" << std::endl; - - // Read it back immediately - int32_t readInt = writeTable.get("test_int"); - std::cout << "โœ… Read back integer: " << readInt << std::endl; - - // Write a simple array - writeTable.set>("simple_array", {1, 2, 3}); - std::cout << "โœ… Wrote simple array" << std::endl; - - auto readArray = writeTable.getArray("simple_array"); - std::cout << "โœ… Read back array, length: " << readArray.length() << std::endl; - - if (readArray.length() > 0) { - std::cout << "Array elements: "; - for (int i = 0; i < readArray.length(); i++) { - std::cout << readArray[i] << " "; - } - std::cout << std::endl; - } - - // Test array operations - std::cout << "\n๐Ÿ“ Testing array operations..." << std::endl; - readArray.set(0, 99); // Modify first element - readArray.add(4); // Add element - readArray.addAll({5, 6}); // Add multiple - - std::cout << "After modifications, length: " << readArray.length() << std::endl; - std::cout << "Elements: "; - for (int i = 0; i < readArray.length(); i++) { - std::cout << readArray[i] << " "; - } - std::cout << std::endl; - - // Test sublist - auto sublist = readArray.fetchSublist(0, 3); - std::cout << "Sublist (0-3): "; - for (auto val : sublist) { - std::cout << val << " "; - } - std::cout << std::endl; - - std::cout << "\n๐ŸŽ‰ C++ Implementation Status:" << std::endl; - std::cout << "โœ… File reading (Dart compatibility)" << std::endl; - std::cout << "โœ… File writing" << std::endl; - std::cout << "โœ… Basic data types (int, float, string)" << std::endl; - std::cout << "โœ… Array storage and retrieval" << std::endl; - std::cout << "โœ… Array operations (set, add, addAll)" << std::endl; - std::cout << "โœ… Array sublist fetching" << std::endl; - std::cout << "โœ… Type-safe template system" << std::endl; - std::cout << "โœ… Memory-efficient file access" << std::endl; - std::cout << "โœ… Full interoperability with Dart" << std::endl; - - } catch (const std::exception& e) { - std::cout << "โŒ C++ write/read test failed: " << e.what() << std::endl; - - // Show the file that was created - if (std::filesystem::exists(testFilename)) { - std::cout << "\nFile that was created:" << std::endl; - auto data = readFile(testFilename); - printBinaryDump(data); - } - } - - return 0; -} \ No newline at end of file diff --git a/cpp/parity_test.cpp b/cpp/parity_test.cpp deleted file mode 100644 index 0cdf266..0000000 --- a/cpp/parity_test.cpp +++ /dev/null @@ -1,197 +0,0 @@ -#include -#include -#include -#include -#include "binary_table.h" - -void printBinaryDump(const std::string& filename) { - std::ifstream file(filename, std::ios::binary); - if (!file) { - std::cout << "Cannot open file for dump" << std::endl; - return; - } - - file.seekg(0, std::ios::end); - size_t size = file.tellg(); - file.seekg(0, std::ios::beg); - - std::vector data(size); - file.read(reinterpret_cast(data.data()), size); - file.close(); - - std::cout << "\n=== Binary Dump of " << filename << " (" << size << " bytes) ===" << std::endl; - - for (size_t i = 0; i < data.size(); i += 16) { - printf("0x%04X | ", static_cast(i)); - - // Hex bytes - for (int j = 0; j < 16; j++) { - if (i + j < data.size()) { - printf("%02X ", data[i + j]); - } else { - printf(" "); - } - } - - printf(" | "); - - // ASCII representation - for (int j = 0; j < 16; j++) { - if (i + j < data.size()) { - uint8_t byte = data[i + j]; - printf("%c", (byte >= 32 && byte <= 126) ? byte : '.'); - } - } - - printf("\n"); - } - std::cout << "=========================" << std::endl; -} - -// Test equivalent to Dart's main() function -int main() { - std::cout << "๐Ÿงช C++ Binary Table Parity Test (matching Dart behavior)" << std::endl; - std::cout << "=========================================================" << std::endl; - - const std::string filename = "cpp_parity_test.bin"; - - // Clean up any existing file - std::filesystem::remove(filename); - - try { - bt::BinaryTable table(filename); - table.initialize(); - - std::cout << "\n1. Testing basic data types..." << std::endl; - - // Set basic values - table.set("myInt", 42); - table.set("myFloat", 3.14f); - table.set("myString", "Hello, World!"); - - // Verify basic values - assert(table.get("myInt") == 42); - assert(table.get("myFloat") == 3.14f); - assert(table.get("myString") == "Hello, World!"); - - std::cout << "โœ… Basic data types work correctly" << std::endl; - - std::cout << "\n2. Testing array operations..." << std::endl; - - // Test array creation and access - std::vector testArray = {10, 20, 30, 40, 50}; - table.set>("myArray", testArray); - - auto retrievedArray = table.get>("myArray"); - assert(retrievedArray.size() == 5); - for (size_t i = 0; i < retrievedArray.size(); i++) { - assert(retrievedArray[i] == testArray[i]); - } - - std::cout << "โœ… Array storage and retrieval work correctly" << std::endl; - - // Test uniform array operations - auto uniformArray = table.getArray("myArray"); - assert(uniformArray.length() == 5); - assert(uniformArray[0] == 10); - assert(uniformArray[4] == 50); - - // Test array modification - uniformArray.set(2, 999); - assert(uniformArray[2] == 999); - - // Test array extension - uniformArray.add(60); - assert(uniformArray.length() == 6); - assert(uniformArray[5] == 60); - - std::cout << "โœ… Uniform array operations work correctly" << std::endl; - - std::cout << "\n3. Testing multi-key operations (previously causing corruption)..." << std::endl; - - // Add multiple keys to test address table stability - table.set("key1", 100); - table.set("key2", 200); - table.set("key3", 300); - table.set("str1", "First"); - table.set("str2", "Second"); - - // Verify all keys are accessible - assert(table.get("key1") == 100); - assert(table.get("key2") == 200); - assert(table.get("key3") == 300); - assert(table.get("str1") == "First"); - assert(table.get("str2") == "Second"); - - std::cout << "โœ… Multi-key operations work without corruption" << std::endl; - - std::cout << "\n4. Testing remove operations..." << std::endl; - - // Test removal - table.remove("key2"); - - // Verify removed key is gone - try { - table.get("key2"); - assert(false && "Should have thrown exception"); - } catch (const std::runtime_error&) { - // Expected - } - - // Verify other keys still work - assert(table.get("key1") == 100); - assert(table.get("key3") == 300); - - std::cout << "โœ… Remove operations work correctly" << std::endl; - - std::cout << "\n5. Testing fetchSublist functionality..." << std::endl; - - auto sublist = uniformArray.fetchSublist(1, 4); - assert(sublist.size() == 3); - assert(sublist[0] == 20); // myArray[1] - assert(sublist[1] == 999); // myArray[2] (modified) - assert(sublist[2] == 40); // myArray[3] - - std::cout << "โœ… fetchSublist works correctly" << std::endl; - - std::cout << "\n6. Testing free list and truncation operations..." << std::endl; - - // Create some data, then remove it to test free list - table.set("temp1", 1000); - table.set("temp2", 2000); - table.set("temp3", 3000); - - table.remove("temp1"); - table.remove("temp2"); - table.remove("temp3"); - - // Test truncation - table.truncate(); - - // Verify original data still accessible - assert(table.get("myInt") == 42); - assert(table.get("myString") == "Hello, World!"); - assert(table.get("key1") == 100); - - std::cout << "โœ… Free list and truncation work correctly" << std::endl; - - std::cout << "\n๐ŸŽ‰ ALL TESTS PASSED! C++ implementation has Dart parity!" << std::endl; - - // Print final file dump for verification - printBinaryDump(filename); - - // Clean up - std::filesystem::remove(filename); - - return 0; - - } catch (const std::exception& e) { - std::cout << "โŒ Test failed: " << e.what() << std::endl; - - // Print file dump for debugging - printBinaryDump(filename); - - std::filesystem::remove(filename); - return 1; - } -} \ No newline at end of file diff --git a/dart/CHANGELOG.md b/dart/CHANGELOG.md new file mode 100644 index 0000000..87d1c6a --- /dev/null +++ b/dart/CHANGELOG.md @@ -0,0 +1,2 @@ + +1.0.0 Initial Release \ No newline at end of file diff --git a/dart/lib/concurrency.dart b/dart/lib/concurrency.dart new file mode 100644 index 0000000..bf75286 --- /dev/null +++ b/dart/lib/concurrency.dart @@ -0,0 +1,190 @@ + + +import 'dart:io'; +import 'dart:isolate'; +import 'dart:math'; + +import 'package:sweepstore/header.dart'; +import 'package:sweepstore/helper_extensions.dart'; +import 'package:sweepstore/structures.dart'; + +int _randomId() { + // mix timestamp with random for better uniquness + // keep it positive to avoid signed int issues when storing + int time = DateTime.now().millisecondsSinceEpoch32(); + int random = Random().nextInt(0x80000000); + return (time ^ random) & 0x7FFFFFFF; +} + +// Spawn a ticket for a worker to perform an operation +void spawnTicket(RandomAccessFile file, { + required SweepstoreTicketOperation operation, + required int keyHash, + required int writeSize, + required void Function() onApproved, + String? debugLabel, +}) { + + void log(String message) { + + String prefix = debugLabel != null ? "\x1B[38;5;208m[Ticket Spawner - $debugLabel]:\x1B[0m " : "\x1B[38;5;208m[Ticket Spawner]:\x1B[0m "; + print("$prefix$message"); + + } + void tickSleep([int microsecondVariance = 10]) { + sleep(Duration(microseconds: 100 + Random().nextInt(microsecondVariance))); + } + Map expSleepTracker = {}; + void expSleep(String label) { + int count = expSleepTracker[label] ?? 0; + int sleepTime = (1 << count); // Exponential backoff + sleep(Duration(milliseconds: sleepTime)); + expSleepTracker[label] = count + 1; + } + + // Reduce the chance of race conditions by adding a small random delay + tickSleep(100); + + SweepstoreHeader header = SweepstoreHeader(file); + SweepstoreConcurrencyHeader concurrencyHeader = SweepstoreConcurrencyHeader(header); + + int? ticketIndex; + int myIdentifier = _randomId(); + + // Try to acquire a ticket - (Acquire loop) + while (ticketIndex == null) { + + for (int i = 0; i < concurrencyHeader.numberOfWorkers; i++) { + + SweepstoreWorkerTicket ticket = concurrencyHeader[i]; + + if (!ticket.writable()) { + continue; + } + + int identifier = ticket.identifier; + + bool identifier_unassigned = identifier == 0; + bool stale_heartbeat = (DateTime.now().millisecondsSinceEpoch32() - ticket.workerHeartbeat) > 2000; + bool is_free = ticket.ticketState == SweepstoreTicketState.FREE; + + if (identifier_unassigned && stale_heartbeat && is_free) { + ticket.write( + identifier: myIdentifier, + ticketState: SweepstoreTicketState.WAITING, + ); + ticketIndex = i; + log("Acquired ticket $ticketIndex with identifier $myIdentifier."); + break; + + } + + } + + expSleep("acquire_loop"); + + // Ensure we still own the ticket - if not, reset and try again + if (ticketIndex != null) { + SweepstoreWorkerTicket ticket = concurrencyHeader[ticketIndex]; + if (ticket.identifier != myIdentifier) { + log("Lost ticket $ticketIndex, retrying..."); + ticketIndex = null; + } + } + } + + // We have a ticket, set it up + SweepstoreWorkerTicket myTicket = concurrencyHeader[ticketIndex]; + myTicket.write( + workerHeartbeat: DateTime.now().millisecondsSinceEpoch32(), + ticketState: SweepstoreTicketState.WAITING, + ticketOperation: operation, + keyHash: keyHash, + writeSize: writeSize, + ); + + // Wait for approval - (Approval loop) + while (true) { + + // Check we still own the ticket + if (myTicket.identifier != myIdentifier) { + String exceptionMessage = "CRITICAL: Lost ownership of ticket $ticketIndex, was expecting identifier $myIdentifier but found ${myTicket.identifier}."; + throw Exception(exceptionMessage); + } + + if (myTicket.ticketState == SweepstoreTicketState.APPROVED) { + myTicket.write( + ticketState: SweepstoreTicketState.EXECUTING, + ); + onApproved(); + myTicket.write( + ticketState: SweepstoreTicketState.COMPLETED, + ); + + break; + } + + // randomSleep(10); + tickSleep(); + + // Update heartbeat + if (DateTime.now().millisecondsSinceEpoch32() != myTicket.workerHeartbeat) { + myTicket.write( + workerHeartbeat: myTicket.workerHeartbeat, + ); + } + } + + + +} + +// Master side +void initialiseMasterListener(RandomAccessFile file) async { + String filePath = file.path; + Isolate.spawn((_) { + + void log(String message) { + print("\x1B[38;5;82m[Master Listener]:\x1B[0m $message"); + } + + RandomAccessFile file = File(filePath).openSync(mode: FileMode.append); + + SweepstoreHeader header = SweepstoreHeader(file); + SweepstoreConcurrencyHeader concurrencyHeader = SweepstoreConcurrencyHeader(header); + + while (true) { + + for (int i = 0; i < concurrencyHeader.numberOfWorkers; i++) { + + SweepstoreWorkerTicket ticket = concurrencyHeader[i]; + + if (ticket.ticketState == SweepstoreTicketState.WAITING) { + log("Found waiting ticket $i (Key Hash: ${ticket.keyHash})..."); + + // Approve the ticket + ticket.write( + ticketState: SweepstoreTicketState.APPROVED, + ); + log("Approved ticket $i."); + } else if (ticket.ticketState == SweepstoreTicketState.COMPLETED) { + log("Ticket $i completed. Resetting ticket..."); + // Reset the ticket + ticket.write( + identifier: 0, + workerHeartbeat: 0, + ticketState: SweepstoreTicketState.FREE, + ticketOperation: SweepstoreTicketOperation.NONE, + keyHash: 0, + writeSize: 0, + ); + log("Reset ticket $i."); + } + + } + + sleep(Duration(milliseconds: 1)); + + } + }, null); +} \ No newline at end of file diff --git a/dart/lib/debug.dart b/dart/lib/debug.dart new file mode 100644 index 0000000..d7e6b9b --- /dev/null +++ b/dart/lib/debug.dart @@ -0,0 +1,50 @@ + +import 'dart:typed_data'; + +String binaryDump(Uint8List data) { + StringBuffer buffer = StringBuffer(); + + for (int i = 0; i < data.length; i += 16) { + // Address + buffer.write('0x${i.toRadixString(16).padLeft(4, '0').toUpperCase()} (${i.toString().padLeft(4)}) | '); + + // Hex bytes + for (int j = 0; j < 16; j++) { + if (i + j < data.length) { + buffer.write('${data[i + j].toRadixString(16).padLeft(2, '0').toUpperCase()} '); + } else { + buffer.write(' '); + } + } + + buffer.write(' | '); + + // Integer representation + for (int j = 0; j < 16; j++) { + if (i + j < data.length) { + buffer.write('${data[i + j].toString().padLeft(3)} '); + } else { + buffer.write(' '); + } + } + + buffer.write(' | '); + + // ASCII representation + for (int j = 0; j < 16; j++) { + if (i + j < data.length) { + int byte = data[i + j]; + if (byte >= 32 && byte <= 126) { + buffer.write(String.fromCharCode(byte)); + } else { + buffer.write('.'); + } + } + } + + buffer.write(' | '); + if (i + 16 < data.length) buffer.writeln(); + } + + return buffer.toString(); +} \ No newline at end of file diff --git a/dart/lib/dev_tools/watch_dump.dart b/dart/lib/dev_tools/watch_dump.dart new file mode 100644 index 0000000..67ea985 --- /dev/null +++ b/dart/lib/dev_tools/watch_dump.dart @@ -0,0 +1,31 @@ +import 'dart:io'; +import 'dart:async'; +import 'package:sweepstore/debug.dart'; + +void main() async { + int refreshCount = 0; + while (true) { + // Clear console + if (Platform.isWindows) { + print(Process.runSync("cls", [], runInShell: true).stdout); + } else { + print(Process.runSync("clear", [], runInShell: true).stdout); + } + + refreshCount++; + + // Read example.bin + final file = File('example.bin'); + + if (await file.exists()) { + final data = await file.readAsBytes(); + print('Binary dump of example.bin (${data.length} bytes) - Refresh #$refreshCount\n'); + print(binaryDump(data)); + print('\n--- Refreshing in 1 seconds ---'); + } else { + print('Error: example.bin not found - Refresh #$refreshCount'); + } + + await Future.delayed(Duration(seconds: 1)); + } +} \ No newline at end of file diff --git a/dart/lib/dev_tools/watch_tickets.dart b/dart/lib/dev_tools/watch_tickets.dart new file mode 100644 index 0000000..9fa0221 --- /dev/null +++ b/dart/lib/dev_tools/watch_tickets.dart @@ -0,0 +1,84 @@ +import 'dart:io'; +import 'dart:async'; +import 'package:sweepstore/header.dart'; + + +void main() async { + int refreshCount = 0; + int? previousMasterHeartbeat; + Map previousWorkerHeartbeats = {}; + + while (true) { + // Clear console + if (Platform.isWindows) { + print(Process.runSync("cls", [], runInShell: true).stdout); + } else { + print(Process.runSync("clear", [], runInShell: true).stdout); + } + + refreshCount++; + + // Read example.bin + final file = File('example.bin'); + + if (await file.exists()) { + final raf = await file.open(mode: FileMode.read); + + + try { + final header = SweepstoreHeader(raf); + final concurrency = header.concurrency; + + int now32 = (DateTime.now().millisecondsSinceEpoch ~/ 1000) & 0xFFFFFFFF; + + print('Sweepstore Tickets - Refresh #$refreshCount'); + print('Current Time (now32): $now32'); + print('Master ID: ${concurrency.masterIdentifier}'); + + int masterAge = now32 - concurrency.masterHeartbeat; + String masterStatus = masterAge > 5 ? "(stale)" : "(active)"; + String masterPrevious = previousMasterHeartbeat != null ? "(previously $previousMasterHeartbeat)" : ""; + print('Master Heartbeat: ${concurrency.masterHeartbeat} $masterStatus $masterPrevious'); + + print('Workers: ${concurrency.numberOfWorkers}'); + print('Read Allowed: ${concurrency.isReadAllowed}'); + print(''); + + // display each ticket + for (int i = 0; i < concurrency.numberOfWorkers; i++) { + final ticket = concurrency[i]; + + print('--- Ticket #$i ---'); + print(' Identifier: ${ticket.identifier}'); + + int workerAge = now32 - ticket.workerHeartbeat; + String workerStatus = workerAge > 5 ? "(stale)" : "(active)"; + String workerPrevious = previousWorkerHeartbeats.containsKey(i) ? "(previously ${previousWorkerHeartbeats[i]})" : ""; + print(' Heartbeat: ${ticket.workerHeartbeat} $workerStatus $workerPrevious'); + + print(' State: ${ticket.ticketState.name}'); + print(' Operation: ${ticket.ticketOperation.name}'); + print(' Key Hash: ${ticket.keyHash}'); + print(' Write Ptr: ${ticket.writePointer}'); + print(' Write Size: ${ticket.writeSize} bytes'); + print(''); + + // update previous heartbeat + previousWorkerHeartbeats[i] = ticket.workerHeartbeat; + + } + + // updat previous master heartbeat + previousMasterHeartbeat = concurrency.masterHeartbeat; + + print('--- Refreshing in 1 seconds ---'); + } finally { + await raf.close(); + } + } else { + print('Error: example.bin not found - Refresh #$refreshCount'); + } + + await Future.delayed(Duration(seconds: 1)); + } +} \ No newline at end of file diff --git a/dart/lib/header.dart b/dart/lib/header.dart new file mode 100644 index 0000000..99f0fd2 --- /dev/null +++ b/dart/lib/header.dart @@ -0,0 +1,440 @@ + +import 'dart:convert'; +import 'dart:io'; +import 'package:sweepstore/structures.dart'; + +import 'helper_extensions.dart'; + +void initialiseSweepstoreHeader(RandomAccessFile file, { + int concurrentWorkers = 4, +}) { + + SweepstoreHeaderWriter header = SweepstoreHeaderWriter(file); + + if (header.magicNumber == 'SWPT') { + throw ArgumentError('Sweepstore file is already initialised.'); + } + + SweepstoreConcurrencyHeaderWriter concurrencyHeader = SweepstoreConcurrencyHeaderWriter(header); + + header.magicNumber = 'SWPT'; + header.version = "undefined"; + header.addressTablePointer = SweepstorePointer.nullptr; + header.freeListCount = 0; + header.isFreeListLifted = false; + concurrencyHeader.masterIdentifier = 0; + concurrencyHeader.masterHeartbeat = 0; + concurrencyHeader.numberOfWorkers = concurrentWorkers; + concurrencyHeader.isReadAllowed = false; + + for (int i = 0; i < concurrentWorkers; i++) { + SweepstoreWorkerTicket ticket = concurrencyHeader[i]; + ticket.write( + identifier: 0, + workerHeartbeat: 0, + ticketState: SweepstoreTicketState.FREE, + ticketOperation: SweepstoreTicketOperation.NONE, + keyHash: 0, + writePointer: SweepstorePointer.nullptr, + writeSize: 0, + ); + } + +} + +class SweepstoreHeader { + + final RandomAccessFile _file; + + SweepstoreHeader(this._file); + + // Offset 0 - 4 bytes + String get magicNumber { + _file.setPositionSync(0); + final bytes = _file.readSync(4); + return String.fromCharCodes(bytes); + } + + // Offset 4 - 12 bytes + String get version { + _file.setPositionSync(4); + String version = utf8.decode(_file.readSync(12)).trim(); + return version; + } + + // Offset 16 - 8 bytes + SweepstorePointer get addressTablePointer { + _file.setPositionSync(16); + final address = _file.readIntSync(8); + return SweepstorePointer(address); + } + + // Offset 24 - 4 bytes + int get freeListCount { + _file.setPositionSync(24); + int count = _file.readIntSync(4); + return count; + } + + // Offset 28 - 1 byte + bool get isFreeListLifted { + _file.setPositionSync(28); + int flag = _file.readIntSync(1); + return flag != 0; + } + + SweepstoreConcurrencyHeader get concurrency => SweepstoreConcurrencyHeader(this); +} + +class SweepstoreHeaderWriter extends SweepstoreHeader { + + SweepstoreHeaderWriter(RandomAccessFile file) : super(file); + + // Offset 0 - 4 bytes + void set magicNumber(String value) { + if (value.length != 4) { + throw ArgumentError('Magic number must be exactly 4 characters long'); + } + _file.setPositionSync(0); + _file.writeFromSync(value.codeUnits); + } + + // Offset 4 - 12 bytes + void set version(String value) { + if (value.length > 11) { + throw ArgumentError('Version string must be at most 11 characters long'); + } + _file.setPositionSync(4); + _file.writeFromSync(utf8.encode(" " + value.padRight(11, ' ').substring(0, 11))); + } + + // Offset 16 - 8 bytes + void set addressTablePointer(SweepstorePointer pointer) { + _file.setPositionSync(16); + _file.writeIntSync(pointer.address, 8); + } + + // Offset 24 - 4 bytes + void set freeListCount(int value) { + _file.setPositionSync(24); + _file.writeIntSync(value, 4); + } + + // Offset 28 - 1 byte + void set isFreeListLifted(bool lifted) { + _file.setPositionSync(28); + _file.writeIntSync(lifted ? 1 : 0, 1); + } +} + +class SweepstoreConcurrencyHeader { + + final SweepstoreHeader _header; + + SweepstoreConcurrencyHeader(this._header); + + // Offset 29 - 8 bytes + int get masterIdentifier { + _header._file.setPositionSync(29); + int id = _header._file.readIntSync(8); + return id; + } + + // Offset 37 - 4 bytes + int get masterHeartbeat { + _header._file.setPositionSync(37); + int heartbeat = _header._file.readIntSync(4); + return heartbeat; + } + + // Offset 41 - 4 bytes + int get numberOfWorkers { + _header._file.setPositionSync(41); + int numWorkers = _header._file.readIntSync(4); + return numWorkers; + } + + // Offset 45 - 1 byte + bool get isReadAllowed { + _header._file.setPositionSync(45); + int flag = _header._file.readIntSync(1); + return flag != 0; + } + + SweepstoreWorkerTicket operator [](int ticketIndex) { + if (ticketIndex < 0 || ticketIndex >= numberOfWorkers) { + throw RangeError.index(ticketIndex, this, 'ticketIndex', null, numberOfWorkers); + } + return SweepstoreWorkerTicket(ticketIndex, this); + } + +} + +class SweepstoreConcurrencyHeaderWriter extends SweepstoreConcurrencyHeader { + + SweepstoreConcurrencyHeaderWriter(SweepstoreHeader header) : super(header); + + // Offset 29 - 8 bytes + void set masterIdentifier(int id) { + _header._file.setPositionSync(29); + _header._file.writeIntSync(id, 8); + } + + // Offset 37 - 4 bytes + void set masterHeartbeat(int heartbeat) { + _header._file.setPositionSync(37); + _header._file.writeIntSync(heartbeat, 4); + } + + // Offset 41 - 4 bytes + void set numberOfWorkers(int numWorkers) { + _header._file.setPositionSync(41); + _header._file.writeIntSync(numWorkers, 4); + } + + // Offset 45 - 1 byte + void set isReadAllowed(bool allowed) { + _header._file.setPositionSync(45); + _header._file.writeIntSync(allowed ? 1 : 0, 1); + } + +} + +const int endOfStaticHeaderOffset = 46; + +class SweepstoreWorkerTicket { + + static const int _ticketSize = 30; + final SweepstoreConcurrencyHeader _concurrencyHeader; + final int ticketIndex; + + SweepstoreWorkerTicket(this.ticketIndex, this._concurrencyHeader); + + // All offsets are relative to the start of the workers ticket + int get _baseOffset => endOfStaticHeaderOffset + (ticketIndex * _ticketSize); + + // Offset 0 - 4 bytes + int get identifier { + _concurrencyHeader._header._file.setPositionSync(_baseOffset); + int id = _concurrencyHeader._header._file.readIntSync(4); + return id; + } + + // Offset 4 - 4 bytes + int get workerHeartbeat { + _concurrencyHeader._header._file.setPositionSync(_baseOffset + 4); + int heartbeat = _concurrencyHeader._header._file.readIntSync(4); + return heartbeat; + } + + // Offset 8 - 1 byte + SweepstoreTicketState get ticketState { + _concurrencyHeader._header._file.setPositionSync(_baseOffset + 8); + int stateValue = _concurrencyHeader._header._file.readIntSync(1); + return SweepstoreTicketState.values[stateValue]; + } + + // Offset 9 - 1 byte + SweepstoreTicketOperation get ticketOperation { + _concurrencyHeader._header._file.setPositionSync(_baseOffset + 9); + int operationValue = _concurrencyHeader._header._file.readIntSync(1); + return SweepstoreTicketOperation.values[operationValue]; + } + + // Offset 10 - 8 bytes + int get keyHash { + _concurrencyHeader._header._file.setPositionSync(_baseOffset + 10); + int hash = _concurrencyHeader._header._file.readIntSync(8); + return hash; + } + + // Offset 18 - 8 bytes + SweepstorePointer get writePointer { + _concurrencyHeader._header._file.setPositionSync(_baseOffset + 18); + int address = _concurrencyHeader._header._file.readIntSync(8); + return SweepstorePointer(address); + } + + // Offset 26 - 4 bytes + int get writeSize { + _concurrencyHeader._header._file.setPositionSync(_baseOffset + 26); + int size = _concurrencyHeader._header._file.readIntSync(4); + return size; + } + + // Writer + void write({ + int? identifier, + int? workerHeartbeat, + SweepstoreTicketState? ticketState, + SweepstoreTicketOperation? ticketOperation, + int? keyHash, + SweepstorePointer? writePointer, + int? writeSize, + }) { + + try { + + _concurrencyHeader._header._file.lockSync(FileLock.blockingExclusive, _baseOffset, _baseOffset + _ticketSize); + + _concurrencyHeader._header._file.setPositionSync(_baseOffset); + List existingBuffer = _concurrencyHeader._header._file.readSync(_ticketSize); + RandomAccessMemory buffer = RandomAccessMemory(existingBuffer); + + if (identifier != null) { + buffer.setPositionSync(0); + buffer.writeIntSync(identifier, 4); + } + if (workerHeartbeat != null) { + buffer.setPositionSync(4); + buffer.writeIntSync(workerHeartbeat, 4); + } + if (ticketState != null) { + buffer.setPositionSync(8); + buffer.writeIntSync(ticketState.index, 1); + } + if (ticketOperation != null) { + buffer.setPositionSync(9); + buffer.writeIntSync(ticketOperation.index, 1); + } + if (keyHash != null) { + buffer.setPositionSync(10); + buffer.writeIntSync(keyHash, 8); + } + if (writePointer != null) { + buffer.setPositionSync(18); + buffer.writeIntSync(writePointer.address, 8); + } + if (writeSize != null) { + buffer.setPositionSync(26); + buffer.writeIntSync(writeSize, 4); + } + + _concurrencyHeader._header._file.setPositionSync(_baseOffset); + _concurrencyHeader._header._file.writeFromSync(buffer.toUint8List()); + + } finally { + _concurrencyHeader._header._file.unlockSync(_baseOffset, _baseOffset + _ticketSize); + } + } + + bool writable() { + try { + _concurrencyHeader._header._file.lockSync( + FileLock.blockingExclusive, + _baseOffset, + _baseOffset + _ticketSize + ); + // Successfully locked - immediately unlock and return true + _concurrencyHeader._header._file.unlockSync(_baseOffset, _baseOffset + _ticketSize); + return true; + } catch (e) { + // Lock failed - already held by another process + return false; + } + } +} + +@deprecated +class _SweepstoreWorkerTicket { + + static const int _ticketSize = 30; + + final SweepstoreConcurrencyHeader _concurrencyHeader; + final int ticketIndex; + + _SweepstoreWorkerTicket(this.ticketIndex, this._concurrencyHeader); + + // All offsets are relative to the start of the workers ticket + int get _baseOffset => endOfStaticHeaderOffset + (ticketIndex * _ticketSize); + + // Offset 0 - 4 bytes + int get identifier { + _concurrencyHeader._header._file.setPositionSync(_baseOffset); + int id = _concurrencyHeader._header._file.readIntSync(4); + return id; + } + set identifier(int id) { + _concurrencyHeader._header._file.setPositionSync(_baseOffset); + _concurrencyHeader._header._file.writeIntSync(id, 4); + _concurrencyHeader._header._file.flushSync(); + } + + // Offset 4 - 4 bytes + int get workerHeartbeat { + _concurrencyHeader._header._file.setPositionSync(_baseOffset + 4); + int heartbeat = _concurrencyHeader._header._file.readIntSync(4); + return heartbeat; + } + set workerHeartbeat(int heartbeat) { + _concurrencyHeader._header._file.setPositionSync(_baseOffset + 4); + _concurrencyHeader._header._file.writeIntSync(heartbeat, 4); + } + + // Offset 8 - 1 byte + SweepstoreTicketState get ticketState { + _concurrencyHeader._header._file.setPositionSync(_baseOffset + 8); + int stateValue = _concurrencyHeader._header._file.readIntSync(1); + return SweepstoreTicketState.values[stateValue]; + } + set ticketState(SweepstoreTicketState state) { + _concurrencyHeader._header._file.setPositionSync(_baseOffset + 8); + _concurrencyHeader._header._file.writeIntSync(state.index, 1); + _concurrencyHeader._header._file.flushSync(); + } + + // Offset 9 - 1 byte + SweepstoreTicketOperation get ticketOperation { + _concurrencyHeader._header._file.setPositionSync(_baseOffset + 9); + int operationValue = _concurrencyHeader._header._file.readIntSync(1); + return SweepstoreTicketOperation.values[operationValue]; + } + set ticketOperation(SweepstoreTicketOperation operation) { + _concurrencyHeader._header._file.setPositionSync(_baseOffset + 9); + _concurrencyHeader._header._file.writeIntSync(operation.index, 1); + } + + // Offset 10 - 8 bytes + int get keyHash { + _concurrencyHeader._header._file.setPositionSync(_baseOffset + 10); + int hash = _concurrencyHeader._header._file.readIntSync(8); + return hash; + } + set keyHash(int hash) { + _concurrencyHeader._header._file.setPositionSync(_baseOffset + 10); + _concurrencyHeader._header._file.writeIntSync(hash, 8); + } + + // Offset 18 - 8 bytes + SweepstorePointer get writePointer { + _concurrencyHeader._header._file.setPositionSync(_baseOffset + 18); + int address = _concurrencyHeader._header._file.readIntSync(8); + return SweepstorePointer(address); + } + set writePointer(SweepstorePointer pointer) { + _concurrencyHeader._header._file.setPositionSync(_baseOffset + 18); + _concurrencyHeader._header._file.writeIntSync(pointer.address, 8); + } + + // Offset 26 - 4 bytes + int get writeSize { + _concurrencyHeader._header._file.setPositionSync(_baseOffset + 26); + int size = _concurrencyHeader._header._file.readIntSync(4); + return size; + } + set writeSize(int size) { + _concurrencyHeader._header._file.setPositionSync(_baseOffset + 26); + _concurrencyHeader._header._file.writeIntSync(size, 4); + } + + // Helpers + + void lock() { + _concurrencyHeader._header._file.lockSync(FileLock.exclusive, _baseOffset, _baseOffset + _ticketSize); + } + + void unlock() { + _concurrencyHeader._header._file.unlockSync(_baseOffset, _baseOffset + _ticketSize); + } + +} \ No newline at end of file diff --git a/dart/lib/helper_extensions.dart b/dart/lib/helper_extensions.dart new file mode 100644 index 0000000..5b02f9f --- /dev/null +++ b/dart/lib/helper_extensions.dart @@ -0,0 +1,228 @@ + +import 'dart:io'; +import 'dart:math'; +import 'dart:typed_data'; + +import 'package:sweepstore/structures.dart'; + + +class RandomAccessMemory { + List _buffer; + int _position = 0; + + RandomAccessMemory([List? initialData]) : _buffer = initialData != null ? List.from(initialData) : []; + + // Position management + int positionSync() => _position; + + void setPositionSync(int position) { + _position = position; + } + + int length() => _buffer.length; + + // Read bytes + List readSync(int count) { + if (_position + count > _buffer.length) { + throw RangeError('Not enough bytes to read'); + } + List result = _buffer.sublist(_position, _position + count); + _position += count; + return result; + } + + // Write bytes + void writeFromSync(List bytes) { + for (int i = 0; i < bytes.length; i++) { + if (_position + i >= _buffer.length) { + _buffer.add(bytes[i]); + } else { + _buffer[_position + i] = bytes[i]; + } + } + _position += bytes.length; + } + + // Read/Write Int Dynamic + int readIntSync([int size = 4, Endian endianness = Endian.little]) { + if (size < 1 || size > 8) { + throw ArgumentError('Size must be between 1 and 8 bytes'); + } + + List bytes = readSync(size); + + // Build integer from bytes with proper endianness + int result = 0; + if (endianness == Endian.little) { + for (int i = size - 1; i >= 0; i--) { + result = (result << 8) | bytes[i]; + } + } else { + for (int i = 0; i < size; i++) { + result = (result << 8) | bytes[i]; + } + } + + // Sign extend if MSB is set + int signBit = 1 << (size * 8 - 1); + if (result & signBit != 0) { + result -= 1 << (size * 8); + } + + return result; + } + + void writeIntSync(int value, [int size = 4, Endian endianness = Endian.little]) { + if (size < 1 || size > 8) { + throw ArgumentError('Size must be between 1 and 8 bytes'); + } + + List bytes = List.filled(size, 0); + + // Extract bytes with proper endianness + if (endianness == Endian.little) { + for (int i = 0; i < size; i++) { + bytes[i] = (value >> (i * 8)) & 0xFF; + } + } else { + for (int i = 0; i < size; i++) { + bytes[size - 1 - i] = (value >> (i * 8)) & 0xFF; + } + } + + writeFromSync(bytes); + } + + // Read/Write Pointers + SweepstorePointer readPointerSync() { + int offset = readIntSync(SweepstorePrimitives.POINTER.size); + return SweepstorePointer(offset); + } + + void writePointerSync(SweepstorePointer pointer) { + writeIntSync(pointer.address, SweepstorePrimitives.POINTER.size); + } + + // Read/Write Float32 + double readFloat32Sync([Endian endianness = Endian.little]) { + List bytes = readSync(4); + return ByteData.sublistView(Uint8List.fromList(bytes)).getFloat32(0, endianness); + } + + void writeFloat32Sync(double value, [Endian endianness = Endian.little]) { + ByteData byteData = ByteData(4); + byteData.setFloat32(0, value, endianness); + writeFromSync(byteData.buffer.asUint8List()); + } + + // Read/Write Float64 (Double) + double readFloat64Sync([Endian endianness = Endian.little]) { + List bytes = readSync(8); + return ByteData.sublistView(Uint8List.fromList(bytes)).getFloat64(0, endianness); + } + + void writeFloat64Sync(double value, [Endian endianness = Endian.little]) { + ByteData byteData = ByteData(8); + byteData.setFloat64(0, value, endianness); + writeFromSync(byteData.buffer.asUint8List()); + } + + // Conversion methods + List toList() => List.from(_buffer); + + Uint8List toUint8List() => Uint8List.fromList(_buffer); +} + +extension SweepstoreRandomAccessFileHelper on RandomAccessFile { + + // Read/Write Int Dynamic - Can specify size in bytes, does not have to align to 1, 2, 4, or 8 bytes. Default is 4 bytes (Int32) + int readIntSync([int size = 4, Endian endianness = Endian.little]) { + if (size < 1 || size > 8) { + throw ArgumentError('Size must be between 1 and 8 bytes'); + } + + List bytes = readSync(size); + + // Build integer from bytes with proper endianness + int result = 0; + if (endianness == Endian.little) { + for (int i = size - 1; i >= 0; i--) { + result = (result << 8) | bytes[i]; + } + } else { + for (int i = 0; i < size; i++) { + result = (result << 8) | bytes[i]; + } + } + + // Sign extend if MSB is set + int signBit = 1 << (size * 8 - 1); + if (result & signBit != 0) { + result -= 1 << (size * 8); + } + + return result; + } + + void writeIntSync(int value, [int size = 4, Endian endianness = Endian.little]) { + if (size < 1 || size > 8) { + throw ArgumentError('Size must be between 1 and 8 bytes'); + } + + List bytes = List.filled(size, 0); + + // Extract bytes with proper endianness + if (endianness == Endian.little) { + for (int i = 0; i < size; i++) { + bytes[i] = (value >> (i * 8)) & 0xFF; + } + } else { + for (int i = 0; i < size; i++) { + bytes[size - 1 - i] = (value >> (i * 8)) & 0xFF; + } + } + + writeFromSync(bytes); + } + + // Read/Write Pointers + SweepstorePointer readPointerSync() { + int offset = readIntSync(SweepstorePrimitives.POINTER.size); + return SweepstorePointer(offset); + } + void writePointerSync(SweepstorePointer pointer) { + writeIntSync(pointer.address, SweepstorePrimitives.POINTER.size); + } + + // Read/Write Float32 + double readFloat32Sync([Endian endianness = Endian.little]) { + List bytes = readSync(4); + return ByteData.sublistView(Uint8List.fromList(bytes)).getFloat32(0, endianness); + } + void writeFloat32Sync(double value, [Endian endianness = Endian.little]) { + ByteData byteData = ByteData(4); + byteData.setFloat32(0, value, endianness); + writeFromSync(byteData.buffer.asUint8List()); + } + + // Read/Write Float64 (Double) + double readFloat64Sync([Endian endianness = Endian.little]) { + List bytes = readSync(8); + return ByteData.sublistView(Uint8List.fromList(bytes)).getFloat64(0, endianness); + } + void writeFloat64Sync(double value, [Endian endianness = Endian.little]) { + ByteData byteData = ByteData(8); + byteData.setFloat64(0, value, endianness); + writeFromSync(byteData.buffer.asUint8List()); + } + +} + +extension SweepstoreDateTimeHelper on DateTime { + int millisecondsSinceEpoch32() { + return (millisecondsSinceEpoch ~/ 1000) & 0xFFFFFFFF; + } + int millisecondsSinceEpoch64() { + return millisecondsSinceEpoch; + } +} \ No newline at end of file diff --git a/dart/lib/structures.dart b/dart/lib/structures.dart new file mode 100644 index 0000000..89fbe3c --- /dev/null +++ b/dart/lib/structures.dart @@ -0,0 +1,48 @@ + +enum SweepstorePrimitives { + + POINTER (8), + ADDRESS_TABLE (-1); + + final int size; + final bool arrayType; + const SweepstorePrimitives(this.size, { + this.arrayType = false + }); + +} + +class SweepstorePointer { + + static const SweepstorePointer nullptr = SweepstorePointer(-1); + + final int address; + + const SweepstorePointer(this.address); + + bool get isNull => address == -1; + + operator ==(Object other) { + if (identical(this, other)) return true; + if (other is! SweepstorePointer) return false; + return address == other.address; + } + + @override + String toString() => '0x${address.toRadixString(16)} ($address)'; +} + +enum SweepstoreTicketState { + FREE, + WAITING, + APPROVED, + EXECUTING, + COMPLETED, +} + +enum SweepstoreTicketOperation { + NONE, + READ, + MODIFY, + WRITE, +} \ No newline at end of file diff --git a/dart/lib/sweepstore.dart b/dart/lib/sweepstore.dart new file mode 100644 index 0000000..8cda4ec --- /dev/null +++ b/dart/lib/sweepstore.dart @@ -0,0 +1,105 @@ + +import 'dart:io'; +import 'dart:isolate'; +import 'dart:math'; +import 'package:sweepstore/debug.dart'; +import 'package:sweepstore/header.dart'; +import 'package:sweepstore/structures.dart'; +import 'package:sweepstore/concurrency.dart'; + +class Sweepstore { + + final RandomAccessFile _file; + + Sweepstore(String filePath) + : _file = File(filePath).openSync(mode: FileMode.append) + { + _header = SweepstoreHeaderWriter(_file); + } + + late final SweepstoreHeaderWriter _header; + late final SweepstoreConcurrencyHeaderWriter _concurrencyHeader = SweepstoreConcurrencyHeaderWriter(_header); + + void initialise({ + int concurrentWorkers = 4, + }) { + + initialiseSweepstoreHeader(_file, + concurrentWorkers: concurrentWorkers, + ); + + _header.version = "1.1.0.1"; + print("Version: ${_header.version}"); + + } + + void operator []=(String key, dynamic value) { + + spawnTicket(_file, + operation: SweepstoreTicketOperation.WRITE, + keyHash: key.hashCode, + writeSize: 0, // Placeholder + onApproved: () { + print("Writing key: $key with hash ${key.hashCode} and value: $value"); + }, + debugLabel: key + ); + + } +} + +Future main() async { + + String filePath = '../example.bin'; + + File file = File(filePath); + if (file.existsSync()) { + file.deleteSync(); + } + file.createSync(); + + Sweepstore store = Sweepstore(filePath); + store.initialise( + concurrentWorkers: 20 + ); + initialiseMasterListener(file.openSync(mode: FileMode.append)); + + print(binaryDump(file.readAsBytesSync())); + + int iteration = 1; + + for (int j = 0; j < iteration; j++) { + int concurrencyTest = 256; + final receivePort = ReceivePort(); + int completedJobs = 0; + + final stopwatch = Stopwatch()..start(); + + for (int i = 0; i < concurrencyTest; i++) { + await Isolate.spawn((message) { + final index = message['index'] as int; + final sendPort = message['sendPort'] as SendPort; + + Sweepstore store = Sweepstore(filePath); + store['key_$index'] = 'value_$index'; + + sendPort.send('done'); + }, {'index': i, 'sendPort': receivePort.sendPort}); + } + + + // wait for all jobs to finish + await for (var msg in receivePort) { + completedJobs++; + if (completedJobs >= concurrencyTest) { + receivePort.close(); + break; + } + } + + stopwatch.stop(); + + print('\x1B[95mAll jobs completed!\x1B[0m'); + print('\x1B[95mTime taken: ${stopwatch.elapsedMilliseconds}ms (${stopwatch.elapsed.inSeconds}s)\x1B[0m'); + } +} \ No newline at end of file diff --git a/dart/pubspec.yaml b/dart/pubspec.yaml index 37478e6..d6de004 100644 --- a/dart/pubspec.yaml +++ b/dart/pubspec.yaml @@ -1,7 +1,7 @@ -name: file_formats -description: A sample command-line application with basic argument parsing. -version: 0.0.1 -# repository: https://github.com/my_org/my_repo +name: sweepstore +description: SweepStore (formerly BinaryTable) A high-performance binary storage format for Dart applications with efficient memory management and random access capabilities. +version: 1.0.0 +repository: https://github.com/ImBenji03/SweepStore environment: sdk: ^3.0.0 diff --git a/dart/lib/binary_table.dart b/dart/sweepstore_old.dart similarity index 65% rename from dart/lib/binary_table.dart rename to dart/sweepstore_old.dart index 9357e62..a932c08 100644 --- a/dart/lib/binary_table.dart +++ b/dart/sweepstore_old.dart @@ -11,7 +11,7 @@ ยฉ 2025-26 by Benjamin Watt of IMBENJI.NET LIMITED - All rights reserved. -Use of this source code is governed by a MIT license that can be found in the LICENSE file. +Use of this source code is governed by the Business Source License 1.1 that can be found in the LICENSE file. This file is part of the SweepStore (formerly Binary Table) package for Dart. @@ -19,6 +19,8 @@ This file is part of the SweepStore (formerly Binary Table) package for Dart. import 'dart:convert'; import 'dart:io'; +import 'dart:isolate'; +import 'dart:math'; import 'dart:typed_data'; enum BT_Type { @@ -313,7 +315,7 @@ class BT_UniformArray extends BT_Reference { void addAll(Iterable values) { - _table.antiFreeListScope(() { + _table._antiFreeListScope(() { // Determine the type of the array by reading the first items type BT_Type type = elementType ?? BT_Type.fromDynamic(values.first); @@ -344,16 +346,15 @@ class BT_UniformArray extends BT_Reference { fullBuffer.replaceRange(1, 5, lengthBytes); // Free the old array - _table.free(_pointer, size); + _table._free(_pointer, size); // Allocate new space for the updated array - BT_Pointer newPointer = _table.alloc(fullBuffer.length); + BT_Pointer newPointer = _table._alloc(fullBuffer.length); // Replace any references to the old pointer with the new one Map addressTable = _table._addressTable; addressTable.updateAll((key, value) { if (value == _pointer) { - print('Updating address table entry for key $key from $value to $newPointer'); return newPointer; } return value; @@ -364,8 +365,6 @@ class BT_UniformArray extends BT_Reference { // Write the updated buffer to the new location _table._file.setPositionSync(newPointer.address); _table._file.writeFromSync(fullBuffer); - - print('Array resized to new length $newLength at $newPointer'); }); } @@ -468,7 +467,6 @@ extension FreeList on List { New encoding should reflect a "read from the end" approach. So it should look like: - Entries (variable) - - Entry count (4 bytes) Entry: - Pointer (8 bytes) @@ -485,9 +483,6 @@ extension FreeList on List { buffer.addAll((ByteData(4)..setInt32(0, entry.size, Endian.little)).buffer.asUint8List()); } - // Entry count - buffer.addAll((ByteData(4)..setInt32(0, length, Endian.little)).buffer.asUint8List()); - return buffer; } @@ -509,21 +504,239 @@ extension fnv1a on String { } +// Convert double to float16 +extension on double { + + List _toFloat16Bytes() { + ByteData float32Data = ByteData(4); + float32Data.setFloat32(0, this, Endian.little); + int f = float32Data.getUint32(0); + + int sign = (f >> 31) & 0x1; + int exponent = (f >> 23) & 0xff; + int mantissa = f & 0x7fffff; + + if (exponent == 0xff) { // Inf or NaN + if (mantissa != 0) { + return [(sign << 7) | 0x7e, 0x01]; // NaN + } else { + return [(sign << 7) | 0x7c, 0x00]; // Inf + } + } + + if (exponent == 0) { // Zero or denormalized + return [sign << 7, 0x00]; // Zero + } + + // Adjust exponent for float16 (bias: 127 for float32, 15 for float16) + exponent = exponent - 127 + 15; + + if (exponent >= 31) { // Overflow to infinity + return [(sign << 7) | 0x7c, 0x00]; // Inf + } + + if (exponent <= 0) { // Underflow to zero + return [sign << 7, 0x00]; // Zero + } + + // Extract the top 10 bits of mantissa for float16 + mantissa >>= 13; + + int float16 = (sign << 15) | (exponent << 10) | mantissa; + return [float16 & 0xff, (float16 >> 8) & 0xff]; // Little endian + } + +} + +enum BT_TicketState { + IDLE, + WAITING, + APPROVED, + EXECUTING, + COMPLETED, +} + +enum BT_TicketOpcode { + NONE, + READ, + MODIFY, + WRITE, +} + +class BT_Ticket { + + final int heartbeat; + final BT_TicketState state; + final BT_TicketOpcode opcode; + final int keyHash; + + // Defined by the slave + final int writeSize; + + // Defined by the master + final BT_Pointer writePointer; + + BT_Ticket({ + required this.heartbeat, + required this.state, + required this.opcode, + required this.keyHash, + + this.writeSize = 0, + this.writePointer = BT_Null, + }); + + BT_Ticket.fromIntList(List data) + : heartbeat = ByteData.sublistView(Uint8List.fromList(data.sublist(0, 4))).getInt32(0, Endian.little), + state = BT_TicketState.values[data[4]], + opcode = BT_TicketOpcode.values[data[5]], + keyHash = ByteData.sublistView(Uint8List.fromList(data.sublist(6, 14))).getInt64(0, Endian.little), + + writePointer = BT_Pointer(ByteData.sublistView(Uint8List.fromList(data.sublist(14, 22))).getInt64(0, Endian.little)), + writeSize = ByteData.sublistView(Uint8List.fromList(data.sublist(22, 26))).getInt32(0, Endian.little); + + List toIntList() { + List data = []; + + // Heartbeat (4 bytes) + data.addAll((ByteData(4)..setInt32(0, heartbeat, Endian.little)).buffer.asUint8List()); + + // State (1 byte) + data.add(state.index); + + // Opcode (1 byte) + data.add(opcode.index); + + // Key Hash (8 bytes) + data.addAll((ByteData(8)..setInt64(0, keyHash, Endian.little)).buffer.asUint8List()); + + // Write Pointer (8 bytes) + data.addAll((ByteData(8)..setInt64(0, writePointer.address, Endian.little)).buffer.asUint8List()); + + // Write Size (4 bytes) + data.addAll((ByteData(4)..setInt32(0, writeSize, Endian.little)).buffer.asUint8List()); + + return data; + } + + String toString() { + return 'BT_Ticket(heartbeat: $heartbeat, state: $state, opcode: $opcode, keyHash: $keyHash, writePointer: $writePointer, writeSize: $writeSize)'; + } + +} + + + class BinaryTable { + late final int sessionId; + RandomAccessFile _file; - BinaryTable(String path) : _file = File(path).openSync(mode: FileMode.append); + BinaryTable(String path) : _file = File(path).openSync(mode: FileMode.append) { - void initialise() { + var nextSessionId = Random.secure(); + int high = nextSessionId.nextInt(1 << 32); + int low = nextSessionId.nextInt(1 << 32); + sessionId = (high << 32) | low; + + // Check if the file is initialised _file.setPositionSync(0); - _file.writePointerSync(BT_Null); // Address table pointer - _file.writeIntSync(0, 4); // Free list entry count + List magicNumber = _file.readSync(4); + bool isInitialised = magicNumber.length == 4 && + magicNumber[0] == 'S'.codeUnitAt(0) && + magicNumber[1] == 'W'.codeUnitAt(0) && + magicNumber[2] == 'P'.codeUnitAt(0) && + magicNumber[3] == 'S'.codeUnitAt(0); + + if (isInitialised && !isMasterAlive) { + + _initialiseMaster(); + + } + } + void initialise({ + int concurrentReaders = 4, + }) + { + _file.setPositionSync(0); + + // Ensure the file hasnt already been initialised + List magicNumber = _file.readSync(4); + bool isInitialised = magicNumber.length == 4 && + magicNumber[0] == 'S'.codeUnitAt(0) && + magicNumber[1] == 'W'.codeUnitAt(0) && + magicNumber[2] == 'P'.codeUnitAt(0) && + magicNumber[3] == 'S'.codeUnitAt(0); + if (isInitialised) { + throw Exception('Binary Table file is already initialised.'); + } + + // Magic number "SWPS" (0/4 bytes) + _file.writeFromSync("SWPS".codeUnits); + + // Version (1.0 float16) (4/2 bytes) + _file.writeFromSync(1.0._toFloat16Bytes()); + + // Address table pointer (null) (6/8 bytes) + _file.writePointerSync(BT_Null); + + // Free list count (0) (14/4 bytes) + _file.writeIntSync(0, 4); // Free list entry count + + /* + The values below are for concurrency. + */ + + // Master Identifier (18/8 bytes) + _file.writeIntSync(sessionId & 0xFFFFFFFF, 8); + + // Master Heartbeat (26/4 bytes) + int now = DateTime.now().millisecondsSinceEpoch; + _file.writeIntSync(now, 4); + + // Number of concurrent readers (30/4 bytes) // Cannot be changed after initialisation + _file.writeIntSync(concurrentReaders, 4); + + // Allow reads (34/1 bytes) + _file.writeByteSync(1); + + // Everything else is operator slots starting at 35 bytes. + for (int i = 0; i < concurrentReaders; i++) { + + // Slave Heartbeat (4 bytes) + _file.writeIntSync(0, 4); + + // Ticket state (1 byte) + _file.writeByteSync(0); + + // Ticket Opcode (1 byte) + _file.writeByteSync(0); + + // Key Hash (8 bytes) + _file.writeIntSync(0, 8); + + // Write Pointer (8 bytes) + _file.writeIntSync(-1, 8); + + // Write Size (4 bytes) + _file.writeIntSync(0, 4); + + } + + // Run the master initialisation + _initialiseMaster(); + } + + /* + Address Table + */ + Map get _addressTable { - _file.setPositionSync(0); + _file.setPositionSync(6); BT_Reference tableRef = BT_Reference(this, _file.readPointerSync()); if (tableRef._pointer.isNull) { @@ -570,34 +783,37 @@ class BinaryTable { }); // Write new address table at end of file - BT_Pointer tableAddress = alloc(buffer.length); + BT_Pointer tableAddress = _alloc(buffer.length); _file.setPositionSync(tableAddress.address); _file.writeFromSync(buffer); // Read old table pointer before updating - _file.setPositionSync(0); + _file.setPositionSync(6); BT_Reference oldTableRef = BT_Reference(this, _file.readPointerSync()); // Update header to point to new table - _file.setPositionSync(0); + _file.setPositionSync(6); _file.writePointerSync(tableAddress); // Now free the old table if it exists and is not the same as the new one if (!oldTableRef._pointer.isNull && oldTableRef._pointer != tableAddress) { - free(oldTableRef._pointer, oldTableRef.size); + _free(oldTableRef._pointer, oldTableRef.size); } } + /* + Free List + */ + bool freeListLifted = false; List? _freeListCache; - List get _freeList { if (freeListLifted) { return _freeListCache ?? []; } - _file.setPositionSync(_file.lengthSync() - 4); + _file.setPositionSync(14); int entryCount = _file.readIntSync(4); if (entryCount == 0) { return []; @@ -605,7 +821,7 @@ class BinaryTable { int entrySize = BT_Type.POINTER.size + 4; // Pointer + Size int freeListSize = entryCount * entrySize; - _file.setPositionSync(_file.lengthSync() - 4 - freeListSize); + _file.setPositionSync(_file.lengthSync() - freeListSize); List buffer = _file.readSync(freeListSize); List freeList = []; @@ -627,54 +843,77 @@ class BinaryTable { return freeList; } set _freeList(List list) { - if (freeListLifted) { _freeListCache = list; return; } - _file.setPositionSync(_file.lengthSync() - 4); + // Read OLD count from header + _file.setPositionSync(14); int oldEntryCount = _file.readIntSync(4); - int oldListSize = (oldEntryCount * (BT_Type.POINTER.size + 4)) + 4; // Entries + Count - _file.truncateSync(_file.lengthSync() - oldListSize); - List buffer = list.bt_encode(); - _file.setPositionSync(_file.lengthSync()); - _file.writeFromSync(buffer); + // Calculate old free list size (entries only, not count) + int oldListSize = oldEntryCount * (BT_Type.POINTER.size + 4); + + // Remove old free list entries from EOF + if (oldEntryCount > 0) { + int currentLength = _file.lengthSync(); + _file.truncateSync(currentLength - oldListSize); + } + + // Write NEW count to header + _file.setPositionSync(14); + _file.writeIntSync(list.length, 4); + + // Write NEW entries to EOF (if any) + if (list.isNotEmpty) { + List buffer = list.bt_encode(); + _file.setPositionSync(_file.lengthSync()); + _file.writeFromSync(buffer); + } } /// Caches the free list in memory, and removed it from the file. - void liftFreeList() { + void _liftFreeList() { if (freeListLifted) { throw StateError('Free list is already lifted'); } + // Cache the free list _freeListCache = _freeList; - _file.setPositionSync(_file.lengthSync() - 4); + // Read count from header + _file.setPositionSync(14); int oldEntryCount = _file.readIntSync(4); - int oldEntrySize = BT_Type.POINTER.size + 4; // Pointer + Size - int oldFreeListSize = oldEntryCount * oldEntrySize + 4; // +4 for entry count - _file.truncateSync(_file.lengthSync() - oldFreeListSize); + + if (oldEntryCount > 0) { + int oldEntrySize = BT_Type.POINTER.size + 4; + int oldFreeListSize = oldEntryCount * oldEntrySize; // Just entries, no count + + // Remove free list entries from EOF + _file.truncateSync(_file.lengthSync() - oldFreeListSize); + } + + // Clear count in header + _file.setPositionSync(14); + _file.writeIntSync(0, 4); freeListLifted = true; } + /// Appends the cached free list back to the file, and clears the cache. void dropFreeList() { if (!freeListLifted) { throw StateError('Free list is not lifted'); } - _file.setPositionSync(_file.lengthSync()); - _file.writeIntSync(0, 4); // Placeholder for entry count - freeListLifted = false; - _freeList = _freeListCache!; + _freeList = _freeListCache!; // This now writes count to header and entries to EOF _freeListCache = null; } - void antiFreeListScope(void Function() fn) { - liftFreeList(); + void _antiFreeListScope(void Function() fn) { + _liftFreeList(); try { fn(); } finally { @@ -682,7 +921,7 @@ class BinaryTable { } } - void free(BT_Pointer pointer, int size) { + void _free(BT_Pointer pointer, int size) { if (!freeListLifted) { throw StateError('Free list must be lifted before freeing memory'); @@ -734,7 +973,7 @@ class BinaryTable { // Update free list _freeList = freeList; } - BT_Pointer alloc(int size) { + BT_Pointer _alloc(int size) { if (!freeListLifted) { throw StateError('Free list must be lifted before allocation'); @@ -783,34 +1022,259 @@ class BinaryTable { } } - operator []=(String key, dynamic value) { + /* + Concurrency + */ - antiFreeListScope(() { - Map addressTable = _addressTable; + void _initialiseMaster() { + Isolate.spawn((String filePath) { + RandomAccessFile file = File(filePath).openSync(mode: FileMode.append); - int keyHash = key.bt_hash; - - if (addressTable.containsKey(keyHash)) { - throw Exception('Key already exists'); + // Print with yellow [MASTER] prefix - use ansi + void _mstPrint(String message) { + print('\x1B[33m[MASTER]\x1B[0m $message'); } + late final int concurrentReaders; + file.setPositionSync(30); + concurrentReaders = file.readIntSync(4); + + while (true) { + + int now = DateTime.now().millisecondsSinceEpoch32(); + file.setPositionSync(26); + file.writeIntSync(now, 4); + + _mstPrint('Master heartbeat updated at $now'); + + file.setPositionSync(35); + for (int i = 0; i < concurrentReaders; i++) { + + // Read ticket + List ticketBuffer = file.readSync(26); + BT_Ticket ticket = BT_Ticket.fromIntList(ticketBuffer); + + // Check if ticket is alive and waiting + int ticketHeartbeat = ticket.heartbeat; + bool isTicketAlive = (now - ticket.heartbeat) <= 5000; + BT_TicketState state = ticket.state; + if (isTicketAlive && state == BT_TicketState.WAITING) { + _mstPrint("Ticket ${i} is alive and waiting..."); + + // If reading is still allowed, we need to disallow it + file.setPositionSync(34); + if (file.readByteSync() == 1) { + _mstPrint("Disallowing reads for ticket processing..."); + file.setPositionSync(34); + file.writeByteSync(0); + // _liftFreeList(); // Broken/Incorrect Logic. the isolate cant access the main instance's free list + } + + // We need to give it a write pointer and approve it + // BT_Pointer allocation = _alloc(ticket.writeSize); // Broken/Incorrect Logic. the isolate cant access the main instance's free list + + BT_Ticket approvedTicket = BT_Ticket( + heartbeat: ticket.heartbeat, + state: BT_TicketState.APPROVED, + opcode: ticket.opcode, + keyHash: ticket.keyHash, + writeSize: ticket.writeSize, + // writePointer: allocation, // Broken/Incorrect Logic. the isolate cant access the main instance's free list + ); + + // Write approved ticket back to file + file.setPositionSync(35 + i * 26); + file.writeFromSync(approvedTicket.toIntList()); + + } + + } + + // If reads are disallowed, we need to allow them again, and drop the free list + file.setPositionSync(34); + if (file.readByteSync() == 0) { + _mstPrint("Allowing reads after ticket processing..."); + file.setPositionSync(34); + file.writeByteSync(1); + dropFreeList(); + } + + sleep(Duration(seconds: 1)); + } + }, _file.path); + } + + bool get isMasterAlive { + _file.setPositionSync(18); + int masterId = _file.readIntSync(8); + + _file.setPositionSync(26); + int masterHeartbeat = _file.readIntSync(4); + + int now = DateTime.now().millisecondsSinceEpoch32(); + return (now - masterHeartbeat) <= 5000; + } + + int get concurrentReaders { + _file.setPositionSync(30); + return _file.readIntSync(4); + } + + bool get allowReads { + _file.setPositionSync(34); + int flag = _file.readByteSync(); + return flag == 1; + } + + // Add ticket + void _ticket({ + required BT_TicketOpcode operation, + required int keyHash, + required int writeSize, + required void Function(BT_Ticket ticket)? onApproved, + }) { + + // Reduce the chance of a race condition/deadlock by adding a small random delay + sleep(Duration(milliseconds: Random().nextInt(10))); + + int? ticketIndex; + + // Were gonna iterate through all the tickets and find an empty slot + while (ticketIndex == null) { + + _file.setPositionSync(35); + for (int i = 0; i < concurrentReaders; i++) { + + List ticketBuffer = _file.readSync(26); // Size of a ticket entry + + BT_Ticket ticket = BT_Ticket.fromIntList(ticketBuffer); + + // Check if the heartbeat is stale (older than 5 seconds) + int now = DateTime.now().millisecondsSinceEpoch64(); + if (now - ticket.heartbeat > 5000) { + + // We found a stale ticket, we can take this slot + ticketIndex = i; + break; + + } + } + + sleep(Duration(milliseconds: 500)); + } + + if (ticketIndex == null) { + throw Exception('Failed to acquire ticket'); + } + + BT_Ticket newTicket = BT_Ticket( + heartbeat: DateTime.now().millisecondsSinceEpoch32(), + state: BT_TicketState.WAITING, + opcode: operation, + keyHash: keyHash, + writeSize: writeSize, + ); + + // Write the new ticket to the file + _file.setPositionSync(35 + ticketIndex * 38); + _file.writeFromSync(newTicket.toIntList()); + + // Wait for approval + while (true) { + + print('Waiting for ticket approval...'); + + _file.setPositionSync(35 + ticketIndex * 38); + List ticketBuffer = _file.readSync(38); + BT_Ticket ticket = BT_Ticket.fromIntList(ticketBuffer); + + if (ticket.state == BT_TicketState.APPROVED) { + // Ticket approved + onApproved?.call(ticket); + break; + } + + if (!isMasterAlive) { + print('Master is not alive, cannot proceed with ticket'); + } + + sleep(Duration(milliseconds: 1000)); + + // Update heartbeat + _file.setPositionSync(35 + ticketIndex * 38); + int now = DateTime.now().millisecondsSinceEpoch32(); + _file.writeIntSync(now, 4); + } - List valueBuffer = encodeValue(value); - // Write value to file - BT_Pointer valueAddress = alloc(valueBuffer.length); - _file.setPositionSync(valueAddress.address); - _file.writeFromSync(valueBuffer); - // Update address table - addressTable[keyHash] = valueAddress; - _addressTable = addressTable; - }); } + + /* + Key-Value Operations + */ + + operator []=(String key, dynamic value) { + + Map addressTable = _addressTable; + + // Check if key already exists + int keyHash = key.bt_hash; + if (addressTable.containsKey(keyHash)) { + throw Exception('Key already exists'); // The pair should be deleted first + } + + List valueBuffer = encodeValue(value); + + _ticket( + operation: BT_TicketOpcode.WRITE, // Modification operations will come later, + keyHash: keyHash, + writeSize: valueBuffer.length, + onApproved: (BT_Ticket ticket) { + // Write value to file + _file.setPositionSync(ticket.writePointer.address); + _file.writeFromSync(valueBuffer); + } + ); + + // v1.0.0 implementation + // _antiFreeListScope(() { + // Map addressTable = _addressTable; + // + // int keyHash = key.bt_hash; + // + // // 1.1 Note: I cant remember why i did this check here. + // if (addressTable.containsKey(keyHash)) { + // throw Exception('Key already exists'); + // } + // + // + // + // List valueBuffer = encodeValue(value); + // + // // Write value to file + // BT_Pointer valueAddress = _alloc(valueBuffer.length); + // + // _file.setPositionSync(valueAddress.address); + // _file.writeFromSync(valueBuffer); + // + // // Update address table + // addressTable[keyHash] = valueAddress; + // _addressTable = addressTable; + // }); + } + + operator [](String key) { + + while (!allowReads) { + // Wait until reads are allowed + sleep(Duration(milliseconds: 1)); + } + Map addressTable = _addressTable; int keyHash = key.bt_hash; @@ -827,7 +1291,7 @@ class BinaryTable { void delete(String key) { - antiFreeListScope(() { + _antiFreeListScope(() { Map addressTable = _addressTable; int keyHash = key.bt_hash; @@ -840,7 +1304,7 @@ class BinaryTable { BT_Reference valueRef = BT_Reference(this, valuePointer); // Free the value - free(valuePointer, valueRef.size); + _free(valuePointer, valueRef.size); // Remove from address table addressTable.remove(keyHash); @@ -851,7 +1315,7 @@ class BinaryTable { void truncate() { - antiFreeListScope(() { + _antiFreeListScope(() { // Relocate the address table if possible _addressTable = _addressTable; @@ -1065,3 +1529,13 @@ String binaryDump(Uint8List data) { return buffer.toString(); } + +// DateTime to 32-bit Unix timestamp (seconds since epoch) +extension on DateTime { + int millisecondsSinceEpoch32() { + return (millisecondsSinceEpoch ~/ 1000) & 0xFFFFFFFF; + } + int millisecondsSinceEpoch64() { + return millisecondsSinceEpoch; + } +} \ No newline at end of file diff --git a/documentation/header.md b/documentation/header.md new file mode 100644 index 0000000..4bda9f2 --- /dev/null +++ b/documentation/header.md @@ -0,0 +1,107 @@ +# Sweepstore Header Structure + +The Sweepstore file format uses a structured header to manage file metadata and concurrency control. The header consists of three main parts: the static header, the concurrency header, and dynamic worker tickets. + +## Static Header (Bytes 0-28) + +The static header contains basic file information and pointers. + +| Offset | Size | Field | Type | Description | +|--------|------|-------|------|-------------| +| 0 | 4 bytes | Magic Number | String | File identifier, must be "SWPT" | +| 4 | 12 bytes | Version | String | Version string (UTF-8), max 11 chars (padded with spaces) | +| 16 | 8 bytes | Address Table Pointer | int64 | Pointer to the address table location | +| 24 | 4 bytes | Free List Count | int32 | Number of entries in the free list | +| 28 | 1 byte | Is Free List Lifted | bool | Flag indicating if free list is lifted (0=false, 1=true) | + +**Total Size:** 29 bytes + +## Concurrency Header (Bytes 29-45) + +The concurrency header manages multi-threaded access and coordination. + +| Offset | Size | Field | Type | Description | +|--------|------|-------|------|-------------| +| 29 | 8 bytes | Master Identifier | int64 | Unique identifier for the master process | +| 37 | 4 bytes | Master Heartbeat | int32 | Heartbeat counter for the master process | +| 41 | 4 bytes | Number of Workers | int32 | Total number of concurrent worker tickets | +| 45 | 1 byte | Is Read Allowed | bool | Flag indicating if read operations are allowed (0=false, 1=true) | + +**Total Size:** 17 bytes + +## Worker Tickets (Starting at Byte 46) + +Worker tickets are dynamically sized based on the number of workers specified in the concurrency header. Each ticket is 30 bytes. + +**Base Offset Calculation:** `46 + (ticketIndex * 30)` + +### Single Ticket Structure + +| Relative Offset | Size | Field | Type | Description | +|-----------------|------|-------|------|-------------| +| 0 | 4 bytes | Identifier | int32 | Unique identifier for this worker | +| 4 | 4 bytes | Worker Heartbeat | int32 | Heartbeat counter for this worker | +| 8 | 1 byte | Ticket State | byte (enum) | Current state of the ticket (see SweepstoreTicketState) | +| 9 | 1 byte | Ticket Operation | byte (enum) | Current operation being performed (see SweepstoreTicketOperation) | +| 10 | 8 bytes | Key Hash | int64 | Hash of the key being operated on | +| 18 | 8 bytes | Write Pointer | int64 | Pointer to the write location | +| 26 | 4 bytes | Write Size | int32 | Size of the write operation | + +**Ticket Size:** 30 bytes + +## Enumerations + +Enum fields are stored as single-byte integers. The following tables show the integer values for each enum state: + +### SweepstoreTicketState (1 byte) + +| Value | Name | Description | +|-------|------|-------------| +| 0 | IDLE | Ticket is idle and not performing any work | +| 1 | WAITING | Ticket is waiting for approval | +| 2 | APPROVED | Ticket has been approved to proceed | +| 3 | EXECUTING | Ticket is actively executing an operation | +| 4 | COMPLETED | Ticket has completed its operation | + +### SweepstoreTicketOperation (1 byte) + +| Value | Name | Description | +|-------|------|-------------| +| 0 | NONE | No operation assigned | +| 1 | READ | Read operation | +| 2 | MODIFY | Modify operation | +| 3 | WRITE | Write operation | + +## Total Header Size Calculation + +The total header size depends on the number of workers: + +``` +Total Header Size = 46 + (numberOfWorkers * 30) bytes +``` + +For example: +- 4 workers: 46 + (4 ๏ฟฝ 30) = 166 bytes +- 8 workers: 46 + (8 ๏ฟฝ 30) = 286 bytes + +## Initialization + +When initializing a new Sweepstore file using `initialiseSweepstoreHeader()`: +- Magic number is set to "SWPT" +- Version is set to "undefined" +- Address table pointer is set to null pointer +- Free list count is set to 0 +- Is free list lifted flag is set to false +- Master identifier and heartbeat are set to 0 +- Number of workers is set according to the parameter (default: 4) +- Read allowed flag is set to false +- All worker tickets are initialized with identifier set to 0, heartbeat set to 0, IDLE state (0), and NONE operation (0) + +## Implementation Notes + +- All multi-byte integers are stored in little-endian byte order +- The version string is padded with spaces and prefixed with a space character +- Boolean values are stored as single bytes (0 or 1) +- Enum values are stored as single-byte integers using their index values (0, 1, 2, etc.) +- Pointers use int64 for addressing, with -1 representing a null pointer +- The header is designed for concurrent access with heartbeat-based liveness detection \ No newline at end of file diff --git a/example.bin b/example.bin index 3aca16c4dbbc64415a100349b1921e018aab37f5..5f89823a174e95967925dc5aea22d04ea24517c4 100644 GIT binary patch literal 646 tcmWFz4+v2()HBpG&@)s3g8xv!fDS~^1t5H!>cP^ZvdEz}8XCAl0|1I#s;B?} literal 303 zcmcb|00C?)K$@9>;eb7mnBV{;5_M5as_cMfr3n@*ccet*nwQ21`LydVi!Pmd;k#~j@)98xQ#e~YC&M#CYcMH-}_Hc ygK!xbfl3%wlvaEd>1JWph6;v1yoXm41Ac8uP-D>Tg1JEsDBlHD69S|`UIhSU7%QFt