Prevent the game from opening multiple instances of region files.

Also made the ThreadPool task count more accurate.

fixes #770
This commit is contained in:
IntegratedQuantum 2024-11-12 22:23:39 +01:00
parent e6db1ed81b
commit 60b91bfdb8
3 changed files with 68 additions and 29 deletions

View File

@ -40,7 +40,7 @@ pub threadlocal var seed: u64 = undefined;
var global_gpa = std.heap.GeneralPurposeAllocator(.{.thread_safe=true}){}; var global_gpa = std.heap.GeneralPurposeAllocator(.{.thread_safe=true}){};
var handled_gpa = utils.ErrorHandlingAllocator.init(global_gpa.allocator()); var handled_gpa = utils.ErrorHandlingAllocator.init(global_gpa.allocator());
pub const globalAllocator: utils.NeverFailingAllocator = handled_gpa.allocator(); pub const globalAllocator: utils.NeverFailingAllocator = handled_gpa.allocator();
pub var threadPool: utils.ThreadPool = undefined; pub var threadPool: *utils.ThreadPool = undefined;
fn cacheStringImpl(comptime len: usize, comptime str: [len]u8) []const u8 { fn cacheStringImpl(comptime len: usize, comptime str: [len]u8) []const u8 {
return str[0..len]; return str[0..len];

View File

@ -18,6 +18,7 @@ pub const RegionFile = struct { // MARK: RegionFile
mutex: std.Thread.Mutex = .{}, mutex: std.Thread.Mutex = .{},
modified: bool = false, modified: bool = false,
refCount: Atomic(u16) = .init(1), refCount: Atomic(u16) = .init(1),
storedInHashMap: bool = false,
saveFolder: []const u8, saveFolder: []const u8,
pub fn getIndex(x: usize, y: usize, z: usize) usize { pub fn getIndex(x: usize, y: usize, z: usize) usize {
@ -101,6 +102,8 @@ pub const RegionFile = struct { // MARK: RegionFile
self.store(); self.store();
} }
self.deinit(); self.deinit();
} else if(prevVal == 2) {
tryHashmapDeinit(self);
} }
} }
@ -179,19 +182,60 @@ pub const RegionFile = struct { // MARK: RegionFile
const cacheSize = 1 << 8; // Must be a power of 2! const cacheSize = 1 << 8; // Must be a power of 2!
const cacheMask = cacheSize - 1; const cacheMask = cacheSize - 1;
const associativity = 8; const associativity = 8;
var cache: main.utils.Cache(RegionFile, cacheSize, associativity, RegionFile.decreaseRefCount) = .{}; var cache: main.utils.Cache(RegionFile, cacheSize, associativity, cacheDeinit) = .{};
const HashContext = struct {
pub fn hash(_: HashContext, a: chunk.ChunkPosition) u64 {
return a.hashCode();
}
pub fn eql(_: HashContext, a: chunk.ChunkPosition, b: chunk.ChunkPosition) bool {
return std.meta.eql(a, b);
}
};
var stillUsedHashMap: std.HashMap(chunk.ChunkPosition, *RegionFile, HashContext, 50) = undefined;
var hashMapMutex: std.Thread.Mutex = .{};
fn cacheDeinit(region: *RegionFile) void {
if(region.refCount.load(.monotonic) != 1) { // Someone else might still use it, so we store it in the hashmap.
hashMapMutex.lock();
defer hashMapMutex.unlock();
region.storedInHashMap = true;
stillUsedHashMap.put(region.pos, region) catch unreachable;
} else {
region.decreaseRefCount();
}
}
fn cacheInit(pos: chunk.ChunkPosition) *RegionFile { fn cacheInit(pos: chunk.ChunkPosition) *RegionFile {
hashMapMutex.lock();
if(stillUsedHashMap.fetchRemove(pos)) |kv| {
const region = kv.value;
region.storedInHashMap = false;
hashMapMutex.unlock();
return region;
}
hashMapMutex.unlock();
const path: []const u8 = std.fmt.allocPrint(main.stackAllocator.allocator, "saves/{s}/chunks", .{server.world.?.name}) catch unreachable; const path: []const u8 = std.fmt.allocPrint(main.stackAllocator.allocator, "saves/{s}/chunks", .{server.world.?.name}) catch unreachable;
defer main.stackAllocator.free(path); defer main.stackAllocator.free(path);
return RegionFile.init(pos, path); return RegionFile.init(pos, path);
} }
fn tryHashmapDeinit(region: *RegionFile) void {
{
hashMapMutex.lock();
defer hashMapMutex.unlock();
if(!region.storedInHashMap) return;
std.debug.assert(stillUsedHashMap.fetchRemove(region.pos).?.value == region);
region.storedInHashMap = false;
}
std.debug.assert(region.refCount.load(.unordered) == 1);
region.decreaseRefCount();
}
pub fn init() void { pub fn init() void {
stillUsedHashMap = .init(main.globalAllocator.allocator);
} }
pub fn deinit() void { pub fn deinit() void {
cache.clear(); cache.clear();
stillUsedHashMap.deinit();
} }
pub fn loadRegionFileAndIncreaseRefCount(wx: i32, wy: i32, wz: i32, voxelSize: u31) *RegionFile { pub fn loadRegionFileAndIncreaseRefCount(wx: i32, wy: i32, wz: i32, voxelSize: u31) *RegionFile {

View File

@ -862,16 +862,14 @@ pub fn BlockingMaxHeap(comptime T: type) type { // MARK: BlockingMaxHeap
allocator: NeverFailingAllocator, allocator: NeverFailingAllocator,
closed: bool = false, closed: bool = false,
pub fn init(allocator: NeverFailingAllocator) *@This() { pub fn init(allocator: NeverFailingAllocator) @This() {
const self = allocator.create(@This()); return .{
self.* = @This() {
.size = 0, .size = 0,
.array = allocator.alloc(T, initialSize), .array = allocator.alloc(T, initialSize),
.waitingThreads = .{}, .waitingThreads = .{},
.mutex = .{}, .mutex = .{},
.allocator = allocator, .allocator = allocator,
}; };
return self;
} }
pub fn deinit(self: *@This()) void { pub fn deinit(self: *@This()) void {
@ -886,7 +884,6 @@ pub fn BlockingMaxHeap(comptime T: type) type { // MARK: BlockingMaxHeap
} }
self.mutex.unlock(); self.mutex.unlock();
self.allocator.free(self.array); self.allocator.free(self.array);
self.allocator.destroy(self);
} }
/// Moves an element from a given index down the heap, such that all children are always smaller than their parents. /// Moves an element from a given index down the heap, such that all children are always smaller than their parents.
@ -1056,13 +1053,6 @@ pub const ThreadPool = struct { // MARK: ThreadPool
} }
} }
fn init(allocator: NeverFailingAllocator) *Performance {
const self = allocator.create(Performance);
self.* = .{};
self.clear();
return self;
}
pub fn read(self: *Performance) Performance { pub fn read(self: *Performance) Performance {
self.mutex.lock(); self.mutex.lock();
defer self.mutex.unlock(); defer self.mutex.unlock();
@ -1073,19 +1063,23 @@ pub const ThreadPool = struct { // MARK: ThreadPool
threads: []std.Thread, threads: []std.Thread,
currentTasks: []Atomic(?*const VTable), currentTasks: []Atomic(?*const VTable),
loadList: *BlockingMaxHeap(Task), loadList: BlockingMaxHeap(Task),
allocator: NeverFailingAllocator, allocator: NeverFailingAllocator,
performance: *Performance, performance: Performance,
pub fn init(allocator: NeverFailingAllocator, threadCount: usize) ThreadPool { trueQueueSize: Atomic(usize) = .init(0),
const self = ThreadPool {
pub fn init(allocator: NeverFailingAllocator, threadCount: usize) *ThreadPool {
const self = allocator.create(ThreadPool);
self.* = .{
.threads = allocator.alloc(std.Thread, threadCount), .threads = allocator.alloc(std.Thread, threadCount),
.currentTasks = allocator.alloc(Atomic(?*const VTable), threadCount), .currentTasks = allocator.alloc(Atomic(?*const VTable), threadCount),
.loadList = BlockingMaxHeap(Task).init(allocator), .loadList = BlockingMaxHeap(Task).init(allocator),
.performance = Performance.init(allocator), .performance = .{},
.allocator = allocator, .allocator = allocator,
}; };
self.performance.clear();
for(self.threads, 0..) |*thread, i| { for(self.threads, 0..) |*thread, i| {
thread.* = std.Thread.spawn(.{}, run, .{self, i}) catch |err| { thread.* = std.Thread.spawn(.{}, run, .{self, i}) catch |err| {
std.log.err("Could not spawn Thread due to {s}", .{@errorName(err)}); std.log.err("Could not spawn Thread due to {s}", .{@errorName(err)});
@ -1097,7 +1091,7 @@ pub const ThreadPool = struct { // MARK: ThreadPool
return self; return self;
} }
pub fn deinit(self: ThreadPool) void { pub fn deinit(self: *ThreadPool) void {
// Clear the remaining tasks: // Clear the remaining tasks:
self.loadList.mutex.lock(); self.loadList.mutex.lock();
for(self.loadList.array[0..self.loadList.size]) |task| { for(self.loadList.array[0..self.loadList.size]) |task| {
@ -1111,10 +1105,10 @@ pub const ThreadPool = struct { // MARK: ThreadPool
} }
self.allocator.free(self.currentTasks); self.allocator.free(self.currentTasks);
self.allocator.free(self.threads); self.allocator.free(self.threads);
self.allocator.destroy(self.performance); self.allocator.destroy(self);
} }
pub fn closeAllTasksOfType(self: ThreadPool, vtable: *const VTable) void { pub fn closeAllTasksOfType(self: *ThreadPool, vtable: *const VTable) void {
self.loadList.mutex.lock(); self.loadList.mutex.lock();
defer self.loadList.mutex.unlock(); defer self.loadList.mutex.unlock();
var i: u32 = 0; var i: u32 = 0;
@ -1135,7 +1129,7 @@ pub const ThreadPool = struct { // MARK: ThreadPool
} }
} }
fn run(self: ThreadPool, id: usize) void { fn run(self: *ThreadPool, id: usize) void {
// In case any of the tasks wants to allocate memory: // In case any of the tasks wants to allocate memory:
var sta = StackAllocator.init(main.globalAllocator, 1 << 23); var sta = StackAllocator.init(main.globalAllocator, 1 << 23);
defer sta.deinit(); defer sta.deinit();
@ -1151,6 +1145,7 @@ pub const ThreadPool = struct { // MARK: ThreadPool
const end = std.time.microTimestamp(); const end = std.time.microTimestamp();
self.performance.add(task.vtable.taskType, end - start); self.performance.add(task.vtable.taskType, end - start);
self.currentTasks[id].store(null, .monotonic); self.currentTasks[id].store(null, .monotonic);
_ = self.trueQueueSize.fetchSub(1, .monotonic);
} }
if(id == 0 and std.time.milliTimestamp() -% lastUpdate > refreshTime) { if(id == 0 and std.time.milliTimestamp() -% lastUpdate > refreshTime) {
@ -1159,6 +1154,7 @@ pub const ThreadPool = struct { // MARK: ThreadPool
while(self.loadList.extractAny()) |task| { while(self.loadList.extractAny()) |task| {
if(!task.vtable.isStillNeeded(task.self)) { if(!task.vtable.isStillNeeded(task.self)) {
task.vtable.clean(task.self); task.vtable.clean(task.self);
_ = self.trueQueueSize.fetchSub(1, .monotonic);
} else { } else {
const taskPtr = temporaryTaskList.addOne(); const taskPtr = temporaryTaskList.addOne();
taskPtr.* = task; taskPtr.* = task;
@ -1171,15 +1167,16 @@ pub const ThreadPool = struct { // MARK: ThreadPool
} }
} }
pub fn addTask(self: ThreadPool, task: *anyopaque, vtable: *const VTable) void { pub fn addTask(self: *ThreadPool, task: *anyopaque, vtable: *const VTable) void {
self.loadList.add(Task { self.loadList.add(Task {
.cachedPriority = vtable.getPriority(task), .cachedPriority = vtable.getPriority(task),
.vtable = vtable, .vtable = vtable,
.self = task, .self = task,
}); });
_ = self.trueQueueSize.fetchAdd(1, .monotonic);
} }
pub fn clear(self: ThreadPool) void { pub fn clear(self: *ThreadPool) void {
// Clear the remaining tasks: // Clear the remaining tasks:
self.loadList.mutex.lock(); self.loadList.mutex.lock();
for(self.loadList.array[0..self.loadList.size]) |task| { for(self.loadList.array[0..self.loadList.size]) |task| {
@ -1199,10 +1196,8 @@ pub const ThreadPool = struct { // MARK: ThreadPool
} }
} }
pub fn queueSize(self: ThreadPool) usize { pub fn queueSize(self: *const ThreadPool) usize {
self.loadList.mutex.lock(); return self.trueQueueSize.load(.monotonic);
defer self.loadList.mutex.unlock();
return self.loadList.size;
} }
}; };