Refactor sendKeepAlive and receiveKeepAlive (#1265)

This commit is contained in:
Krzysztof Wiśniewski 2025-04-02 21:18:17 +02:00 committed by GitHub
parent 92fcd10069
commit b0af58b21b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -1450,19 +1450,21 @@ pub const Connection = struct { // MARK: Connection
self.manager.send(fullData, self.remoteAddress, null);
}
fn receiveKeepAlive(self: *Connection, data: []const u8) void {
fn receiveKeepAlive(self: *Connection, data: []const u8) !void {
std.debug.assert(self.manager.threadId == std.Thread.getCurrentId());
if(data.len == 0) return; // This is sent when brute forcing the port.
self.mutex.lock();
defer self.mutex.unlock();
self.otherKeepAliveReceived = std.mem.readInt(u32, data[0..4], .big);
self.lastKeepAliveReceived = std.mem.readInt(u32, data[4..8], .big);
var remaining: []const u8 = data[8..];
while(remaining.len >= 8) {
const start = std.mem.readInt(u32, remaining[0..4], .big);
const len = std.mem.readInt(u32, remaining[4..8], .big);
remaining = remaining[8..];
var reader = utils.BinaryReader.init(data, .big);
self.otherKeepAliveReceived = try reader.readInt(u32);
self.lastKeepAliveReceived = try reader.readInt(u32);
while(reader.remaining.len > 0) {
const start = try reader.readInt(u32);
const len = try reader.readInt(u32);
var j: usize = 0;
while(j < self.unconfirmedPackets.items.len) {
const diff = self.unconfirmedPackets.items[j].id -% start;
@ -1529,18 +1531,18 @@ pub const Connection = struct { // MARK: Connection
self.receivedPackets[0] = putBackToFront;
self.receivedPackets[0].clearRetainingCapacity();
}
const output = main.stackAllocator.alloc(u8, runLengthEncodingStarts.items.len*8 + 9);
defer main.stackAllocator.free(output);
output[0] = Protocols.keepAlive;
std.mem.writeInt(u32, output[1..5], self.lastKeepAliveSent, .big);
std.mem.writeInt(u32, output[5..9], self.otherKeepAliveReceived, .big);
var remaining: []u8 = output[9..];
var writer = utils.BinaryWriter.initCapacity(main.stackAllocator, .big, runLengthEncodingStarts.items.len*8 + 9);
defer writer.deinit();
writer.writeInt(u8, Protocols.keepAlive);
writer.writeInt(u32, self.lastKeepAliveSent);
writer.writeInt(u32, self.otherKeepAliveReceived);
for(runLengthEncodingStarts.items, 0..) |_, i| {
std.mem.writeInt(u32, remaining[0..4], runLengthEncodingStarts.items[i], .big);
std.mem.writeInt(u32, remaining[4..8], runLengthEncodingLengths.items[i], .big);
remaining = remaining[8..];
writer.writeInt(u32, runLengthEncodingStarts.items[i]);
writer.writeInt(u32, runLengthEncodingLengths.items[i]);
}
self.manager.send(output, self.remoteAddress, null);
self.manager.send(writer.data.items, self.remoteAddress, null);
// Congestion control:
self.congestionControl_bandWidthSentHistory[self.lastKeepAliveSent & congestionControl_historyMask] = self.congestionControl_bandWidthUsed;
@ -1743,7 +1745,7 @@ pub const Connection = struct { // MARK: Connection
// Check if a message got completed:
try self.collectPackets();
} else if(protocol == Protocols.keepAlive) {
self.receiveKeepAlive(data[1..]);
try self.receiveKeepAlive(data[1..]);
} else {
if(Protocols.list[protocol]) |prot| {
var reader = utils.BinaryReader.init(data[1..], networkEndian);