From 9f3c63ebff3f1569ecbade481a90c235895a185f Mon Sep 17 00:00:00 2001 From: IntegratedQuantum Date: Tue, 31 Dec 2024 14:53:42 +0100 Subject: [PATCH] Split the server mutex into multiple different mutexes that are used in smaller scopes. fixes #864 Also fixed a data race in the itemdrop manager --- src/Inventory.zig | 2 +- src/gui/windows/debug_network.zig | 4 +- src/gui/windows/debug_network_advanced.zig | 6 +- src/gui/windows/manage_players.zig | 4 - src/itemdrop.zig | 9 +- src/renderer.zig | 2 + src/server/server.zig | 95 +++++++++++++--------- src/server/world.zig | 24 +++--- 8 files changed, 82 insertions(+), 64 deletions(-) diff --git a/src/Inventory.zig b/src/Inventory.zig index c5c8c3f6..a558f62d 100644 --- a/src/Inventory.zig +++ b/src/Inventory.zig @@ -20,7 +20,7 @@ const Side = enum{client, server}; pub const Sync = struct { // MARK: Sync pub const ClientSide = struct { - var mutex: std.Thread.Mutex = .{}; + pub var mutex: std.Thread.Mutex = .{}; var commands: main.utils.CircularBufferQueue(Command) = undefined; var maxId: u32 = 0; var freeIdList: main.List(u32) = undefined; diff --git a/src/gui/windows/debug_network.zig b/src/gui/windows/debug_network.zig index 98522633..3b812546 100644 --- a/src/gui/windows/debug_network.zig +++ b/src/gui/windows/debug_network.zig @@ -28,7 +28,9 @@ pub fn render() void { var y: f32 = 0; if (main.game.world != null) { if(main.server.world != null) { - draw.print("Players Connected: {}", .{main.server.users.items.len}, 0, y, 8, .left); + const userList = main.server.getUserListAndIncreaseRefCount(main.stackAllocator); + defer main.server.freeUserListAndDecreaseRefCount(main.stackAllocator, userList); + draw.print("Players Connected: {}", .{userList.len}, 0, y, 8, .left); y += 8; } const sent = network.Connection.packetsSent.load(.monotonic); diff --git a/src/gui/windows/debug_network_advanced.zig b/src/gui/windows/debug_network_advanced.zig index 447aeb1e..4fe8486d 100644 --- a/src/gui/windows/debug_network_advanced.zig +++ b/src/gui/windows/debug_network_advanced.zig @@ -55,9 +55,9 @@ pub fn render() void { renderConnectionData(main.game.world.?.conn, &y); } if(main.server.world != null) { - main.server.mutex.lock(); - defer main.server.mutex.unlock(); - for(main.server.users.items) |user| { + const userList = main.server.getUserListAndIncreaseRefCount(main.stackAllocator); + defer main.server.freeUserListAndDecreaseRefCount(main.stackAllocator, userList); + for(userList) |user| { draw.print("{s}", .{user.name}, 0, y, 8, .left); y += 8; renderConnectionData(user.conn, &y); diff --git a/src/gui/windows/manage_players.zig b/src/gui/windows/manage_players.zig index 93f982f4..a979f0a7 100644 --- a/src/gui/windows/manage_players.zig +++ b/src/gui/windows/manage_players.zig @@ -28,8 +28,6 @@ fn kick(conn: *main.network.Connection) void { pub fn onOpen() void { const list = VerticalList.init(.{padding, 16 + padding}, 300, 16); { - main.server.mutex.lock(); - defer main.server.mutex.unlock(); main.server.connectionManager.mutex.lock(); defer main.server.connectionManager.mutex.unlock(); std.debug.assert(userList.len == 0); @@ -68,11 +66,9 @@ pub fn onClose() void { } pub fn update() void { - main.server.mutex.lock(); main.server.connectionManager.mutex.lock(); const serverListLen = main.server.connectionManager.connections.items.len; main.server.connectionManager.mutex.unlock(); - main.server.mutex.unlock(); if(userList.len != serverListLen) { std.log.err("{} {}", .{userList.len, serverListLen}); onClose(); diff --git a/src/itemdrop.zig b/src/itemdrop.zig index 15877394..7b6e4bfc 100644 --- a/src/itemdrop.zig +++ b/src/itemdrop.zig @@ -202,7 +202,9 @@ pub const ItemDropManager = struct { // MARK: ItemDropManager } pub fn add(self: *ItemDropManager, pos: Vec3d, vel: Vec3d, rot: Vec3f, itemStack: ItemStack, despawnTime: i32, pickupCooldown: i32) void { - if(self.size == maxCapacity) { + self.emptyMutex.lock(); + const i: u16 = @intCast(self.isEmpty.findFirstSet() orelse { + self.emptyMutex.unlock(); const zon = itemStack.store(main.stackAllocator); defer zon.free(main.stackAllocator); const string = zon.toString(main.stackAllocator); @@ -212,10 +214,7 @@ pub const ItemDropManager = struct { // MARK: ItemDropManager item.deinit(); } return; - } - - self.emptyMutex.lock(); - const i: u16 = @intCast(self.isEmpty.findFirstSet().?); + }); self.isEmpty.unset(i); self.emptyMutex.unlock(); self.changeQueue.enqueue(.{.add = .{i, .{ diff --git a/src/renderer.zig b/src/renderer.zig index 4f2c5c11..6a2bdd01 100644 --- a/src/renderer.zig +++ b/src/renderer.zig @@ -810,7 +810,9 @@ pub const MeshSelection = struct { // MARK: MeshSelection var newBlock = block; // TODO: Breaking animation and tools. const relPos: Vec3f = @floatCast(lastPos - @as(Vec3d, @floatFromInt(selectedPos))); + main.items.Inventory.Sync.ClientSide.mutex.lock(); block.mode().onBlockBreaking(inventory.getStack(slot).item, relPos, lastDir, &newBlock); + main.items.Inventory.Sync.ClientSide.mutex.unlock(); if(!std.meta.eql(newBlock, block)) { updateBlockAndSendUpdate(inventory, slot, selectedPos[0], selectedPos[1], selectedPos[2], block, newBlock); } diff --git a/src/server/server.zig b/src/server/server.zig index 3d11f97a..d05aa459 100644 --- a/src/server/server.zig +++ b/src/server/server.zig @@ -46,6 +46,8 @@ pub const User = struct { // MARK: User refCount: Atomic(u32) = .init(1), + mutex: std.Thread.Mutex = .{}, + pub fn initAndIncreaseRefCount(manager: *ConnectionManager, ipPort: []const u8) !*User { const self = main.globalAllocator.create(User); errdefer main.globalAllocator.destroy(self); @@ -157,7 +159,8 @@ pub const User = struct { // MARK: User } pub fn update(self: *User) void { - main.utils.assertLocked(&mutex); + self.mutex.lock(); + defer self.mutex.unlock(); var time = @as(i16, @truncate(std.time.milliTimestamp())) -% main.settings.entityLookback; time -%= self.timeDifference.difference.load(.monotonic); self.interpolation.update(time, self.lastTime); @@ -166,8 +169,8 @@ pub const User = struct { // MARK: User } pub fn receiveData(self: *User, data: []const u8) void { - mutex.lock(); - defer mutex.unlock(); + self.mutex.lock(); + defer self.mutex.unlock(); const position: [3]f64 = .{ @bitCast(std.mem.readInt(u64, data[0..8], .big)), @bitCast(std.mem.readInt(u64, data[8..16], .big)), @@ -198,17 +201,16 @@ pub const updatesPerSec: u32 = 20; const updateNanoTime: u32 = 1000000000/20; pub var world: ?*ServerWorld = null; -pub var users: main.List(*User) = undefined; -pub var userDeinitList: main.utils.ConcurrentQueue(*User) = undefined; -pub var userConnectList: main.utils.ConcurrentQueue(*User) = undefined; +var userMutex: std.Thread.Mutex = .{}; +var users: main.List(*User) = undefined; +var userDeinitList: main.utils.ConcurrentQueue(*User) = undefined; +var userConnectList: main.utils.ConcurrentQueue(*User) = undefined; pub var connectionManager: *ConnectionManager = undefined; pub var running: std.atomic.Value(bool) = .init(false); var lastTime: i128 = undefined; -pub var mutex: std.Thread.Mutex = .{}; - pub var thread: ?std.Thread = null; fn init(name: []const u8, singlePlayerPort: ?u16) void { // MARK: init() @@ -267,6 +269,23 @@ fn deinit() void { command.deinit(); } +pub fn getUserListAndIncreaseRefCount(allocator: utils.NeverFailingAllocator) []*User { + userMutex.lock(); + defer userMutex.unlock(); + const result = allocator.dupe(*User, users.items); + for(result) |user| { + user.increaseRefCount(); + } + return result; +} + +pub fn freeUserListAndDecreaseRefCount(allocator: utils.NeverFailingAllocator, list: []*User) void { + for(list) |user| { + user.decreaseRefCount(); + } + allocator.free(list); +} + fn sendEntityUpdates(comptime getInitialList: bool, allocator: utils.NeverFailingAllocator) if(getInitialList) []const u8 else void { // Send the entity updates: const updateList = main.ZonElement.initArray(main.stackAllocator); @@ -297,11 +316,11 @@ fn sendEntityUpdates(comptime getInitialList: bool, allocator: utils.NeverFailin itemDropList.free(main.stackAllocator); initialList = list.toStringEfficient(allocator, &.{}); } - mutex.lock(); - for(users.items) |user| { + const userList = getUserListAndIncreaseRefCount(main.stackAllocator); + defer freeUserListAndDecreaseRefCount(main.stackAllocator, userList); + for(userList) |user| { main.network.Protocols.entity.send(user.conn, updateData); } - mutex.unlock(); if(getInitialList) { return initialList; } @@ -309,27 +328,27 @@ fn sendEntityUpdates(comptime getInitialList: bool, allocator: utils.NeverFailin fn update() void { // MARK: update() world.?.update(); - mutex.lock(); - for(users.items) |user| { - user.update(); - } - mutex.unlock(); while(userConnectList.dequeue()) |user| { connectInternal(user); } + const userList = getUserListAndIncreaseRefCount(main.stackAllocator); + defer freeUserListAndDecreaseRefCount(main.stackAllocator, userList); + for(userList) |user| { + user.update(); + } + sendEntityUpdates(false, main.stackAllocator); // Send the entity data: - const data = main.stackAllocator.alloc(u8, (4 + 24 + 12 + 24)*users.items.len); + const data = main.stackAllocator.alloc(u8, (4 + 24 + 12 + 24)*userList.len); defer main.stackAllocator.free(data); const itemData = world.?.itemDropManager.getPositionAndVelocityData(main.stackAllocator); defer main.stackAllocator.free(itemData); var remaining = data; - mutex.lock(); - for(users.items) |user| { + for(userList) |user| { const id = user.id; // TODO std.mem.writeInt(u32, remaining[0..4], id, .big); remaining = remaining[4..]; @@ -345,12 +364,10 @@ fn update() void { // MARK: update() std.mem.writeInt(u64, remaining[16..24], @bitCast(user.player.vel[2]), .big); remaining = remaining[24..]; } - for(users.items) |user| { + for(userList) |user| { main.network.Protocols.entityPosition.send(user.conn, data, itemData); } - mutex.unlock(); - while(userDeinitList.dequeue()) |user| { user.decreaseRefCount(); } @@ -393,15 +410,16 @@ pub fn removePlayer(user: *User) void { // MARK: removePlayer() if(!user.connected.load(.unordered)) return; const message = std.fmt.allocPrint(main.stackAllocator.allocator, "{s}§#ffff00 left", .{user.name}) catch unreachable; defer main.stackAllocator.free(message); - mutex.lock(); - defer mutex.unlock(); + userMutex.lock(); for(users.items, 0..) |other, i| { if(other == user) { _ = users.swapRemove(i); break; } } + userMutex.unlock(); + sendMessage(message); // Let the other clients know about that this new one left. const zonArray = main.ZonElement.initArray(main.stackAllocator); @@ -409,7 +427,9 @@ pub fn removePlayer(user: *User) void { // MARK: removePlayer() zonArray.array.append(.{.int = user.id}); const data = zonArray.toStringEfficient(main.stackAllocator, &.{}); defer main.stackAllocator.free(data); - for(users.items) |other| { + const userList = getUserListAndIncreaseRefCount(main.stackAllocator); + defer freeUserListAndDecreaseRefCount(main.stackAllocator, userList); + for(userList) |other| { main.network.Protocols.entity.send(other.conn, data); } } @@ -423,6 +443,8 @@ pub fn connectInternal(user: *User) void { // TODO: addEntity(player); user.id = freeId; freeId += 1; + const userList = getUserListAndIncreaseRefCount(main.stackAllocator); + defer freeUserListAndDecreaseRefCount(main.stackAllocator, userList); // Let the other clients know about this new one. { const zonArray = main.ZonElement.initArray(main.stackAllocator); @@ -433,38 +455,33 @@ pub fn connectInternal(user: *User) void { zonArray.array.append(entityZon); const data = zonArray.toStringEfficient(main.stackAllocator, &.{}); defer main.stackAllocator.free(data); - mutex.lock(); - defer mutex.unlock(); - for(users.items) |other| { + for(userList) |other| { main.network.Protocols.entity.send(other.conn, data); } } { // Let this client know about the others: const zonArray = main.ZonElement.initArray(main.stackAllocator); defer zonArray.free(main.stackAllocator); - mutex.lock(); - for(users.items) |other| { + for(userList) |other| { const entityZon = main.ZonElement.initObject(main.stackAllocator); entityZon.put("id", other.id); entityZon.put("name", other.name); zonArray.array.append(entityZon); } - mutex.unlock(); const data = zonArray.toStringEfficient(main.stackAllocator, &.{}); defer main.stackAllocator.free(data); if(user.connected.load(.unordered)) main.network.Protocols.entity.send(user.conn, data); - } const initialList = sendEntityUpdates(true, main.stackAllocator); main.network.Protocols.entity.send(user.conn, initialList); main.stackAllocator.free(initialList); const message = std.fmt.allocPrint(main.stackAllocator.allocator, "{s}§#ffff00 joined", .{user.name}) catch unreachable; defer main.stackAllocator.free(message); - mutex.lock(); - defer mutex.unlock(); sendMessage(message); + userMutex.lock(); users.append(user); + userMutex.unlock(); } pub fn messageFrom(msg: []const u8, source: *User) void { // MARK: message @@ -474,16 +491,18 @@ pub fn messageFrom(msg: []const u8, source: *User) void { // MARK: message } else { const newMessage = std.fmt.allocPrint(main.stackAllocator.allocator, "[{s}§#ffffff] {s}", .{source.name, msg}) catch unreachable; defer main.stackAllocator.free(newMessage); - main.server.mutex.lock(); - defer main.server.mutex.unlock(); main.server.sendMessage(newMessage); } } +var chatMutex: std.Thread.Mutex = .{}; pub fn sendMessage(msg: []const u8) void { - main.utils.assertLocked(&mutex); + chatMutex.lock(); + defer chatMutex.unlock(); std.log.info("Chat: {s}", .{msg}); // TODO use color \033[0;32m - for(users.items) |user| { + const userList = getUserListAndIncreaseRefCount(main.stackAllocator); + defer freeUserListAndDecreaseRefCount(main.stackAllocator, userList); + for(userList) |user| { user.sendMessage(msg); } } \ No newline at end of file diff --git a/src/server/world.zig b/src/server/world.zig index 2f256b2c..6c7cdf1e 100644 --- a/src/server/world.zig +++ b/src/server/world.zig @@ -229,9 +229,9 @@ const ChunkManager = struct { // MARK: ChunkManager if(self.source) |source| { if(source.connected.load(.unordered)) main.network.Protocols.lightMapTransmission.sendLightMap(source.conn, map); } else { - server.mutex.lock(); - defer server.mutex.unlock(); - for(server.users.items) |user| { + const userList = server.getUserListAndIncreaseRefCount(main.stackAllocator); + defer server.freeUserListAndDecreaseRefCount(main.stackAllocator, userList); + for(userList) |user| { main.network.Protocols.lightMapTransmission.sendLightMap(user.conn, map); } } @@ -815,9 +815,9 @@ pub const ServerWorld = struct { // MARK: ServerWorld } if(self.lastUnimportantDataSent + 2000 < newTime) {// Send unimportant data every ~2s. self.lastUnimportantDataSent = newTime; - server.mutex.lock(); - defer server.mutex.unlock(); - for(server.users.items) |user| { + const userList = server.getUserListAndIncreaseRefCount(main.stackAllocator); + defer server.freeUserListAndDecreaseRefCount(main.stackAllocator, userList); + for(userList) |user| { main.network.Protocols.genericUpdate.sendTimeAndBiome(user.conn, self); } } @@ -826,9 +826,9 @@ pub const ServerWorld = struct { // MARK: ServerWorld // Item Entities self.itemDropManager.update(deltaTime); { // Collect item entities: - server.mutex.lock(); - defer server.mutex.unlock(); - for(server.users.items) |user| { + const userList = server.getUserListAndIncreaseRefCount(main.stackAllocator); + defer server.freeUserListAndDecreaseRefCount(main.stackAllocator, userList); + for(userList) |user| { self.itemDropManager.checkEntity(user); } } @@ -945,9 +945,9 @@ pub const ServerWorld = struct { // MARK: ServerWorld baseChunk.mutex.lock(); defer baseChunk.mutex.unlock(); baseChunk.updateBlockAndSetChanged(x, y, z, newBlock); - server.mutex.lock(); - defer server.mutex.unlock(); - for(main.server.users.items) |user| { + const userList = server.getUserListAndIncreaseRefCount(main.stackAllocator); + defer server.freeUserListAndDecreaseRefCount(main.stackAllocator, userList); + for(userList) |user| { main.network.Protocols.blockUpdate.send(user.conn, wx, wy, wz, _newBlock); } return null;