Refactor benchmark configuration and improve file handling with byte-range locks
This commit is contained in:
@@ -15,6 +15,7 @@
|
||||
#include <condition_variable>
|
||||
#include <atomic>
|
||||
#include <vector>
|
||||
#include <memory>
|
||||
#include <cstdlib>
|
||||
|
||||
#include "sweepstore/utils/helpers.h"
|
||||
@@ -26,8 +27,8 @@
|
||||
// BENCHMARK CONFIGURATION
|
||||
// ============================================================================
|
||||
const int SWEEPSTORE_CONCURRENT_WORKERS = 32; // Number of concurrent workers for Sweepstore
|
||||
const int WORKER_THREAD_COUNT = 32; // Number of worker threads in the benchmark pool
|
||||
const int BENCHMARK_ITERATIONS = 16; // Number of benchmark iterations
|
||||
const int WORKER_THREAD_COUNT = 8; // Number of worker threads in the benchmark pool
|
||||
const int BENCHMARK_ITERATIONS = 10; // Number of benchmark iterations
|
||||
const int INITIAL_CONCURRENT_WORKERS = 1; // Starting number of concurrent operations (doubles each iteration)
|
||||
const int ITERATION_DELAY_MS = 200; // Delay between iterations in milliseconds
|
||||
const int INITIAL_SLEEP_MS = 1000; // Initial sleep before benchmark starts
|
||||
@@ -41,15 +42,16 @@ int main() {
|
||||
|
||||
std::string filePath = "./example.bin";
|
||||
|
||||
Sweepstore sweepstore(filePath);
|
||||
sweepstore.initialise(SWEEPSTORE_CONCURRENT_WORKERS);
|
||||
// Use unique_ptr to control destruction timing
|
||||
auto sweepstore = std::make_unique<Sweepstore>(filePath);
|
||||
sweepstore->initialise(SWEEPSTORE_CONCURRENT_WORKERS);
|
||||
|
||||
preciseSleep(std::chrono::milliseconds(INITIAL_SLEEP_MS));
|
||||
|
||||
std::vector<uint8_t> fileData = loadFile(filePath);
|
||||
std::cout << binaryDump(fileData) << std::endl;
|
||||
|
||||
std::cout << "Concurrent Workers: " << sweepstore.getConcurrencyHeader()->readNumberOfWorkers() << std::endl;
|
||||
std::cout << "Concurrent Workers: " << sweepstore->getConcurrencyHeader()->readNumberOfWorkers() << std::endl;
|
||||
std::cout << "Stale Ticket Threshold: " << STALE_HEARTBEAT_THRESHOLD_MS << std::endl;
|
||||
|
||||
SweepstoreConcurrency::initialiseMasterAsync(filePath);
|
||||
@@ -107,7 +109,7 @@ int main() {
|
||||
std::unique_lock<std::mutex> lock(queueMutex);
|
||||
for (int i = 0; i < concurrencyTest; i++) {
|
||||
taskQueue.push([i, &sweepstore, &completedJobs]() {
|
||||
sweepstore["key_" + std::to_string(i)] = "value_" + std::to_string(i);
|
||||
(*sweepstore)["key_" + std::to_string(i)] = "value_" + std::to_string(i);
|
||||
++completedJobs;
|
||||
});
|
||||
}
|
||||
@@ -141,8 +143,8 @@ int main() {
|
||||
worker.join();
|
||||
}
|
||||
|
||||
// Flush main thread timing data and finalize output file
|
||||
std::cout << "Main thread flushing..." << std::endl;
|
||||
// Write timing data NOW while everything is still valid
|
||||
std::cout << "Flushing timing data..." << std::endl;
|
||||
std::cout.flush();
|
||||
SweepstoreTiming::flushThreadData();
|
||||
|
||||
@@ -150,12 +152,9 @@ int main() {
|
||||
std::cout.flush();
|
||||
SweepstoreTiming::finalizeOutputFile();
|
||||
|
||||
std::cout << "Exiting..." << std::endl;
|
||||
std::cout << "Benchmark complete." << std::endl;
|
||||
std::cout.flush();
|
||||
|
||||
// Small delay to ensure all file I/O completes
|
||||
preciseSleep(std::chrono::milliseconds(100));
|
||||
|
||||
// Exit immediately without running destructors to avoid crashes
|
||||
std::_Exit(0);
|
||||
// Exit immediately to avoid thread-local destructors conflicting with detached master thread
|
||||
_Exit(0);
|
||||
}
|
||||
@@ -155,9 +155,6 @@ void SweepstoreWorkerTicket::write(SweepstoreWorkerTicketSnapshot &snapshot) {
|
||||
SWEEPSTORE_TIME_FUNCTION();
|
||||
RandomAccessMemory buffer;
|
||||
|
||||
|
||||
|
||||
|
||||
buffer.setPositionSync(0);
|
||||
buffer.writeIntSync(snapshot.identifier, 4);
|
||||
buffer.writeIntSync(snapshot.workerHeartbeat, 4);
|
||||
@@ -177,9 +174,7 @@ void SweepstoreWorkerTicket::write(SweepstoreWorkerTicketSnapshot &snapshot) {
|
||||
std::vector<uint8_t> data = buffer.readSync(buffer.length());
|
||||
char* dataPtr = reinterpret_cast<char*>(data.data());
|
||||
|
||||
// Write to file with byte-range lock (allows parallel access to different tickets)
|
||||
SweepstoreFileLock lock(file.getPath(), getOffset(), TICKET_SIZE, SweepstoreFileLock::Mode::Exclusive);
|
||||
SweepstoreFileLock::Scoped scopedLock(lock);
|
||||
// Write to file (byte-range locking handled automatically by seekAndWrite)
|
||||
file.seekAndWrite(getOffset(), dataPtr, data.size());
|
||||
file.flush();
|
||||
}
|
||||
@@ -193,10 +188,7 @@ bool SweepstoreWorkerTicket::writable() {
|
||||
SweepstoreWorkerTicketSnapshot SweepstoreWorkerTicket::snapshot() {
|
||||
SWEEPSTORE_TIME_FUNCTION();
|
||||
|
||||
// Byte-range lock for this ticket (allows parallel access to different tickets)
|
||||
SweepstoreFileLock lock(file.getPath(), getOffset(), TICKET_SIZE, SweepstoreFileLock::Mode::Shared);
|
||||
SweepstoreFileLock::Scoped scopedLock(lock);
|
||||
|
||||
// Byte-range locking handled automatically by seekAndRead
|
||||
std::unique_ptr<char[]> buffer(new char[TICKET_SIZE]);
|
||||
file.seekAndRead(getOffset(), buffer.get(), TICKET_SIZE);
|
||||
RandomAccessMemory ram(reinterpret_cast<uint8_t*>(buffer.get()), TICKET_SIZE);
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
#include "sweepstore/utils/file_handle.h"
|
||||
#include "sweepstore/utils/file_lock.h"
|
||||
#include "sweepstore/utils/timing.h"
|
||||
#include <iostream>
|
||||
|
||||
@@ -70,10 +71,17 @@ void SweepstoreFileHandle::seekAndRead(uint64_t offset, char* buffer, size_t siz
|
||||
unrealHandle->Seek(offset);
|
||||
unrealHandle->Read(reinterpret_cast<uint8*>(buffer), size);
|
||||
#else
|
||||
std::lock_guard<std::mutex> lock(streamMutex);
|
||||
stream.seekg(offset, std::ios::beg);
|
||||
if (stream.fail()) stream.clear();
|
||||
stream.read(buffer, size);
|
||||
// Acquire byte-range lock (allows parallel access to different byte ranges)
|
||||
SweepstoreFileLock rangeLock(path, offset, size, SweepstoreFileLock::Mode::Shared);
|
||||
rangeLock.lock();
|
||||
|
||||
// Brief stream mutex only during actual I/O
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(streamMutex);
|
||||
stream.seekg(offset, std::ios::beg);
|
||||
if (stream.fail()) stream.clear();
|
||||
stream.read(buffer, size);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
@@ -85,11 +93,18 @@ void SweepstoreFileHandle::seekAndWrite(uint64_t offset, const char* buffer, siz
|
||||
unrealHandle->Write(reinterpret_cast<const uint8*>(buffer), size);
|
||||
unrealHandle->Flush();
|
||||
#else
|
||||
std::lock_guard<std::mutex> lock(streamMutex);
|
||||
stream.seekp(offset, std::ios::beg);
|
||||
if (stream.fail()) stream.clear();
|
||||
stream.write(buffer, size);
|
||||
stream.flush();
|
||||
// Acquire byte-range lock (allows parallel access to different byte ranges)
|
||||
SweepstoreFileLock rangeLock(path, offset, size, SweepstoreFileLock::Mode::Exclusive);
|
||||
rangeLock.lock();
|
||||
|
||||
// Brief stream mutex only during actual I/O
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(streamMutex);
|
||||
stream.seekp(offset, std::ios::beg);
|
||||
if (stream.fail()) stream.clear();
|
||||
stream.write(buffer, size);
|
||||
stream.flush();
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
@@ -8,7 +8,7 @@
|
||||
|
||||
#include "sweepstore/utils/timing.h"
|
||||
|
||||
#define STALE_HEARTBEAT_THRESHOLD_MS 250
|
||||
#define STALE_HEARTBEAT_THRESHOLD_MS 1000
|
||||
|
||||
enum SweepstoreTicketOperation : int;
|
||||
class SweepstoreFileHandle;
|
||||
|
||||
@@ -55,6 +55,8 @@ public:
|
||||
sizeof(T),
|
||||
[this, key = this->key, &value]() {
|
||||
|
||||
|
||||
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user