diff --git a/src/main.zig b/src/main.zig
index e3300e34..3eac9a38 100644
--- a/src/main.zig
+++ b/src/main.zig
@@ -40,7 +40,7 @@ pub threadlocal var seed: u64 = undefined;
 var global_gpa = std.heap.GeneralPurposeAllocator(.{.thread_safe=true}){};
 var handled_gpa = utils.ErrorHandlingAllocator.init(global_gpa.allocator());
 pub const globalAllocator: utils.NeverFailingAllocator = handled_gpa.allocator();
-pub var threadPool: utils.ThreadPool = undefined;
+pub var threadPool: *utils.ThreadPool = undefined;
 
 fn cacheStringImpl(comptime len: usize, comptime str: [len]u8) []const u8 {
 	return str[0..len];
diff --git a/src/server/storage.zig b/src/server/storage.zig
index 3e667f27..46145856 100644
--- a/src/server/storage.zig
+++ b/src/server/storage.zig
@@ -18,6 +18,7 @@ pub const RegionFile = struct { // MARK: RegionFile
 	mutex: std.Thread.Mutex = .{},
 	modified: bool = false,
 	refCount: Atomic(u16) = .init(1),
+	storedInHashMap: bool = false,
 	saveFolder: []const u8,
 
 	pub fn getIndex(x: usize, y: usize, z: usize) usize {
@@ -101,6 +102,8 @@ pub const RegionFile = struct { // MARK: RegionFile
 				self.store();
 			}
 			self.deinit();
+		} else if(prevVal == 2) {
+			tryHashmapDeinit(self);
 		}
 	}
 
@@ -179,19 +182,60 @@ pub const RegionFile = struct { // MARK: RegionFile
 const cacheSize = 1 << 8; // Must be a power of 2!
 const cacheMask = cacheSize - 1;
 const associativity = 8;
-var cache: main.utils.Cache(RegionFile, cacheSize, associativity, RegionFile.decreaseRefCount) = .{};
+var cache: main.utils.Cache(RegionFile, cacheSize, associativity, cacheDeinit) = .{};
+const HashContext = struct {
+	pub fn hash(_: HashContext, a: chunk.ChunkPosition) u64 {
+		return a.hashCode();
+	}
+	pub fn eql(_: HashContext, a: chunk.ChunkPosition, b: chunk.ChunkPosition) bool {
+		return std.meta.eql(a, b);
+	}
+};
+var stillUsedHashMap: std.HashMap(chunk.ChunkPosition, *RegionFile, HashContext, 50) = undefined;
+var hashMapMutex: std.Thread.Mutex = .{};
+fn cacheDeinit(region: *RegionFile) void {
+	if(region.refCount.load(.monotonic) != 1) { // Someone else might still use it, so we store it in the hashmap.
+		hashMapMutex.lock();
+		defer hashMapMutex.unlock();
+		region.storedInHashMap = true;
+		stillUsedHashMap.put(region.pos, region) catch unreachable;
+	} else {
+		region.decreaseRefCount();
+	}
+}
 
 fn cacheInit(pos: chunk.ChunkPosition) *RegionFile {
+	hashMapMutex.lock();
+	if(stillUsedHashMap.fetchRemove(pos)) |kv| {
+		const region = kv.value;
+		region.storedInHashMap = false;
+		hashMapMutex.unlock();
+		return region;
+	}
+	hashMapMutex.unlock();
 	const path: []const u8 = std.fmt.allocPrint(main.stackAllocator.allocator, "saves/{s}/chunks", .{server.world.?.name}) catch unreachable;
 	defer main.stackAllocator.free(path);
 	return RegionFile.init(pos, path);
 }
 
+fn tryHashmapDeinit(region: *RegionFile) void {
+	{
+		hashMapMutex.lock();
+		defer hashMapMutex.unlock();
+		if(!region.storedInHashMap) return;
+		std.debug.assert(stillUsedHashMap.fetchRemove(region.pos).?.value == region);
+		region.storedInHashMap = false;
+	}
+	std.debug.assert(region.refCount.load(.unordered) == 1);
+	region.decreaseRefCount();
+}
 pub fn init() void {
+	stillUsedHashMap = .init(main.globalAllocator.allocator);
 }
 
 pub fn deinit() void {
 	cache.clear();
+	stillUsedHashMap.deinit();
 }
 
 pub fn loadRegionFileAndIncreaseRefCount(wx: i32, wy: i32, wz: i32, voxelSize: u31) *RegionFile {
diff --git a/src/utils.zig b/src/utils.zig
index 21a78bad..83d6cd35 100644
--- a/src/utils.zig
+++ b/src/utils.zig
@@ -862,16 +862,14 @@ pub fn BlockingMaxHeap(comptime T: type) type { // MARK: BlockingMaxHeap
 		allocator: NeverFailingAllocator,
 		closed: bool = false,
 
-		pub fn init(allocator: NeverFailingAllocator) *@This() {
-			const self = allocator.create(@This());
-			self.* = @This() {
+		pub fn init(allocator: NeverFailingAllocator) @This() {
+			return .{
 				.size = 0,
 				.array = allocator.alloc(T, initialSize),
 				.waitingThreads = .{},
 				.mutex = .{},
 				.allocator = allocator,
 			};
-			return self;
 		}
 
 		pub fn deinit(self: *@This()) void {
@@ -886,7 +884,6 @@ pub fn BlockingMaxHeap(comptime T: type) type { // MARK: BlockingMaxHeap
 			}
 			self.mutex.unlock();
 			self.allocator.free(self.array);
-			self.allocator.destroy(self);
 		}
 
 		/// Moves an element from a given index down the heap, such that all children are always smaller than their parents.
@@ -1056,13 +1053,6 @@ pub const ThreadPool = struct { // MARK: ThreadPool
 			}
 		}
 
-		fn init(allocator: NeverFailingAllocator) *Performance {
-			const self = allocator.create(Performance);
-			self.* = .{};
-			self.clear();
-			return self;
-		}
-
 		pub fn read(self: *Performance) Performance {
 			self.mutex.lock();
 			defer self.mutex.unlock();
@@ -1073,19 +1063,23 @@ pub const ThreadPool = struct { // MARK: ThreadPool
 
 	threads: []std.Thread,
 	currentTasks: []Atomic(?*const VTable),
-	loadList: *BlockingMaxHeap(Task),
+	loadList: BlockingMaxHeap(Task),
 	allocator: NeverFailingAllocator,
-	performance: *Performance,
+	performance: Performance,
 
-	pub fn init(allocator: NeverFailingAllocator, threadCount: usize) ThreadPool {
-		const self = ThreadPool {
+	trueQueueSize: Atomic(usize) = .init(0),
+
+	pub fn init(allocator: NeverFailingAllocator, threadCount: usize) *ThreadPool {
+		const self = allocator.create(ThreadPool);
+		self.* = .{
 			.threads = allocator.alloc(std.Thread, threadCount),
 			.currentTasks = allocator.alloc(Atomic(?*const VTable), threadCount),
 			.loadList = BlockingMaxHeap(Task).init(allocator),
-			.performance = Performance.init(allocator),
+			.performance = .{},
 			.allocator = allocator,
 		};
+		self.performance.clear();
 		for(self.threads, 0..) |*thread, i| {
 			thread.* = std.Thread.spawn(.{}, run, .{self, i}) catch |err| {
 				std.log.err("Could not spawn Thread due to {s}", .{@errorName(err)});
@@ -1097,7 +1091,7 @@ pub const ThreadPool = struct { // MARK: ThreadPool
 		return self;
 	}
 
-	pub fn deinit(self: ThreadPool) void {
+	pub fn deinit(self: *ThreadPool) void {
 		// Clear the remaining tasks:
 		self.loadList.mutex.lock();
 		for(self.loadList.array[0..self.loadList.size]) |task| {
@@ -1111,10 +1105,10 @@ pub const ThreadPool = struct { // MARK: ThreadPool
 		}
 		self.allocator.free(self.currentTasks);
 		self.allocator.free(self.threads);
-		self.allocator.destroy(self.performance);
+		self.allocator.destroy(self);
 	}
 
-	pub fn closeAllTasksOfType(self: ThreadPool, vtable: *const VTable) void {
+	pub fn closeAllTasksOfType(self: *ThreadPool, vtable: *const VTable) void {
 		self.loadList.mutex.lock();
 		defer self.loadList.mutex.unlock();
 		var i: u32 = 0;
@@ -1135,7 +1129,7 @@ pub const ThreadPool = struct { // MARK: ThreadPool
 		}
 	}
 
-	fn run(self: ThreadPool, id: usize) void {
+	fn run(self: *ThreadPool, id: usize) void {
 		// In case any of the tasks wants to allocate memory:
 		var sta = StackAllocator.init(main.globalAllocator, 1 << 23);
 		defer sta.deinit();
@@ -1151,6 +1145,7 @@ pub const ThreadPool = struct { // MARK: ThreadPool
 				const end = std.time.microTimestamp();
 				self.performance.add(task.vtable.taskType, end - start);
 				self.currentTasks[id].store(null, .monotonic);
+				_ = self.trueQueueSize.fetchSub(1, .monotonic);
 			}
 
 			if(id == 0 and std.time.milliTimestamp() -% lastUpdate > refreshTime) {
@@ -1159,6 +1154,7 @@ pub const ThreadPool = struct { // MARK: ThreadPool
 				while(self.loadList.extractAny()) |task| {
 					if(!task.vtable.isStillNeeded(task.self)) {
 						task.vtable.clean(task.self);
+						_ = self.trueQueueSize.fetchSub(1, .monotonic);
 					} else {
 						const taskPtr = temporaryTaskList.addOne();
 						taskPtr.* = task;
@@ -1171,15 +1167,16 @@ pub const ThreadPool = struct { // MARK: ThreadPool
 		}
 	}
 
-	pub fn addTask(self: ThreadPool, task: *anyopaque, vtable: *const VTable) void {
+	pub fn addTask(self: *ThreadPool, task: *anyopaque, vtable: *const VTable) void {
 		self.loadList.add(Task {
 			.cachedPriority = vtable.getPriority(task),
 			.vtable = vtable,
 			.self = task,
 		});
+		_ = self.trueQueueSize.fetchAdd(1, .monotonic);
 	}
 
-	pub fn clear(self: ThreadPool) void {
+	pub fn clear(self: *ThreadPool) void {
 		// Clear the remaining tasks:
 		self.loadList.mutex.lock();
 		for(self.loadList.array[0..self.loadList.size]) |task| {
@@ -1199,10 +1196,8 @@ pub const ThreadPool = struct { // MARK: ThreadPool
 		}
 	}
 
-	pub fn queueSize(self: ThreadPool) usize {
-		self.loadList.mutex.lock();
-		defer self.loadList.mutex.unlock();
-		return self.loadList.size;
+	pub fn queueSize(self: *const ThreadPool) usize {
+		return self.trueQueueSize.load(.monotonic);
 	}
 };
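
The storage.zig part of this patch parks evicted region files instead of freeing them while other holders remain: the cache owns one reference, cacheDeinit moves that reference into stillUsedHashMap when refCount shows other users, cacheInit revives parked entries, and the last external holder (prevVal == 2 in decreaseRefCount) releases the parked reference through tryHashmapDeinit. The following stand-alone sketch is illustrative only (not the project's code; the hash map is reduced to a single slot and no threading is involved), but it traces the same reference flow:

const std = @import("std");

const Entry = struct {
	refCount: std.atomic.Value(u16) = .init(1), // the cache's own reference
	storedInMap: bool = false,

	fn decreaseRefCount(self: *Entry) void {
		const prevVal = self.refCount.fetchSub(1, .monotonic);
		if(prevVal == 1) {
			std.debug.print("last reference dropped, entry freed\n", .{});
		} else if(prevVal == 2) {
			// Only the parked reference is left; release it as well.
			tryMapDeinit(self);
		}
	}
};

var stillUsed: ?*Entry = null; // stands in for the position-keyed hash map

fn evict(entry: *Entry) void {
	if(entry.refCount.load(.monotonic) != 1) {
		// Someone else still uses it: park it, keeping the cache's reference alive.
		entry.storedInMap = true;
		stillUsed = entry;
	} else {
		entry.decreaseRefCount();
	}
}

fn tryMapDeinit(entry: *Entry) void {
	if(!entry.storedInMap) return;
	entry.storedInMap = false;
	stillUsed = null;
	entry.decreaseRefCount(); // drops the parked reference -> prevVal == 1 -> freed
}

pub fn main() void {
	var entry: Entry = .{};
	_ = entry.refCount.fetchAdd(1, .monotonic); // an external user acquires a reference
	evict(&entry); // cache evicts while the user still holds it -> parked, not freed
	entry.decreaseRefCount(); // user releases -> prevVal == 2 -> parked reference released
}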
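
The utils.zig part replaces the mutex-guarded queueSize() (which returned loadList.size and therefore missed tasks already extracted and running) with trueQueueSize, an atomic counter incremented in addTask and decremented when a task finishes or gets cleaned up, so the reported size includes in-flight tasks and can be read without locking the heap. A minimal sketch of that counting pattern, with illustrative names rather than the project's API:

const std = @import("std");

var trueQueueSize: std.atomic.Value(usize) = .init(0);

fn addTask() void {
	// ... push the task onto the priority queue under its mutex ...
	_ = trueQueueSize.fetchAdd(1, .monotonic);
}

fn finishTask() void {
	// ... pop and run the task (or clean it up if it is no longer needed) ...
	_ = trueQueueSize.fetchSub(1, .monotonic);
}

fn queueSize() usize {
	// No mutex needed; a monotonic load is enough for a size hint.
	return trueQueueSize.load(.monotonic);
}

pub fn main() void {
	addTask();
	addTask();
	finishTask();
	std.debug.print("pending tasks: {}\n", .{queueSize()});
}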