The new network system (#1297)

* Create the foundations of the new network system.

* Fix memory leak

* Implement a simple additive-increase-multiplicative-decrease congestion control scheme.

Also fixed the locking in the send path
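
A minimal sketch of the additive-increase/multiplicative-decrease idea in Zig; `Aimd`, `mtu` and the numbers are illustrative, only `bandwidthEstimateInBytesPerRtt` matches a field visible in the diff below:

```zig
const std = @import("std");

const Aimd = struct {
    // Bytes that may be in flight per round-trip time (hypothetical starting value).
    bandwidthEstimateInBytesPerRtt: f64 = 16*1024,
    mtu: f64 = 1472,

    // Additive increase: grow the estimate a little for every round trip without loss.
    fn onRttWithoutLoss(self: *Aimd) void {
        self.bandwidthEstimateInBytesPerRtt += self.mtu;
    }

    // Multiplicative decrease: halve the estimate when loss is detected.
    fn onLoss(self: *Aimd) void {
        self.bandwidthEstimateInBytesPerRtt = @max(self.mtu, self.bandwidthEstimateInBytesPerRtt/2);
    }
};

test "loss halves the estimate" {
    var cc = Aimd{};
    cc.onRttWithoutLoss();
    cc.onLoss();
    try std.testing.expect(cc.bandwidthEstimateInBytesPerRtt < 16*1024);
}
```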

* Fix edge case for mismatch in read and available position

* fix formatting

* Reimplement the network statistics in the F6 menu

* Fix a problem with the F6 window size

* Rename size to capacity

* Implement the advanced network debug menu

* Fix a few problems

* Actually call the handlePacketLoss function

* Reduce startup delay and fix possible problems with negative RTTs

* Fix a crash when handling a block of partially received packets.

* Use a PriorityQueue for unconfirmed packets.

Also fixed a display issue in the advanced network menu
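
A sketch of how `std.PriorityQueue` can keep unconfirmed packets ordered by their scheduled resend time; the packet record and its fields are made up for illustration:

```zig
const std = @import("std");

// Illustrative packet record; the real type in network.zig is different.
const UnconfirmedPacket = struct {
    seq: u32,
    resendTime: i64,
};

fn earlierResend(_: void, a: UnconfirmedPacket, b: UnconfirmedPacket) std.math.Order {
    return std.math.order(a.resendTime, b.resendTime);
}

test "the packet that needs resending first sits at the head of the queue" {
    var unconfirmed = std.PriorityQueue(UnconfirmedPacket, void, earlierResend).init(std.testing.allocator, {});
    defer unconfirmed.deinit();

    try unconfirmed.add(.{.seq = 2, .resendTime = 300});
    try unconfirmed.add(.{.seq = 1, .resendTime = 100});
    try std.testing.expectEqual(@as(u32, 1), unconfirmed.peek().?.seq);
}
```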

* Always send confirmations, and change how packet loss is handled to avoid killing the network connection: only resend packets that fall outside the receive buffer.

* Use the length instead of the endIndex in the CircularBufferQueue

The endIndex just made everything more difficult to work with.

* Optimize CircularBufferQueue.enqueueSlice with memcpy

* Fix problem with zero-length packets, which is a valid use case

* Fix a small edge case

* fix formatting

* Add missing code to increase the capacity in enqueueSlice

* Don't send more data than the receive buffer can fit.
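
One way to state that rule as a tiny helper; the name and signature are invented, and the real code tracks this per channel:

```zig
const std = @import("std");

// Illustrative only: cap outgoing data by the space remaining in the peer's receive buffer.
fn bytesAllowedToSend(receiveBufferCapacity: usize, unconfirmedBytes: usize, wantToSend: usize) usize {
    const free = receiveBufferCapacity -| unconfirmedBytes; // saturating subtraction
    return @min(free, wantToSend);
}

test "never exceed the receive buffer" {
    try std.testing.expectEqual(@as(usize, 0), bytesAllowedToSend(1 << 16, 1 << 16, 1024));
    try std.testing.expectEqual(@as(usize, 1024), bytesAllowedToSend(1 << 16, 0, 1024));
}
```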

* Use a better data structure that handles the received ranges
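
A simplified stand-in for that structure (later commits call it RangeBuffer): it stores received [start, start+len) ranges and merges overlapping or adjacent ones. The fixed-size array is only for illustration; the real one grows dynamically:

```zig
const std = @import("std");

const Range = struct { start: u32, len: u32 };

const SimpleRangeBuffer = struct {
    ranges: [64]Range = undefined,
    count: usize = 0,

    fn add(self: *SimpleRangeBuffer, newRange: Range) void {
        var merged = newRange;
        var i: usize = 0;
        while (i < self.count) {
            const r = self.ranges[i];
            if (r.start <= merged.start + merged.len and merged.start <= r.start + r.len) {
                // Overlapping or adjacent: absorb the stored range into the new one.
                const end = @max(merged.start + merged.len, r.start + r.len);
                merged.start = @min(merged.start, r.start);
                merged.len = end - merged.start;
                // Remove the absorbed range by swapping in the last one and re-checking it.
                self.count -= 1;
                self.ranges[i] = self.ranges[self.count];
            } else {
                i += 1;
            }
        }
        std.debug.assert(self.count < self.ranges.len);
        self.ranges[self.count] = merged;
        self.count += 1;
    }
};

test "adjacent ranges get merged" {
    var buf = SimpleRangeBuffer{};
    buf.add(.{.start = 0, .len = 10});
    buf.add(.{.start = 10, .len = 5});
    try std.testing.expectEqual(@as(usize, 1), buf.count);
    try std.testing.expectEqual(@as(u32, 15), buf.ranges[0].len);
}
```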

* Use memcpy in more circular buffer functions

* Send a keepalive if the connection hasn't been established but the player was invited by the server

* Implement disconnecting for the new protocol

* Reimplement timeout

* Only add new connections when the other side actually sent an init packet.

Otherwise the connections stay in the queue and need keepalives.

Also moved the code from the invite window to network.zig, where it belongs.

* Don't allow queueing more than 2 GiB of data (i32 limit)
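
An illustrative guard (the function name is invented): queueing is rejected once the total would no longer fit into an i32, which is where the 2 GiB limit comes from:

```zig
const std = @import("std");

// The queued total plus the new data must still fit into an i32.
fn canQueue(alreadyQueuedBytes: usize, newBytes: usize) bool {
    return alreadyQueuedBytes + newBytes <= std.math.maxInt(i32);
}

test "queue stays below the i32 limit" {
    try std.testing.expect(canQueue(0, 1 << 20));
    try std.testing.expect(!canQueue(std.math.maxInt(i32), 1));
}
```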

* Reimplement and simplify reconnection

* Use member functions for Socket

* React to packets using a scheme similar to TCP

TCP sends a fast retransmit on the first loss discovered by duplicate ACKs, shortly after starting the RTO timer.
In essence this means that only the second loss of the smallest packets is handled by congestion control.
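
A rough sketch of that reaction; the threshold of three duplicate ACKs and all names come from TCP convention, not from the actual implementation:

```zig
const std = @import("std");

const LossReaction = enum { none, fastRetransmit, congestionControl };

const DupAckTracker = struct {
    duplicateAcks: u32 = 0,
    alreadyRetransmitted: bool = false,

    fn onAck(self: *DupAckTracker, isDuplicate: bool) LossReaction {
        if (!isDuplicate) {
            // New data acknowledged: the hole is filled, reset the state.
            self.* = .{};
            return .none;
        }
        self.duplicateAcks += 1;
        if (self.duplicateAcks >= 3 and !self.alreadyRetransmitted) {
            // First sign of loss: retransmit immediately, without touching the bandwidth estimate.
            self.alreadyRetransmitted = true;
            return .fastRetransmit;
        }
        return .none;
    }

    fn onRetransmissionTimeout(self: *DupAckTracker) LossReaction {
        // Only a timeout (a second loss of the same data) reaches congestion control.
        self.* = .{};
        return .congestionControl;
    }
};

test "third duplicate ACK triggers exactly one fast retransmit" {
    var tracker = DupAckTracker{};
    try std.testing.expectEqual(LossReaction.none, tracker.onAck(true));
    try std.testing.expectEqual(LossReaction.none, tracker.onAck(true));
    try std.testing.expectEqual(LossReaction.fastRetransmit, tracker.onAck(true));
    try std.testing.expectEqual(LossReaction.none, tracker.onAck(true));
    try std.testing.expectEqual(LossReaction.congestionControl, tracker.onRetransmissionTimeout());
}
```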

* Increase scaling after congestion control halves bandwidth.

Now the bandwidth is increased by at least 1%, instead of only by the MTU.
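
A sketch of the adjusted increase step, assuming the 1% applies to the current estimate:

```zig
const std = @import("std");

// Grow the estimate by at least 1% per round trip, so a large estimate recovers in a
// bounded number of RTTs after being halved, instead of crawling back one MTU at a time.
fn additiveIncrease(bytesPerRtt: f64, mtu: f64) f64 {
    return bytesPerRtt + @max(mtu, bytesPerRtt*0.01);
}

test "large estimates grow by 1%, small ones by an MTU" {
    try std.testing.expectApproxEqAbs(@as(f64, 1_010_000), additiveIncrease(1_000_000, 1472), 1);
    try std.testing.expectApproxEqAbs(@as(f64, 11_472), additiveIncrease(10_000, 1472), 1);
}
```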

* Use start and len instead of start and end for the RangeBuffer

* Use ListUnmanaged in RangeBuffer

* Rename flawedReceive to tryReceive

* Add a `ms` constant to make it easier to see what units some of the constants use.

Also added a function alias for microTimestamp to make it potentially easier to change to other timestamps in the future.
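
A sketch of what that looks like, assuming the timestamps are in microseconds; the `connectionTimeout` value matches the one in the diff below, the rest is illustrative:

```zig
const std = @import("std");

const ms: i64 = 1000; // timestamps are in microseconds, so a millisecond is 1000 of them
const connectionTimeout = 60_000*ms; // 60_000_000 µs, as in the diff below

// Single alias that decides which clock the rest of the code uses.
const timestamp = std.time.microTimestamp;

test "timeouts read in milliseconds" {
    const start = timestamp();
    try std.testing.expect(timestamp() - start < connectionTimeout);
}
```
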
Committed by IntegratedQuantum on 2025-04-19 15:05:02 +02:00 via GitHub (commit 5f54d48a73, parent a6ef0dc57e).
9 changed files with 1001 additions and 572 deletions


@@ -38,16 +38,22 @@ pub fn render() void {
const loss = @as(f64, @floatFromInt(resent))/@as(f64, @floatFromInt(sent))*100;
draw.print("Packet loss: {d:.1}% ({}/{})", .{loss, resent, sent}, 0, y, 8, .left);
y += 8;
draw.print("Internal message overhead: {}kiB", .{network.Connection.internalMessageOverhead.load(.monotonic) >> 10}, 0, y, 8, .left);
y += 8;
draw.print("Internal header overhead: {}kiB", .{network.Connection.internalHeaderOverhead.load(.monotonic) >> 10}, 0, y, 8, .left);
y += 8;
draw.print("External header overhead: {}kiB", .{network.Connection.externalHeaderOverhead.load(.monotonic) >> 10}, 0, y, 8, .left);
y += 8;
inline for(@typeInfo(network.Protocols).@"struct".decls) |decl| {
if(@TypeOf(@field(network.Protocols, decl.name)) == type) {
const id = @field(network.Protocols, decl.name).id;
draw.print("{s}: {}kiB in {} packets", .{decl.name, network.bytesReceived[id].load(.monotonic) >> 10, network.packetsReceived[id].load(.monotonic)}, 0, y, 8, .left);
draw.print("{s}: received {}kiB sent {}kiB", .{decl.name, network.Protocols.bytesReceived[id].load(.monotonic) >> 10, network.Protocols.bytesSent[id].load(.monotonic) >> 10}, 0, y, 8, .left);
y += 8;
}
}
}
if(window.size[1] != y) {
window.size[1] = y;
if(window.contentSize[1] != y) {
window.contentSize[1] = y;
window.updateWindowPosition();
}
}


@@ -23,26 +23,19 @@ pub var window = GuiWindow{
.hideIfMouseIsGrabbed = false,
};
fn renderConnectionData(conn: *main.network.Connection, y: *f32) void {
fn renderConnectionData(conn: *main.network.Connection, name: []const u8, y: *f32) void {
conn.mutex.lock();
defer conn.mutex.unlock();
var unconfirmed: usize = 0;
for(conn.unconfirmedPackets.items) |packet| {
unconfirmed += packet.data.len;
}
var waiting: usize = 0;
{
var i = conn.packetQueue.startIndex;
while(i != conn.packetQueue.endIndex) : (i = (i + 1) & conn.packetQueue.mask) {
const packet = conn.packetQueue.mem[i];
waiting += packet.data.len;
}
}
draw.print("Bandwidth: {d:.0} kiB/s", .{1.0e9/conn.congestionControl_inversebandWidth/1024.0}, 0, y.*, 8, .left);
var unconfirmed: [3]usize = @splat(0);
var queued: [3]usize = @splat(0);
conn.lossyChannel.getStatistics(&unconfirmed[0], &queued[0]);
conn.fastChannel.getStatistics(&unconfirmed[1], &queued[1]);
conn.slowChannel.getStatistics(&unconfirmed[2], &queued[2]);
draw.print("{s} | RTT = {d:.1} ms | {d:.0} kiB/RTT", .{name, conn.rttEstimate/1000.0, conn.bandwidthEstimateInBytesPerRtt/1024.0}, 0, y.*, 8, .left);
y.* += 8;
draw.print("Waiting in queue: {} kiB", .{waiting >> 10}, 0, y.*, 8, .left);
draw.print("Waiting in queue: {: >6} kiB |{: >6} kiB |{: >6} kiB", .{queued[0] >> 10, queued[1] >> 10, queued[2] >> 10}, 0, y.*, 8, .left);
y.* += 8;
draw.print("Sent but not confirmed: {} kiB", .{unconfirmed >> 10}, 0, y.*, 8, .left);
draw.print("Sent but not confirmed:{: >6} kiB |{: >6} kiB |{: >6} kiB", .{unconfirmed[0] >> 10, unconfirmed[1] >> 10, unconfirmed[2] >> 10}, 0, y.*, 8, .left);
y.* += 8;
}
@@ -50,21 +43,18 @@ pub fn render() void {
draw.setColor(0xffffffff);
var y: f32 = 0;
if(main.game.world != null) {
draw.print("Client", .{}, 0, y, 8, .left);
y += 8;
renderConnectionData(main.game.world.?.conn, &y);
renderConnectionData(main.game.world.?.conn, "Client", &y);
}
y += 8;
if(main.server.world != null) {
const userList = main.server.getUserListAndIncreaseRefCount(main.stackAllocator);
defer main.server.freeUserListAndDecreaseRefCount(main.stackAllocator, userList);
for(userList) |user| {
draw.print("{s}", .{user.name}, 0, y, 8, .left);
y += 8;
renderConnectionData(user.conn, &y);
renderConnectionData(user.conn, user.name, &y);
}
}
if(window.size[1] != y) {
window.size[1] = y;
if(window.contentSize[1] != y) {
window.contentSize[1] = y;
window.updateWindowPosition();
}
}


@@ -59,18 +59,8 @@ fn copyIp(_: usize) void {
main.Window.setClipboardString(ipAddress);
}
fn inviteFromExternal(address: main.network.Address) void {
const ip = std.fmt.allocPrint(main.stackAllocator.allocator, "{}", .{address}) catch unreachable;
defer main.stackAllocator.free(ip);
const user = main.server.User.initAndIncreaseRefCount(main.server.connectionManager, ip) catch |err| {
std.log.err("Cannot connect user from external IP {}: {s}", .{address, @errorName(err)});
return;
};
user.decreaseRefCount();
}
fn makePublic(public: bool) void {
main.server.connectionManager.newConnectionCallback.store(if(public) &inviteFromExternal else null, .monotonic);
main.server.connectionManager.allowNewConnections.store(public, .monotonic);
}
pub fn onOpen() void {
@@ -84,7 +74,7 @@ pub fn onOpen() void {
list.add(ipAddressEntry);
list.add(Button.initText(.{0, 0}, 100, "Invite", .{.callback = &invite}));
list.add(Button.initText(.{0, 0}, 100, "Manage Players", gui.openWindowCallback("manage_players")));
list.add(CheckBox.init(.{0, 0}, width, "Allow anyone to join (requires a publicly visible IP address+port which may need some configuration in your router)", main.server.connectionManager.newConnectionCallback.load(.monotonic) != null, &makePublic));
list.add(CheckBox.init(.{0, 0}, width, "Allow anyone to join (requires a publicly visible IP address+port which may need some configuration in your router)", main.server.connectionManager.allowNewConnections.load(.monotonic), &makePublic));
list.finish(.center);
window.rootComponent = list.toComponent();
window.contentSize = window.rootComponent.?.pos() + window.rootComponent.?.size() + @as(Vec2f, @splat(padding));

File diff suppressed because it is too large.


@@ -254,7 +254,7 @@ pub const ChannelChunk = struct {
}
fn propagateFromNeighbor(self: *ChannelChunk, lightQueue: *main.utils.CircularBufferQueue(Entry), lights: []const Entry, lightRefreshList: *main.List(*chunk_meshing.ChunkMesh)) void {
std.debug.assert(lightQueue.startIndex == lightQueue.endIndex);
std.debug.assert(lightQueue.empty());
for(lights) |entry| {
const index = chunk.getIndex(entry.x, entry.y, entry.z);
var result = entry;
@@ -265,7 +265,7 @@ pub const ChannelChunk = struct {
}
fn propagateDestructiveFromNeighbor(self: *ChannelChunk, lightQueue: *main.utils.CircularBufferQueue(Entry), lights: []const Entry, constructiveEntries: *main.ListUnmanaged(ChunkEntries), lightRefreshList: *main.List(*chunk_meshing.ChunkMesh)) main.ListUnmanaged(PositionEntry) {
std.debug.assert(lightQueue.startIndex == lightQueue.endIndex);
std.debug.assert(lightQueue.empty());
for(lights) |entry| {
const index = chunk.getIndex(entry.x, entry.y, entry.z);
var result = entry;


@@ -125,13 +125,6 @@ pub const User = struct { // MARK: User
return self;
}
pub fn reinitialize(self: *User) void {
removePlayer(self);
self.timeDifference = .{};
main.globalAllocator.free(self.name);
self.name = "";
}
pub fn deinit(self: *User) void {
std.debug.assert(self.refCount.load(.monotonic) == 0);


@@ -5,7 +5,7 @@ const ZonElement = @import("zon.zig").ZonElement;
const main = @import("main");
pub const defaultPort: u16 = 47649;
pub const connectionTimeout = 60_000_000_000;
pub const connectionTimeout = 60_000_000;
pub const entityLookback: i16 = 100;


@@ -319,13 +319,105 @@ pub fn Array3D(comptime T: type) type { // MARK: Array3D
};
}
pub fn FixedSizeCircularBuffer(T: type, capacity: comptime_int) type { // MARK: FixedSizeCircularBuffer
std.debug.assert(capacity - 1 & capacity == 0 and capacity > 0);
const mask = capacity - 1;
return struct {
const Self = @This();
mem: *[capacity]T = undefined,
startIndex: usize = 0,
len: usize = 0,
pub fn init(allocator: NeverFailingAllocator) Self {
return .{
.mem = allocator.create([capacity]T),
};
}
pub fn deinit(self: Self, allocator: NeverFailingAllocator) void {
allocator.destroy(self.mem);
}
pub fn enqueue(self: *Self, elem: T) !void {
if(self.len >= capacity) return error.OutOfMemory;
self.mem[self.startIndex + self.len & mask] = elem;
self.len += 1;
}
pub fn enqueueSlice(self: *Self, elems: []const T) !void {
if(elems.len + self.len > capacity) {
return error.OutOfMemory;
}
const start = self.startIndex + self.len & mask;
const end = start + elems.len;
if(end < self.mem.len) {
@memcpy(self.mem[start..end], elems);
} else {
const mid = self.mem.len - start;
@memcpy(self.mem[start..], elems[0..mid]);
@memcpy(self.mem[0 .. end & mask], elems[mid..]);
}
self.len += elems.len;
}
pub fn insertSliceAtOffset(self: *Self, elems: []const T, offset: usize) !void {
if(offset + elems.len > capacity) {
return error.OutOfMemory;
}
self.len = @max(self.len, offset + elems.len);
const start = self.startIndex + offset & mask;
const end = start + elems.len;
if(end < self.mem.len) {
@memcpy(self.mem[start..end], elems);
} else {
const mid = self.mem.len - start;
@memcpy(self.mem[start..], elems[0..mid]);
@memcpy(self.mem[0 .. end & mask], elems[mid..]);
}
}
pub fn dequeue(self: *Self) ?T {
if(self.len == 0) return null;
const result = self.mem[self.startIndex];
self.startIndex = (self.startIndex + 1) & mask;
self.len -= 1;
return result;
}
pub fn dequeueSlice(self: *Self, out: []T) !void {
if(out.len > self.len) return error.OutOfBounds;
const start = self.startIndex;
const end = start + out.len;
if(end < self.mem.len) {
@memcpy(out, self.mem[start..end]);
} else {
const mid = self.mem.len - start;
@memcpy(out[0..mid], self.mem[start..]);
@memcpy(out[mid..], self.mem[0 .. end & mask]);
}
self.startIndex = self.startIndex + out.len & mask;
self.len -= out.len;
}
pub fn discardElements(self: *Self, n: usize) void {
self.len -= n;
self.startIndex = (self.startIndex + n) & mask;
}
pub fn getAtOffset(self: Self, i: usize) ?T {
if(i >= self.len) return null;
return self.mem[(self.startIndex + i) & mask];
}
};
}
pub fn CircularBufferQueue(comptime T: type) type { // MARK: CircularBufferQueue
return struct {
const Self = @This();
mem: []T,
mask: usize,
startIndex: usize,
endIndex: usize,
len: usize,
allocator: NeverFailingAllocator,
pub fn init(allocator: NeverFailingAllocator, initialCapacity: usize) Self {
@@ -335,7 +427,7 @@ pub fn CircularBufferQueue(comptime T: type) type { // MARK: CircularBufferQueue
.mem = allocator.alloc(T, initialCapacity),
.mask = initialCapacity - 1,
.startIndex = 0,
.endIndex = 0,
.len = 0,
.allocator = allocator,
};
}
@@ -344,44 +436,71 @@ pub fn CircularBufferQueue(comptime T: type) type { // MARK: CircularBufferQueue
self.allocator.free(self.mem);
}
pub fn reset(self: *Self) void {
self.len = 0;
}
fn increaseCapacity(self: *Self) void {
const newMem = self.allocator.alloc(T, self.mem.len*2);
@memcpy(newMem[0..(self.mem.len - self.startIndex)], self.mem[self.startIndex..]);
@memcpy(newMem[(self.mem.len - self.startIndex)..][0..self.endIndex], self.mem[0..self.endIndex]);
@memcpy(newMem[(self.mem.len - self.startIndex)..][0..self.startIndex], self.mem[0..self.startIndex]);
self.startIndex = 0;
self.endIndex = self.mem.len;
self.allocator.free(self.mem);
self.mem = newMem;
self.mask = self.mem.len - 1;
}
pub fn enqueue(self: *Self, elem: T) void {
self.mem[self.endIndex] = elem;
self.endIndex = (self.endIndex + 1) & self.mask;
if(self.endIndex == self.startIndex) {
if(self.len == self.mem.len) {
self.increaseCapacity();
}
self.mem[self.startIndex + self.len & self.mask] = elem;
self.len += 1;
}
pub fn enqueueSlice(self: *Self, elems: []const T) void {
while(elems.len + self.len > self.mem.len) {
self.increaseCapacity();
}
const start = self.startIndex + self.len & self.mask;
const end = start + elems.len;
if(end < self.mem.len) {
@memcpy(self.mem[start..end], elems);
} else {
const mid = self.mem.len - start;
@memcpy(self.mem[start..], elems[0..mid]);
@memcpy(self.mem[0 .. end & self.mask], elems[mid..]);
}
self.len += elems.len;
}
pub fn enqueue_back(self: *Self, elem: T) void {
self.startIndex = (self.startIndex -% 1) & self.mask;
self.mem[self.startIndex] = elem;
if(self.endIndex == self.startIndex) {
if(self.len == self.mem.len) {
self.increaseCapacity();
}
self.startIndex = (self.startIndex -% 1) & self.mask;
self.mem[self.startIndex] = elem;
self.len += 1;
}
pub fn dequeue(self: *Self) ?T {
if(self.empty()) return null;
const result = self.mem[self.startIndex];
self.startIndex = (self.startIndex + 1) & self.mask;
self.len -= 1;
return result;
}
pub fn dequeue_front(self: *Self) ?T {
if(self.empty()) return null;
self.endIndex = (self.endIndex -% 1) & self.mask;
return self.mem[self.endIndex];
self.len -= 1;
return self.mem[self.startIndex + self.len & self.mask];
}
pub fn discard(self: *Self, amount: usize) !void {
if(amount > self.len) return error.OutOfBounds;
self.startIndex = (self.startIndex + amount) & self.mask;
self.len -= amount;
}
pub fn peek(self: *Self) ?T {
@@ -389,12 +508,30 @@ pub fn CircularBufferQueue(comptime T: type) type { // MARK: CircularBufferQueue
return self.mem[self.startIndex];
}
pub fn getSliceAtOffset(self: Self, offset: usize, result: []T) !void {
if(offset + result.len > self.len) return error.OutOfBounds;
const start = self.startIndex + offset & self.mask;
const end = start + result.len;
if(end < self.mem.len) {
@memcpy(result, self.mem[start..end]);
} else {
const mid = self.mem.len - start;
@memcpy(result[0..mid], self.mem[start..]);
@memcpy(result[mid..], self.mem[0 .. end & self.mask]);
}
}
pub fn getAtOffset(self: Self, offset: usize) !T {
if(offset >= self.len) return error.OutOfBounds;
return self.mem[(self.startIndex + offset) & self.mask];
}
pub fn empty(self: *Self) bool {
return self.startIndex == self.endIndex;
return self.len == 0;
}
pub fn reachedCapacity(self: *Self) bool {
return self.startIndex == (self.endIndex + 1) & self.mask;
return self.len == self.mem.len;
}
};
}


@@ -272,7 +272,7 @@ pub fn ListUnmanaged(comptime T: type) type {
self.capacity = newAllocation.len;
}
fn ensureFreeCapacity(self: *@This(), allocator: NeverFailingAllocator, freeCapacity: usize) void {
pub fn ensureFreeCapacity(self: *@This(), allocator: NeverFailingAllocator, freeCapacity: usize) void {
if(freeCapacity + self.items.len <= self.capacity) return;
self.ensureCapacity(allocator, growCapacity(self.capacity, freeCapacity + self.items.len));
}