Compare commits
5 Commits
dev
...
feature/fu
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
db17d9a524 | ||
|
|
bdd1fab997 | ||
|
|
c97f36cfb6 | ||
|
|
0836935888 | ||
|
|
fc7297e0f8 |
@@ -5,6 +5,13 @@ set(CMAKE_CXX_STANDARD 17)
|
||||
set(CMAKE_CXX_STANDARD_REQUIRED ON)
|
||||
set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
|
||||
|
||||
# Optional: Enable function timing instrumentation via #define
|
||||
# To enable timing, either:
|
||||
# 1. Define SWEEPSTORE_ENABLE_TIMING=1 in your code before including timing.h
|
||||
# 2. Pass -DSWEEPSTORE_ENABLE_TIMING=1 to compiler
|
||||
# 3. Uncomment the line below:
|
||||
add_compile_definitions(SWEEPSTORE_ENABLE_TIMING=1)
|
||||
|
||||
# Add include directories globally
|
||||
include_directories(${CMAKE_SOURCE_DIR}/src/Public)
|
||||
include_directories(${CMAKE_SOURCE_DIR}/src/Private)
|
||||
@@ -21,10 +28,11 @@ add_executable(main
|
||||
src/Private/sweepstore/concurrency.cpp
|
||||
src/Public/sweepstore/utils/file_lock.h
|
||||
src/Private/sweepstore/utils/file_lock.cpp
|
||||
src/Private/sweepstore/utils/fd_pool.cpp
|
||||
src/Public/sweepstore/utils/file_handle.h
|
||||
src/Private/sweepstore/utils/file_handle.cpp
|
||||
src/Public/sweepstore/header.h
|
||||
src/Public/sweepstore/utils/timing.h
|
||||
src/Private/sweepstore/utils/timing.cpp
|
||||
src/Private/sweepstore/benchmark.cpp
|
||||
)
|
||||
|
||||
@@ -43,3 +51,7 @@ if(UNIX AND NOT APPLE)
|
||||
# Only link stdc++fs on Linux, not macOS
|
||||
target_link_libraries(main PRIVATE stdc++fs)
|
||||
endif()
|
||||
|
||||
if(WIN32)
|
||||
target_link_libraries(main PRIVATE winmm)
|
||||
endif()
|
||||
@@ -2,6 +2,8 @@
|
||||
// Created by Benjamin Watt on 02/12/2025.
|
||||
//
|
||||
|
||||
#define SWEEPSTORE_ENABLE_TIMING 1
|
||||
|
||||
#include "sweepstore/sweepstore.h"
|
||||
|
||||
#include <string>
|
||||
@@ -13,34 +15,49 @@
|
||||
#include <condition_variable>
|
||||
#include <atomic>
|
||||
#include <vector>
|
||||
#include <memory>
|
||||
#include <cstdlib>
|
||||
|
||||
#include "sweepstore/utils/helpers.h"
|
||||
#include "sweepstore/utils/file_handle.h"
|
||||
#include "sweepstore/structures.h"
|
||||
#include "sweepstore/concurrency.h"
|
||||
|
||||
// ============================================================================
|
||||
// BENCHMARK CONFIGURATION
|
||||
// ============================================================================
|
||||
const int SWEEPSTORE_CONCURRENT_WORKERS = 32; // Number of concurrent workers for Sweepstore
|
||||
const int WORKER_THREAD_COUNT = 8; // Number of worker threads in the benchmark pool
|
||||
const int BENCHMARK_ITERATIONS = 10; // Number of benchmark iterations
|
||||
const int INITIAL_CONCURRENT_WORKERS = 1; // Starting number of concurrent operations (doubles each iteration)
|
||||
const int ITERATION_DELAY_MS = 200; // Delay between iterations in milliseconds
|
||||
const int INITIAL_SLEEP_MS = 1000; // Initial sleep before benchmark starts
|
||||
// ============================================================================
|
||||
|
||||
int main() {
|
||||
namespace fs = std::filesystem;
|
||||
|
||||
// Initialize timing output file
|
||||
SweepstoreTiming::initOutputFile();
|
||||
|
||||
std::string filePath = "./example.bin";
|
||||
|
||||
Sweepstore sweepstore(filePath);
|
||||
sweepstore.initialise(32);
|
||||
// Use unique_ptr to control destruction timing
|
||||
auto sweepstore = std::make_unique<Sweepstore>(filePath);
|
||||
sweepstore->initialise(SWEEPSTORE_CONCURRENT_WORKERS);
|
||||
|
||||
preciseSleep(std::chrono::milliseconds(1000));
|
||||
preciseSleep(std::chrono::milliseconds(INITIAL_SLEEP_MS));
|
||||
|
||||
std::vector<uint8_t> fileData = loadFile(filePath);
|
||||
std::cout << binaryDump(fileData) << std::endl;
|
||||
|
||||
std::cout << "Concurrent Workers: " << sweepstore.getConcurrencyHeader()->readNumberOfWorkers() << std::endl;
|
||||
std::cout << "Concurrent Workers: " << sweepstore->getConcurrencyHeader()->readNumberOfWorkers() << std::endl;
|
||||
std::cout << "Stale Ticket Threshold: " << STALE_HEARTBEAT_THRESHOLD_MS << std::endl;
|
||||
|
||||
SweepstoreConcurrency::initialiseMasterAsync(filePath);
|
||||
|
||||
int iterations = 16;
|
||||
int currentIteration = 0;
|
||||
|
||||
int concurrencyTest = 1;
|
||||
int concurrencyTest = INITIAL_CONCURRENT_WORKERS;
|
||||
|
||||
// Worker pool infrastructure - created once and reused
|
||||
std::queue<std::function<void()>> taskQueue;
|
||||
@@ -50,16 +67,20 @@ int main() {
|
||||
std::atomic<bool> shutdown{false};
|
||||
std::atomic<int> completedJobs{0};
|
||||
|
||||
// Create 32 persistent worker threads BEFORE timing
|
||||
// Create persistent worker threads BEFORE timing
|
||||
std::vector<std::thread> workers;
|
||||
for (int i = 0; i < 32; i++) {
|
||||
for (int i = 0; i < WORKER_THREAD_COUNT; i++) {
|
||||
workers.emplace_back([&]() {
|
||||
while (!shutdown) {
|
||||
std::function<void()> task;
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(queueMutex);
|
||||
queueCV.wait(lock, [&]{ return !taskQueue.empty() || shutdown; });
|
||||
if (shutdown && taskQueue.empty()) return;
|
||||
if (shutdown && taskQueue.empty()) {
|
||||
// Flush timing data before thread exits
|
||||
SweepstoreTiming::flushThreadData();
|
||||
return;
|
||||
}
|
||||
if (!taskQueue.empty()) {
|
||||
task = std::move(taskQueue.front());
|
||||
taskQueue.pop();
|
||||
@@ -70,12 +91,14 @@ int main() {
|
||||
completionCV.notify_one();
|
||||
}
|
||||
}
|
||||
// Flush timing data before thread exits
|
||||
SweepstoreTiming::flushThreadData();
|
||||
});
|
||||
}
|
||||
|
||||
while (true) {
|
||||
|
||||
if (++currentIteration > iterations) {
|
||||
if (++currentIteration > BENCHMARK_ITERATIONS) {
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -86,7 +109,7 @@ int main() {
|
||||
std::unique_lock<std::mutex> lock(queueMutex);
|
||||
for (int i = 0; i < concurrencyTest; i++) {
|
||||
taskQueue.push([i, &sweepstore, &completedJobs]() {
|
||||
sweepstore["key_" + std::to_string(i)] = "value_" + std::to_string(i);
|
||||
(*sweepstore)["key_" + std::to_string(i)] = "value_" + std::to_string(i);
|
||||
++completedJobs;
|
||||
});
|
||||
}
|
||||
@@ -105,9 +128,12 @@ int main() {
|
||||
auto end = std::chrono::high_resolution_clock::now();
|
||||
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
|
||||
|
||||
std::cout << "[" << currentIteration << "/" << iterations << "] Completed " << concurrencyTest << " operations in " << duration << " ms." << std::endl;
|
||||
std::cout << "[" << currentIteration << "/" << BENCHMARK_ITERATIONS << "] Completed " << concurrencyTest << " operations in " << duration << " ms." << std::endl;
|
||||
|
||||
concurrencyTest *= 2;
|
||||
|
||||
// Wait between iterations
|
||||
preciseSleep(std::chrono::milliseconds(ITERATION_DELAY_MS));
|
||||
}
|
||||
|
||||
// Shutdown workers after all iterations
|
||||
@@ -117,5 +143,18 @@ int main() {
|
||||
worker.join();
|
||||
}
|
||||
|
||||
return 0;
|
||||
// Write timing data NOW while everything is still valid
|
||||
std::cout << "Flushing timing data..." << std::endl;
|
||||
std::cout.flush();
|
||||
SweepstoreTiming::flushThreadData();
|
||||
|
||||
std::cout << "Finalizing trace file..." << std::endl;
|
||||
std::cout.flush();
|
||||
SweepstoreTiming::finalizeOutputFile();
|
||||
|
||||
std::cout << "Benchmark complete." << std::endl;
|
||||
std::cout.flush();
|
||||
|
||||
// Exit immediately to avoid thread-local destructors conflicting with detached master thread
|
||||
_Exit(0);
|
||||
}
|
||||
@@ -11,6 +11,7 @@
|
||||
#include "sweepstore/header.h"
|
||||
#include "sweepstore/utils/helpers.h"
|
||||
#include "sweepstore/utils/file_handle.h"
|
||||
#include "sweepstore/utils/timing.h"
|
||||
|
||||
|
||||
uint64_t getRandomOffset(uint64_t maxValue) {
|
||||
@@ -30,17 +31,18 @@ int randomId() {
|
||||
return (time ^ random) & 0x7FFFFFFF;
|
||||
}
|
||||
|
||||
void SweepstoreConcurrency::spawnTicket(SweepstoreFileHandle* _file,
|
||||
void SweepstoreConcurrency::spawnTicket(SweepstoreFileHandle* file,
|
||||
const SweepstoreTicketOperation& operation,
|
||||
const uint32_t keyHash,
|
||||
const uint32_t targetSize,
|
||||
const std::function<void()> onApproved,
|
||||
std::string debugLabel
|
||||
) {
|
||||
SWEEPSTORE_TIME_FUNCTION();
|
||||
|
||||
// FileHandle now uses thread-local streams internally - no need to create new handle!
|
||||
// Each thread automatically gets its own fstream from the shared file handle
|
||||
SweepstoreFileHandle* file = new SweepstoreFileHandle(_file->getPath(), std::ios::in | std::ios::out | std::ios::binary);
|
||||
// SweepstoreFileHandle* file = new SweepstoreFileHandle(_file->getPath(), std::ios::in | std::ios::out | std::ios::binary);
|
||||
|
||||
/*
|
||||
Useful Functions
|
||||
@@ -50,10 +52,13 @@ void SweepstoreConcurrency::spawnTicket(SweepstoreFileHandle* _file,
|
||||
auto log = [&](const std::string &message) {
|
||||
std::string prefix = !debugLabel.empty() ? "\033[38;5;208m[Ticket Spawner - " + debugLabel + "]:\033[0m " : "\033[38;5;208m[Ticket Spawner]:\033[0m ";
|
||||
// debugPrint(prefix + message);
|
||||
// std::cout << prefix << message << std::endl;
|
||||
};
|
||||
|
||||
|
||||
// Sleep with variance (additive only)
|
||||
auto varySleep = [&](std::chrono::nanoseconds minSleepDuration, std::chrono::nanoseconds variance) {
|
||||
// SWEEPSTORE_TIME_SCOPE("Varying Sleep");
|
||||
if (variance.count() <= 0) {
|
||||
preciseSleep(minSleepDuration);
|
||||
} else {
|
||||
@@ -66,10 +71,12 @@ void SweepstoreConcurrency::spawnTicket(SweepstoreFileHandle* _file,
|
||||
// Exponential sleep
|
||||
std::unordered_map<std::string, int> expSleepTracker = {};
|
||||
auto expSleep = [&expSleepTracker](const std::string& label) {
|
||||
SWEEPSTORE_TIME_SCOPE("Exponential Sleep");
|
||||
int count = expSleepTracker[label]; // defaults to 0 if not found
|
||||
int sleepTime = (1 << count); // Exponential backoff
|
||||
sleepTime = std::max(1, std::min(sleepTime, 1000)); // Clamp between 1ms and 1000ms
|
||||
preciseSleep(std::chrono::milliseconds(sleepTime));
|
||||
// sleepTime = 1000;
|
||||
preciseSleep(std::chrono::microseconds(sleepTime * 500));
|
||||
expSleepTracker[label] = count + 1;
|
||||
};
|
||||
|
||||
@@ -81,16 +88,17 @@ void SweepstoreConcurrency::spawnTicket(SweepstoreFileHandle* _file,
|
||||
Ticket Acquisition
|
||||
*/
|
||||
auto acquireTicket = [&](uint32_t newIdentifier) -> SweepstoreWorkerTicket {
|
||||
SWEEPSTORE_TIME_SCOPE("acquireTicket");
|
||||
|
||||
// Reduce the chance of race condition
|
||||
varySleep(std::chrono::microseconds(500), std::chrono::microseconds(200));
|
||||
|
||||
uint32_t ticketIndex = -1u;
|
||||
|
||||
while (true) {
|
||||
|
||||
uint32_t concurrentWorkers = concurrencyHeader.readNumberOfWorkers();
|
||||
|
||||
while (true) {
|
||||
|
||||
for (uint32_t i = 0; i < concurrentWorkers; i++) {
|
||||
|
||||
SweepstoreWorkerTicket ticket = SweepstoreWorkerTicket(i, *file);
|
||||
@@ -101,13 +109,19 @@ void SweepstoreConcurrency::spawnTicket(SweepstoreFileHandle* _file,
|
||||
|
||||
SweepstoreWorkerTicketSnapshot snapshot = ticket.snapshot();
|
||||
|
||||
int identifier = snapshot.identifier;
|
||||
uint32_t identifier = snapshot.identifier;
|
||||
|
||||
bool identifier_unassigned = identifier == 0;
|
||||
bool stale_heartbeat = millisecondsSinceEpoch32() - snapshot.workerHeartbeat > STALE_HEARTBEAT_THRESHOLD_MS;
|
||||
bool is_free = snapshot.state == SweepstoreTicketState::FREE;
|
||||
|
||||
if (identifier_unassigned && stale_heartbeat && is_free) {
|
||||
if ((identifier_unassigned && is_free) || stale_heartbeat) {
|
||||
|
||||
if (i >= 32) {
|
||||
std::cout << "What the actual fuck" << std::endl;
|
||||
}
|
||||
|
||||
SWEEPSTORE_TIME_SCOPE("Claim Ticket");
|
||||
snapshot.identifier = newIdentifier;
|
||||
snapshot.workerHeartbeat = millisecondsSinceEpoch32();
|
||||
snapshot.state = SweepstoreTicketState::WAITING;
|
||||
@@ -149,20 +163,14 @@ void SweepstoreConcurrency::spawnTicket(SweepstoreFileHandle* _file,
|
||||
|
||||
// Wait for approval
|
||||
while (true) {
|
||||
auto start = std::chrono::high_resolution_clock::now();
|
||||
|
||||
SweepstoreWorkerTicketSnapshot snapshot = myTicket.snapshot();
|
||||
|
||||
// Update heartbeat
|
||||
uint32_t currentTime = millisecondsSinceEpoch32();
|
||||
if (currentTime - snapshot.workerHeartbeat > 700) {
|
||||
snapshot.workerHeartbeat = currentTime;
|
||||
myTicket.write(snapshot);
|
||||
}
|
||||
|
||||
// Check if we still own the ticket
|
||||
if (snapshot.identifier != myIdentifier) {
|
||||
|
||||
preciseSleep(std::chrono::milliseconds(10));
|
||||
preciseSleep(std::chrono::milliseconds(2));
|
||||
|
||||
// Re-verify we lost the ticket
|
||||
SweepstoreWorkerTicketSnapshot recheckSnapshot = myTicket.snapshot();
|
||||
@@ -172,7 +180,7 @@ void SweepstoreConcurrency::spawnTicket(SweepstoreFileHandle* _file,
|
||||
|
||||
// ReSharper disable once CppDFAInfiniteRecursion
|
||||
spawnTicket(
|
||||
_file,
|
||||
file,
|
||||
operation,
|
||||
keyHash,
|
||||
targetSize,
|
||||
@@ -188,6 +196,16 @@ void SweepstoreConcurrency::spawnTicket(SweepstoreFileHandle* _file,
|
||||
snapshot = recheckSnapshot;
|
||||
}
|
||||
|
||||
// Update heartbeat
|
||||
uint32_t currentTime = millisecondsSinceEpoch32();
|
||||
uint32_t sinceLastHeartbeat = currentTime - snapshot.workerHeartbeat;
|
||||
snapshot.workerHeartbeat = currentTime;
|
||||
myTicket.write(snapshot);
|
||||
if (sinceLastHeartbeat > STALE_HEARTBEAT_THRESHOLD_MS / 5) {
|
||||
|
||||
std::cout << "\033[39;5;82m[Ticket Spawner - " << debugLabel << "]:\033[0m Heartbeat updated for ticket " << myTicket.getTicketIndex() << "." << std::endl;
|
||||
}
|
||||
|
||||
if (snapshot.state == SweepstoreTicketState::APPROVED) {
|
||||
snapshot.state = SweepstoreTicketState::EXECUTING;
|
||||
myTicket.write(snapshot);
|
||||
@@ -201,16 +219,22 @@ void SweepstoreConcurrency::spawnTicket(SweepstoreFileHandle* _file,
|
||||
}
|
||||
|
||||
varySleep(std::chrono::microseconds(500), std::chrono::microseconds(200));
|
||||
|
||||
auto end = std::chrono::high_resolution_clock::now();
|
||||
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();
|
||||
|
||||
// std::cout << "Loop duration: " << duration << " ms." << std::endl;
|
||||
}
|
||||
|
||||
// std::cout << "\033[38;5;82m[Ticket Spawner - " << debugLabel << "]:\033[0m Completed ticket " << myTicket.getTicketIndex() << "." << std::endl;
|
||||
delete file;
|
||||
// delete file;
|
||||
}
|
||||
|
||||
void SweepstoreConcurrency::initialiseMaster(std::string filePath) {
|
||||
SWEEPSTORE_TIME_FUNCTION();
|
||||
|
||||
auto log = [&](const std::string &message) {
|
||||
debugPrint("\033[38;5;33m[Concurrency Master]:\033[0m " + message);
|
||||
debugPrint(("\033[38;5;33m[Concurrency Master]:\033[0m " + message).c_str());
|
||||
};
|
||||
|
||||
SweepstoreFileHandle file(filePath, std::ios::binary | std::ios::in | std::ios::out);
|
||||
@@ -244,9 +268,27 @@ void SweepstoreConcurrency::initialiseMaster(std::string filePath) {
|
||||
ticket.write(cleanSnapshot);
|
||||
log("Reset ticket " + std::to_string(i) + ".");
|
||||
}
|
||||
}
|
||||
|
||||
// Handle stale tickets
|
||||
continue;
|
||||
|
||||
// Cleanup any stale tickets
|
||||
uint32_t currentTime = millisecondsSinceEpoch32();
|
||||
for (uint32_t i = 0; i < concurrentWorkers; i++) {
|
||||
SweepstoreWorkerTicket ticket(i, file);
|
||||
SweepstoreWorkerTicketSnapshot snapshot = ticket.snapshot();
|
||||
|
||||
if (snapshot.state != SweepstoreTicketState::FREE) {
|
||||
continue;
|
||||
}
|
||||
|
||||
bool stale_heartbeat = currentTime - snapshot.workerHeartbeat > STALE_HEARTBEAT_THRESHOLD_MS;
|
||||
|
||||
if (stale_heartbeat) {
|
||||
SweepstoreWorkerTicketSnapshot cleanSnapshot = SweepstoreWorkerTicketSnapshot();
|
||||
ticket.write(cleanSnapshot);
|
||||
log("Stale heartbeat " + std::to_string(i) + ".");
|
||||
}
|
||||
}
|
||||
|
||||
preciseSleep(std::chrono::milliseconds(1));
|
||||
|
||||
@@ -3,11 +3,11 @@
|
||||
|
||||
#include "sweepstore/utils/file_lock.h"
|
||||
#include "sweepstore/utils/helpers.h"
|
||||
#include "sweepstore/utils/timing.h"
|
||||
|
||||
std::string SweepstoreHeader::readMagicNumber() {
|
||||
file.readSeek(0, std::ios::beg);
|
||||
char buffer[4];
|
||||
file.readBytes(buffer, 4);
|
||||
file.seekAndRead(0, buffer, 4);
|
||||
return std::string(buffer, 4);
|
||||
}
|
||||
|
||||
@@ -15,14 +15,12 @@ void SweepstoreHeader::writeMagicNumber(const std::string& magicNumber) {
|
||||
if (magicNumber.size() != 4) {
|
||||
throw std::invalid_argument("Magic number must be exactly 4 characters long.");
|
||||
}
|
||||
file.writeSeek(0, std::ios::beg);
|
||||
file.writeBytes(magicNumber.c_str(), 4);
|
||||
file.seekAndWrite(0, magicNumber.c_str(), 4);
|
||||
}
|
||||
|
||||
std::string SweepstoreHeader::readVersion() {
|
||||
file.readSeek(4, std::ios::beg);
|
||||
char buffer[12];
|
||||
file.readBytes(buffer, 12);
|
||||
file.seekAndRead(4, buffer, 12);
|
||||
|
||||
// Trim leading and trailing spaces
|
||||
std::string version(buffer, 12);
|
||||
@@ -40,46 +38,39 @@ void SweepstoreHeader::writeVersion(const std::string& version) {
|
||||
std::string paddedVersion = " " + version;
|
||||
paddedVersion.resize(12, ' ');
|
||||
|
||||
file.writeSeek(4, std::ios::beg);
|
||||
file.writeBytes(paddedVersion.c_str(), 12);
|
||||
file.seekAndWrite(4, paddedVersion.c_str(), 12);
|
||||
}
|
||||
|
||||
SweepstorePointer SweepstoreHeader::readAddressTablePointer() {
|
||||
file.readSeek(16, std::ios::beg);
|
||||
int64_t address;
|
||||
file.readBytes(reinterpret_cast<char*>(&address), sizeof(address));
|
||||
file.seekAndRead(16, reinterpret_cast<char*>(&address), sizeof(address));
|
||||
return address; // Implicit conversion to SweepstorePointer
|
||||
}
|
||||
|
||||
void SweepstoreHeader::writeAddressTablePointer(const SweepstorePointer& ptr) {
|
||||
file.writeSeek(16, std::ios::beg);
|
||||
int64_t address = ptr;
|
||||
file.writeBytes(reinterpret_cast<const char*>(&address), sizeof(address));
|
||||
file.seekAndWrite(16, reinterpret_cast<const char*>(&address), sizeof(address));
|
||||
}
|
||||
|
||||
uint32_t SweepstoreHeader::readFreeListCount() {
|
||||
file.readSeek(24, std::ios::beg);
|
||||
uint32_t count;
|
||||
file.readBytes(reinterpret_cast<char*>(&count), sizeof(count));
|
||||
file.seekAndRead(24, reinterpret_cast<char*>(&count), sizeof(count));
|
||||
return count;
|
||||
}
|
||||
|
||||
void SweepstoreHeader::writeFreeListCount(uint32_t count) {
|
||||
file.writeSeek(24, std::ios::beg);
|
||||
file.writeBytes(reinterpret_cast<const char*>(&count), sizeof(count));
|
||||
file.seekAndWrite(24, reinterpret_cast<const char*>(&count), sizeof(count));
|
||||
}
|
||||
|
||||
bool SweepstoreHeader::readIsFreeListLifted() {
|
||||
file.readSeek(28, std::ios::beg);
|
||||
char flag;
|
||||
file.readBytes(&flag, sizeof(flag));
|
||||
file.seekAndRead(28, &flag, sizeof(flag));
|
||||
return flag != 0;
|
||||
}
|
||||
|
||||
void SweepstoreHeader::writeIsFreeListLifted(bool isLifted) {
|
||||
file.writeSeek(28, std::ios::beg);
|
||||
char flag = isLifted ? 1 : 0;
|
||||
file.writeBytes(&flag, sizeof(flag));
|
||||
file.seekAndWrite(28, &flag, sizeof(flag));
|
||||
}
|
||||
|
||||
void SweepstoreHeader::initialise() {
|
||||
@@ -92,60 +83,58 @@ void SweepstoreHeader::initialise() {
|
||||
}
|
||||
|
||||
uint64_t SweepstoreConcurrencyHeader::readMasterIdentifier() {
|
||||
file.readSeek(29, std::ios::beg);
|
||||
uint64_t identifier;
|
||||
file.readBytes(reinterpret_cast<char*>(&identifier), sizeof(identifier));
|
||||
file.seekAndRead(29, reinterpret_cast<char*>(&identifier), sizeof(identifier));
|
||||
return identifier;
|
||||
}
|
||||
|
||||
void SweepstoreConcurrencyHeader::writeMasterIdentifier(uint64_t identifier) {
|
||||
file.writeSeek(29, std::ios::beg);
|
||||
file.writeBytes(reinterpret_cast<const char*>(&identifier), sizeof(identifier));
|
||||
file.seekAndWrite(29, reinterpret_cast<const char*>(&identifier), sizeof(identifier));
|
||||
}
|
||||
|
||||
uint32_t SweepstoreConcurrencyHeader::readMasterHeartbeat() {
|
||||
file.readSeek(37, std::ios::beg);
|
||||
uint32_t heartbeat;
|
||||
file.readBytes(reinterpret_cast<char*>(&heartbeat), sizeof(heartbeat));
|
||||
file.seekAndRead(37, reinterpret_cast<char*>(&heartbeat), sizeof(heartbeat));
|
||||
return heartbeat;
|
||||
}
|
||||
|
||||
void SweepstoreConcurrencyHeader::writeMasterHeartbeat(uint32_t heartbeat) {
|
||||
file.writeSeek(37, std::ios::beg);
|
||||
file.writeBytes(reinterpret_cast<const char*>(&heartbeat), sizeof(heartbeat));
|
||||
file.seekAndWrite(37, reinterpret_cast<const char*>(&heartbeat), sizeof(heartbeat));
|
||||
}
|
||||
|
||||
uint32_t SweepstoreConcurrencyHeader::readNumberOfWorkers() {
|
||||
file.readSeek(41, std::ios::beg);
|
||||
uint32_t numWorkers;
|
||||
file.readBytes(reinterpret_cast<char*>(&numWorkers), sizeof(numWorkers));
|
||||
file.seekAndRead(41, reinterpret_cast<char*>(&numWorkers), sizeof(numWorkers));
|
||||
return numWorkers;
|
||||
}
|
||||
|
||||
void SweepstoreConcurrencyHeader::writeNumberOfWorkers(uint32_t numWorkers) {
|
||||
file.writeSeek(41, std::ios::beg);
|
||||
file.writeBytes(reinterpret_cast<const char*>(&numWorkers), sizeof(numWorkers));
|
||||
file.seekAndWrite(41, reinterpret_cast<const char*>(&numWorkers), sizeof(numWorkers));
|
||||
}
|
||||
|
||||
bool SweepstoreConcurrencyHeader::readIsReadAllowed() {
|
||||
file.readSeek(45, std::ios::beg);
|
||||
char flag;
|
||||
file.readBytes(&flag, sizeof(flag));
|
||||
file.seekAndRead(45, &flag, sizeof(flag));
|
||||
return flag != 0;
|
||||
}
|
||||
|
||||
void SweepstoreConcurrencyHeader::writeIsReadAllowed(bool isAllowed) {
|
||||
file.writeSeek(45, std::ios::beg);
|
||||
char flag = isAllowed ? 1 : 0;
|
||||
file.writeBytes(&flag, sizeof(flag));
|
||||
file.seekAndWrite(45, &flag, sizeof(flag));
|
||||
}
|
||||
|
||||
void SweepstoreConcurrencyHeader::initialise(int concurrentWorkers) {
|
||||
SWEEPSTORE_TIME_FUNCTION();
|
||||
writeMasterIdentifier(0);
|
||||
writeMasterHeartbeat(0);
|
||||
writeNumberOfWorkers(concurrentWorkers);
|
||||
writeIsReadAllowed(true);
|
||||
uint32_t verifyWorkers = readNumberOfWorkers();
|
||||
|
||||
if (verifyWorkers != static_cast<uint32_t>(concurrentWorkers)) {
|
||||
throw std::runtime_error("Failed to verify number of concurrent workers in concurrency header. Expected " + std::to_string(concurrentWorkers) + ", got " + std::to_string(verifyWorkers) + ".");
|
||||
}
|
||||
|
||||
for (uint32_t i = 0; i < verifyWorkers; i++) {
|
||||
SweepstoreWorkerTicketSnapshot ticket = SweepstoreWorkerTicketSnapshot();
|
||||
ticket.identifier = 0;
|
||||
@@ -163,11 +152,9 @@ void SweepstoreConcurrencyHeader::initialise(int concurrentWorkers) {
|
||||
}
|
||||
|
||||
void SweepstoreWorkerTicket::write(SweepstoreWorkerTicketSnapshot &snapshot) {
|
||||
SWEEPSTORE_TIME_FUNCTION();
|
||||
RandomAccessMemory buffer;
|
||||
|
||||
SweepstoreFileLock lock(file.getPath(), 0, 0, SweepstoreFileLock::Mode::Exclusive);
|
||||
SweepstoreFileLock::Scoped scopedLock(lock);
|
||||
|
||||
buffer.setPositionSync(0);
|
||||
buffer.writeIntSync(snapshot.identifier, 4);
|
||||
buffer.writeIntSync(snapshot.workerHeartbeat, 4);
|
||||
@@ -187,24 +174,23 @@ void SweepstoreWorkerTicket::write(SweepstoreWorkerTicketSnapshot &snapshot) {
|
||||
std::vector<uint8_t> data = buffer.readSync(buffer.length());
|
||||
char* dataPtr = reinterpret_cast<char*>(data.data());
|
||||
|
||||
// Write to file
|
||||
file.writeSeek(getOffset());
|
||||
file.writeBytes(dataPtr, data.size());
|
||||
// Write to file (byte-range locking handled automatically by seekAndWrite)
|
||||
file.seekAndWrite(getOffset(), dataPtr, data.size());
|
||||
file.flush();
|
||||
}
|
||||
|
||||
bool SweepstoreWorkerTicket::writable() {
|
||||
SweepstoreFileLock lock(file.getPath(), 0, 0, SweepstoreFileLock::Mode::Exclusive);
|
||||
return lock.isLocked() == false;
|
||||
SWEEPSTORE_TIME_FUNCTION();
|
||||
// SweepstoreFileLock lock(file.getPath(), getOffset(), TICKET_SIZE, SweepstoreFileLock::Mode::Exclusive);
|
||||
return true;
|
||||
}
|
||||
|
||||
SweepstoreWorkerTicketSnapshot SweepstoreWorkerTicket::snapshot() {
|
||||
SweepstoreFileLock lock(file.getPath(), 0, 0, SweepstoreFileLock::Mode::Shared);
|
||||
lock.lock();
|
||||
file.readSeek(getOffset());
|
||||
SWEEPSTORE_TIME_FUNCTION();
|
||||
|
||||
// Byte-range locking handled automatically by seekAndRead
|
||||
std::unique_ptr<char[]> buffer(new char[TICKET_SIZE]);
|
||||
file.readBytes(buffer.get(), TICKET_SIZE);
|
||||
lock.unlock();
|
||||
file.seekAndRead(getOffset(), buffer.get(), TICKET_SIZE);
|
||||
RandomAccessMemory ram(reinterpret_cast<uint8_t*>(buffer.get()), TICKET_SIZE);
|
||||
|
||||
SweepstoreWorkerTicketSnapshot snapshot;
|
||||
|
||||
@@ -1,12 +0,0 @@
|
||||
#include "sweepstore/utils/file_lock.h"
|
||||
#include "sweepstore/utils/file_handle.h"
|
||||
|
||||
// Thread-local FD cache definition for file locking
|
||||
#ifndef _WIN32
|
||||
thread_local std::unordered_map<std::string, int> SweepstoreFileLock::fdCache;
|
||||
#endif
|
||||
|
||||
// Thread-local stream cache definition for file handles
|
||||
#ifndef WITH_UNREAL
|
||||
thread_local std::unordered_map<std::string, std::unique_ptr<std::fstream>> SweepstoreFileHandle::streamCache;
|
||||
#endif
|
||||
@@ -1,6 +1,9 @@
|
||||
#include "sweepstore/utils/file_handle.h"
|
||||
#include "sweepstore/utils/file_lock.h"
|
||||
#include "sweepstore/utils/timing.h"
|
||||
#include <iostream>
|
||||
|
||||
// Constructor - just stores path and mode, actual stream is created per-thread
|
||||
// Constructor
|
||||
SweepstoreFileHandle::SweepstoreFileHandle(const std::string& p, std::ios::openmode mode)
|
||||
: path(p)
|
||||
, openMode(mode)
|
||||
@@ -26,29 +29,14 @@ SweepstoreFileHandle::SweepstoreFileHandle(const std::string& p, std::ios::openm
|
||||
}
|
||||
#else
|
||||
{
|
||||
// Thread-local streams created on demand in getThreadStream()
|
||||
}
|
||||
#endif
|
||||
|
||||
#ifndef WITH_UNREAL
|
||||
// Get or create the fstream for this thread
|
||||
std::fstream& SweepstoreFileHandle::getThreadStream() {
|
||||
auto it = streamCache.find(path);
|
||||
if (it == streamCache.end() || !it->second || !it->second->is_open()) {
|
||||
// Create new stream for this thread
|
||||
auto stream = std::make_unique<std::fstream>(path, openMode);
|
||||
if (!stream->is_open()) {
|
||||
// Open the single shared stream
|
||||
stream.open(path, openMode);
|
||||
if (!stream.is_open()) {
|
||||
throw std::runtime_error("Failed to open file: " + path);
|
||||
}
|
||||
streamCache[path] = std::move(stream);
|
||||
return *streamCache[path];
|
||||
}
|
||||
return *it->second;
|
||||
}
|
||||
|
||||
const std::fstream& SweepstoreFileHandle::getThreadStream() const {
|
||||
// Use const_cast to reuse the non-const version
|
||||
return const_cast<SweepstoreFileHandle*>(this)->getThreadStream();
|
||||
// Disable stream buffering for cache coherency across threads
|
||||
stream.rdbuf()->pubsetbuf(0, 0);
|
||||
}
|
||||
#endif
|
||||
|
||||
@@ -57,8 +45,7 @@ bool SweepstoreFileHandle::isOpen() const {
|
||||
#ifdef WITH_UNREAL
|
||||
return unrealHandle != nullptr;
|
||||
#else
|
||||
auto it = streamCache.find(path);
|
||||
return it != streamCache.end() && it->second && it->second->is_open();
|
||||
return stream.is_open();
|
||||
#endif
|
||||
}
|
||||
|
||||
@@ -70,15 +57,57 @@ void SweepstoreFileHandle::close() {
|
||||
unrealHandle = nullptr;
|
||||
}
|
||||
#else
|
||||
// Close this thread's stream if it exists
|
||||
auto it = streamCache.find(path);
|
||||
if (it != streamCache.end() && it->second && it->second->is_open()) {
|
||||
it->second->close();
|
||||
std::lock_guard<std::mutex> lock(streamMutex);
|
||||
if (stream.is_open()) {
|
||||
stream.close();
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
// seekAndRead
|
||||
void SweepstoreFileHandle::seekAndRead(uint64_t offset, char* buffer, size_t size) {
|
||||
SWEEPSTORE_TIME_FUNCTION();
|
||||
#ifdef WITH_UNREAL
|
||||
unrealHandle->Seek(offset);
|
||||
unrealHandle->Read(reinterpret_cast<uint8*>(buffer), size);
|
||||
#else
|
||||
// Acquire byte-range lock (allows parallel access to different byte ranges)
|
||||
SweepstoreFileLock rangeLock(path, offset, size, SweepstoreFileLock::Mode::Shared);
|
||||
rangeLock.lock();
|
||||
|
||||
// Brief stream mutex only during actual I/O
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(streamMutex);
|
||||
stream.seekg(offset, std::ios::beg);
|
||||
if (stream.fail()) stream.clear();
|
||||
stream.read(buffer, size);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
// seekAndWrite
|
||||
void SweepstoreFileHandle::seekAndWrite(uint64_t offset, const char* buffer, size_t size) {
|
||||
SWEEPSTORE_TIME_FUNCTION();
|
||||
#ifdef WITH_UNREAL
|
||||
unrealHandle->Seek(offset);
|
||||
unrealHandle->Write(reinterpret_cast<const uint8*>(buffer), size);
|
||||
unrealHandle->Flush();
|
||||
#else
|
||||
// Acquire byte-range lock (allows parallel access to different byte ranges)
|
||||
SweepstoreFileLock rangeLock(path, offset, size, SweepstoreFileLock::Mode::Exclusive);
|
||||
rangeLock.lock();
|
||||
|
||||
// Brief stream mutex only during actual I/O
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(streamMutex);
|
||||
stream.seekp(offset, std::ios::beg);
|
||||
if (stream.fail()) stream.clear();
|
||||
stream.write(buffer, size);
|
||||
stream.flush();
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
#if defined(_WIN32) || defined(WITH_UNREAL)
|
||||
// flush
|
||||
void SweepstoreFileHandle::flush() {
|
||||
#ifdef WITH_UNREAL
|
||||
@@ -86,13 +115,11 @@ void SweepstoreFileHandle::flush() {
|
||||
unrealHandle->Flush();
|
||||
}
|
||||
#else
|
||||
// Windows-specific implementation for guaranteed flush to disk
|
||||
auto& stream = getThreadStream();
|
||||
std::lock_guard<std::mutex> lock(streamMutex);
|
||||
stream.flush();
|
||||
|
||||
// On Windows, also call sync to push to OS buffers
|
||||
// Then open a Windows HANDLE to the same file and call FlushFileBuffers
|
||||
// This is more reliable than trying to extract the HANDLE from fstream
|
||||
// On Windows, also sync to disk
|
||||
#ifdef _WIN32
|
||||
HANDLE h = CreateFileA(
|
||||
path.c_str(),
|
||||
GENERIC_WRITE,
|
||||
@@ -108,65 +135,41 @@ void SweepstoreFileHandle::flush() {
|
||||
CloseHandle(h);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
// readSeek
|
||||
void SweepstoreFileHandle::readSeek(std::streampos pos, std::ios::seekdir dir) {
|
||||
#ifdef WITH_UNREAL
|
||||
// Unreal doesn't have separate read/write pointers, so just seek
|
||||
int64 unrealPos = static_cast<int64>(pos);
|
||||
if (dir == std::ios::beg) {
|
||||
unrealHandle->Seek(unrealPos);
|
||||
} else if (dir == std::ios::cur) {
|
||||
unrealHandle->Seek(unrealHandle->Tell() + unrealPos);
|
||||
} else if (dir == std::ios::end) {
|
||||
unrealHandle->SeekFromEnd(unrealPos);
|
||||
}
|
||||
#else
|
||||
// Windows - simplified to only seek read pointer
|
||||
auto& stream = getThreadStream();
|
||||
stream.seekg(pos, dir);
|
||||
if (stream.fail()) {
|
||||
stream.clear();
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
// writeSeek
|
||||
void SweepstoreFileHandle::writeSeek(std::streampos pos, std::ios::seekdir dir) {
|
||||
// Move constructor
|
||||
SweepstoreFileHandle::SweepstoreFileHandle(SweepstoreFileHandle&& other) noexcept
|
||||
: path(std::move(other.path))
|
||||
, openMode(other.openMode)
|
||||
#ifdef WITH_UNREAL
|
||||
// Same as readSeek for Unreal
|
||||
readSeek(pos, dir);
|
||||
, unrealHandle(other.unrealHandle)
|
||||
{
|
||||
other.unrealHandle = nullptr;
|
||||
}
|
||||
#else
|
||||
// Windows - simplified to only seek write pointer
|
||||
auto& stream = getThreadStream();
|
||||
stream.seekp(pos, dir);
|
||||
if (stream.fail()) {
|
||||
stream.clear();
|
||||
, stream(std::move(other.stream))
|
||||
{
|
||||
}
|
||||
#endif
|
||||
|
||||
// Move assignment
|
||||
SweepstoreFileHandle& SweepstoreFileHandle::operator=(SweepstoreFileHandle&& other) noexcept {
|
||||
if (this != &other) {
|
||||
close();
|
||||
path = std::move(other.path);
|
||||
openMode = other.openMode;
|
||||
#ifdef WITH_UNREAL
|
||||
unrealHandle = other.unrealHandle;
|
||||
other.unrealHandle = nullptr;
|
||||
#else
|
||||
stream = std::move(other.stream);
|
||||
#endif
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
|
||||
// readBytes
|
||||
void SweepstoreFileHandle::readBytes(char* buffer, std::streamsize size) {
|
||||
#ifdef WITH_UNREAL
|
||||
unrealHandle->Read(reinterpret_cast<uint8*>(buffer), size);
|
||||
#else
|
||||
// Windows
|
||||
auto& stream = getThreadStream();
|
||||
stream.read(buffer, size);
|
||||
#endif
|
||||
// Destructor
|
||||
SweepstoreFileHandle::~SweepstoreFileHandle() {
|
||||
close();
|
||||
}
|
||||
|
||||
// writeBytes
|
||||
void SweepstoreFileHandle::writeBytes(const char* buffer, std::streamsize size) {
|
||||
#ifdef WITH_UNREAL
|
||||
unrealHandle->Write(reinterpret_cast<const uint8*>(buffer), size);
|
||||
unrealHandle->Flush(); // Unreal requires explicit flush
|
||||
#else
|
||||
// Windows
|
||||
auto& stream = getThreadStream();
|
||||
stream.write(buffer, size);
|
||||
#endif
|
||||
}
|
||||
#endif // _WIN32 || WITH_UNREAL
|
||||
@@ -1,7 +1,4 @@
|
||||
#include "sweepstore/utils/file_lock.h"
|
||||
|
||||
#ifdef _WIN32
|
||||
thread_local std::unordered_map<std::string, HANDLE> SweepstoreFileLock::handleCache;
|
||||
#else
|
||||
thread_local std::unordered_map<std::string, int> SweepstoreFileLock::fdCache;
|
||||
#endif
|
||||
// Implementation is entirely in the header (inline and static members)
|
||||
// This file exists to satisfy CMake build requirements
|
||||
206
cpp/src/Private/sweepstore/utils/timing.cpp
Normal file
206
cpp/src/Private/sweepstore/utils/timing.cpp
Normal file
@@ -0,0 +1,206 @@
|
||||
#include "sweepstore/utils/timing.h"
|
||||
|
||||
#if SWEEPSTORE_ENABLE_TIMING
|
||||
|
||||
#include <vector>
|
||||
#include <unordered_map>
|
||||
#include <mutex>
|
||||
#include <iostream>
|
||||
#include <fstream>
|
||||
#include <iomanip>
|
||||
#include <algorithm>
|
||||
#include <thread>
|
||||
#include <atomic>
|
||||
#include <cstdlib>
|
||||
|
||||
// Thread-local scope stack for hierarchy tracking
|
||||
struct ScopeStackEntry {
|
||||
uint64_t eventId;
|
||||
const char* name;
|
||||
};
|
||||
|
||||
// Thread-local storage
|
||||
thread_local std::vector<ScopeStackEntry> scopeStack;
|
||||
thread_local std::unordered_map<std::string, ScopeTimingStats> threadLocalStats;
|
||||
thread_local std::vector<TimingTraceEvent> threadLocalTraceEvents;
|
||||
thread_local uint64_t nextEventId = 1;
|
||||
thread_local uint64_t threadLocalId = 0;
|
||||
|
||||
std::atomic<uint64_t> nextThreadId(1); // Sequential thread ID counter
|
||||
|
||||
// Global storage for all events from all threads
|
||||
std::mutex globalEventsMutex;
|
||||
std::vector<TimingTraceEvent> globalTraceEvents;
|
||||
|
||||
namespace SweepstoreTiming {
|
||||
|
||||
uint64_t getNextEventId() {
|
||||
return nextEventId++;
|
||||
}
|
||||
|
||||
uint64_t getParentEventId() {
|
||||
if (scopeStack.empty()) {
|
||||
return 0; // Root scope
|
||||
}
|
||||
return scopeStack.back().eventId;
|
||||
}
|
||||
|
||||
uint64_t getThreadId() {
|
||||
// Assign a unique sequential ID to this thread if not already assigned
|
||||
if (threadLocalId == 0) {
|
||||
threadLocalId = nextThreadId.fetch_add(1);
|
||||
}
|
||||
return threadLocalId;
|
||||
}
|
||||
|
||||
void recordScopeStart(const char* name, uint64_t eventId) {
|
||||
ScopeStackEntry entry;
|
||||
entry.eventId = eventId;
|
||||
entry.name = name;
|
||||
scopeStack.push_back(entry);
|
||||
}
|
||||
|
||||
void recordScopeEnd(const char* name, uint64_t eventId, uint64_t startMicros, uint64_t durationMicros, uint64_t threadId) {
|
||||
// Pop from stack
|
||||
if (!scopeStack.empty()) {
|
||||
scopeStack.pop_back();
|
||||
}
|
||||
|
||||
// Get parent event ID (after popping, so we get the correct parent)
|
||||
uint64_t parentId = scopeStack.empty() ? 0 : scopeStack.back().eventId;
|
||||
|
||||
// Update statistics
|
||||
std::string scopeName(name);
|
||||
auto& stats = threadLocalStats[scopeName];
|
||||
stats.callCount++;
|
||||
stats.totalMicros += durationMicros;
|
||||
stats.minMicros = std::min(stats.minMicros, durationMicros);
|
||||
stats.maxMicros = std::max(stats.maxMicros, durationMicros);
|
||||
|
||||
// Record trace event
|
||||
TimingTraceEvent event;
|
||||
event.name = scopeName;
|
||||
event.startMicros = startMicros;
|
||||
event.durationMicros = durationMicros;
|
||||
event.threadId = threadId;
|
||||
event.parentEventId = parentId;
|
||||
event.eventId = eventId;
|
||||
threadLocalTraceEvents.push_back(event);
|
||||
}
|
||||
|
||||
// Initialize the output file (call once at start)
|
||||
void initOutputFile() {
|
||||
// Just clear the global events vector
|
||||
std::lock_guard<std::mutex> lock(globalEventsMutex);
|
||||
globalTraceEvents.clear();
|
||||
}
|
||||
|
||||
// Append this thread's data to the global collection
|
||||
void flushThreadData() {
|
||||
if (threadLocalTraceEvents.empty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
std::lock_guard<std::mutex> lock(globalEventsMutex);
|
||||
|
||||
// Move thread-local events to global storage
|
||||
globalTraceEvents.insert(
|
||||
globalTraceEvents.end(),
|
||||
threadLocalTraceEvents.begin(),
|
||||
threadLocalTraceEvents.end()
|
||||
);
|
||||
|
||||
std::cout << "Thread " << threadLocalId << " flushed " << threadLocalTraceEvents.size() << " events" << std::endl;
|
||||
|
||||
// Clear the events after flushing
|
||||
threadLocalTraceEvents.clear();
|
||||
}
|
||||
|
||||
// Finalize the output file (call once at end)
|
||||
void finalizeOutputFile() {
|
||||
size_t eventCount = 0;
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(globalEventsMutex);
|
||||
eventCount = globalTraceEvents.size();
|
||||
}
|
||||
|
||||
std::cout << "Sorting " << eventCount << " events by timestamp..." << std::endl;
|
||||
std::cout.flush();
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(globalEventsMutex);
|
||||
|
||||
// Sort all events by start time
|
||||
std::sort(globalTraceEvents.begin(), globalTraceEvents.end(),
|
||||
[](const TimingTraceEvent& a, const TimingTraceEvent& b) {
|
||||
return a.startMicros < b.startMicros;
|
||||
});
|
||||
}
|
||||
|
||||
std::cout << "Writing trace file..." << std::endl;
|
||||
std::cout.flush();
|
||||
|
||||
// Write to file with FILE* for better control
|
||||
FILE* f = fopen("sweepstore_trace.json", "w");
|
||||
if (!f) {
|
||||
std::cerr << "Failed to open sweepstore_trace.json for writing" << std::endl;
|
||||
return;
|
||||
}
|
||||
|
||||
fprintf(f, "{\n");
|
||||
fprintf(f, " \"displayTimeUnit\": \"ms\",\n");
|
||||
fprintf(f, " \"traceEvents\": [\n");
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(globalEventsMutex);
|
||||
|
||||
for (size_t i = 0; i < globalTraceEvents.size(); i++) {
|
||||
const TimingTraceEvent& event = globalTraceEvents[i];
|
||||
|
||||
// Write Begin event
|
||||
fprintf(f, " {\"name\":\"%s\",\"cat\":\"scope\",\"ph\":\"B\",\"ts\":%llu,\"pid\":1,\"tid\":%llu,\"args\":{}},\n",
|
||||
event.name.c_str(),
|
||||
(unsigned long long)event.startMicros,
|
||||
(unsigned long long)event.threadId);
|
||||
|
||||
// Write End event
|
||||
fprintf(f, " {\"name\":\"%s\",\"cat\":\"scope\",\"ph\":\"E\",\"ts\":%llu,\"pid\":1,\"tid\":%llu,\"args\":{}}",
|
||||
event.name.c_str(),
|
||||
(unsigned long long)(event.startMicros + event.durationMicros),
|
||||
(unsigned long long)event.threadId);
|
||||
|
||||
if (i < globalTraceEvents.size() - 1) {
|
||||
fprintf(f, ",\n");
|
||||
} else {
|
||||
fprintf(f, "\n");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fprintf(f, " ]\n}\n");
|
||||
fflush(f); // Ensure all data is written to disk before closing
|
||||
fclose(f);
|
||||
|
||||
std::cout << "Trace written to sweepstore_trace.json (" << eventCount << " events)" << std::endl;
|
||||
std::cout.flush();
|
||||
}
|
||||
|
||||
} // namespace SweepstoreTiming
|
||||
|
||||
#else // SWEEPSTORE_ENABLE_TIMING
|
||||
|
||||
// Stub implementations when timing is disabled
|
||||
namespace SweepstoreTiming {
|
||||
void recordScopeStart(const char*, uint64_t) {}
|
||||
void recordScopeEnd(const char*, uint64_t, uint64_t, uint64_t, uint64_t) {}
|
||||
uint64_t getCurrentEventId() { return 0; }
|
||||
uint64_t getNextEventId() { return 0; }
|
||||
uint64_t getParentEventId() { return 0; }
|
||||
uint64_t getThreadId() { return 0; }
|
||||
void flushThreadData() {}
|
||||
void initOutputFile() {}
|
||||
void finalizeOutputFile() {}
|
||||
}
|
||||
|
||||
#endif // SWEEPSTORE_ENABLE_TIMING
|
||||
@@ -6,7 +6,9 @@
|
||||
#include <thread>
|
||||
#include <memory>
|
||||
|
||||
#define STALE_HEARTBEAT_THRESHOLD_MS 5000
|
||||
#include "sweepstore/utils/timing.h"
|
||||
|
||||
#define STALE_HEARTBEAT_THRESHOLD_MS 1000
|
||||
|
||||
enum SweepstoreTicketOperation : int;
|
||||
class SweepstoreFileHandle;
|
||||
@@ -24,6 +26,7 @@ namespace SweepstoreConcurrency {
|
||||
void initialiseMaster(std::string filePath);
|
||||
|
||||
inline void initialiseMasterAsync(std::string filePath) {
|
||||
SWEEPSTORE_TIME_FUNCTION();
|
||||
std::thread([filePath]() {
|
||||
initialiseMaster(filePath);
|
||||
}).detach();
|
||||
|
||||
@@ -84,7 +84,7 @@ class SweepstoreWorkerTicket {
|
||||
SweepstoreFileHandle& file;
|
||||
uint32_t ticketIndex;
|
||||
|
||||
uint64_t getOffset() const {
|
||||
[[nodiscard]] uint64_t getOffset() const {
|
||||
return SWEEPSTORE_COMBINED_STATIC_HEADER_SIZE + (ticketIndex * TICKET_SIZE);
|
||||
}
|
||||
|
||||
|
||||
@@ -55,6 +55,8 @@ public:
|
||||
sizeof(T),
|
||||
[this, key = this->key, &value]() {
|
||||
|
||||
|
||||
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
@@ -2,9 +2,7 @@
|
||||
|
||||
#include <fstream>
|
||||
#include <string>
|
||||
#include <memory>
|
||||
#include <iostream>
|
||||
#include <unordered_map>
|
||||
#include <mutex>
|
||||
|
||||
#ifdef _WIN32
|
||||
#include <windows.h>
|
||||
@@ -22,15 +20,15 @@ class SweepstoreFileHandle {
|
||||
private:
|
||||
std::string path;
|
||||
std::ios::openmode openMode;
|
||||
|
||||
#ifdef WITH_UNREAL
|
||||
IFileHandle* unrealHandle;
|
||||
#else
|
||||
// Thread-local cache: each thread gets its own fstream per path
|
||||
static thread_local std::unordered_map<std::string, std::unique_ptr<std::fstream>> streamCache;
|
||||
// Single shared stream for all threads
|
||||
std::fstream stream;
|
||||
|
||||
// Get or create the fstream for this thread
|
||||
std::fstream& getThreadStream();
|
||||
const std::fstream& getThreadStream() const;
|
||||
// Mutex protecting the stream
|
||||
std::mutex streamMutex;
|
||||
#endif
|
||||
|
||||
public:
|
||||
@@ -38,76 +36,23 @@ public:
|
||||
|
||||
const std::string& getPath() const { return path; }
|
||||
|
||||
#ifndef WITH_UNREAL
|
||||
std::fstream& getStream() { return getThreadStream(); }
|
||||
const std::fstream& getStream() const { return getThreadStream(); }
|
||||
|
||||
// Smart pointer-like interface
|
||||
std::fstream* operator->() { return &getThreadStream(); }
|
||||
const std::fstream* operator->() const { return &getThreadStream(); }
|
||||
|
||||
std::fstream& operator*() { return getThreadStream(); }
|
||||
const std::fstream& operator*() const { return getThreadStream(); }
|
||||
#endif
|
||||
|
||||
bool isOpen() const;
|
||||
void close();
|
||||
|
||||
// Windows-compatible I/O wrappers
|
||||
#if defined(_WIN32) || defined(WITH_UNREAL)
|
||||
// Main I/O API - atomic seek+read/write operations
|
||||
void seekAndRead(uint64_t offset, char* buffer, size_t size);
|
||||
void seekAndWrite(uint64_t offset, const char* buffer, size_t size);
|
||||
|
||||
// Explicit flush
|
||||
void flush();
|
||||
void readSeek(std::streampos pos, std::ios::seekdir dir = std::ios::beg);
|
||||
void writeSeek(std::streampos pos, std::ios::seekdir dir = std::ios::beg);
|
||||
void readBytes(char* buffer, std::streamsize size);
|
||||
void writeBytes(const char* buffer, std::streamsize size);
|
||||
#else
|
||||
// Inline for non-Windows to avoid overhead
|
||||
inline void flush() {
|
||||
getThreadStream().flush();
|
||||
}
|
||||
inline void readSeek(std::streampos pos, std::ios::seekdir dir = std::ios::beg) {
|
||||
getThreadStream().seekg(pos, dir);
|
||||
}
|
||||
inline void writeSeek(std::streampos pos, std::ios::seekdir dir = std::ios::beg) {
|
||||
getThreadStream().seekp(pos, dir);
|
||||
}
|
||||
inline void readBytes(char* buffer, std::streamsize size) {
|
||||
getThreadStream().read(buffer, size);
|
||||
}
|
||||
inline void writeBytes(const char* buffer, std::streamsize size) {
|
||||
getThreadStream().write(buffer, size);
|
||||
}
|
||||
#endif
|
||||
|
||||
SweepstoreFileHandle(SweepstoreFileHandle&& other) noexcept
|
||||
: path(std::move(other.path))
|
||||
, openMode(other.openMode)
|
||||
#ifdef WITH_UNREAL
|
||||
, unrealHandle(other.unrealHandle)
|
||||
#endif
|
||||
{
|
||||
#ifdef WITH_UNREAL
|
||||
other.unrealHandle = nullptr;
|
||||
#endif
|
||||
}
|
||||
|
||||
SweepstoreFileHandle& operator=(SweepstoreFileHandle&& other) noexcept {
|
||||
if (this != &other) {
|
||||
close();
|
||||
path = std::move(other.path);
|
||||
openMode = other.openMode;
|
||||
#ifdef WITH_UNREAL
|
||||
unrealHandle = other.unrealHandle;
|
||||
other.unrealHandle = nullptr;
|
||||
#endif
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
// Move semantics
|
||||
SweepstoreFileHandle(SweepstoreFileHandle&& other) noexcept;
|
||||
SweepstoreFileHandle& operator=(SweepstoreFileHandle&& other) noexcept;
|
||||
|
||||
// Delete copy semantics
|
||||
SweepstoreFileHandle(const SweepstoreFileHandle&) = delete;
|
||||
SweepstoreFileHandle& operator=(const SweepstoreFileHandle&) = delete;
|
||||
|
||||
~SweepstoreFileHandle() {
|
||||
close();
|
||||
}
|
||||
~SweepstoreFileHandle();
|
||||
};
|
||||
@@ -2,143 +2,116 @@
|
||||
|
||||
#include <string>
|
||||
#include <cstdint>
|
||||
#include <unordered_map>
|
||||
#include <stdexcept>
|
||||
#include <mutex>
|
||||
#include <condition_variable>
|
||||
#include <map>
|
||||
#include <vector>
|
||||
#include <algorithm>
|
||||
#include <memory>
|
||||
|
||||
#ifdef _WIN32
|
||||
#include <windows.h>
|
||||
#else
|
||||
#include <fcntl.h>
|
||||
#include <unistd.h>
|
||||
#include <sys/file.h>
|
||||
#endif
|
||||
|
||||
// Simple file lock using flock() with thread-local FD cache
|
||||
// Each thread has its own FD, flock() is per-FD, so threads don't conflict
|
||||
// Matches Dart's paradigm: each isolate has its own RandomAccessFile
|
||||
// C++ level byte-range locking
|
||||
// Allows Thread A (ticket 5) and Thread B (ticket 10) to work in parallel on different byte ranges
|
||||
// Uses static shared state to coordinate locks across all SweepstoreFileLock instances
|
||||
class SweepstoreFileLock {
|
||||
public:
|
||||
enum class Mode { Shared, Exclusive };
|
||||
|
||||
private:
|
||||
struct LockRange {
|
||||
uint64_t offset;
|
||||
uint64_t length;
|
||||
Mode mode;
|
||||
|
||||
uint64_t end() const { return offset + length; }
|
||||
|
||||
bool overlaps(uint64_t otherOffset, uint64_t otherLength) const {
|
||||
uint64_t otherEnd = otherOffset + otherLength;
|
||||
return offset < otherEnd && otherOffset < end();
|
||||
}
|
||||
};
|
||||
|
||||
// Static shared state for all locks across all instances
|
||||
struct SharedLockState {
|
||||
std::mutex mutex;
|
||||
std::condition_variable cv;
|
||||
// Map: file path -> list of active lock ranges
|
||||
std::map<std::string, std::vector<LockRange>> activeLocks;
|
||||
};
|
||||
|
||||
static SharedLockState& getSharedState() {
|
||||
static SharedLockState state;
|
||||
return state;
|
||||
}
|
||||
|
||||
std::string filePath;
|
||||
uint64_t offset;
|
||||
uint64_t length;
|
||||
Mode mode;
|
||||
bool locked = false;
|
||||
|
||||
#ifdef _WIN32
|
||||
// Thread-local HANDLE cache for Windows
|
||||
static thread_local std::unordered_map<std::string, HANDLE> handleCache;
|
||||
|
||||
static HANDLE getOrOpenHandle(const std::string& path) {
|
||||
auto it = handleCache.find(path);
|
||||
if (it != handleCache.end()) {
|
||||
return it->second;
|
||||
// Check if acquiring this lock would conflict with existing locks
|
||||
bool wouldConflict(const std::vector<LockRange>& existingLocks) const {
|
||||
for (const auto& existing : existingLocks) {
|
||||
if (existing.overlaps(offset, length)) {
|
||||
// Conflict if either lock is exclusive
|
||||
if (mode == Mode::Exclusive || existing.mode == Mode::Exclusive) {
|
||||
return true;
|
||||
}
|
||||
// Shared locks don't conflict with each other
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
HANDLE handle = CreateFileA(
|
||||
path.c_str(),
|
||||
GENERIC_READ | GENERIC_WRITE,
|
||||
FILE_SHARE_READ | FILE_SHARE_WRITE,
|
||||
NULL,
|
||||
OPEN_EXISTING,
|
||||
FILE_ATTRIBUTE_NORMAL,
|
||||
NULL
|
||||
void acquire() {
|
||||
auto& state = getSharedState();
|
||||
std::unique_lock<std::mutex> lock(state.mutex);
|
||||
|
||||
// Wait until no conflicts
|
||||
state.cv.wait(lock, [&]() {
|
||||
auto it = state.activeLocks.find(filePath);
|
||||
if (it == state.activeLocks.end()) {
|
||||
return true; // No locks on this file yet
|
||||
}
|
||||
return !wouldConflict(it->second);
|
||||
});
|
||||
|
||||
// Add our lock
|
||||
state.activeLocks[filePath].push_back({offset, length, mode});
|
||||
locked = true;
|
||||
}
|
||||
|
||||
void release() {
|
||||
if (!locked) return;
|
||||
|
||||
auto& state = getSharedState();
|
||||
std::unique_lock<std::mutex> lock(state.mutex);
|
||||
|
||||
auto it = state.activeLocks.find(filePath);
|
||||
if (it != state.activeLocks.end()) {
|
||||
auto& locks = it->second;
|
||||
|
||||
// Remove our lock (find by offset/length/mode match)
|
||||
locks.erase(
|
||||
std::remove_if(locks.begin(), locks.end(), [&](const LockRange& r) {
|
||||
return r.offset == offset && r.length == length && r.mode == mode;
|
||||
}),
|
||||
locks.end()
|
||||
);
|
||||
|
||||
if (handle == INVALID_HANDLE_VALUE) {
|
||||
throw std::runtime_error("Failed to open file for locking: " + path);
|
||||
// Clean up if no more locks on this file
|
||||
if (locks.empty()) {
|
||||
state.activeLocks.erase(it);
|
||||
}
|
||||
}
|
||||
|
||||
handleCache[path] = handle;
|
||||
return handle;
|
||||
}
|
||||
|
||||
void acquire() {
|
||||
HANDLE handle = getOrOpenHandle(filePath);
|
||||
OVERLAPPED overlapped = {}; // Proper zero-initialization
|
||||
overlapped.Offset = static_cast<DWORD>(offset & 0xFFFFFFFF);
|
||||
overlapped.OffsetHigh = static_cast<DWORD>(offset >> 32);
|
||||
|
||||
DWORD length_low = static_cast<DWORD>(length & 0xFFFFFFFF);
|
||||
DWORD length_high = static_cast<DWORD>(length >> 32);
|
||||
DWORD flags = (mode == Mode::Exclusive) ? LOCKFILE_EXCLUSIVE_LOCK : 0;
|
||||
|
||||
if (!LockFileEx(handle, flags, 0, length_low, length_high, &overlapped)) {
|
||||
throw std::runtime_error("Failed to acquire file lock");
|
||||
}
|
||||
locked = true;
|
||||
}
|
||||
|
||||
void release() {
|
||||
if (locked) {
|
||||
HANDLE handle = getOrOpenHandle(filePath);
|
||||
OVERLAPPED overlapped = {};
|
||||
overlapped.Offset = static_cast<DWORD>(offset & 0xFFFFFFFF);
|
||||
overlapped.OffsetHigh = static_cast<DWORD>(offset >> 32);
|
||||
|
||||
DWORD length_low = static_cast<DWORD>(length & 0xFFFFFFFF);
|
||||
DWORD length_high = static_cast<DWORD>(length >> 32);
|
||||
|
||||
UnlockFileEx(handle, 0, length_low, length_high, &overlapped);
|
||||
locked = false;
|
||||
}
|
||||
}
|
||||
#else
|
||||
// Thread-local FD cache - each thread has its own FD per file
|
||||
static thread_local std::unordered_map<std::string, int> fdCache;
|
||||
|
||||
static int getOrOpenFD(const std::string& path) {
|
||||
auto it = fdCache.find(path);
|
||||
if (it != fdCache.end()) {
|
||||
return it->second;
|
||||
// Notify waiting threads
|
||||
state.cv.notify_all();
|
||||
}
|
||||
|
||||
int fd = open(path.c_str(), O_RDWR);
|
||||
if (fd == -1) {
|
||||
throw std::runtime_error("Failed to open file for locking: " + path);
|
||||
}
|
||||
|
||||
fdCache[path] = fd;
|
||||
return fd;
|
||||
}
|
||||
|
||||
void acquire() {
|
||||
int fd = getOrOpenFD(filePath);
|
||||
|
||||
struct flock lock_info;
|
||||
lock_info.l_type = (mode == Mode::Exclusive) ? F_WRLCK : F_RDLCK;
|
||||
lock_info.l_whence = SEEK_SET;
|
||||
lock_info.l_start = offset;
|
||||
lock_info.l_len = length;
|
||||
lock_info.l_pid = 0;
|
||||
|
||||
if (fcntl(fd, F_SETLKW, &lock_info) == -1) {
|
||||
throw std::runtime_error("Failed to acquire file lock");
|
||||
}
|
||||
locked = true;
|
||||
}
|
||||
|
||||
void release() {
|
||||
if (locked) {
|
||||
int fd = getOrOpenFD(filePath);
|
||||
|
||||
struct flock lock_info;
|
||||
lock_info.l_type = F_UNLCK;
|
||||
lock_info.l_whence = SEEK_SET;
|
||||
lock_info.l_start = offset;
|
||||
lock_info.l_len = length;
|
||||
lock_info.l_pid = 0;
|
||||
|
||||
fcntl(fd, F_SETLK, &lock_info);
|
||||
locked = false;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
public:
|
||||
// Constructor accepts offset/length for byte-range locking
|
||||
SweepstoreFileLock(const std::string& path, uint64_t off, uint64_t len, Mode m)
|
||||
: filePath(path), offset(off), length(len), mode(m) {}
|
||||
|
||||
@@ -156,59 +129,16 @@ public:
|
||||
return locked;
|
||||
}
|
||||
|
||||
// Check if file is currently locked (non-blocking test)
|
||||
bool isLocked() {
|
||||
#ifdef _WIN32
|
||||
HANDLE handle = getOrOpenHandle(filePath);
|
||||
OVERLAPPED overlapped = {};
|
||||
overlapped.Offset = static_cast<DWORD>(offset & 0xFFFFFFFF);
|
||||
overlapped.OffsetHigh = static_cast<DWORD>(offset >> 32);
|
||||
|
||||
DWORD length_low = static_cast<DWORD>(length & 0xFFFFFFFF);
|
||||
DWORD length_high = static_cast<DWORD>(length >> 32);
|
||||
DWORD flags = (mode == Mode::Exclusive) ? LOCKFILE_EXCLUSIVE_LOCK : 0;
|
||||
flags |= LOCKFILE_FAIL_IMMEDIATELY;
|
||||
|
||||
// Try non-blocking lock
|
||||
if (!LockFileEx(handle, flags, 0, length_low, length_high, &overlapped)) {
|
||||
return true; // Already locked
|
||||
}
|
||||
|
||||
// Got the lock, release immediately
|
||||
UnlockFileEx(handle, 0, length_low, length_high, &overlapped);
|
||||
return false;
|
||||
#else
|
||||
int fd = getOrOpenFD(filePath);
|
||||
|
||||
struct flock lock_info;
|
||||
lock_info.l_type = (mode == Mode::Exclusive) ? F_WRLCK : F_RDLCK;
|
||||
lock_info.l_whence = SEEK_SET;
|
||||
lock_info.l_start = offset;
|
||||
lock_info.l_len = length;
|
||||
lock_info.l_pid = 0;
|
||||
|
||||
// Try non-blocking lock
|
||||
if (fcntl(fd, F_SETLK, &lock_info) == -1) {
|
||||
return true; // Already locked
|
||||
}
|
||||
|
||||
// Got the lock, release immediately
|
||||
lock_info.l_type = F_UNLCK;
|
||||
fcntl(fd, F_SETLK, &lock_info);
|
||||
return false;
|
||||
#endif
|
||||
}
|
||||
|
||||
// RAII helper for scoped locking
|
||||
class Scoped {
|
||||
SweepstoreFileLock& lock;
|
||||
SweepstoreFileLock& lockRef;
|
||||
public:
|
||||
Scoped(SweepstoreFileLock& l) : lock(l) {
|
||||
lock.lock();
|
||||
Scoped(SweepstoreFileLock& l) : lockRef(l) {
|
||||
lockRef.lock();
|
||||
}
|
||||
|
||||
~Scoped() {
|
||||
lock.unlock();
|
||||
lockRef.unlock();
|
||||
}
|
||||
|
||||
Scoped(const Scoped&) = delete;
|
||||
|
||||
@@ -341,6 +341,13 @@ inline void preciseSleep(std::chrono::nanoseconds duration) {
|
||||
auto start = std::chrono::high_resolution_clock::now();
|
||||
|
||||
#ifdef _WIN32
|
||||
// Set timer resolution to 1ms once per process
|
||||
[[maybe_unused]] static bool timerResolutionSet = []() {
|
||||
timeBeginPeriod(1);
|
||||
std::atexit([]() { timeEndPeriod(1); });
|
||||
return true;
|
||||
}();
|
||||
|
||||
const auto windowsMinSleepTime = std::chrono::milliseconds(1);
|
||||
|
||||
if (duration < windowsMinSleepTime) {
|
||||
|
||||
213
cpp/src/Public/sweepstore/utils/timing.h
Normal file
213
cpp/src/Public/sweepstore/utils/timing.h
Normal file
@@ -0,0 +1,213 @@
|
||||
#ifndef SWEEPSTORE_TIMING_H
|
||||
#define SWEEPSTORE_TIMING_H
|
||||
|
||||
/**
|
||||
* @file timing.h
|
||||
* @brief Hierarchical scope timing system for SweepStore
|
||||
*
|
||||
* Provides microsecond-precision performance profiling for arbitrary scopes
|
||||
* (functions, loops, code blocks) with automatic parent/child relationship
|
||||
* tracking. Outputs console statistics, CSV data, and Chrome Tracing JSON.
|
||||
*
|
||||
* Usage:
|
||||
* #include "sweepstore/utils/timing.h"
|
||||
*
|
||||
* void myFunction() {
|
||||
* SWEEPSTORE_TIME_FUNCTION(); // Times entire function
|
||||
*
|
||||
* {
|
||||
* SWEEPSTORE_TIME_SCOPE("init"); // Times specific block
|
||||
* // initialization code...
|
||||
* }
|
||||
*
|
||||
* for (int i = 0; i < n; i++) {
|
||||
* SWEEPSTORE_TIME_SCOPE("loop_iteration"); // Times each iteration
|
||||
* // loop body...
|
||||
* }
|
||||
* }
|
||||
*
|
||||
* Build with timing enabled:
|
||||
* cmake -DENABLE_TIMING=ON -B build
|
||||
* cmake --build build
|
||||
*
|
||||
* View results:
|
||||
* 1. Console statistics (automatic at program exit)
|
||||
* 2. sweepstore_timing.csv (flat aggregated data)
|
||||
* 3. sweepstore_trace.json (Chrome Tracing format: chrome://tracing)
|
||||
*
|
||||
* Performance:
|
||||
* - Enabled: ~150-300ns overhead per instrumented scope
|
||||
* - Disabled: Zero overhead (macros compile to nothing)
|
||||
*
|
||||
* Thread Safety:
|
||||
* - Completely lock-free during hot paths
|
||||
* - Thread-local storage eliminates contention
|
||||
* - Each thread maintains independent timing tree
|
||||
*/
|
||||
|
||||
#include <cstdint>
|
||||
#include <string>
|
||||
#include <chrono>
|
||||
#include <thread>
|
||||
|
||||
/**
|
||||
* @brief Aggregated timing statistics for a scope
|
||||
*
|
||||
* Stores cumulative statistics across all invocations of a scope.
|
||||
*/
|
||||
struct ScopeTimingStats {
|
||||
uint64_t callCount = 0; ///< Total number of scope invocations
|
||||
uint64_t totalMicros = 0; ///< Cumulative execution time in microseconds
|
||||
uint64_t minMicros = UINT64_MAX; ///< Fastest single execution in microseconds
|
||||
uint64_t maxMicros = 0; ///< Slowest single execution in microseconds
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Single timing event for Chrome Tracing format
|
||||
*
|
||||
* Represents one execution of a scope. Chrome Tracing viewer automatically
|
||||
* builds hierarchical trees from timestamp overlaps.
|
||||
*/
|
||||
struct TimingTraceEvent {
|
||||
std::string name; ///< Scope name (function name or custom label)
|
||||
uint64_t startMicros; ///< Absolute timestamp in microseconds since epoch
|
||||
uint64_t durationMicros; ///< Duration in microseconds
|
||||
uint64_t threadId; ///< Thread ID (unique per thread)
|
||||
uint64_t parentEventId; ///< Parent event ID (0 if root scope)
|
||||
uint64_t eventId; ///< Unique ID for this event
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Internal timing functions (do not call directly)
|
||||
*
|
||||
* These functions are called automatically by the RAII timer class.
|
||||
* Use SWEEPSTORE_TIME_FUNCTION() or SWEEPSTORE_TIME_SCOPE() instead.
|
||||
*/
|
||||
namespace SweepstoreTiming {
|
||||
void recordScopeStart(const char* name, uint64_t eventId);
|
||||
void recordScopeEnd(const char* name, uint64_t eventId, uint64_t startMicros, uint64_t durationMicros, uint64_t threadId);
|
||||
uint64_t getCurrentEventId();
|
||||
uint64_t getNextEventId();
|
||||
uint64_t getParentEventId();
|
||||
uint64_t getThreadId(); // Get unique sequential thread ID
|
||||
void initOutputFile(); // Call once at program start
|
||||
void flushThreadData(); // Call before thread exits to save timing data
|
||||
void finalizeOutputFile(); // Call once at program end
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief RAII timer for automatic scope timing
|
||||
*
|
||||
* This class implements RAII (Resource Acquisition Is Initialization) pattern
|
||||
* for timing. Timer starts in constructor, stops in destructor. Automatically
|
||||
* tracks parent/child relationships via thread-local scope stack.
|
||||
*
|
||||
* DO NOT instantiate directly - use the macros instead:
|
||||
* - SWEEPSTORE_TIME_FUNCTION() for functions
|
||||
* - SWEEPSTORE_TIME_SCOPE(name) for arbitrary scopes
|
||||
*
|
||||
* Thread Safety:
|
||||
* - Thread-local storage ensures zero contention
|
||||
* - Each thread maintains independent scope stack
|
||||
* - Safe for concurrent use across multiple threads
|
||||
*
|
||||
* Performance:
|
||||
* - Constructor: ~5ns
|
||||
* - Destructor: ~150-300ns (includes stats update and trace event recording)
|
||||
*/
|
||||
class SweepstoreScopeTimer {
|
||||
private:
|
||||
const char* scopeName;
|
||||
std::chrono::high_resolution_clock::time_point startTime;
|
||||
uint64_t myEventId;
|
||||
uint64_t parentEventId;
|
||||
|
||||
public:
|
||||
explicit SweepstoreScopeTimer(const char* name)
|
||||
: scopeName(name)
|
||||
, startTime(std::chrono::high_resolution_clock::now())
|
||||
, myEventId(SweepstoreTiming::getNextEventId())
|
||||
, parentEventId(SweepstoreTiming::getParentEventId())
|
||||
{
|
||||
SweepstoreTiming::recordScopeStart(scopeName, myEventId);
|
||||
}
|
||||
|
||||
~SweepstoreScopeTimer() {
|
||||
auto endTime = std::chrono::high_resolution_clock::now();
|
||||
auto duration = std::chrono::duration_cast<std::chrono::microseconds>(endTime - startTime);
|
||||
auto startMicros = std::chrono::duration_cast<std::chrono::microseconds>(
|
||||
startTime.time_since_epoch()
|
||||
).count();
|
||||
|
||||
// Get thread ID (use sequential thread ID instead of hash)
|
||||
uint64_t threadId = SweepstoreTiming::getThreadId();
|
||||
|
||||
SweepstoreTiming::recordScopeEnd(
|
||||
scopeName,
|
||||
myEventId,
|
||||
startMicros,
|
||||
duration.count(),
|
||||
threadId
|
||||
);
|
||||
}
|
||||
|
||||
// Disable copy and move
|
||||
SweepstoreScopeTimer(const SweepstoreScopeTimer&) = delete;
|
||||
SweepstoreScopeTimer& operator=(const SweepstoreScopeTimer&) = delete;
|
||||
SweepstoreScopeTimer(SweepstoreScopeTimer&&) = delete;
|
||||
SweepstoreScopeTimer& operator=(SweepstoreScopeTimer&&) = delete;
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Timing instrumentation macros
|
||||
*
|
||||
* These macros provide a simple interface for instrumenting code with timing.
|
||||
* They compile to nothing when SWEEPSTORE_ENABLE_TIMING is 0 (default).
|
||||
*
|
||||
* @def SWEEPSTORE_TIME_FUNCTION()
|
||||
* Times the entire function. Uses __FUNCTION__ for scope name.
|
||||
* Place at the start of the function body.
|
||||
*
|
||||
* Example:
|
||||
* void myFunction() {
|
||||
* SWEEPSTORE_TIME_FUNCTION();
|
||||
* // function body...
|
||||
* }
|
||||
*
|
||||
* @def SWEEPSTORE_TIME_SCOPE(name)
|
||||
* Times an arbitrary scope with a custom name. Useful for:
|
||||
* - Code blocks
|
||||
* - Loop iterations
|
||||
* - Lambda functions
|
||||
* - Critical sections
|
||||
* - Class methods (use "ClassName::methodName" as the name)
|
||||
*
|
||||
* Example:
|
||||
* for (int i = 0; i < n; i++) {
|
||||
* SWEEPSTORE_TIME_SCOPE("loop_iteration");
|
||||
* // loop body...
|
||||
* }
|
||||
*
|
||||
* void MyClass::myMethod() {
|
||||
* SWEEPSTORE_TIME_SCOPE("MyClass::myMethod");
|
||||
* // method body...
|
||||
* }
|
||||
*
|
||||
* auto lambda = [&]() {
|
||||
* SWEEPSTORE_TIME_SCOPE("lambda_processing");
|
||||
* // lambda body...
|
||||
* };
|
||||
*/
|
||||
#ifndef SWEEPSTORE_ENABLE_TIMING
|
||||
#define SWEEPSTORE_ENABLE_TIMING 0
|
||||
#endif
|
||||
|
||||
#if SWEEPSTORE_ENABLE_TIMING
|
||||
#define SWEEPSTORE_TIME_FUNCTION() SweepstoreScopeTimer __sweepstore_timer_##__LINE__(__FUNCTION__)
|
||||
#define SWEEPSTORE_TIME_SCOPE(name) SweepstoreScopeTimer __sweepstore_timer_##__LINE__(name)
|
||||
#else
|
||||
#define SWEEPSTORE_TIME_FUNCTION() ((void)0)
|
||||
#define SWEEPSTORE_TIME_SCOPE(name) ((void)0)
|
||||
#endif
|
||||
|
||||
#endif // SWEEPSTORE_TIMING_H
|
||||
Reference in New Issue
Block a user