Split the server mutex into multiple different mutexes that are used in smaller scopes.

fixes #864
Also fixed a data race in the itemdrop manager
This commit is contained in:
IntegratedQuantum 2024-12-31 14:53:42 +01:00
parent 47ed704e49
commit 9f3c63ebff
8 changed files with 82 additions and 64 deletions

View File

@ -20,7 +20,7 @@ const Side = enum{client, server};
pub const Sync = struct { // MARK: Sync pub const Sync = struct { // MARK: Sync
pub const ClientSide = struct { pub const ClientSide = struct {
var mutex: std.Thread.Mutex = .{}; pub var mutex: std.Thread.Mutex = .{};
var commands: main.utils.CircularBufferQueue(Command) = undefined; var commands: main.utils.CircularBufferQueue(Command) = undefined;
var maxId: u32 = 0; var maxId: u32 = 0;
var freeIdList: main.List(u32) = undefined; var freeIdList: main.List(u32) = undefined;

View File

@ -28,7 +28,9 @@ pub fn render() void {
var y: f32 = 0; var y: f32 = 0;
if (main.game.world != null) { if (main.game.world != null) {
if(main.server.world != null) { if(main.server.world != null) {
draw.print("Players Connected: {}", .{main.server.users.items.len}, 0, y, 8, .left); const userList = main.server.getUserListAndIncreaseRefCount(main.stackAllocator);
defer main.server.freeUserListAndDecreaseRefCount(main.stackAllocator, userList);
draw.print("Players Connected: {}", .{userList.len}, 0, y, 8, .left);
y += 8; y += 8;
} }
const sent = network.Connection.packetsSent.load(.monotonic); const sent = network.Connection.packetsSent.load(.monotonic);

View File

@ -55,9 +55,9 @@ pub fn render() void {
renderConnectionData(main.game.world.?.conn, &y); renderConnectionData(main.game.world.?.conn, &y);
} }
if(main.server.world != null) { if(main.server.world != null) {
main.server.mutex.lock(); const userList = main.server.getUserListAndIncreaseRefCount(main.stackAllocator);
defer main.server.mutex.unlock(); defer main.server.freeUserListAndDecreaseRefCount(main.stackAllocator, userList);
for(main.server.users.items) |user| { for(userList) |user| {
draw.print("{s}", .{user.name}, 0, y, 8, .left); draw.print("{s}", .{user.name}, 0, y, 8, .left);
y += 8; y += 8;
renderConnectionData(user.conn, &y); renderConnectionData(user.conn, &y);

View File

@ -28,8 +28,6 @@ fn kick(conn: *main.network.Connection) void {
pub fn onOpen() void { pub fn onOpen() void {
const list = VerticalList.init(.{padding, 16 + padding}, 300, 16); const list = VerticalList.init(.{padding, 16 + padding}, 300, 16);
{ {
main.server.mutex.lock();
defer main.server.mutex.unlock();
main.server.connectionManager.mutex.lock(); main.server.connectionManager.mutex.lock();
defer main.server.connectionManager.mutex.unlock(); defer main.server.connectionManager.mutex.unlock();
std.debug.assert(userList.len == 0); std.debug.assert(userList.len == 0);
@ -68,11 +66,9 @@ pub fn onClose() void {
} }
pub fn update() void { pub fn update() void {
main.server.mutex.lock();
main.server.connectionManager.mutex.lock(); main.server.connectionManager.mutex.lock();
const serverListLen = main.server.connectionManager.connections.items.len; const serverListLen = main.server.connectionManager.connections.items.len;
main.server.connectionManager.mutex.unlock(); main.server.connectionManager.mutex.unlock();
main.server.mutex.unlock();
if(userList.len != serverListLen) { if(userList.len != serverListLen) {
std.log.err("{} {}", .{userList.len, serverListLen}); std.log.err("{} {}", .{userList.len, serverListLen});
onClose(); onClose();

View File

@ -202,7 +202,9 @@ pub const ItemDropManager = struct { // MARK: ItemDropManager
} }
pub fn add(self: *ItemDropManager, pos: Vec3d, vel: Vec3d, rot: Vec3f, itemStack: ItemStack, despawnTime: i32, pickupCooldown: i32) void { pub fn add(self: *ItemDropManager, pos: Vec3d, vel: Vec3d, rot: Vec3f, itemStack: ItemStack, despawnTime: i32, pickupCooldown: i32) void {
if(self.size == maxCapacity) { self.emptyMutex.lock();
const i: u16 = @intCast(self.isEmpty.findFirstSet() orelse {
self.emptyMutex.unlock();
const zon = itemStack.store(main.stackAllocator); const zon = itemStack.store(main.stackAllocator);
defer zon.free(main.stackAllocator); defer zon.free(main.stackAllocator);
const string = zon.toString(main.stackAllocator); const string = zon.toString(main.stackAllocator);
@ -212,10 +214,7 @@ pub const ItemDropManager = struct { // MARK: ItemDropManager
item.deinit(); item.deinit();
} }
return; return;
} });
self.emptyMutex.lock();
const i: u16 = @intCast(self.isEmpty.findFirstSet().?);
self.isEmpty.unset(i); self.isEmpty.unset(i);
self.emptyMutex.unlock(); self.emptyMutex.unlock();
self.changeQueue.enqueue(.{.add = .{i, .{ self.changeQueue.enqueue(.{.add = .{i, .{

View File

@ -810,7 +810,9 @@ pub const MeshSelection = struct { // MARK: MeshSelection
var newBlock = block; var newBlock = block;
// TODO: Breaking animation and tools. // TODO: Breaking animation and tools.
const relPos: Vec3f = @floatCast(lastPos - @as(Vec3d, @floatFromInt(selectedPos))); const relPos: Vec3f = @floatCast(lastPos - @as(Vec3d, @floatFromInt(selectedPos)));
main.items.Inventory.Sync.ClientSide.mutex.lock();
block.mode().onBlockBreaking(inventory.getStack(slot).item, relPos, lastDir, &newBlock); block.mode().onBlockBreaking(inventory.getStack(slot).item, relPos, lastDir, &newBlock);
main.items.Inventory.Sync.ClientSide.mutex.unlock();
if(!std.meta.eql(newBlock, block)) { if(!std.meta.eql(newBlock, block)) {
updateBlockAndSendUpdate(inventory, slot, selectedPos[0], selectedPos[1], selectedPos[2], block, newBlock); updateBlockAndSendUpdate(inventory, slot, selectedPos[0], selectedPos[1], selectedPos[2], block, newBlock);
} }

View File

@ -46,6 +46,8 @@ pub const User = struct { // MARK: User
refCount: Atomic(u32) = .init(1), refCount: Atomic(u32) = .init(1),
mutex: std.Thread.Mutex = .{},
pub fn initAndIncreaseRefCount(manager: *ConnectionManager, ipPort: []const u8) !*User { pub fn initAndIncreaseRefCount(manager: *ConnectionManager, ipPort: []const u8) !*User {
const self = main.globalAllocator.create(User); const self = main.globalAllocator.create(User);
errdefer main.globalAllocator.destroy(self); errdefer main.globalAllocator.destroy(self);
@ -157,7 +159,8 @@ pub const User = struct { // MARK: User
} }
pub fn update(self: *User) void { pub fn update(self: *User) void {
main.utils.assertLocked(&mutex); self.mutex.lock();
defer self.mutex.unlock();
var time = @as(i16, @truncate(std.time.milliTimestamp())) -% main.settings.entityLookback; var time = @as(i16, @truncate(std.time.milliTimestamp())) -% main.settings.entityLookback;
time -%= self.timeDifference.difference.load(.monotonic); time -%= self.timeDifference.difference.load(.monotonic);
self.interpolation.update(time, self.lastTime); self.interpolation.update(time, self.lastTime);
@ -166,8 +169,8 @@ pub const User = struct { // MARK: User
} }
pub fn receiveData(self: *User, data: []const u8) void { pub fn receiveData(self: *User, data: []const u8) void {
mutex.lock(); self.mutex.lock();
defer mutex.unlock(); defer self.mutex.unlock();
const position: [3]f64 = .{ const position: [3]f64 = .{
@bitCast(std.mem.readInt(u64, data[0..8], .big)), @bitCast(std.mem.readInt(u64, data[0..8], .big)),
@bitCast(std.mem.readInt(u64, data[8..16], .big)), @bitCast(std.mem.readInt(u64, data[8..16], .big)),
@ -198,17 +201,16 @@ pub const updatesPerSec: u32 = 20;
const updateNanoTime: u32 = 1000000000/20; const updateNanoTime: u32 = 1000000000/20;
pub var world: ?*ServerWorld = null; pub var world: ?*ServerWorld = null;
pub var users: main.List(*User) = undefined; var userMutex: std.Thread.Mutex = .{};
pub var userDeinitList: main.utils.ConcurrentQueue(*User) = undefined; var users: main.List(*User) = undefined;
pub var userConnectList: main.utils.ConcurrentQueue(*User) = undefined; var userDeinitList: main.utils.ConcurrentQueue(*User) = undefined;
var userConnectList: main.utils.ConcurrentQueue(*User) = undefined;
pub var connectionManager: *ConnectionManager = undefined; pub var connectionManager: *ConnectionManager = undefined;
pub var running: std.atomic.Value(bool) = .init(false); pub var running: std.atomic.Value(bool) = .init(false);
var lastTime: i128 = undefined; var lastTime: i128 = undefined;
pub var mutex: std.Thread.Mutex = .{};
pub var thread: ?std.Thread = null; pub var thread: ?std.Thread = null;
fn init(name: []const u8, singlePlayerPort: ?u16) void { // MARK: init() fn init(name: []const u8, singlePlayerPort: ?u16) void { // MARK: init()
@ -267,6 +269,23 @@ fn deinit() void {
command.deinit(); command.deinit();
} }
pub fn getUserListAndIncreaseRefCount(allocator: utils.NeverFailingAllocator) []*User {
userMutex.lock();
defer userMutex.unlock();
const result = allocator.dupe(*User, users.items);
for(result) |user| {
user.increaseRefCount();
}
return result;
}
pub fn freeUserListAndDecreaseRefCount(allocator: utils.NeverFailingAllocator, list: []*User) void {
for(list) |user| {
user.decreaseRefCount();
}
allocator.free(list);
}
fn sendEntityUpdates(comptime getInitialList: bool, allocator: utils.NeverFailingAllocator) if(getInitialList) []const u8 else void { fn sendEntityUpdates(comptime getInitialList: bool, allocator: utils.NeverFailingAllocator) if(getInitialList) []const u8 else void {
// Send the entity updates: // Send the entity updates:
const updateList = main.ZonElement.initArray(main.stackAllocator); const updateList = main.ZonElement.initArray(main.stackAllocator);
@ -297,11 +316,11 @@ fn sendEntityUpdates(comptime getInitialList: bool, allocator: utils.NeverFailin
itemDropList.free(main.stackAllocator); itemDropList.free(main.stackAllocator);
initialList = list.toStringEfficient(allocator, &.{}); initialList = list.toStringEfficient(allocator, &.{});
} }
mutex.lock(); const userList = getUserListAndIncreaseRefCount(main.stackAllocator);
for(users.items) |user| { defer freeUserListAndDecreaseRefCount(main.stackAllocator, userList);
for(userList) |user| {
main.network.Protocols.entity.send(user.conn, updateData); main.network.Protocols.entity.send(user.conn, updateData);
} }
mutex.unlock();
if(getInitialList) { if(getInitialList) {
return initialList; return initialList;
} }
@ -309,27 +328,27 @@ fn sendEntityUpdates(comptime getInitialList: bool, allocator: utils.NeverFailin
fn update() void { // MARK: update() fn update() void { // MARK: update()
world.?.update(); world.?.update();
mutex.lock();
for(users.items) |user| {
user.update();
}
mutex.unlock();
while(userConnectList.dequeue()) |user| { while(userConnectList.dequeue()) |user| {
connectInternal(user); connectInternal(user);
} }
const userList = getUserListAndIncreaseRefCount(main.stackAllocator);
defer freeUserListAndDecreaseRefCount(main.stackAllocator, userList);
for(userList) |user| {
user.update();
}
sendEntityUpdates(false, main.stackAllocator); sendEntityUpdates(false, main.stackAllocator);
// Send the entity data: // Send the entity data:
const data = main.stackAllocator.alloc(u8, (4 + 24 + 12 + 24)*users.items.len); const data = main.stackAllocator.alloc(u8, (4 + 24 + 12 + 24)*userList.len);
defer main.stackAllocator.free(data); defer main.stackAllocator.free(data);
const itemData = world.?.itemDropManager.getPositionAndVelocityData(main.stackAllocator); const itemData = world.?.itemDropManager.getPositionAndVelocityData(main.stackAllocator);
defer main.stackAllocator.free(itemData); defer main.stackAllocator.free(itemData);
var remaining = data; var remaining = data;
mutex.lock(); for(userList) |user| {
for(users.items) |user| {
const id = user.id; // TODO const id = user.id; // TODO
std.mem.writeInt(u32, remaining[0..4], id, .big); std.mem.writeInt(u32, remaining[0..4], id, .big);
remaining = remaining[4..]; remaining = remaining[4..];
@ -345,12 +364,10 @@ fn update() void { // MARK: update()
std.mem.writeInt(u64, remaining[16..24], @bitCast(user.player.vel[2]), .big); std.mem.writeInt(u64, remaining[16..24], @bitCast(user.player.vel[2]), .big);
remaining = remaining[24..]; remaining = remaining[24..];
} }
for(users.items) |user| { for(userList) |user| {
main.network.Protocols.entityPosition.send(user.conn, data, itemData); main.network.Protocols.entityPosition.send(user.conn, data, itemData);
} }
mutex.unlock();
while(userDeinitList.dequeue()) |user| { while(userDeinitList.dequeue()) |user| {
user.decreaseRefCount(); user.decreaseRefCount();
} }
@ -393,15 +410,16 @@ pub fn removePlayer(user: *User) void { // MARK: removePlayer()
if(!user.connected.load(.unordered)) return; if(!user.connected.load(.unordered)) return;
const message = std.fmt.allocPrint(main.stackAllocator.allocator, "{s}§#ffff00 left", .{user.name}) catch unreachable; const message = std.fmt.allocPrint(main.stackAllocator.allocator, "{s}§#ffff00 left", .{user.name}) catch unreachable;
defer main.stackAllocator.free(message); defer main.stackAllocator.free(message);
mutex.lock();
defer mutex.unlock();
userMutex.lock();
for(users.items, 0..) |other, i| { for(users.items, 0..) |other, i| {
if(other == user) { if(other == user) {
_ = users.swapRemove(i); _ = users.swapRemove(i);
break; break;
} }
} }
userMutex.unlock();
sendMessage(message); sendMessage(message);
// Let the other clients know that this one left. // Let the other clients know that this one left.
const zonArray = main.ZonElement.initArray(main.stackAllocator); const zonArray = main.ZonElement.initArray(main.stackAllocator);
@ -409,7 +427,9 @@ pub fn removePlayer(user: *User) void { // MARK: removePlayer()
zonArray.array.append(.{.int = user.id}); zonArray.array.append(.{.int = user.id});
const data = zonArray.toStringEfficient(main.stackAllocator, &.{}); const data = zonArray.toStringEfficient(main.stackAllocator, &.{});
defer main.stackAllocator.free(data); defer main.stackAllocator.free(data);
for(users.items) |other| { const userList = getUserListAndIncreaseRefCount(main.stackAllocator);
defer freeUserListAndDecreaseRefCount(main.stackAllocator, userList);
for(userList) |other| {
main.network.Protocols.entity.send(other.conn, data); main.network.Protocols.entity.send(other.conn, data);
} }
} }
@ -423,6 +443,8 @@ pub fn connectInternal(user: *User) void {
// TODO: addEntity(player); // TODO: addEntity(player);
user.id = freeId; user.id = freeId;
freeId += 1; freeId += 1;
const userList = getUserListAndIncreaseRefCount(main.stackAllocator);
defer freeUserListAndDecreaseRefCount(main.stackAllocator, userList);
// Let the other clients know about this new one. // Let the other clients know about this new one.
{ {
const zonArray = main.ZonElement.initArray(main.stackAllocator); const zonArray = main.ZonElement.initArray(main.stackAllocator);
@ -433,38 +455,33 @@ pub fn connectInternal(user: *User) void {
zonArray.array.append(entityZon); zonArray.array.append(entityZon);
const data = zonArray.toStringEfficient(main.stackAllocator, &.{}); const data = zonArray.toStringEfficient(main.stackAllocator, &.{});
defer main.stackAllocator.free(data); defer main.stackAllocator.free(data);
mutex.lock(); for(userList) |other| {
defer mutex.unlock();
for(users.items) |other| {
main.network.Protocols.entity.send(other.conn, data); main.network.Protocols.entity.send(other.conn, data);
} }
} }
{ // Let this client know about the others: { // Let this client know about the others:
const zonArray = main.ZonElement.initArray(main.stackAllocator); const zonArray = main.ZonElement.initArray(main.stackAllocator);
defer zonArray.free(main.stackAllocator); defer zonArray.free(main.stackAllocator);
mutex.lock(); for(userList) |other| {
for(users.items) |other| {
const entityZon = main.ZonElement.initObject(main.stackAllocator); const entityZon = main.ZonElement.initObject(main.stackAllocator);
entityZon.put("id", other.id); entityZon.put("id", other.id);
entityZon.put("name", other.name); entityZon.put("name", other.name);
zonArray.array.append(entityZon); zonArray.array.append(entityZon);
} }
mutex.unlock();
const data = zonArray.toStringEfficient(main.stackAllocator, &.{}); const data = zonArray.toStringEfficient(main.stackAllocator, &.{});
defer main.stackAllocator.free(data); defer main.stackAllocator.free(data);
if(user.connected.load(.unordered)) main.network.Protocols.entity.send(user.conn, data); if(user.connected.load(.unordered)) main.network.Protocols.entity.send(user.conn, data);
} }
const initialList = sendEntityUpdates(true, main.stackAllocator); const initialList = sendEntityUpdates(true, main.stackAllocator);
main.network.Protocols.entity.send(user.conn, initialList); main.network.Protocols.entity.send(user.conn, initialList);
main.stackAllocator.free(initialList); main.stackAllocator.free(initialList);
const message = std.fmt.allocPrint(main.stackAllocator.allocator, "{s}§#ffff00 joined", .{user.name}) catch unreachable; const message = std.fmt.allocPrint(main.stackAllocator.allocator, "{s}§#ffff00 joined", .{user.name}) catch unreachable;
defer main.stackAllocator.free(message); defer main.stackAllocator.free(message);
mutex.lock();
defer mutex.unlock();
sendMessage(message); sendMessage(message);
userMutex.lock();
users.append(user); users.append(user);
userMutex.unlock();
} }
pub fn messageFrom(msg: []const u8, source: *User) void { // MARK: message pub fn messageFrom(msg: []const u8, source: *User) void { // MARK: message
@ -474,16 +491,18 @@ pub fn messageFrom(msg: []const u8, source: *User) void { // MARK: message
} else { } else {
const newMessage = std.fmt.allocPrint(main.stackAllocator.allocator, "[{s}§#ffffff] {s}", .{source.name, msg}) catch unreachable; const newMessage = std.fmt.allocPrint(main.stackAllocator.allocator, "[{s}§#ffffff] {s}", .{source.name, msg}) catch unreachable;
defer main.stackAllocator.free(newMessage); defer main.stackAllocator.free(newMessage);
main.server.mutex.lock();
defer main.server.mutex.unlock();
main.server.sendMessage(newMessage); main.server.sendMessage(newMessage);
} }
} }
var chatMutex: std.Thread.Mutex = .{};
pub fn sendMessage(msg: []const u8) void { pub fn sendMessage(msg: []const u8) void {
main.utils.assertLocked(&mutex); chatMutex.lock();
defer chatMutex.unlock();
std.log.info("Chat: {s}", .{msg}); // TODO use color \033[0;32m std.log.info("Chat: {s}", .{msg}); // TODO use color \033[0;32m
for(users.items) |user| { const userList = getUserListAndIncreaseRefCount(main.stackAllocator);
defer freeUserListAndDecreaseRefCount(main.stackAllocator, userList);
for(userList) |user| {
user.sendMessage(msg); user.sendMessage(msg);
} }
} }

View File

@ -229,9 +229,9 @@ const ChunkManager = struct { // MARK: ChunkManager
if(self.source) |source| { if(self.source) |source| {
if(source.connected.load(.unordered)) main.network.Protocols.lightMapTransmission.sendLightMap(source.conn, map); if(source.connected.load(.unordered)) main.network.Protocols.lightMapTransmission.sendLightMap(source.conn, map);
} else { } else {
server.mutex.lock(); const userList = server.getUserListAndIncreaseRefCount(main.stackAllocator);
defer server.mutex.unlock(); defer server.freeUserListAndDecreaseRefCount(main.stackAllocator, userList);
for(server.users.items) |user| { for(userList) |user| {
main.network.Protocols.lightMapTransmission.sendLightMap(user.conn, map); main.network.Protocols.lightMapTransmission.sendLightMap(user.conn, map);
} }
} }
@ -815,9 +815,9 @@ pub const ServerWorld = struct { // MARK: ServerWorld
} }
if(self.lastUnimportantDataSent + 2000 < newTime) {// Send unimportant data every ~2s. if(self.lastUnimportantDataSent + 2000 < newTime) {// Send unimportant data every ~2s.
self.lastUnimportantDataSent = newTime; self.lastUnimportantDataSent = newTime;
server.mutex.lock(); const userList = server.getUserListAndIncreaseRefCount(main.stackAllocator);
defer server.mutex.unlock(); defer server.freeUserListAndDecreaseRefCount(main.stackAllocator, userList);
for(server.users.items) |user| { for(userList) |user| {
main.network.Protocols.genericUpdate.sendTimeAndBiome(user.conn, self); main.network.Protocols.genericUpdate.sendTimeAndBiome(user.conn, self);
} }
} }
@ -826,9 +826,9 @@ pub const ServerWorld = struct { // MARK: ServerWorld
// Item Entities // Item Entities
self.itemDropManager.update(deltaTime); self.itemDropManager.update(deltaTime);
{ // Collect item entities: { // Collect item entities:
server.mutex.lock(); const userList = server.getUserListAndIncreaseRefCount(main.stackAllocator);
defer server.mutex.unlock(); defer server.freeUserListAndDecreaseRefCount(main.stackAllocator, userList);
for(server.users.items) |user| { for(userList) |user| {
self.itemDropManager.checkEntity(user); self.itemDropManager.checkEntity(user);
} }
} }
@ -945,9 +945,9 @@ pub const ServerWorld = struct { // MARK: ServerWorld
baseChunk.mutex.lock(); baseChunk.mutex.lock();
defer baseChunk.mutex.unlock(); defer baseChunk.mutex.unlock();
baseChunk.updateBlockAndSetChanged(x, y, z, newBlock); baseChunk.updateBlockAndSetChanged(x, y, z, newBlock);
server.mutex.lock(); const userList = server.getUserListAndIncreaseRefCount(main.stackAllocator);
defer server.mutex.unlock(); defer server.freeUserListAndDecreaseRefCount(main.stackAllocator, userList);
for(main.server.users.items) |user| { for(userList) |user| {
main.network.Protocols.blockUpdate.send(user.conn, wx, wy, wz, _newBlock); main.network.Protocols.blockUpdate.send(user.conn, wx, wy, wz, _newBlock);
} }
return null; return null;