diff --git a/src/gui/windows/debug_network.zig b/src/gui/windows/debug_network.zig index 56795f8d..6ea8f573 100644 --- a/src/gui/windows/debug_network.zig +++ b/src/gui/windows/debug_network.zig @@ -38,16 +38,22 @@ pub fn render() void { const loss = @as(f64, @floatFromInt(resent))/@as(f64, @floatFromInt(sent))*100; draw.print("Packet loss: {d:.1}% ({}/{})", .{loss, resent, sent}, 0, y, 8, .left); y += 8; + draw.print("Internal message overhead: {}kiB", .{network.Connection.internalMessageOverhead.load(.monotonic) >> 10}, 0, y, 8, .left); + y += 8; + draw.print("Internal header overhead: {}kiB", .{network.Connection.internalHeaderOverhead.load(.monotonic) >> 10}, 0, y, 8, .left); + y += 8; + draw.print("External header overhead: {}kiB", .{network.Connection.externalHeaderOverhead.load(.monotonic) >> 10}, 0, y, 8, .left); + y += 8; inline for(@typeInfo(network.Protocols).@"struct".decls) |decl| { if(@TypeOf(@field(network.Protocols, decl.name)) == type) { const id = @field(network.Protocols, decl.name).id; - draw.print("{s}: {}kiB in {} packets", .{decl.name, network.bytesReceived[id].load(.monotonic) >> 10, network.packetsReceived[id].load(.monotonic)}, 0, y, 8, .left); + draw.print("{s}: received {}kiB sent {}kiB", .{decl.name, network.Protocols.bytesReceived[id].load(.monotonic) >> 10, network.Protocols.bytesSent[id].load(.monotonic) >> 10}, 0, y, 8, .left); y += 8; } } } - if(window.size[1] != y) { - window.size[1] = y; + if(window.contentSize[1] != y) { + window.contentSize[1] = y; window.updateWindowPosition(); } } diff --git a/src/gui/windows/debug_network_advanced.zig b/src/gui/windows/debug_network_advanced.zig index 43648bec..0a8f09f6 100644 --- a/src/gui/windows/debug_network_advanced.zig +++ b/src/gui/windows/debug_network_advanced.zig @@ -23,26 +23,19 @@ pub var window = GuiWindow{ .hideIfMouseIsGrabbed = false, }; -fn renderConnectionData(conn: *main.network.Connection, y: *f32) void { +fn renderConnectionData(conn: *main.network.Connection, name: 
[]const u8, y: *f32) void { conn.mutex.lock(); defer conn.mutex.unlock(); - var unconfirmed: usize = 0; - for(conn.unconfirmedPackets.items) |packet| { - unconfirmed += packet.data.len; - } - var waiting: usize = 0; - { - var i = conn.packetQueue.startIndex; - while(i != conn.packetQueue.endIndex) : (i = (i + 1) & conn.packetQueue.mask) { - const packet = conn.packetQueue.mem[i]; - waiting += packet.data.len; - } - } - draw.print("Bandwidth: {d:.0} kiB/s", .{1.0e9/conn.congestionControl_inversebandWidth/1024.0}, 0, y.*, 8, .left); + var unconfirmed: [3]usize = @splat(0); + var queued: [3]usize = @splat(0); + conn.lossyChannel.getStatistics(&unconfirmed[0], &queued[0]); + conn.fastChannel.getStatistics(&unconfirmed[1], &queued[1]); + conn.slowChannel.getStatistics(&unconfirmed[2], &queued[2]); + draw.print("{s} | RTT = {d:.1} ms | {d:.0} kiB/RTT", .{name, conn.rttEstimate/1000.0, conn.bandwidthEstimateInBytesPerRtt/1024.0}, 0, y.*, 8, .left); y.* += 8; - draw.print("Waiting in queue: {} kiB", .{waiting >> 10}, 0, y.*, 8, .left); + draw.print("Waiting in queue: {: >6} kiB |{: >6} kiB |{: >6} kiB", .{queued[0] >> 10, queued[1] >> 10, queued[2] >> 10}, 0, y.*, 8, .left); y.* += 8; - draw.print("Sent but not confirmed: {} kiB", .{unconfirmed >> 10}, 0, y.*, 8, .left); + draw.print("Sent but not confirmed:{: >6} kiB |{: >6} kiB |{: >6} kiB", .{unconfirmed[0] >> 10, unconfirmed[1] >> 10, unconfirmed[2] >> 10}, 0, y.*, 8, .left); y.* += 8; } @@ -50,21 +43,18 @@ pub fn render() void { draw.setColor(0xffffffff); var y: f32 = 0; if(main.game.world != null) { - draw.print("Client", .{}, 0, y, 8, .left); - y += 8; - renderConnectionData(main.game.world.?.conn, &y); + renderConnectionData(main.game.world.?.conn, "Client", &y); } + y += 8; if(main.server.world != null) { const userList = main.server.getUserListAndIncreaseRefCount(main.stackAllocator); defer main.server.freeUserListAndDecreaseRefCount(main.stackAllocator, userList); for(userList) |user| { - draw.print("{s}", 
.{user.name}, 0, y, 8, .left); - y += 8; - renderConnectionData(user.conn, &y); + renderConnectionData(user.conn, user.name, &y); } } - if(window.size[1] != y) { - window.size[1] = y; + if(window.contentSize[1] != y) { + window.contentSize[1] = y; window.updateWindowPosition(); } } diff --git a/src/gui/windows/invite.zig b/src/gui/windows/invite.zig index 1a9f8a04..2ab9b42c 100644 --- a/src/gui/windows/invite.zig +++ b/src/gui/windows/invite.zig @@ -59,18 +59,8 @@ fn copyIp(_: usize) void { main.Window.setClipboardString(ipAddress); } -fn inviteFromExternal(address: main.network.Address) void { - const ip = std.fmt.allocPrint(main.stackAllocator.allocator, "{}", .{address}) catch unreachable; - defer main.stackAllocator.free(ip); - const user = main.server.User.initAndIncreaseRefCount(main.server.connectionManager, ip) catch |err| { - std.log.err("Cannot connect user from external IP {}: {s}", .{address, @errorName(err)}); - return; - }; - user.decreaseRefCount(); -} - fn makePublic(public: bool) void { - main.server.connectionManager.newConnectionCallback.store(if(public) &inviteFromExternal else null, .monotonic); + main.server.connectionManager.allowNewConnections.store(public, .monotonic); } pub fn onOpen() void { @@ -84,7 +74,7 @@ pub fn onOpen() void { list.add(ipAddressEntry); list.add(Button.initText(.{0, 0}, 100, "Invite", .{.callback = &invite})); list.add(Button.initText(.{0, 0}, 100, "Manage Players", gui.openWindowCallback("manage_players"))); - list.add(CheckBox.init(.{0, 0}, width, "Allow anyone to join (requires a publicly visible IP address+port which may need some configuration in your router)", main.server.connectionManager.newConnectionCallback.load(.monotonic) != null, &makePublic)); + list.add(CheckBox.init(.{0, 0}, width, "Allow anyone to join (requires a publicly visible IP address+port which may need some configuration in your router)", main.server.connectionManager.allowNewConnections.load(.monotonic), &makePublic)); list.finish(.center); 
window.rootComponent = list.toComponent(); window.contentSize = window.rootComponent.?.pos() + window.rootComponent.?.size() + @as(Vec2f, @splat(padding)); diff --git a/src/network.zig b/src/network.zig index 4438f3b4..991349ee 100644 --- a/src/network.zig +++ b/src/network.zig @@ -23,6 +23,11 @@ const NeverFailingAllocator = main.heap.NeverFailingAllocator; //TODO: Might want to use SSL or something similar to encode the message +const ms = 1_000; +inline fn networkTimestamp() i64 { + return std.time.microTimestamp(); +} + const Socket = struct { const posix = std.posix; socketID: posix.socket_t, @@ -368,9 +373,9 @@ pub const ConnectionManager = struct { // MARK: ConnectionManager mutex: std.Thread.Mutex = .{}, waitingToFinishReceive: std.Thread.Condition = std.Thread.Condition{}, - newConnectionCallback: Atomic(?*const fn(Address) void) = .init(null), + allowNewConnections: Atomic(bool) = .init(false), - receiveBuffer: [Connection.maxPacketSize]u8 = undefined, + receiveBuffer: [Connection.maxMtu]u8 = undefined, world: ?*game.World = null, @@ -404,7 +409,7 @@ pub const ConnectionManager = struct { // MARK: ConnectionManager break :blk socket; } else return err; }; - errdefer Socket.deinit(result.socket); + errdefer result.socket.deinit(); if(localPort == 0) result.localPort = try result.socket.getPort(); result.thread = try std.Thread.spawn(.{}, run, .{result}); @@ -422,7 +427,7 @@ pub const ConnectionManager = struct { // MARK: ConnectionManager self.running.store(false, .monotonic); self.thread.join(); - Socket.deinit(self.socket); + self.socket.deinit(); self.connections.deinit(); for(self.requests.items) |request| { request.requestNotifier.signal(); @@ -546,8 +551,16 @@ pub const ConnectionManager = struct { // MARK: ConnectionManager } if(self.online.load(.acquire) and source.ip == self.externalAddress.ip and source.port == self.externalAddress.port) return; } - if(self.newConnectionCallback.load(.monotonic)) |callback| { - callback(source); + 
if(self.allowNewConnections.load(.monotonic)) { + if(data.len != 0 and data[0] == @intFromEnum(Connection.ChannelId.init)) { + const ip = std.fmt.allocPrint(main.stackAllocator.allocator, "{}", .{source}) catch unreachable; + defer main.stackAllocator.free(ip); + const user = main.server.User.initAndIncreaseRefCount(main.server.connectionManager, ip) catch |err| { + std.log.err("Cannot connect user from external IP {}: {s}", .{source, @errorName(err)}); + return; + }; + user.decreaseRefCount(); + } } else { // TODO: Reduce the number of false alarms in the short period after a disconnect. std.log.warn("Unknown connection from address: {}", .{source}); @@ -560,7 +573,7 @@ pub const ConnectionManager = struct { // MARK: ConnectionManager main.initThreadLocals(); defer main.deinitThreadLocals(); - var lastTime: i64 = @truncate(std.time.nanoTimestamp()); + var lastTime: i64 = networkTimestamp(); while(self.running.load(.monotonic)) { self.waitingToFinishReceive.broadcast(); var source: Address = undefined; @@ -576,7 +589,7 @@ pub const ConnectionManager = struct { // MARK: ConnectionManager @panic("Network failed."); } } - const curTime: i64 = @truncate(std.time.nanoTimestamp()); + const curTime: i64 = networkTimestamp(); { self.mutex.lock(); defer self.mutex.unlock(); @@ -587,29 +600,18 @@ pub const ConnectionManager = struct { // MARK: ConnectionManager } } - // Send a keep-alive packet roughly every 100 ms: - if(curTime -% lastTime > 100_000_000) { + // Send packets roughly every 1 ms: + if(curTime -% lastTime > 1*ms) { lastTime = curTime; var i: u32 = 0; self.mutex.lock(); defer self.mutex.unlock(); while(i < self.connections.items.len) { var conn = self.connections.items[i]; - if(lastTime -% conn.lastConnection > settings.connectionTimeout and conn.isConnected()) { - std.log.info("timeout", .{}); - // Timeout a connection if it was connect at some point. 
New connections are not timed out because that could annoy players(having to restart the connection several times). - self.mutex.unlock(); - conn.disconnect(); - if(conn.user) |user| { - main.server.disconnect(user); - } - self.mutex.lock(); - } else { - self.mutex.unlock(); - conn.sendKeepAlive(); - self.mutex.lock(); - i += 1; - } + self.mutex.unlock(); + conn.processNextPackets(); + self.mutex.lock(); + i += 1; } if(self.connections.items.len == 0 and self.online.load(.acquire)) { // Send a message to external ip, to keep the port open: @@ -628,11 +630,11 @@ const UnconfirmedPacket = struct { }; // MARK: Protocols -pub var bytesReceived: [256]Atomic(usize) = @splat(.init(0)); -pub var packetsReceived: [256]Atomic(usize) = @splat(.init(0)); pub const Protocols = struct { pub var list: [256]?*const fn(*Connection, *utils.BinaryReader) anyerror!void = @splat(null); pub var isAsynchronous: [256]bool = @splat(false); + pub var bytesReceived: [256]Atomic(usize) = @splat(.init(0)); + pub var bytesSent: [256]Atomic(usize) = @splat(.init(0)); pub const keepAlive: u8 = 0; pub const important: u8 = 0xff; @@ -670,7 +672,7 @@ pub const Protocols = struct { defer arrayList.deinit(); arrayList.append(stepAssets); try utils.Compression.pack(dir, arrayList.writer()); - conn.sendImportant(id, arrayList.items); + conn.send(.fast, id, arrayList.items); } conn.user.?.initPlayer(name); @@ -685,10 +687,7 @@ pub const Protocols = struct { const outData = zonObject.toStringEfficient(main.stackAllocator, &[1]u8{stepServerData}); defer main.stackAllocator.free(outData); - conn.sendImportant(id, outData); - conn.mutex.lock(); - conn.flush(); - conn.mutex.unlock(); + conn.send(.fast, id, outData); conn.handShakeState.store(stepServerData, .monotonic); main.server.connect(conn.user.?); }, @@ -728,10 +727,9 @@ pub const Protocols = struct { const prefix = [1]u8{stepUserData}; const data = zonObject.toStringEfficient(main.stackAllocator, &prefix); defer main.stackAllocator.free(data); - 
conn.sendImportant(id, data); + conn.send(.fast, id, data); conn.mutex.lock(); - conn.flush(); conn.handShakeWaiting.wait(&conn.mutex); conn.mutex.unlock(); } @@ -781,7 +779,7 @@ pub const Protocols = struct { writer.writeInt(i8, @intCast((req.wz -% (basePosition[2] & positionMask)) >> voxelSizeShift + chunk.chunkShift)); writer.writeInt(u5, voxelSizeShift); } - conn.sendImportant(id, writer.data.items); + conn.send(.fast, id, writer.data.items); // TODO: Can this use the slow channel? } }; pub const chunkTransmission = struct { @@ -810,7 +808,7 @@ pub const Protocols = struct { writer.writeInt(i32, ch.super.pos.wz); writer.writeInt(u31, ch.super.pos.voxelSize); writer.writeSlice(chunkData); - conn.sendImportant(id, writer.data.items); + conn.send(.fast, id, writer.data.items); // TODO: Can this use the slow channel? } fn sendChunkLocally(ch: *chunk.ServerChunk) void { const chunkCopy = chunk.Chunk.init(ch.super.pos); @@ -850,23 +848,7 @@ pub const Protocols = struct { writer.writeInt(u32, @bitCast(game.camera.rotation[1])); writer.writeInt(u32, @bitCast(game.camera.rotation[2])); writer.writeInt(u16, time); - conn.sendUnimportant(id, writer.data.items); - } - }; - pub const disconnect = struct { - pub const id: u8 = 5; - pub const asynchronous = false; - fn receive(conn: *Connection, _: *utils.BinaryReader) !void { - conn.disconnect(); - if(conn.user) |user| { - main.server.disconnect(user); - } else { - main.exitToMenu(undefined); - } - } - pub fn disconnect(conn: *Connection) void { - const noData = [0]u8{}; - conn.sendUnimportant(id, &noData); + conn.send(.lossy, id, writer.data.items); } }; pub const entityPosition = struct { @@ -892,7 +874,7 @@ pub const Protocols = struct { writer.writeInt(u8, type_entity); writer.writeInt(i16, @truncate(std.time.milliTimestamp())); writer.writeSlice(entityData); - conn.sendUnimportant(id, writer.data.items); + conn.send(.lossy, id, writer.data.items); } if(itemData.len != 0) { @@ -901,7 +883,7 @@ pub const Protocols = 
struct { writer.writeInt(u8, type_item); writer.writeInt(i16, @truncate(std.time.milliTimestamp())); writer.writeSlice(itemData); - conn.sendUnimportant(id, writer.data.items); + conn.send(.lossy, id, writer.data.items); } } }; @@ -926,7 +908,7 @@ pub const Protocols = struct { writer.writeInt(i32, y); writer.writeInt(i32, z); writer.writeInt(u32, newBlock.toInt()); - conn.sendImportant(id, writer.data.items); + conn.send(.fast, id, writer.data.items); } }; pub const entity = struct { @@ -966,7 +948,7 @@ pub const Protocols = struct { } } pub fn send(conn: *Connection, msg: []const u8) void { - conn.sendImportant(id, msg); + conn.send(.fast, id, msg); } }; pub const genericUpdate = struct { @@ -1041,7 +1023,7 @@ pub const Protocols = struct { } pub fn sendGamemode(conn: *Connection, gamemode: main.game.Gamemode) void { - conn.sendImportant(id, &.{@intFromEnum(UpdateType.gamemode), @intFromEnum(gamemode)}); + conn.send(.fast, id, &.{@intFromEnum(UpdateType.gamemode), @intFromEnum(gamemode)}); } pub fn sendTPCoordinates(conn: *Connection, pos: Vec3d) void { @@ -1051,7 +1033,7 @@ pub const Protocols = struct { writer.writeEnum(UpdateType, .teleport); writer.writeVec(Vec3d, pos); - conn.sendImportant(id, writer.data.items); + conn.send(.fast, id, writer.data.items); } pub fn sendWorldEditPos(conn: *Connection, posType: WorldEditPosition, maybePos: ?Vec3i) void { @@ -1064,7 +1046,7 @@ pub const Protocols = struct { writer.writeVec(Vec3i, pos); } - conn.sendImportant(id, writer.data.items); + conn.send(.fast, id, writer.data.items); } pub fn sendTimeAndBiome(conn: *Connection, world: *const main.server.ServerWorld) void { @@ -1077,7 +1059,7 @@ pub const Protocols = struct { const pos = @as(Vec3i, @intFromFloat(conn.user.?.player.pos)); writer.writeInt(u32, world.getBiome(pos[0], pos[1], pos[2]).paletteId); - conn.sendImportant(id, writer.data.items); + conn.send(.fast, id, writer.data.items); } }; pub const chat = struct { @@ -1097,7 +1079,7 @@ pub const Protocols = 
struct { } pub fn send(conn: *Connection, msg: []const u8) void { - conn.sendImportant(id, msg); + conn.send(.fast, id, msg); } }; pub const lightMapRequest = struct { @@ -1129,7 +1111,7 @@ pub const Protocols = struct { writer.writeInt(i32, req.wy); writer.writeInt(u8, req.voxelSizeShift); } - conn.sendImportant(id, writer.data.items); + conn.send(.fast, id, writer.data.items); // TODO: Can this use the slow channel? } }; pub const lightMapTransmission = struct { @@ -1175,7 +1157,7 @@ pub const Protocols = struct { writer.writeInt(i32, map.pos.wy); writer.writeInt(u8, map.pos.voxelSizeShift); writer.writeSlice(compressedData); - conn.sendImportant(id, writer.data.items); + conn.send(.fast, id, writer.data.items); // TODO: Can this use the slow channel? } }; pub const inventory = struct { @@ -1203,7 +1185,7 @@ pub const Protocols = struct { writer.writeEnum(items.Inventory.Command.PayloadType, payloadType); std.debug.assert(writer.data.items[0] != 0xff); writer.writeSlice(_data); - conn.sendImportant(id, writer.data.items); + conn.send(.fast, id, writer.data.items); } pub fn sendConfirmation(conn: *Connection, _data: []const u8) void { std.debug.assert(conn.user != null); @@ -1211,11 +1193,11 @@ pub const Protocols = struct { defer writer.deinit(); writer.writeInt(u8, 0xff); writer.writeSlice(_data); - conn.sendImportant(id, writer.data.items); + conn.send(.fast, id, writer.data.items); } pub fn sendFailure(conn: *Connection) void { std.debug.assert(conn.user != null); - conn.sendImportant(id, &.{0xfe}); + conn.send(.fast, id, &.{0xfe}); } pub fn sendSyncOperation(conn: *Connection, _data: []const u8) void { std.debug.assert(conn.user != null); @@ -1223,24 +1205,481 @@ pub const Protocols = struct { defer writer.deinit(); writer.writeInt(u8, 0); writer.writeSlice(_data); - conn.sendImportant(id, writer.data.items); + conn.send(.fast, id, writer.data.items); } }; }; pub const Connection = struct { // MARK: Connection - const maxPacketSize: u32 = 65507; // max udp 
packet size + const maxMtu: u32 = 65507; // max udp packet size const importantHeaderSize: u32 = 5; - const maxImportantPacketSize: u32 = 576 - 20 - 8; // IPv4 MTU minus IP header minus udp header + const minMtu: u32 = 576 - 20 - 8; // IPv4 MTU minus IP header minus udp header const headerOverhead = 20 + 8 + 42; // IP Header + UDP Header + Ethernet header/footer const congestionControl_historySize = 16; const congestionControl_historyMask = congestionControl_historySize - 1; const minimumBandWidth = 10_000; - const timeUnit = 100_000_000; + + const receiveBufferSize = 8 << 20; // Statistics: pub var packetsSent: Atomic(u32) = .init(0); pub var packetsResent: Atomic(u32) = .init(0); + pub var internalMessageOverhead: Atomic(usize) = .init(0); + pub var internalHeaderOverhead: Atomic(usize) = .init(0); + pub var externalHeaderOverhead: Atomic(usize) = .init(0); + + const SequenceIndex = i32; + + const LossStatus = enum { + noLoss, + singleLoss, + doubleLoss, + }; + + const RangeBuffer = struct { // MARK: RangeBuffer + const Range = struct { + start: SequenceIndex, + len: SequenceIndex, + + fn end(self: Range) SequenceIndex { + return self.start +% self.len; + } + }; + ranges: main.ListUnmanaged(Range), + + pub fn init() RangeBuffer { + return .{ + .ranges = .{}, + }; + } + + pub fn clear(self: *RangeBuffer) void { + self.ranges.clearRetainingCapacity(); + } + + pub fn deinit(self: RangeBuffer, allocator: NeverFailingAllocator) void { + self.ranges.deinit(allocator); + } + + pub fn addRange(self: *RangeBuffer, allocator: NeverFailingAllocator, range: Range) void { + if(self.hasRange(range)) return; + var startRange: ?Range = null; + var endRange: ?Range = null; + var i: usize = 0; + while(i < self.ranges.items.len) { + const other = self.ranges.items[i]; + if(range.start -% other.start <= 0 and range.end() -% other.end() >= 0) { + _ = self.ranges.swapRemove(i); + continue; + } + if(range.start -% other.end() <= 0 and range.start -% other.start >= 0) { + _ = 
self.ranges.swapRemove(i); + startRange = other; + continue; + } + if(range.end() -% other.start >= 0 and range.end() -% other.end() <= 0) { + _ = self.ranges.swapRemove(i); + endRange = other; + continue; + } + i += 1; + } + var mergedRange = range; + if(startRange) |start| { + mergedRange.start = start.start; + mergedRange.len = range.end() -% mergedRange.start; + } + if(endRange) |end| { + mergedRange.len = end.end() -% mergedRange.start; + } + self.ranges.append(allocator, mergedRange); + } + + pub fn hasRange(self: *RangeBuffer, range: Range) bool { + for(self.ranges.items) |other| { + if(range.start -% other.start >= 0 and range.end() -% other.end() <= 0) { + return true; + } + } + return false; + } + + pub fn extractFirstRange(self: *RangeBuffer) ?Range { + if(self.ranges.items.len == 0) return null; + var firstRange = self.ranges.items[0]; + var index: usize = 0; + for(self.ranges.items[1..], 1..) |range, i| { + if(range.start -% firstRange.start < 0) { + firstRange = range; + index = i; + } + } + _ = self.ranges.swapRemove(index); + return firstRange; + } + }; + + const ReceiveBuffer = struct { // MARK: ReceiveBuffer + const Range = struct { + start: SequenceIndex, + len: SequenceIndex, + }; + const Header = struct { + protocolIndex: u8, + size: u32, + }; + ranges: RangeBuffer, + availablePosition: SequenceIndex = undefined, + currentReadPosition: SequenceIndex = undefined, + buffer: main.utils.FixedSizeCircularBuffer(u8, receiveBufferSize), + header: ?Header = null, + protocolBuffer: main.ListUnmanaged(u8) = .{}, + + pub fn init() ReceiveBuffer { + return .{ + .ranges = .init(), + .buffer = .init(main.globalAllocator), + }; + } + + pub fn deinit(self: ReceiveBuffer) void { + self.ranges.deinit(main.globalAllocator); + self.protocolBuffer.deinit(main.globalAllocator); + self.buffer.deinit(main.globalAllocator); + } + + fn applyRanges(self: *ReceiveBuffer) void { + const range = self.ranges.extractFirstRange() orelse unreachable; + 
std.debug.assert(range.start == self.availablePosition); + self.availablePosition = range.end(); + } + + fn getHeaderInformation(self: *ReceiveBuffer) !?Header { + if(self.currentReadPosition == self.availablePosition) return null; + var header: Header = .{ + .protocolIndex = self.buffer.getAtOffset(0) orelse unreachable, + .size = 0, + }; + var i: u8 = 1; + while(true) : (i += 1) { + if(self.currentReadPosition +% i == self.availablePosition) return null; + const nextByte = self.buffer.getAtOffset(i) orelse unreachable; + header.size = header.size << 7 | (nextByte & 0x7f); + if(nextByte & 0x80 == 0) break; + if(header.size > std.math.maxInt(@TypeOf(header.size)) >> 7) return error.Invalid; + } + self.buffer.discardElements(i + 1); + self.currentReadPosition +%= @intCast(i + 1); + return header; + } + + fn collectRangesAndExecuteProtocols(self: *ReceiveBuffer, conn: *Connection) !void { + self.applyRanges(); + while(true) { + if(self.header == null) { + self.header = try self.getHeaderInformation() orelse return; + self.protocolBuffer.ensureCapacity(main.globalAllocator, self.header.?.size); + } + const amount = @min(@as(usize, @intCast(self.availablePosition -% self.currentReadPosition)), self.header.?.size - self.protocolBuffer.items.len); + if(self.availablePosition -% self.currentReadPosition == 0) return; + + self.buffer.dequeueSlice(self.protocolBuffer.addManyAssumeCapacity(amount)) catch unreachable; + self.currentReadPosition +%= @intCast(amount); + if(self.protocolBuffer.items.len != self.header.?.size) return; + + const protocolIndex = self.header.?.protocolIndex; + self.header = null; + const protocolReceive = Protocols.list[protocolIndex] orelse return error.Invalid; + + if(Protocols.isAsynchronous[protocolIndex]) { + ProtocolTask.schedule(conn, protocolIndex, self.protocolBuffer.items); + } else { + var reader = utils.BinaryReader.init(self.protocolBuffer.items); + try protocolReceive(conn, &reader); + } + + _ = 
Protocols.bytesReceived[protocolIndex].fetchAdd(self.protocolBuffer.items.len, .monotonic); + self.protocolBuffer.clearRetainingCapacity(); + if(self.protocolBuffer.items.len > 1 << 24) { + self.protocolBuffer.shrinkAndFree(main.globalAllocator, 1 << 24); + } + } + } + + const ReceiveStatus = enum { + accepted, + rejected, + }; + + pub fn receive(self: *ReceiveBuffer, conn: *Connection, start: SequenceIndex, data: []const u8) !ReceiveStatus { + const len: SequenceIndex = @intCast(data.len); + if(start -% self.availablePosition < 0) return .accepted; // We accepted it in the past. + const offset: usize = @intCast(start -% self.currentReadPosition); + self.buffer.insertSliceAtOffset(data, offset) catch return .rejected; + self.ranges.addRange(main.globalAllocator, .{.start = start, .len = len}); + if(start == self.availablePosition) { + try self.collectRangesAndExecuteProtocols(conn); + } + return .accepted; + } + }; + + const SendBuffer = struct { // MARK: SendBuffer + const Range = struct { + start: SequenceIndex, + len: SequenceIndex, + timestamp: i64, + wasResent: bool = false, + wasResentAsFirstPacket: bool = false, + considerForCongestionControl: bool, + + fn compareTime(_: void, a: Range, b: Range) std.math.Order { + if(a.timestamp == b.timestamp) return .eq; + if(a.timestamp -% b.timestamp > 0) return .gt; + return .lt; + } + }; + unconfirmedRanges: std.PriorityQueue(Range, void, Range.compareTime), + lostRanges: main.utils.CircularBufferQueue(Range), + buffer: main.utils.CircularBufferQueue(u8), + fullyConfirmedIndex: SequenceIndex, + highestSentIndex: SequenceIndex, + nextIndex: SequenceIndex, + lastUnsentTime: i64, + + pub fn init(index: SequenceIndex) SendBuffer { + return .{ + .unconfirmedRanges = .init(main.globalAllocator.allocator, {}), + .lostRanges = .init(main.globalAllocator, 1 << 10), + .buffer = .init(main.globalAllocator, 1 << 20), + .fullyConfirmedIndex = index, + .highestSentIndex = index, + .nextIndex = index, + .lastUnsentTime = 
networkTimestamp(), + }; + } + + pub fn deinit(self: SendBuffer) void { + self.unconfirmedRanges.deinit(); + self.lostRanges.deinit(); + self.buffer.deinit(); + } + + pub fn insertMessage(self: *SendBuffer, protocolIndex: u8, data: []const u8, time: i64) !void { + if(self.highestSentIndex == self.fullyConfirmedIndex) { + self.lastUnsentTime = time; + } + if(data.len + self.buffer.len > std.math.maxInt(SequenceIndex)) return error.OutOfMemory; + self.buffer.enqueue(protocolIndex); + self.nextIndex +%= 1; + _ = internalHeaderOverhead.fetchAdd(1, .monotonic); + const bits = 1 + if(data.len == 0) 0 else std.math.log2_int(usize, data.len); + const bytes = std.math.divCeil(usize, bits, 7) catch unreachable; + for(0..bytes) |i| { + const shift = 7*(bytes - i - 1); + const byte = (data.len >> @intCast(shift) & 0x7f) | if(i == bytes - 1) @as(u8, 0) else 0x80; + self.buffer.enqueue(@intCast(byte)); + self.nextIndex +%= 1; + _ = internalHeaderOverhead.fetchAdd(1, .monotonic); + } + self.buffer.enqueueSlice(data); + self.nextIndex +%= @intCast(data.len); + } + + const ReceiveConfirmationResult = struct { + timestamp: i64, + packetLen: SequenceIndex, + considerForCongestionControl: bool, + }; + + pub fn receiveConfirmationAndGetTimestamp(self: *SendBuffer, start: SequenceIndex) ?ReceiveConfirmationResult { + var result: ?ReceiveConfirmationResult = null; + for(self.unconfirmedRanges.items, 0..) 
|range, i| { + if(range.start == start) { + result = .{ + .timestamp = range.timestamp, + .considerForCongestionControl = range.considerForCongestionControl, + .packetLen = range.len, + }; + _ = self.unconfirmedRanges.removeIndex(i); + break; + } + } + var smallestUnconfirmed = self.highestSentIndex; + for(self.unconfirmedRanges.items) |range| { + if(smallestUnconfirmed -% range.start > 0) { + smallestUnconfirmed = range.start; + } + } + for(0..self.lostRanges.len) |i| { + const range = self.lostRanges.getAtOffset(i) catch unreachable; + if(smallestUnconfirmed -% range.start > 0) { + smallestUnconfirmed = range.start; + } + } + self.buffer.discard(@intCast(smallestUnconfirmed -% self.fullyConfirmedIndex)) catch unreachable; + self.fullyConfirmedIndex = smallestUnconfirmed; + return result; + } + + pub fn checkForLosses(self: *SendBuffer, time: i64, retransmissionTimeout: i64) LossStatus { + var hadLoss: bool = false; + var hadDoubleLoss: bool = false; + while(true) { + var range = self.unconfirmedRanges.peek() orelse break; + if(range.timestamp +% retransmissionTimeout -% time >= 0) break; + _ = self.unconfirmedRanges.remove(); + if(self.fullyConfirmedIndex == range.start) { + // In TCP effectively only the second loss of the lowest unconfirmed packet is counted for congestion control + // This decreases the chance of triggering congestion control from random packet loss + if(range.wasResentAsFirstPacket) hadDoubleLoss = true; + hadLoss = true; + range.wasResentAsFirstPacket = true; + } + range.wasResent = true; + self.lostRanges.enqueue(range); + _ = packetsResent.fetchAdd(1, .monotonic); + } + if(hadDoubleLoss) return .doubleLoss; + if(hadLoss) return .singleLoss; + return .noLoss; + } + + pub fn getNextPacketToSend(self: *SendBuffer, byteIndex: *SequenceIndex, buf: []u8, time: i64, considerForCongestionControl: bool, allowedDelay: i64) ?usize { + self.unconfirmedRanges.ensureUnusedCapacity(1) catch unreachable; + // Resend old packet: + 
if(self.lostRanges.dequeue()) |_range| { + var range = _range; + if(range.len > buf.len) { // MTU changed → split the data + self.lostRanges.enqueue_back(.{ + .start = range.start +% @as(SequenceIndex, @intCast(buf.len)), + .len = range.len - @as(SequenceIndex, @intCast(buf.len)), + .timestamp = range.timestamp, + .considerForCongestionControl = range.considerForCongestionControl, + }); + range.len = @intCast(buf.len); + } + + self.buffer.getSliceAtOffset(@intCast(range.start -% self.fullyConfirmedIndex), buf[0..@intCast(range.len)]) catch unreachable; + range.timestamp = time; + byteIndex.* = range.start; + self.unconfirmedRanges.add(range) catch unreachable; + return @intCast(range.len); + } + + if(self.highestSentIndex == self.nextIndex) return null; + if(self.highestSentIndex +% @as(i32, @intCast(buf.len)) -% self.fullyConfirmedIndex > receiveBufferSize) return null; + // Send new packet: + const len: SequenceIndex = @min(self.nextIndex -% self.highestSentIndex, @as(i32, @intCast(buf.len))); + if(len < buf.len and time -% self.lastUnsentTime < allowedDelay) return null; + + self.buffer.getSliceAtOffset(@intCast(self.highestSentIndex -% self.fullyConfirmedIndex), buf[0..@intCast(len)]) catch unreachable; + byteIndex.* = self.highestSentIndex; + self.unconfirmedRanges.add(.{ + .start = self.highestSentIndex, + .len = len, + .timestamp = time, + .considerForCongestionControl = considerForCongestionControl, + }) catch unreachable; + self.highestSentIndex +%= len; + return @intCast(len); + } + }; + + const Channel = struct { // MARK: Channel + receiveBuffer: ReceiveBuffer, + sendBuffer: SendBuffer, + allowedDelay: i64, + channelId: ChannelId, + + pub fn init(sequenceIndex: SequenceIndex, delay: i64, id: ChannelId) Channel { + return .{ + .receiveBuffer = .init(), + .sendBuffer = .init(sequenceIndex), + .allowedDelay = delay, + .channelId = id, + }; + } + + pub fn deinit(self: *Channel) void { + self.receiveBuffer.deinit(); + self.sendBuffer.deinit(); + } + + pub fn 
connect(self: *Channel, remoteStart: SequenceIndex) void { + std.debug.assert(self.receiveBuffer.buffer.len == 0); + self.receiveBuffer.availablePosition = remoteStart; + self.receiveBuffer.currentReadPosition = remoteStart; + } + + pub fn receive(self: *Channel, conn: *Connection, start: SequenceIndex, data: []const u8) !ReceiveBuffer.ReceiveStatus { + return self.receiveBuffer.receive(conn, start, data); + } + + pub fn send(self: *Channel, protocolIndex: u8, data: []const u8, time: i64) !void { + return self.sendBuffer.insertMessage(protocolIndex, data, time); + } + + pub fn receiveConfirmationAndGetTimestamp(self: *Channel, start: SequenceIndex) ?SendBuffer.ReceiveConfirmationResult { + return self.sendBuffer.receiveConfirmationAndGetTimestamp(start); + } + + pub fn checkForLosses(self: *Channel, conn: *Connection, time: i64) LossStatus { + const retransmissionTimeout: i64 = @intFromFloat(conn.rttEstimate + 3*conn.rttUncertainty + @as(f32, @floatFromInt(self.allowedDelay))); + return self.sendBuffer.checkForLosses(time, retransmissionTimeout); + } + + pub fn sendNextPacketAndGetSize(self: *Channel, conn: *Connection, time: i64, considerForCongestionControl: bool) ?usize { + var writer = utils.BinaryWriter.initCapacity(main.stackAllocator, conn.mtuEstimate); + defer writer.deinit(); + + writer.writeEnum(ChannelId, self.channelId); + + var byteIndex: SequenceIndex = undefined; + const packetLen = self.sendBuffer.getNextPacketToSend(&byteIndex, writer.data.items.ptr[5..writer.data.capacity], time, considerForCongestionControl, self.allowedDelay) orelse return null; + writer.writeInt(SequenceIndex, byteIndex); + _ = internalHeaderOverhead.fetchAdd(5, .monotonic); + _ = externalHeaderOverhead.fetchAdd(headerOverhead, .monotonic); + writer.data.items.len += packetLen; + + _ = packetsSent.fetchAdd(1, .monotonic); + conn.manager.send(writer.data.items, conn.remoteAddress, null); + return writer.data.items.len; + } + + pub fn getStatistics(self: *Channel, unconfirmed: 
*usize, queued: *usize) void { + for(self.sendBuffer.unconfirmedRanges.items) |range| { + unconfirmed.* += @intCast(range.len); + } + queued.* = @intCast(self.sendBuffer.nextIndex -% self.sendBuffer.highestSentIndex); + } + }; + + const ChannelId = enum(u8) { // MARK: ChannelId + lossy = 0, + fast = 1, + slow = 2, + confirmation = 3, + init = 4, + keepalive = 5, + disconnect = 6, + }; + + const ConfirmationData = struct { + channel: ChannelId, + start: SequenceIndex, + receiveTimeStamp: i64, + }; + + const ConnectionState = enum(u8) { + awaitingClientConnection, + awaitingServerResponse, + awaitingClientAcknowledgement, + connected, + disconnectDesired, + }; + + // MARK: fields manager: *ConnectionManager, user: ?*main.server.User, @@ -1249,37 +1688,33 @@ pub const Connection = struct { // MARK: Connection bruteforcingPort: bool = false, bruteForcedPortRange: u16 = 0, - streamBuffer: [maxImportantPacketSize]u8 = undefined, - streamPosition: u32 = importantHeaderSize, - messageID: u32 = 0, - packetQueue: main.utils.CircularBufferQueue(UnconfirmedPacket) = undefined, - unconfirmedPackets: main.List(UnconfirmedPacket) = undefined, - receivedPackets: [3]main.List(u32) = undefined, - __lastReceivedPackets: [65536]?[]const u8 = @splat(null), // TODO: Wait for #12215 fix. - lastReceivedPackets: []?[]const u8, // TODO: Wait for #12215 fix. 
- packetMemory: *[65536][maxImportantPacketSize]u8 = undefined, - lastIndex: u32 = 0, + lossyChannel: Channel, // TODO: Actually allow it to be lossy + fastChannel: Channel, + slowChannel: Channel, - lastIncompletePacket: u32 = 0, + hasRttEstimate: bool = false, + rttEstimate: f32 = 1000*ms, + rttUncertainty: f32 = 0.0, + lastRttSampleTime: i64, + nextPacketTimestamp: i64, + nextConfirmationTimestamp: i64, + queuedConfirmations: main.utils.CircularBufferQueue(ConfirmationData), + mtuEstimate: u16 = minMtu, - lastKeepAliveSent: u32 = 0, - lastKeepAliveReceived: u32 = 0, - otherKeepAliveReceived: u32 = 0, + bandwidthEstimateInBytesPerRtt: f32 = minMtu, + slowStart: bool = true, + relativeSendTime: i64 = 0, + relativeIdleTime: i64 = 0, - congestionControl_bandWidthSentHistory: [congestionControl_historySize]usize = @splat(0), - congestionControl_bandWidthReceivedHistory: [congestionControl_historySize]usize = @splat(0), - congestionControl_bandWidthEstimate: usize = minimumBandWidth, - congestionControl_inversebandWidth: f32 = timeUnit/minimumBandWidth, - congestionControl_lastSendTime: i64, - congestionControl_sendTimeLimit: i64, - congestionControl_bandWidthUsed: usize = 0, - congestionControl_curPosition: usize = 0, - - disconnected: Atomic(bool) = .init(false), + connectionState: Atomic(ConnectionState), handShakeState: Atomic(u8) = .init(Protocols.handShake.stepStart), handShakeWaiting: std.Thread.Condition = std.Thread.Condition{}, lastConnection: i64, + // To distinguish different connections from the same computer to avoid multiple reconnects + connectionIdentifier: i64, + remoteConnectionIdentifier: i64, + mutex: std.Thread.Mutex = .{}, pub fn init(manager: *ConnectionManager, ipPort: []const u8, user: ?*main.server.User) !*Connection { @@ -1289,25 +1724,26 @@ pub const Connection = struct { // MARK: Connection .manager = manager, .user = user, .remoteAddress = undefined, - .lastConnection = @truncate(std.time.nanoTimestamp()), - .lastReceivedPackets = 
&result.__lastReceivedPackets, // TODO: Wait for #12215 fix. - .packetMemory = main.globalAllocator.create([65536][maxImportantPacketSize]u8), - .congestionControl_lastSendTime = @truncate(std.time.nanoTimestamp()), - .congestionControl_sendTimeLimit = @as(i64, @truncate(std.time.nanoTimestamp())) +% timeUnit*21/20, - }; - errdefer main.globalAllocator.free(result.packetMemory); - result.unconfirmedPackets = .init(main.globalAllocator); - errdefer result.unconfirmedPackets.deinit(); - result.packetQueue = .init(main.globalAllocator, 1024); - errdefer result.packetQueue.deinit(); - result.receivedPackets = [3]main.List(u32){ - .init(main.globalAllocator), - .init(main.globalAllocator), - .init(main.globalAllocator), - }; - errdefer for(&result.receivedPackets) |*list| { - list.deinit(); + .connectionState = .init(if(user != null) .awaitingClientConnection else .awaitingServerResponse), + .lastConnection = networkTimestamp(), + .nextPacketTimestamp = networkTimestamp(), + .nextConfirmationTimestamp = networkTimestamp(), + .lastRttSampleTime = networkTimestamp() -% 10_000*ms, + .queuedConfirmations = .init(main.globalAllocator, 1024), + .lossyChannel = .init(main.random.nextInt(SequenceIndex, &main.seed), 1*ms, .lossy), + .fastChannel = .init(main.random.nextInt(SequenceIndex, &main.seed), 10*ms, .fast), + .slowChannel = .init(main.random.nextInt(SequenceIndex, &main.seed), 100*ms, .slow), + .connectionIdentifier = networkTimestamp(), + .remoteConnectionIdentifier = 0, }; + errdefer { + result.lossyChannel.deinit(); + result.fastChannel.deinit(); + result.slowChannel.deinit(); + result.queuedConfirmations.deinit(); + } + if(result.connectionIdentifier == 0) result.connectionIdentifier = 1; + var splitter = std.mem.splitScalar(u8, ipPort, ':'); const ip = splitter.first(); result.remoteAddress.ip = try Socket.resolveIP(ip); @@ -1326,379 +1762,122 @@ pub const Connection = struct { // MARK: Connection return result; } - fn reinitialize(self: *Connection) void { - 
main.utils.assertLocked(&self.mutex); - self.streamPosition = importantHeaderSize; - self.messageID = 0; - while(self.packetQueue.dequeue()) |packet| { - main.globalAllocator.free(packet.data); - } - for(self.unconfirmedPackets.items) |packet| { - main.globalAllocator.free(packet.data); - } - self.unconfirmedPackets.clearRetainingCapacity(); - self.receivedPackets[0].clearRetainingCapacity(); - self.receivedPackets[1].clearRetainingCapacity(); - self.receivedPackets[2].clearRetainingCapacity(); - self.lastIndex = 0; - self.lastIncompletePacket = 0; - self.handShakeState = .init(Protocols.handShake.stepStart); - } - pub fn deinit(self: *Connection) void { self.disconnect(); self.manager.finishCurrentReceive(); // Wait until all currently received packets are done. - for(self.unconfirmedPackets.items) |packet| { - main.globalAllocator.free(packet.data); - } - self.unconfirmedPackets.deinit(); - while(self.packetQueue.dequeue()) |packet| { - main.globalAllocator.free(packet.data); - } - self.packetQueue.deinit(); - self.receivedPackets[0].deinit(); - self.receivedPackets[1].deinit(); - self.receivedPackets[2].deinit(); - main.globalAllocator.destroy(self.packetMemory); + self.lossyChannel.deinit(); + self.fastChannel.deinit(); + self.slowChannel.deinit(); + self.queuedConfirmations.deinit(); main.globalAllocator.destroy(self); } - fn trySendingPacket(self: *Connection, data: []const u8) bool { - std.debug.assert(data[0] == Protocols.important); - const curTime: i64 = @truncate(std.time.nanoTimestamp()); - if(curTime -% self.congestionControl_lastSendTime > 0) { - self.congestionControl_lastSendTime = curTime; - } - const shouldSend = self.congestionControl_bandWidthUsed < self.congestionControl_bandWidthEstimate and self.congestionControl_lastSendTime -% self.congestionControl_sendTimeLimit < 0; - if(shouldSend) { - _ = packetsSent.fetchAdd(1, .monotonic); - self.manager.send(data, self.remoteAddress, self.congestionControl_lastSendTime); - const packetSize = data.len 
+ headerOverhead; - self.congestionControl_lastSendTime +%= @intFromFloat(@as(f32, @floatFromInt(packetSize))*self.congestionControl_inversebandWidth); - self.congestionControl_bandWidthUsed += packetSize; - } - return shouldSend; - } + pub fn send(self: *Connection, comptime channel: ChannelId, protocolIndex: u8, data: []const u8) void { + _ = Protocols.bytesSent[protocolIndex].fetchAdd(data.len, .monotonic); + self.mutex.lock(); + defer self.mutex.unlock(); - fn flush(self: *Connection) void { - main.utils.assertLocked(&self.mutex); - if(self.streamPosition == importantHeaderSize) return; // Don't send empty packets. - // Fill the header: - self.streamBuffer[0] = Protocols.important; - const id = self.messageID; - self.messageID += 1; - std.mem.writeInt(u32, self.streamBuffer[1..5], id, .big); - - const packet = UnconfirmedPacket{ - .data = main.globalAllocator.dupe(u8, self.streamBuffer[0..self.streamPosition]), - .lastKeepAliveSentBefore = self.lastKeepAliveSent, - .id = id, + _ = switch(channel) { + .lossy => self.lossyChannel.send(protocolIndex, data, networkTimestamp()), + .fast => self.fastChannel.send(protocolIndex, data, networkTimestamp()), + .slow => self.slowChannel.send(protocolIndex, data, networkTimestamp()), + else => comptime unreachable, + } catch { + std.log.err("Cannot send any more packets. 
Disconnecting", .{}); + self.disconnect(); }; - if(self.trySendingPacket(packet.data)) { - self.unconfirmedPackets.append(packet); - } else { - self.packetQueue.enqueue(packet); - } - self.streamPosition = importantHeaderSize; - } - - fn writeByteToStream(self: *Connection, data: u8) void { - self.streamBuffer[self.streamPosition] = data; - self.streamPosition += 1; - if(self.streamPosition == self.streamBuffer.len) { - self.flush(); - } - } - - pub fn sendImportant(self: *Connection, id: u8, data: []const u8) void { - self.mutex.lock(); - defer self.mutex.unlock(); - - if(self.disconnected.load(.unordered)) return; - self.writeByteToStream(id); - var processedLength = data.len; - while(processedLength > 0x7f) { - self.writeByteToStream(@as(u8, @intCast(processedLength & 0x7f)) | 0x80); - processedLength >>= 7; - } - self.writeByteToStream(@intCast(processedLength & 0x7f)); - - var remaining: []const u8 = data; - while(remaining.len != 0) { - const copyableSize = @min(remaining.len, self.streamBuffer.len - self.streamPosition); - @memcpy(self.streamBuffer[self.streamPosition..][0..copyableSize], remaining[0..copyableSize]); - remaining = remaining[copyableSize..]; - self.streamPosition += @intCast(copyableSize); - if(self.streamPosition == self.streamBuffer.len) { - self.flush(); - } - } - } - - pub fn sendUnimportant(self: *Connection, id: u8, data: []const u8) void { - self.mutex.lock(); - defer self.mutex.unlock(); - - if(self.disconnected.load(.unordered)) return; - std.debug.assert(data.len + 1 < maxPacketSize); - const fullData = main.stackAllocator.alloc(u8, data.len + 1); - defer main.stackAllocator.free(fullData); - fullData[0] = id; - @memcpy(fullData[1..], data); - self.manager.send(fullData, self.remoteAddress, null); - } - - fn receiveKeepAlive(self: *Connection, data: []const u8) !void { - std.debug.assert(self.manager.threadId == std.Thread.getCurrentId()); - if(data.len == 0) return; // This is sent when brute forcing the port. 
- self.mutex.lock(); - defer self.mutex.unlock(); - - var reader = utils.BinaryReader.init(data); - - self.otherKeepAliveReceived = try reader.readInt(u32); - self.lastKeepAliveReceived = try reader.readInt(u32); - - while(reader.remaining.len > 0) { - const start = try reader.readInt(u32); - const len = try reader.readInt(u32); - - var j: usize = 0; - while(j < self.unconfirmedPackets.items.len) { - const diff = self.unconfirmedPackets.items[j].id -% start; - if(diff < len) { - const index = self.unconfirmedPackets.items[j].lastKeepAliveSentBefore & congestionControl_historyMask; - self.congestionControl_bandWidthReceivedHistory[index] += self.unconfirmedPackets.items[j].data.len + headerOverhead; - main.globalAllocator.free(self.unconfirmedPackets.items[j].data); - _ = self.unconfirmedPackets.swapRemove(j); - } else { - j += 1; - } - } - } - } - - fn sendKeepAlive(self: *Connection) void { - std.debug.assert(self.manager.threadId == std.Thread.getCurrentId()); - self.mutex.lock(); - defer self.mutex.unlock(); - - var runLengthEncodingStarts = main.List(u32).init(main.stackAllocator); - defer runLengthEncodingStarts.deinit(); - var runLengthEncodingLengths = main.List(u32).init(main.stackAllocator); - defer runLengthEncodingLengths.deinit(); - - for(self.receivedPackets) |list| { - for(list.items) |packetID| { - var leftRegion: ?u32 = null; - var rightRegion: ?u32 = null; - for(runLengthEncodingStarts.items, runLengthEncodingLengths.items, 0..) 
|start, length, reg| { - const diff = packetID -% start; - if(diff < length) continue; - if(diff == length) { - leftRegion = @intCast(reg); - } - if(diff == std.math.maxInt(u32)) { - rightRegion = @intCast(reg); - } - } - if(leftRegion) |left| { - if(rightRegion) |right| { - // Needs to combine the regions: - runLengthEncodingLengths.items[left] += runLengthEncodingLengths.items[right] + 1; - _ = runLengthEncodingStarts.swapRemove(right); - _ = runLengthEncodingLengths.swapRemove(right); - } else { - runLengthEncodingLengths.items[left] += 1; - } - } else if(rightRegion) |right| { - runLengthEncodingStarts.items[right] -= 1; - runLengthEncodingLengths.items[right] += 1; - } else { - runLengthEncodingStarts.append(packetID); - runLengthEncodingLengths.append(1); - } - } - } - { // Cycle the receivedPackets lists: - const putBackToFront: main.List(u32) = self.receivedPackets[self.receivedPackets.len - 1]; - var i: u32 = self.receivedPackets.len - 1; - while(i >= 1) : (i -= 1) { - self.receivedPackets[i] = self.receivedPackets[i - 1]; - } - self.receivedPackets[0] = putBackToFront; - self.receivedPackets[0].clearRetainingCapacity(); - } - var writer = utils.BinaryWriter.initCapacity(main.stackAllocator, runLengthEncodingStarts.items.len*8 + 9); - defer writer.deinit(); - - writer.writeInt(u8, Protocols.keepAlive); - writer.writeInt(u32, self.lastKeepAliveSent); - writer.writeInt(u32, self.otherKeepAliveReceived); - - for(runLengthEncodingStarts.items, 0..) 
|_, i| { - writer.writeInt(u32, runLengthEncodingStarts.items[i]); - writer.writeInt(u32, runLengthEncodingLengths.items[i]); - } - self.manager.send(writer.data.items, self.remoteAddress, null); - - // Congestion control: - self.congestionControl_bandWidthSentHistory[self.lastKeepAliveSent & congestionControl_historyMask] = self.congestionControl_bandWidthUsed; - self.lastKeepAliveSent += 1; - self.congestionControl_bandWidthReceivedHistory[self.lastKeepAliveSent & congestionControl_historyMask] = 0; - //self.congestionControl_bandWidthUsed = 0; - var maxBandWidth: usize = minimumBandWidth; - var dataSentAtMaxBandWidth: usize = minimumBandWidth; - var maxDataSent: usize = 0; - { - var i: usize = self.lastKeepAliveReceived -% 1 & congestionControl_historyMask; - while(i != self.lastKeepAliveReceived -% 1 & congestionControl_historyMask) : (i = i -% 1 & congestionControl_historyMask) { - const dataSent: usize = self.congestionControl_bandWidthSentHistory[i]; - const dataReceived: usize = self.congestionControl_bandWidthReceivedHistory[i]; - if(dataReceived > maxBandWidth) { - maxBandWidth = dataReceived; - dataSentAtMaxBandWidth = dataSent; - } - maxDataSent = @max(maxDataSent, dataSent); - if(dataSent > dataReceived + dataReceived/64) { // Only look into the history until a packet loss occured to react fast to sudden bandwidth reductions. 
- break; - } - } - } - for(0..congestionControl_historySize) |i| { - if(self.congestionControl_bandWidthReceivedHistory[i] > maxBandWidth) { - maxBandWidth = self.congestionControl_bandWidthReceivedHistory[i]; - dataSentAtMaxBandWidth = self.congestionControl_bandWidthSentHistory[i]; - } - maxDataSent = @max(maxDataSent, self.congestionControl_bandWidthSentHistory[i]); - } - - if(maxBandWidth == dataSentAtMaxBandWidth and maxDataSent < maxBandWidth + maxBandWidth/64) { // Startup phase → Try to ramp up fast - self.congestionControl_bandWidthEstimate = maxBandWidth*2; - } else { - self.congestionControl_bandWidthEstimate = maxBandWidth + maxBandWidth/64; - if(dataSentAtMaxBandWidth < maxBandWidth + maxBandWidth/128) { // Ramp up faster - self.congestionControl_bandWidthEstimate += maxBandWidth/16; - } - } - self.congestionControl_inversebandWidth = timeUnit/@as(f32, @floatFromInt(self.congestionControl_bandWidthEstimate)); - self.congestionControl_bandWidthUsed = 0; - self.congestionControl_sendTimeLimit = @as(i64, @truncate(std.time.nanoTimestamp())) + timeUnit*21/20; - - // Resend packets that didn't receive confirmation within the last 2 keep-alive signals. - for(self.unconfirmedPackets.items) |*packet| { - if(self.lastKeepAliveReceived -% @as(i33, packet.lastKeepAliveSentBefore) >= 2) { - if(self.trySendingPacket(packet.data)) { - _ = packetsResent.fetchAdd(1, .monotonic); - packet.lastKeepAliveSentBefore = self.lastKeepAliveSent; - } else break; - } - } - while(true) { - if(self.packetQueue.peek()) |_packet| { - if(self.trySendingPacket(_packet.data)) { - std.debug.assert(std.meta.eql(self.packetQueue.dequeue(), _packet)); // Remove it from the queue - var packet = _packet; - packet.lastKeepAliveSentBefore = self.lastKeepAliveSent; - self.unconfirmedPackets.append(packet); - } else break; - } else break; - } - self.flush(); - if(self.bruteforcingPort) { - // This is called every 100 ms, so if I send 10 requests it shouldn't be too bad. 
- for(0..5) |_| { - const data = [1]u8{0}; - if(self.remoteAddress.port +% self.bruteForcedPortRange != 0) { - self.manager.send(&data, Address{.ip = self.remoteAddress.ip, .port = self.remoteAddress.port +% self.bruteForcedPortRange}, null); - } - if(self.remoteAddress.port - self.bruteForcedPortRange != 0) { - self.manager.send(&data, Address{.ip = self.remoteAddress.ip, .port = self.remoteAddress.port -% self.bruteForcedPortRange}, null); - } - self.bruteForcedPortRange +%= 1; - } - } } pub fn isConnected(self: *Connection) bool { self.mutex.lock(); defer self.mutex.unlock(); - return self.otherKeepAliveReceived != 0; + return self.connectionState.load(.unordered) == .connected; } - fn collectPackets(self: *Connection) !void { - std.debug.assert(self.manager.threadId == std.Thread.getCurrentId()); - while(true) { - var id = self.lastIncompletePacket; - var receivedPacket = self.lastReceivedPackets[id & 65535] orelse return; - var newIndex = self.lastIndex; - const protocol = receivedPacket[newIndex]; - newIndex += 1; - if(self.manager.world == null and self.user == null and protocol != Protocols.handShake.id) - return; + fn handlePacketLoss(self: *Connection, loss: LossStatus) void { + if(loss == .noLoss) return; + self.slowStart = false; + if(loss == .doubleLoss) { + self.rttEstimate *= 1.5; + self.bandwidthEstimateInBytesPerRtt /= 2; + self.bandwidthEstimateInBytesPerRtt = @max(self.bandwidthEstimateInBytesPerRtt, minMtu); + } + } - // Determine the next packet length: - var len: u32 = 0; - var shift: u5 = 0; - while(true) { - if(newIndex == receivedPacket.len) { - newIndex = 0; - id += 1; - receivedPacket = self.lastReceivedPackets[id & 65535] orelse return; - } - const nextByte = receivedPacket[newIndex]; - newIndex += 1; - len |= @as(u32, @intCast(nextByte & 0x7f)) << shift; - if(nextByte & 0x80 != 0) { - shift += 7; - } else { - break; - } - } + fn increaseCongestionBandwidth(self: *Connection, packetLen: SequenceIndex) void { + const fullPacketLen: f32 = 
@floatFromInt(packetLen + headerOverhead); + if(self.slowStart) { + self.bandwidthEstimateInBytesPerRtt += fullPacketLen; + } else { + self.bandwidthEstimateInBytesPerRtt += fullPacketLen/self.bandwidthEstimateInBytesPerRtt*@as(f32, @floatFromInt(self.mtuEstimate)) + fullPacketLen/100.0; + } + } - // Check if there is enough data available to fill the packets needs: - var dataAvailable = receivedPacket.len - newIndex; - var idd = id + 1; - while(dataAvailable < len) : (idd += 1) { - const otherPacket = self.lastReceivedPackets[idd & 65535] orelse return; - dataAvailable += otherPacket.len; - } + fn receiveConfirmationPacket(self: *Connection, reader: *utils.BinaryReader, timestamp: i64) !void { + self.mutex.lock(); + defer self.mutex.unlock(); - // Copy the data to an array: - const data = main.stackAllocator.alloc(u8, len); - defer main.stackAllocator.free(data); - var remaining = data[0..]; - while(remaining.len != 0) { - dataAvailable = @min(self.lastReceivedPackets[id & 65535].?.len - newIndex, remaining.len); - @memcpy(remaining[0..dataAvailable], self.lastReceivedPackets[id & 65535].?[newIndex .. 
newIndex + dataAvailable]); - newIndex += @intCast(dataAvailable); - remaining = remaining[dataAvailable..]; - if(newIndex == self.lastReceivedPackets[id & 65535].?.len) { - id += 1; - newIndex = 0; - } + var minRtt: f32 = std.math.floatMax(f32); + var maxRtt: f32 = 1000; + var sumRtt: f32 = 0; + var numRtt: f32 = 0; + while(reader.remaining.len != 0) { + const channel = try reader.readEnum(ChannelId); + const timeOffset = 2*@as(i64, try reader.readInt(u16)); + const start = try reader.readInt(SequenceIndex); + const confirmationResult = switch(channel) { + .lossy => self.lossyChannel.receiveConfirmationAndGetTimestamp(start) orelse continue, + .fast => self.fastChannel.receiveConfirmationAndGetTimestamp(start) orelse continue, + .slow => self.slowChannel.receiveConfirmationAndGetTimestamp(start) orelse continue, + else => return error.Invalid, + }; + const rtt: f32 = @floatFromInt(@max(1, timestamp -% confirmationResult.timestamp -% timeOffset)); + numRtt += 1; + sumRtt += rtt; + minRtt = @min(minRtt, rtt); + maxRtt = @max(maxRtt, rtt); + if(confirmationResult.considerForCongestionControl) { + self.increaseCongestionBandwidth(confirmationResult.packetLen); } - while(self.lastIncompletePacket != id) : (self.lastIncompletePacket += 1) { - self.lastReceivedPackets[self.lastIncompletePacket & 65535] = null; - } - self.lastIndex = newIndex; - _ = bytesReceived[protocol].fetchAdd(data.len + 1 + (7 + std.math.log2_int(usize, 1 + data.len))/7, .monotonic); - if(Protocols.list[protocol]) |prot| { - if(Protocols.isAsynchronous[protocol]) { - ProtocolTask.schedule(self, protocol, data); - } else { - var reader = utils.BinaryReader.init(data); - try prot(self, &reader); - } - } else { - std.log.err("Received unknown important protocol with id {}", .{protocol}); + } + if(numRtt > 0) { + // Taken mostly from RFC 6298 with some minor changes + const averageRtt = sumRtt/numRtt; + const largestDifference = @max(maxRtt - averageRtt, averageRtt - minRtt, @abs(maxRtt - 
self.rttEstimate), @abs(self.rttEstimate - minRtt)); + const timeDifference: f32 = @floatFromInt(timestamp -% self.lastRttSampleTime); + const alpha = 1.0 - std.math.pow(f32, 7.0/8.0, timeDifference/self.rttEstimate); + const beta = 1.0 - std.math.pow(f32, 3.0/4.0, timeDifference/self.rttEstimate); + self.rttEstimate = (1 - alpha)*self.rttEstimate + alpha*averageRtt; + self.rttUncertainty = (1 - beta)*self.rttUncertainty + beta*largestDifference; + self.lastRttSampleTime = timestamp; + if(!self.hasRttEstimate) { // Kill the 1 second delay caused by the first packet + self.nextPacketTimestamp = timestamp; + self.hasRttEstimate = true; } } } + fn sendConfirmationPacket(self: *Connection, timestamp: i64) void { + std.debug.assert(self.manager.threadId == std.Thread.getCurrentId()); + var writer = utils.BinaryWriter.initCapacity(main.stackAllocator, self.mtuEstimate); + defer writer.deinit(); + + writer.writeEnum(ChannelId, .confirmation); + + while(self.queuedConfirmations.dequeue()) |confirmation| { + writer.writeEnum(ChannelId, confirmation.channel); + writer.writeInt(u16, std.math.lossyCast(u16, @divTrunc(timestamp -% confirmation.receiveTimeStamp, 2))); + writer.writeInt(SequenceIndex, confirmation.start); + if(writer.data.capacity - writer.data.items.len < @sizeOf(ChannelId) + @sizeOf(u16) + @sizeOf(SequenceIndex)) break; + } + + _ = internalMessageOverhead.fetchAdd(writer.data.items.len + headerOverhead, .monotonic); + self.manager.send(writer.data.items, self.remoteAddress, null); + } + pub fn receive(self: *Connection, data: []const u8) void { - self.flawedReceive(data) catch |err| { + self.tryReceive(data) catch |err| { std.log.err("Got error while processing received network data: {s}", .{@errorName(err)}); if(@errorReturnTrace()) |trace| { std.log.info("{}", .{trace}); @@ -1707,64 +1886,198 @@ pub const Connection = struct { // MARK: Connection }; } - pub fn flawedReceive(self: *Connection, data: []const u8) !void { + fn tryReceive(self: *Connection, data: 
[]const u8) !void { std.debug.assert(self.manager.threadId == std.Thread.getCurrentId()); - const protocol = data[0]; - if(self.handShakeState.load(.monotonic) != Protocols.handShake.stepComplete and protocol != Protocols.handShake.id and protocol != Protocols.keepAlive and protocol != Protocols.important) { - return; // Reject all non-handshake packets until the handshake is done. - } - self.lastConnection = @truncate(std.time.nanoTimestamp()); - _ = bytesReceived[protocol].fetchAdd(data.len + 20 + 8, .monotonic); // Including IP header and udp header; - _ = packetsReceived[protocol].fetchAdd(1, .monotonic); - if(protocol == Protocols.important) { - const id = std.mem.readInt(u32, data[1..5], .big); - if(self.handShakeState.load(.monotonic) == Protocols.handShake.stepComplete and id == 0) { // Got a new "first" packet from client. So the client tries to reconnect, but we still think it's connected. - if(self.user) |user| { - user.reinitialize(); - self.mutex.lock(); - defer self.mutex.unlock(); - self.reinitialize(); - } else { - std.log.err("Server reconnected?", .{}); - self.disconnect(); + var reader = utils.BinaryReader.init(data); + const channel = try reader.readEnum(ChannelId); + if(channel == .init) { + const remoteConnectionIdentifier = try reader.readInt(i64); + const isAcknowledgement = reader.remaining.len == 0; + if(isAcknowledgement) { + switch(self.connectionState.load(.monotonic)) { + .awaitingClientAcknowledgement => { + if(self.remoteConnectionIdentifier == remoteConnectionIdentifier) { + _ = self.connectionState.cmpxchgStrong(.awaitingClientAcknowledgement, .connected, .monotonic, .monotonic); + } + }, + else => {}, } - } - if(id - @as(i33, self.lastIncompletePacket) >= 65536) { - std.log.warn("Many incomplete packets. 
Cannot process any more packets for now.", .{}); return; } - self.receivedPackets[0].append(id); - if(id < self.lastIncompletePacket or self.lastReceivedPackets[id & 65535] != null) { - return; // Already received the package in the past. + const lossyStart = try reader.readInt(SequenceIndex); + const fastStart = try reader.readInt(SequenceIndex); + const slowStart = try reader.readInt(SequenceIndex); + switch(self.connectionState.load(.monotonic)) { + .awaitingClientConnection => { + self.lossyChannel.connect(lossyStart); + self.fastChannel.connect(fastStart); + self.slowChannel.connect(slowStart); + _ = self.connectionState.cmpxchgStrong(.awaitingClientConnection, .awaitingClientAcknowledgement, .monotonic, .monotonic); + self.remoteConnectionIdentifier = remoteConnectionIdentifier; + }, + .awaitingServerResponse => { + self.lossyChannel.connect(lossyStart); + self.fastChannel.connect(fastStart); + self.slowChannel.connect(slowStart); + _ = self.connectionState.cmpxchgStrong(.awaitingServerResponse, .connected, .monotonic, .monotonic); + self.remoteConnectionIdentifier = remoteConnectionIdentifier; + }, + .awaitingClientAcknowledgement => {}, + .connected => { + if(self.remoteConnectionIdentifier != remoteConnectionIdentifier) { // Reconnection attempt + if(self.user) |user| { + self.manager.removeConnection(self); + main.server.disconnect(user); + } else { + std.log.err("Server reconnected?", .{}); + self.disconnect(); + } + return; + } + }, + .disconnectDesired => {}, } - const temporaryMemory: []u8 = (&self.packetMemory[id & 65535])[0 .. 
data.len - importantHeaderSize]; - @memcpy(temporaryMemory, data[importantHeaderSize..]); - self.lastReceivedPackets[id & 65535] = temporaryMemory; - // Check if a message got completed: - try self.collectPackets(); - } else if(protocol == Protocols.keepAlive) { - try self.receiveKeepAlive(data[1..]); - } else { - if(Protocols.list[protocol]) |prot| { - var reader = utils.BinaryReader.init(data[1..]); - try prot(self, &reader); - } else { - std.log.err("Received unknown protocol with id {}", .{protocol}); - return error.Invalid; + // Acknowledge the packet on the client: + if(self.user == null) { + var writer = utils.BinaryWriter.initCapacity(main.stackAllocator, 1 + @sizeOf(i64)); + defer writer.deinit(); + + writer.writeEnum(ChannelId, .init); + writer.writeInt(i64, self.connectionIdentifier); + + _ = internalMessageOverhead.fetchAdd(writer.data.items.len + headerOverhead, .monotonic); + self.manager.send(writer.data.items, self.remoteAddress, null); } + return; + } + if(self.connectionState.load(.monotonic) != .connected) return; // Reject all non-handshake packets until the handshake is done. 
+ switch(channel) { + .lossy => { + const start = try reader.readInt(SequenceIndex); + if(try self.lossyChannel.receive(self, start, reader.remaining) == .accepted) { + self.queuedConfirmations.enqueue(.{ + .channel = channel, + .start = start, + .receiveTimeStamp = networkTimestamp(), + }); + } + }, + .fast => { + const start = try reader.readInt(SequenceIndex); + if(try self.fastChannel.receive(self, start, reader.remaining) == .accepted) { + self.queuedConfirmations.enqueue(.{ + .channel = channel, + .start = start, + .receiveTimeStamp = networkTimestamp(), + }); + } + }, + .slow => { + const start = try reader.readInt(SequenceIndex); + if(try self.slowChannel.receive(self, start, reader.remaining) == .accepted) { + self.queuedConfirmations.enqueue(.{ + .channel = channel, + .start = start, + .receiveTimeStamp = networkTimestamp(), + }); + } + }, + .confirmation => { + try self.receiveConfirmationPacket(&reader, networkTimestamp()); + }, + .init => unreachable, + .keepalive => {}, + .disconnect => { + self.disconnect(); + }, + } + self.lastConnection = networkTimestamp(); + + // TODO: Packet statistics + } + + pub fn processNextPackets(self: *Connection) void { + const timestamp = networkTimestamp(); + + switch(self.connectionState.load(.monotonic)) { + .awaitingClientConnection => { + if(timestamp -% self.nextPacketTimestamp < 0) return; + self.nextPacketTimestamp = timestamp +% 100*ms; + self.manager.send(&.{@intFromEnum(ChannelId.keepalive)}, self.remoteAddress, null); + }, + .awaitingServerResponse, .awaitingClientAcknowledgement => { + // Send the initial packet once every 100 ms. 
+ if(timestamp -% self.nextPacketTimestamp < 0) return; + self.nextPacketTimestamp = timestamp +% 100*ms; + var writer = utils.BinaryWriter.initCapacity(main.stackAllocator, 1 + @sizeOf(i64) + 3*@sizeOf(SequenceIndex)); + defer writer.deinit(); + + writer.writeEnum(ChannelId, .init); + writer.writeInt(i64, self.connectionIdentifier); + writer.writeInt(SequenceIndex, self.lossyChannel.sendBuffer.fullyConfirmedIndex); + writer.writeInt(SequenceIndex, self.fastChannel.sendBuffer.fullyConfirmedIndex); + writer.writeInt(SequenceIndex, self.slowChannel.sendBuffer.fullyConfirmedIndex); + _ = internalMessageOverhead.fetchAdd(writer.data.items.len + headerOverhead, .monotonic); + self.manager.send(writer.data.items, self.remoteAddress, null); + return; + }, + .connected => { + if(timestamp -% self.lastConnection -% settings.connectionTimeout > 0) { + std.log.info("timeout", .{}); + self.disconnect(); + return; + } + }, + .disconnectDesired => return, + } + + self.handlePacketLoss(self.lossyChannel.checkForLosses(self, timestamp)); + self.handlePacketLoss(self.fastChannel.checkForLosses(self, timestamp)); + self.handlePacketLoss(self.slowChannel.checkForLosses(self, timestamp)); + + // We don't want to send too many packets at once if there was a period of no traffic. 
+ if(timestamp -% 10*ms -% self.nextPacketTimestamp > 0) { + self.relativeIdleTime += timestamp -% 10*ms -% self.nextPacketTimestamp; + self.nextPacketTimestamp = timestamp -% 10*ms; + } + + if(self.relativeIdleTime + self.relativeSendTime > @as(i64, @intFromFloat(self.rttEstimate))) { + self.relativeIdleTime >>= 1; + self.relativeSendTime >>= 1; + } + + while(timestamp -% self.nextConfirmationTimestamp > 0 and !self.queuedConfirmations.empty()) { + self.sendConfirmationPacket(timestamp); + } + + while(timestamp -% self.nextPacketTimestamp > 0) { + // Only attempt to increase the congestion bandwidth if we actually use the bandwidth, to prevent unbounded growth + const considerForCongestionControl = @divFloor(self.relativeSendTime, 2) > self.relativeIdleTime; + const dataLen = blk: { + self.mutex.lock(); + defer self.mutex.unlock(); + if(self.lossyChannel.sendNextPacketAndGetSize(self, timestamp, considerForCongestionControl)) |dataLen| break :blk dataLen; + if(self.fastChannel.sendNextPacketAndGetSize(self, timestamp, considerForCongestionControl)) |dataLen| break :blk dataLen; + if(self.slowChannel.sendNextPacketAndGetSize(self, timestamp, considerForCongestionControl)) |dataLen| break :blk dataLen; + + break; + }; + const networkLen: f32 = @floatFromInt(dataLen + headerOverhead); + const packetTime: i64 = @intFromFloat(@max(1, networkLen/self.bandwidthEstimateInBytesPerRtt*self.rttEstimate)); + self.nextPacketTimestamp +%= packetTime; + self.relativeSendTime += packetTime; } } pub fn disconnect(self: *Connection) void { - // Send 3 disconnect packages to the other side, just to be sure. - // If all of them don't get through then there is probably a network issue anyways which would lead to a timeout. 
- Protocols.disconnect.disconnect(self); - std.time.sleep(10000000); - Protocols.disconnect.disconnect(self); - std.time.sleep(10000000); - Protocols.disconnect.disconnect(self); - self.disconnected.store(true, .unordered); + self.manager.send(&.{@intFromEnum(ChannelId.disconnect)}, self.remoteAddress, null); + self.connectionState.store(.disconnectDesired, .unordered); self.manager.removeConnection(self); + if(self.user) |user| { + main.server.disconnect(user); + } else { + main.exitToMenu(undefined); + } std.log.info("Disconnected", .{}); } }; diff --git a/src/renderer/lighting.zig b/src/renderer/lighting.zig index 3e42153a..c54fbcc5 100644 --- a/src/renderer/lighting.zig +++ b/src/renderer/lighting.zig @@ -254,7 +254,7 @@ pub const ChannelChunk = struct { } fn propagateFromNeighbor(self: *ChannelChunk, lightQueue: *main.utils.CircularBufferQueue(Entry), lights: []const Entry, lightRefreshList: *main.List(*chunk_meshing.ChunkMesh)) void { - std.debug.assert(lightQueue.startIndex == lightQueue.endIndex); + std.debug.assert(lightQueue.empty()); for(lights) |entry| { const index = chunk.getIndex(entry.x, entry.y, entry.z); var result = entry; @@ -265,7 +265,7 @@ pub const ChannelChunk = struct { } fn propagateDestructiveFromNeighbor(self: *ChannelChunk, lightQueue: *main.utils.CircularBufferQueue(Entry), lights: []const Entry, constructiveEntries: *main.ListUnmanaged(ChunkEntries), lightRefreshList: *main.List(*chunk_meshing.ChunkMesh)) main.ListUnmanaged(PositionEntry) { - std.debug.assert(lightQueue.startIndex == lightQueue.endIndex); + std.debug.assert(lightQueue.empty()); for(lights) |entry| { const index = chunk.getIndex(entry.x, entry.y, entry.z); var result = entry; diff --git a/src/server/server.zig b/src/server/server.zig index afb6429a..07f427fc 100644 --- a/src/server/server.zig +++ b/src/server/server.zig @@ -125,13 +125,6 @@ pub const User = struct { // MARK: User return self; } - pub fn reinitialize(self: *User) void { - removePlayer(self); - 
self.timeDifference = .{}; - main.globalAllocator.free(self.name); - self.name = ""; - } - pub fn deinit(self: *User) void { std.debug.assert(self.refCount.load(.monotonic) == 0); diff --git a/src/settings.zig b/src/settings.zig index 1a3f1286..a83ea283 100644 --- a/src/settings.zig +++ b/src/settings.zig @@ -5,7 +5,7 @@ const ZonElement = @import("zon.zig").ZonElement; const main = @import("main"); pub const defaultPort: u16 = 47649; -pub const connectionTimeout = 60_000_000_000; +pub const connectionTimeout = 60_000_000; pub const entityLookback: i16 = 100; diff --git a/src/utils.zig b/src/utils.zig index 4420b79a..a5315e82 100644 --- a/src/utils.zig +++ b/src/utils.zig @@ -319,13 +319,105 @@ pub fn Array3D(comptime T: type) type { // MARK: Array3D }; } +pub fn FixedSizeCircularBuffer(T: type, capacity: comptime_int) type { // MARK: FixedSizeCircularBuffer + std.debug.assert(capacity - 1 & capacity == 0 and capacity > 0); + const mask = capacity - 1; + return struct { + const Self = @This(); + mem: *[capacity]T = undefined, + startIndex: usize = 0, + len: usize = 0, + + pub fn init(allocator: NeverFailingAllocator) Self { + return .{ + .mem = allocator.create([capacity]T), + }; + } + + pub fn deinit(self: Self, allocator: NeverFailingAllocator) void { + allocator.destroy(self.mem); + } + + pub fn enqueue(self: *Self, elem: T) !void { + if(self.len >= capacity) return error.OutOfMemory; + self.mem[self.startIndex + self.len & mask] = elem; + self.len += 1; + } + + pub fn enqueueSlice(self: *Self, elems: []const T) !void { + if(elems.len + self.len > capacity) { + return error.OutOfMemory; + } + const start = self.startIndex + self.len & mask; + const end = start + elems.len; + if(end < self.mem.len) { + @memcpy(self.mem[start..end], elems); + } else { + const mid = self.mem.len - start; + @memcpy(self.mem[start..], elems[0..mid]); + @memcpy(self.mem[0 .. 
end & mask], elems[mid..]); + } + self.len += elems.len; + } + + pub fn insertSliceAtOffset(self: *Self, elems: []const T, offset: usize) !void { + if(offset + elems.len > capacity) { + return error.OutOfMemory; + } + self.len = @max(self.len, offset + elems.len); + const start = self.startIndex + offset & mask; + const end = start + elems.len; + if(end < self.mem.len) { + @memcpy(self.mem[start..end], elems); + } else { + const mid = self.mem.len - start; + @memcpy(self.mem[start..], elems[0..mid]); + @memcpy(self.mem[0 .. end & mask], elems[mid..]); + } + } + + pub fn dequeue(self: *Self) ?T { + if(self.len == 0) return null; + const result = self.mem[self.startIndex]; + self.startIndex = (self.startIndex + 1) & mask; + self.len -= 1; + return result; + } + + pub fn dequeueSlice(self: *Self, out: []T) !void { + if(out.len > self.len) return error.OutOfBounds; + const start = self.startIndex; + const end = start + out.len; + if(end < self.mem.len) { + @memcpy(out, self.mem[start..end]); + } else { + const mid = self.mem.len - start; + @memcpy(out[0..mid], self.mem[start..]); + @memcpy(out[mid..], self.mem[0 .. 
end & mask]); + } + self.startIndex = self.startIndex + out.len & mask; + self.len -= out.len; + } + + pub fn discardElements(self: *Self, n: usize) void { + self.len -= n; + self.startIndex = (self.startIndex + n) & mask; + } + + pub fn getAtOffset(self: Self, i: usize) ?T { + if(i >= self.len) return null; + return self.mem[(self.startIndex + i) & mask]; + } + }; +} + pub fn CircularBufferQueue(comptime T: type) type { // MARK: CircularBufferQueue return struct { const Self = @This(); mem: []T, mask: usize, startIndex: usize, - endIndex: usize, + len: usize, allocator: NeverFailingAllocator, pub fn init(allocator: NeverFailingAllocator, initialCapacity: usize) Self { @@ -335,7 +427,7 @@ pub fn CircularBufferQueue(comptime T: type) type { // MARK: CircularBufferQueue .mem = allocator.alloc(T, initialCapacity), .mask = initialCapacity - 1, .startIndex = 0, - .endIndex = 0, + .len = 0, .allocator = allocator, }; } @@ -344,44 +436,71 @@ pub fn CircularBufferQueue(comptime T: type) type { // MARK: CircularBufferQueue self.allocator.free(self.mem); } + pub fn reset(self: *Self) void { + self.len = 0; + } + fn increaseCapacity(self: *Self) void { const newMem = self.allocator.alloc(T, self.mem.len*2); @memcpy(newMem[0..(self.mem.len - self.startIndex)], self.mem[self.startIndex..]); - @memcpy(newMem[(self.mem.len - self.startIndex)..][0..self.endIndex], self.mem[0..self.endIndex]); + @memcpy(newMem[(self.mem.len - self.startIndex)..][0..self.startIndex], self.mem[0..self.startIndex]); self.startIndex = 0; - self.endIndex = self.mem.len; self.allocator.free(self.mem); self.mem = newMem; self.mask = self.mem.len - 1; } pub fn enqueue(self: *Self, elem: T) void { - self.mem[self.endIndex] = elem; - self.endIndex = (self.endIndex + 1) & self.mask; - if(self.endIndex == self.startIndex) { + if(self.len == self.mem.len) { self.increaseCapacity(); } + self.mem[self.startIndex + self.len & self.mask] = elem; + self.len += 1; + } + + pub fn enqueueSlice(self: *Self, elems: 
[]const T) void { + while(elems.len + self.len > self.mem.len) { + self.increaseCapacity(); + } + const start = self.startIndex + self.len & self.mask; + const end = start + elems.len; + if(end < self.mem.len) { + @memcpy(self.mem[start..end], elems); + } else { + const mid = self.mem.len - start; + @memcpy(self.mem[start..], elems[0..mid]); + @memcpy(self.mem[0 .. end & self.mask], elems[mid..]); + } + self.len += elems.len; } pub fn enqueue_back(self: *Self, elem: T) void { - self.startIndex = (self.startIndex -% 1) & self.mask; - self.mem[self.startIndex] = elem; - if(self.endIndex == self.startIndex) { + if(self.len == self.mem.len) { self.increaseCapacity(); } + self.startIndex = (self.startIndex -% 1) & self.mask; + self.mem[self.startIndex] = elem; + self.len += 1; } pub fn dequeue(self: *Self) ?T { if(self.empty()) return null; const result = self.mem[self.startIndex]; self.startIndex = (self.startIndex + 1) & self.mask; + self.len -= 1; return result; } pub fn dequeue_front(self: *Self) ?T { if(self.empty()) return null; - self.endIndex = (self.endIndex -% 1) & self.mask; - return self.mem[self.endIndex]; + self.len -= 1; + return self.mem[self.startIndex + self.len & self.mask]; + } + + pub fn discard(self: *Self, amount: usize) !void { + if(amount > self.len) return error.OutOfBounds; + self.startIndex = (self.startIndex + amount) & self.mask; + self.len -= amount; } pub fn peek(self: *Self) ?T { @@ -389,12 +508,30 @@ pub fn CircularBufferQueue(comptime T: type) type { // MARK: CircularBufferQueue return self.mem[self.startIndex]; } + pub fn getSliceAtOffset(self: Self, offset: usize, result: []T) !void { + if(offset + result.len > self.len) return error.OutOfBounds; + const start = self.startIndex + offset & self.mask; + const end = start + result.len; + if(end < self.mem.len) { + @memcpy(result, self.mem[start..end]); + } else { + const mid = self.mem.len - start; + @memcpy(result[0..mid], self.mem[start..]); + @memcpy(result[mid..], self.mem[0 .. 
end & self.mask]); + } + } + + pub fn getAtOffset(self: Self, offset: usize) !T { + if(offset >= self.len) return error.OutOfBounds; + return self.mem[(self.startIndex + offset) & self.mask]; + } + pub fn empty(self: *Self) bool { - return self.startIndex == self.endIndex; + return self.len == 0; } pub fn reachedCapacity(self: *Self) bool { - return self.startIndex == (self.endIndex + 1) & self.mask; + return self.len == self.mem.len; } }; } diff --git a/src/utils/list.zig b/src/utils/list.zig index c1272023..59c7e7bf 100644 --- a/src/utils/list.zig +++ b/src/utils/list.zig @@ -272,7 +272,7 @@ pub fn ListUnmanaged(comptime T: type) type { self.capacity = newAllocation.len; } - fn ensureFreeCapacity(self: *@This(), allocator: NeverFailingAllocator, freeCapacity: usize) void { + pub fn ensureFreeCapacity(self: *@This(), allocator: NeverFailingAllocator, freeCapacity: usize) void { if(freeCapacity + self.items.len <= self.capacity) return; self.ensureCapacity(allocator, growCapacity(self.capacity, freeCapacity + self.items.len)); }